从api转了几手,是官转、逆向还是掺假 再次讨论,附源码

那换个地址就好了啊,而且源代码在我上一个帖子都有,屏蔽我一个,难不成大家都给屏蔽

继续点赞支持,感谢分享!

理论上代理可以不去调用 image url? 他把地址透传给openai就可以了?

one api 和new api要计费的,不能不调,并且如果没到openai 或azure,那渠道可能大概率有问题哦

1 个赞

附实测图

太强了吧 :tieba_087:

现在应该是不好惨假了,除非检测到有图片就使用官转或azure

附上最新的探测源码

import logging
import random

from fastapi import FastAPI, Request
from faker import Faker
from io import BytesIO
from PIL import Image
from fastapi.responses import StreamingResponse
from datetime import datetime, timedelta
from fastapi_utils.tasks import repeat_every
import time
import asyncio
import httpx
from fastapi.middleware.cors import CORSMiddleware
import base64
from hashlib import md5

# Set logging level to WARNING to suppress INFO messages
logging.basicConfig(level=logging.WARNING)

app = FastAPI()
fake = Faker()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 允许所有来源
    allow_credentials=True,
    allow_methods=["*"],  # 允许所有方法
    allow_headers=["*"],  # 允许所有头
)


recorded_ips = {}

def generate_image(traceId):
    seed = int(md5(str(traceId).encode()).hexdigest(), 16) % (10 ** 8)
    random.seed(seed)
    image = Image.new('RGB', (50, 50), color=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
    return image

def generate_base64_image(traceId):
    # 使用traceId生成一个固定种子
    seed = int(md5(str(traceId).encode()).hexdigest(), 16) % (10 ** 8)
    random.seed(seed)

    image = Image.new('RGB', (50, 50), color=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
    buffer = BytesIO()
    image.save(buffer, format="WEBP")
    buffer.seek(0)
    img_str = base64.b64encode(buffer.getvalue()).decode('utf-8')
    return img_str


@app.get("/trace/openai")
async def openai_request(url: str, key: str):
    global recorded_ips
    traceId = int(time.time())

    current_time = datetime.now()

    # 判断recorded_ips是否有traceId,如果没有,则新建一个set
    if traceId not in recorded_ips:
        recorded_ips[traceId] = (current_time, [], [])

    # 立即返回时间戳
    response = {"traceId": traceId, "image": generate_base64_image(traceId)}

    # 异步发送 POST 请求
    asyncio.create_task(send_post_request(url, key, traceId))

    return response


async def send_post_request(url: str, key: str, traceId: str):
    global recorded_ips
    headers = {
        'Accept': '',
        'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {key}'
    }
    # 改成你的图片地址
    image_url = f"https://example.com/trace/fake-image?traceId={traceId}"
    data = {
        "model": "gpt-4o",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "image_url", "image_url": {"url": image_url}},
                    {"type": "text", "text": "What is this?"}
                ]
            }
        ],
        "max_tokens": 50,
        "stream": False
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(url, headers=headers, json=data)

            recorded_ips[traceId][2].append(f"渠道回复: {response.text}")
            print("渠道回复 ", response.text)
            if response.status_code != 200:
                recorded_ips[traceId][2].append(f"Error: {response.text}")
            else:
                recorded_ips[traceId][2].append(f"完成探测")
        except Exception as e:
            # 输出异常信息
            recorded_ips[traceId][2].append(f"Exception: {response.text}")

    return traceId


@app.on_event("startup")
@repeat_every(seconds=60)  # Run every 60 seconds
def cleanup_old_ips():
    global recorded_ips
    current_time = datetime.now()
    for traceId in list(recorded_ips.keys()):
        timestamp, _, _ = recorded_ips[traceId]
        if current_time - timestamp > timedelta(minutes=3):
            del recorded_ips[traceId]


@app.get("/trace/get-agent")
async def fake_image(request: Request, traceId: str):
    global recorded_ips
    traceId = int(traceId)
    # print("traceId ", traceId)
    # print("recorded_ips ", recorded_ips)
    if traceId in recorded_ips:
        # print("====== recorded_ips[traceId][2] ", recorded_ips[traceId][2])
        res = recorded_ips[traceId][2]
        current_time = datetime.now()
        timestamp, _, _ = recorded_ips[traceId]
        if current_time - timestamp > timedelta(seconds=80):
            current_time = datetime.now()
            time_str = current_time.strftime("%H:%M:%S")
            recorded_ips[traceId][2].append(str(time_str) + " 超过80秒未收到响应,未收到回调,可能来自逆向,完成探测" )
            return recorded_ips[traceId][2]
        else:
            return recorded_ips[traceId][2]
    return set()


@app.get("/trace/fake-image")
async def fake_image(request: Request, traceId: str):
    global recorded_ips
    current_time = datetime.now()
    traceId = int(traceId)
    # 判断recorded_ips是否有traceId,如果没有,则新建一个set
    if traceId not in recorded_ips:
        recorded_ips[traceId] = (current_time, [], [])

    # 生成一个假的 WebP 图片
    image = generate_image(traceId)
    buffer = BytesIO()
    image.save(buffer, format="WEBP")
    buffer.seek(0)

    # 获取请求的 host, 源 IP, user agent 和其他详细信息
    user_agent = request.headers.get('user-agent')
    if user_agent and "IPS" in user_agent:
        user_agent = "Azure " + user_agent
    if user_agent and "OpenAI" in user_agent:
        user_agent = user_agent
    if user_agent is None:
        user_agent = "未知UA"
    x_forwarded_for = request.headers.get('x-forwarded-for')
    cf_connecting_ip = request.headers.get('cf-connecting-ip')
    client_host = request.client.host
    headers = request.headers

    # 给header 脱敏,将header中所有的ip的中间部分用*代替

    # 检查并记录IP地址
    new_ips = True
    # if cf_connecting_ip and cf_connecting_ip in recorded_ips[traceId][1]:
    #     new_ips = False
    # else:
    #     recorded_ips[traceId][1].append(cf_connecting_ip)

    # if x_forwarded_for:
    #     for ip in x_forwarded_for.split(','):
    #         ip = ip.strip()
    #         if ip in recorded_ips[traceId][1]:
    #             new_ips = False
    #         else:
    #             recorded_ips[traceId][1].append(ip)
    #         break

    if new_ips:
        # x_forwarded_for脱敏,ip的中间部分用*代替
        new_x_forwarded_for = ""
        if x_forwarded_for:
            for ip in x_forwarded_for.split(','):
                ip_parts = ip.split('.')
                if len(ip_parts) == 4:
                    new_x_forwarded_for = new_x_forwarded_for +  f"{ip_parts[0]}.***.***.{ip_parts[3]}, "
                    # break

        # new_cf_connecting_ip = ""
        # if cf_connecting_ip:
        #     cf_connecting_ip_parts = cf_connecting_ip.split('.')
        #     if len(cf_connecting_ip_parts) == 4:
        #         cf_connecting_ip = f"{cf_connecting_ip_parts[0]}.***.***.{cf_connecting_ip_parts[3]}"

        cf_ipcountry = request.headers.get('cf-ipcountry')
        # x_openai_originator = request.headers.get('x-openai-originator')
        time_str = current_time.strftime("%H:%M:%S")
        if user_agent is None:
            recorded_ips[traceId][2].append(str(time_str) + " " + user_agent + "  x_forwarded_for:" + str(
                new_x_forwarded_for) + "  cf_connecting_ip:" + str(cf_connecting_ip)
                                            + "  cf_ipcountry:" + str(cf_ipcountry) + "  详细请求头信息:" + str(headers))
        else:
            recorded_ips[traceId][2].append(str(time_str) + " " + user_agent + "  x_forwarded_for:" + str(new_x_forwarded_for) + "  cf_connecting_ip:" + str(cf_connecting_ip)
                                        + "  cf_ipcountry:" + str(cf_ipcountry))
        print(
            f"Time: {current_time}, TraceId: {traceId}, x_forwarded_for: {x_forwarded_for}, cf_connecting_ip: {cf_connecting_ip}, Client Host: {client_host}, User Agent: {user_agent}")

    return StreamingResponse(buffer, media_type="image/webp")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8921, log_level="warning")

佬太强啦

发挥佬友们的聪明才智,再更新更新

没有开启图像计费是这样的

没有图像识别结果回复呀,有可能是伪造的

我这是直接对接的openai

看我上面的图,用始皇的官转测的,可以准确回复图片的内容

不是不是,这是我自己的 我是中转端直接接的openai的key,是没有开启图像计费

是不是可以 用oai官方的ip段 判断一下,返回的ip在不在官方的ip段里
然后打个标记

建议新建一个临时的api测试,测完删除掉

IP + UA + 识图结果 基本就可以判断个大概了


没开启图像计费应该也能识图呀

这不是偷个懒吗
ip还得去官方核对,自己算ip段
不如直接代码写进来 代码核对