Even with excellent spam prevention, you need robust moderation tools. This chapter covers building an effective moderation system.
Pre-moderation: all comments are reviewed before publishing:
async def create_comment(data: CommentCreate) -> Comment:
    """Create a comment in the pre-moderation flow.

    Every new comment starts out PENDING and moderators are notified,
    so nothing becomes visible before a human review.
    """
    new_comment = Comment(**data.dict(), status=CommentStatus.PENDING)
    db.add(new_comment)
    await db.commit()
    # Ping the review queue so the comment doesn't sit unseen.
    await notify_moderators(new_comment)
    return new_comment
Pros: maximum control — no bad content is ever shown. Cons: delays engagement and requires constant moderator attention.
Post-moderation: publish immediately, review later:
async def create_comment(data: CommentCreate, spam_analysis: SpamAnalysis) -> Comment:
    """Create a comment in the hybrid flow, routed by spam confidence.

    confidence < 0.3        -> APPROVED (published immediately)
    0.3 <= confidence < 0.7 -> PENDING  (held for human review)
    confidence >= 0.7       -> SPAM
    """
    confidence = spam_analysis.confidence
    if confidence >= 0.7:
        status = CommentStatus.SPAM
    elif confidence >= 0.3:
        status = CommentStatus.PENDING
    else:
        status = CommentStatus.APPROVED

    comment = Comment(**data.dict(), status=status)
    db.add(comment)
    await db.commit()
    return comment
Pros: immediate engagement and less moderator burden. Cons: inappropriate content may appear briefly before review.
Community-driven with user flagging:
FLAG_THRESHOLD = 3  # Hide a comment once it has collected this many flags


async def flag_comment(comment_id: str, user_id: str, reason: str):
    """Record a user's flag on a comment and hide the comment once it
    reaches FLAG_THRESHOLD distinct flags.

    Moderators are notified when the threshold is crossed.
    """
    # Fix: ignore duplicate flags — previously one user could flag the
    # same comment repeatedly and single-handedly hide it.
    already_flagged = await db.scalar(
        select(func.count())
        .where(CommentFlag.comment_id == comment_id)
        .where(CommentFlag.user_id == user_id)
    )
    if already_flagged:
        return

    db.add(CommentFlag(
        comment_id=comment_id,
        user_id=user_id,
        reason=reason
    ))

    # NOTE(review): this count relies on session autoflush making the
    # just-added flag visible to the query — confirm autoflush is enabled.
    flag_count = await db.scalar(
        select(func.count()).where(CommentFlag.comment_id == comment_id)
    )
    if flag_count >= FLAG_THRESHOLD:
        await db.execute(
            update(Comment)
            .where(Comment.id == comment_id)
            .values(status=CommentStatus.FLAGGED)
        )
        await notify_moderators_flagged(comment_id)
    await db.commit()
@router.get("/admin/moderation/queue")
async def get_moderation_queue(
    status: str = "pending",
    page: int = 1,
    limit: int = 20,
    admin: Admin = Depends(require_admin)
):
    """Return one page of comments with the given status, oldest first.

    Response: {"items": [...], "total": N, "page": P, "pages": K}.
    """
    # Fix: clamp pagination inputs — negative values produced a bad
    # OFFSET, an unbounded limit allowed dumping the whole table, and
    # limit=0 would divide by zero in the page count below.
    page = max(page, 1)
    limit = min(max(limit, 1), 100)
    offset = (page - 1) * limit
    query = (
        select(Comment)
        .where(Comment.status == status)
        .order_by(Comment.created_at.asc())  # oldest first: FIFO review
        .offset(offset)
        .limit(limit)
    )
    comments = await db.execute(query)
    total = await db.scalar(
        select(func.count()).where(Comment.status == status)
    )
    return {
        "items": comments.scalars().all(),
        "total": total,
        "page": page,
        "pages": (total + limit - 1) // limit  # ceiling division
    }
