功能开发基本完成

This commit is contained in:
fuzhongyun 2026-03-11 19:53:20 +08:00
parent 36f30b85cc
commit 9a3c16d155
1 changed files with 150 additions and 44 deletions

View File

@ -4,39 +4,132 @@ import os
import shutil
import random
import pandas as pd
import requests
from datetime import datetime
class InternalApiClient:
"""
内部接口客户端桩代码 (Mock)
内部接口客户端 (Real)
"""
def __init__(self):
self.api_url = "http://internal-system.local/api"
# 生产环境
# self.api_base_url = "https://hyt.86698.cn/open"
# 测试环境 (默认)
self.api_base_url = "http://120.55.12.245:8109"
# 占位符 AppId实际需替换
# self.app_id = "8ce4d435fa77492f84d2fafa241c2804"
self.app_id = "e699e6ef74504f4d86776b3d244ce602"
def _post(self, path, data):
"""发送 POST 请求"""
url = f"{self.api_base_url}{path}"
headers = {
"AppId": self.app_id,
"Content-Type": "application/json;charset=UTF-8"
}
try:
# 不验签模式: 直接发送 data json
# 真实场景需确认: 这里的 data 是作为 json body 发送,还是作为 params?
# 根据文档 '非加密请求: 按业务正常的请求参数进行传输' -> 通常是 json body
response = requests.post(url, json=data, headers=headers, timeout=10)
return response.json()
except Exception as e:
print(f"[API Error] {url}: {e}")
return None
def get_internal_product_id(self, scbank_goods_name):
"""
Mock: 根据商城商品名查询内部商品编号
根据商城商品名查询内部商品编号 (Product Code)
接口: /api/v1/open/cus/goods/list
逻辑: 模糊搜索 -> 必须有且仅有1个结果 -> 返回 goods_num
"""
# 模拟延时
time.sleep(0.1)
if not scbank_goods_name: return None
# 简单模拟
if "iPhone" in scbank_goods_name: return "P_APPLE_001"
if "华为" in scbank_goods_name: return "P_HUAWEI_002"
return "P_UNKNOWN_999"
payload = {
"title": scbank_goods_name,
"page": 1,
"limit": 10
}
resp = self._post("/api/v1/open/cus/goods/list", payload)
# 检查响应
if resp and resp.get("code") == 200:
data = resp.get("data", {})
# 兼容 data 可能是 list 或 dict (根据文档响应报文 data 包含 list 和 total)
# 文档: data -> {list: [], total: int}
product_list = data.get("list", [])
total = data.get("total", 0)
if product_list and len(product_list) == 1:
return product_list[0].get("goods_num")
else:
print(f"[Match Fail] '{scbank_goods_name}' 匹配到 {len(product_list)} 个商品")
return None
return None
def upload_order(self, order_detail):
def upload_order(self, order_detail, internal_pid):
"""
Mock: 上传订单
返回: (Success: bool, Message: str)
上传订单
接口: /api/v1/open/order/submit
"""
time.sleep(0.2)
# 模拟随机成功率 90%
if random.random() < 0.9:
if not internal_pid:
return False, "商品匹配失败: 未找到或找到多个商品"
# 映射字段
# scbank detail -> internal api payload
# 收件人信息
receive_info = order_detail.get("mallOrderReceiveInfo", {})
# 构建 payload
payload = {
"orderBasic": {
"customer_order_num": order_detail.get("orderNo"),
"consignee": receive_info.get("receiverName", "未知"),
"consignee_mobile": receive_info.get("receiverMobile", "00000000000"),
# 省市区编码 (文档必填但用户说是误写,传空或默认值)
"consignee_province_code": "",
"consignee_city_code": "",
"consignee_area_code": "",
# 详细地址 (用户要求使用完整地址)
"consignee_address": receive_info.get("fullAddress", ""),
# 客户备注 - 没找到,传空
"in_remark": receive_info.get("remark", ""),
},
"goodsList": [
{
"goods_num": internal_pid,
"number": 1, # 默认为 1需确认 scbank 数据是否有数量
"sale_price": order_detail.get("orderAmt", 0) # 暂用订单总额作为单价
}
]
}
# 修正数量与价格: 如果有 goodsInfoList尝试获取更准确的数量
goods_info = order_detail.get("goodsInfoList", [])
if goods_info:
first_good = goods_info[0]
payload["goodsList"][0]["number"] = first_good.get("count", 1)
payload["goodsList"][0]["sale_price"] = first_good.get("price", 0)
# 查看 payload
print("=== 上传订单 Payload ===")
print(json.dumps(payload, ensure_ascii=False, indent=2))
print("=== 上传订单 Payload End ===")
resp = self._post("/api/v1/open/order/submit", payload)
if resp and resp.get("code") == 200:
return True, "上传成功"
else:
return False, "模拟网络超时"
msg = resp.get("message") if resp else "请求失败"
return False, f"API错误: {msg}"
class SCBankProcessor:
def __init__(self):
@ -81,37 +174,49 @@ class SCBankProcessor:
self.log("文件内容为空,跳过")
return
# 3. 数据结构化
# 提取详情数据 (以此为主)
details = []
for r in raw_records:
if r.get("type") == "detail":
payload = r.get("payload", {})
# 根据真实数据结构: payload -> body -> orderNo
body = payload.get("body", {})
if body and "orderNo" in str(body):
# 如果 body 是字符串,尝试解析
if isinstance(body, str):
try: body = json.loads(body)
except: pass
details.append(body)
# 去重 (按 orderNo)
# 注意:这里假设 payload 是 dict如果已经是 dict 则直接用
# 如果是字符串则需要 json.loads
# 3. 数据结构化 (合并列表与详情)
unique_details = {}
for d in details:
# 兼容 payload 可能是字符串的情况 (虽然 Collector 存的是 dict)
if isinstance(d, str):
try: d = json.loads(d)
for r in raw_records:
r_type = r.get("type")
payload = r.get("payload", {})
body = payload.get("body", {})
# 兼容 body 可能是字符串的情况
if isinstance(body, str):
try: body = json.loads(body)
except: continue
order_no = d.get("orderNo") or d.get("orderId")
if order_no:
unique_details[order_no] = d
if not isinstance(body, dict): continue
self.log(f"解析出 {len(unique_details)} 条有效唯一订单详情")
if r_type == "list":
page_list = body.get("pageDataList", [])
if isinstance(page_list, list):
for item in page_list:
order_no = item.get("orderNo")
if not order_no: continue
if order_no not in unique_details:
unique_details[order_no] = item
else:
# 列表数据补全:仅当字段缺失或为空时填充
for k, v in item.items():
# 列表中的 goodsInfoList 通常为空,跳过
if k == "goodsInfoList" and not v: continue
if k not in unique_details[order_no] or not unique_details[order_no][k]:
unique_details[order_no][k] = v
elif r_type == "detail":
order_no = body.get("orderNo")
if not order_no: continue
if order_no not in unique_details:
unique_details[order_no] = body
else:
# 详情数据覆盖 (高优先级)
unique_details[order_no].update(body)
self.log(f"解析出 {len(unique_details)} 条唯一订单 (列表+详情合并)")
# 4. 业务处理与同步
results = []
@ -127,7 +232,8 @@ class SCBankProcessor:
internal_pid = self.client.get_internal_product_id(goods_name)
# Step 2: 上传
success, msg = self.client.upload_order(detail)
# 注意: 这里传入 internal_pid
success, msg = self.client.upload_order(detail, internal_pid)
# 时间格式化: 2026-03-10T00:50:37.000+0000 -> 2026-03-10 00:50:37
order_time = detail.get("orderCreateTime")