Throughout this book, we’ve built individual components for authentication, listings, orders, analytics, and error handling. Now it’s time to combine them into a cohesive, production-ready automation system for your digital product business.
┌─────────────────────────────────────────────────────────────────┐
│ ETSY AUTOMATION SYSTEM │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Webhook │ │ Scheduler │ │ Manual │ │
│ │ Server │ │ (cron/APQ) │ │ Scripts │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Task Processor │ │
│ └────────┬────────┘ │
│ │ │
│ ┌──────────────────────┼──────────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Orders │ │ Listings │ │ Analytics │ │
│ │ Handler │ │ Handler │ │ Handler │ │
│ └────┬─────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └───────────────────┼──────────────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Resilient Client│ │
│ └────────┬────────┘ │
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │ Etsy API │ │
│ └───────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
# automation/system.py
"""Core automation system."""
import logging
from typing import Dict, Any, Optional, Callable, List
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
import threading
import queue
from etsy_client.resilient_client import ResilientEtsyClient
from etsy_client.errors import EtsyAPIError
from etsy_client.monitoring import ErrorMonitor
logger = logging.getLogger(__name__)
class TaskPriority(Enum):
    """Urgency levels for queued tasks.

    The numeric value is used as the first sort key when a task is queued,
    so a smaller value is dequeued earlier.
    """

    # Immediate processing
    CRITICAL = 0
    # Process within minutes
    HIGH = 1
    # Process within hour
    NORMAL = 2
    # Process when convenient
    LOW = 3
@dataclass
class Task:
    """A unit of work submitted to the automation system.

    ``created_at`` may be omitted; it is then filled in with the current
    local time when the instance is created.
    """

    # Name used to select the handler that will process this task.
    task_type: str
    # Handler-specific arguments.
    payload: Dict[str, Any]
    # Queue urgency; defaults to NORMAL.
    priority: TaskPriority = TaskPriority.NORMAL
    # Creation time. None is only a placeholder until __post_init__ runs,
    # which is why the annotation is Optional.
    created_at: Optional[datetime] = None
    # Upper bound on retry_count before the task is given up on.
    max_retries: int = 3
    # Number of retries already performed.
    retry_count: int = 0

    def __post_init__(self):
        """Stamp the task with the current time when none was supplied."""
        if self.created_at is None:
            self.created_at = datetime.now()
class TaskHandler:
    """Interface that every concrete task handler implements.

    Subclasses override both ``can_handle`` and ``handle``; the versions
    here only raise ``NotImplementedError``.
    """

    def __init__(self, client: ResilientEtsyClient):
        # Shared API client, kept for use by subclasses.
        self.client = client

    def can_handle(self, task_type: str) -> bool:
        """Return True when this handler accepts ``task_type``."""
        raise NotImplementedError

    def handle(self, task: Task) -> Dict[str, Any]:
        """Run ``task`` and return a result dictionary."""
        raise NotImplementedError
class AutomationSystem:
    """
    Central automation system for Etsy operations.

    Tasks are placed on a priority queue and consumed by one background
    worker thread, which routes each task to the first registered handler
    that accepts its type.

    Coordinates all automated tasks including:
    - Order processing
    - Listing management
    - Analytics generation
    - Scheduled maintenance
    """

    def __init__(
        self,
        client: ResilientEtsyClient,
        shop_id: int
    ):
        """
        Args:
            client: API client shared with the handlers.
            shop_id: Identifier of the shop being automated.
        """
        self.client = client
        self.shop_id = shop_id
        self._handlers: List[TaskHandler] = []
        self._task_queue = queue.PriorityQueue()
        # Insertion counter used as a tie-breaker in queue entries. Without
        # it, two tasks with equal priority and equal timestamp would make
        # the queue compare Task objects, which raises TypeError.
        self._sequence = 0
        self._sequence_lock = threading.Lock()
        self._running = False
        self._worker_thread: Optional[threading.Thread] = None
        self.error_monitor = ErrorMonitor()
        self.metrics = {
            'tasks_processed': 0,
            'tasks_failed': 0,
            'start_time': None
        }

    def register_handler(self, handler: TaskHandler):
        """Register a task handler.

        Handlers are consulted in registration order.
        """
        self._handlers.append(handler)
        logger.info(f"Registered handler: {handler.__class__.__name__}")

    def submit_task(self, task: Task):
        """
        Submit task for processing.

        Args:
            task: Task to process
        """
        with self._sequence_lock:
            sequence = self._sequence
            self._sequence += 1
        # Entry layout: (priority, created_at, insertion_order, task).
        # The first three fields always decide the ordering, so the Task
        # itself is never compared.
        self._task_queue.put((
            task.priority.value,
            task.created_at.timestamp(),
            sequence,
            task
        ))
        logger.debug(f"Task submitted: {task.task_type}")

    def _find_handler(self, task: Task) -> Optional[TaskHandler]:
        """Return the first registered handler accepting the task, or None."""
        for handler in self._handlers:
            if handler.can_handle(task.task_type):
                return handler
        return None

    def _process_task(self, task: Task) -> bool:
        """
        Process a single task.

        A task with no matching handler, or one that fails for good, is
        counted in ``metrics['tasks_failed']``. A retryable API error
        re-queues the task immediately (no delay) until ``max_retries`` is
        reached.

        Returns:
            True if successful
        """
        handler = self._find_handler(task)
        if not handler:
            # Count dropped tasks so they show up in the status metrics.
            self.metrics['tasks_failed'] += 1
            logger.error(f"No handler for task type: {task.task_type}")
            return False
        try:
            handler.handle(task)
            self.metrics['tasks_processed'] += 1
            logger.info(f"Task completed: {task.task_type}")
            return True
        except EtsyAPIError as e:
            self.error_monitor.record_error(e, context={
                'task_type': task.task_type,
                'payload': task.payload
            })
            # Retry if possible
            if e.is_retryable and task.retry_count < task.max_retries:
                task.retry_count += 1
                self.submit_task(task)
                logger.warning(f"Task retry {task.retry_count}: {task.task_type}")
            else:
                self.metrics['tasks_failed'] += 1
                logger.error(f"Task failed: {task.task_type} - {e}")
            return False
        except Exception as e:
            self.metrics['tasks_failed'] += 1
            # logger.exception keeps the traceback for unexpected failures.
            logger.exception(f"Unexpected error in task {task.task_type}: {e}")
            return False

    def _worker_loop(self):
        """Background worker loop; runs until ``_running`` is cleared."""
        while self._running:
            try:
                # Wait with a timeout so the loop re-checks _running
                # regularly and can exit after stop().
                *_, task = self._task_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            try:
                self._process_task(task)
            except Exception as e:
                logger.exception(f"Worker error: {e}")
            finally:
                # Always balance the get(), even if processing raised.
                self._task_queue.task_done()

    def start(self):
        """Start automation system.

        Calling this while the system is already running is a no-op, so a
        second worker thread is never spawned.
        """
        if self._running:
            logger.warning("Automation system already running")
            return
        self._running = True
        self.metrics['start_time'] = datetime.now()
        self._worker_thread = threading.Thread(
            target=self._worker_loop,
            daemon=True
        )
        self._worker_thread.start()
        logger.info("Automation system started")

    def stop(self):
        """Stop automation system.

        Waits up to five seconds for the worker thread to finish.
        """
        self._running = False
        if self._worker_thread:
            self._worker_thread.join(timeout=5.0)
        logger.info("Automation system stopped")

    def get_status(self) -> Dict[str, Any]:
        """Get system status.

        Returns:
            Dictionary with the running flag, queue size, handler count, a
            copy of the metrics, and the error monitor summary.
        """
        return {
            'running': self._running,
            'queue_size': self._task_queue.qsize(),
            'handlers': len(self._handlers),
            # Copy so callers cannot mutate the live counters.
            'metrics': dict(self.metrics),
            'errors': self.error_monitor.get_summary()
        }
