Chapter 12: Security and Privacy

A comment system handles user data and must be built with security and privacy as priorities. This chapter covers essential security practices and GDPR compliance.

Security Fundamentals

OWASP Top 10 for Comments

| Risk | Description | Mitigation |
|------|-------------|------------|
| Injection | SQL, NoSQL, command injection | Parameterized queries |
| XSS | Script injection in comments | Input sanitization, CSP |
| CSRF | Forged requests | CSRF tokens |
| Broken Auth | Session hijacking | Secure cookies, JWT |
| Data Exposure | Leaking sensitive data | Encryption, access control |

Input Validation and Sanitization

Server-Side Validation

from pydantic import BaseModel, validator, EmailStr
import re
import bleach

class CommentInput(BaseModel):
    """Validated request body for submitting a comment.

    Each validator raises ``ValueError`` with a user-facing message when
    its field is rejected.
    """
    page_id: str
    author_name: str
    author_email: EmailStr | None = None
    content: str
    parent_id: str | None = None

    @validator('page_id')
    def validate_page_id(cls, v):
        """Accept only letters, digits, ``/``, ``_`` and ``-``, at most 255 chars."""
        # fullmatch rather than match(r'^...$'): `$` also matches just before
        # a trailing newline, which would let "some-page\n" through.
        if not re.fullmatch(r'[a-zA-Z0-9/_-]+', v):
            raise ValueError('Invalid page_id format')
        if len(v) > 255:
            raise ValueError('page_id too long')
        return v

    @validator('author_name')
    def validate_author_name(cls, v):
        """Strip HTML and surrounding whitespace, then enforce 2-100 characters."""
        # Remove any HTML *before* measuring: cleaning can shrink the value
        # (e.g. "<b></b>" becomes empty) or change its length, so the limits
        # must apply to the value that is actually returned.
        v = bleach.clean(v.strip(), tags=[], strip=True).strip()
        if len(v) < 2 or len(v) > 100:
            raise ValueError('Name must be 2-100 characters')
        return v

    @validator('content')
    def validate_content(cls, v):
        """Trim surrounding whitespace and enforce 1-10000 characters."""
        v = v.strip()
        if not v:
            raise ValueError('Content cannot be empty')
        if len(v) > 10000:
            raise ValueError('Content too long (max 10000 chars)')
        return v

HTML Sanitization

import bleach
from markupsafe import Markup

# Allowed HTML for comments
# Tag allow-list handed to bleach.clean() as `tags=` in sanitize_html().
ALLOWED_TAGS = ['p', 'br', 'strong', 'em', 'a', 'code', 'pre', 'blockquote', 'ul', 'ol', 'li']
# Per-tag attribute allow-list handed to bleach.clean() as `attributes=`;
# only <a> has an entry.
ALLOWED_ATTRS = {
    'a': ['href', 'title', 'rel'],
}

def sanitize_html(content: str) -> str:
    """Return *content* cleaned against the tag/attribute allow-lists.

    The text is first passed through ``bleach.clean`` with ``ALLOWED_TAGS``
    and ``ALLOWED_ATTRS`` (``strip=True``), then through ``bleach.linkify``
    with a callback that sets ``rel="nofollow noopener"`` on links.
    """

    def _force_rel(attrs, new):
        # Linkify callback: copy the attributes and overwrite `rel`.
        return {**attrs, (None, 'rel'): 'nofollow noopener'}

    without_unsafe_markup = bleach.clean(
        content,
        tags=ALLOWED_TAGS,
        attributes=ALLOWED_ATTRS,
        strip=True,
    )
    return bleach.linkify(without_unsafe_markup, callbacks=[_force_rel])

def render_comment_safe(content: str) -> Markup:
    """Sanitize *content* with ``sanitize_html`` and wrap the result in ``Markup``."""
    return Markup(sanitize_html(content))

XSS Prevention

# Content Security Policy headers
@app.middleware("http")
async def security_headers(request: Request, call_next):
    """Add the Content-Security-Policy and related hardening headers.

    Runs the downstream handler first, then sets the headers on whatever
    response it produced and returns that response.
    """
    response = await call_next(request)

    response.headers["Content-Security-Policy"] = (
        "default-src 'self'; "
        "script-src 'self' https://challenges.cloudflare.com; "
        "style-src 'self' 'unsafe-inline'; "
        "img-src 'self' https://www.gravatar.com data:; "
        "connect-src 'self' https://api.yourdomain.com; "
        "frame-ancestors 'none';"
    )
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    # The legacy browser XSS auditor is deprecated and "1; mode=block" can
    # itself introduce vulnerabilities in older browsers; the current
    # recommendation is to switch it off explicitly and rely on the CSP.
    response.headers["X-XSS-Protection"] = "0"

    return response

SQL Injection Prevention

Parameterized Queries

# NEVER do this
def bad_query(page_id: str):
    """Anti-example: interpolates ``page_id`` straight into the SQL text.

    Shown only to illustrate SQL injection; the f-string makes the value
    part of the statement itself instead of a bound parameter.
    """
    return db.execute(f"SELECT * FROM comments WHERE page_id = '{page_id}'")

# ALWAYS use parameters
def good_query(page_id: str):
    """Run the comment lookup with ``page_id`` supplied as a bound parameter."""
    statement = "SELECT * FROM comments WHERE page_id = :page_id"
    bound_values = {"page_id": page_id}
    return db.execute(statement, bound_values)

# SQLAlchemy ORM (safe by default)
def orm_query(page_id: str):
    """Return all ``Comment`` rows whose ``page_id`` equals the argument."""
    for_page = session.query(Comment).filter(Comment.page_id == page_id)
    return for_page.all()

NoSQL Injection

# MongoDB - avoid operator injection
def safe_mongo_query(page_id: str):
    """Look up comments by ``page_id``, rejecting any non-string value.

    Raises:
        ValueError: if ``page_id`` is not a ``str`` (for example a dict
            carrying query operators).
    """
    if isinstance(page_id, str):
        return collection.find({"page_id": page_id})

    # Anything else could be an operator object rather than a plain ID.
    raise ValueError("Invalid page_id")

CSRF Protection

Token-Based CSRF

import secrets
import hmac
import hashlib

def generate_csrf_token(session_id: str) -> str:
    """Return the hex HMAC-SHA256 of ``session_id`` keyed with ``SECRET_KEY``."""
    key_bytes = SECRET_KEY.encode()
    mac = hmac.new(key_bytes, session_id.encode(), hashlib.sha256)
    return mac.hexdigest()

def verify_csrf_token(token: str, session_id: str) -> bool:
    """Check that ``token`` is the CSRF token for ``session_id``.

    Returns ``True`` only when ``token`` equals the value
    ``generate_csrf_token(session_id)`` produces.
    """
    expected = generate_csrf_token(session_id)
    # Compare as bytes: compare_digest() raises TypeError when given a str
    # containing non-ASCII characters, and `token` is client-supplied.
    return hmac.compare_digest(token.encode(), expected.encode())

# Middleware
@app.middleware("http")
async def csrf_protection(request: Request, call_next):
    """Reject state-changing requests that lack a valid CSRF token.

    POST, PUT, DELETE and PATCH requests must carry a ``session_id`` cookie
    and an ``X-CSRF-Token`` header that verifies against it; otherwise a 403
    JSON response is returned. All other methods pass straight through.
    """
    if request.method not in ["POST", "PUT", "DELETE", "PATCH"]:
        return await call_next(request)

    sid = request.cookies.get("session_id")
    supplied = request.headers.get("X-CSRF-Token")

    if not (sid and supplied):
        return JSONResponse({"error": "CSRF token missing"}, 403)

    if not verify_csrf_token(supplied, sid):
        return JSONResponse({"error": "Invalid CSRF token"}, 403)

    return await call_next(request)

SameSite Cookies

from fastapi.responses import JSONResponse

def set_session_cookie(response: JSONResponse, session_id: str):
    """Set the ``session_id`` cookie on *response* with hardened attributes."""
    cookie_settings = {
        "key": "session_id",
        "value": session_id,
        "httponly": True,     # Not accessible via JavaScript
        "secure": True,       # HTTPS only
        "samesite": "lax",    # CSRF protection
        "max_age": 604800,    # 7 days, in seconds
        "path": "/",
    }
    response.set_cookie(**cookie_settings)

API Security

Rate Limiting

from slowapi import Limiter
from slowapi.util import get_remote_address

# Shared rate limiter used by the @limiter.limit(...) decorators below;
# requests are grouped by whatever key get_remote_address returns.
limiter = Limiter(key_func=get_remote_address)

@app.post("/api/comments")
@limiter.limit("5/minute")  # 5 comments per minute per IP
async def create_comment(request: Request, data: CommentInput):
    """Create a comment from the validated payload and return the service result."""
    created = await comment_service.create(data)
    return created

@app.get("/api/comments")
@limiter.limit("60/minute")  # 60 reads per minute per IP
async def get_comments(request: Request, page_id: str):
    """Return whatever the comment service yields for ``page_id``."""
    page_comments = await comment_service.get_for_page(page_id)
    return page_comments

API Key Authentication (for admin)

from fastapi import Security
from fastapi.security import APIKeyHeader

# Security scheme that takes the admin key from the "X-API-Key" request header.
api_key_header = APIKeyHeader(name="X-API-Key")

async def verify_api_key(api_key: str = Security(api_key_header)):
    """Dependency that admits only requests carrying the admin API key.

    Returns:
        ``True`` when the supplied key matches ``ADMIN_API_KEY``.

    Raises:
        HTTPException: 401 when the key does not match.
    """
    # Compare as bytes: compare_digest() raises TypeError when given a str
    # containing non-ASCII characters, and the header value is
    # client-controlled.
    if not secrets.compare_digest(api_key.encode(), ADMIN_API_KEY.encode()):
        raise HTTPException(401, "Invalid API key")
    return True

@app.get("/api/admin/stats")
async def admin_stats(authenticated: bool = Depends(verify_api_key)):
    """Return the stats payload; access is gated by ``verify_api_key``."""
    stats = await get_stats()
    return stats

Request Signing (for widgets)

import time

def sign_request(data: dict, timestamp: int) -> str:
    """Return the hex HMAC-SHA256 signature for *data* at *timestamp*.

    The signed message is ``"<timestamp>:<json>"``, where the JSON is
    ``data`` serialized with sorted keys, keyed with ``WIDGET_SECRET``.
    """
    serialized = json.dumps(data, sort_keys=True)
    message = f"{timestamp}:{serialized}"
    mac = hmac.new(WIDGET_SECRET.encode(), message.encode(), hashlib.sha256)
    return mac.hexdigest()

def verify_request(data: dict, timestamp: int, signature: str) -> bool:
    """Verify a signed request.

    Returns ``False`` when *timestamp* is more than 300 seconds away from
    the current time (in either direction) or when *signature* does not
    match ``sign_request(data, timestamp)``; ``True`` otherwise.
    """
    # Check timestamp (5 minute window) before doing any signing work.
    if abs(time.time() - timestamp) > 300:
        return False

    expected = sign_request(data, timestamp)
    # Compare as bytes: compare_digest() raises TypeError when given a str
    # containing non-ASCII characters, and `signature` is caller-supplied.
    return hmac.compare_digest(signature.encode(), expected.encode())

Data Privacy

PII Handling

# Hash IP addresses for storage
def hash_ip(ip: str) -> str:
    """Return the first 16 hex characters of SHA-256 over ``"<salt>:<ip>"``.

    The salt is read from the ``IP_HASH_SALT`` environment variable; a
    ``KeyError`` is raised if it is not set.
    """
    salted = ":".join((os.environ["IP_HASH_SALT"], ip))
    full_digest = hashlib.sha256(salted.encode()).hexdigest()
    return full_digest[:16]

# Encrypt email addresses
from cryptography.fernet import Fernet

# Cipher used by encrypt_email/decrypt_email; the key is read from the
# ENCRYPTION_KEY environment variable (KeyError if it is unset).
fernet = Fernet(os.environ["ENCRYPTION_KEY"])

def encrypt_email(email: str) -> str:
    """Encrypt *email* with the module-level ``fernet`` and return the token as text."""
    token = fernet.encrypt(email.encode())
    return token.decode()

def decrypt_email(encrypted: str) -> str:
    """Decrypt a token produced by ``encrypt_email`` and return the email as text."""
    plaintext = fernet.decrypt(encrypted.encode())
    return plaintext.decode()

Data Minimization

class Comment(Base):
    """ORM model for the ``comments`` table, split into public and private columns.

    NOTE(review): other snippets in this chapter filter on and export a
    ``status`` value for comments, but no ``status`` column is declared
    here — confirm where it is defined.
    """
    __tablename__ = "comments"
    
    id = Column(UUID, primary_key=True)
    page_id = Column(String(255), nullable=False)
    
    # Display data (public)
    author_name = Column(String(100), nullable=False)
    content = Column(Text, nullable=False)
    # The default is the utcnow function itself (not a call), so the value is
    # presumably computed when a row is inserted — confirm against SQLAlchemy.
    created_at = Column(DateTime, default=datetime.utcnow)
    
    # Private data (never exposed in API)
    author_email_hash = Column(String(64))  # For gravatar only
    # 16 characters matches the truncated digest that hash_ip() returns.
    ip_hash = Column(String(16))            # Hashed, for abuse prevention
    
    # Don't store: full IP, user agent, referer (unless needed)

Data Retention

async def cleanup_old_data():
    """Apply the retention rules: drop stale PII and purge old spam.

    Two statements are executed in order:

    1. Comments older than 365 days keep their content but have
       ``ip_hash`` and ``author_email_hash`` set to NULL.
    2. Comments with status 'spam' or 'rejected' older than 30 days are
       deleted.

    NOTE(review): the privacy-policy section of this chapter lists 30-90
    days for IP addresses, while this job keeps ``ip_hash`` for a year —
    confirm which is intended.
    """
    pii_max_age = timedelta(days=365)
    spam_max_age = timedelta(days=30)

    # Pass 1: strip identifying metadata from old comments.
    pii_cutoff = datetime.utcnow() - pii_max_age
    await db.execute("""
        UPDATE comments 
        SET ip_hash = NULL,
            author_email_hash = NULL
        WHERE created_at < :cutoff
    """, {"cutoff": pii_cutoff})

    # Pass 2: remove rejected/spam comments entirely.
    spam_cutoff = datetime.utcnow() - spam_max_age
    await db.execute("""
        DELETE FROM comments 
        WHERE status IN ('spam', 'rejected')
        AND created_at < :cutoff
    """, {"cutoff": spam_cutoff})

GDPR Compliance

Privacy Policy Requirements

Your privacy policy must disclose:

  1. What data you collect:
    • Name (displayed publicly)
    • Email (optional, for notifications/gravatar)
    • IP address (for spam prevention, hashed)
    • Comment content
  2. Legal basis:
    • Legitimate interest (spam prevention)
    • Consent (optional email)
  3. Data retention periods:
    • Comments: indefinite (or specify)
    • IP addresses: 30-90 days
    • Emails: until consent withdrawn
  4. User rights:
    • Access their data
    • Correct their data
    • Delete their data
    • Export their data

Right to Access (Data Export)

@app.get("/api/my-data")
async def export_user_data(
    email: str,
    verification_token: str
):
    """Return every comment stored for *email* as a JSON-serializable dict.

    Raises:
        HTTPException: 403 when ``verify_email_token`` rejects the
            email/token pair.
    """
    # Verify email ownership before touching any data.
    ownership_confirmed = await verify_email_token(email, verification_token)
    if not ownership_confirmed:
        raise HTTPException(403, "Invalid verification")

    # Comments are looked up by the hash of the email address.
    owner_hash = hash_email(email)
    lookup = select(Comment).where(Comment.author_email_hash == owner_hash)
    rows = await db.execute(lookup)

    exported_comments = []
    for row in rows.scalars():
        exported_comments.append({
            "page": row.page_id,
            "content": row.content,
            "created_at": row.created_at.isoformat(),
            "status": row.status,
        })

    return {
        "email": email,
        "comments": exported_comments,
        "exported_at": datetime.utcnow().isoformat(),
    }

Right to Erasure (Delete Data)

@app.delete("/api/my-data")
async def delete_user_data(
    email: str,
    verification_token: str
):
    """Delete all comments stored for *email* and report how many were removed.

    Returns:
        ``{"status": "deleted", "affected_comments": <rowcount>}``.

    Raises:
        HTTPException: 403 when ``verify_email_token`` rejects the
            email/token pair.
    """
    if not await verify_email_token(email, verification_token):
        raise HTTPException(403, "Invalid verification")

    email_hash = hash_email(email)

    # Option 1: Full delete
    # Keep the execution result: the response below reports its rowcount.
    result = await db.execute(
        delete(Comment).where(Comment.author_email_hash == email_hash)
    )

    # Option 2: Anonymize (preserve content, remove identity)
    # result = await db.execute(
    #     update(Comment)
    #     .where(Comment.author_email_hash == email_hash)
    #     .values(
    #         author_name="[deleted]",
    #         author_email_hash=None,
    #         ip_hash=None
    #     )
    # )

    await db.commit()

    return {"status": "deleted", "affected_comments": result.rowcount}
Cookie Consent

// Simple cookie consent for comment widget
/**
 * Minimal consent banner for the comment widget.
 *
 * The user's choice is kept in localStorage under `storageKey` as the
 * string 'true' or 'false'.
 */
class CookieConsent {
    constructor() {
        this.storageKey = 'comment_consent';
    }

    /** @returns {boolean} true only when the stored choice is 'true'. */
    hasConsent() {
        return localStorage.getItem(this.storageKey) === 'true';
    }

    /** Append the banner to the page unless consent was already given. */
    showBanner() {
        if (this.hasConsent()) return;

        const banner = document.createElement('div');
        banner.className = 'cookie-consent';
        banner.innerHTML = `
            <p>This comment system stores cookies for spam prevention.</p>
            <button type="button" data-consent="accept">Accept</button>
            <button type="button" data-consent="decline">Decline</button>
        `;

        // Handlers are attached from script instead of onclick="..."
        // attributes: inline handlers are blocked by a Content-Security-Policy
        // whose script-src has no 'unsafe-inline' (like the one shown earlier
        // in this chapter), and they required a global named `consent`.
        banner.querySelector('[data-consent="accept"]')
            .addEventListener('click', () => this.accept());
        banner.querySelector('[data-consent="decline"]')
            .addEventListener('click', () => this.decline());

        document.body.appendChild(banner);
    }

    /** Record acceptance and dismiss the banner. */
    accept() {
        localStorage.setItem(this.storageKey, 'true');
        this._removeIfPresent('.cookie-consent');
    }

    /** Record refusal, dismiss the banner and remove the comment form. */
    decline() {
        localStorage.setItem(this.storageKey, 'false');
        this._removeIfPresent('.cookie-consent');
        // Disable comment functionality
        this._removeIfPresent('.comment-form');
    }

    /** Remove the first element matching `selector`, if there is one. */
    _removeIfPresent(selector) {
        const element = document.querySelector(selector);
        if (element !== null) {
            element.remove();
        }
    }
}

Security Monitoring

Logging Security Events

import logging

# Dedicated logger named "security"; log_security_event() writes to it.
security_logger = logging.getLogger("security")

async def log_security_event(
    event_type: str,
    ip: str,
    details: dict
):
    """Emit a WARNING on the security logger describing one event.

    The record's ``extra`` carries the event type, the hashed IP (never the
    raw address), an ISO timestamp, and every key of *details*; keys in
    *details* override the built-in ones on collision.
    """
    base_fields = {
        "event_type": event_type,
        "ip_hash": hash_ip(ip),
        "timestamp": datetime.utcnow().isoformat(),
    }
    record_fields = {**base_fields, **details}
    security_logger.warning(f"Security event: {event_type}", extra=record_fields)

# Log suspicious activity
# NOTE(review): illustrative fragment — it uses `await`, `request`, `data`,
# `spam_score` and `rate_limited` without defining them, so it is presumably
# meant to sit inside an async request handler.
if spam_score > 0.8:
    # The raw client address is passed in; log_security_event hashes it.
    await log_security_event("high_spam_score", request.client.host, {
        "spam_score": spam_score,
        "page_id": data.page_id
    })

if rate_limited:
    await log_security_event("rate_limited", request.client.host, {
        "endpoint": request.url.path
    })

Alerting

async def send_security_alert(event_type: str, details: dict):
    """Notify Slack and email when *event_type* is one of the critical events.

    Any other event type is ignored and nothing is sent.
    """
    critical_events = ["multiple_auth_failures", "injection_attempt", "ddos_detected"]

    if event_type not in critical_events:
        return

    await send_slack_alert(f"🚨 Security Alert: {event_type}\n{json.dumps(details)}")
    await send_email_alert(f"Security Alert: {event_type}", details)

Chapter Summary

| Area | Key Practices |
|------|---------------|
| Input | Validate all input, sanitize HTML |
| XSS | CSP headers, output encoding |
| SQL Injection | Parameterized queries only |
| CSRF | SameSite cookies + tokens |
| Privacy | Minimize data, hash PII |
| GDPR | Export/delete endpoints, consent |

Security checklist:

  1. ✅ Validate and sanitize all input
  2. ✅ Use parameterized queries
  3. ✅ Implement CSRF protection
  4. ✅ Set security headers
  5. ✅ Hash/encrypt PII
  6. ✅ Implement data export/deletion
  7. ✅ Log security events

Navigation: