Chapter 7: Spam Prevention and Detection

Spam is the biggest challenge for any comment system. This chapter covers a multi-layered defense strategy built from complementary techniques.

Defense in Depth

No single technique stops all spam. Layer multiple methods:

┌─────────────────────────────────────────────────────────────┐
│                    SPAM DEFENSE LAYERS                       │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  Layer 1: Frontend                                           │
│  └── Honeypots, timing checks, JS validation                 │
│                                                              │
│  Layer 2: Rate Limiting                                      │
│  └── IP-based, fingerprint-based, global limits              │
│                                                              │
│  Layer 3: Content Analysis                                   │
│  └── Link detection, keyword filters, patterns               │
│                                                              │
│  Layer 4: CAPTCHA                                            │
│  └── Invisible reCAPTCHA, hCaptcha, Turnstile                │
│                                                              │
│  Layer 5: Reputation Systems                                 │
│  └── IP reputation, email reputation, user history           │
│                                                              │
│  Layer 6: Machine Learning                                   │
│  └── Trained models, Akismet, CleanTalk                      │
│                                                              │
│  Layer 7: Human Moderation                                   │
│  └── Queue review, community flagging                        │
│                                                              │
└─────────────────────────────────────────────────────────────┘
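The diagram above reads as an ordered pipeline: run the cheapest layers first and stop at the first definitive verdict. A minimal sketch, with hypothetical toy layers standing in for the real checks developed later in this chapter (each layer returns "spam", "ham", or None for "no opinion"):

```python
# Ordered pipeline: cheap checks first, short-circuit on a definitive verdict.
# The layer contract here (spam/ham/None) is an illustrative assumption.

def run_spam_pipeline(comment: dict, layers: list) -> str:
    for layer in layers:
        verdict = layer(comment)
        if verdict in ("spam", "ham"):
            return verdict          # definitive answer, skip remaining layers
    return "moderate"               # no layer was sure - queue for human review


# Two toy layers for demonstration
def honeypot_layer(comment):
    return "spam" if comment.get("website") else None

def length_layer(comment):
    return "ham" if len(comment.get("content", "")) > 20 else None

print(run_spam_pipeline({"content": "Great article, thanks for writing it!"},
                        [honeypot_layer, length_layer]))  # → ham
```

Ordering by cost matters: a honeypot check is a dictionary lookup, while an Akismet call is a network round trip, so the expensive layers should only see traffic the cheap ones could not decide.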

Honeypot Fields

Invisible fields that humans ignore but bots fill:

Implementation

<!-- Hidden field that should remain empty -->
<form id="comment-form">
    <input type="text" name="author" required>
    <input type="email" name="email">
    
    <!-- Honeypot - hidden with CSS, not display:none -->
    <div class="hp-field" aria-hidden="true">
        <label>Leave empty</label>
        <input type="text" name="website" tabindex="-1" autocomplete="off">
    </div>
    
    <textarea name="content" required></textarea>
</form>

<style>
.hp-field {
    position: absolute;
    left: -9999px;
    opacity: 0;
    height: 0;
    overflow: hidden;
}
</style>

Backend Validation

def check_honeypot(data: dict) -> bool:
    """Returns True if honeypot triggered (spam detected)"""
    honeypot_fields = ['website', 'url', 'phone', 'company']
    
    for field in honeypot_fields:
        if data.get(field):
            return True
    return False

Timing Analysis

Bots submit too fast:

// Frontend - track form load time
const formLoadTime = Date.now();

// Assumes the form also contains: <input type="hidden" name="time_elapsed">
document.getElementById('comment-form').addEventListener('submit', function(e) {
    const elapsed = (Date.now() - formLoadTime) / 1000;
    this.querySelector('[name="time_elapsed"]').value = elapsed;
});

# Backend validation
def check_timing(elapsed_seconds: float) -> tuple[bool, str | None]:
    """Returns (is_spam, reason)"""
    if elapsed_seconds < 3:
        return True, "Submitted too quickly"
    if elapsed_seconds > 86400:  # 24 hours
        return True, "Form expired"
    return False, None

Rate Limiting

IP-Based Limits

class CommentRateLimiter:
    def __init__(self, redis: Redis):
        self.redis = redis
    
    async def check_ip(self, ip: str) -> tuple[bool, str | None]:
        key = f"rate:ip:{ip}"
        
        # Allow 5 comments per 10 minutes per IP
        count = await self.redis.incr(key)
        if count == 1:
            await self.redis.expire(key, 600)
        
        if count > 5:
            return False, "Too many comments. Please wait."
        return True, None
    
    async def check_page(self, page_id: str) -> tuple[bool, str | None]:
        key = f"rate:page:{page_id}"
        
        # Allow 50 comments per hour per page
        count = await self.redis.incr(key)
        if count == 1:
            await self.redis.expire(key, 3600)
        
        if count > 50:
            return False, "This page is receiving too many comments."
        return True, None
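To sanity-check the limiter logic without a live Redis, a tiny in-memory stand-in is enough. `FakeRedis` below is a test fixture, not production code, and `check_ip` is a free-function copy of the method above:

```python
import asyncio

class FakeRedis:
    """In-memory stand-in for the two Redis calls check_ip uses (TTL ignored)."""
    def __init__(self):
        self.counts = {}
    async def incr(self, key):
        self.counts[key] = self.counts.get(key, 0) + 1
        return self.counts[key]
    async def expire(self, key, seconds):
        pass  # expiry not modeled

async def check_ip(redis, ip):
    # Same logic as CommentRateLimiter.check_ip above
    key = f"rate:ip:{ip}"
    count = await redis.incr(key)
    if count == 1:
        await redis.expire(key, 600)
    if count > 5:
        return False, "Too many comments. Please wait."
    return True, None

async def demo():
    redis = FakeRedis()
    results = [await check_ip(redis, "203.0.113.7") for _ in range(6)]
    return [ok for ok, _ in results]

print(asyncio.run(demo()))  # → [True, True, True, True, True, False]
```

The sixth submission from the same IP within the window is rejected, as intended.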

Sliding Window Rate Limiter

More accurate than fixed windows:

async def sliding_window_limit(
    redis: Redis,
    key: str,
    limit: int,
    window: int
) -> bool:
    now = time.time()
    window_start = now - window
    
    pipe = redis.pipeline()
    pipe.zremrangebyscore(key, 0, window_start)
    pipe.zcard(key)
    pipe.zadd(key, {str(now): now})
    pipe.expire(key, window)
    
    results = await pipe.execute()
    current_count = results[1]
    
    return current_count < limit

Content Analysis

Most spam includes links:

import re

def analyze_links(content: str) -> dict:
    # Find all URLs
    url_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+'
    urls = re.findall(url_pattern, content)
    
    # Extract domains
    domains = set()
    for url in urls:
        match = re.search(r'(?:https?://)?(?:www\.)?([^/]+)', url)
        if match:
            domains.add(match.group(1).lower())
    
    return {
        'url_count': len(urls),
        'unique_domains': len(domains),
        'domains': list(domains),
        'has_shortened': any(d in SHORTENER_DOMAINS for d in domains)
    }

SHORTENER_DOMAINS = {'bit.ly', 'tinyurl.com', 't.co', 'goo.gl', 'ow.ly'}

def content_spam_score(content: str) -> float:
    """Returns spam score 0-1"""
    score = 0.0
    analysis = analyze_links(content)
    
    # Too many links
    if analysis['url_count'] > 3:
        score += 0.3
    
    # URL shorteners are suspicious
    if analysis['has_shortened']:
        score += 0.4
    
    # All caps
    if content.isupper():
        score += 0.2
    
    # Very short content
    if len(content.strip()) < 10:
        score += 0.1
    
    return min(score, 1.0)

Keyword Filtering

SPAM_KEYWORDS = [
    'buy now', 'click here', 'limited time', 'act now',
    'free money', 'work from home', 'casino', 'viagra',
    'cryptocurrency investment', 'double your'
]

SPAM_PATTERNS = [
    r'\$\d+(?:,\d{3})*(?:\.\d{2})?(?:\s*(?:per|a)\s*(?:day|week|month))',  # "$500 per day"
    r'(?:call|text|whatsapp)\s*(?:me\s*)?(?:at\s*)?\d{10,}',  # Phone numbers
    r'(?:https?://)?(?:t\.me|telegram\.me)/\w+',  # Telegram links
]

def check_keywords(content: str) -> tuple[bool, list]:
    content_lower = content.lower()
    found = []
    
    for keyword in SPAM_KEYWORDS:
        if keyword in content_lower:
            found.append(keyword)
    
    for pattern in SPAM_PATTERNS:
        if re.search(pattern, content, re.IGNORECASE):
            found.append(f"pattern:{pattern[:20]}...")
    
    return len(found) > 0, found

CAPTCHA Integration

Cloudflare Turnstile

Free, privacy-friendly alternative:

<!-- Frontend -->
<script src="https://challenges.cloudflare.com/turnstile/v0/api.js" async defer></script>

<form id="comment-form">
    <!-- Turnstile widget -->
    <div class="cf-turnstile" 
         data-sitekey="YOUR_SITE_KEY"
         data-callback="onTurnstileSuccess"></div>
    
    <input type="hidden" name="cf_turnstile_response" id="turnstile-response">
</form>

<script>
function onTurnstileSuccess(token) {
    document.getElementById('turnstile-response').value = token;
}
</script>

# Backend verification
async def verify_turnstile(token: str, ip: str) -> bool:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://challenges.cloudflare.com/turnstile/v0/siteverify",
            data={
                "secret": TURNSTILE_SECRET_KEY,
                "response": token,
                "remoteip": ip
            }
        )
        result = response.json()
        return result.get("success", False)

hCaptcha

async def verify_hcaptcha(token: str) -> bool:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://hcaptcha.com/siteverify",
            data={
                "secret": HCAPTCHA_SECRET,
                "response": token
            }
        )
        return response.json().get("success", False)

Invisible CAPTCHA Strategy

Only show CAPTCHA when suspicious:

def should_require_captcha(request_data: dict, ip: str) -> bool:
    """Determine if this submission needs CAPTCHA verification"""
    
    # Anonymous submissions: require CAPTCHA when the content looks spammy
    if not request_data.get('user_id'):
        spam_score = content_spam_score(request_data['content'])
        if spam_score > 0.3:
            return True
    
    # Check IP reputation
    if is_suspicious_ip(ip):
        return True
    
    # Check submission velocity
    recent_comments = get_recent_comment_count(ip, minutes=10)
    if recent_comments > 2:
        return True
    
    return False

Reputation Systems

IP Reputation

class IPReputation:
    def __init__(self, redis: Redis):
        self.redis = redis
    
    async def get_score(self, ip: str) -> int:
        """Get IP reputation score (-100 to 100)"""
        return int(await self.redis.get(f"ip_rep:{ip}") or 0)
    
    async def adjust_score(self, ip: str, delta: int):
        """Adjust score based on behavior"""
        key = f"ip_rep:{ip}"
        score = await self.redis.incrby(key, delta)
        
        # Clamp to range
        if score > 100:
            await self.redis.set(key, 100)
        elif score < -100:
            await self.redis.set(key, -100)
        
        # Expire after 30 days of inactivity
        await self.redis.expire(key, 86400 * 30)
    
    async def record_good_comment(self, ip: str):
        await self.adjust_score(ip, 5)
    
    async def record_spam(self, ip: str):
        await self.adjust_score(ip, -20)
    
    async def record_approved(self, ip: str):
        await self.adjust_score(ip, 10)

Email Reputation

async def check_email_reputation(email: str) -> dict:
    # rpartition avoids an IndexError on malformed input; compare lowercase
    domain = email.rpartition('@')[2].lower()
    
    result = {
        'disposable': domain in DISPOSABLE_DOMAINS,
        'free_provider': domain in FREE_EMAIL_PROVIDERS,
        'known_spammer': await is_known_spammer_email(email),
        'domain_age_days': await get_domain_age(domain)
    }
    
    # Calculate score
    score = 100
    if result['disposable']:
        score -= 50
    if result['known_spammer']:
        score = 0
    if result['domain_age_days'] and result['domain_age_days'] < 30:
        score -= 20
    
    result['score'] = max(0, score)
    return result

DISPOSABLE_DOMAINS = {
    'tempmail.com', 'guerrillamail.com', 'mailinator.com',
    '10minutemail.com', 'throwaway.email', 'temp-mail.org'
}

FREE_EMAIL_PROVIDERS = {
    'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com',
    'protonmail.com', 'icloud.com'
}

Third-Party Spam Services

Akismet Integration

Industry standard for WordPress, works anywhere:

class AkismetChecker:
    def __init__(self, api_key: str, blog_url: str):
        self.api_key = api_key
        self.blog_url = blog_url
        self.base_url = f"https://{api_key}.rest.akismet.com/1.1"
    
    async def check_comment(
        self,
        content: str,
        author: str,
        email: str,
        ip: str,
        user_agent: str,
        page_url: str
    ) -> bool:
        """Returns True if spam"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/comment-check",
                data={
                    "blog": self.blog_url,
                    "user_ip": ip,
                    "user_agent": user_agent,
                    "comment_type": "comment",
                    "comment_author": author,
                    "comment_author_email": email,
                    "comment_content": content,
                    "permalink": page_url
                }
            )
            return response.text == "true"
    
    async def _submit(self, endpoint: str, comment_data: dict):
        async with httpx.AsyncClient() as client:
            await client.post(
                f"{self.base_url}/{endpoint}",
                data={"blog": self.blog_url, **comment_data}
            )
    
    async def submit_spam(self, comment_data: dict):
        """Report a false negative (spam that slipped through)"""
        await self._submit("submit-spam", comment_data)
    
    async def submit_ham(self, comment_data: dict):
        """Report a false positive (legitimate comment flagged as spam)"""
        await self._submit("submit-ham", comment_data)

CleanTalk

Alternative to Akismet:

async def cleantalk_check(
    email: str,
    ip: str,
    content: str
) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://moderate.cleantalk.org/api2.0",
            json={
                "method_name": "check_message",
                "auth_key": CLEANTALK_API_KEY,
                "sender_email": email,
                "sender_ip": ip,
                "message": content
            }
        )
        result = response.json()
        
        return {
            "is_spam": result.get("allow") == 0,
            "reason": result.get("comment")
        }

Combined Spam Scoring

Aggregate all signals:

class SpamDetector:
    def __init__(self, redis: Redis, akismet: AkismetChecker):
        self.redis = redis
        self.akismet = akismet
        self.ip_reputation = IPReputation(redis)
    
    async def analyze(self, comment: CommentSubmission) -> SpamAnalysis:
        scores = {}
        
        # Honeypot check (immediate rejection)
        if check_honeypot(comment.raw_data):
            return SpamAnalysis(is_spam=True, confidence=1.0, reason="honeypot")
        
        # Timing check
        timing_spam, timing_reason = check_timing(comment.elapsed_seconds)
        if timing_spam:
            scores['timing'] = 1.0
        
        # Content analysis
        scores['content'] = content_spam_score(comment.content)
        
        # IP reputation
        ip_score = await self.ip_reputation.get_score(comment.ip)
        scores['ip'] = max(0, -ip_score) / 100  # Convert to 0-1
        
        # Email reputation
        if comment.email:
            email_rep = await check_email_reputation(comment.email)
            scores['email'] = 1 - (email_rep['score'] / 100)
        
        # Akismet (if enabled)
        if self.akismet:
            is_akismet_spam = await self.akismet.check_comment(
                comment.content, comment.author,
                comment.email, comment.ip,
                comment.user_agent, comment.page_url
            )
            scores['akismet'] = 1.0 if is_akismet_spam else 0.0
        
        # Calculate weighted score
        weights = {
            'timing': 0.15,
            'content': 0.25,
            'ip': 0.15,
            'email': 0.15,
            'akismet': 0.30
        }
        
        total_weight = sum(weights[k] for k in scores)
        weighted_score = sum(scores[k] * weights[k] for k in scores) / total_weight
        
        return SpamAnalysis(
            is_spam=weighted_score > 0.5,
            confidence=weighted_score,
            scores=scores,
            requires_moderation=0.3 < weighted_score < 0.7
        )

Chapter Summary

Technique           Effectiveness   Cost      Friction
Honeypots           Medium          Free      None
Rate Limiting       High            Free      Low
Content Analysis    Medium          Free      None
Turnstile/hCaptcha  High            Free      Low
Akismet             Very High       $5+/mo    None
IP Reputation       Medium          Free      None

Best practice: Start with free methods (honeypots, rate limiting, content analysis), add CAPTCHA for suspicious cases, and consider Akismet for high-traffic sites.
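That ordering advice can be condensed into a short gate that runs the free checks first and escalates to CAPTCHA rather than rejecting on a soft signal. The checks below are toy stand-ins for the real implementations in this chapter:

```python
# Cost-ordered gate: free checks first, CAPTCHA only when something is suspicious.
# The inline checks are simplified stand-ins for the chapter's implementations.

def gate(comment: dict) -> str:
    if comment.get("website"):                 # honeypot field was filled
        return "reject"
    if comment.get("recent_from_ip", 0) > 5:   # rate limit exceeded
        return "reject"
    # Toy content score: any link is treated as a soft spam signal
    score = 0.4 if "http://" in comment.get("content", "") else 0.0
    if score > 0.3:
        return "captcha"                       # escalate, don't reject outright
    return "accept"

print(gate({"content": "Nice post!"}))                    # → accept
print(gate({"content": "Buy here http://spam.example"}))  # → captcha
print(gate({"website": "x", "content": "hi"}))            # → reject
```

Hard signals (honeypot, rate limit) reject immediately; soft signals only raise friction, which keeps false positives from silently discarding legitimate comments.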

