Chapter 9: Asynchronous Networking and Protocols


The Limits of Threading

Threading works for moderate concurrency, up to a few hundred simultaneous connections. But threads have costs: memory overhead (by default Linux reserves about 8 MB of stack address space per thread, although only the pages actually touched consume physical memory), context-switching latency, and the complexity of locks and shared state. When you need to handle thousands or tens of thousands of concurrent connections, these costs add up and threading scales poorly.

Asynchronous I/O solves this by running everything in a single thread, using an event loop that multiplexes between connections. Instead of blocking on recv(), your code says “call me back when data is available” and moves on to serve other clients.

Python’s asyncio module provides a mature, high-performance async networking framework.


The Event Loop Model

An event loop is a continuous cycle:

while running:
    1. Check which sockets are ready for I/O
    2. Execute callbacks/coroutines for ready sockets
    3. Process timers and scheduled callbacks
    4. Repeat

Under the hood, asyncio uses the operating system’s I/O multiplexing mechanisms: epoll (Linux), kqueue (macOS/BSD), or IOCP (Windows). These can monitor thousands of file descriptors efficiently.
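
To make the cycle above concrete, here is a minimal sketch of steps 1 and 2 using Python's selectors module, which picks epoll, kqueue, or select depending on the platform. It is not asyncio's actual implementation: the function and callback names are hypothetical, and a connected socket pair stands in for real client connections.

```python
import selectors
import socket

def echo_once_with_selector() -> bytes:
    """Run a tiny readiness loop over a connected socket pair."""
    sel = selectors.DefaultSelector()  # Best available mechanism for this OS
    client, server = socket.socketpair()
    client.setblocking(False)
    server.setblocking(False)
    replies = []

    def on_server_readable(sock: socket.socket) -> None:
        # "Server" side: upper-case whatever arrived and send it back
        sock.send(sock.recv(4096).upper())

    def on_client_readable(sock: socket.socket) -> None:
        # "Client" side: collect the reply
        replies.append(sock.recv(4096))

    # The callback to run is stored as the `data` attached to each socket
    sel.register(server, selectors.EVENT_READ, on_server_readable)
    sel.register(client, selectors.EVENT_READ, on_client_readable)
    client.send(b"ping")

    try:
        while not replies:
            ready = sel.select(timeout=1.0)  # Step 1: which sockets are ready?
            if not ready:
                break  # Nothing became ready in time
            for key, _mask in ready:
                key.data(key.fileobj)  # Step 2: run the callback for each one
    finally:
        sel.close()
        client.close()
        server.close()
    return b"".join(replies)
```

Calling echo_once_with_selector() returns the upper-cased reply. asyncio layers timers, coroutines, and transports on top of this kind of readiness loop.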


asyncio Fundamentals

Coroutines and await

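An async def function is a coroutine function: calling it returns a coroutine object, which runs only once it is awaited or scheduled. The await keyword suspends the current coroutine, letting the event loop run other coroutines: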

import asyncio

async def say_hello(name: str, delay: float) -> None:
    await asyncio.sleep(delay)  # Non-blocking sleep
    print(f"Hello, {name}!")

async def main():
    # Run concurrently — both coroutines share the event loop
    await asyncio.gather(
        say_hello("Alice", 1.0),
        say_hello("Bob", 0.5),
    )

asyncio.run(main())
# Output: Hello, Bob! (after 0.5s), Hello, Alice! (after 1.0s)

Tasks

Tasks wrap coroutines and schedule them on the event loop:

async def main():
    task1 = asyncio.create_task(say_hello("Alice", 1.0))
    task2 = asyncio.create_task(say_hello("Bob", 0.5))
    # Both run concurrently
    await task1
    await task2
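
Tasks can also carry return values back to the caller. The sketch below uses a hypothetical helper, double_after, to show that awaiting a task yields its result and that asyncio.gather returns results in argument order rather than completion order.

```python
import asyncio

async def double_after(value: int, delay: float) -> int:
    """Hypothetical helper: wait for `delay` seconds, then return a result."""
    await asyncio.sleep(delay)
    return value * 2

async def collect_results() -> list:
    # create_task() schedules both coroutines right away
    slow = asyncio.create_task(double_after(1, 0.02))
    fast = asyncio.create_task(double_after(2, 0.01))
    # Awaiting a task yields its return value, even if it finished earlier
    first, second = await slow, await fast
    # gather() returns results in argument order, not completion order
    gathered = await asyncio.gather(double_after(3, 0.02), double_after(4, 0.01))
    return [first, second, *gathered]

if __name__ == "__main__":
    print(asyncio.run(collect_results()))
```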

Async TCP Server with asyncio

Using Streams API

asyncio provides a high-level Streams API for TCP:

import asyncio

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    addr = writer.get_extra_info("peername")
    print(f"[+] Connected: {addr}")

    try:
        while True:
            data = await reader.read(4096)
            if not data:
                break
            writer.write(data.upper())
            await writer.drain()
    except ConnectionResetError:
        pass
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"[-] Disconnected: {addr}")

async def main():
    server = await asyncio.start_server(handle_client, "0.0.0.0", 9000)
    addr = server.sockets[0].getsockname()
    print(f"Async server on {addr}")

    async with server:
        await server.serve_forever()

asyncio.run(main())

This server can handle thousands of concurrent connections in a single thread. Each await reader.read() suspends only that client's coroutine, allowing the event loop to serve other clients in the meantime.

Full example: code/async_tcp_server.py
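
A matching client can be sketched with asyncio.open_connection, the client-side counterpart of start_server. The function name echo_client is hypothetical, and the sketch assumes the reply fits in a single read, which holds for short messages but is not guaranteed by TCP in general.

```python
import asyncio

async def echo_client(host: str, port: int, message: bytes) -> bytes:
    """Send one message to the echo server and return the first reply chunk."""
    reader, writer = await asyncio.open_connection(host, port)
    try:
        writer.write(message)
        await writer.drain()  # Wait until the send buffer has room again
        return await reader.read(4096)
    finally:
        # Close the connection whether or not the exchange succeeded
        writer.close()
        await writer.wait_closed()
```

With the server above running, asyncio.run(echo_client("127.0.0.1", 9000, b"hello")) should return the upper-cased bytes.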

Using Low-Level Protocol API

For maximum control, use the Protocol/Transport API:

import asyncio

class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        self.addr = transport.get_extra_info("peername")
        print(f"[+] Connected: {self.addr}")

    def data_received(self, data):
        self.transport.write(data.upper())

    def connection_lost(self, exc):
        print(f"[-] Disconnected: {self.addr}")

async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoProtocol, "0.0.0.0", 9000)
    print("Protocol-based server on port 9000")
    async with server:
        await server.serve_forever()

asyncio.run(main())
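
One consequence of working at this level: data_received is called with whatever bytes have arrived, which need not line up with the messages the peer sent, because TCP is a byte stream. If a protocol is line-oriented, the class has to buffer and split the input itself. The following is a sketch of that idea; LineEchoProtocol is a hypothetical name, and newline framing is an assumption chosen for illustration.

```python
import asyncio

class LineEchoProtocol(asyncio.Protocol):
    """Echo complete lines in upper case, buffering partial input."""

    def __init__(self) -> None:
        self.buffer = b""
        self.transport = None

    def connection_made(self, transport) -> None:
        self.transport = transport

    def data_received(self, data: bytes) -> None:
        # A chunk may hold part of a line, one line, or several lines
        self.buffer += data
        while b"\n" in self.buffer:
            line, self.buffer = self.buffer.split(b"\n", 1)
            self.transport.write(line.upper() + b"\n")
```

It can be passed to loop.create_server in place of EchoProtocol. A production version would also cap the buffer size so that a peer that never sends a newline cannot exhaust memory.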

Async UDP Server

import asyncio

class UDPEchoProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        print(f"From {addr}: {data.decode()}")
        self.transport.sendto(data.upper(), addr)

async def main():
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        UDPEchoProtocol,
        local_addr=("0.0.0.0", 9001)
    )
    print("Async UDP server on port 9001")
    try:
        await asyncio.sleep(3600)  # Run for an hour
    finally:
        transport.close()

asyncio.run(main())

Implementing a Simple Protocol

Let’s build an async chat server where clients can send messages to all connected users:

import asyncio
from typing import Set

clients: Set[asyncio.StreamWriter] = set()

async def broadcast(message: str, sender: asyncio.StreamWriter) -> None:
    """Send a message to all clients except the sender."""
    for client in list(clients):
        if client != sender:
            try:
                client.write(message.encode())
                await client.drain()
            except (ConnectionResetError, BrokenPipeError):
                clients.discard(client)

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    addr = writer.get_extra_info("peername")
    clients.add(writer)
    await broadcast(f"*** {addr} joined ***\n", writer)

    try:
        while True:
            data = await reader.readline()
            if not data:
                break
            message = f"{addr}: {data.decode()}"
            await broadcast(message, writer)
    finally:
        clients.discard(writer)
        await broadcast(f"*** {addr} left ***\n", writer)
        writer.close()
        await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle_client, "0.0.0.0", 9000)
    print("Chat server on port 9000")
    async with server:
        await server.serve_forever()

asyncio.run(main())

Full example: code/async_chat_server.py

Test with multiple terminals: nc localhost 9000


Building an Async DNS Resolver

Let’s implement a simple DNS query using a UDP datagram endpoint and the DNS wire format:

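import asyncio
import struct

class DNSClientProtocol(asyncio.DatagramProtocol):
    """Hand the first datagram received to a waiting future."""

    def __init__(self, response: asyncio.Future):
        self.response = response

    def datagram_received(self, data, addr):
        if not self.response.done():
            self.response.set_result(data)

    def error_received(self, exc):
        if not self.response.done():
            self.response.set_exception(exc)

async def dns_query(domain: str, server: str = "8.8.8.8") -> bytes:
    """Send a DNS A record query and return the raw response packet."""
    # Build DNS query packet
    txn_id = 0x1234  # Fixed for illustration; real resolvers use a random ID
    flags = 0x0100  # Standard query, recursion desired
    header = struct.pack("!HHHHHH", txn_id, flags, 1, 0, 0, 0)

    question = b""
    for label in domain.split("."):
        question += struct.pack("B", len(label)) + label.encode()
    question += b"\x00"  # Root label
    question += struct.pack("!HH", 1, 1)  # Type A, Class IN

    loop = asyncio.get_running_loop()
    response = loop.create_future()
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: DNSClientProtocol(response),
        remote_addr=(server, 53)
    )
    try:
        transport.sendto(header + question)
        # Wait for the reply itself instead of sleeping for a fixed time
        return await asyncio.wait_for(response, timeout=3.0)
    finally:
        transport.close()
    # Parsing omitted for brevity; see full example

asyncio.run(dns_query("example.com"))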

Full example with complete parsing: code/async_dns_client.py
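
For orientation, the omitted parsing step might look roughly like the sketch below; the version in code/async_dns_client.py may differ. It assumes the raw response packet is available as bytes, handles only A records, and does no bounds checking. The names skip_name and parse_a_records are hypothetical.

```python
import socket
import struct

def skip_name(packet: bytes, offset: int) -> int:
    """Return the offset just past an encoded domain name."""
    while True:
        length = packet[offset]
        if length & 0xC0 == 0xC0:
            return offset + 2  # Compression pointer: two bytes, ends the name
        if length == 0:
            return offset + 1  # Root label ends the name
        offset += 1 + length   # Skip the length byte and the label

def parse_a_records(packet: bytes) -> list:
    """Extract IPv4 addresses from the answer section of a DNS response."""
    _txn_id, flags, qdcount, ancount, _ns, _ar = struct.unpack(
        "!HHHHHH", packet[:12]
    )
    rcode = flags & 0x000F
    if rcode != 0:
        raise ValueError(f"DNS server returned error code {rcode}")

    offset = 12
    for _ in range(qdcount):
        offset = skip_name(packet, offset) + 4  # Also skip QTYPE and QCLASS

    addresses = []
    for _ in range(ancount):
        offset = skip_name(packet, offset)
        rtype, rclass, _ttl, rdlength = struct.unpack(
            "!HHIH", packet[offset:offset + 10]
        )
        offset += 10
        rdata = packet[offset:offset + rdlength]
        offset += rdlength
        if rtype == 1 and rclass == 1 and rdlength == 4:  # Type A, Class IN
            addresses.append(socket.inet_ntoa(rdata))
    return addresses
```

Records of other types, such as a CNAME that precedes the A records, are skipped rather than followed.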


Timeouts and Cancellation

Using asyncio.wait_for

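import asyncio

async def fetch_with_timeout(host: str, port: int) -> bytes:
    writer = None
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=5.0
        )
        # Bytes literals do not interpolate {host}, so format a str and encode it
        request = f"GET / HTTP/1.1\r\nHost: {host}\r\n\r\n"
        writer.write(request.encode("ascii"))
        await writer.drain()
        return await asyncio.wait_for(reader.read(4096), timeout=10.0)
    except asyncio.TimeoutError:
        # Raised by either the connect or the read timeout
        print(f"Timed out talking to {host}:{port}")
        return b""
    finally:
        # Close the connection on every path, including a read timeout
        if writer is not None:
            writer.close()
            await writer.wait_closed()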

Cancelling Tasks

import asyncio

async def long_running_task():
    try:
        while True:
            await asyncio.sleep(1)
            print("Working...")
    except asyncio.CancelledError:
        print("Task was cancelled; cleaning up")
        raise  # Re-raise to propagate cancellation

async def main():
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(3)
    task.cancel()  # Requests cancellation; it takes effect at the task's next await
    try:
        await task
    except asyncio.CancelledError:
        print("Task cancelled successfully")

asyncio.run(main())
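
Timeouts and cancellation are connected: when asyncio.wait_for times out, it cancels the awaitable it was waiting on before raising the timeout error, so the same CancelledError cleanup path runs. The sketch below illustrates this with hypothetical names (slow_worker, timeout_cancels_worker) and a list used as a simple event log.

```python
import asyncio

async def slow_worker(log: list) -> None:
    try:
        await asyncio.sleep(10)  # Stands in for a slow network operation
    except asyncio.CancelledError:
        log.append("worker cancelled")  # Cleanup would go here
        raise  # Re-raise so the cancellation is not swallowed

async def timeout_cancels_worker() -> list:
    log = []
    try:
        await asyncio.wait_for(slow_worker(log), timeout=0.05)
    except asyncio.TimeoutError:
        log.append("caller saw timeout")
    return log

if __name__ == "__main__":
    print(asyncio.run(timeout_cancels_worker()))
```

The worker's cleanup entry is logged before the caller's, because wait_for waits for the cancelled coroutine to finish before raising.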

Async Context Managers and Patterns

Connection Pool Pattern

import asyncio

class AsyncConnectionPool:
    def __init__(self, host: str, port: int, size: int = 10):
        self.host = host
        self.port = port
        self.pool: asyncio.Queue = asyncio.Queue(maxsize=size)
        self.size = size

    async def initialize(self):
        for _ in range(self.size):
            reader, writer = await asyncio.open_connection(self.host, self.port)
            await self.pool.put((reader, writer))

    async def acquire(self):
        return await self.pool.get()

    async def release(self, reader, writer):
        await self.pool.put((reader, writer))

    async def close(self):
        while not self.pool.empty():
            reader, writer = await self.pool.get()
            writer.close()
            await writer.wait_closed()
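
Callers of a pool like this have to remember to release every connection they acquire, including when an exception is raised. An async context manager can enforce that. The helper below is a sketch, not part of the class above: pooled_connection is a hypothetical name, and it assumes only that the pool object offers the acquire() and release() coroutines shown.

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def pooled_connection(pool):
    """Borrow a (reader, writer) pair and always hand it back."""
    reader, writer = await pool.acquire()
    try:
        yield reader, writer
    finally:
        # Runs on normal exit, on exceptions, and on cancellation
        await pool.release(reader, writer)
```

Usage would be along the lines of: async with pooled_connection(pool) as (reader, writer): followed by the reads and writes. Note that this sketch returns the connection even if it failed mid-request; a more careful pool would discard broken connections and open replacements.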

Performance: Threads vs Async

Metric                   Threaded (50 threads)    Async (single thread)
Concurrent connections   ~50                      ~10,000+
Memory per connection    ~8 MB (stack)            ~few KB (coroutine)
Context switching        OS-level (expensive)     User-level (cheap)
CPU-bound work           Limited by GIL           Blocks event loop
Complexity               Locks, races             Structured concurrency

Use async for I/O-bound servers with many connections. Use threads for simpler servers or when calling blocking libraries.
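
The two models can also be combined. When an async server has to call a blocking library, asyncio.to_thread (Python 3.9+) runs the call in a worker thread so the event loop stays responsive. The sketch below uses hypothetical names: blocking_lookup stands in for a blocking library call, and heartbeat shows that other coroutines keep running while it executes.

```python
import asyncio
import time

def blocking_lookup(key: str) -> str:
    """Hypothetical stand-in for a blocking library call."""
    time.sleep(0.05)
    return key.upper()

async def heartbeat(ticks: list) -> None:
    # Keeps ticking only if the event loop is not blocked
    while True:
        await asyncio.sleep(0.01)
        ticks.append(time.monotonic())

async def demo_to_thread() -> tuple:
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The blocking call runs in a worker thread; the loop keeps serving tasks
    result = await asyncio.to_thread(blocking_lookup, "alice")
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)  # Let the cancel finish
    return result, len(ticks)

if __name__ == "__main__":
    print(asyncio.run(demo_to_thread()))
```

Calling blocking_lookup directly inside the coroutine would freeze the heartbeat for the duration of the call. For CPU-bound work, threads are still constrained by the GIL, so a process pool is the usual alternative.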


Key Takeaways

