This chapter brings everything together with practical automation workflows for digital product sellers:
Running a holiday sale? Automate the entire process.
SALE_CONFIG = {
"black_friday": {
"discount_percent": 30,
"tags_to_add": ["black friday", "sale", "cyber monday"],
"title_prefix": "π SALE: ",
"start_date": "2025-11-28",
"end_date": "2025-12-01"
},
"christmas": {
"discount_percent": 25,
"tags_to_add": ["christmas", "holiday", "gift"],
"title_prefix": "π ",
"start_date": "2025-12-15",
"end_date": "2025-12-26"
}
}
See code/automation.py for complete implementation.
def apply_sale(client, shop_id, sale_config):
"""Apply sale to all listings."""
listings = get_all_listings(client, shop_id)
original_prices = {} # Save for restoration
for listing in listings:
listing_id = listing["listing_id"]
# Save original price
original_prices[listing_id] = listing["price"]["amount"]
# Calculate discounted price
new_price = int(listing["price"]["amount"] * (1 - sale_config["discount_percent"] / 100))
# Update listing
updates = {
"price": new_price,
"title": sale_config["title_prefix"] + listing["title"][:120],
"tags": (listing.get("tags", []) + sale_config["tags_to_add"])[:13]
}
client.put(f"/application/shops/{shop_id}/listings/{listing_id}", data=updates)
# Save original prices for restoration
save_json("original_prices.json", original_prices)
return len(listings)
def restore_after_sale(client, shop_id, title_prefix):
"""Restore original prices and remove sale modifications."""
original_prices = load_json("original_prices.json")
listings = get_all_listings(client, shop_id)
for listing in listings:
listing_id = listing["listing_id"]
if listing_id in original_prices:
# Restore original price
original_price = original_prices[listing_id]
# Remove title prefix
title = listing["title"]
if title.startswith(title_prefix):
title = title[len(title_prefix):]
client.put(
f"/application/shops/{shop_id}/listings/{listing_id}",
data={"price": original_price, "title": title}
)
Keep tags updated based on seasons, trends, or performance.
SEASONAL_TAGS = {
1: ["new year", "january", "winter"],
2: ["valentine", "february", "love"],
3: ["spring", "march", "st patrick"],
9: ["fall", "autumn", "back to school"],
10: ["halloween", "october", "spooky"],
11: ["thanksgiving", "november", "gratitude"],
12: ["christmas", "holiday", "winter", "gift"]
}
def update_seasonal_tags(client, shop_id):
"""Update tags based on current month."""
current_month = datetime.now().month
tags_to_add = SEASONAL_TAGS.get(current_month, [])
if not tags_to_add:
return 0
listings = get_all_listings(client, shop_id)
updated = 0
for listing in listings:
current_tags = listing.get("tags", [])
# Remove old seasonal tags
all_seasonal = [t for tags in SEASONAL_TAGS.values() for t in tags]
filtered_tags = [t for t in current_tags if t not in all_seasonal]
# Add current seasonal tags
new_tags = (filtered_tags + tags_to_add)[:13]
if new_tags != current_tags:
client.put(
f"/application/shops/{shop_id}/listings/{listing['listing_id']}",
data={"tags": new_tags}
)
updated += 1
return updated
def optimize_tags_by_performance(client, shop_id, days=30):
"""Copy tags from best-performing listings to underperformers."""
# Get top performers
top = top_products(client, shop_id, days, limit=5)
# Collect their tags
winning_tags = []
for listing_id, _ in top:
listing = client.get(f"/application/listings/{listing_id}")
winning_tags.extend(listing.get("tags", []))
# Find most common winning tags
from collections import Counter
common_tags = [tag for tag, _ in Counter(winning_tags).most_common(5)]
# Apply to underperformers
underperformers = products_without_sales(client, shop_id, days)
for listing in underperformers:
current_tags = listing.get("tags", [])
new_tags = list(set(current_tags + common_tags))[:13]
client.put(
f"/application/shops/{shop_id}/listings/{listing['listing_id']}",
data={"tags": new_tags}
)
Automate the process of adding new products.
PRODUCT_TEMPLATE = {
"who_made": "i_did",
"when_made": "2020_2025",
"taxonomy_id": 2078,
"is_digital": True,
"quantity": 999,
"tags": ["digital download", "printable", "instant download"]
}
def create_from_template(client, shop_id, product_info):
"""Create listing from template with product-specific info."""
listing_data = {**PRODUCT_TEMPLATE, **product_info}
listing = client.post(
f"/application/shops/{shop_id}/listings",
data=listing_data
)
return listing["listing_id"]
def batch_create_products(client, shop_id, products_csv):
"""Create multiple products from a CSV file."""
import csv
created = []
errors = []
with open(products_csv) as f:
reader = csv.DictReader(f)
for row in reader:
try:
product_info = {
"title": row["title"],
"description": row["description"],
"price": int(float(row["price"]) * 100),
"tags": row["tags"].split(",")
}
listing_id = create_from_template(client, shop_id, product_info)
# Upload files if specified
if row.get("files"):
for file_path in row["files"].split(";"):
upload_digital_file(client, shop_id, listing_id, file_path)
created.append({"title": row["title"], "listing_id": listing_id})
except Exception as e:
errors.append({"title": row["title"], "error": str(e)})
return {"created": created, "errors": errors}
Generate and distribute reports automatically.
def daily_dashboard(client, shop_id):
"""Generate daily performance dashboard."""
today = datetime.now()
yesterday = today - timedelta(days=1)
# Today's sales
today_orders = get_orders_in_range(client, shop_id, today.replace(hour=0), today)
today_revenue = calculate_revenue(today_orders)
# Yesterday for comparison
yesterday_orders = get_orders_in_range(
client, shop_id,
yesterday.replace(hour=0),
yesterday.replace(hour=23, minute=59)
)
yesterday_revenue = calculate_revenue(yesterday_orders)
# Change percentage
change = ((today_revenue - yesterday_revenue) / yesterday_revenue * 100
if yesterday_revenue > 0 else 0)
dashboard = f"""
π Daily Dashboard - {today.strftime('%Y-%m-%d')}
βββββββββββββββββββββββββββ
Today's Orders: {len(today_orders)}
Today's Revenue: ${today_revenue:.2f}
vs Yesterday: {change:+.1f}%
βββββββββββββββββββββββββββ
"""
return dashboard
def generate_weekly_email(client, shop_id):
"""Generate weekly report suitable for email."""
report = generate_shop_report(client, shop_id, days=7)
# Format as HTML
html = f"""
<h1>Weekly Shop Report</h1>
<p>Week ending: {datetime.now().strftime('%Y-%m-%d')}</p>
<h2>Performance Summary</h2>
<ul>
<li>Orders: {report['performance']['total_orders']}</li>
<li>Revenue: ${report['performance']['total_revenue']:.2f}</li>
<li>Avg Order: ${report['performance']['average_order']:.2f}</li>
</ul>
<h2>Top Products</h2>
<ol>
"""
for listing_id, data in report['top_products'][:5]:
html += f"<li>{data['title'][:40]} - ${data['revenue']:.2f}</li>"
html += "</ol>"
return html
Sync listings with external systems (spreadsheets, other platforms).
def export_listings_to_csv(client, shop_id, filename="listings_export.csv"):
"""Export all listings to CSV for spreadsheet management."""
import csv
listings = get_all_listings(client, shop_id)
fieldnames = ["listing_id", "title", "price", "views", "favorites", "tags", "state"]
with open(filename, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for listing in listings:
writer.writerow({
"listing_id": listing["listing_id"],
"title": listing["title"],
"price": listing["price"]["amount"] / listing["price"]["divisor"],
"views": listing.get("views", 0),
"favorites": listing.get("num_favorers", 0),
"tags": ",".join(listing.get("tags", [])),
"state": listing["state"]
})
return filename
def import_updates_from_csv(client, shop_id, filename="listings_updates.csv"):
"""Apply updates from edited CSV."""
import csv
current_listings = {l["listing_id"]: l for l in get_all_listings(client, shop_id)}
updates = []
with open(filename) as f:
reader = csv.DictReader(f)
for row in reader:
listing_id = int(row["listing_id"])
current = current_listings.get(listing_id)
if not current:
continue
changes = {}
# Check for price change
new_price = int(float(row["price"]) * 100)
if new_price != current["price"]["amount"]:
changes["price"] = new_price
# Check for title change
if row["title"] != current["title"]:
changes["title"] = row["title"]
# Check for tag changes
new_tags = row["tags"].split(",") if row["tags"] else []
if new_tags != current.get("tags", []):
changes["tags"] = new_tags
if changes:
updates.append({"listing_id": listing_id, "changes": changes})
# Apply updates
for update in updates:
client.put(
f"/application/shops/{shop_id}/listings/{update['listing_id']}",
data=update["changes"]
)
return len(updates)
# Run daily report at 8 AM
0 8 * * * /path/to/venv/bin/python /path/to/daily_report.py
# Update seasonal tags on the 1st of each month
0 0 1 * * /path/to/venv/bin/python /path/to/update_tags.py
import schedule
import time
def run_scheduler():
"""Run scheduled tasks."""
schedule.every().day.at("08:00").do(daily_dashboard, client, shop_id)
schedule.every().monday.at("09:00").do(generate_weekly_email, client, shop_id)
schedule.every(1).months.do(update_seasonal_tags, client, shop_id)
while True:
schedule.run_pending()
time.sleep(60)
Even with good automation, things go wrong. The final chapter covers troubleshooting common problems.
| β Previous: Error Handling | Next: Troubleshooting β |