Spam is the biggest challenge for any comment system. This chapter covers a multi-layered defense strategy combining various techniques.
No single technique stops all spam. Layer multiple methods:
┌─────────────────────────────────────────────────────────────┐
│ SPAM DEFENSE LAYERS │
├─────────────────────────────────────────────────────────────┤
│ │
│ Layer 1: Frontend │
│ └── Honeypots, timing checks, JS validation │
│ │
│ Layer 2: Rate Limiting │
│ └── IP-based, fingerprint-based, global limits │
│ │
│ Layer 3: Content Analysis │
│ └── Link detection, keyword filters, patterns │
│ │
│ Layer 4: CAPTCHA │
│ └── Invisible reCAPTCHA, hCaptcha, Turnstile │
│ │
│ Layer 5: Reputation Systems │
│ └── IP reputation, email reputation, user history │
│ │
│ Layer 6: Machine Learning │
│ └── Trained models, Akismet, CleanTalk │
│ │
│ Layer 7: Human Moderation │
│ └── Queue review, community flagging │
│ │
└─────────────────────────────────────────────────────────────┘
Invisible fields that humans ignore but bots fill:
<!-- Honeypot form: the "website" field is invisible to humans but bots auto-fill it -->
<form id="comment-form">
<input type="text" name="author" required>
<input type="email" name="email">
<!-- Honeypot - hidden with CSS, not display:none (many bots skip display:none fields) -->
<div class="hp-field" aria-hidden="true">
<label>Leave empty</label>
<!-- tabindex=-1 keeps keyboard users out; autocomplete=off stops browser autofill -->
<input type="text" name="website" tabindex="-1" autocomplete="off">
</div>
<textarea name="content" required></textarea>
</form>
<style>
/* Move the honeypot off-screen instead of display:none so it still
   "exists" for naive bots that check field visibility. */
.hp-field {
position: absolute;
left: -9999px;
opacity: 0;
height: 0;
overflow: hidden;
}
</style>
def check_honeypot(data: dict) -> bool:
    """Return True when any honeypot decoy field was filled in (spam detected)."""
    # Any non-empty value in a decoy field means a bot auto-filled the form.
    decoy_fields = ('website', 'url', 'phone', 'company')
    return any(data.get(name) for name in decoy_fields)
Bots typically submit a form within a second or two of it loading — far faster than any human reader. Track the elapsed time and reject implausibly fast submissions:
// Frontend - record when the form was rendered so we can measure
// how long the visitor took before submitting.
const formLoadTime = Date.now();
document.getElementById('comment-form').addEventListener('submit', function(e) {
    const submitTime = Date.now();
    // Elapsed time in seconds between page load and submit.
    const elapsed = (submitTime - formLoadTime) / 1000;
    // Stash the timing in a hidden field so the backend can validate it.
    // NOTE(review): assumes a hidden input named "time_elapsed" exists in the
    // form — it is not present in the markup shown above; confirm it is added.
    this.querySelector('[name="time_elapsed"]').value = elapsed;
});
# Backend validation
def check_timing(elapsed_seconds: float) -> tuple[bool, str]:
"""Returns (is_spam, reason)"""
if elapsed_seconds < 3:
return True, "Submitted too quickly"
if elapsed_seconds > 86400: # 24 hours
return True, "Form expired"
return False, None
class CommentRateLimiter:
    """Fixed-window rate limits for comment submissions, backed by Redis.

    ``check_ip`` and ``check_page`` were near-identical copies; the shared
    fixed-window counter now lives in ``_limit``.
    """

    def __init__(self, redis: Redis):
        self.redis = redis

    async def _limit(
        self,
        key: str,
        max_count: int,
        window_seconds: int,
        message: str,
    ) -> tuple[bool, str | None]:
        """Allow up to ``max_count`` increments of ``key`` per window.

        Returns (allowed, error_message); ``error_message`` is ``None`` when
        the request is allowed. NOTE(review): if the process dies between
        INCR and EXPIRE the key never expires — consider a Lua script or
        ``EXPIRE ... NX`` for production hardening.
        """
        count = await self.redis.incr(key)
        if count == 1:
            # First hit in this window: start the expiry countdown.
            await self.redis.expire(key, window_seconds)
        if count > max_count:
            return False, message
        return True, None

    async def check_ip(self, ip: str) -> tuple[bool, str | None]:
        """Allow 5 comments per 10 minutes per client IP."""
        return await self._limit(
            f"rate:ip:{ip}", 5, 600, "Too many comments. Please wait."
        )

    async def check_page(self, page_id: str) -> tuple[bool, str | None]:
        """Allow 50 comments per hour per page (flood protection)."""
        return await self._limit(
            f"rate:page:{page_id}", 50, 3600,
            "This page is receiving too many comments."
        )
A sliding window is more accurate than a fixed window, which allows a burst of up to twice the limit right at the window boundary:
async def sliding_window_limit(
    redis: Redis,
    key: str,
    limit: int,
    window: int
) -> bool:
    """Return True when the caller is within ``limit`` events per ``window`` s.

    Sliding-window log on a Redis sorted set: each event is a member whose
    score is its UNIX timestamp. Old events are trimmed, the remainder is
    counted, and the current event is recorded.
    """
    import uuid  # local import keeps the snippet self-contained

    now = time.time()
    window_start = now - window

    pipe = redis.pipeline()
    # Drop events that fell out of the window.
    pipe.zremrangebyscore(key, 0, window_start)
    # Count what remains *before* recording this event.
    pipe.zcard(key)
    # BUGFIX: the member must be unique per event. Using str(now) alone makes
    # two requests that land on the same clock reading collapse into one
    # sorted-set entry, silently under-counting bursts.
    pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
    pipe.expire(key, window)
    results = await pipe.execute()

    current_count = results[1]
    return current_count < limit
Most spam includes links:
import re

# Known URL-shortener domains; links through these hide their real destination.
SHORTENER_DOMAINS = {'bit.ly', 'tinyurl.com', 't.co', 'goo.gl', 'ow.ly'}


def analyze_links(content: str) -> dict:
    """Count the URLs in ``content`` and summarize the domains they point at.

    Returns a dict with ``url_count``, ``unique_domains``, ``domains`` and
    ``has_shortened`` (True when any domain is a known URL shortener).
    """
    found_urls = re.findall(r'https?://[^\s<>"]+|www\.[^\s<>"]+', content)

    seen_domains = set()
    for candidate in found_urls:
        # Strip scheme and leading "www." to get the bare host.
        host = re.search(r'(?:https?://)?(?:www\.)?([^/]+)', candidate)
        if host is not None:
            seen_domains.add(host.group(1).lower())

    return {
        'url_count': len(found_urls),
        'unique_domains': len(seen_domains),
        'domains': list(seen_domains),
        'has_shortened': bool(seen_domains & SHORTENER_DOMAINS),
    }
def content_spam_score(content: str) -> float:
    """Heuristic spam score for comment text, clamped to the range [0, 1]."""
    link_info = analyze_links(content)

    score = 0.0
    # More than three links is a strong spam signal.
    if link_info['url_count'] > 3:
        score += 0.3
    # Shortened URLs hide their destination — very suspicious.
    if link_info['has_shortened']:
        score += 0.4
    # ALL-CAPS SHOUTING.
    if content.isupper():
        score += 0.2
    # Near-empty comments ("nice post!") are weakly suspicious.
    if len(content.strip()) < 10:
        score += 0.1
    return min(score, 1.0)
# Exact-phrase spam markers, matched case-insensitively against the content.
SPAM_KEYWORDS = [
    'buy now', 'click here', 'limited time', 'act now',
    'free money', 'work from home', 'casino', 'viagra',
    'cryptocurrency investment', 'double your'
]

# Regex patterns for structured spam: money-per-period offers, raw phone
# numbers, and Telegram contact links.
SPAM_PATTERNS = [
    r'\$\d+(?:,\d{3})*(?:\.\d{2})?(?:\s*(?:per|a)\s*(?:day|week|month))',
    r'(?:call|text|whatsapp)\s*(?:me\s*)?(?:at\s*)?\d{10,}',
    r'(?:https?://)?(?:t\.me|telegram\.me)/\w+',
]


def check_keywords(content: str) -> tuple[bool, list]:
    """Scan ``content`` for spam keywords and spam regex patterns.

    Returns (hit, matches): ``matches`` lists every keyword found, followed
    by a truncated ``pattern:`` label for each matching regex.
    """
    haystack = content.lower()
    hits = [kw for kw in SPAM_KEYWORDS if kw in haystack]
    hits.extend(
        f"pattern:{pat[:20]}..."
        for pat in SPAM_PATTERNS
        if re.search(pat, content, re.IGNORECASE)
    )
    return bool(hits), hits
Free, privacy-friendly alternative:
<!-- Frontend: load the Turnstile script asynchronously so it never blocks rendering -->
<script src="https://challenges.cloudflare.com/turnstile/v0/api.js" async defer></script>
<form id="comment-form">
<!-- Turnstile widget: data-callback fires with the verification token on success -->
<div class="cf-turnstile"
data-sitekey="YOUR_SITE_KEY"
data-callback="onTurnstileSuccess"></div>
<!-- Hidden field carries the token to the backend with the form POST -->
<input type="hidden" name="cf_turnstile_response" id="turnstile-response">
</form>
<script>
// Called by Turnstile when the (usually invisible) challenge succeeds.
function onTurnstileSuccess(token) {
document.getElementById('turnstile-response').value = token;
}
</script>
# Backend verification
async def verify_turnstile(token: str, ip: str) -> bool:
    """Server-side verification of a Cloudflare Turnstile token.

    Returns True only when Cloudflare confirms the token is valid.
    """
    verification = {
        "secret": TURNSTILE_SECRET_KEY,
        "response": token,
        "remoteip": ip
    }
    async with httpx.AsyncClient() as client:
        reply = await client.post(
            "https://challenges.cloudflare.com/turnstile/v0/siteverify",
            data=verification
        )
        outcome = reply.json()
    return outcome.get("success", False)
async def verify_hcaptcha(token: str) -> bool:
    """Validate an hCaptcha response token against the siteverify endpoint."""
    form = {"secret": HCAPTCHA_SECRET, "response": token}
    async with httpx.AsyncClient() as client:
        reply = await client.post("https://hcaptcha.com/siteverify", data=form)
        return reply.json().get("success", False)
Only show CAPTCHA when suspicious:
def should_require_captcha(request_data: dict, ip: str) -> bool:
    """Determine if this submission needs CAPTCHA verification.

    Returns True when the submission is anonymous AND looks spammy, comes
    from a suspicious IP, or is part of a rapid burst from the same IP.
    """
    # Anonymous submissions: require CAPTCHA only when the content itself
    # looks spammy. (The original comment claimed "always require for
    # anonymous", but the code only triggers above a 0.3 spam score.)
    if not request_data.get('user_id'):
        spam_score = content_spam_score(request_data['content'])
        if spam_score > 0.3:
            return True
    # Check IP reputation.
    # NOTE(review): is_suspicious_ip is not defined in this chapter — confirm
    # where it comes from.
    if is_suspicious_ip(ip):
        return True
    # Check submission velocity: more than 2 comments in 10 minutes is a burst.
    # NOTE(review): get_recent_comment_count is also defined elsewhere.
    recent_comments = get_recent_comment_count(ip, minutes=10)
    if recent_comments > 2:
        return True
    return False
class IPReputation:
    """Per-IP reputation scores kept in Redis, clamped to [-100, 100].

    Positive scores mean a history of good comments; negative scores mean
    spam. Scores decay away after 30 days without activity.
    """

    def __init__(self, redis: Redis):
        self.redis = redis

    async def get_score(self, ip: str) -> int:
        """Return the current reputation score; unknown IPs score 0."""
        raw = await self.redis.get(f"ip_rep:{ip}")
        return int(raw or 0)

    async def adjust_score(self, ip: str, delta: int):
        """Shift the score by ``delta`` and clamp it back into range."""
        key = f"ip_rep:{ip}"
        updated = await self.redis.incrby(key, delta)
        # INCRBY has no bounds, so pull out-of-range results back in.
        if updated > 100:
            await self.redis.set(key, 100)
        elif updated < -100:
            await self.redis.set(key, -100)
        # Refresh the 30-day inactivity expiry on every adjustment.
        await self.redis.expire(key, 86400 * 30)

    async def record_good_comment(self, ip: str):
        """Small positive nudge for a comment that was posted cleanly."""
        await self.adjust_score(ip, 5)

    async def record_spam(self, ip: str):
        """Large penalty for a confirmed spam comment."""
        await self.adjust_score(ip, -20)

    async def record_approved(self, ip: str):
        """Reward for a comment a moderator explicitly approved."""
        await self.adjust_score(ip, 10)
async def check_email_reputation(email: str) -> dict:
    """Score an email address's trustworthiness (0 = spammer, 100 = clean).

    Returns a dict with the individual signals (``disposable``,
    ``free_provider``, ``known_spammer``, ``domain_age_days``) plus an
    aggregate ``score``.
    """
    # BUGFIX: email.split('@')[1] raised IndexError on addresses without an
    # '@'; rsplit(...)[-1] degrades gracefully. Lowercasing makes the set
    # lookups case-insensitive, matching how DNS treats domain names.
    domain = email.rsplit('@', 1)[-1].lower()
    result = {
        'disposable': domain in DISPOSABLE_DOMAINS,
        'free_provider': domain in FREE_EMAIL_PROVIDERS,
        'known_spammer': await is_known_spammer_email(email),
        'domain_age_days': await get_domain_age(domain)
    }
    # Start from a clean slate and subtract for each red flag.
    score = 100
    if result['disposable']:
        score -= 50
    if result['known_spammer']:
        score = 0  # hard fail: known spammers get no credit at all
    if result['domain_age_days'] and result['domain_age_days'] < 30:
        score -= 20  # freshly registered domains are a classic spam tell
    result['score'] = max(0, score)
    return result
# Throwaway-email services: addresses here are effectively anonymous.
DISPOSABLE_DOMAINS = {
    'tempmail.com', 'guerrillamail.com', 'mailinator.com',
    '10minutemail.com', 'throwaway.email', 'temp-mail.org'
}
# Popular free providers — not spammy per se, just a weaker identity signal.
FREE_EMAIL_PROVIDERS = {
    'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com',
    'protonmail.com', 'icloud.com'
}
Akismet is the industry-standard spam filter for WordPress, and its HTTP API works with any platform:
class AkismetChecker:
    """Client for the Akismet spam-detection REST API."""

    def __init__(self, api_key: str, blog_url: str):
        self.api_key = api_key
        self.blog_url = blog_url
        # The API key doubles as the API subdomain.
        self.base_url = f"https://{api_key}.rest.akismet.com/1.1"

    async def check_comment(
        self,
        content: str,
        author: str,
        email: str,
        ip: str,
        user_agent: str,
        page_url: str
    ) -> bool:
        """Returns True if Akismet classifies the comment as spam."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/comment-check",
                data={
                    "blog": self.blog_url,
                    "user_ip": ip,
                    "user_agent": user_agent,
                    "comment_type": "comment",
                    "comment_author": author,
                    "comment_author_email": email,
                    "comment_content": content,
                    "permalink": page_url
                }
            )
            # Akismet replies with a literal "true"/"false" body.
            return response.text == "true"

    async def _submit(self, endpoint: str, comment_data: dict):
        """POST feedback to an Akismet endpoint (submit-spam / submit-ham).

        BUGFIX: this helper was called by submit_spam/submit_ham but never
        defined, so both methods raised AttributeError at runtime.
        """
        async with httpx.AsyncClient() as client:
            await client.post(
                f"{self.base_url}/{endpoint}",
                data={"blog": self.blog_url, **comment_data}
            )

    async def submit_spam(self, comment_data: dict):
        """Report a false negative (spam that slipped through)."""
        await self._submit("submit-spam", comment_data)

    async def submit_ham(self, comment_data: dict):
        """Report a false positive (a legitimate comment flagged as spam)."""
        await self._submit("submit-ham", comment_data)
Alternative to Akismet:
async def cleantalk_check(
    email: str,
    ip: str,
    content: str
) -> dict:
    """Ask CleanTalk's moderation API whether a message is spam.

    Returns ``{"is_spam": bool, "reason": str | None}``.
    """
    request_body = {
        "method_name": "check_message",
        "auth_key": CLEANTALK_API_KEY,
        "sender_email": email,
        "sender_ip": ip,
        "message": content
    }
    async with httpx.AsyncClient() as client:
        reply = await client.post(
            "https://moderate.cleantalk.org/api2.0",
            json=request_body
        )
        verdict = reply.json()
    # CleanTalk signals "blocked" with allow == 0; "comment" explains why.
    return {
        "is_spam": verdict.get("allow") == 0,
        "reason": verdict.get("comment")
    }
Aggregate all signals:
class SpamDetector:
    """Aggregates every spam signal into a single weighted verdict."""

    def __init__(self, redis: Redis, akismet: AkismetChecker):
        self.redis = redis
        self.akismet = akismet
        self.ip_reputation = IPReputation(redis)

    async def analyze(self, comment: CommentSubmission) -> SpamAnalysis:
        """Run every available check and combine the results.

        A honeypot hit short-circuits to certain spam; otherwise each signal
        contributes a 0-1 score, combined as a weighted average over only
        the signals that were actually collected.
        """
        signal_scores = {}

        # Honeypot hit is conclusive — skip all other checks.
        if check_honeypot(comment.raw_data):
            return SpamAnalysis(is_spam=True, confidence=1.0, reason="honeypot")

        # Submission timing: too fast (or stale) counts fully against it.
        too_fast, _reason = check_timing(comment.elapsed_seconds)
        if too_fast:
            signal_scores['timing'] = 1.0

        # Content heuristics (links, shouting, length).
        signal_scores['content'] = content_spam_score(comment.content)

        # IP reputation: map [-100, 0] onto [1, 0]; positive reputation → 0.
        reputation = await self.ip_reputation.get_score(comment.ip)
        signal_scores['ip'] = max(0, -reputation) / 100

        # Email reputation, when an address was supplied.
        if comment.email:
            email_rep = await check_email_reputation(comment.email)
            signal_scores['email'] = 1 - (email_rep['score'] / 100)

        # External classifier, when configured.
        if self.akismet:
            akismet_says_spam = await self.akismet.check_comment(
                comment.content, comment.author,
                comment.email, comment.ip,
                comment.user_agent, comment.page_url
            )
            signal_scores['akismet'] = 1.0 if akismet_says_spam else 0.0

        # Weighted average, renormalized over the signals we collected.
        weights = {
            'timing': 0.15,
            'content': 0.25,
            'ip': 0.15,
            'email': 0.15,
            'akismet': 0.30
        }
        total_weight = sum(weights[name] for name in signal_scores)
        overall = sum(
            signal_scores[name] * weights[name] for name in signal_scores
        ) / total_weight

        return SpamAnalysis(
            is_spam=overall > 0.5,
            confidence=overall,
            scores=signal_scores,
            # Mid-range scores go to a human moderator instead of auto-reject.
            requires_moderation=0.3 < overall < 0.7
        )
| Technique | Effectiveness | Cost | Friction |
|---|---|---|---|
| Honeypots | Medium | Free | None |
| Rate Limiting | High | Free | Low |
| Content Analysis | Medium | Free | None |
| Turnstile/hCaptcha | High | Free | Low |
| Akismet | Very High | $5+/mo | None |
| IP Reputation | Medium | Free | None |
Best practice: Start with free methods (honeypots, rate limiting, content analysis), add CAPTCHA for suspicious cases, and consider Akismet for high-traffic sites.
Navigation: