"""Interactive collector for delivery-order data from the SCBank order console.

The script opens a visible Chromium window, waits for the operator to log in
manually, then walks the order list page by page and opens every order's
detail dialog.  A script injected into the page wraps ``JSON.parse`` and
echoes interesting payloads to the browser console; the Python side listens
to the console and appends each payload to a JSON Lines file.
"""

import json
import os
import random
import time
from datetime import datetime
from typing import Any

# (console prefix, record type written to the file, label used in log output).
# The prefixes must stay in sync with the ``console.log`` calls in
# ``_JSON_PARSE_HOOK`` below.
_CONSOLE_MARKERS = (
    ("__INTERCEPTED_LIST__:", "list", "列表"),
    ("__INTERCEPTED_DETAIL__:", "detail", "详情"),
)

# JavaScript injected before any page script runs.  It wraps JSON.parse and
# re-serialises every parsed object so the payload can be forwarded through
# console.log, tagged as either a list or a detail response.
_JSON_PARSE_HOOK = """
    const originalParse = JSON.parse;
    JSON.parse = function(text, reviver) {
        const data = originalParse(text, reviver);
        try {
            if (data && typeof data === 'object') {
                const str = JSON.stringify(data);
                // 1. 过滤噪音 (IsQX)
                if (str.includes('IsQX') || (data.body && data.body.IsQX)) {
                    return data;
                }
                // 2. 识别列表 (rows 或 list)
                const isList = (data.rows && Array.isArray(data.rows) && data.rows.length > 0) ||
                               (data.list && Array.isArray(data.list) && data.list.length > 0);
                // 3. 识别详情 (orderNo + 长度校验)
                const isDetail = str.includes('orderNo') && str.length > 300;
                if (isList) {
                    console.log('__INTERCEPTED_LIST__:' + str);
                } else if (isDetail) {
                    console.log('__INTERCEPTED_DETAIL__:' + str);
                }
            }
        } catch (e) {}
        return data;
    }
"""


