Chapter 11: Error Handling and Resilience

Building Robust Etsy Integrations

API integrations fail. Networks drop, services timeout, and rate limits trigger. The difference between a fragile integration and a production-ready system is how you handle these failures.

Error Classification

# etsy_client/errors.py
"""Comprehensive error handling for Etsy API."""

from typing import Dict, Any, Optional
from enum import Enum
import time


class ErrorCategory(Enum):
    """Categories of API errors."""
    AUTHENTICATION = "auth"
    RATE_LIMIT = "rate_limit"
    VALIDATION = "validation"
    NOT_FOUND = "not_found"
    SERVER = "server"
    NETWORK = "network"
    UNKNOWN = "unknown"


class EtsyAPIError(Exception):
    """Base exception for Etsy API errors."""
    
    def __init__(
        self,
        message: str,
        status_code: int = None,
        response_body: Dict = None,
        category: ErrorCategory = ErrorCategory.UNKNOWN
    ):
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.response_body = response_body or {}
        self.category = category
        self.timestamp = time.time()
    
    @property
    def is_retryable(self) -> bool:
        """Check if error can be retried."""
        return self.category in [
            ErrorCategory.RATE_LIMIT,
            ErrorCategory.SERVER,
            ErrorCategory.NETWORK
        ]
    
    @property
    def retry_after(self) -> Optional[int]:
        """Get retry delay in seconds."""
        if self.category == ErrorCategory.RATE_LIMIT:
            return self.response_body.get('retry_after', 60)
        elif self.category == ErrorCategory.SERVER:
            return 30
        elif self.category == ErrorCategory.NETWORK:
            return 5
        return None
    
    def __repr__(self):
        return f"EtsyAPIError({self.category.value}, {self.status_code}: {self.message})"


class AuthenticationError(EtsyAPIError):
    """Authentication or authorization failed."""
    
    def __init__(self, message: str, status_code: int = 401, **kwargs):
        super().__init__(
            message,
            status_code,
            category=ErrorCategory.AUTHENTICATION,
            **kwargs
        )


class RateLimitError(EtsyAPIError):
    """Rate limit exceeded."""
    
    def __init__(self, message: str, retry_after: int = 60, **kwargs):
        super().__init__(
            message,
            status_code=429,
            category=ErrorCategory.RATE_LIMIT,
            **kwargs
        )
        self._retry_after = retry_after
    
    @property
    def retry_after(self) -> int:
        return self._retry_after


class ValidationError(EtsyAPIError):
    """Request validation failed."""
    
    def __init__(self, message: str, errors: list = None, **kwargs):
        super().__init__(
            message,
            status_code=400,
            category=ErrorCategory.VALIDATION,
            **kwargs
        )
        self.validation_errors = errors or []


class NotFoundError(EtsyAPIError):
    """Resource not found."""
    
    def __init__(self, resource_type: str, resource_id: Any, **kwargs):
        message = f"{resource_type} {resource_id} not found"
        super().__init__(
            message,
            status_code=404,
            category=ErrorCategory.NOT_FOUND,
            **kwargs
        )
        self.resource_type = resource_type
        self.resource_id = resource_id


class ServerError(EtsyAPIError):
    """Server-side error."""
    
    def __init__(self, message: str, status_code: int = 500, **kwargs):
        super().__init__(
            message,
            status_code,
            category=ErrorCategory.SERVER,
            **kwargs
        )


class NetworkError(EtsyAPIError):
    """Network connectivity error."""
    
    def __init__(self, message: str, original_error: Exception = None, **kwargs):
        super().__init__(
            message,
            category=ErrorCategory.NETWORK,
            **kwargs
        )
        self.original_error = original_error


def classify_error(status_code: int, response: Dict) -> EtsyAPIError:
    """
    Classify HTTP error into appropriate exception.
    
    Args:
        status_code: HTTP status code
        response: Response body
        
    Returns:
        Appropriate EtsyAPIError subclass
    """
    message = response.get('error', 'Unknown error')
    
    if status_code == 401:
        return AuthenticationError(message, response_body=response)
    
    elif status_code == 403:
        return AuthenticationError(
            "Access denied - check scopes",
            status_code=403,
            response_body=response
        )
    
    elif status_code == 404:
        return NotFoundError("Resource", "unknown", response_body=response)
    
    elif status_code == 429:
        retry_after = int(response.get('retry_after', 60))
        return RateLimitError(message, retry_after=retry_after, response_body=response)
    
    elif status_code == 400:
        errors = response.get('errors', [])
        return ValidationError(message, errors=errors, response_body=response)
    
    elif 500 <= status_code < 600:
        return ServerError(message, status_code=status_code, response_body=response)
    
    else:
        return EtsyAPIError(message, status_code=status_code, response_body=response)

Retry Logic

# etsy_client/retry.py
"""Retry mechanisms for API calls."""

import time
import random
from typing import Callable, TypeVar, Any
from functools import wraps
import logging

from .errors import EtsyAPIError, RateLimitError, NetworkError

logger = logging.getLogger(__name__)

T = TypeVar('T')


class RetryConfig:
    """Configuration for retry behavior."""
    
    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
    
    def get_delay(self, attempt: int, error: EtsyAPIError = None) -> float:
        """
        Calculate delay for next retry.
        
        Args:
            attempt: Current attempt number (0-indexed)
            error: The error that triggered retry
            
        Returns:
            Delay in seconds
        """
        # Use error's retry_after if available
        if error and error.retry_after:
            delay = error.retry_after
        else:
            # Exponential backoff
            delay = self.base_delay * (self.exponential_base ** attempt)
        
        # Apply maximum
        delay = min(delay, self.max_delay)
        
        # Add jitter
        if self.jitter:
            delay = delay * (0.5 + random.random())
        
        return delay


def retry(config: RetryConfig = None):
    """
    Decorator for retrying failed API calls.
    
    Args:
        config: RetryConfig instance
    """
    if config is None:
        config = RetryConfig()
    
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            last_error = None
            
            for attempt in range(config.max_retries + 1):
                try:
                    return func(*args, **kwargs)
                    
                except EtsyAPIError as e:
                    last_error = e
                    
                    if not e.is_retryable:
                        raise
                    
                    if attempt >= config.max_retries:
                        logger.error(f"Max retries exceeded: {e}")
                        raise
                    
                    delay = config.get_delay(attempt, e)
                    logger.warning(
                        f"Retry {attempt + 1}/{config.max_retries} "
                        f"after {delay:.1f}s: {e}"
                    )
                    time.sleep(delay)
                    
                except Exception as e:
                    # Network errors from requests library
                    if "Connection" in str(e) or "Timeout" in str(e):
                        last_error = NetworkError(str(e), original_error=e)
                        
                        if attempt >= config.max_retries:
                            raise last_error
                        
                        delay = config.get_delay(attempt, last_error)
                        logger.warning(f"Network error, retry in {delay:.1f}s")
                        time.sleep(delay)
                    else:
                        raise
            
            raise last_error
        
        return wrapper
    return decorator


class RetryableClient:
    """Mixin for adding retry capability to API clients."""
    
    def __init__(self, retry_config: RetryConfig = None):
        self.retry_config = retry_config or RetryConfig()
    
    def with_retry(self, func: Callable[..., T], *args, **kwargs) -> T:
        """
        Execute function with retry logic.
        
        Args:
            func: Function to call
            *args, **kwargs: Arguments to pass
            
        Returns:
            Function result
        """
        @retry(self.retry_config)
        def wrapper():
            return func(*args, **kwargs)
        
        return wrapper()

Circuit Breaker Pattern

# etsy_client/circuit_breaker.py
"""Circuit breaker for protecting against cascading failures."""

import time
import threading
from enum import Enum
from typing import Callable, TypeVar, Any
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

T = TypeVar('T')


class CircuitState(Enum):
    """Circuit breaker states."""
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing, reject calls
    HALF_OPEN = "half_open" # Testing recovery


@dataclass
class CircuitStats:
    """Statistics for circuit breaker."""
    failures: int = 0
    successes: int = 0
    last_failure_time: float = 0
    last_success_time: float = 0


