2 个赞
蹲一个囤货
webpilot
1 个赞
我把排名前二十的翻译了一下
1 个赞
Reddit感覺不錯
Mark之後體驗
是不是这些函数大部分都要代理
我也不知
其實有的函數運行的不是很好
昨天嘗試了個dc通知的
十次才通知一次
如果穩定的話也是一個挺好的工具
1 个赞
分享一下我在用的
统计token,输出速度和耗费时间
"""
title: Chat Metrics
author: constLiakos
funding_url: https://github.com/open-webui
version: 0.0.4
license: MIT
changelog:
- 0.0.1 - Initial upload to openwebui community.
- 0.0.2 - format, remove unnecessary code
- 0.0.3 - add advanced stats: tokens & elapsed time
- 0.0.4 - each metric has its enable switch Valve, experimental metrics are disabled by default
"""
from pydantic import BaseModel, Field
from typing import Optional, Callable, Any, Awaitable
import tiktoken
import os
import time
from open_webui.utils.misc import get_last_assistant_message
def num_tokens_from_string(user_message: str, model_name: str) -> int:
    """Return the number of tokens in *user_message* for *model_name*.

    Falls back to the generic ``cl100k_base`` encoding when tiktoken does
    not recognise the model name (e.g. non-OpenAI models), instead of
    raising KeyError.
    """
    try:
        encoding = tiktoken.encoding_for_model(model_name)
    except KeyError:
        # Unknown model: best-effort approximation with the GPT-4 encoding.
        encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(user_message))
class Filter:
    """Open WebUI filter that appends response metrics (elapsed time,
    token count, tokens/second) to each chat completion as a status event.
    """

    class Valves(BaseModel):
        # Scheduling priority within Open WebUI's filter chain.
        priority: int = Field(
            default=5, description="Priority level for the filter operations."
        )
        elapsed_time: bool = Field(
            default=True,
            description="Enable for advanced stats",
        )
        tokens_no: bool = Field(
            default=False,
            description="Display total Tokens (Experimental, NOT Accurate)",
        )
        tokens_per_sec: bool = Field(
            default=False,
            description="Display Tokens per Second (Experimental, NOT Accurate)",
        )

    def __init__(self):
        self.valves = self.Valves()
        # Set by inlet(); stays None until the first request passes through.
        self.start_time: Optional[float] = None

    def inlet(
        self,
        body: dict,
    ) -> dict:
        """Record the request start time; the body passes through unchanged."""
        self.start_time = time.time()
        return body

    async def outlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __model__: Optional[dict] = None,
    ) -> dict:
        """Compute metrics for the finished response, emit them as a status
        event, and return the body unchanged."""
        end_time = time.time()
        # Guard against outlet() running without a matching inlet() call,
        # which would otherwise raise TypeError on None arithmetic.
        start_time = self.start_time if self.start_time is not None else end_time
        elapsed_time = end_time - start_time
        response_message = get_last_assistant_message(body["messages"])
        # NOTE(review): the model id is hard-coded because tiktoken only
        # knows OpenAI model names; counts are approximate for other models.
        # model = __model__["id"]
        model = "gpt-4o"
        tokens = num_tokens_from_string(response_message, model)
        # Avoid ZeroDivisionError for instantaneous (or unmeasured) responses.
        tokens_per_sec = tokens / elapsed_time if elapsed_time > 0 else 0.0
        stats_array = []
        if self.valves.tokens_per_sec:
            stats_array.append(f"{tokens_per_sec:.2f} T/s")
        if self.valves.tokens_no:
            stats_array.append(f"{tokens} tokens")
        if self.valves.elapsed_time:
            stats_array.append(f"{elapsed_time:.2f} sec")
        stats = " | ".join(stats_array)
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "description": stats,
                    "done": True,
                },
            }
        )
        return body
给gemini添加联网,思考模型的思考过程从正文分离
下面这个函数是我修改后的版本,在open-webui上面只会显示
gemini-2.0-flash-thinking-exp
和gemini-2.0-flash-exp-search
两个模型
"""
title: Gemini Pipe
author_url: https://linux.do/u/coker/summary
author: coker
version: 0.0.6
license: MIT
"""
import json
import random
import httpx
import requests
from typing import List, AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
class Pipe:
    """Open WebUI manifold pipe for Google Gemini.

    Exposes the configured thinking models as-is and the search-capable
    models with a ``-search`` suffix.  Search models get the ``googleSearch``
    tool attached and grounding source links appended to the output;
    thinking models get their first response part wrapped in a collapsible
    "thought process" ``<details>`` section.
    """

    class Valves(BaseModel):
        GOOGLE_API_KEYS_STR: str = Field(
            default="", description="API Keys for Google, use , to split"
        )
        OPEN_SAFETY: bool = Field(default=False, description="Gemini safety settings")
        BASE_URL: str = Field(
            default="https://generativelanguage.googleapis.com/v1beta",
            description="API Base Url",
        )
        OPEN_SEARCH_INFO: bool = Field(
            default=True, description="Open search info show "
        )

    def __init__(self):
        self.type = "manifold"
        self.name = ""
        self.valves = self.Valves()
        # Models exposed with a "-search" suffix (googleSearch tool attached).
        self.OPEN_SEARCH_MODELS = ["gemini-2.0-flash-exp"]
        # Models whose first response part is treated as the thought process.
        self.OPEN_THINK_MODELS = ["gemini-2.0-flash-thinking-exp"]
        self.base_url = ""
        self.emitter = None
        # Per-request state flags, (re)set in pipe().
        self.open_search = False
        self.open_think = False
        self.think_first = True

    def get_google_models(self) -> List[dict]:
        """Build the model list shown by Open WebUI.

        The upstream ``/models`` request is kept purely as an API-key /
        connectivity probe (a failure surfaces as an "error" entry); the
        returned list always comes from the hard-coded OPEN_THINK_MODELS /
        OPEN_SEARCH_MODELS.  The original code also built a list from the
        API response and then immediately discarded it — that dead work has
        been removed.
        """
        self.base_url = self.valves.BASE_URL
        self.GOOGLE_API_KEYS_LIST = self.valves.GOOGLE_API_KEYS_STR.split(",")
        self.GOOGLE_API_KEY = random.choice(self.GOOGLE_API_KEYS_LIST)
        if not self.GOOGLE_API_KEY:
            return [{"id": "error", "name": "Error: API Key not found"}]
        try:
            url = f"{self.base_url}/models?key={self.GOOGLE_API_KEY}"
            response = requests.get(url, timeout=10)
            if response.status_code != 200:
                raise Exception(f"HTTP {response.status_code}: {response.text}")
            models = []
            if self.OPEN_THINK_MODELS:
                models.extend(
                    [
                        {
                            "id": model,
                            "name": model,
                        }
                        for model in self.OPEN_THINK_MODELS
                    ]
                )
            if self.OPEN_SEARCH_MODELS:
                models.extend(
                    [
                        {
                            "id": model + "-search",
                            "name": model + "-search",
                        }
                        for model in self.OPEN_SEARCH_MODELS
                    ]
                )
            return models
        except Exception as e:
            return [{"id": "error", "name": f"Could not fetch models: {str(e)}"}]

    async def emit_status(
        self,
        message: str = "",
        done: bool = False,
    ):
        """Emit a status event to the UI when an emitter was provided."""
        if self.emitter:
            await self.emitter(
                {
                    "type": "status",
                    "data": {
                        "description": message,
                        "done": done,
                    },
                }
            )

    def pipes(self) -> List[dict]:
        """Open WebUI entry point listing the available models."""
        return self.get_google_models()

    def create_search_link(self, idx, web):
        """Format one grounding source as a numbered markdown link."""
        return f'\n{idx:02d}: [**{web["title"]}**]({web["uri"]})'

    def create_think_info(self, think_info):
        # Unused placeholder; kept for interface compatibility.
        pass

    async def do_parts(self, parts):
        """Convert a Gemini ``parts`` list into display text.

        Thinking models send the thought as the first part: the first chunk
        opens a collapsible ``<details>`` block, and a two-part chunk closes
        it and appends the real answer.  Anything else is concatenated as-is.
        """
        if not parts or not isinstance(parts, list):
            return "Error: No parts found"
        if len(parts) == 1:
            if self.open_think and self.think_first:
                # First thinking chunk: open the collapsible section.
                self.think_first = False
                return (
                    "\n<details>\n<summary>思考过程</summary>\n"
                    "```thinking…… \n" + parts[0]["text"]
                )
            return parts[0]["text"]
        if len(parts) == 2:
            # Thought + answer arrived together: close the thinking section.
            await self.emit_status(message="😄 思考已结束", done=False)
            self.open_think = False
            if self.think_first:
                self.think_first = False
                return (
                    "\n<details>\n<summary>思考过程</summary>\n"
                    "```thinking…… \n"
                    + parts[0]["text"]
                    + "\n```\n"
                    + "\n</details>\n"
                    + parts[1]["text"]
                )
            return parts[0]["text"] + "\n```\n" + "\n</details>\n" + parts[1]["text"]
        res = ""
        for part in parts:
            res += part["text"]
        return res

    async def pipe(
        self,
        body: dict,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> AsyncGenerator[str, None]:
        """Stream (or return in one shot) a Gemini completion.

        Handles model-id prefix stripping, OpenAI-to-Gemini message
        conversion, the ``-search`` suffix (googleSearch tool + grounding
        links) and the thinking models (collapsible thought section).
        """
        self.emitter = __event_emitter__
        self.GOOGLE_API_KEYS_LIST = self.valves.GOOGLE_API_KEYS_STR.split(",")
        self.GOOGLE_API_KEY = random.choice(self.GOOGLE_API_KEYS_LIST)
        self.base_url = self.valves.BASE_URL
        if not self.GOOGLE_API_KEY:
            yield "Error: GOOGLE_API_KEY is not set"
            return
        try:
            model_id = body["model"]
            # Open WebUI prefixes manifold model ids with "<pipe>.";
            # strip that to recover the raw Gemini model id.
            if "." in model_id:
                model_id = model_id.split(".", 1)[1]
            messages = body["messages"]
            stream = body.get("stream", False)
            # Prepare the request payload
            contents = []
            request_data = {
                "generationConfig": {
                    "temperature": body.get("temperature", 0.7),
                    "topP": body.get("top_p", 0.9),
                    "topK": body.get("top_k", 40),
                    "maxOutputTokens": body.get("max_tokens", 8192),
                    # `or []` guards against an explicit "stop": None.
                    "stopSequences": body.get("stop") or [],
                },
            }
            for message in messages:
                if message["role"] == "system":
                    # Gemini takes the system prompt as a dedicated field.
                    request_data["system_instruction"] = {
                        "parts": [{"text": message["content"]}]
                    }
                if message["role"] != "system":
                    if isinstance(message.get("content"), str):
                        contents.append(
                            {
                                "role": (
                                    "user" if message["role"] == "user" else "model"
                                ),
                                "parts": [{"text": message["content"]}],
                            }
                        )
                    if isinstance(message.get("content"), list):
                        # Multimodal content: convert text and image entries.
                        parts = []
                        for content in message["content"]:
                            if content["type"] == "text":
                                parts.append({"text": content["text"]})
                            elif content["type"] == "image_url":
                                image_url = content["image_url"]["url"]
                                if image_url.startswith("data:image"):
                                    # Inline base64 payload (data URI).
                                    image_data = image_url.split(",")[1]
                                    parts.append(
                                        {
                                            "inline_data": {
                                                "mime_type": "image/jpeg",
                                                "data": image_data,
                                            }
                                        }
                                    )
                                else:
                                    parts.append({"image_url": image_url})
                        contents.append(
                            {
                                "role": (
                                    "user" if message["role"] == "user" else "model"
                                ),
                                "parts": parts,
                            }
                        )
            request_data["contents"] = contents
            if model_id.endswith("-search"):
                # Strip the synthetic suffix and enable the search tool.
                model_id = model_id[:-7]
                request_data["tools"] = [{"googleSearch": {}}]
                self.open_search = True
                await self.emit_status(message="🔍 我好像在搜索……")
            elif model_id in self.OPEN_THINK_MODELS:
                await self.emit_status(message="🧐 我好像在思考……")
                self.open_think = True
                self.think_first = True
            else:
                await self.emit_status(message="🚀 飞速生成中……")
            if self.valves.OPEN_SAFETY:
                # Disable all safety blocking categories.
                request_data["safetySettings"] = [
                    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
                    {
                        "category": "HARM_CATEGORY_HATE_SPEECH",
                        "threshold": "BLOCK_NONE",
                    },
                    {
                        "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                        "threshold": "BLOCK_NONE",
                    },
                    {
                        "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
                        "threshold": "BLOCK_NONE",
                    },
                ]
            params = {"key": self.GOOGLE_API_KEY}
            if stream:
                url = f"{self.base_url}/models/{model_id}:streamGenerateContent"
                params["alt"] = "sse"
            else:
                url = f"{self.base_url}/models/{model_id}:generateContent"
            headers = {"Content-Type": "application/json"}
            async with httpx.AsyncClient() as client:
                if stream:
                    async with client.stream(
                        "POST",
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    ) as response:
                        if response.status_code != 200:
                            # A streamed httpx response must be read before
                            # .text is available (ResponseNotRead otherwise).
                            await response.aread()
                            yield f"Error: HTTP {response.status_code}: {response.text}"
                            await self.emit_status(message="❌ 生成失败", done=True)
                            return
                        async for line in response.aiter_lines():
                            if line.startswith("data: "):
                                try:
                                    data = json.loads(line[6:])
                                    if "candidates" in data and data["candidates"]:
                                        parts = data["candidates"][0]["content"][
                                            "parts"
                                        ]
                                        text = await self.do_parts(parts)
                                        yield text
                                        try:
                                            if (
                                                self.open_search
                                                and self.valves.OPEN_SEARCH_INFO
                                                and data["candidates"][0][
                                                    "groundingMetadata"
                                                ]["groundingChunks"]
                                            ):
                                                yield "\n---------------------------------\n"
                                                groundingChunks = data["candidates"][0][
                                                    "groundingMetadata"
                                                ]["groundingChunks"]
                                                for idx, groundingChunk in enumerate(
                                                    groundingChunks, 1
                                                ):
                                                    if "web" in groundingChunk:
                                                        yield self.create_search_link(
                                                            idx, groundingChunk["web"]
                                                        )
                                        except Exception:
                                            # Grounding metadata is optional;
                                            # chunks without it are ignored.
                                            pass
                                except Exception as e:
                                    yield f"Error parsing stream: {str(e)}"
                        await self.emit_status(message="🎉 生成成功", done=True)
                else:
                    response = await client.post(
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    )
                    if response.status_code != 200:
                        yield f"Error: HTTP {response.status_code}: {response.text}"
                        return
                    data = response.json()
                    res = ""
                    if "candidates" in data and data["candidates"]:
                        parts = data["candidates"][0]["content"]["parts"]
                        res = await self.do_parts(parts)
                        try:
                            if (
                                self.open_search
                                and self.valves.OPEN_SEARCH_INFO
                                and data["candidates"][0]["groundingMetadata"][
                                    "groundingChunks"
                                ]
                            ):
                                res += "\n---------------------------------\n"
                                groundingChunks = data["candidates"][0][
                                    "groundingMetadata"
                                ]["groundingChunks"]
                                for idx, groundingChunk in enumerate(
                                    groundingChunks, 1
                                ):
                                    if "web" in groundingChunk:
                                        res += self.create_search_link(
                                            idx, groundingChunk["web"]
                                        )
                        except Exception:
                            # Grounding metadata is optional; ignore quietly.
                            pass
                        await self.emit_status(message="🎉 生成成功", done=True)
                        yield res
                        return
                    else:
                        yield "No response data"
                        return
        except Exception as e:
            yield f"Error: {str(e)}"
            await self.emit_status(message="❌ 生成失败", done=True)
搞个 OpenWebUI 的 pipe 玩玩:直连 Gemini,付费 key 支持搜索,添加 -search 后缀分离模型
使不支持图像的o1-mini也能读图做数学题
import asyncio
import re
from typing import Callable, Awaitable, Any, Optional
import aiohttp
from pydantic import BaseModel, Field
class Filter:
    """Open WebUI filter that lets text-only models (e.g. o1-mini) answer
    questions about images: on the first turn the image is OCR'd by a
    vision-capable model and the recognized text replaces the image entry.
    """

    class Valves(BaseModel):
        priority: int = Field(default=0, description="用于过滤操作的优先级别。")
        OCR_Base_URL: str = Field(
            default="https://api.openai.com", description="LLm OCR API的基础URL。"
        )
        OCR_API_KEY: str = Field(default="", description="API的API密钥。")
        max_retries: int = Field(default=3, description="HTTP请求的最大重试次数。")
        ocr_prompt: str = Field(
            default="Please only recognize and extract the text or data from this image without interpreting, analyzing, or understanding the content. Do not output any additional information. Simply return the recognized text or data content.",
            description="进行OCR识别的提示词",
        )
        model_name: str = Field(
            default="gemini-1.5-flash-latest",
            description="用于OCR图像的模型名称。推荐使用gemini系列",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def _perform_ocr(
        self, image: str, event_emitter: Callable[[Any], Awaitable[None]]
    ) -> str:
        """Send *image* (a URL or data URI) to the OCR model and return the
        recognized text.

        Retries up to ``max_retries`` times.  Raises RuntimeError when every
        attempt fails — including the ``max_retries <= 0`` case, which the
        original code silently turned into a ``None`` return.
        """
        await event_emitter(
            {
                "type": "status",
                "data": {
                    "description": "✨正在对图像进行文字识别中,请耐心等待...",
                    "done": False,
                },
            }
        )
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.valves.OCR_API_KEY}",
        }
        # OpenAI-compatible chat completion request with one image message.
        ocr_body = {
            "model": self.valves.model_name,
            "messages": [
                {
                    "role": "system",
                    "content": [{"type": "text", "text": self.valves.ocr_prompt}],
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image_url",
                            "image_url": {"url": image, "detail": "high"},
                        }
                    ],
                },
            ],
        }
        url = f"{self.valves.OCR_Base_URL}/v1/chat/completions"
        last_error: Optional[Exception] = None
        async with aiohttp.ClientSession() as session:
            for attempt in range(self.valves.max_retries):
                try:
                    async with session.post(
                        url, json=ocr_body, headers=headers
                    ) as response:
                        response.raise_for_status()
                        response_data = await response.json()
                        result = response_data["choices"][0]["message"]["content"]
                        await event_emitter(
                            {
                                "type": "status",
                                "data": {
                                    "description": "🎉识别成功,交由模型进行处理...",
                                    "done": True,
                                },
                            }
                        )
                        return result
                except Exception as e:
                    last_error = e
        # Every attempt failed (or max_retries <= 0): fail loudly instead of
        # implicitly returning None.
        raise RuntimeError(f"OCR识别失败:{last_error}")

    async def inlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        """Replace a first-turn image with its OCR text; on later turns just
        drop the image so the text-only model never sees it."""
        messages = body.get("messages", [])
        # Look for an image in the user messages.
        image_info = self._find_image_in_messages(messages)
        if not image_info:
            return body
        message_index, content_index, image = image_info
        # From the second conversation round onward, drop the image instead
        # of re-running OCR.
        if (len(messages) // 2) >= 1:
            del messages[message_index]["content"][content_index]
            body["messages"] = messages
            return body
        try:
            # Run OCR on the image.
            result = await self._perform_ocr(image, __event_emitter__)
            # Rewrite the image content entry as a plain text entry.
            messages[message_index]["content"][content_index]["type"] = "text"
            messages[message_index]["content"][content_index].pop("image_url", None)
            messages[message_index]["content"][content_index]["text"] = result
            body["messages"] = messages
        except Exception as e:
            # Best-effort: on OCR failure the original body is forwarded.
            print(f"OCR识别错误: {e}")
        return body

    def _find_image_in_messages(self, messages):
        """Return (message_index, content_index, image_url) for the first
        image found in a user message, or None when there is no image."""
        for m_index, message in enumerate(messages):
            if message["role"] == "user" and isinstance(message.get("content"), list):
                for c_index, content in enumerate(message["content"]):
                    # .get() guards content dicts that lack a "type" key.
                    if content.get("type") == "image_url":
                        return m_index, c_index, content["image_url"]["url"]
        return None

    async def outlet(
        self,
        body: dict,
        __event_emitter__: Callable[[Any], Awaitable[None]],
        __user__: Optional[dict] = None,
        __model__: Optional[dict] = None,
    ) -> dict:
        """No-op outlet; responses pass through unchanged."""
        return body
1 个赞
我试了一下排名前几的几个函数,确实不如预期,可能大部分对网络条件要求比较严格
看起来不错
我也覺得不錯 尤其最後的
1 个赞