class SCBankCollector:
    """Drive a browser session and persist intercepted order payloads."""

    def __init__(self) -> None:
        self.target_url = "https://jf.scbank.cn:8085/#/orderManagement/deliveryOrders"
        # Output file name is derived from the start time:
        # data/raw_YYYYMMDD_HHMMSS.jsonl
        self.ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.output_file = f"data/raw_{self.ts_str}.jsonl"
        self.browser = None
        self.page = None
        # Handle returned by sync_playwright().start(); kept so that close()
        # can stop the driver process as well as the browser.
        self._playwright = None

    def log(self, msg: str) -> None:
        """Print ``msg`` prefixed with the current wall-clock time."""
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")

    def save_data(self, data_type: str, payload: Any) -> None:
        """Append one record to the output file as a single JSON line.

        Args:
            data_type: Record tag, ``'list'`` or ``'detail'``.
            payload: JSON-serialisable object captured from the page.

        Write failures are logged rather than raised so that one bad record
        does not abort the collection run.
        """
        record = {
            "ts": int(time.time()),
            "type": data_type,
            "payload": payload,
        }
        try:
            # The output directory is not guaranteed to exist on a fresh
            # checkout; create it on demand.
            out_dir = os.path.dirname(self.output_file)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
            with open(self.output_file, "a", encoding="utf-8") as f:
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
        except (OSError, TypeError, ValueError) as e:
            self.log(f"[ERROR] 写入文件失败: {e}")

    def start_browser(self) -> None:
        """Launch Chromium, open a page, inject the hook and attach the listener."""
        # Imported here rather than at module level so the module (and its
        # parsing/persistence logic) can be imported without Playwright.
        from playwright.sync_api import sync_playwright

        self.log("启动浏览器...")
        self._playwright = sync_playwright().start()
        self.browser = self._playwright.chromium.launch(
            headless=False,
            args=["--disable-blink-features=AutomationControlled"],
        )
        context = self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        )
        self.page = context.new_page()

        # Install the JSON.parse hook before any page script executes.
        self.page.add_init_script(_JSON_PARSE_HOOK)

        # Payloads come back through console messages.
        self.page.on("console", self._handle_console)

    def close(self) -> None:
        """Close the browser and stop the Playwright driver, if they were started."""
        if self.browser:
            try:
                self.browser.close()
            except Exception as e:
                self.log(f"[WARN] 关闭浏览器失败: {e}")
            self.browser = None
        if self._playwright:
            try:
                self._playwright.stop()
            except Exception as e:
                self.log(f"[WARN] 停止 Playwright 失败: {e}")
            self._playwright = None

    def _handle_console(self, msg) -> None:
        """Persist a console message if it carries an intercepted payload.

        Args:
            msg: Console message object; only its ``text`` attribute is read.

        Messages without a known marker prefix are ignored.  Messages whose
        body is not valid JSON are logged and dropped.
        """
        text = msg.text
        for marker, data_type, label in _CONSOLE_MARKERS:
            if not text.startswith(marker):
                continue
            # Strip only the leading marker: the payload itself may legally
            # contain the marker text and must be kept intact.
            json_str = text[len(marker):]
            try:
                data = json.loads(json_str)
            except json.JSONDecodeError as e:
                self.log(f"[WARN] 解析 [{label}] 数据失败: {e}")
                return
            self.save_data(data_type, data)
            self.log(f"捕获 [{label}] 数据包")
            return

    def run(self) -> None:
        """Run the full workflow: log in, filter, then collect every page.

        Any exception is logged as fatal; the browser and driver are always
        shut down on exit.
        """
        try:
            self.start_browser()

            # 1. Login (performed manually by the operator).
            self.log(f"正在打开页面: {self.target_url}")
            try:
                self.page.goto(self.target_url)
            except Exception as e:
                # Best effort: a failure here is not fatal because the next
                # step waits for the post-login URL anyway.
                self.log(f"[WARN] 打开页面异常(继续等待登录): {e}")

            self.log(">>> 请在浏览器中完成登录操作 <<<")

            # Wait, without a timeout, until the URL contains "homePage".
            try:
                self.page.wait_for_url("**/homePage**", timeout=0)
                self.log("检测到登录成功!")
                time.sleep(2)
            except Exception as e:
                self.log(f"登录等待超时或失败: {e}")
                return

            # 2. Navigate to the target page if login landed elsewhere.
            if "deliveryOrders" not in self.page.url:
                self.log(f"跳转至订单管理页面: {self.target_url}")
                self.page.goto(self.target_url)
                self.page.wait_for_load_state("domcontentloaded")
                time.sleep(3)

            # 3. Apply the status filter.
            self._filter_status()

            # 4. Collect page by page until there is no next page.
            page_num = 1
            while True:
                self.log(f"正在处理第 {page_num} 页...")
                self._process_details()
                if not self._next_page():
                    break
                page_num += 1
                time.sleep(3)  # give the next list page time to load

            self.log(f"采集任务完成。数据已保存至: {self.output_file}")

        except Exception as e:
            self.log(f"[FATAL] 脚本异常: {e}")
        finally:
            self.close()

    def _filter_status(self) -> None:
        """Select the "待发货" tab and click "查询" to refresh the list.

        Missing elements are reported as warnings; any error is logged and
        swallowed so collection can continue with the current view.
        """
        self.log("筛选: 订单类型 -> 待发货")
        try:
            tab_selector = ".el-tabs__item:has-text('待发货')"
            tab = self.page.locator(tab_selector)
            if tab.count() == 0:
                self.log(" [WARN] 未找到“待发货”Tab")
                return

            # get_attribute returns None when the attribute is absent.
            tab_classes = tab.get_attribute("class") or ""
            if "is-active" not in tab_classes:
                tab.click()
                self.log(" -> 点击“待发货”Tab")
                time.sleep(1)
            else:
                self.log(" -> 已处于“待发货”Tab")

            query_btn = self.page.locator("button.el-button--primary:has-text('查询')")
            if query_btn.count() > 0:
                query_btn.click()
                self.log(" -> 点击“查询”刷新列表")
                time.sleep(3)
            else:
                self.log(" [WARN] 未找到“查询”按钮")
        except Exception as e:
            self.log(f"筛选操作失败: {e}")

    def _process_details(self) -> None:
        """Open and dismiss the detail dialog of every visible row on this page.

        Opening a dialog makes the page fetch the detail payload, which the
        injected hook forwards to ``_handle_console``.
        """
        try:
            # Rows must be present before the buttons can be enumerated.
            self.page.wait_for_selector(".el-table__row", timeout=5000)
        except Exception:
            self.log("当前页无数据或加载超时")
            return

        detail_selector = "button.el-button--text:has-text('详情')"
        buttons = self.page.locator(detail_selector).all()
        visible_buttons = [btn for btn in buttons if btn.is_visible()]

        self.log(f"发现 {len(visible_buttons)} 个详情按钮")

        for btn in visible_buttons:
            try:
                btn.click()
                # Randomised pause so the hook has time to capture the payload.
                time.sleep(random.uniform(1.0, 2.0))
                # Dismiss the dialog.
                self.page.keyboard.press("Escape")
                time.sleep(0.5)
            except Exception as e:
                self.log(f" 点击详情失败: {e}")

    def _next_page(self) -> bool:
        """Click the pagination "next" button.

        Returns:
            True if the button was clicked; False if it is missing, disabled
            or the click failed.
        """
        next_btn = self.page.locator(".btn-next")

        if next_btn.count() == 0:
            self.log("未找到翻页按钮")
            return False

        if next_btn.is_disabled():
            self.log("翻页按钮已禁用,到达最后一页")
            return False

        try:
            next_btn.click()
            self.log("翻页成功")
            return True
        except Exception as e:
            self.log(f"翻页点击失败: {e}")
            return False


if __name__ == "__main__":
    collector = SCBankCollector()
    collector.run()