408 lines
16 KiB
Python
408 lines
16 KiB
Python
import json
|
||
import time
|
||
import os
|
||
import shutil
|
||
import random
|
||
import pandas as pd
|
||
import requests
|
||
from datetime import datetime
|
||
|
||
class InternalApiClient:
|
||
"""
|
||
内部接口客户端 (Real)
|
||
"""
|
||
def __init__(self):
|
||
# 生产环境
|
||
self.api_base_url = "https://hyt.86698.cn/open"
|
||
# 测试环境 (默认)
|
||
# self.api_base_url = "http://120.55.12.245:8109"
|
||
|
||
# 生产环境
|
||
self.app_id = "8ce4d435fa77492f84d2fafa241c2804"
|
||
# 测试环境 (默认)
|
||
# self.app_id = "e699e6ef74504f4d86776b3d244ce602"
|
||
|
||
# 加载商品映射配置
|
||
self.product_map = self._load_product_map()
|
||
|
||
def _load_product_map(self):
|
||
"""加载商品名称映射文件 (product_map.txt)"""
|
||
mapping = {}
|
||
map_file = "product_map.txt"
|
||
|
||
# 不再自动生成文件,避免覆盖用户配置
|
||
if not os.path.exists(map_file):
|
||
print(f"[Warn] 未找到映射配置文件: {map_file},将使用原始名称匹配")
|
||
return mapping
|
||
|
||
try:
|
||
with open(map_file, "r", encoding="utf-8") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
# 跳过注释和空行
|
||
if not line or line.startswith("#"): continue
|
||
|
||
# 使用 ">>>>" 作为分隔符,因为它非常罕见,不可能是商品名的一部分
|
||
sep = ">>>>"
|
||
if sep in line:
|
||
parts = line.split(sep, 1)
|
||
key = parts[0].strip()
|
||
val = parts[1].strip()
|
||
if key and val:
|
||
mapping[key] = val
|
||
print(f"[Info] 已加载 {len(mapping)} 条商品映射规则")
|
||
except Exception as e:
|
||
print(f"[Warn] 加载映射文件失败: {e}")
|
||
|
||
return mapping
|
||
|
||
def _post(self, path, data):
|
||
"""发送 POST 请求"""
|
||
url = f"{self.api_base_url}{path}"
|
||
headers = {
|
||
"AppId": self.app_id,
|
||
"Content-Type": "application/json;charset=UTF-8"
|
||
}
|
||
try:
|
||
# 不验签模式: 直接发送 data json
|
||
# 真实场景需确认: 这里的 data 是作为 json body 发送,还是作为 params?
|
||
# 根据文档 '非加密请求: 按业务正常的请求参数进行传输' -> 通常是 json body
|
||
response = requests.post(url, json=data, headers=headers, timeout=10)
|
||
return response.json()
|
||
except Exception as e:
|
||
print(f"[API Error] {url}: {e}")
|
||
return None
|
||
|
||
def get_internal_product_id(self, scbank_goods_name):
|
||
"""
|
||
根据商城商品名查询内部商品编号 (Product Code)
|
||
接口: /api/v1/open/cus/goods/list
|
||
逻辑: 模糊搜索 -> 必须有且仅有1个结果 -> 返回 goods_num
|
||
"""
|
||
if not scbank_goods_name: return None
|
||
|
||
# 0. 优先使用映射
|
||
search_name = scbank_goods_name
|
||
if scbank_goods_name in self.product_map:
|
||
search_name = self.product_map[scbank_goods_name]
|
||
print(f"[Map] 使用映射: '{scbank_goods_name}' -> '{search_name}'")
|
||
|
||
payload = {
|
||
"title": search_name,
|
||
"page": 1,
|
||
"limit": 10
|
||
}
|
||
|
||
resp = self._post("/api/v1/open/cus/goods/list", payload)
|
||
|
||
# 检查响应
|
||
if resp and resp.get("code") == 200:
|
||
data = resp.get("data", {})
|
||
# 兼容 data 可能是 list 或 dict (根据文档响应报文 data 包含 list 和 total)
|
||
# 文档: data -> {list: [], total: int}
|
||
product_list = data.get("list", [])
|
||
total = data.get("total", 0)
|
||
|
||
if product_list and len(product_list) == 1:
|
||
return product_list[0].get("goods_num")
|
||
else:
|
||
print(f"[Match Fail] '{search_name}' 匹配到 {len(product_list)} 个商品")
|
||
return None
|
||
|
||
return None
|
||
|
||
def upload_order(self, order_detail, internal_pid):
|
||
"""
|
||
上传订单
|
||
接口: /api/v1/open/order/submit
|
||
"""
|
||
if not internal_pid:
|
||
return False, "商品匹配失败: 未找到或找到多个商品"
|
||
|
||
# 映射字段
|
||
# scbank detail -> internal api payload
|
||
|
||
# 收件人信息
|
||
receive_info = order_detail.get("mallOrderReceiveInfo", {})
|
||
|
||
# 构建 payload
|
||
payload = {
|
||
"orderBasic": {
|
||
"customer_order_num": order_detail.get("orderNo"),
|
||
"consignee": receive_info.get("receiverName", "未知"),
|
||
"consignee_mobile": receive_info.get("receiverMobile", "00000000000"),
|
||
|
||
# 省市区编码 (文档必填但用户说是误写,传空或默认值)
|
||
"consignee_province_code": "",
|
||
"consignee_city_code": "",
|
||
"consignee_area_code": "",
|
||
|
||
# 详细地址 (用户要求使用完整地址)
|
||
"consignee_address": receive_info.get("fullAddress", ""),
|
||
|
||
# 客户备注 - 没找到,传空
|
||
"in_remark": receive_info.get("remark", ""),
|
||
},
|
||
"goodsList": [
|
||
{
|
||
"goods_num": internal_pid,
|
||
"number": 1, # 默认为 1,需确认 scbank 数据是否有数量
|
||
"sale_price": order_detail.get("orderAmt", 0) # 暂用订单总额作为单价
|
||
}
|
||
]
|
||
}
|
||
|
||
# 修正数量与价格: 如果有 goodsInfoList,尝试获取更准确的数量
|
||
goods_info = order_detail.get("goodsInfoList", [])
|
||
if goods_info:
|
||
first_good = goods_info[0]
|
||
payload["goodsList"][0]["number"] = first_good.get("count", 1)
|
||
payload["goodsList"][0]["sale_price"] = first_good.get("price", 0)
|
||
|
||
# 查看 payload
|
||
print("=== 上传订单 Payload ===")
|
||
print(json.dumps(payload, ensure_ascii=False, indent=2))
|
||
print("=== 上传订单 Payload End ===")
|
||
|
||
resp = self._post("/api/v1/open/order/submit", payload)
|
||
|
||
if resp and resp.get("code") == 200:
|
||
return True, "上传成功"
|
||
else:
|
||
msg = resp.get("message") if resp else "请求失败"
|
||
return False, f"API错误: {msg}"
|
||
|
||
class SCBankProcessor:
|
||
def __init__(self):
|
||
self.data_dir = "data"
|
||
self.archive_dir = "data/archive"
|
||
self.output_dir = "output"
|
||
|
||
# 确保目录结构存在
|
||
for d in [self.data_dir, self.archive_dir, self.output_dir]:
|
||
if not os.path.exists(d):
|
||
os.makedirs(d)
|
||
|
||
self.client = InternalApiClient()
|
||
|
||
def log(self, msg):
|
||
print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}")
|
||
|
||
def run(self):
|
||
# 这个方法是为了向后兼容旧的 JSONL 处理方式(如果需要的话),目前菜单已不再直接调用
|
||
files = [f for f in os.listdir(self.data_dir) if f.startswith("raw_") and f.endswith(".jsonl")]
|
||
if not files:
|
||
self.log("没有待处理的JSONL数据文件。")
|
||
return
|
||
|
||
self.log(f"发现 {len(files)} 个待处理文件: {files}")
|
||
|
||
for file_name in files:
|
||
file_path = os.path.join(self.data_dir, file_name)
|
||
self._process_file(file_path, file_name)
|
||
|
||
def run_excel(self):
|
||
"""
|
||
处理导出的 Excel 数据
|
||
"""
|
||
# 查找 data 目录下的 excel 文件
|
||
files = [f for f in os.listdir(self.data_dir) if f.endswith(".xls") or f.endswith(".xlsx")]
|
||
|
||
if not files:
|
||
self.log("没有找到待处理的 Excel 文件 (.xls / .xlsx)。请先执行步骤一!")
|
||
return
|
||
|
||
self.log(f"发现 {len(files)} 个待处理的 Excel 文件: {files}")
|
||
|
||
for file_name in files:
|
||
file_path = os.path.join(self.data_dir, file_name)
|
||
self._process_excel_file(file_path, file_name)
|
||
|
||
def _process_excel_file(self, file_path, file_name):
|
||
self.log(f"正在处理 Excel 文件: {file_name}")
|
||
|
||
try:
|
||
df = pd.read_excel(file_path)
|
||
except Exception as e:
|
||
self.log(f"读取 Excel 文件失败: {e}")
|
||
return
|
||
|
||
if df.empty:
|
||
self.log("Excel 文件内容为空,跳过")
|
||
return
|
||
|
||
unique_details = {}
|
||
for _, row in df.iterrows():
|
||
order_no = str(row.get("订单号", ""))
|
||
if not order_no or order_no == "nan":
|
||
continue
|
||
|
||
# 构造与 JSON 一致的 detail 结构
|
||
detail = {
|
||
"orderNo": order_no,
|
||
"orderCreateTime": str(row.get("下单时间", "")),
|
||
"mallOrderReceiveInfo": {
|
||
"receiverName": str(row.get("收货人名称", "")) if pd.notna(row.get("收货人名称")) else "",
|
||
"receiverMobile": str(row.get("收货人电话", "")) if pd.notna(row.get("收货人电话")) else "00000000000",
|
||
"fullAddress": str(row.get("收货地址", "")) if pd.notna(row.get("收货地址")) else "",
|
||
"remark": str(row.get("备注", "")) if pd.notna(row.get("备注")) else ""
|
||
},
|
||
"goodsInfoList": [
|
||
{
|
||
"spuName": str(row.get("产品名称", "未知商品")) if pd.notna(row.get("产品名称")) else "未知商品",
|
||
"count": int(row.get("数量", 1)) if pd.notna(row.get("数量")) else 1,
|
||
"price": float(row.get("单价", 0.0)) if pd.notna(row.get("单价")) else 0.0,
|
||
}
|
||
],
|
||
"orderAmt": float(row.get("单价", 0.0)) if pd.notna(row.get("单价")) else 0.0,
|
||
"exMerchant": "成都蓝色兄弟网络科技有限公司"
|
||
}
|
||
unique_details[order_no] = detail
|
||
|
||
self.log(f"解析出 {len(unique_details)} 条唯一订单 (来自 Excel)")
|
||
self._process_unique_details(unique_details, file_name, file_path)
|
||
|
||
def _process_file(self, file_path, file_name):
|
||
self.log(f"正在处理文件: {file_name}")
|
||
|
||
# 2. 读取与清洗
|
||
raw_records = []
|
||
try:
|
||
with open(file_path, "r", encoding="utf-8") as f:
|
||
for line in f:
|
||
if line.strip():
|
||
try:
|
||
raw_records.append(json.loads(line))
|
||
except: pass
|
||
except Exception as e:
|
||
self.log(f"读取文件失败: {e}")
|
||
return
|
||
|
||
if not raw_records:
|
||
self.log("文件内容为空,跳过")
|
||
return
|
||
|
||
# 3. 数据结构化 (合并列表与详情)
|
||
unique_details = {}
|
||
|
||
for r in raw_records:
|
||
r_type = r.get("type")
|
||
payload = r.get("payload", {})
|
||
body = payload.get("body", {})
|
||
|
||
# 兼容 body 可能是字符串的情况
|
||
if isinstance(body, str):
|
||
try: body = json.loads(body)
|
||
except: continue
|
||
|
||
if not isinstance(body, dict): continue
|
||
|
||
if r_type == "list":
|
||
page_list = body.get("pageDataList", [])
|
||
if isinstance(page_list, list):
|
||
for item in page_list:
|
||
order_no = item.get("orderNo")
|
||
if not order_no: continue
|
||
|
||
if order_no not in unique_details:
|
||
unique_details[order_no] = item
|
||
else:
|
||
# 列表数据补全:仅当字段缺失或为空时填充
|
||
for k, v in item.items():
|
||
# 列表中的 goodsInfoList 通常为空,跳过
|
||
if k == "goodsInfoList" and not v: continue
|
||
if k not in unique_details[order_no] or not unique_details[order_no][k]:
|
||
unique_details[order_no][k] = v
|
||
|
||
elif r_type == "detail":
|
||
order_no = body.get("orderNo")
|
||
if not order_no: continue
|
||
|
||
if order_no not in unique_details:
|
||
unique_details[order_no] = body
|
||
else:
|
||
# 详情数据覆盖 (高优先级)
|
||
unique_details[order_no].update(body)
|
||
|
||
self.log(f"解析出 {len(unique_details)} 条唯一订单 (列表+详情合并)")
|
||
self._process_unique_details(unique_details, file_name, file_path)
|
||
|
||
def _process_unique_details(self, unique_details, file_name, file_path):
|
||
# 4. 业务处理与同步
|
||
results = []
|
||
for order_no, detail in unique_details.items():
|
||
# 解析商品信息
|
||
# goodsInfoList 是一个列表,这里取第一个商品作为主商品
|
||
goods_list = detail.get("goodsInfoList", [])
|
||
goods_name = "未知商品"
|
||
if goods_list and len(goods_list) > 0:
|
||
goods_name = goods_list[0].get("spuName", "未知商品")
|
||
|
||
# Step 1: 查内部 ID
|
||
internal_pid = self.client.get_internal_product_id(goods_name)
|
||
|
||
# Step 2: 上传
|
||
# 注意: 这里传入 internal_pid
|
||
success, msg = self.client.upload_order(detail, internal_pid)
|
||
|
||
# 时间格式化: 2026-03-10T00:50:37.000+0000 -> 2026-03-10 00:50:37
|
||
order_time = detail.get("orderCreateTime")
|
||
if order_time:
|
||
try:
|
||
# 尝试解析 ISO 8601 格式
|
||
# 注意: python 3.7+ 的 fromisoformat 处理带时区的比较麻烦,这里用 strptime
|
||
# 格式: 2026-03-10T14:42:28.000+0000
|
||
# %z 只能解析 +0000 这种无冒号的时区
|
||
dt = datetime.strptime(order_time, "%Y-%m-%dT%H:%M:%S.%f%z")
|
||
order_time = dt.strftime("%Y-%m-%d %H:%M:%S")
|
||
except ValueError:
|
||
try:
|
||
# 备用: 如果是 +00:00 这种格式,或者其他微秒位数不同
|
||
dt = datetime.strptime(order_time.split('.')[0], "%Y-%m-%dT%H:%M:%S")
|
||
order_time = dt.strftime("%Y-%m-%d %H:%M:%S")
|
||
except:
|
||
pass
|
||
|
||
# 收件人信息
|
||
receive_info = detail.get("mallOrderReceiveInfo", {})
|
||
|
||
results.append({
|
||
"处理状态": "成功" if success else "失败",
|
||
"失败原因": "" if success else msg,
|
||
"下单时间": order_time,
|
||
"订单编号": order_no,
|
||
"收货人": receive_info.get("receiverName"),
|
||
"收货地址": receive_info.get("fullAddress"),
|
||
"下单用户手机号码": receive_info.get("receiverMobile", "00000000000"),
|
||
"商户名称": detail.get("exMerchant"),
|
||
"订单金额": detail.get("orderAmt"),
|
||
"商品名称": goods_name,
|
||
"内部商品ID": internal_pid,
|
||
})
|
||
|
||
# 5. 导出 Excel
|
||
if results:
|
||
# 处理不同后缀和前缀的文件名,生成统一的结果文件名
|
||
base_name = os.path.splitext(file_name)[0]
|
||
if base_name.startswith("raw_"):
|
||
ts = base_name.replace("raw_", "")
|
||
else:
|
||
# 针对非 raw_ 前缀的文件(如直接下载的 shipping_order)附加时间戳
|
||
ts = f"{base_name}_{datetime.now().strftime('%H%M%S')}"
|
||
|
||
output_file = os.path.join(self.output_dir, f"result_{ts}.xlsx")
|
||
df = pd.DataFrame(results)
|
||
df.to_excel(output_file, index=False)
|
||
self.log(f"结果已导出至: {output_file}")
|
||
|
||
# 6. 归档
|
||
try:
|
||
shutil.move(file_path, os.path.join(self.archive_dir, file_name))
|
||
self.log(f"源文件已归档至: {self.archive_dir}")
|
||
except Exception as e:
|
||
self.log(f"归档失败: {e}")
|
||
|
||
if __name__ == "__main__":
|
||
processor = SCBankProcessor()
|
||
processor.run()
|