(精简魔改版)基于 CloudFlare Workers 优化 OpanAI 格式 API 的流式输出

精简版基于 CloudFlare Workers 优化 OpanAI 格式 API 接口的流式输出,基于这个帖子的思路及代码魔改而来:

佬原本的实现已经非常非常好啦!流式输出优化的思路也很厉害。只是我自己部署了 One/New-API,原版的管理功能就有点用不上了。

参考佬的代码,面向 cursor 编程搓了一个简化版本的 worker.js,主要差异:

  1. 仅保留流式输出优化功能。
  2. 删除了管理功能、面板及 Gemini/Anthropic 支持。仅支持 OpenAI 格式请求及输出。

工作方式很简单:
将所有请求及 API_KEY 转发到指定的 upstreamURL,如果返回的是流式输出的结果,则进行优化,如果不是,则原样返回。

使用方式:
复制全部代码部署 worker.js,然后把原本使用这个 UPSTREAM_URL API 的地方改成 <你的worker地址>/原本的API地址

例如,原本的 API 地址是:https://api.xxxx.com(必须带上 http 或 https),你的 worker 地址是 https://fluent.bbb.workers.dev
直接在前面加上 你的worker地址,替换成 https://fluent.bbb.workers.dev/https://api.xxxx.com 即可。

可选环境变量设置:

  • DEFAULT_CONFIG:是否打印调试日志,默认为false
  • MIN_DELAY:最小字符延迟(毫秒),默认为5
  • MAX_DELAY:最大字符延迟(毫秒),默认为40
  • ADAPTIVE_DELAY_FACTOR:自适应延迟因子,默认为0.8
  • CHUNK_BUFFER_SIZE:响应块缓冲区大小,默认为 8

使用场景:
对已有的 one/new-api 等 openAI 格式的接口,直接替换地址,即可得到优化的流式输出。当想用原版 API 的时候,就用原版的;当想得到流式优化的时候,只要加上前缀,其它都不用变。
或者,考虑到流式输出优化的效果仅对于部分模型比较有效,可以在 one/new-api 中,给对应渠道的 BaseURL 加上这个前缀。

请注意,这样用 CF 对目标 API 地址发起请求,类似于反代,具有一定风险!
请注意,这样用 CF 对目标 API 地址发起请求,类似于反代,具有一定风险!
请注意,这样用 CF 对目标 API 地址发起请求,类似于反代,具有一定风险!

参见:

可能的解决方式:https://linux.do/t/topic/451265

此外,CF Worker 可能会有执行时长限制。

总而言之,更推荐佬友们自行在 VPS 部署。若在 CF 部署,不建议用于大厂 API。

代码都是 Cursor 写的没有经过充分测试, 如有问题烦请指出 Orz

由于和原版差异比较大,就另开了一个新楼,佬看看要是不合适我就删帖。 @George

代码:

// 默认配置
const DEFAULT_CONFIG = {
  // 基础配置
  debug: false,                           // 是否启用调试日志
  
  // 流式输出优化配置
  minDelay: 5,                // 最小字符延迟(毫秒)
  maxDelay: 40,              // 最大字符延迟(毫秒)
  adaptiveDelayFactor: 0.8,  // 自适应延迟因子
  chunkBufferSize: 8,        // 响应块缓冲区大小
};

// 日志工具函数
function logger(type, ...args) {
  if (!DEFAULT_CONFIG.debug) return;
  
  const timestamp = new Date().toISOString();
  switch (type) {
    case 'error':
      console.error(`[${timestamp}]`, ...args);
      break;
    default:
      console.log(`[${timestamp}]`, ...args);
  }
}

// Worker入口函数
export default {
  async fetch(request, env, ctx) {
    logger('info', `[Request] ${request.method} ${request.url}`);
    
    // 处理预检请求
    if (request.method === "OPTIONS") {
      logger('info', "[CORS] Handling preflight request");
      return handleCORS();
    }

    try {
      // 从环境变量获取配置
      const config = getConfig(env);
      logger('info', "[Config] Using upstream URL:", config.upstreamUrl);
      return handleRequest(request, config);
    } catch (error) {
      logger('error', "[Error] Worker entry error:", error);
      return createErrorResponse(error);
    }
  },
};

// 从环境变量获取配置
function getConfig(env) {
  const config = {
    ...DEFAULT_CONFIG,
    // 从环境变量覆盖配置
    debug: env.DEBUG === "true" || DEFAULT_CONFIG.debug,
    minDelay: parseInt(env.MIN_DELAY) || DEFAULT_CONFIG.minDelay,
    maxDelay: parseInt(env.MAX_DELAY) || DEFAULT_CONFIG.maxDelay,
    adaptiveDelayFactor: parseFloat(env.ADAPTIVE_DELAY_FACTOR) || DEFAULT_CONFIG.adaptiveDelayFactor,
    chunkBufferSize: parseInt(env.CHUNK_BUFFER_SIZE) || DEFAULT_CONFIG.chunkBufferSize,
  };
  
  // 更新全局配置的debug设置
  DEFAULT_CONFIG.debug = config.debug;
  return config;
}

// 主请求处理函数
async function handleRequest(request, config) {
  try {
    // 解析请求
    const url = new URL(request.url);
    const path = url.pathname.slice(1); // 移除开头的斜杠
    
    // 验证并解析上游URL
    if (!path.startsWith('http://') && !path.startsWith('https://')) {
      throw new Error('无效的上游URL格式');
    }
    
    logger('info', "[Request] Original path:", path);
    
    // 解析请求体和检查是否是流式请求
    const { requestBody, isStreamRequest } = await parseRequestBody(request);
    logger('info', "[Request] Stream request:", isStreamRequest);
    if (Object.keys(requestBody).length > 0) {
      logger('info', "[Request] Request body:", JSON.stringify(requestBody));
    }
    
    // 使用完整的上游URL
    const upstreamUrl = path + url.search;
    logger('info', "[Upstream] Forwarding to:", upstreamUrl);
    const upstreamRequest = createUpstreamRequest(
      upstreamUrl,
      request,
      requestBody
    );
    
    // 发送请求到上游API
    logger('info', "[Upstream] Sending request...");
    const upstreamResponse = await fetch(upstreamRequest);
    logger('info', "[Upstream] Response status:", upstreamResponse.status);
    
    // 如果不是流式请求或响应不成功,直接返回上游响应
    if (!isStreamRequest || !upstreamResponse.ok) {
      logger('info', "[Response] Sending direct response");
      return addCorsHeaders(upstreamResponse);
    }
    
    // 处理流式响应
    logger('info', "[Stream] Processing streaming response");
    return handleStreamingResponse(upstreamResponse, config);
  } catch (error) {
    logger('error', "[Error] Request handling error:", error);
    return createErrorResponse(error);
  }
}

// 解析请求体和检查是否是流式请求
async function parseRequestBody(request) {
  let requestBody = {};
  let isStreamRequest = false;
  
  if (request.method === "POST") {
    try {
      const contentType = request.headers.get("content-type") || "";
      if (contentType.includes("application/json")) {
        requestBody = await request.clone().json();
        isStreamRequest = requestBody.stream === true;
      }
    } catch (e) {
      logger('error', "Error parsing request body:", e);
      throw new Error("Invalid JSON body");
    }
  }
  
  return { requestBody, isStreamRequest };
}

// 创建上游请求
function createUpstreamRequest(url, originalRequest, requestBody) {
  const headers = new Headers(originalRequest.headers);
  
  // 确保Content-Type正确设置
  if (originalRequest.method === "POST") {
    headers.set("Content-Type", "application/json");
  }
  
  const requestInit = {
    method: originalRequest.method,
    headers: headers,
    redirect: "follow",
  };
  
  // 仅在POST请求时添加body
  if (originalRequest.method === "POST" && Object.keys(requestBody).length > 0) {
    requestInit.body = JSON.stringify(requestBody);
  }
  
  return new Request(url, requestInit);
}

// 处理流式响应
async function handleStreamingResponse(response, config) {
  const { readable, writable } = new TransformStream();
  
  processStreamedResponse(response.body, writable, config).catch(err => {
    logger('error', "Error processing stream:", err);
    writable.abort(err);
  });
  
  return addCorsHeaders(new Response(readable, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    },
  }));
}

// 处理流式响应数据
async function processStreamedResponse(inputStream, outputWriter, config) {
  logger('info', "[Stream] Starting stream processing");
  const reader = inputStream.getReader();
  const writer = outputWriter.getWriter();
  
  const decoder = new TextDecoder();
  const encoder = new TextEncoder();
  
  let buffer = "";
  let lastChunkTime = Date.now();
  let recentChunkSizes = [];
  let currentDelay = config.minDelay;
  let totalChunks = 0;
  
  try {
    while (true) {
      const { done, value } = await reader.read();
      
      if (done) {
        if (buffer.length > 0) {
          await processSSELine(buffer, writer, encoder, currentDelay);
        }
        logger('info', `[Stream] Completed processing ${totalChunks} chunks`);
        break;
      }
      
      totalChunks++;
      // 更新时间跟踪和延迟计算
      const currentTime = Date.now();
      const timeSinceLastChunk = currentTime - lastChunkTime;
      lastChunkTime = currentTime;
      
      if (value && value.length) {
        // 更新最近块大小列表
        recentChunkSizes.push(value.length);
        if (recentChunkSizes.length > config.chunkBufferSize) {
          recentChunkSizes.shift();
        }
        
        // 计算新的延迟
        const avgChunkSize = recentChunkSizes.reduce((a, b) => a + b, 0) / recentChunkSizes.length;
        currentDelay = adaptDelay(avgChunkSize, timeSinceLastChunk, config);
      }
      
      // 处理接收到的数据
      buffer += decoder.decode(value, { stream: true });
      
      // 按行处理SSE消息
      const lines = buffer.split("\n");
      buffer = lines.pop() || "";
      
      for (const line of lines) {
        await processSSELine(line, writer, encoder, currentDelay);
      }
    }
  } catch (e) {
    logger('error', "[Stream] Processing error:", e);
  } finally {
    try {
      await writer.close();
      logger('info', "[Stream] Stream writer closed");
    } catch (e) {
      logger('error', "[Stream] Error closing writer:", e);
    }
    reader.releaseLock();
  }
}

// 处理SSE行
async function processSSELine(line, writer, encoder, delay) {
  if (!line.trim()) {
    await writer.write(encoder.encode("\n"));
    return;
  }
  
  if (line.startsWith("data: ")) {
    const data = line.slice(6);
    
    if (data === "[DONE]") {
      await writer.write(encoder.encode("data: [DONE]\n\n"));
      return;
    }
    
    try {
      const jsonData = JSON.parse(data);
      
      if (jsonData.choices && jsonData.choices.length > 0) {
        const choice = jsonData.choices[0];
        let content = "";
        let isCompletionAPI = false;
        
        if (choice.delta && choice.delta.content !== undefined) {
          content = choice.delta.content;
        } else if (choice.text !== undefined) {
          content = choice.text;
          isCompletionAPI = true;
        }
        
        if (content) {
          await sendContentCharByChar(content, jsonData, writer, encoder, delay, isCompletionAPI);
        } else {
          await writer.write(encoder.encode(`data: ${data}\n\n`));
        }
      } else {
        await writer.write(encoder.encode(`data: ${data}\n\n`));
      }
    } catch (e) {
      logger('error', "Error parsing JSON data:", e);
      await writer.write(encoder.encode(`data: ${data}\n\n`));
    }
  } else {
    await writer.write(encoder.encode(`${line}\n`));
  }
}

// 自适应调整延迟
function adaptDelay(chunkSize, timeSinceLastChunk, config) {
  if (chunkSize <= 0) return config.minDelay;
  
  // 块大小反比因子:块越大,字符间延迟越小
  const sizeInverseFactor = Math.max(0.2, Math.min(2.0, 50 / chunkSize));
  
  // 时间因子:接收间隔越长,延迟越大
  const timeFactor = Math.max(0.5, Math.min(1.5, timeSinceLastChunk / 300));
  
  // 组合因子计算最终延迟
  const adaptiveDelay = config.minDelay + 
    (config.maxDelay - config.minDelay) * 
    sizeInverseFactor * timeFactor * config.adaptiveDelayFactor;
  
  // 确保延迟在允许范围内
  return Math.min(config.maxDelay, Math.max(config.minDelay, adaptiveDelay));
}

// 逐字符发送内容
async function sendContentCharByChar(content, originalJson, writer, encoder, delay, isCompletionAPI) {
  if (!content) return;
  
  let buffer = '';
  let inTag = false;
  
  for (let i = 0; i < content.length; i++) {
    const char = content[i];
    
    // 检查是否遇到可能的标签开始
    if (char === '<' && (content.substring(i, i + 7) === '<think>' || content.substring(i, i + 8) === '</think>')) {
      inTag = true;
      buffer = char;
      continue;
    }
    
    // 如果在标签内,继续缓存
    if (inTag) {
      buffer += char;
      
      // 检查是否到达标签结束
      if ((buffer === '<think>' || buffer === '</think>')) {
        let charResponse;
        
        if (isCompletionAPI) {
          charResponse = {
            ...originalJson,
            choices: [{
              ...originalJson.choices[0],
              text: buffer
            }]
          };
        } else {
          charResponse = {
            ...originalJson,
            choices: [{
              ...originalJson.choices[0],
              delta: { content: buffer }
            }]
          };
        }
        
        await writer.write(encoder.encode(`data: ${JSON.stringify(charResponse)}\n\n`));
        
        // 重置状态
        buffer = '';
        inTag = false;
        continue;
      }
      continue;
    }
    
    // 正常字符的处理
    let charResponse;
    
    if (isCompletionAPI) {
      charResponse = {
        ...originalJson,
        choices: [{
          ...originalJson.choices[0],
          text: char
        }]
      };
    } else {
      charResponse = {
        ...originalJson,
        choices: [{
          ...originalJson.choices[0],
          delta: { content: char }
        }]
      };
    }
    
    await writer.write(encoder.encode(`data: ${JSON.stringify(charResponse)}\n\n`));
    
    if (i < content.length - 1 && delay > 0) {
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
  
  // 处理可能剩余的缓存内容
  if (buffer.length > 0) {
    let charResponse;
    if (isCompletionAPI) {
      charResponse = {
        ...originalJson,
        choices: [{
          ...originalJson.choices[0],
          text: buffer
        }]
      };
    } else {
      charResponse = {
        ...originalJson,
        choices: [{
          ...originalJson.choices[0],
          delta: { content: buffer }
        }]
      };
    }
    await writer.write(encoder.encode(`data: ${JSON.stringify(charResponse)}\n\n`));
  }
}

// 处理CORS预检请求
function handleCORS() {
  return new Response(null, {
    headers: {
      "Access-Control-Allow-Origin": "*",
      "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
      "Access-Control-Allow-Headers": "Content-Type, Authorization",
      "Access-Control-Max-Age": "86400",
    },
  });
}

// 添加CORS头
function addCorsHeaders(response) {
  const corsHeaders = new Headers(response.headers);
  corsHeaders.set("Access-Control-Allow-Origin", "*");
  
  return new Response(response.body, {
    status: response.status,
    statusText: response.statusText,
    headers: corsHeaders,
  });
}

// 创建错误响应
function createErrorResponse(error) {
  return new Response(JSON.stringify({
    error: {
      message: error.message || "An error occurred",
      type: "proxy_error"
    }
  }), {
    status: 500,
    headers: {
      "Content-Type": "application/json",
      "Access-Control-Allow-Origin": "*"
    }
  });
}
31 个赞

路过就是一个赞

10 个赞

啊哈哈哈哈,很像我最初的版本,点赞!!!

9 个赞

佬,需要注意这样有被其他人白嫖的风险,最好加点认证措施

10 个赞

对,这个我刚也有在想,不过感觉,反正是要求用户自带 API 和 KEY,把 worker 地址藏好就行了,被白嫖的风险≈被 DDOS 的风险()

9 个赞

如果有替代New-API版本就好了,New-API真的太肿了,只想自己简单管理众多api。

有个佬做的 uni-api 不错:

1 个赞

非常强大

没有前端还是不习惯,感觉有时候还是想看看使用数据什么的

太强了,大佬!

大佬强强

感谢佬分享

我也是这样觉得,一直在等他改版web UI。