Chapter 10: Webhooks and Real-Time Updates

Instant Notifications for Your Shop

Etsy's webhook system allows you to receive real-time notifications when events occur in your shop. Instead of constantly polling the API, you let webhooks push the data to you as soon as it is available, enabling faster responses to orders, messages, and other critical events.

Understanding Etsy Webhooks

Etsy supports webhooks for several event types:

Event Type Description
listing.created New listing published
listing.updated Listing modified
listing.deleted Listing removed
listing.activated Listing made active
listing.deactivated Listing deactivated
receipt.created New order received
receipt.updated Order status changed
shop.updated Shop settings changed

Webhook Handler Implementation

# etsy_client/webhooks.py
"""Webhook handling for Etsy events."""

import hmac
import hashlib
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass
from datetime import datetime


@dataclass
class WebhookEvent:
    """One notification pushed by Etsy, parsed into typed fields.

    The decoded webhook body is kept on ``payload`` so handlers can reach
    fields that are not promoted to attributes.
    """
    event_type: str
    resource_id: int
    shop_id: int
    timestamp: datetime
    payload: Dict[str, Any]

    @classmethod
    def from_payload(cls, data: Dict[str, Any]) -> 'WebhookEvent':
        """Build an event from a decoded webhook body.

        Missing keys fall back to neutral defaults: an empty event type,
        ``0`` for both identifiers and timestamp ``0``.

        Args:
            data: Decoded JSON body of the webhook request

        Returns:
            WebhookEvent whose ``payload`` is the same ``data`` object
        """
        kind = data.get('event_type', '')
        resource = data.get('resource_id', 0)
        shop = data.get('shop_id', 0)
        # fromtimestamp() yields a naive datetime in the local timezone.
        occurred_at = datetime.fromtimestamp(data.get('timestamp', 0))
        return cls(kind, resource, shop, occurred_at, data)


class WebhookVerifier:
    """Verify webhook signatures from Etsy.

    The expected signature is the hex-encoded HMAC-SHA256 of
    ``<timestamp>.<raw body>`` keyed with the signing secret.
    """

    def __init__(self, signing_secret: str):
        """
        Initialize verifier.

        Args:
            signing_secret: Etsy webhook signing secret
        """
        self.signing_secret = signing_secret

    def verify_signature(
        self,
        payload: bytes,
        signature: str,
        timestamp: str
    ) -> bool:
        """
        Verify webhook signature.

        Never raises for malformed input: a body that is not valid UTF-8
        or a signature header containing non-ASCII characters simply
        fails (or passes) verification like any other value.

        Args:
            payload: Raw request body
            signature: X-Etsy-Signature header
            timestamp: X-Etsy-Timestamp header

        Returns:
            True if signature is valid; always False when no signing
            secret is configured
        """
        # An empty key would let anyone compute a "valid" signature.
        if not self.signing_secret:
            return False

        # Sign the raw bytes. For UTF-8 bodies this is byte-for-byte what
        # decoding and re-encoding produced, but it cannot raise
        # UnicodeDecodeError on a hostile or corrupted body.
        signed_payload = timestamp.encode('utf-8') + b'.' + payload

        expected = hmac.new(
            self.signing_secret.encode('utf-8'),
            signed_payload,
            hashlib.sha256
        ).hexdigest()

        # compare_digest() rejects non-ASCII *str* arguments with a
        # TypeError, so compare bytes to keep the check total.
        return hmac.compare_digest(
            signature.encode('utf-8'),
            expected.encode('ascii')
        )


class WebhookDispatcher:
    """Route webhook events to handlers.

    Handlers are stored per event type in registration order. Handlers
    registered under ``'*'`` receive every event, after the handlers
    registered for the specific type.
    """

    def __init__(self):
        self._handlers: Dict[str, list[Callable]] = {}

    def register(
        self,
        event_type: str,
        handler: Callable[['WebhookEvent'], None]
    ):
        """
        Register a handler for an event type.

        Args:
            event_type: Event type to handle (e.g., 'receipt.created'),
                or '*' to receive every event
            handler: Callback function
        """
        self._handlers.setdefault(event_type, []).append(handler)

    def on(self, event_type: str):
        """
        Decorator to register an event handler.

        Usage:
            @dispatcher.on('receipt.created')
            def handle_new_order(event):
                pass
        """
        def decorator(func: Callable):
            self.register(event_type, func)
            return func
        return decorator

    def dispatch(self, event: 'WebhookEvent'):
        """
        Dispatch event to registered handlers.

        A handler that raises is reported and skipped; the remaining
        handlers still run.

        Args:
            event: WebhookEvent to dispatch
        """
        # Copy before combining: extending the registered list in place
        # would append the wildcard handlers to it on every dispatch, so
        # they would run one more time with each event received.
        handlers = list(self._handlers.get(event.event_type, []))
        if event.event_type != '*':
            handlers.extend(self._handlers.get('*', []))  # Wildcard handlers

        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Handler error: {e}")

