diff --git a/.gitignore b/.gitignore
index cbc0c0d..af60cb8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,4 @@
-scbank_data.jsonl
\ No newline at end of file
+scbank_data.jsonl
+data/
+output/
+.trae/
\ No newline at end of file
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..3a10f5d
--- /dev/null
+++ b/main.py
@@ -0,0 +1,28 @@
+import sys
+from scbank_collector import SCBankCollector
+from scbank_processor import SCBankProcessor
+
+def main():
+    while True:
+        print("\n=== 四川银行权益商城自动化工具 ===")
+        print("1. 启动采集 (Collector) -> 浏览器抓取")
+        print("2. 执行同步 (Processor) -> 内部系统上传")
+        print("3. 退出")
+        choice = input("请输入选项 [1-3]: ").strip()
+
+        if choice == "1":
+            print("\n[系统] 正在启动采集器...")
+            collector = SCBankCollector()
+            collector.run()
+        elif choice == "2":
+            print("\n[系统] 正在启动处理器...")
+            processor = SCBankProcessor()
+            processor.run()
+        elif choice == "3":
+            print("\n[系统] 退出程序。")
+            sys.exit(0)
+        else:
+            print("[错误] 无效选项,请重新输入。")
+
+if __name__ == "__main__":
+    main()
diff --git a/scbank_analyze.py b/scbank_analyze.py
index f60bc1f..b9289e2 100644
--- a/scbank_analyze.py
+++ b/scbank_analyze.py
@@ -31,8 +31,8 @@ def run():
 
         # 等待登录成功
         try:
-            # 等待 URL 包含 orderManagement
-            page.wait_for_url("**/orderManagement/**", timeout=0)
+            # Reaching the home page is treated as a successful login
+            page.wait_for_url("**/homePage", timeout=0)
             print("[状态] 检测到登录成功!")
 
             # 等待表格数据加载 (关键步骤)
diff --git a/scbank_collector.py b/scbank_collector.py
new file mode 100644
index 0000000..66c919b
--- /dev/null
+++ b/scbank_collector.py
@@ -0,0 +1,265 @@
+import json
+import time
+import random
+import os
+from datetime import datetime
+from playwright.sync_api import sync_playwright
+
+class SCBankCollector:
+    """Drive a browser session and save intercepted order payloads as JSONL."""
+
+    # Console markers printed by the injected hook -> (record type, log label).
+    _CONSOLE_PREFIXES = {
+        "__INTERCEPTED_LIST__:": ("list", "列表"),
+        "__INTERCEPTED_DETAIL__:": ("detail", "详情"),
+    }
+
+    def __init__(self):
+        self.target_url = "https://jf.scbank.cn:8085/#/orderManagement/deliveryOrders"
+        # Timestamped output name: data/raw_YYYYMMDD_HHMMSS.jsonl
+        self.ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
+        self.output_file = f"data/raw_{self.ts_str}.jsonl"
+        # Playwright driver handle, kept so run() can stop it on exit.
+        self.playwright = None
+        self.browser = None
+        self.page = None
+
+    def log(self, msg):
+        """Print msg prefixed with the current time (HH:MM:SS)."""
+        print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")
+
+    def save_data(self, data_type, payload):
+        """
+        Append one captured record to the output file as a JSON line.
+
+        data_type: 'list' | 'detail'
+        payload: the parsed JSON object captured from the page
+        Returns True when the record was written; failures are logged
+        and reported as False so the capture keeps running.
+        """
+        record = {
+            "ts": int(time.time()),
+            "type": data_type,
+            "payload": payload
+        }
+        try:
+            # data/ is git-ignored, so it may not exist yet.
+            out_dir = os.path.dirname(self.output_file)
+            if out_dir:
+                os.makedirs(out_dir, exist_ok=True)
+            with open(self.output_file, "a", encoding="utf-8") as f:
+                f.write(json.dumps(record, ensure_ascii=False) + "\n")
+            return True
+        except Exception as e:
+            self.log(f"[ERROR] 写入文件失败: {e}")
+            return False
+
+    def start_browser(self):
+        """Launch Chromium, install the JSON.parse hook and the console listener."""
+        self.log("启动浏览器...")
+        self.playwright = sync_playwright().start()
+        self.browser = self.playwright.chromium.launch(
+            headless=False,
+            args=["--disable-blink-features=AutomationControlled"]
+        )
+        context = self.browser.new_context(
+            viewport={'width': 1920, 'height': 1080},
+            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
+        )
+        self.page = context.new_page()
+
+        # Inject the hook: wrap JSON.parse so matching payloads are echoed to
+        # the console with a marker prefix that _handle_console recognises.
+        self.page.add_init_script("""
+            const originalParse = JSON.parse;
+            JSON.parse = function(text, reviver) {
+                const data = originalParse(text, reviver);
+                try {
+                    if (data && typeof data === 'object') {
+                        const str = JSON.stringify(data);
+
+                        // 1. 过滤噪音 (IsQX)
+                        if (str.includes('IsQX') || (data.body && data.body.IsQX)) {
+                            return data;
+                        }
+
+                        // 2. 识别列表 (rows 或 list)
+                        const isList = (data.rows && Array.isArray(data.rows) && data.rows.length > 0) ||
+                                       (data.list && Array.isArray(data.list) && data.list.length > 0);
+
+                        // 3. 识别详情 (orderNo + 长度校验)
+                        const isDetail = str.includes('orderNo') && str.length > 300;
+
+                        if (isList) {
+                            console.log('__INTERCEPTED_LIST__:' + str);
+                        } else if (isDetail) {
+                            console.log('__INTERCEPTED_DETAIL__:' + str);
+                        }
+                    }
+                } catch (e) {}
+                return data;
+            }
+        """)
+
+        # Listen for console messages
+        self.page.on("console", self._handle_console)
+
+    def _handle_console(self, msg):
+        """Save console messages that carry an intercepted payload."""
+        text = msg.text
+        for prefix, (data_type, label) in self._CONSOLE_PREFIXES.items():
+            if not text.startswith(prefix):
+                continue
+            # Strip only the leading marker: str.replace would also remove
+            # any copy of the marker that appears inside the payload.
+            try:
+                data = json.loads(text[len(prefix):])
+            except json.JSONDecodeError as e:
+                self.log(f"[WARN] [{label}] 数据解析失败: {e}")
+                return
+            if self.save_data(data_type, data):
+                self.log(f"捕获 [{label}] 数据包")
+            return
+
+    def run(self):
+        """Run the capture flow: log in, filter, walk every page, clean up."""
+        try:
+            self.start_browser()
+
+            # 1. Login
+            self.log(f"正在打开页面: {self.target_url}")
+            try:
+                self.page.goto(self.target_url)
+            except Exception as e:
+                # Best effort: the login wait below decides whether to go on.
+                self.log(f"[WARN] 打开页面异常: {e}")
+
+            self.log(">>> 请在浏览器中完成登录操作 <<<")
+            # Reaching homePage counts as logged in; timeout=0 disables
+            # the timeout so the user can take as long as needed.
+            try:
+                self.page.wait_for_url("**/homePage**", timeout=0)
+                self.log("检测到登录成功!")
+                time.sleep(2)
+            except Exception as e:
+                self.log(f"登录等待超时或失败: {e}")
+                return
+
+            # 2. Force navigation to the target page
+            if "deliveryOrders" not in self.page.url:
+                self.log(f"跳转至订单管理页面: {self.target_url}")
+                self.page.goto(self.target_url)
+                self.page.wait_for_load_state("domcontentloaded")
+                time.sleep(3)
+
+            # 3. Filter by status
+            self._filter_status()
+
+            # 4. Capture loop
+            page_num = 1
+            while True:
+                self.log(f"正在处理第 {page_num} 页...")
+
+                # Open every detail dialog on this page
+                self._process_details()
+
+                # Move to the next page
+                if not self._next_page():
+                    break
+
+                page_num += 1
+                time.sleep(3)  # wait for the list to load
+
+            self.log(f"采集任务完成。数据已保存至: {self.output_file}")
+
+        except Exception as e:
+            self.log(f"[FATAL] 脚本异常: {e}")
+        finally:
+            if self.browser:
+                self.browser.close()
+                self.browser = None
+            # Stop the driver too; an unstopped one leaks its process and
+            # can break a later sync_playwright().start() in this process.
+            if self.playwright:
+                self.playwright.stop()
+                self.playwright = None
+
+    def _filter_status(self):
+        """Select the '待发货' tab and click '查询' to refresh the list."""
+        self.log("筛选: 订单类型 -> 待发货")
+        try:
+            # Locate the tab
+            tab_selector = ".el-tabs__item:has-text('待发货')"
+            tab = self.page.locator(tab_selector)
+
+            if tab.count() > 0:
+                # get_attribute returns None when the attribute is absent
+                if "is-active" not in (tab.get_attribute("class") or ""):
+                    tab.click()
+                    self.log(" -> 点击“待发货”Tab")
+                    time.sleep(1)
+                else:
+                    self.log(" -> 已处于“待发货”Tab")
+
+                # Click the query button
+                query_btn = self.page.locator("button.el-button--primary:has-text('查询')")
+                if query_btn.count() > 0:
+                    query_btn.click()
+                    self.log(" -> 点击“查询”刷新列表")
+                    time.sleep(3)
+                else:
+                    self.log(" [WARN] 未找到“查询”按钮")
+            else:
+                self.log(" [WARN] 未找到“待发货”Tab")
+        except Exception as e:
+            self.log(f"筛选操作失败: {e}")
+
+    def _process_details(self):
+        """Open the detail dialog of every visible row on the current page."""
+        try:
+            # Rows must be present before looking for buttons
+            self.page.wait_for_selector(".el-table__row", timeout=5000)
+        except Exception:
+            self.log("当前页无数据或加载超时")
+            return
+
+        detail_selector = "button.el-button--text:has-text('详情')"
+        buttons = self.page.locator(detail_selector).all()
+        visible_buttons = [btn for btn in buttons if btn.is_visible()]
+
+        self.log(f"发现 {len(visible_buttons)} 个详情按钮")
+
+        for btn in visible_buttons:
+            try:
+                btn.click()
+                # Random pause so the hook can capture the response
+                time.sleep(random.uniform(1.0, 2.0))
+                # Close the dialog
+                self.page.keyboard.press("Escape")
+                time.sleep(0.5)
+            except Exception as e:
+                self.log(f" 点击详情失败: {e}")
+
+    def _next_page(self):
+        """Click the next-page button; return True when the click succeeded."""
+        next_btn = self.page.locator(".btn-next")
+
+        if next_btn.count() == 0:
+            self.log("未找到翻页按钮")
+            return False
+
+        if next_btn.is_disabled():
+            self.log("翻页按钮已禁用,到达最后一页")
+            return False
+
+        try:
+            next_btn.click()
+            self.log("翻页成功")
+            return True
+        except Exception as e:
+            self.log(f"翻页点击失败: {e}")
+            return False
+
+if __name__ == "__main__":
+    collector = SCBankCollector()
+    collector.run()
diff --git a/scbank_processor.py b/scbank_processor.py
new file mode 100644
index 0000000..ff1a089
--- /dev/null
+++ b/scbank_processor.py
@@ -0,0 +1,204 @@
+import json
+import time
+import os
+import shutil
+import random
+import pandas as pd
+from datetime import datetime
+
+class InternalApiClient:
+    """
+    Stub (mock) client for the internal system API.
+    """
+    def __init__(self):
+        self.api_url = "http://internal-system.local/api"
+
+    def get_internal_product_id(self, scbank_goods_name):
+        """
+        Mock: map a mall product name to an internal product ID.
+        """
+        # Simulated latency
+        time.sleep(0.1)
+        if not scbank_goods_name:
+            return None
+
+        # Naive keyword matching
+        if "iPhone" in scbank_goods_name:
+            return "P_APPLE_001"
+        if "华为" in scbank_goods_name:
+            return "P_HUAWEI_002"
+        return "P_UNKNOWN_999"
+
+    def upload_order(self, order_detail):
+        """
+        Mock: upload one order.
+        Returns: (success: bool, message: str)
+        """
+        time.sleep(0.2)
+        # Simulated 90% success rate
+        if random.random() < 0.9:
+            return True, "上传成功"
+        return False, "模拟网络超时"
+
+class SCBankProcessor:
+    """Turn captured raw_*.jsonl files into uploaded orders and an Excel report."""
+
+    def __init__(self):
+        self.data_dir = "data"
+        self.archive_dir = "data/archive"
+        self.output_dir = "output"
+        self.client = InternalApiClient()
+
+    def log(self, msg):
+        """Print msg prefixed with the current time (HH:MM:SS)."""
+        print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")
+
+    def run(self):
+        """Process every raw_*.jsonl file found directly in the data directory."""
+        # 1. Scan for files. data/ is git-ignored, so it may not exist.
+        files = []
+        if os.path.isdir(self.data_dir):
+            files = sorted(
+                f for f in os.listdir(self.data_dir)
+                if f.startswith("raw_") and f.endswith(".jsonl")
+            )
+        if not files:
+            self.log("没有待处理的数据文件。")
+            return
+
+        self.log(f"发现 {len(files)} 个待处理文件: {files}")
+
+        for file_name in files:
+            file_path = os.path.join(self.data_dir, file_name)
+            self._process_file(file_path, file_name)
+
+    def _process_file(self, file_path, file_name):
+        """Read one raw file, upload its orders, export the report, archive it."""
+        self.log(f"正在处理文件: {file_name}")
+
+        # 2. Read and clean
+        try:
+            raw_records = self._read_records(file_path)
+        except Exception as e:
+            self.log(f"读取文件失败: {e}")
+            return
+
+        if not raw_records:
+            self.log("文件内容为空,跳过")
+            return
+
+        # 3. Structure the data: one detail per order number
+        unique_details = self._extract_unique_details(raw_records)
+        self.log(f"解析出 {len(unique_details)} 条有效唯一订单详情")
+
+        # 4. Business processing and sync
+        results = [
+            self._sync_order(order_no, detail)
+            for order_no, detail in unique_details.items()
+        ]
+
+        # 5. Export Excel
+        if results:
+            ts = file_name.replace("raw_", "").replace(".jsonl", "")
+            # output/ is git-ignored, so create it on demand.
+            os.makedirs(self.output_dir, exist_ok=True)
+            output_file = os.path.join(self.output_dir, f"result_{ts}.xlsx")
+            df = pd.DataFrame(results)
+            df.to_excel(output_file, index=False)
+            self.log(f"结果已导出至: {output_file}")
+
+        # 6. Archive
+        try:
+            # Without the directory the move fails and the same file would
+            # be picked up, and its orders uploaded, again on the next run.
+            os.makedirs(self.archive_dir, exist_ok=True)
+            shutil.move(file_path, os.path.join(self.archive_dir, file_name))
+            self.log(f"源文件已归档至: {self.archive_dir}")
+        except Exception as e:
+            self.log(f"归档失败: {e}")
+
+    def _read_records(self, file_path):
+        """Return the JSON values of a JSONL file, skipping blank or malformed lines."""
+        records = []
+        with open(file_path, "r", encoding="utf-8") as f:
+            for line in f:
+                if not line.strip():
+                    continue
+                try:
+                    records.append(json.loads(line))
+                except json.JSONDecodeError:
+                    continue
+        return records
+
+    def _extract_unique_details(self, raw_records):
+        """Return {order number: detail body}; later records replace earlier ones."""
+        unique_details = {}
+        for r in raw_records:
+            if not isinstance(r, dict) or r.get("type") != "detail":
+                continue
+            # Expected layout: payload -> body -> orderNo. The collector hook
+            # forwards any parsed object, arrays included, so skip other shapes.
+            payload = r.get("payload")
+            body = payload.get("body") if isinstance(payload, dict) else None
+            if not body or "orderNo" not in str(body):
+                continue
+            if isinstance(body, str):
+                # body may be a JSON-encoded string
+                try:
+                    body = json.loads(body)
+                except json.JSONDecodeError:
+                    continue
+            if not isinstance(body, dict):
+                continue
+
+            order_no = body.get("orderNo") or body.get("orderId")
+            if order_no:
+                unique_details[order_no] = body
+        return unique_details
+
+    def _sync_order(self, order_no, detail):
+        """Look up the product, upload one order and return its report row."""
+        # goodsInfoList is a list; its first item is treated as the main product.
+        goods_name = "未知商品"
+        goods_list = detail.get("goodsInfoList")
+        if isinstance(goods_list, list) and goods_list and isinstance(goods_list[0], dict):
+            goods_name = goods_list[0].get("spuName", "未知商品")
+
+        # Step 1: look up the internal product ID
+        internal_pid = self.client.get_internal_product_id(goods_name)
+
+        # Step 2: upload
+        success, msg = self.client.upload_order(detail)
+
+        return {
+            "处理状态": "成功" if success else "失败",
+            "失败原因": "" if success else msg,
+            "商城订单号": order_no,
+            "商品名称": goods_name,
+            "内部商品ID": internal_pid,
+            "订单金额": detail.get("orderAmt"),
+            "下单时间": self._format_order_time(detail.get("orderCreateTime"))
+        }
+
+    @staticmethod
+    def _format_order_time(order_time):
+        """
+        Reformat 2026-03-10T00:50:37.000+0000 as 2026-03-10 00:50:37.
+
+        Empty, non-string or unparseable values are returned unchanged.
+        """
+        if not order_time or not isinstance(order_time, str):
+            return order_time
+        try:
+            dt = datetime.strptime(order_time, "%Y-%m-%dT%H:%M:%S.%f%z")
+        except ValueError:
+            try:
+                # Fallback: keep only the part before the fraction
+                dt = datetime.strptime(order_time.split('.')[0], "%Y-%m-%dT%H:%M:%S")
+            except ValueError:
+                return order_time
+        return dt.strftime("%Y-%m-%d %H:%M:%S")
+
+if __name__ == "__main__":
+    processor = SCBankProcessor()
+    processor.run()