"""Collect delivery-order data from the SCBank order-management web console.

The script drives a Chromium-family browser through Playwright, installs a
``JSON.parse`` hook in the page that echoes order-list and order-detail
responses to the browser console, and appends every captured payload to a
timestamped JSON Lines file under ``data/``.
"""
import json
import os
import random
import time
from datetime import datetime

try:
    from playwright.sync_api import sync_playwright
except ImportError:
    # Keep the non-browser helpers (save_data, _handle_console, ...) importable
    # when Playwright is missing; start_browser() reports the problem instead.
    sync_playwright = None


# Console prefixes emitted by the in-page hook. They MUST stay in sync with
# the literal strings inside _JSON_PARSE_HOOK_JS below.
_LIST_MARKER = "__INTERCEPTED_LIST__:"
_DETAIL_MARKER = "__INTERCEPTED_DETAIL__:"

# (console prefix, record type written to disk, label used in the log line)
_CAPTURE_ROUTES = (
    (_LIST_MARKER, "list", "列表"),
    (_DETAIL_MARKER, "detail", "详情"),
)

_LAUNCH_ARGS = ["--disable-blink-features=AutomationControlled"]

_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

# Injected before any page script runs: wraps JSON.parse and logs every parsed
# object that looks like an order list or an order detail.
_JSON_PARSE_HOOK_JS = """
    const originalParse = JSON.parse;
    JSON.parse = function(text, reviver) {
        const data = originalParse(text, reviver);
        try {
            if (data && typeof data === 'object') {
                const str = JSON.stringify(data);

                // 1. 识别列表 (pageDataList)
                const isList = data.body && data.body.pageDataList && Array.isArray(data.body.pageDataList);

                // 2. 识别详情 (orderNo + goodsInfoList)
                // 详情页通常包含 orderNo 和 goodsInfoList,且不包含 pageDataList
                const isDetail = data.body && data.body.orderNo && data.body.goodsInfoList && !data.body.pageDataList;

                if (isList) {
                    console.log('__INTERCEPTED_LIST__:' + str);
                } else if (isDetail) {
                    console.log('__INTERCEPTED_DETAIL__:' + str);
                }
            }
        } catch (e) {}
        return data;
    }
"""


class SCBankCollector:
    """Semi-automatic collector for the "pending delivery" order list.

    The user completes the login (captcha) by hand; afterwards the collector
    opens every detail dialog on every page so the in-page hook can capture
    the underlying JSON responses.
    """

    # Upper bound, in milliseconds, for the table rows of a page to appear.
    # A finite value lets an empty result page end the run instead of
    # blocking forever.
    ROW_WAIT_TIMEOUT_MS = 30000

    def __init__(self, username=None, password=None):
        """Prepare configuration and the output location.

        Args:
            username: Login name typed into the login form. Falls back to the
                ``SCBANK_USERNAME`` environment variable, then to the built-in
                default.
            password: Login password. Falls back to ``SCBANK_PASSWORD``, then
                to the built-in default.
        """
        self.target_url = "https://jf.scbank.cn:8085/#/orderManagement/deliveryOrders"

        # Timestamp used to build the output name: data/raw_YYYYMMDD_HHMMSS.jsonl
        self.ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")

        # NOTE(security): the fallback credentials are kept only for backward
        # compatibility; prefer the constructor arguments or the environment
        # variables and remove the literals from version control.
        self.username = username or os.environ.get("SCBANK_USERNAME") or "Lsxd01"
        self.password = password or os.environ.get("SCBANK_PASSWORD") or "Lsxd@2026"

        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs("data", exist_ok=True)

        self.output_file = f"data/raw_{self.ts_str}.jsonl"
        self._playwright = None
        self.browser = None
        self.page = None

    def log(self, msg):
        """Print *msg* prefixed with the current wall-clock time."""
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")

    def save_data(self, data_type, payload):
        """Append one captured payload to the output file as a JSON line.

        Args:
            data_type: ``'list'`` or ``'detail'``.
            payload: JSON-serialisable object captured from the page.

        Write and serialisation failures are logged and not raised, so a
        single bad record does not abort the collection run.
        """
        record = {
            "ts": int(time.time()),
            "type": data_type,
            "payload": payload,
        }
        try:
            line = json.dumps(record, ensure_ascii=False) + "\n"
            with open(self.output_file, "a", encoding="utf-8") as f:
                f.write(line)
        except (OSError, TypeError, ValueError) as e:
            self.log(f"[ERROR] 写入文件失败: {e}")

    def start_browser(self):
        """Launch a browser, open a page and install the capture hook.

        Raises:
            RuntimeError: If the ``playwright`` package is not installed.
            Exception: Whatever Playwright raises when no browser can be
                launched; the Playwright driver is stopped before re-raising.
        """
        if sync_playwright is None:
            raise RuntimeError("未安装 playwright,请先执行: pip install playwright")

        self.log("启动浏览器...")
        # Keep a reference to the driver so it can be stopped on failure.
        self._playwright = sync_playwright().start()
        try:
            self.browser = self._launch_browser(self._playwright)
        except Exception:
            self._playwright.stop()
            self._playwright = None
            raise

        context = self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent=_USER_AGENT,
        )
        self.page = context.new_page()

        # Install the hook before any page script runs, then listen for the
        # console lines it produces.
        self.page.add_init_script(_JSON_PARSE_HOOK_JS)
        self.page.on("console", self._handle_console)

    def _launch_browser(self, p):
        """Return a headed browser: local Chrome, then Edge, then bundled Chromium."""
        for channel in ("chrome", "msedge"):
            self.log(f"尝试启动本地 {channel}...")
            try:
                browser = p.chromium.launch(
                    channel=channel,
                    headless=False,
                    args=list(_LAUNCH_ARGS),
                )
            except Exception:
                self.log(f"启动 {channel} 失败,尝试下一个...")
                continue
            self.log(f"成功启动 {channel}")
            return browser

        self.log("未找到本地 Chrome 或 Edge,尝试使用内置 Chromium...")
        try:
            return p.chromium.launch(headless=False, args=list(_LAUNCH_ARGS))
        except Exception as e:
            self.log(f"[FATAL] 无法启动任何浏览器: {e}")
            self.log("请确保已安装 Google Chrome 或 Microsoft Edge 浏览器。")
            raise

    def _handle_console(self, msg):
        """Persist payloads that the in-page hook wrote to the console.

        Args:
            msg: Console message object exposing the logged text as ``.text``.

        Lines without a known marker prefix are ignored. Lines whose payload
        is not valid JSON are logged and skipped.
        """
        text = msg.text
        for marker, data_type, label in _CAPTURE_ROUTES:
            if not text.startswith(marker):
                continue
            try:
                # Slice off only the leading marker; str.replace() would also
                # delete occurrences of the marker inside the payload itself.
                data = json.loads(text[len(marker):])
            except ValueError as e:
                self.log(f"[WARN] 无法解析拦截到的 {data_type} 数据: {e}")
                return
            self.save_data(data_type, data)
            self.log(f"捕获 [{label}] 数据包")
            return

    def run(self):
        """Run the whole workflow: login, filter, then walk every page.

        All errors are logged rather than raised. The browser is deliberately
        not closed at the end.
        """
        try:
            self.start_browser()

            # 1. Login: pre-fill the form, the user finishes it manually.
            self._open_login_page()
            self.log(">>> 请在浏览器中完成登录操作 (输入验证码并点击登录) <<<")

            # Wait without a time limit until the URL contains homePage,
            # because the login is completed by a human.
            try:
                self.page.wait_for_url("**/homePage**", timeout=0)
                self.log("检测到登录成功!")
                time.sleep(1)
            except Exception as e:
                self.log(f"登录等待超时或失败: {e}")
                return

            # 2. Force navigation to the target page.
            if "deliveryOrders" not in self.page.url:
                self.log(f"跳转至订单管理页面: {self.target_url}")
                self.page.goto(self.target_url)
                self.page.wait_for_load_state("domcontentloaded")
                time.sleep(0.5)

            # 3. Select the status tab.
            self._filter_status()

            # 4. Collect page by page until there is no next page.
            page_num = 1
            while True:
                self.log(f"正在处理第 {page_num} 页...")
                self._process_details()
                if not self._next_page():
                    break
                page_num += 1
                time.sleep(3)  # give the next list page time to load

            self.log(f"采集任务完成。数据已保存至: {self.output_file}")

        except Exception as e:
            self.log(f"[FATAL] 脚本异常: {e}")
        finally:
            # The browser is intentionally not closed here.
            # NOTE(review): a Playwright-launched browser may still exit once
            # this Python process ends - confirm the intended behaviour.
            self.log("浏览器保持开启状态,请手动关闭。")

    def _open_login_page(self):
        """Open the target URL and pre-fill the login form on a best-effort basis."""
        self.log(f"正在打开页面: {self.target_url}")
        try:
            self.page.goto(self.target_url)
        except Exception as e:
            # Best effort: the user can still navigate and log in by hand.
            self.log(f"打开页面失败: {e}")
            return

        try:
            self.log("等待登录页面加载...")
            self.page.wait_for_selector('input[name="username"]', timeout=10000)

            self.log(f"正在自动填入账号: {self.username}")
            self.page.fill('input[name="username"]', self.username)
            self.page.fill('input[name="password"]', self.password)
            self.log("账号密码已填入")
        except Exception as e:
            self.log(f"自动填入账号密码失败 (可能已登录或页面结构变化): {e}")

    def _filter_status(self):
        """Activate the "待发货" tab unless it is already selected."""
        self.log("正在点击“待发货”标签页")
        try:
            tab = self.page.locator(".el-tabs__item:has-text('待发货')")
            if tab.count() == 0:
                self.log(" [WARN] 未找到“待发货”Tab")
                return

            # get_attribute() returns None when the attribute is absent.
            css_classes = tab.get_attribute("class") or ""
            if "is-active" in css_classes:
                self.log("“待发货”标签页已经是选中状态")
                return

            tab.click()
            self.log("已点击“待发货”标签页")
            time.sleep(1)
        except Exception as e:
            self.log(f"筛选操作失败: {e}")

    def _process_details(self):
        """Open and close every visible detail dialog on the current page."""
        try:
            # Rows must be present before the detail buttons can be located.
            self.page.wait_for_selector(
                ".el-table__row", timeout=self.ROW_WAIT_TIMEOUT_MS
            )
        except Exception:
            self.log("当前页无数据或加载超时")
            return

        buttons = self.page.locator("button.el-button--text:has-text('详情')").all()
        visible_buttons = [btn for btn in buttons if btn.is_visible()]
        self.log(f"发现 {len(visible_buttons)} 个详情按钮")

        for btn in visible_buttons:
            try:
                btn.click()
                # Randomised pause so the hook can capture the detail response.
                time.sleep(random.uniform(1.0, 2.0))
                # Close the dialog.
                self.page.keyboard.press("Escape")
                time.sleep(0.5)
            except Exception as e:
                self.log(f" 点击详情失败: {e}")

    def _next_page(self):
        """Click the "next page" button.

        Returns:
            True if the button was clicked; False if it is missing, disabled,
            or the click failed.
        """
        next_btn = self.page.locator(".btn-next")

        if next_btn.count() == 0:
            self.log("未找到翻页按钮")
            return False

        if next_btn.is_disabled():
            self.log("翻页按钮已禁用,到达最后一页")
            return False

        try:
            next_btn.click()
        except Exception as e:
            self.log(f"翻页点击失败: {e}")
            return False

        self.log("翻页成功")
        return True


if __name__ == "__main__":
    collector = SCBankCollector()
    collector.run()