API integrations fail. Networks drop, services time out, and rate limits trigger. The difference between a fragile integration and a production-ready system is how you handle these failures.
# etsy_client/errors.py
"""Comprehensive error handling for Etsy API."""
from typing import Dict, Any, Optional
from enum import Enum
import time
class ErrorCategory(Enum):
    """Coarse classification of an Etsy API failure.

    Each member's value is a short string identifier for the category.
    """

    # Credentials rejected or access denied.
    AUTHENTICATION = "auth"
    # Too many requests were sent.
    RATE_LIMIT = "rate_limit"
    # The request itself was malformed or incomplete.
    VALIDATION = "validation"
    # The requested resource does not exist.
    NOT_FOUND = "not_found"
    # The remote service failed while handling the request.
    SERVER = "server"
    # The request never completed (connection problem or timeout).
    NETWORK = "network"
    # Anything that does not fit the categories above.
    UNKNOWN = "unknown"
class EtsyAPIError(Exception):
    """Base exception for Etsy API errors.

    Args:
        message: Human-readable description of the failure.
        status_code: HTTP status code, or None when there was no response.
        response_body: Parsed error payload; stored as ``{}`` when omitted.
        category: Classification that drives ``is_retryable`` and
            ``retry_after``.

    The creation time is recorded in ``timestamp`` (seconds since the epoch).
    """

    # Categories for which repeating the same request may succeed.
    _RETRYABLE_CATEGORIES = (
        ErrorCategory.RATE_LIMIT,
        ErrorCategory.SERVER,
        ErrorCategory.NETWORK,
    )
    # Seconds to wait after a rate limit when the body has no usable hint.
    _DEFAULT_RATE_LIMIT_DELAY = 60

    def __init__(
        self,
        message: str,
        status_code: Optional[int] = None,
        response_body: Optional[Dict] = None,
        category: ErrorCategory = ErrorCategory.UNKNOWN
    ):
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.response_body = response_body or {}
        self.category = category
        self.timestamp = time.time()

    @property
    def is_retryable(self) -> bool:
        """Return True when the error category is worth retrying."""
        return self.category in self._RETRYABLE_CATEGORIES

    @property
    def retry_after(self) -> Optional[int]:
        """Return the suggested retry delay in seconds.

        Returns None for categories that should not be retried.
        """
        if self.category == ErrorCategory.RATE_LIMIT:
            return self._rate_limit_delay()
        if self.category == ErrorCategory.SERVER:
            return 30
        if self.category == ErrorCategory.NETWORK:
            return 5
        return None

    def _rate_limit_delay(self) -> int:
        """Read ``retry_after`` from the response body as whole seconds.

        The body comes straight from the server, so the value may be a
        string, a float, missing, or garbage. Anything that is not a
        non-negative number falls back to the default so callers always
        receive something they can pass to ``time.sleep``.
        """
        body = self.response_body if isinstance(self.response_body, dict) else {}
        raw = body.get('retry_after', self._DEFAULT_RATE_LIMIT_DELAY)
        try:
            delay = int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return self._DEFAULT_RATE_LIMIT_DELAY
        return delay if delay >= 0 else self._DEFAULT_RATE_LIMIT_DELAY

    def __repr__(self):
        return f"EtsyAPIError({self.category.value}, {self.status_code}: {self.message})"
class AuthenticationError(EtsyAPIError):
    """Raised when the API rejects the caller's credentials or permissions.

    The status code defaults to 401; extra keyword arguments are forwarded
    to ``EtsyAPIError``.
    """

    def __init__(self, message: str, status_code: int = 401, **kwargs):
        super().__init__(
            message,
            status_code=status_code,
            category=ErrorCategory.AUTHENTICATION,
            **kwargs
        )
class RateLimitError(EtsyAPIError):
    """Raised when the API answers with HTTP 429.

    The delay given at construction is stored and returned by
    ``retry_after``, replacing the base-class lookup in the response body.
    """

    def __init__(self, message: str, retry_after: int = 60, **kwargs):
        # Always a 429 in the rate-limit category; extras go to the base class.
        super().__init__(
            message,
            status_code=429,
            category=ErrorCategory.RATE_LIMIT,
            **kwargs
        )
        self._retry_after = retry_after

    @property
    def retry_after(self) -> int:
        """Seconds to wait before retrying, as given to the constructor."""
        return self._retry_after
