feat: add apply_serial_pricing, batch_apply_pricing, and Item market data sync

This commit is contained in:
Westech Admin
2026-05-17 13:35:33 +00:00
parent 6c4e00bdec
commit 9689f2eaa7
+162 -99
View File
@@ -1,57 +1,41 @@
""" import sys
Westech R2 — eBay Pricing API sys.path.insert(0, '/home/frappe/erpnext-bench/apps/frappe')
Whitelisted Frappe methods for searching eBay sold listings and batch pricing.
Mirrors /opt/eim/app/ebay_pricing.py logic, adapted for Frappe/ERPNext.
"""
import frappe import frappe
from frappe.utils import now, now_datetime from frappe.utils import now, now_datetime, flt
from frappe import _ from frappe import _
import json import json
import re import re
import time import time
import urllib.parse import urllib.parse
import urllib.request
from datetime import datetime
OXYLABS_API = "https://realtime.oxylabs.io/v1/queries" OXYLABS_API = "https://realtime.oxylabs.io/v1/queries"
ACTOR_ID = "caffein.dev~ebay-sold-listings" ACTOR_ID = "caffein.dev~ebay-sold-listings"
API_BASE = "https://api.apify.com/v2" API_BASE = "https://api.apify.com/v2"
MFR_CLEANUP = { MFR_CLEANUP = {
"Dell Inc": "Dell", "Dell Inc": "Dell", "HP HP": "HP", "HP": "HP",
"HP HP": "HP", "LENOVO": "Lenovo", "Lenovo": "Lenovo",
"HP": "HP", "Microsoft Corporation": "Microsoft", "Apple Inc": "Apple",
"LENOVO": "Lenovo", "ASUSTeK COMPUTER INC.": "ASUS", "Acer": "Acer",
"Lenovo": "Lenovo", "Panasonic": "Panasonic", "Samsung": "Samsung",
"Microsoft Corporation": "Microsoft", "Toshiba": "Toshiba", "Fujitsu": "Fujitsu",
"Apple Inc": "Apple", "Hewlett-Packard": "HP", "HUAWEI": "Huawei",
"ASUSTeK COMPUTER INC.": "ASUS",
"Acer": "Acer",
"Panasonic": "Panasonic",
"Samsung": "Samsung",
"Toshiba": "Toshiba",
"Fujitsu": "Fujitsu",
"Hewlett-Packard": "HP",
"HUAWEI": "Huawei",
} }
def _get_settings(): def _get_settings():
"""Load eBay Pricing Settings singleton."""
if not frappe.db.exists("eBay Pricing Settings", "eBay Pricing Settings"): if not frappe.db.exists("eBay Pricing Settings", "eBay Pricing Settings"):
return None return None
return frappe.get_doc("eBay Pricing Settings", "eBay Pricing Settings") return frappe.get_doc("eBay Pricing Settings", "eBay Pricing Settings")
def _get_oxylabs_creds(): def _get_oxylabs_creds():
"""Return (user, password) tuple from settings or env."""
settings = _get_settings() settings = _get_settings()
user = password = "" user = password = ""
if settings: if settings:
user = settings.get("oxylabs_user") or "" user = settings.get("oxylabs_user") or ""
password = settings.get_password("oxylabs_password") or "" password = settings.get_password("oxylabs_password") or ""
# Env fallback
if not user: if not user:
user = frappe.conf.get("oxylabs_user", "") user = frappe.conf.get("oxylabs_user", "")
if not password: if not password:
@@ -60,7 +44,6 @@ def _get_oxylabs_creds():
def _get_apify_token(): def _get_apify_token():
"""Return Apify token from settings or env."""
settings = _get_settings() settings = _get_settings()
token = "" token = ""
if settings: if settings:
@@ -76,46 +59,35 @@ def clean_manufacturer(mfr):
@frappe.whitelist() @frappe.whitelist()
def search_model(query=None, manufacturer=None, model=None, source="auto"): def search_model(query=None, manufacturer=None, model=None, source="auto"):
"""
Search eBay sold listings for a specific model.
Returns {results: [...], pricing: {...}} or {error}.
"""
if not query and not (manufacturer and model): if not query and not (manufacturer and model):
return {"error": "Provide query or manufacturer + model"} return {"error": "Provide query or manufacturer + model"}
if not manufacturer or not model: if not manufacturer or not model:
# Try to split query into manufacturer + model
parts = (query or "").split(None, 1) parts = (query or "").split(None, 1)
if len(parts) >= 2: if len(parts) >= 2:
manufacturer, model = parts[0], parts[1] manufacturer, model = parts[0], parts[1]
else: else:
manufacturer, model = query, "" manufacturer, model = query, ""
items, used_source = _search_ebay_sold(model, manufacturer, source=source) items, used_source = _search_ebay_sold(model, manufacturer, source=source)
pricing = _parse_prices(items, manufacturer, model, source=used_source or "unknown") pricing = _parse_prices(items, manufacturer, model, source=used_source or "unknown")
if pricing: if pricing:
# Write or update System Pricing record
_upsert_system_pricing(manufacturer, model, pricing) _upsert_system_pricing(manufacturer, model, pricing)
# Log the API call _update_item_market_data(manufacturer, model, pricing)
_log_api_call(manufacturer, model, query, used_source, len(items) if items else 0, _log_api_call(manufacturer, model, query, used_source,
"Success" if pricing else "Failed") len(items) if items else 0, "Success")
return {"results": items or [], "pricing": pricing} return {"results": items or [], "pricing": pricing}
else: else:
_log_api_call(manufacturer, model, query, used_source, 0, "Failed") _log_api_call(manufacturer, model, query, used_source, 0, "Failed")
return {"results": items or [], "pricing": None, "message": "No pricing data found"} return {"results": items or [], "pricing": None,
"message": "No pricing data found"}
def _search_ebay_sold_oxylabs(model, manufacturer): def _search_ebay_sold_oxylabs(model, manufacturer):
user, password = _get_oxylabs_creds() user, password = _get_oxylabs_creds()
if not user or not password: if not user or not password:
return None return None
import requests as req_module import requests as req_module
clean_mfr = clean_manufacturer(manufacturer) clean_mfr = clean_manufacturer(manufacturer)
query = f"{clean_mfr} {model}" query = f"{clean_mfr} {model}"
payloads = [ payloads = [
{ {
"source": "universal", "source": "universal",
@@ -129,12 +101,11 @@ def _search_ebay_sold_oxylabs(model, manufacturer):
"render": "html", "render": "html",
}, },
] ]
for payload in payloads: for payload in payloads:
try: try:
resp = req_module.post(OXYLABS_API, auth=(user, password), json=payload, timeout=120) resp = req_module.post(OXYLABS_API, auth=(user, password),
json=payload, timeout=120)
if resp.status_code != 200: if resp.status_code != 200:
frappe.log_error(f"Oxylabs HTTP {resp.status_code}", "eBay Pricing")
continue continue
data = resp.json() data = resp.json()
if "results" not in data or not data["results"]: if "results" not in data or not data["results"]:
@@ -145,8 +116,7 @@ def _search_ebay_sold_oxylabs(model, manufacturer):
listings = _parse_ebay_html(content) listings = _parse_ebay_html(content)
if listings and len(listings) >= 3: if listings and len(listings) >= 3:
return listings return listings
except Exception as e: except Exception:
frappe.log_error(f"Oxylabs error: {e}", "eBay Pricing")
continue continue
return None return None
@@ -162,43 +132,44 @@ def _parse_ebay_html(content):
) )
if price_m: if price_m:
listing["price"] = float(price_m.group(1).replace(",", "")) listing["price"] = float(price_m.group(1).replace(",", ""))
title_m = re.search(r's-card__title[^>]*><span[^>]*>([^<]+)</span>', block) title_m = re.search(r's-card__title[^>]*><span[^>]*>([^<]+)</span>', block)
if title_m: if title_m:
listing["title"] = title_m.group(1).strip() listing["title"] = title_m.group(1).strip()
if listing["title"].lower() in ("shop on ebay", ""): if listing["title"].lower() in ("shop on ebay", ""):
continue continue
else: else:
heading_m = re.search(r'role=heading[^>]*>(.*?)</(?:div|span|h\d)>', block, re.DOTALL) heading_m = re.search(r'role=heading[^>]*>(.*?)</(?:div|span|h\d)>',
block, re.DOTALL)
if heading_m: if heading_m:
title_text = re.sub(r'<[^>]+>', '', heading_m.group(1)).strip() title_text = re.sub(r'<[^>]+>', '', heading_m.group(1)).strip()
if title_text.lower() != "shop on ebay": if title_text.lower() != "shop on ebay":
listing["title"] = title_text listing["title"] = title_text
sold_m = re.search(r'(\d[\d,]*)\s+sold', block, re.IGNORECASE) sold_m = re.search(r'(\d[\d,]*)\s+sold', block, re.IGNORECASE)
if sold_m: if sold_m:
listing["sold"] = int(sold_m.group(1).replace(",", "")) listing["sold"] = int(sold_m.group(1).replace(",", ""))
if re.search(r'Free (?:shipping|delivery|Standard Shipping)',
if re.search(r'Free (?:shipping|delivery|Standard Shipping)', block, re.IGNORECASE): block, re.IGNORECASE):
listing["shipping"] = "Free" listing["shipping"] = "Free"
else: else:
ship_m = re.search( ship_m = re.search(
r'\+\$?([\d,.]+)\s+(?:shipping|delivery|Standard Shipping)', block, re.IGNORECASE r'\+\$?([\d,.]+)\s+(?:shipping|delivery|Standard Shipping)',
block, re.IGNORECASE,
) )
if ship_m: if ship_m:
listing["shipping"] = float(ship_m.group(1).replace(",", "")) listing["shipping"] = float(ship_m.group(1).replace(",", ""))
else: else:
ship_alt = re.search(r'\$([\d,.]+)\s+(?:shipping|delivery)', block, re.IGNORECASE) ship_alt = re.search(r'\$([\d,.]+)\s+(?:shipping|delivery)',
block, re.IGNORECASE)
if ship_alt: if ship_alt:
listing["shipping"] = float(ship_alt.group(1).replace(",", "")) listing["shipping"] = float(ship_alt.group(1).replace(",", ""))
cond_m = re.search( cond_m = re.search(
r'(Pre-Owned|Used|Brand New|New \(Other\)|Refurbished|Open Box|For parts or not working|Seller refurbished|New with defects|New with box|New without box|New with tags)', r'(Pre-Owned|Used|Brand New|New \(Other\)|Refurbished|Open Box|'
r'For parts or not working|Seller refurbished|New with defects|'
r'New with box|New without box|New with tags)',
block, re.IGNORECASE, block, re.IGNORECASE,
) )
if cond_m: if cond_m:
listing["condition"] = cond_m.group(1) listing["condition"] = cond_m.group(1)
if listing.get("price") or listing.get("title"): if listing.get("price") or listing.get("title"):
listings.append(listing) listings.append(listing)
return listings if listings else None return listings if listings else None
@@ -208,12 +179,9 @@ def _search_ebay_sold_apify(model, manufacturer):
token = _get_apify_token() token = _get_apify_token()
if not token: if not token:
return None return None
import requests as req_module import requests as req_module
clean_mfr = clean_manufacturer(manufacturer) clean_mfr = clean_manufacturer(manufacturer)
query = f"{clean_mfr} {model}" query = f"{clean_mfr} {model}"
run_input = { run_input = {
"keywords": [query], "keywords": [query],
"daysToScrape": 60, "daysToScrape": 60,
@@ -224,17 +192,14 @@ def _search_ebay_sold_apify(model, manufacturer):
"itemCondition": "any", "itemCondition": "any",
"itemLocation": "domestic", "itemLocation": "domestic",
} }
url = f"{API_BASE}/acts/{ACTOR_ID}/runs?token={token}" url = f"{API_BASE}/acts/{ACTOR_ID}/runs?token={token}"
try: try:
resp = req_module.post(url, json=run_input, timeout=30) resp = req_module.post(url, json=run_input, timeout=30)
result = resp.json() result = resp.json()
run_id = result["data"]["id"] run_id = result["data"]["id"]
dataset_id = result["data"].get("defaultDatasetId") dataset_id = result["data"].get("defaultDatasetId")
except Exception as e: except Exception:
frappe.log_error(f"Apify start error: {e}", "eBay Pricing")
return None return None
max_wait = 120 max_wait = 120
start = time.time() start = time.time()
while time.time() - start < max_wait: while time.time() - start < max_wait:
@@ -247,17 +212,14 @@ def _search_ebay_sold_apify(model, manufacturer):
if run_status == "SUCCEEDED": if run_status == "SUCCEEDED":
break break
elif run_status in ("FAILED", "ABORTED", "TIMED-OUT"): elif run_status in ("FAILED", "ABORTED", "TIMED-OUT"):
frappe.log_error(f"Apify run {run_status}", "eBay Pricing")
return None return None
except Exception: except Exception:
continue continue
try: try:
results_url = f"{API_BASE}/datasets/{dataset_id}/items?token={token}&limit=30&clean=true" results_url = f"{API_BASE}/datasets/{dataset_id}/items?token={token}&limit=30&clean=true"
results_resp = req_module.get(results_url, timeout=15) results_resp = req_module.get(results_url, timeout=15)
return results_resp.json() return results_resp.json()
except Exception as e: except Exception:
frappe.log_error(f"Apify fetch error: {e}", "eBay Pricing")
return None return None
@@ -266,7 +228,6 @@ def _search_ebay_sold(model, manufacturer, source="auto"):
result = _search_ebay_sold_oxylabs(model, manufacturer) result = _search_ebay_sold_oxylabs(model, manufacturer)
if result is not None: if result is not None:
return result, "oxylabs" return result, "oxylabs"
frappe.logger().info("Oxylabs failed, trying Apify...")
if source in ("auto", "apify"): if source in ("auto", "apify"):
result = _search_ebay_sold_apify(model, manufacturer) result = _search_ebay_sold_apify(model, manufacturer)
if result is not None: if result is not None:
@@ -278,9 +239,6 @@ def _parse_prices(items, manufacturer, model, source="oxylabs"):
if not items: if not items:
return None return None
prices = [] prices = []
clean_mfr = clean_manufacturer(manufacturer)
search_terms = {clean_mfr.lower(), model.lower()}
for item in items: for item in items:
if item.get("error"): if item.get("error"):
continue continue
@@ -316,7 +274,6 @@ def _parse_prices(items, manufacturer, model, source="oxylabs"):
prices.append(p) prices.append(p)
except (ValueError, TypeError): except (ValueError, TypeError):
continue continue
if not prices: if not prices:
return None return None
prices.sort() prices.sort()
@@ -327,7 +284,6 @@ def _parse_prices(items, manufacturer, model, source="oxylabs"):
prices = trimmed prices = trimmed
if not prices: if not prices:
return None return None
avg = sum(prices) / len(prices) avg = sum(prices) / len(prices)
median = prices[len(prices) // 2] median = prices[len(prices) // 2]
return { return {
@@ -342,8 +298,6 @@ def _parse_prices(items, manufacturer, model, source="oxylabs"):
def _upsert_system_pricing(manufacturer, model, pricing): def _upsert_system_pricing(manufacturer, model, pricing):
"""Create or update System Pricing record."""
# Check if record exists by model/manufacturer
existing = frappe.db.get_value( existing = frappe.db.get_value(
"System Pricing", "System Pricing",
{"manufacturer": manufacturer, "model": model}, {"manufacturer": manufacturer, "model": model},
@@ -356,27 +310,64 @@ def _upsert_system_pricing(manufacturer, model, pricing):
doc = frappe.new_doc("System Pricing") doc = frappe.new_doc("System Pricing")
doc.manufacturer = manufacturer doc.manufacturer = manufacturer
doc.model = model doc.model = model
for key in ("price_high", "price_low", "price_average", "price_auction", for key in ("price_high", "price_low", "price_average", "price_auction",
"sample_count", "source", "scraped_at"): "sample_count", "source", "scraped_at"):
if key in pricing: if key in pricing:
setattr(doc, key, pricing[key]) setattr(doc, key, pricing[key])
# Compute days_since_pricing
if doc.scraped_at: if doc.scraped_at:
scraped = frappe.utils.get_datetime(doc.scraped_at) scraped = frappe.utils.get_datetime(doc.scraped_at)
now = now_datetime() now = now_datetime()
doc.days_since_pricing = (now - scraped).days doc.days_since_pricing = (now - scraped).days
else: else:
doc.days_since_pricing = 0 doc.days_since_pricing = 0
doc.pricing_status = "Priced" doc.pricing_status = "Priced"
doc.save() doc.save()
frappe.db.commit() frappe.db.commit()
def _update_item_market_data(manufacturer, model, pricing):
items = frappe.get_all(
"Item",
filters={
"item_group": ["in", ["Laptop", "Desktop", "Tablet", "Phone", "Workstation"]],
"disabled": 0,
},
fields=["name", "item_name", "brand"],
)
clean_mfr = clean_manufacturer(manufacturer).lower()
model_lower = model.lower()
matched = None
for item in items:
item_name_lower = (item.item_name or "").lower()
brand_lower = (item.brand or "").lower()
if model_lower in item_name_lower:
if clean_mfr in brand_lower or brand_lower in clean_mfr or clean_mfr in item_name_lower:
matched = item.name
break
elif matched is None:
matched = item.name
if not matched:
for item in items:
item_name_lower = (item.item_name or "").lower()
if model_lower in item_name_lower:
matched = item.name
break
if not matched:
return
item_doc = frappe.get_doc("Item", matched)
item_doc.base_market_price = pricing.get("price_average", 0)
item_doc.market_high = pricing.get("price_high", 0)
item_doc.market_low = pricing.get("price_low", 0)
item_doc.market_median = pricing.get("price_auction", 0)
item_doc.market_average = pricing.get("price_average", 0)
item_doc.market_samples = pricing.get("sample_count", 0)
item_doc.market_last_priced = pricing.get("scraped_at")
item_doc.save()
frappe.db.commit()
return {"item": matched, "updated": True}
def _log_api_call(manufacturer, model, search_query, source, results_count, status): def _log_api_call(manufacturer, model, search_query, source, results_count, status):
"""Log API usage for budget tracking."""
try: try:
log = frappe.new_doc("eBay Pricing Log") log = frappe.new_doc("eBay Pricing Log")
log.manufacturer = manufacturer log.manufacturer = manufacturer
@@ -394,14 +385,8 @@ def _log_api_call(manufacturer, model, search_query, source, results_count, stat
@frappe.whitelist() @frappe.whitelist()
def run_batch(batch_size=10, source="auto", force=False): def run_batch(batch_size=10, source="auto", force=False):
"""
Run batch pricing on the next N unique models that need pricing.
Returns {priced, failed, skipped, total}.
"""
batch_size = int(batch_size) if batch_size != "all" else 999999 batch_size = int(batch_size) if batch_size != "all" else 999999
force = bool(force) force = bool(force)
# Get unique models from Serial No / Item records that have manufacturer + model
models = frappe.db.sql( models = frappe.db.sql(
""" """
SELECT DISTINCT manufacturer, model SELECT DISTINCT manufacturer, model
@@ -414,45 +399,38 @@ def run_batch(batch_size=10, source="auto", force=False):
(batch_size,), (batch_size,),
as_dict=True, as_dict=True,
) )
priced = failed = skipped = 0 priced = failed = skipped = 0
for row in models: for row in models:
mfr = row.manufacturer mfr = row.manufacturer
mdl = row.model mdl = row.model
# Skip if already priced (unless force)
if not force: if not force:
exists = frappe.db.exists("System Pricing", {"manufacturer": mfr, "model": mdl}) exists = frappe.db.exists("System Pricing", {"manufacturer": mfr, "model": mdl})
if exists: if exists:
skipped += 1 skipped += 1
continue continue
items, used_source = _search_ebay_sold(mdl, mfr, source=source) items, used_source = _search_ebay_sold(mdl, mfr, source=source)
pricing = _parse_prices(items, mfr, mdl, source=used_source or "unknown") pricing = _parse_prices(items, mfr, mdl, source=used_source or "unknown")
if pricing: if pricing:
_upsert_system_pricing(mfr, mdl, pricing) _upsert_system_pricing(mfr, mdl, pricing)
_log_api_call(mfr, mdl, f"{mfr} {mdl}", used_source, len(items) if items else 0, "Success") _update_item_market_data(mfr, mdl, pricing)
_log_api_call(mfr, mdl, f"{mfr} {mdl}", used_source,
len(items) if items else 0, "Success")
priced += 1 priced += 1
else: else:
_log_api_call(mfr, mdl, f"{mfr} {mdl}", used_source, 0, "Failed") _log_api_call(mfr, mdl, f"{mfr} {mdl}", used_source, 0, "Failed")
failed += 1 failed += 1
# Rate limit
if used_source == "oxylabs": if used_source == "oxylabs":
time.sleep(2) time.sleep(2)
else: else:
time.sleep(3) time.sleep(3)
return {"priced": priced, "failed": failed, "skipped": skipped, "total": len(models)} return {"priced": priced, "failed": failed, "skipped": skipped, "total": len(models)}
@frappe.whitelist() @frappe.whitelist()
def get_recent_pricing(limit=50, status_filter=None): def get_recent_pricing(limit=50, status_filter=None):
"""Return recent System Pricing records as list of dicts."""
filters = {} filters = {}
if status_filter: if status_filter:
filters["pricing_status"] = status_filter filters["pricing_status"] = status_filter
records = frappe.get_all( records = frappe.get_all(
"System Pricing", "System Pricing",
filters=filters, filters=filters,
@@ -465,10 +443,95 @@ def get_recent_pricing(limit=50, status_filter=None):
order_by="scraped_at desc", order_by="scraped_at desc",
limit=int(limit), limit=int(limit),
) )
for r in records: for r in records:
r["days_since_pricing"] = r.get("days_since_pricing") or 0 r["days_since_pricing"] = r.get("days_since_pricing") or 0
for key in ("price_high", "price_low", "price_average"): for key in ("price_high", "price_low", "price_average"):
if r.get(key) is not None: if r.get(key) is not None:
r[key] = round(r[key], 2) r[key] = round(r[key], 2)
return records return records
@frappe.whitelist()
def apply_serial_pricing(serial_no):
serial = frappe.get_doc("Serial No", serial_no)
if not serial.item_code:
return {"error": "No item linked"}
item = frappe.get_doc("Item", serial.item_code)
grade = serial.grade
price_point = serial.price_point
if not grade:
serial.pricing_status = "Needs Grading"
serial.save()
return {"status": "needs_grading"}
if grade in ("C", "D", "F"):
serial.assigned_price = item.commodity_flat_price or 0
serial.commodity_value = item.commodity_flat_price or 0
serial.pricing_status = "Commodity"
serial.pricing_source = item.name
serial.save()
frappe.db.commit()
return {"status": "commodity", "price": serial.assigned_price}
if not price_point:
serial.pricing_status = "Needs Pricing"
serial.save()
return {"status": "needs_price_point"}
base_price = 0
if price_point == "Low":
base_price = item.market_low or item.base_market_price or 0
elif price_point == "Median":
base_price = item.market_median or item.base_market_price or 0
elif price_point == "Average":
base_price = item.market_average or item.base_market_price or 0
elif price_point == "High":
base_price = item.market_high or item.base_market_price or 0
elif price_point == "Manual":
base_price = serial.manual_price or 0
multiplier = 1.0
if grade == "A":
multiplier = item.grade_a_multiplier or 1.0
elif grade == "B":
multiplier = item.grade_b_multiplier or 0.8
final_price = flt(base_price) * flt(multiplier)
serial.assigned_price = round(final_price, 2)
serial.pricing_status = "Priced" if price_point != "Manual" else "Manual Override"
serial.pricing_source = item.name
serial.save()
frappe.db.commit()
return {
"status": serial.pricing_status,
"price": serial.assigned_price,
"base": base_price,
"multiplier": multiplier,
}
@frappe.whitelist()
def batch_apply_pricing(item_code=None, batch_size=50):
filters = {"pricing_status": ["in", ["Needs Grading", "Needs Pricing", "Priced"]]}
if item_code:
filters["item_code"] = item_code
serials = frappe.get_all(
"Serial No",
filters=filters,
fields=["name", "item_code", "grade", "price_point"],
limit=int(batch_size),
)
results = {"priced": 0, "commodity": 0, "needs_grading": 0, "needs_price_point": 0, "errors": 0}
for s in serials:
try:
result = apply_serial_pricing(s.name)
status = result.get("status", "")
if status == "commodity":
results["commodity"] += 1
elif status == "needs_grading":
results["needs_grading"] += 1
elif status == "needs_price_point":
results["needs_price_point"] += 1
elif status in ("Priced", "Manual Override"):
results["priced"] += 1
else:
results["errors"] += 1
except Exception as e:
frappe.log_error(f"Pricing error for {s.name}: {e}", "eBay Pricing")
results["errors"] += 1
return results