openwebui 显示 R1 思维链 [更新硅基流动支持][支持火山引擎][支持腾讯云]

哎 就不能完全对齐openai么 这样搞得好麻烦

1 个赞

大佬,太强啦!受益良多:joy:

太强了 佬

佬,反馈一个BUG,以下这段代码给_format_error传入的是str,而_format_error的error设定为byte变量会进行error.decode操作(但str没有decode函数),如果内容解析失败进行错误格式化时会再次报错

# 格式化错误信息,这里传入错误类型和详细原因(包括出错内容和异常信息)
error_detail = f"解析失败 - 内容:{json_str},原因:{e}"
yield self._format_error("JSONDecodeError", error_detail)
return

将_format_error函数修改为下面这样就可以避免发生这个问题了:

    def _format_error(self, status_code: int, error: bytes) -> str:
        """错误格式化保持不变"""
        try:
            # 检查error类型是否为bytes
            if isinstance(error, bytes):
                err_msg = json.loads(error.decode(errors="ignore")).get(
                    "message", error.decode(errors="ignore")
                )[:200]
            else:
                err_msg = json.loads(error).get("message", error)[:200]
        except:
            # 如果错误解析失败,直接截取前200个字符
            err_msg = (
                error.decode(errors="ignore")[:200]
                if isinstance(error, bytes)
                else error[:200]
            )
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )
1 个赞

感谢佬,已经修改

1 个赞

openwebui必须得自己实现思维链的展示吗

如果api返回<think></think>标签则不用单独实现

现在貌似都转reasoning_content了
这个openwebui还显示不了吗

感谢感谢!!实测已经没问题了!大佬很强!

1 个赞

破案了,log里报错,是生成搜索关键字的步骤里有问题

completions:1 Failed to load resource: the server responded with a status of 404 (Not Found)

可能是id的问题,然后我去设置>界面>设置任务模型>外部模型 。
设置一个具体的模型(比如v3),而不是当前模型,联网搜索就行了

2 个赞

用deepseek的官方api,只有重启openwebui后第一次可以正常询问ai,后面就没有反应了

用了隔壁佬的公益 api 会出现一个小尾巴 麻烦佬修复一下

隔壁的公益 api
https://linux.do/t/topic/416597/120

更新 1.2.9 后也有小尾巴

正常完成后应该不会有这个结束标记,我现在手边没有电脑,一会我看下是怎么回事

1 个赞
"""
title: DeepSeek R1
author: zgccrui
description: 在OpwenWebUI中显示DeepSeek R1模型的思维链 - 仅支持0.5.6及以上版本
version: 1.2.9
licence: MIT
"""

import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
import asyncio


