scbank-sync/scbank_processor.py

286 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import os
import shutil
import random
import pandas as pd
import requests
from datetime import datetime
class InternalApiClient:
    """Client for the internal goods/order HTTP API (real, non-signed mode).

    Requests are sent as JSON bodies with an ``AppId`` header. Every public
    method is best-effort: transport or decoding problems are printed and
    reported through the return value instead of being raised.
    """

    # Test environment (default).
    # Production environment: "https://hyt.86698.cn/open"
    DEFAULT_API_BASE_URL = "http://120.55.12.245:8109"
    # Placeholder AppId; must be replaced with the real one per environment.
    # Alternative AppId kept for reference: "8ce4d435fa77492f84d2fafa241c2804"
    DEFAULT_APP_ID = "e699e6ef74504f4d86776b3d244ce602"
    # Request timeout in seconds.
    DEFAULT_TIMEOUT = 10

    def __init__(self, api_base_url=None, app_id=None, timeout=None):
        """Create a client.

        Args:
            api_base_url: Base URL that request paths are appended to.
                Defaults to the test environment.
            app_id: Value sent in the ``AppId`` header. Defaults to the
                placeholder AppId.
            timeout: Per-request timeout in seconds. Defaults to 10.
        """
        self.api_base_url = api_base_url or self.DEFAULT_API_BASE_URL
        self.app_id = app_id or self.DEFAULT_APP_ID
        self.timeout = self.DEFAULT_TIMEOUT if timeout is None else timeout

    def _post(self, path, data):
        """Send a POST request and return the decoded JSON object.

        Args:
            path: URL path appended to ``api_base_url``.
            data: JSON-serialisable request body.

        Returns:
            The response body as a dict, or ``None`` when the request fails,
            the body is not valid JSON, or the JSON is not an object.
        """
        url = f"{self.api_base_url}{path}"
        headers = {
            "AppId": self.app_id,
            "Content-Type": "application/json;charset=UTF-8",
        }
        try:
            # Non-signed mode: the business parameters are sent as-is.
            # NOTE(review): the API document says non-encrypted requests carry
            # the normal business parameters, which is read here as a JSON
            # body rather than query params -- confirm against the real API.
            response = requests.post(
                url, json=data, headers=headers, timeout=self.timeout
            )
            body = response.json()
        except Exception as e:
            # Deliberately broad: this is the network boundary and callers
            # rely on ``None`` rather than an exception.
            print(f"[API Error] {url}: {e}")
            return None
        if not isinstance(body, dict):
            # Callers use ``.get`` on the result, so anything but an object
            # is treated as a failed request.
            print(f"[API Error] {url}: 响应不是 JSON 对象")
            return None
        return body

    def get_internal_product_id(self, scbank_goods_name):
        """Look up the internal product code for a mall goods name.

        Endpoint: ``/api/v1/open/cus/goods/list`` (fuzzy title search). The
        lookup only succeeds when the search yields exactly one product.

        Args:
            scbank_goods_name: Goods title to search for.

        Returns:
            The ``goods_num`` of the single match, or ``None`` when the name
            is empty, the request fails, or zero / several products match.
        """
        if not scbank_goods_name:
            return None

        payload = {
            "title": scbank_goods_name,
            "page": 1,
            "limit": 10,
        }
        resp = self._post("/api/v1/open/cus/goods/list", payload)
        if not resp or resp.get("code") != 200:
            return None

        # Documented shape is data -> {list: [...], total: int}; a bare list
        # and a null ``data`` are tolerated as well.
        data = resp.get("data")
        if isinstance(data, dict):
            product_list = data.get("list")
        else:
            product_list = data
        if not isinstance(product_list, list):
            product_list = []

        if len(product_list) == 1 and isinstance(product_list[0], dict):
            return product_list[0].get("goods_num")

        print(f"[Match Fail] '{scbank_goods_name}' 匹配到 {len(product_list)} 个商品")
        return None

    def upload_order(self, order_detail, internal_pid):
        """Submit one order to ``/api/v1/open/order/submit``.

        Args:
            order_detail: Order dict from the scbank data (reads ``orderNo``,
                ``orderAmt``, ``mallOrderReceiveInfo`` and ``goodsInfoList``).
            internal_pid: Internal product code; a falsy value means the
                product lookup failed and nothing is sent.

        Returns:
            ``(success, message)`` where ``success`` is a bool and ``message``
            is a human-readable status string.
        """
        if not internal_pid:
            return False, "商品匹配失败: 未找到或找到多个商品"

        # Recipient information; ``or {}`` also covers an explicit null.
        receive_info = order_detail.get("mallOrderReceiveInfo") or {}
        if not isinstance(receive_info, dict):
            receive_info = {}

        # Map the scbank detail onto the internal API payload.
        payload = {
            "orderBasic": {
                "customer_order_num": order_detail.get("orderNo"),
                "consignee": receive_info.get("receiverName", "未知"),
                "consignee_mobile": receive_info.get("receiverMobile", "00000000000"),
                # Province/city/area codes: marked required in the document
                # but reportedly a documentation mistake, so sent empty.
                "consignee_province_code": "",
                "consignee_city_code": "",
                "consignee_area_code": "",
                # Detailed address: the full address is used on request.
                "consignee_address": receive_info.get("fullAddress", ""),
                # Customer remark: no known source field, usually empty.
                "in_remark": receive_info.get("remark", ""),
            },
            "goodsList": [
                {
                    "goods_num": internal_pid,
                    # Defaults to 1 until a quantity is found below.
                    "number": 1,
                    # Order total is used as the unit price for now.
                    "sale_price": order_detail.get("orderAmt", 0),
                }
            ],
        }

        # Prefer quantity and price from the first goods entry if present.
        goods_info = order_detail.get("goodsInfoList")
        if isinstance(goods_info, list) and goods_info and isinstance(goods_info[0], dict):
            first_good = goods_info[0]
            payload["goodsList"][0]["number"] = first_good.get("count", 1)
            payload["goodsList"][0]["sale_price"] = first_good.get("price", 0)

        # Dump the payload for inspection.
        # NOTE(review): this prints recipient name, phone and address.
        print("=== 上传订单 Payload ===")
        print(json.dumps(payload, ensure_ascii=False, indent=2))
        print("=== 上传订单 Payload End ===")

        resp = self._post("/api/v1/open/order/submit", payload)
        if resp and resp.get("code") == 200:
            return True, "上传成功"

        msg = resp.get("message") if resp else "请求失败"
        return False, f"API错误: {msg}"
