做了一个pdf转markdown的工具,分析给佬友们,佬友们点点赞,不止GPT-4,支持识别图片的模型都可以用,请求地址改成你自己的one/new api就行
PDF OCR 转换工具
这是一个将PDF文件转换为Markdown格式的OCR工具。它使用OpenAI的GPT-4模型来识别PDF中的文本和公式,并将其转换为Markdown和LaTeX格式。
功能
- 将PDF文件转换为图像
- 使用OCR识别图像中的文本和公式
- 将识别结果转换为Markdown和LaTeX格式
- 支持批量处理多个PDF文件
- 使用图形界面选择输入和输出文件夹
安装依赖
在运行程序之前,请确保已安装所有必要的依赖。你可以使用以下命令安装:
pip install asyncio aiohttp PyMuPDF tkinter
注意:tkinter
通常已经包含在Python标准库中。如果安装失败,可能需要使用系统包管理器安装。
配置
在运行程序之前,请确保正确设置以下变量:
API_BASE_URL
: OpenAI API的基础URLapi_key
: 你的OpenAI API密钥MODEL
: 使用的OpenAI模型名称(例如:“gpt-4”)
使用方法
- 运行程序:
python script_name.py
- 在弹出的对话框中选择包含PDF文件的输入文件夹
- 选择保存转换结果的输出文件夹
- 程序将开始处理PDF文件并显示进度
注意事项
- 程序使用异步处理来提高效率,可以同时处理多个页面
- 如果处理过程中出现错误,程序会自动重试
- 输出文件将以原PDF文件名保存为.md格式
- 如果输出文件已存在,程序将跳过处理该文件
限制
- 程序依赖于OpenAI的API,请确保你有足够的API使用额度
- OCR识别的准确性取决于PDF的质量和GPT模型的性能
- 处理大型PDF文件可能需要较长时间
效果
效果根据你用的模型不同,效果不同,用4o效果最好,基本没什么错误,Gemini 1.5pro次之,其他的没试过,可以去试试看一下
这是4o的识别出来的
代码
import os
import asyncio
import aiohttp
from pathlib import Path
import fitz
import base64
import tkinter as tk
from tkinter import filedialog
# 设置 OpenAI API 密钥
API_BASE_URL = "https://api.openai.com"#请求地址,不带v1
api_key = "sk- "#请求的API密钥
MODEL = "gpt-4o"#模型名称
# 并发执行的数量
CONCURRENCY = 20
# 错误重试次数
MAX_RETRIES = 1000
def pdf_to_images(pdf_path, zoom_x=5, zoom_y=5, rotation_angle=0):
"""将PDF文件转换为图片"""
pdf = fitz.open(pdf_path)
images = []
for pg in range(pdf.page_count):
page = pdf[pg]
trans = fitz.Matrix(zoom_x, zoom_y).prerotate(rotation_angle)
pm = page.get_pixmap(matrix=trans, alpha=False)
img_bytes = pm.tobytes()
images.append((pg + 1, img_bytes))
pdf.close()
return images
async def process_image(session, image_data, semaphore, page_number, max_retries=MAX_RETRIES):
"""使用 OCR 识别图像并进行 Markdown 格式化"""
system_prompt = """
OCR识别图片上的内容,给出markdown的katex的格式的内容。
选择题的序号使用A. B.依次类推。
支持的主要语法:
1. 基本语法:
- 使用 $ 或 $$ 包裹行内或块级数学公式
- 支持大量数学符号、希腊字母、运算符等
- 分数:\\frac{分子}{分母}
- 根号:\\sqrt{被开方数}
- 上下标:x^2, x_n
2. 极限使用:\\lim\\limits_x
3. 参考以下例子格式:
### 35. 上3个无穷小量按照从低阶到高阶的排序是( )
A.$\\alpha_1,\\alpha_2,\\alpha_3$
B.$\\alpha_2,\\alpha_1,\\alpha_3$
C.$\\alpha_1,\\alpha_3,\\alpha_2$
D. $\\alpha_2,\\alpha_3,\\alpha_1$
36. (I) 求 $\\lim\\limits_{x \\to +\\infty} \\frac{\\arctan 2x - \\arctan x}{\\frac{\\pi}{2} - \\arctan x}$;
(II) 若 $\\lim\\limits_{x \\to +\\infty} x[1-f(x)]$ 不存在, 而 $l = \\lim\\limits_{x \\to +\\infty} \\frac{\\arctan 2x + [b-1-bf(x)]\\arctan x}{\\frac{\\pi}{2} - \\arctan x}$ 存在,
试确定 $b$ 的值, 并求 (I)
"""
for attempt in range(max_retries):
try:
async with semaphore:
encoded_image = base64.b64encode(image_data).decode('utf-8')
response = await session.post(
f"{API_BASE_URL}/v1/chat/completions",
headers={"Authorization": f"Bearer {api_key}"},
json={
"messages": [
{
"role": "system",
"content": system_prompt
},
{
"role": "user",
"content": [
{
"type": "text",
"text": "Analyze the image and provide the content in the specified format."
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{encoded_image}"
}
}
]
}
],
"stream": False,
"model": MODEL,
"temperature": 0.5,
"presence_penalty": 0,
"frequency_penalty": 0,
"top_p": 1
},
)
if response.status == 200:
result = await response.json()
content = result['choices'][0]['message']['content']
print(f" 完成处理第 {page_number} 页")
return content
else:
raise Exception(f"请求失败,状态码: {response.status}")
except Exception as e:
print(f"处理第 {page_number} 页时发生错误 (尝试 {attempt+1}/{max_retries}): {str(e)}")
if attempt == max_retries - 1:
print(f"处理第 {page_number} 页失败,已达到最大重试次数")
return None
await asyncio.sleep(2 * attempt) # 指数退避
return None
async def process_pdf(pdf_file, output_dir):
"""处理单个 PDF 文件"""
print(f"\n开始处理文件: {pdf_file}")
# 创建输出文件
file_name = Path(pdf_file).stem
output_file = Path(output_dir) / f"{file_name}.md"
# 检查输出文件是否已存在
if output_file.exists():
print(f"文件 {output_file} 已存在,跳过处理。")
return
# 将 PDF 转换为图片
images = pdf_to_images(pdf_file)
# 清空输出文件
open(output_file, 'w').close()
# 创建异步 HTTP 会话
async with aiohttp.ClientSession() as session:
# 使用信号量限制并发
semaphore = asyncio.Semaphore(CONCURRENCY)
# 创建任务列表
tasks = [process_image(session, image_data, semaphore, page_number) for page_number, image_data in images]
# 并发执行任务并获取结果
results = await asyncio.gather(*tasks)
# 按顺序保存结果到输出文件
with open(output_file, "w", encoding="utf-8") as f:
for page_number, content in enumerate(results, 1):
if content:
print(f" 保存第 {page_number} 页内容")
f.write(f"## 第 {page_number} 页\n\n{content}\n\n")
print(f"文件 {pdf_file} 处理完成。输出文件: {output_file}")
async def process_files(pdf_files, output_dir):
"""处理所有 PDF 文件"""
for pdf_file in pdf_files:
await process_pdf(pdf_file, output_dir)
async def main():
root = tk.Tk()
root.withdraw() # 隐藏主窗口
print("请选择包含PDF文件的输入文件夹:")
input_dir = filedialog.askdirectory(title="选择输入文件夹")
if not input_dir:
print("未选择输入文件夹,程序退出。")
return
print("请选择输出文件夹:")
output_dir = filedialog.askdirectory(title="选择输出文件夹")
if not output_dir:
print("未选择输出文件夹,程序退出。")
return
input_dir = Path(input_dir)
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
pdf_files = list(input_dir.glob("*.pdf")) # 获取所有 .pdf 文件
total_files = len(pdf_files)
if total_files == 0:
print(f"错误: 在 '{input_dir}' 目录中未找到任何 .pdf 文件。")
return
print(f"找到 {total_files} 个 PDF 文件待处理。")
await process_files(pdf_files, output_dir)
print("\n所有文件处理完成。")
if __name__ == "__main__":
asyncio.run(main())