[Open Source] deepX (R1 x Anything dual model): built on Cloudflare, let your AI think deeply like DeepSeek-R1

The code was written entirely by AI; if something breaks, ask GPT.
It is a modified version of another user's open-source project, standing on the shoulders of giants.
When calling the proxy, authenticate with the API key of your R1 model provider.

Changes:
1. Responses are now returned in the official (OpenAI-style) format.
2. The requested model name takes the form deepseek-{base model}, where {base model} must be a model your base provider can actually serve.
Nothing else was touched.

Tested and working with the R1 models from Doubao, SiliconFlow, DeepSeek, and Tencent; Azure and OpenRouter are not compatible; other providers are untested.

2025/2/12
handleStreamRequest hard-codes thinkingRequest.max_tokens = 16384; set it to the maximum output length your provider supports.
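For example, a streaming call to a deployed instance might look like the following (the deployment URL is a placeholder, the bearer token is your R1 provider's API key, and gpt-4o-mini stands in for any model your base provider serves; the handler ignores the URL path):

// Example client call (hypothetical values)
await fetch("https://your-worker.example.workers.dev/v1/chat/completions", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: "Bearer sk-your-r1-provider-key", // R1 provider key
  },
  body: JSON.stringify({
    model: "deepseek-gpt-4o-mini", // deepseek-{base model}
    stream: true,
    messages: [{ role: "user", content: "Why is the sky blue?" }],
  }),
});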
/**
 * ===============================
 * User configuration - adjust each value for your environment
 * ===============================
 */
const userConfig = {
  // General settings
  debug: true, // when true, debug information is logged
  showThinking: true, // when true, stream thinking_LLM's reasoning to the client (stream mode only)
  fetch_max_retries: 20, // maximum number of retries for network requests
  // Base model settings - OpenAI-compatible API
  base_LLM_url: "https://api.openai.com/v1/chat/completions",
  base_LLM_authkey: "sk-xxxxx",
  // Thinking model settings
  thinking_model: "deepseek-ai/DeepSeek-R1",
  thinking_LLM_url: "https://api.siliconflow.cn/v1/chat/completions",
  // Prompt engineering settings
  // Chinese prompt kept verbatim; it tells the model to reason about the
  // user's question (without necessarily reaching an answer) and to open
  // every reasoning pass with "<think>\n嗯".
  thinking_LLM_prefix:
    "理解用户需求,依据用户的提问开始思考推理,你需要思考推理理解或解决问题的思路,以及一些可能遗漏的难点,在推理中你并不是一定要得出答案。在每次推理开始时,以 \"<think>\\n嗯\" 开启你的推理",
  // Chinese prompt kept verbatim; it asks the base model to answer the
  // question directly without commenting on the thinking process.
  introThought:
    "[请你基于上述信息,直接给出对这个问题的最佳解答,不要涉及对思考过程的评价,专注于用户的问题]\n",
  default_sys_prompt: "You are a helpful assistant.",
  default_end_prompt: "continue",
};
/**
 * Extract the base model name
 * @param {string} clientModel model name from the client request; must be of the form deepseek-{base model}
 * @returns {string} the base model name
 */
const extractBaseModel = (clientModel) => {
if (!clientModel || !clientModel.startsWith("deepseek-")) {
throw new Error("Invalid model format. Model must start with 'deepseek-'");
}
return clientModel.slice("deepseek-".length);
};
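// Example: extractBaseModel("deepseek-gpt-4o-mini") returns "gpt-4o-mini",
// while extractBaseModel("gpt-4o-mini") throws because the "deepseek-" prefix is required.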
/**
 * ===============================
 * CORS OPTIONS response handler
 * ===============================
 */
const handleOPTIONS = async () =>
new Response(null, {
headers: {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "*",
"Access-Control-Allow-Headers": "*",
},
});
/**
 * ===============================
 * fetch wrapper with retries and AbortSignal support
 * ===============================
 */
const fetchWithRetries = async (url, options, signal = null) => {
const maxRetries = userConfig.fetch_max_retries;
let attempt = 0;
let response = null;
const mergedOptions = { ...options, signal };
  while (attempt <= maxRetries) {
    response = await fetch(url, mergedOptions);
    // Retry only on 5xx responses; everything else is returned immediately.
    if (response.status < 500 || response.status >= 600) {
      return response;
    }
    attempt++;
  }
return response;
};
/**
 * ===============================
 * Normalize the chat message request
 * ===============================
 */
const openai_messages_normalizer = (reqBody) => {
  const normalized = { ...reqBody };
  let messages = Array.isArray(normalized.messages) ? normalized.messages.slice() : [];
  // Ensure the conversation starts with a system message.
  if (messages.length === 0 || messages[0].role !== "system") {
    messages.unshift({ role: "system", content: userConfig.default_sys_prompt });
  }
  // Ensure the conversation ends with a user message.
  if (messages[messages.length - 1]?.role !== "user") {
    messages.push({ role: "user", content: userConfig.default_end_prompt });
  }
  normalized.messages = messages;
  userConfig.debug && console.log("Normalized request body:", normalized);
  return normalized;
};
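// Example: messages of [{ role: "user", content: "hi" }] become
// [{ role: "system", content: "You are a helpful assistant." },
//  { role: "user", content: "hi" }].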
/**
 * ===============================
 * Generate an OpenAI-style chat completion ID
 * ===============================
 */
const generateChatcmplId = () => {
const chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
return (
"chatcmpl-" +
Array.from({ length: 29 }, () => chars[Math.floor(Math.random() * chars.length)]).join("")
);
};
/**
 * ===============================
 * Build an SSE data chunk from delta content
 * ===============================
 */
const createSSEChunk = (content, type = "content", model) => {
let delta = {};
if (type === "reasoning") {
delta = { reasoning_content: content };
} else {
delta = { content: content };
}
return (
"data: " +
JSON.stringify({
id: generateChatcmplId(),
object: "chat.completion.chunk",
created: Math.floor(Date.now() / 1000),
model: model,
choices: [
{
delta: delta,
index: 0,
finish_reason: null,
},
],
}) +
"\n\n"
);
};
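// Example chunk for reasoning content (abbreviated, values illustrative):
// data: {"id":"chatcmpl-...","object":"chat.completion.chunk","created":1739318400,
//   "model":"deepseek-gpt-4o-mini","choices":[{"delta":{"reasoning_content":"嗯"},
//   "index":0,"finish_reason":null}]}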
/**
 * ===============================
 * Build the request body sent to the base model
 * Note: the full conversation history is kept; the thinking content is
 * appended to the latest user message on every turn
 * ===============================
 */
const buildThoughtfulRequest = (normalizedRequest, thinkingContent, baseModel) => {
const newRequest = structuredClone(normalizedRequest);
  const lastUserIndex = newRequest.messages.findLastIndex(msg => msg.role === "user");
  // The Chinese framing below is kept verbatim; it reads "I have a preliminary
  // thinking process (for reference only; it may contain errors or gaps):".
  if (lastUserIndex !== -1) {
    newRequest.messages[lastUserIndex].content =
      newRequest.messages[lastUserIndex].content +
      "\n我有一个初步的思考过程(仅供参考,可能存在错误或不足):\n" +
      thinkingContent +
      "\n" +
      userConfig.introThought;
  } else {
    newRequest.messages.push({
      role: "user",
      content: "我有一个初步的思考过程(仅供参考,可能存在错误或不足):\n" + thinkingContent + "\n" + userConfig.introThought,
    });
  }
  newRequest.model = baseModel;
  userConfig.debug && console.log("Request body sent to the base model:", newRequest);
return newRequest;
};
/**
 * ===============================
 * Send the request to the base model API, with AbortSignal support
 * ===============================
 */
const fetchBaseModel = async (requestBody, signal) => {
try {
return await fetchWithRetries(userConfig.base_LLM_url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${userConfig.base_LLM_authkey}`,
},
body: JSON.stringify(requestBody),
}, signal);
} catch (err) {
if (err.name === 'AbortError') {
console.log('fetchBaseModel aborted');
return;
}
throw new Error("Error contacting base model: " + err.toString());
}
};
/**
 * ===============================
 * SSE event generator
 * ===============================
 */
async function* sseEventIterator(reader, signal) {
const decoder = new TextDecoder("utf-8");
let buffer = "";
try {
while (true) {
if (signal?.aborted) {
await reader.cancel();
break;
}
const { value, done } = await reader.read();
if (value) {
buffer += decoder.decode(value, { stream: true });
}
const parts = buffer.split("\n\n");
buffer = parts.pop();
for (const part of parts) {
if (part.startsWith("data: ")) {
const dataStr = part.slice("data: ".length).trim();
if (dataStr) {
try {
yield JSON.parse(dataStr);
} catch (e) {
console.error("Error parsing SSE event:", e);
}
}
}
}
if (done) break;
}
if (buffer.trim().startsWith("data: ")) {
const dataStr = buffer.trim().slice("data: ".length).trim();
if (dataStr) {
try {
yield JSON.parse(dataStr);
} catch (e) {
console.error("Error parsing final SSE chunk:", e);
}
}
}
} finally {
reader.releaseLock();
}
}
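// Each yielded event is the parsed JSON payload of one upstream "data:" line,
// e.g. (abbreviated): {"choices":[{"delta":{"reasoning_content":"..."},"index":0}]}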
/**
 * ===============================
 * Process the thinking model's SSE stream
 * Note: forwards the reasoning to the client in real time and stops
 * collecting as soon as answer content begins
 * ===============================
 */
const processThinkingStream = async (reader, controller = null, clientModel) => {
const encoder = new TextEncoder();
let reasoningContent = "";
const abortController = new AbortController();
try {
for await (const event of sseEventIterator(reader, abortController.signal)) {
const deltaObj = event?.choices?.[0]?.delta || {};
if (deltaObj.reasoning_content) {
reasoningContent += deltaObj.reasoning_content;
if (controller && userConfig.showThinking) {
controller.enqueue(
encoder.encode(
createSSEChunk(deltaObj.reasoning_content, "reasoning", clientModel)
)
);
}
} else if (deltaObj.content) {
abortController.abort();
break;
}
}
return { thinking: reasoningContent.trim() };
} catch (error) {
if (error.name === 'AbortError') {
return { thinking: reasoningContent.trim() };
}
console.error("Error in processThinkingStream:", error);
throw error;
}
};
/**
 * ===============================
 * Streaming request handler (SSE)
 * ===============================
 */
const handleStreamRequest = async (clientReqPayload, token, request) => {
try {
const baseModel = extractBaseModel(clientReqPayload.model);
const normalized = openai_messages_normalizer(clientReqPayload);
const thinkingRequest = structuredClone(normalized);
thinkingRequest.model = userConfig.thinking_model;
    thinkingRequest.messages[0].content =
      userConfig.thinking_LLM_prefix + thinkingRequest.messages[0].content;
    thinkingRequest.stream = true;
    // Adjust to match your thinking model provider's maximum output length.
    thinkingRequest.max_tokens = 16384;
const thinkingResponse = await fetchWithRetries(userConfig.thinking_LLM_url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${token}`,
},
body: JSON.stringify(thinkingRequest),
});
if (thinkingResponse.status !== 200) {
return thinkingResponse;
}
const baseModelAbortController = new AbortController();
const baseModelSignal = baseModelAbortController.signal;
request.signal.addEventListener('abort', () => {
console.log("Client disconnected, aborting base model request.");
baseModelAbortController.abort();
});
    return new Response(
      new ReadableStream({
        async start(controller) {
          try {
            const result = await processThinkingStream(
              thinkingResponse.body.getReader(),
              controller,
              clientReqPayload.model
            );
            const baseRequest = buildThoughtfulRequest(
              normalized,
              result.thinking,
              baseModel
            );
            const baseResponse = await fetchBaseModel(baseRequest, baseModelSignal);
            if (!baseResponse) {
              // The base model request was aborted (client disconnected).
              controller.close();
              return;
            }
            if (!baseResponse.ok) {
              throw new Error(`Base model request failed: ${baseResponse.status}`);
            }
            // Pipe the base model's SSE stream through unchanged.
            const reader = baseResponse.body.getReader();
            const encoder = new TextEncoder();
            while (true) {
              const { value, done } = await reader.read();
              if (done) {
                controller.enqueue(encoder.encode("data: [DONE]\n\n"));
                break;
              }
              controller.enqueue(value);
            }
            controller.close();
          } catch (error) {
            console.error("Stream error:", error);
            if (error.name !== 'AbortError') {
              baseModelAbortController.abort();
            }
            // error() puts the stream in an errored state; calling close()
            // afterwards would throw, so close() must not run in a finally block.
            controller.error(error);
          }
        },
      }),
{
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
}
);
} catch (error) {
return new Response(
JSON.stringify({
error: "Stream processing failed",
details: error.message,
}),
{
status: 500,
headers: { "Content-Type": "application/json" },
}
);
}
};
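/**
 * ===============================
 * Non-stream request handler
 * Note: handleRequest below dispatches to handleNonStreamRequest, but the
 * original post omits its definition. The following is a minimal sketch of
 * one plausible implementation, not the author's original: it streams the
 * thinking model internally, then requests a single non-streamed completion
 * from the base model.
 * ===============================
 */
const handleNonStreamRequest = async (clientReqPayload, token) => {
  try {
    const baseModel = extractBaseModel(clientReqPayload.model);
    const normalized = openai_messages_normalizer(clientReqPayload);
    const thinkingRequest = structuredClone(normalized);
    thinkingRequest.model = userConfig.thinking_model;
    thinkingRequest.messages[0].content =
      userConfig.thinking_LLM_prefix + thinkingRequest.messages[0].content;
    // Stream internally so collection can stop as soon as reasoning ends.
    thinkingRequest.stream = true;
    thinkingRequest.max_tokens = 16384;
    const thinkingResponse = await fetchWithRetries(userConfig.thinking_LLM_url, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${token}`,
      },
      body: JSON.stringify(thinkingRequest),
    });
    if (thinkingResponse.status !== 200) return thinkingResponse;
    // No controller: collect the reasoning without forwarding it.
    const result = await processThinkingStream(
      thinkingResponse.body.getReader(),
      null,
      clientReqPayload.model
    );
    const baseRequest = buildThoughtfulRequest(normalized, result.thinking, baseModel);
    baseRequest.stream = false;
    const baseResponse = await fetchBaseModel(baseRequest, null);
    if (!baseResponse || !baseResponse.ok) {
      return (
        baseResponse ??
        new Response(JSON.stringify({ error: "Base model request failed" }), {
          status: 502,
          headers: { "Content-Type": "application/json" },
        })
      );
    }
    const completion = await baseResponse.json();
    completion.model = clientReqPayload.model; // echo the client-requested model name
    return new Response(JSON.stringify(completion), {
      status: 200,
      headers: { "Content-Type": "application/json" },
    });
  } catch (error) {
    return new Response(
      JSON.stringify({ error: "Request processing failed", details: error.message }),
      { status: 500, headers: { "Content-Type": "application/json" } }
    );
  }
};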
// Main request handler
const handleRequest = async (request) => {
if (request.method === "OPTIONS") return handleOPTIONS();
const authHeader = request.headers.get("Authorization");
if (!authHeader || !authHeader.startsWith("Bearer ")) {
return new Response(
JSON.stringify({ error: "Unauthorized: Missing Authorization header" }),
{ status: 401, headers: { "Content-Type": "application/json" } }
);
}
const token = authHeader.slice("Bearer ".length).trim();
if (request.method !== "POST") {
return new Response(JSON.stringify({ error: "Only POST is allowed" }), {
status: 405,
headers: { "Content-Type": "application/json" },
});
}
let clientRequestPayload;
try {
clientRequestPayload = await request.json();
} catch (err) {
return new Response(JSON.stringify({ error: "Invalid JSON" }), {
status: 400,
headers: { "Content-Type": "application/json" },
});
}
return clientRequestPayload.stream === true
? await handleStreamRequest(clientRequestPayload, token, request)
: await handleNonStreamRequest(clientRequestPayload, token);
};
// Runtime detection and local-server bootstrap
(async () => {
  // Guard the process access: optional chaining does not protect against an
  // undeclared identifier, so a bare process?.env would throw on edge runtimes.
  const port =
    typeof Deno !== "undefined"
      ? Deno.env.get("PORT") || 8000
      : (typeof process !== "undefined" && process.env?.PORT) || 8000;
  if (typeof Deno !== "undefined") {
    console.log(`Listening on http://localhost:${port}`);
    return Deno.serve({ port }, handleRequest);
  }
  // Edge runtimes (Vercel Edge, Cloudflare) invoke the exported handlers
  // directly; there is no server to start here.
  if (typeof EdgeRuntime !== "undefined" || typeof addEventListener === "function") {
    return;
  }
  const { serve } = await import("@hono/node-server");
  console.log(`Listening on http://localhost:${port}`);
  serve({ fetch: handleRequest, port });
})();
// Vercel Edge runtime configuration
export const config = {
runtime: "edge",
regions: ["sfo1"],
};
export const GET = handleRequest;
export const POST = handleRequest;
export const OPTIONS = handleRequest;
// Cloudflare Pages Function
export function onRequest({ request }) {
return handleRequest(request);
}
// Cloudflare Workers Function
export default {
fetch: (req) => handleRequest(req),
};