Flask Webhook Server

# webhook_server/app.py
"""Flask server for receiving Etsy webhooks."""

import os
import json
from datetime import datetime
from flask import Flask, request, jsonify
import logging

from etsy_client.webhooks import WebhookVerifier, WebhookEvent, WebhookDispatcher

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize Flask app
app = Flask(__name__)

# Webhook configuration
WEBHOOK_SECRET = os.environ.get('ETSY_WEBHOOK_SECRET', '')
if not WEBHOOK_SECRET:
    # The /webhook route only checks signatures when a secret is set, so
    # a missing variable would otherwise disable verification silently.
    logger.warning(
        "ETSY_WEBHOOK_SECRET is not set; webhook signatures will NOT be verified"
    )
verifier = WebhookVerifier(WEBHOOK_SECRET)
dispatcher = WebhookDispatcher()


# --- Event Handlers ---

@dispatcher.on('receipt.created')
def handle_new_order(event: WebhookEvent):
    """Handle new order received.

    Logs the receipt, sends a high-priority notification and then starts
    digital delivery for the order.

    Args:
        event: The ``receipt.created`` event; its ``resource_id`` is used
            as the receipt ID
    """
    # The emoji was mis-encoded (UTF-8 bytes decoded with the wrong code page).
    logger.info(f"🛒 New order received! Receipt ID: {event.resource_id}")

    # Send notification
    send_notification(
        title="New Etsy Order!",
        message=f"Order #{event.resource_id} received",
        priority="high"
    )

    # Trigger digital delivery
    process_digital_delivery(event.resource_id)


@dispatcher.on('receipt.updated')
def handle_order_update(event: WebhookEvent):
    """Handle order status change.

    Only logs the affected receipt.

    Args:
        event: The ``receipt.updated`` event
    """
    # The emoji was mis-encoded (UTF-8 bytes decoded with the wrong code page).
    logger.info(f"📦 Order updated: {event.resource_id}")


@dispatcher.on('listing.updated')
def handle_listing_update(event: WebhookEvent):
    """Handle listing modification.

    Only logs the affected listing.

    Args:
        event: The ``listing.updated`` event
    """
    # The emoji was mis-encoded (UTF-8 bytes decoded with the wrong code page).
    logger.info(f"📝 Listing updated: {event.resource_id}")


@dispatcher.on('listing.deactivated')
def handle_listing_deactivation(event: WebhookEvent):
    """Warn when a listing goes inactive.

    Writes a warning to the log and sends a medium-priority notification,
    since a deactivated listing may need attention.

    Args:
        event: The ``listing.deactivated`` event
    """
    listing_id = event.resource_id
    logger.warning(f"⚠️ Listing deactivated: {listing_id}")

    alert = {
        "title": "Listing Deactivated",
        "message": f"Listing {listing_id} was deactivated",
        "priority": "medium",
    }
    send_notification(**alert)


@dispatcher.on('*')
def log_all_events(event: WebhookEvent):
    """Wildcard handler: record every event at DEBUG level.

    Args:
        event: Any dispatched event
    """
    kind, resource = event.event_type, event.resource_id
    line = f"Event: {kind} - Resource: {resource}"
    logger.debug(line)


# --- Helper Functions ---

def send_notification(title: str, message: str, priority: str = "normal"):
    """Placeholder notification hook; currently only logs.

    Replace the body with a real integration (Pushover, Slack, email, ...).

    Args:
        title: Short headline of the notification
        message: Notification text
        priority: Accepted for the future integration; not used yet
    """
    summary = f"Notification: {title} - {message}"
    logger.info(summary)


def process_digital_delivery(receipt_id: int):
    """Placeholder for digital product delivery; currently only logs.

    Args:
        receipt_id: Receipt whose digital items should be delivered
    """
    # Hook your delivery system in here.
    note = f"Processing digital delivery for receipt {receipt_id}"
    logger.info(note)


