Update：已在下一次请求中剔除历史回复里的思考过程，抱歉。
从「OpenWebUI 显示 R1 思维链 [更新硅基流动支持]」继续讨论：感谢大佬提供的函数。但我发现它默认会折叠思考过程，所以连夜请 R1 改了一版，目前基本可用。新版通过在思考过程和正文内容前分别添加 emoji 与文字标签来区分两者。
import json
import httpx
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
class Pipe:
    """OpenWebUI pipe that streams DeepSeek reasoning-model chat completions.

    Reasoning deltas (``reasoning_content``) and answer deltas (``content``)
    are both emitted as plain text. Each section is introduced by an
    emoji/text label, so the chain of thought stays visible instead of being
    folded. On later requests, the same labels are used to strip the
    reasoning out of previous assistant turns before the history is sent
    back upstream.
    """

    # Section labels inserted into the streamed output. They double as the
    # markers _strip_thinking searches for, so both sides must share them.
    _THINK_PREFIX = "【💡💡💡深度思考中】\n"
    _ANSWER_PREFIX = "【📖📖📖答案📝】\n"

    class Valves(BaseModel):
        """Configuration for the upstream chat-completions API."""

        DEEPSEEK_API_BASE_URL: str = Field(
            default="https://api.deepseek.com/v1",
            description="DeepSeek API的基础请求地址",
        )
        DEEPSEEK_API_KEY: str = Field(
            default="", description="用于身份验证的DeepSeek API密钥,可从控制台获取"
        )
        DEEPSEEK_API_MODEL: str = Field(
            default="deepseek-reasoner",
            description="API请求的模型名称,默认为 deepseek-reasoner ",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Marker for SSE payload lines. Parsing also accepts the form
        # without the trailing space ("data:"), which the SSE format allows.
        self.data_prefix = "data: "
        # Stream phase used by _process_content:
        # -1 = nothing emitted yet, 0 = reasoning, 1 = answering.
        self.thinking = -1
        self.emitter = None

    def pipes(self):
        """Return the single model entry this pipe exposes (the configured model)."""
        return [
            {
                "id": self.valves.DEEPSEEK_API_MODEL,
                "name": self.valves.DEEPSEEK_API_MODEL,
            }
        ]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """Forward a chat request upstream and stream the labelled reply.

        Args:
            body: Chat-completion request. Only the part of ``body["model"]``
                after the first "." is sent as the model id. Assistant
                messages in ``body["messages"]`` have earlier reasoning
                stripped before sending.
            __event_emitter__: Stored on ``self.emitter``; not otherwise used.

        Yields:
            Text chunks of the reply, or a single JSON string ``{"error": ...}``
            when the API key is missing, the upstream answers with a non-200
            status, or any exception occurs.
        """
        self.emitter = __event_emitter__
        if not self.valves.DEEPSEEK_API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return
        headers = {
            "Authorization": f"Bearer {self.valves.DEEPSEEK_API_KEY}",
            "Content-Type": "application/json",
        }
        # Phase of *this* stream. It is kept local rather than on self:
        # the same Pipe instance is reused across calls, and overlapping
        # async requests would otherwise overwrite each other's phase.
        thinking = -1
        try:
            model_id = body["model"].split(".", 1)[-1]
            payload = {
                **body,
                "model": model_id,
                "messages": self._clean_history(body.get("messages") or []),
            }
            base_url = self.valves.DEEPSEEK_API_BASE_URL.rstrip("/")
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{base_url}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=300,
                ) as response:
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return
                    async for line in response.aiter_lines():
                        data_str = self._sse_data(line)
                        if not data_str:
                            # Not a data line (comment/keep-alive/blank).
                            continue
                        if data_str == "[DONE]":
                            # End-of-stream sentinel; it is not JSON.
                            return
                        data = json.loads(data_str)
                        # Tolerate chunks with an empty/missing "choices" list
                        # or no "delta" instead of raising.
                        choice = (data.get("choices") or [{}])[0]
                        text, thinking = self._render_delta(
                            choice.get("delta") or {}, thinking
                        )
                        if text:
                            yield text
                        # Check finish_reason only after emitting the delta:
                        # the final chunk may still carry text.
                        if choice.get("finish_reason"):
                            return
        except Exception as e:
            yield self._format_exception(e)

    def _sse_data(self, line: str):
        """Return the stripped payload of an SSE data line, or None for other lines."""
        marker = self.data_prefix.rstrip()
        if not line.startswith(marker):
            return None
        return line[len(marker) :].strip()

    def _clean_history(self, messages):
        """Return a copy of *messages* with earlier reasoning removed.

        Non-assistant messages are passed through as-is. Assistant messages
        are shallow-copied with their content replaced by
        ``_strip_thinking(content)``; a missing content becomes "".
        """
        return [
            {**msg, "content": self._strip_thinking(msg.get("content", ""))}
            if msg.get("role") == "assistant"
            else msg
            for msg in messages
        ]

    @classmethod
    def _strip_thinking(cls, content):
        """Return the part of a previous assistant reply that is sent back upstream.

        - If the answer label is present, keep only the text after its last
          occurrence.
        - Otherwise, if the thinking label is present, keep the text after it.
        - Otherwise, return the content unchanged.

        Non-string content (e.g. None or a list of parts) is returned
        untouched instead of raising.
        """
        if not isinstance(content, str):
            return content
        answer_idx = content.rfind(cls._ANSWER_PREFIX)
        if answer_idx != -1:
            return content[answer_idx + len(cls._ANSWER_PREFIX) :]
        think_idx = content.find(cls._THINK_PREFIX)
        if think_idx != -1:
            return content[think_idx + len(cls._THINK_PREFIX) :]
        return content

    @classmethod
    def _render_delta(cls, delta: dict, thinking: int) -> tuple:
        """Render one streamed delta given the current phase.

        Args:
            delta: The ``delta`` object of a streamed choice.
            thinking: Current phase (-1 not started, 0 reasoning, 1 answering).

        Returns:
            ``(text, new_phase)``. The reasoning label is prepended to the
            first reasoning chunk, and the answer label to the first content
            chunk; that label gets a leading newline if reasoning preceded it.
            Text is passed through with its original line breaks.
        """
        reasoning_content = delta.get("reasoning_content")
        content = delta.get("content")
        if reasoning_content is not None:
            if thinking == -1:
                return f"{cls._THINK_PREFIX}{reasoning_content}", 0
            return reasoning_content, thinking
        if content is not None:
            if thinking in (-1, 0):
                lead = "\n" if thinking == 0 else ""
                return f"{lead}{cls._ANSWER_PREFIX}{content}", 1
            return content, thinking
        return "", thinking

    def _process_content(self, delta: dict) -> str:
        """Render *delta*, reading and updating ``self.thinking``.

        Kept for compatibility; pipe() tracks the phase in a per-call local.
        """
        text, self.thinking = self._render_delta(delta, self.thinking)
        return text

    def _format_error(self, status_code: int, error: bytes) -> str:
        """Build the JSON error string for a non-200 upstream response.

        The message is taken from ``{"error": {"message": ...}}``,
        ``{"error": "..."}`` or ``{"message": ...}`` when present. Otherwise
        the raw body is used. The message is truncated to 200 characters.
        """
        raw = error.decode(errors="ignore")
        try:
            parsed = json.loads(raw)
        except ValueError:
            parsed = None
        message = None
        if isinstance(parsed, dict):
            nested = parsed.get("error")
            if isinstance(nested, dict):
                message = nested.get("message")
            elif isinstance(nested, str):
                message = nested
            if not message:
                message = parsed.get("message")
        if not isinstance(message, str) or not message:
            message = raw
        err_msg = message[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """Build the JSON error string ``{"error": "<ExceptionType>: <message>"}``."""
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)