@router.post("/admin/moderation/{comment_id}/approve")
async def approve_comment(comment_id: str, admin: Admin = Depends(require_admin)):
    """Approve a comment and credit the author's IP reputation.

    Raises: HTTPException 404 if the comment does not exist.
    """
    # Fix: load the comment BEFORE updating — the original fetched it after
    # the UPDATE and crashed with AttributeError (None.ip_address) when the
    # id didn't exist. Return a proper 404 instead.
    comment = await db.get(Comment, comment_id)
    if comment is None:
        raise HTTPException(404, "Comment not found")
    await db.execute(
        update(Comment)
        .where(Comment.id == comment_id)
        .values(
            status=CommentStatus.APPROVED,
            moderated_by=admin.id,
            moderated_at=datetime.utcnow()
        )
    )
    await db.commit()
    # Approved content improves the sender IP's standing.
    await ip_reputation.record_approved(comment.ip_address)
    return {"success": True}
@router.post("/admin/moderation/{comment_id}/reject")
async def reject_comment(
    comment_id: str,
    reason: str = None,
    admin: Admin = Depends(require_admin)
):
    """Reject a comment, record why, and penalize the author's IP reputation.

    Raises: HTTPException 404 if the comment does not exist.
    """
    comment = await db.get(Comment, comment_id)
    # Fix: the original dereferenced `comment.ip_address` without checking
    # for None, crashing with AttributeError on an unknown id.
    if comment is None:
        raise HTTPException(404, "Comment not found")
    await db.execute(
        update(Comment)
        .where(Comment.id == comment_id)
        .values(
            status=CommentStatus.REJECTED,
            moderated_by=admin.id,
            moderated_at=datetime.utcnow(),
            rejection_reason=reason
        )
    )
    await db.commit()
    # Rejected content counts against the sender IP's reputation.
    await ip_reputation.record_spam(comment.ip_address)
    return {"success": True}
@router.post("/admin/moderation/bulk")
async def bulk_moderation(
    action: str,  # approve, reject, spam
    comment_ids: list[str],
    admin: Admin = Depends(require_admin)
):
    """Apply one moderation action to many comments at once.

    Returns {"success": True, "updated": N} where N is the number of rows
    actually changed. Raises HTTPException 400 on an unknown action.
    """
    status_map = {
        "approve": CommentStatus.APPROVED,
        "reject": CommentStatus.REJECTED,
        "spam": CommentStatus.SPAM
    }
    new_status = status_map.get(action)
    if not new_status:
        raise HTTPException(400, "Invalid action")
    # Skip the round-trip entirely for an empty selection.
    if not comment_ids:
        return {"success": True, "updated": 0}
    result = await db.execute(
        update(Comment)
        .where(Comment.id.in_(comment_ids))
        .values(
            status=new_status,
            moderated_by=admin.id,
            moderated_at=datetime.utcnow()
        )
    )
    await db.commit()
    # Fix: report rows actually updated, not the requested count —
    # unknown ids previously inflated the "updated" figure.
    return {"success": True, "updated": result.rowcount}
