python 逆向kimi 转oai 流式输出

效果:

接口

curl --request POST \
  --url https://kimi.zhc.ink/v1/chat/completions \
  --header 'Authorization: Bearer xxxxxxx' \
  --header 'content-type: application/json' \
  --data '{
    "model": "kimi",
    "messages": [
        {
            "role": "user",
            "content": "一个孕妇打人算是群殴吗?"
        }
    ],
    "use_search": true,
    "stream": true
}'

Authorization获取教程

打开 kimi

  1. 如图所示:
  2. 随便发一句话,查看一个接口即可获得
13 个赞

跪求大佬帮忙,!!!

你应该是需要流式响应吧,可以了解一下yield和flask流式响应

2 个赞

是这样的,但是我尝试了一下,但是不行。他说falsk 不行。

然后我换其他的框架 也不太行。。哎

可以迭代器实现伪流式,想要实现真流式还得看另两个python的web框架,只不过都没有原生的只有加软件包

例如?什么样的?

make一下

写了个demo,你试试。

# server.py
from flask import Response
import time
from flask import Flask
app = Flask(__name__)
@app.route('/text')
def generate_num():
    def generate():
        for i in range(1000):
            time.sleep(0.1)
            yield f"data: {{'num': {i}, 'event': 'num'}}\n"
    return Response(generate())

if __name__ == '__main__':
    app.run(host='127.0.0.1',port=5000,debug=True)
# client.py
import requests
url='http://127.0.0.1:5000/text'
r = requests.get(url, stream=True)
for line in r.iter_lines():
    if line:
        print(line.decode('utf-8'))

尝试了 不行

因为是需要进行转换的。所以说。比较麻烦。

这个牛皮!

不明觉厉,火钳留名

万丈高楼平地起,大佬加油:muscle:

from flask import Flask, Response

...
...

    res = "1+2=5\n3+9=10"
    
    
    def generate(res):
        for a in res:
            x2x_json = {"choices":[{"delta":{"content":a}}]}
            x3x_str = json.dumps(x2x_json)
            x4x_str = f"data: {x3x_str}\n\n"
            x5x_byte = x4x_str.encode('utf-8')
            yield x5x_byte

    # flask 的stream写法。   
    return Response(generate(res),content_type="text/event-stream") 

最新的 openai 函数调用, 返回的函数的调用部分的字段 不是这样组装的。 要重新 分析函数调用返回格式,然后重写。

我尝试了一下 确实不行 还是 falsk 有限制的。
看看我写的内容


    async def create_trans_stream(self, model: str, conv_id: str, response, end_callback=None):
        # 创建新的响应对象,设置为 SSE 格式
        new_response = web.StreamResponse()
        new_response.headers['Content-Type'] = 'text/event-stream'
        new_response.headers['Cache-Control'] = 'no-cache'
        new_response.headers['Connection'] = 'keep-alive'
        await new_response.prepare(response)

        # 消息创建时间
        created = datetime.now().timestamp()

        search_flag = False
        silent_search = 'silent_search' in model

        # 初始化数据发送
        await new_response.write(f"data: {json.dumps({'id': conv_id, 'model': model, 'object': 'chat.completion.chunk', 'choices': [{'index': 0, 'delta': {'role': 'assistant', 'content': ''}, 'finish_reason': None}], 'created': created})}\n\n".encode('utf-8'))

        async for data, end_of_http_chunk in response.content.iter_chunks():
            text = data.decode('utf-8')  # 解码接收到的数据块
            # 这里简化了SSE消息处理逻辑,你可以根据实际需求调整
            try:
                # 检查是否为有效的SSE消息,然后提取数据
                if text.startswith("data:"):
                    # 处理SSE数据
                    event_data = text.lstrip("data:").strip()
                    event = json.loads(event_data)

                    # 根据事件类型更新内容
                    content = ''
                    if event.get('event') == 'cmpl':
                        except_char_index = event['text'].find("�")
                        chunk = event['text'][:except_char_index if except_char_index != -
                                              1 else len(event['text'])]
                        content = ('\n' if search_flag else '') + chunk
                        if search_flag:
                            search_flag = False
                    elif event.get('event') in ['all_done', 'error']:
                        content = '\n[内容由于不合规被停止生成,我们换个话题吧]' if event.get(
                            'event') == 'error' else ''
                    elif not silent_search and event.get('event') == 'search_plus' and event.get('msg') and event['msg'].get('type') == 'get_res':
                        if not search_flag:
                            search_flag = True
                        content = f"检索 {event['msg']['title']}({event['msg']['url']}) ...\n"

                    await new_response.write(f"data: {json.dumps({'id': conv_id, 'model': model, 'object': 'chat.completion.chunk', 'choices': [{'index': 0, 'delta': {'content': content}, 'finish_reason': None}], 'created': created})}\n\n".encode('utf-8'))
            except Exception as e:
                print(f"Error processing SSE data: {e}")
                break

        # 结束响应
        if end_callback:
            await end_callback()
        await new_response.write_eof()
        return new_response

mark

加油

Flask什么时候开始支持asyncio了?你需要 FastAPISanic