Dual model: put the R1 you scrounged to full use, and let every model think

As everyone knows, R1's thinking process is quite good, but the content it outputs after thinking is fairly mediocre. So the idea: have R1 do the thinking, then hand the thought process to an ordinary model, which combines the context and the thinking content to generate the answer. This may well make your model stronger; paired with gemini-2-pro, for example, I tested it and the results were pretty good.
Below is a test of 4o-mini on my site:

The code is as follows:

import requests
import json
import re
from typing import List, Dict

class ChatFlow:
    def __init__(self):
        self.base_url = "your API endpoint URL"  # an OpenAI-compatible chat completions endpoint
        self.api_key = "your API key"
        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }

    def process_thinking_stream(self, messages: List[Dict]) -> str:
        # Prepend a system prompt telling R1 to output only its reasoning.
        messages_with_system = [
            # "Just output the thinking process; the final answer is not needed."
            {"role": "system", "content": "你只要输出思考过程就行了不需要输出最后的答案"}
        ] + messages

        payload = {
            "model": "DeepSeek-R1",
            "messages": messages_with_system,
            "stream": True
        }

        buffer = ""          # raw stream text until <think> appears
        think_text = ""      # text captured inside the <think> tag
        is_in_think_tag = False

        try:
            response = requests.post(
                self.base_url,
                headers=self.headers,
                json=payload,
                stream=True
            )

            for line in response.iter_lines():
                if not line or not line.startswith(b'data: '):
                    continue
                json_str = line[6:].decode('utf-8')
                if json_str == '[DONE]':
                    break
                try:
                    json_data = json.loads(json_str)
                    content = json_data['choices'][0]['delta'].get('content', '')
                except (json.JSONDecodeError, KeyError, IndexError):
                    continue

                if not is_in_think_tag:
                    # Accumulate until the opening <think> tag has fully arrived.
                    buffer += content
                    if '<think>' in buffer:
                        is_in_think_tag = True
                        print('<think>', end='', flush=True)
                        remaining = buffer[buffer.find('<think>') + len('<think>'):]
                        if remaining:
                            print(remaining, end='', flush=True)
                        think_text = remaining
                    continue

                # Inside the think tag: stream chunks until </think> closes it.
                # (Edge case not handled: </think> split across two chunks.)
                if '</think>' in content:
                    end = content.find('</think>')
                    print(content[:end] + '</think>', end='', flush=True)
                    think_text += content[:end]
                    return '<think>' + think_text + '</think>'
                print(content, end='', flush=True)
                think_text += content

        except Exception as e:
            print(f"Error in thinking model: {str(e)}")

        # Stream ended without a closing tag; return whatever was captured.
        return '<think>' + think_text + '</think>' if think_text else ""

    def process_generation_stream(self, messages: List[Dict]):
        # Second phase: an ordinary model writes the final answer.
        payload = {
            "model": "gpt-4o-mini",
            "messages": messages,
            "stream": True
        }

        try:
            response = requests.post(
                self.base_url,
                headers=self.headers,
                json=payload,
                stream=True
            )

            for line in response.iter_lines():
                if not line or not line.startswith(b'data: '):
                    continue
                json_str = line[6:].decode('utf-8')
                if json_str == '[DONE]':
                    break
                try:
                    json_data = json.loads(json_str)
                    content = json_data['choices'][0]['delta'].get('content', '')
                    if content:
                        print(content, end='', flush=True)
                except (json.JSONDecodeError, KeyError, IndexError):
                    continue

        except Exception as e:
            print(f"Error in generation model: {str(e)}")

    def chat(self, messages: List[Dict]):
        # Phase 1: R1 streams its thinking.
        think_content = self.process_thinking_stream(messages)
        print()

        if think_content:
            # Phase 2: hand the captured thinking back as context.
            gen_messages = messages + [
                # "Thinking process: ..."
                {"role": "assistant", "content": f"思考过程: {think_content}"},
                # "Based on the thinking process above, give the final answer."
                {"role": "user", "content": "请基于以上思考过程,给出最终答案。"}
            ]
        else:
            gen_messages = messages

        self.process_generation_stream(gen_messages)

def main():
    chat_history = [
        {
            "role": "user",
            "content": "解释一下量子计算的基本原理"  
        }
    ]
    
    chatflow = ChatFlow()
    chatflow.chat(chat_history)

if __name__ == "__main__":
    main()

That's roughly the logic.
With a few tweaks you could turn it into an API.
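
For example, here is a minimal sketch of an OpenAI-style wrapper around the script above using FastAPI. The endpoint path, the request model, and the non-streaming second phase are my own assumptions, not part of the original code:

from typing import List, Dict
from fastapi import FastAPI
from pydantic import BaseModel
import requests

app = FastAPI()
flow = ChatFlow()  # the class defined in the post above

class ChatRequest(BaseModel):
    messages: List[Dict]

@app.post("/v1/chat/completions")
def chat_completions(req: ChatRequest):
    # Phase 1: R1 produces the thinking. (Note: it still prints to the
    # server's stdout; a real API would capture it silently instead.)
    think = flow.process_thinking_stream(req.messages)
    gen_messages = req.messages
    if think:
        gen_messages = req.messages + [
            {"role": "assistant", "content": f"思考过程: {think}"},
            {"role": "user", "content": "请基于以上思考过程,给出最终答案。"},
        ]
    # Phase 2: non-streaming call to the answer model, for simplicity.
    resp = requests.post(
        flow.base_url,
        headers=flow.headers,
        json={"model": "gpt-4o-mini", "messages": gen_messages, "stream": False},
    )
    return resp.json()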


Morning! This idea is pretty interesting!

PS: 翔佬, you're not dreaming in code and then making a forum post the first thing after waking up, are you?


May I ask how to use this code?


Thanks, friend.

Haha, just finished fixing a bug and was about to sleep, so I dashed off a post :smiley:

This is just test code. To actually use it, you can tweak it yourself, or wait until I'm up; I'll polish it, package it with Docker, and ping you.

Interesting.

Since R1 is being called anyway, I think you could list both the native R1 answer and the grafted model's answer side by side for comparison :tieba_087:
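
A minimal sketch of that comparison, reusing the ChatFlow class from the post above. The compare_answers helper is hypothetical, not from the original code:

# Hypothetical helper: fetch R1's native final answer and print it
# next to the grafted two-model answer for comparison.
def compare_answers(chatflow: ChatFlow, messages: List[Dict]):
    payload = {"model": "DeepSeek-R1", "messages": messages, "stream": False}
    resp = requests.post(chatflow.base_url, headers=chatflow.headers, json=payload)
    native = resp.json()['choices'][0]['message']['content']
    # Strip the <think>...</think> block to keep only R1's own answer.
    native_answer = re.sub(r'<think>.*?</think>', '', native, flags=re.DOTALL).strip()

    print("=== Native R1 answer ===")
    print(native_answer)
    print("\n=== Grafted answer (R1 thinking + gpt-4o-mini) ===")
    chatflow.chat(messages)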

Boss, it seems the domestic providers no longer output <think></think> these days.
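
If the provider instead returns the reasoning in a separate reasoning_content field on the streaming delta, as DeepSeek's official API does, the <think>-tag parsing could be replaced with something like this sketch; field names may differ between vendors:

# Sketch: read reasoning from delta.reasoning_content instead of
# parsing <think> tags. Assumes DeepSeek's official streaming format.
think_text = ""
for line in response.iter_lines():
    if not line or not line.startswith(b'data: '):
        continue
    json_str = line[6:].decode('utf-8')
    if json_str == '[DONE]':
        break
    delta = json.loads(json_str)['choices'][0]['delta']
    reasoning = delta.get('reasoning_content')
    if reasoning:
        print(reasoning, end='', flush=True)
        think_text += reasoning
    elif delta.get('content'):
        # Reasoning is done; the final answer has started streaming.
        break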

Interesting, really interesting :tieba_013:

There's something to this.


The program I've been using every day got updated with exactly this yesterday :partying_face:

Awesome, 大佬!

You could add custom prompts to it.
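
A small sketch following that suggestion, letting the caller inject a system prompt for the answering model. The build_gen_messages helper is hypothetical, not from the original code:

# Hypothetical tweak: build the second-phase messages with an optional
# custom system prompt for the answering model.
def build_gen_messages(messages: List[Dict], think_content: str,
                       system_prompt: str = "") -> List[Dict]:
    gen_messages = []
    if system_prompt:
        gen_messages.append({"role": "system", "content": system_prompt})
    gen_messages += messages
    if think_content:
        gen_messages += [
            {"role": "assistant", "content": f"思考过程: {think_content}"},
            {"role": "user", "content": "请基于以上思考过程,给出最终答案。"},
        ]
    return gen_messages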

Nice, nice.