<!DOCTYPE html>
<html>
<head>
  <title>Comment Moderation</title>
  <style>
    .queue { max-width: 900px; margin: 0 auto; }
    .comment-card {
      border: 1px solid #ddd;
      padding: 1rem;
      margin: 1rem 0;
      border-radius: 8px;
    }
    .comment-meta { color: #666; font-size: 0.9rem; }
    .comment-content { margin: 1rem 0; }
    .actions { display: flex; gap: 0.5rem; }
    .btn { padding: 0.5rem 1rem; border: none; border-radius: 4px; cursor: pointer; }
    .btn-approve { background: #22c55e; color: white; }
    .btn-reject { background: #ef4444; color: white; }
    .btn-spam { background: #f59e0b; color: white; }
    .stats { display: flex; gap: 2rem; margin-bottom: 2rem; }
    .stat-card { background: #f3f4f6; padding: 1rem; border-radius: 8px; }
  </style>
</head>
<body>
  <div class="queue">
    <h1>Moderation Queue</h1>
    <div class="stats" id="stats"></div>
    <div class="filters">
      <button onclick="loadQueue('pending')" class="active">Pending</button>
      <button onclick="loadQueue('flagged')">Flagged</button>
      <button onclick="loadQueue('spam')">Spam</button>
    </div>
    <div id="queue"></div>
  </div>
  <script>
    // Fix: escapeHtml was called below but never defined, so rendering the
    // queue threw a ReferenceError. Escape via a detached DOM node.
    function escapeHtml(value) {
      const div = document.createElement('div');
      div.textContent = value == null ? '' : String(value);
      return div.innerHTML;
    }

    async function loadQueue(status = 'pending') {
      const res = await fetch(`/admin/moderation/queue?status=${encodeURIComponent(status)}`);
      const data = await res.json();
      document.getElementById('queue').innerHTML = data.items
        .map(renderComment)
        .join('');
    }

    function renderComment(c) {
      // Fix: escape EVERY user-controlled field (email, IP, page id), not
      // just name and content — this markup is injected via innerHTML.
      return `
        <div class="comment-card" data-id="${escapeHtml(c.id)}">
          <div class="comment-meta">
            <strong>${escapeHtml(c.author_name)}</strong>
            (${escapeHtml(c.author_email || 'no email')})
            • ${escapeHtml(c.ip_address)}
            • ${new Date(c.created_at).toLocaleString()}
          </div>
          <div class="comment-content">${escapeHtml(c.content)}</div>
          <div class="comment-meta">Page: ${escapeHtml(c.page_id)}</div>
          <div class="actions">
            <button class="btn btn-approve" onclick="moderate('${c.id}', 'approve')">
              ✓ Approve
            </button>
            <button class="btn btn-reject" onclick="moderate('${c.id}', 'reject')">
              ✗ Reject
            </button>
            <button class="btn btn-spam" onclick="moderate('${c.id}', 'spam')">
              Mark Spam
            </button>
          </div>
        </div>
      `;
    }

    async function moderate(id, action) {
      // NOTE(review): only per-comment /approve and /reject routes are shown
      // in this chapter's backend — confirm a per-comment /spam route exists.
      const res = await fetch(`/admin/moderation/${id}/${action}`, { method: 'POST' });
      // Fix: don't remove the card when the request failed.
      if (!res.ok) {
        alert(`Moderation failed (${res.status})`);
        return;
      }
      document.querySelector(`[data-id="${id}"]`).remove();
    }

    loadQueue();
  </script>
</body>
</html>
class ProfanityFilter:
    """Detect and censor profanity, tolerating common l33t-speak obfuscation.

    One compiled alternation regex covers the whole word list; each mapped
    letter is widened to a character class of its usual substitutes.
    """

    # Per-letter substitution classes: a->@4, e->3, i->1!, o->0, s->$5.
    _SUBSTITUTIONS = {
        'a': '[a@4]',
        'e': '[e3]',
        'i': '[i1!]',
        'o': '[o0]',
        's': '[s$5]',
    }

    def __init__(self):
        self.bad_words = self._load_word_list()
        self.pattern = self._build_pattern()

    def _load_word_list(self) -> set:
        # Placeholder list — load the real list from a file in production.
        # Fix: removed the literal `...` (an Ellipsis object), which made
        # _build_pattern crash with AttributeError on `.replace`.
        return {'badword1', 'badword2'}

    def _build_pattern(self):
        """Compile one case-insensitive regex matching any obfuscated word."""
        patterns = []
        for word in self.bad_words:
            # Fix: re.escape characters without a substitution class so
            # words containing regex metacharacters can't corrupt the
            # combined pattern. (Original chained .replace calls and left
            # unmapped characters unescaped.)
            patterns.append(
                ''.join(self._SUBSTITUTIONS.get(ch, re.escape(ch)) for ch in word)
            )
        return re.compile('|'.join(patterns), re.IGNORECASE)

    def contains_profanity(self, text: str) -> bool:
        """Return True if any (possibly obfuscated) bad word appears in text."""
        return bool(self.pattern.search(text))

    def censor(self, text: str) -> str:
        """Replace each profane match with asterisks of equal length."""
        def replace(match):
            return '*' * len(match.group())
        return self.pattern.sub(replace, text)
Using a simple keyword approach (for ML, use external APIs):
TOXIC_INDICATORS = {
    'high': ['kill', 'die', 'hate you', 'threat'],
    'medium': ['stupid', 'idiot', 'dumb', 'loser'],
    'low': ['annoying', 'boring', 'whatever']
}

# Score contributed by each match at a given severity level.
_LEVEL_WEIGHTS = {'high': 0.5, 'medium': 0.2, 'low': 0.05}


def analyze_toxicity(text: str) -> dict:
    """Score text for toxicity with a simple keyword heuristic.

    Returns {'score': 0..1, 'is_toxic': bool, 'matches': {level: [words]}}.
    """
    text_lower = text.lower()
    found = {'high': [], 'medium': [], 'low': []}
    for level, words in TOXIC_INDICATORS.items():
        for word in words:
            # Fix: match on word boundaries — plain substring matching made
            # 'kill' fire on 'skills' and 'die' fire on 'diet'.
            if re.search(r'\b' + re.escape(word) + r'\b', text_lower):
                found[level].append(word)
    score = sum(
        _LEVEL_WEIGHTS[level] * len(matches)
        for level, matches in found.items()
    )
    return {
        'score': min(score, 1.0),   # cap the reported score at 1.0
        'is_toxic': score > 0.3,    # threshold uses the uncapped score
        'matches': found
    }
For advanced content moderation:
# Perspective API (Google)
async def check_perspective(text: str) -> dict:
    """Score text with Google's Perspective API.

    Returns a mapping of attribute name to probability, e.g.
    {"TOXICITY": 0.87, "INSULT": 0.42, ...}.

    Raises: httpx.HTTPStatusError on a non-2xx response,
            httpx.TimeoutException if the API doesn't answer in time.
    """
    # Fix: add a timeout (the default client would otherwise wait a long
    # time on a stalled API) and fail loudly on HTTP errors instead of
    # KeyError-ing on the error payload below.
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(
            f"https://commentanalyzer.googleapis.com/v1alpha1/comments:analyze?key={API_KEY}",
            json={
                "comment": {"text": text},
                "requestedAttributes": {
                    "TOXICITY": {},
                    "SEVERE_TOXICITY": {},
                    "INSULT": {},
                    "THREAT": {}
                }
            }
        )
        response.raise_for_status()
    scores = response.json()["attributeScores"]
    return {
        attr: data["summaryScore"]["value"]
        for attr, data in scores.items()
    }
class BanService:
    """Create, check, and lift bans keyed by email, user id, or IP.

    Bans are persisted in the database (source of truth) and mirrored in
    Redis under "ban:{type}:{identifier}" for fast hot-path lookups.
    """

    def __init__(self, db: AsyncSession, redis: Redis):
        self.db = db
        self.redis = redis

    async def ban_user(
        self,
        identifier: str,  # email, user_id, or IP
        ban_type: str,    # email, user, ip
        reason: str,
        duration: int = None,  # seconds; None = permanent
        admin_id: str = None
    ):
        """Persist a ban and warm the cache for quick lookups."""
        ban = Ban(
            identifier=identifier,
            ban_type=ban_type,
            reason=reason,
            expires_at=datetime.utcnow() + timedelta(seconds=duration) if duration else None,
            created_by=admin_id
        )
        self.db.add(ban)
        await self.db.commit()
        cache_key = f"ban:{ban_type}:{identifier}"
        if duration:
            # TTL mirrors the ban's expiry so the cache self-cleans.
            await self.redis.setex(cache_key, duration, "1")
        else:
            await self.redis.set(cache_key, "1")

    async def is_banned(self, email: str = None, user_id: str = None, ip: str = None) -> bool:
        """Return True if any of the provided identifiers is currently banned."""
        checks = []
        if email:
            checks.append(("email", email))
        if user_id:
            checks.append(("user", user_id))
        if ip:
            checks.append(("ip", ip))
        for ban_type, identifier in checks:
            cache_key = f"ban:{ban_type}:{identifier}"
            if await self.redis.exists(cache_key):
                return True
            # Fix: fall back to the database — the original trusted the
            # cache alone, so a Redis flush/restart silently unbanned everyone.
            ban = await self.db.scalar(
                select(Ban)
                .where(Ban.ban_type == ban_type)
                .where(Ban.identifier == identifier)
            )
            if ban and (ban.expires_at is None or ban.expires_at > datetime.utcnow()):
                # Re-warm the cache with the remaining lifetime.
                if ban.expires_at is not None:
                    ttl = int((ban.expires_at - datetime.utcnow()).total_seconds())
                    if ttl > 0:
                        await self.redis.setex(cache_key, ttl, "1")
                else:
                    await self.redis.set(cache_key, "1")
                return True
        return False

    async def unban(self, identifier: str, ban_type: str):
        """Remove a ban from both the database and the cache."""
        await self.db.execute(
            delete(Ban)
            .where(Ban.identifier == identifier)
            .where(Ban.ban_type == ban_type)
        )
        await self.db.commit()
        await self.redis.delete(f"ban:{ban_type}:{identifier}")
Shadow banning: the user can still post, but no one else sees their comments:
async def get_comments_for_page(page_id: str, viewer_ip: str = None):
    """Return approved comments for a page, hiding shadow-banned authors'
    comments from everyone except (best-effort, matched by IP) the authors
    themselves.
    """
    query = (
        select(Comment)
        .where(Comment.page_id == page_id)
        .where(Comment.status == CommentStatus.APPROVED)
    )
    comments = await db.execute(query)
    result = []
    # Fix: memoize the shadow-ban lookup per email — the original issued
    # one awaited ban-service call per comment, even for repeat authors.
    shadow_cache: dict = {}
    for comment in comments.scalars():
        email = comment.author_email
        if email not in shadow_cache:
            shadow_cache[email] = await ban_service.is_shadow_banned(email)
        if shadow_cache[email]:
            # NOTE(review): IP equality is a weak proxy for "same person"
            # (shared NATs, mobile IPs) — confirm this is acceptable.
            if comment.ip_address == viewer_ip:
                result.append(comment)
        else:
            result.append(comment)
    return result
class ModerationNotifier:
    """Send Slack notifications for moderation events, with rate limiting
    on the high-volume "new pending comment" event."""

    def __init__(self, config: Config):
        self.slack_webhook = config.slack_webhook
        self.email_recipients = config.moderator_emails

    async def notify_pending(self, comment: Comment):
        """Notify moderators of a new pending comment (rate-limited)."""
        if await self._should_notify():
            await self._send_slack(
                f"🔔 New comment pending review\n"
                f"Author: {comment.author_name}\n"
                f"Content: {comment.content[:200]}...\n"
                f"<{ADMIN_URL}/moderation|Review>"
            )

    async def notify_flagged(self, comment: Comment, flag_count: int):
        """Notify when a comment reaches the flag threshold.

        Not rate-limited — presumably because threshold events are rarer
        and higher-signal than new pending comments; confirm intent.
        """
        await self._send_slack(
            f"🚩 Comment flagged {flag_count} times\n"
            f"Author: {comment.author_name}\n"
            f"Content: {comment.content[:200]}...\n"
            f"<{ADMIN_URL}/moderation|Review>"
        )

    async def _send_slack(self, message: str):
        """POST a message to the configured Slack webhook."""
        # Fix: skip quietly when no webhook is configured instead of
        # POST-ing to None and raising deep inside httpx.
        if not self.slack_webhook:
            return
        async with httpx.AsyncClient() as client:
            await client.post(
                self.slack_webhook,
                json={"text": message}
            )

    async def _should_notify(self) -> bool:
        """Allow at most 10 notifications per 5-minute window.

        Implemented as a Redis INCR counter whose first increment starts
        the window's TTL.
        NOTE(review): uses the module-level `redis`, unlike BanService which
        takes a client in its constructor — confirm that global exists.
        """
        key = "mod_notify_rate"
        count = await redis.incr(key)
        if count == 1:
            await redis.expire(key, 300)  # start the 5-minute window
        return count <= 10
Track all moderation actions:
class AuditLog:
    """Persist a record of every moderation action for later review."""

    async def log_action(
        self,
        action: str,
        target_type: str,
        target_id: str,
        actor_id: str,
        details: dict = None
    ):
        """Write one audit entry and commit it immediately.

        The requester's IP and a UTC timestamp are captured automatically;
        `details` holds free-form context (defaults to an empty dict).
        """
        record = AuditEntry(
            action=action,
            target_type=target_type,
            target_id=target_id,
            actor_id=actor_id,
            details=details if details else {},
            ip_address=get_current_ip(),
            created_at=datetime.utcnow(),
        )
        db.add(record)
        await db.commit()
# Usage — record an approval from inside a request handler; `audit`,
# `comment_id`, and `admin` come from the surrounding handler's scope.
await audit.log_action(
    action="comment.approved",
    target_type="comment",
    target_id=comment_id,
    actor_id=admin.id,
    details={"previous_status": "pending"}
)
| Aspect | Recommendation |
|---|---|
| Strategy | Hybrid: auto-approve low risk, queue medium risk |
| Interface | Simple admin panel with bulk actions |
| Filtering | Profanity filter + toxicity scoring |
| Banning | Support IP, email, and user-level bans |
| Auditing | Log all moderation actions |
In the next chapter, we’ll explore hosting and deployment options.
Navigation: