With our database design in place, let’s build the API server that powers our comment system. We’ll cover multiple implementation approaches.
Standard comment API structure:
| Method | Endpoint | Description |
|---|---|---|
| GET | /api/comments?page_id=xxx | List comments for a page |
| POST | /api/comments | Create new comment |
| GET | /api/comments/:id | Get single comment |
| PATCH | /api/comments/:id | Update comment |
| DELETE | /api/comments/:id | Delete comment |
| POST | /api/comments/:id/reactions | Add reaction |
Consistent JSON responses:
{
"success": true,
"data": { ... },
"meta": {
"total": 42,
"page": 1,
"per_page": 20
}
}
Error responses:
{
"success": false,
"error": {
"code": "VALIDATION_ERROR",
"message": "Content is required",
"field": "content"
}
}
comment-api/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── database.py
│   ├── schemas.py
│   ├── exceptions.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── comment.py
│   ├── routes/
│   │   ├── __init__.py
│   │   └── comments.py
│   ├── services/
│   │   ├── __init__.py
│   │   ├── comment_service.py
│   │   └── spam_service.py
│   ├── middleware/
│   │   ├── __init__.py
│   │   └── rate_limit.py
│   └── utils/
│       ├── __init__.py
│       ├── validation.py
│       ├── sanitizer.py
│       └── rate_limiter.py
├── requirements.txt
└── Dockerfile
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.routes import comments
from app.database import engine
from app.models import Base
app = FastAPI(title="Comment API", version="1.0.0")

# The static site calls this API from another origin, so the browser needs
# explicit CORS permission for that origin and for the verbs the routes use.
_CORS_OPTIONS = {
    "allow_origins": ["https://yourdomain.com"],
    "allow_methods": ["GET", "POST", "PATCH", "DELETE"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# Comment routes are mounted below the /api prefix.
app.include_router(comments.router, prefix="/api")


@app.on_event("startup")
async def startup():
    """Run ``Base.metadata.create_all`` against the engine at application start."""
    async with engine.begin() as connection:
        # create_all is a synchronous API, so it is bridged through run_sync.
        await connection.run_sync(Base.metadata.create_all)
# app/models/comment.py
from sqlalchemy import Column, String, Text, DateTime, ForeignKey, Enum
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from datetime import datetime
import uuid
import enum
class CommentStatus(enum.Enum):
    """Moderation state of a comment, stored in the ``Comment.status`` column."""

    # Column default for newly inserted comments.
    pending = "pending"
    # The only status the public listing query returns.
    approved = "approved"
    # Assigned at creation time when the spam check flags the comment.
    spam = "spam"
    # Presumably a soft-delete marker; nothing in this file sets it - confirm.
    deleted = "deleted"
class Comment(Base):
    """ORM mapping for one row of the ``comments`` table.

    NOTE(review): ``Base`` is not imported in this snippet; it presumably
    comes from the project's declarative base - confirm the import before
    using this module as-is.
    """

    __tablename__ = "comments"

    # Primary key; a UUID4 is generated in Python when no id is supplied.
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Identifier of the page the comment belongs to; indexed because the
    # listing query filters on it.
    page_id = Column(String(255), nullable=False, index=True)
    # Self-referential foreign key: NULL for top-level comments, otherwise
    # the id of the comment being replied to.
    parent_id = Column(UUID(as_uuid=True), ForeignKey("comments.id"))
    author_name = Column(String(100), nullable=False)
    # Optional (nullable) contact address.
    author_email = Column(String(255))
    content = Column(Text, nullable=False)
    # Moderation state; new rows start as ``pending`` unless set explicitly.
    status = Column(Enum(CommentStatus), default=CommentStatus.pending)
    # Request metadata recorded at creation time.
    # 45 characters is presumably sized for textual IPv6 addresses - confirm.
    ip_address = Column(String(45))
    user_agent = Column(Text)
    # Naive UTC timestamp set in Python on insert.
    created_at = Column(DateTime, default=datetime.utcnow)
    # Only ``onupdate`` is configured, so this stays NULL until the row is
    # first updated.
    updated_at = Column(DateTime, onupdate=datetime.utcnow)

    # Child comments; "selectin" loads them eagerly with a follow-up
    # SELECT ... IN query. The backref exposes the reverse side as
    # ``Comment.parent``.
    replies = relationship("Comment", backref="parent", lazy="selectin")
# app/routes/comments.py
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.comment_service import CommentService
from app.schemas import CommentCreate, CommentResponse
from app.database import get_db
router = APIRouter(prefix="/comments", tags=["comments"])


@router.get("/")
async def list_comments(
    page_id: str,
    db: AsyncSession = Depends(get_db),
):
    """List the comments of one page inside the standard success envelope.

    ``page_id`` is a required query parameter; the lookup itself is delegated
    to ``CommentService.get_comments_for_page``.
    """
    page_comments = await CommentService(db).get_comments_for_page(page_id)
    # NOTE(review): the ORM rows are returned as-is, so every mapped column
    # (including author_email and ip_address) may end up in the JSON payload.
    # Consider a response schema - confirm against the intended public fields.
    return {"success": True, "data": page_comments}
@router.post("/", status_code=201)
async def create_comment(
    comment: CommentCreate,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    """Create a comment from the validated request body.

    The client's IP address and User-Agent header are passed to the service
    alongside the payload. Responds with 201 and the created comment inside
    the standard success envelope.
    """
    # request.client is None when the ASGI server provides no peer address
    # (for example some test clients or unix-socket deployments); reading
    # .host unconditionally would turn such a request into a 500.
    client = request.client
    ip_address = client.host if client is not None else None

    service = CommentService(db)
    result = await service.create_comment(
        comment,
        ip_address=ip_address,
        user_agent=request.headers.get("user-agent"),
    )
    return {"success": True, "data": result}
# app/services/comment_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.models.comment import Comment, CommentStatus
from app.services.spam_service import SpamService
class CommentService:
    """Comment persistence and lookup on top of one async database session."""

    def __init__(self, db: AsyncSession):
        self.db = db
        self.spam_service = SpamService()

    async def get_comments_for_page(self, page_id: str):
        """Return approved top-level comments of ``page_id``, newest first."""
        filters = (
            Comment.page_id == page_id,
            Comment.status == CommentStatus.approved,
            # Replies are excluded here; only rows without a parent match.
            Comment.parent_id.is_(None),
        )
        stmt = (
            select(Comment)
            .where(*filters)
            .order_by(Comment.created_at.desc())
        )
        rows = await self.db.execute(stmt)
        return rows.scalars().all()

    async def create_comment(self, data, ip_address: str, user_agent: str):
        """Store a new comment and return the refreshed ORM instance.

        The spam service is consulted first: flagged comments are saved with
        status ``spam``, all others with status ``pending``.
        """
        flagged = await self.spam_service.check(
            data.content, data.author_email, ip_address
        )
        if flagged:
            initial_status = CommentStatus.spam
        else:
            initial_status = CommentStatus.pending

        new_comment = Comment(
            page_id=data.page_id,
            parent_id=data.parent_id,
            author_name=data.author_name,
            author_email=data.author_email,
            content=data.content,
            ip_address=ip_address,
            user_agent=user_agent,
            status=initial_status,
        )
        self.db.add(new_comment)
        await self.db.commit()
        # Reload so database-side state is present on the returned object.
        await self.db.refresh(new_comment)
        return new_comment
// server.js
const express = require('express');
const cors = require('cors');
const helmet = require('helmet');
const rateLimit = require('express-rate-limit');
const { Pool } = require('pg');
const app = express();
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// pg emits 'error' on the pool when an idle client fails; without a listener
// that event would crash the process.
pool.on('error', (err) => {
  console.error('Unexpected PostgreSQL pool error:', err);
});

// ALLOWED_ORIGINS is a comma-separated list. When it is unset the list is
// empty, so no origin is allowed (fail closed) instead of the process
// crashing on `undefined.split`.
const allowedOrigins = (process.env.ALLOWED_ORIGINS || '')
  .split(',')
  .map((origin) => origin.trim())
  .filter(Boolean);

// Middleware
app.use(helmet());
app.use(cors({ origin: allowedOrigins }));
app.use(express.json());
app.use(rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100 // limit each IP to 100 requests per window
}));

// Routes
app.use('/api/comments', require('./routes/comments')(pool));

// PORT may be supplied by the hosting platform; 3000 remains the default.
const port = Number(process.env.PORT) || 3000;
app.listen(port);
// routes/comments.js
const express = require('express');
const router = express.Router();
module.exports = (pool) => {
// Get comments
router.get('/', async (req, res) => {
const { page_id } = req.query;
try {
const result = await pool.query(
`SELECT * FROM comments
WHERE page_id = $1 AND status = 'approved'
ORDER BY created_at DESC`,
[page_id]
);
res.json({ success: true, data: result.rows });
} catch (err) {
res.status(500).json({ success: false, error: err.message });
}
});
// Create comment
router.post('/', async (req, res) => {
const { page_id, author_name, author_email, content, parent_id } = req.body;
const ip = req.ip;
const userAgent = req.get('user-agent');
try {
const result = await pool.query(
`INSERT INTO comments
(page_id, parent_id, author_name, author_email, content, ip_address, user_agent)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING *`,
[page_id, parent_id, author_name, author_email, content, ip, userAgent]
);
res.status(201).json({ success: true, data: result.rows[0] });
} catch (err) {
res.status(500).json({ success: false, error: err.message });
}
});
return router;
};
// api/comments.js
import { createClient } from '@supabase/supabase-js';
const supabase = createClient(
  process.env.SUPABASE_URL,
  process.env.SUPABASE_SERVICE_KEY
);

// Columns a client may set when creating a comment. The service key bypasses
// row-level security, so spreading req.body into the insert would let a
// caller set any column (for example status: 'approved').
const WRITABLE_FIELDS = ['parent_id', 'author_name', 'author_email', 'content'];

// Serverless handler for /api/comments.
//   GET  ?page_id=...  -> approved comments of the page, newest first
//   POST ?page_id=...  -> create a comment (page_id may also be in the body)
// Failures use { success: false, error: { code, message } } with a non-2xx status.
export default async function handler(req, res) {
  // CORS headers
  res.setHeader('Access-Control-Allow-Origin', process.env.ALLOWED_ORIGIN);
  res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
  // A JSON POST triggers a preflight that asks for the Content-Type header;
  // without this line the browser blocks the request.
  res.setHeader('Access-Control-Allow-Headers', 'Content-Type');
  if (req.method === 'OPTIONS') return res.status(200).end();

  const fail = (status, code, message) =>
    res.status(status).json({ success: false, error: { code, message } });

  const body = req.body && typeof req.body === 'object' ? req.body : {};
  const page_id = req.query.page_id || body.page_id;

  if (req.method === 'GET') {
    if (!page_id) return fail(400, 'VALIDATION_ERROR', 'page_id is required');
    // NOTE(review): select('*') also returns author_email and any stored
    // request metadata to the public client - confirm that is intended.
    const { data, error } = await supabase
      .from('comments')
      .select('*')
      .eq('page_id', page_id)
      .eq('status', 'approved')
      .order('created_at', { ascending: false });
    if (error) {
      console.error('Failed to list comments:', error);
      return fail(500, 'INTERNAL_ERROR', 'Failed to load comments');
    }
    return res.json({ success: true, data });
  }

  if (req.method === 'POST') {
    if (!page_id) return fail(400, 'VALIDATION_ERROR', 'page_id is required');
    // Copy only whitelisted fields from the request body.
    const row = { page_id };
    for (const field of WRITABLE_FIELDS) {
      if (body[field] !== undefined) row[field] = body[field];
    }
    const { data, error } = await supabase
      .from('comments')
      .insert(row)
      .select()
      .single();
    if (error) {
      console.error('Failed to create comment:', error);
      return fail(500, 'INTERNAL_ERROR', 'Failed to create comment');
    }
    return res.status(201).json({ success: true, data });
  }

  // Any other method: answer explicitly instead of leaving the request open.
  res.setHeader('Allow', 'GET, POST, OPTIONS');
  return fail(405, 'METHOD_NOT_ALLOWED', 'Method not allowed');
}
// worker.js
export default {
  // Worker entry point: dispatches GET and POST on /api/comments to their
  // handlers and answers everything else with a plain 404.
  async fetch(request, env) {
    const url = new URL(request.url);
    const notFound = () => new Response('Not Found', { status: 404 });

    if (url.pathname !== '/api/comments') {
      return notFound();
    }

    switch (request.method) {
      case 'GET':
        return handleGet(url, env);
      case 'POST':
        return handlePost(request, env);
      default:
        return notFound();
    }
  }
};
// Returns the approved comments for the `page_id` query parameter, newest
// first, wrapped in the success envelope.
async function handleGet(url, env) {
  const pageId = url.searchParams.get('page_id');
  const statement = env.DB
    .prepare('SELECT * FROM comments WHERE page_id = ? AND status = ? ORDER BY created_at DESC')
    .bind(pageId, 'approved');
  const queryResult = await statement.all();
  return Response.json({ success: true, data: queryResult.results });
}
// Creates a comment from a JSON body with page_id, author_name and content.
// Responds 201 with the inserted row, or 400 with the error envelope when the
// body is not valid JSON or a required field is missing.
async function handlePost(request, env) {
  const badRequest = (code, message, field) => {
    const error = { code, message };
    if (field) error.field = field;
    return Response.json({ success: false, error }, { status: 400 });
  };

  // request.json() rejects on malformed input; report that as a client error
  // instead of letting the Worker fail with an unhandled exception.
  let body;
  try {
    body = await request.json();
  } catch (err) {
    return badRequest('INVALID_JSON', 'Request body must be valid JSON');
  }
  if (body === null || typeof body !== 'object') {
    return badRequest('INVALID_JSON', 'Request body must be a JSON object');
  }

  // Binding `undefined` to a prepared statement throws, so validate first.
  for (const field of ['page_id', 'author_name', 'content']) {
    if (typeof body[field] !== 'string' || body[field].trim() === '') {
      return badRequest('VALIDATION_ERROR', `${field} is required`, field);
    }
  }

  const result = await env.DB.prepare(
    'INSERT INTO comments (page_id, author_name, content) VALUES (?, ?, ?) RETURNING *'
  ).bind(body.page_id, body.author_name, body.content).first();
  return Response.json({ success: true, data: result }, { status: 201 });
}
# app/schemas.py
from pydantic import BaseModel, EmailStr, validator
from typing import Optional
from uuid import UUID
import bleach
class CommentCreate(BaseModel):
    """Request body for creating a comment.

    Validators trim whitespace, enforce length limits and sanitize the HTML
    in ``content``. ``author_name`` is not HTML-sanitized here, so it must be
    escaped when rendered.
    """

    page_id: str
    parent_id: Optional[UUID] = None
    author_name: str
    author_email: Optional[EmailStr] = None
    content: str

    @validator('page_id')
    def validate_page_id(cls, v):
        """Trim the page id and keep it within the 255-character column."""
        v = v.strip()
        if not 1 <= len(v) <= 255:
            raise ValueError('Page ID must be 1-255 characters')
        return v

    @validator('author_name')
    def validate_name(cls, v):
        """Trim the name and require 2-100 characters."""
        v = v.strip()
        if not 2 <= len(v) <= 100:
            raise ValueError('Name must be 2-100 characters')
        return v

    @validator('content')
    def validate_content(cls, v):
        """Trim, length-check and sanitize the comment body."""
        v = v.strip()
        if not 1 <= len(v) <= 10000:
            raise ValueError('Content must be 1-10000 characters')
        # Sanitize HTML
        cleaned = bleach.clean(
            v, tags=['p', 'br', 'strong', 'em', 'a'], strip=True
        ).strip()
        # Input made only of disallowed markup (e.g. "<img>") is empty after
        # stripping; reject it rather than storing a blank comment.
        if not cleaned:
            raise ValueError('Content must be 1-10000 characters')
        return cleaned
# app/utils/sanitizer.py
import bleach
import re
# Markup that survives sanitization; every other tag is stripped.
ALLOWED_TAGS = ['p', 'br', 'strong', 'em', 'a', 'code', 'pre', 'ul', 'ol', 'li']
# Attributes kept per tag; only links may carry attributes.
ALLOWED_ATTRS = {'a': ['href', 'title']}


def sanitize_comment(content: str) -> str:
    """Return ``content`` reduced to the allowed HTML, with URLs linkified.

    Disallowed tags are stripped (not escaped) first; afterwards plain URLs
    in the remaining text are converted into anchor tags.
    """
    safe_html = bleach.clean(
        content, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRS, strip=True
    )
    return bleach.linkify(safe_html)
# "@" must not directly follow a character that can appear in an e-mail local
# part; otherwise "joe@example.com" would yield the bogus mention "example".
_MENTION_RE = re.compile(r'(?<![A-Za-z0-9_.+-])@([a-zA-Z0-9_]+)')


def extract_mentions(content: str) -> list:
    """Extract @username mentions.

    Returns the usernames (without the leading "@") in order of appearance,
    duplicates included. Usernames consist of ASCII letters, digits and
    underscores. An "@" inside an e-mail address is not treated as a mention.
    """
    return _MENTION_RE.findall(content)
# app/utils/rate_limiter.py
import time
import uuid

import redis
class RateLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set.

    Each request is stored as one sorted-set member scored by its timestamp;
    members older than the window are pruned on every check.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis = redis_client

    def is_allowed(self, key: str, limit: int, window: int) -> bool:
        """Record a request for ``key`` and report whether it is within limits.

        key: identifier (IP, user_id)
        limit: max requests per window
        window: time window in seconds

        Returns True when fewer than ``limit`` requests were recorded in the
        preceding ``window`` seconds. The current request is recorded even
        when it is rejected, so a client that keeps retrying stays limited.
        """
        current = time.time()
        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(key, 0, current - window)
        # Count recent entries (before the current request is added)
        pipe.zcard(key)
        # Add current request. The member carries a random suffix so that two
        # requests with the same timestamp do not overwrite each other, which
        # would undercount traffic.
        pipe.zadd(key, {f"{current}:{uuid.uuid4().hex}": current})
        # Set expiry so idle keys disappear on their own
        pipe.expire(key, window)
        results = pipe.execute()
        request_count = results[1]
        return request_count < limit
# app/middleware/rate_limit.py
from fastapi import Request, HTTPException
from app.utils.rate_limiter import RateLimiter
async def rate_limit_middleware(request: Request, call_next):
    """Reject requests from an IP that exceeds 30 requests per 60 seconds.

    The limiter is read from ``request.app.state.rate_limiter``. Requests over
    the limit get a 429 response in the API's error envelope; all others are
    passed on to ``call_next``.
    """
    # Imported locally so this function does not depend on a module-level
    # import that the surrounding file does not have.
    from fastapi.responses import JSONResponse

    limiter = request.app.state.rate_limiter
    # request.client may be None when the server supplies no peer address;
    # such requests share one bucket instead of crashing.
    client_ip = request.client.host if request.client else "unknown"

    # NOTE(review): is_allowed is a synchronous call; if the limiter talks to
    # Redis over the network it blocks the event loop while it runs.
    if not limiter.is_allowed(f"ip:{client_ip}", limit=30, window=60):
        # Return the response directly: an HTTPException raised inside HTTP
        # middleware is not routed through the exception handlers and would
        # reach the client as a 500.
        return JSONResponse(
            status_code=429,
            content={
                "success": False,
                "error": {
                    "code": "RATE_LIMITED",
                    "message": "Too many requests. Please try again later.",
                },
            },
            headers={"Retry-After": "60"},
        )
    return await call_next(request)
# app/exceptions.py
from fastapi import HTTPException
from fastapi.responses import JSONResponse
class CommentException(Exception):
    """Application error carrying an error code, a message and an HTTP status.

    code: machine-readable error code (e.g. "SPAM_DETECTED")
    message: human-readable description
    status_code: HTTP status to respond with; defaults to 400
    """

    def __init__(self, code: str, message: str, status_code: int = 400):
        self.code = code
        self.message = message
        self.status_code = status_code

    def __str__(self) -> str:
        # Without this, str(exc) renders the raw constructor-args tuple,
        # which makes log lines and tracebacks hard to read. ``args`` is left
        # untouched so copying and pickling keep working.
        return self.message
async def comment_exception_handler(request, exc: CommentException):
    """Render a ``CommentException`` as the API's JSON error envelope.

    The HTTP status is taken from ``exc.status_code``; the body carries
    ``exc.code`` and ``exc.message``. ``request`` is unused.
    """
    error_body = {"code": exc.code, "message": exc.message}
    payload = {"success": False, "error": error_body}
    return JSONResponse(status_code=exc.status_code, content=payload)
# Usage: raise the exception where a request must be rejected. status_code is
# omitted here, so the default of 400 applies.
# NOTE(review): comment_exception_handler only takes effect once it is
# registered on the app (e.g. via app.add_exception_handler); that
# registration is not shown in this snippet - confirm it exists.
raise CommentException("SPAM_DETECTED", "Your comment was flagged as spam")
| Component | Technology | Purpose |
|---|---|---|
| Framework | FastAPI / Express | API routing |
| Database | PostgreSQL / SQLite | Data storage |
| Validation | Pydantic / Joi | Input validation |
| Sanitization | Bleach / DOMPurify | XSS prevention |
| Rate Limiting | Redis / Memory | Abuse prevention |
In the next chapter, we’ll build the frontend components to integrate with this API.
Navigation: