dobule转api(流式,token计算)

更新了一下,现在的double不能像以前一样直接用api-key请求需要动态获取token,
获取api-key方法还是访问注册可以参考这个帖子,代码是原来逆向其他的随手糊的大佬轻喷,第一次发帖多点赞水点积分 :smiling_face_with_three_hearts:
还有wrtn.ai的逆向下次水贴发 :rofl:

import json
import random
import string
import time
import asyncio
import tiktoken
import uvicorn
from fastapi import FastAPI, Request, HTTPException, Header, Depends
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import StreamingResponse
import httpx
import logging

app = FastAPI()

apikey = ""  # 填入你的apikey

def num_tokens_from_string(string: str, encoding_name: str) -> int:
    encoding = tiktoken.get_encoding(encoding_name)
    num_tokens = len(encoding.encode(string))
    return num_tokens

def count_token_messages(messages):
    count = 0
    for message in messages:
        content = message['content']
        count += content
    return count

# 创建 KeyManager 实例
logging.basicConfig(level=logging.INFO)

async def verify_key(authorization: str = Header(...)):
    try:
        prefix, token = authorization.split()
        if prefix.lower() != "bearer" or token != "123":
            raise HTTPException(status_code=400, detail="请填写正确的密钥")
    except ValueError:
        raise HTTPException(status_code=400, detail="请填写正确的密钥")

def transform_data(client_data):
    server_data = {
        "api_key": "null",
        "messages": [],
        "chat_model": ""
    }

    for message in client_data['messages']:
        role = message['role']
        content = message['content']

        if role == 'system':
            role = "user"

        server_message = {
            "role": role,
            "message": content,
            "codeContexts": []
        }
        server_data['messages'].append(server_message)

    model_mapping = {
        "gpt-4-turbo": "GPT4 Turbo",
        "claude-3-opus-20240229": "Claude 3 (Opus)",
        "llama-3-70B": "Llama 3 70B",
        "gpt-4-turbo-2024-04-09": "GPT4 Turbo (2024-04-09)"
    }

    requested_model = client_data.get("model", "")
    server_data["chat_model"] = model_mapping.get(requested_model, "GPT4 Turbo")

    return server_data

class DoubleBot:
    def __init__(self, api_key):
        self.api_key = api_key
        self.access_token = None
        self.token_expiration_time = 0

    async def refresh_token(self):
        url = "https://api.double.bot/api/auth/refresh"
        headers = {
            "Authorization": f"Bearer {self.api_key}"
        }
        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(url, headers=headers)
                response.raise_for_status()

                data = response.json()
                self.access_token = data.get("access_token")
                self.token_expiration_time = time.time() + 10 

                if self.access_token:
                    logging.info("Access token refreshed successfully.")
                else:
                    logging.warning("Access token not found in response.")

            except httpx.HTTPError as e:
                logging.error(f"Error refreshing access token: {e}")

    async def get_access_token(self):
        if time.time() >= self.token_expiration_time:
            await self.refresh_token()
        return self.access_token

    async def initialize(self):
        await self.refresh_token()

double_bot = DoubleBot(apikey)

@app.post('/v1/chat/completions')
async def forward_request(request: Request, authorization: str = Depends(verify_key)):
    global target_url
    request_data = await request.json()
    messages = request_data.get('messages', [])
    total = ""
    for message in messages:
        total += message['content']
    prompt_tokens = num_tokens_from_string(total, "cl100k_base")
    target_url = "https://api.double.bot/api/v1/chat"
    headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN",
        "Authorization": f"Bearer {await double_bot.get_access_token()}",
        "Double-Version": "2024-03-04",
    }
    print(target_url)
    server_data = transform_data(request_data)
    should_stream = request_data.get("stream", False)
    if should_stream:
        response_data = handle_streaming_and_sending(server_data, server_data["chat_model"], headers, target_url)
        return StreamingResponse(response_data, media_type="application/json")
    else:
        response_data = await generate_non_streaming_response(server_data, server_data["chat_model"], headers,
                                                              prompt_tokens)
        return response_data

def chunks(string, chunk_size):
    return [string[i:i + chunk_size] for i in range(0, len(string), chunk_size)]

def get_random_string(length):
    letters = string.ascii_letters + string.digits
    result_str = ''.join(random.choice(letters) for _ in range(length))
    return result_str

async def generate_response(request_data, models, headers, target_url):
    retry_count = 0
    max_retries = 3
    delay = 0.05
    i = 0
    same_id = f'chatcmpl-{get_random_string(29)}'
    while retry_count < max_retries and not i == 1:
        try:
            async with httpx.AsyncClient(timeout=httpx.Timeout(10, read=250), follow_redirects=True) as client:
                async with client.stream("POST", target_url, json=request_data, headers=headers) as res:
                    yield f"data: {json.dumps({'id': same_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': models, 'system_fingerprint': 'fp_a2448989', 'choices': [{'index': 0, 'delta': {'role': 'assistant', 'content': ''}, 'finish_reason': 'null'}]})}\n\n".encode(
                        'utf-8')
                    if res.status_code == 200:
                        async for line in res.aiter_lines():
                            line = line + '\n'
                            if '```' in line:
                                line = '\n' + line
                            for char_in_line_chunked in list(chunks(line, 3)):
                                result = {
                                    "id": same_id,
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": models,
                                    "system_fingerprint": "fp_a24b4d720c",
                                    "choices": [
                                        {"index": 0, "delta": {"content": char_in_line_chunked},
                                         "finish_reason": "null"}
                                    ],
                                }
                                json_result = json.dumps(result, ensure_ascii=False)
                                yield f"data: {json_result}\n\n"
                                await asyncio.sleep(delay)
                        yield f"data: {json.dumps({'id': same_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': models, 'system_fingerprint': 'fp_a5689', 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n".encode(
                            'utf-8')
                        yield "data: [DONE]\n".encode('utf-8')
                        i = 1
                    else:
                        print("重试一次")
                        logging.warning(f"Received non-200 status code {res.status_code}, retrying...")
                        retry_count += 1
        except (httpx.ReadTimeout, httpx.ConnectError) as e:
            error_message = f"Error during request: {e}"
            retry_count += 1

    return

async def generate_non_streaming_response(request_data, models, headers, prompt):
    n = request_data.get('n', 1)
    choices = []
    for i in range(n):
        retry_count = 0
        max_retries = 3
        data = ""
        global target_url, current_key
        while retry_count < max_retries:
            try:
                async with httpx.AsyncClient(timeout=httpx.Timeout(999, read=999), follow_redirects=True) as client:
                    res = await client.post(target_url, json=request_data, headers=headers)
                    if res.status_code == 200:
                        response_data = res.text
                        choice = {
                            "index": i,
                            "message": {
                                "role": "assistant",
                                "content": response_data
                            },
                            "finish_reason": "stop"
                        }
                        choices.append(choice)
                        break

                    else:
                        print("重试一次")
                        logging.warning(f"Received status code {res.status_code}: {res.text}, retrying...")
                        retry_count += 1
                    data = data + response_data
            except (httpx.ReadTimeout, httpx.ConnectError) as e:
                error_message = f"Error during request: {e}"
                retry_count += 1
        if retry_count == max_retries:
            return {"error": "Request failed after multiple retries"}
    prompt_tokens = prompt
    completion_tokens = sum(num_tokens_from_string(choice["message"]["content"], "cl100k_base") for choice in choices)
    total_tokens = prompt_tokens + completion_tokens

    response = {
        "id": "chatcmpl-92cWCMyLpYQqOiLf7tB8MAWopTLu1",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": models,
        "choices": choices,
        "usage": {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": total_tokens
        },
        "system_fingerprint": "fp_a24b4d720c"
    }

    return response

async def handle_streaming_and_sending(request_data, models, headers, target_url):
    data = ""
    async for item in generate_response(request_data, models, headers, target_url):
        try:
            data_json = json.loads(item[5:])
            if 'content' in data_json['choices'][0]['delta']:
                content = data_json['choices'][0]['delta']['content']
                data = data + content

        except json.JSONDecodeError:
            pass
        yield item  # 将每个项作为流的一部分

@app.on_event("startup")
async def startup_event():
    await double_bot.initialize()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8081)
17 个赞

前排,谢谢大佬

1 个赞

前排感谢

1 个赞

感谢,同时期待这个

1 个赞

前排感谢

1 个赞

谢谢大佬分享。

Unlimited GPT-4 / Claude 3 Opus messages (fair use)

不知道是怎么个fair。有C成功的大佬欢迎补充。

1 个赞

获取api key 的方法是什么? 链接挂了 找了半天也没有地方获取

1 个赞


点一下not working

试了一下 gpt-4 正常输出 claude-3 不行 报错502

claude-3 好像需要订阅

vscode 里 claude 回复正常

vscode在哪切换的claude我没看到

文件->首选项->设置 搜索double 就有


还是没看到

image
版本不够新? 就是在这改的

double?不是早就限制次数了吗