""" Westech R2 — eBay Pricing API Whitelisted Frappe methods for searching eBay sold listings and batch pricing. Mirrors /opt/eim/app/ebay_pricing.py logic, adapted for Frappe/ERPNext. """ import frappe from frappe.utils import now, now_datetime from frappe import _ import json import re import time import urllib.parse import urllib.request from datetime import datetime OXYLABS_API = "https://realtime.oxylabs.io/v1/queries" ACTOR_ID = "caffein.dev~ebay-sold-listings" API_BASE = "https://api.apify.com/v2" MFR_CLEANUP = { "Dell Inc": "Dell", "HP HP": "HP", "HP": "HP", "LENOVO": "Lenovo", "Lenovo": "Lenovo", "Microsoft Corporation": "Microsoft", "Apple Inc": "Apple", "ASUSTeK COMPUTER INC.": "ASUS", "Acer": "Acer", "Panasonic": "Panasonic", "Samsung": "Samsung", "Toshiba": "Toshiba", "Fujitsu": "Fujitsu", "Hewlett-Packard": "HP", "HUAWEI": "Huawei", } def _get_settings(): """Load eBay Pricing Settings singleton.""" if not frappe.db.exists("eBay Pricing Settings", "eBay Pricing Settings"): return None return frappe.get_doc("eBay Pricing Settings", "eBay Pricing Settings") def _get_oxylabs_creds(): """Return (user, password) tuple from settings or env.""" settings = _get_settings() user = password = "" if settings: user = settings.get("oxylabs_user") or "" password = settings.get_password("oxylabs_password") or "" # Env fallback if not user: user = frappe.conf.get("oxylabs_user", "") if not password: password = frappe.conf.get("oxylabs_password", "") return (user, password) def _get_apify_token(): """Return Apify token from settings or env.""" settings = _get_settings() token = "" if settings: token = settings.get_password("apify_token") or "" if not token: token = frappe.conf.get("apify_token", "") return token def clean_manufacturer(mfr): return MFR_CLEANUP.get(mfr, mfr) @frappe.whitelist() def search_model(query=None, manufacturer=None, model=None, source="auto"): """ Search eBay sold listings for a specific model. Returns {results: [...], pricing: {...}} or {error}. """ if not query and not (manufacturer and model): return {"error": "Provide query or manufacturer + model"} if not manufacturer or not model: # Try to split query into manufacturer + model parts = (query or "").split(None, 1) if len(parts) >= 2: manufacturer, model = parts[0], parts[1] else: manufacturer, model = query, "" items, used_source = _search_ebay_sold(model, manufacturer, source=source) pricing = _parse_prices(items, manufacturer, model, source=used_source or "unknown") if pricing: # Write or update System Pricing record _upsert_system_pricing(manufacturer, model, pricing) # Log the API call _log_api_call(manufacturer, model, query, used_source, len(items) if items else 0, "Success" if pricing else "Failed") return {"results": items or [], "pricing": pricing} else: _log_api_call(manufacturer, model, query, used_source, 0, "Failed") return {"results": items or [], "pricing": None, "message": "No pricing data found"} def _search_ebay_sold_oxylabs(model, manufacturer): user, password = _get_oxylabs_creds() if not user or not password: return None import requests as req_module clean_mfr = clean_manufacturer(manufacturer) query = f"{clean_mfr} {model}" payloads = [ { "source": "universal", "url": f"https://www.ebay.com/sch/i.html?_nkw={urllib.parse.quote(query)}&LH_Sold=1&_ipg=240", "render": "html", }, { "source": "ebay_search", "query": query, "domain": "com", "render": "html", }, ] for payload in payloads: try: resp = req_module.post(OXYLABS_API, auth=(user, password), json=payload, timeout=120) if resp.status_code != 200: frappe.log_error(f"Oxylabs HTTP {resp.status_code}", "eBay Pricing") continue data = resp.json() if "results" not in data or not data["results"]: continue content = data["results"][0].get("content", "") if not isinstance(content, str) or len(content) < 100000: continue listings = _parse_ebay_html(content) if listings and len(listings) >= 3: return listings except Exception as e: frappe.log_error(f"Oxylabs error: {e}", "eBay Pricing") continue return None def _parse_ebay_html(content): listings = [] card_blocks = re.split(r'class="s-card\s', content) for block in card_blocks[1:]: listing = {} price_m = re.search( r's-card__price[^>]*>\$([\d,.]+(?:\.\d{2})?)(?:\s*to\s*\$[\d,.]+(?:\.\d{2})?)?', block, ) if price_m: listing["price"] = float(price_m.group(1).replace(",", "")) title_m = re.search(r's-card__title[^>]*>]*>([^<]+)', block) if title_m: listing["title"] = title_m.group(1).strip() if listing["title"].lower() in ("shop on ebay", ""): continue else: heading_m = re.search(r'role=heading[^>]*>(.*?)', block, re.DOTALL) if heading_m: title_text = re.sub(r'<[^>]+>', '', heading_m.group(1)).strip() if title_text.lower() != "shop on ebay": listing["title"] = title_text sold_m = re.search(r'(\d[\d,]*)\s+sold', block, re.IGNORECASE) if sold_m: listing["sold"] = int(sold_m.group(1).replace(",", "")) if re.search(r'Free (?:shipping|delivery|Standard Shipping)', block, re.IGNORECASE): listing["shipping"] = "Free" else: ship_m = re.search( r'\+\$?([\d,.]+)\s+(?:shipping|delivery|Standard Shipping)', block, re.IGNORECASE ) if ship_m: listing["shipping"] = float(ship_m.group(1).replace(",", "")) else: ship_alt = re.search(r'\$([\d,.]+)\s+(?:shipping|delivery)', block, re.IGNORECASE) if ship_alt: listing["shipping"] = float(ship_alt.group(1).replace(",", "")) cond_m = re.search( r'(Pre-Owned|Used|Brand New|New \(Other\)|Refurbished|Open Box|For parts or not working|Seller refurbished|New with defects|New with box|New without box|New with tags)', block, re.IGNORECASE, ) if cond_m: listing["condition"] = cond_m.group(1) if listing.get("price") or listing.get("title"): listings.append(listing) return listings if listings else None def _search_ebay_sold_apify(model, manufacturer): token = _get_apify_token() if not token: return None import requests as req_module clean_mfr = clean_manufacturer(manufacturer) query = f"{clean_mfr} {model}" run_input = { "keywords": [query], "daysToScrape": 60, "count": 30, "categoryId": "0", "ebaySite": "ebay.com", "sortOrder": "endedRecently", "itemCondition": "any", "itemLocation": "domestic", } url = f"{API_BASE}/acts/{ACTOR_ID}/runs?token={token}" try: resp = req_module.post(url, json=run_input, timeout=30) result = resp.json() run_id = result["data"]["id"] dataset_id = result["data"].get("defaultDatasetId") except Exception as e: frappe.log_error(f"Apify start error: {e}", "eBay Pricing") return None max_wait = 120 start = time.time() while time.time() - start < max_wait: time.sleep(8) try: status_url = f"{API_BASE}/actor-runs/{run_id}?token={token}" status_resp = req_module.get(status_url, timeout=10) status = status_resp.json() run_status = status["data"]["status"] if run_status == "SUCCEEDED": break elif run_status in ("FAILED", "ABORTED", "TIMED-OUT"): frappe.log_error(f"Apify run {run_status}", "eBay Pricing") return None except Exception: continue try: results_url = f"{API_BASE}/datasets/{dataset_id}/items?token={token}&limit=30&clean=true" results_resp = req_module.get(results_url, timeout=15) return results_resp.json() except Exception as e: frappe.log_error(f"Apify fetch error: {e}", "eBay Pricing") return None def _search_ebay_sold(model, manufacturer, source="auto"): if source in ("auto", "oxylabs"): result = _search_ebay_sold_oxylabs(model, manufacturer) if result is not None: return result, "oxylabs" frappe.logger().info("Oxylabs failed, trying Apify...") if source in ("auto", "apify"): result = _search_ebay_sold_apify(model, manufacturer) if result is not None: return result, "apify" return None, None def _parse_prices(items, manufacturer, model, source="oxylabs"): if not items: return None prices = [] clean_mfr = clean_manufacturer(manufacturer) search_terms = {clean_mfr.lower(), model.lower()} for item in items: if item.get("error"): continue if source == "oxylabs": price_val = item.get("price") if isinstance(price_val, str): price_str = price_val.replace("$", "").replace(",", "").strip() if " to " in price_str: price_str = price_str.split(" to ")[0] try: price_val = float(price_str) except (ValueError, TypeError): continue if not isinstance(price_val, (int, float)): continue p = float(price_val) if 5 < p < 10000: title = item.get("title", "").upper() model_upper = model.upper() model_words = model_upper.split() if len(items) > 5: short_model_words = [w for w in model_words if len(w) > 2] if short_model_words and not any(w in title for w in short_model_words): continue prices.append(p) else: price_str = item.get("totalPrice") or item.get("soldPrice") if not price_str: continue try: p = float(str(price_str).replace(",", "").replace("$", "").strip()) if 5 < p < 10000: prices.append(p) except (ValueError, TypeError): continue if not prices: return None prices.sort() if len(prices) >= 5: trim = max(1, int(len(prices) * 0.1)) trimmed = prices[trim : len(prices) - trim] if trimmed: prices = trimmed if not prices: return None avg = sum(prices) / len(prices) median = prices[len(prices) // 2] return { "price_high": round(max(prices), 2), "price_low": round(min(prices), 2), "price_average": round(avg, 2), "price_auction": round(median, 2), "sample_count": len(prices), "source": f"ebay_{source}", "scraped_at": now(), } def _upsert_system_pricing(manufacturer, model, pricing): """Create or update System Pricing record.""" # Check if record exists by model/manufacturer existing = frappe.db.get_value( "System Pricing", {"manufacturer": manufacturer, "model": model}, "name", ) doc = None if existing: doc = frappe.get_doc("System Pricing", existing) else: doc = frappe.new_doc("System Pricing") doc.manufacturer = manufacturer doc.model = model for key in ("price_high", "price_low", "price_average", "price_auction", "sample_count", "source", "scraped_at"): if key in pricing: setattr(doc, key, pricing[key]) # Compute days_since_pricing if doc.scraped_at: scraped = frappe.utils.get_datetime(doc.scraped_at) now = now_datetime() doc.days_since_pricing = (now - scraped).days else: doc.days_since_pricing = 0 doc.pricing_status = "Priced" doc.save() frappe.db.commit() def _log_api_call(manufacturer, model, search_query, source, results_count, status): """Log API usage for budget tracking.""" try: log = frappe.new_doc("eBay Pricing Log") log.manufacturer = manufacturer log.model = model log.search_query = search_query log.source = source or "unknown" log.timestamp = now() log.results_count = results_count or 0 log.status = status log.save() frappe.db.commit() except Exception: pass @frappe.whitelist() def run_batch(batch_size=10, source="auto", force=False): """ Run batch pricing on the next N unique models that need pricing. Returns {priced, failed, skipped, total}. """ batch_size = int(batch_size) if batch_size != "all" else 999999 force = bool(force) # Get unique models from Serial No / Item records that have manufacturer + model models = frappe.db.sql( """ SELECT DISTINCT manufacturer, model FROM `tabSerial No` WHERE manufacturer IS NOT NULL AND manufacturer != '' AND model IS NOT NULL AND model != '' ORDER BY creation DESC LIMIT %s """, (batch_size,), as_dict=True, ) priced = failed = skipped = 0 for row in models: mfr = row.manufacturer mdl = row.model # Skip if already priced (unless force) if not force: exists = frappe.db.exists("System Pricing", {"manufacturer": mfr, "model": mdl}) if exists: skipped += 1 continue items, used_source = _search_ebay_sold(mdl, mfr, source=source) pricing = _parse_prices(items, mfr, mdl, source=used_source or "unknown") if pricing: _upsert_system_pricing(mfr, mdl, pricing) _log_api_call(mfr, mdl, f"{mfr} {mdl}", used_source, len(items) if items else 0, "Success") priced += 1 else: _log_api_call(mfr, mdl, f"{mfr} {mdl}", used_source, 0, "Failed") failed += 1 # Rate limit if used_source == "oxylabs": time.sleep(2) else: time.sleep(3) return {"priced": priced, "failed": failed, "skipped": skipped, "total": len(models)} @frappe.whitelist() def get_recent_pricing(limit=50, status_filter=None): """Return recent System Pricing records as list of dicts.""" filters = {} if status_filter: filters["pricing_status"] = status_filter records = frappe.get_all( "System Pricing", filters=filters, fields=[ "name", "manufacturer", "model", "pricing_status", "scraped_at", "days_since_pricing", "price_high", "price_low", "price_average", "sample_count", "source", ], order_by="scraped_at desc", limit=int(limit), ) for r in records: r["days_since_pricing"] = r.get("days_since_pricing") or 0 for key in ("price_high", "price_low", "price_average"): if r.get(key) is not None: r[key] = round(r[key], 2) return records