分享一下我在用的open-webui函数

看到有佬友在问这个问题,就再单独开一贴分享一下

顺带抛砖引玉,看看各位的

统计token,输出速度和耗费时间

"""
title: Chat Metrics
author: constLiakos
funding_url: https://github.com/open-webui
version: 0.0.4
license: MIT
changelog:
- 0.0.1 - Initial upload to openwebui community.
- 0.0.2 - format, remove unnecessary code
- 0.0.3 - add advanced stats: tokens & elapsed time
- 0.0.4 - each metric has its enable switch Valve, experimental metrics are disabled by default
"""

from pydantic import BaseModel, Field
from typing import Optional, Callable, Any, Awaitable
import tiktoken
import os
import time
from open_webui.utils.misc import get_last_assistant_message


def num_tokens_from_string(user_message: str, model_name: str) -> int:
    """Count the tokens tiktoken produces for ``user_message``.

    Args:
        user_message: Text to tokenize. Empty or ``None`` counts as 0 tokens.
        model_name: Model name used to select the tokenizer.

    Returns:
        The number of tokens in ``user_message``.
    """
    if not user_message:
        return 0
    try:
        encoding = tiktoken.encoding_for_model(model_name)
    except KeyError:
        # tiktoken raises KeyError for model names it cannot map to an
        # encoding; fall back to a general-purpose one so the count stays an
        # estimate instead of an error.
        encoding = tiktoken.get_encoding("cl100k_base")
    # disallowed_special=() makes text such as "<|endoftext|>" encode as plain
    # text; by default tiktoken raises when it meets special-token strings.
    return len(encoding.encode(user_message, disallowed_special=()))


class Filter:
    """Open WebUI filter that reports timing and token metrics for a reply.

    ``inlet`` stamps the moment a request passes through the filter and
    ``outlet`` emits one status event listing the metrics enabled in
    ``Valves``. Token figures are computed with the ``gpt-4o`` tokenizer no
    matter which model answered, so they are estimates only.
    """

    class Valves(BaseModel):
        # Priority level of this filter (see the field description).
        priority: int = Field(
            default=5, description="Priority level for the filter operations."
        )
        # Show the wall-clock time measured between inlet() and outlet().
        elapsed_time: bool = Field(
            default=True,
            description="Enable for advanced stats",
        )
        # Show the estimated token count of the last assistant message.
        tokens_no: bool = Field(
            default=False,
            description="Display total Tokens (Experimantal, NOT Accurate)",
        )
        # Show estimated tokens divided by the elapsed time.
        tokens_per_sec: bool = Field(
            default=False,
            description="Display Tokens per Second (Experimantal, NOT Accurate)",
        )

    def __init__(self):
        self.valves = self.Valves()
        # NOTE(review): a single timestamp is kept per Filter instance, so
        # overlapping requests would overwrite each other's start time --
        # confirm how Open WebUI shares filter instances.
        self.start_time = None

    def inlet(
        self,
        body: dict,
    ):
        """Record the request start time and pass ``body`` through unchanged."""
        self.start_time = time.time()
        return body

    async def outlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __model__: Optional[dict] = None,
    ) -> dict:
        """Emit the enabled metrics as a finished status event.

        Args:
            body: Response body; ``body["messages"]`` is read only when a
                token metric is enabled.
            __event_emitter__: Coroutine function that receives the status event.
            __model__: Unused; the tokenizer model is fixed to ``gpt-4o``.

        Returns:
            ``body`` unchanged.
        """
        end_time = time.time()
        # inlet() may never have run on this instance; skip the time-based
        # metrics then instead of failing on `float - None`.
        elapsed_time = None
        if self.start_time is not None:
            elapsed_time = end_time - self.start_time

        # Tokenize only when a token metric is switched on, so the default
        # configuration never touches tiktoken.
        tokens = None
        if self.valves.tokens_no or self.valves.tokens_per_sec:
            response_message = get_last_assistant_message(body["messages"])
            # The tokenizer is pinned to gpt-4o rather than __model__["id"],
            # which is why the token metrics are labelled as not accurate.
            tokens = num_tokens_from_string(response_message or "", "gpt-4o")

        stats_array = []
        if (
            self.valves.tokens_per_sec
            and tokens is not None
            and elapsed_time is not None
            and elapsed_time > 0  # avoid ZeroDivisionError on a zero interval
        ):
            stats_array.append(f"{tokens / elapsed_time:.2f} T/s")
        if self.valves.tokens_no and tokens is not None:
            stats_array.append(f"{tokens} tokens")
        if self.valves.elapsed_time and elapsed_time is not None:
            stats_array.append(f"{elapsed_time:.2f} sec")

        stats = " | ".join(stats_array)
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": stats,
                    "done": True,
                },
            }
        )
        return body