class SCBankProcessor:
    """Batch processor for captured scbank order data.

    For every ``raw_*.jsonl`` file in ``data_dir`` it merges list and detail
    records per order number, resolves the internal product code, uploads the
    order through ``InternalApiClient``, writes an Excel report to
    ``output_dir`` and moves the source file to ``archive_dir``.
    """

    # Placeholder goods name used when an order carries no goods information.
    UNKNOWN_GOODS_NAME = "未知商品"
    # Timestamp layouts tried in order, e.g. 2026-03-10T14:42:28.000+0000.
    _ORDER_TIME_FORMATS = ("%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z")
    _ORDER_TIME_OUTPUT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        """Set the working directories and create the API client."""
        self.data_dir = "data"
        self.archive_dir = "data/archive"
        self.output_dir = "output"
        self.client = InternalApiClient()

    def log(self, msg):
        """Print ``msg`` prefixed with the current HH:MM:SS time."""
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")

    def run(self):
        """Process every pending ``raw_*.jsonl`` file in ``data_dir``.

        A missing or unreadable data directory is logged instead of raised.
        Files are handled in sorted name order so runs are reproducible.
        """
        # 1. Scan for pending files.
        try:
            names = os.listdir(self.data_dir)
        except OSError as e:
            self.log(f"无法读取数据目录 {self.data_dir}: {e}")
            return

        files = sorted(
            f for f in names if f.startswith("raw_") and f.endswith(".jsonl")
        )
        if not files:
            self.log("没有待处理的数据文件。")
            return

        self.log(f"发现 {len(files)} 个待处理文件: {files}")
        for file_name in files:
            self._process_file(os.path.join(self.data_dir, file_name), file_name)

    def _process_file(self, file_path, file_name):
        """Run the read -> merge -> sync -> export -> archive pipeline.

        Args:
            file_path: Path of the JSONL file to read.
            file_name: Bare file name, used for the report and archive names.
        """
        self.log(f"正在处理文件: {file_name}")

        # 2. Read and clean.
        try:
            raw_records = self._read_records(file_path)
        except (OSError, ValueError) as e:
            # ValueError covers UnicodeDecodeError from a badly encoded file.
            self.log(f"读取文件失败: {e}")
            return
        if not raw_records:
            self.log("文件内容为空,跳过")
            return

        # 3. Structure the data (merge list and detail records).
        unique_details = self._merge_records(raw_records)
        self.log(f"解析出 {len(unique_details)} 条唯一订单 (列表+详情合并)")

        # 4. Business processing and upload.
        results = self._sync_orders(unique_details)

        # 5. Export the report.
        if results:
            self._export_results(results, file_name)

        # 6. Archive the source file.
        self._archive(file_path, file_name)

    def _read_records(self, file_path):
        """Return the JSON objects found in a JSONL file, one per line.

        Blank lines are ignored. Lines that are not valid JSON, or whose JSON
        is not an object, are skipped and their count is logged.
        """
        records = []
        skipped = 0
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    record = json.loads(line)
                except ValueError:
                    skipped += 1
                    continue
                if isinstance(record, dict):
                    records.append(record)
                else:
                    skipped += 1
        if skipped:
            self.log(f"跳过 {skipped} 行无法解析的记录")
        return records

    @staticmethod
    def _merge_records(raw_records):
        """Merge list and detail records into one dict per order number.

        List items only fill fields that are missing or empty; detail records
        overwrite existing fields because they have higher priority.
        """
        unique_details = {}
        for record in raw_records:
            payload = record.get("payload") or {}
            if not isinstance(payload, dict):
                continue
            body = payload.get("body", {})
            # The body may arrive as a JSON-encoded string.
            if isinstance(body, str):
                try:
                    body = json.loads(body)
                except ValueError:
                    continue
            if not isinstance(body, dict):
                continue

            r_type = record.get("type")
            if r_type == "list":
                page_list = body.get("pageDataList", [])
                if not isinstance(page_list, list):
                    continue
                for item in page_list:
                    if not isinstance(item, dict):
                        continue
                    order_no = item.get("orderNo")
                    if not order_no:
                        continue
                    existing = unique_details.get(order_no)
                    if existing is None:
                        unique_details[order_no] = item
                        continue
                    # Fill only fields that are missing or empty.
                    for key, value in item.items():
                        # goodsInfoList is usually empty in list data.
                        if key == "goodsInfoList" and not value:
                            continue
                        if not existing.get(key):
                            existing[key] = value
            elif r_type == "detail":
                order_no = body.get("orderNo")
                if not order_no:
                    continue
                if order_no in unique_details:
                    # Detail data overrides (higher priority).
                    unique_details[order_no].update(body)
                else:
                    unique_details[order_no] = body
        return unique_details

    @classmethod
    def _format_order_time(cls, order_time):
        """Reformat an ISO-8601-like timestamp as ``YYYY-MM-DD HH:MM:SS``.

        The wall-clock fields are kept as given; no timezone conversion is
        applied. Values that are empty, not strings, or not parseable are
        returned unchanged.
        """
        if not order_time or not isinstance(order_time, str):
            return order_time
        for fmt in cls._ORDER_TIME_FORMATS:
            try:
                parsed = datetime.strptime(order_time, fmt)
            except ValueError:
                continue
            return parsed.strftime(cls._ORDER_TIME_OUTPUT)
        # Fallback: drop everything after the seconds and parse the rest.
        try:
            parsed = datetime.strptime(order_time.split(".")[0], "%Y-%m-%dT%H:%M:%S")
        except ValueError:
            return order_time
        return parsed.strftime(cls._ORDER_TIME_OUTPUT)

    def _sync_orders(self, unique_details):
        """Resolve, upload and summarise each order; return the report rows."""
        results = []
        for order_no, detail in unique_details.items():
            # goodsInfoList is a list; its first entry is the main product.
            goods_list = detail.get("goodsInfoList")
            goods_name = self.UNKNOWN_GOODS_NAME
            if isinstance(goods_list, list) and goods_list and isinstance(goods_list[0], dict):
                goods_name = goods_list[0].get("spuName", self.UNKNOWN_GOODS_NAME)

            # Step 1: look up the internal product code. The placeholder name
            # is never sent to the fuzzy search, so it cannot match by accident.
            if goods_name == self.UNKNOWN_GOODS_NAME:
                internal_pid = None
            else:
                internal_pid = self.client.get_internal_product_id(goods_name)

            # Step 2: upload (a falsy product code is reported as a failure).
            success, msg = self.client.upload_order(detail, internal_pid)

            results.append({
                "处理状态": "成功" if success else "失败",
                "失败原因": "" if success else msg,
                "下单时间": self._format_order_time(detail.get("orderCreateTime")),
                "订单编号": order_no,
                "下单用户手机号码": detail.get("orderMobile"),
                "商户名称": detail.get("exMerchant"),
                "订单金额": detail.get("orderAmt"),
                "商品名称": goods_name,
                "内部商品ID": internal_pid,
            })
        return results

    def _export_results(self, results, file_name):
        """Write the report rows to ``result_<ts>.xlsx`` in ``output_dir``."""
        ts = file_name.replace("raw_", "").replace(".jsonl", "")
        # Create the directory first so the export cannot fail on a missing
        # folder after the orders have already been uploaded.
        os.makedirs(self.output_dir, exist_ok=True)
        output_file = os.path.join(self.output_dir, f"result_{ts}.xlsx")
        pd.DataFrame(results).to_excel(output_file, index=False)
        self.log(f"结果已导出至: {output_file}")

    def _archive(self, file_path, file_name):
        """Move the processed file into ``archive_dir``; log any failure."""
        try:
            os.makedirs(self.archive_dir, exist_ok=True)
            shutil.move(file_path, os.path.join(self.archive_dir, file_name))
            self.log(f"源文件已归档至: {self.archive_dir}")
        except OSError as e:
            # shutil.Error is a subclass of OSError.
            self.log(f"归档失败: {e}")
if __name__ == "__main__":
    # Script entry point: build a processor and handle all pending files.
    SCBankProcessor().run()