Chapter 8: Moderation and Content Filtering

Even with excellent spam prevention, you need robust moderation tools. This chapter covers building an effective moderation system.

Moderation Strategies

Pre-Moderation

All comments reviewed before publishing:

async def create_comment(data: CommentCreate) -> Comment:
    """Create a comment under pre-moderation rules.

    Every submission starts in the PENDING state (nothing is published
    until a moderator approves it), and moderators are alerted.
    """
    new_comment = Comment(**data.dict(), status=CommentStatus.PENDING)
    db.add(new_comment)
    await db.commit()

    # Let the moderation team know there is work in the queue.
    await notify_moderators(new_comment)

    return new_comment

Pros: Maximum control, no bad content ever shown. Cons: Delays engagement, requires constant attention.

Post-Moderation

Publish immediately, review later:

async def create_comment(data: CommentCreate, spam_analysis: SpamAnalysis) -> Comment:
    """Create a comment under post-moderation rules.

    The spam confidence decides the initial status:
    below 0.3 -> APPROVED, 0.3 up to 0.7 -> PENDING, otherwise SPAM.
    """
    confidence = spam_analysis.confidence
    if confidence >= 0.7:
        initial_status = CommentStatus.SPAM
    elif confidence >= 0.3:
        initial_status = CommentStatus.PENDING
    else:
        initial_status = CommentStatus.APPROVED

    new_comment = Comment(**data.dict(), status=initial_status)
    db.add(new_comment)
    await db.commit()

    return new_comment

Pros: Immediate engagement, less moderator burden. Cons: Potential for inappropriate content to appear briefly.

Reactive Moderation

Community-driven with user flagging:

FLAG_THRESHOLD = 3  # Hide after N flags

async def flag_comment(comment_id: str, user_id: str, reason: str):
    """Record a user's flag on a comment; hide the comment once it has
    accumulated FLAG_THRESHOLD flags.

    Idempotent per user: repeat flags from the same user are ignored,
    so a single user cannot push a comment over the threshold alone.
    """
    # Ignore duplicate flags from the same user.
    already_flagged = await db.scalar(
        select(func.count()).where(
            CommentFlag.comment_id == comment_id,
            CommentFlag.user_id == user_id,
        )
    )
    if already_flagged:
        return

    # Record the flag
    flag = CommentFlag(
        comment_id=comment_id,
        user_id=user_id,
        reason=reason
    )
    db.add(flag)

    # Count total flags. NOTE(review): this assumes session autoflush is
    # on so the pending flag above is included in the count — confirm.
    flag_count = await db.scalar(
        select(func.count()).where(CommentFlag.comment_id == comment_id)
    )

    reached_threshold = flag_count >= FLAG_THRESHOLD
    if reached_threshold:
        await db.execute(
            update(Comment)
            .where(Comment.id == comment_id)
            .values(status=CommentStatus.FLAGGED)
        )

    await db.commit()

    # Notify only after the change is durable, so moderators are never
    # alerted about a flagging that was rolled back.
    if reached_threshold:
        await notify_moderators_flagged(comment_id)

Moderation Queue Interface

Queue API Endpoints

@router.get("/admin/moderation/queue")
async def get_moderation_queue(
    status: str = "pending",
    page: int = 1,
    limit: int = 20,
    admin: Admin = Depends(require_admin)
):
    """Return one page of comments with the given status, oldest first.

    Response includes the items, the total count, the current page, and
    the total number of pages (ceiling division of total by limit).
    """
    skip = (page - 1) * limit

    result = await db.execute(
        select(Comment)
        .where(Comment.status == status)
        .order_by(Comment.created_at.asc())
        .offset(skip)
        .limit(limit)
    )
    total = await db.scalar(
        select(func.count()).where(Comment.status == status)
    )

    return {
        "items": result.scalars().all(),
        "total": total,
        "page": page,
        # Ceiling division without importing math.
        "pages": (total + limit - 1) // limit,
    }

@router.post("/admin/moderation/{comment_id}/approve")
async def approve_comment(comment_id: str, admin: Admin = Depends(require_admin)):
    """Approve a comment and credit the submitting IP's reputation.

    Returns 404 for an unknown comment id instead of dereferencing a
    None result when updating the IP reputation.
    """
    # Fetch first: this lets us 404 on a bad id and keeps ip_address
    # available for the reputation update after the commit.
    comment = await db.get(Comment, comment_id)
    if comment is None:
        raise HTTPException(404, "Comment not found")

    await db.execute(
        update(Comment)
        .where(Comment.id == comment_id)
        .values(
            status=CommentStatus.APPROVED,
            moderated_by=admin.id,
            moderated_at=datetime.utcnow()
        )
    )
    await db.commit()

    # Update IP reputation
    await ip_reputation.record_approved(comment.ip_address)

    return {"success": True}

@router.post("/admin/moderation/{comment_id}/reject")
async def reject_comment(
    comment_id: str,
    reason: str = None,
    admin: Admin = Depends(require_admin)
):
    """Reject a comment (optionally with a reason) and penalize the
    submitting IP's reputation.

    Returns 404 for an unknown comment id instead of dereferencing a
    None result when updating the IP reputation.
    """
    comment = await db.get(Comment, comment_id)
    if comment is None:
        raise HTTPException(404, "Comment not found")

    await db.execute(
        update(Comment)
        .where(Comment.id == comment_id)
        .values(
            status=CommentStatus.REJECTED,
            moderated_by=admin.id,
            moderated_at=datetime.utcnow(),
            rejection_reason=reason
        )
    )
    await db.commit()

    # Update IP reputation
    await ip_reputation.record_spam(comment.ip_address)

    return {"success": True}

Bulk Actions

@router.post("/admin/moderation/bulk")
async def bulk_moderation(
    action: str,  # approve, reject, spam
    comment_ids: list[str],
    admin: Admin = Depends(require_admin)
):
    """Apply a single moderation action to many comments at once."""
    valid_actions = {
        "approve": CommentStatus.APPROVED,
        "reject": CommentStatus.REJECTED,
        "spam": CommentStatus.SPAM,
    }

    new_status = valid_actions.get(action)
    if new_status is None:
        raise HTTPException(400, "Invalid action")

    # One UPDATE ... WHERE id IN (...) covers the whole batch.
    await db.execute(
        update(Comment)
        .where(Comment.id.in_(comment_ids))
        .values(
            status=new_status,
            moderated_by=admin.id,
            moderated_at=datetime.utcnow()
        )
    )
    await db.commit()

    return {"success": True, "updated": len(comment_ids)}

Admin Dashboard

Simple HTML Interface

<!DOCTYPE html>
<html>
<head>
    <title>Comment Moderation</title>
    <style>
        .queue { max-width: 900px; margin: 0 auto; }
        .comment-card {
            border: 1px solid #ddd;
            padding: 1rem;
            margin: 1rem 0;
            border-radius: 8px;
        }
        .comment-meta { color: #666; font-size: 0.9rem; }
        .comment-content { margin: 1rem 0; }
        .actions { display: flex; gap: 0.5rem; }
        .btn { padding: 0.5rem 1rem; border: none; border-radius: 4px; cursor: pointer; }
        .btn-approve { background: #22c55e; color: white; }
        .btn-reject { background: #ef4444; color: white; }
        .btn-spam { background: #f59e0b; color: white; }
        .stats { display: flex; gap: 2rem; margin-bottom: 2rem; }
        .stat-card { background: #f3f4f6; padding: 1rem; border-radius: 8px; }
    </style>
</head>
<body>
    <div class="queue">
        <h1>Moderation Queue</h1>
        
        <div class="stats" id="stats"></div>
        
        <div class="filters">
            <button onclick="loadQueue('pending')" class="active">Pending</button>
            <button onclick="loadQueue('flagged')">Flagged</button>
            <button onclick="loadQueue('spam')">Spam</button>
        </div>
        
        <div id="queue"></div>
    </div>
    
    <script>
        // Escape untrusted text before interpolating it into innerHTML.
        // (Was referenced by renderComment but never defined, which made
        // the whole dashboard fail with a ReferenceError.)
        function escapeHtml(value) {
            const div = document.createElement('div');
            div.textContent = value == null ? '' : String(value);
            return div.innerHTML;
        }
        
        async function loadQueue(status = 'pending') {
            const res = await fetch(`/admin/moderation/queue?status=${status}`);
            const data = await res.json();
            
            document.getElementById('queue').innerHTML = data.items
                .map(renderComment)
                .join('');
        }
        
        function renderComment(c) {
            // Author name and content are user-supplied: escape them.
            return `
                <div class="comment-card" data-id="${c.id}">
                    <div class="comment-meta">
                        <strong>${escapeHtml(c.author_name)}</strong>
                        (${c.author_email || 'no email'})
                        • ${c.ip_address} • ${new Date(c.created_at).toLocaleString()}
                    </div>
                    <div class="comment-content">${escapeHtml(c.content)}</div>
                    <div class="comment-meta">Page: ${c.page_id}</div>
                    <div class="actions">
                        <button class="btn btn-approve" onclick="moderate('${c.id}', 'approve')">
                            ✓ Approve
                        </button>
                        <button class="btn btn-reject" onclick="moderate('${c.id}', 'reject')">
                            ✗ Reject
                        </button>
                        <button class="btn btn-spam" onclick="moderate('${c.id}', 'spam')">
                            Mark Spam
                        </button>
                    </div>
                </div>
            `;
        }
        
        async function moderate(id, action) {
            await fetch(`/admin/moderation/${id}/${action}`, { method: 'POST' });
            document.querySelector(`[data-id="${id}"]`).remove();
        }
        
        loadQueue();
    </script>
</body>
</html>

Content Filtering

Profanity Filter

class ProfanityFilter:
    """Detect and censor profanity, tolerating common l33t-speak
    obfuscation (e.g. "b@dw0rd1" matches "badword1").
    """

    # Characters commonly substituted for each letter when users try
    # to evade the filter.
    _SUBSTITUTIONS = {
        'a': '[a@4]',
        'e': '[e3]',
        'i': '[i1!]',
        'o': '[o0]',
        's': '[s$5]',
    }

    def __init__(self):
        self.bad_words = self._load_word_list()
        self.pattern = self._build_pattern()

    def _load_word_list(self) -> set:
        # Load from file or use built-in list.
        # (Placeholder words; supply a real list in production. The
        # original literal contained a bare `...` (Ellipsis), which
        # crashed _build_pattern with AttributeError on word.replace.)
        return {'badword1', 'badword2'}

    def _build_pattern(self):
        """Build one regex matching any bad word with the common
        character substitutions applied, case-insensitively."""
        patterns = []
        for word in self.bad_words:
            # Escape first so regex metacharacters in a word stay literal.
            obfuscated = re.escape(word)
            for letter, char_class in self._SUBSTITUTIONS.items():
                obfuscated = obfuscated.replace(letter, char_class)
            patterns.append(obfuscated)

        if not patterns:
            # '|'.join of nothing is '' which matches *everything*;
            # use a pattern that can never match instead.
            return re.compile(r'(?!)')
        return re.compile('|'.join(patterns), re.IGNORECASE)

    def contains_profanity(self, text: str) -> bool:
        """Return True if any (possibly obfuscated) bad word occurs."""
        return bool(self.pattern.search(text))

    def censor(self, text: str) -> str:
        """Replace each profanity match with asterisks of equal length."""
        def replace(match):
            return '*' * len(match.group())
        return self.pattern.sub(replace, text)

Toxicity Detection

Using a simple keyword approach (for ML, use external APIs):

TOXIC_INDICATORS = {
    'high': ['kill', 'die', 'hate you', 'threat'],
    'medium': ['stupid', 'idiot', 'dumb', 'loser'],
    'low': ['annoying', 'boring', 'whatever']
}

def analyze_toxicity(text: str) -> dict:
    """Score text against the keyword lists in TOXIC_INDICATORS.

    Matches on whole words (case-insensitive) so that e.g. "diet" does
    not trigger "die" and "studied" does not trigger "die" — the old
    substring check produced those false positives.

    Returns a dict with:
      score    -- weighted sum of matches, capped at 1.0
      is_toxic -- True when score exceeds 0.3
      matches  -- the matched words, grouped by severity level
    """
    import re

    found = {'high': [], 'medium': [], 'low': []}

    for level, words in TOXIC_INDICATORS.items():
        for word in words:
            # \b anchors keep multi-word phrases working too.
            if re.search(r'\b' + re.escape(word) + r'\b', text, re.IGNORECASE):
                found[level].append(word)

    # Calculate score: high matches dominate, low matches barely count.
    score = (
        len(found['high']) * 0.5 +
        len(found['medium']) * 0.2 +
        len(found['low']) * 0.05
    )

    return {
        'score': min(score, 1.0),
        'is_toxic': score > 0.3,
        'matches': found
    }

External Moderation APIs

For advanced content moderation:

# Perspective API (Google)
async def check_perspective(text: str) -> dict:
    """Score text with Google's Perspective API.

    Returns a mapping of attribute name (TOXICITY, SEVERE_TOXICITY,
    INSULT, THREAT) to its summary score in [0, 1].

    Raises httpx.HTTPStatusError on a non-2xx API response (bad key,
    quota exhausted, ...) instead of failing later with an opaque
    KeyError on the error payload.
    """
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"https://commentanalyzer.googleapis.com/v1alpha1/comments:analyze?key={API_KEY}",
            json={
                "comment": {"text": text},
                "requestedAttributes": {
                    "TOXICITY": {},
                    "SEVERE_TOXICITY": {},
                    "INSULT": {},
                    "THREAT": {}
                }
            }
        )
        # Surface API errors explicitly rather than KeyError-ing on a
        # missing "attributeScores" field below.
        response.raise_for_status()

        scores = response.json()["attributeScores"]
        return {
            attr: data["summaryScore"]["value"]
            for attr, data in scores.items()
        }

User Banning

Ban System

class BanService:
    """Create, check, and lift bans keyed by email, user id, or IP.

    Bans are persisted in the database and mirrored in Redis for fast
    lookups; the database is the source of truth when the cache is cold.
    """

    def __init__(self, db: AsyncSession, redis: Redis):
        self.db = db
        self.redis = redis

    async def ban_user(
        self,
        identifier: str,  # email, user_id, or IP
        ban_type: str,    # email, user, ip
        reason: str,
        duration: int = None,  # None = permanent
        admin_id: str = None
    ):
        """Persist a ban and cache it for quick lookups."""
        ban = Ban(
            identifier=identifier,
            ban_type=ban_type,
            reason=reason,
            expires_at=datetime.utcnow() + timedelta(seconds=duration) if duration else None,
            created_by=admin_id
        )
        self.db.add(ban)
        await self.db.commit()

        # Cache for quick lookups; TTL mirrors the ban duration.
        cache_key = f"ban:{ban_type}:{identifier}"
        if duration:
            await self.redis.setex(cache_key, duration, "1")
        else:
            await self.redis.set(cache_key, "1")

    async def is_banned(self, email: str = None, user_id: str = None, ip: str = None) -> bool:
        """Return True if any supplied identifier is currently banned.

        Checks the Redis cache first, then falls back to the database so
        that bans survive a cache flush (the old version consulted only
        the cache, silently un-banning everyone when Redis was cleared).
        A database hit re-warms the cache.
        """
        checks = []
        if email:
            checks.append(("email", email))
        if user_id:
            checks.append(("user", user_id))
        if ip:
            checks.append(("ip", ip))

        now = datetime.utcnow()
        for ban_type, identifier in checks:
            cache_key = f"ban:{ban_type}:{identifier}"

            # Fast path: cached ban.
            if await self.redis.exists(cache_key):
                return True

            # Slow path: cache may be out of sync with the DB.
            result = await self.db.execute(
                select(Ban)
                .where(Ban.ban_type == ban_type)
                .where(Ban.identifier == identifier)
            )
            for ban in result.scalars():
                if ban.expires_at is None:
                    await self.redis.set(cache_key, "1")
                    return True
                remaining = int((ban.expires_at - now).total_seconds())
                if remaining > 0:
                    await self.redis.setex(cache_key, remaining, "1")
                    return True

        return False

    async def unban(self, identifier: str, ban_type: str):
        """Remove a ban from both the database and the cache."""
        await self.db.execute(
            delete(Ban)
            .where(Ban.identifier == identifier)
            .where(Ban.ban_type == ban_type)
        )
        await self.db.commit()
        await self.redis.delete(f"ban:{ban_type}:{identifier}")

Shadow Banning

User can still post, but no one else sees:

async def get_comments_for_page(page_id: str, viewer_ip: str = None):
    """Return approved comments for a page, hiding shadow-banned
    authors' comments from everyone except (heuristically) the author.

    NOTE(review): "author sees own comment" is matched on the comment's
    stored IP vs. the viewer's IP, which breaks behind NAT or after an
    address change — confirm this heuristic is acceptable.
    """
    query = (
        select(Comment)
        .where(Comment.page_id == page_id)
        .where(Comment.status == CommentStatus.APPROVED)
    )

    comments = (await db.execute(query)).scalars().all()

    # Look up shadow-ban status once per distinct author email instead
    # of once per comment (the old loop did N lookups for N comments).
    shadow_status = {}
    for comment in comments:
        email = comment.author_email
        if email not in shadow_status:
            shadow_status[email] = await ban_service.is_shadow_banned(email)

    result = []
    for comment in comments:
        if shadow_status[comment.author_email]:
            # Only show to the shadow banned user themselves.
            if comment.ip_address == viewer_ip:
                result.append(comment)
        else:
            result.append(comment)

    return result

Notification System

Moderator Alerts

class ModerationNotifier:
    """Pushes moderation alerts to Slack.

    Routine pending-comment alerts are rate limited; flag-threshold
    alerts always go out.
    """

    def __init__(self, config: Config):
        self.slack_webhook = config.slack_webhook
        self.email_recipients = config.moderator_emails

    async def notify_pending(self, comment: Comment):
        """Notify moderators of new pending comment"""
        if not await self._should_notify():
            return
        message = (
            f"🔔 New comment pending review\n"
            f"Author: {comment.author_name}\n"
            f"Content: {comment.content[:200]}...\n"
            f"<{ADMIN_URL}/moderation|Review>"
        )
        await self._send_slack(message)

    async def notify_flagged(self, comment: Comment, flag_count: int):
        """Notify when comment reaches flag threshold"""
        message = (
            f"🚩 Comment flagged {flag_count} times\n"
            f"Author: {comment.author_name}\n"
            f"Content: {comment.content[:200]}...\n"
            f"<{ADMIN_URL}/moderation|Review>"
        )
        await self._send_slack(message)

    async def _send_slack(self, message: str):
        # One short-lived client per webhook call.
        payload = {"text": message}
        async with httpx.AsyncClient() as client:
            await client.post(self.slack_webhook, json=payload)

    async def _should_notify(self) -> bool:
        # Allow at most 10 notifications per rolling 5-minute window.
        key = "mod_notify_rate"
        count = await redis.incr(key)
        if count == 1:
            await redis.expire(key, 300)  # 5 minute window
        return count <= 10  # Max 10 notifications per 5 minutes

Audit Logging

Track all moderation actions:

class AuditLog:
    """Persists a record of every moderation action for later review."""

    async def log_action(
        self,
        action: str,
        target_type: str,
        target_id: str,
        actor_id: str,
        details: dict = None
    ):
        """Write one audit entry describing who did what to which object."""
        record = AuditEntry(
            action=action,
            target_type=target_type,
            target_id=target_id,
            actor_id=actor_id,
            details={} if details is None else details,
            ip_address=get_current_ip(),
            created_at=datetime.utcnow()
        )
        db.add(record)
        await db.commit()

# Usage
# Record that an admin approved a comment; `details` preserves the
# state the comment was in before the action.
await audit.log_action(
    action="comment.approved",
    target_type="comment",
    target_id=comment_id,
    actor_id=admin.id,
    details={"previous_status": "pending"}
)

Chapter Summary

| Aspect    | Recommendation                                   |
|-----------|--------------------------------------------------|
| Strategy  | Hybrid: auto-approve low risk, queue medium risk |
| Interface | Simple admin panel with bulk actions             |
| Filtering | Profanity filter + toxicity scoring              |
| Banning   | Support IP, email, and user-level bans           |
| Auditing  | Log all moderation actions                       |

In the next chapter, we’ll explore hosting and deployment options.


Navigation: