import json import time import os import shutil import random import pandas as pd import requests from datetime import datetime class InternalApiClient: """ 内部接口客户端 (Real) """ def __init__(self): # 生产环境 self.api_base_url = "https://hyt.86698.cn/open" # 测试环境 (默认) # self.api_base_url = "http://120.55.12.245:8109" # 生产环境 self.app_id = "8ce4d435fa77492f84d2fafa241c2804" # 测试环境 (默认) # self.app_id = "e699e6ef74504f4d86776b3d244ce602" # 加载商品映射配置 self.product_map = self._load_product_map() def _load_product_map(self): """加载商品名称映射文件 (product_map.txt)""" mapping = {} map_file = "product_map.txt" # 不再自动生成文件,避免覆盖用户配置 if not os.path.exists(map_file): print(f"[Warn] 未找到映射配置文件: {map_file},将使用原始名称匹配") return mapping try: with open(map_file, "r", encoding="utf-8") as f: for line in f: line = line.strip() # 跳过注释和空行 if not line or line.startswith("#"): continue # 使用 ">>>>" 作为分隔符,因为它非常罕见,不可能是商品名的一部分 sep = ">>>>" if sep in line: parts = line.split(sep, 1) key = parts[0].strip() val = parts[1].strip() if key and val: mapping[key] = val print(f"[Info] 已加载 {len(mapping)} 条商品映射规则") except Exception as e: print(f"[Warn] 加载映射文件失败: {e}") return mapping def _post(self, path, data): """发送 POST 请求""" url = f"{self.api_base_url}{path}" headers = { "AppId": self.app_id, "Content-Type": "application/json;charset=UTF-8" } try: # 不验签模式: 直接发送 data json # 真实场景需确认: 这里的 data 是作为 json body 发送,还是作为 params? # 根据文档 '非加密请求: 按业务正常的请求参数进行传输' -> 通常是 json body response = requests.post(url, json=data, headers=headers, timeout=10) return response.json() except Exception as e: print(f"[API Error] {url}: {e}") return None def get_internal_product_id(self, scbank_goods_name): """ 根据商城商品名查询内部商品编号 (Product Code) 接口: /api/v1/open/cus/goods/list 逻辑: 模糊搜索 -> 必须有且仅有1个结果 -> 返回 goods_num """ if not scbank_goods_name: return None # 0. 优先使用映射 search_name = scbank_goods_name if scbank_goods_name in self.product_map: search_name = self.product_map[scbank_goods_name] print(f"[Map] 使用映射: '{scbank_goods_name}' -> '{search_name}'") payload = { "title": search_name, "page": 1, "limit": 10 } resp = self._post("/api/v1/open/cus/goods/list", payload) # 检查响应 if resp and resp.get("code") == 200: data = resp.get("data", {}) # 兼容 data 可能是 list 或 dict (根据文档响应报文 data 包含 list 和 total) # 文档: data -> {list: [], total: int} product_list = data.get("list", []) total = data.get("total", 0) if product_list and len(product_list) == 1: return product_list[0].get("goods_num") else: print(f"[Match Fail] '{search_name}' 匹配到 {len(product_list)} 个商品") return None return None def upload_order(self, order_detail, internal_pid): """ 上传订单 接口: /api/v1/open/order/submit """ if not internal_pid: return False, "商品匹配失败: 未找到或找到多个商品" # 映射字段 # scbank detail -> internal api payload # 收件人信息 receive_info = order_detail.get("mallOrderReceiveInfo", {}) # 构建 payload payload = { "orderBasic": { "customer_order_num": order_detail.get("orderNo"), "consignee": receive_info.get("receiverName", "未知"), "consignee_mobile": receive_info.get("receiverMobile", "00000000000"), # 省市区编码 (文档必填但用户说是误写,传空或默认值) "consignee_province_code": "", "consignee_city_code": "", "consignee_area_code": "", # 详细地址 (用户要求使用完整地址) "consignee_address": receive_info.get("fullAddress", ""), # 客户备注 - 没找到,传空 "in_remark": receive_info.get("remark", ""), }, "goodsList": [ { "goods_num": internal_pid, "number": 1, # 默认为 1,需确认 scbank 数据是否有数量 "sale_price": order_detail.get("orderAmt", 0) # 暂用订单总额作为单价 } ] } # 修正数量与价格: 如果有 goodsInfoList,尝试获取更准确的数量 goods_info = order_detail.get("goodsInfoList", []) if goods_info: first_good = goods_info[0] payload["goodsList"][0]["number"] = first_good.get("count", 1) payload["goodsList"][0]["sale_price"] = first_good.get("price", 0) # 查看 payload print("=== 上传订单 Payload ===") print(json.dumps(payload, ensure_ascii=False, indent=2)) print("=== 上传订单 Payload End ===") resp = self._post("/api/v1/open/order/submit", payload) if resp and resp.get("code") == 200: return True, "上传成功" else: msg = resp.get("message") if resp else "请求失败" return False, f"API错误: {msg}" class SCBankProcessor: def __init__(self): self.data_dir = "data" self.archive_dir = "data/archive" self.output_dir = "output" # 确保目录结构存在 for d in [self.data_dir, self.archive_dir, self.output_dir]: if not os.path.exists(d): os.makedirs(d) self.client = InternalApiClient() def log(self, msg): print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}") def run(self): # 这个方法是为了向后兼容旧的 JSONL 处理方式(如果需要的话),目前菜单已不再直接调用 files = [f for f in os.listdir(self.data_dir) if f.startswith("raw_") and f.endswith(".jsonl")] if not files: self.log("没有待处理的JSONL数据文件。") return self.log(f"发现 {len(files)} 个待处理文件: {files}") for file_name in files: file_path = os.path.join(self.data_dir, file_name) self._process_file(file_path, file_name) def run_excel(self): """ 处理导出的 Excel 数据 """ # 查找 data 目录下的 excel 文件 files = [f for f in os.listdir(self.data_dir) if f.endswith(".xls") or f.endswith(".xlsx")] if not files: self.log("没有找到待处理的 Excel 文件 (.xls / .xlsx)。请先执行步骤一!") return self.log(f"发现 {len(files)} 个待处理的 Excel 文件: {files}") for file_name in files: file_path = os.path.join(self.data_dir, file_name) self._process_excel_file(file_path, file_name) def _process_excel_file(self, file_path, file_name): self.log(f"正在处理 Excel 文件: {file_name}") try: df = pd.read_excel(file_path) except Exception as e: self.log(f"读取 Excel 文件失败: {e}") return if df.empty: self.log("Excel 文件内容为空,跳过") return unique_details = {} for _, row in df.iterrows(): order_no = str(row.get("订单号", "")) if not order_no or order_no == "nan": continue # 构造与 JSON 一致的 detail 结构 detail = { "orderNo": order_no, "orderCreateTime": str(row.get("下单时间", "")), "mallOrderReceiveInfo": { "receiverName": str(row.get("收货人名称", "")) if pd.notna(row.get("收货人名称")) else "", "receiverMobile": str(row.get("收货人电话", "")) if pd.notna(row.get("收货人电话")) else "00000000000", "fullAddress": str(row.get("收货地址", "")) if pd.notna(row.get("收货地址")) else "", "remark": str(row.get("备注", "")) if pd.notna(row.get("备注")) else "" }, "goodsInfoList": [ { "spuName": str(row.get("产品名称", "未知商品")) if pd.notna(row.get("产品名称")) else "未知商品", "count": int(row.get("数量", 1)) if pd.notna(row.get("数量")) else 1, "price": float(row.get("单价", 0.0)) if pd.notna(row.get("单价")) else 0.0, } ], "orderAmt": float(row.get("单价", 0.0)) if pd.notna(row.get("单价")) else 0.0, "exMerchant": "成都蓝色兄弟网络科技有限公司" } unique_details[order_no] = detail self.log(f"解析出 {len(unique_details)} 条唯一订单 (来自 Excel)") self._process_unique_details(unique_details, file_name, file_path) def _process_file(self, file_path, file_name): self.log(f"正在处理文件: {file_name}") # 2. 读取与清洗 raw_records = [] try: with open(file_path, "r", encoding="utf-8") as f: for line in f: if line.strip(): try: raw_records.append(json.loads(line)) except: pass except Exception as e: self.log(f"读取文件失败: {e}") return if not raw_records: self.log("文件内容为空,跳过") return # 3. 数据结构化 (合并列表与详情) unique_details = {} for r in raw_records: r_type = r.get("type") payload = r.get("payload", {}) body = payload.get("body", {}) # 兼容 body 可能是字符串的情况 if isinstance(body, str): try: body = json.loads(body) except: continue if not isinstance(body, dict): continue if r_type == "list": page_list = body.get("pageDataList", []) if isinstance(page_list, list): for item in page_list: order_no = item.get("orderNo") if not order_no: continue if order_no not in unique_details: unique_details[order_no] = item else: # 列表数据补全:仅当字段缺失或为空时填充 for k, v in item.items(): # 列表中的 goodsInfoList 通常为空,跳过 if k == "goodsInfoList" and not v: continue if k not in unique_details[order_no] or not unique_details[order_no][k]: unique_details[order_no][k] = v elif r_type == "detail": order_no = body.get("orderNo") if not order_no: continue if order_no not in unique_details: unique_details[order_no] = body else: # 详情数据覆盖 (高优先级) unique_details[order_no].update(body) self.log(f"解析出 {len(unique_details)} 条唯一订单 (列表+详情合并)") self._process_unique_details(unique_details, file_name, file_path) def _process_unique_details(self, unique_details, file_name, file_path): # 4. 业务处理与同步 results = [] for order_no, detail in unique_details.items(): # 解析商品信息 # goodsInfoList 是一个列表,这里取第一个商品作为主商品 goods_list = detail.get("goodsInfoList", []) goods_name = "未知商品" if goods_list and len(goods_list) > 0: goods_name = goods_list[0].get("spuName", "未知商品") # Step 1: 查内部 ID internal_pid = self.client.get_internal_product_id(goods_name) # Step 2: 上传 # 注意: 这里传入 internal_pid success, msg = self.client.upload_order(detail, internal_pid) # 时间格式化: 2026-03-10T00:50:37.000+0000 -> 2026-03-10 00:50:37 order_time = detail.get("orderCreateTime") if order_time: try: # 尝试解析 ISO 8601 格式 # 注意: python 3.7+ 的 fromisoformat 处理带时区的比较麻烦,这里用 strptime # 格式: 2026-03-10T14:42:28.000+0000 # %z 只能解析 +0000 这种无冒号的时区 dt = datetime.strptime(order_time, "%Y-%m-%dT%H:%M:%S.%f%z") order_time = dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: try: # 备用: 如果是 +00:00 这种格式,或者其他微秒位数不同 dt = datetime.strptime(order_time.split('.')[0], "%Y-%m-%dT%H:%M:%S") order_time = dt.strftime("%Y-%m-%d %H:%M:%S") except: pass # 收件人信息 receive_info = detail.get("mallOrderReceiveInfo", {}) results.append({ "处理状态": "成功" if success else "失败", "失败原因": "" if success else msg, "下单时间": order_time, "订单编号": order_no, "收货人": receive_info.get("receiverName"), "收货地址": receive_info.get("fullAddress"), "下单用户手机号码": receive_info.get("receiverMobile", "00000000000"), "商户名称": detail.get("exMerchant"), "订单金额": detail.get("orderAmt"), "商品名称": goods_name, "内部商品ID": internal_pid, }) # 5. 导出 Excel if results: # 处理不同后缀和前缀的文件名,生成统一的结果文件名 base_name = os.path.splitext(file_name)[0] if base_name.startswith("raw_"): ts = base_name.replace("raw_", "") else: # 针对非 raw_ 前缀的文件(如直接下载的 shipping_order)附加时间戳 ts = f"{base_name}_{datetime.now().strftime('%H%M%S')}" output_file = os.path.join(self.output_dir, f"result_{ts}.xlsx") df = pd.DataFrame(results) df.to_excel(output_file, index=False) self.log(f"结果已导出至: {output_file}") # 6. 归档 try: shutil.move(file_path, os.path.join(self.archive_dir, file_name)) self.log(f"源文件已归档至: {self.archive_dir}") except Exception as e: self.log(f"归档失败: {e}") if __name__ == "__main__": processor = SCBankProcessor() processor.run()