# automation/handlers/orders.py
"""Order processing automation."""
import logging
from typing import Dict, Any
from datetime import datetime
from automation.system import TaskHandler, Task
from etsy_client.resilient_client import ResilientEtsyClient
from etsy_client.orders import OrderOperations
from etsy_client.digital import DigitalFileOperations
logger = logging.getLogger(__name__)
class OrderHandler(TaskHandler):
    """
    Handle order-related tasks.

    Supports:
    - new_order: Process new digital product orders
    - order_shipped: Update tracking/completion
    - refund_request: Handle refund workflows
    """

    SUPPORTED_TASKS = ['new_order', 'order_shipped', 'refund_request']

    def __init__(self, client: ResilientEtsyClient, shop_id: int):
        """
        Args:
            client: API client used by the order and digital-file helpers.
            shop_id: Shop whose receipts are processed.
        """
        super().__init__(client)
        self.shop_id = shop_id
        self.order_ops = OrderOperations(client)
        self.digital_ops = DigitalFileOperations(client)

    def can_handle(self, task_type: str) -> bool:
        """Return True for the task types listed in SUPPORTED_TASKS."""
        return task_type in self.SUPPORTED_TASKS

    def handle(self, task: Task) -> Dict[str, Any]:
        """Route task to appropriate handler.

        Raises:
            ValueError: If the task type is not one this handler supports.
        """
        handlers = {
            'new_order': self._handle_new_order,
            'order_shipped': self._handle_shipped,
            'refund_request': self._handle_refund
        }
        handler = handlers.get(task.task_type)
        if handler is None:
            # Fail with a clear message instead of calling None.
            raise ValueError(f"Unsupported order task type: {task.task_type}")
        return handler(task.payload)

    def _handle_new_order(self, payload: Dict) -> Dict[str, Any]:
        """
        Process new digital product order.

        1. Fetch the receipt named by ``payload['receipt_id']``
        2. Pick out its digital transactions
        3. Check that each digital listing has files, warning when not

        Returns:
            Result dictionary; ``missing_files`` lists the listing IDs for
            which no digital files were found.
        """
        receipt_id = payload['receipt_id']
        # Get order details
        receipt = self.order_ops.get_receipt(self.shop_id, receipt_id)
        # Collect the digital transactions once and reuse the list below.
        digital_txns = [
            txn for txn in receipt.get('transactions', [])
            if txn.get('is_digital')
        ]
        has_digital = bool(digital_txns)
        missing_files = []
        if has_digital:
            logger.info(f"Processing digital order {receipt_id}")
            # Verify files are ready
            for txn in digital_txns:
                listing_id = txn['listing_id']
                files = self.digital_ops.get_listing_files(
                    self.shop_id, listing_id
                )
                if not files.get('results'):
                    logger.warning(
                        f"No digital files for listing {listing_id}"
                    )
                    missing_files.append(listing_id)
        return {
            'receipt_id': receipt_id,
            'status': 'processed',
            'is_digital': has_digital,
            'missing_files': missing_files,
            'processed_at': datetime.now().isoformat()
        }

    def _handle_shipped(self, payload: Dict) -> Dict[str, Any]:
        """Handle order marked as shipped.

        Currently only logs the event and echoes the receipt ID.
        """
        receipt_id = payload['receipt_id']
        # For digital products, this might trigger follow-up
        logger.info(f"Order {receipt_id} marked complete")
        return {
            'receipt_id': receipt_id,
            'status': 'shipped'
        }

    def _handle_refund(self, payload: Dict) -> Dict[str, Any]:
        """Handle refund request.

        No refund is issued here; the request is logged and returned with
        status ``pending_review`` for manual follow-up.
        """
        receipt_id = payload['receipt_id']
        reason = payload.get('reason', 'unspecified')
        logger.info(f"Refund request for {receipt_id}: {reason}")
        # Log for manual review
        return {
            'receipt_id': receipt_id,
            'refund_requested': True,
            'reason': reason,
            'status': 'pending_review'
        }
