scbank-sync/scbank_processor.py

408 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import os
import shutil
import random
import pandas as pd
import requests
from datetime import datetime
class InternalApiClient:
"""
内部接口客户端 (Real)
"""
def __init__(self):
# 生产环境
self.api_base_url = "https://hyt.86698.cn/open"
# 测试环境 (默认)
# self.api_base_url = "http://120.55.12.245:8109"
# 生产环境
self.app_id = "8ce4d435fa77492f84d2fafa241c2804"
# 测试环境 (默认)
# self.app_id = "e699e6ef74504f4d86776b3d244ce602"
# 加载商品映射配置
self.product_map = self._load_product_map()
def _load_product_map(self):
"""加载商品名称映射文件 (product_map.txt)"""
mapping = {}
map_file = "product_map.txt"
# 不再自动生成文件,避免覆盖用户配置
if not os.path.exists(map_file):
print(f"[Warn] 未找到映射配置文件: {map_file},将使用原始名称匹配")
return mapping
try:
with open(map_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
# 跳过注释和空行
if not line or line.startswith("#"): continue
# 使用 ">>>>" 作为分隔符,因为它非常罕见,不可能是商品名的一部分
sep = ">>>>"
if sep in line:
parts = line.split(sep, 1)
key = parts[0].strip()
val = parts[1].strip()
if key and val:
mapping[key] = val
print(f"[Info] 已加载 {len(mapping)} 条商品映射规则")
except Exception as e:
print(f"[Warn] 加载映射文件失败: {e}")
return mapping
def _post(self, path, data):
"""发送 POST 请求"""
url = f"{self.api_base_url}{path}"
headers = {
"AppId": self.app_id,
"Content-Type": "application/json;charset=UTF-8"
}
try:
# 不验签模式: 直接发送 data json
# 真实场景需确认: 这里的 data 是作为 json body 发送,还是作为 params?
# 根据文档 '非加密请求: 按业务正常的请求参数进行传输' -> 通常是 json body
response = requests.post(url, json=data, headers=headers, timeout=10)
return response.json()
except Exception as e:
print(f"[API Error] {url}: {e}")
return None
def get_internal_product_id(self, scbank_goods_name):
"""
根据商城商品名查询内部商品编号 (Product Code)
接口: /api/v1/open/cus/goods/list
逻辑: 模糊搜索 -> 必须有且仅有1个结果 -> 返回 goods_num
"""
if not scbank_goods_name: return None
# 0. 优先使用映射
search_name = scbank_goods_name
if scbank_goods_name in self.product_map:
search_name = self.product_map[scbank_goods_name]
print(f"[Map] 使用映射: '{scbank_goods_name}' -> '{search_name}'")
payload = {
"title": search_name,
"page": 1,
"limit": 10
}
resp = self._post("/api/v1/open/cus/goods/list", payload)
# 检查响应
if resp and resp.get("code") == 200:
data = resp.get("data", {})
# 兼容 data 可能是 list 或 dict (根据文档响应报文 data 包含 list 和 total)
# 文档: data -> {list: [], total: int}
product_list = data.get("list", [])
total = data.get("total", 0)
if product_list and len(product_list) == 1:
return product_list[0].get("goods_num")
else:
print(f"[Match Fail] '{search_name}' 匹配到 {len(product_list)} 个商品")
return None
return None
def upload_order(self, order_detail, internal_pid):
"""
上传订单
接口: /api/v1/open/order/submit
"""
if not internal_pid:
return False, "商品匹配失败: 未找到或找到多个商品"
# 映射字段
# scbank detail -> internal api payload
# 收件人信息
receive_info = order_detail.get("mallOrderReceiveInfo", {})
# 构建 payload
payload = {
"orderBasic": {
"customer_order_num": order_detail.get("orderNo"),
"consignee": receive_info.get("receiverName", "未知"),
"consignee_mobile": receive_info.get("receiverMobile", "00000000000"),
# 省市区编码 (文档必填但用户说是误写,传空或默认值)
"consignee_province_code": "",
"consignee_city_code": "",
"consignee_area_code": "",
# 详细地址 (用户要求使用完整地址)
"consignee_address": receive_info.get("fullAddress", ""),
# 客户备注 - 没找到,传空
"in_remark": receive_info.get("remark", ""),
},
"goodsList": [
{
"goods_num": internal_pid,
"number": 1, # 默认为 1需确认 scbank 数据是否有数量
"sale_price": order_detail.get("orderAmt", 0) # 暂用订单总额作为单价
}
]
}
# 修正数量与价格: 如果有 goodsInfoList尝试获取更准确的数量
goods_info = order_detail.get("goodsInfoList", [])
if goods_info:
first_good = goods_info[0]
payload["goodsList"][0]["number"] = first_good.get("count", 1)
payload["goodsList"][0]["sale_price"] = first_good.get("price", 0)
# 查看 payload
print("=== 上传订单 Payload ===")
print(json.dumps(payload, ensure_ascii=False, indent=2))
print("=== 上传订单 Payload End ===")
resp = self._post("/api/v1/open/order/submit", payload)
if resp and resp.get("code") == 200:
return True, "上传成功"
else:
msg = resp.get("message") if resp else "请求失败"
return False, f"API错误: {msg}"
class SCBankProcessor:
def __init__(self):
self.data_dir = "data"
self.archive_dir = "data/archive"
self.output_dir = "output"
# 确保目录结构存在
for d in [self.data_dir, self.archive_dir, self.output_dir]:
if not os.path.exists(d):
os.makedirs(d)
self.client = InternalApiClient()
def log(self, msg):
print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")
def run(self):
# 这个方法是为了向后兼容旧的 JSONL 处理方式(如果需要的话),目前菜单已不再直接调用
files = [f for f in os.listdir(self.data_dir) if f.startswith("raw_") and f.endswith(".jsonl")]
if not files:
self.log("没有待处理的JSONL数据文件。")
return
self.log(f"发现 {len(files)} 个待处理文件: {files}")
for file_name in files:
file_path = os.path.join(self.data_dir, file_name)
self._process_file(file_path, file_name)
def run_excel(self):
"""
处理导出的 Excel 数据
"""
# 查找 data 目录下的 excel 文件
files = [f for f in os.listdir(self.data_dir) if f.endswith(".xls") or f.endswith(".xlsx")]
if not files:
self.log("没有找到待处理的 Excel 文件 (.xls / .xlsx)。请先执行步骤一!")
return
self.log(f"发现 {len(files)} 个待处理的 Excel 文件: {files}")
for file_name in files:
file_path = os.path.join(self.data_dir, file_name)
self._process_excel_file(file_path, file_name)
def _process_excel_file(self, file_path, file_name):
self.log(f"正在处理 Excel 文件: {file_name}")
try:
df = pd.read_excel(file_path)
except Exception as e:
self.log(f"读取 Excel 文件失败: {e}")
return
if df.empty:
self.log("Excel 文件内容为空,跳过")
return
unique_details = {}
for _, row in df.iterrows():
order_no = str(row.get("订单号", ""))
if not order_no or order_no == "nan":
continue
# 构造与 JSON 一致的 detail 结构
detail = {
"orderNo": order_no,
"orderCreateTime": str(row.get("下单时间", "")),
"mallOrderReceiveInfo": {
"receiverName": str(row.get("收货人名称", "")) if pd.notna(row.get("收货人名称")) else "",
"receiverMobile": str(row.get("收货人电话", "")) if pd.notna(row.get("收货人电话")) else "00000000000",
"fullAddress": str(row.get("收货地址", "")) if pd.notna(row.get("收货地址")) else "",
"remark": str(row.get("备注", "")) if pd.notna(row.get("备注")) else ""
},
"goodsInfoList": [
{
"spuName": str(row.get("产品名称", "未知商品")) if pd.notna(row.get("产品名称")) else "未知商品",
"count": int(row.get("数量", 1)) if pd.notna(row.get("数量")) else 1,
"price": float(row.get("单价", 0.0)) if pd.notna(row.get("单价")) else 0.0,
}
],
"orderAmt": float(row.get("单价", 0.0)) if pd.notna(row.get("单价")) else 0.0,
"exMerchant": "成都蓝色兄弟网络科技有限公司"
}
unique_details[order_no] = detail
self.log(f"解析出 {len(unique_details)} 条唯一订单 (来自 Excel)")
self._process_unique_details(unique_details, file_name, file_path)
def _process_file(self, file_path, file_name):
self.log(f"正在处理文件: {file_name}")
# 2. 读取与清洗
raw_records = []
try:
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
if line.strip():
try:
raw_records.append(json.loads(line))
except: pass
except Exception as e:
self.log(f"读取文件失败: {e}")
return
if not raw_records:
self.log("文件内容为空,跳过")
return
# 3. 数据结构化 (合并列表与详情)
unique_details = {}
for r in raw_records:
r_type = r.get("type")
payload = r.get("payload", {})
body = payload.get("body", {})
# 兼容 body 可能是字符串的情况
if isinstance(body, str):
try: body = json.loads(body)
except: continue
if not isinstance(body, dict): continue
if r_type == "list":
page_list = body.get("pageDataList", [])
if isinstance(page_list, list):
for item in page_list:
order_no = item.get("orderNo")
if not order_no: continue
if order_no not in unique_details:
unique_details[order_no] = item
else:
# 列表数据补全:仅当字段缺失或为空时填充
for k, v in item.items():
# 列表中的 goodsInfoList 通常为空,跳过
if k == "goodsInfoList" and not v: continue
if k not in unique_details[order_no] or not unique_details[order_no][k]:
unique_details[order_no][k] = v
elif r_type == "detail":
order_no = body.get("orderNo")
if not order_no: continue
if order_no not in unique_details:
unique_details[order_no] = body
else:
# 详情数据覆盖 (高优先级)
unique_details[order_no].update(body)
self.log(f"解析出 {len(unique_details)} 条唯一订单 (列表+详情合并)")
self._process_unique_details(unique_details, file_name, file_path)
def _process_unique_details(self, unique_details, file_name, file_path):
# 4. 业务处理与同步
results = []
for order_no, detail in unique_details.items():
# 解析商品信息
# goodsInfoList 是一个列表,这里取第一个商品作为主商品
goods_list = detail.get("goodsInfoList", [])
goods_name = "未知商品"
if goods_list and len(goods_list) > 0:
goods_name = goods_list[0].get("spuName", "未知商品")
# Step 1: 查内部 ID
internal_pid = self.client.get_internal_product_id(goods_name)
# Step 2: 上传
# 注意: 这里传入 internal_pid
success, msg = self.client.upload_order(detail, internal_pid)
# 时间格式化: 2026-03-10T00:50:37.000+0000 -> 2026-03-10 00:50:37
order_time = detail.get("orderCreateTime")
if order_time:
try:
# 尝试解析 ISO 8601 格式
# 注意: python 3.7+ 的 fromisoformat 处理带时区的比较麻烦,这里用 strptime
# 格式: 2026-03-10T14:42:28.000+0000
# %z 只能解析 +0000 这种无冒号的时区
dt = datetime.strptime(order_time, "%Y-%m-%dT%H:%M:%S.%f%z")
order_time = dt.strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
try:
# 备用: 如果是 +00:00 这种格式,或者其他微秒位数不同
dt = datetime.strptime(order_time.split('.')[0], "%Y-%m-%dT%H:%M:%S")
order_time = dt.strftime("%Y-%m-%d %H:%M:%S")
except:
pass
# 收件人信息
receive_info = detail.get("mallOrderReceiveInfo", {})
results.append({
"处理状态": "成功" if success else "失败",
"失败原因": "" if success else msg,
"下单时间": order_time,
"订单编号": order_no,
"收货人": receive_info.get("receiverName"),
"收货地址": receive_info.get("fullAddress"),
"下单用户手机号码": receive_info.get("receiverMobile", "00000000000"),
"商户名称": detail.get("exMerchant"),
"订单金额": detail.get("orderAmt"),
"商品名称": goods_name,
"内部商品ID": internal_pid,
})
# 5. 导出 Excel
if results:
# 处理不同后缀和前缀的文件名,生成统一的结果文件名
base_name = os.path.splitext(file_name)[0]
if base_name.startswith("raw_"):
ts = base_name.replace("raw_", "")
else:
# 针对非 raw_ 前缀的文件(如直接下载的 shipping_order附加时间戳
ts = f"{base_name}_{datetime.now().strftime('%H%M%S')}"
output_file = os.path.join(self.output_dir, f"result_{ts}.xlsx")
df = pd.DataFrame(results)
df.to_excel(output_file, index=False)
self.log(f"结果已导出至: {output_file}")
# 6. 归档
try:
shutil.move(file_path, os.path.join(self.archive_dir, file_name))
self.log(f"源文件已归档至: {self.archive_dir}")
except Exception as e:
self.log(f"归档失败: {e}")
if __name__ == "__main__":
processor = SCBankProcessor()
processor.run()