Open WebUI有什么好用的函数推荐?

https://openwebui.com/tools

2 个赞

蹲一个囤货

webpilot

1 个赞

我把排名前二十的翻译了一下

1 个赞

https://openwebui.com/t/2171/webpilot
这个是吧

Reddit感覺不錯
Mark之後體驗

是不是这些函数大部分都要代理

我也不知
其實有的函數運行的不是很好
昨天嘗試了個dc通知的
十次才通知一次
如果穩定的話也是一個挺好的工具

1 个赞

分享一下我在用的

统计token,输出速度和耗费时间

"""
title: Chat Metrics
author: constLiakos
funding_url: https://github.com/open-webui
version: 0.0.4
license: MIT
changelog:
- 0.0.1 - Initial upload to openwebui community.
- 0.0.2 - format, remove unnecessary code
- 0.0.3 - add advanced stats: tokens & elapsed time
- 0.0.4 - each metric has its enable switch Valve, experimental metrics are disabled by default
"""

from pydantic import BaseModel, Field
from typing import Optional, Callable, Any, Awaitable
import tiktoken
import os
import time
from open_webui.utils.misc import get_last_assistant_message


def num_tokens_from_string(user_message: str, model_name: str) -> int:
    encoding = tiktoken.encoding_for_model(model_name)
    print(encoding)
    num_tokens = len(encoding.encode(user_message))
    return num_tokens


class Filter:
    class Valves(BaseModel):
        priority: int = Field(
            default=5, description="Priority level for the filter operations."
        )
        elapsed_time: bool = Field(
            default=True,
            description="Enable for advanced stats",
        )
        tokens_no: bool = Field(
            default=False,
            description="Display total Tokens (Experimantal, NOT Accurate)",
        )
        tokens_per_sec: bool = Field(
            default=False,
            description="Display Tokens per Second (Experimantal, NOT Accurate)",
        )
        pass

    def __init__(self):
        self.valves = self.Valves()
        self.start_time = None
        pass

    def inlet(
        self,
        body: dict,
    ):
        self.start_time = time.time()
        return body

    async def outlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __model__: Optional[dict] = None,
    ) -> dict:
        end_time = time.time()
        elapsed_time = end_time - self.start_time
        elapsed_time_str = f"Elapsed time: {elapsed_time:.2f} seconds"
        response_message = get_last_assistant_message(body["messages"])
        # model = __model__["id"]
        model = "gpt-4o"
        tokens = num_tokens_from_string(response_message, model)
        tokens_per_sec = tokens / elapsed_time
        stats_array = []

        if self.valves.tokens_per_sec:
            stats_array.append(f"{tokens_per_sec:.2f} T/s")
        if self.valves.tokens_no:
            stats_array.append(f"{tokens} tokens")
        if self.valves.elapsed_time:
            stats_array.append(f"{elapsed_time:.2f} sec")

        stats = " | ".join(stat for stat in stats_array)
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": stats,
                    "done": True,
                },
            }
        )
        return body

给gemini添加联网,思考模型模型的思考过程从正文分离

下面这个函数是我修改后的版本,在open-webui上面只会显示gemini-2.0-flash-thinking-expgemini-2.0-flash-exp-search两个模型


"""
title: Gemini Pipe
author_url:https://linux.do/u/coker/summary
author:coker
version: 0.0.6
license: MIT
"""

import json
import random
import httpx
import requests
from typing import List, AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field


class Pipe:
    class Valves(BaseModel):
        GOOGLE_API_KEYS_STR: str = Field(
            default="", description="API Keys for Google, use , to split"
        )
        OPEN_SAFETY: bool = Field(default=False, description="Gemini safety settings")
        BASE_URL: str = Field(
            default="https://generativelanguage.googleapis.com/v1beta",
            description="API Base Url",
        )
        OPEN_SEARCH_INFO: bool = Field(
            default=True, description="Open search info show "
        )

    def __init__(self):
        self.type = "manifold"
        self.name = ""
        self.valves = self.Valves()
        self.OPEN_SEARCH_MODELS = ["gemini-2.0-flash-exp"]
        self.OPEN_THINK_MODELS = ["gemini-2.0-flash-thinking-exp"]

        self.base_url = ""
        self.emitter = None
        self.open_search = False
        self.open_think = False
        self.think_first = True

    def get_google_models(self) -> List[dict]:
        self.base_url = self.valves.BASE_URL
        self.GOOGLE_API_KEYS_LIST = self.valves.GOOGLE_API_KEYS_STR.split(",")
        self.GOOGLE_API_KEY = random.choice(self.GOOGLE_API_KEYS_LIST)
        if not self.GOOGLE_API_KEY:
            return [{"id": "error", "name": f"Error: API Key not found"}]

        try:
            url = f"{self.base_url}/models?key={self.GOOGLE_API_KEY}"
            response = requests.get(url, timeout=10)

            if response.status_code != 200:
                raise Exception(f"HTTP {response.status_code}: {response.text}")

            data = response.json()
            models = [
                {
                    "id": model["name"].split("/")[-1],
                    "name": model["name"].split("/")[-1],
                }
                for model in data.get("models", [])
                if "generateContent" in model.get("supportedGenerationMethods", [])
            ]
            models = []  # 清空 models 列表
            if self.OPEN_THINK_MODELS:
                models.extend(
                    [
                        {
                            "id": model,
                            "name": model,
                        }
                        for model in self.OPEN_THINK_MODELS
                    ]
                )
            if self.OPEN_SEARCH_MODELS:
                models.extend(
                    [
                        {
                            "id": model + "-search",
                            "name": model + "-search",
                        }
                        for model in self.OPEN_SEARCH_MODELS
                    ]
                )
            return models
        except Exception as e:
            return [{"id": "error", "name": f"Could not fetch models: {str(e)}"}]

    async def emit_status(
        self,
        message: str = "",
        done: bool = False,
    ):
        if self.emitter:
            await self.emitter(
                {
                    "type": "status",
                    "data": {
                        "description": message,
                        "done": done,
                    },
                }
            )

    def pipes(self) -> List[dict]:
        return self.get_google_models()

    def create_search_link(self, idx, web):
        return f'\n{idx:02d}: [**{web["title"]}**]({web["uri"]})'

    def create_think_info(self, think_info):
        pass

    async def do_parts(self, parts):
        if not parts or not isinstance(parts, list):
            return "Error: No parts found"
        if len(parts) == 1:
            if self.open_think and self.think_first:
                self.think_first = False
                return (
                    f"\n<details>\n<summary>思考过程</summary>\n"
                    "```thinking…… \n" + parts[0]["text"]
                )
            return parts[0]["text"]
        if len(parts) == 2:
            await self.emit_status(message="😄 思考已结束", done=False)
            self.open_think = False
            if self.think_first:
                self.think_first = False
                return (
                    f"\n<details>\n<summary>思考过程</summary>\n"
                    "```thinking…… \n"
                    + parts[0]["text"]
                    + "\n```\n"
                    + "\n</details>\n"
                    + parts[1]["text"]
                )
            return parts[0]["text"] + "\n```\n" + "\n</details>\n" + parts[1]["text"]
        res = ""
        for part in parts:
            res += part["text"]
        return res

    async def pipe(
        self,
        body: dict,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> AsyncGenerator[str, None]:
        self.emitter = __event_emitter__
        self.GOOGLE_API_KEYS_LIST = self.valves.GOOGLE_API_KEYS_STR.split(",")
        self.GOOGLE_API_KEY = random.choice(self.GOOGLE_API_KEYS_LIST)
        self.base_url = self.valves.BASE_URL
        if not self.GOOGLE_API_KEY:
            yield "Error: GOOGLE_API_KEY is not set"
            return
        try:
            model_id = body["model"]
            if "." in model_id:
                model_id = model_id.split(".", 1)[1]
            messages = body["messages"]
            stream = body.get("stream", False)
            # Prepare the request payload
            contents = []
            request_data = {
                "generationConfig": {
                    "temperature": body.get("temperature", 0.7),
                    "topP": body.get("top_p", 0.9),
                    "topK": body.get("top_k", 40),
                    "maxOutputTokens": body.get("max_tokens", 8192),
                    "stopSequences": body.get("stop", []),
                },
            }

            for message in messages:
                if message["role"] == "system":
                    request_data["system_instruction"] = {
                        "parts": [{"text": message["content"]}]
                    }
                if message["role"] != "system":
                    if isinstance(message.get("content"), str):
                        contents.append(
                            {
                                "role": (
                                    "user" if message["role"] == "user" else "model"
                                ),
                                "parts": [{"text": message["content"]}],
                            }
                        )
                    if isinstance(message.get("content"), list):
                        parts = []
                        for content in message["content"]:
                            if content["type"] == "text":
                                parts.append({"text": content["text"]})
                            elif content["type"] == "image_url":
                                image_url = content["image_url"]["url"]
                                if image_url.startswith("data:image"):
                                    image_data = image_url.split(",")[1]
                                    parts.append(
                                        {
                                            "inline_data": {
                                                "mime_type": "image/jpeg",
                                                "data": image_data,
                                            }
                                        }
                                    )
                                else:
                                    parts.append({"image_url": image_url})
                        contents.append(
                            {
                                "role": (
                                    "user" if message["role"] == "user" else "model"
                                ),
                                "parts": parts,
                            }
                        )
            request_data["contents"] = contents
            if model_id.endswith("-search"):
                model_id = model_id[:-7]
                request_data["tools"] = [{"googleSearch": {}}]
                self.open_search = True
                await self.emit_status(message="🔍 我好像在搜索……")
            elif model_id in self.OPEN_THINK_MODELS:
                await self.emit_status(message="🧐 我好像在思考……")
                self.open_think = True
                self.think_first = True
            else:
                await self.emit_status(message="🚀 飞速生成中……")
            if self.valves.OPEN_SAFETY:
                request_data["safetySettings"] = [
                    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
                    {
                        "category": "HARM_CATEGORY_HATE_SPEECH",
                        "threshold": "BLOCK_NONE",
                    },
                    {
                        "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                        "threshold": "BLOCK_NONE",
                    },
                    {
                        "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
                        "threshold": "BLOCK_NONE",
                    },
                ]
            params = {"key": self.GOOGLE_API_KEY}
            if stream:
                url = f"{self.base_url}/models/{model_id}:streamGenerateContent"
                params["alt"] = "sse"
            else:
                url = f"{self.base_url}/models/{model_id}:generateContent"
            headers = {"Content-Type": "application/json"}
            async with httpx.AsyncClient() as client:
                if stream:
                    async with client.stream(
                        "POST",
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    ) as response:
                        if response.status_code != 200:
                            yield f"Error: HTTP {response.status_code}: {response.text}"
                            await self.emit_status(message="❌ 生成失败", done=True)
                            return

                        async for line in response.aiter_lines():
                            if line.startswith("data: "):
                                try:
                                    data = json.loads(line[6:])
                                    if "candidates" in data and data["candidates"]:
                                        parts = data["candidates"][0]["content"][
                                            "parts"
                                        ]
                                        text = await self.do_parts(parts)
                                        yield text
                                        try:
                                            if (
                                                self.open_search
                                                and self.valves.OPEN_SEARCH_INFO
                                                and data["candidates"][0][
                                                    "groundingMetadata"
                                                ]["groundingChunks"]
                                            ):
                                                yield "\n---------------------------------\n"
                                                groundingChunks = data["candidates"][0][
                                                    "groundingMetadata"
                                                ]["groundingChunks"]
                                                for idx, groundingChunk in enumerate(
                                                    groundingChunks, 1
                                                ):
                                                    if "web" in groundingChunk:
                                                        yield self.create_search_link(
                                                            idx, groundingChunk["web"]
                                                        )
                                        except Exception as e:
                                            pass
                                except Exception as e:
                                    yield f"Error parsing stream: {str(e)}"
                        await self.emit_status(message="🎉 生成成功", done=True)
                else:
                    response = await client.post(
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    )
                    if response.status_code != 200:
                        yield f"Error: HTTP {response.status_code}: {response.text}"
                        return
                    data = response.json()
                    res = ""
                    if "candidates" in data and data["candidates"]:
                        parts = data["candidates"][0]["content"]["parts"]
                        res = await self.do_parts(parts)
                        try:
                            if (
                                self.open_search
                                and self.valves.OPEN_SEARCH_INFO
                                and data["candidates"][0]["groundingMetadata"][
                                    "groundingChunks"
                                ]
                            ):
                                res += "\n---------------------------------\n"
                                groundingChunks = data["candidates"][0][
                                    "groundingMetadata"
                                ]["groundingChunks"]
                                for idx, groundingChunk in enumerate(
                                    groundingChunks, 1
                                ):
                                    if "web" in groundingChunk:
                                        res += self.create_search_link(
                                            idx, groundingChunk["web"]
                                        )
                        except Exception as e:
                            pass
                        await self.emit_status(message="🎉 生成成功", done=True)
                        yield res
                        return
                    else:
                        yield "No response data"
                    return
        except Exception as e:
            yield f"Error: {str(e)}"
            await self.emit_status(message="❌ 生成失败", done=True)

搞个openwebui的 pipe 玩玩,直连 Gemini 付费key支持搜索,添加-search 模型分离


使不支持图像的o1-mini也能读图做数学题

import asyncio
import re
from typing import Callable, Awaitable, Any, Optional

import aiohttp
from pydantic import BaseModel, Field


class Filter:
    class Valves(BaseModel):
        priority: int = Field(default=0, description="用于过滤操作的优先级别。")
        OCR_Base_URL: str = Field(
            default="https://api.openai.com", description="LLm OCR API的基础URL。"
        )
        OCR_API_KEY: str = Field(default="", description="API的API密钥。")
        max_retries: int = Field(default=3, description="HTTP请求的最大重试次数。")
        ocr_prompt: str = Field(
            default="Please only recognize and extract the text or data from this image without interpreting, analyzing, or understanding the content. Do not output any additional information. Simply return the recognized text or data content.",
            description="进行OCR识别的提示词",
        )
        model_name: str = Field(default="gemini-1.5-flash-latest", description="用于OCR图像的模型名称。推荐使用gemini系列")

    def __init__(self):
        self.valves = self.Valves()

    async def _perform_ocr(
        self, image: str, event_emitter: Callable[[Any], Awaitable[None]]
    ) -> str:
        """执行OCR识别的内部方法"""
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "description": "✨正在对图像进行文字识别中,请耐心等待...",
                    "done": False,
                },
            }
        )

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.valves.OCR_API_KEY}",
        }
        ocr_body = {
            "model": self.valves.model_name,
            "messages": [
                {
                    "role": "system",
                    "content": [{"type": "text", "text": self.valves.ocr_prompt}],
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {"url": image, "detail": "high"},
                        }
                    ],
                },
            ],
        }
        url = f"{self.valves.OCR_Base_URL}/v1/chat/completions"

        async with aiohttp.ClientSession() as session:
            for attempt in range(self.valves.max_retries):
                try:
                    async with session.post(
                        url, json=ocr_body, headers=headers
                    ) as response:
                        response.raise_for_status()
                        response_data = await response.json()
                        result = response_data["choices"][0]["message"]["content"]

                        await event_emitter(
                            {
                                "type": "status",
                                "data": {
                                    "description": "🎉识别成功,交由模型进行处理...",
                                    "done": True,
                                },
                            }
                        )

                        return result
                except Exception as e:
                    if attempt == self.valves.max_retries - 1:
                        raise RuntimeError(f"OCR识别失败:{e}")

    async def inlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        messages = body.get("messages", [])

        # 查找图像
        image_info = self._find_image_in_messages(messages)
        if not image_info:
            return body

        message_index, content_index, image = image_info

        # 如果已经是第二轮及以上对话,直接返回
        if (len(messages) // 2) >= 1:
            del messages[message_index]["content"][content_index]
            body["messages"] = messages
            return body

        try:
            # 执行OCR识别
            result = await self._perform_ocr(image, __event_emitter__)

            # 更新消息内容
            messages[message_index]["content"][content_index]["type"] = "text"
            messages[message_index]["content"][content_index].pop("image_url", None)
            messages[message_index]["content"][content_index]["text"] = result
            body["messages"] = messages
        except Exception as e:
            print(f"OCR识别错误: {e}")
            # 可以根据需要进行错误处理

        return body

    def _find_image_in_messages(self, messages):
        """在消息中查找图像"""
        for m_index, message in enumerate(messages):
            if message["role"] == "user" and isinstance(message.get("content"), list):
                for c_index, content in enumerate(message["content"]):
                    if content["type"] == "image_url":
                        return m_index, c_index, content["image_url"]["url"]
        return None

    async def outlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        return body

使用方式:在Openwebui中对传入的图片进行文字识别后再交给模型处理

1 个赞

我试了一下排名前几的几个函数。确实不如预期。可能大部分网络条件严格

看起来不错

我也覺得不錯 尤其最後的

1 个赞