headPicker/main.py

163 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import os
import logging
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request
from playwright.async_api import async_playwright
# Configure logging: INFO and above, with timestamp, logger name and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("headPickerService")

# Runtime configuration, read once at import time; each value is None when
# the corresponding environment variable is unset.
TARGET_PAGE = os.environ.get("TARGET_PAGE")
TARGET_API = os.environ.get("TARGET_API")
FINGERPRINT_HEADER = os.environ.get("FINGERPRINT_HEADER")

# Shared Playwright handles; they stay None until the lifespan handler
# assigns them.
playwright_instance = browser = None

# [Added] Process-wide in-memory cache of fetched JS scripts, keyed by URL.
RESOURCE_CACHE = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the shared headless Chromium instance for the application's lifetime.

    On startup the Playwright driver is started and a headless Chromium is
    launched; both are published through the module globals
    ``playwright_instance`` and ``browser``. On shutdown the browser is
    closed and the driver stopped.

    Cleanup runs in ``finally`` blocks so that a failed browser launch, or an
    exception raised while the application is running, does not leave the
    driver or browser process behind.

    Args:
        app: The FastAPI application (unused; required by the lifespan API).
    """
    global playwright_instance, browser
    playwright_instance = await async_playwright().start()
    try:
        browser = await playwright_instance.chromium.launch(
            headless=True,
            # Sandbox and /dev/shm flags; presumably chosen for running in a
            # container -- confirm against the deployment environment.
            args=[
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-software-rasterizer",
            ],
        )
        try:
            yield
        finally:
            await browser.close()
    finally:
        # Runs even if the launch above failed or browser.close() raised.
        await playwright_instance.stop()
# The ASGI application; `lifespan` manages the shared browser around its
# startup and shutdown.
app = FastAPI(
    lifespan=lifespan,
    title="Web Header Signature Service",
    version="1.0.0",
    description="API service for dynamic header signature generation via headless browser",
)
@app.middleware("http")
async def log_request_time(request: Request, call_next):
    """Log method, path, status code and handling time of every HTTP request.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the response.

    Returns:
        The response produced by ``call_next``, unmodified.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted mid-request.
    started = time.perf_counter()
    response = await call_next(request)
    elapsed_ms = (time.perf_counter() - started) * 1000
    logger.info(
        "%s %s - %s - %.2fms",
        request.method,
        request.url.path,
        response.status_code,
        elapsed_ms,
    )
    return response
# Resource types that are dropped outright to speed up page loading.
_BLOCKED_RESOURCE_TYPES = frozenset({"image", "stylesheet", "media", "font", "other"})


async def _serve_script(route, url):
    """Fulfil a script request from RESOURCE_CACHE, fetching it on a miss.

    Returns:
        True when the route was fulfilled; False when the upstream fetch
        failed and the caller should fall back to default handling.
    """
    cached = RESOURCE_CACHE.get(url)
    if cached is not None:
        logger.debug("Cache hit for script: %s", url)
        await route.fulfill(
            # Entries without a stored status are replayed as 200.
            status=cached.get("status", 200),
            headers=cached["headers"],
            body=cached["body"],
        )
        return True

    logger.debug("Fetching script: %s", url)
    try:
        response = await route.fetch()
        body = await response.body()
    except Exception as exc:
        # Only the fetch is guarded: after a failed fetch the route is still
        # unhandled, so handing it back to the browser is safe.
        logger.warning("Fetching script %s failed, passing it through: %s", url, exc)
        return False

    # Cache successful responses only, so that a transient upstream error is
    # not replayed forever.
    # NOTE: the cache has no eviction; it grows with every distinct script URL.
    if response.ok:
        RESOURCE_CACHE[url] = {
            "status": response.status,
            "headers": response.headers,
            "body": body,
        }
    await route.fulfill(response=response, body=body)
    return True


