openwebui gemini 绘图上传图床

注意!! 使用该脚本,需要配合 MarSeventh/CloudFlare-ImgBed: 基于 CloudFlare Pages 的开源文件托管解决方案(图床/文件床/网盘)项目,自建图床使用!!!

在这里我提供一个公益的图床 稳定性不保证:
图床上传URL:https://cloudflare-imgbed-7b0.pages.dev/upload
鉴权Key:B1NuKs2VOtxGXe9

根据 给 owu 的 gemini pipe 加上 图文支持 - 资源荟萃 / 资源荟萃, Lv1 - LINUX DO 大佬的脚本进行修改

在使大佬的脚本过程中,发现有一些地方用着不太舒服,于是删减了部分功能,并且加入了对gemini-2.0-flash-exp-image模型返回图片的上传图床出来,从而解决openwebui 在显示大量base64时出现的卡顿问题。

并且在上传图床的基础上,增加了多轮对话的图片提取自动回传聊天的功能,从而实现和google 仪表盘一致的可连续绘图对话功能。

函数代码
"""
title: Gemini Pipe
author_url: https://linux.do/u/coker/summary
author: coker
modifier_url: https://linux.do/u/zgccrui/summary
modifier: zgccrui
version: 1.1.4
license: MIT
"""

import json
import random
import httpx
import re
import base64
import io
import traceback
from typing import List, AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
import time
import uuid


class Pipe:
    class Valves(BaseModel):
        GOOGLE_API_KEYS: str = Field(
            default="", description="API Keys for Google, use , to split"
        )
        BASE_URL: str = Field(
            default="https://generativelanguage.googleapis.com/v1beta",
            description="API Base Url",
        )
        OPEN_SEARCH_INFO: bool = Field(
            default=True, description="Open search info show "
        )
        GEMINI_API_MODEL: str = Field(
            default="gemini-2.0-flash-exp",
            description="API请求的模型名称,默认为 gemini-2.0-flash-exp,多模型名可使用`,`分隔",
        )
        IMAGE_HOST_URL: str = Field(
            default="https://cloudflare-imgbed-7b0.pages.dev/upload",
            description="图床URL,使用图床项目地址为 https://github.com/MarSeventh/CloudFlare-ImgBed",
        )
        IMAGE_HOST_AUTH_CODE: str = Field(default="", description="图床鉴权Key")
        RESEND_IMAGES: bool = Field(
            default=True, description="是否重新发送历史消息中的图片到大模型"
        )

    def __init__(self):
        self.type = "manifold"
        self.name = ""
        self.valves = self.Valves()
        self.OPEN_SEARCH_MODELS = ["gemini-2.0-pro-exp"]
        self.OPEN_IMAGE_OUT_MODELS = ["gemini-2.0-flash-exp"]

        self.emitter = None
        self.open_search = False
        self.open_image = False

    def pipes(self) -> List[dict]:
        models = self.valves.GEMINI_API_MODEL.split(",")
        return [
            {
                "id": model.strip(),
                "name": model.strip(),
            }
            for model in models
        ]

    def create_search_link(self, idx, web):
        return f'\n{idx:02d}: [**{web["title"]}**]({web["uri"]})'

    async def upload_image_to_host(self, image_data, mime_type):
        """上传图片到图床并返回URL"""
        try:
            file_ext = mime_type.split("/")[1] if "/" in mime_type else "jpg"
            file_like = io.BytesIO(image_data)

            # 准备多部分表单数据
            current_date = time.strftime("%Y/%m/%d")
            files = {
                "file": (
                    f"{current_date}/{uuid.uuid4().hex[:8]}.{file_ext}",
                    file_like,
                    mime_type,
                )
            }

            # 使用Valves中的图床URL和认证码
            upload_url = f"{self.valves.IMAGE_HOST_URL}?authCode={self.valves.IMAGE_HOST_AUTH_CODE}&uploadNameType=origin"

            # 上传到图床服务
            async with httpx.AsyncClient() as client:
                response = await client.post(upload_url, files=files, timeout=30)

                if response.status_code == 200:
                    # 从响应中获取路径
                    image_path = response.json()[0]["src"]
                    # 从IMAGE_HOST_URL提取基本域名
                    base_domain = "/".join(self.valves.IMAGE_HOST_URL.split("/")[:3])
                    # 构建完整URL
                    full_image_url = f"{base_domain}{image_path}"
                    return full_image_url
                else:
                    print(
                        f"图片上传失败,状态码 {response.status_code}: {response.text}"
                    )
                    return None
        except Exception as e:
            print(f"图片上传错误: {str(e)}")
            print(traceback.format_exc())
            return None

    async def download_image_from_url(self, url):
        """从URL下载图片"""
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url, timeout=30)
                if response.status_code == 200:
                    return response.content, response.headers.get(
                        "content-type", "image/jpeg"
                    )
                else:
                    print(f"图片下载失败,状态码 {response.status_code}")
                    return None, None
        except Exception as e:
            print(f"图片下载错误: {str(e)}")
            return None, None

    async def do_parts(self, parts):
        res = ""
        if not parts or not isinstance(parts, list):
            return "Error: No parts found"

        for part in parts:
            if "text" in part:
                res += part["text"]
            if "inlineData" in part and part["inlineData"]:
                try:
                    # 解码base64图片数据
                    image_data = base64.b64decode(part["inlineData"]["data"])
                    mime_type = part["inlineData"]["mimeType"]

                    # 上传到图床
                    image_url = await self.upload_image_to_host(image_data, mime_type)

                    if image_url:
                        res += f"\n![image]({image_url})\n"
                    else:
                        res += "\n[图片上传失败]\n"

                except Exception as e:
                    print(f"图片上传错误: {str(e)}")
                    print(traceback.format_exc())
                    res += f"\n[图片上传错误: {str(e)}]\n"

        return res

    async def process_message_content(self, message):
        """处理消息内容,包括解析和重新发送图片到大模型"""
        if isinstance(message.get("content"), str):
            content = message["content"]

            # 检查内容中是否包含Markdown格式的图片链接
            img_pattern = r"!\[image\]\((.*?)\)"
            img_matches = re.findall(img_pattern, content)

            # 如果是assistant消息,包含图片链接,并启用了图片重新发送功能
            if (
                img_matches
                and message["role"] == "assistant"
                and self.valves.RESEND_IMAGES
            ):
                # 准备处理后的parts
                parts = []

                # 处理文本部分
                text_parts = re.split(img_pattern, content)
                for i, text_part in enumerate(text_parts):
                    if text_part:  # 添加非空文本
                        parts.append({"text": text_part})

                    # 如果还有对应的图片链接,尝试下载并作为图片数据发送
                    if i < len(img_matches):
                        try:
                            # 下载图片数据
                            img_data, content_type = await self.download_image_from_url(
                                img_matches[i]
                            )
                            if img_data:
                                # 获取mime类型
                                mime_type = content_type or "image/jpeg"

                                # 以base64编码添加为内联数据
                                img_base64 = base64.b64encode(img_data).decode("utf-8")
                                parts.append(
                                    {
                                        "inline_data": {
                                            "mime_type": mime_type,
                                            "data": img_base64,
                                        }
                                    }
                                )
                            else:
                                # 下载失败,保留原始文本
                                parts.append(
                                    {"text": f"\n![image]({img_matches[i]})\n"}
                                )
                        except Exception as e:
                            print(f"处理图片错误: {str(e)}")
                            # 处理失败,添加原始链接作为文本
                            parts.append({"text": f"\n![image]({img_matches[i]})\n"})

                return {
                    "role": "model" if message["role"] == "assistant" else "user",
                    "parts": parts,
                }
            else:
                # 没有图片或不需要处理,返回原始内容
                return {
                    "role": "user" if message["role"] == "user" else "model",
                    "parts": [{"text": content}],
                }
        elif isinstance(message.get("content"), list):
            parts = []
            for content in message["content"]:
                if content["type"] == "text":
                    parts.append({"text": content["text"]})
                elif content["type"] == "image_url":
                    image_url = content["image_url"]["url"]
                    if image_url.startswith("data:image"):
                        # 处理base64编码的图片
                        image_data = image_url.split(",")[1]
                        mime_type = image_url.split(";")[0].split(":")[1]
                        parts.append(
                            {
                                "inline_data": {
                                    "mime_type": mime_type,
                                    "data": image_data,
                                }
                            }
                        )
                    else:
                        # 处理外部图片URL
                        try:
                            img_data, content_type = await self.download_image_from_url(
                                image_url
                            )
                            if img_data:
                                mime_type = content_type or "image/jpeg"
                                img_base64 = base64.b64encode(img_data).decode("utf-8")
                                parts.append(
                                    {
                                        "inline_data": {
                                            "mime_type": mime_type,
                                            "data": img_base64,
                                        }
                                    }
                                )
                            else:
                                parts.append({"image_url": image_url})
                        except Exception as e:
                            print(f"处理URL图片错误: {str(e)}")
                            parts.append({"image_url": image_url})
            return {
                "role": "user" if message["role"] == "user" else "model",
                "parts": parts,
            }
        return {
            "role": "user" if message["role"] == "user" else "model",
            "parts": [{"text": "无内容"}],
        }

    async def pipe(
        self,
        body: dict,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> AsyncGenerator[str, None]:
        self.emitter = __event_emitter__
        self.GOOGLE_API_KEY = random.choice(
            self.valves.GOOGLE_API_KEYS.split(",")
        ).strip()
        self.base_url = self.valves.BASE_URL
        if not self.GOOGLE_API_KEY:
            yield "Error: GOOGLE_API_KEY is not set"
            return
        try:
            model_id = body["model"]
            if "." in model_id:
                model_id = model_id.split(".", 1)[1]
            messages = body["messages"]
            stream = body.get("stream", False)
            # 准备请求载荷
            contents = []
            request_data = {
                "generationConfig": {
                    "temperature": body.get("temperature", 0.7),
                    "topP": body.get("top_p", 0.9),
                    "topK": body.get("top_k", 40),
                    "maxOutputTokens": body.get("max_tokens", 8192),
                    "stopSequences": body.get("stop", []),
                },
            }

            for message in messages:
                if message["role"] == "system":
                    request_data["system_instruction"] = {
                        "parts": [{"text": message["content"]}]
                    }
                elif message["role"] != "system":
                    # 使用新的消息处理函数
                    processed_message = await self.process_message_content(message)
                    contents.append(processed_message)

            request_data["contents"] = contents
            if model_id.endswith("-search"):
                model_id = model_id[:-7]
                request_data["tools"] = [{"googleSearch": {}}]
                self.open_search = True
            elif "image-generation" in model_id:
                # model_id = model_id[:-6]
                request_data["generationConfig"]["response_modalities"] = [
                    "Text",
                    "Image",
                ]
                self.open_image = True
            params = {"key": self.GOOGLE_API_KEY}
            if stream:
                url = f"{self.valves.BASE_URL}/models/{model_id}:streamGenerateContent"
                params["alt"] = "sse"
            else:
                url = f"{self.valves.BASE_URL}/models/{model_id}:generateContent"
            headers = {"Content-Type": "application/json"}
            async with httpx.AsyncClient() as client:
                if stream:
                    async with client.stream(
                        "POST",
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    ) as response:
                        if response.status_code != 200:
                            yield f"Error: HTTP {response.status_code}: {response.text}"
                            return

                        async for line in response.aiter_lines():
                            if line.startswith("data: "):
                                try:
                                    data = json.loads(line[6:])
                                    if "candidates" in data and data["candidates"]:
                                        parts = data["candidates"][0]["content"][
                                            "parts"
                                        ]
                                        text = await self.do_parts(parts)
                                        yield text
                                        try:
                                            if (
                                                self.open_search
                                                and self.valves.OPEN_SEARCH_INFO
                                                and data["candidates"][0][
                                                    "groundingMetadata"
                                                ]["groundingChunks"]
                                            ):
                                                yield "\n---------------------------------\n"
                                                groundingChunks = data["candidates"][0][
                                                    "groundingMetadata"
                                                ]["groundingChunks"]
                                                for idx, groundingChunk in enumerate(
                                                    groundingChunks, 1
                                                ):
                                                    if "web" in groundingChunk:
                                                        yield self.create_search_link(
                                                            idx, groundingChunk["web"]
                                                        )
                                        except Exception as e:
                                            pass
                                except Exception as e:
                                    # yield f"Error parsing stream: {str(e)}"
                                    pass
                else:
                    response = await client.post(
                        url,
                        json=request_data,
                        headers=headers,
                        params=params,
                        timeout=120,
                    )
                    if response.status_code != 200:
                        yield f"Error: HTTP {response.status_code}: {response.text}"
                        return
                    data = response.json()
                    res = ""
                    if "candidates" in data and data["candidates"]:
                        parts = data["candidates"][0]["content"]["parts"]
                        res = await self.do_parts(parts)
                        try:
                            if (
                                self.open_search
                                and self.valves.OPEN_SEARCH_INFO
                                and data["candidates"][0]["groundingMetadata"][
                                    "groundingChunks"
                                ]
                            ):
                                res += "\n---------------------------------\n"
                                groundingChunks = data["candidates"][0][
                                    "groundingMetadata"
                                ]["groundingChunks"]
                                for idx, groundingChunk in enumerate(
                                    groundingChunks, 1
                                ):
                                    if "web" in groundingChunk:
                                        res += self.create_search_link(
                                            idx, groundingChunk["web"]
                                        )
                        except Exception as e:
                            pass
                        yield res
                    else:
                        yield "No response data"
        except Exception as e:
            yield f"Error: {str(e)}"
20 Likes

越来越强了 :tieba_087:

太强了佬!

太强了佬 :tieba_087:

太强了,谢谢大佬

哇!好用!

大佬太强了 :rofl:

提供一个公益的图床 稳定性不保证:
图床上传URL:https://cloudflare-imgbed-7b0.pages.dev/upload
鉴权Key:B1NuKs2VOtxGXe9

1 Like

byd太牛批了啊

大佬太强了

大佬 是不是能透過fliter實現一個東西是只要訊息中有圖片都上傳的圖床 減少帶有圖片對話開啟帶來的卡頓

发出的信息 openwebui好像是直接保存了 这个得具体看看

好 謝大佬回覆

牛的夸张

支持作者 :grin:

大佬这个该怎么解决呀