API calls fail. Networks timeout. Tokens expire. This chapter covers:
Etsy API uses standard HTTP status codes:
| Code | Meaning | Action |
|---|---|---|
| 200 | Success | Process response |
| 201 | Created | Resource created successfully |
| 400 | Bad Request | Check your request data |
| 401 | Unauthorized | Refresh token or re-authenticate |
| 403 | Forbidden | Check permissions/scopes |
| 404 | Not Found | Resource doesn’t exist |
| 429 | Rate Limited | Wait and retry |
| 500 | Server Error | Retry later |
| 503 | Service Unavailable | Etsy is down, retry later |
See code/client.py for the complete implementation.
import requests
class EtsyAPIError(Exception):
"""Custom exception for Etsy API errors."""
def __init__(self, status_code, message, response=None):
self.status_code = status_code
self.message = message
self.response = response
super().__init__(f"[{status_code}] {message}")
def handle_response(response):
"""Process API response and raise appropriate errors."""
if response.status_code == 200:
return response.json()
elif response.status_code == 401:
raise EtsyAPIError(401, "Authentication failed. Token may be expired.")
elif response.status_code == 404:
raise EtsyAPIError(404, "Resource not found.")
elif response.status_code == 429:
raise EtsyAPIError(429, "Rate limit exceeded.")
else:
raise EtsyAPIError(
response.status_code,
response.text,
response
)
import time
from functools import wraps
def retry_on_failure(max_retries=3, delay=1, backoff=2):
"""Decorator to retry failed API calls."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
current_delay = delay
while retries < max_retries:
try:
return func(*args, **kwargs)
except EtsyAPIError as e:
if e.status_code in [429, 500, 503]:
retries += 1
if retries < max_retries:
print(f"Retry {retries}/{max_retries} after {current_delay}s")
time.sleep(current_delay)
current_delay *= backoff
else:
raise
else:
raise
return func(*args, **kwargs)
return wrapper
return decorator
@retry_on_failure(max_retries=3, delay=2)
def get_listing(client, listing_id):
return client.get(f"/application/listings/{listing_id}")
Etsy has rate limits (typically 10 requests/second, 10,000/day).
def is_rate_limited(response):
"""Check if response indicates rate limiting."""
if response.status_code == 429:
return True
# Check headers for rate limit info
remaining = response.headers.get("X-RateLimit-Remaining")
if remaining and int(remaining) <= 0:
return True
return False
import time
class RateLimiter:
"""Simple rate limiter for API calls."""
def __init__(self, calls_per_second=5):
self.min_interval = 1.0 / calls_per_second
self.last_call = 0
def wait(self):
"""Wait if necessary to respect rate limit."""
elapsed = time.time() - self.last_call
if elapsed < self.min_interval:
time.sleep(self.min_interval - elapsed)
self.last_call = time.time()
# Usage in client
limiter = RateLimiter(calls_per_second=5)
def api_call(endpoint):
limiter.wait()
return requests.get(endpoint)
Access tokens expire after about 1 hour.
class EtsyClient:
def __init__(self):
self.tokens = load_tokens()
self.token_expiry = None
def _ensure_valid_token(self):
"""Refresh token if expired or about to expire."""
if self.token_expiry and time.time() > self.token_expiry - 60:
self._refresh_token()
def _refresh_token(self):
"""Get new access token using refresh token."""
response = requests.post(
"https://api.etsy.com/v3/public/oauth/token",
data={
"grant_type": "refresh_token",
"client_id": Config.API_KEY,
"refresh_token": self.tokens["refresh_token"]
}
)
if response.status_code == 200:
self.tokens = response.json()
self.token_expiry = time.time() + self.tokens["expires_in"]
save_tokens(self.tokens)
else:
raise EtsyAPIError(401, "Failed to refresh token")
def get(self, endpoint, **kwargs):
self._ensure_valid_token()
# Make request...
def create_listing_safely(client, shop_id, data):
"""Create listing with validation."""
# Validate required fields
required = ["title", "description", "price", "quantity", "taxonomy_id"]
missing = [f for f in required if f not in data]
if missing:
raise ValueError(f"Missing required fields: {missing}")
# Validate field lengths
if len(data["title"]) > 140:
raise ValueError("Title exceeds 140 characters")
if len(data.get("tags", [])) > 13:
raise ValueError("Maximum 13 tags allowed")
try:
return client.post(f"/application/shops/{shop_id}/listings", data=data)
except EtsyAPIError as e:
if "taxonomy" in str(e).lower():
raise ValueError(f"Invalid taxonomy_id: {data['taxonomy_id']}")
raise
def upload_file_safely(client, shop_id, listing_id, file_path):
"""Upload file with error handling."""
from pathlib import Path
path = Path(file_path)
# Validate file exists
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
# Validate file size (20MB limit)
size_mb = path.stat().st_size / (1024 * 1024)
if size_mb > 20:
raise ValueError(f"File too large: {size_mb:.1f}MB (max 20MB)")
try:
return upload_digital_file(client, shop_id, listing_id, path)
except EtsyAPIError as e:
if e.status_code == 400:
raise ValueError(f"Invalid file or listing: {e.message}")
raise
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('etsy_api.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('etsy')
def logged_request(method, url, **kwargs):
"""Make request with logging."""
logger.info(f"{method.upper()} {url}")
response = requests.request(method, url, **kwargs)
if response.status_code >= 400:
logger.error(f"Error {response.status_code}: {response.text[:200]}")
else:
logger.debug(f"Success: {response.status_code}")
return response
class EtsyClient:
def __init__(self, debug=False):
self.debug = debug
def _log(self, message):
if self.debug:
print(f"[DEBUG] {message}")
def get(self, endpoint, **kwargs):
self._log(f"GET {endpoint}")
response = requests.get(self.base_url + endpoint, **kwargs)
self._log(f"Response: {response.status_code}")
return response
def bulk_update_with_errors(client, shop_id, updates):
"""Perform bulk updates, collecting errors instead of failing."""
results = {
"success": [],
"errors": []
}
for listing_id, data in updates.items():
try:
client.put(
f"/application/shops/{shop_id}/listings/{listing_id}",
data=data
)
results["success"].append(listing_id)
except EtsyAPIError as e:
results["errors"].append({
"listing_id": listing_id,
"error": str(e)
})
except Exception as e:
results["errors"].append({
"listing_id": listing_id,
"error": f"Unexpected: {str(e)}"
})
return results
def process_with_recovery(items, processor_func):
"""Process items with ability to resume on failure."""
processed = []
failed = []
for item in items:
try:
result = processor_func(item)
processed.append({"item": item, "result": result})
except Exception as e:
failed.append({"item": item, "error": str(e)})
# Save progress for potential resume
save_progress(processed, failed)
return {"processed": len(processed), "failed": len(failed), "errors": failed}
import json
PROGRESS_FILE = "progress.json"
def save_progress(completed, pending):
"""Save progress for resumable operations."""
with open(PROGRESS_FILE, "w") as f:
json.dump({"completed": completed, "pending": pending}, f)
def load_progress():
"""Load previous progress if exists."""
try:
with open(PROGRESS_FILE) as f:
return json.load(f)
except FileNotFoundError:
return {"completed": [], "pending": []}
def resume_operation(client, shop_id, all_items, operation):
"""Resume a previously failed operation."""
progress = load_progress()
completed = set(progress["completed"])
remaining = [i for i in all_items if i not in completed]
print(f"Resuming: {len(remaining)} items remaining")
for item in remaining:
operation(client, shop_id, item)
completed.add(item)
save_progress(list(completed), [])
With error handling in place, you’re ready to build robust automation workflows. The next chapter brings everything together into practical automation scripts.
| ← Previous: Analytics | Next: Automation Workflows → |