class CircuitBreaker:
    """
    Circuit breaker to prevent cascading failures.
    
    When too many failures occur, the circuit opens and rejects
    calls immediately, giving the service time to recover.
    """
    
    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: float = 60.0,
        name: str = "default"
    ):
        """
        Initialize circuit breaker.
        
        Args:
            failure_threshold: Failures before opening
            success_threshold: Successes needed to close
            timeout: Seconds before trying half-open
            name: Circuit name for logging
        """
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.name = name
        
        self._state = CircuitState.CLOSED
        self._stats = CircuitStats()
        self._lock = threading.Lock()
    
    @property
    def state(self) -> CircuitState:
        """Get current state."""
        with self._lock:
            # Check if we should transition from OPEN to HALF_OPEN
            if self._state == CircuitState.OPEN:
                if time.time() - self._stats.last_failure_time > self.timeout:
                    logger.info(f"Circuit {self.name}: OPEN -> HALF_OPEN")
                    self._state = CircuitState.HALF_OPEN
                    self._stats.failures = 0
            
            return self._state
    
    def is_available(self) -> bool:
        """Check if circuit allows calls."""
        return self.state != CircuitState.OPEN
    
    def record_success(self):
        """Record a successful call."""
        with self._lock:
            self._stats.successes += 1
            self._stats.last_success_time = time.time()
            
            if self._state == CircuitState.HALF_OPEN:
                if self._stats.successes >= self.success_threshold:
                    logger.info(f"Circuit {self.name}: HALF_OPEN -> CLOSED")
                    self._state = CircuitState.CLOSED
                    self._stats.failures = 0
    
    def record_failure(self):
        """Record a failed call."""
        with self._lock:
            self._stats.failures += 1
            self._stats.last_failure_time = time.time()
            self._stats.successes = 0
            
            if self._state == CircuitState.CLOSED:
                if self._stats.failures >= self.failure_threshold:
                    logger.warning(f"Circuit {self.name}: CLOSED -> OPEN")
                    self._state = CircuitState.OPEN
            
            elif self._state == CircuitState.HALF_OPEN:
                logger.warning(f"Circuit {self.name}: HALF_OPEN -> OPEN")
                self._state = CircuitState.OPEN
    
    def call(self, func: Callable[..., T], *args, **kwargs) -> T:
        """
        Execute function through circuit breaker.
        
        Args:
            func: Function to call
            *args, **kwargs: Arguments
            
        Returns:
            Function result
            
        Raises:
            CircuitOpenError: If circuit is open
        """
        if not self.is_available():
            raise CircuitOpenError(f"Circuit {self.name} is open")
        
        try:
            result = func(*args, **kwargs)
            self.record_success()
            return result
        except Exception as e:
            self.record_failure()
            raise


class CircuitOpenError(Exception):
    """Raised when circuit is open."""
    pass


# Global circuit breaker registry
_circuits: dict[str, CircuitBreaker] = {}


def get_circuit(name: str = "etsy") -> CircuitBreaker:
    """Get or create a circuit breaker."""
    if name not in _circuits:
        _circuits[name] = CircuitBreaker(name=name)
    return _circuits[name]

Rate Limit Manager

# etsy_client/rate_limiter.py
"""Proactive rate limit management."""

import time
import threading
from typing import Optional
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class RateLimitInfo:
    """Rate limit information from API response."""
    limit: int
    remaining: int
    reset_at: float  # Unix timestamp


class RateLimiter:
    """
    Proactive rate limit management.
    
    Tracks API limits and prevents requests that would exceed them.
    """
    
    def __init__(
        self,
        requests_per_second: float = 10.0,
        daily_limit: int = 10000,
        buffer_percentage: float = 0.1
    ):
        """
        Initialize rate limiter.
        
        Args:
            requests_per_second: Target RPS
            daily_limit: Maximum daily requests
            buffer_percentage: Buffer before hitting limits
        """
        self.requests_per_second = requests_per_second
        self.daily_limit = daily_limit
        self.buffer_percentage = buffer_percentage
        
        self._lock = threading.Lock()
        self._last_request_time = 0.0
        self._daily_count = 0
        self._daily_reset_time = 0.0
        
        # Track from API headers
        self._api_limit_info: Optional[RateLimitInfo] = None
    
    def _min_request_interval(self) -> float:
        """Calculate minimum time between requests."""
        return 1.0 / self.requests_per_second
    
    def _daily_remaining(self) -> int:
        """Get remaining daily requests."""
        if self._api_limit_info:
            return self._api_limit_info.remaining
        
        buffer = int(self.daily_limit * self.buffer_percentage)
        return max(0, self.daily_limit - self._daily_count - buffer)
    
    def wait_if_needed(self):
        """
        Block if necessary to respect rate limits.
        
        Called before each API request.
        """
        with self._lock:
            # Check daily limit
            if self._daily_remaining() <= 0:
                wait_time = self._daily_reset_time - time.time()
                if wait_time > 0:
                    logger.warning(f"Daily limit reached, waiting {wait_time:.0f}s")
                    time.sleep(wait_time)
            
            # Respect per-second rate
            now = time.time()
            elapsed = now - self._last_request_time
            min_interval = self._min_request_interval()
            
            if elapsed < min_interval:
                sleep_time = min_interval - elapsed
                time.sleep(sleep_time)
            
            self._last_request_time = time.time()
            self._daily_count += 1
    
    def update_from_headers(self, headers: dict):
        """
        Update limits from API response headers.
        
        Args:
            headers: Response headers
        """
        try:
            limit = int(headers.get('X-Limit-Daily-Limit', 0))
            remaining = int(headers.get('X-Limit-Daily-Remaining', 0))
            reset = int(headers.get('X-Limit-Daily-Reset', 0))
            
            if limit > 0:
                with self._lock:
                    self._api_limit_info = RateLimitInfo(
                        limit=limit,
                        remaining=remaining,
                        reset_at=float(reset)
                    )
                    self._daily_reset_time = float(reset)
        except (ValueError, TypeError):
            pass
    
    def get_status(self) -> dict:
        """Get current rate limit status."""
        with self._lock:
            return {
                'daily_remaining': self._daily_remaining(),
                'last_request': self._last_request_time,
                'api_info': self._api_limit_info
            }


class AdaptiveRateLimiter(RateLimiter):
    """
    Rate limiter that adapts based on API responses.
    
    Automatically slows down when approaching limits
    and speeds up when limits are healthy.
    """
    
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._consecutive_rate_limits = 0
        self._adaptive_multiplier = 1.0
    
    def record_rate_limit(self):
        """Record a rate limit response."""
        with self._lock:
            self._consecutive_rate_limits += 1
            # Double the wait time each consecutive rate limit
            self._adaptive_multiplier = min(
                10.0, 
                2.0 ** self._consecutive_rate_limits
            )
            logger.warning(
                f"Rate limit hit, multiplier now {self._adaptive_multiplier}"
            )
    
    def record_success(self):
        """Record successful request."""
        with self._lock:
            self._consecutive_rate_limits = 0
            # Gradually return to normal
            self._adaptive_multiplier = max(
                1.0,
                self._adaptive_multiplier * 0.9
            )
    
    def _min_request_interval(self) -> float:
        """Calculate adaptive interval."""
        base = super()._min_request_interval()
        return base * self._adaptive_multiplier

Enhanced Client with Error Handling

# etsy_client/resilient_client.py
"""Resilient Etsy API client with full error handling."""

import requests
from typing import Dict, Any, Optional
import logging

from .errors import (
    EtsyAPIError, AuthenticationError, RateLimitError,
    ValidationError, NotFoundError, ServerError, NetworkError,
    classify_error
)
from .retry import RetryConfig, retry
from .circuit_breaker import CircuitBreaker, CircuitOpenError
from .rate_limiter import AdaptiveRateLimiter

logger = logging.getLogger(__name__)


class ResilientEtsyClient:
    """
    Production-ready Etsy API client with:
    - Automatic retries with exponential backoff
    - Circuit breaker pattern
    - Proactive rate limiting
    - Comprehensive error handling
    """
    
    BASE_URL = "https://api.etsy.com/v3"
    
    def __init__(
        self,
        api_key: str,
        access_token: str = None,
        retry_config: RetryConfig = None,
        circuit_breaker: CircuitBreaker = None,
        rate_limiter: AdaptiveRateLimiter = None
    ):
        self.api_key = api_key
        self.access_token = access_token
        
        self.retry_config = retry_config or RetryConfig()
        self.circuit = circuit_breaker or CircuitBreaker(name="etsy")
        self.rate_limiter = rate_limiter or AdaptiveRateLimiter()
        
        self.session = requests.Session()
    
    def _headers(self) -> Dict[str, str]:
        """Get request headers."""
        headers = {
            "x-api-key": self.api_key,
            "Accept": "application/json"
        }
        if self.access_token:
            headers["Authorization"] = f"Bearer {self.access_token}"
        return headers
    
    def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
        """
        Process API response.
        
        Args:
            response: HTTP response
            
        Returns:
            Response JSON
            
        Raises:
            Appropriate EtsyAPIError subclass
        """
        # Update rate limit info
        self.rate_limiter.update_from_headers(response.headers)
        
        # Success
        if response.ok:
            self.rate_limiter.record_success()
            return response.json() if response.content else {}
        
        # Handle rate limit specifically
        if response.status_code == 429:
            self.rate_limiter.record_rate_limit()
            retry_after = int(response.headers.get('Retry-After', 60))
            raise RateLimitError(
                "Rate limit exceeded",
                retry_after=retry_after,
                response_body=response.json() if response.content else {}
            )
        
        # Parse error response
        try:
            error_body = response.json()
        except Exception:
            error_body = {"error": response.text}
        
        raise classify_error(response.status_code, error_body)
    
    @retry()
    def _make_request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Make API request with all protections.
        
        Args:
            method: HTTP method
            endpoint: API endpoint
            **kwargs: Request arguments
            
        Returns:
            Response JSON
        """
        # Check circuit breaker
        if not self.circuit.is_available():
            raise CircuitOpenError("Circuit is open, request rejected")
        
        # Wait for rate limit
        self.rate_limiter.wait_if_needed()
        
        url = f"{self.BASE_URL}{endpoint}"
        
        try:
            response = self.session.request(
                method,
                url,
                headers=self._headers(),
                timeout=30,
                **kwargs
            )
            
            result = self._handle_response(response)
            self.circuit.record_success()
            return result
            
        except (requests.ConnectionError, requests.Timeout) as e:
            self.circuit.record_failure()
            raise NetworkError(str(e), original_error=e)
        
        except EtsyAPIError as e:
            if e.category in ['server', 'network']:
                self.circuit.record_failure()
            raise
    
    def get(self, endpoint: str, params: Dict = None) -> Dict[str, Any]:
        """GET request."""
        return self._make_request("GET", endpoint, params=params)
    
    def post(self, endpoint: str, data: Dict = None) -> Dict[str, Any]:
        """POST request."""
        return self._make_request("POST", endpoint, json=data)
    
    def put(self, endpoint: str, data: Dict = None) -> Dict[str, Any]:
        """PUT request."""
        return self._make_request("PUT", endpoint, json=data)
    
    def patch(self, endpoint: str, data: Dict = None) -> Dict[str, Any]:
        """PATCH request."""
        return self._make_request("PATCH", endpoint, json=data)
    
    def delete(self, endpoint: str) -> Dict[str, Any]:
        """DELETE request."""
        return self._make_request("DELETE", endpoint)
    
    def health_check(self) -> Dict[str, Any]:
        """
        Check client health.
        
        Returns:
            Health status dictionary
        """
        return {
            'circuit_state': self.circuit.state.value,
            'rate_limit_status': self.rate_limiter.get_status(),
            'session_active': self.session is not None
        }

Error Monitoring

# etsy_client/monitoring.py
"""Error monitoring and alerting."""

import time
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Optional, Callable
import threading
import logging

from .errors import EtsyAPIError, ErrorCategory

logger = logging.getLogger(__name__)


@dataclass
class ErrorRecord:
    """Record of an API error."""
    error: EtsyAPIError
    timestamp: float
    endpoint: str
    context: Dict


class ErrorMonitor:
    """
    Monitor API errors and trigger alerts.
    """
    
    def __init__(
        self,
        window_seconds: int = 300,  # 5 minutes
        alert_threshold: int = 10
    ):
        self.window_seconds = window_seconds
        self.alert_threshold = alert_threshold
        
        self._errors: deque = deque()
        self._lock = threading.Lock()
        self._alert_callbacks: List[Callable] = []
    
    def record_error(
        self,
        error: EtsyAPIError,
        endpoint: str = "",
        context: Dict = None
    ):
        """
        Record an error occurrence.
        
        Args:
            error: The error that occurred
            endpoint: API endpoint
            context: Additional context
        """
        with self._lock:
            record = ErrorRecord(
                error=error,
                timestamp=time.time(),
                endpoint=endpoint,
                context=context or {}
            )
            self._errors.append(record)
            
            # Clean old errors
            self._cleanup()
            
            # Check for alert
            self._check_alerts()
    
    def _cleanup(self):
        """Remove errors outside window."""
        cutoff = time.time() - self.window_seconds
        while self._errors and self._errors[0].timestamp < cutoff:
            self._errors.popleft()
    
    def _check_alerts(self):
        """Check if alert should be triggered."""
        if len(self._errors) >= self.alert_threshold:
            self._trigger_alert()
    
    def _trigger_alert(self):
        """Trigger alert callbacks."""
        summary = self.get_summary()
        
        for callback in self._alert_callbacks:
            try:
                callback(summary)
            except Exception as e:
                logger.error(f"Alert callback failed: {e}")
    
    def on_alert(self, callback: Callable[[Dict], None]):
        """Register alert callback."""
        self._alert_callbacks.append(callback)
    
    def get_summary(self) -> Dict:
        """Get error summary."""
        with self._lock:
            self._cleanup()
            
            by_category = {}
            by_endpoint = {}
            
            for record in self._errors:
                cat = record.error.category.value
                by_category[cat] = by_category.get(cat, 0) + 1
                
                by_endpoint[record.endpoint] = by_endpoint.get(
                    record.endpoint, 0
                ) + 1
            
            return {
                'total_errors': len(self._errors),
                'window_seconds': self.window_seconds,
                'by_category': by_category,
                'by_endpoint': by_endpoint,
                'most_recent': (
                    self._errors[-1].error.message 
                    if self._errors else None
                )
            }


# Usage
def setup_error_monitoring():
    """Set up error monitoring with alerting."""
    monitor = ErrorMonitor(
        window_seconds=300,  # 5 minutes
        alert_threshold=10
    )
    
    def send_alert(summary: Dict):
        """Send alert notification."""
        logger.critical(f"HIGH ERROR RATE: {summary}")
        # Add Slack/email notification here
    
    monitor.on_alert(send_alert)
    
    return monitor

Best Practices Script

# scripts/test_error_handling.py
"""Test error handling and resilience."""

import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent))

from config import Config
from etsy_client.resilient_client import ResilientEtsyClient
from etsy_client.errors import (
    EtsyAPIError, AuthenticationError, RateLimitError,
    ValidationError, NotFoundError
)
from etsy_client.monitoring import ErrorMonitor


def test_error_handling():
    """Test various error scenarios."""
    
    client = ResilientEtsyClient(
        api_key=Config.ETSY_API_KEY,
        access_token=Config.ETSY_ACCESS_TOKEN
    )
    
    monitor = ErrorMonitor()
    
    print("Testing error handling scenarios...")
    print("=" * 60)
    
    # Test 1: Not Found
    print("\n1. Testing Not Found error:")
    try:
        client.get("/application/listings/999999999")
    except NotFoundError as e:
        print(f"   ✓ Caught NotFoundError: {e.message}")
        monitor.record_error(e, "/application/listings/999999999")
    except EtsyAPIError as e:
        print(f"   ✓ Caught error: {e}")
        monitor.record_error(e)
    
    # Test 2: Invalid request
    print("\n2. Testing Validation error:")
    try:
        # Attempt to create listing with missing fields
        client.post("/application/shops/0/listings", data={})
    except ValidationError as e:
        print(f"   ✓ Caught ValidationError: {e.message}")
        print(f"     Errors: {e.validation_errors}")
        monitor.record_error(e)
    except EtsyAPIError as e:
        print(f"   ✓ Caught error: {e}")
    
    # Test 3: Health check
    print("\n3. Client health check:")
    health = client.health_check()
    print(f"   Circuit state: {health['circuit_state']}")
    print(f"   Rate limit: {health['rate_limit_status']}")
    
    # Test 4: Error summary
    print("\n4. Error summary:")
    summary = monitor.get_summary()
    print(f"   Total errors: {summary['total_errors']}")
    print(f"   By category: {summary['by_category']}")
    
    print("\n" + "=" * 60)
    print("Error handling test complete!")


if __name__ == "__main__":
    test_error_handling()

Key Takeaways

  1. Classify errors: Different errors need different handling
  2. Retry wisely: Only retry transient errors with backoff
  3. Use circuit breakers: Prevent cascading failures
  4. Rate limit proactively: Don’t wait for 429 responses
  5. Monitor patterns: Track errors to spot issues early

Moving Forward

With robust error handling in place, you’re ready to build complete automation systems. The next chapter brings everything together into a fully automated Etsy workflow.


← Chapter 10: Webhooks and Real-Time Updates Table of Contents Chapter 12: Building a Complete Automation System →