"""eBay sold-listing price lookup for ERPNext items and serial numbers.

Sold listings are fetched through Oxylabs (HTML scrape) and/or an Apify
actor, reduced to price statistics, and written to ``System Pricing``,
``Item`` and ``Serial No`` documents.
"""
import sys

sys.path.insert(0, '/home/frappe/erpnext-bench/apps/frappe')

import frappe
from frappe.utils import now, now_datetime, flt
from frappe import _
import json
import re
import statistics
import time
import urllib.parse

OXYLABS_API = "https://realtime.oxylabs.io/v1/queries"
ACTOR_ID = "caffein.dev~ebay-sold-listings"
API_BASE = "https://api.apify.com/v2"

# Raw manufacturer strings mapped to the brand name used in search queries
# and in the Item ``brand`` filter.
MFR_CLEANUP = {
    "Dell Inc": "Dell",
    "HP HP": "HP",
    "HP": "HP",
    "LENOVO": "Lenovo",
    "Lenovo": "Lenovo",
    "Microsoft Corporation": "Microsoft",
    "Apple Inc": "Apple",
    "ASUSTeK COMPUTER INC.": "ASUS",
    "Acer": "Acer",
    "Panasonic": "Panasonic",
    "Samsung": "Samsung",
    "Toshiba": "Toshiba",
    "Fujitsu": "Fujitsu",
    "Hewlett-Packard": "HP",
    "HUAWEI": "Huawei",
}

# Prices outside this open interval are discarded by _parse_prices.
_MIN_PRICE = 5
_MAX_PRICE = 10000

# Oxylabs responses shorter than this are skipped as unusable.
_MIN_OXYLABS_CONTENT_LENGTH = 100000

# Apify run polling.
_APIFY_MAX_WAIT_SECONDS = 120
_APIFY_POLL_SECONDS = 8

# Patterns used by _parse_ebay_html, compiled once.
_CARD_SPLIT_RE = re.compile(r'class="s-card\s')
_PRICE_RE = re.compile(
    r's-card__price[^>]*>\$([\d,.]+(?:\.\d{2})?)(?:\s*to\s*\$[\d,.]+(?:\.\d{2})?)?'
)
# Title text is the first text node of the <span> directly inside the title element.
_TITLE_RE = re.compile(r's-card__title[^>]*><span[^>]*>([^<]+)')
# Fallback: everything inside the heading element, tags stripped afterwards.
# NOTE(review): assumes the heading element is a <div> - confirm against live markup.
_HEADING_RE = re.compile(r'role=heading[^>]*>(.*?)</div>', re.DOTALL)
_TAG_RE = re.compile(r'<[^>]+>')
_SOLD_RE = re.compile(r'(\d[\d,]*)\s+sold', re.IGNORECASE)
_FREE_SHIPPING_RE = re.compile(
    r'Free (?:shipping|delivery|Standard Shipping)', re.IGNORECASE
)
_PLUS_SHIPPING_RE = re.compile(
    r'\+\$?([\d,.]+)\s+(?:shipping|delivery|Standard Shipping)', re.IGNORECASE
)
_ALT_SHIPPING_RE = re.compile(r'\$([\d,.]+)\s+(?:shipping|delivery)', re.IGNORECASE)
_CONDITION_RE = re.compile(
    r'(Pre-Owned|Used|Brand New|New \(Other\)|Refurbished|Open Box|'
    r'For parts or not working|Seller refurbished|New with defects|'
    r'New with box|New without box|New with tags)',
    re.IGNORECASE,
)
_PLACEHOLDER_TITLE = "shop on ebay"

# Serial No ``price_point`` value -> Item field holding the matching market price.
_PRICE_POINT_FIELDS = {
    "Low": "market_low",
    "Median": "market_median",
    "Average": "market_average",
    "High": "market_high",
}

# apply_serial_pricing status -> counter key reported by batch_apply_pricing.
_STATUS_BUCKETS = {
    "commodity": "commodity",
    "needs_grading": "needs_grading",
    "needs_price_point": "needs_price_point",
    "Priced": "priced",
    "Manual Override": "priced",
}


