注意!! 使用该脚本,需要配合 MarSeventh/CloudFlare-ImgBed: 基于 CloudFlare Pages 的开源文件托管解决方案(图床/文件床/网盘)项目,自建图床使用!!!
在这里我提供一个公益的图床 稳定性不保证:
图床上传URL:https://cloudflare-imgbed-7b0.pages.dev/upload
鉴权Key:B1NuKs2VOtxGXe9
根据 给 owu 的 gemini pipe 加上 图文支持 - 资源荟萃 / 资源荟萃, Lv1 - LINUX DO 大佬的脚本进行修改
在使用大佬的脚本过程中,发现有一些地方用着不太舒服,于是删减了部分功能,并且加入了将 gemini-2.0-flash-exp-image
模型返回的图片上传到图床的功能,从而解决 openwebui 在显示大量 base64 时出现的卡顿问题。
并且在上传图床的基础上,增加了多轮对话的图片提取自动回传聊天的功能,从而实现和google 仪表盘一致的可连续绘图对话功能。
函数代码
"""
title: Gemini Pipe
author_url: https://linux.do/u/coker/summary
author: coker
modifier_url: https://linux.do/u/zgccrui/summary
modifier: zgccrui
version: 1.1.4
license: MIT
"""
import json
import random
import httpx
import re
import base64
import io
import traceback
from typing import List, AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
import time
import uuid
class Pipe:
    """Open WebUI manifold pipe for Google Gemini models.

    Forwards chat requests to the Gemini REST API. Images that the model
    returns as inline base64 data are uploaded to a CloudFlare-ImgBed image
    host and rendered as Markdown links (avoids Open WebUI lag on huge base64
    blobs). On later turns those Markdown links can be re-downloaded and sent
    back to the model, enabling continuous image-editing conversations.
    """

    class Valves(BaseModel):
        # User-facing configuration surfaced in the Open WebUI admin panel.
        # Field descriptions are shown verbatim in the UI, so they are kept
        # exactly as authored.
        GOOGLE_API_KEYS: str = Field(
            default="", description="API Keys for Google, use , to split"
        )
        BASE_URL: str = Field(
            default="https://generativelanguage.googleapis.com/v1beta",
            description="API Base Url",
        )
        OPEN_SEARCH_INFO: bool = Field(
            default=True, description="Open search info show "
        )
        GEMINI_API_MODEL: str = Field(
            default="gemini-2.0-flash-exp",
            description="API请求的模型名称,默认为 gemini-2.0-flash-exp,多模型名可使用`,`分隔",
        )
        IMAGE_HOST_URL: str = Field(
            default="https://cloudflare-imgbed-7b0.pages.dev/upload",
            description="图床URL,使用图床项目地址为 https://github.com/MarSeventh/CloudFlare-ImgBed",
        )
        IMAGE_HOST_AUTH_CODE: str = Field(default="", description="图床鉴权Key")
        RESEND_IMAGES: bool = Field(
            default=True, description="是否重新发送历史消息中的图片到大模型"
        )

    def __init__(self):
        self.type = "manifold"
        self.name = ""
        self.valves = self.Valves()
        # NOTE(review): these two lists are never read anywhere in this file;
        # kept only for backward compatibility with external code.
        self.OPEN_SEARCH_MODELS = ["gemini-2.0-pro-exp"]
        self.OPEN_IMAGE_OUT_MODELS = ["gemini-2.0-flash-exp"]
        self.emitter = None
        self.open_search = False
        self.open_image = False

    def pipes(self) -> List[dict]:
        """Expose one Open WebUI model entry per configured model name."""
        models = self.valves.GEMINI_API_MODEL.split(",")
        return [
            {
                "id": model.strip(),
                "name": model.strip(),
            }
            for model in models
        ]

    def create_search_link(self, idx, web):
        """Format one grounding source as a numbered Markdown link."""
        return f'\n{idx:02d}: [**{web["title"]}**]({web["uri"]})'

    async def upload_image_to_host(self, image_data, mime_type):
        """Upload raw image bytes to the image host.

        Returns the public image URL on success, or ``None`` on any failure
        (best-effort: errors are printed, never raised to the caller).
        """
        try:
            file_ext = mime_type.split("/")[1] if "/" in mime_type else "jpg"
            file_like = io.BytesIO(image_data)
            # Multipart form field: date-prefixed path plus a short random id.
            current_date = time.strftime("%Y/%m/%d")
            files = {
                "file": (
                    f"{current_date}/{uuid.uuid4().hex[:8]}.{file_ext}",
                    file_like,
                    mime_type,
                )
            }
            # Auth code and name policy are passed as query parameters
            # (CloudFlare-ImgBed upload API).
            upload_url = f"{self.valves.IMAGE_HOST_URL}?authCode={self.valves.IMAGE_HOST_AUTH_CODE}&uploadNameType=origin"
            async with httpx.AsyncClient() as client:
                response = await client.post(upload_url, files=files, timeout=30)
                if response.status_code == 200:
                    # Response is a JSON list whose first entry holds the path.
                    image_path = response.json()[0]["src"]
                    # scheme://host extracted from IMAGE_HOST_URL.
                    base_domain = "/".join(self.valves.IMAGE_HOST_URL.split("/")[:3])
                    full_image_url = f"{base_domain}{image_path}"
                    return full_image_url
                else:
                    print(
                        f"图片上传失败,状态码 {response.status_code}: {response.text}"
                    )
                    return None
        except Exception as e:
            print(f"图片上传错误: {str(e)}")
            print(traceback.format_exc())
            return None

    async def download_image_from_url(self, url):
        """Download an image; return ``(bytes, content_type)`` or ``(None, None)``."""
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url, timeout=30)
                if response.status_code == 200:
                    return response.content, response.headers.get(
                        "content-type", "image/jpeg"
                    )
                else:
                    print(f"图片下载失败,状态码 {response.status_code}")
                    return None, None
        except Exception as e:
            print(f"图片下载错误: {str(e)}")
            return None, None

    async def do_parts(self, parts):
        """Render Gemini response ``parts`` into display text.

        Text parts are concatenated; inline image parts are decoded, uploaded
        to the image host, and replaced with a Markdown ``![image](url)`` link
        — the exact form that process_message_content() parses back out on
        later turns.
        """
        res = ""
        if not parts or not isinstance(parts, list):
            return "Error: No parts found"
        for part in parts:
            if "text" in part:
                res += part["text"]
            if "inlineData" in part and part["inlineData"]:
                try:
                    # Decode the base64 image payload.
                    image_data = base64.b64decode(part["inlineData"]["data"])
                    mime_type = part["inlineData"]["mimeType"]
                    image_url = await self.upload_image_to_host(image_data, mime_type)
                    if image_url:
                        # BUG FIX: the uploaded URL was previously discarded
                        # (the f-string emitted only "\n\n"); emit the
                        # Markdown image link the rest of the pipe expects.
                        res += f"\n![image]({image_url})\n\n"
                    else:
                        res += "\n[图片上传失败]\n"
                except Exception as e:
                    print(f"图片上传错误: {str(e)}")
                    print(traceback.format_exc())
                    res += f"\n[图片上传错误: {str(e)}]\n"
        return res

    async def process_message_content(self, message):
        """Convert one Open WebUI message into a Gemini ``{role, parts}`` dict.

        Assistant messages containing ``![image](url)`` links can have those
        images re-downloaded and inlined (RESEND_IMAGES) so the model sees the
        actual pixels on follow-up turns. List-form user content handles both
        base64 data URLs and external image URLs.
        """
        if isinstance(message.get("content"), str):
            content = message["content"]
            # Markdown image links previously produced by do_parts().
            img_pattern = r"!\[image\]\((.*?)\)"
            img_matches = re.findall(img_pattern, content)
            if (
                img_matches
                and message["role"] == "assistant"
                and self.valves.RESEND_IMAGES
            ):
                parts = []
                # BUG FIX: split on a NON-capturing pattern. re.split with a
                # capturing group also returns the captured URLs, which were
                # being re-emitted as raw text between the image parts.
                text_parts = re.split(r"!\[image\]\(.*?\)", content)
                for i, text_part in enumerate(text_parts):
                    if text_part:  # skip empty segments
                        parts.append({"text": text_part})
                    # Pair each text segment with the image that followed it.
                    if i < len(img_matches):
                        try:
                            img_data, content_type = await self.download_image_from_url(
                                img_matches[i]
                            )
                            if img_data:
                                mime_type = content_type or "image/jpeg"
                                img_base64 = base64.b64encode(img_data).decode("utf-8")
                                parts.append(
                                    {
                                        "inline_data": {
                                            "mime_type": mime_type,
                                            "data": img_base64,
                                        }
                                    }
                                )
                            else:
                                # Download failed: keep the original Markdown
                                # link as plain text so nothing is lost.
                                parts.append(
                                    {"text": f"\n![image]({img_matches[i]})\n\n"}
                                )
                        except Exception as e:
                            print(f"处理图片错误: {str(e)}")
                            parts.append(
                                {"text": f"\n![image]({img_matches[i]})\n\n"}
                            )
                return {
                    "role": "model" if message["role"] == "assistant" else "user",
                    "parts": parts,
                }
            else:
                # No images (or resending disabled): pass the text through.
                return {
                    "role": "user" if message["role"] == "user" else "model",
                    "parts": [{"text": content}],
                }
        elif isinstance(message.get("content"), list):
            parts = []
            for content in message["content"]:
                if content["type"] == "text":
                    parts.append({"text": content["text"]})
                elif content["type"] == "image_url":
                    image_url = content["image_url"]["url"]
                    if image_url.startswith("data:image"):
                        # data URL: "data:image/<type>;base64,<payload>"
                        image_data = image_url.split(",")[1]
                        mime_type = image_url.split(";")[0].split(":")[1]
                        parts.append(
                            {
                                "inline_data": {
                                    "mime_type": mime_type,
                                    "data": image_data,
                                }
                            }
                        )
                    else:
                        # External URL: fetch and inline it as base64.
                        try:
                            img_data, content_type = await self.download_image_from_url(
                                image_url
                            )
                            if img_data:
                                mime_type = content_type or "image/jpeg"
                                img_base64 = base64.b64encode(img_data).decode("utf-8")
                                parts.append(
                                    {
                                        "inline_data": {
                                            "mime_type": mime_type,
                                            "data": img_base64,
                                        }
                                    }
                                )
                            else:
                                # NOTE(review): "image_url" is not a standard
                                # Gemini part; kept as the original fallback.
                                parts.append({"image_url": image_url})
                        except Exception as e:
                            print(f"处理URL图片错误: {str(e)}")
                            parts.append({"image_url": image_url})
            return {
                "role": "user" if message["role"] == "user" else "model",
                "parts": parts,
            }
        return {
            "role": "user" if message["role"] == "user" else "model",
            "parts": [{"text": "无内容"}],
        }

    async def pipe(
        self,
        body: dict,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> AsyncGenerator[str, None]:
        """Stream (or return in one piece) a Gemini completion.

        Yields text chunks; generated images arrive as Markdown links and
        search-grounding sources are appended as numbered links.
        """
        self.emitter = __event_emitter__
        # Reset per-request flags so state cannot leak from a previous call.
        self.open_search = False
        self.open_image = False
        # Pick one key at random to spread load across the configured keys.
        self.GOOGLE_API_KEY = random.choice(
            self.valves.GOOGLE_API_KEYS.split(",")
        ).strip()
        self.base_url = self.valves.BASE_URL
        if not self.GOOGLE_API_KEY:
            yield "Error: GOOGLE_API_KEY is not set"
            return
        try:
            model_id = body["model"]
            # Open WebUI prefixes the pipe id ("<pipe>.<model>"); strip it.
            if "." in model_id:
                model_id = model_id.split(".", 1)[1]
            messages = body["messages"]
            stream = body.get("stream", False)
            contents = []
            request_data = {
                "generationConfig": {
                    "temperature": body.get("temperature", 0.7),
                    "topP": body.get("top_p", 0.9),
                    "topK": body.get("top_k", 40),
                    "maxOutputTokens": body.get("max_tokens", 8192),
                    # `or []` also guards an explicit `"stop": None` in body.
                    "stopSequences": body.get("stop") or [],
                },
            }
            for message in messages:
                if message["role"] == "system":
                    # NOTE(review): assumes system content is a plain string,
                    # not the list form — confirm against callers.
                    request_data["system_instruction"] = {
                        "parts": [{"text": message["content"]}]
                    }
                else:
                    processed_message = await self.process_message_content(message)
                    contents.append(processed_message)
            request_data["contents"] = contents
            if model_id.endswith("-search"):
                model_id = model_id[:-7]  # drop the "-search" suffix
                request_data["tools"] = [{"googleSearch": {}}]
                self.open_search = True
            elif "image-generation" in model_id:
                # Ask Gemini to return both text and image modalities.
                request_data["generationConfig"]["response_modalities"] = [
                    "Text",
                    "Image",
                ]
                self.open_image = True
            params = {"key": self.GOOGLE_API_KEY}
            if stream:
                url = f"{self.valves.BASE_URL}/models/{model_id}:streamGenerateContent"
                params["alt"] = "sse"
            else:
                url = f"{self.valves.BASE_URL}/models/{model_id}:generateContent"
            headers = {"Content-Type": "application/json"}
            async with httpx.AsyncClient() as client:
                if stream:
                    async with client.stream(
                        "POST",
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    ) as response:
                        if response.status_code != 200:
                            # BUG FIX: a streamed body must be read before
                            # `.text` is available, otherwise httpx raises
                            # ResponseNotRead and masks the real HTTP error.
                            await response.aread()
                            yield f"Error: HTTP {response.status_code}: {response.text}"
                            return
                        async for line in response.aiter_lines():
                            if line.startswith("data: "):
                                try:
                                    data = json.loads(line[6:])
                                    if "candidates" in data and data["candidates"]:
                                        parts = data["candidates"][0]["content"][
                                            "parts"
                                        ]
                                        text = await self.do_parts(parts)
                                        yield text
                                        try:
                                            if (
                                                self.open_search
                                                and self.valves.OPEN_SEARCH_INFO
                                                and data["candidates"][0][
                                                    "groundingMetadata"
                                                ]["groundingChunks"]
                                            ):
                                                yield "\n---------------------------------\n"
                                                groundingChunks = data["candidates"][0][
                                                    "groundingMetadata"
                                                ]["groundingChunks"]
                                                for idx, groundingChunk in enumerate(
                                                    groundingChunks, 1
                                                ):
                                                    if "web" in groundingChunk:
                                                        yield self.create_search_link(
                                                            idx, groundingChunk["web"]
                                                        )
                                        except Exception:
                                            # Grounding metadata is optional.
                                            pass
                                except Exception:
                                    # Malformed SSE lines are skipped silently.
                                    pass
                else:
                    response = await client.post(
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    )
                    if response.status_code != 200:
                        yield f"Error: HTTP {response.status_code}: {response.text}"
                        return
                    data = response.json()
                    res = ""
                    if "candidates" in data and data["candidates"]:
                        parts = data["candidates"][0]["content"]["parts"]
                        res = await self.do_parts(parts)
                        try:
                            if (
                                self.open_search
                                and self.valves.OPEN_SEARCH_INFO
                                and data["candidates"][0]["groundingMetadata"][
                                    "groundingChunks"
                                ]
                            ):
                                res += "\n---------------------------------\n"
                                groundingChunks = data["candidates"][0][
                                    "groundingMetadata"
                                ]["groundingChunks"]
                                for idx, groundingChunk in enumerate(
                                    groundingChunks, 1
                                ):
                                    if "web" in groundingChunk:
                                        res += self.create_search_link(
                                            idx, groundingChunk["web"]
                                        )
                        except Exception:
                            # Grounding metadata is optional.
                            pass
                        yield res
                    else:
                        yield "No response data"
        except Exception as e:
            yield f"Error: {str(e)}"