# automation/handlers/listings.py
"""Listing management automation."""
import logging
from typing import Dict, Any, List
from datetime import datetime
from automation.system import TaskHandler, Task
from etsy_client.resilient_client import ResilientEtsyClient
from etsy_client.listings import ListingOperations
from etsy_client.inventory import InventoryOperations
logger = logging.getLogger(__name__)
class ListingHandler(TaskHandler):
    """
    Handle listing-related tasks.

    Supports:
    - update_listing: Update listing details
    - sync_inventory: Sync inventory/pricing
    - optimize_tags: Optimize listing SEO
    - create_variation: Add product variations
    """

    SUPPORTED_TASKS = [
        'update_listing', 'sync_inventory',
        'optimize_tags', 'create_variation'
    ]
    # Largest number of tags sent in one update.
    MAX_TAGS = 13

    def __init__(self, client: ResilientEtsyClient, shop_id: int):
        """
        Args:
            client: API client used by the listing and inventory helpers.
            shop_id: Shop whose listings are managed.
        """
        super().__init__(client)
        self.shop_id = shop_id
        self.listing_ops = ListingOperations(client)
        self.inventory_ops = InventoryOperations(client)

    def can_handle(self, task_type: str) -> bool:
        """Return True for the task types listed in SUPPORTED_TASKS."""
        return task_type in self.SUPPORTED_TASKS

    def handle(self, task: Task) -> Dict[str, Any]:
        """Route task to the matching private handler.

        Raises:
            ValueError: If the task type is not one this handler supports.
        """
        handlers = {
            'update_listing': self._handle_update,
            'sync_inventory': self._handle_sync,
            'optimize_tags': self._handle_optimize,
            'create_variation': self._handle_variation
        }
        handler = handlers.get(task.task_type)
        if handler is None:
            # Fail with a clear message instead of calling None.
            raise ValueError(
                f"Unsupported listing task type: {task.task_type}"
            )
        return handler(task.payload)

    def _handle_update(self, payload: Dict) -> Dict[str, Any]:
        """Update listing details.

        ``payload['updates']`` is passed to the listing update call as
        keyword arguments; the names of the updated fields are returned.
        """
        listing_id = payload['listing_id']
        updates = payload.get('updates', {})
        self.listing_ops.update_listing(listing_id, **updates)
        logger.info(f"Updated listing {listing_id}")
        return {
            'listing_id': listing_id,
            'status': 'updated',
            'updates_applied': list(updates.keys())
        }

    def _handle_sync(self, payload: Dict) -> Dict[str, Any]:
        """Sync inventory and pricing.

        Currently reads the listing's inventory and reports how many
        products it contains; nothing is written back.
        """
        listing_id = payload['listing_id']
        # Get current state
        inventory = self.inventory_ops.get_inventory(listing_id)
        logger.info(f"Synced inventory for {listing_id}")
        return {
            'listing_id': listing_id,
            'status': 'synced',
            'products': len(inventory.get('products', []))
        }

    def _handle_optimize(self, payload: Dict) -> Dict[str, Any]:
        """Optimize listing tags.

        Only the first ``MAX_TAGS`` suggested tags are sent, and
        ``tags_applied`` reports that truncated count.
        """
        listing_id = payload['listing_id']
        suggested_tags = payload.get('tags', [])
        applied_tags = suggested_tags[:self.MAX_TAGS]
        if applied_tags:
            self.listing_ops.update_listing(
                listing_id,
                tags=applied_tags
            )
        return {
            'listing_id': listing_id,
            'status': 'optimized',
            # Count what was actually sent, not what was suggested.
            'tags_applied': len(applied_tags)
        }

    def _handle_variation(self, payload: Dict) -> Dict[str, Any]:
        """Create product variation.

        NOTE: this is a placeholder. It logs the request and echoes the
        variation back, but makes no API call.
        """
        listing_id = payload['listing_id']
        variation = payload['variation']
        logger.info(f"Adding variation to {listing_id}")
        return {
            'listing_id': listing_id,
            'status': 'variation_added',
            'variation': variation
        }
# automation/scheduler.py
"""Scheduled task management."""
import logging
import threading
import time
from datetime import datetime
from typing import Any, Callable, Dict

