import json import time import os import shutil import random import pandas as pd from datetime import datetime class InternalApiClient: """ 内部接口客户端桩代码 (Mock) """ def __init__(self): self.api_url = "http://internal-system.local/api" def get_internal_product_id(self, scbank_goods_name): """ Mock: 根据商城商品名查询内部商品编号 """ # 模拟延时 time.sleep(0.1) if not scbank_goods_name: return None # 简单模拟 if "iPhone" in scbank_goods_name: return "P_APPLE_001" if "华为" in scbank_goods_name: return "P_HUAWEI_002" return "P_UNKNOWN_999" def upload_order(self, order_detail): """ Mock: 上传订单 返回: (Success: bool, Message: str) """ time.sleep(0.2) # 模拟随机成功率 90% if random.random() < 0.9: return True, "上传成功" else: return False, "模拟网络超时" class SCBankProcessor: def __init__(self): self.data_dir = "data" self.archive_dir = "data/archive" self.output_dir = "output" self.client = InternalApiClient() def log(self, msg): print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}") def run(self): # 1. 扫描文件 files = [f for f in os.listdir(self.data_dir) if f.startswith("raw_") and f.endswith(".jsonl")] if not files: self.log("没有待处理的数据文件。") return self.log(f"发现 {len(files)} 个待处理文件: {files}") for file_name in files: file_path = os.path.join(self.data_dir, file_name) self._process_file(file_path, file_name) def _process_file(self, file_path, file_name): self.log(f"正在处理文件: {file_name}") # 2. 读取与清洗 raw_records = [] try: with open(file_path, "r", encoding="utf-8") as f: for line in f: if line.strip(): try: raw_records.append(json.loads(line)) except: pass except Exception as e: self.log(f"读取文件失败: {e}") return if not raw_records: self.log("文件内容为空,跳过") return # 3. 数据结构化 # 提取详情数据 (以此为主) details = [] for r in raw_records: if r.get("type") == "detail": payload = r.get("payload", {}) # 根据真实数据结构: payload -> body -> orderNo body = payload.get("body", {}) if body and "orderNo" in str(body): # 如果 body 是字符串,尝试解析 if isinstance(body, str): try: body = json.loads(body) except: pass details.append(body) # 去重 (按 orderNo) # 注意:这里假设 payload 是 dict,如果已经是 dict 则直接用 # 如果是字符串则需要 json.loads unique_details = {} for d in details: # 兼容 payload 可能是字符串的情况 (虽然 Collector 存的是 dict) if isinstance(d, str): try: d = json.loads(d) except: continue order_no = d.get("orderNo") or d.get("orderId") if order_no: unique_details[order_no] = d self.log(f"解析出 {len(unique_details)} 条有效唯一订单详情") # 4. 业务处理与同步 results = [] for order_no, detail in unique_details.items(): # 解析商品信息 # goodsInfoList 是一个列表,这里取第一个商品作为主商品 goods_list = detail.get("goodsInfoList", []) goods_name = "未知商品" if goods_list and len(goods_list) > 0: goods_name = goods_list[0].get("spuName", "未知商品") # Step 1: 查内部 ID internal_pid = self.client.get_internal_product_id(goods_name) # Step 2: 上传 success, msg = self.client.upload_order(detail) # 时间格式化: 2026-03-10T00:50:37.000+0000 -> 2026-03-10 00:50:37 order_time = detail.get("orderCreateTime") if order_time: try: # 尝试解析 ISO 8601 格式 # 注意: python 3.7+ 的 fromisoformat 处理带时区的比较麻烦,这里用 strptime # 格式: 2026-03-10T14:42:28.000+0000 # %z 只能解析 +0000 这种无冒号的时区 dt = datetime.strptime(order_time, "%Y-%m-%dT%H:%M:%S.%f%z") order_time = dt.strftime("%Y-%m-%d %H:%M:%S") except ValueError: try: # 备用: 如果是 +00:00 这种格式,或者其他微秒位数不同 dt = datetime.strptime(order_time.split('.')[0], "%Y-%m-%dT%H:%M:%S") order_time = dt.strftime("%Y-%m-%d %H:%M:%S") except: pass results.append({ "处理状态": "成功" if success else "失败", "失败原因": "" if success else msg, "下单时间": order_time, "订单编号": order_no, "下单用户手机号码": detail.get("orderMobile"), "商户名称": detail.get("exMerchant"), "订单金额": detail.get("orderAmt"), "商品名称": goods_name, "内部商品ID": internal_pid, }) # 5. 导出 Excel if results: ts = file_name.replace("raw_", "").replace(".jsonl", "") output_file = os.path.join(self.output_dir, f"result_{ts}.xlsx") df = pd.DataFrame(results) df.to_excel(output_file, index=False) self.log(f"结果已导出至: {output_file}") # 6. 归档 try: shutil.move(file_path, os.path.join(self.archive_dir, file_name)) self.log(f"源文件已归档至: {self.archive_dir}") except Exception as e: self.log(f"归档失败: {e}") if __name__ == "__main__": processor = SCBankProcessor() processor.run()