#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""HTTP service that captures the ``x-device-fingerprint`` request header.

A headless Chromium instance (driven by Playwright) loads ``TARGET_PAGE``;
the first outgoing request whose URL contains ``TARGET_API`` is inspected
and the value of its ``x-device-fingerprint`` header is returned to the
caller of ``GET /fingerprint``.
"""

import asyncio
import logging
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI, HTTPException
from playwright.async_api import Error as PlaywrightError
from playwright.async_api import async_playwright

logger = logging.getLogger(__name__)

TARGET_PAGE = "https://gd.10086.cn/gdshop/qdxsd/index.html#/gd-fls/marketingActivity/index?id=1956241557401346048"
TARGET_API = "https://gd.10086.cn/gdshop/apigw/adv/ad/getInsertCode"
FINGERPRINT_HEADER = "x-device-fingerprint"

# Default number of seconds to wait for the fingerprint-carrying request.
FINGERPRINT_TIMEOUT_SECONDS = 15

# Resource types that are aborted outright to speed up page loading.
BLOCKED_RESOURCE_TYPES = frozenset({"image", "stylesheet", "media", "font", "other"})

# Upper bound on cached scripts so a long-running process cannot grow without
# limit (e.g. when script URLs carry cache-busting query strings).
MAX_CACHED_SCRIPTS = 256

playwright_instance = None
browser = None

# Process-wide in-memory cache of fetched JS files, keyed by URL.
# Each value is {"status": int, "headers": dict, "body": bytes}.
RESOURCE_CACHE = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start Playwright and Chromium on startup; tear both down on shutdown.

    The ``try``/``finally`` nesting guarantees the driver is stopped even if
    the browser fails to launch, and the browser is closed even if the
    application exits with an error.
    """
    global playwright_instance, browser
    playwright_instance = await async_playwright().start()
    try:
        browser = await playwright_instance.chromium.launch(
            headless=True,
            args=[
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-software-rasterizer",
            ],
        )
        try:
            yield
        finally:
            await browser.close()
    finally:
        await playwright_instance.stop()


app = FastAPI(
    title="广东移动指纹获取服务",
    description="获取 x-device-fingerprint 参数的 API 服务",
    version="1.0.0",
    lifespan=lifespan
)


def _remember_script(url, status, headers, body):
    """Store a fetched script in ``RESOURCE_CACHE``, evicting the oldest entry when full."""
    if url not in RESOURCE_CACHE and len(RESOURCE_CACHE) >= MAX_CACHED_SCRIPTS:
        # dicts preserve insertion order, so the first key is the oldest entry.
        RESOURCE_CACHE.pop(next(iter(RESOURCE_CACHE)))
    RESOURCE_CACHE[url] = {"status": status, "headers": headers, "body": body}


async def _handle_route(route):
    """Abort unneeded resources, serve scripts from cache, pass everything else through."""
    request = route.request
    url = request.url
    resource_type = request.resource_type

    # 1. Drop resources that are not needed to produce the fingerprint.
    if resource_type in BLOCKED_RESOURCE_TYPES:
        await route.abort()
        return

    # 2. Aggressively cache JS files across page loads.
    if resource_type == "script":
        cached = RESOURCE_CACHE.get(url)
        if cached is not None:
            await route.fulfill(
                status=cached["status"],
                headers=cached["headers"],
                body=cached["body"],
            )
            return

        # Cache miss: fetch the real script.
        try:
            response = await route.fetch()
            body = await response.body()
        except Exception as exc:
            # Deliberate best-effort: if the manual fetch fails, hand the
            # request back to the browser's normal network stack.
            logger.warning("Fetching script %s failed, falling back: %s", url, exc)
        else:
            # Only successful responses are cached; an error response must not
            # be replayed to every later page load as if it were valid.
            if response.ok:
                _remember_script(url, response.status, response.headers, body)
            await route.fulfill(response=response, body=body)
            return

    await route.continue_()


async def _intercept_route(route):
    """Route handler registered on each page; wraps ``_handle_route``.

    ``get_fingerprint`` closes the page as soon as the fingerprint is seen,
    so requests may still be in flight at that point. Playwright errors raised
    here are logged at debug level instead of surfacing as unhandled task
    exceptions.
    """
    try:
        await _handle_route(route)
    except PlaywrightError as exc:
        logger.debug("Route handling for %s aborted: %s", route.request.url, exc)


async def get_fingerprint(timeout: float = FINGERPRINT_TIMEOUT_SECONDS) -> Optional[str]:
    """Load ``TARGET_PAGE`` and return the fingerprint header sent to ``TARGET_API``.

    Args:
        timeout: Maximum number of seconds to wait for a matching request.

    Returns:
        The value of the ``x-device-fingerprint`` header from the first request
        whose URL contains ``TARGET_API`` and carries that header, or ``None``
        if no such request is observed within ``timeout`` seconds.

    Raises:
        Exception: Errors from Playwright (e.g. failing to open the page or
            register the route) propagate to the caller.
    """
    page = await browser.new_page()
    goto_task = None
    try:
        await page.route("**/*", _intercept_route)

        fingerprint_future = asyncio.get_running_loop().create_future()

        def handle_request(request):
            # Resolve the future with the first matching request only.
            if TARGET_API not in request.url or fingerprint_future.done():
                return
            headers = request.headers
            if FINGERPRINT_HEADER in headers:
                fingerprint_future.set_result(headers[FINGERPRINT_HEADER])

        page.on("request", handle_request)

        # Navigation runs concurrently with the wait below, so the result is
        # returned as soon as the header is seen instead of waiting for
        # goto() to finish.
        goto_task = asyncio.create_task(
            page.goto(TARGET_PAGE, wait_until="domcontentloaded")
        )
        return await asyncio.wait_for(fingerprint_future, timeout=timeout)
    except asyncio.TimeoutError:
        return None
    finally:
        try:
            if goto_task is not None:
                # Cancel and drain the navigation task so that its outcome
                # (cancellation or a navigation error) is retrieved here
                # rather than reported as "Task exception was never retrieved".
                goto_task.cancel()
                await asyncio.gather(goto_task, return_exceptions=True)
        finally:
            await page.close()


@app.get("/")
async def root():
    """Return a static banner identifying the service."""
    return {"status": "ok", "message": "广东移动指纹获取服务"}


@app.get("/health")
async def health():
    """Liveness probe; does not touch the browser."""
    return {"status": "healthy"}


@app.get("/fingerprint")
async def get_fingerprint_endpoint():
    """Return the captured fingerprint, or HTTP 500 if it could not be obtained.

    Raises:
        HTTPException: 500 with detail ``服务错误: ...`` when ``get_fingerprint``
            raises, or 500 with detail ``获取指纹失败`` when no fingerprint was
            captured.
    """
    try:
        fingerprint = await get_fingerprint()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"服务错误: {str(e)}") from e

    # Raised outside the try block so this HTTPException is not caught and
    # re-wrapped as a generic service error.
    if not fingerprint:
        raise HTTPException(status_code=500, detail="获取指纹失败")

    return {
        "status": "success",
        "data": {
            FINGERPRINT_HEADER: fingerprint
        }
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)