https://openwebui.com/f/constliakos/chat_metrics


给gemini添加联网,并将思考模型的思考过程从正文中分离

下面这个函数是我修改后的版本,在open-webui上面只会显示 gemini-2.0-flash-thinking-exp 和 gemini-2.0-flash-exp-search 两个模型


"""
title: Gemini Pipe
author_url:https://linux.do/u/coker/summary
author:coker
version: 0.0.6
license: MIT
"""

import json
import random
import httpx
import requests
from typing import List, AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field


class Pipe:
    class Valves(BaseModel):
        # User-editable settings of the Gemini pipe.

        # One or more Google API keys separated by commas.
        GOOGLE_API_KEYS_STR: str = Field(
            description="API Keys for Google, use , to split",
            default="",
        )
        # When True, requests carry safetySettings with every listed harm
        # category set to BLOCK_NONE.
        OPEN_SAFETY: bool = Field(
            description="Gemini safety settings",
            default=False,
        )
        # Root of the Gemini REST API; model paths are appended to it.
        BASE_URL: str = Field(
            description="API Base Url",
            default="https://generativelanguage.googleapis.com/v1beta",
        )
        # When True, "-search" models append their grounding links to the reply.
        OPEN_SEARCH_INFO: bool = Field(
            description="Open search info show ",
            default=True,
        )

    def __init__(self):
        self.type = "manifold"
        self.name = ""
        self.valves = self.Valves()
        self.OPEN_SEARCH_MODELS = ["gemini-2.0-flash-exp"]
        self.OPEN_THINK_MODELS = ["gemini-2.0-flash-thinking-exp"]

        self.base_url = ""
        self.emitter = None
        self.open_search = False
        self.open_think = False
        self.think_first = True

    def get_google_models(self) -> List[dict]:
        self.base_url = self.valves.BASE_URL
        self.GOOGLE_API_KEYS_LIST = self.valves.GOOGLE_API_KEYS_STR.split(",")
        self.GOOGLE_API_KEY = random.choice(self.GOOGLE_API_KEYS_LIST)
        if not self.GOOGLE_API_KEY:
            return [{"id": "error", "name": f"Error: API Key not found"}]

        try:
            url = f"{self.base_url}/models?key={self.GOOGLE_API_KEY}"
            response = requests.get(url, timeout=10)

            if response.status_code != 200:
                raise Exception(f"HTTP {response.status_code}: {response.text}")

            data = response.json()
            models = [
                {
                    "id": model["name"].split("/")[-1],
                    "name": model["name"].split("/")[-1],
                }
                for model in data.get("models", [])
                if "generateContent" in model.get("supportedGenerationMethods", [])
            ]
            models = []  # 清空 models 列表
            if self.OPEN_THINK_MODELS:
                models.extend(
                    [
                        {
                            "id": model,
                            "name": model,
                        }
                        for model in self.OPEN_THINK_MODELS
                    ]
                )
            if self.OPEN_SEARCH_MODELS:
                models.extend(
                    [
                        {
                            "id": model + "-search",
                            "name": model + "-search",
                        }
                        for model in self.OPEN_SEARCH_MODELS
                    ]
                )
            return models
        except Exception as e:
            return [{"id": "error", "name": f"Could not fetch models: {str(e)}"}]

    async def emit_status(
        self,
        message: str = "",
        done: bool = False,
    ):
        if self.emitter:
            await self.emitter(
                {
                    "type": "status",
                    "data": {
                        "description": message,
                        "done": done,
                    },
                }
            )

    def pipes(self) -> List[dict]:
        return self.get_google_models()

    def create_search_link(self, idx, web):
        """Format one search source as a numbered Markdown link on its own line.

        Args:
            idx: Number shown before the link, zero-padded to two digits.
            web: Mapping with ``"title"`` and ``"uri"`` entries.
        """
        title, uri = web["title"], web["uri"]
        return f"\n{idx:02d}: [**{title}**]({uri})"

    def create_think_info(self, think_info):
        """Placeholder: accepts ``think_info``, does nothing, returns ``None``."""
        return None

    async def do_parts(self, parts):
        if not parts or not isinstance(parts, list):
            return "Error: No parts found"
        if len(parts) == 1:
            if self.open_think and self.think_first:
                self.think_first = False
                return (
                    f"\n<details>\n<summary>思考过程</summary>\n"
                    "```thinking…… \n" + parts[0]["text"]
                )
            return parts[0]["text"]
        if len(parts) == 2:
            await self.emit_status(message="😄 思考已结束", done=False)
            self.open_think = False
            if self.think_first:
                self.think_first = False
                return (
                    f"\n<details>\n<summary>思考过程</summary>\n"
                    "```thinking…… \n"
                    + parts[0]["text"]
                    + "\n```\n"
                    + "\n</details>\n"
                    + parts[1]["text"]
                )
            return parts[0]["text"] + "\n```\n" + "\n</details>\n" + parts[1]["text"]
        res = ""
        for part in parts:
            res += part["text"]
        return res

    async def pipe(
        self,
        body: dict,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> AsyncGenerator[str, None]:
        """Forward a chat request to the Gemini REST API and yield the reply.

        The OpenAI-style ``body`` is converted into a ``generateContent`` (or
        ``streamGenerateContent`` when ``body["stream"]`` is truthy) request.
        A model id ending in ``-search`` enables the ``googleSearch`` tool and
        model ids listed in ``OPEN_THINK_MODELS`` get their thinking text
        wrapped by ``do_parts``.

        Args:
            body: Request body with ``model`` and ``messages`` plus optional
                ``stream`` and sampling settings.
            __event_emitter__: Optional coroutine function for status events.

        Yields:
            Chunks of reply text, or a string starting with ``"Error"`` when
            the request fails.
        """
        self.emitter = __event_emitter__
        # Reset the per-request flags: they live on the instance, so values
        # left by a previous request would otherwise leak into this one.
        # NOTE(review): overlapping requests on one instance would still
        # share these flags -- confirm how Open WebUI reuses pipe instances.
        self.open_search = False
        self.open_think = False
        self.think_first = True

        # Ignore blank entries and stray whitespace in the key list.
        self.GOOGLE_API_KEYS_LIST = [
            key.strip()
            for key in self.valves.GOOGLE_API_KEYS_STR.split(",")
            if key.strip()
        ]
        self.GOOGLE_API_KEY = (
            random.choice(self.GOOGLE_API_KEYS_LIST)
            if self.GOOGLE_API_KEYS_LIST
            else ""
        )
        self.base_url = self.valves.BASE_URL
        if not self.GOOGLE_API_KEY:
            yield "Error: GOOGLE_API_KEY is not set"
            return
        try:
            model_id = body["model"]
            # Keep only what follows the first "." of the incoming id.
            if "." in model_id:
                model_id = model_id.split(".", 1)[1]
            messages = body["messages"]
            stream = body.get("stream", False)

            request_data = {
                "generationConfig": {
                    "temperature": body.get("temperature", 0.7),
                    "topP": body.get("top_p", 0.9),
                    "topK": body.get("top_k", 40),
                    "maxOutputTokens": body.get("max_tokens", 8192),
                    "stopSequences": body.get("stop", []),
                },
            }

            contents = []
            for message in messages:
                if message["role"] == "system":
                    request_data["system_instruction"] = {
                        "parts": [{"text": message["content"]}]
                    }
                    continue
                parts = self._to_gemini_parts(message.get("content"))
                if parts is None:
                    continue
                contents.append(
                    {
                        # Every non-user role is sent as "model".
                        "role": "user" if message["role"] == "user" else "model",
                        "parts": parts,
                    }
                )
            request_data["contents"] = contents

            if model_id.endswith("-search"):
                model_id = model_id[:-7]
                request_data["tools"] = [{"googleSearch": {}}]
                self.open_search = True
                await self.emit_status(message="🔍 我好像在搜索……")
            elif model_id in self.OPEN_THINK_MODELS:
                await self.emit_status(message="🧐 我好像在思考……")
                self.open_think = True
                self.think_first = True
            else:
                await self.emit_status(message="🚀 飞速生成中……")

            if self.valves.OPEN_SAFETY:
                request_data["safetySettings"] = [
                    {"category": category, "threshold": "BLOCK_NONE"}
                    for category in (
                        "HARM_CATEGORY_HARASSMENT",
                        "HARM_CATEGORY_HATE_SPEECH",
                        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                        "HARM_CATEGORY_DANGEROUS_CONTENT",
                    )
                ]

            params = {"key": self.GOOGLE_API_KEY}
            if stream:
                url = f"{self.base_url}/models/{model_id}:streamGenerateContent"
                params["alt"] = "sse"
            else:
                url = f"{self.base_url}/models/{model_id}:generateContent"
            headers = {"Content-Type": "application/json"}

            async with httpx.AsyncClient() as client:
                if stream:
                    async with client.stream(
                        "POST",
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    ) as response:
                        if response.status_code != 200:
                            # A streamed body has to be read explicitly
                            # before `.text` can be accessed.
                            await response.aread()
                            yield f"Error: HTTP {response.status_code}: {response.text}"
                            await self.emit_status(message="❌ 生成失败", done=True)
                            return

                        async for line in response.aiter_lines():
                            if not line.startswith("data: "):
                                continue
                            try:
                                data = json.loads(line[6:])
                                candidates = data.get("candidates")
                                if not candidates:
                                    continue
                                candidate = candidates[0]
                                parts = (candidate.get("content") or {}).get("parts")
                                # A chunk may carry no text parts; skip the
                                # text then instead of reporting an error.
                                chunk_text = (
                                    await self.do_parts(parts) if parts else ""
                                )
                                chunk_text += self._grounding_links(candidate)
                            except Exception as e:
                                chunk_text = f"Error parsing stream: {str(e)}"
                            if chunk_text:
                                yield chunk_text
                        await self.emit_status(message="🎉 生成成功", done=True)
                else:
                    response = await client.post(
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    )
                    if response.status_code != 200:
                        yield f"Error: HTTP {response.status_code}: {response.text}"
                        # Close the pending status like the streaming path does.
                        await self.emit_status(message="❌ 生成失败", done=True)
                        return
                    data = response.json()
                    candidates = data.get("candidates")
                    if not candidates:
                        await self.emit_status(message="❌ 生成失败", done=True)
                        yield "No response data"
                        return
                    candidate = candidates[0]
                    res = await self.do_parts(
                        (candidate.get("content") or {}).get("parts")
                    )
                    res += self._grounding_links(candidate)
                    await self.emit_status(message="🎉 生成成功", done=True)
                    yield res
        except Exception as e:
            yield f"Error: {str(e)}"
            await self.emit_status(message="❌ 生成失败", done=True)

    def _to_gemini_parts(self, content):
        """Convert one message ``content`` into Gemini parts (None if unsupported)."""
        if isinstance(content, str):
            return [{"text": content}]
        if not isinstance(content, list):
            return None
        parts = []
        for item in content:
            kind = item.get("type")
            if kind == "text":
                parts.append({"text": item["text"]})
            elif kind == "image_url":
                image_url = item["image_url"]["url"]
                if image_url.startswith("data:image"):
                    # "data:image/png;base64,AAAA" -> mime "image/png", data "AAAA".
                    header, _, image_data = image_url.partition(",")
                    mime_type = header[len("data:"):].split(";", 1)[0] or "image/jpeg"
                    parts.append(
                        {"inline_data": {"mime_type": mime_type, "data": image_data}}
                    )
                else:
                    parts.append({"image_url": image_url})
        return parts

    def _grounding_links(self, candidate) -> str:
        """Return the separator plus search links for ``candidate``, or ""."""
        if not (self.open_search and self.valves.OPEN_SEARCH_INFO):
            return ""
        metadata = candidate.get("groundingMetadata") or {}
        chunks = metadata.get("groundingChunks") or []
        if not chunks:
            return ""
        links = []
        for idx, chunk in enumerate(chunks, 1):
            web = chunk.get("web") if isinstance(chunk, dict) else None
            if not web:
                continue
            try:
                links.append(self.create_search_link(idx, web))
            except (KeyError, TypeError):
                # Source without title/uri: leave it out, keep the others.
                continue
        return "\n---------------------------------\n" + "".join(links)

搞个openwebui的 pipe 玩玩,直连 Gemini 付费key支持搜索,添加-search 模型分离


使不支持图像的o1-mini也能读图做数学题

import asyncio
import re
from typing import Callable, Awaitable, Any, Optional

import aiohttp
from pydantic import BaseModel, Field


class Filter:
    """Open WebUI filter that replaces images with OCR text before the model runs.

    Images in the newest user message are sent to an OpenAI-compatible
    chat-completions endpoint and swapped for the recognised text; images in
    older user messages are removed so that only text reaches the model.
    """

    class Valves(BaseModel):
        priority: int = Field(default=0, description="用于过滤操作的优先级别。")
        # Base URL of the OCR endpoint; "/v1/chat/completions" is appended.
        OCR_Base_URL: str = Field(
            default="https://api.openai.com", description="LLm OCR API的基础URL。"
        )
        # Sent as a Bearer token in the Authorization header.
        OCR_API_KEY: str = Field(default="", description="API的API密钥。")
        # Number of attempts made for one OCR request.
        max_retries: int = Field(default=3, description="HTTP请求的最大重试次数。")
        # System prompt sent with every OCR request.
        ocr_prompt: str = Field(
            default="Please only recognize and extract the text or data from this image without interpreting, analyzing, or understanding the content. Do not output any additional information. Simply return the recognized text or data content.",
            description="进行OCR识别的提示词",
        )
        model_name: str = Field(default="gemini-1.5-flash-latest", description="用于OCR图像的模型名称。推荐使用gemini系列")

    def __init__(self):
        self.valves = self.Valves()

    async def _perform_ocr(
        self, image: str, event_emitter: Callable[[Any], Awaitable[None]]
    ) -> str:
        """Run OCR on one image and return the recognised text.

        Args:
            image: Image URL placed in the ``image_url`` part of the request.
            event_emitter: Coroutine function that receives status events.

        Returns:
            The ``content`` of the first choice in the response.

        Raises:
            RuntimeError: When every attempt fails.
        """
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "description": "✨正在对图像进行文字识别中,请耐心等待...",
                    "done": False,
                },
            }
        )

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.valves.OCR_API_KEY}",
        }
        ocr_body = {
            "model": self.valves.model_name,
            "messages": [
                {
                    "role": "system",
                    "content": [{"type": "text", "text": self.valves.ocr_prompt}],
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {"url": image, "detail": "high"},
                        }
                    ],
                },
            ],
        }
        url = f"{self.valves.OCR_Base_URL}/v1/chat/completions"

        # Always try at least once so a max_retries of 0 or less cannot make
        # this method fall through and return None.
        attempts = max(1, self.valves.max_retries)
        async with aiohttp.ClientSession() as session:
            for attempt in range(attempts):
                try:
                    async with session.post(
                        url, json=ocr_body, headers=headers
                    ) as response:
                        response.raise_for_status()
                        response_data = await response.json()
                    result = response_data["choices"][0]["message"]["content"]
                except Exception as e:
                    if attempt == attempts - 1:
                        raise RuntimeError(f"OCR识别失败:{e}") from e
                    # Short growing pause before the next attempt.
                    await asyncio.sleep(min(2**attempt, 5))
                    continue

                await event_emitter(
                    {
                        "type": "status",
                        "data": {
                            "description": "🎉识别成功,交由模型进行处理...",
                            "done": True,
                        },
                    }
                )
                return result

    async def inlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        """Replace or remove the images in ``body["messages"]``.

        Images in the last user message are replaced by their OCR text and
        images in earlier user messages are dropped. The newest user message
        is identified by role rather than by message count, so a system
        prompt does not cause the first image to be discarded unread.

        Returns:
            ``body`` with its messages updated in place.
        """
        messages = body.get("messages", [])

        # Nothing to do when no user message carries an image.
        if not self._find_image_in_messages(messages):
            return body

        user_positions = [
            index
            for index, message in enumerate(messages)
            if message.get("role") == "user"
        ]
        latest_user = user_positions[-1]

        for index in user_positions:
            content = messages[index].get("content")
            if not isinstance(content, list):
                continue

            if index != latest_user:
                # Earlier rounds: strip the images so that only text is sent on.
                messages[index]["content"] = [
                    item for item in content if item.get("type") != "image_url"
                ]
                continue

            for item in content:
                if item.get("type") != "image_url":
                    continue
                try:
                    result = await self._perform_ocr(
                        item["image_url"]["url"], __event_emitter__
                    )
                except Exception as e:
                    print(f"OCR识别错误: {e}")
                    # Close the pending status; the image is left untouched.
                    await __event_emitter__(
                        {
                            "type": "status",
                            "data": {
                                "description": "❌图像文字识别失败,图片将原样传递给模型",
                                "done": True,
                            },
                        }
                    )
                    continue
                item["type"] = "text"
                item.pop("image_url", None)
                item["text"] = result

        body["messages"] = messages
        return body

    def _find_image_in_messages(self, messages):
        """Locate the first image in a user message.

        Returns:
            ``(message_index, content_index, image_url)`` or ``None``.
        """
        for m_index, message in enumerate(messages):
            if message.get("role") == "user" and isinstance(
                message.get("content"), list
            ):
                for c_index, content in enumerate(message["content"]):
                    if content.get("type") == "image_url":
                        return m_index, c_index, content["image_url"]["url"]
        return None

    async def outlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        """Pass the response body through unchanged."""
        return body