class Pipe:
    class Valves(BaseModel):
        DEEPSEEK_API_BASE_URL: str = Field(
            default="https://aifree4.fly.dev/v1",
            description="DeepSeek API的基础请求地址",
        )
        DEEPSEEK_API_KEY: str = Field(
            default="sk-SqkvYZROzSd0dLetAEZEzKwwQ9X4hGizrrWA8qJ6JddPF81e",
            description="用于身份验证的DeepSeek API密钥,可从控制台获取",
        )
        DEEPSEEK_API_MODEL: str = Field(
            default="deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
            description="API请求的模型名称,默认为 deepseek-reasoner ",
        )

    def __init__(self):
        self.valves = self.Valves()
        self.data_prefix = "data:"
        self.emitter = None

    def pipes(self):
        return [
            {
                "id": self.valves.DEEPSEEK_API_MODEL,
                "name": self.valves.DEEPSEEK_API_MODEL,
            }
        ]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """主处理管道(已移除缓冲)"""
        thinking_state = {"thinking": -1}  # 使用字典来存储thinking状态
        self.emitter = __event_emitter__

        # 验证配置
        if not self.valves.DEEPSEEK_API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return

        # 准备请求参数
        headers = {
            "Authorization": f"Bearer {self.valves.DEEPSEEK_API_KEY}",
            "Content-Type": "application/json",
        }

        try:
            # 模型ID提取
            model_id = body["model"].split(".", 1)[-1]
            payload = {**body, "model": model_id}

            # 处理消息以防止连续的相同角色
            messages = payload["messages"]
            i = 0
            while i < len(messages) - 1:
                if messages[i]["role"] == messages[i + 1]["role"]:
                    # 插入具有替代角色的占位符消息
                    alternate_role = (
                        "assistant" if messages[i]["role"] == "user" else "user"
                    )
                    messages.insert(
                        i + 1,
                        {"role": alternate_role, "content": "[Unfinished thinking]"},
                    )
                i += 1

            # yield json.dumps(payload, ensure_ascii=False)

            # 发起API请求
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.DEEPSEEK_API_BASE_URL}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=300,
                ) as response:
                    # 错误处理
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return

                    # 流式处理响应
                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue

                        # 截取 JSON 字符串
                        json_str = line[len(self.data_prefix) :]

                        try:
                            data = json.loads(json_str)
                        except json.JSONDecodeError as e:
                            # 格式化错误信息,这里传入错误类型和详细原因(包括出错内容和异常信息)
                            error_detail = f"解析失败 - 内容:{json_str},原因:{e}"
                            yield self._format_error("JSONDecodeError", error_detail)
                            return

                        choice = data.get("choices", [{}])[0]

                        # 结束条件判断
                        if choice.get("finish_reason"):
                            return

                        # 状态机处理
                        state_output = await self._update_thinking_state(
                            choice.get("delta", {}), thinking_state
                        )
                        if state_output:
                            yield state_output  # 直接发送状态标记
                            if state_output == "<think>":
                                yield "\n"

                        # 内容处理并立即发送
                        content = self._process_content(choice["delta"])
                        if content:
                            if content.startswith("<think>"):
                                match = re.match(r"^<think>", content)
                                if match:
                                    content = re.sub(r"^<think>", "", content)
                                    yield "<think>"
                                    await asyncio.sleep(0.1)
                                    yield "\n"

                            elif content.startswith("</think>"):
                                match = re.match(r"^</think>", content)
                                if match:
                                    content = re.sub(r"^</think>", "", content)
                                    yield "</think>"
                                    await asyncio.sleep(0.1)
                                    yield "\n"
                            yield content

        except Exception as e:
            yield self._format_exception(e)

    async def _update_thinking_state(self, delta: dict, thinking_state: dict) -> str:
        """更新思考状态机(简化版)"""
        state_output = ""

        # 状态转换:未开始 -> 思考中
        if thinking_state["thinking"] == -1 and delta.get("reasoning_content"):
            thinking_state["thinking"] = 0
            state_output = "<think>"

        # 状态转换:思考中 -> 已回答
        elif (
            thinking_state["thinking"] == 0
            and not delta.get("reasoning_content")
            and delta.get("content")
        ):
            thinking_state["thinking"] = 1
            state_output = "\n</think>\n\n"

        return state_output

    def _process_content(self, delta: dict) -> str:
        """直接返回处理后的内容"""
        return delta.get("reasoning_content", "") or delta.get("content", "")

    def _format_error(self, status_code: int, error: bytes) -> str:
        # 如果 error 已经是字符串,则无需 decode
        if isinstance(error, str):
            error_str = error
        else:
            error_str = error.decode(errors="ignore")

        try:
            err_msg = json.loads(error_str).get("message", error_str)[:200]
        except Exception as e:
            err_msg = error_str[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """异常格式化保持不变"""
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)

我把我的函数完整的贴出给佬分析一下 反正 api 都是公开的公益~

他这个api返回的不是标准格式 最后一次流式返回多了一个[DONE] 我稍微改下就好了

1 个赞

v1.2.10 已修复

1 个赞

1.2.10没有小尾巴了 完美 多谢佬!

佬,腾讯云显示这个错误,是咋回事?我在设置里填的参数不是在函数内填写的
函数版本:v1.2.10 webui版本:0.5.10

看看用的模型是不是函数生成出来的

火山引擎的怎么直接加啊?还是得自己chat api一下啊?