Chapter 8: Inventory and Pricing Automation

The Power of Dynamic Pricing

Digital products have unique pricing flexibility. Unlike physical goods with fixed costs, digital products can be priced dynamically based on market conditions, competition, and performance data. This chapter explores automated pricing strategies and inventory management for digital products.

Inventory Management for Digital Products

While digital products have “unlimited” inventory, Etsy’s system still requires quantity management:

# etsy_client/inventory.py
"""Inventory management for digital products."""

from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from .client import EtsyClient
from .listings import ListingOperations


@dataclass
class InventoryItem:
    """Represents a listing's inventory state."""
    listing_id: int
    quantity: int
    price: float
    sku: Optional[str]
    is_enabled: bool
    variations: List[Dict]


class InventoryOperations:
    """Operations for managing listing inventory."""
    
    def __init__(self, client: EtsyClient):
        self.client = client
    
    def get_listing_inventory(
        self,
        listing_id: int
    ) -> Dict[str, Any]:
        """
        Get inventory details for a listing.
        
        Args:
            listing_id: The listing ID
            
        Returns:
            Inventory data including products and offerings
        """
        return self.client.get(
            f"/application/listings/{listing_id}/inventory"
        )
    
    def update_listing_inventory(
        self,
        listing_id: int,
        products: List[Dict],
        **kwargs
    ) -> Dict[str, Any]:
        """
        Update inventory for a listing.
        
        Args:
            listing_id: The listing ID
            products: List of product variations with offerings
            **kwargs: Additional inventory parameters
            
        Returns:
            Updated inventory data
        """
        data = {"products": products}
        data.update(kwargs)
        
        return self.client.put(
            f"/application/listings/{listing_id}/inventory",
            json=data
        )
    
    def set_listing_quantity(
        self,
        listing_id: int,
        quantity: int = 999
    ) -> Dict[str, Any]:
        """
        Set quantity for a simple listing (no variations).
        
        Args:
            listing_id: The listing ID
            quantity: New quantity (default 999 for digital)
            
        Returns:
            Updated inventory data
        """
        # Get current inventory to preserve structure
        current = self.get_listing_inventory(listing_id)
        products = current.get('products', [])
        
        if not products:
            # Simple listing
            return self.update_listing_inventory(
                listing_id,
                products=[{
                    "offerings": [{
                        "quantity": quantity,
                        "is_enabled": True
                    }]
                }]
            )
        
        # Update existing products
        for product in products:
            for offering in product.get('offerings', []):
                offering['quantity'] = quantity
        
        return self.update_listing_inventory(listing_id, products)


class DigitalInventoryManager:
    """
    Manage inventory specifically for digital products.
    
    Digital products have special considerations:
    - Quantity is essentially unlimited
    - Variations might be file formats
    - Auto-renew is typical
    """
    
    # Default high quantity for digital products
    DEFAULT_QUANTITY = 999
    
    def __init__(self, client: EtsyClient, shop_id: int):
        self.client = client
        self.shop_id = shop_id
        self.inventory_ops = InventoryOperations(client)
        self.listing_ops = ListingOperations(client)
    
    def ensure_stock(
        self,
        min_quantity: int = 100
    ) -> List[Dict]:
        """
        Ensure all digital listings have sufficient stock.
        
        Args:
            min_quantity: Minimum quantity threshold
            
        Returns:
            List of updated listings
        """
        # Get all active listings
        listings = self.listing_ops.get_all_listings(self.shop_id, "active")
        
        updated = []
        
        for listing in listings:
            # Check if digital
            if not listing.get('is_digital', False):
                continue
            
            # Check quantity
            if listing.get('quantity', 0) < min_quantity:
                try:
                    self.inventory_ops.set_listing_quantity(
                        listing['listing_id'],
                        self.DEFAULT_QUANTITY
                    )
                    updated.append({
                        'listing_id': listing['listing_id'],
                        'title': listing['title'],
                        'old_quantity': listing['quantity'],
                        'new_quantity': self.DEFAULT_QUANTITY
                    })
                except Exception as e:
                    print(f"Error updating {listing['listing_id']}: {e}")
        
        return updated
    
    def create_format_variations(
        self,
        listing_id: int,
        formats: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Create format variations for a digital product.
        
        Args:
            listing_id: The listing ID
            formats: List of format options, e.g.:
                [
                    {"name": "PDF", "price": 9.99},
                    {"name": "PNG", "price": 7.99},
                    {"name": "SVG + PDF + PNG Bundle", "price": 14.99}
                ]
                
        Returns:
            Updated inventory data
        """
        products = []
        
        for fmt in formats:
            products.append({
                "sku": fmt.get('sku', ''),
                "property_values": [{
                    "property_id": 513,  # Format property
                    "value_ids": [],
                    "values": [fmt['name']]
                }],
                "offerings": [{
                    "price": int(fmt['price'] * 100),  # Convert to cents
                    "quantity": self.DEFAULT_QUANTITY,
                    "is_enabled": True
                }]
            })
        
        return self.inventory_ops.update_listing_inventory(
            listing_id,
            products
        )

Dynamic Pricing Strategies

# etsy_client/pricing.py
"""Dynamic pricing strategies for Etsy listings."""

from typing import Dict, Any, List, Callable, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from .client import EtsyClient
from .listings import ListingOperations
from .orders import OrderOperations


class PricingStrategy(Enum):
    """Available pricing strategies."""
    FIXED = "fixed"
    PERFORMANCE_BASED = "performance"
    TIME_BASED = "time"
    DEMAND_BASED = "demand"
    COMPETITIVE = "competitive"


@dataclass
class PricingRule:
    """Rule for dynamic pricing."""
    strategy: PricingStrategy
    base_price: float
    min_price: float
    max_price: float
    parameters: Dict[str, Any]


class DynamicPricer:
    """
    Implement dynamic pricing strategies.
    
    Adjusts prices based on various factors like performance,
    time, and demand.
    """
    
    def __init__(self, client: EtsyClient, shop_id: int):
        self.client = client
        self.shop_id = shop_id
        self.listing_ops = ListingOperations(client)
        self.order_ops = OrderOperations(client)
    
    def apply_performance_pricing(
        self,
        listing_id: int,
        base_price: float,
        min_price: float,
        max_price: float,
        views_threshold: int = 100,
        conversion_target: float = 0.02  # 2% conversion
    ) -> Dict[str, Any]:
        """
        Adjust price based on listing performance.
        
        High views + low sales = lower price
        High conversion = raise price
        
        Args:
            listing_id: The listing ID
            base_price: Starting price
            min_price: Minimum allowed price
            max_price: Maximum allowed price
            views_threshold: Minimum views for analysis
            conversion_target: Target conversion rate
            
        Returns:
            Price adjustment result
        """
        # Get listing stats
        listing = self.listing_ops.get_listing(listing_id)
        
        views = listing.get('views', 0)
        favorites = listing.get('num_favorers', 0)
        
        # Get recent sales
        transactions = self.order_ops.get_listing_transactions(
            self.shop_id, listing_id
        )
        sales = len(transactions.get('results', []))
        
        if views < views_threshold:
            # Not enough data
            return {
                'listing_id': listing_id,
                'action': 'no_change',
                'reason': 'insufficient_views',
                'current_price': base_price
            }
        
        # Calculate conversion rate
        conversion_rate = sales / views if views > 0 else 0
        
        # Determine price adjustment
        current_price = listing['price']['amount'] / listing['price']['divisor']
        
        if conversion_rate < conversion_target * 0.5:
            # Very low conversion - reduce price
            adjustment = -0.15  # 15% reduction
        elif conversion_rate < conversion_target:
            # Below target - small reduction
            adjustment = -0.05  # 5% reduction
        elif conversion_rate > conversion_target * 2:
            # High conversion - increase price
            adjustment = 0.10  # 10% increase
        elif conversion_rate > conversion_target:
            # Above target - small increase
            adjustment = 0.05  # 5% increase
        else:
            adjustment = 0
        
        if adjustment == 0:
            return {
                'listing_id': listing_id,
                'action': 'no_change',
                'reason': 'optimal_conversion',
                'current_price': current_price,
                'conversion_rate': conversion_rate
            }
        
        # Calculate new price
        new_price = current_price * (1 + adjustment)
        new_price = max(min_price, min(max_price, new_price))
        new_price = round(new_price, 2)
        
        if new_price != current_price:
            self.listing_ops.update_price(listing_id, new_price)
        
        return {
            'listing_id': listing_id,
            'action': 'adjusted',
            'old_price': current_price,
            'new_price': new_price,
            'adjustment': adjustment,
            'conversion_rate': conversion_rate
        }
    
    def apply_time_based_pricing(
        self,
        listing_id: int,
        schedule: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Apply time-based pricing (sales, holidays, etc.).
        
        Args:
            listing_id: The listing ID
            schedule: List of pricing periods, e.g.:
                [
                    {
                        "start": "2026-11-25",
                        "end": "2026-11-30",
                        "discount_percent": 30,
                        "name": "Black Friday"
                    }
                ]
                
        Returns:
            Applied pricing result
        """
        now = datetime.now()
        
        for period in schedule:
            start = datetime.strptime(period['start'], '%Y-%m-%d')
            end = datetime.strptime(period['end'], '%Y-%m-%d')
            
            if start <= now <= end:
                # Apply discount
                listing = self.listing_ops.get_listing(listing_id)
                current_price = listing['price']['amount'] / listing['price']['divisor']
                
                discount = period['discount_percent'] / 100
                sale_price = round(current_price * (1 - discount), 2)
                
                self.listing_ops.update_price(listing_id, sale_price)
                
                return {
                    'listing_id': listing_id,
                    'action': 'sale_applied',
                    'sale_name': period['name'],
                    'original_price': current_price,
                    'sale_price': sale_price,
                    'discount_percent': period['discount_percent']
                }
        
        return {
            'listing_id': listing_id,
            'action': 'no_sale_active'
        }


class SaleManager:
    """Manage sales and promotional pricing."""
    
    def __init__(self, client: EtsyClient, shop_id: int):
        self.client = client
        self.shop_id = shop_id
        self.listing_ops = ListingOperations(client)
        
        self._original_prices: Dict[int, float] = {}
    
    def start_sale(
        self,
        discount_percent: float,
        listing_ids: List[int] = None,
        min_price: float = 0.20
    ) -> Dict[str, Any]:
        """
        Start a sale on specified listings or all listings.
        
        Args:
            discount_percent: Discount percentage (e.g., 20 for 20% off)
            listing_ids: Specific listings, or None for all
            min_price: Minimum price after discount
            
        Returns:
            Sale application results
        """
        if listing_ids is None:
            # Get all active listings
            listings = self.listing_ops.get_all_listings(
                self.shop_id, "active"
            )
            listing_ids = [l['listing_id'] for l in listings]
        
        results = {
            'updated': [],
            'errors': [],
            'skipped': []
        }
        
        multiplier = 1 - (discount_percent / 100)
        
        for listing_id in listing_ids:
            try:
                listing = self.listing_ops.get_listing(listing_id)
                current_price = listing['price']['amount'] / listing['price']['divisor']
                
                # Store original price
                self._original_prices[listing_id] = current_price
                
                # Calculate sale price
                sale_price = round(current_price * multiplier, 2)
                sale_price = max(sale_price, min_price)
                
                if sale_price == current_price:
                    results['skipped'].append(listing_id)
                    continue
                
                self.listing_ops.update_price(listing_id, sale_price)
                
                results['updated'].append({
                    'listing_id': listing_id,
                    'original': current_price,
                    'sale_price': sale_price
                })
                
            except Exception as e:
                results['errors'].append({
                    'listing_id': listing_id,
                    'error': str(e)
                })
        
        return results
    
    def end_sale(
        self,
        listing_ids: List[int] = None
    ) -> Dict[str, Any]:
        """
        End a sale and restore original prices.
        
        Args:
            listing_ids: Specific listings, or None for all in sale
            
        Returns:
            Restoration results
        """
        if listing_ids is None:
            listing_ids = list(self._original_prices.keys())
        
        results = {
            'restored': [],
            'errors': [],
            'no_original': []
        }
        
        for listing_id in listing_ids:
            if listing_id not in self._original_prices:
                results['no_original'].append(listing_id)
                continue
            
            try:
                original_price = self._original_prices[listing_id]
                self.listing_ops.update_price(listing_id, original_price)
                
                del self._original_prices[listing_id]
                
                results['restored'].append({
                    'listing_id': listing_id,
                    'restored_price': original_price
                })
                
            except Exception as e:
                results['errors'].append({
                    'listing_id': listing_id,
                    'error': str(e)
                })
        
        return results
    
    def save_original_prices(self, filepath: str):
        """Save original prices to file for persistence."""
        import json
        with open(filepath, 'w') as f:
            json.dump(self._original_prices, f)
    
    def load_original_prices(self, filepath: str):
        """Load original prices from file."""
        import json
        with open(filepath, 'r') as f:
            data = json.load(f)
            self._original_prices = {int(k): v for k, v in data.items()}

Automated Pricing Scripts

Performance-Based Price Optimizer

# scripts/optimize_prices.py
"""Optimize prices based on performance data."""

import sys
from pathlib import Path
from datetime import datetime

sys.path.insert(0, str(Path(__file__).parent.parent))

from config import Config
from etsy_client import EtsyClient
from etsy_client.pricing import DynamicPricer
from etsy_client.listings import ListingOperations
from etsy_client.users import get_my_shop_id


def optimize_all_prices(
    min_views: int = 100,
    conversion_target: float = 0.02,
    price_range_percent: float = 30
):
    """
    Optimize prices for all listings with sufficient data.
    
    Args:
        min_views: Minimum views required for optimization
        conversion_target: Target conversion rate
        price_range_percent: Max price adjustment from current
    """
    client = EtsyClient(
        api_key=Config.ETSY_API_KEY,
        access_token=Config.ETSY_ACCESS_TOKEN
    )
    
    shop_id = get_my_shop_id(client)
    pricer = DynamicPricer(client, shop_id)
    listing_ops = ListingOperations(client)
    
    print(f"Starting price optimization at {datetime.now()}")
    print(f"Parameters: min_views={min_views}, target={conversion_target*100}%")
    print("=" * 60)
    
    # Get all active listings
    listings = listing_ops.get_all_listings(shop_id, "active")
    
    results = {
        'optimized': [],
        'no_change': [],
        'insufficient_data': [],
        'errors': []
    }
    
    for listing in listings:
        listing_id = listing['listing_id']
        current_price = listing['price']['amount'] / listing['price']['divisor']
        
        # Calculate price bounds
        min_price = current_price * (1 - price_range_percent / 100)
        max_price = current_price * (1 + price_range_percent / 100)
        
        try:
            result = pricer.apply_performance_pricing(
                listing_id,
                base_price=current_price,
                min_price=min_price,
                max_price=max_price,
                views_threshold=min_views,
                conversion_target=conversion_target
            )
            
            if result['action'] == 'adjusted':
                results['optimized'].append(result)
                print(f"{listing['title'][:40]}: "
                      f"${result['old_price']:.2f} → ${result['new_price']:.2f}")
            elif result['reason'] == 'insufficient_views':
                results['insufficient_data'].append(listing_id)
            else:
                results['no_change'].append(listing_id)
                
        except Exception as e:
            results['errors'].append({
                'listing_id': listing_id,
                'error': str(e)
            })
    
    # Summary
    print("\n" + "=" * 60)
    print("OPTIMIZATION SUMMARY")
    print("=" * 60)
    print(f"  Prices adjusted: {len(results['optimized'])}")
    print(f"  No change needed: {len(results['no_change'])}")
    print(f"  Insufficient data: {len(results['insufficient_data'])}")
    print(f"  Errors: {len(results['errors'])}")
    
    return results


if __name__ == "__main__":
    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument('--min-views', type=int, default=100)
    parser.add_argument('--target', type=float, default=0.02)
    parser.add_argument('--range', type=float, default=30)
    
    args = parser.parse_args()
    
    optimize_all_prices(
        min_views=args.min_views,
        conversion_target=args.target,
        price_range_percent=args.range
    )

Sale Automation

# scripts/manage_sale.py
"""Manage sales and promotions."""

import sys
from pathlib import Path
from datetime import datetime

sys.path.insert(0, str(Path(__file__).parent.parent))

from config import Config
from etsy_client import EtsyClient
from etsy_client.pricing import SaleManager
from etsy_client.users import get_my_shop_id


PRICES_FILE = "data/original_prices.json"


def start_sale(discount: float):
    """Start a shop-wide sale."""
    client = EtsyClient(
        api_key=Config.ETSY_API_KEY,
        access_token=Config.ETSY_ACCESS_TOKEN
    )
    
    shop_id = get_my_shop_id(client)
    manager = SaleManager(client, shop_id)
    
    print(f"Starting {discount}% off sale...")
    print("=" * 50)
    
    results = manager.start_sale(discount)
    
    # Save original prices
    manager.save_original_prices(PRICES_FILE)
    
    print(f"\n✓ Updated {len(results['updated'])} listings")
    print(f"  Skipped: {len(results['skipped'])}")
    print(f"  Errors: {len(results['errors'])}")
    
    if results['updated']:
        print("\nSample updates:")
        for item in results['updated'][:5]:
            print(f"  ${item['original']:.2f} → ${item['sale_price']:.2f}")
    
    print(f"\nOriginal prices saved to: {PRICES_FILE}")


def end_sale():
    """End the current sale and restore prices."""
    client = EtsyClient(
        api_key=Config.ETSY_API_KEY,
        access_token=Config.ETSY_ACCESS_TOKEN
    )
    
    shop_id = get_my_shop_id(client)
    manager = SaleManager(client, shop_id)
    
    # Load original prices
    try:
        manager.load_original_prices(PRICES_FILE)
    except FileNotFoundError:
        print("Error: No saved prices found. Was a sale started?")
        return
    
    print("Ending sale and restoring original prices...")
    print("=" * 50)
    
    results = manager.end_sale()
    
    print(f"\n✓ Restored {len(results['restored'])} listings")
    print(f"  No original found: {len(results['no_original'])}")
    print(f"  Errors: {len(results['errors'])}")
    
    # Clean up prices file
    Path(PRICES_FILE).unlink(missing_ok=True)


def scheduled_sale(
    discount: float,
    start_time: str,
    end_time: str
):
    """
    Schedule a sale for a specific time period.
    
    Args:
        discount: Discount percentage
        start_time: Start time (YYYY-MM-DD HH:MM)
        end_time: End time (YYYY-MM-DD HH:MM)
    """
    import time
    
    start = datetime.strptime(start_time, '%Y-%m-%d %H:%M')
    end = datetime.strptime(end_time, '%Y-%m-%d %H:%M')
    now = datetime.now()
    
    if now > end:
        print("Error: End time is in the past")
        return
    
    if now < start:
        wait_seconds = (start - now).total_seconds()
        print(f"Sale scheduled to start at {start_time}")
        print(f"Waiting {wait_seconds / 3600:.1f} hours...")
        time.sleep(wait_seconds)
    
    # Start sale
    print(f"\n{'='*50}")
    print("STARTING SCHEDULED SALE")
    print(f"{'='*50}")
    start_sale(discount)
    
    # Wait for end
    now = datetime.now()
    if now < end:
        wait_seconds = (end - now).total_seconds()
        print(f"\nSale running until {end_time}")
        print(f"Waiting {wait_seconds / 3600:.1f} hours...")
        time.sleep(wait_seconds)
    
    # End sale
    print(f"\n{'='*50}")
    print("ENDING SCHEDULED SALE")
    print(f"{'='*50}")
    end_sale()


if __name__ == "__main__":
    import argparse
    
    parser = argparse.ArgumentParser(description='Manage sales')
    subparsers = parser.add_subparsers(dest='command')
    
    start_parser = subparsers.add_parser('start', help='Start a sale')
    start_parser.add_argument('discount', type=float, help='Discount percentage')
    
    end_parser = subparsers.add_parser('end', help='End the current sale')
    
    schedule_parser = subparsers.add_parser('schedule', help='Schedule a sale')
    schedule_parser.add_argument('discount', type=float, help='Discount percentage')
    schedule_parser.add_argument('start', help='Start time (YYYY-MM-DD HH:MM)')
    schedule_parser.add_argument('end', help='End time (YYYY-MM-DD HH:MM)')
    
    args = parser.parse_args()
    
    if args.command == 'start':
        start_sale(args.discount)
    elif args.command == 'end':
        end_sale()
    elif args.command == 'schedule':
        scheduled_sale(args.discount, args.start, args.end)
    else:
        parser.print_help()

Price Testing (A/B Testing)

# etsy_client/price_testing.py
"""A/B testing for pricing strategies."""

import random
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
from .client import EtsyClient
from .listings import ListingOperations
from .orders import OrderOperations


@dataclass
class PriceTest:
    """Represents a price A/B test."""
    test_id: str
    listing_id: int
    price_a: float
    price_b: float
    start_date: datetime
    end_date: Optional[datetime] = None
    current_variant: str = 'A'
    results: Dict = field(default_factory=dict)


class PriceTestManager:
    """
    Manage A/B tests for pricing.
    
    Tests different prices over time to find optimal pricing.
    """
    
    def __init__(
        self,
        client: EtsyClient,
        shop_id: int,
        test_file: str = "data/price_tests.json"
    ):
        self.client = client
        self.shop_id = shop_id
        self.test_file = test_file
        self.listing_ops = ListingOperations(client)
        self.order_ops = OrderOperations(client)
        
        self._tests: Dict[str, PriceTest] = {}
        self._load_tests()
    
    def _load_tests(self):
        """Load tests from file."""
        try:
            with open(self.test_file, 'r') as f:
                data = json.load(f)
            
            for test_id, test_data in data.items():
                test_data['start_date'] = datetime.fromisoformat(
                    test_data['start_date']
                )
                if test_data.get('end_date'):
                    test_data['end_date'] = datetime.fromisoformat(
                        test_data['end_date']
                    )
                self._tests[test_id] = PriceTest(**test_data)
        except FileNotFoundError:
            pass
    
    def _save_tests(self):
        """Save tests to file."""
        data = {}
        for test_id, test in self._tests.items():
            test_dict = {
                'test_id': test.test_id,
                'listing_id': test.listing_id,
                'price_a': test.price_a,
                'price_b': test.price_b,
                'start_date': test.start_date.isoformat(),
                'end_date': test.end_date.isoformat() if test.end_date else None,
                'current_variant': test.current_variant,
                'results': test.results
            }
            data[test_id] = test_dict
        
        with open(self.test_file, 'w') as f:
            json.dump(data, f, indent=2)
    
    def start_test(
        self,
        listing_id: int,
        price_a: float,
        price_b: float,
        duration_days: int = 14
    ) -> PriceTest:
        """
        Start a new price A/B test.
        
        Args:
            listing_id: Listing to test
            price_a: First price variant
            price_b: Second price variant
            duration_days: Test duration in days
            
        Returns:
            PriceTest object
        """
        test_id = f"test_{listing_id}_{datetime.now().strftime('%Y%m%d')}"
        
        test = PriceTest(
            test_id=test_id,
            listing_id=listing_id,
            price_a=price_a,
            price_b=price_b,
            start_date=datetime.now(),
            end_date=datetime.now() + timedelta(days=duration_days),
            current_variant='A',
            results={
                'A': {'views': 0, 'sales': 0, 'revenue': 0, 'periods': 0},
                'B': {'views': 0, 'sales': 0, 'revenue': 0, 'periods': 0}
            }
        )
        
        # Set initial price
        self.listing_ops.update_price(listing_id, price_a)
        
        self._tests[test_id] = test
        self._save_tests()
        
        return test
    
    def rotate_variant(self, test_id: str):
        """
        Rotate to the other price variant.
        
        Should be called periodically (e.g., daily) during the test.
        """
        test = self._tests.get(test_id)
        if not test:
            raise ValueError(f"Test not found: {test_id}")
        
        # Record current metrics before switching
        self._record_metrics(test)
        
        # Switch variant
        if test.current_variant == 'A':
            test.current_variant = 'B'
            new_price = test.price_b
        else:
            test.current_variant = 'A'
            new_price = test.price_a
        
        self.listing_ops.update_price(test.listing_id, new_price)
        
        self._save_tests()
    
    def _record_metrics(self, test: PriceTest):
        """Record metrics for current variant."""
        listing = self.listing_ops.get_listing(test.listing_id)
        
        variant = test.current_variant
        
        # This is simplified - in production, track incremental changes
        test.results[variant]['views'] = listing.get('views', 0)
        test.results[variant]['periods'] += 1
        
        # Get sales for this listing
        transactions = self.order_ops.get_listing_transactions(
            self.shop_id, test.listing_id
        )
        
        test.results[variant]['sales'] = len(transactions.get('results', []))
    
    def end_test(self, test_id: str) -> Dict[str, Any]:
        """
        End a test and get results.
        
        Args:
            test_id: Test to end
            
        Returns:
            Test results with winner determination
        """
        test = self._tests.get(test_id)
        if not test:
            raise ValueError(f"Test not found: {test_id}")
        
        # Record final metrics
        self._record_metrics(test)
        
        test.end_date = datetime.now()
        
        # Calculate conversion rates
        results = {}
        for variant in ['A', 'B']:
            data = test.results[variant]
            views = data.get('views', 0)
            sales = data.get('sales', 0)
            
            results[variant] = {
                'price': test.price_a if variant == 'A' else test.price_b,
                'views': views,
                'sales': sales,
                'conversion_rate': sales / views if views > 0 else 0,
                'revenue_per_view': (sales * (test.price_a if variant == 'A' else test.price_b)) / views if views > 0 else 0
            }
        
        # Determine winner (by revenue per view)
        rpv_a = results['A']['revenue_per_view']
        rpv_b = results['B']['revenue_per_view']
        
        if rpv_a > rpv_b * 1.1:  # A wins by >10%
            winner = 'A'
        elif rpv_b > rpv_a * 1.1:  # B wins by >10%
            winner = 'B'
        else:
            winner = 'inconclusive'
        
        # Set winning price
        if winner == 'A':
            self.listing_ops.update_price(test.listing_id, test.price_a)
        elif winner == 'B':
            self.listing_ops.update_price(test.listing_id, test.price_b)
        
        self._save_tests()
        
        return {
            'test_id': test_id,
            'listing_id': test.listing_id,
            'duration_days': (test.end_date - test.start_date).days,
            'results': results,
            'winner': winner,
            'winning_price': test.price_a if winner == 'A' else test.price_b if winner == 'B' else None
        }

Key Takeaways

  1. Digital inventory is essentially infinite: But maintain high quantities to avoid issues
  2. Performance-based pricing works: Adjust prices based on conversion data
  3. Schedule sales properly: Save original prices and restore after sale
  4. Test prices: A/B testing helps find optimal price points
  5. Automate carefully: Always have safeguards and rollback capabilities

Moving Forward

With pricing and inventory automation in place, you’ll want to understand what’s actually happening in your shop. The next chapter covers analytics and reporting—extracting insights from your sales data.


← Chapter 7: Orders and Transactions Table of Contents Chapter 9: Analytics and Reporting →