async def _intercept_route(route):
    """Route handler: drop unneeded resources, cache scripts, pass the rest through."""
    request = route.request
    try:
        if request.resource_type in _BLOCKED_RESOURCE_TYPES:
            await route.abort()
            return
        if request.resource_type == "script" and await _serve_script(route, request.url):
            return
        await route.continue_()
    except Exception as exc:
        # The page is closed as soon as the fingerprint is captured, so
        # requests still in flight can fail here; that is expected.
        logger.debug("Route handling for %s did not complete: %s", request.url, exc)


async def get_fingerprint():
    """Load TARGET_PAGE in a fresh page and capture the fingerprint header.

    The page's requests are observed; the first request whose URL contains
    TARGET_API and which carries the FINGERPRINT_HEADER header supplies the
    result. Navigation runs concurrently so the value is returned as soon as
    it is seen, without waiting for the page load to finish.

    Returns:
        The header value as a string, or None when the configuration is
        incomplete, navigation fails, or no matching request is seen within
        15 seconds.
    """
    if not (TARGET_PAGE and TARGET_API and FINGERPRINT_HEADER):
        logger.error("TARGET_PAGE, TARGET_API and FINGERPRINT_HEADER must all be configured.")
        return None

    # Playwright reports request header names lower-cased, so the configured
    # name has to be normalised before the lookup.
    header_name = FINGERPRINT_HEADER.lower()

    page = await browser.new_page()
    goto_task = None
    try:
        await page.route("**/*", _intercept_route)
        fingerprint_future = asyncio.get_running_loop().create_future()

        def handle_request(request):
            if fingerprint_future.done() or TARGET_API not in request.url:
                return
            value = request.headers.get(header_name)
            if value is not None:
                fingerprint_future.set_result(value)

        def on_navigation_done(task):
            # Fail fast when navigation itself errors instead of waiting out
            # the full timeout for a request that will never come.
            if task.cancelled():
                return
            error = task.exception()
            if error is not None and not fingerprint_future.done():
                fingerprint_future.set_exception(error)

        page.on("request", handle_request)
        goto_task = asyncio.create_task(
            page.goto(TARGET_PAGE, wait_until="domcontentloaded")
        )
        goto_task.add_done_callback(on_navigation_done)
        return await asyncio.wait_for(fingerprint_future, timeout=15)
    except asyncio.TimeoutError:
        logger.error("Timeout while waiting for header generation.")
        return None
    except Exception as e:
        logger.error("Unexpected error during generation: %s", e)
        return None
    finally:
        try:
            if goto_task is not None:
                # Stop the in-flight navigation and collect its outcome so no
                # "exception was never retrieved" warning is left behind.
                goto_task.cancel()
                await asyncio.gather(goto_task, return_exceptions=True)
        finally:
            await page.close()
@app.get("/")
async def root():
    """Return a static banner confirming the service is running."""
    return dict(
        status="ok",
        message="Web Header Signature Service is running",
    )
@app.get("/health")
async def health():
    """Return a static health payload; no dependencies are checked."""
    return dict(status="healthy")
@app.get("/fingerprint")
async def get_fingerprint_endpoint():
    """Generate a fingerprint header value and return it as JSON.

    Returns:
        ``{"status": "success", "data": {FINGERPRINT_HEADER: <value>}}``.

    Raises:
        HTTPException: 500 with detail "Failed to generate signature" when no
            value could be produced, or 500 with detail "Service error: ..."
            when generation raised unexpectedly.
    """
    # Only the generation call is guarded. The "no value" HTTPException below
    # is raised outside the try so it is not caught, logged a second time and
    # re-wrapped as a generic service error.
    try:
        fingerprint = await get_fingerprint()
    except Exception as e:
        logger.error("Service error during request: %s", e)
        raise HTTPException(status_code=500, detail=f"Service error: {e}") from e

    if not fingerprint:
        logger.error("Failed to generate signature (returned None)")
        raise HTTPException(status_code=500, detail="Failed to generate signature")

    return {
        "status": "success",
        "data": {
            FINGERPRINT_HEADER: fingerprint,
        },
    }
# Script entry point: serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)