class ValidationError(EtsyAPIError):
    """Raised when the API rejects a request as invalid (HTTP 400).

    Per-field problems, when supplied, are kept in ``validation_errors``.
    """

    def __init__(self, message: str, errors: list = None, **kwargs):
        super().__init__(
            message,
            status_code=400,
            category=ErrorCategory.VALIDATION,
            **kwargs
        )
        # A falsy value (None or an empty list) becomes a fresh empty list.
        if errors:
            self.validation_errors = errors
        else:
            self.validation_errors = []
class NotFoundError(EtsyAPIError):
    """Raised when a requested resource does not exist (HTTP 404).

    The message is built from the resource type and id, both of which are
    also kept as attributes.
    """

    def __init__(self, resource_type: str, resource_id: Any, **kwargs):
        super().__init__(
            f"{resource_type} {resource_id} not found",
            status_code=404,
            category=ErrorCategory.NOT_FOUND,
            **kwargs
        )
        self.resource_type, self.resource_id = resource_type, resource_id
class ServerError(EtsyAPIError):
    """Raised when the remote service itself fails.

    The status code defaults to 500; extra keyword arguments are forwarded
    to ``EtsyAPIError``.
    """

    def __init__(self, message: str, status_code: int = 500, **kwargs):
        super().__init__(
            message,
            status_code=status_code,
            category=ErrorCategory.SERVER,
            **kwargs
        )
class NetworkError(EtsyAPIError):
    """Raised when a request fails before any HTTP response is received.

    The underlying exception, when supplied, is kept in ``original_error``.
    No status code is set by this class.
    """

    def __init__(self, message: str, original_error: Exception = None, **kwargs):
        super().__init__(message, category=ErrorCategory.NETWORK, **kwargs)
        self.original_error = original_error
def classify_error(status_code: int, response: Dict) -> EtsyAPIError:
    """
    Classify an HTTP error into the appropriate exception.

    The exception is returned, not raised, so the caller decides when to
    raise it.

    Args:
        status_code: HTTP status code
        response: Parsed response body. ``None`` or a non-dict payload (for
            example a JSON list or a plain string) is tolerated.

    Returns:
        Appropriate EtsyAPIError subclass
    """
    # Error bodies come from the server and are not guaranteed to be JSON
    # objects; normalise so the lookups below cannot fail.
    if isinstance(response, dict):
        body = response
        message = body.get('error', 'Unknown error')
    else:
        body = {}
        message = str(response) if response else 'Unknown error'

    if status_code == 401:
        return AuthenticationError(message, response_body=body)
    if status_code == 403:
        return AuthenticationError(
            "Access denied - check scopes",
            status_code=403,
            response_body=body
        )
    if status_code == 404:
        return NotFoundError("Resource", "unknown", response_body=body)
    if status_code == 429:
        # A malformed hint must not mask the rate-limit error itself.
        try:
            retry_after = int(body.get('retry_after', 60))
        except (TypeError, ValueError):
            retry_after = 60
        return RateLimitError(message, retry_after=retry_after, response_body=body)
    if status_code == 400:
        errors = body.get('errors', [])
        return ValidationError(message, errors=errors, response_body=body)
    if 500 <= status_code < 600:
        return ServerError(message, status_code=status_code, response_body=body)
    return EtsyAPIError(message, status_code=status_code, response_body=body)
# etsy_client/retry.py
"""Retry mechanisms for API calls."""
import time
import random
from typing import Callable, TypeVar, Any
from functools import wraps
import logging
from .errors import EtsyAPIError, RateLimitError, NetworkError
logger = logging.getLogger(__name__)
T = TypeVar('T')
class RetryConfig:
    """Configuration for retry behavior.

    Args:
        max_retries: Number of retries after the first attempt.
        base_delay: Backoff delay in seconds for attempt 0.
        max_delay: Hard upper bound in seconds for any returned delay.
        exponential_base: Growth factor applied per attempt.
        jitter: Whether to randomise delays.
    """

    def __init__(
        self,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    @staticmethod
    def _server_delay(error) -> "Optional[float]":
        """Return the error's positive ``retry_after`` hint, or None."""
        if error is None:
            return None
        hint = error.retry_after
        if not hint:
            return None
        try:
            seconds = float(hint)
        except (TypeError, ValueError):
            return None
        return seconds if seconds > 0 else None

    def get_delay(self, attempt: int, error: "Optional[EtsyAPIError]" = None) -> float:
        """
        Calculate delay for next retry.

        Args:
            attempt: Current attempt number (0-indexed)
            error: The error that triggered retry

        Returns:
            Delay in seconds, never greater than ``max_delay``
        """
        hinted = self._server_delay(error)
        if hinted is not None:
            delay = min(hinted, self.max_delay)
            if self.jitter:
                # Jitter only upward: retrying earlier than the error's own
                # hint would just hit the same failure again.
                delay *= 1.0 + 0.5 * random.random()
        else:
            try:
                delay = self.base_delay * (self.exponential_base ** attempt)
            except OverflowError:
                # Very large attempt numbers overflow float arithmetic.
                delay = self.max_delay
            delay = min(delay, self.max_delay)
            if self.jitter:
                delay *= 0.5 + random.random()
        # Re-apply the cap so jitter cannot exceed the configured maximum.
        return float(min(delay, self.max_delay))
def retry(config: RetryConfig = None):
    """
    Decorator for retrying failed API calls.

    ``EtsyAPIError`` is retried only when ``is_retryable`` is true. Other
    exceptions are retried only when they look like network failures, in
    which case the final failure is raised as ``NetworkError`` chained to
    the original exception. Everything else propagates unchanged.

    Args:
        config: RetryConfig instance; a default one is used when omitted
    """
    if config is None:
        config = RetryConfig()

    def _is_network_failure(exc: Exception) -> bool:
        """Decide whether a non-Etsy exception is a connectivity problem."""
        if isinstance(exc, (ConnectionError, TimeoutError)):
            return True
        # HTTP libraries define their own ConnectionError/Timeout classes
        # that do not derive from the builtins; recognise them by class name.
        for klass in type(exc).__mro__:
            if "Connection" in klass.__name__ or "Timeout" in klass.__name__:
                return True
        # Message sniffing is limited to I/O errors so an unrelated
        # exception that merely mentions "Connection" is not retried.
        if isinstance(exc, OSError):
            text = str(exc)
            return "Connection" in text or "Timeout" in text
        return False

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            # A negative max_retries must still allow the first attempt.
            max_retries = max(config.max_retries, 0)
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except EtsyAPIError as e:
                    if not e.is_retryable:
                        raise
                    if attempt >= max_retries:
                        logger.error(f"Max retries exceeded: {e}")
                        raise
                    delay = config.get_delay(attempt, e)
                    logger.warning(
                        f"Retry {attempt + 1}/{config.max_retries} "
                        f"after {delay:.1f}s: {e}"
                    )
                    time.sleep(delay)
                except Exception as e:
                    if not _is_network_failure(e):
                        raise
                    network_error = NetworkError(str(e), original_error=e)
                    if attempt >= max_retries:
                        raise network_error from e
                    delay = config.get_delay(attempt, network_error)
                    logger.warning(f"Network error, retry in {delay:.1f}s")
                    time.sleep(delay)
            # Not reached: the last loop iteration always returns or raises.
        return wrapper
    return decorator
class RetryableClient:
    """Mixin that lets an API client run arbitrary calls under retry."""

    def __init__(self, retry_config: RetryConfig = None):
        # Fall back to the default policy when none (or a falsy one) is given.
        if retry_config:
            self.retry_config = retry_config
        else:
            self.retry_config = RetryConfig()

    def with_retry(self, func: Callable[..., T], *args, **kwargs) -> T:
        """
        Call ``func(*args, **kwargs)`` under this client's retry policy.

        Args:
            func: Function to call
            *args, **kwargs: Arguments to pass

        Returns:
            Function result
        """
        def invoke():
            return func(*args, **kwargs)

        guarded = retry(self.retry_config)(invoke)
        return guarded()
# etsy_client/circuit_breaker.py
"""Circuit breaker for protecting against cascading failures."""
import time
import threading
from enum import Enum
from typing import Callable, TypeVar, Any
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
T = TypeVar('T')
class CircuitState(Enum):
    """The three states a circuit breaker can be in."""

    # Normal operation: calls pass through.
    CLOSED = "closed"
    # Failing: calls are rejected.
    OPEN = "open"
    # Testing recovery after the open timeout has elapsed.
    HALF_OPEN = "half_open"
@dataclass
class CircuitStats:
    """Counters and timestamps a circuit breaker keeps about its calls."""

    # Failures counted so far.
    failures: int = 0
    # Successes counted so far.
    successes: int = 0
    # time.time() of the most recent failure (0 until one is recorded).
    last_failure_time: float = 0
    # time.time() of the most recent success (0 until one is recorded).
    last_success_time: float = 0
class CircuitBreaker:
    """
    Circuit breaker to prevent cascading failures.

    When too many consecutive failures occur, the circuit opens and rejects
    calls immediately, giving the service time to recover. After ``timeout``
    seconds it moves to half-open; ``success_threshold`` successes then
    close it again, while a single failure re-opens it.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: float = 60.0,
        name: str = "default"
    ):
        """
        Initialize circuit breaker.

        Args:
            failure_threshold: Consecutive failures before opening
            success_threshold: Successes needed to close
            timeout: Seconds before trying half-open
            name: Circuit name for logging
        """
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.name = name
        self._state = CircuitState.CLOSED
        self._stats = CircuitStats()
        self._lock = threading.Lock()

    @property
    def state(self) -> CircuitState:
        """Get current state, moving OPEN -> HALF_OPEN once the timeout has passed."""
        with self._lock:
            if (
                self._state == CircuitState.OPEN
                and time.time() - self._stats.last_failure_time > self.timeout
            ):
                logger.info(f"Circuit {self.name}: OPEN -> HALF_OPEN")
                self._state = CircuitState.HALF_OPEN
                self._stats.failures = 0
                # Recovery must be proven by successes seen after this
                # point, not by any recorded while the circuit was open.
                self._stats.successes = 0
            return self._state

    def is_available(self) -> bool:
        """Check if circuit allows calls."""
        return self.state != CircuitState.OPEN

    def record_success(self):
        """Record a successful call."""
        with self._lock:
            self._stats.successes += 1
            self._stats.last_success_time = time.time()
            if self._state == CircuitState.CLOSED:
                # A success ends the current failure streak. Without this
                # reset, isolated failures spread over a long uptime would
                # add up and eventually open a healthy circuit.
                self._stats.failures = 0
            elif self._state == CircuitState.HALF_OPEN:
                if self._stats.successes >= self.success_threshold:
                    logger.info(f"Circuit {self.name}: HALF_OPEN -> CLOSED")
                    self._state = CircuitState.CLOSED
                    self._stats.failures = 0

    def record_failure(self):
        """Record a failed call."""
        with self._lock:
            self._stats.failures += 1
            self._stats.last_failure_time = time.time()
            self._stats.successes = 0
            if self._state == CircuitState.CLOSED:
                if self._stats.failures >= self.failure_threshold:
                    logger.warning(f"Circuit {self.name}: CLOSED -> OPEN")
                    self._state = CircuitState.OPEN
            elif self._state == CircuitState.HALF_OPEN:
                # Any failure while probing sends the circuit back to open.
                logger.warning(f"Circuit {self.name}: HALF_OPEN -> OPEN")
                self._state = CircuitState.OPEN

    def call(self, func: Callable[..., T], *args, **kwargs) -> T:
        """
        Execute function through circuit breaker.

        Args:
            func: Function to call
            *args, **kwargs: Arguments

        Returns:
            Function result

        Raises:
            CircuitOpenError: If circuit is open
        """
        if not self.is_available():
            raise CircuitOpenError(f"Circuit {self.name} is open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            # Count the failure, then let the caller see the original error.
            self.record_failure()
            raise
        self.record_success()
        return result
class CircuitOpenError(Exception):
    """Signals that a call was rejected because the circuit is open."""
# Module-wide registry of circuit breakers, keyed by circuit name
_circuits: dict[str, CircuitBreaker] = {}


def get_circuit(name: str = "etsy") -> CircuitBreaker:
    """Return the breaker registered under ``name``, creating it on first use."""
    try:
        return _circuits[name]
    except KeyError:
        breaker = _circuits[name] = CircuitBreaker(name=name)
        return breaker
# etsy_client/rate_limiter.py
"""Proactive rate limit management."""
import time
import threading
from typing import Optional
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class RateLimitInfo:
    """Snapshot of the rate limit figures reported by an API response."""

    # Total number of requests allowed.
    limit: int
    # Requests still available.
    remaining: int
    # When the allowance resets, as a Unix timestamp.
    reset_at: float
class RateLimiter:
    """
    Proactive rate limit management.

    Tracks API limits and prevents requests that would exceed them. A
    per-second pace is always enforced. The daily allowance is taken from
    API response headers when available and otherwise counted locally over
    a rolling 24-hour window.
    """

    # Length of the locally tracked window when the API gives no reset time.
    _DAILY_WINDOW_SECONDS = 24 * 60 * 60

    def __init__(
        self,
        requests_per_second: float = 10.0,
        daily_limit: int = 10000,
        buffer_percentage: float = 0.1
    ):
        """
        Initialize rate limiter.

        Args:
            requests_per_second: Target RPS (must be positive)
            daily_limit: Maximum daily requests
            buffer_percentage: Fraction of the daily limit held back as a
                safety buffer when counting locally

        Raises:
            ValueError: If requests_per_second is not positive
        """
        if requests_per_second <= 0:
            # Fail here rather than with a ZeroDivisionError on first use.
            raise ValueError("requests_per_second must be positive")
        self.requests_per_second = requests_per_second
        self.daily_limit = daily_limit
        self.buffer_percentage = buffer_percentage
        self._lock = threading.Lock()
        self._last_request_time = 0.0
        self._daily_count = 0
        self._daily_reset_time = 0.0
        # Track from API headers
        self._api_limit_info: Optional[RateLimitInfo] = None

    def _min_request_interval(self) -> float:
        """Calculate minimum time between requests."""
        return 1.0 / self.requests_per_second

    def _daily_remaining(self) -> int:
        """Get remaining daily requests (the API's figure wins when known)."""
        if self._api_limit_info:
            return self._api_limit_info.remaining
        buffer = int(self.daily_limit * self.buffer_percentage)
        return max(0, self.daily_limit - self._daily_count - buffer)

    def _start_new_daily_window(self, now: float):
        """Forget the previous day's usage. Caller must hold the lock."""
        self._daily_count = 0
        # The API's last "remaining" figure belongs to the old window.
        self._api_limit_info = None
        self._daily_reset_time = now + self._DAILY_WINDOW_SECONDS

    def _refresh_daily_window(self, now: float):
        """Open or roll over the daily window. Caller must hold the lock."""
        if self._daily_reset_time <= 0:
            # First use and no reset time from the API yet: start counting.
            self._daily_reset_time = now + self._DAILY_WINDOW_SECONDS
        elif now >= self._daily_reset_time:
            self._start_new_daily_window(now)

    def wait_if_needed(self):
        """
        Block if necessary to respect rate limits.

        Called before each API request. The lock is held while sleeping so
        concurrent callers are paced one after another.
        """
        with self._lock:
            now = time.time()
            self._refresh_daily_window(now)
            # Check daily limit
            if self._daily_remaining() <= 0:
                wait_time = self._daily_reset_time - now
                if wait_time > 0:
                    logger.warning(f"Daily limit reached, waiting {wait_time:.0f}s")
                    time.sleep(wait_time)
                # The allowance has renewed; without this the exhausted
                # count would persist indefinitely.
                self._start_new_daily_window(time.time())
            # Respect per-second rate
            elapsed = time.time() - self._last_request_time
            min_interval = self._min_request_interval()
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            self._last_request_time = time.time()
            self._daily_count += 1

    def update_from_headers(self, headers: dict):
        """
        Update limits from API response headers.

        Malformed header values are ignored so a bad header can never fail
        an otherwise successful request.

        Args:
            headers: Response headers
        """
        try:
            limit = int(headers.get('X-Limit-Daily-Limit', 0))
            remaining = int(headers.get('X-Limit-Daily-Remaining', 0))
            reset = int(headers.get('X-Limit-Daily-Reset', 0))
        except (ValueError, TypeError):
            logger.debug("Ignoring malformed rate limit headers")
            return
        if limit > 0:
            with self._lock:
                self._api_limit_info = RateLimitInfo(
                    limit=limit,
                    remaining=remaining,
                    reset_at=float(reset)
                )
                if reset > 0:
                    # Keep the existing window end when no reset is sent.
                    self._daily_reset_time = float(reset)

    def get_status(self) -> dict:
        """Get current rate limit status."""
        with self._lock:
            return {
                'daily_remaining': self._daily_remaining(),
                'last_request': self._last_request_time,
                'api_info': self._api_limit_info
            }
class AdaptiveRateLimiter(RateLimiter):
    """
    Rate limiter whose pacing reacts to API responses.

    Each consecutive rate-limit response widens the gap between requests
    (up to ten times the base interval); each success narrows it again.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._consecutive_rate_limits = 0
        self._adaptive_multiplier = 1.0

    def record_rate_limit(self):
        """Record a rate limit response."""
        with self._lock:
            self._consecutive_rate_limits += 1
            # Doubles per consecutive rate limit, capped at 10x
            doubled = 2.0 ** self._consecutive_rate_limits
            self._adaptive_multiplier = doubled if doubled < 10.0 else 10.0
            logger.warning(
                f"Rate limit hit, multiplier now {self._adaptive_multiplier}"
            )

    def record_success(self):
        """Record successful request."""
        with self._lock:
            self._consecutive_rate_limits = 0
            # Ease back toward 1.0 by 10% per success, never below it
            eased = self._adaptive_multiplier * 0.9
            self._adaptive_multiplier = eased if eased > 1.0 else 1.0

    def _min_request_interval(self) -> float:
        """Return the base interval scaled by the current multiplier."""
        return super()._min_request_interval() * self._adaptive_multiplier
# etsy_client/resilient_client.py
"""Resilient Etsy API client with full error handling."""
import requests
from typing import Dict, Any, Optional
import logging
from .errors import (
EtsyAPIError, AuthenticationError, RateLimitError,
ValidationError, NotFoundError, ServerError, NetworkError,
classify_error
)
from .retry import RetryConfig, retry
from .circuit_breaker import CircuitBreaker, CircuitOpenError
from .rate_limiter import AdaptiveRateLimiter
logger = logging.getLogger(__name__)
class ResilientEtsyClient:
    """
    Production-ready Etsy API client with:
    - Automatic retries with exponential backoff
    - Circuit breaker pattern
    - Proactive rate limiting
    - Comprehensive error handling
    """

    BASE_URL = "https://api.etsy.com/v3"
    # Seconds to wait after a 429 that carries no usable Retry-After header.
    DEFAULT_RETRY_AFTER = 60

    def __init__(
        self,
        api_key: str,
        access_token: str = None,
        retry_config: RetryConfig = None,
        circuit_breaker: CircuitBreaker = None,
        rate_limiter: AdaptiveRateLimiter = None
    ):
        """
        Create a client.

        Args:
            api_key: Value sent in the ``x-api-key`` header
            access_token: Optional bearer token for the Authorization header
            retry_config: Retry policy applied to every request
            circuit_breaker: Breaker guarding the API; a new one is created
                when omitted
            rate_limiter: Limiter pacing requests; a new one is created
                when omitted
        """
        self.api_key = api_key
        self.access_token = access_token
        self.retry_config = retry_config or RetryConfig()
        self.circuit = circuit_breaker or CircuitBreaker(name="etsy")
        self.rate_limiter = rate_limiter or AdaptiveRateLimiter()
        self.session = requests.Session()

    def _headers(self) -> Dict[str, str]:
        """Get request headers."""
        headers = {
            "x-api-key": self.api_key,
            "Accept": "application/json"
        }
        if self.access_token:
            headers["Authorization"] = f"Bearer {self.access_token}"
        return headers

    @staticmethod
    def _parse_error_body(response: requests.Response) -> Dict[str, Any]:
        """Return the error payload as a dict, whatever the server sent."""
        if not response.content:
            return {}
        try:
            body = response.json()
        except Exception:
            return {"error": response.text}
        # A JSON list or scalar has no fields to look up; keep the raw text.
        return body if isinstance(body, dict) else {"error": response.text}

    def _parse_retry_after(self, response: requests.Response) -> int:
        """Read Retry-After as whole seconds.

        The header may also be an HTTP-date, which is not parsed here;
        the default is used instead.
        """
        raw = response.headers.get('Retry-After')
        try:
            seconds = int(raw)
        except (TypeError, ValueError):
            return self.DEFAULT_RETRY_AFTER
        return seconds if seconds >= 0 else self.DEFAULT_RETRY_AFTER

    def _handle_response(self, response: requests.Response) -> Dict[str, Any]:
        """
        Process API response.

        Args:
            response: HTTP response

        Returns:
            Response JSON

        Raises:
            Appropriate EtsyAPIError subclass
        """
        # Update rate limit info
        self.rate_limiter.update_from_headers(response.headers)
        # Success
        if response.ok:
            self.rate_limiter.record_success()
            return response.json() if response.content else {}
        error_body = self._parse_error_body(response)
        # Handle rate limit specifically
        if response.status_code == 429:
            self.rate_limiter.record_rate_limit()
            raise RateLimitError(
                "Rate limit exceeded",
                retry_after=self._parse_retry_after(response),
                response_body=error_body
            )
        raise classify_error(response.status_code, error_body)

    def _make_request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Make API request with all protections.

        The retry wrapper is built per call from ``self.retry_config`` so
        the policy given to the constructor is the one actually applied.

        Args:
            method: HTTP method
            endpoint: API endpoint
            **kwargs: Request arguments

        Returns:
            Response JSON
        """
        attempt = retry(self.retry_config)(self._request_once)
        return attempt(method, endpoint, **kwargs)

    def _request_once(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> Dict[str, Any]:
        """Perform one attempt: breaker check, rate limit wait, send, classify."""
        # Check circuit breaker
        if not self.circuit.is_available():
            raise CircuitOpenError("Circuit is open, request rejected")
        # Wait for rate limit
        self.rate_limiter.wait_if_needed()
        url = f"{self.BASE_URL}{endpoint}"
        try:
            response = self.session.request(
                method,
                url,
                headers=self._headers(),
                timeout=30,
                **kwargs
            )
        except (requests.ConnectionError, requests.Timeout) as e:
            self.circuit.record_failure()
            raise NetworkError(str(e), original_error=e) from e
        try:
            result = self._handle_response(response)
        except EtsyAPIError as e:
            # ``category`` is an enum member, so compare its value; comparing
            # the member itself against strings never matches.
            if e.category.value in ('server', 'network'):
                self.circuit.record_failure()
            raise
        self.circuit.record_success()
        return result

    def get(self, endpoint: str, params: Dict = None) -> Dict[str, Any]:
        """GET request."""
        return self._make_request("GET", endpoint, params=params)

    def post(self, endpoint: str, data: Dict = None) -> Dict[str, Any]:
        """POST request."""
        return self._make_request("POST", endpoint, json=data)

    def put(self, endpoint: str, data: Dict = None) -> Dict[str, Any]:
        """PUT request."""
        return self._make_request("PUT", endpoint, json=data)

    def patch(self, endpoint: str, data: Dict = None) -> Dict[str, Any]:
        """PATCH request."""
        return self._make_request("PATCH", endpoint, json=data)

    def delete(self, endpoint: str) -> Dict[str, Any]:
        """DELETE request."""
        return self._make_request("DELETE", endpoint)

    def health_check(self) -> Dict[str, Any]:
        """
        Check client health.

        Returns:
            Health status dictionary
        """
        return {
            'circuit_state': self.circuit.state.value,
            'rate_limit_status': self.rate_limiter.get_status(),
            'session_active': self.session is not None
        }
# etsy_client/monitoring.py
"""Error monitoring and alerting."""
import time
from collections import deque
from dataclasses import dataclass
from typing import Dict, List, Optional, Callable
import threading
import logging
from .errors import EtsyAPIError, ErrorCategory
logger = logging.getLogger(__name__)
@dataclass
class ErrorRecord:
    """Record of an API error."""

    # The exception that was raised.
    error: "EtsyAPIError"
    # time.time() at the moment the error was recorded.
    timestamp: float
    # API endpoint the failing call targeted ("" when not given).
    endpoint: str
    # Free-form extra details supplied by the caller.
    context: Dict


class ErrorMonitor:
    """
    Monitor API errors and trigger alerts.

    Errors are kept in a sliding time window. Whenever the number of errors
    in the window reaches ``alert_threshold``, every registered callback is
    invoked with a summary dictionary.
    """

    def __init__(
        self,
        window_seconds: int = 300,  # 5 minutes
        alert_threshold: int = 10
    ):
        self.window_seconds = window_seconds
        self.alert_threshold = alert_threshold
        self._errors: deque = deque()
        self._lock = threading.Lock()
        self._alert_callbacks: List[Callable] = []

    def record_error(
        self,
        error: "EtsyAPIError",
        endpoint: str = "",
        context: Optional[Dict] = None
    ):
        """
        Record an error occurrence.

        Args:
            error: The error that occurred
            endpoint: API endpoint
            context: Additional context
        """
        record = ErrorRecord(
            error=error,
            timestamp=time.time(),
            endpoint=endpoint,
            context=context or {}
        )
        with self._lock:
            self._errors.append(record)
            # Clean old errors
            self._cleanup()
        # Alerts are evaluated after the lock is released: building the
        # summary takes the same non-reentrant lock, so doing it while
        # still holding the lock would block forever.
        self._check_alerts()

    def _cleanup(self):
        """Remove errors outside window. Caller must hold the lock."""
        cutoff = time.time() - self.window_seconds
        while self._errors and self._errors[0].timestamp < cutoff:
            self._errors.popleft()

    def _check_alerts(self):
        """Check if alert should be triggered. Must be called without the lock held."""
        with self._lock:
            threshold_reached = len(self._errors) >= self.alert_threshold
        if threshold_reached:
            self._trigger_alert()

    def _trigger_alert(self):
        """Trigger alert callbacks. Must be called without the lock held."""
        summary = self.get_summary()
        # Iterate over a copy so a callback may register further callbacks.
        for callback in list(self._alert_callbacks):
            try:
                callback(summary)
            except Exception as e:
                # One failing callback must not prevent the others from running.
                logger.error(f"Alert callback failed: {e}")

    def on_alert(self, callback: Callable[[Dict], None]):
        """Register alert callback."""
        self._alert_callbacks.append(callback)

    def get_summary(self) -> Dict:
        """Get error summary for the current window."""
        with self._lock:
            self._cleanup()
            by_category = {}
            by_endpoint = {}
            for record in self._errors:
                cat = record.error.category.value
                by_category[cat] = by_category.get(cat, 0) + 1
                by_endpoint[record.endpoint] = by_endpoint.get(
                    record.endpoint, 0
                ) + 1
            return {
                'total_errors': len(self._errors),
                'window_seconds': self.window_seconds,
                'by_category': by_category,
                'by_endpoint': by_endpoint,
                'most_recent': (
                    self._errors[-1].error.message
                    if self._errors else None
                )
            }
# Usage
def setup_error_monitoring():
    """Build an ErrorMonitor that logs a critical message when it alerts.

    Returns:
        The configured monitor (5-minute window, alert threshold of 10).
    """
    def send_alert(summary: Dict):
        """Send alert notification."""
        logger.critical(f"HIGH ERROR RATE: {summary}")
        # Add Slack/email notification here

    monitor = ErrorMonitor(window_seconds=300, alert_threshold=10)
    monitor.on_alert(send_alert)
    return monitor
# scripts/test_error_handling.py
"""Test error handling and resilience."""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import Config
from etsy_client.resilient_client import ResilientEtsyClient
from etsy_client.errors import (
EtsyAPIError, AuthenticationError, RateLimitError,
ValidationError, NotFoundError
)
from etsy_client.monitoring import ErrorMonitor
def test_error_handling():
    """Run a few failing requests against the live API and print the outcome.

    This is a manual smoke script: it prints what it catches and makes no
    assertions.
    """
    rule = "=" * 60
    missing_listing = "/application/listings/999999999"

    client = ResilientEtsyClient(
        api_key=Config.ETSY_API_KEY,
        access_token=Config.ETSY_ACCESS_TOKEN
    )
    monitor = ErrorMonitor()

    print("Testing error handling scenarios...")
    print(rule)

    # Scenario 1: request a listing id that should not exist
    print("\n1. Testing Not Found error:")
    try:
        client.get(missing_listing)
    except NotFoundError as e:
        print(f" ✓ Caught NotFoundError: {e.message}")
        monitor.record_error(e, missing_listing)
    except EtsyAPIError as e:
        print(f" ✓ Caught error: {e}")
        monitor.record_error(e)

    # Scenario 2: create a listing with an empty payload
    print("\n2. Testing Validation error:")
    try:
        client.post("/application/shops/0/listings", data={})
    except ValidationError as e:
        print(f" ✓ Caught ValidationError: {e.message}")
        print(f" Errors: {e.validation_errors}")
        monitor.record_error(e)
    except EtsyAPIError as e:
        print(f" ✓ Caught error: {e}")

    # Scenario 3: report circuit breaker and rate limiter state
    print("\n3. Client health check:")
    health = client.health_check()
    print(f" Circuit state: {health['circuit_state']}")
    print(f" Rate limit: {health['rate_limit_status']}")

    # Scenario 4: show what the monitor collected above
    print("\n4. Error summary:")
    summary = monitor.get_summary()
    print(f" Total errors: {summary['total_errors']}")
    print(f" By category: {summary['by_category']}")

    print("\n" + rule)
    print("Error handling test complete!")
# Run the scenarios only when this file is executed as a script, not on import.
if __name__ == "__main__":
    test_error_handling()
With robust error handling in place, you’re ready to build complete automation systems. The next chapter brings everything together into a fully automated Etsy workflow.
| ← Chapter 10: Webhooks and Real-Time Updates | Table of Contents | Chapter 12: Building a Complete Automation System → |