scbank-sync/scbank_collector.py

254 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import random
import os
from datetime import datetime
from playwright.sync_api import sync_playwright
class SCBankCollector:
    """Capture order list/detail JSON from the SCBank delivery-orders page.

    A visible browser is opened so the user can log in manually. An init
    script wraps ``JSON.parse`` inside the page and echoes matching payloads
    to the browser console with a marker prefix; the Python side listens to
    console messages and appends each captured payload to a JSONL file.
    """

    # Console markers emitted by the injected hook, paired with the record
    # type written to disk and the log line printed on capture.
    _CAPTURE_KINDS = (
        ("__INTERCEPTED_LIST__:", "list", "捕获 [列表] 数据包"),
        ("__INTERCEPTED_DETAIL__:", "detail", "捕获 [详情] 数据包"),
    )

    # Upper bound for waiting on table rows. A value of 0 would disable the
    # timeout entirely and hang forever on an empty result list.
    ROW_WAIT_TIMEOUT_MS = 30_000

    _LAUNCH_ARGS = ("--disable-blink-features=AutomationControlled",)

    # JavaScript installed before any page script runs (see start_browser).
    _HOOK_SCRIPT = """
        const originalParse = JSON.parse;
        JSON.parse = function(text, reviver) {
            const data = originalParse(text, reviver);
            try {
                if (data && typeof data === 'object') {
                    const str = JSON.stringify(data);
                    // 1. 识别列表 (pageDataList)
                    const isList = data.body &&
                        data.body.pageDataList &&
                        Array.isArray(data.body.pageDataList);
                    // 2. 识别详情 (orderNo + goodsInfoList)
                    // 详情页通常包含 orderNo 和 goodsInfoList且不包含 pageDataList
                    const isDetail = data.body &&
                        data.body.orderNo &&
                        data.body.goodsInfoList &&
                        !data.body.pageDataList;
                    if (isList) {
                        console.log('__INTERCEPTED_LIST__:' + str);
                    } else if (isDetail) {
                        console.log('__INTERCEPTED_DETAIL__:' + str);
                    }
                }
            } catch (e) {}
            return data;
        }
        """

    def __init__(self):
        """Prepare the output path ``data/raw_YYYYMMDD_HHMMSS.jsonl``.

        Creates the ``data`` directory when it does not exist yet.
        """
        self.target_url = "https://jf.scbank.cn:8085/#/orderManagement/deliveryOrders"
        self.ts_str = datetime.now().strftime("%Y%m%d_%H%M%S")
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs("data", exist_ok=True)
        self.output_file = f"data/raw_{self.ts_str}.jsonl"
        self.browser = None
        self.page = None
        # Playwright driver handle; kept so it can be stopped on shutdown.
        self._playwright = None

    def log(self, msg):
        """Print ``msg`` prefixed with the current wall-clock time."""
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")

    def save_data(self, data_type, payload):
        """Append one captured payload to the output file as a JSON line.

        Args:
            data_type: Record kind, ``'list'`` or ``'detail'``.
            payload: JSON-serialisable object captured from the page.

        Write or serialisation failures are logged, not raised, so a single
        bad record does not abort the collection run.
        """
        record = {
            "ts": int(time.time()),
            "type": data_type,
            "payload": payload,
        }
        try:
            with open(self.output_file, "a", encoding="utf-8") as f:
                f.write(json.dumps(record, ensure_ascii=False) + "\n")
        except (OSError, TypeError, ValueError) as e:
            self.log(f"[ERROR] 写入文件失败: {e}")

    def start_browser(self):
        """Start Playwright, open a page, and install the capture hook.

        Raises:
            Exception: re-raised from Playwright when no browser can be
                launched at all.
        """
        self.log("启动浏览器...")
        # Stored on the instance immediately so run() can stop the driver
        # even when launching a browser fails.
        self._playwright = sync_playwright().start()
        self.browser = self._launch_browser(self._playwright)
        context = self.browser.new_context(
            viewport={'width': 1920, 'height': 1080},
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        )
        self.page = context.new_page()
        # Install the JSON.parse hook, then listen for its console output.
        self.page.add_init_script(self._HOOK_SCRIPT)
        self.page.on("console", self._handle_console)

    def _launch_browser(self, playwright):
        """Launch local Chrome or Edge, falling back to bundled Chromium."""
        launch_args = list(self._LAUNCH_ARGS)
        for channel in ("chrome", "msedge"):
            try:
                self.log(f"尝试启动本地 {channel}...")
                browser = playwright.chromium.launch(
                    channel=channel,
                    headless=False,
                    args=launch_args,
                )
            except Exception:
                self.log(f"启动 {channel} 失败,尝试下一个...")
                continue
            self.log(f"成功启动 {channel}")
            return browser

        self.log("未找到本地 Chrome 或 Edge尝试使用内置 Chromium...")
        try:
            return playwright.chromium.launch(headless=False, args=launch_args)
        except Exception as e:
            self.log(f"[FATAL] 无法启动任何浏览器: {e}")
            self.log("请确保已安装 Google Chrome 或 Microsoft Edge 浏览器。")
            raise

    def _handle_console(self, msg):
        """Persist payloads that the injected hook echoed to the console.

        Args:
            msg: Console message object exposing a ``text`` attribute.

        Messages without a known marker prefix are ignored. Only the leading
        marker is removed, so a payload that itself contains the marker text
        is stored unmodified.
        """
        text = msg.text
        for prefix, data_type, captured_msg in self._CAPTURE_KINDS:
            if not text.startswith(prefix):
                continue
            try:
                data = json.loads(text[len(prefix):])
            except ValueError as e:
                self.log(f"[WARN] 解析 {data_type} 数据失败: {e}")
                return
            self.save_data(data_type, data)
            self.log(captured_msg)
            return

    def _pause(self, seconds):
        """Wait ``seconds`` while still letting Playwright dispatch events.

        ``time.sleep`` would block the sync API's event processing, delaying
        console captures until the next Playwright call.
        """
        self.page.wait_for_timeout(seconds * 1000)

    def _shutdown(self):
        """Close the browser and stop the Playwright driver, if started."""
        if self.browser:
            try:
                self.browser.close()
            except Exception as e:
                self.log(f"[WARN] 关闭浏览器失败: {e}")
            self.browser = None
        if self._playwright:
            try:
                self._playwright.stop()
            except Exception as e:
                self.log(f"[WARN] 停止 Playwright 失败: {e}")
            self._playwright = None

    def run(self):
        """Run the full flow: login wait, filter, then page-by-page capture.

        Exceptions are logged rather than raised; the browser and the
        Playwright driver are always released on exit.
        """
        try:
            self.start_browser()

            # 1. Login (performed manually by the user in the opened window).
            self.log(f"正在打开页面: {self.target_url}")
            try:
                self.page.goto(self.target_url)
            except Exception as e:
                # Best effort: keep waiting for login even when the initial
                # navigation reports an error.
                self.log(f"[WARN] 打开页面异常: {e}")
            self.log(">>> 请在浏览器中完成登录操作 <<<")
            # Login is considered complete once the URL contains homePage;
            # timeout=0 waits indefinitely for the user.
            try:
                self.page.wait_for_url("**/homePage**", timeout=0)
                self.log("检测到登录成功!")
                self._pause(1)
            except Exception as e:
                self.log(f"登录等待超时或失败: {e}")
                return

            # 2. Force navigation to the target page.
            if "deliveryOrders" not in self.page.url:
                self.log(f"跳转至订单管理页面: {self.target_url}")
                self.page.goto(self.target_url)
                self.page.wait_for_load_state("domcontentloaded")
                self._pause(0.5)

            # 3. Select the status tab.
            self._filter_status()

            # 4. Capture loop: open every detail, then advance one page.
            page_num = 1
            while True:
                self.log(f"正在处理第 {page_num} 页...")
                self._process_details()
                if not self._next_page():
                    break
                page_num += 1
                self._pause(3)  # give the next list page time to load
            self.log(f"采集任务完成。数据已保存至: {self.output_file}")
        except Exception as e:
            self.log(f"[FATAL] 脚本异常: {e}")
        finally:
            self._shutdown()

    def _filter_status(self):
        """Activate the “待发货” tab unless it is already selected.

        Failures are logged and otherwise ignored.
        """
        self.log("正在点击“待发货”标签页")
        try:
            tab = self.page.locator(".el-tabs__item:has-text('待发货')")
            if tab.count() == 0:
                self.log(" [WARN] 未找到“待发货”Tab")
                return
            # get_attribute returns None when the attribute is absent.
            css_classes = (tab.get_attribute("class") or "").split()
            if "is-active" in css_classes:
                self.log("“待发货”标签页已经是选中状态")
                return
            tab.click()
            self.log("已点击“待发货”标签页")
            self._pause(1)
        except Exception as e:
            self.log(f"筛选操作失败: {e}")

    def _process_details(self):
        """Click every visible detail button on the current list page."""
        try:
            # Rows must be present before buttons can be located.
            self.page.wait_for_selector(
                ".el-table__row", timeout=self.ROW_WAIT_TIMEOUT_MS
            )
        except Exception:
            self.log("当前页无数据或加载超时")
            return

        buttons = self.page.locator("button.el-button--text:has-text('详情')").all()
        visible_buttons = [btn for btn in buttons if btn.is_visible()]
        self.log(f"发现 {len(visible_buttons)} 个详情按钮")
        for btn in visible_buttons:
            try:
                btn.click()
                # Randomised wait so the hook can capture the detail payload.
                self._pause(random.uniform(1.0, 2.0))
                # Close the dialog.
                self.page.keyboard.press("Escape")
                self._pause(0.5)
            except Exception as e:
                self.log(f" 点击详情失败: {e}")

    def _next_page(self):
        """Advance to the next list page.

        Returns:
            True when the next-page button was clicked, False when it is
            missing, disabled, or the click failed.
        """
        next_btn = self.page.locator(".btn-next")
        if next_btn.count() == 0:
            self.log("未找到翻页按钮")
            return False
        if next_btn.is_disabled():
            self.log("翻页按钮已禁用,到达最后一页")
            return False
        try:
            next_btn.click()
        except Exception as e:
            self.log(f"翻页点击失败: {e}")
            return False
        self.log("翻页成功")
        return True
# Script entry point: build a collector and run the full capture flow.
if __name__ == "__main__":
    SCBankCollector().run()