feat: 新增 oss 与智普图片解析功能

This commit is contained in:
肖应宇 2026-03-04 18:00:14 +08:00
parent 89b02c4c93
commit cb80d5cee7
5 changed files with 415 additions and 7 deletions

View File

@ -35,8 +35,12 @@ from pathlib import Path
sys.path.append(str(Path(__file__).parent.parent))
from models.chat_models import ChatRequest, ModelInfo
from utils.helpers import (extract_delta_content, format_api_response,
generate_unique_id, get_current_timestamp)
from utils.helpers import (
extract_delta_content,
format_api_response,
generate_unique_id,
get_current_timestamp,
)
from utils.logger import log_error, log_exception, log_info
# 模拟数据库 - 实际应用中应使用持久化存储
@ -1030,13 +1034,18 @@ async def upload_file_handler(file: UploadFile = File(...)):
unique_filename = f"{int(datetime.utcnow().timestamp())}_{generate_unique_id()}{file_extension}"
file_path = upload_dir / unique_filename
# 保存文件
# 保存文件到本地(临时缓存)
content = await file.read()
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
# 文件关闭后再上传到 OSS
from utils.oss_uploader import upload_file as oss_upload
oss_result = oss_upload(str(file_path))
file_url = oss_result["url"]
# 返回文件信息
file_url = f"http://localhost:8000/uploads/{unique_filename}"
result = {
"url": file_url,
"name": file.filename,

View File

@ -9,3 +9,5 @@ typing-extensions==4.12.2
# 路线二:阿里云文档智能解析(doc/docx/pdf)
llama-index-core>=0.10.0
llama-index-readers-dashscope>=0.1.0
# 阿里云 OSS 上传
alibabacloud-oss-v2

View File

@ -47,7 +47,7 @@ def get_client():
# ── 模型映射 ──────────────────────────────────────────────────────────
DEFAULT_TEXT_MODEL = "glm-4.5-Air" # glm-4.6 文本统一模型
DEFAULT_VISION_MODEL = "glm-4.5-Air"
DEFAULT_VISION_MODEL = "glm-4.6v"
MODEL_MAP = {
"qwen-max": "glm-4.5-Air",
@ -61,7 +61,12 @@ MODEL_MAP = {
def resolve_model(model: str, has_vision: bool = False) -> str:
    """Map an externally-requested model name to the GLM model to call.

    Args:
        model: Model name requested by the client (e.g. "qwen-max").
        has_vision: True when the message payload contains image parts.

    Returns:
        A GLM model name. Names already starting with "glm" pass through
        unchanged; other names are looked up in MODEL_MAP, falling back to
        DEFAULT_TEXT_MODEL. When has_vision is set, the mapped text model
        is overridden with DEFAULT_VISION_MODEL so image parts are never
        sent to a text-only model.
    """
    if model.startswith("glm"):
        return model
    mapped = MODEL_MAP.get(model, DEFAULT_TEXT_MODEL)
    # When the message contains images, force the vision model.
    if has_vision and mapped != DEFAULT_VISION_MODEL:
        print(f"[GLM] 检测到图片,模型从 {mapped} 切换为 {DEFAULT_VISION_MODEL}")
        return DEFAULT_VISION_MODEL
    return mapped
# ── 文件上传(含 file_id 缓存)───────────────────────────────────────
@ -249,6 +254,44 @@ async def glm_stream_generator(
f"[GLM] 流式请求model={actual_model} vision={has_vision} "
f"web_search={web_search} thinking={deep_thinking}"
)
# ── 调试:打印发送给 GLM 的完整消息结构 ──
for i, msg in enumerate(glm_msgs):
role = msg.get("role", "?")
content = msg.get("content", "")
if isinstance(content, list):
for j, part in enumerate(content):
if not isinstance(part, dict):
print(f"[GLM-DEBUG] msg[{i}].content[{j}]: {type(part).__name__}")
continue
part_type = part.get("type", "?")
if part_type == "image_url":
img_val = part.get("image_url", "")
img_url = (
img_val.get("url", "")
if isinstance(img_val, dict)
else str(img_val)
)
display = img_url[:120] + "..." if len(img_url) > 120 else img_url
print(
f"[GLM-DEBUG] msg[{i}].content[{j}]: type=image_url, url={display}"
)
elif part_type == "text":
preview = (part.get("text", "") or "")[:100]
print(
f"[GLM-DEBUG] msg[{i}].content[{j}]: type=text, text={preview}"
)
else:
print(f"[GLM-DEBUG] msg[{i}].content[{j}]: {part}")
else:
print(f"[GLM-DEBUG] msg[{i}]: role={role}, content={str(content)[:150]}")
if extra_kwargs:
print(f"[GLM-DEBUG] extra_kwargs={extra_kwargs}")
# 原始 JSON 转储(用于排查结构问题)
import json as _json
print(
f"[GLM-RAW] messages={_json.dumps(glm_msgs, ensure_ascii=False, default=str)[:2000]}"
)
chunk_queue: queue.Queue = queue.Queue(maxsize=128)
@ -367,6 +410,44 @@ def glm_chat_sync(
client = get_client()
print(f"[GLM] 非流式请求model={actual_model}")
# ── 调试:打印发送给 GLM 的完整消息结构 ──
for i, msg in enumerate(glm_msgs):
role = msg.get("role", "?")
content = msg.get("content", "")
if isinstance(content, list):
for j, part in enumerate(content):
if not isinstance(part, dict):
print(f"[GLM-DEBUG] msg[{i}].content[{j}]: {type(part).__name__}")
continue
part_type = part.get("type", "?")
if part_type == "image_url":
img_val = part.get("image_url", "")
img_url = (
img_val.get("url", "")
if isinstance(img_val, dict)
else str(img_val)
)
display = img_url[:120] + "..." if len(img_url) > 120 else img_url
print(
f"[GLM-DEBUG] msg[{i}].content[{j}]: type=image_url, url={display}"
)
elif part_type == "text":
preview = (part.get("text", "") or "")[:100]
print(
f"[GLM-DEBUG] msg[{i}].content[{j}]: type=text, text={preview}"
)
else:
print(f"[GLM-DEBUG] msg[{i}].content[{j}]: {part}")
else:
print(f"[GLM-DEBUG] msg[{i}]: role={role}, content={str(content)[:150]}")
if extra_kwargs:
print(f"[GLM-DEBUG] extra_kwargs={extra_kwargs}")
# 原始 JSON 转储(用于排查结构问题)
import json as _json
print(
f"[GLM-RAW] messages={_json.dumps(glm_msgs, ensure_ascii=False, default=str)[:2000]}"
)
resp = client.chat.completions.create(
model=actual_model,
messages=glm_msgs,

View File

@ -0,0 +1,227 @@
"""
阿里云 OSS 简单上传工具
参考文档: https://help.aliyun.com/zh/oss/user-guide/simple-upload
支持:
- 上传本地文件
- 上传字节数据 / 字符串
- 上传文件流file-like object
- 自动根据文件后缀生成 OSS 对象路径
- 返回可公开访问的 URL需确保 Bucket 已开启公共读或已配置签名
前置条件:
1. pip install alibabacloud-oss-v2
2. .env 中配置以下变量:
OSS_ACCESS_KEY_ID=<你的 AccessKey ID>
OSS_ACCESS_KEY_SECRET=<你的 AccessKey Secret>
OSS_BUCKET_NAME=<存储空间名称>
OSS_ENDPOINT=<Endpoint https://oss-cn-hangzhou.aliyuncs.com>
OSS_REGION=<地域 cn-hangzhou>
OSS_URL_PREFIX=<可选自定义域名前缀 https://cdn.example.com>
"""
import os
import uuid
import mimetypes
from datetime import datetime
from pathlib import Path
from typing import Optional, Union, BinaryIO
import alibabacloud_oss_v2 as oss
from dotenv import load_dotenv
# ── 加载环境变量 ──────────────────────────────────────────────
load_dotenv()
# AccessKey 从系统环境变量读取(~/.bashrc 中 export 设置)
OSS_ACCESS_KEY_ID = os.environ.get("OSS_ACCESS_KEY_ID", "")
OSS_ACCESS_KEY_SECRET = os.environ.get("OSS_ACCESS_KEY_SECRET", "")
# 以下配置从 .env 文件读取
OSS_BUCKET_NAME = os.getenv("OSS_BUCKET_NAME", "")
OSS_ENDPOINT = os.getenv("OSS_ENDPOINT", "")
OSS_REGION = os.getenv("OSS_REGION", "")
# 可选:自定义域名前缀,用于拼接返回的公开 URL
OSS_URL_PREFIX = os.getenv("OSS_URL_PREFIX", "")
def _get_client() -> oss.Client:
    """Build an OSS client from the module-level credential/region settings."""
    config = oss.config.load_default()
    config.credentials_provider = oss.credentials.StaticCredentialsProvider(
        access_key_id=OSS_ACCESS_KEY_ID,
        access_key_secret=OSS_ACCESS_KEY_SECRET,
    )
    config.region = OSS_REGION
    config.endpoint = OSS_ENDPOINT
    return oss.Client(config)
def _generate_object_key(filename: str, prefix: str = "uploads") -> str:
"""
根据文件名生成唯一的 OSS 对象 Key
格式: {prefix}/{日期}/{uuid}_{原始文件名}
"""
date_str = datetime.now().strftime("%Y%m%d")
unique_id = uuid.uuid4().hex[:8]
safe_name = Path(filename).name # 只取文件名,去掉路径
return f"{prefix}/{date_str}/{unique_id}_{safe_name}"
def _build_url(object_key: str) -> str:
    """Return a public URL for *object_key*.

    Prefers the custom domain prefix (OSS_URL_PREFIX) when configured;
    otherwise falls back to the bucket's virtual-hosted endpoint.
    """
    if OSS_URL_PREFIX:
        return f"{OSS_URL_PREFIX.rstrip('/')}/{object_key}"
    # Drop the scheme from the endpoint before prepending the bucket name.
    host = OSS_ENDPOINT
    for scheme in ("https://", "http://"):
        host = host.replace(scheme, "")
    return f"https://{OSS_BUCKET_NAME}.{host}/{object_key}"
def upload_file(
    file_path: str,
    object_key: Optional[str] = None,
    prefix: str = "uploads",
) -> dict:
    """Upload a local file to OSS.

    Args:
        file_path: Absolute path of the local file.
        object_key: Custom OSS object name; auto-generated when None.
        prefix: Key-prefix directory used for auto-generated keys.

    Returns:
        dict with keys "url" (public access URL), "object_key"
        (OSS object path), "etag" and "status_code".

    Raises:
        FileNotFoundError: if *file_path* is not an existing file.
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"文件不存在: {file_path}")

    key = object_key
    if key is None:
        key = _generate_object_key(os.path.basename(file_path), prefix)

    # Guess the Content-Type from the file extension (may be None).
    guessed_type, _ = mimetypes.guess_type(file_path)

    response = _get_client().put_object_from_file(
        oss.PutObjectRequest(
            bucket=OSS_BUCKET_NAME,
            key=key,
            content_type=guessed_type,
        ),
        file_path,
    )
    return {
        "url": _build_url(key),
        "object_key": key,
        "etag": response.etag,
        "status_code": response.status_code,
    }
def upload_bytes(
    data: Union[bytes, str],
    filename: str,
    object_key: Optional[str] = None,
    prefix: str = "uploads",
    content_type: Optional[str] = None,
) -> dict:
    """Upload in-memory data (bytes, or a str encoded as UTF-8) to OSS.

    Args:
        data: Payload to upload; str values are UTF-8 encoded first.
        filename: Name used to derive the object key and Content-Type.
        object_key: Custom OSS object name; auto-generated when None.
        prefix: Key-prefix directory used for auto-generated keys.
        content_type: Explicit Content-Type; guessed from *filename*
            when None.

    Returns:
        dict with keys "url", "object_key", "etag" and "status_code".
    """
    payload = data.encode("utf-8") if isinstance(data, str) else data

    key = object_key
    if key is None:
        key = _generate_object_key(filename, prefix)

    ctype = content_type
    if ctype is None:
        ctype, _ = mimetypes.guess_type(filename)

    response = _get_client().put_object(
        oss.PutObjectRequest(
            bucket=OSS_BUCKET_NAME,
            key=key,
            body=payload,
            content_type=ctype,
        )
    )
    return {
        "url": _build_url(key),
        "object_key": key,
        "etag": response.etag,
        "status_code": response.status_code,
    }
def upload_fileobj(
    fileobj: BinaryIO,
    filename: str,
    object_key: Optional[str] = None,
    prefix: str = "uploads",
    content_type: Optional[str] = None,
) -> dict:
    """Upload a file-like object (e.g. FastAPI UploadFile.file) to OSS.

    NOTE(review): the stream is read fully into memory before upload —
    fine for typical uploads; confirm this is acceptable for very large
    files.

    Args:
        fileobj: Binary file-like object opened for reading.
        filename: Name used to derive the object key and Content-Type.
        object_key: Custom OSS object name; auto-generated when None.
        prefix: Key-prefix directory used for auto-generated keys.
        content_type: Explicit Content-Type, optional.

    Returns:
        dict with keys "url", "object_key", "etag" and "status_code".
    """
    return upload_bytes(
        data=fileobj.read(),
        filename=filename,
        object_key=object_key,
        prefix=prefix,
        content_type=content_type,
    )
# ────────────────────────────────────────────────────────────────
# 命令行入口python -m utils.oss_uploader --file <路径>
# ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # CLI entry: parse arguments, upload one local file, print result fields.
    import argparse

    parser = argparse.ArgumentParser(description="阿里云 OSS 简单上传工具")
    parser.add_argument("--file", required=True, help="要上传的本地文件路径")
    parser.add_argument("--key", default=None, help="自定义 OSS 对象路径(可选)")
    parser.add_argument(
        "--prefix", default="uploads", help="对象 Key 前缀(默认: uploads"
    )
    args = parser.parse_args()

    # Upload and echo the returned metadata for quick manual verification.
    result = upload_file(args.file, object_key=args.key, prefix=args.prefix)
    print("✅ 上传成功!")
    print(f" 访问地址: {result['url']}")
    print(f" 对象路径: {result['object_key']}")
    print(f" ETag: {result['etag']}")
    print(f" 状态码: {result['status_code']}")

View File

@ -0,0 +1,89 @@
"""
测试脚本上传文件到 OSS 获取 URL 发送给 GLM 进行识别
用法:
cd server
source ~/.bashrc && source .venv/bin/activate
python -m utils.test_oss_glm --file <本地文件路径> [--prompt "描述一下这张图片"]
"""
import argparse
import sys
from pathlib import Path
# 确保 server 目录在 sys.path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from utils.oss_uploader import upload_file
from utils.glm_adapter import glm_chat_sync
def main():
    """Upload a local file to OSS, then send its URL to GLM for recognition."""
    parser = argparse.ArgumentParser(description="上传文件到 OSS 并让 GLM 读取")
    parser.add_argument("--file", required=True, help="要上传的本地文件路径")
    parser.add_argument(
        "--prompt", default="请描述一下这张图片的内容", help="发给 GLM 的提示词"
    )
    parser.add_argument(
        "--model", default="glm-4.6v", help="GLM 模型名称(默认: glm-4.6v"
    )
    args = parser.parse_args()

    # ── Step 1: upload the file to OSS ────────────────────────
    file_path = args.file
    if not Path(file_path).exists():
        print(f"❌ 文件不存在: {file_path}")
        sys.exit(1)

    print(f"📤 正在上传文件: {file_path}")
    oss_result = upload_file(file_path)
    file_url = oss_result["url"]
    print(f"✅ 上传成功!")
    print(f" URL: {file_url}")
    print()

    # ── Step 2: build the message and send the URL to GLM ──────────
    # One user message: the image_url part first, then the text prompt.
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {"url": file_url},
                },
                {
                    "type": "text",
                    "text": args.prompt,
                },
            ],
        }
    ]

    print(f"🤖 正在请求 GLM ({args.model}) 识别图片...")
    print(f" 提示词: {args.prompt}")
    print()

    result = glm_chat_sync(
        messages=messages,
        model=args.model,
        temperature=0.7,
        max_tokens=2048,
    )

    # NOTE(review): the separator literal below is an empty string repeated
    # 60 times, which prints a blank line — it likely lost a "─" character
    # in transit; confirm against the original file.
    print("" * 60)
    print("📝 GLM 回复:")
    print("" * 60)
    print(result["content"])
    print("" * 60)

    # Token-usage summary, when the adapter returned one.
    if result.get("usage"):
        usage = result["usage"]
        print(
            f"\n📊 Token 用量: 输入 {usage['promptTokens']} | "
            f"输出 {usage['completionTokens']} | "
            f"总计 {usage['totalTokens']}"
        )
# Script entry point.
if __name__ == "__main__":
    main()