167 lines
5.6 KiB
Python
167 lines
5.6 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
import asyncio
|
||
import os
|
||
import logging
|
||
from typing import Optional
|
||
from contextlib import asynccontextmanager
|
||
from fastapi import FastAPI, HTTPException
|
||
from playwright.async_api import async_playwright
|
||
|
||
# Configure logging: INFO level, timestamped records that carry the logger name.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Single named logger shared by every function in this module.
logger = logging.getLogger("fingerprint_service")
|
||
|
||
# Runtime configuration, read once from the environment at import time.
# Each value is None when the corresponding variable is not set.
TARGET_PAGE = os.environ.get("TARGET_PAGE")  # URL the headless browser navigates to
TARGET_API = os.environ.get("TARGET_API")  # substring matched against outgoing request URLs
FINGERPRINT_HEADER = os.environ.get("FINGERPRINT_HEADER")  # request header whose value is captured
|
||
|
||
# Shared Playwright handles; both stay None until the application lifespan
# handler assigns them.
playwright_instance = browser = None

# Global in-memory cache pool holding fetched JS scripts, keyed by URL.
RESOURCE_CACHE = dict()
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start Playwright and a headless Chromium for the lifetime of the app.

    On startup the Playwright driver is started and a Chromium browser is
    launched; both are stored in the module-level ``playwright_instance`` and
    ``browser`` globals. On shutdown they are released again.

    Cleanup runs in ``finally`` blocks so that the driver is stopped even when
    the browser launch fails, when an exception is thrown into the generator
    at the ``yield``, or when closing the browser itself raises.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    global playwright_instance, browser

    logger.info("Starting Playwright instance...")
    playwright_instance = await async_playwright().start()
    try:
        browser = await playwright_instance.chromium.launch(
            headless=True,
            args=[
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-dev-shm-usage",
                "--disable-software-rasterizer",
            ],
        )
        logger.info("Playwright browser launched successfully.")
        yield
    finally:
        try:
            # browser is still None when the launch itself failed.
            if browser is not None:
                logger.info("Closing Playwright browser...")
                await browser.close()
        finally:
            # Always stop the driver, even if closing the browser raised.
            browser = None
            await playwright_instance.stop()
            playwright_instance = None
            logger.info("Playwright instance stopped.")
|
||
|
||
|
||
# ASGI application object; `lifespan` owns browser startup and shutdown.
app = FastAPI(
    lifespan=lifespan,
    version="1.0.0",
    title="Web Header Signature Service",
    description="API service for dynamic header signature generation via headless browser",
)
|
||
|
||
|
||
async def _intercept_route(route):
    """Route handler: drop unneeded resources and serve scripts from the cache.

    Images, stylesheets, media, fonts and "other" resources are aborted.
    Scripts are answered from ``RESOURCE_CACHE`` when present; otherwise they
    are fetched, cached (successful responses only) and fulfilled. Every other
    request, and any script whose fetch failed, is passed through unchanged.
    """
    request = route.request
    url = request.url
    resource_type = request.resource_type

    # 1. Discard resources the page does not need, to speed up loading.
    if resource_type in ("image", "stylesheet", "media", "font", "other"):
        await route.abort()
        return

    # 2. Aggressively cache JS files.
    if resource_type == "script":
        cached = RESOURCE_CACHE.get(url)
        if cached is not None:
            logger.debug(f"Cache hit for script: {url}")
            await route.fulfill(
                status=cached.get("status", 200),
                headers=cached["headers"],
                body=cached["body"],
            )
            return

        # Cache miss: perform the real fetch.
        try:
            logger.debug(f"Fetching script: {url}")
            response = await route.fetch()
            body = await response.body()
            # Only cache successful responses; a cached 4xx/5xx would otherwise
            # be replayed as a valid script on every later request.
            if response.ok:
                RESOURCE_CACHE[url] = {
                    "status": response.status,
                    "headers": response.headers,
                    "body": body,
                }
            await route.fulfill(response=response, body=body)
            return
        except Exception as e:
            # Fetch failed: degrade to the browser's default handling below.
            logger.warning(f"Failed to fetch script {url}: {e}")

    await route.continue_()


