Stevessr:
Adding a modified websearch tool (with support for scraping web pages):
"""
title: Better Web Search Tool
! Put the web_search tool ID into the AUTO WEBSEARCH tool to use it !
description: Search the web with SearXNG and scrape the top result pages, including status messages and citations.
author: TRUC Yoann
"""
import os
import requests
from datetime import datetime
import json
from requests import get
from bs4 import BeautifulSoup
import concurrent.futures
from html.parser import HTMLParser
from urllib.parse import urlparse, urljoin
import re
import unicodedata
from pydantic import BaseModel, Field
import asyncio
from typing import Callable, Any
import chardet
from socket import error as SocketError
class HelpFunctions:
def __init__(self):
pass
def get_base_url(self, url):
        # Get the base part of the URL (e.g., https://www.example.com)
parsed_url = urlparse(url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
return base_url
def generate_excerpt(self, content, max_length=200):
        # Generate a short excerpt of the text content
return content[:max_length] + "..." if len(content) > max_length else content
def format_text(self, original_text):
        # Format text: strip HTML markup and normalize characters
soup = BeautifulSoup(original_text, "html.parser")
formatted_text = soup.get_text(separator=" ", strip=True)
formatted_text = unicodedata.normalize("NFKC", formatted_text)
formatted_text = re.sub(r"\s+", " ", formatted_text)
formatted_text = formatted_text.strip()
formatted_text = self.remove_emojis(formatted_text)
return formatted_text
def remove_emojis(self, text):
        # Strip emoji characters from the text
return "".join(c for c in text if not unicodedata.category(c).startswith("So"))
def detect_encoding(self, response_content):
        # Detect the encoding of the given content
        try:
            result = chardet.detect(response_content)
            # chardet may report None for the encoding; fall back to utf-8
            return result["encoding"] or "utf-8"
        except Exception as e:
            print(f"Encoding detection failed: {e}")
            return "utf-8"  # default to utf-8
def decode_response(self, response):
        # Decode the response content using the detected encoding
        encoding = self.detect_encoding(response.content)
        try:
            return response.content.decode(encoding)
        except (UnicodeDecodeError, LookupError):
            print(f"Decoding with {encoding} failed, falling back to utf-8")
            return response.content.decode("utf-8", errors="replace")  # replace invalid characters
def truncate_to_n_words(self, text, token_limit):
        # Truncate the text to the given number of words
tokens = text.split()
truncated_tokens = tokens[:token_limit]
return " ".join(truncated_tokens)
def process_search_result(self, result, valves):
        # Process a single search result
title_site = self.remove_emojis(result["title"])
url_site = result["url"]
snippet = result.get("content", "")
        # Skip sites on the ignore list (only when IGNORED_WEBSITES is not empty)
if valves.IGNORED_WEBSITES:
base_url = self.get_base_url(url_site)
if any(
ignored_site.strip() in base_url
for ignored_site in valves.IGNORED_WEBSITES.split(",")
):
return None
try:
response_site = requests.get(url_site, timeout=20)
response_site.raise_for_status()
html_content = self.decode_response(response_site)
soup = BeautifulSoup(html_content, "html.parser")
content_site = self.format_text(soup.get_text(separator=" ", strip=True))
truncated_content = self.truncate_to_n_words(
content_site, valves.PAGE_CONTENT_WORDS_LIMIT
)
return {
"title": title_site,
"url": url_site,
"content": truncated_content,
"snippet": self.remove_emojis(snippet),
}
except requests.exceptions.RequestException as e:
            # Retry with the URL prefix configured in valves
alternative_url = valves.RETRY_URL_PREFIX + url_site
try:
response_site = requests.get(alternative_url, timeout=20)
response_site.raise_for_status()
html_content = self.decode_response(response_site)
soup = BeautifulSoup(html_content, "html.parser")
content_site = self.format_text(
soup.get_text(separator=" ", strip=True)
)
truncated_content = self.truncate_to_n_words(
content_site, valves.PAGE_CONTENT_WORDS_LIMIT
)
return {
"title": title_site,
"url": alternative_url,
"content": truncated_content,
"snippet": self.remove_emojis(snippet),
}
except requests.exceptions.RequestException as e:
return None
except SocketError as e:
            if e.errno == 101:  # Errno 101: network is unreachable
                # Retry with the URL prefix configured in valves
alternative_url = valves.RETRY_URL_PREFIX + url_site
try:
response_site = requests.get(alternative_url, timeout=20)
response_site.raise_for_status()
html_content = self.decode_response(response_site)
soup = BeautifulSoup(html_content, "html.parser")
content_site = self.format_text(
soup.get_text(separator=" ", strip=True)
)
truncated_content = self.truncate_to_n_words(
content_site, valves.PAGE_CONTENT_WORDS_LIMIT
)
return {
"title": title_site,
"url": alternative_url,
"content": truncated_content,
"snippet": self.remove_emojis(snippet),
}
except requests.exceptions.RequestException as e:
return None
else:
                raise  # re-raise any other socket error
class EventEmitter:
def __init__(self, event_emitter: Callable[[dict], Any] = None):
self.event_emitter = event_emitter
async def emit(
self,
description="Unknown State",
status="in_progress",
done=False,
step_number=None,
):
        # Emit a status event message
if self.event_emitter:
message = {
"type": "status",
"data": {
"status": status,
"description": description,
"done": done,
},
}
if step_number:
message["data"]["step"] = step_number
await self.event_emitter(message)
class Tools:
class Valves(BaseModel):
        # Tool configuration values
SEARXNG_ENGINE_API_BASE_URL: str = Field(
default="http://host.docker.internal:8080/search",
            description="Base API URL of the search engine",
)
IGNORED_WEBSITES: str = Field(
default="",
            description="Comma-separated list of websites to ignore",
)
RETURNED_SCRAPPED_PAGES_NO: int = Field(
default=3,
            description="Number of search-engine results to parse and return",
)
SCRAPPED_PAGES_NO: int = Field(
default=5,
            description="Total number of pages to scrape; ideally greater than the number of returned pages",
)
PAGE_CONTENT_WORDS_LIMIT: int = Field(
default=5000,
            description="Word limit for the content of each page",
)
CITATION_LINKS: bool = Field(
            default=True,  # default is True
            description="If True, send custom citations with links and metadata",
)
        RETRY_URL_PREFIX: str = Field(  # newly added option
            default="",
            description="URL prefix to prepend when retrying a failed request",
)
def __init__(self):
self.valves = self.Valves()
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
}
        self.help_functions = HelpFunctions()  # instantiate HelpFunctions
async def search_web(
self,
query: str,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
搜索网络并获取相关页面的内容。 搜索未知知识、新闻、信息、公开联系信息、天气等。
:params query: 用于搜索引擎的网络查询。
:return: 页面内容的json格式。
"""
functions = self.help_functions
emitter = EventEmitter(__event_emitter__)
await emitter.emit("🔍 开始网络搜索", step_number=1)
await emitter.emit(f"📝 搜索: {query}", step_number=2)
search_engine_url = self.valves.SEARXNG_ENGINE_API_BASE_URL
if self.valves.RETURNED_SCRAPPED_PAGES_NO > self.valves.SCRAPPED_PAGES_NO:
self.valves.RETURNED_SCRAPPED_PAGES_NO = self.valves.SCRAPPED_PAGES_NO
await emitter.emit(
"⚙️ 调整搜索参数",
step_number=3,
)
params = {
"q": query,
"format": "json",
"number_of_results": self.valves.RETURNED_SCRAPPED_PAGES_NO,
}
try:
await emitter.emit("🌐 连接到搜索引擎", step_number=4)
resp = requests.get(
search_engine_url, params=params, headers=self.headers, timeout=120
)
resp.raise_for_status()
data = resp.json()
results = data.get("results", [])
limited_results = results[: self.valves.SCRAPPED_PAGES_NO]
await emitter.emit(
f"📊 找到 {len(limited_results)} 个结果",
step_number=5,
)
except requests.exceptions.RequestException as e:
await emitter.emit(
status="error",
description=f"❌ 搜索时发生错误: {str(e)}",
done=True,
)
return json.dumps({"error": str(e)})
results_json = []
if limited_results:
await emitter.emit(
"🔄 处理搜索结果",
step_number=6,
)
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(
functions.process_search_result, result, self.valves
)
for result in limited_results
]
processed_count = 0
for future in concurrent.futures.as_completed(futures):
result_json = future.result()
if result_json:
try:
json.dumps(result_json)
results_json.append(result_json)
processed_count += 1
await emitter.emit(
f"📄 正在处理页面 {processed_count}/{len(limited_results)}",
step_number=7,
)
except (TypeError, ValueError):
continue
if len(results_json) >= self.valves.RETURNED_SCRAPPED_PAGES_NO:
break
results_json = results_json[: self.valves.RETURNED_SCRAPPED_PAGES_NO]
if self.valves.CITATION_LINKS and __event_emitter__:
await emitter.emit(
"📚 生成引用和参考",
step_number=8,
)
for result in results_json:
await __event_emitter__(
{
"type": "citation",
"data": {
"document": [result["content"]],
"metadata": [
{
"source": result["url"],
"date_accessed": datetime.now().isoformat(),
"title": result["title"],
}
],
"source": {
"name": result["title"],
"url": result["url"],
},
},
}
)
await emitter.emit(
status="complete",
description=f"✅ 搜索完成 - {len(results_json)} 页已分析",
done=True,
step_number=9,
)
return json.dumps(results_json, ensure_ascii=False)
async def get_website(
self,
url: str,
__event_emitter__: Callable[[dict], Any] = None,
) -> str:
"""
抓取提供的网站并获取其内容。
:params url: 网站的URL。
:return: 网站内容的json格式。
"""
functions = self.help_functions
emitter = EventEmitter(__event_emitter__)
await emitter.emit(f"🔍 访问URL: {url}", step_number=1)
results_json = []
try:
await emitter.emit("🌐 下载内容", step_number=2)
response_site = requests.get(url, headers=self.headers, timeout=120)
response_site.raise_for_status()
html_content = functions.decode_response(response_site)
await emitter.emit("📑 分析页面内容", step_number=3)
soup = BeautifulSoup(html_content, "html.parser")
            page_title = soup.title.string if soup.title and soup.title.string else "No title found"
page_title = unicodedata.normalize("NFKC", page_title.strip())
page_title = functions.remove_emojis(page_title)
title_site = page_title
url_site = url
await emitter.emit("📝 提取并格式化文本", step_number=4)
content_site = functions.format_text(
soup.get_text(separator=" ", strip=True)
)
truncated_content = functions.truncate_to_n_words(
content_site, self.valves.PAGE_CONTENT_WORDS_LIMIT
)
await emitter.emit("📊 创建摘要", step_number=5)
result_site = {
"title": title_site,
"url": url_site,
"content": truncated_content,
"excerpt": functions.generate_excerpt(content_site),
"date_accessed": datetime.now().isoformat(),
}
results_json.append(result_site)
if self.valves.CITATION_LINKS and __event_emitter__:
await emitter.emit("📚 生成引用", step_number=6)
await __event_emitter__(
{
"type": "citation",
"data": {
"document": [truncated_content],
"metadata": [
{
"source": url_site,
"date_accessed": datetime.now().isoformat(),
"title": title_site,
}
],
"source": {
"name": title_site,
"url": url_site,
"type": "webpage",
},
},
}
)
await emitter.emit(
status="complete",
description="✅ 网站内容提取和处理成功",
done=True,
step_number=7,
)
except requests.exceptions.RequestException as e:
            # Retry with the URL prefix configured in valves
alternative_url = self.valves.RETRY_URL_PREFIX + url
try:
response_site = requests.get(
alternative_url, headers=self.headers, timeout=120
)
response_site.raise_for_status()
html_content = functions.decode_response(response_site)
await emitter.emit("📑 分析页面内容", step_number=3)
soup = BeautifulSoup(html_content, "html.parser")
                page_title = soup.title.string if soup.title and soup.title.string else "No title found"
page_title = unicodedata.normalize("NFKC", page_title.strip())
page_title = functions.remove_emojis(page_title)
title_site = page_title
url_site = alternative_url
await emitter.emit("📝 提取并格式化文本", step_number=4)
content_site = functions.format_text(
soup.get_text(separator=" ", strip=True)
)
truncated_content = functions.truncate_to_n_words(
content_site, self.valves.PAGE_CONTENT_WORDS_LIMIT
)
await emitter.emit("📊 创建摘要", step_number=5)
result_site = {
"title": title_site,
"url": url_site,
"content": truncated_content,
"excerpt": functions.generate_excerpt(content_site),
"date_accessed": datetime.now().isoformat(),
}
results_json.append(result_site)
if self.valves.CITATION_LINKS and __event_emitter__:
await emitter.emit("📚 生成引用", step_number=6)
await __event_emitter__(
{
"type": "citation",
"data": {
"document": [truncated_content],
"metadata": [
{
"source": url_site,
"date_accessed": datetime.now().isoformat(),
"title": title_site,
}
],
"source": {
"name": title_site,
"url": url_site,
"type": "webpage",
},
},
}
)
await emitter.emit(
status="complete",
description="✅ 网站内容提取和处理成功",
done=True,
step_number=7,
)
except requests.exceptions.RequestException as e:
await emitter.emit(
status="error",
description=f"❌ 访问页面时出错: {str(e)}",
done=True,
)
results_json.append(
{
"url": url,
"content": f"无法获取页面。 错误: {str(e)}",
"error": True,
"date_accessed": datetime.now().isoformat(),
}
)
        except SocketError as e:
            # Only retry on Errno 101 (network unreachable); re-raise other socket errors
            if e.errno != 101:
                raise
            # Retry with the URL prefix configured in valves
            alternative_url = self.valves.RETRY_URL_PREFIX + url
try:
response_site = requests.get(
alternative_url, headers=self.headers, timeout=120
)
response_site.raise_for_status()
html_content = functions.decode_response(response_site)
await emitter.emit("📑 分析页面内容", step_number=3)
soup = BeautifulSoup(html_content, "html.parser")
                page_title = soup.title.string if soup.title and soup.title.string else "No title found"
page_title = unicodedata.normalize("NFKC", page_title.strip())
page_title = functions.remove_emojis(page_title)
title_site = page_title
url_site = alternative_url
await emitter.emit("📝 提取并格式化文本", step_number=4)
content_site = functions.format_text(
soup.get_text(separator=" ", strip=True)
)
truncated_content = functions.truncate_to_n_words(
content_site, self.valves.PAGE_CONTENT_WORDS_LIMIT
)
await emitter.emit("📊 创建摘要", step_number=5)
result_site = {
"title": title_site,
"url": url_site,
"content": truncated_content,
"excerpt": functions.generate_excerpt(content_site),
"date_accessed": datetime.now().isoformat(),
}
results_json.append(result_site)
if self.valves.CITATION_LINKS and __event_emitter__:
await emitter.emit("📚 生成引用", step_number=6)
await __event_emitter__(
{
"type": "citation",
"data": {
"document": [truncated_content],
"metadata": [
{
"source": url_site,
"date_accessed": datetime.now().isoformat(),
"title": title_site,
}
],
"source": {
"name": title_site,
"url": url_site,
"type": "webpage",
},
},
}
)
await emitter.emit(
status="complete",
description="✅ 网站内容提取和处理成功",
done=True,
step_number=7,
)
except requests.exceptions.RequestException as e:
await emitter.emit(
status="error",
description=f"❌ 访问页面时出错: {str(e)}",
done=True,
)
results_json.append(
{
"url": url,
"content": f"无法获取页面。 错误: {str(e)}",
"error": True,
"date_accessed": datetime.now().isoformat(),
}
)
return json.dumps(results_json, ensure_ascii=False)
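For quick local testing outside Open WebUI, a small harness along these lines could be appended below the class. This is only a sketch and rests on assumptions not in the original post: a SearXNG instance with JSON output enabled is reachable at the configured URL, the dependencies (requests, beautifulsoup4, chardet, pydantic) are installed, and print_status / _demo are hypothetical stand-ins for the __event_emitter__ callback that Open WebUI normally provides.
# --- Hypothetical local test harness (not part of the tool itself) ---
async def print_status(event: dict):
    # Minimal stand-in for Open WebUI's __event_emitter__: print each emitted event.
    print(event["type"], event["data"].get("description", event["data"]))


async def _demo():
    tool = Tools()
    # Assumption: a local SearXNG instance with the JSON format enabled.
    tool.valves.SEARXNG_ENGINE_API_BASE_URL = "http://localhost:8080/search"
    search_results = await tool.search_web("open webui tools", print_status)
    print(search_results[:500])
    page = await tool.get_website("https://example.com", print_status)
    print(page[:500])


if __name__ == "__main__":
    asyncio.run(_demo())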