def _get_settings():
    """Return the "eBay Pricing Settings" document, or None if it does not exist."""
    if not frappe.db.exists("eBay Pricing Settings", "eBay Pricing Settings"):
        return None
    return frappe.get_doc("eBay Pricing Settings", "eBay Pricing Settings")


def _read_password(settings, fieldname):
    """Return the password stored in ``fieldname`` of ``settings``, or "".

    Any error raised by ``get_password`` is treated as "not configured" so the
    caller can fall back to the site config.
    """
    try:
        return settings.get_password(fieldname) or ""
    except Exception:
        return ""


def _get_oxylabs_creds():
    """Return ``(user, password)`` for Oxylabs.

    Values come from the settings document first and from ``frappe.conf``
    (keys ``oxylabs_user`` / ``oxylabs_password``) when missing there.
    """
    settings = _get_settings()
    user = password = ""
    if settings:
        user = settings.get("oxylabs_user") or ""
        password = _read_password(settings, "oxylabs_password")
    if not user:
        user = frappe.conf.get("oxylabs_user", "")
    if not password:
        password = frappe.conf.get("oxylabs_password", "")
    return (user, password)


def _get_apify_token():
    """Return the Apify token from settings, falling back to ``frappe.conf``."""
    settings = _get_settings()
    token = _read_password(settings, "apify_token") if settings else ""
    if not token:
        token = frappe.conf.get("apify_token", "")
    return token


def clean_manufacturer(mfr):
    """Return the normalised brand for ``mfr``; unknown values pass through unchanged."""
    return MFR_CLEANUP.get(mfr, mfr)


def _build_query(manufacturer, model):
    """Return the search string "<clean manufacturer> <model>" without stray spaces."""
    return f"{clean_manufacturer(manufacturer)} {model}".strip()


def _to_float(text):
    """Convert a captured amount such as ``"1,234.50"`` to float; None if malformed."""
    try:
        return float(text.replace(",", ""))
    except ValueError:
        return None


def _as_bool(value):
    """Interpret a flag that may arrive as a real bool or as a request string.

    ``bool("false")`` and ``bool("0")`` are True, so string values are compared
    against the usual truthy spellings instead.
    """
    if isinstance(value, str):
        return value.strip().lower() in ("1", "true", "yes", "y", "on")
    return bool(value)


@frappe.whitelist()
def search_model(query=None, manufacturer=None, model=None, source="auto"):
    """Look up sold listings for one model and store the resulting pricing.

    Either ``query`` or both ``manufacturer`` and ``model`` must be given. When
    only ``query`` is usable, its first word becomes the manufacturer and the
    remainder the model.

    Returns a dict with ``results`` and ``pricing`` (plus ``message`` when no
    pricing could be derived), or ``{"error": ...}`` on missing input.
    """
    if not query and not (manufacturer and model):
        return {"error": "Provide query or manufacturer + model"}

    if not manufacturer or not model:
        parts = (query or "").split(None, 1)
        if len(parts) >= 2:
            manufacturer, model = parts[0], parts[1]
        else:
            manufacturer, model = query, ""

    items, used_source = _search_ebay_sold(model, manufacturer, source=source)
    pricing = _parse_prices(items, manufacturer, model, source=used_source or "unknown")

    if pricing:
        _upsert_system_pricing(manufacturer, model, pricing)
        _update_item_market_data(manufacturer, model, pricing)
        _log_api_call(
            manufacturer, model, query, used_source, len(items) if items else 0, "Success"
        )
        return {"results": items or [], "pricing": pricing}

    _log_api_call(manufacturer, model, query, used_source, 0, "Failed")
    return {"results": items or [], "pricing": None, "message": "No pricing data found"}


