精简版基于 CloudFlare Workers 优化 OpanAI 格式 API 接口的流式输出,基于这个帖子的思路及代码魔改而来:
佬原本的实现已经非常非常好啦!流式输出优化的思路也很厉害。只是我自己部署了 One/New-API,原版的管理功能就有点用不上了。
参考佬的代码,面向 cursor 编程搓了一个简化版本的 worker.js,主要差异:
- 仅保留流式输出优化功能。
- 删除了管理功能、面板及 Gemini/Anthropic 支持。仅支持 OpenAI 格式请求及输出。
工作方式很简单:
将所有请求及 API_KEY 转发到指定的 upstreamURL,如果返回的是流式输出的结果,则进行优化,如果不是,则原样返回。
使用方式:
复制全部代码部署 worker.js,然后把原本使用这个 UPSTREAM_URL
API 的地方改成 <你的worker地址>/原本的API地址
例如,原本的 API 地址是:https://api.xxxx.com
(必须带上 http 或 https),你的 worker 地址是 https://fluent.bbb.workers.dev
直接在前面加上 你的worker地址
,替换成 https://fluent.bbb.workers.dev/https://api.xxxx.com
即可。
可选环境变量设置:
- DEFAULT_CONFIG:是否打印调试日志,默认为
false
。 - MIN_DELAY:最小字符延迟(毫秒),默认为5
- MAX_DELAY:最大字符延迟(毫秒),默认为40
- ADAPTIVE_DELAY_FACTOR:自适应延迟因子,默认为0.8
- CHUNK_BUFFER_SIZE:响应块缓冲区大小,默认为 8
使用场景:
对已有的 one/new-api 等 openAI 格式的接口,直接替换地址,即可得到优化的流式输出。当想用原版 API 的时候,就用原版的;当想得到流式优化的时候,只要加上前缀,其它都不用变。
或者,考虑到流式输出优化的效果仅对于部分模型比较有效,可以在 one/new-api 中,给对应渠道的 BaseURL 加上这个前缀。
请注意,这样用 CF 对目标 API 地址发起请求,类似于反代,具有一定风险!
请注意,这样用 CF 对目标 API 地址发起请求,类似于反代,具有一定风险!
请注意,这样用 CF 对目标 API 地址发起请求,类似于反代,具有一定风险!
参见:
可能的解决方式:https://linux.do/t/topic/451265
此外,CF Worker 可能会有执行时长限制。
总而言之,更推荐佬友们自行在 VPS 部署。若在 CF 部署,不建议用于大厂 API。
代码都是 Cursor 写的没有经过充分测试, 如有问题烦请指出 Orz
由于和原版差异比较大,就另开了一个新楼,佬看看要是不合适我就删帖。 @George
代码:
// 默认配置
const DEFAULT_CONFIG = {
// 基础配置
debug: false, // 是否启用调试日志
// 流式输出优化配置
minDelay: 5, // 最小字符延迟(毫秒)
maxDelay: 40, // 最大字符延迟(毫秒)
adaptiveDelayFactor: 0.8, // 自适应延迟因子
chunkBufferSize: 8, // 响应块缓冲区大小
};
// 日志工具函数
function logger(type, ...args) {
if (!DEFAULT_CONFIG.debug) return;
const timestamp = new Date().toISOString();
switch (type) {
case 'error':
console.error(`[${timestamp}]`, ...args);
break;
default:
console.log(`[${timestamp}]`, ...args);
}
}
// Worker入口函数
export default {
async fetch(request, env, ctx) {
logger('info', `[Request] ${request.method} ${request.url}`);
// 处理预检请求
if (request.method === "OPTIONS") {
logger('info', "[CORS] Handling preflight request");
return handleCORS();
}
try {
// 从环境变量获取配置
const config = getConfig(env);
logger('info', "[Config] Using upstream URL:", config.upstreamUrl);
return handleRequest(request, config);
} catch (error) {
logger('error', "[Error] Worker entry error:", error);
return createErrorResponse(error);
}
},
};
// 从环境变量获取配置
function getConfig(env) {
const config = {
...DEFAULT_CONFIG,
// 从环境变量覆盖配置
debug: env.DEBUG === "true" || DEFAULT_CONFIG.debug,
minDelay: parseInt(env.MIN_DELAY) || DEFAULT_CONFIG.minDelay,
maxDelay: parseInt(env.MAX_DELAY) || DEFAULT_CONFIG.maxDelay,
adaptiveDelayFactor: parseFloat(env.ADAPTIVE_DELAY_FACTOR) || DEFAULT_CONFIG.adaptiveDelayFactor,
chunkBufferSize: parseInt(env.CHUNK_BUFFER_SIZE) || DEFAULT_CONFIG.chunkBufferSize,
};
// 更新全局配置的debug设置
DEFAULT_CONFIG.debug = config.debug;
return config;
}
// 主请求处理函数
async function handleRequest(request, config) {
try {
// 解析请求
const url = new URL(request.url);
const path = url.pathname.slice(1); // 移除开头的斜杠
// 验证并解析上游URL
if (!path.startsWith('http://') && !path.startsWith('https://')) {
throw new Error('无效的上游URL格式');
}
logger('info', "[Request] Original path:", path);
// 解析请求体和检查是否是流式请求
const { requestBody, isStreamRequest } = await parseRequestBody(request);
logger('info', "[Request] Stream request:", isStreamRequest);
if (Object.keys(requestBody).length > 0) {
logger('info', "[Request] Request body:", JSON.stringify(requestBody));
}
// 使用完整的上游URL
const upstreamUrl = path + url.search;
logger('info', "[Upstream] Forwarding to:", upstreamUrl);
const upstreamRequest = createUpstreamRequest(
upstreamUrl,
request,
requestBody
);
// 发送请求到上游API
logger('info', "[Upstream] Sending request...");
const upstreamResponse = await fetch(upstreamRequest);
logger('info', "[Upstream] Response status:", upstreamResponse.status);
// 如果不是流式请求或响应不成功,直接返回上游响应
if (!isStreamRequest || !upstreamResponse.ok) {
logger('info', "[Response] Sending direct response");
return addCorsHeaders(upstreamResponse);
}
// 处理流式响应
logger('info', "[Stream] Processing streaming response");
return handleStreamingResponse(upstreamResponse, config);
} catch (error) {
logger('error', "[Error] Request handling error:", error);
return createErrorResponse(error);
}
}
// 解析请求体和检查是否是流式请求
async function parseRequestBody(request) {
let requestBody = {};
let isStreamRequest = false;
if (request.method === "POST") {
try {
const contentType = request.headers.get("content-type") || "";
if (contentType.includes("application/json")) {
requestBody = await request.clone().json();
isStreamRequest = requestBody.stream === true;
}
} catch (e) {
logger('error', "Error parsing request body:", e);
throw new Error("Invalid JSON body");
}
}
return { requestBody, isStreamRequest };
}
// 创建上游请求
function createUpstreamRequest(url, originalRequest, requestBody) {
const headers = new Headers(originalRequest.headers);
// 确保Content-Type正确设置
if (originalRequest.method === "POST") {
headers.set("Content-Type", "application/json");
}
const requestInit = {
method: originalRequest.method,
headers: headers,
redirect: "follow",
};
// 仅在POST请求时添加body
if (originalRequest.method === "POST" && Object.keys(requestBody).length > 0) {
requestInit.body = JSON.stringify(requestBody);
}
return new Request(url, requestInit);
}
// 处理流式响应
async function handleStreamingResponse(response, config) {
const { readable, writable } = new TransformStream();
processStreamedResponse(response.body, writable, config).catch(err => {
logger('error', "Error processing stream:", err);
writable.abort(err);
});
return addCorsHeaders(new Response(readable, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
}));
}
// 处理流式响应数据
async function processStreamedResponse(inputStream, outputWriter, config) {
logger('info', "[Stream] Starting stream processing");
const reader = inputStream.getReader();
const writer = outputWriter.getWriter();
const decoder = new TextDecoder();
const encoder = new TextEncoder();
let buffer = "";
let lastChunkTime = Date.now();
let recentChunkSizes = [];
let currentDelay = config.minDelay;
let totalChunks = 0;
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
if (buffer.length > 0) {
await processSSELine(buffer, writer, encoder, currentDelay);
}
logger('info', `[Stream] Completed processing ${totalChunks} chunks`);
break;
}
totalChunks++;
// 更新时间跟踪和延迟计算
const currentTime = Date.now();
const timeSinceLastChunk = currentTime - lastChunkTime;
lastChunkTime = currentTime;
if (value && value.length) {
// 更新最近块大小列表
recentChunkSizes.push(value.length);
if (recentChunkSizes.length > config.chunkBufferSize) {
recentChunkSizes.shift();
}
// 计算新的延迟
const avgChunkSize = recentChunkSizes.reduce((a, b) => a + b, 0) / recentChunkSizes.length;
currentDelay = adaptDelay(avgChunkSize, timeSinceLastChunk, config);
}
// 处理接收到的数据
buffer += decoder.decode(value, { stream: true });
// 按行处理SSE消息
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
await processSSELine(line, writer, encoder, currentDelay);
}
}
} catch (e) {
logger('error', "[Stream] Processing error:", e);
} finally {
try {
await writer.close();
logger('info', "[Stream] Stream writer closed");
} catch (e) {
logger('error', "[Stream] Error closing writer:", e);
}
reader.releaseLock();
}
}
// 处理SSE行
async function processSSELine(line, writer, encoder, delay) {
if (!line.trim()) {
await writer.write(encoder.encode("\n"));
return;
}
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data === "[DONE]") {
await writer.write(encoder.encode("data: [DONE]\n\n"));
return;
}
try {
const jsonData = JSON.parse(data);
if (jsonData.choices && jsonData.choices.length > 0) {
const choice = jsonData.choices[0];
let content = "";
let isCompletionAPI = false;
if (choice.delta && choice.delta.content !== undefined) {
content = choice.delta.content;
} else if (choice.text !== undefined) {
content = choice.text;
isCompletionAPI = true;
}
if (content) {
await sendContentCharByChar(content, jsonData, writer, encoder, delay, isCompletionAPI);
} else {
await writer.write(encoder.encode(`data: ${data}\n\n`));
}
} else {
await writer.write(encoder.encode(`data: ${data}\n\n`));
}
} catch (e) {
logger('error', "Error parsing JSON data:", e);
await writer.write(encoder.encode(`data: ${data}\n\n`));
}
} else {
await writer.write(encoder.encode(`${line}\n`));
}
}
// 自适应调整延迟
function adaptDelay(chunkSize, timeSinceLastChunk, config) {
if (chunkSize <= 0) return config.minDelay;
// 块大小反比因子:块越大,字符间延迟越小
const sizeInverseFactor = Math.max(0.2, Math.min(2.0, 50 / chunkSize));
// 时间因子:接收间隔越长,延迟越大
const timeFactor = Math.max(0.5, Math.min(1.5, timeSinceLastChunk / 300));
// 组合因子计算最终延迟
const adaptiveDelay = config.minDelay +
(config.maxDelay - config.minDelay) *
sizeInverseFactor * timeFactor * config.adaptiveDelayFactor;
// 确保延迟在允许范围内
return Math.min(config.maxDelay, Math.max(config.minDelay, adaptiveDelay));
}
// 逐字符发送内容
async function sendContentCharByChar(content, originalJson, writer, encoder, delay, isCompletionAPI) {
if (!content) return;
let buffer = '';
let inTag = false;
for (let i = 0; i < content.length; i++) {
const char = content[i];
// 检查是否遇到可能的标签开始
if (char === '<' && (content.substring(i, i + 7) === '<think>' || content.substring(i, i + 8) === '</think>')) {
inTag = true;
buffer = char;
continue;
}
// 如果在标签内,继续缓存
if (inTag) {
buffer += char;
// 检查是否到达标签结束
if ((buffer === '<think>' || buffer === '</think>')) {
let charResponse;
if (isCompletionAPI) {
charResponse = {
...originalJson,
choices: [{
...originalJson.choices[0],
text: buffer
}]
};
} else {
charResponse = {
...originalJson,
choices: [{
...originalJson.choices[0],
delta: { content: buffer }
}]
};
}
await writer.write(encoder.encode(`data: ${JSON.stringify(charResponse)}\n\n`));
// 重置状态
buffer = '';
inTag = false;
continue;
}
continue;
}
// 正常字符的处理
let charResponse;
if (isCompletionAPI) {
charResponse = {
...originalJson,
choices: [{
...originalJson.choices[0],
text: char
}]
};
} else {
charResponse = {
...originalJson,
choices: [{
...originalJson.choices[0],
delta: { content: char }
}]
};
}
await writer.write(encoder.encode(`data: ${JSON.stringify(charResponse)}\n\n`));
if (i < content.length - 1 && delay > 0) {
await new Promise(resolve => setTimeout(resolve, delay));
}
}
// 处理可能剩余的缓存内容
if (buffer.length > 0) {
let charResponse;
if (isCompletionAPI) {
charResponse = {
...originalJson,
choices: [{
...originalJson.choices[0],
text: buffer
}]
};
} else {
charResponse = {
...originalJson,
choices: [{
...originalJson.choices[0],
delta: { content: buffer }
}]
};
}
await writer.write(encoder.encode(`data: ${JSON.stringify(charResponse)}\n\n`));
}
}
// 处理CORS预检请求
function handleCORS() {
return new Response(null, {
headers: {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
"Access-Control-Max-Age": "86400",
},
});
}
// 添加CORS头
function addCorsHeaders(response) {
const corsHeaders = new Headers(response.headers);
corsHeaders.set("Access-Control-Allow-Origin", "*");
return new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers: corsHeaders,
});
}
// 创建错误响应
function createErrorResponse(error) {
return new Response(JSON.stringify({
error: {
message: error.message || "An error occurred",
type: "proxy_error"
}
}), {
status: 500,
headers: {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*"
}
});
}