"""
Alibaba Cloud Bailian (DashScope) adapter.

Refactored from api/chat_routes.py.
"""

import json
import os
import traceback
from typing import Dict, List, Optional

from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse, StreamingResponse

from .base import BaseAdapter, ChatCompletionRequest, ModelInfo
from core import get_logger

logger = get_logger()

# Models that support deep thinking (reasoning output).
THINKING_MODELS = {"qwen3-max", "qwen3.5-plus"}

# Models that must be called through the multimodal endpoint (qwen3.5 series).
MULTIMODAL_API_MODELS = {"qwen3.5-plus", "qwen3.5-flash"}

# Catalogue of Bailian models exposed by this adapter.
DASHSCOPE_MODELS = [
    ModelInfo(
        id="qwen3-max",
        name="Qwen3-Max",
        description="千问系列效果最好的模型,适合复杂、多步骤的任务。",
        max_tokens=8192,
        provider="Aliyun",
        supports_thinking=True,
        supports_web_search=False,
        supports_vision=False,
        supports_files=False,
    ),
    ModelInfo(
        id="qwen3.5-plus",
        name="Qwen3.5-Plus",
        description="能力均衡,推理效果、成本和速度介于千问Max和千问Flash之间,适合中等复杂任务。",
        max_tokens=8192,
        provider="Aliyun",
        supports_thinking=True,
        supports_web_search=False,
        supports_vision=True,
        supports_files=False,
    ),
    ModelInfo(
        id="qwen3.5-flash",
        name="Qwen3.5-Flash",
        description="千问系列速度最快、成本极低的模型,适合简单任务。千问Flash采用灵活的阶梯定价,相比千问Turbo计费更合理。",
        max_tokens=8192,
        provider="Aliyun",
        supports_thinking=False,
        supports_web_search=False,
        supports_vision=False,
        supports_files=False,
    ),
    ModelInfo(
        id="qwen-vl-max",
        name="通义万相 VL-Max",
        description="支持视觉理解的多模态模型",
        max_tokens=8192,
        provider="Aliyun",
        supports_thinking=False,
        supports_web_search=False,
        supports_vision=True,
        supports_files=False,
    ),
]


def _sse_event(payload: Dict) -> str:
    """Serialize ``payload`` as one server-sent-events ``data:`` frame."""
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


def _chunk_event(model: str, delta: Dict, finish_reason: Optional[str] = None) -> str:
    """Build one OpenAI-style ``chat.completion.chunk`` SSE frame."""
    # Imported lazily, as the original code did inside its generators.
    from utils.helpers import generate_unique_id, get_current_timestamp

    return _sse_event(
        {
            "id": f"chatcmpl-{generate_unique_id()}",
            "object": "chat.completion.chunk",
            "created": get_current_timestamp(),
            "model": model,
            "choices": [
                {"index": 0, "delta": delta, "finish_reason": finish_reason}
            ],
        }
    )


def _join_response_text(content) -> str:
    """Concatenate the ``text`` parts of a multimodal response ``content``.

    ``content`` is expected to be a list of dicts; a plain string is returned
    unchanged and an empty/missing value yields ``""``.
    """
    if not content:
        return ""
    if isinstance(content, str):
        return content
    return "".join(
        item["text"] for item in content if isinstance(item, dict) and "text" in item
    )


class DashScopeAdapter(BaseAdapter):
    """Adapter for the Alibaba Cloud Bailian (DashScope) platform."""

    @property
    def provider_name(self) -> str:
        return "dashscope"

    def is_available(self) -> bool:
        """Return True when an API key is configured in the environment."""
        return bool(os.getenv("ALIYUN_API_KEY") or os.getenv("DASHSCOPE_API_KEY"))

    def _get_api_key(self) -> str:
        """Return the API key (``ALIYUN_API_KEY`` wins), or ``""`` if unset."""
        return os.getenv("ALIYUN_API_KEY") or os.getenv("DASHSCOPE_API_KEY", "")

    def _needs_multimodal_api(self, model: str) -> bool:
        """Return True when ``model`` must go through the multimodal endpoint."""
        return model.lower() in MULTIMODAL_API_MODELS

    def _supports_thinking(self, model: str) -> bool:
        """Return True when ``model`` supports deep thinking."""
        return model.lower() in THINKING_MODELS

    def list_models(self) -> List[ModelInfo]:
        return DASHSCOPE_MODELS

    async def chat(self, request: ChatCompletionRequest):
        """Handle a DashScope chat request.

        Routes to the multimodal path when the request carries images/files
        or the model requires the multimodal endpoint; otherwise uses the
        text path. Both paths support streaming and non-streaming responses.
        """
        # Log the incoming request parameters.
        logger.info("[DashScope] 请求参数:")
        logger.info(f" - model: {request.model}")
        logger.info(f" - stream: {request.stream}")
        logger.info(f" - temperature: {request.temperature}")
        logger.info(f" - max_tokens: {request.max_tokens}")
        logger.info(f" - files: {request.files}")
        logger.info(f" - deep_thinking: {request.deep_thinking}")
        logger.info(
            f" - messages: {json.dumps(request.messages, ensure_ascii=False, indent=2)}"
        )

        # Does the request itself contain multimodal content?
        has_multimodal = self._has_multimodal_content(request)
        logger.info(f" - has_multimodal: {has_multimodal}")

        # Does the model require the multimodal endpoint (qwen3.5 series)?
        needs_multimodal_api = self._needs_multimodal_api(request.model)
        logger.info(f" - needs_multimodal_api: {needs_multimodal_api}")

        if has_multimodal or needs_multimodal_api:
            return await self._multimodal_chat(request)
        return await self._text_chat(request)

    def _has_multimodal_content(self, request: ChatCompletionRequest) -> bool:
        """Return True if any message has an ``image_url`` part or files are attached."""
        for msg in request.messages:
            content = msg.get("content", "")
            if isinstance(content, list) and any(
                isinstance(item, dict) and item.get("type") == "image_url"
                for item in content
            ):
                return True
        return bool(request.files)

    async def _text_chat(self, request: ChatCompletionRequest):
        """Run a text-only chat and return a streaming or JSON response."""
        import dashscope

        dashscope.api_key = self._get_api_key()

        # Convert the messages to the text-only format.
        messages = self._build_text_messages(request)
        logger.info("[DashScope] 文本聊天 - 转换后的消息:")
        logger.info(f" - messages_count: {len(messages)}")
        logger.info(f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}")

        if request.stream:
            # Only builds the StreamingResponse; the generator is consumed later.
            return self._stream_text_chat(messages, request)
        # The SDK call is blocking; run it off the event loop.
        return await run_in_threadpool(self._sync_text_chat, messages, request)

    def _build_text_messages(self, request: ChatCompletionRequest) -> List[Dict]:
        """Flatten request messages to ``{"role", "content": str}`` dicts.

        List-style content keeps only its ``text`` parts; messages whose
        resulting text is blank are dropped.
        """
        messages = []
        for msg in request.messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            if isinstance(content, str):
                text = content
            elif isinstance(content, list):
                text = "".join(
                    item.get("text", "")
                    for item in content
                    if isinstance(item, dict) and item.get("type") == "text"
                )
            else:
                continue
            if text.strip():
                messages.append({"role": role, "content": text})
        return messages

    def _stream_text_chat(self, messages: List[Dict], request: ChatCompletionRequest):
        """Return a ``StreamingResponse`` that relays a text chat as SSE chunks."""
        logger.info("[DashScope] 开始流式文本响应...")

        # Deep thinking is enabled only if requested AND supported by the model.
        thinking_enabled = request.deep_thinking and self._supports_thinking(request.model)
        logger.info(f"[DashScope] 深度思考: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(request.model)})")

        def generator():
            from dashscope import Generation

            full_content = ""
            full_reasoning = ""
            chunk_count = 0

            # Build the API call parameters.
            api_params = {
                "model": request.model,
                "messages": messages,
                "stream": True,
                "temperature": request.temperature,
                "max_tokens": request.max_tokens,
                "result_format": "message",
            }
            if thinking_enabled:
                api_params["enable_thinking"] = True

            # Log the API call parameters.
            logger.info("[DashScope] API 调用参数:")
            logger.info(f" - model: {api_params['model']}")
            logger.info(f" - stream: {api_params['stream']}")
            logger.info(f" - temperature: {api_params['temperature']}")
            logger.info(f" - max_tokens: {api_params['max_tokens']}")
            logger.info(f" - result_format: {api_params['result_format']}")
            if thinking_enabled:
                logger.info(" - enable_thinking: True")

            # The whole iteration is guarded: with stream=True the request is
            # consumed while iterating, so failures can surface mid-stream.
            try:
                for resp in Generation.call(**api_params):
                    if resp.status_code != 200:
                        logger.warning(f"[DashScope] 非200响应: status_code={resp.status_code}, code={resp.code}, message={resp.message}")
                        # Surface the failure to the client instead of an empty answer.
                        yield _chunk_event(
                            request.model,
                            {"content": f"API 调用失败: {resp.code} - {resp.message}"},
                        )
                        continue

                    chunk_count += 1
                    choice = resp.output.choices[0]

                    # NOTE(review): this path assumes the SDK returns cumulative
                    # text on every chunk (non-incremental output); each delta is
                    # the newly appended suffix. Confirm against the SDK settings.
                    reasoning_content = getattr(choice.message, "reasoning_content", None)
                    if reasoning_content and len(reasoning_content) > len(full_reasoning):
                        delta_reasoning = reasoning_content[len(full_reasoning):]
                        full_reasoning = reasoning_content
                        yield _chunk_event(
                            request.model, {"reasoning_content": delta_reasoning}
                        )

                    # Answer text is handled in the same pass so that a chunk
                    # carrying both reasoning and content loses neither.
                    content = choice.message.content
                    if content and len(content) > len(full_content):
                        delta = content[len(full_content):]
                        full_content = content
                        yield _chunk_event(request.model, {"content": delta})
            except Exception as e:
                logger.error(f"[DashScope] API 调用异常: {str(e)}")
                logger.error(traceback.format_exc())
                # Report the error as a final chunk so the client stream ends cleanly.
                yield _chunk_event(
                    request.model, {"content": f"API 调用失败: {str(e)}"}, "stop"
                )
                yield "data: [DONE]\n\n"
                return

            yield _chunk_event(request.model, {}, "stop")
            yield "data: [DONE]\n\n"

            # Log a summary of the streamed response.
            logger.info("[DashScope] 流式文本响应完成:")
            logger.info(f" - chunks: {chunk_count}")
            logger.info(f" - content_length: {len(full_content)} 字符")
            if full_reasoning:
                logger.info(f" - reasoning_length: {len(full_reasoning)} 字符")
            logger.info(
                f" - content_preview: {full_content[:200]}..."
                if len(full_content) > 200
                else f" - content: {full_content}"
            )

        return StreamingResponse(generator(), media_type="text/event-stream")

    def _sync_text_chat(self, messages: List[Dict], request: ChatCompletionRequest):
        """Run a blocking, non-streaming text chat and return a ``JSONResponse``."""
        from utils.helpers import generate_unique_id, get_current_timestamp
        from dashscope import Generation

        # Deep thinking is enabled only if requested AND supported by the model.
        thinking_enabled = request.deep_thinking and self._supports_thinking(request.model)
        logger.info(f"[DashScope] 深度思考: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(request.model)})")

        # Build the API call parameters.
        api_params = {
            "model": request.model,
            "messages": messages,
            "stream": False,
            "temperature": request.temperature,
            "max_tokens": request.max_tokens,
            "result_format": "message",
        }
        if thinking_enabled:
            api_params["enable_thinking"] = True

        # Log the API call parameters.
        logger.info("[DashScope] API 调用参数:")
        logger.info(f" - model: {api_params['model']}")
        logger.info(f" - stream: {api_params['stream']}")
        logger.info(f" - temperature: {api_params['temperature']}")
        logger.info(f" - max_tokens: {api_params['max_tokens']}")
        logger.info(f" - result_format: {api_params['result_format']}")
        if thinking_enabled:
            logger.info(" - enable_thinking: True")

        try:
            resp = Generation.call(**api_params)
        except Exception as e:
            logger.error(f"[DashScope] API 调用异常: {str(e)}")
            logger.error(traceback.format_exc())
            return JSONResponse(
                status_code=500,
                content={"error": f"DashScope API 调用异常: {str(e)}"},
            )

        if resp.status_code != 200:
            logger.error("[DashScope] 请求失败:")
            logger.error(f" - status_code: {resp.status_code}")
            logger.error(f" - code: {resp.code}")
            logger.error(f" - message: {resp.message}")
            return JSONResponse(
                status_code=500,
                content={"error": f"DashScope Error: {resp.code} - {resp.message}"},
            )

        message = resp.output.choices[0].message
        content = message.content or ""

        # Build the assistant message, attaching reasoning when present.
        response_message = {"role": "assistant", "content": content}
        reasoning_content = getattr(message, "reasoning_content", None)
        if reasoning_content:
            response_message["reasoning_content"] = reasoning_content

        response = {
            "id": f"chatcmpl-{generate_unique_id()}",
            "object": "chat.completion",
            "created": get_current_timestamp(),
            "model": request.model,
            "choices": [
                {
                    "index": 0,
                    "message": response_message,
                    "finish_reason": "stop",
                }
            ],
        }

        has_usage = bool(hasattr(resp, "usage") and resp.usage)
        if has_usage:
            response["usage"] = {
                "prompt_tokens": resp.usage.input_tokens,
                "completion_tokens": resp.usage.output_tokens,
                "total_tokens": resp.usage.total_tokens,
            }

        # Log the result.
        logger.info("[DashScope] 响应成功:")
        logger.info(f" - status_code: {resp.status_code}")
        logger.info(f" - content_length: {len(content)} 字符")
        if reasoning_content:
            logger.info(f" - reasoning_length: {len(reasoning_content)} 字符")
        logger.info(
            f" - content_preview: {content[:200]}..."
            if len(content) > 200
            else f" - content: {content}"
        )
        if has_usage:
            logger.info(f" - usage: {response['usage']}")

        return JSONResponse(content=response)

    async def _multimodal_chat(self, request: ChatCompletionRequest):
        """Run a multimodal chat and return a streaming or JSON response."""
        import dashscope

        dashscope.api_key = self._get_api_key()

        logger.info("[DashScope] 开始多模态聊天...")

        # Convert the messages to the multimodal format.
        messages = self._build_multimodal_messages(request)
        logger.info("[DashScope] 多模态消息转换完成:")
        logger.info(f" - messages_count: {len(messages)}")
        logger.info(f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}")

        # Pick the multimodal model: "qwen-*" names without "vl" are mapped
        # to their "qwen-vl-*" counterpart.
        model = request.model
        if "qwen-" in model and "vl" not in model:
            original_model = model
            model = model.replace("qwen-", "qwen-vl-")
            logger.info(f"[DashScope] 模型自动切换: {original_model} -> {model}")

        if request.stream:
            # Only builds the StreamingResponse; the generator is consumed later.
            return self._stream_multimodal_chat(messages, model, request)
        # The SDK call is blocking; run it off the event loop.
        return await run_in_threadpool(
            self._sync_multimodal_chat, messages, model, request
        )

    def _build_multimodal_messages(self, request: ChatCompletionRequest) -> List[Dict]:
        """Convert request messages to DashScope multimodal format.

        Each message's content becomes a list of ``{"text": ...}`` and
        ``{"image": ...}`` parts; messages with no usable parts are dropped.
        """
        messages = []
        for msg in request.messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")

            if isinstance(content, str):
                if content.strip():
                    messages.append({"role": role, "content": [{"text": content}]})
                continue
            if not isinstance(content, list):
                continue

            ds_content = []
            for item in content:
                if not isinstance(item, dict):
                    continue
                item_type = item.get("type")
                if item_type == "text":
                    ds_content.append({"text": item.get("text", "")})
                elif item_type == "image_url":
                    img_url = self._extract_image_url(item)
                    if img_url:
                        ds_content.append({"image": img_url})
            if ds_content:
                messages.append({"role": role, "content": ds_content})
        return messages

    def _extract_image_url(self, item: Dict) -> str:
        """Extract the image URL from an ``image_url`` part and normalize it.

        - ``image_url`` may be a string or a ``{"url": ...}`` dict.
        - Missing/empty URLs yield ``""`` so the caller skips the image.
        - ``data:`` URIs and ``file://`` URLs are returned unchanged.
        - http(s) URLs pointing at localhost/127.0.0.1 whose path contains an
          ``uploads`` segment are rewritten to ``file://uploads/...``; other
          http(s) URLs are returned unchanged.
        - Anything else is treated as a local path and prefixed with ``file://``.
        """
        raw = item.get("image_url", "")
        if isinstance(raw, dict):
            raw = raw.get("url", "")
        # Guards against None / non-string values before calling str methods.
        img_url = raw if isinstance(raw, str) else ""

        logger.info(f"[DashScope] 原始图片URL: {img_url}")

        if not img_url:
            # Previously an empty value turned into the bogus URL "file://".
            return ""

        if img_url.startswith("data:"):
            # Inline base64 image: pass through; prefixing it with file://
            # would make it look like a (nonexistent) local path.
            pass
        elif img_url.startswith(("http://", "https://")):
            from urllib.parse import urlparse

            parsed = urlparse(img_url)
            if "localhost" in parsed.netloc or "127.0.0.1" in parsed.netloc:
                # Locally served upload: point at the file under uploads/.
                path_parts = parsed.path.split("/")
                if "uploads" in path_parts:
                    uploads_idx = path_parts.index("uploads")
                    img_url = f"file://{'/'.join(path_parts[uploads_idx:])}"
        elif not img_url.startswith("file://"):
            img_url = f"file://{img_url}"

        logger.info(f"[DashScope] 转换后图片URL: {img_url}")
        return img_url

    def _stream_multimodal_chat(
        self, messages: List[Dict], model: str, request: ChatCompletionRequest
    ):
        """Return a ``StreamingResponse`` that relays a multimodal chat as SSE chunks."""
        logger.info("[DashScope] 开始流式多模态响应...")
        logger.info(f" - model: {model}")
        logger.info(f" - max_tokens: {request.max_tokens}")
        logger.info(f" - temperature: {request.temperature}")

        # Deep thinking is enabled only if requested AND supported by the model.
        thinking_enabled = request.deep_thinking and self._supports_thinking(model)
        logger.info(f"[DashScope] 深度思考: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(model)})")

        def generator():
            from dashscope import MultiModalConversation

            full_content = ""
            full_reasoning = ""
            chunk_count = 0

            # Build the API call parameters.
            api_params = {
                "model": model,
                "messages": messages,
                "stream": True,
                "enable_thinking": False,
                "max_tokens": request.max_tokens,
                "temperature": request.temperature,
            }
            if thinking_enabled:
                api_params["enable_thinking"] = True

            logger.info("[DashScope] 流式多模态 API 调用参数:")
            logger.info(f" - model: {api_params['model']}")
            logger.info(f" - stream: {api_params['stream']}")
            logger.info(f" - max_tokens: {api_params['max_tokens']}")
            logger.info(f" - temperature: {api_params['temperature']}")
            logger.info(f" - enable_thinking: {api_params['enable_thinking']}")
            logger.info(f" - messages: {json.dumps(messages, ensure_ascii=False, indent=2)}")

            # The whole iteration is guarded: with stream=True the request is
            # consumed while iterating, so failures can surface mid-stream.
            try:
                for resp in MultiModalConversation.call(**api_params):
                    chunk_count += 1

                    if resp.status_code != 200:
                        logger.warning(f"[DashScope] 非200响应: status_code={resp.status_code}, code={resp.code}, message={resp.message}")
                        # Surface the failure to the client instead of an empty answer.
                        yield _chunk_event(
                            model,
                            {"content": f"API 调用失败: {resp.code} - {resp.message}"},
                        )
                        continue

                    try:
                        message = resp.output.choices[0]["message"]
                        reasoning_content = message.get("reasoning_content", "")
                        text = _join_response_text(message.get("content", []))
                    except (KeyError, IndexError, TypeError) as e:
                        logger.warning(f"[DashScope] 解析多模态响应异常: {str(e)}")
                        continue

                    # The original code treats multimodal chunks as independent
                    # fragments (not cumulative), so each is forwarded as-is.
                    if reasoning_content:
                        full_reasoning += reasoning_content
                        yield _chunk_event(
                            model, {"reasoning_content": reasoning_content}
                        )

                    # Answer text is handled in the same pass so that a chunk
                    # carrying both reasoning and content loses neither.
                    if text:
                        full_content += text
                        yield _chunk_event(model, {"content": text})
            except Exception as e:
                logger.error(f"[DashScope] 多模态 API 调用异常: {str(e)}")
                logger.error(traceback.format_exc())
                # Report the error as a final chunk so the client stream ends cleanly.
                yield _chunk_event(
                    model, {"content": f"API 调用失败: {str(e)}"}, "stop"
                )
                yield "data: [DONE]\n\n"
                return

            yield _chunk_event(model, {}, "stop")
            yield "data: [DONE]\n\n"

            # Log a summary of the streamed response.
            logger.info("[DashScope] 流式多模态响应完成:")
            logger.info(f" - chunks: {chunk_count}")
            if full_reasoning:
                logger.info(f" - reasoning_length: {len(full_reasoning)} 字符")
                logger.info(f" - reasoning: {full_reasoning[:500]}..." if len(full_reasoning) > 500 else f" - reasoning: {full_reasoning}")
            logger.info(f" - content_length: {len(full_content)} 字符")
            logger.info(
                f" - content: {full_content[:500]}..."
                if len(full_content) > 500
                else f" - content: {full_content}"
            )

        return StreamingResponse(generator(), media_type="text/event-stream")

    def _sync_multimodal_chat(
        self, messages: List[Dict], model: str, request: ChatCompletionRequest
    ):
        """Run a blocking, non-streaming multimodal chat and return a ``JSONResponse``."""
        from utils.helpers import generate_unique_id, get_current_timestamp
        from dashscope import MultiModalConversation

        # Deep thinking is enabled only if requested AND supported by the model.
        thinking_enabled = request.deep_thinking and self._supports_thinking(model)
        logger.info(f"[DashScope] 深度思考: {thinking_enabled} (request={request.deep_thinking}, supports={self._supports_thinking(model)})")

        logger.info("[DashScope] 开始非流式多模态响应...")
        logger.info(f" - model: {model}")
        logger.info(f" - max_tokens: {request.max_tokens}")
        logger.info(f" - temperature: {request.temperature}")

        # Build the API call parameters.
        api_params = {
            "model": model,
            "messages": messages,
            "stream": False,
            "max_tokens": request.max_tokens,
            "enable_thinking": False,
            "temperature": request.temperature,
        }
        if thinking_enabled:
            api_params["enable_thinking"] = True

        logger.info("[DashScope] 非流式多模态 API 调用参数:")
        logger.info(f" - model: {api_params['model']}")
        logger.info(f" - stream: {api_params['stream']}")
        logger.info(f" - max_tokens: {api_params['max_tokens']}")
        logger.info(f" - temperature: {api_params['temperature']}")
        logger.info(f" - enable_thinking: {api_params['enable_thinking']}")

        try:
            resp = MultiModalConversation.call(**api_params)
        except Exception as e:
            logger.error(f"[DashScope] 多模态 API 调用异常: {str(e)}")
            logger.error(traceback.format_exc())
            return JSONResponse(
                status_code=500,
                content={"error": f"DashScope API 调用异常: {str(e)}"},
            )

        if resp.status_code != 200:
            logger.error("[DashScope] 多模态请求失败:")
            logger.error(f" - status_code: {resp.status_code}")
            logger.error(f" - code: {resp.code}")
            logger.error(f" - message: {resp.message}")
            return JSONResponse(
                status_code=500,
                content={"error": f"DashScope Error: {resp.code} - {resp.message}"},
            )

        try:
            message = resp.output.choices[0]["message"]
            text = _join_response_text(message.get("content", []))

            # Build the assistant message, attaching reasoning when present.
            response_message = {"role": "assistant", "content": text}
            reasoning_content = message.get("reasoning_content")
            if reasoning_content:
                response_message["reasoning_content"] = reasoning_content

            response = {
                "id": f"chatcmpl-{generate_unique_id()}",
                "object": "chat.completion",
                "created": get_current_timestamp(),
                "model": model,
                "choices": [
                    {
                        "index": 0,
                        "message": response_message,
                        "finish_reason": "stop",
                    }
                ],
            }

            # Log the result.
            logger.info("[DashScope] 多模态响应成功:")
            logger.info(f" - status_code: {resp.status_code}")
            logger.info(f" - content_length: {len(text)} 字符")
            if reasoning_content:
                logger.info(f" - reasoning_length: {len(reasoning_content)} 字符")
            logger.info(
                f" - content_preview: {text[:200]}..."
                if len(text) > 200
                else f" - content: {text}"
            )

            return JSONResponse(content=response)
        except (KeyError, IndexError, TypeError) as e:
            logger.error(f"[DashScope] 解析多模态响应异常: {str(e)}")
            logger.error(traceback.format_exc())
            return JSONResponse(
                status_code=500,
                content={"error": f"Parse error: {str(e)}"},
            )