feat: remove the DashScope SDK and the Z.ai SDK; use the unified OpenAI SDK throughout and reserve extension points for MCP calls. Also update the dev-mode startup script

This commit is contained in:
肖应宇 2026-03-26 16:50:11 +08:00
parent b66bdaedd2
commit e3919107ab
6 changed files with 673 additions and 1366 deletions

View File

@@ -1,26 +1,19 @@
"""
阿里云百炼 DashScope 适配器
基于 api/chat_routes.py 重构
使用 OpenAI SDK 调用阿里云 OpenAI 兼容 API
"""
import json
import os
from typing import Dict, List
from typing import Any, Dict, List, Optional
from fastapi.responses import JSONResponse, StreamingResponse
from .base import BaseAdapter, ChatCompletionRequest, ModelInfo
from .base import ChatCompletionRequest, ModelInfo
from .unified_adapter import UnifiedOpenAIAdapter
from .plugins import get_web_search_mode
from core import get_logger
logger = get_logger()
# Models that support deep thinking
THINKING_MODELS = {"qwen3-max", "qwen3.5-plus"}
# Models that must use the multimodal API (the qwen3.5 series)
MULTIMODAL_API_MODELS = {"qwen3.5-plus", "qwen3.5-flash"}
# DashScope model catalog
DASHSCOPE_MODELS = [
ModelInfo(
@@ -48,7 +41,18 @@ DASHSCOPE_MODELS = [
ModelInfo(
id="qwen3.5-flash",
name="Qwen3.5-Flash",
description="千问系列速度最快、成本极低的模型适合简单任务。千问Flash采用灵活的阶梯定价相比千问Turbo计费更合理。",
description="千问系列速度最快、成本极低的模型,适合简单任务。",
max_tokens=8192,
provider="Aliyun",
supports_thinking=False,
supports_web_search=True,
supports_vision=False,
supports_files=False,
),
ModelInfo(
id="qwen-turbo",
name="Qwen-Turbo",
description="快速响应的通用模型",
max_tokens=8192,
provider="Aliyun",
supports_thinking=False,
@@ -67,397 +71,49 @@ DASHSCOPE_MODELS = [
supports_vision=True,
supports_files=False,
),
ModelInfo(
id="qwen-vl-plus",
name="通义万相 VL-Plus",
description="支持视觉理解的多模态模型",
max_tokens=8192,
provider="Aliyun",
supports_thinking=False,
supports_web_search=False,
supports_vision=True,
supports_files=False,
),
]
# Derived automatically from DASHSCOPE_MODELS
THINKING_MODELS = {m.id.lower() for m in DASHSCOPE_MODELS if m.supports_thinking}
VISION_MODELS = {m.id.lower() for m in DASHSCOPE_MODELS if m.supports_vision}
class DashScopeAdapter(BaseAdapter):
class DashScopeAdapter(UnifiedOpenAIAdapter):
"""阿里云百炼 DashScope 平台适配器"""
_provider_type = "dashscope"
@property
def provider_name(self) -> str:
return "dashscope"
def is_available(self) -> bool:
"""Check whether an API key is configured"""
return bool(os.getenv("ALIYUN_API_KEY") or os.getenv("DASHSCOPE_API_KEY"))
def _get_api_key(self) -> str:
"""Return the API key"""
return os.getenv("ALIYUN_API_KEY") or os.getenv("DASHSCOPE_API_KEY", "")
def _needs_multimodal_api(self, model: str) -> bool:
"""Check whether the model must use the multimodal API"""
return model.lower() in MULTIMODAL_API_MODELS
def list_models(self) -> List[ModelInfo]:
return DASHSCOPE_MODELS
def _supports_thinking(self, model: str) -> bool:
"""Check whether the model supports deep thinking"""
return model.lower() in THINKING_MODELS
def list_models(self) -> List[ModelInfo]:
return DASHSCOPE_MODELS
def _is_vision_model(self, model: str) -> bool:
"""Check whether this is a multimodal model"""
return model.lower() in VISION_MODELS
async def chat(self, request: ChatCompletionRequest):
def _build_messages(self, request: ChatCompletionRequest) -> List[Dict]:
"""
处理 DashScope 聊天请求
支持流式/非流式多模态
构建 DashScope 格式的消息
处理多模态内容
"""
# 打印请求参数
logger.info(f"[DashScope] 请求参数:")
logger.info(f" - model: {request.model}")
logger.info(f" - stream: {request.stream}")
logger.info(f" - temperature: {request.temperature}")
logger.info(f" - max_tokens: {request.max_tokens}")
logger.info(f" - files: {request.files}")
logger.info(f" - deep_thinking: {request.deep_thinking}")
logger.info(
f" - messages: {json.dumps(request.messages, ensure_ascii=False, indent=2)}"
)
# 检测是否包含多模态内容
has_multimodal = self._has_multimodal_content(request)
logger.info(f" - has_multimodal: {has_multimodal}")
# 检查是否需要使用多模态接口qwen3.5 系列)
needs_multimodal_api = self._needs_multimodal_api(request.model)
logger.info(f" - needs_multimodal_api: {needs_multimodal_api}")
if has_multimodal or needs_multimodal_api:
return await self._multimodal_chat(request)
else:
return await self._text_chat(request)
def _has_multimodal_content(self, request: ChatCompletionRequest) -> bool:
"""检查是否包含多模态内容"""
for msg in request.messages:
content = msg.get("content", "")
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("type") == "image_url":
return True
return bool(request.files)
async def _text_chat(self, request: ChatCompletionRequest):
"""Plain-text chat"""
import dashscope
from dashscope import Generation
dashscope.api_key = self._get_api_key()
# Convert the message format
messages = self._build_text_messages(request)
logger.info(f"[DashScope] Text chat - converted messages:")
logger.info(f" - messages_count: {len(messages)}")
logger.info(f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}")
if request.stream:
return self._stream_text_chat(messages, request)
else:
return self._sync_text_chat(messages, request)
def _build_text_messages(self, request: ChatCompletionRequest) -> List[Dict]:
"""Build plain-text messages"""
messages = []
for msg in request.messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if isinstance(content, str) and content.strip():
messages.append({"role": role, "content": content})
elif isinstance(content, list):
text = ""
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
text += item.get("text", "")
if text.strip():
messages.append({"role": role, "content": text})
return messages
def _stream_text_chat(self, messages: List[Dict], request: ChatCompletionRequest):
"""Streaming text chat"""
logger.info(f"[DashScope] Starting streaming text response...")
# Check whether deep thinking is enabled
thinking_enabled = request.deep_thinking and self._supports_thinking(request.model)
logger.info(f"[DashScope] Deep thinking: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(request.model)})")
def generator():
from utils.helpers import generate_unique_id, get_current_timestamp
from dashscope import Generation
full_content = ""
full_reasoning = ""
chunk_count = 0
error_occurred = False
# Build the API call parameters
api_params = {
"model": request.model,
"messages": messages,
"stream": True,
"temperature": request.temperature,
"max_tokens": request.max_tokens,
"result_format": "message",
}
# Apply the unified web-search configuration
web_search_mode = get_web_search_mode(request)
if web_search_mode:
api_params["enable_search"] = True
if web_search_mode == "deep":
api_params["search_options"] = {"enable_search_extension": True}
# Add the deep-thinking parameter
if thinking_enabled:
api_params["enable_thinking"] = True
# Log the API call parameters
logger.info(f"[DashScope] API call parameters:")
logger.info(f" - model: {api_params['model']}")
logger.info(f" - stream: {api_params['stream']}")
logger.info(f" - temperature: {api_params['temperature']}")
logger.info(f" - max_tokens: {api_params['max_tokens']}")
logger.info(f" - result_format: {api_params['result_format']}")
if thinking_enabled:
logger.info(f" - enable_thinking: True")
try:
responses = Generation.call(**api_params)
except Exception as e:
error_occurred = True
logger.error(f"[DashScope] API 调用异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# 返回错误响应
error_data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": request.model,
"choices": [{
"index": 0,
"delta": {"content": f"API 调用失败: {str(e)}"},
"finish_reason": "stop",
}],
}
yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
return
for resp in responses:
if resp.status_code == 200:
chunk_count += 1
choice = resp.output.choices[0]
# Handle deep-thinking content (reasoning_content)
reasoning_content = getattr(choice.message, "reasoning_content", None)
if reasoning_content:
# Compute the incremental delta
if len(reasoning_content) > len(full_reasoning):
delta_reasoning = reasoning_content[len(full_reasoning):]
full_reasoning = reasoning_content
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": request.model,
"choices": [
{
"index": 0,
"delta": {"reasoning_content": delta_reasoning},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
continue
# Handle regular content
content = choice.message.content
if content and len(content) > len(full_content):
# DashScope streaming returns the cumulative content; compute the delta
delta = content[len(full_content) :]
full_content = content
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": request.model,
"choices": [
{
"index": 0,
"delta": {"content": delta},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
else:
# Log non-200 responses
logger.warning(f"[DashScope] Non-200 response: status_code={resp.status_code}, code={resp.code}, message={resp.message}")
finish = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": request.model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(finish, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
# Log the streaming result
logger.info(f"[DashScope] Streaming text response complete:")
logger.info(f" - chunks: {chunk_count}")
logger.info(f" - content_length: {len(full_content)} chars")
if full_reasoning:
logger.info(f" - reasoning_length: {len(full_reasoning)} chars")
logger.info(
f" - content_preview: {full_content[:200]}..."
if len(full_content) > 200
else f" - content: {full_content}"
)
return StreamingResponse(generator(), media_type="text/event-stream")
def _sync_text_chat(self, messages: List[Dict], request: ChatCompletionRequest):
"""Non-streaming text chat"""
from utils.helpers import generate_unique_id, get_current_timestamp
from dashscope import Generation
# Check whether deep thinking is enabled
thinking_enabled = request.deep_thinking and self._supports_thinking(request.model)
logger.info(f"[DashScope] Deep thinking: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(request.model)})")
# Build the API call parameters
api_params = {
"model": request.model,
"messages": messages,
"stream": False,
"temperature": request.temperature,
"max_tokens": request.max_tokens,
"result_format": "message",
}
# Apply the unified web-search configuration
web_search_mode = get_web_search_mode(request)
if web_search_mode:
api_params["enable_search"] = True
if web_search_mode == "deep":
api_params["search_options"] = {"enable_search_extension": True}
# Add the deep-thinking parameter
if thinking_enabled:
api_params["enable_thinking"] = True
# Log the API call parameters
logger.info(f"[DashScope] API call parameters:")
logger.info(f" - model: {api_params['model']}")
logger.info(f" - stream: {api_params['stream']}")
logger.info(f" - temperature: {api_params['temperature']}")
logger.info(f" - max_tokens: {api_params['max_tokens']}")
logger.info(f" - result_format: {api_params['result_format']}")
if thinking_enabled:
logger.info(f" - enable_thinking: True")
try:
resp = Generation.call(**api_params)
except Exception as e:
logger.error(f"[DashScope] API 调用异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return JSONResponse(
status_code=500,
content={"error": f"DashScope API 调用异常: {str(e)}"},
)
if resp.status_code == 200:
message = resp.output.choices[0].message
content = message.content or ""
# Build the response message
response_message = {"role": "assistant", "content": content}
# Attach deep-thinking content if present
reasoning_content = getattr(message, "reasoning_content", None)
if reasoning_content:
response_message["reasoning_content"] = reasoning_content
response = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion",
"created": get_current_timestamp(),
"model": request.model,
"choices": [
{
"index": 0,
"message": response_message,
"finish_reason": "stop",
}
],
}
if hasattr(resp, "usage") and resp.usage:
response["usage"] = {
"prompt_tokens": resp.usage.input_tokens,
"completion_tokens": resp.usage.output_tokens,
"total_tokens": resp.usage.total_tokens,
}
# Log the response
logger.info(f"[DashScope] Response succeeded:")
logger.info(f" - status_code: {resp.status_code}")
logger.info(f" - content_length: {len(content)} chars")
if reasoning_content:
logger.info(f" - reasoning_length: {len(reasoning_content)} chars")
logger.info(
f" - content_preview: {content[:200]}..."
if len(content) > 200
else f" - content: {content}"
)
if hasattr(resp, "usage") and resp.usage:
logger.info(f" - usage: {response['usage']}")
return JSONResponse(content=response)
logger.error(f"[DashScope] 请求失败:")
logger.error(f" - status_code: {resp.status_code}")
logger.error(f" - code: {resp.code}")
logger.error(f" - message: {resp.message}")
return JSONResponse(
status_code=500,
content={"error": f"DashScope Error: {resp.code} - {resp.message}"},
)
async def _multimodal_chat(self, request: ChatCompletionRequest):
"""Multimodal chat"""
import dashscope
from dashscope import MultiModalConversation
dashscope.api_key = self._get_api_key()
logger.info(f"[DashScope] Starting multimodal chat...")
# Convert the message format
messages = self._build_multimodal_messages(request)
logger.info(f"[DashScope] Multimodal message conversion complete:")
logger.info(f" - messages_count: {len(messages)}")
logger.info(f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}")
# Pick a multimodal model
model = request.model
if "qwen-" in model and "vl" not in model:
original_model = model
model = model.replace("qwen-", "qwen-vl-")
logger.info(f"[DashScope] Model auto-switched: {original_model} -> {model}")
if request.stream:
return self._stream_multimodal_chat(messages, model, request)
else:
return self._sync_multimodal_chat(messages, model, request)
def _build_multimodal_messages(self, request: ChatCompletionRequest) -> List[Dict]:
"""构建多模态消息"""
messages = []
for msg in request.messages:
@@ -466,39 +122,43 @@ class DashScopeAdapter(BaseAdapter):
if isinstance(content, str):
if content.strip():
messages.append({"role": role, "content": [{"text": content}]})
messages.append({"role": role, "content": content})
elif isinstance(content, list):
# Multimodal content - compatible with the OpenAI format
ds_content = []
for item in content:
if isinstance(item, dict):
if item.get("type") == "text":
ds_content.append({"text": item.get("text", "")})
ds_content.append({"type": "text", "text": item.get("text", "")})
elif item.get("type") == "image_url":
img_url = self._extract_image_url(item)
if img_url:
ds_content.append({"image": img_url})
ds_content.append({
"type": "image_url",
"image_url": {"url": img_url}
})
if ds_content:
messages.append({"role": role, "content": ds_content})
return messages
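For reference, the OpenAI-compatible shape this now produces for one text-plus-image user turn looks like the sketch below (the file URL is a placeholder, not a path from this project):

# Sketch of one converted multimodal message (placeholder URL):
example_message = {
    "role": "user",
    "content": [
        {"type": "text", "text": "What is in this picture?"},
        {"type": "image_url", "image_url": {"url": "file:///tmp/example.png"}},
    ],
}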
def _extract_image_url(self, item: Dict) -> str:
"""提取并转换图片 URL"""
def _extract_image_url(self, item: Dict) -> Optional[str]:
"""提取图片 URL"""
img_val = item.get("image_url", "")
if isinstance(img_val, str):
img_url = img_val
elif isinstance(img_val, dict):
img_url = img_val.get("url", "")
else:
img_url = ""
return None
logger.info(f"[DashScope] 原始图片URL: {img_url}")
# 记录图片 URL 转换
logger.info(f"[DashScope] 图片URL: {img_url}")
# 转换 http URL 为 file:// 格式(如果是本地文件)
# 处理本地文件 URL
if img_url.startswith(("http://", "https://")):
from urllib.parse import urlparse
parsed = urlparse(img_url)
if "localhost" in parsed.netloc or "127.0.0.1" in parsed.netloc:
path_parts = parsed.path.split("/")
@@ -510,285 +170,38 @@ class DashScopeAdapter(BaseAdapter):
elif not img_url.startswith("file://") and not img_url.startswith(("http://", "https://")):
img_url = f"file://{img_url}"
logger.info(f"[DashScope] 转换后图片URL: {img_url}")
return img_url
def _stream_multimodal_chat(
self, messages: List[Dict], model: str, request: ChatCompletionRequest
):
"""流式多模态聊天"""
logger.info(f"[DashScope] 开始流式多模态响应...")
logger.info(f" - model: {model}")
logger.info(f" - max_tokens: {request.max_tokens}")
logger.info(f" - temperature: {request.temperature}")
def _get_extra_params(self, request: ChatCompletionRequest) -> Dict[str, Any]:
"""
获取 DashScope 特殊参数
- 深度思考: extra_body={"enable_thinking": True/False}
- 联网搜索: extra_body={"enable_search": True}
"""
extra_params = {}
extra_body = {}
# 检查是否启用深度思考
thinking_enabled = request.deep_thinking and self._supports_thinking(model)
logger.info(f"[DashScope] 深度思考: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(model)})")
model = request.model
def generator():
from utils.helpers import generate_unique_id, get_current_timestamp
# Deep thinking - always passed, explicitly enabled or disabled
logger.info(f"[DashScope] Deep-thinking request: deep_thinking={request.deep_thinking}, model={model}")
from dashscope import MultiModalConversation
supports_thinking = self._supports_thinking(model)
logger.info(f"[DashScope] Model {model} supports deep thinking: {supports_thinking}")
full_content = ""
full_reasoning = ""
chunk_count = 0
error_occurred = False
thinking_enabled = request.deep_thinking and supports_thinking
extra_body["enable_thinking"] = thinking_enabled
logger.info(f"[DashScope] Final deep-thinking state: {thinking_enabled}")
# Log the API call parameters
api_params = {
"model": model,
"messages": messages,
"stream": True,
"enable_thinking": False,
"max_tokens": request.max_tokens,
"temperature": request.temperature,
}
# Apply the unified web-search configuration
web_search_mode = get_web_search_mode(request)
if web_search_mode:
api_params["enable_search"] = True
if web_search_mode == "deep":
api_params["search_options"] = {"enable_search_extension": True}
# Add the deep-thinking parameter
if thinking_enabled:
api_params["enable_thinking"] = True
logger.info(f"[DashScope] Streaming multimodal API call parameters:")
logger.info(f" - model: {api_params['model']}")
logger.info(f" - stream: {api_params['stream']}")
logger.info(f" - max_tokens: {api_params['max_tokens']}")
logger.info(f" - temperature: {api_params['temperature']}")
logger.info(f" - enable_thinking: {api_params['enable_thinking']}")
logger.info(f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}")
try:
responses = MultiModalConversation.call(**api_params)
except Exception as e:
error_occurred = True
logger.error(f"[DashScope] 多模态 API 调用异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
error_data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [{
"index": 0,
"delta": {"content": f"API 调用失败: {str(e)}"},
"finish_reason": "stop",
}],
}
yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
return
for resp in responses:
chunk_count += 1
if resp.status_code == 200:
try:
choice = resp.output.choices[0]
message = choice["message"]
# Handle deep-thinking content (reasoning_content)
# The multimodal API also returns reasoning_content as standalone fragments
reasoning_content = message.get("reasoning_content", "")
if reasoning_content:
delta_reasoning = reasoning_content
full_reasoning += reasoning_content
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [
{
"index": 0,
"delta": {"reasoning_content": delta_reasoning},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
continue
# Handle regular content
content_items = message.get("content", [])
text = ""
for item in content_items:
if isinstance(item, dict) and "text" in item:
text += item["text"]
# The multimodal API returns content as standalone fragments (not cumulative), so emit it directly as the delta
if text:
delta = text
full_content += text
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": delta},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
except (KeyError, IndexError, TypeError) as e:
logger.warning(f"[DashScope] 解析多模态响应异常: {str(e)}")
else:
logger.warning(f"[DashScope] 非200响应: status_code={resp.status_code}, code={resp.code}, message={resp.message}")
finish = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(finish, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
# Log the streaming result
logger.info(f"[DashScope] Streaming multimodal response complete:")
logger.info(f" - chunks: {chunk_count}")
if full_reasoning:
logger.info(f" - reasoning_length: {len(full_reasoning)} chars")
logger.info(f" - reasoning: {full_reasoning[:500]}..." if len(full_reasoning) > 500 else f" - reasoning: {full_reasoning}")
logger.info(f" - content_length: {len(full_content)} chars")
logger.info(
f" - content: {full_content[:500]}..."
if len(full_content) > 500
else f" - content: {full_content}"
)
return StreamingResponse(generator(), media_type="text/event-stream")
def _sync_multimodal_chat(
self, messages: List[Dict], model: str, request: ChatCompletionRequest
):
"""非流式多模态聊天"""
from utils.helpers import generate_unique_id, get_current_timestamp
from dashscope import MultiModalConversation
# 检查是否启用深度思考
thinking_enabled = request.deep_thinking and self._supports_thinking(model)
logger.info(f"[DashScope] 深度思考: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(model)})")
logger.info(f"[DashScope] 开始非流式多模态响应...")
logger.info(f" - model: {model}")
logger.info(f" - max_tokens: {request.max_tokens}")
logger.info(f" - temperature: {request.temperature}")
# 打印 API 调用参数
api_params = {
"model": model,
"messages": messages,
"stream": False,
"max_tokens": request.max_tokens,
"enable_thinking": False,
"temperature": request.temperature,
}
# Apply the unified web-search configuration
# Web search
web_search_mode = get_web_search_mode(request)
if web_search_mode:
api_params["enable_search"] = True
extra_body["enable_search"] = True
if web_search_mode == "deep":
api_params["search_options"] = {"enable_search_extension": True}
extra_body["search_options"] = {"enable_search_extension": True}
logger.info(f"[DashScope] Web search enabled: mode={web_search_mode}")
# Add the deep-thinking parameter
if thinking_enabled:
api_params["enable_thinking"] = True
# Add extra_body to the parameters
extra_params["extra_body"] = extra_body
logger.info(f"[DashScope] Non-streaming multimodal API call parameters:")
logger.info(f" - model: {api_params['model']}")
logger.info(f" - stream: {api_params['stream']}")
logger.info(f" - max_tokens: {api_params['max_tokens']}")
logger.info(f" - temperature: {api_params['temperature']}")
logger.info(f" - enable_thinking: {api_params['enable_thinking']}")
try:
resp = MultiModalConversation.call(**api_params)
except Exception as e:
logger.error(f"[DashScope] 多模态 API 调用异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return JSONResponse(
status_code=500,
content={"error": f"DashScope API 调用异常: {str(e)}"},
)
if resp.status_code == 200:
try:
message = resp.output.choices[0]["message"]
content_items = message.get("content", [])
text = ""
for item in content_items:
if isinstance(item, dict) and "text" in item:
text += item["text"]
# Build the response message
response_message = {"role": "assistant", "content": text}
# Attach deep-thinking content if present
reasoning_content = message.get("reasoning_content")
if reasoning_content:
response_message["reasoning_content"] = reasoning_content
response = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion",
"created": get_current_timestamp(),
"model": model,
"choices": [
{
"index": 0,
"message": response_message,
"finish_reason": "stop",
}
],
}
# Log the response
logger.info(f"[DashScope] Multimodal response succeeded:")
logger.info(f" - status_code: {resp.status_code}")
logger.info(f" - content_length: {len(text)} chars")
if reasoning_content:
logger.info(f" - reasoning_length: {len(reasoning_content)} chars")
logger.info(
f" - content_preview: {text[:200]}..."
if len(text) > 200
else f" - content: {text}"
)
return JSONResponse(content=response)
except (KeyError, IndexError, TypeError) as e:
logger.error(f"[DashScope] 解析多模态响应异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return JSONResponse(
status_code=500,
content={"error": f"Parse error: {str(e)}"},
)
logger.error(f"[DashScope] 多模态请求失败:")
logger.error(f" - status_code: {resp.status_code}")
logger.error(f" - code: {resp.code}")
logger.error(f" - message: {resp.message}")
return JSONResponse(
status_code=500,
content={"error": f"DashScope Error: {resp.code} - {resp.message}"},
)
return extra_params
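As a minimal sketch of how these extras reach the wire: the unified base class pops the extra_body entry and hands it to the OpenAI SDK, which merges those non-standard fields (enable_thinking, enable_search) into the request JSON sent to the compatible-mode endpoint. The standalone client below is illustrative only, not how the adapter itself constructs the call:

# Illustrative only: extra_body carries DashScope-specific flags through the OpenAI SDK.
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY", ""),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
resp = client.chat.completions.create(
    model="qwen3-max",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,  # DashScope generally expects streaming when thinking is enabled
    # Fields in extra_body are merged verbatim into the request JSON:
    extra_body={"enable_thinking": True, "enable_search": True},
)
for chunk in resp:
    if chunk.choices:
        print(chunk.choices[0].delta.content or "", end="")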

View File

@@ -1,16 +1,14 @@
"""
智谱 GLM 适配器
基于 utils/glm_adapter.py 重构
使用zai-sdk因为已经完成这一部分的整套逻辑如果更换OpenAI-SDK会花很多时间调试
使用 OpenAI SDK 调用智谱 OpenAI 兼容 API
"""
import json
import os
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional
from fastapi.responses import JSONResponse, StreamingResponse
from .base import BaseAdapter, ChatCompletionRequest, ModelInfo
from .base import ChatCompletionRequest, ModelInfo
from .unified_adapter import UnifiedOpenAIAdapter
from .plugins import get_web_search_mode, build_glm_search_tool
from core import get_logger
@@ -75,122 +73,44 @@ GLM_MODELS = [
),
]
# Vision model list (used for auto-switching)
VISION_MODELS = {"glm-4v", "glm-4v-plus", "glm-4v-plus-0111", "glm-4.6v"}
# Models that support deep thinking
THINKING_MODELS = {"glm-z1-flash", "glm-z1-air", "glm-4.6v", "glm-4.6"}
# Derived automatically from GLM_MODELS
VISION_MODELS = {m.id.lower() for m in GLM_MODELS if m.supports_vision}
THINKING_MODELS = {m.id.lower() for m in GLM_MODELS if m.supports_thinking}
class GLMAdapter(BaseAdapter):
class GLMAdapter(UnifiedOpenAIAdapter):
"""智谱 GLM 平台适配器"""
_client = None
_provider_type = "zhipu"
@property
def provider_name(self) -> str:
return "glm"
def is_available(self) -> bool:
"""Check whether an API key is configured"""
return bool(os.getenv("ZHIPU_API_KEY") or os.getenv("GLM_API_KEY"))
def _get_client(self):
"""Get the GLM client (lazily initialized)"""
if self._client is None:
from zhipuai import ZhipuAI
api_key = os.getenv("ZHIPU_API_KEY") or os.getenv("GLM_API_KEY")
self._client = ZhipuAI(api_key=api_key)
return self._client
def list_models(self) -> List[ModelInfo]:
return GLM_MODELS
async def chat(self, request: ChatCompletionRequest):
"""
Handle a GLM chat request
Supports streaming/non-streaming, images, documents, web search, and deep thinking
"""
client = self._get_client()
def _supports_thinking(self, model: str) -> bool:
"""Check whether the model supports deep thinking"""
return model.lower() in THINKING_MODELS
# Build messages
glm_messages, has_vision, has_files = self._build_messages(request)
actual_model = self._resolve_model(request.model, has_vision, has_files)
# Debug: log the raw request parameters
logger.info(f"[GLM] Raw request parameters:")
logger.info(
f" - request.deep_thinking: {request.deep_thinking} (type: {type(request.deep_thinking)})"
)
logger.info(f" - request.web_search: {request.web_search}")
logger.info(f" - request.deep_search: {request.deep_search}")
logger.info(f" - actual_model: {actual_model}")
logger.info(f" - supports_thinking: {self._supports_thinking(actual_model)}")
# Build extra parameters
extra_kwargs = {}
web_search_mode = get_web_search_mode(request)
if web_search_mode:
extra_kwargs["tools"] = [build_glm_search_tool(web_search_mode)]
extra_kwargs["tool_choice"] = "auto"
# Deep thinking is opted in explicitly (enabled when True, disabled when False)
# Note: only certain models support deep thinking (e.g. glm-z1-flash)
thinking_enabled = request.deep_thinking and self._supports_thinking(
actual_model
)
logger.info(
f"[GLM] Deep-thinking decision: {request.deep_thinking} and {self._supports_thinking(actual_model)} = {thinking_enabled}"
)
if thinking_enabled:
extra_kwargs["thinking"] = {"type": "enabled"}
logger.info(
f"[GLM] Deep thinking enabled: extra_kwargs['thinking'] = {extra_kwargs['thinking']}"
)
else:
extra_kwargs["thinking"] = {"type": "disabled"}
logger.info(
f"[GLM] Deep thinking disabled: extra_kwargs['thinking'] = {extra_kwargs['thinking']}"
)
if extra_kwargs:
logger.info(
f"[GLM] Final extra_kwargs: {json.dumps(extra_kwargs, ensure_ascii=False)}"
)
if request.stream:
return self._stream_chat(
client, glm_messages, actual_model, request, extra_kwargs
)
else:
return self._sync_chat(
client, glm_messages, actual_model, request, extra_kwargs
)
def _build_messages(
self, request: ChatCompletionRequest
) -> tuple[List[Dict], bool, bool]:
def _build_messages(self, request: ChatCompletionRequest) -> List[Dict]:
"""
构建 GLM 格式的消息
返回(消息列表, 是否包含图片, 是否包含文件附件)
处理文件附件和多模态内容
"""
messages = []
has_vision = False
has_files = bool(request.files) # 检查是否有文件附件
has_files = bool(request.files)
for msg in request.messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if isinstance(content, str):
# Plain text
if content.strip():
messages.append({"role": role, "content": content})
elif isinstance(content, list):
# Multimodal content
glm_content = []
for item in content:
if isinstance(item, dict):
@@ -214,7 +134,6 @@ class GLMAdapter(BaseAdapter):
if request.files:
file_content = self._build_file_content(request.files)
if messages and messages[-1]["role"] == "user":
# Append to the last user message
if isinstance(messages[-1]["content"], list):
messages[-1]["content"].extend(file_content)
else:
@@ -225,7 +144,7 @@ class GLMAdapter(BaseAdapter):
else:
messages.append({"role": "user", "content": file_content})
return messages, has_vision, has_files
return messages
def _extract_image_url(self, item: Dict) -> Optional[str]:
"""提取图片 URL"""
@ -244,232 +163,54 @@ class GLMAdapter(BaseAdapter):
content.append({"type": "file_url", "file_url": {"url": file_url}})
return content
def _resolve_model(
self, model: str, has_vision: bool, has_files: bool = False
) -> str:
def _resolve_model(self, model: str, has_vision: bool, has_files: bool = False) -> str:
"""Resolve the model actually used"""
model_lower = model.lower()
# If images or file attachments are present, force glm-4.6v (multimodal-capable)
if (has_vision or has_files) and model_lower not in VISION_MODELS:
logger.info(
f"[GLM] Images or file attachments detected; forcing model switch: {model} -> glm-4.6v"
)
logger.info(f"[GLM] Images or file attachments detected; switching model: {model} -> glm-4.6v")
return "glm-4.6v"
return model
def _supports_thinking(self, model: str) -> bool:
"""Check whether the model supports deep thinking"""
return model.lower() in THINKING_MODELS
def _get_extra_params(self, request: ChatCompletionRequest) -> Dict[str, Any]:
"""
Get GLM-specific parameters
- Deep thinking: extra_body={"thinking": {"type": "enabled/disabled"}}
- Web search: tools=[{"type": "web_search", ...}]
"""
extra_params = {}
def _stream_chat(
self, client, messages, model, request, extra_kwargs
) -> StreamingResponse:
"""流式聊天"""
logger.info(f"[GLM] 开始流式响应...")
# 提取深度思考配置
thinking_config = extra_kwargs.get("thinking")
tools_config = extra_kwargs.get("tools")
def generator():
from utils.helpers import generate_unique_id, get_current_timestamp
full_content = ""
# Build the API call parameters
api_params = {
"model": model,
"messages": messages,
"stream": True,
"temperature": request.temperature,
"max_tokens": request.max_tokens,
}
# Deep thinking: passed via extra_body
if thinking_config:
api_params["extra_body"] = {"thinking": thinking_config}
# Web search: passed via the tools parameter
if tools_config:
api_params["tools"] = tools_config
api_params["tool_choice"] = "auto"
# Log the request parameters
logger.info(f"[GLM] API call parameters:")
logger.info(f" - model: {model}")
logger.info(f" - stream: True")
logger.info(f" - temperature: {request.temperature}")
logger.info(f" - max_tokens: {request.max_tokens}")
if thinking_config:
logger.info(f" - extra_body: {{'thinking': {thinking_config}}}")
if tools_config:
logger.info(
f" - tools: {json.dumps(tools_config, ensure_ascii=False)}"
)
logger.info(f" - tool_choice: auto")
logger.info(
f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}"
)
chunk_count = 0
resp = client.chat.completions.create(**api_params)
for chunk in resp:
chunk_count += 1
# Make sure delta exists
if not hasattr(chunk.choices[0], "delta"):
continue
delta = chunk.choices[0].delta
# Handle deep-thinking content (reasoning_content)
reasoning_content = getattr(delta, "reasoning_content", None)
if reasoning_content:
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [
{
"index": 0,
"delta": {"reasoning_content": reasoning_content},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
continue
# Handle regular content
content = getattr(delta, "content", None)
if content:
full_content += content
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": content},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
# Finish marker
finish = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(finish, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
# Log the streaming result
logger.info(f"[GLM] Streaming response complete:")
logger.info(f" - chunks: {chunk_count}")
logger.info(f" - content_length: {len(full_content)} chars")
logger.info(
f" - content_preview: {full_content[:200]}..."
if len(full_content) > 200
else f" - content: {full_content}"
)
return StreamingResponse(generator(), media_type="text/event-stream")
def _sync_chat(
self, client, messages, model, request, extra_kwargs
) -> JSONResponse:
"""非流式聊天"""
from utils.helpers import generate_unique_id, get_current_timestamp
# 提取深度思考配置
thinking_config = extra_kwargs.get("thinking")
tools_config = extra_kwargs.get("tools")
# 构建 API 调用参数
api_params = {
"model": model,
"messages": messages,
"stream": False,
"temperature": request.temperature,
"max_tokens": request.max_tokens,
}
# Deep thinking: passed via extra_body
if thinking_config:
api_params["extra_body"] = {"thinking": thinking_config}
# Web search: passed via the tools parameter
if tools_config:
api_params["tools"] = tools_config
api_params["tool_choice"] = "auto"
# Log the request parameters
logger.info(f"[GLM] API call parameters:")
logger.info(f" - model: {model}")
logger.info(f" - stream: {request.stream}")
logger.info(f" - temperature: {request.temperature}")
logger.info(f" - max_tokens: {request.max_tokens}")
if thinking_config:
logger.info(f" - extra_body: {{'thinking': {thinking_config}}}")
if tools_config:
logger.info(f" - tools: {json.dumps(tools_config, ensure_ascii=False)}")
logger.info(f" - tool_choice: auto")
logger.info(
f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}"
# Detect multimodal content to decide which model to actually use
messages = self._build_messages(request)
has_vision = any(
isinstance(m.get("content"), list) and
any(c.get("type") == "image_url" for c in m.get("content", []))
for m in messages
)
has_files = bool(request.files)
actual_model = self._resolve_model(request.model, has_vision, has_files)
resp = client.chat.completions.create(**api_params)
# Update the model on the request (if it changed)
if actual_model != request.model:
extra_params["model"] = actual_model
message = resp.choices[0].message
content = message.content or ""
# Web search
web_search_mode = get_web_search_mode(request)
if web_search_mode:
extra_params["tools"] = [build_glm_search_tool(web_search_mode)]
extra_params["tool_choice"] = "auto"
logger.info(f"[GLM] Web search enabled: mode={web_search_mode}")
# Build the response
response_message = {"role": "assistant", "content": content}
# Deep thinking - always passed, explicitly enabled or disabled
logger.info(f"[GLM] Deep-thinking request: deep_thinking={request.deep_thinking}, actual_model={actual_model}")
# Attach deep-thinking content if present
reasoning_content = getattr(message, "reasoning_content", None)
if reasoning_content:
response_message["reasoning_content"] = reasoning_content
# Check whether deep thinking is supported
supports_thinking = self._supports_thinking(actual_model)
logger.info(f"[GLM] Model {actual_model} supports deep thinking: {supports_thinking}")
response = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion",
"created": get_current_timestamp(),
"model": model,
"choices": [
{
"index": 0,
"message": response_message,
"finish_reason": "stop",
}
],
}
# Enable only when the frontend requests it AND the model supports it
thinking_enabled = request.deep_thinking and supports_thinking
thinking_type = "enabled" if thinking_enabled else "disabled"
extra_params["extra_body"] = {"thinking": {"type": thinking_type}}
logger.info(f"[GLM] Final deep-thinking state: {thinking_type}")
if hasattr(resp, "usage") and resp.usage:
response["usage"] = {
"prompt_tokens": resp.usage.prompt_tokens,
"completion_tokens": resp.usage.completion_tokens,
"total_tokens": resp.usage.total_tokens,
}
# Log the response
logger.info(f"[GLM] Response:")
logger.info(f" - content_length: {len(content)} chars")
logger.info(
f" - content_preview: {content[:200]}..."
if len(content) > 200
else f" - content: {content}"
)
if hasattr(resp, "usage") and resp.usage:
logger.info(f" - usage: {response['usage']}")
return JSONResponse(content=response)
return extra_params
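A quick illustration of the vision/file auto-switch implemented in _resolve_model above (a sketch; it assumes a GLMAdapter can be constructed with no arguments, as the class definition suggests):

# Sketch: how _resolve_model picks the effective model.
adapter = GLMAdapter()
assert adapter._resolve_model("glm-4-flash", has_vision=False) == "glm-4-flash"
# Any image or file attachment forces the multimodal glm-4.6v:
assert adapter._resolve_model("glm-4-flash", has_vision=True) == "glm-4.6v"
assert adapter._resolve_model("glm-4.6v", has_vision=True) == "glm-4.6v"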

View File

@@ -5,12 +5,18 @@ OpenAI adapter
import json
import os
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.responses import StreamingResponse
from .base import BaseAdapter, ChatCompletionRequest, ModelInfo
from .plugins import get_web_search_mode, build_openai_search_tool, execute_tavily_search, get_current_time_info
from .base import ChatCompletionRequest, ModelInfo
from .unified_adapter import UnifiedOpenAIAdapter
from .plugins import (
get_web_search_mode,
build_openai_search_tool,
execute_tavily_search,
get_current_time_info,
)
from core import get_logger
logger = get_logger()
@@ -83,183 +89,67 @@ DEEPSEEK_MODELS = [
max_tokens=64000,
provider="Deepseek",
supports_thinking=True,
supports_web_search=True, # Note: implemented via built-in retrieval augmentation
supports_web_search=True,
supports_vision=False,
supports_files=False,
),
]
# DeepSeek models that support deep thinking
DEEPSEEK_THINKING_MODELS = {"deepseek-reasoner"}
# Derived automatically from DEEPSEEK_MODELS
DEEPSEEK_THINKING_MODELS = {m.id.lower() for m in DEEPSEEK_MODELS if m.supports_thinking}
class OpenAIAdapter(BaseAdapter):
class OpenAIAdapter(UnifiedOpenAIAdapter):
"""OpenAI 平台适配器"""
_client = None
_provider_type: str = "openai" # openai 或 deepseek
def __init__(self, provider_type: str = "openai"):
self._provider_type = provider_type
_provider_type = "openai"
@property
def provider_name(self) -> str:
return self._provider_type
def is_available(self) -> bool:
"""Check whether an API key is configured"""
if self._provider_type == "deepseek":
return bool(os.getenv("DEEPSEEK_API_KEY"))
return bool(os.getenv("OPENAI_API_KEY"))
def _get_client(self):
"""Get the OpenAI client (lazily initialized)"""
if self._client is None:
from openai import OpenAI
if self._provider_type == "deepseek":
api_key = os.getenv("DEEPSEEK_API_KEY", "")
base_url = os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com/v1")
else:
api_key = os.getenv("OPENAI_API_KEY", "")
base_url = os.getenv("OPENAI_BASE_URL") # 可选自定义端点
kwargs = {"api_key": api_key}
if base_url:
kwargs["base_url"] = base_url
self._client = OpenAI(**kwargs)
return self._client
return "openai"
def list_models(self) -> List[ModelInfo]:
if self._provider_type == "deepseek":
return DEEPSEEK_MODELS
return OPENAI_MODELS
async def chat(self, request: ChatCompletionRequest):
"""
Handle an OpenAI chat request
Uses the OpenAI SDK directly; supports streaming/non-streaming
"""
client = self._get_client()
def _get_extra_params(self, request: ChatCompletionRequest) -> Dict[str, Any]:
"""Get OpenAI-specific parameters"""
extra_params = {}
# Log the request parameters
provider_name = self._provider_type.upper()
logger.info(f"[{provider_name}] Request parameters:")
logger.info(f" - model: {request.model}")
logger.info(f" - stream: {request.stream}")
logger.info(f" - temperature: {request.temperature}")
logger.info(f" - max_tokens: {request.max_tokens}")
logger.info(f" - provider_type: {self._provider_type}")
if self._provider_type == "deepseek":
logger.info(f" - deep_thinking: {request.deep_thinking}")
# Build messages
messages = self._build_messages(request)
# Add the unified web-search plugin parameters
# Web search - implemented with function calling
web_search_mode = get_web_search_mode(request)
if web_search_mode:
# Inject the current time into the system prompt so the model has time awareness
time_info = get_current_time_info()
has_system = False
for msg in messages:
if msg.get("role") == "system":
msg["content"] = f"Current system time: {time_info}\n" + str(msg.get("content", ""))
has_system = True
break
if not has_system:
messages.insert(0, {"role": "system", "content": f"Current system time: {time_info}"})
extra_params["tools"] = [build_openai_search_tool(web_search_mode)]
logger.info(f"[OpenAI] Web search enabled: mode={web_search_mode}")
logger.info(
f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}"
)
return extra_params
# Build the request parameters
kwargs = {
"model": request.model,
"messages": messages,
"temperature": request.temperature,
"max_tokens": request.max_tokens,
"stream": request.stream,
}
if web_search_mode:
search_tool = build_openai_search_tool(web_search_mode)
kwargs["tools"] = [search_tool]
# DeepSeek deep-thinking support
extra_body = None
if self._provider_type == "deepseek" and request.deep_thinking:
if self._supports_thinking(request.model):
extra_body = {"thinking": {"type": "enabled"}}
kwargs["extra_body"] = extra_body
logger.info(
f"[{provider_name}] Deep thinking enabled: extra_body = {extra_body}"
)
if request.stream:
return self._stream_chat(client, kwargs, extra_body)
else:
return self._sync_chat(client, kwargs, extra_body)
def _supports_thinking(self, model: str) -> bool:
"""Check whether the model supports deep thinking"""
return model.lower() in DEEPSEEK_THINKING_MODELS
def _build_messages(self, request: ChatCompletionRequest) -> List[Dict]:
"""Build messages in OpenAI format"""
messages = []
for msg in request.messages:
role = msg.get("role", "user")
content = msg.get("content", "")
# OpenAI supports the standard format directly
if isinstance(content, str):
if content.strip():
messages.append({"role": role, "content": content})
elif isinstance(content, list):
# Multimodal content
openai_content = []
for item in content:
if isinstance(item, dict):
openai_content.append(item)
if openai_content:
messages.append({"role": role, "content": openai_content})
return messages
def _stream_chat(
self, client, kwargs: Dict, extra_body: Optional[Dict] = None
) -> StreamingResponse:
"""流式聊天"""
provider_name = self._provider_type.upper()
logger.info(f"[{provider_name}] 开始流式响应...")
def _stream_chat(self, client, kwargs: Dict) -> StreamingResponse:
"""
流式聊天 - 处理联网搜索的 Function Calling
"""
logger.info(f"[OpenAI] 开始流式响应...")
def generator():
from utils.helpers import generate_unique_id, get_current_timestamp
nonlocal kwargs
# 可能需要执行多轮对话(当发生工具调用时)
# 可能需要多轮对话(当发生工具调用时)
while True:
resp = client.chat.completions.create(**kwargs)
full_content = ""
full_reasoning = ""
chunk_count = 0
tool_calls = []
current_tool_call = None
for chunk in resp:
if not chunk.choices:
continue
chunk_count += 1
delta = chunk.choices[0].delta
# 1. Collect any content/reasoning
# Collect content
delta_content = {}
if hasattr(delta, "content") and delta.content:
delta_content["content"] = delta.content
@@ -268,18 +158,20 @@ class OpenAIAdapter(BaseAdapter):
delta_content["reasoning_content"] = delta.reasoning_content
full_reasoning += delta.reasoning_content
# 2. Collect any tool_calls produced (streaming)
# Collect tool_calls (streaming)
if hasattr(delta, "tool_calls") and delta.tool_calls:
for tool_call_chunk in delta.tool_calls:
idx = tool_call_chunk.index
# Make sure the tool_calls list is long enough
while len(tool_calls) <= idx:
tool_calls.append({"id": "", "type": "function", "function": {"name": "", "arguments": ""}})
tool_calls.append({
"id": "",
"type": "function",
"function": {"name": "", "arguments": ""}
})
if tool_call_chunk.id:
tool_calls[idx]["id"] += tool_call_chunk.id
if tool_call_chunk.type:
# For type: OpenAI may send it in chunks (usually only the first chunk, sometimes every chunk); assign rather than concatenate to avoid "functionfunction"
tool_calls[idx]["type"] = tool_call_chunk.type
if tool_call_chunk.function:
if tool_call_chunk.function.name:
@@ -287,64 +179,59 @@ class OpenAIAdapter(BaseAdapter):
if tool_call_chunk.function.arguments:
tool_calls[idx]["function"]["arguments"] += tool_call_chunk.function.arguments
# 3. Emit plain text to the frontend
# Emit regular content
if delta_content and not tool_calls:
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": kwargs["model"],
"choices": [
{
"index": 0,
"delta": delta_content,
"finish_reason": None,
}
],
"choices": [{
"index": 0,
"delta": delta_content,
"finish_reason": None,
}],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
# Check whether this round received a complete tool call; if so, run the search logic, append the results, and continue the loop instead of letting the generator exit
# Check for a complete tool call
if tool_calls:
logger.info(f"[{provider_name}] Tool call detected in the stream; intercepting and handling it: {json.dumps(tool_calls, ensure_ascii=False)}")
# Also append the model's tool-call request to the history
logger.info(f"[OpenAI] Tool call detected: {json.dumps(tool_calls, ensure_ascii=False)}")
# Append the assistant message
assistant_msg = {
"role": "assistant",
"content": full_content or None, # keep content even when tools and plain text are both present
"content": full_content or None,
"tool_calls": tool_calls
}
if full_reasoning:
assistant_msg["reasoning_content"] = full_reasoning
elif self._provider_type == "deepseek" and self._supports_thinking(kwargs["model"]):
# DeepSeek reasoner models must include a reasoning_content field when tool calls are present
assistant_msg["reasoning_content"] = ""
kwargs["messages"].append(assistant_msg)
# Run the search tool
for tc in tool_calls:
if tc["function"]["name"] == "web_search":
try:
args = json.loads(tc["function"]["arguments"])
query = args.get("query", "")
mode = "deep" if "advanced" in str(kwargs.get("tools", [])) else "simple"
logger.info(f"[{provider_name}] 执行搜索插件: {query}")
logger.info(f"[OpenAI] 执行搜索: {query}")
search_result = execute_tavily_search(query, mode=mode)
except Exception as e:
search_result = f"获取搜索参数或执行搜索失败: {str(e)}"
search_result = f"搜索失败: {str(e)}"
logger.error(search_result)
# 把执行结果告诉大模型
kwargs["messages"].append({
"role": "tool",
"tool_call_id": tc["id"],
"name": "web_search",
"content": search_result
})
# 工具执行完毕,继续发起下一轮请求大模型归纳总结输出
# 继续请求归纳答案
continue
# 如果没有工具调用或者全部分发完毕正常结束给前端
# 没有工具调用,结束
finish = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
@@ -355,130 +242,52 @@ class OpenAIAdapter(BaseAdapter):
yield f"data: {json.dumps(finish, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
# Log the streaming result
logger.info(f"[{provider_name}] Streaming response complete:")
logger.info(f" - chunks: {chunk_count}")
logger.info(f" - content_length: {len(full_content)} chars")
if full_reasoning:
logger.info(f" - reasoning_length: {len(full_reasoning)} chars")
logger.info(
f" - content_preview: {full_content[:200]}..."
if len(full_content) > 200
else f" - content: {full_content}"
)
# End the outer loop and exit the generator
logger.info(f"[OpenAI] Streaming response complete: chunks={chunk_count}, content_len={len(full_content)}")
break
return StreamingResponse(generator(), media_type="text/event-stream")
def _sync_chat(
self, client, kwargs: Dict, extra_body: Optional[Dict] = None
) -> JSONResponse:
"""非流式聊天"""
from utils.helpers import generate_unique_id, get_current_timestamp
while True:
resp = client.chat.completions.create(**kwargs)
class DeepseekAdapter(UnifiedOpenAIAdapter):
"""Deepseek platform adapter"""
message = resp.choices[0].message
# Check whether tool calls are involved
if hasattr(message, "tool_calls") and message.tool_calls:
# Record this round's assistant reply
assistant_msg = {"role": "assistant", "content": message.content or None}
# Convert OpenAI SDK objects to dicts to store tool_calls
tool_calls_dict = []
for tc in message.tool_calls:
tc_dict = {
"id": tc.id,
"type": tc.type,
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments
}
}
tool_calls_dict.append(tc_dict)
assistant_msg["tool_calls"] = tool_calls_dict
if hasattr(message, "reasoning_content") and message.reasoning_content:
assistant_msg["reasoning_content"] = message.reasoning_content
elif self._provider_type == "deepseek" and self._supports_thinking(kwargs["model"]):
# DeepSeek reasoner models must include a reasoning_content field when tool calls are present
assistant_msg["reasoning_content"] = ""
kwargs["messages"].append(assistant_msg)
# Run all the tool calls
for tc in tool_calls_dict:
if tc["function"]["name"] == "web_search":
try:
args = json.loads(tc["function"]["arguments"])
query = args.get("query", "")
mode = "deep" if "advanced" in str(kwargs.get("tools", [])) else "simple"
search_result = execute_tavily_search(query, mode=mode)
except Exception as e:
search_result = f"执行搜索失败: {str(e)}"
# 把执行结果追加到消息中
kwargs["messages"].append({
"role": "tool",
"tool_call_id": tc["id"],
"name": "web_search",
"content": search_result
})
# 工具调用完成,发起下一轮请求获取归纳答案
continue
_provider_type = "deepseek"
# 处理普通的文本回复
content = message.content or ""
response = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion",
"created": get_current_timestamp(),
"model": kwargs["model"],
"choices": [
{
"index": 0,
"message": {
"role": message.role,
"content": content,
},
"finish_reason": resp.choices[0].finish_reason,
}
],
}
@property
def provider_name(self) -> str:
return "deepseek"
# Attach reasoning content (if any)
if hasattr(message, "reasoning_content") and message.reasoning_content:
response["choices"][0]["message"][
"reasoning_content"
] = message.reasoning_content
def list_models(self) -> List[ModelInfo]:
return DEEPSEEK_MODELS
if resp.usage:
response["usage"] = {
"prompt_tokens": resp.usage.prompt_tokens,
"completion_tokens": resp.usage.completion_tokens,
"total_tokens": resp.usage.total_tokens,
}
def _supports_thinking(self, model: str) -> bool:
"""Check whether the model supports deep thinking"""
return model.lower() in DEEPSEEK_THINKING_MODELS
# Log the response
provider_name = self._provider_type.upper()
logger.info(f"[{provider_name}] Response:")
logger.info(f" - content_length: {len(content)} chars")
if hasattr(message, "reasoning_content") and message.reasoning_content:
logger.info(f" - reasoning_length: {len(message.reasoning_content)} chars")
logger.info(
f" - content_preview: {content[:200]}..."
if len(content) > 200
else f" - content: {content}"
)
if resp.usage:
logger.info(f" - usage: {response['usage']}")
def _get_extra_params(self, request: ChatCompletionRequest) -> Dict[str, Any]:
"""Get Deepseek-specific parameters"""
extra_params = {}
return JSONResponse(content=response)
# Deep thinking - always passed, explicitly enabled or disabled
logger.info(f"[Deepseek] Deep-thinking request: deep_thinking={request.deep_thinking}, model={request.model}")
supports_thinking = self._supports_thinking(request.model)
logger.info(f"[Deepseek] Model {request.model} supports deep thinking: {supports_thinking}")
class DeepseekAdapter(OpenAIAdapter):
"""Deepseek platform adapter (inherits the OpenAI adapter)"""
thinking_enabled = request.deep_thinking and supports_thinking
thinking_type = "enabled" if thinking_enabled else "disabled"
extra_params["extra_body"] = {"thinking": {"type": thinking_type}}
logger.info(f"[Deepseek] Final deep-thinking state: {thinking_type}")
def __init__(self):
super().__init__(provider_type="deepseek")
# Web search - implemented with function calling
web_search_mode = get_web_search_mode(request)
if web_search_mode:
extra_params["tools"] = [build_openai_search_tool(web_search_mode)]
logger.info(f"[Deepseek] Web search enabled: mode={web_search_mode}")
return extra_params
def _stream_chat(self, client, kwargs: Dict) -> StreamingResponse:
"""Streaming chat - reuses OpenAI's tool-call logic"""
# DeepSeek uses the same tool-call handling logic
return OpenAIAdapter._stream_chat(self, client, kwargs)
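The streamed tool-call handling above rebuilds complete calls from per-index fragments. Pulled out of the adapter, the merge logic looks roughly like this (a self-contained sketch that uses plain dicts in place of the SDK's chunk objects):

# Sketch: merging streamed tool-call fragments, as in _stream_chat above.
def merge_tool_call_chunks(chunks):
    """Each chunk carries an index plus partial id/name/arguments."""
    tool_calls = []
    for c in chunks:
        idx = c["index"]
        while len(tool_calls) <= idx:
            tool_calls.append({"id": "", "type": "function",
                               "function": {"name": "", "arguments": ""}})
        if c.get("id"):
            tool_calls[idx]["id"] += c["id"]
        if c.get("type"):
            tool_calls[idx]["type"] = c["type"]  # assign, never concatenate
        fn = c.get("function") or {}
        if fn.get("name"):
            tool_calls[idx]["function"]["name"] += fn["name"]
        if fn.get("arguments"):
            tool_calls[idx]["function"]["arguments"] += fn["arguments"]
    return tool_calls

# The arguments field arrives as JSON split across chunks:
merged = merge_tool_call_chunks([
    {"index": 0, "id": "call_1", "type": "function",
     "function": {"name": "web_search", "arguments": '{"query": '}},
    {"index": 0, "function": {"arguments": '"today weather"}'}},
])
assert merged[0]["function"]["arguments"] == '{"query": "today weather"}'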

View File

@@ -0,0 +1,382 @@
"""
统一 OpenAI SDK 适配器基类
所有平台适配器继承此类通过配置区分不同平台
MCP (Model Context Protocol) 支持
- 子类可覆盖 _get_mcp_tools() 返回 MCP 工具定义
- 子类可覆盖 _handle_mcp_tool_call() 处理 MCP 工具调用
"""
import json
import os
from abc import abstractmethod
from typing import Any, Dict, List, Optional
from fastapi.responses import JSONResponse, StreamingResponse
from openai import OpenAI
from .base import BaseAdapter, ChatCompletionRequest, ModelInfo
from core import get_logger
logger = get_logger()
# Platform configuration
PROVIDER_CONFIGS = {
"zhipu": {
"base_url": "https://open.bigmodel.cn/api/paas/v4/",
"api_key_env": "ZHIPU_API_KEY",
"alias_env": ["GLM_API_KEY"], # fallback environment variables
},
"dashscope": {
"base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"api_key_env": "DASHSCOPE_API_KEY",
"alias_env": ["ALIYUN_API_KEY"],
},
"deepseek": {
"base_url": "https://api.deepseek.com/v1",
"api_key_env": "DEEPSEEK_API_KEY",
"alias_env": [],
},
"openai": {
"base_url": None, # use the OpenAI default
"api_key_env": "OPENAI_API_KEY",
"alias_env": [],
},
}
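Supporting another OpenAI-compatible platform is then mostly one more entry here; for example (a hypothetical provider with a placeholder URL and env var):

# Hypothetical: registering one more OpenAI-compatible platform.
PROVIDER_CONFIGS["example"] = {
    "base_url": "https://api.example.com/v1",  # placeholder endpoint
    "api_key_env": "EXAMPLE_API_KEY",          # placeholder env var
    "alias_env": [],
}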
class UnifiedOpenAIAdapter(BaseAdapter):
"""
基于 OpenAI SDK 的统一适配器基类
子类只需提供:
- provider_name: 平台名称
- list_models(): 支持的模型列表
- _get_extra_params(): 特殊参数可选
MCP 扩展点:
- _get_mcp_tools(): 返回 MCP 工具定义
- _handle_mcp_tool_call(): 处理 MCP 工具调用
"""
_client: Optional[OpenAI] = None
_provider_type: str = "openai"
def _get_api_key(self) -> Optional[str]:
"""Return the API key"""
config = PROVIDER_CONFIGS.get(self._provider_type, {})
api_key_env = config.get("api_key_env", "")
alias_env = config.get("alias_env", [])
# Prefer the primary environment variable
api_key = os.getenv(api_key_env)
if api_key:
return api_key
# Fall back to the alias environment variables
for env_name in alias_env:
api_key = os.getenv(env_name)
if api_key:
return api_key
return None
def _get_base_url(self) -> Optional[str]:
"""Return the base URL"""
config = PROVIDER_CONFIGS.get(self._provider_type, {})
return config.get("base_url")
def _get_client(self) -> OpenAI:
"""Get the OpenAI client (lazily initialized)"""
if self._client is None:
api_key = self._get_api_key()
base_url = self._get_base_url()
kwargs = {"api_key": api_key or ""}
if base_url:
kwargs["base_url"] = base_url
self._client = OpenAI(**kwargs)
logger.info(f"[{self.provider_name}] 创建 OpenAI 客户端: base_url={base_url or 'default'}")
return self._client
def is_available(self) -> bool:
"""检查适配器是否可用"""
return bool(self._get_api_key())
def _get_extra_params(self, request: ChatCompletionRequest) -> Dict[str, Any]:
"""
获取额外参数子类可覆盖
Returns:
传递给 OpenAI API 的额外参数 extra_body
"""
return {}
# ============================================================
# MCP 扩展点(子类可覆盖)
# ============================================================
def _get_mcp_tools(self, request: ChatCompletionRequest) -> List[Dict]:
"""
获取 MCP 工具定义子类可覆盖
Returns:
MCP 工具列表格式与 OpenAI tools 相同
例如: [{"type": "function", "function": {...}}]
示例:
return [{
"type": "function",
"function": {
"name": "mcp_search",
"description": "通过 MCP 协议搜索",
"parameters": {...}
}
}]
"""
return []
def _handle_mcp_tool_call(
self,
tool_name: str,
tool_args: Dict,
request: ChatCompletionRequest
) -> Optional[str]:
"""
处理 MCP 工具调用子类可覆盖
Args:
tool_name: 工具名称
tool_args: 工具参数
request: 原始请求
Returns:
工具执行结果字符串返回 None 表示不是 MCP 工具
示例:
if tool_name == "mcp_search":
# 调用 MCP 客户端
result = await mcp_client.call(tool_name, tool_args)
return result
return None
"""
return None
# ============================================================
# Chat handling
# ============================================================
async def chat(self, request: ChatCompletionRequest):
"""
处理聊天请求统一流程
"""
client = self._get_client()
# 打印请求参数
logger.info(f"[{self.provider_name}] 请求参数:")
logger.info(f" - model: {request.model}")
logger.info(f" - stream: {request.stream}")
logger.info(f" - temperature: {request.temperature}")
logger.info(f" - max_tokens: {request.max_tokens}")
logger.info(f" - deep_thinking: {request.deep_thinking}")
logger.info(f" - web_search: {request.web_search}")
logger.info(f" - deep_search: {request.deep_search}")
# 构建消息
messages = self._build_messages(request)
# 构建请求参数
kwargs: Dict[str, Any] = {
"model": request.model,
"messages": messages,
"temperature": request.temperature,
"max_tokens": request.max_tokens,
"stream": request.stream,
}
# Add special parameters (implemented by subclasses)
extra_params = self._get_extra_params(request)
# Split extra_body from the other parameters;
# extra_body must be passed to the OpenAI SDK as its own argument
extra_body = None
if extra_params:
if "extra_body" in extra_params:
extra_body = extra_params.pop("extra_body")
kwargs.update(extra_params)
logger.info(f" - extra_params: {json.dumps(extra_params, ensure_ascii=False)}")
if extra_body:
logger.info(f" - extra_body: {json.dumps(extra_body, ensure_ascii=False)}")
# Add MCP tools (implemented by subclasses)
mcp_tools = self._get_mcp_tools(request)
if mcp_tools:
if "tools" not in kwargs:
kwargs["tools"] = []
kwargs["tools"].extend(mcp_tools)
logger.info(f" - mcp_tools: {len(mcp_tools)} tools")
# Pass extra_body separately
if extra_body:
kwargs["extra_body"] = extra_body
logger.info(f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}")
if request.stream:
return self._stream_chat(client, kwargs)
else:
return self._sync_chat(client, kwargs)
def _build_messages(self, request: ChatCompletionRequest) -> List[Dict]:
"""
构建 OpenAI 格式消息
子类可覆盖以处理特殊格式如多模态
"""
messages = []
for msg in request.messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if isinstance(content, str):
if content.strip():
messages.append({"role": role, "content": content})
elif isinstance(content, list):
# Multimodal content
openai_content = []
for item in content:
if isinstance(item, dict):
openai_content.append(item)
if openai_content:
messages.append({"role": role, "content": openai_content})
return messages
def _stream_chat(self, client: OpenAI, kwargs: Dict) -> StreamingResponse:
"""流式聊天"""
logger.info(f"[{self.provider_name}] 开始流式响应...")
# 调试:打印最终传给 API 的参数
logger.info(f"[{self.provider_name}] API 调用参数:")
for key, value in kwargs.items():
if key == "messages":
logger.info(f" - {key}: [{len(value)} 条消息]")
elif key == "extra_body":
logger.info(f" - {key}: {json.dumps(value, ensure_ascii=False)}")
elif key == "tools":
logger.info(f" - {key}: {json.dumps(value, ensure_ascii=False)}")
else:
logger.info(f" - {key}: {value}")
def generator():
from utils.helpers import generate_unique_id, get_current_timestamp
full_content = ""
full_reasoning = ""
chunk_count = 0
resp = client.chat.completions.create(**kwargs)
for chunk in resp:
if not chunk.choices:
continue
chunk_count += 1
delta = chunk.choices[0].delta
# Handle deep-thinking content
reasoning_content = getattr(delta, "reasoning_content", None)
if reasoning_content:
full_reasoning += reasoning_content
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": kwargs["model"],
"choices": [{
"index": 0,
"delta": {"reasoning_content": reasoning_content},
"finish_reason": None,
}],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
continue
# Handle regular content
content = getattr(delta, "content", None)
if content:
full_content += content
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": kwargs["model"],
"choices": [{
"index": 0,
"delta": {"content": content},
"finish_reason": None,
}],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
# Finish marker
finish = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": kwargs["model"],
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(finish, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
logger.info(f"[{self.provider_name}] 流式响应完成: chunks={chunk_count}, content_len={len(full_content)}")
return StreamingResponse(generator(), media_type="text/event-stream")
def _sync_chat(self, client: OpenAI, kwargs: Dict) -> JSONResponse:
"""非流式聊天"""
from utils.helpers import generate_unique_id, get_current_timestamp
resp = client.chat.completions.create(**kwargs)
message = resp.choices[0].message
content = message.content or ""
# Build the response message
response_message = {"role": message.role, "content": content}
# Attach deep-thinking content if present
reasoning_content = getattr(message, "reasoning_content", None)
if reasoning_content:
response_message["reasoning_content"] = reasoning_content
response = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion",
"created": get_current_timestamp(),
"model": kwargs["model"],
"choices": [{
"index": 0,
"message": response_message,
"finish_reason": resp.choices[0].finish_reason,
}],
}
if resp.usage:
response["usage"] = {
"prompt_tokens": resp.usage.prompt_tokens,
"completion_tokens": resp.usage.completion_tokens,
"total_tokens": resp.usage.total_tokens,
}
logger.info(f"[{self.provider_name}] 响应完成: content_len={len(content)}")
if reasoning_content:
logger.info(f"[{self.provider_name}] reasoning_len={len(reasoning_content)}")
return JSONResponse(content=response)
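Putting the extension points together, a minimal concrete adapter might look like the sketch below. Everything here is hypothetical: the "example" provider is assumed to have been registered in PROVIDER_CONFIGS as shown earlier, the model fields are illustrative, and _handle_mcp_tool_call returns a canned string instead of calling a real MCP client. Note that the base class currently only advertises MCP tools to the model; executing them is the part this commit leaves reserved.

# Hypothetical subclass sketch: the three pieces a platform must provide,
# plus the two MCP extension points.
class ExampleAdapter(UnifiedOpenAIAdapter):
    _provider_type = "example"  # assumes a matching PROVIDER_CONFIGS entry

    @property
    def provider_name(self) -> str:
        return "example"

    def list_models(self) -> List[ModelInfo]:
        # Field values are illustrative only.
        return [ModelInfo(
            id="example-chat",
            name="Example-Chat",
            description="Demo model",
            max_tokens=8192,
            provider="Example",
            supports_thinking=False,
            supports_web_search=False,
            supports_vision=False,
            supports_files=False,
        )]

    def _get_mcp_tools(self, request: ChatCompletionRequest) -> List[Dict]:
        # Advertised to the model alongside any other tools.
        return [{
            "type": "function",
            "function": {
                "name": "mcp_echo",
                "description": "Echo text back through a (stub) MCP server",
                "parameters": {
                    "type": "object",
                    "properties": {"text": {"type": "string"}},
                    "required": ["text"],
                },
            },
        }]

    def _handle_mcp_tool_call(self, tool_name: str, tool_args: Dict,
                              request: ChatCompletionRequest) -> Optional[str]:
        if tool_name == "mcp_echo":
            # A real implementation would dispatch to an MCP client here.
            return tool_args.get("text", "")
        return None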

View File

@@ -1,103 +1,65 @@
aiofiles==24.1.0
aiohappyeyeballs==2.6.1
aiohttp==3.13.3
aiosignal==1.4.0
aiosqlite==0.22.1
alibabacloud-oss-v2==1.2.4
aliyun-python-sdk-core==2.16.0
aliyun-python-sdk-kms==2.16.5
annotated-types==0.7.0
anyio==4.12.1
argcomplete==3.6.3
attrs==25.4.0
banks==2.4.1
black==26.1.0
cachetools==7.0.2
certifi==2026.2.25
cffi==2.0.0
charset-normalizer==3.4.4
click==8.3.1
colorama==0.4.6
colorlog==6.10.1
crcmod==1.7
crcmod-plus==2.3.1
cryptography==46.0.5
dashscope==1.20.12
dataclasses-json==0.6.7
dependency-groups==1.3.1
Deprecated==1.3.1
dirtyjson==1.0.8
distlib==0.4.0
distro==1.9.0
fastapi==0.115.4
filelock==3.25.0
filetype==1.2.0
frozenlist==1.8.0
fsspec==2026.2.0
greenlet==3.3.2
griffe==2.0.0
griffecli==2.0.0
griffelib==2.0.0
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
humanize==4.15.0
idna==3.11
isort==8.0.1
Jinja2==3.1.6
jiter==0.13.0
jmespath==0.10.0
joblib==1.5.3
llama-index-core==0.14.15
llama-index-instrumentation==0.4.2
llama-index-readers-dashscope==0.4.1
llama-index-workflows==2.15.0
MarkupSafe==3.0.3
marshmallow==3.26.2
multidict==6.7.1
mypy_extensions==1.1.0
nest-asyncio==1.6.0
networkx==3.6.1
nltk==3.9.3
nox==2026.2.9
numpy==2.4.2
# ============================================================
# Core dependencies
# ============================================================
openai==2.26.0
oss2==2.19.1
packaging==26.0
pathspec==1.0.4
pillow==12.1.1
platformdirs==4.9.2
propcache==0.4.1
pycparser==3.0
pycryptodome==3.23.0
pydantic==2.12.5
pydantic_core==2.41.5
PyJWT==2.11.0
python-discovery==1.1.0
python-dotenv==1.0.1
python-multipart==0.0.18
pytokens==0.4.1
PyYAML==6.0.3
regex==2026.2.28
requests==2.32.5
retrying==1.4.2
setuptools==82.0.0
six==1.17.0
sniffio==1.3.1
SQLAlchemy==2.0.48
starlette==0.41.3
tenacity==9.1.4
tiktoken==0.12.0
tinytag==2.2.0
tqdm==4.67.3
typing-inspect==0.9.0
typing-inspection==0.4.2
typing_extensions==4.15.0
urllib3==2.6.3
fastapi==0.115.4
uvicorn==0.32.0
virtualenv==21.1.0
websocket-client==1.9.0
wrapt==2.1.1
yarl==1.23.0
# zai-sdk==0.2.2
zhipuai==2.1.5.20250825
pydantic==2.12.5
python-dotenv==1.0.1
# ============================================================
# Database
# ============================================================
SQLAlchemy==2.0.48
aiosqlite==0.22.1
# ============================================================
# File upload
# ============================================================
python-multipart==0.0.18
# ============================================================
# 阿里云 OSS
# ============================================================
alibabacloud-oss-v2==1.2.4
oss2==2.19.1
# ============================================================
# Token counting
# ============================================================
tiktoken==0.12.0
# ============================================================
# Indirect dependencies (installed automatically by the packages above, but pinned explicitly)
# ============================================================
starlette==0.41.3
httpx==0.28.1
httpcore==1.0.9
h11==0.16.0
anyio==4.12.1
sniffio==1.3.1
certifi==2026.2.25
idna==3.11
charset-normalizer==3.4.4
urllib3==2.6.3
requests==2.32.5
jiter==0.13.0
distro==1.9.0
pydantic_core==2.41.5
annotated-types==0.7.0
typing_extensions==4.15.0
typing-inspect==0.9.0
tenacity==9.1.4
# ============================================================
# Async / networking
# ============================================================
aiohttp==3.13.3
aiofiles==24.1.0
# ============================================================
# Other utilities
# ============================================================
PyJWT==2.11.0
PyYAML==6.0.3
pillow==12.1.1

View File

@@ -20,7 +20,7 @@ trap cleanup SIGINT SIGTERM EXIT
# Start the backend
echo "[System] Starting the backend server..."
cd /home/mt/project/ai-chat-ui/server
cd /home/mt/Project/ai-chat-ui/server
if [ -d ".venv" ]; then
source .venv/bin/activate
# Use -u to force unbuffered output so logs appear in real time
@@ -34,7 +34,7 @@ sleep 2
# Start the frontend
echo "[System] Starting the frontend server..."
cd /home/mt/project/ai-chat-ui
cd /home/mt/Project/ai-chat-ui
# Start the Vite dev server
npm run dev &
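Once both dev servers are up, a quick way to smoke-test the streaming chat endpoint from Python is sketched below. The localhost port and the /v1/chat/completions path are assumptions about this project's routes, not confirmed by the diff; adjust to match your setup. The SSE parsing matches the "data: ..." / "data: [DONE]" frames the adapters emit.

# Hypothetical smoke test for the dev servers started above.
import json
import requests

resp = requests.post(
    "http://localhost:8000/v1/chat/completions",  # assumed route
    json={"model": "glm-4-flash", "stream": True,
          "messages": [{"role": "user", "content": "ping"}]},
    stream=True,
    timeout=60,
)
for line in resp.iter_lines(decode_unicode=True):
    if not line or not line.startswith("data: "):
        continue
    payload = line[len("data: "):]
    if payload == "[DONE]":
        break
    chunk = json.loads(payload)
    delta = chunk["choices"][0]["delta"]
    # Streamed frames carry either content or reasoning_content.
    print(delta.get("content") or delta.get("reasoning_content") or "", end="")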