refactor: 将单一的 app.py 拆分为模块化结构

This commit is contained in:
肖应宇 2026-03-03 14:45:59 +08:00
parent a8f631a034
commit 192013bd65
9 changed files with 247 additions and 407 deletions

35
server_python/__init__.py Normal file
View File

@ -0,0 +1,35 @@
"""
包初始化文件
"""
from .models.chat_models import ChatMessage, ChatRequest, ModelInfo
from .utils.helpers import (
get_current_timestamp,
generate_unique_id,
format_api_response,
log_request,
log_response,
extract_delta_content
)
from .api.chat_routes import (
chat_endpoint_handler,
get_models_handler,
get_conversations_handler,
get_conversation_handler,
save_conversation_handler,
delete_conversation_handler,
upload_file_handler,
serve_upload_handler,
stop_generation_handler
)
__all__ = [
# Models
'ChatMessage', 'ChatRequest', 'ModelInfo',
# Utils
'get_current_timestamp', 'generate_unique_id', 'format_api_response',
'log_request', 'log_response', 'extract_delta_content',
# API Handlers
'chat_endpoint_handler', 'get_models_handler', 'get_conversations_handler',
'get_conversation_handler', 'save_conversation_handler', 'delete_conversation_handler',
'upload_file_handler', 'serve_upload_handler', 'stop_generation_handler'
]

View File

@ -0,0 +1 @@
# api/__init__.py

View File

@ -1,57 +1,30 @@
"""
改进版Python FastAPI服务器实现使用DashScope Python SDK连接阿里云百炼平台API
API 路由定义
"""
import os
import json
import uuid
import asyncio
from datetime import datetime
from typing import Dict, List, Optional, Any
from typing import Dict, List
from pathlib import Path
from enum import Enum
from fastapi import HTTPException, File, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
import dashscope
from dashscope import Generation
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, File, UploadFile, Request
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
import uvicorn
# 加载环境变量
load_dotenv()
# 导入模型和工具函数(使用绝对路径)
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))
# 设置 DashScope API 密钥
api_key = os.getenv("ALIYUN_API_KEY")
if not api_key:
raise ValueError("请在环境变量中设置 ALIYUN_API_KEY")
from models.chat_models import ChatRequest, ModelInfo
from utils.helpers import (
get_current_timestamp,
generate_unique_id,
format_api_response,
extract_delta_content
)
dashscope.api_key = api_key
# 创建 FastAPI 应用
app = FastAPI(title="AI Chat API Server (Python)", version="2.0.0")
# 数据模型定义
class ChatMessage(BaseModel):
role: str
content: str
images: Optional[List[str]] = None
files: Optional[List[str]] = None
class ChatRequest(BaseModel):
model: str = "qwen-plus"
messages: List[Dict[str, Any]]
stream: bool = True
temperature: Optional[float] = 0.7
max_tokens: Optional[int] = 2000
class ModelInfo(BaseModel):
id: str
name: str
description: str
maxTokens: int
provider: str
# 模拟数据库 - 实际应用中应使用持久化存储
conversations_db: Dict[str, dict] = {}
@ -60,45 +33,13 @@ conversations_db: Dict[str, dict] = {}
upload_dir = Path("uploads")
upload_dir.mkdir(exist_ok=True)
@app.middleware("http")
async def logging_middleware(request: Request, call_next):
"""中间件:记录请求日志"""
start_time = datetime.utcnow()
# 记录请求信息
print(f"[INFO] {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - "
f"HTTP {request.method} {request.url.path} - "
f"IP: {request.client.host if request.client else 'unknown'}")
response = await call_next(request)
# 计算处理时间
process_time = (datetime.utcnow() - start_time).total_seconds() * 1000
# 记录响应信息
print(f"[INFO] {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - "
f"Response {response.status_code}, Process Time: {process_time:.2f}ms")
# 在响应头中添加处理时间
response.headers["X-Process-Time"] = f"{process_time:.2f}ms"
return response
@app.get("/health")
async def health_check():
"""健康检查端点"""
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
@app.post("/api/chat-ui/chat")
async def chat_endpoint(request: Request):
async def chat_endpoint_handler(body: dict):
"""
聊天接口 - 与阿里云百炼API兼容的接口
聊天接口处理器 - 与阿里云百炼API兼容的接口
这个端点会接收前端的聊天请求并转发到阿里云百炼API
"""
try:
# 获取请求体数据
body = await request.json()
# 检查请求格式并适配
# 如果是OpenAI兼容格式 (来自streamChat)
if 'messages' in body:
@ -161,15 +102,15 @@ async def chat_endpoint(request: Request):
# 只有当内容发生变化时才发送增量
if len(content) > len(full_content):
delta_content = content[len(full_content):]
delta_content = extract_delta_content(content, full_content)
full_content = content
if delta_content.strip(): # 只有当有非空白新内容时才发送
# 构建 SSE 数据块
data = {
"id": f"chatcmpl-{uuid.uuid4()}",
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": int(datetime.utcnow().timestamp()),
"created": get_current_timestamp(),
"model": model,
"choices": [
{
@ -190,15 +131,15 @@ async def chat_endpoint(request: Request):
# 只有当内容发生变化时才发送增量
if len(content) > len(full_content):
delta_content = content[len(full_content):]
delta_content = extract_delta_content(content, full_content)
full_content = content
if delta_content.strip(): # 只有当有非空白新内容时才发送
# 构建 SSE 数据块
data = {
"id": f"chatcmpl-{uuid.uuid4()}",
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": int(datetime.utcnow().timestamp()),
"created": get_current_timestamp(),
"model": model,
"choices": [
{
@ -225,9 +166,9 @@ async def chat_endpoint(request: Request):
# 发送结束信号
finish_data = {
"id": f"chatcmpl-{uuid.uuid4()}",
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": int(datetime.utcnow().timestamp()),
"created": get_current_timestamp(),
"model": model,
"choices": [
{
@ -283,13 +224,11 @@ async def chat_endpoint(request: Request):
if content:
# 构建前端期望的响应格式
chat_response = {
"id": str(uuid.uuid4()),
"conversationId": body.get('conversationId', str(uuid.uuid4())),
"content": content,
"model": model,
"createdAt": int(datetime.utcnow().timestamp())
}
chat_response = format_api_response(
content=content,
conversation_id=body.get('conversationId'),
model=model
)
if hasattr(response, 'usage') and response.usage:
chat_response["usage"] = {
@ -314,9 +253,9 @@ async def chat_endpoint(request: Request):
print(f"[ERROR] Error in chat endpoint: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/chat-ui/models")
async def get_models():
"""获取模型列表"""
async def get_models_handler():
"""获取模型列表处理器"""
models = [
ModelInfo(
id="qwen-max",
@ -342,25 +281,24 @@ async def get_models():
]
return [model.dict() for model in models]
@app.get("/api/chat-ui/conversations")
async def get_conversations():
"""获取所有对话"""
async def get_conversations_handler():
"""获取所有对话处理器"""
return list(conversations_db.values())
@app.get("/api/chat-ui/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
"""获取特定对话"""
async def get_conversation_handler(conversation_id: str):
"""获取特定对话处理器"""
conversation = conversations_db.get(conversation_id)
if not conversation:
raise HTTPException(status_code=404, detail="对话不存在")
return conversation
@app.post("/api/chat-ui/conversations")
async def save_conversation(request: Request):
"""保存或更新对话"""
async def save_conversation_handler(data: dict):
"""保存或更新对话处理器"""
try:
data = await request.json()
conversation_id = data.get('id') or str(uuid.uuid4())
conversation_id = data.get('id') or generate_unique_id()
conversation = {
"id": conversation_id,
@ -377,18 +315,18 @@ async def save_conversation(request: Request):
print(f"[ERROR] Error saving conversation: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/chat-ui/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
"""删除对话"""
async def delete_conversation_handler(conversation_id: str):
"""删除对话处理器"""
if conversation_id in conversations_db:
del conversations_db[conversation_id]
return {"success": True, "message": "删除成功"}
else:
raise HTTPException(status_code=404, detail="对话不存在")
@app.post("/api/chat-ui/upload")
async def upload_file(file: UploadFile = File(...)):
"""文件上传接口"""
async def upload_file_handler(file: UploadFile = File(...)):
"""文件上传处理器"""
try:
# 检查文件类型
allowed_types = ['image/jpeg', 'image/png', 'image/gif', 'image/webp', 'text/plain', 'application/pdf']
@ -397,7 +335,7 @@ async def upload_file(file: UploadFile = File(...)):
# 生成唯一文件名
file_extension = Path(file.filename).suffix.lower()
unique_filename = f"{int(datetime.utcnow().timestamp())}_{uuid.uuid4()}{file_extension}"
unique_filename = f"{int(datetime.utcnow().timestamp())}_{generate_unique_id()}{file_extension}"
file_path = upload_dir / unique_filename
# 保存文件
@ -421,9 +359,9 @@ async def upload_file(file: UploadFile = File(...)):
print(f"[ERROR] Upload error: {str(e)}")
raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}")
@app.get("/uploads/{filename}")
async def serve_upload(filename: str):
"""提供上传文件访问"""
def serve_upload_handler(filename: str):
"""提供上传文件访问处理器"""
file_path = upload_dir / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="文件不存在")
@ -431,30 +369,8 @@ async def serve_upload(filename: str):
from fastapi.responses import FileResponse
return FileResponse(str(file_path))
@app.post("/api/chat-ui/stop")
async def stop_generation():
"""停止生成接口"""
# 在实际实现中这里可能需要维护正在运行的任务ID列表
# 目前只是返回成功消息
return {"success": True, "message": "已发出停止指令"}
@app.post("/api/chat-ui/stop/{message_id}")
async def stop_generation_by_id(message_id: str):
"""根据消息ID停止生成"""
return {"success": True, "message": "已发出停止指令消息ID: " + message_id}
if __name__ == "__main__":
port = int(os.getenv("PORT", 8000))
print("="*50)
print(f"Python AI Chat Server 启动中...")
print(f"监听端口: {port}")
print(f"API Key 状态: {'已配置' if api_key else '未配置'}")
print("="*50)
if not api_key:
print("警告: 未在环境变量中检测到 ALIYUN_API_KEY!")
print("请在 .env 文件中添加您的百炼 API Key。")
else:
print("API Key 已检测到。")
uvicorn.run(app, host="0.0.0.0", port=port)
async def stop_generation_handler(message_id: str = None):
"""停止生成处理器"""
message = f"已发出停止指令消息ID: {message_id}" if message_id else "已发出停止指令"
return {"success": True, "message": message}

View File

@ -1,343 +1,153 @@
"""
Python Flask/FastAPI 服务器实现用于替代 Node.js 服务器
使用 DashScope Python SDK 连接阿里云百炼平台 API
改进版Python FastAPI服务器实现使用DashScope Python SDK连接阿里云百炼平台API
拆分模块版本
"""
import os
import json
import uuid
import asyncio
from datetime import datetime
from typing import Dict, List, Optional
from pathlib import Path
import dashscope
from dashscope import Generation
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, File, UploadFile, Form
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
import uvicorn
from fastapi import FastAPI, HTTPException, File, UploadFile, Request
from fastapi.responses import JSONResponse
# 导入模块
import sys
sys.path.append('/home/mt/project/ai-chat-ui/server_python')
from api.chat_routes import (
chat_endpoint_handler,
get_models_handler,
get_conversations_handler,
get_conversation_handler,
save_conversation_handler,
delete_conversation_handler,
upload_file_handler,
serve_upload_handler,
stop_generation_handler
)
from models.chat_models import ChatRequest, ModelInfo
from utils.helpers import log_request, log_response
# 加载环境变量
load_dotenv()
# 设置 DashScope API 密钥
dashscope.api_key = os.getenv("ALIYUN_API_KEY")
api_key = os.getenv("ALIYUN_API_KEY")
if not api_key:
raise ValueError("请在环境变量中设置 ALIYUN_API_KEY")
dashscope.api_key = api_key
# 创建 FastAPI 应用
app = FastAPI(title="AI Chat API Server", version="1.0.0")
app = FastAPI(title="AI Chat API Server (Python)", version="2.0.0")
# 数据模型定义
class ChatMessage(BaseModel):
role: str
content: str
images: Optional[List[str]] = None
files: Optional[List[str]] = None
class ChatRequest(BaseModel):
conversationId: Optional[str] = None
message: str
images: Optional[List[str]] = None
files: Optional[List[str]] = None
model: Optional[str] = "qwen-plus"
temperature: Optional[float] = 0.7
maxTokens: Optional[int] = 2000
systemPrompt: Optional[str] = "你是一个支持视觉理解的助手。"
stream: Optional[bool] = True
# 扩展选项
deepSearch: Optional[bool] = False
webSearch: Optional[bool] = False
deepThinking: Optional[bool] = False
class ModelInfo(BaseModel):
id: str
name: str
description: str
maxTokens: int
provider: str
# 模拟数据库 - 实际应用中应使用持久化存储
conversations_db: Dict[str, dict] = {}
# 配置上传目录
upload_dir = Path("uploads")
upload_dir.mkdir(exist_ok=True)
@app.middleware("http")
async def add_process_time_header(request, call_next):
"""中间件:记录请求处理时间"""
async def logging_middleware(request: Request, call_next):
"""中间件:记录请求日志"""
start_time = datetime.utcnow()
# 记录请求信息
log_request(request.method, request.url.path, request.client.host if request.client else 'unknown')
response = await call_next(request)
# 计算处理时间
process_time = (datetime.utcnow() - start_time).total_seconds() * 1000
# 记录响应信息
log_response(response.status_code, process_time)
# 在响应头中添加处理时间
response.headers["X-Process-Time"] = f"{process_time:.2f}ms"
# 记录请求信息
print(f"HTTP {request.method} {request.url.path} {response.status_code} {process_time:.2f}ms")
return response
@app.get("/health")
async def health_check():
"""健康检查端点"""
return {"status": "healthy", "timestamp": datetime.utcnow().isoformat()}
@app.post("/api/chat-ui/chat")
async def chat_endpoint(request: ChatRequest):
"""聊天接口 - 处理普通请求"""
try:
# 构建消息数组,考虑是否包含图片
user_content = []
async def chat_endpoint(request: Request):
"""聊天接口 - 与阿里云百炼API兼容的接口"""
body = await request.json()
return await chat_endpoint_handler(body)
# 添加用户消息文本
user_content.append({"type": "text", "text": request.message})
# 如果有图片,则添加到内容中
if request.images and len(request.images) > 0:
for image_url in request.images:
user_content.append({
"type": "image_url",
"image_url": image_url
})
# 构建请求给百炼的消息列表
messages = [
{"role": "system", "content": request.systemPrompt},
{"role": "user", "content": user_content}
]
# 调用 DashScope API
response = Generation.call(
model=request.model,
messages=messages,
stream=False, # 非流式响应
max_tokens=request.maxTokens,
temperature=request.temperature
)
if response.status_code == 200:
content = response.output.choices[0]['message']['content']
# 构建响应
result = {
"id": str(uuid.uuid4()),
"conversationId": request.conversationId or str(uuid.uuid4()),
"content": content,
"model": request.model,
"createdAt": int(datetime.utcnow().timestamp())
}
if hasattr(response, 'usage'):
result["usage"] = {
"promptTokens": response.usage.input_tokens,
"completionTokens": response.usage.output_tokens,
"totalTokens": response.usage.total_tokens
}
return JSONResponse(content=result)
else:
raise HTTPException(status_code=500, detail=f"API Error: {response.code} - {response.message}")
except Exception as e:
print(f"Error in chat endpoint: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/chat-ui/chat/stream")
async def chat_stream_endpoint(request: ChatRequest):
"""流式聊天接口 - 处理流式请求"""
async def event_generator():
try:
# 构建消息数组,考虑是否包含图片
user_content = []
# 添加用户消息文本
user_content.append({"type": "text", "text": request.message})
# 如果有图片,则添加到内容中
if request.images and len(request.images) > 0:
for image_url in request.images:
user_content.append({
"type": "image_url",
"image_url": image_url
})
# 构建请求给百炼的消息列表
messages = [
{"role": "system", "content": request.systemPrompt},
{"role": "user", "content": user_content}
]
# 调用 DashScope API流式
responses = Generation.call(
model=request.model,
messages=messages,
stream=True, # 流式响应
max_tokens=request.maxTokens,
temperature=request.temperature
)
for response in responses:
if response.status_code == 200:
content = response.output.choices[0]['message']['content']
if content:
# 发送流式数据
data = {
"choices": [
{
"delta": {"content": content},
"index": 0,
"finish_reason": None
}
]
}
yield f"data: {json.dumps(data)}\n\n"
else:
error_data = {
"error": {
"message": f"API Error: {response.code} - {response.message}",
"type": "api_error",
"param": None,
"code": response.code
}
}
yield f"data: {json.dumps(error_data)}\n\n"
break
# 发送结束信号
yield "data: [DONE]\n\n"
except Exception as e:
error_data = {
"error": {
"message": str(e),
"type": "server_error"
}
}
yield f"data: {json.dumps(error_data)}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/api/chat-ui/models")
async def get_models():
"""获取模型列表"""
models = [
ModelInfo(
id="qwen-max",
name="通义千问 Max",
description="最强大的模型",
maxTokens=8192,
provider="Aliyun"
),
ModelInfo(
id="qwen-plus",
name="通义千问 Plus",
description="能力均衡",
maxTokens=8192,
provider="Aliyun"
)
]
return [model.dict() for model in models]
return await get_models_handler()
@app.get("/api/chat-ui/conversations")
async def get_conversations():
"""获取所有对话"""
return list(conversations_db.values())
return await get_conversations_handler()
@app.get("/api/chat-ui/conversations/{conversation_id}")
async def get_conversation(conversation_id: str):
"""获取特定对话"""
conversation = conversations_db.get(conversation_id)
if not conversation:
raise HTTPException(status_code=404, detail="对话不存在")
return conversation
return await get_conversation_handler(conversation_id)
@app.post("/api/chat-ui/conversations")
async def save_conversation(
id: str = Form(None),
title: str = Form(...),
messages: str = Form(...)
):
async def save_conversation(request: Request):
"""保存或更新对话"""
# 解析 messages JSON 字符串
try:
parsed_messages = json.loads(messages)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid messages JSON")
data = await request.json()
return await save_conversation_handler(data)
conversation_id = id or str(uuid.uuid4())
conversation = {
"id": conversation_id,
"title": title,
"messages": parsed_messages,
"updatedAt": datetime.utcnow().isoformat()
}
conversations_db[conversation_id] = conversation
return conversation
@app.delete("/api/chat-ui/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
"""删除对话"""
if conversation_id in conversations_db:
del conversations_db[conversation_id]
return {"success": True, "message": "删除成功"}
else:
raise HTTPException(status_code=404, detail="对话不存在")
return await delete_conversation_handler(conversation_id)
@app.post("/api/chat-ui/upload")
async def upload_file(file: UploadFile = File(...)):
"""文件上传接口"""
try:
# 生成唯一文件名
file_extension = Path(file.filename).suffix
unique_filename = f"{int(datetime.utcnow().timestamp())}-{uuid.uuid4()}{file_extension}"
file_path = upload_dir / unique_filename
return await upload_file_handler(file=file)
# 保存文件
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
# 返回文件信息
file_url = f"http://localhost:8000/uploads/{unique_filename}"
return {
"url": file_url,
"name": file.filename,
"size": len(content),
"mimeType": file.content_type
}
except Exception as e:
print(f"Upload error: {str(e)}")
raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}")
@app.get("/uploads/{filename}")
async def serve_upload(filename: str):
"""提供上传文件的访问"""
file_path = upload_dir / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="文件不存在")
return serve_upload_handler(filename)
from fastapi.responses import FileResponse
return FileResponse(file_path)
@app.post("/api/chat-ui/stop")
async def stop_generation():
"""停止生成接口"""
# 在实际实现中这里可能需要维护正在运行的任务ID列表
# 目前只是返回成功消息
return {"success": True, "message": "已发出停止指令"}
return await stop_generation_handler()
@app.post("/api/chat-ui/stop/{message_id}")
async def stop_generation_by_id(message_id: str):
"""根据消息ID停止生成"""
return {"success": True, "message": "已发出停止指令"}
return await stop_generation_handler(message_id)
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", 8000))
print("="*50)
print(f"Python AI Chat Server 启动中...")
print(f"监听端口: {port}")
print(f"API Key 状态: {'已配置' if api_key else '未配置'}")
print("="*50)
if not api_key:
print("警告: 未在环境变量中检测到 ALIYUN_API_KEY!")
print("请在 .env 文件中添加您的百炼 API Key。")
else:
print("API Key 已检测到。")
uvicorn.run(app, host="0.0.0.0", port=port)

View File

@ -0,0 +1 @@
# models/__init__.py

View File

@ -0,0 +1,28 @@
"""
数据模型定义
"""
from pydantic import BaseModel
from typing import Dict, List, Optional, Any
class ChatMessage(BaseModel):
role: str
content: str
images: Optional[List[str]] = None
files: Optional[List[str]] = None
class ChatRequest(BaseModel):
model: str = "qwen-plus"
messages: List[Dict[str, Any]]
stream: bool = True
temperature: Optional[float] = 0.7
max_tokens: Optional[int] = 2000
class ModelInfo(BaseModel):
id: str
name: str
description: str
maxTokens: int
provider: str

View File

@ -0,0 +1 @@
# utils/__init__.py

View File

@ -0,0 +1,48 @@
"""
通用工具函数
"""
import os
import json
import uuid
from datetime import datetime
from typing import Dict
def get_current_timestamp():
"""获取当前时间戳"""
return int(datetime.utcnow().timestamp())
def generate_unique_id():
"""生成唯一ID"""
return str(uuid.uuid4())
def format_api_response(content: str, conversation_id: str = None, model: str = "qwen-plus"):
"""格式化API响应"""
return {
"id": generate_unique_id(),
"conversationId": conversation_id or generate_unique_id(),
"content": content,
"model": model,
"createdAt": get_current_timestamp()
}
def log_request(method: str, path: str, client_ip: str = "unknown"):
"""记录请求日志"""
print(f"[INFO] {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - "
f"HTTP {method} {path} - IP: {client_ip}")
def log_response(status_code: int, process_time: float):
"""记录响应日志"""
print(f"[INFO] {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - "
f"Response {status_code}, Process Time: {process_time:.2f}ms")
def extract_delta_content(full_content: str, previous_content: str) -> str:
"""提取增量内容"""
if len(full_content) > len(previous_content):
return full_content[len(previous_content):]
return ""

View File

@ -24,4 +24,4 @@ fi
echo "虚拟环境已找到,正在激活..."
# 激活虚拟环境并启动服务器
source .venv/bin/activate && python3 app.py
source .venv/bin/activate && python3 main.py