From 00dcb45882edf67a8918553d29cea23cc58cff1b Mon Sep 17 00:00:00 2001 From: fuzhongyun <15339891972@163.com> Date: Thu, 19 Mar 2026 16:29:01 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=87=E6=8D=A2=E4=B8=BA=E4=B8=8B?= =?UTF-8?q?=E8=BD=BDexcel=E5=A4=84=E7=90=86=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 29 ++++---- scbank_collector.py | 171 ++++++++++++-------------------------------- scbank_processor.py | 8 +-- 3 files changed, 64 insertions(+), 144 deletions(-) diff --git a/main.py b/main.py index 549fb3d..0395b05 100644 --- a/main.py +++ b/main.py @@ -4,30 +4,29 @@ from scbank_processor import SCBankProcessor def main(): while True: - print("\n=== 四川银行权益商城自动化工具 ===") - print("1. 启动采集 (Collector) -> 浏览器抓取") - print("2. 执行同步 (Processor) -> 货易通上传 (处理JSONL)") - print("3. 执行同步 (Processor) -> 货易通上传 (处理Excel)") - print("4. 退出") - choice = input("请输入选项 [1-4]: ").strip() + print("\n" + "="*40) + print(" 🚀 四川银行权益商城自动化工具") + print("="*40) + print(" [1] 步骤一:启动浏览器抓取 (下载待发货Excel)") + print(" [2] 步骤二:执行批量发货同步 (读取Excel上传至货易通)") + print(" [3] 退出程序") + print("-" * 40) + + choice = input("👉 请输入选项 [1-3]: ").strip() if choice == "1": - print("\n[系统] 正在启动采集器...") + print("\n[系统] 正在启动采集器,准备下载 Excel...") collector = SCBankCollector() collector.run() elif choice == "2": - print("\n[系统] 正在启动处理器 (JSONL模式)...") - processor = SCBankProcessor() - processor.run() - elif choice == "3": - print("\n[系统] 正在启动处理器 (Excel模式)...") + print("\n[系统] 正在启动处理器,准备解析本地 Excel 数据并同步...") processor = SCBankProcessor() processor.run_excel() - elif choice == "4": - print("\n[系统] 退出程序。") + elif choice == "3": + print("\n[系统] 感谢使用,再见!👋") sys.exit(0) else: - print("[错误] 无效选项,请重新输入。") + print("\n[错误] 无效选项,请重新输入正确数字。") # 暂停一下,避免刷屏太快 # input("\n按回车键继续...") diff --git a/scbank_collector.py b/scbank_collector.py index 4f78b40..ec33d41 100644 --- a/scbank_collector.py +++ b/scbank_collector.py @@ -19,30 +19,12 @@ class SCBankCollector: if not os.path.exists("data"): os.makedirs("data") - self.output_file = f"data/raw_{self.ts_str}.jsonl" self.browser = None self.page = None def log(self, msg): print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}") - def save_data(self, data_type, payload): - """ - 实时追加写入数据 - data_type: 'list' | 'detail' - """ - record = { - "ts": int(time.time()), - "type": data_type, - "payload": payload - } - try: - with open(self.output_file, "a", encoding="utf-8") as f: - f.write(json.dumps(record, ensure_ascii=False) + "\n") - # self.log(f"已捕获 {data_type} 数据 (长度: {len(str(payload))})") - except Exception as e: - self.log(f"[ERROR] 写入文件失败: {e}") - def start_browser(self): self.log("启动浏览器...") p = sync_playwright().start() @@ -78,62 +60,11 @@ class SCBankCollector: self.browser = browser context = self.browser.new_context( viewport={'width': 1920, 'height': 1080}, - user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + accept_downloads=True # 明确允许下载 ) self.page = context.new_page() - # 注入 Hook - self.page.add_init_script(""" - const originalParse = JSON.parse; - JSON.parse = function(text, reviver) { - const data = originalParse(text, reviver); - try { - if (data && typeof data === 'object') { - const str = JSON.stringify(data); - - // 1. 识别列表 (pageDataList) - const isList = data.body && - data.body.pageDataList && - Array.isArray(data.body.pageDataList); - - // 2. 识别详情 (orderNo + goodsInfoList) - // 详情页通常包含 orderNo 和 goodsInfoList,且不包含 pageDataList - const isDetail = data.body && - data.body.orderNo && - data.body.goodsInfoList && - !data.body.pageDataList; - - if (isList) { - console.log('__INTERCEPTED_LIST__:' + str); - } else if (isDetail) { - console.log('__INTERCEPTED_DETAIL__:' + str); - } - } - } catch (e) {} - return data; - } - """) - - # 监听 Console - self.page.on("console", self._handle_console) - - def _handle_console(self, msg): - text = msg.text - if text.startswith("__INTERCEPTED_LIST__:"): - try: - json_str = text.replace("__INTERCEPTED_LIST__:", "") - data = json.loads(json_str) - self.save_data("list", data) - self.log("捕获 [列表] 数据包") - except: pass - elif text.startswith("__INTERCEPTED_DETAIL__:"): - try: - json_str = text.replace("__INTERCEPTED_DETAIL__:", "") - data = json.loads(json_str) - self.save_data("detail", data) - self.log("捕获 [详情] 数据包") - except: pass - def run(self): try: self.start_browser() @@ -177,22 +108,10 @@ class SCBankCollector: # 3. 筛选状态 self._filter_status() - # 4. 循环采集 - page_num = 1 - while True: - self.log(f"正在处理第 {page_num} 页...") - - # 点击详情 - self._process_details() - - # 翻页 - if not self._next_page(): - break - - page_num += 1 - time.sleep(3) # 等待列表加载 + # 4. 执行批量导出下载 + self._download_excel() - self.log(f"采集任务完成。数据已保存至: {self.output_file}") + self.log("采集任务完成。") except Exception as e: self.log(f"[FATAL] 脚本异常: {e}") @@ -221,51 +140,55 @@ class SCBankCollector: except Exception as e: self.log(f"筛选操作失败: {e}") - def _process_details(self): - """点击当前页所有详情""" + def _download_excel(self): + """执行批量导出操作""" try: - # 必须等待行出现 - self.page.wait_for_selector(".el-table__row", timeout=0) - except: - self.log("当前页无数据或加载超时") - return + self.log("准备触发批量发货...") + + # 1. 点击批量发货按钮 + # 通过包含的文本或者 class 寻找按钮 + batch_ship_btn = self.page.locator("button:has-text('批量发货')") + if batch_ship_btn.count() > 0: + batch_ship_btn.first.click() + self.log("已点击 '批量发货' 按钮,等待弹窗加载...") + time.sleep(2) # 等待弹窗和里面的按钮渲染 + else: + self.log("[WARN] 未找到 '批量发货' 按钮") + return - detail_selector = "button.el-button--text:has-text('详情')" - buttons = self.page.locator(detail_selector).all() - visible_buttons = [btn for btn in buttons if btn.is_visible()] - - self.log(f"发现 {len(visible_buttons)} 个详情按钮") - - for i, btn in enumerate(visible_buttons): - try: - btn.click() - # 随机等待 Hook 捕获 - time.sleep(random.uniform(1.0, 2.0)) - # 关闭弹窗 + # 2. 点击导出待发货订单按钮,并拦截下载 + self.log("尝试寻找并点击 '导出待发货订单' 按钮...") + + # 使用文本包含来定位按钮,即使它在复杂的结构中 + export_btn = self.page.locator("button:has-text('导出待发货订单')") + if export_btn.count() == 0: + self.log("[ERROR] 弹窗中未找到 '导出待发货订单' 按钮,可能是因为无待发货订单或者页面结构变更") + # 按 ESC 关闭弹窗,防止阻塞 self.page.keyboard.press("Escape") - time.sleep(0.5) - except Exception as e: - self.log(f" 点击详情失败: {e}") + return - def _next_page(self): - """翻页逻辑,返回是否成功翻页""" - next_btn = self.page.locator(".btn-next") - - if next_btn.count() == 0: - self.log("未找到翻页按钮") - return False + # 开始监听下载事件 + self.log("开始监听文件下载...") + with self.page.expect_download(timeout=60000) as download_info: + export_btn.first.click() + self.log("已点击 '导出待发货订单'") - if next_btn.is_disabled(): - self.log("翻页按钮已禁用,到达最后一页") - return False + download = download_info.value + + # 保存文件到 data 目录 + file_name = f"shipping_order_{self.ts_str}.xls" + save_path = os.path.join("data", file_name) + + self.log(f"正在保存文件...") + download.save_as(save_path) + self.log(f"✅ 文件下载成功: {save_path}") + + # 按 ESC 关闭弹窗 + time.sleep(1) + self.page.keyboard.press("Escape") - try: - next_btn.click() - self.log("翻页成功") - return True except Exception as e: - self.log(f"翻页点击失败: {e}") - return False + self.log(f"执行批量导出失败: {e}") if __name__ == "__main__": collector = SCBankCollector() diff --git a/scbank_processor.py b/scbank_processor.py index c07b773..806ea04 100644 --- a/scbank_processor.py +++ b/scbank_processor.py @@ -189,7 +189,7 @@ class SCBankProcessor: print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}") def run(self): - # 1. 扫描文件 + # 这个方法是为了向后兼容旧的 JSONL 处理方式(如果需要的话),目前菜单已不再直接调用 files = [f for f in os.listdir(self.data_dir) if f.startswith("raw_") and f.endswith(".jsonl")] if not files: self.log("没有待处理的JSONL数据文件。") @@ -207,14 +207,12 @@ class SCBankProcessor: """ # 查找 data 目录下的 excel 文件 files = [f for f in os.listdir(self.data_dir) if f.endswith(".xls") or f.endswith(".xlsx")] - # 过滤掉已经被处理或不需要处理的文件(如果有特定的前缀) - # 这里假设直接处理所有的 .xls 和 .xlsx 文件 if not files: - self.log("没有找到待处理的 Excel 文件 (.xls / .xlsx)。") + self.log("没有找到待处理的 Excel 文件 (.xls / .xlsx)。请先执行步骤一!") return - self.log(f"发现 {len(files)} 个 Excel 待处理文件: {files}") + self.log(f"发现 {len(files)} 个待处理的 Excel 文件: {files}") for file_name in files: file_path = os.path.join(self.data_dir, file_name)