import schedule

from automation.system import AutomationSystem, Task, TaskHandler, TaskPriority
logger = logging.getLogger(__name__)
class TaskScheduler:
    """
    Schedule recurring automation tasks.

    Jobs are registered through the ``schedule`` module's top-level
    functions and executed by a background thread that polls once per
    second. Each job only submits a Task to the automation system.
    """

    # Weekday names accepted by schedule_weekly_optimization.
    _WEEKDAYS = (
        'monday', 'tuesday', 'wednesday', 'thursday',
        'friday', 'saturday', 'sunday'
    )

    def __init__(self, system: AutomationSystem):
        """
        Args:
            system: Automation system that receives the scheduled tasks.
        """
        self.system = system
        self._running = False
        # Polling thread; None until start() is called.
        self._thread = None

    def schedule_daily_analytics(self, hour: int = 8, minute: int = 0):
        """Schedule daily analytics report."""
        schedule.every().day.at(f"{hour:02d}:{minute:02d}").do(
            self._submit_task,
            'generate_report',
            {'report_type': 'daily'},
            TaskPriority.NORMAL
        )
        logger.info(f"Scheduled daily analytics at {hour:02d}:{minute:02d}")

    def schedule_hourly_sync(self):
        """Schedule hourly inventory sync."""
        schedule.every().hour.do(
            self._submit_task,
            'sync_all_inventory',
            {},
            TaskPriority.LOW
        )
        logger.info("Scheduled hourly inventory sync")

    def schedule_weekly_optimization(self, day: str = "sunday", hour: int = 2):
        """Schedule weekly listing optimization.

        Args:
            day: Weekday name, case-insensitive (e.g. "sunday").
            hour: Hour of the day at which to run.

        Raises:
            ValueError: If ``day`` is not a weekday name.
        """
        weekday = day.lower()
        # Validate before getattr so an arbitrary attribute of the job
        # object can never be selected by a bad argument.
        if weekday not in self._WEEKDAYS:
            raise ValueError(f"Invalid weekday: {day!r}")
        getattr(schedule.every(), weekday).at(f"{hour:02d}:00").do(
            self._submit_task,
            'optimize_all_listings',
            {},
            TaskPriority.LOW
        )
        logger.info(f"Scheduled weekly optimization on {weekday} at {hour:02d}:00")

    def schedule_price_check(self, interval_minutes: int = 30):
        """Schedule periodic price checking."""
        schedule.every(interval_minutes).minutes.do(
            self._submit_task,
            'check_competitor_prices',
            {},
            TaskPriority.NORMAL
        )
        logger.info(f"Scheduled price check every {interval_minutes} minutes")

    def _submit_task(
        self,
        task_type: str,
        payload: Dict,
        priority: TaskPriority
    ):
        """Submit scheduled task.

        The payload is copied because the same dictionary object is bound
        to the job and would otherwise be shared by every run.
        """
        task = Task(
            task_type=task_type,
            payload=dict(payload),
            priority=priority
        )
        self.system.submit_task(task)
        logger.debug(f"Scheduled task submitted: {task_type}")

    def _run_loop(self):
        """Scheduler loop; polls for due jobs once per second."""
        while self._running:
            try:
                schedule.run_pending()
            except Exception as e:
                # Keep the thread alive if a job raises.
                logger.exception(f"Scheduler error: {e}")
            time.sleep(1)

    def start(self):
        """Start scheduler.

        Calling this while the scheduler is already running is a no-op.
        """
        if self._running:
            logger.warning("Scheduler already running")
            return
        self._running = True
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()
        logger.info("Scheduler started")

    def stop(self):
        """Stop scheduler.

        Waits up to five seconds for the polling thread to exit. Registered
        jobs are not removed.
        """
        self._running = False
        if self._thread:
            self._thread.join(timeout=5.0)
        logger.info("Scheduler stopped")
class AnalyticsHandler(TaskHandler):
    """Handle analytics tasks.

    Covers the task types emitted by the scheduler. The private handlers
    are placeholders: each logs a message and returns a small status
    dictionary without calling the API.
    """

    SUPPORTED_TASKS = [
        'generate_report', 'sync_all_inventory',
        'optimize_all_listings', 'check_competitor_prices'
    ]

    def __init__(self, client, shop_id: int):
        """
        Args:
            client: API client, stored by the base class.
            shop_id: Shop the analytics refer to.
        """
        super().__init__(client)
        self.shop_id = shop_id

    def can_handle(self, task_type: str) -> bool:
        """Return True for the task types listed in SUPPORTED_TASKS."""
        return task_type in self.SUPPORTED_TASKS

    def handle(self, task: Task) -> Dict[str, Any]:
        """Route task to the matching private handler.

        Raises:
            ValueError: If the task type is not one this handler supports.
        """
        handlers = {
            'generate_report': self._generate_report,
            'sync_all_inventory': self._sync_inventory,
            'optimize_all_listings': self._optimize_listings,
            'check_competitor_prices': self._check_prices
        }
        handler = handlers.get(task.task_type)
        if handler is None:
            # Fail with a clear message instead of calling None.
            raise ValueError(
                f"Unsupported analytics task type: {task.task_type}"
            )
        return handler(task.payload)

    def _generate_report(self, payload: Dict) -> Dict[str, Any]:
        """Generate analytics report.

        The report type comes from ``payload['report_type']`` and defaults
        to ``daily``.
        """
        report_type = payload.get('report_type', 'daily')
        logger.info(f"Generating {report_type} report")
        # Implementation would call analytics module
        return {
            'report_type': report_type,
            'generated_at': datetime.now().isoformat()
        }

    def _sync_inventory(self, payload: Dict) -> Dict[str, Any]:
        """Sync all inventory (placeholder: logs only)."""
        logger.info("Starting inventory sync")
        return {'status': 'synced'}

    def _optimize_listings(self, payload: Dict) -> Dict[str, Any]:
        """Run listing optimization (placeholder: logs only)."""
        logger.info("Running listing optimization")
        return {'status': 'optimized'}

    def _check_prices(self, payload: Dict) -> Dict[str, Any]:
        """Check competitive pricing (placeholder: logs only)."""
        logger.info("Checking prices")
        return {'status': 'checked'}