def _search_ebay_sold_oxylabs(model, manufacturer):
    """Fetch sold listings through Oxylabs.

    Two payload variants are tried in order; the first one that yields at
    least three parsed listings wins. Returns the list of listing dicts, or
    None when credentials are missing or no payload produced usable data.
    """
    user, password = _get_oxylabs_creds()
    if not user or not password:
        return None

    import requests as req_module

    query = _build_query(manufacturer, model)
    payloads = [
        {
            "source": "universal",
            "url": f"https://www.ebay.com/sch/i.html?_nkw={urllib.parse.quote(query)}&LH_Sold=1&_ipg=240",
            "render": "html",
        },
        {
            "source": "ebay_search",
            "query": query,
            "domain": "com",
            "render": "html",
        },
    ]

    for payload in payloads:
        # Best effort: any network, JSON or parsing failure moves on to the
        # next payload variant instead of aborting the lookup.
        try:
            resp = req_module.post(
                OXYLABS_API, auth=(user, password), json=payload, timeout=120
            )
            if resp.status_code != 200:
                continue
            data = resp.json()
            if "results" not in data or not data["results"]:
                continue
            content = data["results"][0].get("content", "")
            if not isinstance(content, str) or len(content) < _MIN_OXYLABS_CONTENT_LENGTH:
                continue
            listings = _parse_ebay_html(content)
            if listings and len(listings) >= 3:
                return listings
        except Exception:
            continue
    return None


def _parse_ebay_html(content):
    """Extract listing dicts from an eBay search-results HTML page.

    The page is split on ``class="s-card `` and each card is scanned for
    ``price``, ``title``, ``sold``, ``shipping`` ("Free" or a float) and
    ``condition``. Cards titled "Shop on eBay" are skipped. A card is kept when
    it has a price or a title. Returns the list, or None when nothing was found.
    """
    listings = []
    for block in _CARD_SPLIT_RE.split(content)[1:]:
        listing = {}

        price_m = _PRICE_RE.search(block)
        if price_m:
            price = _to_float(price_m.group(1))
            if price is not None:
                listing["price"] = price

        title_m = _TITLE_RE.search(block)
        if title_m:
            title = title_m.group(1).strip()
            if title.lower() in (_PLACEHOLDER_TITLE, ""):
                continue
            listing["title"] = title
        else:
            heading_m = _HEADING_RE.search(block)
            if heading_m:
                title = _TAG_RE.sub('', heading_m.group(1)).strip()
                if title.lower() == _PLACEHOLDER_TITLE:
                    continue
                if title:
                    listing["title"] = title

        sold_m = _SOLD_RE.search(block)
        if sold_m:
            listing["sold"] = int(sold_m.group(1).replace(",", ""))

        if _FREE_SHIPPING_RE.search(block):
            listing["shipping"] = "Free"
        else:
            ship_m = _PLUS_SHIPPING_RE.search(block) or _ALT_SHIPPING_RE.search(block)
            if ship_m:
                shipping = _to_float(ship_m.group(1))
                if shipping is not None:
                    listing["shipping"] = shipping

        cond_m = _CONDITION_RE.search(block)
        if cond_m:
            listing["condition"] = cond_m.group(1)

        if listing.get("price") or listing.get("title"):
            listings.append(listing)

    return listings if listings else None


def _search_ebay_sold_apify(model, manufacturer):
    """Fetch sold listings by running the Apify actor ``ACTOR_ID``.

    Starts a run, polls its status every ``_APIFY_POLL_SECONDS`` for up to
    ``_APIFY_MAX_WAIT_SECONDS``, then reads up to 30 dataset items. Returns the
    item list, or None when the token is missing, the run could not be
    started, the run failed, or the dataset response is not a list.
    """
    token = _get_apify_token()
    if not token:
        return None

    import requests as req_module

    run_input = {
        "keywords": [_build_query(manufacturer, model)],
        "daysToScrape": 60,
        "count": 30,
        "categoryId": "0",
        "ebaySite": "ebay.com",
        "sortOrder": "endedRecently",
        "itemCondition": "any",
        "itemLocation": "domestic",
    }

    url = f"{API_BASE}/acts/{ACTOR_ID}/runs?token={token}"
    try:
        resp = req_module.post(url, json=run_input, timeout=30)
        result = resp.json()
        run_id = result["data"]["id"]
        dataset_id = result["data"].get("defaultDatasetId")
    except Exception:
        return None

    if not dataset_id:
        # Without a dataset id the items URL would point at ".../datasets/None".
        return None

    status_url = f"{API_BASE}/actor-runs/{run_id}?token={token}"
    start = time.time()
    while time.time() - start < _APIFY_MAX_WAIT_SECONDS:
        time.sleep(_APIFY_POLL_SECONDS)
        try:
            status = req_module.get(status_url, timeout=10).json()
            run_status = status["data"]["status"]
        except Exception:
            # Transient polling errors are ignored until the deadline.
            continue
        if run_status == "SUCCEEDED":
            break
        if run_status in ("FAILED", "ABORTED", "TIMED-OUT"):
            return None

    # When the deadline passes without a terminal status, the dataset is still
    # read so whatever items exist so far can be used.
    results_url = f"{API_BASE}/datasets/{dataset_id}/items?token={token}&limit=30&clean=true"
    try:
        items = req_module.get(results_url, timeout=15).json()
    except Exception:
        return None

    # An error payload is a dict, not a list of items; treat it as "no data".
    return items if isinstance(items, list) else None


def _search_ebay_sold(model, manufacturer, source="auto"):
    """Return ``(items, source_name)`` from the first provider that answers.

    ``source`` may be ``"auto"`` (Oxylabs, then Apify), ``"oxylabs"`` or
    ``"apify"``. Returns ``(None, None)`` when no provider returned data.
    """
    if source in ("auto", "oxylabs"):
        result = _search_ebay_sold_oxylabs(model, manufacturer)
        if result is not None:
            return result, "oxylabs"

    if source in ("auto", "apify"):
        result = _search_ebay_sold_apify(model, manufacturer)
        if result is not None:
            return result, "apify"

    return None, None


def _oxylabs_item_price(item, model_words, apply_title_filter):
    """Return the usable price of one parsed HTML listing, or None to skip it."""
    price_val = item.get("price")
    if isinstance(price_val, str):
        price_str = price_val.replace("$", "").replace(",", "").strip()
        if " to " in price_str:
            # Price ranges: use the lower bound.
            price_str = price_str.split(" to ")[0]
        try:
            price_val = float(price_str)
        except (ValueError, TypeError):
            return None
    if not isinstance(price_val, (int, float)):
        return None

    price = float(price_val)
    if not _MIN_PRICE < price < _MAX_PRICE:
        return None

    if apply_title_filter and model_words:
        title = (item.get("title") or "").upper()
        if not any(word in title for word in model_words):
            return None
    return price


def _apify_item_price(item):
    """Return the usable price of one Apify dataset item, or None to skip it."""
    raw = item.get("totalPrice") or item.get("soldPrice")
    if not raw:
        return None
    try:
        price = float(str(raw).replace(",", "").replace("$", "").strip())
    except (ValueError, TypeError):
        return None
    return price if _MIN_PRICE < price < _MAX_PRICE else None


def _parse_prices(items, manufacturer, model, source="oxylabs"):
    """Reduce raw listing items to price statistics.

    For ``source == "oxylabs"`` the ``price`` key is used, and when more than
    five items were returned a listing must mention at least one model word
    longer than two characters in its title. For any other source the
    ``totalPrice`` / ``soldPrice`` keys are used. Prices must lie strictly
    between ``_MIN_PRICE`` and ``_MAX_PRICE``. With five or more prices, 10%
    (at least one value) is trimmed from each end before computing statistics.

    Returns a dict with ``price_high``, ``price_low``, ``price_average``,
    ``price_auction`` (the median), ``sample_count``, ``source`` and
    ``scraped_at``, or None when no usable price remains.
    """
    if not items:
        return None

    # Only words longer than two characters are distinctive enough to match on.
    model_words = [word for word in (model or "").upper().split() if len(word) > 2]
    apply_title_filter = len(items) > 5

    prices = []
    for item in items:
        if not isinstance(item, dict) or item.get("error"):
            continue
        if source == "oxylabs":
            price = _oxylabs_item_price(item, model_words, apply_title_filter)
        else:
            price = _apify_item_price(item)
        if price is not None:
            prices.append(price)

    if not prices:
        return None

    prices.sort()
    if len(prices) >= 5:
        trim = max(1, int(len(prices) * 0.1))
        trimmed = prices[trim : len(prices) - trim]
        if trimmed:
            prices = trimmed

    return {
        "price_high": round(max(prices), 2),
        "price_low": round(min(prices), 2),
        "price_average": round(sum(prices) / len(prices), 2),
        # True median: the mean of the two middle values for an even count.
        "price_auction": round(statistics.median(prices), 2),
        "sample_count": len(prices),
        "source": source or "unknown",
        "scraped_at": now(),
    }


def _upsert_system_pricing(manufacturer, model, pricing):
    """Create or update the "System Pricing" record for a manufacturer/model pair.

    Copies the price fields from ``pricing``, normalises ``source`` to one of
    ``oxylabs`` / ``apify`` / ``unknown``, recomputes ``days_since_pricing``,
    marks the record "Priced", saves and commits.
    """
    existing = frappe.db.get_value(
        "System Pricing",
        {"manufacturer": manufacturer, "model": model},
        "name",
    )
    if existing:
        doc = frappe.get_doc("System Pricing", existing)
    else:
        doc = frappe.new_doc("System Pricing")
        doc.manufacturer = manufacturer
        doc.model = model

    for key in (
        "price_high",
        "price_low",
        "price_average",
        "price_auction",
        "sample_count",
        "scraped_at",
    ):
        if key in pricing:
            setattr(doc, key, pricing[key])

    # Fix source to match allowed values
    if "source" in pricing:
        raw_source = str(pricing["source"] or "unknown")
        if raw_source.startswith("ebay_"):
            raw_source = raw_source.replace("ebay_", "")
        if raw_source not in ("oxylabs", "apify"):
            raw_source = "unknown"
        doc.source = raw_source

    if doc.scraped_at:
        scraped = frappe.utils.get_datetime(doc.scraped_at)
        doc.days_since_pricing = (now_datetime() - scraped).days
    else:
        doc.days_since_pricing = 0

    doc.pricing_status = "Priced"
    doc.save()
    frappe.db.commit()


def _update_item_market_data(manufacturer, model, pricing):
    """Write market prices onto the first enabled Item matching brand and model.

    Items are filtered by item group and exact (cleaned) brand; the first one
    whose ``item_name`` contains ``model`` case-insensitively is updated.
    Returns ``{"item": name, "updated": True}``, or None when nothing matched.
    """
    if not model:
        # An empty model is a substring of every item name and would
        # overwrite an arbitrary item of the brand.
        return None

    clean_mfr = clean_manufacturer(manufacturer)
    model_lower = model.lower()

    # Match by brand (exact) + item_name (contains model)
    items = frappe.get_all(
        "Item",
        filters={
            "item_group": ["in", ["Laptop", "Desktop", "Tablet", "Phone", "Workstation"]],
            "disabled": 0,
            "brand": clean_mfr,
        },
        fields=["name", "item_name"],
    )

    matched = next(
        (item.name for item in items if model_lower in (item.item_name or "").lower()),
        None,
    )
    if not matched:
        return None

    average = pricing.get("price_average", 0)
    item_doc = frappe.get_doc("Item", matched)
    item_doc.base_market_price = average
    item_doc.market_high = pricing.get("price_high", 0)
    item_doc.market_low = pricing.get("price_low", 0)
    item_doc.market_median = pricing.get("price_auction", 0)
    item_doc.market_average = average
    item_doc.market_samples = pricing.get("sample_count", 0)
    item_doc.market_last_priced = pricing.get("scraped_at")
    item_doc.save()
    frappe.db.commit()
    return {"item": matched, "updated": True}


def _log_api_call(manufacturer, model, search_query, source, results_count, status):
    """Record one lookup in "eBay Pricing Log".

    Logging is best effort: a failure to write the log entry must never break
    the pricing flow, so every exception is deliberately suppressed.
    """
    try:
        log = frappe.new_doc("eBay Pricing Log")
        log.manufacturer = manufacturer
        log.model = model
        log.search_query = search_query
        log.source = source or "unknown"
        log.timestamp = now()
        log.results_count = results_count or 0
        log.status = status
        log.save()
        frappe.db.commit()
    except Exception:
        pass


@frappe.whitelist()
def run_batch(batch_size=10, source="auto", force=False):
    """Price a batch of distinct brand/item-name pairs that have serial numbers.

    ``batch_size`` may be a number or ``"all"``. Unless ``force`` is truthy,
    pairs that already have a "System Pricing" record are skipped. Returns
    counts of ``priced``, ``failed``, ``skipped`` and ``total`` rows.
    """
    batch_size = int(batch_size) if batch_size != "all" else 999999
    # Request arguments arrive as strings, and bool("false") would be True.
    force = _as_bool(force)

    # Get unique models from Items that have serials
    # NOTE(review): ORDER BY uses a column outside the DISTINCT select list;
    # some SQL modes may reject that - confirm against the deployed database.
    models = frappe.db.sql(
        """
        SELECT DISTINCT i.brand as manufacturer, i.item_name as model
        FROM `tabItem` i
        INNER JOIN `tabSerial No` sn ON sn.item_code = i.name
        WHERE i.brand IS NOT NULL AND i.brand != ''
        AND i.disabled = 0
        ORDER BY i.modified DESC
        LIMIT %s
        """,
        (batch_size,),
        as_dict=True,
    )

    priced = failed = skipped = 0
    for row in models:
        mfr = row.manufacturer
        mdl = row.model

        if not force and frappe.db.exists(
            "System Pricing", {"manufacturer": mfr, "model": mdl}
        ):
            skipped += 1
            continue

        items, used_source = _search_ebay_sold(mdl, mfr, source=source)
        pricing = _parse_prices(items, mfr, mdl, source=used_source or "unknown")

        if pricing:
            _upsert_system_pricing(mfr, mdl, pricing)
            _update_item_market_data(mfr, mdl, pricing)
            _log_api_call(
                mfr, mdl, f"{mfr} {mdl}", used_source, len(items) if items else 0, "Success"
            )
            priced += 1
        else:
            _log_api_call(mfr, mdl, f"{mfr} {mdl}", used_source, 0, "Failed")
            failed += 1

        # Pause between lookups; the pause is shorter after an Oxylabs call.
        time.sleep(2 if used_source == "oxylabs" else 3)

    return {"priced": priced, "failed": failed, "skipped": skipped, "total": len(models)}


@frappe.whitelist()
def get_recent_pricing(limit=50, status_filter=None):
    """Return "System Pricing" rows ordered by ``scraped_at`` descending.

    ``status_filter`` optionally restricts rows by ``pricing_status``. Price
    fields are rounded to two decimals and a missing ``days_since_pricing``
    is reported as 0.
    """
    filters = {}
    if status_filter:
        filters["pricing_status"] = status_filter

    records = frappe.get_all(
        "System Pricing",
        filters=filters,
        fields=[
            "name",
            "manufacturer",
            "model",
            "pricing_status",
            "scraped_at",
            "days_since_pricing",
            "price_high",
            "price_low",
            "price_average",
            "sample_count",
            "source",
        ],
        order_by="scraped_at desc",
        limit=int(limit),
    )

    for record in records:
        record["days_since_pricing"] = record.get("days_since_pricing") or 0
        for key in ("price_high", "price_low", "price_average"):
            if record.get(key) is not None:
                record[key] = round(record[key], 2)

    return records


@frappe.whitelist()
def apply_serial_pricing(serial_no):
    """Assign a price to one Serial No from its grade and price point.

    * no grade            -> status "Needs Grading"
    * grade C / D / F     -> the item's ``commodity_flat_price``
    * no price point      -> status "Needs Pricing"
    * otherwise           -> base price for the price point times the grade
      multiplier (grade A: ``grade_a_multiplier`` or 1.0, grade B:
      ``grade_b_multiplier`` or 0.8, any other grade: 1.0)

    An unrecognised price point yields a base price of 0. Every branch saves
    and commits the Serial No. Returns a dict describing the outcome, or
    ``{"error": ...}`` when the serial has no item.
    """
    serial = frappe.get_doc("Serial No", serial_no)
    if not serial.item_code:
        return {"error": "No item linked"}

    item = frappe.get_doc("Item", serial.item_code)
    grade = serial.grade
    price_point = serial.price_point

    if not grade:
        serial.pricing_status = "Needs Grading"
        serial.save()
        frappe.db.commit()
        return {"status": "needs_grading"}

    if grade in ("C", "D", "F"):
        flat_price = item.commodity_flat_price or 0
        serial.assigned_price = flat_price
        serial.commodity_value = flat_price
        serial.pricing_status = "Commodity"
        serial.pricing_source = item.name
        serial.save()
        frappe.db.commit()
        return {"status": "commodity", "price": serial.assigned_price}

    if not price_point:
        serial.pricing_status = "Needs Pricing"
        serial.save()
        frappe.db.commit()
        return {"status": "needs_price_point"}

    if price_point == "Manual":
        base_price = serial.manual_price or 0
    elif price_point in _PRICE_POINT_FIELDS:
        market_price = getattr(item, _PRICE_POINT_FIELDS[price_point])
        base_price = market_price or item.base_market_price or 0
    else:
        base_price = 0

    if grade == "A":
        multiplier = item.grade_a_multiplier or 1.0
    elif grade == "B":
        multiplier = item.grade_b_multiplier or 0.8
    else:
        multiplier = 1.0

    serial.assigned_price = round(flt(base_price) * flt(multiplier), 2)
    serial.pricing_status = "Priced" if price_point != "Manual" else "Manual Override"
    serial.pricing_source = item.name
    serial.save()
    frappe.db.commit()

    return {
        "status": serial.pricing_status,
        "price": serial.assigned_price,
        "base": base_price,
        "multiplier": multiplier,
    }


@frappe.whitelist()
def batch_apply_pricing(item_code=None, batch_size=50):
    """Run ``apply_serial_pricing`` over up to ``batch_size`` Serial Nos.

    Serials whose ``pricing_status`` is "Needs Grading", "Needs Pricing" or
    "Priced" are selected, optionally restricted to ``item_code``. Returns
    counters for ``priced``, ``commodity``, ``needs_grading``,
    ``needs_price_point`` and ``errors``; exceptions are logged and counted
    as errors.
    """
    filters = {"pricing_status": ["in", ["Needs Grading", "Needs Pricing", "Priced"]]}
    if item_code:
        filters["item_code"] = item_code

    serials = frappe.get_all(
        "Serial No",
        filters=filters,
        fields=["name", "item_code", "grade", "price_point"],
        limit=int(batch_size),
    )

    results = {
        "priced": 0,
        "commodity": 0,
        "needs_grading": 0,
        "needs_price_point": 0,
        "errors": 0,
    }
    for s in serials:
        try:
            status = apply_serial_pricing(s.name).get("status", "")
        except Exception as e:
            frappe.log_error(f"Pricing error for {s.name}: {e}", "eBay Pricing")
            results["errors"] += 1
            continue
        # Any status without a bucket (including the {"error": ...} result) is an error.
        results[_STATUS_BUCKETS.get(status, "errors")] += 1

    return results