"""Improved FastAPI server that connects to the Aliyun Bailian platform via the DashScope Python SDK.

Endpoints: health check, chat (streaming SSE or JSON), model list, in-memory
conversation CRUD, file upload/serving, and placeholder "stop generation" calls.
"""
import asyncio
import functools
import json
import os
import time
import uuid
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple

import dashscope
import uvicorn
from dashscope import Generation
from dotenv import load_dotenv
from fastapi import FastAPI, File, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from pydantic import BaseModel, Field

# Load environment variables (e.g. from a .env file).
load_dotenv()

# Configure the DashScope API key; the module refuses to load without it.
api_key = os.getenv("ALIYUN_API_KEY")
if not api_key:
    raise ValueError("请在环境变量中设置 ALIYUN_API_KEY")
dashscope.api_key = api_key

# The FastAPI application.
app = FastAPI(title="AI Chat API Server (Python)", version="2.0.0")


class ChatMessage(BaseModel):
    """A chat message (not referenced by the endpoints in this file)."""

    role: str
    content: str
    images: Optional[List[str]] = None
    files: Optional[List[str]] = None


class ChatRequest(BaseModel):
    """OpenAI-style chat request (not referenced by the endpoints in this file)."""

    model: str = "qwen-plus"
    messages: List[Dict[str, Any]]
    stream: bool = True
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 2000


class ModelInfo(BaseModel):
    """One entry of the list returned by ``GET /api/chat-ui/models``."""

    id: str
    name: str
    description: str
    maxTokens: int
    provider: str


# In-memory conversation store keyed by conversation id; lost on restart.
# A real deployment should use persistent storage.
conversations_db: Dict[str, dict] = {}

# Directory that receives uploaded files (relative to the working directory).
upload_dir = Path("uploads")
upload_dir.mkdir(exist_ok=True)

# MIME types accepted by the upload endpoint.
_ALLOWED_UPLOAD_TYPES = frozenset({
    "image/jpeg",
    "image/png",
    "image/gif",
    "image/webp",
    "text/plain",
    "application/pdf",
})

# Static model catalogue served by GET /api/chat-ui/models.
_AVAILABLE_MODELS = [
    ModelInfo(id="qwen-max", name="通义千问 Max", description="最强大的模型",
              maxTokens=8192, provider="Aliyun"),
    ModelInfo(id="qwen-plus", name="通义千问 Plus", description="能力均衡",
              maxTokens=8192, provider="Aliyun"),
    ModelInfo(id="qwen-turbo", name="通义千问 Turbo", description="速度更快、成本更低",
              maxTokens=8192, provider="Aliyun"),
]


def _epoch_seconds() -> int:
    """Return the current Unix time in whole seconds.

    Replaces ``int(datetime.utcnow().timestamp())``, which is off by the UTC
    offset on hosts not running in UTC (``timestamp()`` treats naive datetimes
    as local time).
    """
    return int(time.time())


def _utc_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Same format as the deprecated ``datetime.utcnow().isoformat()`` (no offset suffix).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _public_base_url() -> str:
    """Return the base URL used to build links to uploaded files.

    Uses ``PUBLIC_BASE_URL`` when set; otherwise ``http://localhost:<PORT>``
    with the same ``PORT`` default (8000) the server starts on.
    """
    base = os.getenv("PUBLIC_BASE_URL") or f"http://localhost:{os.getenv('PORT', 8000)}"
    return base.rstrip("/")


@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    """Log every request/response and add an ``X-Process-Time`` header (milliseconds)."""
    start = time.perf_counter()

    print(f"[INFO] {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - "
          f"HTTP {request.method} {request.url.path} - "
          f"IP: {request.client.host if request.client else 'unknown'}")

    response = await call_next(request)

    process_time = (time.perf_counter() - start) * 1000
    print(f"[INFO] {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - "
          f"Response {response.status_code}, Process Time: {process_time:.2f}ms")

    response.headers["X-Process-Time"] = f"{process_time:.2f}ms"
    return response


@app.get("/health")
async def health_check():
    """Health-check endpoint."""
    return {"status": "healthy", "timestamp": _utc_iso()}


def _parse_chat_body(body: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], Any, Any, Any, Any]:
    """Normalize both accepted request shapes into ``Generation.call`` arguments.

    Returns ``(messages, model, stream, temperature, max_tokens)``.

    * OpenAI-compatible shape (has ``messages``, sent by the front end's
      streamChat): ``stream`` defaults to True and ``max_tokens`` is read.
    * Simplified shape (sent by the front end's chat function): ``message``
      (a string, or an already-formatted content list when images are attached)
      is wrapped in a system + user message pair; ``stream`` defaults to False
      and ``maxTokens`` is read.
    """
    if "messages" in body:
        return (
            body.get("messages", []),
            body.get("model", "qwen-plus"),
            body.get("stream", True),
            body.get("temperature", 0.7),
            body.get("max_tokens", 2000),
        )

    message_text = body.get("message", "")
    if isinstance(message_text, list):
        user_content = message_text
    else:
        user_content = [{"type": "text", "text": message_text}]

    messages = [
        {"role": "system", "content": body.get("systemPrompt", "你是一个支持视觉理解的助手。")},
        {"role": "user", "content": user_content},
    ]
    return (
        messages,
        body.get("model", "qwen-plus"),
        body.get("stream", False),
        body.get("temperature", 0.7),
        body.get("maxTokens", 2000),
    )