# automation/main.py
"""Main automation application."""
import os
import sys
import signal
import logging
from pathlib import Path
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import Config
from etsy_client.resilient_client import ResilientEtsyClient
from etsy_client.users import get_my_shop_id
from automation.system import AutomationSystem
from automation.scheduler import TaskScheduler, AnalyticsHandler
from automation.handlers.orders import OrderHandler
from automation.handlers.listings import ListingHandler
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class EtsyAutomationApp:
    """
    Main application for Etsy automation.

    Builds the client, automation system, handlers and scheduler, then runs
    until SIGINT or SIGTERM requests a shutdown.
    """

    # Seconds between periodic status log lines.
    STATUS_INTERVAL_SECONDS = 60

    def __init__(self):
        # Everything is created in initialize(); None means "not built yet".
        self.client = None
        self.shop_id = None
        self.system = None
        self.scheduler = None
        self._shutdown_requested = False

    def initialize(self):
        """Initialize application components."""
        logger.info("Initializing Etsy Automation System...")
        # Create client
        self.client = ResilientEtsyClient(
            api_key=Config.ETSY_API_KEY,
            access_token=Config.ETSY_ACCESS_TOKEN
        )
        # Get shop ID
        self.shop_id = get_my_shop_id(self.client)
        logger.info(f"Connected to shop: {self.shop_id}")
        # Create automation system
        self.system = AutomationSystem(self.client, self.shop_id)
        # Register handlers
        self.system.register_handler(
            OrderHandler(self.client, self.shop_id)
        )
        self.system.register_handler(
            ListingHandler(self.client, self.shop_id)
        )
        self.system.register_handler(
            AnalyticsHandler(self.client, self.shop_id)
        )
        # Set up scheduler
        self.scheduler = TaskScheduler(self.system)
        self._configure_schedules()
        logger.info("Initialization complete")

    def _configure_schedules(self):
        """Configure scheduled tasks."""
        # Daily report at 8 AM
        self.scheduler.schedule_daily_analytics(hour=8, minute=0)
        # Hourly inventory sync
        self.scheduler.schedule_hourly_sync()
        # Weekly optimization on Sunday at 2 AM
        self.scheduler.schedule_weekly_optimization(day="sunday", hour=2)

    def start(self):
        """Start all services."""
        logger.info("Starting automation services...")
        self.system.start()
        self.scheduler.start()
        logger.info("All services started")

    def stop(self):
        """Stop all services.

        Safe to call when initialization failed part-way: components that
        were never created are skipped, so the original error is not masked
        by an AttributeError on None.
        """
        logger.info("Stopping automation services...")
        if self.scheduler is not None:
            self.scheduler.stop()
        if self.system is not None:
            self.system.stop()
        logger.info("All services stopped")

    def _wait_for_shutdown(self, seconds: float):
        """Sleep up to ``seconds``, returning early once shutdown is requested."""
        import time
        deadline = time.monotonic() + seconds
        while not self._shutdown_requested:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Sleep in short slices: time.sleep resumes after a signal
            # handler returns, so one long sleep would delay shutdown.
            time.sleep(min(1.0, remaining))

    def run(self):
        """Run application until shutdown.

        Logs a status line every STATUS_INTERVAL_SECONDS. Services are
        stopped on the way out, whether the loop ends normally or an
        exception is raised; exceptions are logged and re-raised.
        """
        # Set up signal handlers
        signal.signal(signal.SIGINT, self._handle_shutdown)
        signal.signal(signal.SIGTERM, self._handle_shutdown)
        try:
            self.initialize()
            self.start()
            logger.info("Automation system running. Press Ctrl+C to stop.")
            # Keep running until shutdown
            while not self._shutdown_requested:
                # Print status periodically
                status = self.system.get_status()
                logger.info(
                    f"Status: Queue={status['queue_size']}, "
                    f"Processed={status['metrics']['tasks_processed']}, "
                    f"Failed={status['metrics']['tasks_failed']}"
                )
                self._wait_for_shutdown(self.STATUS_INTERVAL_SECONDS)
        except Exception as e:
            logger.exception(f"Application error: {e}")
            raise
        finally:
            self.stop()

    def _handle_shutdown(self, signum, frame):
        """Handle shutdown signal by flagging the main loop to exit."""
        logger.info("Shutdown requested...")
        self._shutdown_requested = True
def main():
    """Entry point: create the application and run it until shutdown."""
    EtsyAutomationApp().run()


if __name__ == "__main__":
    main()
# Dockerfile
# Image for the automation service; the container runs automation.main.
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
# requirements.txt is copied by itself first, so the dependency layer can be
# reused from cache when only application code has changed.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Environment
# PYTHONUNBUFFERED=1 turns off stdout/stderr buffering so log lines appear
# in the container log as they are written.
ENV PYTHONUNBUFFERED=1
# PYTHONPATH=/app puts the project root (the WORKDIR) on the import path.
ENV PYTHONPATH=/app
# Run
CMD ["python", "-m", "automation.main"]
# docker-compose.yml
# Two services: the automation worker and the webhook receiver.
version: '3.8'

services:
  # Automation worker, built from the Dockerfile in the project root.
  automation:
    build: .
    container_name: etsy-automation
    # Credentials are taken from the host environment (or an .env file).
    environment:
      - ETSY_API_KEY=${ETSY_API_KEY}
      - ETSY_ACCESS_TOKEN=${ETSY_ACCESS_TOKEN}
      - ETSY_REFRESH_TOKEN=${ETSY_REFRESH_TOKEN}
    # Host directories mounted so data and logs outlive the container.
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped
    # Rotate container logs: at most three files of 10 MB each.
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

  # Webhook receiver, built from its own Dockerfile, published on port 5000.
  webhook-server:
    build:
      context: .
      dockerfile: webhook_server/Dockerfile
    container_name: etsy-webhooks
    ports:
      - "5000:5000"
    environment:
      - ETSY_WEBHOOK_SECRET=${ETSY_WEBHOOK_SECRET}
    restart: unless-stopped
# scripts/cli.py
"""Command-line interface for automation system."""
import argparse
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import Config
from etsy_client.resilient_client import ResilientEtsyClient
from etsy_client.users import get_my_shop_id
from automation.system import AutomationSystem, Task, TaskPriority
def get_client():
    """Build a resilient Etsy client from the credentials in Config."""
    credentials = {
        'api_key': Config.ETSY_API_KEY,
        'access_token': Config.ETSY_ACCESS_TOKEN,
    }
    return ResilientEtsyClient(**credentials)
def cmd_status(args):
    """Print the shop ID and the client's health check result."""
    etsy = get_client()
    print(f"Shop ID: {get_my_shop_id(etsy)}")
    print(f"Client health: {etsy.health_check()}")
def cmd_sync(args):
    """Trigger sync operation and wait for it to finish.

    The task is submitted to a system that has the analytics handler
    registered and its worker started; otherwise the task would sit in the
    queue and be discarded when the process exits. The command waits until
    the task has been processed or has failed, up to a fixed timeout, and
    prints the outcome.
    """
    # Local imports, as cmd_report does, so other commands do not load them.
    import time
    from automation.scheduler import AnalyticsHandler

    timeout_seconds = 300.0

    client = get_client()
    shop_id = get_my_shop_id(client)
    system = AutomationSystem(client, shop_id)
    # AnalyticsHandler is the handler that accepts 'sync_all_inventory'.
    system.register_handler(AnalyticsHandler(client, shop_id))
    task = Task(
        task_type='sync_all_inventory',
        payload={},
        priority=TaskPriority.HIGH
    )
    system.submit_task(task)
    print("Sync task submitted")

    system.start()
    finished = False
    try:
        deadline = time.monotonic() + timeout_seconds
        while time.monotonic() < deadline:
            metrics = system.metrics
            # Either counter moving means the task reached a final state;
            # a retry in progress changes neither.
            if metrics['tasks_processed'] + metrics['tasks_failed'] > 0:
                finished = True
                break
            time.sleep(0.1)
    finally:
        system.stop()

    if not finished:
        print("Sync task timed out")
    elif system.metrics['tasks_failed']:
        print("Sync task failed")
    else:
        print("Sync task completed")
def cmd_report(args):
    """Print a daily or weekly report; daily when ``--type`` is omitted."""
    kind = args.type if args.type else 'daily'
    etsy = get_client()
    shop = get_my_shop_id(etsy)
    # Generate report inline
    from etsy_client.analytics import ShopAnalytics, ReportGenerator
    generator = ReportGenerator(ShopAnalytics(etsy, shop))
    if kind == 'daily':
        print(generator.daily_summary())
        return
    if kind == 'weekly':
        print(generator.weekly_report())
def main():
    """Parse the command line and run the selected sub-command.

    Prints the help text when no known sub-command is given.
    """
    parser = argparse.ArgumentParser(description='Etsy Automation CLI')
    subparsers = parser.add_subparsers(dest='command')

    # Sub-commands that take no options of their own.
    for name, help_text in (
        ('status', 'Show system status'),
        ('sync', 'Trigger inventory sync'),
    ):
        subparsers.add_parser(name, help=help_text)

    # Report command
    report_parser = subparsers.add_parser('report', help='Generate report')
    report_parser.add_argument(
        '--type', choices=['daily', 'weekly'],
        help='Report type'
    )

    args = parser.parse_args()
    dispatch = {
        'status': cmd_status,
        'sync': cmd_sync,
        'report': cmd_report
    }
    runner = dispatch.get(args.command)
    if runner is None:
        parser.print_help()
        return
    runner(args)


if __name__ == "__main__":
    main()
You now have a complete automation framework. The next chapter examines real-world case studies showing how these systems perform in production.
| ← Chapter 11: Error Handling and Resilience | Table of Contents | Chapter 13: Case Studies → |