# --- Routes ---

@app.route('/webhook', methods=['POST'])
def webhook_endpoint():
    """Receive one webhook: authenticate, parse, dispatch.

    Returns:
        401 when a secret is configured and the signature does not match,
        400 when the body cannot be turned into a WebhookEvent,
        200 once the event has been handed to the dispatcher
    """
    headers = request.headers
    provided_signature = headers.get('X-Etsy-Signature', '')
    sent_at = headers.get('X-Etsy-Timestamp', '')

    # Signature checking is skipped entirely when no secret is configured.
    if WEBHOOK_SECRET:
        authentic = verifier.verify_signature(
            request.data, provided_signature, sent_at
        )
        if not authentic:
            logger.warning("Invalid webhook signature")
            return jsonify({"error": "Invalid signature"}), 401

    try:
        event = WebhookEvent.from_payload(request.get_json())
    except Exception as exc:
        logger.error(f"Failed to parse webhook: {exc}")
        return jsonify({"error": "Invalid payload"}), 400

    # Handlers run synchronously here, before the response is sent.
    dispatcher.dispatch(event)

    return jsonify({"status": "received"}), 200


@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe: report a static status plus the current UTC time."""
    now_iso = datetime.utcnow().isoformat()
    body = {"status": "healthy", "timestamp": now_iso}
    return jsonify(body)


if __name__ == '__main__':
    # Never hard-code debug=True on a server bound to 0.0.0.0: the
    # interactive debugger lets anyone who can reach the port (for
    # example through an ngrok tunnel) execute arbitrary Python.
    # Opt in explicitly with FLASK_DEBUG=1 for local work.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)

FastAPI Webhook Server

# webhook_server/app_fastapi.py
"""FastAPI server for receiving Etsy webhooks."""

import os
from datetime import datetime
from typing import Optional
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from pydantic import BaseModel
import logging

from etsy_client.webhooks import WebhookVerifier, WebhookEvent, WebhookDispatcher

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI
app = FastAPI(title="Etsy Webhook Handler")

# Configuration
WEBHOOK_SECRET = os.environ.get('ETSY_WEBHOOK_SECRET', '')
if not WEBHOOK_SECRET:
    # The /webhook route only checks signatures when a secret is set, so
    # a missing variable would otherwise disable verification silently.
    logger.warning(
        "ETSY_WEBHOOK_SECRET is not set; webhook signatures will NOT be verified"
    )
verifier = WebhookVerifier(WEBHOOK_SECRET)
dispatcher = WebhookDispatcher()


class WebhookPayload(BaseModel):
    """Webhook payload model.

    Declares the four top-level fields of a webhook body.

    NOTE(review): nothing visible in this module references the model --
    the /webhook route reads the raw JSON and builds a WebhookEvent via
    WebhookEvent.from_payload instead. Confirm whether it is meant to be
    used for request validation.
    """
    event_type: str  # event name, e.g. 'receipt.created'
    resource_id: int  # ID of the affected resource (handlers log it as an order/listing ID)
    shop_id: int
    timestamp: int  # presumably Unix epoch seconds -- TODO confirm against Etsy's docs
    # Add other fields as needed


# --- Event Handlers ---

def handle_new_order(event: WebhookEvent):
    """Process new order in background.

    Args:
        event: The ``receipt.created`` event
    """
    # The emoji was mis-encoded (UTF-8 bytes decoded with the wrong code page).
    logger.info(f"🛒 Processing order {event.resource_id}")
    # Add your order processing logic


def handle_listing_change(event: WebhookEvent):
    """Process listing change in background.

    Registered for both ``listing.updated`` and ``listing.created``.

    Args:
        event: The listing event
    """
    # The emoji was mis-encoded (UTF-8 bytes decoded with the wrong code page).
    logger.info(f"📝 Processing listing {event.resource_id}")
    # Add your listing sync logic


# Wire each event type to its handler (order is preserved).
for _event_type, _handler in (
    ('receipt.created', handle_new_order),
    ('listing.updated', handle_listing_change),
    ('listing.created', handle_listing_change),
):
    dispatcher.register(_event_type, _handler)
del _event_type, _handler


# --- Routes ---

@app.post("/webhook")
async def webhook_endpoint(
    request: Request,
    background_tasks: BackgroundTasks
):
    """
    Accept a webhook from Etsy and schedule its processing.

    The signature is checked (when a secret is configured) and the body
    parsed before responding; handler execution is deferred to a
    background task so the response is not held up by it.

    Raises:
        HTTPException: 401 for a bad signature, 400 for an unparseable body
    """
    raw_body = await request.body()
    headers = request.headers
    provided_signature = headers.get('X-Etsy-Signature', '')
    sent_at = headers.get('X-Etsy-Timestamp', '')

    # Signature checking is skipped entirely when no secret is configured.
    if WEBHOOK_SECRET:
        if not verifier.verify_signature(raw_body, provided_signature, sent_at):
            raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        event = WebhookEvent.from_payload(await request.json())
    except Exception as exc:
        logger.error(f"Parse error: {exc}")
        raise HTTPException(status_code=400, detail="Invalid payload")

    # Run the handlers after the response has been produced.
    background_tasks.add_task(dispatcher.dispatch, event)

    return {"status": "accepted"}


@app.get("/health")
async def health():
    """Liveness probe: static status plus the current UTC time."""
    now_iso = datetime.utcnow().isoformat()
    return {"status": "healthy", "timestamp": now_iso}


@app.get("/")
async def root():
    """Identify the service and its version."""
    info = {}
    info["service"] = "Etsy Webhook Handler"
    info["version"] = "1.0.0"
    return info

Registering Webhooks with Etsy

# etsy_client/webhook_management.py
"""Manage webhook subscriptions with Etsy API."""

from typing import Dict, Any, List, Optional
from .client import EtsyClient


class WebhookManager:
    """
    Thin wrapper around a shop's push-configuration endpoints.

    Note: for most apps webhook registration happens in Etsy's developer
    portal; this class only offers helpers for listing, creating and
    deleting push configurations through the API client.
    """

    def __init__(self, client: EtsyClient, shop_id: int):
        self.client = client
        self.shop_id = shop_id

    def _collection_path(self) -> str:
        """Return the API path of this shop's push-configuration collection."""
        return f"/application/shops/{self.shop_id}/push-configurations"

    def get_shop_push_configurations(self) -> List[Dict[str, Any]]:
        """
        List the shop's push configurations.

        Returns:
            The ``results`` list of the API response, or an empty list
            when the response has no such key
        """
        listing = self.client.get(self._collection_path())
        return listing.get('results', [])

    def create_push_configuration(
        self,
        url: str,
        events: List[str]
    ) -> Dict[str, Any]:
        """
        Subscribe an endpoint to a set of events.

        Args:
            url: Webhook endpoint URL
            events: Event types to subscribe to

        Returns:
            Whatever the client returns for the POST request
        """
        body = {"url": url, "events": events}
        return self.client.post(self._collection_path(), data=body)

    def delete_push_configuration(self, config_id: int) -> bool:
        """
        Remove one push configuration.

        Args:
            config_id: Configuration ID to delete

        Returns:
            True once the client call has returned without raising
        """
        target = f"{self._collection_path()}/{config_id}"
        self.client.delete(target)
        return True


# scripts/setup_webhooks.py
def setup_webhooks():
    """Print the shop's existing push configurations.

    Despite the name, nothing is created here: the function builds a
    client from ``Config``, looks up the shop and lists what is already
    registered.
    """
    from config import Config
    from etsy_client import EtsyClient
    from etsy_client.users import get_my_shop_id

    credentials = {
        "api_key": Config.ETSY_API_KEY,
        "access_token": Config.ETSY_ACCESS_TOKEN,
    }
    client = EtsyClient(**credentials)

    manager = WebhookManager(client, get_my_shop_id(client))

    # Report what is registered today.
    existing = manager.get_shop_push_configurations()
    print(f"Existing configurations: {len(existing)}")

    for entry in existing:
        print(f"  - URL: {entry.get('url')}")
        print(f"    Events: {entry.get('events')}")

Local Development with ngrok

For local testing, use ngrok to expose your webhook server:

# Start your webhook server (the Flask app listens on port 5000)
python webhook_server/app.py

# In another terminal, expose with ngrok (tunnel the same port the server uses)
ngrok http 5000
# scripts/dev_tunnel.py
"""Helper for local webhook development."""

import subprocess
import json
import time
import requests


def start_ngrok_tunnel(port: int = 5000) -> str | None:
    """
    Start ngrok tunnel and return public URL.

    Launches ``ngrok http <port>`` in the background, then polls the
    local ngrok API on port 4040 for up to 10 seconds until an https
    tunnel is listed. On failure the spawned ngrok process is stopped
    instead of being left running.

    Args:
        port: Local port to tunnel

    Returns:
        Public ngrok URL, or None if no https tunnel became available

    Raises:
        OSError: If the ``ngrok`` executable cannot be started
    """
    # Start ngrok in background
    process = subprocess.Popen(
        ['ngrok', 'http', str(port), '--log=stdout'],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )

    api_url = 'http://localhost:4040/api/tunnels'
    deadline = time.monotonic() + 10
    last_error = "no https tunnel reported"

    # Poll rather than sleeping a fixed time: the tunnel may take more
    # (or much less) than two seconds to come up.
    while time.monotonic() < deadline:
        if process.poll() is not None:
            last_error = f"ngrok exited with code {process.returncode}"
            break
        try:
            # A timeout keeps a wedged local API from hanging the caller.
            response = requests.get(api_url, timeout=2)
            for tunnel in response.json().get('tunnels', []):
                if tunnel.get('proto') == 'https' and tunnel.get('public_url'):
                    return tunnel['public_url']
        except (requests.RequestException, ValueError, AttributeError, TypeError) as e:
            last_error = e
        time.sleep(0.5)

    print(f"Error getting tunnel URL: {last_error}")

    # Do not leak a background ngrok process the caller has no handle on.
    if process.poll() is None:
        process.terminate()
    return None


def webhook_test_endpoint():
    """Print webhook testing instructions.

    The guide is drawn inside a box. The frame is generated from the text
    rows so that every line has the same width and the box-drawing
    characters are emitted correctly (they were previously mis-encoded).
    """
    width = 60
    title = " " * 17 + "WEBHOOK TESTING GUIDE"
    steps = [
        "",
        "  1. Start your webhook server:",
        "     python webhook_server/app.py",
        "",
        "  2. Start ngrok:",
        "     ngrok http 5000",
        "",
        "  3. Copy the HTTPS URL from ngrok",
        "",
        "  4. Configure webhook in Etsy Developer Portal:",
        "     - Go to your app settings",
        "     - Add webhook URL: https://xxx.ngrok.io/webhook",
        "     - Select events to receive",
        "",
        "  5. Test by creating/updating a listing",
        "",
    ]

    rule = "═" * width
    rows = [
        f"╔{rule}╗",
        f"║{title.ljust(width)}║",
        f"╠{rule}╣",
        *(f"║{text.ljust(width)}║" for text in steps),
        f"╚{rule}╝",
    ]

    # Leading newline and trailing indented line mirror the layout of the
    # original triple-quoted banner.
    print("\n" + "\n".join(rows) + "\n    ")


# Running this file as a script only prints the testing guide; it does
# not start a tunnel.
if __name__ == "__main__":
    webhook_test_endpoint()

Event Processing Queue

For production reliability, use a message queue:

# webhook_server/queue_handler.py
"""Queue-based webhook processing."""

import json
import redis
from typing import Dict, Any
import threading
import time

from etsy_client.webhooks import WebhookEvent


class WebhookQueue:
    """
    Redis-based queue for webhook processing.
    
    Ensures reliable event handling with retries.
    """
    
    def __init__(
        self,
        redis_url: str = "redis://localhost:6379",
        queue_name: str = "etsy_webhooks"
    ):
        self.redis = redis.from_url(redis_url)
        self.queue_name = queue_name
        self.processing_queue = f"{queue_name}:processing"
        self.dead_letter_queue = f"{queue_name}:dead"
    
    def enqueue(self, event_data: Dict[str, Any]):
        """
        Add event to queue.
        
        Args:
            event_data: Raw webhook payload
        """
        self.redis.lpush(self.queue_name, json.dumps(event_data))
    
    def dequeue(self, timeout: int = 5) -> Dict[str, Any] | None:
        """
        Get event from queue.
        
        Args:
            timeout: Blocking timeout in seconds
            
        Returns:
            Event data or None if timeout
        """
        result = self.redis.brpoplpush(
            self.queue_name,
            self.processing_queue,
            timeout
        )
        
        if result:
            return json.loads(result)
        return None
    
    def acknowledge(self, event_data: Dict[str, Any]):
        """Mark event as processed."""
        self.redis.lrem(self.processing_queue, 1, json.dumps(event_data))
    
    def reject(self, event_data: Dict[str, Any], retry: bool = True):
        """
        Reject event.
        
        Args:
            event_data: Event to reject
            retry: If True, requeue; if False, send to dead letter
        """
        self.redis.lrem(self.processing_queue, 1, json.dumps(event_data))
        
        if retry:
            # Add retry count
            event_data['_retry_count'] = event_data.get('_retry_count', 0) + 1
            
            if event_data['_retry_count'] < 3:
                self.redis.lpush(self.queue_name, json.dumps(event_data))
            else:
                self.redis.lpush(self.dead_letter_queue, json.dumps(event_data))
        else:
            self.redis.lpush(self.dead_letter_queue, json.dumps(event_data))
    
    def queue_length(self) -> int:
        """Get number of pending events."""
        return self.redis.llen(self.queue_name)


class WebhookWorker:
    """Background worker for processing webhooks.

    Pulls events from a WebhookQueue, dispatches them and then
    acknowledges or rejects them on the queue.
    """

    def __init__(self, queue: 'WebhookQueue', dispatcher):
        """
        Args:
            queue: Queue to consume from
            dispatcher: Object whose ``dispatch(event)`` runs the handlers
        """
        self.queue = queue
        self.dispatcher = dispatcher
        self.running = False

    def process_one(self) -> bool:
        """
        Process one event from queue.

        Returns:
            True if event was processed; False on timeout or when the
            event failed and was rejected
        """
        import copy

        event_data = self.queue.dequeue(timeout=5)

        # Only None means "timed out"; an empty payload is still an event
        # and must be acknowledged or rejected, not left in processing.
        if event_data is None:
            return False

        try:
            # Handlers get their own copy: the queue finds the in-flight
            # entry by re-serializing event_data, so a handler mutating
            # the payload would make acknowledge()/reject() miss it.
            event = WebhookEvent.from_payload(copy.deepcopy(event_data))
            self.dispatcher.dispatch(event)
            self.queue.acknowledge(event_data)
            return True
        except Exception as e:
            print(f"Processing error: {e}")
            self.queue.reject(event_data, retry=True)
            return False

    def run(self):
        """Run worker loop until ``stop()`` is called."""
        self.running = True
        print("Webhook worker started")

        while self.running:
            try:
                self.process_one()
            except Exception as e:
                print(f"Worker error: {e}")
                # Back off briefly so a persistent failure (e.g. Redis
                # being down) does not spin the loop.
                time.sleep(1)

    def stop(self):
        """Stop worker; the loop exits after the current iteration."""
        self.running = False

Notification Integration

# webhook_server/notifications.py
"""Notification integrations for webhook events."""

import os
import json
import requests
from typing import Optional
from dataclasses import dataclass


@dataclass
class NotificationConfig:
    """Notification configuration.

    Every field is optional; NotificationService skips a channel whose
    setting is missing.
    """
    # Slack incoming-webhook URL that messages are POSTed to
    slack_webhook: Optional[str] = None
    # Sent as the "token" field of the Pushover request
    pushover_token: Optional[str] = None
    # Sent as the "user" field of the Pushover request
    pushover_user: Optional[str] = None
    # NOTE(review): not read by any code visible in this module -- presumably
    # reserved for an email channel; confirm
    email_api_key: Optional[str] = None


class NotificationService:
    """Multi-channel notification service.

    A channel is used only when its settings are present in the
    configuration; unconfigured channels are skipped silently.
    """

    def __init__(self, config: 'NotificationConfig', timeout: float = 10.0):
        """
        Args:
            config: Channel settings
            timeout: Seconds to wait for each HTTP request, so a stalled
                notification endpoint cannot block the caller forever
        """
        self.config = config
        self.timeout = timeout

    def send_slack(self, message: str, channel: Optional[str] = None):
        """Send Slack notification.

        Args:
            message: Message text
            channel: Optional channel override added to the payload
        """
        if not self.config.slack_webhook:
            return

        payload = {
            "text": message,
            "username": "Etsy Bot",
            "icon_emoji": ":package:"
        }

        if channel:
            payload["channel"] = channel

        requests.post(
            self.config.slack_webhook,
            json=payload,
            timeout=self.timeout
        )

    def send_pushover(
        self,
        title: str,
        message: str,
        priority: int = 0
    ):
        """Send Pushover notification.

        Skipped unless both the token and the user key are configured;
        the request carries both.

        Args:
            title: Notification title
            message: Notification text
            priority: Value of the request's "priority" field
        """
        if not self.config.pushover_token or not self.config.pushover_user:
            return

        requests.post(
            "https://api.pushover.net/1/messages.json",
            data={
                "token": self.config.pushover_token,
                "user": self.config.pushover_user,
                "title": title,
                "message": message,
                "priority": priority
            },
            timeout=self.timeout
        )

    def send_all(self, title: str, message: str):
        """Send to all configured channels.

        Every channel is attempted even if an earlier one fails; the
        first failure is re-raised once all channels have been tried.

        Raises:
            Exception: The first error raised by any channel
        """
        senders = (
            lambda: self.send_slack(f"*{title}*\n{message}"),
            lambda: self.send_pushover(title, message),
        )

        first_error = None
        for send in senders:
            try:
                send()
            except Exception as exc:
                # Remember the failure but keep going so that one broken
                # channel does not silence the others.
                if first_error is None:
                    first_error = exc

        if first_error is not None:
            raise first_error


# Usage in webhook handlers
def setup_notifications():
    """Build a NotificationService from environment variables.

    Reads SLACK_WEBHOOK, PUSHOVER_TOKEN and PUSHOVER_USER; a variable
    that is not set leaves the corresponding setting as None.
    """
    env = os.environ
    return NotificationService(
        NotificationConfig(
            slack_webhook=env.get('SLACK_WEBHOOK'),
            pushover_token=env.get('PUSHOVER_TOKEN'),
            pushover_user=env.get('PUSHOVER_USER'),
        )
    )


# Example handler with notifications
# Module-level service shared by the handlers below; built once at import
# time from the environment.
notifications = setup_notifications()

def handle_order_with_notification(event):
    """Handle new order with notification.

    Sends the order announcement to every configured channel.

    Args:
        event: The order event; its ``resource_id`` is shown as the
            order number
    """
    # The emoji was mis-encoded (UTF-8 bytes decoded with the wrong code page).
    notifications.send_all(
        "🛒 New Etsy Order!",
        f"Order #{event.resource_id} received"
    )

Docker Deployment

# webhook_server/Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer stays cached until
# requirements.txt changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run as an unprivileged user: the server handles requests from the
# public internet and does not need root inside the container
RUN useradd --create-home --shell /usr/sbin/nologin appuser
USER appuser

# Environment variables
ENV PYTHONUNBUFFERED=1

# Port gunicorn binds to below
EXPOSE 5000

# Run with gunicorn
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:5000", "app:app"]
# docker-compose.yml
# Three services: the HTTP webhook receiver, a background worker, and the
# Redis instance both of them are pointed at through REDIS_URL.
# ${...} values are substituted from the environment (or .env file) of
# the compose invocation.
version: '3.8'

services:
  # HTTP receiver, built from the webhook_server directory
  webhook-server:
    build: ./webhook_server
    ports:
      # Published port matches the address gunicorn binds to in the image
      - "5000:5000"
    environment:
      - ETSY_WEBHOOK_SECRET=${ETSY_WEBHOOK_SECRET}
      - SLACK_WEBHOOK=${SLACK_WEBHOOK}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    restart: unless-stopped
  
  # Same image as the receiver, started with a different command.
  # NOTE(review): worker.py is not shown in this chapter -- presumably it
  # runs the queue worker; confirm it exists in webhook_server/.
  webhook-worker:
    build: ./webhook_server
    command: python worker.py
    environment:
      - ETSY_API_KEY=${ETSY_API_KEY}
      - ETSY_ACCESS_TOKEN=${ETSY_ACCESS_TOKEN}
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    restart: unless-stopped
  
  # Queue backend; /data is kept on a named volume
  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data
    restart: unless-stopped

volumes:
  redis_data:

Key Takeaways

  1. Fast response: Always return 200 quickly, process in background
  2. Verify signatures: Validate webhooks are from Etsy
  3. Use queues: For reliability in production
  4. Handle failures: Implement retries and dead letter queues
  5. Monitor: Log all events for debugging

Moving Forward

Real-time events need robust error handling. The next chapter covers building resilient systems that gracefully handle API errors and rate limits.


← Chapter 9: Analytics and Reporting | Table of Contents | Chapter 11: Error Handling and Resilience →