使用方式:在Openwebui中对传入的图片进行文字识别后再交给模型处理

77 Likes

好东西啊,能不能搞个删除上下文的

3 Likes

删除上下文是?
image

没搞懂,为什么不开一个新话题呢

3 Likes

如果设置好了提示词或者参数,重新弄很麻烦

3 Likes


就类似next chat 这个

image

1 Like

试试工作空间里的模型,新建一个

1 Like

lobe chat 和 next chat 都有这个功能,就 oi 没

1 Like

看看别的佬友的回复吧,我没在意过这问题
image

1 Like

gemini联网不用管道,直接用函数修改进口请求是不是也能实现?

不知道画布功能以后open webui能不能实现,挺好用的

早有了吧

1 Like


我想要的是类似于ai在原代码中修改,减少篇幅的增长,又可以协同合作,不用傻傻地再生成新的一整段代码

1 Like

好东西啊佬

感谢佬友分享

太强了!大佬

1 Like

你說canva是吧
可以是可以但感覺還好
我希望他在優化性能的路上越走越遠

有这样一个项目 同时也看到open-webui在讨论把这个融合进来

真的,之前一直有这个想法,用的是动态图像处理器,只要有图像一整条都跑别的模型。受到佬这个启发,我试着(用claude)写了一个类似能多轮处理不同图片的东西,在prompt和内部逻辑上应该还有很大优化空间,但是现在只要处理第二轮图片就会报错:Unexpected token '<', "<!DOCTYPE "... is not valid JSON

import asyncio
import re
from typing import List, Dict, Callable, Awaitable, Any, Optional
from dataclasses import dataclass

import aiohttp
from pydantic import BaseModel, Field


@dataclass
class ImageAnalysisResult:
    """Outcome of analysing one image found in the conversation."""

    # Filter._analyze_image stores the analysis model's complete reply here.
    ocr_result: str
    # Filter._analyze_image currently stores the fixed text "[Visual Description]".
    visual_description: str
    # Dict with "intent" and "relation" keys; both hold fixed placeholder
    # strings when built by Filter._analyze_image.
    context_analysis: Dict[str, str]
    # Index of the message within the messages list that held the image.
    message_index: int
    # Index of the image item within that message's content list.
    content_index: int
    # URL of the analysed image; compared with incoming image URLs to detect
    # images that were already analysed.
    image_url: str


class Filter:
    class Valves(BaseModel):
        priority: int = Field(
            default=0, description="Priority for filtering operations"
        )
        OCR_Base_URL: str = Field(
            default="https://api.openai.com", description="Base URL for LLM OCR API"
        )
        OCR_API_KEY: str = Field(default="", description="API key")
        max_retries: int = Field(
            default=3, description="Maximum retries for HTTP requests"
        )
        model_name: str = Field(
            default="gemini-1.5-pro", description="Model name for image analysis"
        )
        whitelist_models: List[str] = Field(
            default=["claude-2", "gpt-4"], description="Explicitly allowed models"
        )
        whitelist_patterns: List[str] = Field(
            default=["claude", "gpt"], description="Model name patterns to match"
        )
        system_prompt: str = Field(
            default="""You are a professional image analysis assistant. Please analyze the provided image and context, providing the following information:

1. Text Content: Extract all text from the image
2. Visual Description: Describe the visual elements in detail
3. Context Analysis: Based on conversation history, analyze the likely intent of sending this image
4. Relationship Analysis: Explain how this image relates to previous dialogue or images

Please output in a structured format using markdown.
If the image contains code or technical content, please specifically note and ensure accurate description."""
        )

    def __init__(self):
        self.valves = self.Valves()
        self.previous_analyses: List[ImageAnalysisResult] = []

    def _is_model_allowed(self, model: Optional[dict]) -> bool:
        if not model or "id" not in model:
            return False

        model_id = model["id"].lower()
        # 1.
        if model_id in [m.lower() for m in self.valves.whitelist_models]:
            return True
        # 2.
        if any(
            pattern.lower() in model_id for pattern in self.valves.whitelist_patterns
        ):
            return True

        return False

    def _extract_context(self, messages: List[dict]) -> str:
        context = []
        for msg in messages:
            if msg["role"] == "user":
                context.append(f"User: {self._format_message_content(msg['content'])}")
            else:
                context.append(
                    f"Assistant: {self._format_message_content(msg['content'])}"
                )
        return "\n".join(context)

    def _format_message_content(self, content) -> str:
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            return " ".join([item.get("text", "[Image]") for item in content])
        return ""

    async def _analyze_image(
        self,
        image: str,
        context: str,
        message_index: int,
        content_index: int,
        event_emitter: Callable[[Any], Awaitable[None]],
        model_name: str,
    ) -> ImageAnalysisResult:
        """Ask the analysis model to describe one image.

        Args:
            image: Image URL placed in the ``image_url`` part of the request.
            context: Conversation text sent alongside the image.
            message_index: Stored on the result unchanged.
            content_index: Stored on the result unchanged.
            event_emitter: Coroutine function that receives status events.
            model_name: Name shown in the final "handing over" status.

        Returns:
            An ``ImageAnalysisResult`` whose ``ocr_result`` holds the model's
            complete reply; the other text fields are fixed placeholders.

        Raises:
            RuntimeError: When every attempt fails.
        """

        async def emit(description: str, done: bool) -> None:
            await event_emitter(
                {
                    "type": "status",
                    "data": {"description": description, "done": done},
                }
            )

        # OCR phase
        await emit("🔍 Gemini is performing OCR...", False)

        # No work happens during this pause; it only separates the two
        # status messages in time.
        await asyncio.sleep(1)

        # Understanding phase
        await emit("🔍 Gemini is analyzing the image...", False)

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.valves.OCR_API_KEY}",
        }

        analysis_body = {
            "model": self.valves.model_name,
            "messages": [
                {
                    "role": "system",
                    "content": [{"type": "text", "text": self.valves.system_prompt}],
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": f"Current conversation context:\n{context}",
                        },
                        {
                            "type": "image_url",
                            "image_url": {"url": image, "detail": "high"},
                        },
                    ],
                },
            ],
        }

        url = f"{self.valves.OCR_Base_URL}/v1/chat/completions"

        # Always try at least once so a max_retries of 0 or less cannot make
        # this method fall through and return None.
        attempts = max(1, self.valves.max_retries)
        last_error = None
        async with aiohttp.ClientSession() as session:
            for attempt in range(attempts):
                try:
                    async with session.post(
                        url, json=analysis_body, headers=headers
                    ) as response:
                        response.raise_for_status()
                        response_data = await response.json()
                    result = response_data["choices"][0]["message"]["content"]
                    break
                except Exception as e:
                    last_error = e
                    if attempt < attempts - 1:
                        # Short growing pause before the next attempt.
                        await asyncio.sleep(min(2**attempt, 5))
            else:
                # Every attempt failed: close the status and report the last error.
                await emit(f"❌ Image analysis failed: {str(last_error)}", True)
                raise RuntimeError(
                    f"Image analysis failed: {last_error}"
                ) from last_error

        # The reply is stored as a whole; it is not split into sections.
        analysis_result = ImageAnalysisResult(
            ocr_result=result,
            visual_description="[Visual Description]",
            context_analysis={
                "intent": "[User Intent]",
                "relation": "[Context Relation]",
            },
            message_index=message_index,
            content_index=content_index,
            image_url=image,
        )

        await emit(f"🎉 Analysis complete, handing over to {model_name}...", True)
        return analysis_result

    def _format_analysis_result(self, result: ImageAnalysisResult) -> str:
        return f"""[Image Analysis Result]
### Text Content
{result.ocr_result}

### Visual Description
{result.visual_description}

### Context Analysis
- Inferred Intent: {result.context_analysis['intent']}
- Context Relation: {result.context_analysis['relation']}
"""

    async def inlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        # 1. 首先检查模型白名单
        if not self._is_model_allowed(__model__):
            return body  # 不在白名单中的模型直接返回原始消息

        model_name = (
            __model__.get("id", "Unknown Model") if __model__ else "Unknown Model"
        )
        messages = body.get("messages", [])

        # 2. 查找新图片
        image_info = self._find_new_image_in_messages(messages)

        # 3. 如果没有新图片,直接返回原始消息给配置模型处理
        if not image_info:
            # 确保消息格式正确,处理普通文本
            for msg in messages:
                if isinstance(msg.get("content"), list):
                    # 合并列表中的文本内容
                    text_content = " ".join(
                        item.get("text", "")
                        for item in msg["content"]
                        if isinstance(item, dict) and "text" in item
                    )
                    msg["content"] = text_content

            body["messages"] = messages
            return body  # 返回处理后的文本消息

        # 4. 如果有新图片,进行处理
        message_index, content_index, image = image_info
        context = self._extract_context(messages)

        try:
            # 处理新图片
            analysis = await self._analyze_image(
                image,
                context,
                message_index,
                content_index,
                __event_emitter__,
                model_name,
            )

            # 保存分析结果
            self.previous_analyses.append(analysis)

            # 替换图片为分析结果文本
            messages[message_index]["content"][content_index] = {
                "type": "text",
                "text": self._format_analysis_result(analysis),
            }

            body["messages"] = messages
        except Exception as e:
            print(f"Image analysis error: {e}")
            # 出错时也返回原始消息
            return body

        return body

    def _find_new_image_in_messages(self, messages: List[dict]) -> Optional[tuple]:
        """查找消息中的新图片"""
        for m_index, message in enumerate(messages):
            if message["role"] == "user" and isinstance(message.get("content"), list):
                for c_index, content in enumerate(message["content"]):
                    if content["type"] == "image_url" and not any(
                        analysis.image_url == content["image_url"]["url"]
                        for analysis in self.previous_analyses
                    ):
                        return m_index, c_index, content["image_url"]["url"]
        return None

    async def outlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        """Pass the response body through unchanged; no argument is used."""
        return body

求助各位佬orz帮看一下,也可能是我的api来源不太好,我平时对话触发这个报错也挺多的,一直不知道为什么

佬友厉害的 tieba_013

如果hf的openwebui重启或者更新,会丢失数据吗?不是聊天数据,是部署的数据和用户账号