owu usage tuning… using a custom image URL

Changing your user avatar to a custom image

curl '{$owu_url}/api/v1/auths/update/profile' \
  -H 'Accept: */*' \
  -H 'Accept-Language: zh-CN,zh;q=0.9' \
  -H 'Connection: keep-alive' \
  -H 'Content-Type: application/json' \
  -H 'authorization: Bearer {user_token}' \
  --data-raw '{"name":"{user_name}","profile_image_url":"{your_image_url}"}'

Note: after importing, you need to delete and re-add the knowledge bases attached to your custom models so that the owner's avatar URL gets refreshed.

Automating a pass over all models… adding default parameters… and providing a way to change model images (from config.json) (bro really does not want to edit them one by one)

import json

def transform_json(input_json):
    """
    Transforms the input JSON to the desired format, preserving existing meta values.

    Args:
        input_json: The input JSON dictionary.

    Returns:
        The transformed JSON dictionary.
    """

    transformed_json = {
        "base_model_id": None,
        "created": input_json.get("created"),
        "id": input_json.get("id"),
        "is_active": input_json.get("is_active"),
        "meta": {  # Initialize meta, then update with existing values
            "description": "",
            "profile_image_url": "「your image url here」",
            "suggestion_prompts": None,
            "tags": []
        },
        "name": input_json.get("name"),
        "object": input_json.get("object"),
        "openai": input_json.get("openai"),
        "owned_by": input_json.get("owned_by"),
        "params": {
            "system": ""
        },
        "parent": input_json.get("parent"),
        "permission": input_json.get("permission"),
        "root": input_json.get("id"),
        "urlIdx": input_json.get("urlIdx")
    }

    # Preserve existing 'meta' values if they exist
    if "meta" in input_json:
        existing_meta = input_json["meta"]
        # Copy a key over only if it exists in the existing meta and is not None.
        if isinstance(existing_meta, dict):  # Ensure existing_meta is a dictionary
            for key in ["description", "profile_image_url", "suggestion_prompts", "tags"]:
                if key in existing_meta and existing_meta[key] is not None:
                    transformed_json["meta"][key] = existing_meta[key]
    return transformed_json



def process_config(config_file, output_file="output.json"):
    """
    Reads the JSON from the config file, transforms it, and writes to output file.

    Args:
        config_file: The path to the JSON config file.
        output_file: The path to the output JSON file (default: output.json).
    """
    try:
        with open(config_file, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: File '{config_file}' not found.")
        return
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON in '{config_file}'.")
        return

    if isinstance(data, list):
        transformed_data = [transform_json(item) for item in data]
    elif isinstance(data, dict):
        transformed_data = transform_json(data)
    else:
        print("Error: Unexpected data type in config file. Expected list or dictionary.")
        return

    # Write to the output file
    try:
        with open(output_file, "w") as outfile:
            json.dump(transformed_data, outfile, indent=4)
            print(f"Transformed JSON written to {output_file}")
    except Exception as e:
        print(f"Error writing to output file: {e}")



if __name__ == "__main__":
    config_file = "config.json"
    process_config(config_file)

As for the images of models in the Workspace, you may need to export and re-import them through the Workspace. Of course, you can also add default tags… (see the sketch below)
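
For default tags, something like this could be dropped into transform_json just before the return. This is only a sketch: the tag name "default" and the {"name": ...} shape are assumptions, so check an exported model that already has tags to confirm the schema your owu version uses.

    # Sketch: give every model a default tag, keeping any tags that already exist.
    # The tag name and dict shape here are assumptions - verify against a real export.
    if not transformed_json["meta"]["tags"]:
        transformed_json["meta"]["tags"] = [{"name": "default"}]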

22 likes

Could you explain the steps in more detail? I just started using owu and don't know how to use your code.

1 like

Note that the curl command here is the bash version; writing it for cmd is just painful (a Python alternative is sketched below the placeholder explanations).
Who on Windows even uses cmd + curl anyway… oh right, that's me. Never mind then.

The first placeholder, {$owu_url}, is the address of your Open WebUI deployment (no trailing /).
{user_name} is your user name.
{your_image_url} is the URL of the image you want.
{user_token} is the JWT token: click your avatar → Settings → Account → API Keys → Show → JWT Token.
You can also view the token in:
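
If you would rather skip curl quoting on Windows, the same request can be made from Python. A minimal sketch, assuming the requests library is installed; the URL, token, name, and image URL below are placeholders to fill in:

    # Minimal sketch of the same profile-update call using the requests library.
    import requests

    owu_url = "http://armbian.local"  # your Open WebUI address, no trailing /
    user_token = "eyJ..."             # JWT token from Settings -> Account -> API Keys

    resp = requests.post(
        f"{owu_url}/api/v1/auths/update/profile",
        headers={"Authorization": f"Bearer {user_token}"},
        json={"name": "your_user_name", "profile_image_url": "https://example.com/avatar.png"},
        timeout=30,
    )
    print(resp.status_code, resp.text)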


For example, my deployment is at http://armbian.local
As for changing model images, go to
http://armbian.local/admin/settings
Settings → Models → Export Presets


Export the model settings via Export Presets; you will get a JSON file named something like models-export-1742602515017.json. Rename it to config.json, put it next to the Python script above, and run the script.
That produces output.json; change the profile_image_url value in it to set the image URL for each model (or patch them all in one go with the sketch below).
Once you are done, just upload output.json (you can watch the network panel in the browser console; every model update fires the corresponding requests).
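
If many models should point at the same image, a small sketch for patching output.json in one pass instead of editing by hand (adjust new_url to your own image):

    # Sketch: set the same profile_image_url for every model in output.json.
    import json

    new_url = "https://example.com/model-avatar.png"  # your image URL here

    with open("output.json", "r", encoding="utf-8") as f:
        data = json.load(f)

    # the transform script writes either a single dict or a list of model dicts
    models = data if isinstance(data, list) else [data]
    for model in models:
        model.setdefault("meta", {})["profile_image_url"] = new_url

    with open("output.json", "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4, ensure_ascii=False)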


As for the models under {$owu_url}/workspace/models, the direct import/export there works the same way.
1 like

I just overwrite head.jpg directly :tieba_025:

1 like

SVG performs better~~ :bili_040:
changing custom background :bili_040:

1 like

Fair enough
The owu comedy saga: Create custom, but better

1 like

Awesome, great work!

With only a few users, all of them still on the default avatar, maybe there's no need to change it?

It's for my own enjoyment; I want every custom model to carry the author's info.

As a bonus, here's a modified websearch tool (it supports scraping web pages):

"""
title: Better Web Search Tool

! Put the web_search ID into the AUTO WEBSEARCH tool to use it !

description: Scrapes the first result pages using SearXNG and a scraper, with status messages and citations.

author: TRUC Yoann

"""

import json
import re
import unicodedata
import concurrent.futures
from datetime import datetime
from urllib.parse import urlparse
from typing import Callable, Any
from socket import error as SocketError

import requests
import chardet
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field


class HelpFunctions:
    def __init__(self):
        pass

    def get_base_url(self, url):
        # Get the base part of a URL (e.g. https://www.example.com)
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        return base_url

    def generate_excerpt(self, content, max_length=200):
        # Generate a short excerpt of the text content
        return content[:max_length] + "..." if len(content) > max_length else content

    def format_text(self, original_text):
        # Format text: strip HTML markup and normalize characters
        soup = BeautifulSoup(original_text, "html.parser")
        formatted_text = soup.get_text(separator=" ", strip=True)
        formatted_text = unicodedata.normalize("NFKC", formatted_text)
        formatted_text = re.sub(r"\s+", " ", formatted_text)
        formatted_text = formatted_text.strip()
        formatted_text = self.remove_emojis(formatted_text)
        return formatted_text

    def remove_emojis(self, text):
        # Remove emoji from the text
        return "".join(c for c in text if not unicodedata.category(c).startswith("So"))

    def detect_encoding(self, response_content):
        # Detect the character encoding of the given content
        try:
            result = chardet.detect(response_content)
            return result["encoding"]
        except Exception as e:
            print(f"编码检测失败: {e}")
            return "utf-8"  # 默认使用 utf-8

    def decode_response(self, response):
        # Decode the response content using the detected encoding
        encoding = self.detect_encoding(response.content)
        try:
            return response.content.decode(encoding)
        except UnicodeDecodeError:
            print(f"使用 {encoding} 解码失败,尝试使用 utf-8 编码")
            return response.content.decode("utf-8", errors="replace")  # 替换错误字符

    def truncate_to_n_words(self, text, token_limit):
        # Truncate the text to the given number of words
        tokens = text.split()
        truncated_tokens = tokens[:token_limit]
        return " ".join(truncated_tokens)

    def process_search_result(self, result, valves):
        # Process a single search result
        title_site = self.remove_emojis(result["title"])
        url_site = result["url"]
        snippet = result.get("content", "")
        # Check whether the site is in the ignore list (only when IGNORED_WEBSITES is non-empty)
        if valves.IGNORED_WEBSITES:
            base_url = self.get_base_url(url_site)
            if any(
                ignored_site.strip() in base_url
                for ignored_site in valves.IGNORED_WEBSITES.split(",")
            ):
                return None
        try:
            response_site = requests.get(url_site, timeout=20)
            response_site.raise_for_status()
            html_content = self.decode_response(response_site)
            soup = BeautifulSoup(html_content, "html.parser")
            content_site = self.format_text(soup.get_text(separator=" ", strip=True))
            truncated_content = self.truncate_to_n_words(
                content_site, valves.PAGE_CONTENT_WORDS_LIMIT
            )
            return {
                "title": title_site,
                "url": url_site,
                "content": truncated_content,
                "snippet": self.remove_emojis(snippet),
            }
        except requests.exceptions.RequestException as e:
            # Use the retry URL prefix configured in valves
            alternative_url = valves.RETRY_URL_PREFIX + url_site
            try:
                response_site = requests.get(alternative_url, timeout=20)
                response_site.raise_for_status()
                html_content = self.decode_response(response_site)
                soup = BeautifulSoup(html_content, "html.parser")
                content_site = self.format_text(
                    soup.get_text(separator=" ", strip=True)
                )
                truncated_content = self.truncate_to_n_words(
                    content_site, valves.PAGE_CONTENT_WORDS_LIMIT
                )
                return {
                    "title": title_site,
                    "url": alternative_url,
                    "content": truncated_content,
                    "snippet": self.remove_emojis(snippet),
                }
            except requests.exceptions.RequestException as e:
                return None
        except SocketError as e:
            if e.errno == 101:  # Errno 101: network unreachable
                # Use the retry URL prefix configured in valves
                alternative_url = valves.RETRY_URL_PREFIX + url_site
                try:
                    response_site = requests.get(alternative_url, timeout=20)
                    response_site.raise_for_status()
                    html_content = self.decode_response(response_site)
                    soup = BeautifulSoup(html_content, "html.parser")
                    content_site = self.format_text(
                        soup.get_text(separator=" ", strip=True)
                    )
                    truncated_content = self.truncate_to_n_words(
                        content_site, valves.PAGE_CONTENT_WORDS_LIMIT
                    )
                    return {
                        "title": title_site,
                        "url": alternative_url,
                        "content": truncated_content,
                        "snippet": self.remove_emojis(snippet),
                    }
                except requests.exceptions.RequestException as e:
                    return None
            else:
                raise  # re-raise other SocketErrors


class EventEmitter:
    def __init__(self, event_emitter: Callable[[dict], Any] = None):
        self.event_emitter = event_emitter

    async def emit(
        self,
        description="Unknown State",
        status="in_progress",
        done=False,
        step_number=None,
    ):
        # Emit a status event message
        if self.event_emitter:
            message = {
                "type": "status",
                "data": {
                    "status": status,
                    "description": description,
                    "done": done,
                },
            }
            if step_number:
                message["data"]["step"] = step_number
            await self.event_emitter(message)


class Tools:
    class Valves(BaseModel):
        # Configuration values for the tool
        SEARXNG_ENGINE_API_BASE_URL: str = Field(
            default="http://host.docker.internal:8080/search",
            description="搜索引擎的API基础URL",
        )
        IGNORED_WEBSITES: str = Field(
            default="",
            description="逗号分隔的要忽略的网站列表",
        )
        RETURNED_SCRAPPED_PAGES_NO: int = Field(
            default=3,
            description="要解析的搜索引擎结果数",
        )
        SCRAPPED_PAGES_NO: int = Field(
            default=5,
            description="抓取的总页数。理想情况下大于返回的页数之一",
        )
        PAGE_CONTENT_WORDS_LIMIT: int = Field(
            default=5000,
            description="每页内容的字数限制",
        )
        CITATION_LINKS: bool = Field(
            default=True,  # default is True
            description="If True, send custom citations with links and metadata",
        )
        RETRY_URL_PREFIX: str = Field(  # newly added option
            default="",
            description="URL prefix used when retrying requests",
        )

    def __init__(self):
        self.valves = self.Valves()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
        }
        self.help_functions = HelpFunctions()  # instantiate HelpFunctions

    async def search_web(
        self,
        query: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Search the web and fetch the content of the relevant pages. Use it for unknown knowledge, news, information, public contact info, weather, etc.
        :params query: The web query for the search engine.
        :return: The page contents in JSON format.
        """
        functions = self.help_functions
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit("🔍 开始网络搜索", step_number=1)
        await emitter.emit(f"📝 搜索: {query}", step_number=2)
        search_engine_url = self.valves.SEARXNG_ENGINE_API_BASE_URL
        if self.valves.RETURNED_SCRAPPED_PAGES_NO > self.valves.SCRAPPED_PAGES_NO:
            self.valves.RETURNED_SCRAPPED_PAGES_NO = self.valves.SCRAPPED_PAGES_NO
            await emitter.emit(
                "⚙️ 调整搜索参数",
                step_number=3,
            )
        params = {
            "q": query,
            "format": "json",
            "number_of_results": self.valves.RETURNED_SCRAPPED_PAGES_NO,
        }
        try:
            await emitter.emit("🌐 连接到搜索引擎", step_number=4)
            resp = requests.get(
                search_engine_url, params=params, headers=self.headers, timeout=120
            )
            resp.raise_for_status()
            data = resp.json()
            results = data.get("results", [])
            limited_results = results[: self.valves.SCRAPPED_PAGES_NO]
            await emitter.emit(
                f"📊 找到 {len(limited_results)} 个结果",
                step_number=5,
            )
        except requests.exceptions.RequestException as e:
            await emitter.emit(
                status="error",
                description=f"❌ 搜索时发生错误: {str(e)}",
                done=True,
            )
            return json.dumps({"error": str(e)})
        results_json = []
        if limited_results:
            await emitter.emit(
                "🔄 处理搜索结果",
                step_number=6,
            )
            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(
                        functions.process_search_result, result, self.valves
                    )
                    for result in limited_results
                ]
                processed_count = 0
                for future in concurrent.futures.as_completed(futures):
                    result_json = future.result()
                    if result_json:
                        try:
                            json.dumps(result_json)
                            results_json.append(result_json)
                            processed_count += 1
                            await emitter.emit(
                                f"📄 正在处理页面 {processed_count}/{len(limited_results)}",
                                step_number=7,
                            )
                        except (TypeError, ValueError):
                            continue
                    if len(results_json) >= self.valves.RETURNED_SCRAPPED_PAGES_NO:
                        break
            results_json = results_json[: self.valves.RETURNED_SCRAPPED_PAGES_NO]
            if self.valves.CITATION_LINKS and __event_emitter__:
                await emitter.emit(
                    "📚 生成引用和参考",
                    step_number=8,
                )
                for result in results_json:
                    await __event_emitter__(
                        {
                            "type": "citation",
                            "data": {
                                "document": [result["content"]],
                                "metadata": [
                                    {
                                        "source": result["url"],
                                        "date_accessed": datetime.now().isoformat(),
                                        "title": result["title"],
                                    }
                                ],
                                "source": {
                                    "name": result["title"],
                                    "url": result["url"],
                                },
                            },
                        }
                    )
        await emitter.emit(
            status="complete",
            description=f"✅ 搜索完成 - {len(results_json)} 页已分析",
            done=True,
            step_number=9,
        )
        return json.dumps(results_json, ensure_ascii=False)

    async def get_website(
        self,
        url: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Scrape the provided website and fetch its content.
        :params url: The URL of the website.
        :return: The website content in JSON format.
        """
        functions = self.help_functions
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit(f"🔍 访问URL: {url}", step_number=1)
        results_json = []
        try:
            await emitter.emit("🌐 下载内容", step_number=2)
            response_site = requests.get(url, headers=self.headers, timeout=120)
            response_site.raise_for_status()
            html_content = functions.decode_response(response_site)
            await emitter.emit("📑 分析页面内容", step_number=3)
            soup = BeautifulSoup(html_content, "html.parser")
            page_title = soup.title.string if soup.title else "No title found"
            page_title = unicodedata.normalize("NFKC", page_title.strip())
            page_title = functions.remove_emojis(page_title)
            title_site = page_title
            url_site = url
            await emitter.emit("📝 提取并格式化文本", step_number=4)
            content_site = functions.format_text(
                soup.get_text(separator=" ", strip=True)
            )
            truncated_content = functions.truncate_to_n_words(
                content_site, self.valves.PAGE_CONTENT_WORDS_LIMIT
            )
            await emitter.emit("📊 创建摘要", step_number=5)
            result_site = {
                "title": title_site,
                "url": url_site,
                "content": truncated_content,
                "excerpt": functions.generate_excerpt(content_site),
                "date_accessed": datetime.now().isoformat(),
            }
            results_json.append(result_site)
            if self.valves.CITATION_LINKS and __event_emitter__:
                await emitter.emit("📚 生成引用", step_number=6)
                await __event_emitter__(
                    {
                        "type": "citation",
                        "data": {
                            "document": [truncated_content],
                            "metadata": [
                                {
                                    "source": url_site,
                                    "date_accessed": datetime.now().isoformat(),
                                    "title": title_site,
                                }
                            ],
                            "source": {
                                "name": title_site,
                                "url": url_site,
                                "type": "webpage",
                            },
                        },
                    }
                )
            await emitter.emit(
                status="complete",
                description="✅ 网站内容提取和处理成功",
                done=True,
                step_number=7,
            )
        except requests.exceptions.RequestException as e:
            # Use the retry URL prefix configured in valves
            alternative_url = self.valves.RETRY_URL_PREFIX + url
            try:
                response_site = requests.get(
                    alternative_url, headers=self.headers, timeout=120
                )
                response_site.raise_for_status()
                html_content = functions.decode_response(response_site)
                await emitter.emit("📑 分析页面内容", step_number=3)
                soup = BeautifulSoup(html_content, "html.parser")
                page_title = soup.title.string if soup.title else "No title found"
                page_title = unicodedata.normalize("NFKC", page_title.strip())
                page_title = functions.remove_emojis(page_title)
                title_site = page_title
                url_site = alternative_url
                await emitter.emit("📝 提取并格式化文本", step_number=4)
                content_site = functions.format_text(
                    soup.get_text(separator=" ", strip=True)
                )
                truncated_content = functions.truncate_to_n_words(
                    content_site, self.valves.PAGE_CONTENT_WORDS_LIMIT
                )
                await emitter.emit("📊 创建摘要", step_number=5)
                result_site = {
                    "title": title_site,
                    "url": url_site,
                    "content": truncated_content,
                    "excerpt": functions.generate_excerpt(content_site),
                    "date_accessed": datetime.now().isoformat(),
                }
                results_json.append(result_site)
                if self.valves.CITATION_LINKS and __event_emitter__:
                    await emitter.emit("📚 生成引用", step_number=6)
                    await __event_emitter__(
                        {
                            "type": "citation",
                            "data": {
                                "document": [truncated_content],
                                "metadata": [
                                    {
                                        "source": url_site,
                                        "date_accessed": datetime.now().isoformat(),
                                        "title": title_site,
                                    }
                                ],
                                "source": {
                                    "name": title_site,
                                    "url": url_site,
                                    "type": "webpage",
                                },
                            },
                        }
                    )
                await emitter.emit(
                    status="complete",
                    description="✅ 网站内容提取和处理成功",
                    done=True,
                    step_number=7,
                )
            except requests.exceptions.RequestException as e:
                await emitter.emit(
                    status="error",
                    description=f"❌ 访问页面时出错: {str(e)}",
                    done=True,
                )
                results_json.append(
                    {
                        "url": url,
                        "content": f"无法获取页面。 错误: {str(e)}",
                        "error": True,
                        "date_accessed": datetime.now().isoformat(),
                    }
                )
        except SocketError as e:
            # Use the retry URL prefix configured in valves
            alternative_url = self.valves.RETRY_URL_PREFIX + url
            try:
                response_site = requests.get(
                    alternative_url, headers=self.headers, timeout=120
                )
                response_site.raise_for_status()
                html_content = functions.decode_response(response_site)
                await emitter.emit("📑 分析页面内容", step_number=3)
                soup = BeautifulSoup(html_content, "html.parser")
                page_title = soup.title.string if soup.title else "No title found"
                page_title = unicodedata.normalize("NFKC", page_title.strip())
                page_title = functions.remove_emojis(page_title)
                title_site = page_title
                url_site = alternative_url
                await emitter.emit("📝 提取并格式化文本", step_number=4)
                content_site = functions.format_text(
                    soup.get_text(separator=" ", strip=True)
                )
                truncated_content = functions.truncate_to_n_words(
                    content_site, self.valves.PAGE_CONTENT_WORDS_LIMIT
                )
                await emitter.emit("📊 创建摘要", step_number=5)
                result_site = {
                    "title": title_site,
                    "url": url_site,
                    "content": truncated_content,
                    "excerpt": functions.generate_excerpt(content_site),
                    "date_accessed": datetime.now().isoformat(),
                }
                results_json.append(result_site)
                if self.valves.CITATION_LINKS and __event_emitter__:
                    await emitter.emit("📚 生成引用", step_number=6)
                    await __event_emitter__(
                        {
                            "type": "citation",
                            "data": {
                                "document": [truncated_content],
                                "metadata": [
                                    {
                                        "source": url_site,
                                        "date_accessed": datetime.now().isoformat(),
                                        "title": title_site,
                                    }
                                ],
                                "source": {
                                    "name": title_site,
                                    "url": url_site,
                                    "type": "webpage",
                                },
                            },
                        }
                    )
                await emitter.emit(
                    status="complete",
                    description="✅ 网站内容提取和处理成功",
                    done=True,
                    step_number=7,
                )
            except requests.exceptions.RequestException as e:
                await emitter.emit(
                    status="error",
                    description=f"❌ 访问页面时出错: {str(e)}",
                    done=True,
                )
                results_json.append(
                    {
                        "url": url,
                        "content": f"无法获取页面。 错误: {str(e)}",
                        "error": True,
                        "date_accessed": datetime.now().isoformat(),
                    }
                )
        return json.dumps(results_json, ensure_ascii=False)
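
To sanity-check the tool outside of owu, a quick standalone run could look like the sketch below. This assumes a local SearXNG instance with JSON output enabled; the endpoint URL and query are just example values.

    # Sketch: run the tool standalone to verify the SearXNG connection and scraping.
    import asyncio

    async def main():
        tools = Tools()
        # example value - point this at your own SearXNG /search endpoint
        tools.valves.SEARXNG_ENGINE_API_BASE_URL = "http://127.0.0.1:8080/search"
        print(await tools.search_web("open webui custom avatar"))

    asyncio.run(main())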