Digital products have unique pricing flexibility. Unlike physical goods with fixed costs, digital products can be priced dynamically based on market conditions, competition, and performance data. This chapter explores automated pricing strategies and inventory management for digital products.
While digital products have “unlimited” inventory, Etsy’s system still requires quantity management:
# etsy_client/inventory.py
"""Inventory management for digital products."""
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from .client import EtsyClient
from .listings import ListingOperations
@dataclass
class InventoryItem:
"""Represents a listing's inventory state."""
listing_id: int
quantity: int
price: float
sku: Optional[str]
is_enabled: bool
variations: List[Dict]
class InventoryOperations:
"""Operations for managing listing inventory."""
def __init__(self, client: EtsyClient):
self.client = client
def get_listing_inventory(
self,
listing_id: int
) -> Dict[str, Any]:
"""
Get inventory details for a listing.
Args:
listing_id: The listing ID
Returns:
Inventory data including products and offerings
"""
return self.client.get(
f"/application/listings/{listing_id}/inventory"
)
def update_listing_inventory(
self,
listing_id: int,
products: List[Dict],
**kwargs
) -> Dict[str, Any]:
"""
Update inventory for a listing.
Args:
listing_id: The listing ID
products: List of product variations with offerings
**kwargs: Additional inventory parameters
Returns:
Updated inventory data
"""
data = {"products": products}
data.update(kwargs)
return self.client.put(
f"/application/listings/{listing_id}/inventory",
json=data
)
def set_listing_quantity(
self,
listing_id: int,
quantity: int = 999
) -> Dict[str, Any]:
"""
Set quantity for a simple listing (no variations).
Args:
listing_id: The listing ID
quantity: New quantity (default 999 for digital)
Returns:
Updated inventory data
"""
# Get current inventory to preserve structure
current = self.get_listing_inventory(listing_id)
products = current.get('products', [])
if not products:
# Simple listing
return self.update_listing_inventory(
listing_id,
products=[{
"offerings": [{
"quantity": quantity,
"is_enabled": True
}]
}]
)
# Update existing products
for product in products:
for offering in product.get('offerings', []):
offering['quantity'] = quantity
return self.update_listing_inventory(listing_id, products)
class DigitalInventoryManager:
"""
Manage inventory specifically for digital products.
Digital products have special considerations:
- Quantity is essentially unlimited
- Variations might be file formats
- Auto-renew is typical
"""
# Default high quantity for digital products
DEFAULT_QUANTITY = 999
def __init__(self, client: EtsyClient, shop_id: int):
self.client = client
self.shop_id = shop_id
self.inventory_ops = InventoryOperations(client)
self.listing_ops = ListingOperations(client)
def ensure_stock(
self,
min_quantity: int = 100
) -> List[Dict]:
"""
Ensure all digital listings have sufficient stock.
Args:
min_quantity: Minimum quantity threshold
Returns:
List of updated listings
"""
# Get all active listings
listings = self.listing_ops.get_all_listings(self.shop_id, "active")
updated = []
for listing in listings:
# Check if digital
if not listing.get('is_digital', False):
continue
# Check quantity
if listing.get('quantity', 0) < min_quantity:
try:
self.inventory_ops.set_listing_quantity(
listing['listing_id'],
self.DEFAULT_QUANTITY
)
updated.append({
'listing_id': listing['listing_id'],
'title': listing['title'],
'old_quantity': listing['quantity'],
'new_quantity': self.DEFAULT_QUANTITY
})
except Exception as e:
print(f"Error updating {listing['listing_id']}: {e}")
return updated
def create_format_variations(
self,
listing_id: int,
formats: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Create format variations for a digital product.
Args:
listing_id: The listing ID
formats: List of format options, e.g.:
[
{"name": "PDF", "price": 9.99},
{"name": "PNG", "price": 7.99},
{"name": "SVG + PDF + PNG Bundle", "price": 14.99}
]
Returns:
Updated inventory data
"""
products = []
for fmt in formats:
products.append({
"sku": fmt.get('sku', ''),
"property_values": [{
"property_id": 513, # Format property
"value_ids": [],
"values": [fmt['name']]
}],
"offerings": [{
"price": int(fmt['price'] * 100), # Convert to cents
"quantity": self.DEFAULT_QUANTITY,
"is_enabled": True
}]
})
return self.inventory_ops.update_listing_inventory(
listing_id,
products
)
# etsy_client/pricing.py
"""Dynamic pricing strategies for Etsy listings."""
from typing import Dict, Any, List, Callable, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from .client import EtsyClient
from .listings import ListingOperations
from .orders import OrderOperations
class PricingStrategy(Enum):
"""Available pricing strategies."""
FIXED = "fixed"
PERFORMANCE_BASED = "performance"
TIME_BASED = "time"
DEMAND_BASED = "demand"
COMPETITIVE = "competitive"
@dataclass
class PricingRule:
"""Rule for dynamic pricing."""
strategy: PricingStrategy
base_price: float
min_price: float
max_price: float
parameters: Dict[str, Any]
class DynamicPricer:
"""
Implement dynamic pricing strategies.
Adjusts prices based on various factors like performance,
time, and demand.
"""
def __init__(self, client: EtsyClient, shop_id: int):
self.client = client
self.shop_id = shop_id
self.listing_ops = ListingOperations(client)
self.order_ops = OrderOperations(client)
def apply_performance_pricing(
self,
listing_id: int,
base_price: float,
min_price: float,
max_price: float,
views_threshold: int = 100,
conversion_target: float = 0.02 # 2% conversion
) -> Dict[str, Any]:
"""
Adjust price based on listing performance.
High views + low sales = lower price
High conversion = raise price
Args:
listing_id: The listing ID
base_price: Starting price
min_price: Minimum allowed price
max_price: Maximum allowed price
views_threshold: Minimum views for analysis
conversion_target: Target conversion rate
Returns:
Price adjustment result
"""
# Get listing stats
listing = self.listing_ops.get_listing(listing_id)
views = listing.get('views', 0)
favorites = listing.get('num_favorers', 0)
# Get recent sales
transactions = self.order_ops.get_listing_transactions(
self.shop_id, listing_id
)
sales = len(transactions.get('results', []))
if views < views_threshold:
# Not enough data
return {
'listing_id': listing_id,
'action': 'no_change',
'reason': 'insufficient_views',
'current_price': base_price
}
# Calculate conversion rate
conversion_rate = sales / views if views > 0 else 0
# Determine price adjustment
current_price = listing['price']['amount'] / listing['price']['divisor']
if conversion_rate < conversion_target * 0.5:
# Very low conversion - reduce price
adjustment = -0.15 # 15% reduction
elif conversion_rate < conversion_target:
# Below target - small reduction
adjustment = -0.05 # 5% reduction
elif conversion_rate > conversion_target * 2:
# High conversion - increase price
adjustment = 0.10 # 10% increase
elif conversion_rate > conversion_target:
# Above target - small increase
adjustment = 0.05 # 5% increase
else:
adjustment = 0
if adjustment == 0:
return {
'listing_id': listing_id,
'action': 'no_change',
'reason': 'optimal_conversion',
'current_price': current_price,
'conversion_rate': conversion_rate
}
# Calculate new price
new_price = current_price * (1 + adjustment)
new_price = max(min_price, min(max_price, new_price))
new_price = round(new_price, 2)
if new_price != current_price:
self.listing_ops.update_price(listing_id, new_price)
return {
'listing_id': listing_id,
'action': 'adjusted',
'old_price': current_price,
'new_price': new_price,
'adjustment': adjustment,
'conversion_rate': conversion_rate
}
def apply_time_based_pricing(
self,
listing_id: int,
schedule: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Apply time-based pricing (sales, holidays, etc.).
Args:
listing_id: The listing ID
schedule: List of pricing periods, e.g.:
[
{
"start": "2026-11-25",
"end": "2026-11-30",
"discount_percent": 30,
"name": "Black Friday"
}
]
Returns:
Applied pricing result
"""
now = datetime.now()
for period in schedule:
start = datetime.strptime(period['start'], '%Y-%m-%d')
end = datetime.strptime(period['end'], '%Y-%m-%d')
if start <= now <= end:
# Apply discount
listing = self.listing_ops.get_listing(listing_id)
current_price = listing['price']['amount'] / listing['price']['divisor']
discount = period['discount_percent'] / 100
sale_price = round(current_price * (1 - discount), 2)
self.listing_ops.update_price(listing_id, sale_price)
return {
'listing_id': listing_id,
'action': 'sale_applied',
'sale_name': period['name'],
'original_price': current_price,
'sale_price': sale_price,
'discount_percent': period['discount_percent']
}
return {
'listing_id': listing_id,
'action': 'no_sale_active'
}
class SaleManager:
"""Manage sales and promotional pricing."""
def __init__(self, client: EtsyClient, shop_id: int):
self.client = client
self.shop_id = shop_id
self.listing_ops = ListingOperations(client)
self._original_prices: Dict[int, float] = {}
def start_sale(
self,
discount_percent: float,
listing_ids: List[int] = None,
min_price: float = 0.20
) -> Dict[str, Any]:
"""
Start a sale on specified listings or all listings.
Args:
discount_percent: Discount percentage (e.g., 20 for 20% off)
listing_ids: Specific listings, or None for all
min_price: Minimum price after discount
Returns:
Sale application results
"""
if listing_ids is None:
# Get all active listings
listings = self.listing_ops.get_all_listings(
self.shop_id, "active"
)
listing_ids = [l['listing_id'] for l in listings]
results = {
'updated': [],
'errors': [],
'skipped': []
}
multiplier = 1 - (discount_percent / 100)
for listing_id in listing_ids:
try:
listing = self.listing_ops.get_listing(listing_id)
current_price = listing['price']['amount'] / listing['price']['divisor']
# Store original price
self._original_prices[listing_id] = current_price
# Calculate sale price
sale_price = round(current_price * multiplier, 2)
sale_price = max(sale_price, min_price)
if sale_price == current_price:
results['skipped'].append(listing_id)
continue
self.listing_ops.update_price(listing_id, sale_price)
results['updated'].append({
'listing_id': listing_id,
'original': current_price,
'sale_price': sale_price
})
except Exception as e:
results['errors'].append({
'listing_id': listing_id,
'error': str(e)
})
return results
def end_sale(
self,
listing_ids: List[int] = None
) -> Dict[str, Any]:
"""
End a sale and restore original prices.
Args:
listing_ids: Specific listings, or None for all in sale
Returns:
Restoration results
"""
if listing_ids is None:
listing_ids = list(self._original_prices.keys())
results = {
'restored': [],
'errors': [],
'no_original': []
}
for listing_id in listing_ids:
if listing_id not in self._original_prices:
results['no_original'].append(listing_id)
continue
try:
original_price = self._original_prices[listing_id]
self.listing_ops.update_price(listing_id, original_price)
del self._original_prices[listing_id]
results['restored'].append({
'listing_id': listing_id,
'restored_price': original_price
})
except Exception as e:
results['errors'].append({
'listing_id': listing_id,
'error': str(e)
})
return results
def save_original_prices(self, filepath: str):
"""Save original prices to file for persistence."""
import json
with open(filepath, 'w') as f:
json.dump(self._original_prices, f)
def load_original_prices(self, filepath: str):
"""Load original prices from file."""
import json
with open(filepath, 'r') as f:
data = json.load(f)
self._original_prices = {int(k): v for k, v in data.items()}
# scripts/optimize_prices.py
"""Optimize prices based on performance data."""
import sys
from pathlib import Path
from datetime import datetime
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import Config
from etsy_client import EtsyClient
from etsy_client.pricing import DynamicPricer
from etsy_client.listings import ListingOperations
from etsy_client.users import get_my_shop_id
def optimize_all_prices(
min_views: int = 100,
conversion_target: float = 0.02,
price_range_percent: float = 30
):
"""
Optimize prices for all listings with sufficient data.
Args:
min_views: Minimum views required for optimization
conversion_target: Target conversion rate
price_range_percent: Max price adjustment from current
"""
client = EtsyClient(
api_key=Config.ETSY_API_KEY,
access_token=Config.ETSY_ACCESS_TOKEN
)
shop_id = get_my_shop_id(client)
pricer = DynamicPricer(client, shop_id)
listing_ops = ListingOperations(client)
print(f"Starting price optimization at {datetime.now()}")
print(f"Parameters: min_views={min_views}, target={conversion_target*100}%")
print("=" * 60)
# Get all active listings
listings = listing_ops.get_all_listings(shop_id, "active")
results = {
'optimized': [],
'no_change': [],
'insufficient_data': [],
'errors': []
}
for listing in listings:
listing_id = listing['listing_id']
current_price = listing['price']['amount'] / listing['price']['divisor']
# Calculate price bounds
min_price = current_price * (1 - price_range_percent / 100)
max_price = current_price * (1 + price_range_percent / 100)
try:
result = pricer.apply_performance_pricing(
listing_id,
base_price=current_price,
min_price=min_price,
max_price=max_price,
views_threshold=min_views,
conversion_target=conversion_target
)
if result['action'] == 'adjusted':
results['optimized'].append(result)
print(f" ↕ {listing['title'][:40]}: "
f"${result['old_price']:.2f} → ${result['new_price']:.2f}")
elif result['reason'] == 'insufficient_views':
results['insufficient_data'].append(listing_id)
else:
results['no_change'].append(listing_id)
except Exception as e:
results['errors'].append({
'listing_id': listing_id,
'error': str(e)
})
# Summary
print("\n" + "=" * 60)
print("OPTIMIZATION SUMMARY")
print("=" * 60)
print(f" Prices adjusted: {len(results['optimized'])}")
print(f" No change needed: {len(results['no_change'])}")
print(f" Insufficient data: {len(results['insufficient_data'])}")
print(f" Errors: {len(results['errors'])}")
return results
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--min-views', type=int, default=100)
parser.add_argument('--target', type=float, default=0.02)
parser.add_argument('--range', type=float, default=30)
args = parser.parse_args()
optimize_all_prices(
min_views=args.min_views,
conversion_target=args.target,
price_range_percent=args.range
)
# scripts/manage_sale.py
"""Manage sales and promotions."""
import sys
from pathlib import Path
from datetime import datetime
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import Config
from etsy_client import EtsyClient
from etsy_client.pricing import SaleManager
from etsy_client.users import get_my_shop_id
PRICES_FILE = "data/original_prices.json"
def start_sale(discount: float):
"""Start a shop-wide sale."""
client = EtsyClient(
api_key=Config.ETSY_API_KEY,
access_token=Config.ETSY_ACCESS_TOKEN
)
shop_id = get_my_shop_id(client)
manager = SaleManager(client, shop_id)
print(f"Starting {discount}% off sale...")
print("=" * 50)
results = manager.start_sale(discount)
# Save original prices
manager.save_original_prices(PRICES_FILE)
print(f"\n✓ Updated {len(results['updated'])} listings")
print(f" Skipped: {len(results['skipped'])}")
print(f" Errors: {len(results['errors'])}")
if results['updated']:
print("\nSample updates:")
for item in results['updated'][:5]:
print(f" ${item['original']:.2f} → ${item['sale_price']:.2f}")
print(f"\nOriginal prices saved to: {PRICES_FILE}")
def end_sale():
"""End the current sale and restore prices."""
client = EtsyClient(
api_key=Config.ETSY_API_KEY,
access_token=Config.ETSY_ACCESS_TOKEN
)
shop_id = get_my_shop_id(client)
manager = SaleManager(client, shop_id)
# Load original prices
try:
manager.load_original_prices(PRICES_FILE)
except FileNotFoundError:
print("Error: No saved prices found. Was a sale started?")
return
print("Ending sale and restoring original prices...")
print("=" * 50)
results = manager.end_sale()
print(f"\n✓ Restored {len(results['restored'])} listings")
print(f" No original found: {len(results['no_original'])}")
print(f" Errors: {len(results['errors'])}")
# Clean up prices file
Path(PRICES_FILE).unlink(missing_ok=True)
def scheduled_sale(
discount: float,
start_time: str,
end_time: str
):
"""
Schedule a sale for a specific time period.
Args:
discount: Discount percentage
start_time: Start time (YYYY-MM-DD HH:MM)
end_time: End time (YYYY-MM-DD HH:MM)
"""
import time
start = datetime.strptime(start_time, '%Y-%m-%d %H:%M')
end = datetime.strptime(end_time, '%Y-%m-%d %H:%M')
now = datetime.now()
if now > end:
print("Error: End time is in the past")
return
if now < start:
wait_seconds = (start - now).total_seconds()
print(f"Sale scheduled to start at {start_time}")
print(f"Waiting {wait_seconds / 3600:.1f} hours...")
time.sleep(wait_seconds)
# Start sale
print(f"\n{'='*50}")
print("STARTING SCHEDULED SALE")
print(f"{'='*50}")
start_sale(discount)
# Wait for end
now = datetime.now()
if now < end:
wait_seconds = (end - now).total_seconds()
print(f"\nSale running until {end_time}")
print(f"Waiting {wait_seconds / 3600:.1f} hours...")
time.sleep(wait_seconds)
# End sale
print(f"\n{'='*50}")
print("ENDING SCHEDULED SALE")
print(f"{'='*50}")
end_sale()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='Manage sales')
subparsers = parser.add_subparsers(dest='command')
start_parser = subparsers.add_parser('start', help='Start a sale')
start_parser.add_argument('discount', type=float, help='Discount percentage')
end_parser = subparsers.add_parser('end', help='End the current sale')
schedule_parser = subparsers.add_parser('schedule', help='Schedule a sale')
schedule_parser.add_argument('discount', type=float, help='Discount percentage')
schedule_parser.add_argument('start', help='Start time (YYYY-MM-DD HH:MM)')
schedule_parser.add_argument('end', help='End time (YYYY-MM-DD HH:MM)')
args = parser.parse_args()
if args.command == 'start':
start_sale(args.discount)
elif args.command == 'end':
end_sale()
elif args.command == 'schedule':
scheduled_sale(args.discount, args.start, args.end)
else:
parser.print_help()
# etsy_client/price_testing.py
"""A/B testing for pricing strategies."""
import random
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
from .client import EtsyClient
from .listings import ListingOperations
from .orders import OrderOperations
@dataclass
class PriceTest:
"""Represents a price A/B test."""
test_id: str
listing_id: int
price_a: float
price_b: float
start_date: datetime
end_date: Optional[datetime] = None
current_variant: str = 'A'
results: Dict = field(default_factory=dict)
class PriceTestManager:
"""
Manage A/B tests for pricing.
Tests different prices over time to find optimal pricing.
"""
def __init__(
self,
client: EtsyClient,
shop_id: int,
test_file: str = "data/price_tests.json"
):
self.client = client
self.shop_id = shop_id
self.test_file = test_file
self.listing_ops = ListingOperations(client)
self.order_ops = OrderOperations(client)
self._tests: Dict[str, PriceTest] = {}
self._load_tests()
def _load_tests(self):
"""Load tests from file."""
try:
with open(self.test_file, 'r') as f:
data = json.load(f)
for test_id, test_data in data.items():
test_data['start_date'] = datetime.fromisoformat(
test_data['start_date']
)
if test_data.get('end_date'):
test_data['end_date'] = datetime.fromisoformat(
test_data['end_date']
)
self._tests[test_id] = PriceTest(**test_data)
except FileNotFoundError:
pass
def _save_tests(self):
"""Save tests to file."""
data = {}
for test_id, test in self._tests.items():
test_dict = {
'test_id': test.test_id,
'listing_id': test.listing_id,
'price_a': test.price_a,
'price_b': test.price_b,
'start_date': test.start_date.isoformat(),
'end_date': test.end_date.isoformat() if test.end_date else None,
'current_variant': test.current_variant,
'results': test.results
}
data[test_id] = test_dict
with open(self.test_file, 'w') as f:
json.dump(data, f, indent=2)
def start_test(
self,
listing_id: int,
price_a: float,
price_b: float,
duration_days: int = 14
) -> PriceTest:
"""
Start a new price A/B test.
Args:
listing_id: Listing to test
price_a: First price variant
price_b: Second price variant
duration_days: Test duration in days
Returns:
PriceTest object
"""
test_id = f"test_{listing_id}_{datetime.now().strftime('%Y%m%d')}"
test = PriceTest(
test_id=test_id,
listing_id=listing_id,
price_a=price_a,
price_b=price_b,
start_date=datetime.now(),
end_date=datetime.now() + timedelta(days=duration_days),
current_variant='A',
results={
'A': {'views': 0, 'sales': 0, 'revenue': 0, 'periods': 0},
'B': {'views': 0, 'sales': 0, 'revenue': 0, 'periods': 0}
}
)
# Set initial price
self.listing_ops.update_price(listing_id, price_a)
self._tests[test_id] = test
self._save_tests()
return test
def rotate_variant(self, test_id: str):
"""
Rotate to the other price variant.
Should be called periodically (e.g., daily) during the test.
"""
test = self._tests.get(test_id)
if not test:
raise ValueError(f"Test not found: {test_id}")
# Record current metrics before switching
self._record_metrics(test)
# Switch variant
if test.current_variant == 'A':
test.current_variant = 'B'
new_price = test.price_b
else:
test.current_variant = 'A'
new_price = test.price_a
self.listing_ops.update_price(test.listing_id, new_price)
self._save_tests()
def _record_metrics(self, test: PriceTest):
"""Record metrics for current variant."""
listing = self.listing_ops.get_listing(test.listing_id)
variant = test.current_variant
# This is simplified - in production, track incremental changes
test.results[variant]['views'] = listing.get('views', 0)
test.results[variant]['periods'] += 1
# Get sales for this listing
transactions = self.order_ops.get_listing_transactions(
self.shop_id, test.listing_id
)
test.results[variant]['sales'] = len(transactions.get('results', []))
def end_test(self, test_id: str) -> Dict[str, Any]:
"""
End a test and get results.
Args:
test_id: Test to end
Returns:
Test results with winner determination
"""
test = self._tests.get(test_id)
if not test:
raise ValueError(f"Test not found: {test_id}")
# Record final metrics
self._record_metrics(test)
test.end_date = datetime.now()
# Calculate conversion rates
results = {}
for variant in ['A', 'B']:
data = test.results[variant]
views = data.get('views', 0)
sales = data.get('sales', 0)
results[variant] = {
'price': test.price_a if variant == 'A' else test.price_b,
'views': views,
'sales': sales,
'conversion_rate': sales / views if views > 0 else 0,
'revenue_per_view': (sales * (test.price_a if variant == 'A' else test.price_b)) / views if views > 0 else 0
}
# Determine winner (by revenue per view)
rpv_a = results['A']['revenue_per_view']
rpv_b = results['B']['revenue_per_view']
if rpv_a > rpv_b * 1.1: # A wins by >10%
winner = 'A'
elif rpv_b > rpv_a * 1.1: # B wins by >10%
winner = 'B'
else:
winner = 'inconclusive'
# Set winning price
if winner == 'A':
self.listing_ops.update_price(test.listing_id, test.price_a)
elif winner == 'B':
self.listing_ops.update_price(test.listing_id, test.price_b)
self._save_tests()
return {
'test_id': test_id,
'listing_id': test.listing_id,
'duration_days': (test.end_date - test.start_date).days,
'results': results,
'winner': winner,
'winning_price': test.price_a if winner == 'A' else test.price_b if winner == 'B' else None
}
With pricing and inventory automation in place, you’ll want to understand what’s actually happening in your shop. The next chapter covers analytics and reporting—extracting insights from your sales data.
| ← Chapter 7: Orders and Transactions | Table of Contents | Chapter 9: Analytics and Reporting → |