wip: receiving dashboard, customer records, archive old 2-level paths, various fixes

This commit is contained in:
vagrant
2026-05-28 03:30:45 +00:00
parent d4ed4b1d89
commit 6fe6d61779
141 changed files with 4247 additions and 1175 deletions
@@ -0,0 +1,3 @@
from westech_r2.api import sales
from westech_r2.api import receiving_api
@@ -0,0 +1,166 @@
import frappe
from reportlab.lib.pagesizes import letter
from reportlab.lib.units import inch
from reportlab.lib.colors import HexColor, black, white, grey
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, HRFlowable, Image
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_JUSTIFY
from reportlab.lib import colors
import io
import os
DARK_BLUE = HexColor('#2F5496')
LIGHT_BLUE = HexColor('#D6E4F0')
GRAY = HexColor('#666666')
@frappe.whitelist()
def generate_cor(company_name=None, weights=None, received_date=None, red_r2=None, contact_name=None, contact_number=None, address_line=None, pallet_name=None):
"""Generate Certificate of Recycling PDF from form data."""
# Format date
date_str = ''
if received_date:
from frappe.utils import formatdate
date_str = formatdate(received_date, 'MMMM d, Y')
items_recycled = 'e-waste'
if red_r2:
items_recycled += ' (' + red_r2 + ')'
output = io.BytesIO()
doc = SimpleDocTemplate(
output,
pagesize=letter,
topMargin=0.5 * inch,
bottomMargin=0.5 * inch,
leftMargin=0.75 * inch,
rightMargin=0.75 * inch
)
styles = getSampleStyleSheet()
# Custom styles matching the Electron app
date_style = ParagraphStyle('DateBlock', parent=styles['Normal'], fontSize=14, fontName='Times-Bold', alignment=TA_LEFT)
title_style = ParagraphStyle('CertTitle', parent=styles['Title'], fontSize=16, fontName='Times-Bold', textColor=black, spaceAfter=6, alignment=TA_CENTER, letterSpacing=0.05)
cert_style = ParagraphStyle('CertBody', parent=styles['Normal'], fontName='Times-Roman', fontSize=12, spaceAfter=12, alignment=TA_JUSTIFY)
body_style = ParagraphStyle('BodyText2', parent=styles['Normal'], fontName='Times-Roman', fontSize=12, spaceAfter=10, alignment=TA_JUSTIFY)
bullet_style = ParagraphStyle('BulletText', parent=styles['Normal'], fontName='Times-Roman', fontSize=10, spaceAfter=4, leftIndent=24, bulletIndent=12, alignment=TA_JUSTIFY)
optin_style = ParagraphStyle('OptIn', parent=styles['Normal'], fontName='Times-Roman', fontSize=12, spaceAfter=10, alignment=TA_JUSTIFY)
sig_style = ParagraphStyle('Signature', parent=styles['Normal'], fontName='Times-Bold', fontSize=18, spaceBefore=18)
footer_style = ParagraphStyle('Footer', parent=styles['Normal'], fontName='Times-Roman', fontSize=10, textColor=GRAY)
elements = []
# Header row: Date | Logo | Title
logo_path = os.path.join(frappe.get_app_path('westech_r2'), 'public', 'images', 'cor_logo.png')
logo_img = None
if os.path.exists(logo_path):
logo_img = Image(logo_path, width=2.45 * inch, height=0.8 * inch)
header_data = [
[Paragraph(date_str, date_style), logo_img or Paragraph('', styles['Normal']), Paragraph('CERTIFICATE OF RECYCLING', title_style)]
]
header_table = Table(header_data, colWidths=[1.8 * inch, 2.45 * inch, 2.75 * inch])
header_table.setStyle(TableStyle([
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'),
('ALIGN', (0, 0), (0, 0), 'LEFT'),
('ALIGN', (1, 0), (1, 0), 'CENTER'),
('ALIGN', (2, 0), (2, 0), 'CENTER'),
]))
elements.append(header_table)
elements.append(Spacer(1, 18))
# Certification paragraph
elements.append(Paragraph(
'Full Circle Electronics AZ, LLC (dba Westech Recyclers) certifies that the '
'materials submitted for recycling are received and will be properly recycled '
'in accordance with all state and federal recycling regulations and in '
'accordance with the R2 Standard.',
cert_style
))
# Data table
data_rows = [
['Company:', company_name or 'N/A'],
['Weight:', weights or 'N/A'],
['Items Recycled:', items_recycled],
]
if contact_name:
data_rows.append(['Contact:', contact_name])
if contact_number:
data_rows.append(['Phone:', contact_number])
if address_line:
data_rows.append(['Address:', address_line])
data_table = Table(data_rows, colWidths=[3.36 * inch, 3.64 * inch])
data_table.setStyle(TableStyle([
('FONTNAME', (0, 0), (-1, -1), 'Times-Roman'),
('FONTSIZE', (0, 0), (-1, -1), 12),
('ALIGN', (0, 0), (0, -1), 'RIGHT'),
('ALIGN', (1, 0), (1, -1), 'LEFT'),
('VALIGN', (0, 0), (-1, -1), 'TOP'),
('GRID', (0, 0), (-1, -1), 0.5, HexColor('#bfbfbf')),
('TOPPADDING', (0, 0), (-1, -1), 4),
('BOTTOMPADDING', (0, 0), (-1, -1), 4),
('LEFTPADDING', (0, 0), (-1, -1), 6),
('RIGHTPADDING', (0, 0), (-1, -1), 6),
]))
elements.append(data_table)
elements.append(Spacer(1, 12))
# Body paragraphs
elements.append(Paragraph(
'Full Circle Electronics AZ, LLC further acknowledges the acceptance and '
'recycling of any material potentially containing data. Data containing '
'materials are stored in our secured facility ensuring the security of the '
'unit(s) prior to data sanitization.',
body_style
))
elements.append(Paragraph(
'Data containing materials are sanitized in compliance with NIST 800-88 '
'guidelines which is set forth by the U.S. government for a robust methodology '
'for erasing data from storage media. Depending upon the media received, the '
'data destruction methods used are as follows:',
body_style
))
# Bullet list
bullets = [
'Hard disk and solid-state drives will either be logically sanitized using professional software or physically destroyed via shredding or degaussing.',
'Media cards and small storage devices will either be degaussed / shredded at our facility or sent straight to a smelter.',
'Data tapes or reels will either be degaussed or shredded at a vetted and approved downstream service provider.',
'Electronics with embedded storage chips will either be destroyed by physical destruction at our facility or at a vetted and approved downstream service provider.',
'Small electronics containing data will either be logically sanitized using the manufacturer\'s application for destroying data or sent to a vetted and approved downstream service provider.',
]
for b in bullets:
elements.append(Paragraph('\u2022 ' + b, bullet_style))
elements.append(Spacer(1, 6))
# Opt-in
elements.append(Paragraph(
'Opt-in option. If you desire to be informed of our data destruction process '
'changes or be notified of any unlikely security breaches, please let us know.',
optin_style
))
# Signature
elements.append(Paragraph('Westech Recyclers', sig_style))
# Footer
elements.append(Spacer(1, 10))
elements.append(Paragraph(
'220 S 9th St Phoenix, AZ 85034    '
'<link href="http://www.westechrecyclers.com" color="#1155cc">www.westechrecyclers.com</link> &nbsp;&nbsp; '
'602.256.7626',
footer_style
))
doc.build(elements)
output.seek(0)
frappe.response.filename = 'COR_' + (company_name or 'document').replace(' ', '_') + '.pdf'
frappe.response.filecontent = output.getvalue()
frappe.response.type = 'download'
frappe.response.display_content_as = 'attachment'
@@ -0,0 +1,546 @@
import sys
sys.path.insert(0, '/home/frappe/erpnext-bench/apps/frappe')
import frappe
from frappe.utils import now, now_datetime, flt
from frappe import _
import json
import re
import time
import urllib.parse
OXYLABS_API = "https://realtime.oxylabs.io/v1/queries"
ACTOR_ID = "caffein.dev~ebay-sold-listings"
API_BASE = "https://api.apify.com/v2"
MFR_CLEANUP = {
"Dell Inc": "Dell", "HP HP": "HP", "HP": "HP",
"LENOVO": "Lenovo", "Lenovo": "Lenovo",
"Microsoft Corporation": "Microsoft", "Apple Inc": "Apple",
"ASUSTeK COMPUTER INC.": "ASUS", "Acer": "Acer",
"Panasonic": "Panasonic", "Samsung": "Samsung",
"Toshiba": "Toshiba", "Fujitsu": "Fujitsu",
"Hewlett-Packard": "HP", "HUAWEI": "Huawei",
}
def _get_settings():
if not frappe.db.exists("eBay Pricing Settings", "eBay Pricing Settings"):
return None
return frappe.get_doc("eBay Pricing Settings", "eBay Pricing Settings")
def _get_oxylabs_creds():
settings = _get_settings()
user = password = ""
if settings:
user = settings.get("oxylabs_user") or ""
try:
password = settings.get_password("oxylabs_password") or ""
except Exception:
pass
if not user:
user = frappe.conf.get("oxylabs_user", "")
if not password:
password = frappe.conf.get("oxylabs_password", "")
return (user, password)
def _get_apify_token():
settings = _get_settings()
token = ""
if settings:
try:
token = settings.get_password("apify_token") or ""
except Exception:
pass
if not token:
token = frappe.conf.get("apify_token", "")
return token
def clean_manufacturer(mfr):
return MFR_CLEANUP.get(mfr, mfr)
@frappe.whitelist()
def search_model(query=None, manufacturer=None, model=None, source="auto"):
if not query and not (manufacturer and model):
return {"error": "Provide query or manufacturer + model"}
if not manufacturer or not model:
parts = (query or "").split(None, 1)
if len(parts) >= 2:
manufacturer, model = parts[0], parts[1]
else:
manufacturer, model = query, ""
items, used_source = _search_ebay_sold(model, manufacturer, source=source)
pricing = _parse_prices(items, manufacturer, model, source=used_source or "unknown")
if pricing:
_upsert_system_pricing(manufacturer, model, pricing)
_update_item_market_data(manufacturer, model, pricing)
_log_api_call(manufacturer, model, query, used_source,
len(items) if items else 0, "Success")
return {"results": items or [], "pricing": pricing}
else:
_log_api_call(manufacturer, model, query, used_source, 0, "Failed")
return {"results": items or [], "pricing": None,
"message": "No pricing data found"}
def _search_ebay_sold_oxylabs(model, manufacturer):
user, password = _get_oxylabs_creds()
if not user or not password:
return None
import requests as req_module
clean_mfr = clean_manufacturer(manufacturer)
query = f"{clean_mfr} {model}"
payloads = [
{
"source": "universal",
"url": f"https://www.ebay.com/sch/i.html?_nkw={urllib.parse.quote(query)}&LH_Sold=1&_ipg=240",
"render": "html",
},
{
"source": "ebay_search",
"query": query,
"domain": "com",
"render": "html",
},
]
for payload in payloads:
try:
resp = req_module.post(OXYLABS_API, auth=(user, password),
json=payload, timeout=120)
if resp.status_code != 200:
continue
data = resp.json()
if "results" not in data or not data["results"]:
continue
content = data["results"][0].get("content", "")
if not isinstance(content, str) or len(content) < 100000:
continue
listings = _parse_ebay_html(content)
if listings and len(listings) >= 3:
return listings
except Exception:
continue
return None
def _parse_ebay_html(content):
listings = []
card_blocks = re.split(r'class="s-card\s', content)
for block in card_blocks[1:]:
listing = {}
price_m = re.search(
r's-card__price[^>]*>\$([\d,.]+(?:\.\d{2})?)(?:\s*to\s*\$[\d,.]+(?:\.\d{2})?)?</span>',
block,
)
if price_m:
listing["price"] = float(price_m.group(1).replace(",", ""))
title_m = re.search(r's-card__title[^>]*><span[^>]*>([^<]+)</span>', block)
if title_m:
listing["title"] = title_m.group(1).strip()
if listing["title"].lower() in ("shop on ebay", ""):
continue
else:
heading_m = re.search(r'role=heading[^>]*>(.*?)</(?:div|span|h\d)>',
block, re.DOTALL)
if heading_m:
title_text = re.sub(r'<[^>]+>', '', heading_m.group(1)).strip()
if title_text.lower() != "shop on ebay":
listing["title"] = title_text
sold_m = re.search(r'(\d[\d,]*)\s+sold', block, re.IGNORECASE)
if sold_m:
listing["sold"] = int(sold_m.group(1).replace(",", ""))
if re.search(r'Free (?:shipping|delivery|Standard Shipping)',
block, re.IGNORECASE):
listing["shipping"] = "Free"
else:
ship_m = re.search(
r'\+\$?([\d,.]+)\s+(?:shipping|delivery|Standard Shipping)',
block, re.IGNORECASE,
)
if ship_m:
listing["shipping"] = float(ship_m.group(1).replace(",", ""))
else:
ship_alt = re.search(r'\$([\d,.]+)\s+(?:shipping|delivery)',
block, re.IGNORECASE)
if ship_alt:
listing["shipping"] = float(ship_alt.group(1).replace(",", ""))
cond_m = re.search(
r'(Pre-Owned|Used|Brand New|New \(Other\)|Refurbished|Open Box|'
r'For parts or not working|Seller refurbished|New with defects|'
r'New with box|New without box|New with tags)',
block, re.IGNORECASE,
)
if cond_m:
listing["condition"] = cond_m.group(1)
if listing.get("price") or listing.get("title"):
listings.append(listing)
return listings if listings else None
def _search_ebay_sold_apify(model, manufacturer):
token = _get_apify_token()
if not token:
return None
import requests as req_module
clean_mfr = clean_manufacturer(manufacturer)
query = f"{clean_mfr} {model}"
run_input = {
"keywords": [query],
"daysToScrape": 60,
"count": 30,
"categoryId": "0",
"ebaySite": "ebay.com",
"sortOrder": "endedRecently",
"itemCondition": "any",
"itemLocation": "domestic",
}
url = f"{API_BASE}/acts/{ACTOR_ID}/runs?token={token}"
try:
resp = req_module.post(url, json=run_input, timeout=30)
result = resp.json()
run_id = result["data"]["id"]
dataset_id = result["data"].get("defaultDatasetId")
except Exception:
return None
max_wait = 120
start = time.time()
while time.time() - start < max_wait:
time.sleep(8)
try:
status_url = f"{API_BASE}/actor-runs/{run_id}?token={token}"
status_resp = req_module.get(status_url, timeout=10)
status = status_resp.json()
run_status = status["data"]["status"]
if run_status == "SUCCEEDED":
break
elif run_status in ("FAILED", "ABORTED", "TIMED-OUT"):
return None
except Exception:
continue
try:
results_url = f"{API_BASE}/datasets/{dataset_id}/items?token={token}&limit=30&clean=true"
results_resp = req_module.get(results_url, timeout=15)
return results_resp.json()
except Exception:
return None
def _search_ebay_sold(model, manufacturer, source="auto"):
if source in ("auto", "oxylabs"):
result = _search_ebay_sold_oxylabs(model, manufacturer)
if result is not None:
return result, "oxylabs"
if source in ("auto", "apify"):
result = _search_ebay_sold_apify(model, manufacturer)
if result is not None:
return result, "apify"
return None, None
def _parse_prices(items, manufacturer, model, source="oxylabs"):
if not items:
return None
prices = []
for item in items:
if item.get("error"):
continue
if source == "oxylabs":
price_val = item.get("price")
if isinstance(price_val, str):
price_str = price_val.replace("$", "").replace(",", "").strip()
if " to " in price_str:
price_str = price_str.split(" to ")[0]
try:
price_val = float(price_str)
except (ValueError, TypeError):
continue
if not isinstance(price_val, (int, float)):
continue
p = float(price_val)
if 5 < p < 10000:
title = item.get("title", "").upper()
model_upper = model.upper()
model_words = model_upper.split()
if len(items) > 5:
short_model_words = [w for w in model_words if len(w) > 2]
if short_model_words and not any(w in title for w in short_model_words):
continue
prices.append(p)
else:
price_str = item.get("totalPrice") or item.get("soldPrice")
if not price_str:
continue
try:
p = float(str(price_str).replace(",", "").replace("$", "").strip())
if 5 < p < 10000:
prices.append(p)
except (ValueError, TypeError):
continue
if not prices:
return None
prices.sort()
if len(prices) >= 5:
trim = max(1, int(len(prices) * 0.1))
trimmed = prices[trim : len(prices) - trim]
if trimmed:
prices = trimmed
if not prices:
return None
avg = sum(prices) / len(prices)
median = prices[len(prices) // 2]
return {
"price_high": round(max(prices), 2),
"price_low": round(min(prices), 2),
"price_average": round(avg, 2),
"price_auction": round(median, 2),
"sample_count": len(prices),
"source": source or "unknown",
"scraped_at": now(),
}
def _upsert_system_pricing(manufacturer, model, pricing):
existing = frappe.db.get_value(
"System Pricing",
{"manufacturer": manufacturer, "model": model},
"name",
)
doc = None
if existing:
doc = frappe.get_doc("System Pricing", existing)
else:
doc = frappe.new_doc("System Pricing")
doc.manufacturer = manufacturer
doc.model = model
for key in ("price_high", "price_low", "price_average", "price_auction",
"sample_count", "scraped_at"):
if key in pricing:
setattr(doc, key, pricing[key])
# Fix source to match allowed values
if "source" in pricing:
raw_source = pricing["source"]
if raw_source.startswith("ebay_"):
raw_source = raw_source.replace("ebay_", "")
if raw_source not in ("oxylabs", "apify"):
raw_source = "unknown"
setattr(doc, "source", raw_source)
if doc.scraped_at:
scraped = frappe.utils.get_datetime(doc.scraped_at)
now = now_datetime()
doc.days_since_pricing = (now - scraped).days
else:
doc.days_since_pricing = 0
doc.pricing_status = "Priced"
doc.save()
frappe.db.commit()
def _update_item_market_data(manufacturer, model, pricing):
clean_mfr = clean_manufacturer(manufacturer)
model_lower = model.lower()
# Match by brand (exact) + item_name (contains model)
items = frappe.get_all(
"Item",
filters={
"item_group": ["in", ["Laptop", "Desktop", "Tablet", "Phone", "Workstation"]],
"disabled": 0,
"brand": clean_mfr,
},
fields=["name", "item_name"],
)
matched = None
for item in items:
item_name_lower = (item.item_name or "").lower()
if model_lower in item_name_lower:
matched = item.name
break
if not matched:
return
item_doc = frappe.get_doc("Item", matched)
item_doc.base_market_price = pricing.get("price_average", 0)
item_doc.market_high = pricing.get("price_high", 0)
item_doc.market_low = pricing.get("price_low", 0)
item_doc.market_median = pricing.get("price_auction", 0)
item_doc.market_average = pricing.get("price_average", 0)
item_doc.market_samples = pricing.get("sample_count", 0)
item_doc.market_last_priced = pricing.get("scraped_at")
item_doc.save()
frappe.db.commit()
return {"item": matched, "updated": True}
def _log_api_call(manufacturer, model, search_query, source, results_count, status):
try:
log = frappe.new_doc("eBay Pricing Log")
log.manufacturer = manufacturer
log.model = model
log.search_query = search_query
log.source = source or "unknown"
log.timestamp = now()
log.results_count = results_count or 0
log.status = status
log.save()
frappe.db.commit()
except Exception:
pass
@frappe.whitelist()
def run_batch(batch_size=10, source="auto", force=False):
batch_size = int(batch_size) if batch_size != "all" else 999999
force = bool(force)
# Get unique models from Items that have serials
models = frappe.db.sql(
"""
SELECT DISTINCT i.brand as manufacturer, i.item_name as model
FROM `tabItem` i
INNER JOIN `tabSerial No` sn ON sn.item_code = i.name
WHERE i.brand IS NOT NULL AND i.brand != ''
AND i.disabled = 0
ORDER BY i.modified DESC
LIMIT %s
""",
(batch_size,),
as_dict=True,
)
priced = failed = skipped = 0
for row in models:
mfr = row.manufacturer
mdl = row.model
if not force:
exists = frappe.db.exists("System Pricing", {"manufacturer": mfr, "model": mdl})
if exists:
skipped += 1
continue
items, used_source = _search_ebay_sold(mdl, mfr, source=source)
pricing = _parse_prices(items, mfr, mdl, source=used_source or "unknown")
if pricing:
_upsert_system_pricing(mfr, mdl, pricing)
_update_item_market_data(mfr, mdl, pricing)
_log_api_call(mfr, mdl, f"{mfr} {mdl}", used_source,
len(items) if items else 0, "Success")
priced += 1
else:
_log_api_call(mfr, mdl, f"{mfr} {mdl}", used_source, 0, "Failed")
failed += 1
if used_source == "oxylabs":
time.sleep(2)
else:
time.sleep(3)
return {"priced": priced, "failed": failed, "skipped": skipped, "total": len(models)}
@frappe.whitelist()
def get_recent_pricing(limit=50, status_filter=None):
filters = {}
if status_filter:
filters["pricing_status"] = status_filter
records = frappe.get_all(
"System Pricing",
filters=filters,
fields=[
"name", "manufacturer", "model", "pricing_status",
"scraped_at", "days_since_pricing",
"price_high", "price_low", "price_average",
"sample_count", "source",
],
order_by="scraped_at desc",
limit=int(limit),
)
for r in records:
r["days_since_pricing"] = r.get("days_since_pricing") or 0
for key in ("price_high", "price_low", "price_average"):
if r.get(key) is not None:
r[key] = round(r[key], 2)
return records
@frappe.whitelist()
def apply_serial_pricing(serial_no):
serial = frappe.get_doc("Serial No", serial_no)
if not serial.item_code:
return {"error": "No item linked"}
item = frappe.get_doc("Item", serial.item_code)
grade = serial.grade
price_point = serial.price_point
if not grade:
serial.pricing_status = "Needs Grading"
serial.save()
return {"status": "needs_grading"}
if grade in ("C", "D", "F"):
serial.assigned_price = item.commodity_flat_price or 0
serial.commodity_value = item.commodity_flat_price or 0
serial.pricing_status = "Commodity"
serial.pricing_source = item.name
serial.save()
frappe.db.commit()
return {"status": "commodity", "price": serial.assigned_price}
if not price_point:
serial.pricing_status = "Needs Pricing"
serial.save()
return {"status": "needs_price_point"}
base_price = 0
if price_point == "Low":
base_price = item.market_low or item.base_market_price or 0
elif price_point == "Median":
base_price = item.market_median or item.base_market_price or 0
elif price_point == "Average":
base_price = item.market_average or item.base_market_price or 0
elif price_point == "High":
base_price = item.market_high or item.base_market_price or 0
elif price_point == "Manual":
base_price = serial.manual_price or 0
multiplier = 1.0
if grade == "A":
multiplier = item.grade_a_multiplier or 1.0
elif grade == "B":
multiplier = item.grade_b_multiplier or 0.8
final_price = flt(base_price) * flt(multiplier)
serial.assigned_price = round(final_price, 2)
serial.pricing_status = "Priced" if price_point != "Manual" else "Manual Override"
serial.pricing_source = item.name
serial.save()
frappe.db.commit()
return {
"status": serial.pricing_status,
"price": serial.assigned_price,
"base": base_price,
"multiplier": multiplier,
}
@frappe.whitelist()
def batch_apply_pricing(item_code=None, batch_size=50):
filters = {"pricing_status": ["in", ["Needs Grading", "Needs Pricing", "Priced"]]}
if item_code:
filters["item_code"] = item_code
serials = frappe.get_all(
"Serial No",
filters=filters,
fields=["name", "item_code", "grade", "price_point"],
limit=int(batch_size),
)
results = {"priced": 0, "commodity": 0, "needs_grading": 0, "needs_price_point": 0, "errors": 0}
for s in serials:
try:
result = apply_serial_pricing(s.name)
status = result.get("status", "")
if status == "commodity":
results["commodity"] += 1
elif status == "needs_grading":
results["needs_grading"] += 1
elif status == "needs_price_point":
results["needs_price_point"] += 1
elif status in ("Priced", "Manual Override"):
results["priced"] += 1
else:
results["errors"] += 1
except Exception as e:
frappe.log_error(f"Pricing error for {s.name}: {e}", "eBay Pricing")
results["errors"] += 1
return results
@@ -0,0 +1,6 @@
import frappe
@frappe.whitelist()
def install_ssh():
"""Placeholder for SSH install functionality."""
return {"success": True, "message": "SSH install endpoint ready"}
@@ -0,0 +1,127 @@
import json
import re
import frappe
@frappe.whitelist()
def optimize_routes(pickup_date=None):
"""Optimize routes for all trucks on a given date."""
if not pickup_date:
pickup_date = frappe.utils.today()
trucks = frappe.get_list("Truck Profile",
filters={"active": 1},
fields=["name", "truck_name", "total_slots", "weight_capacity"]
)
pickups = frappe.get_list("Scheduled Pickup",
filters={
"pickup_date": pickup_date,
"shipment_type": "Truck",
"truck_profile": ["is", "not set"]
},
fields=["name", "customer_number", "company_name", "estimated_items",
"estimated_weight", "gaylord_count", "gaylord_sizes", "slots_needed",
"latitude", "longitude"],
order_by="stop_order asc"
)
for pickup in pickups:
if not pickup.slots_needed and pickup.gaylord_sizes:
pickup.slots_needed = _calculate_slots(pickup.gaylord_sizes)
elif not pickup.slots_needed:
pickup.slots_needed = pickup.gaylord_count or 1
pickups.sort(key=lambda x: x.slots_needed or 0, reverse=True)
routes = {}
for truck in trucks:
routes[truck.name] = {
"truck": truck,
"pickups": [],
"used_slots": 0,
"used_weight": 0
}
unassigned = []
for pickup in pickups:
assigned = False
for truck_name, route in routes.items():
truck = route["truck"]
slots = pickup.slots_needed or 0
weight = float(pickup.estimated_weight or 0)
if route["used_slots"] + slots <= truck.total_slots:
if truck.weight_capacity and route["used_weight"] + weight <= truck.weight_capacity:
route["pickups"].append(pickup)
route["used_slots"] += slots
route["used_weight"] += weight
assigned = True
frappe.db.set_value("Scheduled Pickup", pickup.name, "truck_profile", truck_name)
break
elif not truck.weight_capacity:
route["pickups"].append(pickup)
route["used_slots"] += slots
route["used_weight"] += weight
assigned = True
frappe.db.set_value("Scheduled Pickup", pickup.name, "truck_profile", truck_name)
break
if not assigned:
unassigned.append(pickup)
for truck_name, route in routes.items():
if route["pickups"]:
route["pickups"].sort(key=lambda p: (float(p.latitude or 0), float(p.longitude or 0)))
for i, pickup in enumerate(route["pickups"], 1):
frappe.db.set_value("Scheduled Pickup", pickup.name, "stop_order", i)
frappe.db.commit()
return {
"success": True,
"date": pickup_date,
"trucks_assigned": len([r for r in routes.values() if r["pickups"]]),
"total_pickups": len(pickups),
"unassigned": len(unassigned),
"routes": {
truck_name: {
"truck_name": route["truck"].truck_name,
"slots_used": route["used_slots"],
"slots_total": route["truck"].total_slots,
"weight_used": route["used_weight"],
"weight_capacity": route["truck"].weight_capacity,
"stops": len(route["pickups"]),
"pickups": [{"name": p.name, "company": p.company_name, "slots": p.slots_needed} for p in route["pickups"]]
}
for truck_name, route in routes.items() if route["pickups"]
}
}
def _calculate_slots(gaylord_sizes_text):
if not gaylord_sizes_text:
return 0
size_map = {"small": 1, "medium": 2, "large": 3}
total = 0
matches = re.findall(r'(\d+)\s*(\w+)', gaylord_sizes_text.lower())
for count, size in matches:
slots = size_map.get(size, 1)
total += int(count) * slots
return total or 1
@frappe.whitelist()
def get_scheduled_pickups(pickup_date=None):
"""Get scheduled pickups for a given date."""
if not pickup_date:
pickup_date = frappe.utils.today()
pickups = frappe.get_list("Scheduled Pickup",
filters={"pickup_date": pickup_date},
fields=["name", "customer_number", "company_name", "estimated_items",
"estimated_weight", "gaylord_count", "gaylord_sizes", "slots_needed",
"latitude", "longitude", "stop_order", "truck_profile", "status",
"contact_name", "contact_phone", "address_line", "city", "state", "zip_code"],
order_by="stop_order asc"
)
return pickups
+68
View File
@@ -0,0 +1,68 @@
import frappe
from frappe import _
@frappe.whitelist()
def get_qa_ready_serials(limit=50):
"""Get Serial Nos ready for QA."""
return frappe.get_all('Serial No',
filters={'r2_status': 'Needs QA'},
fields=['name', 'item_code', 'item_name', 'pallet', 'cosmetic_grade'],
limit=limit,
order_by='creation asc'
)
@frappe.whitelist()
def create_qa_from_serial(serial_no):
"""Create QA inspection for a Serial No."""
if not frappe.db.exists('Serial No', serial_no):
return {'error': 'Serial No not found'}
serial = frappe.get_doc('Serial No', serial_no)
existing = frappe.db.get_value('R2 Device Inspection',
{'serial_no': serial_no, 'docstatus': ['!=', 2]}, 'name')
if existing:
return {'error': f'Inspection {existing} already exists'}
insp = frappe.get_doc({
'doctype': 'R2 Device Inspection',
'serial_no': serial_no,
'item_code': serial.item_code,
'inspection_date': frappe.utils.today(),
'inspector': frappe.session.user,
'cosmetic_grade': serial.cosmetic_grade,
'screen_condition': serial.screen_condition,
'keyboard_condition': serial.keyboard_condition,
'case_condition': serial.case_condition
})
insp.insert(ignore_permissions=True)
return {'success': True, 'inspection': insp.name}
@frappe.whitelist()
def auto_grade(serial_no, grade='C5'):
"""Auto-grade a device and move to Priced state."""
serial = frappe.get_doc('Serial No', serial_no)
serial.cosmetic_grade = grade
serial.r2_status = 'Processed'
serial.save(ignore_permissions=True)
# Apply flat pricing
item = frappe.db.get_value('Item', serial.item_code, 'item_group')
flat_prices = {
'Laptops': {'c3': 250, 'c4': 200, 'c5': 150, 'c6': 100, 'c7': 60, 'c8': 30, 'c9': 15},
'Desktops': {'c3': 180, 'c4': 150, 'c5': 120, 'c6': 80, 'c7': 50, 'c8': 25, 'c9': 10},
'Tablets': {'c3': 120, 'c4': 100, 'c5': 80, 'c6': 50, 'c7': 30, 'c8': 15, 'c9': 8},
'Phones': {'c3': 100, 'c4': 80, 'c5': 60, 'c6': 40, 'c7': 25, 'c8': 12, 'c9': 5}
}
prices = flat_prices.get(item, flat_prices['Laptops'])
price = prices.get(grade.lower(), 100)
serial.suggested_price = price
serial.assigned_price = price
serial.pricing_status = 'Priced'
serial.price_point = grade
serial.r2_status = 'Priced'
serial.save(ignore_permissions=True)
return {'success': True, 'price': price}
@@ -0,0 +1,499 @@
import json
import frappe
from frappe.utils import today, getdate, add_days
from datetime import timedelta
@frappe.whitelist()
def get_pickups(date=None):
"""Fetch scheduled pickups with optional date filter.
Returns pickups, calendar (next 30 days), and weekly chart data."""
filters = []
if date:
filters.append(["Scheduled Pickup", "pickup_date", "=", date])
fields = [
"name", "pickup_date", "pickup_type", "status", "truck", "stop_order",
"customer_number", "company_name",
"contact_name", "contact_phone", "contact_email",
"address_line", "city", "state", "zip_code",
"latitude", "longitude",
"estimated_items", "estimated_weight", "load_contents",
"num_labels", "data_status", "red_r2",
"notes", "legacy_notes", "needs_aor", "needs_cod",
]
pickups = frappe.get_list("Scheduled Pickup",
fields=fields,
filters=filters if filters else None,
order_by="pickup_date asc, stop_order asc",
limit_page_length=500,
)
# Build calendar data (next 30 days)
from_date = getdate(today())
to_date = add_days(from_date, 30)
all_pickups = frappe.get_list("Scheduled Pickup",
fields=["pickup_date"],
filters=[["Scheduled Pickup", "pickup_date", ">=", str(from_date)],
["Scheduled Pickup", "pickup_date", "<=", str(to_date)]],
limit_page_length=500,
)
pickup_counts = {}
for p in all_pickups:
d = p.get("pickup_date", "")
if d:
pickup_counts[d] = pickup_counts.get(d, 0) + 1
calendar = []
for i in range(30):
d = add_days(from_date, i)
ds = str(d)
calendar.append({"date": ds, "count": pickup_counts.get(ds, 0)})
# Build weekly chart data (last 12 weeks)
weekly = []
for i in range(11, -1, -1):
week_start = add_days(from_date, -(from_date.weekday() + 7 * i))
week_end = add_days(week_start, 6)
count = 0
for d_str, c in pickup_counts.items():
try:
d = getdate(d_str)
if week_start <= d <= week_end:
count += c
except (ValueError, TypeError):
pass
weekly.append({"label": week_start.strftime("%m/%d"), "count": count})
return {
"pickups": pickups,
"calendar": calendar,
"weekly": weekly,
}
@frappe.whitelist()
def auto_route(date=None):
"""Auto-assign pickups to trucks based on capacity and proximity."""
if not date:
date = today()
pickups = frappe.get_list("Scheduled Pickup",
filters={"pickup_date": date},
fields=["name", "company_name", "estimated_items", "estimated_weight",
"latitude", "longitude", "pickup_type"],
limit_page_length=200,
)
if not pickups:
return {"success": True, "assigned": 0}
trucks = ["Truck 1", "Truck 2", "Truck 3"]
sorted_p = sorted(pickups, key=lambda p: (float(p.get("latitude") or 0), float(p.get("longitude") or 0)))
n = len(sorted_p)
assigned = 0
for i, p in enumerate(sorted_p):
if p.get("pickup_type") == "Drop-off":
truck = ""
else:
truck = trucks[i % 3] if n <= 3 else trucks[min(i * 3 // n, 2)]
doc = frappe.get_doc("Scheduled Pickup", p["name"])
doc.truck = truck
doc.status = "Routed" if truck else "Scheduled"
doc.stop_order = i + 1
doc.save()
assigned += 1
frappe.db.commit()
return {"success": True, "assigned": assigned}
@frappe.whitelist()
def get_checkins():
"""Fetch completed check-ins — returns Loads with their Pallets."""
loads = frappe.get_list("Load",
fields=["name", "load_number", "incoming_date", "customer", "customer_name",
"total_devices", "total_weight", "data_status", "red_r2"],
order_by="incoming_date desc",
limit_page_length=100,
)
for load in loads:
pallets = frappe.get_list("Pallet",
filters={"load": load.name},
fields=["name", "pallet_number", "received_date", "inbound_weight",
"total_items", "data_status", "red_r2", "description", "status"],
limit_page_length=50,
)
load["pallets"] = pallets
load["pallet_count"] = len(pallets)
return {"checkins": loads}
@frappe.whitelist()
def checkin_load(pickup_name, received_date, actual_pallets, total_weight, load_contents, data_status=None, red_r2=None):
"""Check in a load: create Load + Pallets, mark pickup Complete."""
# Get the pickup
pickup = frappe.get_doc("Scheduled Pickup", pickup_name)
if not actual_pallets or int(actual_pallets) < 1:
frappe.throw("Actual pallet count must be at least 1")
actual_pallets = int(actual_pallets)
# Resolve customer - customer_number on pickup is a Link to Customer
customer_id = pickup.customer_number
if not customer_id or not frappe.db.exists("Customer", customer_id):
frappe.throw("Customer {} not found. Please verify the customer on the pickup.".format(customer_id))
# Generate Load name: MMDDYYYY-CustomerNumber format
from datetime import datetime
try:
dt = datetime.strptime(received_date, "%Y-%m-%d")
date_part = dt.strftime("%m%d%Y")
except (ValueError, TypeError):
date_part = received_date.replace("-", "")
cust_num = customer_id
load_name = "{}-{}".format(date_part, cust_num)
# Make unique if name already exists
base_name = load_name
counter = 1
while frappe.db.exists("Load", load_name):
load_name = "{}-{}".format(base_name, counter)
counter += 1
# Create Load
load = frappe.get_doc({
"doctype": "Load",
"name": load_name,
"load_number": load_name,
"incoming_date": received_date,
"customer": customer_id,
"customer_name": pickup.company_name or "",
"customer_number": customer_id,
"data_status": data_status or pickup.data_status or "",
"red_r2": red_r2 or pickup.red_r2 or "",
"total_weight": float(total_weight) if total_weight else 0,
"total_devices": actual_pallets,
"material_items": [{
"material_type": "# Of Pallets",
"total_count": actual_pallets,
"weight": float(total_weight) if total_weight else 0,
"initial_data_status": data_status or pickup.data_status or "D0",
}],
})
load.insert()
frappe.db.commit()
# Create Pallets — autoname=pallet_number, so set name=pallet_number
for i in range(actual_pallets):
pallet_num = "{}-P{}".format(load_name, i + 1)
pallet = frappe.get_doc({
"doctype": "Pallet",
"name": pallet_num,
"pallet_number": pallet_num,
"received_date": received_date,
"load": load.name,
"company_name": pickup.company_name or "",
"inbound_weight": str(round(float(total_weight) / actual_pallets, 1)) if total_weight and actual_pallets else "",
"description": load_contents or "",
"data_status": data_status or pickup.data_status or "",
"red_r2": red_r2 or pickup.red_r2 or "",
"contact_name": pickup.contact_name or "",
"contact_number": pickup.contact_phone or "",
"contact_email": pickup.contact_email or "",
"address_line": (pickup.address_line or "") + ((", " + pickup.city) if pickup.city else "") + ((", " + pickup.state) if pickup.state else ""),
"needs_aor": pickup.needs_aor or 0,
"needs_cod": pickup.needs_cod or 0,
"notes": pickup.notes or "",
"pickup": pickup.pickup_type or "",
"status": "Received",
})
pallet.insert()
frappe.db.commit()
# Update pickup status
pickup.status = "Complete"
pickup.save()
frappe.db.commit()
return {
"success": True,
"load": load.name,
"pallets_created": actual_pallets,
}
@frappe.whitelist()
def get_pickup_details(pickup_name):
"""Get full details of a Scheduled Pickup for the check-in form."""
pickup = frappe.get_doc("Scheduled Pickup", pickup_name)
return {
"name": pickup.name,
"pickup_date": pickup.pickup_date,
"pickup_type": pickup.pickup_type,
"customer_number": pickup.customer_number,
"company_name": pickup.company_name,
"contact_name": pickup.contact_name,
"contact_phone": pickup.contact_phone,
"contact_email": pickup.contact_email,
"address_line": pickup.address_line,
"city": pickup.city,
"state": pickup.state,
"zip_code": pickup.zip_code,
"estimated_items": pickup.estimated_items,
"estimated_weight": pickup.estimated_weight,
"data_status": pickup.data_status,
"red_r2": pickup.red_r2,
"needs_aor": pickup.needs_aor,
"needs_cod": pickup.needs_cod,
"notes": pickup.notes,
}
@frappe.whitelist()
def cor_report():
"""Generate Certificate of Recycling report."""
pickups = frappe.get_list("Scheduled Pickup",
filters={"status": "Complete"},
fields=["name", "pickup_date", "company_name", "customer_number",
"estimated_items", "estimated_weight", "load_contents",
"data_status", "red_r2", "needs_aor", "needs_cod"],
order_by="pickup_date desc",
limit_page_length=200,
)
html = "<!DOCTYPE html><html><head><title>CoR Report</title>"
html += "<style>body{font-family:Arial,sans-serif;margin:40px;}"
html += "table{border-collapse:collapse;width:100%;}"
html += "th,td{border:1px solid #ddd;padding:8px;text-align:left;font-size:13px;}"
html += "th{background:#2F5496;color:#fff;}h1{color:#2F5496;}</style></head><body>"
html += "<h1>Certificate of Recycling (CoR) Report</h1>"
html += "<p>Generated: " + frappe.utils.now() + "</p>"
html += "<p>Total completed loads: " + str(len(pickups)) + "</p>"
if pickups:
html += "<table><tr><th>Date</th><th>Customer</th><th>Items</th><th>Weight</th>"
html += "<th>Contents</th><th>Data Status</th><th>RED/R2</th><th>AoR</th><th>CoD</th></tr>"
for p in pickups:
html += "<tr><td>" + str(p.get("pickup_date", "")) + "</td>"
html += "<td>" + str(p.get("company_name", "")) + "</td>"
html += "<td>" + str(p.get("estimated_items", "")) + "</td>"
html += "<td>" + str(p.get("estimated_weight", "")) + "</td>"
html += "<td>" + str(p.get("load_contents", "")) + "</td>"
html += "<td>" + str(p.get("data_status", "")) + "</td>"
html += "<td>" + str(p.get("red_r2", "")) + "</td>"
html += "<td>" + ("" if p.get("needs_aor") else "") + "</td>"
html += "<td>" + ("" if p.get("needs_cod") else "") + "</td></tr>"
html += "</table>"
else:
html += "<p>No completed loads found.</p>"
html += "</body></html>"
frappe.local.response["type"] = "html"
frappe.local.response["page_content"] = html
@frappe.whitelist()
def print_route_sheet(date=None):
"""Generate printable route sheet."""
if not date:
date = today()
filters = {"pickup_date": date} if date else {}
pickups = frappe.get_list("Scheduled Pickup",
filters=filters,
fields=["name", "pickup_date", "pickup_type", "status", "truck", "stop_order",
"company_name", "contact_name", "contact_phone", "contact_email",
"address_line", "city", "state", "zip_code",
"estimated_items", "estimated_weight", "load_contents",
"data_status", "red_r2", "needs_aor", "needs_cod", "notes"],
order_by="truck asc, stop_order asc",
limit_page_length=200,
)
trucks = {}
unassigned = []
for p in pickups:
t = p.get("truck", "")
if t and t != "Unassigned":
trucks.setdefault(t, []).append(p)
else:
unassigned.append(p)
html = "<!DOCTYPE html><html><head><title>Route Sheet</title>"
html += "<style>body{font-family:Arial,sans-serif;margin:30px;}"
html += "table{border-collapse:collapse;width:100%;margin-bottom:12px;}"
html += "th,td{border:1px solid #999;padding:6px 10px;text-align:left;font-size:12px;}"
html += "th{background:#2F5496;color:#fff;}h1{color:#2F5496;font-size:20px;}"
html += ".truck-header{background:#f0f0f0;padding:8px;font-weight:700;font-size:14px;border:1px solid #ccc;}"
html += "@media print{body{margin:10px;}}</style></head><body>"
html += "<h1>Route Sheet — " + str(date or "Today") + "</h1>"
for truck_name, stops in sorted(trucks.items()):
html += '<div class="truck-header">🚛 ' + truck_name + "" + str(len(stops)) + " stops</div>"
html += "<table><tr><th>#</th><th>Customer</th><th>Address</th><th>Contact</th>"
html += "<th>Items</th><th>Weight</th><th>Data</th><th>RED/R2</th><th>AoR</th><th>CoD</th><th>Notes</th></tr>"
for i, s in enumerate(stops, 1):
addr = str(s.get("address_line", "")) + ", " + str(s.get("city", "")) + ", " + str(s.get("state", "")) + " " + str(s.get("zip_code", ""))
html += "<tr><td>" + str(i) + "</td><td>" + str(s.get("company_name", "")) + "</td>"
html += "<td>" + addr + "</td><td>" + str(s.get("contact_name", "")) + "<br>" + str(s.get("contact_phone", "")) + "</td>"
html += "<td>" + str(s.get("estimated_items", "")) + "</td><td>" + str(s.get("estimated_weight", "")) + "</td>"
html += "<td>" + str(s.get("data_status", "")) + "</td><td>" + str(s.get("red_r2", "")) + "</td>"
html += "<td>" + ("" if s.get("needs_aor") else "") + "</td>"
html += "<td>" + ("" if s.get("needs_cod") else "") + "</td>"
html += "<td>" + str(s.get("notes", "")) + "</td></tr>"
html += "</table>"
if unassigned:
html += "<h2>Unassigned</h2><table><tr><th>Customer</th><th>Address</th><th>Notes</th></tr>"
for s in unassigned:
html += "<tr><td>" + str(s.get("company_name", "")) + "</td>"
html += "<td>" + str(s.get("address_line", "")) + "</td>"
html += "<td>" + str(s.get("notes", "")) + "</td></tr>"
html += "</table>"
html += "</body></html>"
frappe.local.response["type"] = "html"
frappe.local.response["page_content"] = html
@frappe.whitelist()
def print_green_sheet(date=None):
"""Generate Green Sheet printout for each pallet.
Shows customer info, service level banner, driver instructions, RED LINE instructions."""
if not date:
date = today()
# Get completed loads for this date
loads = frappe.get_list("Load",
filters={"incoming_date": date},
fields=["name", "load_number", "customer", "customer_name",
"incoming_date", "total_weight", "data_status", "red_r2"],
order_by="name asc",
limit_page_length=200,
)
html = "<!DOCTYPE html><html><head><title>Green Sheets</title>"
html += "<style>body{font-family:Arial,sans-serif;margin:30px;}"
html += ".green-sheet{border:2px solid #2E7D32;border-radius:6px;padding:16px;margin:20px 0;page-break-after:always;}"
html += ".gs-title{color:#2E7D32;font-size:18px;font-weight:700;border-bottom:2px solid #2E7D32;padding-bottom:4px;}"
html += ".gs-customer{background:#E8F5E9;border:1px solid #66BB6A;border-radius:4px;padding:8px;margin:8px 0;}"
html += ".gs-service-banner{background:#C62828;color:#fff;font-size:16px;font-weight:700;text-align:center;padding:8px;border-radius:4px;margin:8px 0;}"
html += ".gs-driver{background:#F5F5F5;border:1px solid #999;border-radius:4px;padding:8px;margin:8px 0;}"
html += ".gs-redline{background:#FFCDD2;border:2px solid #C62828;border-radius:4px;padding:8px;margin:8px 0;}"
html += ".gs-r2warning{background:#FFF9C4;border:1px solid #F9A825;border-radius:4px;padding:8px;margin:8px 0;font-size:12px;}"
html += "table{border-collapse:collapse;width:100%;margin:8px 0;}"
html += "th,td{border:1px solid #999;padding:6px;text-align:left;font-size:12px;}"
html += "th{background:#2E7D32;color:#fff;}"
html += ".gs-footer{margin-top:12px;font-size:11px;color:#666;border-top:1px solid #ccc;padding-top:8px;}"
html += "@media print{body{margin:10px;}.green-sheet{page-break-after:always;}}</style></head><body>"
for load in loads:
pallets = frappe.get_list("Pallet",
filters={"load": load.name},
fields=["name", "pallet_number", "inbound_weight", "total_items",
"data_status", "red_r2", "description", "needs_aor", "needs_cod", "notes", "status"],
limit_page_length=50,
)
for pallet in pallets:
html += '<div class="green-sheet">'
html += '<div class="gs-title">🟢 GREEN SHEET — Data-Bearing Equipment Tracking</div>'
html += '<div style="text-align:right;font-size:12px;">Pallet # ' + str(pallet.get("pallet_number", "")) + ' | Load # ' + str(load.get("name", "")) + ' | ' + str(load.get("incoming_date", "")) + '</div>'
# Customer block
html += '<div class="gs-customer">'
service_level = ""
if pallet.get("red_r2"):
service_level = pallet.get("red_r2", "")
html += '<strong>(' + str(load.get("customer", "")) + ') — ' + str(service_level) + '' + str(load.get("customer_name", "")) + '</strong>'
html += '</div>'
# Service level banner (only for RED/NIST)
rr = pallet.get("red_r2", "")
if rr and rr != "Neither":
html += '<div class="gs-service-banner">SERVICE LEVEL: ' + str(rr).upper() + '</div>'
# Driver instructions (notes)
if pallet.get("notes"):
html += '<div class="gs-driver"><strong>Driver Instructions:</strong><br>' + str(pallet.get("notes", "")) + '</div>'
# RED LINE instructions (for RED/NIST)
if rr and rr not in ("", "Neither", "R2"):
html += '<div class="gs-redline"><strong>⚠ RED LINE INSTRUCTIONS</strong><br>All data-bearing equipment must be tracked. Destruction method per customer specification.</div>'
# Pallet details table
html += '<table><tr><th>Pallet Designation</th><th>Data Status</th></tr>'
html += '<tr><td>' + str(pallet.get("status", "Received")) + '</td><td>' + str(pallet.get("data_status", "")) + '</td></tr>'
html += '<tr><th>Inbound Weight</th><th>Total Items</th></tr>'
html += '<tr><td>' + str(pallet.get("inbound_weight", "")) + '</td><td>' + str(pallet.get("total_items", "")) + '</td></tr>'
html += '<tr><th>AoR/CoR</th><th>Contents</th></tr>'
aor_cor = ""
if pallet.get("needs_aor"): aor_cor += "✓ AoR "
if pallet.get("needs_cod"): aor_cor += "✓ CoD"
html += '<tr><td>' + (aor_cor or "None") + '</td><td>' + str(pallet.get("description", "")) + '</td></tr>'
html += '</table>'
# Material tracking (hand-write on paper)
html += '<table><tr><th>Material</th><th>%</th><th>Weight</th><th>Sign Off</th><th>Date</th></tr>'
for _ in range(4):
html += '<tr><td>&nbsp;</td><td></td><td></td><td></td><td></td></tr>'
html += '</table>'
# R2 warning
html += '<div class="gs-r2warning">⚠ R2 REQUIREMENT: This pallet contains data-bearing equipment. All devices must be tracked through erasure with 5% verification audit.</div>'
# Signatures
html += '<table><tr><th>Received By</th><th>Inspected By</th><th>Verified By</th></tr>'
html += '<tr><td>&nbsp;</td><td>&nbsp;</td><td>&nbsp;</td></tr></table>'
# Footer
html += '<div class="gs-footer">Westech Electronics • Green Sheet — Pallet # ' + str(pallet.get("pallet_number", "")) + ' • Printed ' + str(frappe.utils.nowdate()) + ' • KEEP WITH PALLET AT ALL TIMES</div>'
html += '</div>'
html += "</body></html>"
frappe.local.response["type"] = "html"
frappe.local.response["page_content"] = html
@frappe.whitelist()
def print_labels(date=None):
"""Generate printable labels."""
if not date:
date = today()
filters = {"pickup_date": date} if date else {}
pickups = frappe.get_list("Scheduled Pickup",
filters=filters,
fields=["name", "company_name", "pickup_date", "num_labels", "data_status", "red_r2"],
limit_page_length=200,
)
html = "<!DOCTYPE html><html><head><title>Labels</title>"
html += "<style>body{font-family:Arial,sans-serif;margin:10px;}"
html += ".label{border:2px solid #000;width:3in;height:1.5in;display:inline-block;margin:4px;padding:6px;font-size:11px;page-break-inside:avoid;}"
html += ".label-co{font-weight:700;font-size:14px;}.label-date{font-size:10px;color:#666;}"
html += ".label-status{font-size:10px;margin-top:2px;}"
html += "@media print{body{margin:0;}}</style></head><body>"
for p in pickups:
n = p.get("num_labels") or 1
for _ in range(n):
html += '<div class="label">'
html += '<div class="label-co">' + str(p.get("company_name", "")) + '</div>'
html += '<div class="label-date">' + str(p.get("pickup_date", "")) + '</div>'
html += '<div class="label-status">' + str(p.get("data_status", "")) + " | " + str(p.get("red_r2", "")) + '</div>'
html += '</div>'
html += "</body></html>"
frappe.local.response["type"] = "html"
frappe.local.response["page_content"] = html
+92
View File
@@ -0,0 +1,92 @@
import frappe
from frappe import _
@frappe.whitelist()
def quick_sell(serial_no, customer=None, payment_method='Cash'):
"""Create Sales Invoice for quick cash sale."""
try:
serial = frappe.get_doc('Serial No', serial_no)
if serial.r2_status != 'Ready for Sale':
return {'error': 'Device must be Ready for Sale'}
# Use default customer if none provided
if not customer:
customer = frappe.db.get_value('Customer', {}, 'name', order_by='creation asc')
if not customer:
# Create walk-in customer
customer = frappe.get_doc({
'doctype': 'Customer',
'customer_name': 'Walk-in Customer',
'customer_type': 'Individual'
}).insert(ignore_permissions=True).name
# Create Sales Invoice
price = serial.assigned_price or serial.suggested_price or 0
invoice = frappe.get_doc({
'doctype': 'Sales Invoice',
'customer': customer,
'serial_no': serial_no,
'device_condition': f"Cosmetic: {serial.cosmetic_grade or 'N/A'}",
'posting_date': frappe.utils.today(),
'due_date': frappe.utils.today(),
'items': [{
'item_code': serial.item_code,
'qty': 1,
'rate': price,
'amount': price,
'serial_no': serial_no
}],
'payments': [{
'mode_of_payment': payment_method,
'amount': price
}]
})
invoice.insert(ignore_permissions=True)
invoice.submit()
# Update Serial No
serial.r2_status = 'Sold'
serial.status = 'Delivered'
serial.customer = customer
serial.save(ignore_permissions=True)
return {
'success': True,
'invoice': invoice.name,
'customer': customer,
'amount': price
}
except Exception as e:
frappe.log_error(f"Quick sell failed: {str(e)}", "Sales")
return {'error': str(e)}
@frappe.whitelist()
def create_sales_order(quotation_name):
"""Create Sales Order from Quotation."""
try:
from erpnext.selling.doctype.quotation.quotation import make_sales_order
so = make_sales_order(quotation_name)
so.insert(ignore_permissions=True)
so.submit()
return {'success': True, 'sales_order': so.name}
except Exception as e:
return {'error': str(e)}
@frappe.whitelist()
def create_delivery_note(sales_order_name):
"""Create Delivery Note from Sales Order."""
try:
from erpnext.selling.doctype.sales_order.sales_order import make_delivery_note
dn = make_delivery_note(sales_order_name)
dn.insert(ignore_permissions=True)
dn.submit()
return {'success': True, 'delivery_note': dn.name}
except Exception as e:
return {'error': str(e)}
+244
View File
@@ -0,0 +1,244 @@
import frappe
import re
from frappe.utils import now, now_datetime
def get_pricing_config():
if not frappe.db.exists("Pricing Score Config", "Pricing Score Config"):
return None
return frappe.get_doc("Pricing Score Config", "Pricing Score Config")
def parse_ram(ram_str):
if not ram_str:
return 0
m = re.search(r'(\d+(?:\.\d+)?)', str(ram_str))
if m:
return int(float(m.group(1)))
return 0
def parse_cpu(cpu_str):
if not cpu_str:
return {"brand": "", "series": "", "gen": 0}
cpu = str(cpu_str).lower()
result = {"brand": "", "series": "", "gen": 0}
if "intel" in cpu or "core" in cpu or "i3" in cpu or "i5" in cpu or "i7" in cpu or "i9" in cpu or "xeon" in cpu:
result["brand"] = "intel"
elif "ryzen" in cpu or "athlon" in cpu or "amd" in cpu:
result["brand"] = "amd"
elif "apple" in cpu or "m1" in cpu or "m2" in cpu or "m3" in cpu:
result["brand"] = "apple"
series_match = re.search(r'(i[3579])', cpu)
if series_match:
result["series"] = series_match.group(1)
elif "xeon" in cpu:
result["series"] = "xeon"
elif "ryzen" in cpu:
ryz_match = re.search(r'ryzen\s+(\d+)', cpu)
if ryz_match:
result["series"] = f"r{ryz_match.group(1)}"
elif "m1" in cpu:
result["series"] = "m1"
elif "m2" in cpu:
result["series"] = "m2"
elif "m3" in cpu:
result["series"] = "m3"
gen_match = re.search(r'(?:i[3579]-|core\s+i[3579]\s+)(\d)', cpu)
if gen_match:
first_digit = int(gen_match.group(1))
if first_digit == 1:
gen_match2 = re.search(r'(?:i[3579]-|core\s+i[3579]\s+)1(\d)', cpu)
if gen_match2:
result["gen"] = 10 + int(gen_match2.group(1))
else:
result["gen"] = 1
else:
result["gen"] = first_digit
gen_match3 = re.search(r'(?:i[3579]-)(\d{4})', cpu)
if gen_match3:
model = gen_match3.group(1)
first_digit = int(model[0])
if first_digit == 1:
result["gen"] = 10 + int(model[1])
else:
result["gen"] = first_digit
return result
@frappe.whitelist()
def calculate_serial_score(serial_no):
serial = frappe.get_doc("Serial No", serial_no)
config = get_pricing_config()
if not config:
return {"error": "No Pricing Score Config found"}
score = 0.0
details = []
cos_grade = serial.get("cosmetic_grade", "")
if cos_grade:
grade_match = re.search(r'[Cc](\d+)', str(cos_grade))
if grade_match:
g = int(grade_match.group(1))
if g <= 2:
return {"status": "scrap", "reason": f"Cosmetic C{g} - below minimum"}
base = getattr(config, f"c{g}_base", 0)
score += base
details.append(f"Cosmetic C{g} = +{base}")
cpu_info = parse_cpu(serial.get("processor", ""))
if cpu_info["series"] in ("i7", "r7"):
score += config.i7_bonus
details.append(f"CPU {cpu_info['series'].upper()} = +{config.i7_bonus}")
elif cpu_info["series"] in ("i9", "r9"):
score += config.i9_bonus
details.append(f"CPU {cpu_info['series'].upper()} = +{config.i9_bonus}")
gen = cpu_info["gen"]
gen_bonus = 0
if gen == 10: gen_bonus = config.gen_10_bonus
elif gen == 11: gen_bonus = config.gen_11_bonus
elif gen == 12: gen_bonus = config.gen_12_bonus
elif gen == 13: gen_bonus = config.gen_13_bonus
elif gen == 14: gen_bonus = config.gen_14_bonus
if gen_bonus > 0:
score += gen_bonus
details.append(f"Gen {gen} = +{gen_bonus}")
ram_gb = parse_ram(serial.get("ram", ""))
if ram_gb >= 16:
score += config.ram_16_bonus
details.append(f"RAM {ram_gb}GB = +{config.ram_16_bonus}")
if ram_gb >= 32:
score += config.ram_32_bonus
details.append(f"RAM {ram_gb}GB = +{config.ram_32_bonus}")
if ram_gb > 32:
details.append(f"WARNING: {ram_gb}GB exceeds laptop/desktop norm")
if score >= config.medium_threshold:
tier = "High"
elif score >= config.low_threshold:
tier = "Medium"
else:
tier = "Low"
market = {"low": 0, "median": 0, "high": 0}
age_days = 0
age_status = "unknown"
if serial.item_code:
item = frappe.get_doc("Item", serial.item_code)
market = {
"low": item.market_low or item.base_market_price or 0,
"median": item.market_median or item.base_market_price or 0,
"high": item.market_high or item.base_market_price or 0,
}
if item.market_last_priced:
age_days = (now_datetime() - item.market_last_priced).days
if age_days <= 30: age_status = "current"
elif age_days <= 60: age_status = "stale"
elif age_days <= 90: age_status = "aging"
else: age_status = "expired"
suggested_price = market["high"] if tier == "High" else market["median"] if tier == "Medium" else market["low"]
serial.desirability_score = round(score, 1)
serial.suggested_tier = tier
serial.suggested_price = suggested_price
serial.save()
frappe.db.commit()
return {
"status": "ok",
"serial_no": serial_no,
"score": round(score, 1),
"tier": tier,
"details": details,
"market_prices": market,
"suggested_price": suggested_price,
"age_days": age_days,
"age_status": age_status,
}
@frappe.whitelist()
def batch_calculate_scores(batch_size=100):
serials = frappe.get_all("Serial No",
filters={"cosmetic_grade": ["is", "set"]},
fields=["name"],
limit=int(batch_size)
)
results = {"updated": 0, "scrap": 0, "errors": 0}
for s in serials:
try:
result = calculate_serial_score(s.name)
if result.get("status") == "scrap":
results["scrap"] += 1
else:
results["updated"] += 1
except Exception as e:
results["errors"] += 1
frappe.log_error(f"Score calc error for {s.name}: {e}")
return results
@frappe.whitelist()
def get_sales_pricing_data(limit=50):
"""Get pricing data for Sales Manager page."""
config = get_pricing_config()
if not config:
return {"error": "No Pricing Score Config"}
serials = frappe.get_all("Serial No",
filters={"cosmetic_grade": ["is", "set"]},
fields=["name", "serial_no", "item_code", "item_name", "cosmetic_grade", "processor", "ram", "desirability_score", "suggested_tier", "suggested_price", "assigned_price", "pricing_status"],
limit=int(limit),
order_by="modified desc"
)
results = []
for s in serials:
age = {"days": 0, "status": "unknown", "color": "gray"}
market = {"low": 0, "median": 0, "high": 0}
if s.item_code:
item = frappe.get_doc("Item", s.item_code)
market = {
"low": item.market_low or 0,
"median": item.market_median or 0,
"high": item.market_high or 0,
}
if item.market_last_priced:
age_days = (now_datetime() - item.market_last_priced).days
age["days"] = age_days
if age_days <= 30:
age["status"] = "current"
age["color"] = "green"
elif age_days <= 60:
age["status"] = "stale"
age["color"] = "yellow"
elif age_days <= 90:
age["status"] = "aging"
age["color"] = "orange"
else:
age["status"] = "expired"
age["color"] = "red"
results.append({
"serial_no": s.name,
"item_code": s.item_code,
"item_name": s.item_name,
"cosmetic_grade": s.cosmetic_grade,
"processor": s.processor,
"ram": s.ram,
"score": s.desirability_score,
"tier": s.suggested_tier,
"market": market,
"suggested_price": s.suggested_price,
"assigned_price": s.assigned_price,
"pricing_status": s.pricing_status,
"age": age,
})
return {
"serials": results,
"config": {
"low_threshold": config.low_threshold,
"medium_threshold": config.medium_threshold,
}
}
@@ -0,0 +1,37 @@
import frappe
from frappe import _
def validate_hardware_tests(doc, method):
"""Before save: if CPU or RAM test failed, route to Dismantle."""
# Check if this is a device-type serial (has item_code)
if not doc.item_code:
return
# Check CPU and RAM test results
cpu_fail = doc.get("cpu_test") == "Fail"
ram_fail = doc.get("ram_test") == "Fail"
if cpu_fail or ram_fail:
# Set grade to Flagged
doc.grade = "Flagged"
doc.pricing_status = "Dismantle"
doc.assigned_price = None
# Route to Dismantle warehouse if it exists
dismantle_wh = frappe.db.exists("Warehouse", "Dismantle - WR")
if dismantle_wh:
doc.warehouse = "Dismantle - WR"
# Log the failure reason
reasons = []
if cpu_fail:
reasons.append("CPU test failed")
if ram_fail:
reasons.append("RAM test failed")
frappe.msgprint(
_("Hardware failure detected: {0}. Device routed to Dismantle.").format(", ".join(reasons)),
indicator="red",
alert=True
)
@@ -0,0 +1,191 @@
import frappe
from frappe import _
# Mapping of material types to service categories
HDD_MATERIALS = {
"Loose Hard Drive", "External Hard Drive", "Printers/Copiers Hard Drives",
"Loose SSD or mSATA Drive", "Server", "Desktop", "Laptop", "Chromebook / Notebook",
"All In One", "HPStream", "Thin Clients", "Tablet", "Cell Phone / Smart Phone",
"Gaming Systems", "Smart TV", "POS", "POS Terminals", "DVR", "Switch",
"Network / Modems / Routers", "Office/ IP Phone", "Personal Electronics / PDA",
"CRT TV", "Printers/Copiers", "USB Drive", "SD Cards", "GPS"
}
TAPE_MATERIALS = {"CD / Floppy / DVD / Tapes"}
def _get_service_item(destruction_method, has_hardware=True):
"""Map destruction method to service item code."""
if destruction_method == "Wipe":
if has_hardware:
return "SVR-HDD-WIPE-1PASS" # Default to 1-pass; user can override
return "SVR-HDD-WIPE-3PASS-NOHW"
elif destruction_method in ("Shred", "Degauss"):
if has_hardware:
return "SVR-HDD-SERIAL-WIPE-HW"
return "SVR-HDD-SERIAL-WIPE-NOHW"
elif destruction_method == "None":
return None
return "SVR-HDD-WIPE-1PASS"
def _get_tape_item(destruction_method, has_hardware=True):
if has_hardware:
return "SVR-TAPE-SHRED"
return "SVR-TAPE-SHRED-NOHW"
def _get_onsite_item(has_hardware=True):
return "SVR-HDD-ONSITE-HW" if has_hardware else "SVR-HDD-ONSITE-NOHW"
def _calculate_tier_price(item_code, qty):
"""Return unit rate for given qty based on tier pricing."""
tiers = {
"SVR-HDD-WIPE-1PASS": [(1,10,7.00), (11,30,6.00), (31,50,4.50), (51,99,3.50), (100,999999,3.00)],
"SVR-HDD-WIPE-3PASS-HW": [(1,10,8.50), (11,30,7.00), (31,50,5.25), (51,99,4.25), (100,999999,3.50)],
"SVR-HDD-WIPE-3PASS-NOHW": [(1,10,14.00), (11,30,11.50), (31,50,8.40), (51,99,7.25), (100,999999,6.00)],
"SVR-HDD-SERIAL-WIPE-HW": [(1,10,9.00), (11,30,7.50), (31,50,6.00), (51,99,5.00), (100,999999,4.25)],
"SVR-HDD-SERIAL-WIPE-NOHW": [(1,10,14.50), (11,30,12.00), (31,50,10.00), (51,99,8.00), (100,999999,6.50)],
"SVR-HDD-ONSITE-HW": [(1,100,500.00), (101,999999,3.50)],
"SVR-HDD-ONSITE-NOHW": [(1,100,850.00), (101,999999,6.00)],
"SVR-TAPE-SHRED": [(1,10,4.00), (11,30,3.30), (31,50,2.70), (51,99,2.00), (100,999999,1.50)],
"SVR-TAPE-SHRED-NOHW": [(1,10,6.65), (11,30,5.50), (31,50,4.50), (51,99,3.35), (100,999999,2.50)],
"SVR-VIDEO-RECORD": [(1,999999,3.50)],
}
for item, tlist in tiers.items():
if item == item_code:
for min_qty, max_qty, rate in tlist:
if min_qty <= qty <= max_qty:
return rate
# Fallback to Item Price
rate = frappe.db.get_value("Item Price", {"item_code": item_code, "price_list": "2025 Service Rates", "selling": 1}, "price_list_rate")
return rate or 0
@frappe.whitelist()
def generate_service_invoice(load_name):
"""Generate a Sales Invoice from a Load document."""
load = frappe.get_doc("Load", load_name)
if load.invoice_generated:
frappe.throw(_("Invoice already generated for this load."))
if not load.customer:
frappe.throw(_("Load must have a Customer linked."))
# Gather quantities per service item
service_qty = {}
hdd_count = 0
tape_count = 0
total_items = 0
for item in load.material_items or []:
mt = item.material_type or ""
qty = item.total_count or 0
if qty <= 0:
continue
total_items += qty
if mt in HDD_MATERIALS:
hdd_count += qty
elif mt in TAPE_MATERIALS:
tape_count += qty
# Determine if on-site
is_onsite = (load.service_type or "") == "On-site"
destruction = load.destruction_method or "Wipe"
# For simplicity, assume "has hardware" = True unless explicitly set otherwise.
# TODO: Add custom field `has_hardware` on Load if needed.
has_hardware = True
invoice_items = []
if is_onsite and hdd_count > 0:
onsite_item = _get_onsite_item(has_hardware)
base_rate = _calculate_tier_price(onsite_item, hdd_count)
# Onsite: base fee + per-drive for extras above 100
if hdd_count <= 100:
invoice_items.append({
"item_code": onsite_item,
"qty": 1,
"rate": base_rate,
"description": f"On-site shredding for {hdd_count} drives"
})
else:
# One base fee + per-drive extras
invoice_items.append({
"item_code": onsite_item,
"qty": 1,
"rate": base_rate,
"description": f"On-site base fee (1-100 drives)"
})
extra = hdd_count - 100
per_drive_rate = _calculate_tier_price(onsite_item, hdd_count)
invoice_items.append({
"item_code": onsite_item,
"qty": extra,
"rate": per_drive_rate,
"description": f"Additional on-site drives ({extra})"
})
else:
# Standard pickup/mail-in pricing
if hdd_count > 0:
hdd_item = _get_service_item(destruction, has_hardware)
if hdd_item:
rate = _calculate_tier_price(hdd_item, hdd_count)
invoice_items.append({
"item_code": hdd_item,
"qty": hdd_count,
"rate": rate,
"description": f"{hdd_item} for {hdd_count} drives"
})
if tape_count > 0:
tape_item = _get_tape_item(destruction, has_hardware)
rate = _calculate_tier_price(tape_item, tape_count)
invoice_items.append({
"item_code": tape_item,
"qty": tape_count,
"rate": rate,
"description": f"{tape_item} for {tape_count} tapes"
})
# Video recording surcharge
if load.video_recording and total_items > 0:
vid_rate = _calculate_tier_price("SVR-VIDEO-RECORD", total_items)
invoice_items.append({
"item_code": "SVR-VIDEO-RECORD",
"qty": total_items,
"rate": vid_rate,
"description": f"Video recording surcharge for {total_items} items"
})
if not invoice_items:
frappe.throw(_("No billable items found in this load."))
# Create Sales Invoice
si = frappe.new_doc("Sales Invoice")
si.customer = load.customer
si.posting_date = frappe.utils.today()
si.due_date = frappe.utils.today()
si.price_list = "2025 Service Rates"
si.selling_price_list = "2025 Service Rates"
si.currency = "USD"
si.set_warehouse = None
si.update_stock = 0
for it in invoice_items:
si.append("items", {
"item_code": it["item_code"],
"qty": it["qty"],
"rate": it["rate"],
"description": it.get("description", ""),
"uom": "Unit"
})
si.save()
# Do NOT submit automatically; let user review
# Update Load
load.invoice_generated = 1
load.sales_invoice = si.name
load.save()
frappe.db.commit()
return {"status": "ok", "sales_invoice": si.name}