def _extract_content(response: Any) -> Optional[str]:
    """Return the generated text of a DashScope response, or None if absent.

    The SDK response carries the text either in ``output.choices[0]["message"]["content"]``
    or in ``output["text"]``; the choices form is tried first.
    """
    output = getattr(response, "output", None)
    if not output:
        return None
    choices = getattr(output, "choices", None)
    if choices:
        first = choices[0]
        if "message" in first and "content" in first["message"]:
            return first["message"]["content"]
    if "text" in output:
        return output.get("text")
    return None


def _sse(payload: Dict[str, Any]) -> str:
    """Serialize *payload* as one Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


def _completion_chunk(completion_id: str, model: str, delta: Dict[str, Any],
                      finish_reason: Optional[str]) -> Dict[str, Any]:
    """Build an OpenAI-style ``chat.completion.chunk`` object."""
    return {
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": _epoch_seconds(),
        "model": model,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }


def _stream_chat(model: Any, messages: List[Dict[str, Any]], temperature: Any,
                 max_tokens: Any) -> Iterator[str]:
    """Yield SSE frames for a streaming DashScope call.

    This is deliberately a synchronous generator: StreamingResponse iterates
    synchronous iterators in a worker thread, so the blocking SDK call does not
    stall the event loop. Errors are reported in-band as an ``error`` frame.
    """
    # One id shared by every chunk of this completion.
    completion_id = f"chatcmpl-{uuid.uuid4()}"
    try:
        responses = Generation.call(
            model=model,
            messages=messages,
            stream=True,
            max_tokens=max_tokens,
            temperature=temperature,
        )
        full_content = ""
        for response in responses:
            if response.status_code != 200:
                yield _sse({
                    "error": {
                        "message": f"API Error: {response.code} - {response.message}",
                        "type": "api_error",
                        "param": None,
                        "code": response.code,
                    }
                })
                break

            content = _extract_content(response)
            # Each response carries the cumulative text so far; forward only the
            # newly appended suffix. Whitespace-only deltas (e.g. "\n\n") are
            # forwarded too, otherwise the client never receives them.
            if content and len(content) > len(full_content):
                delta_content = content[len(full_content):]
                full_content = content
                yield _sse(_completion_chunk(completion_id, model,
                                             {"content": delta_content}, None))

        yield _sse(_completion_chunk(completion_id, model, {}, "stop"))
        yield "data: [DONE]\n\n"
    except Exception as e:
        yield _sse({"error": {"message": str(e), "type": "server_error"}})


async def _call_generation(**kwargs: Any) -> Any:
    """Run the blocking ``Generation.call`` in the default thread pool."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(Generation.call, **kwargs))


@app.post("/api/chat-ui/chat")
async def chat_endpoint(request: Request):
    """Chat endpoint compatible with the Aliyun Bailian API.

    Accepts either request shape (see ``_parse_chat_body``) and forwards it to
    DashScope. Returns SSE frames when streaming, otherwise a JSON object with
    ``id``, ``conversationId``, ``content``, ``model``, ``createdAt`` and,
    when reported, ``usage``.

    Raises:
        HTTPException 400: the body is not a JSON object.
        HTTPException 500: the upstream call failed or returned no content.
    """
    try:
        try:
            body = await request.json()
        except ValueError as e:  # JSONDecodeError / UnicodeDecodeError
            raise HTTPException(status_code=400, detail="Invalid JSON body") from e
        if not isinstance(body, dict):
            raise HTTPException(status_code=400, detail="Request body must be a JSON object")

        messages, model, stream, temperature, max_tokens = _parse_chat_body(body)

        if stream:
            return StreamingResponse(
                _stream_chat(model, messages, temperature, max_tokens),
                media_type="text/event-stream",
            )

        response = await _call_generation(
            model=model,
            messages=messages,
            stream=False,
            max_tokens=max_tokens,
            temperature=temperature,
        )
        if response.status_code != 200:
            raise HTTPException(
                status_code=500,
                detail=f"API Error: {response.code} - {response.message}",
            )

        content = _extract_content(response)
        if not content:
            raise HTTPException(
                status_code=500,
                detail="API Response does not contain expected content",
            )

        chat_response = {
            "id": str(uuid.uuid4()),
            "conversationId": body.get("conversationId", str(uuid.uuid4())),
            "content": content,
            "model": model,
            "createdAt": _epoch_seconds(),
        }
        usage = getattr(response, "usage", None)
        if usage:
            chat_response["usage"] = {
                "promptTokens": usage.input_tokens,
                "completionTokens": usage.output_tokens,
                "totalTokens": usage.total_tokens,
            }
        # JSONResponse takes no ensure_ascii argument (passing it raised
        # TypeError); Starlette already renders with ensure_ascii=False.
        return JSONResponse(content=chat_response)
    except HTTPException:
        # Keep the intended status code and detail instead of re-wrapping as 500.
        raise
    except Exception as e:
        print(f"[ERROR] Error in chat endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/api/chat-ui/models")
async def get_models():
    """Return the list of available models."""
    # FastAPI serializes the pydantic models itself (works on pydantic v1 and v2).
    return list(_AVAILABLE_MODELS)


@app.get("/api/chat-ui/conversations")
async def get_conversations():
    """Return all stored conversations."""
    return list(conversations_db.values())


@app.get("/api/chat-ui/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
    """Return one conversation, or 404 if it does not exist."""
    conversation = conversations_db.get(conversation_id)
    if not conversation:
        raise HTTPException(status_code=404, detail="对话不存在")
    return conversation


@app.post("/api/chat-ui/conversations")
async def save_conversation(request: Request):
    """Create or update a conversation and return the stored record.

    A new id is generated when the body has none; ``updatedAt`` is always
    refreshed and ``createdAt`` is kept if supplied.
    """
    try:
        try:
            data = await request.json()
        except ValueError as e:
            raise HTTPException(status_code=400, detail="Invalid JSON body") from e
        if not isinstance(data, dict):
            raise HTTPException(status_code=400, detail="Request body must be a JSON object")

        conversation_id = data.get("id") or str(uuid.uuid4())
        now = _utc_iso()
        conversation = {
            "id": conversation_id,
            "title": data.get("title", "新对话"),
            "messages": data.get("messages", []),
            "updatedAt": now,
            "createdAt": data.get("createdAt", now),
        }
        conversations_db[conversation_id] = conversation
        return conversation
    except HTTPException:
        raise
    except Exception as e:
        print(f"[ERROR] Error saving conversation: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.delete("/api/chat-ui/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a conversation, or return 404 if it does not exist."""
    if conversations_db.pop(conversation_id, None) is None:
        raise HTTPException(status_code=404, detail="对话不存在")
    return {"success": True, "message": "删除成功"}


@app.post("/api/chat-ui/upload")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded file under a unique name and return its metadata.

    Raises:
        HTTPException 400: unsupported content type.
        HTTPException 500: the file could not be saved.
    """
    try:
        if file.content_type not in _ALLOWED_UPLOAD_TYPES:
            raise HTTPException(status_code=400, detail=f"不支持的文件类型: {file.content_type}")

        # Only the client's extension is kept; the name itself is generated.
        file_extension = Path(file.filename or "").suffix.lower()
        unique_filename = f"{_epoch_seconds()}_{uuid.uuid4()}{file_extension}"
        file_path = upload_dir / unique_filename

        content = await file.read()
        file_path.write_bytes(content)

        result = {
            "url": f"{_public_base_url()}/uploads/{unique_filename}",
            "name": file.filename,
            "size": len(content),
            "mimeType": file.content_type,
        }
        print(f"[INFO] File uploaded: {result}")
        return result
    except HTTPException:
        # Preserve the 400 for unsupported types instead of turning it into a 500.
        raise
    except Exception as e:
        print(f"[ERROR] Upload error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}") from e


@app.get("/uploads/{filename}")
async def serve_upload(filename: str):
    """Serve a previously uploaded file."""
    base = upload_dir.resolve()
    file_path = (upload_dir / filename).resolve()
    # Only serve regular files located directly inside the upload directory
    # (rejects "..", directories and anything resolving elsewhere).
    if file_path.parent != base or not file_path.is_file():
        raise HTTPException(status_code=404, detail="文件不存在")
    return FileResponse(str(file_path))


@app.post("/api/chat-ui/stop")
async def stop_generation():
    """Acknowledge a stop request.

    Placeholder: no running task is tracked or cancelled yet.
    """
    return {"success": True, "message": "已发出停止指令"}


@app.post("/api/chat-ui/stop/{message_id}")
async def stop_generation_by_id(message_id: str):
    """Acknowledge a stop request for *message_id* (placeholder, nothing is cancelled)."""
    return {"success": True, "message": f"已发出停止指令,消息ID: {message_id}"}


if __name__ == "__main__":
    port = int(os.getenv("PORT", 8000))
    print("=" * 50)
    print("Python AI Chat Server 启动中...")
    print(f"监听端口: {port}")
    print(f"API Key 状态: {'已配置' if api_key else '未配置'}")
    print("=" * 50)

    # NOTE: the module already raises at import when the key is missing, so the
    # warning branch is only reachable if that check is relaxed.
    if not api_key:
        print("警告: 未在环境变量中检测到 ALIYUN_API_KEY!")
        print("请在 .env 文件中添加您的百炼 API Key。")
    else:
        print("API Key 已检测到。")

    uvicorn.run(app, host="0.0.0.0", port=port)