ai-chat-ui/server/adapters/dashscope_adapter.py

776 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
阿里云百炼 DashScope 适配器
基于 api/chat_routes.py 重构
"""
import json
import os
from typing import Dict, List
from fastapi.responses import JSONResponse, StreamingResponse
from .base import BaseAdapter, ChatCompletionRequest, ModelInfo
from core import get_logger
logger = get_logger()
# Models that support deep-thinking (reasoning) output.
THINKING_MODELS = {"qwen3-max", "qwen3.5-plus"}
# Models that must be called through the multimodal API (the qwen3.5 family).
MULTIMODAL_API_MODELS = {"qwen3.5-plus", "qwen3.5-flash"}
# Static model catalogue advertised by the DashScope (Aliyun Bailian) adapter.
# NOTE: keep each entry's supports_thinking flag consistent with
# THINKING_MODELS above — both are consulted independently at request time.
DASHSCOPE_MODELS = [
    ModelInfo(
        id="qwen3-max",
        name="Qwen3-Max",
        description="千问系列效果最好的模型,适合复杂、多步骤的任务。",
        max_tokens=8192,
        provider="Aliyun",
        supports_thinking=True,
        supports_web_search=False,
        supports_vision=False,
        supports_files=False,
    ),
    ModelInfo(
        id="qwen3.5-plus",
        name="Qwen3.5-Plus",
        description="能力均衡推理效果、成本和速度介于千问Max和千问Flash之间适合中等复杂任务。",
        max_tokens=8192,
        provider="Aliyun",
        supports_thinking=True,
        supports_web_search=False,
        supports_vision=True,
        supports_files=False,
    ),
    ModelInfo(
        id="qwen3.5-flash",
        name="Qwen3.5-Flash",
        description="千问系列速度最快、成本极低的模型适合简单任务。千问Flash采用灵活的阶梯定价相比千问Turbo计费更合理。",
        max_tokens=8192,
        provider="Aliyun",
        supports_thinking=False,
        supports_web_search=False,
        supports_vision=False,
        supports_files=False,
    ),
    ModelInfo(
        id="qwen-vl-max",
        name="通义万相 VL-Max",
        description="支持视觉理解的多模态模型",
        max_tokens=8192,
        provider="Aliyun",
        supports_thinking=False,
        supports_web_search=False,
        supports_vision=True,
        supports_files=False,
    ),
]
class DashScopeAdapter(BaseAdapter):
    """Adapter for the Aliyun Bailian DashScope platform."""

    @property
    def provider_name(self) -> str:
        return "dashscope"

    def is_available(self) -> bool:
        """Return True when an API key is configured in the environment."""
        # Delegate to _get_api_key so the env-var lookup order lives in one place.
        return bool(self._get_api_key())

    def _get_api_key(self) -> str:
        """Return the API key (ALIYUN_API_KEY takes precedence), or "" if unset."""
        return os.getenv("ALIYUN_API_KEY") or os.getenv("DASHSCOPE_API_KEY", "")

    def _needs_multimodal_api(self, model: str) -> bool:
        """Return True when *model* must be called via the multimodal endpoint."""
        return model.lower() in MULTIMODAL_API_MODELS

    def _supports_thinking(self, model: str) -> bool:
        """Return True when *model* supports deep-thinking (reasoning) output."""
        return model.lower() in THINKING_MODELS

    def list_models(self) -> List[ModelInfo]:
        """Return the static model catalogue for this provider."""
        return DASHSCOPE_MODELS
async def chat(self, request: ChatCompletionRequest):
"""
处理 DashScope 聊天请求
支持流式/非流式、多模态
"""
# 打印请求参数
logger.info(f"[DashScope] 请求参数:")
logger.info(f" - model: {request.model}")
logger.info(f" - stream: {request.stream}")
logger.info(f" - temperature: {request.temperature}")
logger.info(f" - max_tokens: {request.max_tokens}")
logger.info(f" - files: {request.files}")
logger.info(f" - deep_thinking: {request.deep_thinking}")
logger.info(
f" - messages: {json.dumps(request.messages, ensure_ascii=False, indent=2)}"
)
# 检测是否包含多模态内容
has_multimodal = self._has_multimodal_content(request)
logger.info(f" - has_multimodal: {has_multimodal}")
# 检查是否需要使用多模态接口qwen3.5 系列)
needs_multimodal_api = self._needs_multimodal_api(request.model)
logger.info(f" - needs_multimodal_api: {needs_multimodal_api}")
if has_multimodal or needs_multimodal_api:
return await self._multimodal_chat(request)
else:
return await self._text_chat(request)
def _has_multimodal_content(self, request: ChatCompletionRequest) -> bool:
"""检查是否包含多模态内容"""
for msg in request.messages:
content = msg.get("content", "")
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("type") == "image_url":
return True
return bool(request.files)
async def _text_chat(self, request: ChatCompletionRequest):
"""纯文本聊天"""
import dashscope
from dashscope import Generation
dashscope.api_key = self._get_api_key()
# 转换消息格式
messages = self._build_text_messages(request)
logger.info(f"[DashScope] 文本聊天 - 转换后的消息:")
logger.info(f" - messages_count: {len(messages)}")
logger.info(f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}")
if request.stream:
return self._stream_text_chat(messages, request)
else:
return self._sync_text_chat(messages, request)
def _build_text_messages(self, request: ChatCompletionRequest) -> List[Dict]:
"""构建文本消息"""
messages = []
for msg in request.messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if isinstance(content, str) and content.strip():
messages.append({"role": role, "content": content})
elif isinstance(content, list):
text = ""
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
text += item.get("text", "")
if text.strip():
messages.append({"role": role, "content": text})
return messages
def _stream_text_chat(self, messages: List[Dict], request: ChatCompletionRequest):
"""流式文本聊天"""
logger.info(f"[DashScope] 开始流式文本响应...")
# 检查是否启用深度思考
thinking_enabled = request.deep_thinking and self._supports_thinking(request.model)
logger.info(f"[DashScope] 深度思考: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(request.model)})")
def generator():
from utils.helpers import generate_unique_id, get_current_timestamp
from dashscope import Generation
full_content = ""
full_reasoning = ""
chunk_count = 0
error_occurred = False
# 构建 API 调用参数
api_params = {
"model": request.model,
"messages": messages,
"stream": True,
"temperature": request.temperature,
"max_tokens": request.max_tokens,
"result_format": "message",
}
# 添加深度思考参数
if thinking_enabled:
api_params["enable_thinking"] = True
# 打印 API 调用参数
logger.info(f"[DashScope] API 调用参数:")
logger.info(f" - model: {api_params['model']}")
logger.info(f" - stream: {api_params['stream']}")
logger.info(f" - temperature: {api_params['temperature']}")
logger.info(f" - max_tokens: {api_params['max_tokens']}")
logger.info(f" - result_format: {api_params['result_format']}")
if thinking_enabled:
logger.info(f" - enable_thinking: True")
try:
responses = Generation.call(**api_params)
except Exception as e:
error_occurred = True
logger.error(f"[DashScope] API 调用异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# 返回错误响应
error_data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": request.model,
"choices": [{
"index": 0,
"delta": {"content": f"API 调用失败: {str(e)}"},
"finish_reason": "stop",
}],
}
yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
return
for resp in responses:
if resp.status_code == 200:
chunk_count += 1
choice = resp.output.choices[0]
# 处理深度思考内容reasoning_content
reasoning_content = getattr(choice.message, "reasoning_content", None)
if reasoning_content:
# 计算增量
if len(reasoning_content) > len(full_reasoning):
delta_reasoning = reasoning_content[len(full_reasoning):]
full_reasoning = reasoning_content
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": request.model,
"choices": [
{
"index": 0,
"delta": {"reasoning_content": delta_reasoning},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
continue
# 处理普通内容
content = choice.message.content
if content and len(content) > len(full_content):
# DashScope 流式响应返回完整内容,计算增量
delta = content[len(full_content) :]
full_content = content
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": request.model,
"choices": [
{
"index": 0,
"delta": {"content": delta},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
else:
# 记录非200响应
logger.warning(f"[DashScope] 非200响应: status_code={resp.status_code}, code={resp.code}, message={resp.message}")
finish = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": request.model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(finish, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
# 打印流式响应结果
logger.info(f"[DashScope] 流式文本响应完成:")
logger.info(f" - chunks: {chunk_count}")
logger.info(f" - content_length: {len(full_content)} 字符")
if full_reasoning:
logger.info(f" - reasoning_length: {len(full_reasoning)} 字符")
logger.info(
f" - content_preview: {full_content[:200]}..."
if len(full_content) > 200
else f" - content: {full_content}"
)
return StreamingResponse(generator(), media_type="text/event-stream")
    def _sync_text_chat(self, messages: List[Dict], request: ChatCompletionRequest):
        """Non-streaming text chat: one blocking Generation.call, returns JSONResponse."""
        from utils.helpers import generate_unique_id, get_current_timestamp
        from dashscope import Generation
        # Deep thinking is honoured only when requested AND supported by the model.
        thinking_enabled = request.deep_thinking and self._supports_thinking(request.model)
        logger.info(f"[DashScope] 深度思考: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(request.model)})")
        # Build the API call parameters.
        api_params = {
            "model": request.model,
            "messages": messages,
            "stream": False,
            "temperature": request.temperature,
            "max_tokens": request.max_tokens,
            "result_format": "message",
        }
        # Opt into deep-thinking output when enabled.
        if thinking_enabled:
            api_params["enable_thinking"] = True
        # Log the API call parameters.
        logger.info(f"[DashScope] API 调用参数:")
        logger.info(f" - model: {api_params['model']}")
        logger.info(f" - stream: {api_params['stream']}")
        logger.info(f" - temperature: {api_params['temperature']}")
        logger.info(f" - max_tokens: {api_params['max_tokens']}")
        logger.info(f" - result_format: {api_params['result_format']}")
        if thinking_enabled:
            logger.info(f" - enable_thinking: True")
        try:
            resp = Generation.call(**api_params)
        except Exception as e:
            logger.error(f"[DashScope] API 调用异常: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            return JSONResponse(
                status_code=500,
                content={"error": f"DashScope API 调用异常: {str(e)}"},
            )
        if resp.status_code == 200:
            message = resp.output.choices[0].message
            content = message.content or ""
            # Assemble the OpenAI-style assistant message.
            response_message = {"role": "assistant", "content": content}
            # Attach deep-thinking output when the model returned any.
            reasoning_content = getattr(message, "reasoning_content", None)
            if reasoning_content:
                response_message["reasoning_content"] = reasoning_content
            response = {
                "id": f"chatcmpl-{generate_unique_id()}",
                "object": "chat.completion",
                "created": get_current_timestamp(),
                "model": request.model,
                "choices": [
                    {
                        "index": 0,
                        "message": response_message,
                        "finish_reason": "stop",
                    }
                ],
            }
            # Token accounting is optional on the SDK response object.
            if hasattr(resp, "usage") and resp.usage:
                response["usage"] = {
                    "prompt_tokens": resp.usage.input_tokens,
                    "completion_tokens": resp.usage.output_tokens,
                    "total_tokens": resp.usage.total_tokens,
                }
            # Log the response summary.
            logger.info(f"[DashScope] 响应成功:")
            logger.info(f" - status_code: {resp.status_code}")
            logger.info(f" - content_length: {len(content)} 字符")
            if reasoning_content:
                logger.info(f" - reasoning_length: {len(reasoning_content)} 字符")
            logger.info(
                f" - content_preview: {content[:200]}..."
                if len(content) > 200
                else f" - content: {content}"
            )
            if hasattr(resp, "usage") and resp.usage:
                logger.info(f" - usage: {response['usage']}")
            return JSONResponse(content=response)
        # Non-200 from DashScope: surface the upstream error code/message.
        logger.error(f"[DashScope] 请求失败:")
        logger.error(f" - status_code: {resp.status_code}")
        logger.error(f" - code: {resp.code}")
        logger.error(f" - message: {resp.message}")
        return JSONResponse(
            status_code=500,
            content={"error": f"DashScope Error: {resp.code} - {resp.message}"},
        )
async def _multimodal_chat(self, request: ChatCompletionRequest):
"""多模态聊天"""
import dashscope
from dashscope import MultiModalConversation
dashscope.api_key = self._get_api_key()
logger.info(f"[DashScope] 开始多模态聊天...")
# 转换消息格式
messages = self._build_multimodal_messages(request)
logger.info(f"[DashScope] 多模态消息转换完成:")
logger.info(f" - messages_count: {len(messages)}")
logger.info(f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}")
# 选择多模态模型
model = request.model
if "qwen-" in model and "vl" not in model:
original_model = model
model = model.replace("qwen-", "qwen-vl-")
logger.info(f"[DashScope] 模型自动切换: {original_model} -> {model}")
if request.stream:
return self._stream_multimodal_chat(messages, model, request)
else:
return self._sync_multimodal_chat(messages, model, request)
def _build_multimodal_messages(self, request: ChatCompletionRequest) -> List[Dict]:
"""构建多模态消息"""
messages = []
for msg in request.messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if isinstance(content, str):
if content.strip():
messages.append({"role": role, "content": [{"text": content}]})
elif isinstance(content, list):
ds_content = []
for item in content:
if isinstance(item, dict):
if item.get("type") == "text":
ds_content.append({"text": item.get("text", "")})
elif item.get("type") == "image_url":
img_url = self._extract_image_url(item)
if img_url:
ds_content.append({"image": img_url})
if ds_content:
messages.append({"role": role, "content": ds_content})
return messages
def _extract_image_url(self, item: Dict) -> str:
"""提取并转换图片 URL"""
img_val = item.get("image_url", "")
if isinstance(img_val, str):
img_url = img_val
elif isinstance(img_val, dict):
img_url = img_val.get("url", "")
else:
img_url = ""
logger.info(f"[DashScope] 原始图片URL: {img_url}")
# 转换 http URL 为 file:// 格式(如果是本地文件)
if img_url.startswith(("http://", "https://")):
from urllib.parse import urlparse
parsed = urlparse(img_url)
if "localhost" in parsed.netloc or "127.0.0.1" in parsed.netloc:
path_parts = parsed.path.split("/")
try:
uploads_idx = path_parts.index("uploads")
img_url = f"file://{'/'.join(path_parts[uploads_idx:])}"
except ValueError:
pass
elif not img_url.startswith("file://") and not img_url.startswith(("http://", "https://")):
img_url = f"file://{img_url}"
logger.info(f"[DashScope] 转换后图片URL: {img_url}")
return img_url
def _stream_multimodal_chat(
self, messages: List[Dict], model: str, request: ChatCompletionRequest
):
"""流式多模态聊天"""
logger.info(f"[DashScope] 开始流式多模态响应...")
logger.info(f" - model: {model}")
logger.info(f" - max_tokens: {request.max_tokens}")
logger.info(f" - temperature: {request.temperature}")
# 检查是否启用深度思考
thinking_enabled = request.deep_thinking and self._supports_thinking(model)
logger.info(f"[DashScope] 深度思考: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(model)})")
def generator():
from utils.helpers import generate_unique_id, get_current_timestamp
from dashscope import MultiModalConversation
full_content = ""
full_reasoning = ""
chunk_count = 0
error_occurred = False
# 打印 API 调用参数
api_params = {
"model": model,
"messages": messages,
"stream": True,
"max_tokens": request.max_tokens,
"temperature": request.temperature,
}
# 添加深度思考参数
if thinking_enabled:
api_params["enable_thinking"] = True
logger.info(f"[DashScope] 多模态 API 调用参数:")
logger.info(f" - model: {api_params['model']}")
logger.info(f" - stream: {api_params['stream']}")
logger.info(f" - max_tokens: {api_params['max_tokens']}")
logger.info(f" - temperature: {api_params['temperature']}")
if thinking_enabled:
logger.info(f" - enable_thinking: True")
logger.info(f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}")
try:
responses = MultiModalConversation.call(**api_params)
except Exception as e:
error_occurred = True
logger.error(f"[DashScope] 多模态 API 调用异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
error_data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [{
"index": 0,
"delta": {"content": f"API 调用失败: {str(e)}"},
"finish_reason": "stop",
}],
}
yield f"data: {json.dumps(error_data, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
return
for resp in responses:
chunk_count += 1
logger.info(f"[DashScope] === chunk {chunk_count} ===")
if resp.status_code == 200:
try:
# 打印原始响应结构
logger.info(f" - resp.status_code: {resp.status_code}")
logger.info(f" - resp.output: {resp.output}")
choice = resp.output.choices[0]
message = choice["message"]
# 处理深度思考内容reasoning_content
# 多模态 API 返回的 reasoning_content 也是独立的片段
reasoning_content = message.get("reasoning_content", "")
if reasoning_content:
delta_reasoning = reasoning_content
full_reasoning += reasoning_content
logger.info(f" - reasoning_delta: {delta_reasoning}")
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [
{
"index": 0,
"delta": {"reasoning_content": delta_reasoning},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
continue
# 处理普通内容
content_items = message.get("content", [])
text = ""
for item in content_items:
if isinstance(item, dict) and "text" in item:
text += item["text"]
# 打印每个 chunk 的内容
logger.info(f" - text_len: {len(text)}, full_len: {len(full_content)}")
logger.info(f" - text: {text}")
# 多模态 API 返回的 content 是独立的片段(不是累积的),直接作为 delta
if text:
delta = text
full_content += text
logger.info(f" - delta: {delta}")
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": delta},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
except (KeyError, IndexError, TypeError) as e:
logger.warning(f"[DashScope] 解析多模态响应异常: {str(e)}")
else:
logger.warning(f"[DashScope] 非200响应: status_code={resp.status_code}, code={resp.code}, message={resp.message}")
finish = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(finish, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
# 打印流式响应结果
logger.info(f"[DashScope] 流式多模态响应完成:")
logger.info(f" - chunks: {chunk_count}")
logger.info(f" - content_length: {len(full_content)} 字符")
if full_reasoning:
logger.info(f" - reasoning_length: {len(full_reasoning)} 字符")
logger.info(
f" - content_preview: {full_content[:200]}..."
if len(full_content) > 200
else f" - content: {full_content}"
)
return StreamingResponse(generator(), media_type="text/event-stream")
    def _sync_multimodal_chat(
        self, messages: List[Dict], model: str, request: ChatCompletionRequest
    ):
        """Non-streaming multimodal chat: one blocking call, returns JSONResponse."""
        from utils.helpers import generate_unique_id, get_current_timestamp
        from dashscope import MultiModalConversation
        # Deep thinking is honoured only when requested AND supported by the model.
        thinking_enabled = request.deep_thinking and self._supports_thinking(model)
        logger.info(f"[DashScope] 深度思考: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(model)})")
        logger.info(f"[DashScope] 开始非流式多模态响应...")
        logger.info(f" - model: {model}")
        logger.info(f" - max_tokens: {request.max_tokens}")
        logger.info(f" - temperature: {request.temperature}")
        # Build the API call parameters.
        api_params = {
            "model": model,
            "messages": messages,
            "stream": False,
            "max_tokens": request.max_tokens,
            "temperature": request.temperature,
        }
        # Opt into deep-thinking output when enabled.
        if thinking_enabled:
            api_params["enable_thinking"] = True
        # Log the API call parameters.
        logger.info(f"[DashScope] 多模态 API 调用参数:")
        logger.info(f" - model: {api_params['model']}")
        logger.info(f" - stream: {api_params['stream']}")
        logger.info(f" - max_tokens: {api_params['max_tokens']}")
        logger.info(f" - temperature: {api_params['temperature']}")
        if thinking_enabled:
            logger.info(f" - enable_thinking: True")
        try:
            resp = MultiModalConversation.call(**api_params)
        except Exception as e:
            logger.error(f"[DashScope] 多模态 API 调用异常: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            return JSONResponse(
                status_code=500,
                content={"error": f"DashScope API 调用异常: {str(e)}"},
            )
        if resp.status_code == 200:
            try:
                # The multimodal message is dict-shaped; content is a list of parts.
                message = resp.output.choices[0]["message"]
                content_items = message.get("content", [])
                text = ""
                for item in content_items:
                    if isinstance(item, dict) and "text" in item:
                        text += item["text"]
                # Assemble the OpenAI-style assistant message.
                response_message = {"role": "assistant", "content": text}
                # Attach deep-thinking output when the model returned any.
                reasoning_content = message.get("reasoning_content")
                if reasoning_content:
                    response_message["reasoning_content"] = reasoning_content
                response = {
                    "id": f"chatcmpl-{generate_unique_id()}",
                    "object": "chat.completion",
                    "created": get_current_timestamp(),
                    "model": model,
                    "choices": [
                        {
                            "index": 0,
                            "message": response_message,
                            "finish_reason": "stop",
                        }
                    ],
                }
                # Log the response summary.
                logger.info(f"[DashScope] 多模态响应成功:")
                logger.info(f" - status_code: {resp.status_code}")
                logger.info(f" - content_length: {len(text)} 字符")
                if reasoning_content:
                    logger.info(f" - reasoning_length: {len(reasoning_content)} 字符")
                logger.info(
                    f" - content_preview: {text[:200]}..."
                    if len(text) > 200
                    else f" - content: {text}"
                )
                return JSONResponse(content=response)
            except (KeyError, IndexError, TypeError) as e:
                # The response shape was not what we expected.
                logger.error(f"[DashScope] 解析多模态响应异常: {str(e)}")
                import traceback
                logger.error(traceback.format_exc())
                return JSONResponse(
                    status_code=500,
                    content={"error": f"Parse error: {str(e)}"},
                )
        # Non-200 from DashScope: surface the upstream error code/message.
        logger.error(f"[DashScope] 多模态请求失败:")
        logger.error(f" - status_code: {resp.status_code}")
        logger.error(f" - code: {resp.code}")
        logger.error(f" - message: {resp.message}")
        return JSONResponse(
            status_code=500,
            content={"error": f"DashScope Error: {resp.code} - {resp.message}"},
        )