Chapter 7 introduced the socket API with simple echo servers. Now it’s time to build production-quality servers that handle multiple clients simultaneously, manage errors gracefully, and implement real protocols.
This chapter covers three approaches to concurrent TCP servers (threading, forking, socketserver), UDP service patterns, and practical techniques for building robust network services.
Our echo server from Chapter 7 has a fatal flaw: it handles only one client at a time. While serving Client A, Client B must wait:
Client A connects → Server busy with A
Client B connects → Queued (blocked in backlog)
Client A finishes → Server starts handling B
For any real service, this is unacceptable. We need concurrency.
The most straightforward approach: spawn a new thread for each client connection.
import socket
import threading
def handle_client(conn: socket.socket, addr: tuple) -> None:
    """Serve one client: echo every received chunk back, uppercased."""
    print(f"[+] Connected: {addr}")
    try:
        # recv() returning b"" means the peer closed its end.
        while chunk := conn.recv(4096):
            conn.sendall(chunk.upper())
    except ConnectionResetError:
        # Peer vanished mid-conversation; nothing more to do.
        pass
    finally:
        conn.close()
        print(f"[-] Disconnected: {addr}")
def main():
    """Listen on 0.0.0.0:9000 and hand each accepted client to a thread."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        # Allow quick restarts without waiting out TIME_WAIT.
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("0.0.0.0", 9000))
        srv.listen(100)
        print("Threaded server on port 9000")
        while True:
            conn, addr = srv.accept()
            # Daemon threads will not block interpreter shutdown.
            worker = threading.Thread(
                target=handle_client, args=(conn, addr), daemon=True
            )
            worker.start()


if __name__ == "__main__":
    main()
Pros: Simple, each client gets its own thread, works well for moderate concurrency.
Cons: Thread creation overhead, GIL limits CPU parallelism, resource exhaustion with thousands of connections.
Full example: code/tcp_threaded_server.py
Instead of creating unlimited threads, use a thread pool to cap concurrency:
import socket
from concurrent.futures import ThreadPoolExecutor
def handle_client(conn: socket.socket, addr: tuple) -> None:
    """Echo uppercased data until the peer disconnects; closes conn on exit."""
    with conn:
        print(f"[+] Connected: {addr}")
        # iter() with a b"" sentinel reads until the peer closes.
        for payload in iter(lambda: conn.recv(4096), b""):
            conn.sendall(payload.upper())
        print(f"[-] Disconnected: {addr}")
def main():
    """Run the echo service with a bounded pool of 50 worker threads."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("0.0.0.0", 9000))
        srv.listen(100)
        print("Thread pool server on port 9000")
        # The pool caps concurrency; extra clients wait for a free worker.
        with ThreadPoolExecutor(max_workers=50) as pool:
            while True:
                client, peer = srv.accept()
                pool.submit(handle_client, client, peer)


if __name__ == "__main__":
    main()
This limits the server to 50 simultaneous clients. Additional clients queue until a thread becomes available.
Python’s socketserver module provides a higher-level framework for building servers:
import socketserver
class EchoHandler(socketserver.StreamRequestHandler):
    """Line-oriented echo handler: returns each received line uppercased."""

    def handle(self):
        print(f"[+] Connected: {self.client_address}")
        # readline() returns b"" at EOF, which ends the session.
        for line in iter(self.rfile.readline, b""):
            self.wfile.write(line.upper())
        print(f"[-] Disconnected: {self.client_address}")
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that services each request in its own daemon thread."""

    # Rebind the listening port immediately after a restart.
    allow_reuse_address = True
    # Worker threads die with the main thread instead of blocking exit.
    daemon_threads = True
def _run() -> None:
    """Start the threaded echo server on port 9000 and serve until killed."""
    # The context manager guarantees server_close() on exit.
    with ThreadedTCPServer(("0.0.0.0", 9000), EchoHandler) as server:
        print("socketserver on port 9000")
        server.serve_forever()


if __name__ == "__main__":
    _run()
StreamRequestHandler provides self.rfile and self.wfile — file-like objects for reading and writing, which simplify line-oriented protocols.
On Unix systems, you can use ForkingMixIn to spawn a child process per client:
class ForkingTCPServer(socketserver.ForkingMixIn, socketserver.TCPServer):
    """TCP server that forks one child process per connection (Unix only)."""

    allow_reuse_address = True
    # Hard cap on concurrent children so load spikes cannot fork-bomb the host.
    max_children = 50
Forking provides true parallelism (no GIL limitation) but has higher memory overhead since each child is a full process copy.
Real servers implement protocols — structured rules for communication. Let’s build a simple key-value store server with a text protocol:
Commands:
SET key value → OK
GET key → value or NOT_FOUND
DEL key → DELETED or NOT_FOUND
QUIT → (closes connection)
import socket
import threading
# Shared in-memory key/value store. Every client connection runs in its
# own thread, so all access goes through `lock`.
store = {}
lock = threading.Lock()


def handle_client(conn, addr):
    """Speak the line-based KV protocol with one client until it disconnects.

    Sends READY on connect, then answers newline-terminated commands.
    A "BYE" reply (from QUIT) ends the session.
    """
    with conn:
        conn.sendall(b"READY\n")
        buf = b""
        while True:
            data = conn.recv(4096)
            if not data:
                break  # peer closed the connection
            buf += data
            # One recv() may carry several commands (or half of one), so
            # consume complete lines and keep the remainder buffered.
            while b"\n" in buf:
                line, buf = buf.split(b"\n", 1)
                try:
                    cmd = line.decode().strip()
                except UnicodeDecodeError:
                    # Fix: undecodable bytes previously raised out of the
                    # handler and killed the connection thread; now the
                    # client gets an error reply instead.
                    conn.sendall(b"ERROR: invalid encoding\n")
                    continue
                response = process_command(cmd)
                conn.sendall(response.encode() + b"\n")
                if response == "BYE":
                    return


def process_command(cmd: str) -> str:
    """Execute one protocol command and return the reply line (no newline).

    Commands: SET key value | GET key | DEL key | QUIT.
    Malformed or unknown input yields an ERROR reply.
    """
    # maxsplit=2 keeps spaces inside a SET value intact.
    parts = cmd.split(None, 2)
    if not parts:
        return "ERROR: empty command"
    op = parts[0].upper()
    if op == "SET" and len(parts) == 3:
        with lock:
            store[parts[1]] = parts[2]
        return "OK"
    elif op == "GET" and len(parts) == 2:
        with lock:
            return store.get(parts[1], "NOT_FOUND")
    elif op == "DEL" and len(parts) == 2:
        with lock:
            return "DELETED" if store.pop(parts[1], None) is not None else "NOT_FOUND"
    elif op == "QUIT":
        return "BYE"
    return "ERROR: unknown command"
Full example: code/kv_store_server.py
Most UDP services follow a stateless request-response pattern:
import socket
import json
def handle_request(data: bytes) -> bytes:
    """Process a JSON request datagram and return a JSON response.

    Supported request: {"type": "time"} -> {"time": "<now>"}.
    Any malformed or unrecognized input yields a JSON error object, so a
    bad packet can never crash the service.
    """
    import datetime  # local import keeps the snippet self-contained

    try:
        request = json.loads(data)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Fix: json.loads raises UnicodeDecodeError (not JSONDecodeError)
        # for bytes that are not valid UTF-8/16/32 -- catch both.
        return json.dumps({"error": "invalid JSON"}).encode()
    if not isinstance(request, dict):
        # Fix: a bare JSON scalar/array has no .get(); previously this
        # raised AttributeError. Treat it as an unknown request.
        return json.dumps({"error": "unknown request"}).encode()
    if request.get("type") == "time":
        return json.dumps({"time": str(datetime.datetime.now())}).encode()
    return json.dumps({"error": "unknown request"}).encode()
# Stateless request/response loop: one datagram in, one datagram out.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    sock.bind(("0.0.0.0", 9001))
    print("UDP service on port 9001")
    while True:
        packet, peer = sock.recvfrom(4096)
        sock.sendto(handle_request(packet), peer)
Multicast delivers messages to a group of interested receivers simultaneously:
import socket
import struct
MCAST_GROUP = "239.1.1.1"
MCAST_PORT = 5007

# Sender: an ordinary UDP socket addressed at the multicast group.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as tx:
    # TTL 2 lets the datagram cross one router hop beyond the local net.
    tx.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
    tx.sendto(b"Hello, multicast group!", (MCAST_GROUP, MCAST_PORT))
# Receiver: join the multicast group, then read one datagram.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as rx:
    rx.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    rx.bind(("", MCAST_PORT))
    group = socket.inet_aton(MCAST_GROUP)
    # Fix: use "=4sl" (standard sizes, no alignment) so the packed value
    # matches the kernel's 8-byte struct ip_mreq. The previous native
    # "4sL" pads/widens to 16 bytes on 64-bit platforms where unsigned
    # long is 8 bytes.
    mreq = struct.pack("=4sl", group, socket.INADDR_ANY)
    rx.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    data, addr = rx.recvfrom(4096)
    print(f"Received: {data.decode()} from {addr}")
Full example: code/udp_multicast.py
Production servers need to shut down cleanly — finishing active requests, closing connections, and releasing resources:
import signal
import socket
import threading
shutdown_event = threading.Event()


def signal_handler(signum, frame):
    """Request a clean shutdown; the accept loop notices within 1 second."""
    print("\nShutting down...")
    shutdown_event.set()


signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(("0.0.0.0", 9000))
    srv.listen(100)
    # The timeout turns accept() into a periodic poll so the loop can
    # re-check the shutdown flag between connections.
    srv.settimeout(1.0)
    print("Server running. Press Ctrl+C to stop.")
    while not shutdown_event.is_set():
        try:
            client, peer = srv.accept()
        except socket.timeout:
            continue
        threading.Thread(
            target=handle_client, args=(client, peer), daemon=True
        ).start()
    print("Server stopped.")
Robust servers must handle various failure modes:
def handle_client(conn: socket.socket, addr: tuple) -> None:
    """Serve one client, mapping each network failure mode to a log line."""
    try:
        with conn:
            conn.settimeout(30.0)  # drop clients idle for 30 seconds
            # recv() returning b"" signals a clean disconnect.
            for data in iter(lambda: conn.recv(4096), b""):
                conn.sendall(process(data))
    except socket.timeout:
        print(f"Client {addr} timed out")
    except ConnectionResetError:
        print(f"Client {addr} reset connection")
    except BrokenPipeError:
        print(f"Client {addr} pipe broken")
    except OSError as e:
        # Catch-all for the remaining socket-level failures; must come
        # last because the specific errors above subclass OSError.
        print(f"OS error with {addr}: {e}")
Use Python’s logging module instead of print() for production servers:
import logging
# Configure the root logger once at startup: records go both to the
# console and to server.log.
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("server.log"),
]
)
# Module-level logger named after this module, per logging convention.
logger = logging.getLogger(__name__)
# Usage
# NOTE(review): `addr` and `e` below are illustrative placeholders from
# earlier handler examples, not defined in this snippet.
logger.info("Client connected: %s", addr)
logger.warning("Client %s timed out", addr)
logger.error("Error handling %s: %s", addr, e)
Key takeaways:

- Thread pools (`ThreadPoolExecutor`) cap resource usage while handling many clients
- The `socketserver` module provides `ThreadingMixIn` and `ForkingMixIn` for quick server development
- Use `logging` instead of `print()` for production server diagnostics

| ← Previous: Socket Programming Fundamentals | Table of Contents | Next: Asynchronous Networking and Protocols → |