A comment system handles user data and must be built with security and privacy as priorities. This chapter covers essential security practices and GDPR compliance.
| Risk | Description | Mitigation |
|---|---|---|
| Injection | SQL, NoSQL, command injection | Parameterized queries |
| XSS | Script injection in comments | Input sanitization, CSP |
| CSRF | Forged requests | CSRF tokens |
| Broken Auth | Session hijacking | Secure cookies, JWT |
| Data Exposure | Leaking sensitive data | Encryption, access control |
from pydantic import BaseModel, validator, EmailStr
import re
import bleach
class CommentInput(BaseModel):
    """Validated payload for a newly submitted comment.

    The validators reject malformed page identifiers, strip markup from the
    author name, and bound the length of the name and of the comment body.
    """

    page_id: str
    author_name: str
    author_email: EmailStr | None = None
    content: str
    parent_id: str | None = None

    @validator('page_id')
    def validate_page_id(cls, v):
        """Allow only letters, digits, '/', '_' and '-' (max 255 chars)."""
        # fullmatch instead of match with '$': '$' also matches just before
        # a trailing newline, which would let a value like "page\n" through.
        if not re.fullmatch(r'[a-zA-Z0-9/_-]+', v):
            raise ValueError('Invalid page_id format')
        if len(v) > 255:
            raise ValueError('page_id too long')
        return v

    @validator('author_name')
    def validate_author_name(cls, v):
        """Strip HTML and surrounding whitespace, then enforce 2-100 chars."""
        # Sanitize before measuring: removing tags can shrink the name to
        # nothing and entity-escaping can lengthen it, so the length bounds
        # must hold for the value that is actually returned and stored.
        v = bleach.clean(v, tags=[], strip=True).strip()
        if len(v) < 2 or len(v) > 100:
            raise ValueError('Name must be 2-100 characters')
        return v

    @validator('content')
    def validate_content(cls, v):
        """Trim the comment body and require 1-10000 characters."""
        v = v.strip()
        if not v:
            raise ValueError('Content cannot be empty')
        if len(v) > 10000:
            raise ValueError('Content too long (max 10000 chars)')
        return v
import bleach
from markupsafe import Markup
# Whitelist of HTML elements that survive comment sanitization.
ALLOWED_TAGS = [
    'p',
    'br',
    'strong',
    'em',
    'a',
    'code',
    'pre',
    'blockquote',
    'ul',
    'ol',
    'li',
]
# Per-element attribute whitelist; only links may carry attributes.
ALLOWED_ATTRS = {'a': ['href', 'title', 'rel']}
def sanitize_html(content: str) -> str:
    """Return *content* reduced to the whitelisted HTML, with links hardened.

    Markup outside ALLOWED_TAGS / ALLOWED_ATTRS is stripped by bleach, then
    every link gets its ``rel`` attribute set to ``nofollow noopener``.
    """

    def _force_rel(attrs, new):
        # Build a fresh attribute mapping whose rel value is overridden.
        return {**attrs, (None, 'rel'): 'nofollow noopener'}

    without_unsafe_markup = bleach.clean(
        content,
        tags=ALLOWED_TAGS,
        attributes=ALLOWED_ATTRS,
        strip=True,
    )
    return bleach.linkify(without_unsafe_markup, callbacks=[_force_rel])
def render_comment_safe(content: str) -> Markup:
    """Sanitize *content* with sanitize_html and wrap the result in Markup."""
    return Markup(sanitize_html(content))
# Content Security Policy headers
@app.middleware("http")
async def security_headers(request: Request, call_next):
    """Add the CSP and related hardening headers to every response.

    The request is passed down the chain first; the headers are then set on
    whatever response comes back, and that response is returned.
    """
    response = await call_next(request)
    response.headers["Content-Security-Policy"] = (
        "default-src 'self'; "
        "script-src 'self' https://challenges.cloudflare.com; "
        "style-src 'self' 'unsafe-inline'; "
        "img-src 'self' https://www.gravatar.com data:; "
        "connect-src 'self' https://api.yourdomain.com; "
        "frame-ancestors 'none';"
    )
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    # The legacy browser XSS auditor is deprecated and "1; mode=block" can
    # itself introduce vulnerabilities; "0" turns it off explicitly and
    # leaves XSS defence to the Content-Security-Policy above.
    response.headers["X-XSS-Protection"] = "0"
    return response
# NEVER do this
def bad_query(page_id: str):
    """Anti-example: interpolates *page_id* straight into the SQL text.

    Kept deliberately vulnerable to illustrate SQL injection; do not copy.
    """
    unsafe_sql = f"SELECT * FROM comments WHERE page_id = '{page_id}'"
    return db.execute(unsafe_sql)
# ALWAYS use parameters
def good_query(page_id: str):
    """Fetch comments for *page_id* using a bound ``:page_id`` parameter."""
    statement = "SELECT * FROM comments WHERE page_id = :page_id"
    bound_values = {"page_id": page_id}
    return db.execute(statement, bound_values)
# SQLAlchemy ORM (safe by default)
def orm_query(page_id: str):
    """Return all Comment rows whose page_id equals *page_id* via the ORM."""
    matches_page = Comment.page_id == page_id
    return session.query(Comment).filter(matches_page).all()
# MongoDB - avoid operator injection
def safe_mongo_query(page_id: str):
    """Look up comments by *page_id*, refusing anything that is not a string.

    Raises:
        ValueError: if *page_id* is not a ``str`` (for example a dict that
            carries query operators).
    """
    if isinstance(page_id, str):
        return collection.find({"page_id": page_id})
    # Non-string values could smuggle operator objects into the filter.
    raise ValueError("Invalid page_id")
import secrets
import hmac
import hashlib
def generate_csrf_token(session_id: str) -> str:
    """Return the hex HMAC-SHA256 of *session_id* keyed with SECRET_KEY.

    The token is a pure function of the session id and the secret, so the
    same session always yields the same token.
    """
    signing_key = SECRET_KEY.encode()
    mac = hmac.new(signing_key, msg=session_id.encode(), digestmod=hashlib.sha256)
    return mac.hexdigest()
def verify_csrf_token(token: str, session_id: str) -> bool:
    """Check *token* against the token expected for *session_id*.

    Returns:
        True when the supplied token equals generate_csrf_token(session_id),
        compared in constant time; False otherwise.
    """
    expected = generate_csrf_token(session_id)
    # compare_digest raises TypeError for str arguments that contain
    # non-ASCII characters. The token is client-supplied, so compare the
    # encoded bytes: a malformed token is then rejected, not a crash.
    return hmac.compare_digest(token.encode(), expected.encode())
# Middleware
@app.middleware("http")
async def csrf_protection(request: Request, call_next):
    """Require a valid CSRF token on POST, PUT, DELETE and PATCH requests.

    The session id comes from the ``session_id`` cookie and the token from
    the ``X-CSRF-Token`` header. A missing or mismatching value produces a
    403 JSON response; every other request is passed down the chain.
    """
    if request.method not in ["POST", "PUT", "DELETE", "PATCH"]:
        return await call_next(request)

    sid = request.cookies.get("session_id")
    supplied = request.headers.get("X-CSRF-Token")
    if not (sid and supplied):
        return JSONResponse({"error": "CSRF token missing"}, 403)
    if verify_csrf_token(supplied, sid):
        return await call_next(request)
    return JSONResponse({"error": "Invalid CSRF token"}, 403)
from fastapi.responses import JSONResponse
def set_session_cookie(response: JSONResponse, session_id: str):
    """Set the ``session_id`` cookie on *response* with hardened attributes."""
    one_week_seconds = 7 * 24 * 60 * 60  # 604800
    cookie_settings = {
        "key": "session_id",
        "value": session_id,
        "httponly": True,   # Not accessible via JavaScript
        "secure": True,     # HTTPS only
        "samesite": "lax",  # CSRF protection
        "max_age": one_week_seconds,  # 7 days
        "path": "/",
    }
    response.set_cookie(**cookie_settings)
from slowapi import Limiter
from slowapi.util import get_remote_address
# Shared rate limiter; get_remote_address is the function that derives the
# bucket key for each request.
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/comments")
@limiter.limit("5/minute")  # 5 comments per minute per IP
async def create_comment(request: Request, data: CommentInput):
    """Create a comment from the validated payload and return the result."""
    created = await comment_service.create(data)
    return created
@app.get("/api/comments")
@limiter.limit("60/minute")  # 60 reads per minute per IP
async def get_comments(request: Request, page_id: str):
    """Return whatever the comment service holds for *page_id*."""
    page_comments = await comment_service.get_for_page(page_id)
    return page_comments
from fastapi import Security
from fastapi.security import APIKeyHeader
# Security scheme bound to the X-API-Key request header.
api_key_header = APIKeyHeader(name="X-API-Key")
async def verify_api_key(api_key: str = Security(api_key_header)):
    """Dependency that admits only requests carrying the admin API key.

    Returns:
        True when the supplied key equals ADMIN_API_KEY.

    Raises:
        HTTPException: 401 when the key does not match.
    """
    # Compare encoded bytes: compare_digest raises TypeError for str values
    # containing non-ASCII characters, and the header value is attacker
    # controlled. A bad key must yield 401, not an unhandled error.
    if not secrets.compare_digest(api_key.encode(), ADMIN_API_KEY.encode()):
        raise HTTPException(401, "Invalid API key")
    return True
@app.get("/api/admin/stats")
async def admin_stats(authenticated: bool = Depends(verify_api_key)):
    """Return the statistics from get_stats; gated by verify_api_key."""
    stats = await get_stats()
    return stats
import time
def sign_request(data: dict, timestamp: int) -> str:
    """Return the hex HMAC-SHA256 signature over *timestamp* and *data*.

    The signed message is ``"<timestamp>:<json>"`` where the JSON is *data*
    serialized with sorted keys; the key is WIDGET_SECRET.
    """
    canonical_body = json.dumps(data, sort_keys=True)
    message = f"{timestamp}:{canonical_body}"
    mac = hmac.new(WIDGET_SECRET.encode(), message.encode(), hashlib.sha256)
    return mac.hexdigest()
def verify_request(data: dict, timestamp: int, signature: str) -> bool:
    """Verify a request signed by sign_request.

    Returns:
        False when *timestamp* is more than 300 seconds away from the
        current time in either direction, or when *signature* does not
        match the expected signature; True otherwise.
    """
    # Check timestamp (5 minute window)
    if abs(time.time() - timestamp) > 300:
        return False
    expected = sign_request(data, timestamp)
    # Compare encoded bytes: compare_digest raises TypeError for str values
    # with non-ASCII characters, and the signature is supplied by the caller.
    return hmac.compare_digest(signature.encode(), expected.encode())
# Hash IP addresses for storage
def hash_ip(ip: str) -> str:
    """Return the first 16 hex characters of SHA-256 over ``"<salt>:<ip>"``.

    The salt is read from the IP_HASH_SALT environment variable on every
    call; a missing variable raises KeyError.
    """
    salted = "{}:{}".format(os.environ["IP_HASH_SALT"], ip)
    digest = hashlib.sha256(salted.encode())
    return digest.hexdigest()[:16]
# Encrypt email addresses
from cryptography.fernet import Fernet
# Module-level Fernet cipher keyed from the ENCRYPTION_KEY environment
# variable; a missing variable raises KeyError when this line runs.
fernet = Fernet(os.environ["ENCRYPTION_KEY"])
def encrypt_email(email: str) -> str:
    """Encrypt *email* with the module's Fernet cipher; returns text."""
    token = fernet.encrypt(email.encode())
    return token.decode()
def decrypt_email(encrypted: str) -> str:
    """Decrypt a token produced by encrypt_email back to the address."""
    plaintext = fernet.decrypt(encrypted.encode())
    return plaintext.decode()
class Comment(Base):
    """ORM model for the ``comments`` table, split into public and private columns."""
    # NOTE(review): other snippets in this chapter read ``status`` on comments
    # (export endpoint, spam cleanup SQL) but no such column is declared
    # here - confirm whether this model is abbreviated.
    __tablename__ = "comments"
    id = Column(UUID, primary_key=True)
    page_id = Column(String(255), nullable=False)
    # Display data (public)
    author_name = Column(String(100), nullable=False)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Private data (never exposed in API)
    author_email_hash = Column(String(64))  # For gravatar only
    ip_hash = Column(String(16))  # Hashed, for abuse prevention
    # Don't store: full IP, user agent, referer (unless needed)
async def cleanup_old_data():
    """Apply the retention policy to the comments table.

    First blanks ``ip_hash`` and ``author_email_hash`` on comments older than
    365 days, then deletes comments with status 'spam' or 'rejected' that are
    older than 30 days.
    """
    strip_metadata_sql = """
        UPDATE comments
        SET ip_hash = NULL,
            author_email_hash = NULL
        WHERE created_at < :cutoff
    """
    purge_rejected_sql = """
        DELETE FROM comments
        WHERE status IN ('spam', 'rejected')
        AND created_at < :cutoff
    """

    # Remove metadata from comments older than 1 year
    year_ago = datetime.utcnow() - timedelta(days=365)
    await db.execute(strip_metadata_sql, {"cutoff": year_ago})

    # Delete rejected/spam comments older than 30 days
    month_ago = datetime.utcnow() - timedelta(days=30)
    await db.execute(purge_rejected_sql, {"cutoff": month_ago})
Your privacy policy must disclose:
@app.get("/api/my-data")
async def export_user_data(
    email: str,
    verification_token: str
):
    """Return every comment linked to *email* as an export document.

    Raises:
        HTTPException: 403 when verify_email_token rejects the
            email / token pair.
    """
    # Verify email ownership
    verified = await verify_email_token(email, verification_token)
    if not verified:
        raise HTTPException(403, "Invalid verification")

    # Comments are matched on the hash of the address, not the address itself.
    owner_hash = hash_email(email)
    query = select(Comment).where(Comment.author_email_hash == owner_hash)
    rows = await db.execute(query)

    exported = []
    for c in rows.scalars():
        exported.append({
            "page": c.page_id,
            "content": c.content,
            "created_at": c.created_at.isoformat(),
            "status": c.status
        })

    return {
        "email": email,
        "comments": exported,
        "exported_at": datetime.utcnow().isoformat()
    }
@app.delete("/api/my-data")
async def delete_user_data(
    email: str,
    verification_token: str
):
    """Delete every comment linked to *email* and report how many were removed.

    Raises:
        HTTPException: 403 when verify_email_token rejects the
            email / token pair.
    """
    if not await verify_email_token(email, verification_token):
        raise HTTPException(403, "Invalid verification")
    email_hash = hash_email(email)
    # Option 1: Full delete
    # Keep the execution result: its rowcount is reported to the caller.
    result = await db.execute(
        delete(Comment).where(Comment.author_email_hash == email_hash)
    )
    # Option 2: Anonymize (preserve content, remove identity)
    # await db.execute(
    #     update(Comment)
    #     .where(Comment.author_email_hash == email_hash)
    #     .values(
    #         author_name="[deleted]",
    #         author_email_hash=None,
    #         ip_hash=None
    #     )
    # )
    await db.commit()
    return {"status": "deleted", "affected_comments": result.rowcount}
// Simple cookie consent for comment widget
class CookieConsent {
  constructor() {
    // localStorage key under which the visitor's choice is remembered.
    this.storageKey = 'comment_consent';
  }

  /** Returns true only when the stored choice is exactly 'true'. */
  hasConsent() {
    return localStorage.getItem(this.storageKey) === 'true';
  }

  /** Appends the consent banner to the page unless consent was already given. */
  showBanner() {
    if (this.hasConsent()) return;

    const banner = document.createElement('div');
    banner.className = 'cookie-consent';

    const message = document.createElement('p');
    message.textContent = 'This comment system stores cookies for spam prevention.';
    banner.appendChild(message);

    // Handlers are attached in script rather than through inline onclick
    // attributes: inline handlers need a global `consent` variable and are
    // blocked by a script-src policy that omits 'unsafe-inline'.
    banner.appendChild(this._makeButton('Accept', () => this.accept()));
    banner.appendChild(this._makeButton('Decline', () => this.decline()));

    document.body.appendChild(banner);
  }

  /** Records acceptance and removes the banner. */
  accept() {
    localStorage.setItem(this.storageKey, 'true');
    this._removeIfPresent('.cookie-consent');
  }

  /** Records refusal, removes the banner and removes the comment form. */
  decline() {
    localStorage.setItem(this.storageKey, 'false');
    this._removeIfPresent('.cookie-consent');
    // Disable comment functionality
    this._removeIfPresent('.comment-form');
  }

  // Builds a button with the given label and click handler.
  _makeButton(label, onClick) {
    const button = document.createElement('button');
    button.textContent = label;
    button.addEventListener('click', onClick);
    return button;
  }

  // Removes the first element matching `selector`; a missing element is
  // ignored so pages without a banner or comment form do not throw.
  _removeIfPresent(selector) {
    const element = document.querySelector(selector);
    if (element) {
      element.remove();
    }
  }
}
import logging
# Dedicated logger named "security", used for security event records.
security_logger = logging.getLogger("security")
async def log_security_event(
    event_type: str,
    ip: str,
    details: dict
):
    """Emit a WARNING record on the security logger for *event_type*.

    The record's extra fields are the event type, the hashed form of *ip*
    (never the raw address), a UTC ISO timestamp, and every key of *details*.
    """
    base_fields = {
        "event_type": event_type,
        "ip_hash": hash_ip(ip),
        "timestamp": datetime.utcnow().isoformat(),
    }
    # Keys in details override the base fields of the same name.
    record_fields = {**base_fields, **details}
    security_logger.warning(f"Security event: {event_type}", extra=record_fields)
# Log suspicious activity
# NOTE(review): fragment, not a standalone module - it uses `await` and reads
# spam_score, rate_limited, request and data, so it presumably belongs inside
# an async request handler where those names are in scope.
if spam_score > 0.8:
    # Record the score and the page the comment was posted on.
    await log_security_event("high_spam_score", request.client.host, {
        "spam_score": spam_score,
        "page_id": data.page_id
    })
if rate_limited:
    # Record which endpoint was being hit when the limit applied.
    await log_security_event("rate_limited", request.client.host, {
        "endpoint": request.url.path
    })
async def send_security_alert(event_type: str, details: dict):
    """Send alert for critical security events.

    Only the event types listed in ``critical_events`` trigger anything; for
    those, a Slack message and then an email are sent. Other event types are
    ignored.
    """
    critical_events = ["multiple_auth_failures", "injection_attempt", "ddos_detected"]
    if event_type not in critical_events:
        return
    # The alert prefix was a mis-decoded emoji; a named escape keeps the
    # source ASCII-only so it cannot be corrupted by re-encoding again.
    await send_slack_alert(
        f"\N{POLICE CARS REVOLVING LIGHT} Security Alert: {event_type}\n{json.dumps(details)}"
    )
    await send_email_alert(f"Security Alert: {event_type}", details)
| Area | Key Practices |
|---|---|
| Input | Validate all input, sanitize HTML |
| XSS | CSP headers, output encoding |
| SQL Injection | Parameterized queries only |
| CSRF | SameSite cookies + tokens |
| Privacy | Minimize data, hash PII |
| GDPR | Export/delete endpoints, consent |
Security checklist:
Navigation: