Etsyβs webhook system allows you to receive real-time notifications when events occur in your shop. Instead of constantly polling the API, webhooks push data to you instantly, enabling faster responses to orders, messages, and other critical events.
Etsy supports webhooks for several event types:
| Event Type | Description |
|---|---|
listing.created |
New listing published |
listing.updated |
Listing modified |
listing.deleted |
Listing removed |
listing.activated |
Listing made active |
listing.deactivated |
Listing deactivated |
receipt.created |
New order received |
receipt.updated |
Order status changed |
shop.updated |
Shop settings changed |
# etsy_client/webhooks.py
"""Webhook handling for Etsy events."""
import hmac
import hashlib
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class WebhookEvent:
"""Represents a webhook event from Etsy."""
event_type: str
resource_id: int
shop_id: int
timestamp: datetime
payload: Dict[str, Any]
@classmethod
def from_payload(cls, data: Dict[str, Any]) -> 'WebhookEvent':
"""Create event from webhook payload."""
return cls(
event_type=data.get('event_type', ''),
resource_id=data.get('resource_id', 0),
shop_id=data.get('shop_id', 0),
timestamp=datetime.fromtimestamp(data.get('timestamp', 0)),
payload=data
)
class WebhookVerifier:
"""Verify webhook signatures from Etsy."""
def __init__(self, signing_secret: str):
"""
Initialize verifier.
Args:
signing_secret: Etsy webhook signing secret
"""
self.signing_secret = signing_secret
def verify_signature(
self,
payload: bytes,
signature: str,
timestamp: str
) -> bool:
"""
Verify webhook signature.
Args:
payload: Raw request body
signature: X-Etsy-Signature header
timestamp: X-Etsy-Timestamp header
Returns:
True if signature is valid
"""
# Construct signed payload
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
# Compute expected signature
expected = hmac.new(
self.signing_secret.encode('utf-8'),
signed_payload.encode('utf-8'),
hashlib.sha256
).hexdigest()
# Compare signatures
return hmac.compare_digest(signature, expected)
class WebhookDispatcher:
"""Route webhook events to handlers."""
def __init__(self):
self._handlers: Dict[str, list[Callable]] = {}
def register(
self,
event_type: str,
handler: Callable[[WebhookEvent], None]
):
"""
Register a handler for an event type.
Args:
event_type: Event type to handle (e.g., 'receipt.created')
handler: Callback function
"""
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
def on(self, event_type: str):
"""
Decorator to register an event handler.
Usage:
@dispatcher.on('receipt.created')
def handle_new_order(event):
pass
"""
def decorator(func: Callable):
self.register(event_type, func)
return func
return decorator
def dispatch(self, event: WebhookEvent):
"""
Dispatch event to registered handlers.
Args:
event: WebhookEvent to dispatch
"""
handlers = self._handlers.get(event.event_type, [])
handlers += self._handlers.get('*', []) # Wildcard handlers
for handler in handlers:
try:
handler(event)
except Exception as e:
print(f"Handler error: {e}")
# webhook_server/app.py
"""Flask server for receiving Etsy webhooks."""
import os
import json
from datetime import datetime
from flask import Flask, request, jsonify
import logging
from etsy_client.webhooks import WebhookVerifier, WebhookEvent, WebhookDispatcher
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize Flask app
app = Flask(__name__)
# Webhook configuration
WEBHOOK_SECRET = os.environ.get('ETSY_WEBHOOK_SECRET', '')
verifier = WebhookVerifier(WEBHOOK_SECRET)
dispatcher = WebhookDispatcher()
# --- Event Handlers ---
@dispatcher.on('receipt.created')
def handle_new_order(event: WebhookEvent):
"""Handle new order received."""
logger.info(f"π New order received! Receipt ID: {event.resource_id}")
# Send notification
send_notification(
title="New Etsy Order!",
message=f"Order #{event.resource_id} received",
priority="high"
)
# Trigger digital delivery
process_digital_delivery(event.resource_id)
@dispatcher.on('receipt.updated')
def handle_order_update(event: WebhookEvent):
"""Handle order status change."""
logger.info(f"π¦ Order updated: {event.resource_id}")
@dispatcher.on('listing.updated')
def handle_listing_update(event: WebhookEvent):
"""Handle listing modification."""
logger.info(f"π Listing updated: {event.resource_id}")
@dispatcher.on('listing.deactivated')
def handle_listing_deactivation(event: WebhookEvent):
"""Handle listing going inactive."""
logger.warning(f"β οΈ Listing deactivated: {event.resource_id}")
# Alert - listing may need attention
send_notification(
title="Listing Deactivated",
message=f"Listing {event.resource_id} was deactivated",
priority="medium"
)
@dispatcher.on('*')
def log_all_events(event: WebhookEvent):
"""Log all events for debugging."""
logger.debug(f"Event: {event.event_type} - Resource: {event.resource_id}")
# --- Helper Functions ---
def send_notification(title: str, message: str, priority: str = "normal"):
"""Send push notification."""
# Implement your notification logic here
# e.g., Pushover, Slack, email, etc.
logger.info(f"Notification: {title} - {message}")
def process_digital_delivery(receipt_id: int):
"""Trigger digital product delivery."""
# This would integrate with your delivery system
logger.info(f"Processing digital delivery for receipt {receipt_id}")
# --- Routes ---
@app.route('/webhook', methods=['POST'])
def webhook_endpoint():
"""Main webhook endpoint."""
# Get verification headers
signature = request.headers.get('X-Etsy-Signature', '')
timestamp = request.headers.get('X-Etsy-Timestamp', '')
# Verify signature
if WEBHOOK_SECRET and not verifier.verify_signature(
request.data, signature, timestamp
):
logger.warning("Invalid webhook signature")
return jsonify({"error": "Invalid signature"}), 401
# Parse event
try:
data = request.get_json()
event = WebhookEvent.from_payload(data)
except Exception as e:
logger.error(f"Failed to parse webhook: {e}")
return jsonify({"error": "Invalid payload"}), 400
# Dispatch to handlers
dispatcher.dispatch(event)
# Always return 200 quickly
return jsonify({"status": "received"}), 200
@app.route('/health', methods=['GET'])
def health_check():
"""Health check endpoint."""
return jsonify({"status": "healthy", "timestamp": datetime.utcnow().isoformat()})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
# webhook_server/app_fastapi.py
"""FastAPI server for receiving Etsy webhooks."""
import os
from datetime import datetime
from typing import Optional
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from pydantic import BaseModel
import logging
from etsy_client.webhooks import WebhookVerifier, WebhookEvent, WebhookDispatcher
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI
app = FastAPI(title="Etsy Webhook Handler")
# Configuration
WEBHOOK_SECRET = os.environ.get('ETSY_WEBHOOK_SECRET', '')
verifier = WebhookVerifier(WEBHOOK_SECRET)
dispatcher = WebhookDispatcher()
class WebhookPayload(BaseModel):
"""Webhook payload model."""
event_type: str
resource_id: int
shop_id: int
timestamp: int
# Add other fields as needed
# --- Event Handlers ---
def handle_new_order(event: WebhookEvent):
"""Process new order in background."""
logger.info(f"π Processing order {event.resource_id}")
# Add your order processing logic
def handle_listing_change(event: WebhookEvent):
"""Process listing change in background."""
logger.info(f"π Processing listing {event.resource_id}")
# Add your listing sync logic
# Register handlers
dispatcher.register('receipt.created', handle_new_order)
dispatcher.register('listing.updated', handle_listing_change)
dispatcher.register('listing.created', handle_listing_change)
# --- Routes ---
@app.post("/webhook")
async def webhook_endpoint(
request: Request,
background_tasks: BackgroundTasks
):
"""
Handle incoming webhooks from Etsy.
Processes webhooks asynchronously for faster response.
"""
# Get raw body and headers
body = await request.body()
signature = request.headers.get('X-Etsy-Signature', '')
timestamp = request.headers.get('X-Etsy-Timestamp', '')
# Verify signature
if WEBHOOK_SECRET and not verifier.verify_signature(body, signature, timestamp):
raise HTTPException(status_code=401, detail="Invalid signature")
# Parse payload
try:
data = await request.json()
event = WebhookEvent.from_payload(data)
except Exception as e:
logger.error(f"Parse error: {e}")
raise HTTPException(status_code=400, detail="Invalid payload")
# Process in background for fast response
background_tasks.add_task(dispatcher.dispatch, event)
return {"status": "accepted"}
@app.get("/health")
async def health():
"""Health check."""
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat()
}
@app.get("/")
async def root():
"""Root endpoint."""
return {"service": "Etsy Webhook Handler", "version": "1.0.0"}
# etsy_client/webhook_management.py
"""Manage webhook subscriptions with Etsy API."""
from typing import Dict, Any, List, Optional
from .client import EtsyClient
class WebhookManager:
"""
Manage Etsy webhook subscriptions.
Note: Webhook registration is done through Etsy's developer portal
for most apps. This class provides utilities for webhook-related
operations.
"""
def __init__(self, client: EtsyClient, shop_id: int):
self.client = client
self.shop_id = shop_id
def get_shop_push_configurations(self) -> List[Dict[str, Any]]:
"""
Get push notification configurations for shop.
Returns:
List of push configurations
"""
response = self.client.get(
f"/application/shops/{self.shop_id}/push-configurations"
)
return response.get('results', [])
def create_push_configuration(
self,
url: str,
events: List[str]
) -> Dict[str, Any]:
"""
Create a new push configuration.
Args:
url: Webhook endpoint URL
events: List of event types to subscribe to
Returns:
Created configuration
"""
return self.client.post(
f"/application/shops/{self.shop_id}/push-configurations",
data={
"url": url,
"events": events
}
)
def delete_push_configuration(self, config_id: int) -> bool:
"""
Delete a push configuration.
Args:
config_id: Configuration ID to delete
Returns:
True if successful
"""
self.client.delete(
f"/application/shops/{self.shop_id}/push-configurations/{config_id}"
)
return True
# scripts/setup_webhooks.py
def setup_webhooks():
"""Set up webhook subscriptions."""
from config import Config
from etsy_client import EtsyClient
from etsy_client.users import get_my_shop_id
client = EtsyClient(
api_key=Config.ETSY_API_KEY,
access_token=Config.ETSY_ACCESS_TOKEN
)
shop_id = get_my_shop_id(client)
manager = WebhookManager(client, shop_id)
# Check existing configurations
configs = manager.get_shop_push_configurations()
print(f"Existing configurations: {len(configs)}")
for config in configs:
print(f" - URL: {config.get('url')}")
print(f" Events: {config.get('events')}")
For local testing, use ngrok to expose your webhook server:
# Start your webhook server
python webhook_server/app.py
# In another terminal, expose with ngrok
ngrok http 5000
# scripts/dev_tunnel.py
"""Helper for local webhook development."""
import subprocess
import json
import time
import requests
def start_ngrok_tunnel(port: int = 5000) -> str:
"""
Start ngrok tunnel and return public URL.
Args:
port: Local port to tunnel
Returns:
Public ngrok URL
"""
# Start ngrok in background
process = subprocess.Popen(
['ngrok', 'http', str(port), '--log=stdout'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# Wait for tunnel to establish
time.sleep(2)
# Get tunnel URL from API
try:
response = requests.get('http://localhost:4040/api/tunnels')
tunnels = response.json()
for tunnel in tunnels.get('tunnels', []):
if tunnel['proto'] == 'https':
return tunnel['public_url']
except Exception as e:
print(f"Error getting tunnel URL: {e}")
return None
def webhook_test_endpoint():
"""Print webhook testing instructions."""
print("""
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β WEBHOOK TESTING GUIDE β
β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ£
β β
β 1. Start your webhook server: β
β python webhook_server/app.py β
β β
β 2. Start ngrok: β
β ngrok http 5000 β
β β
β 3. Copy the HTTPS URL from ngrok β
β β
β 4. Configure webhook in Etsy Developer Portal: β
β - Go to your app settings β
β - Add webhook URL: https://xxx.ngrok.io/webhook β
β - Select events to receive β
β β
β 5. Test by creating/updating a listing β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
""")
if __name__ == "__main__":
webhook_test_endpoint()
For production reliability, use a message queue:
# webhook_server/queue_handler.py
"""Queue-based webhook processing."""
import json
import redis
from typing import Dict, Any
import threading
import time
from etsy_client.webhooks import WebhookEvent
class WebhookQueue:
"""
Redis-based queue for webhook processing.
Ensures reliable event handling with retries.
"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
queue_name: str = "etsy_webhooks"
):
self.redis = redis.from_url(redis_url)
self.queue_name = queue_name
self.processing_queue = f"{queue_name}:processing"
self.dead_letter_queue = f"{queue_name}:dead"
def enqueue(self, event_data: Dict[str, Any]):
"""
Add event to queue.
Args:
event_data: Raw webhook payload
"""
self.redis.lpush(self.queue_name, json.dumps(event_data))
def dequeue(self, timeout: int = 5) -> Dict[str, Any] | None:
"""
Get event from queue.
Args:
timeout: Blocking timeout in seconds
Returns:
Event data or None if timeout
"""
result = self.redis.brpoplpush(
self.queue_name,
self.processing_queue,
timeout
)
if result:
return json.loads(result)
return None
def acknowledge(self, event_data: Dict[str, Any]):
"""Mark event as processed."""
self.redis.lrem(self.processing_queue, 1, json.dumps(event_data))
def reject(self, event_data: Dict[str, Any], retry: bool = True):
"""
Reject event.
Args:
event_data: Event to reject
retry: If True, requeue; if False, send to dead letter
"""
self.redis.lrem(self.processing_queue, 1, json.dumps(event_data))
if retry:
# Add retry count
event_data['_retry_count'] = event_data.get('_retry_count', 0) + 1
if event_data['_retry_count'] < 3:
self.redis.lpush(self.queue_name, json.dumps(event_data))
else:
self.redis.lpush(self.dead_letter_queue, json.dumps(event_data))
else:
self.redis.lpush(self.dead_letter_queue, json.dumps(event_data))
def queue_length(self) -> int:
"""Get number of pending events."""
return self.redis.llen(self.queue_name)
class WebhookWorker:
"""Background worker for processing webhooks."""
def __init__(self, queue: WebhookQueue, dispatcher):
self.queue = queue
self.dispatcher = dispatcher
self.running = False
def process_one(self) -> bool:
"""
Process one event from queue.
Returns:
True if event was processed
"""
event_data = self.queue.dequeue(timeout=5)
if not event_data:
return False
try:
event = WebhookEvent.from_payload(event_data)
self.dispatcher.dispatch(event)
self.queue.acknowledge(event_data)
return True
except Exception as e:
print(f"Processing error: {e}")
self.queue.reject(event_data, retry=True)
return False
def run(self):
"""Run worker loop."""
self.running = True
print("Webhook worker started")
while self.running:
try:
self.process_one()
except Exception as e:
print(f"Worker error: {e}")
time.sleep(1)
def stop(self):
"""Stop worker."""
self.running = False
# webhook_server/notifications.py
"""Notification integrations for webhook events."""
import os
import json
import requests
from typing import Optional
from dataclasses import dataclass
@dataclass
class NotificationConfig:
"""Notification configuration."""
slack_webhook: Optional[str] = None
pushover_token: Optional[str] = None
pushover_user: Optional[str] = None
email_api_key: Optional[str] = None
class NotificationService:
"""Multi-channel notification service."""
def __init__(self, config: NotificationConfig):
self.config = config
def send_slack(self, message: str, channel: str = None):
"""Send Slack notification."""
if not self.config.slack_webhook:
return
payload = {
"text": message,
"username": "Etsy Bot",
"icon_emoji": ":package:"
}
if channel:
payload["channel"] = channel
requests.post(self.config.slack_webhook, json=payload)
def send_pushover(
self,
title: str,
message: str,
priority: int = 0
):
"""Send Pushover notification."""
if not self.config.pushover_token:
return
requests.post(
"https://api.pushover.net/1/messages.json",
data={
"token": self.config.pushover_token,
"user": self.config.pushover_user,
"title": title,
"message": message,
"priority": priority
}
)
def send_all(self, title: str, message: str):
"""Send to all configured channels."""
self.send_slack(f"*{title}*\n{message}")
self.send_pushover(title, message)
# Usage in webhook handlers
def setup_notifications():
config = NotificationConfig(
slack_webhook=os.environ.get('SLACK_WEBHOOK'),
pushover_token=os.environ.get('PUSHOVER_TOKEN'),
pushover_user=os.environ.get('PUSHOVER_USER')
)
return NotificationService(config)
# Example handler with notifications
notifications = setup_notifications()
def handle_order_with_notification(event):
"""Handle new order with notification."""
notifications.send_all(
"π New Etsy Order!",
f"Order #{event.resource_id} received"
)
# webhook_server/Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Environment variables
ENV PYTHONUNBUFFERED=1
# Run with gunicorn
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:5000", "app:app"]
# docker-compose.yml
version: '3.8'
services:
webhook-server:
build: ./webhook_server
ports:
- "5000:5000"
environment:
- ETSY_WEBHOOK_SECRET=${ETSY_WEBHOOK_SECRET}
- SLACK_WEBHOOK=${SLACK_WEBHOOK}
- REDIS_URL=redis://redis:6379
depends_on:
- redis
restart: unless-stopped
webhook-worker:
build: ./webhook_server
command: python worker.py
environment:
- ETSY_API_KEY=${ETSY_API_KEY}
- ETSY_ACCESS_TOKEN=${ETSY_ACCESS_TOKEN}
- REDIS_URL=redis://redis:6379
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
restart: unless-stopped
volumes:
redis_data:
Real-time events need robust error handling. The next chapter covers building resilient systems that gracefully handle API errors and rate limits.
| β Chapter 9: Analytics and Reporting | Table of Contents | Chapter 11: Error Handling and Resilience β |