feat: 切换为下载excel处理方式

This commit is contained in:
fuzhongyun 2026-03-19 16:29:01 +08:00
parent d68e87fb3a
commit 00dcb45882
3 changed files with 64 additions and 144 deletions

29
main.py
View File

@ -4,30 +4,29 @@ from scbank_processor import SCBankProcessor
def main():
while True:
print("\n=== 四川银行权益商城自动化工具 ===")
print("1. 启动采集 (Collector) -> 浏览器抓取")
print("2. 执行同步 (Processor) -> 货易通上传 (处理JSONL)")
print("3. 执行同步 (Processor) -> 货易通上传 (处理Excel)")
print("4. 退出")
choice = input("请输入选项 [1-4]: ").strip()
print("\n" + "="*40)
print(" 🚀 四川银行权益商城自动化工具")
print("="*40)
print(" [1] 步骤一:启动浏览器抓取 (下载待发货Excel)")
print(" [2] 步骤二:执行批量发货同步 (读取Excel上传至货易通)")
print(" [3] 退出程序")
print("-" * 40)
choice = input("👉 请输入选项 [1-3]: ").strip()
if choice == "1":
print("\n[系统] 正在启动采集器...")
print("\n[系统] 正在启动采集器,准备下载 Excel...")
collector = SCBankCollector()
collector.run()
elif choice == "2":
print("\n[系统] 正在启动处理器 (JSONL模式)...")
processor = SCBankProcessor()
processor.run()
elif choice == "3":
print("\n[系统] 正在启动处理器 (Excel模式)...")
print("\n[系统] 正在启动处理器,准备解析本地 Excel 数据并同步...")
processor = SCBankProcessor()
processor.run_excel()
elif choice == "4":
print("\n[系统] 退出程序。")
elif choice == "3":
print("\n[系统] 感谢使用,再见!👋")
sys.exit(0)
else:
print("[错误] 无效选项,请重新输入")
print("\n[错误] 无效选项,请重新输入正确数字")
# 暂停一下,避免刷屏太快
# input("\n按回车键继续...")

View File

@ -19,30 +19,12 @@ class SCBankCollector:
if not os.path.exists("data"):
os.makedirs("data")
self.output_file = f"data/raw_{self.ts_str}.jsonl"
self.browser = None
self.page = None
def log(self, msg):
print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")
def save_data(self, data_type, payload):
"""
实时追加写入数据
data_type: 'list' | 'detail'
"""
record = {
"ts": int(time.time()),
"type": data_type,
"payload": payload
}
try:
with open(self.output_file, "a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
# self.log(f"已捕获 {data_type} 数据 (长度: {len(str(payload))})")
except Exception as e:
self.log(f"[ERROR] 写入文件失败: {e}")
def start_browser(self):
self.log("启动浏览器...")
p = sync_playwright().start()
@ -78,62 +60,11 @@ class SCBankCollector:
self.browser = browser
context = self.browser.new_context(
viewport={'width': 1920, 'height': 1080},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
accept_downloads=True # 明确允许下载
)
self.page = context.new_page()
# 注入 Hook
self.page.add_init_script("""
const originalParse = JSON.parse;
JSON.parse = function(text, reviver) {
const data = originalParse(text, reviver);
try {
if (data && typeof data === 'object') {
const str = JSON.stringify(data);
// 1. 识别列表 (pageDataList)
const isList = data.body &&
data.body.pageDataList &&
Array.isArray(data.body.pageDataList);
// 2. 识别详情 (orderNo + goodsInfoList)
// 详情页通常包含 orderNo goodsInfoList且不包含 pageDataList
const isDetail = data.body &&
data.body.orderNo &&
data.body.goodsInfoList &&
!data.body.pageDataList;
if (isList) {
console.log('__INTERCEPTED_LIST__:' + str);
} else if (isDetail) {
console.log('__INTERCEPTED_DETAIL__:' + str);
}
}
} catch (e) {}
return data;
}
""")
# 监听 Console
self.page.on("console", self._handle_console)
def _handle_console(self, msg):
text = msg.text
if text.startswith("__INTERCEPTED_LIST__:"):
try:
json_str = text.replace("__INTERCEPTED_LIST__:", "")
data = json.loads(json_str)
self.save_data("list", data)
self.log("捕获 [列表] 数据包")
except: pass
elif text.startswith("__INTERCEPTED_DETAIL__:"):
try:
json_str = text.replace("__INTERCEPTED_DETAIL__:", "")
data = json.loads(json_str)
self.save_data("detail", data)
self.log("捕获 [详情] 数据包")
except: pass
def run(self):
try:
self.start_browser()
@ -177,22 +108,10 @@ class SCBankCollector:
# 3. 筛选状态
self._filter_status()
# 4. 循环采集
page_num = 1
while True:
self.log(f"正在处理第 {page_num} 页...")
# 点击详情
self._process_details()
# 翻页
if not self._next_page():
break
page_num += 1
time.sleep(3) # 等待列表加载
# 4. 执行批量导出下载
self._download_excel()
self.log(f"采集任务完成。数据已保存至: {self.output_file}")
self.log("采集任务完成。")
except Exception as e:
self.log(f"[FATAL] 脚本异常: {e}")
@ -221,51 +140,55 @@ class SCBankCollector:
except Exception as e:
self.log(f"筛选操作失败: {e}")
def _process_details(self):
"""点击当前页所有详情"""
def _download_excel(self):
"""执行批量导出操作"""
try:
# 必须等待行出现
self.page.wait_for_selector(".el-table__row", timeout=0)
except:
self.log("当前页无数据或加载超时")
return
self.log("准备触发批量发货...")
# 1. 点击批量发货按钮
# 通过包含的文本或者 class 寻找按钮
batch_ship_btn = self.page.locator("button:has-text('批量发货')")
if batch_ship_btn.count() > 0:
batch_ship_btn.first.click()
self.log("已点击 '批量发货' 按钮,等待弹窗加载...")
time.sleep(2) # 等待弹窗和里面的按钮渲染
else:
self.log("[WARN] 未找到 '批量发货' 按钮")
return
detail_selector = "button.el-button--text:has-text('详情')"
buttons = self.page.locator(detail_selector).all()
visible_buttons = [btn for btn in buttons if btn.is_visible()]
self.log(f"发现 {len(visible_buttons)} 个详情按钮")
for i, btn in enumerate(visible_buttons):
try:
btn.click()
# 随机等待 Hook 捕获
time.sleep(random.uniform(1.0, 2.0))
# 关闭弹窗
# 2. 点击导出待发货订单按钮,并拦截下载
self.log("尝试寻找并点击 '导出待发货订单' 按钮...")
# 使用文本包含来定位按钮,即使它在复杂的结构中
export_btn = self.page.locator("button:has-text('导出待发货订单')")
if export_btn.count() == 0:
self.log("[ERROR] 弹窗中未找到 '导出待发货订单' 按钮,可能是因为无待发货订单或者页面结构变更")
# 按 ESC 关闭弹窗,防止阻塞
self.page.keyboard.press("Escape")
time.sleep(0.5)
except Exception as e:
self.log(f" 点击详情失败: {e}")
return
def _next_page(self):
"""翻页逻辑,返回是否成功翻页"""
next_btn = self.page.locator(".btn-next")
if next_btn.count() == 0:
self.log("未找到翻页按钮")
return False
# 开始监听下载事件
self.log("开始监听文件下载...")
with self.page.expect_download(timeout=60000) as download_info:
export_btn.first.click()
self.log("已点击 '导出待发货订单'")
if next_btn.is_disabled():
self.log("翻页按钮已禁用,到达最后一页")
return False
download = download_info.value
# 保存文件到 data 目录
file_name = f"shipping_order_{self.ts_str}.xls"
save_path = os.path.join("data", file_name)
self.log(f"正在保存文件...")
download.save_as(save_path)
self.log(f"✅ 文件下载成功: {save_path}")
# 按 ESC 关闭弹窗
time.sleep(1)
self.page.keyboard.press("Escape")
try:
next_btn.click()
self.log("翻页成功")
return True
except Exception as e:
self.log(f"翻页点击失败: {e}")
return False
self.log(f"执行批量导出失败: {e}")
if __name__ == "__main__":
collector = SCBankCollector()

View File

@ -189,7 +189,7 @@ class SCBankProcessor:
print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")
def run(self):
# 1. 扫描文件
# 这个方法是为了向后兼容旧的 JSONL 处理方式(如果需要的话),目前菜单已不再直接调用
files = [f for f in os.listdir(self.data_dir) if f.startswith("raw_") and f.endswith(".jsonl")]
if not files:
self.log("没有待处理的JSONL数据文件。")
@ -207,14 +207,12 @@ class SCBankProcessor:
"""
# 查找 data 目录下的 excel 文件
files = [f for f in os.listdir(self.data_dir) if f.endswith(".xls") or f.endswith(".xlsx")]
# 过滤掉已经被处理或不需要处理的文件(如果有特定的前缀)
# 这里假设直接处理所有的 .xls 和 .xlsx 文件
if not files:
self.log("没有找到待处理的 Excel 文件 (.xls / .xlsx)。")
self.log("没有找到待处理的 Excel 文件 (.xls / .xlsx)。请先执行步骤一!")
return
self.log(f"发现 {len(files)} Excel 待处理文件: {files}")
self.log(f"发现 {len(files)}待处理的 Excel 文件: {files}")
for file_name in files:
file_path = os.path.join(self.data_dir, file_name)