使用地址: https://aicnn.cn/trace
始皇的 https://api.oaipro.com/ 检测结果
某公益渠道的检测结果
服务绝对安全,不会泄漏任何端点和认证信息
完全放心,不会保存任何key,只做探测使用,这种已经涉及偷窃了,涉及刑事责任的事,不会做的
时间仓促,暂时只做了gpt-4o模型的探测,待会再完善完善,适配一下
稍微更新了一版,输出了更详细的信息,把渠道重试也加了进去
避免走失,快捷入口
附上对比图
针对始皇的建议,贴上源码,大家随意修改,一起净化市场
import logging
from fastapi import FastAPI, Request
from faker import Faker
from io import BytesIO
from PIL import Image
from fastapi.responses import StreamingResponse
from datetime import datetime, timedelta
from fastapi_utils.tasks import repeat_every
import time
import asyncio
import httpx
from fastapi.middleware.cors import CORSMiddleware
logging.basicConfig(level=logging.WARNING)
app = FastAPI()
fake = Faker()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有来源
allow_credentials=True,
allow_methods=["*"], # 允许所有方法
allow_headers=["*"], # 允许所有头
)
# 创建一个map映射 名字是recorded_ips
recorded_ips = {}
@app.get("/trace/openai")
async def openai_request(url: str, key: str):
global recorded_ips
traceId = int(time.time())
current_time = datetime.now()
if traceId not in recorded_ips:
recorded_ips[traceId] = (current_time, [], [])
asyncio.create_task(send_post_request(url, key, traceId))
return traceId
async def send_post_request(url: str, key: str, traceId: str):
global recorded_ips
headers = {
'Accept': '',
'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
'Content-Type': 'application/json',
'Authorization': f'Bearer {key}'
}
image_url = f"https://api3.aicnn.cn/trace/fake-image?traceId={traceId}"
data = {
"model": "gpt-4o",
"messages": [
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": image_url}},
{"type": "text", "text": "What is this?"}
]
}
],
"max_tokens": 3,
"stream": False
}
async with httpx.AsyncClient() as client:
try:
response = await client.post(url, headers=headers, json=data)
if response.status_code != 200:
recorded_ips[traceId][2].append(f"Error: {response.text}")
else:
recorded_ips[traceId][2].append(f"完成探测")
except Exception as e:
if "error" in str(e):
recorded_ips[traceId][2].append(f"Exception: {str(e)}")
return traceId
@app.on_event("startup")
@repeat_every(seconds=60) # Run every 60 seconds
def cleanup_old_ips():
global recorded_ips
current_time = datetime.now()
for traceId in list(recorded_ips.keys()):
timestamp, _, _ = recorded_ips[traceId]
if current_time - timestamp > timedelta(minutes=3):
del recorded_ips[traceId]
@app.get("/trace/get-agent")
async def fake_image(request: Request, traceId: str):
global recorded_ips
traceId = int(traceId)
if traceId in recorded_ips:
res = recorded_ips[traceId][2]
current_time = datetime.now()
timestamp, _, _ = recorded_ips[traceId]
if current_time - timestamp > timedelta(seconds=60):
current_time = datetime.now()
time_str = current_time.strftime("%H:%M:%S")
recorded_ips[traceId][2].append(str(time_str) + " 超过60秒未收到响应,完成探测" )
return recorded_ips[traceId][2]
else:
return recorded_ips[traceId][2]
return set()
@app.get("/trace/fake-image")
async def fake_image(request: Request, traceId: str):
global recorded_ips
current_time = datetime.now()
traceId = int(traceId)
# 判断recorded_ips是否有traceId,如果没有,则新建一个set
if traceId not in recorded_ips:
recorded_ips[traceId] = (current_time, [], [])
# 生成一个假的 WebP 图片
image = Image.new('RGB', (100, 100),
color=(fake.random_int(0, 255), fake.random_int(0, 255), fake.random_int(0, 255)))
buffer = BytesIO()
image.save(buffer, format="WEBP")
buffer.seek(0)
# 获取请求的 host, 源 IP, user agent 和其他详细信息
user_agent = request.headers.get('user-agent')
if user_agent and "IPS" in user_agent:
user_agent = "Azure " + user_agent
if user_agent and "OpenAI" in user_agent:
user_agent = "OpenAI" + user_agent
if user_agent is None:
user_agent = "未知,可能来自逆向,完成探测"
x_forwarded_for = request.headers.get('x-forwarded-for')
cf_connecting_ip = request.headers.get('cf-connecting-ip')
client_host = request.client.host
headers = request.headers
# 检查并记录IP地址
new_ips = True
# if cf_connecting_ip and cf_connecting_ip in recorded_ips[traceId][1]:
# new_ips = False
# else:
# recorded_ips[traceId][1].append(cf_connecting_ip)
# if x_forwarded_for:
# for ip in x_forwarded_for.split(','):
# ip = ip.strip()
# if ip in recorded_ips[traceId][1]:
# new_ips = False
# else:
# recorded_ips[traceId][1].append(ip)
# break
if new_ips:
# 脱敏
new_x_forwarded_for = ""
if x_forwarded_for:
for ip in x_forwarded_for.split(','):
ip_parts = ip.split('.')
if len(ip_parts) == 4:
new_x_forwarded_for = new_x_forwarded_for + f"{ip_parts[0]}.***.***.{ip_parts[3]}, "
time_str = current_time.strftime("%H:%M:%S")
recorded_ips[traceId][2].append(str(time_str) + " " + user_agent + " " + str(new_x_forwarded_for))
print(
f"Time: {current_time}, TraceId: {traceId}, x_forwarded_for: {x_forwarded_for}, cf_connecting_ip: {cf_connecting_ip}, Client Host: {client_host}, User Agent: {user_agent}")
return StreamingResponse(buffer, media_type="image/webp")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8921, log_level="warning")