async def get_fingerprint(timeout: float = 15.0) -> Optional[str]:
    """Load the target page and capture the signature header it generates.

    Opens a fresh page on the shared ``browser``, navigates to ``TARGET_PAGE``
    and watches outgoing requests. The first request whose URL contains
    ``TARGET_API`` and which carries ``FINGERPRINT_HEADER`` supplies the
    result. Navigation runs concurrently with the wait, so the value is
    returned as soon as it is seen rather than after the page finishes loading.

    Args:
        timeout: Maximum number of seconds to wait for the header.

    Returns:
        The header value, or ``None`` when the configuration is incomplete,
        the wait times out, or any other error occurs while driving the page.
    """
    if not (TARGET_PAGE and TARGET_API and FINGERPRINT_HEADER):
        # Without these the request handler could never match anything.
        logger.error(
            "TARGET_PAGE, TARGET_API and FINGERPRINT_HEADER must all be set."
        )
        return None

    # Playwright exposes request header names lower-cased.
    header_key = FINGERPRINT_HEADER.lower()

    logger.info("Opening new browser page...")
    page = await browser.new_page()
    goto_task = None
    try:
        # Intercept useless resources; this greatly speeds up page loading.
        await page.route("**/*", _intercept_route)

        fingerprint_future = asyncio.get_running_loop().create_future()

        def handle_request(request):
            """Resolve the future with the first matching request's header."""
            if fingerprint_future.done() or TARGET_API not in request.url:
                return
            value = request.headers.get(header_key)
            if value is not None:
                logger.info("Successfully intercepted target API request with required header.")
                fingerprint_future.set_result(value)

        page.on("request", handle_request)

        # Run goto concurrently with the wait: return as soon as the header is
        # captured instead of blocking until navigation finishes.
        logger.info(f"Navigating to target page: {TARGET_PAGE}")
        goto_task = asyncio.create_task(
            page.goto(TARGET_PAGE, wait_until="domcontentloaded")
        )

        logger.info("Waiting for header generation...")
        fingerprint = await asyncio.wait_for(fingerprint_future, timeout=timeout)
        logger.info("Header successfully generated and retrieved.")
        return fingerprint
    except asyncio.TimeoutError:
        logger.error("Timeout while waiting for header generation.")
        return None
    except Exception as e:
        logger.exception(f"Unexpected error during generation: {str(e)}")
        return None
    finally:
        try:
            if goto_task is not None:
                # Cancel and drain the navigation task so it is never left
                # dangling with an unretrieved exception once the page closes.
                goto_task.cancel()
                (outcome,) = await asyncio.gather(goto_task, return_exceptions=True)
                if isinstance(outcome, Exception):
                    logger.warning(f"Navigation to {TARGET_PAGE} failed: {outcome}")
        finally:
            logger.info("Closing browser page...")
            await page.close()
|
||
|
||
|
||
@app.get("/")
async def root():
    """Return a short banner confirming that the service is running."""
    banner = {
        "status": "ok",
        "message": "Web Header Signature Service is running",
    }
    return banner
|
||
|
||
|
||
@app.get("/health")
async def health():
    """Health probe: always answers with a static "healthy" status."""
    return dict(status="healthy")
|
||
|
||
|
||
@app.get("/fingerprint")
async def get_fingerprint_endpoint():
    """Generate a fresh header signature and return it as JSON.

    Returns:
        ``{"status": "success", "data": {FINGERPRINT_HEADER: <value>}}``.

    Raises:
        HTTPException: 500 with detail "Failed to generate signature" when no
            value was produced, or 500 with a "Service error: ..." detail when
            generation raised unexpectedly.
    """
    logger.info("Received request for new header signature")

    # Only the generation call is guarded, so the HTTPException raised below
    # for an empty result is not caught and re-wrapped by this handler.
    try:
        fingerprint = await get_fingerprint()
    except Exception as e:
        logger.exception(f"Service error during request: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Service error: {str(e)}") from e

    if not fingerprint:
        logger.error("Failed to generate signature (returned None)")
        raise HTTPException(status_code=500, detail="Failed to generate signature")

    logger.info("Successfully handled signature request")
    return {
        "status": "success",
        "data": {
            FINGERPRINT_HEADER: fingerprint,
        },
    }
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8000.
    from uvicorn import run as serve

    serve(app, port=8000, host="0.0.0.0")
|