ai-chat-ui/server/utils/oss_uploader.py

321 lines
9.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
阿里云 OSS 简单上传工具
参考文档: https://help.aliyun.com/zh/oss/user-guide/simple-upload
支持:
- 上传本地文件
- 上传字节数据 / 字符串
- 上传文件流file-like object
- 自动根据文件后缀生成 OSS 对象路径
- 返回可公开访问的 URL需确保 Bucket 已开启公共读或已配置签名)
前置条件:
1. pip install alibabacloud-oss-v2
2. 在 .env 中配置以下变量:
OSS_ACCESS_KEY_ID=<你的 AccessKey ID>
OSS_ACCESS_KEY_SECRET=<你的 AccessKey Secret>
OSS_BUCKET_NAME=<存储空间名称>
OSS_ENDPOINT=<Endpoint如 https://oss-cn-hangzhou.aliyuncs.com>
OSS_REGION=<地域,如 cn-hangzhou>
OSS_URL_PREFIX=<可选,自定义域名前缀,如 https://cdn.example.com>
"""
import os
import uuid
import mimetypes
from datetime import datetime
from pathlib import Path
from typing import Optional, Union, BinaryIO
import alibabacloud_oss_v2 as oss
from dotenv import load_dotenv
# ── 加载环境变量 ──────────────────────────────────────────────
load_dotenv()
# 所有配置从 .env 文件读取
OSS_ACCESS_KEY_ID = os.getenv("OSS_ACCESS_KEY_ID", "")
OSS_ACCESS_KEY_SECRET = os.getenv("OSS_ACCESS_KEY_SECRET", "")
OSS_BUCKET_NAME = os.getenv("OSS_BUCKET_NAME", "")
OSS_ENDPOINT = os.getenv("OSS_ENDPOINT", "")
OSS_REGION = os.getenv("OSS_REGION", "")
# 可选:自定义域名前缀,用于拼接返回的公开 URL
OSS_URL_PREFIX = os.getenv("OSS_URL_PREFIX", "")
def _get_client() -> oss.Client:
"""创建并返回 OSS 客户端实例"""
credentials_provider = oss.credentials.StaticCredentialsProvider(
access_key_id=OSS_ACCESS_KEY_ID,
access_key_secret=OSS_ACCESS_KEY_SECRET,
)
cfg = oss.config.load_default()
cfg.credentials_provider = credentials_provider
cfg.region = OSS_REGION
cfg.endpoint = OSS_ENDPOINT
return oss.Client(cfg)
def _generate_object_key(filename: str, prefix: str = "chat-ui") -> str:
"""
根据文件名生成唯一的 OSS 对象 Key
格式: {prefix}/{日期}/{uuid}_{原始文件名}
"""
# TODO: 需要按用户ID分目录
date_str = datetime.now().strftime("%Y%m%d")
unique_id = uuid.uuid4().hex[:8]
safe_name = Path(filename).name # 只取文件名,去掉路径
return f"{prefix}/{date_str}/{unique_id}_{safe_name}"
def _build_url(object_key: str) -> str:
"""根据对象 Key 构建可访问的 URL"""
if OSS_URL_PREFIX:
return f"{OSS_URL_PREFIX.rstrip('/')}/{object_key}"
# 默认使用 Bucket 域名拼接
endpoint = OSS_ENDPOINT.replace("https://", "").replace("http://", "")
return f"https://{OSS_BUCKET_NAME}.{endpoint}/{object_key}"
def upload_file(
file_path: str,
object_key: Optional[str] = None,
prefix: str = "chat-ui",
) -> dict:
"""
上传本地文件到 OSS
参数:
file_path: 本地文件的绝对路径
object_key: 自定义 OSS 对象名称,为 None 则自动生成
prefix: 对象 Key 的前缀目录
返回:
包含上传结果的字典:
{
"url": "文件访问地址",
"object_key": "OSS 对象路径",
"etag": "ETag",
"status_code": 200,
}
"""
if not os.path.isfile(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
filename = os.path.basename(file_path)
if object_key is None:
object_key = _generate_object_key(filename, prefix)
# 检测文件类型,设置 Content-Type
content_type, _ = mimetypes.guess_type(file_path)
client = _get_client()
result = client.put_object_from_file(
oss.PutObjectRequest(
bucket=OSS_BUCKET_NAME,
key=object_key,
content_type=content_type,
),
file_path,
)
return {
"url": _build_url(object_key),
"object_key": object_key,
"etag": result.etag,
"status_code": result.status_code,
}
def upload_bytes(
data: Union[bytes, str],
filename: str,
object_key: Optional[str] = None,
prefix: str = "uploads",
content_type: Optional[str] = None,
) -> dict:
"""
上传字节数据或字符串到 OSS
参数:
data: 要上传的数据bytes 或 str
filename: 用于生成 Key 的文件名(如 "report.txt"
object_key: 自定义 OSS 对象名称,为 None 则自动生成
prefix: 对象 Key 的前缀目录
content_type: 自定义 Content-Type
返回:
包含上传结果的字典
"""
if isinstance(data, str):
data = data.encode("utf-8")
if object_key is None:
object_key = _generate_object_key(filename, prefix)
if content_type is None:
content_type, _ = mimetypes.guess_type(filename)
client = _get_client()
result = client.put_object(
oss.PutObjectRequest(
bucket=OSS_BUCKET_NAME,
key=object_key,
body=data,
content_type=content_type,
)
)
return {
"url": _build_url(object_key),
"object_key": object_key,
"etag": result.etag,
"status_code": result.status_code,
}
def upload_fileobj(
fileobj: BinaryIO,
filename: str,
object_key: Optional[str] = None,
prefix: str = "uploads",
content_type: Optional[str] = None,
) -> dict:
"""
上传文件流file-like object到 OSS
参数:
fileobj: 文件流对象(如 open(path, 'rb') 或 FastAPI 的 UploadFile.file
filename: 用于生成 Key 的文件名
object_key: 自定义 OSS 对象名称,为 None 则自动生成
prefix: 对象 Key 的前缀目录
content_type: 自定义 Content-Type
返回:
包含上传结果的字典
"""
data = fileobj.read()
return upload_bytes(
data=data,
filename=filename,
object_key=object_key,
prefix=prefix,
content_type=content_type,
)
def delete_file(object_key: str) -> bool:
"""
删除 OSS 上的单个文件
参数:
object_key: OSS 对象路径(如 "uploads/20240301/abc123_file.jpg"
返回:
True 表示删除成功False 表示失败
"""
try:
client = _get_client()
result = client.delete_object(
oss.DeleteObjectRequest(
bucket=OSS_BUCKET_NAME,
key=object_key,
)
)
return result.status_code == 204
except Exception as e:
print(f"[OSS] 删除文件失败: {object_key}, 错误: {e}")
return False
def delete_files(object_keys: list) -> dict:
"""
批量删除 OSS 上的文件
参数:
object_keys: OSS 对象路径列表
返回:
{
"deleted": ["成功删除的 object_key 列表"],
"failed": ["删除失败的 object_key 列表"],
}
"""
deleted = []
failed = []
for key in object_keys:
if delete_file(key):
deleted.append(key)
else:
failed.append(key)
return {"deleted": deleted, "failed": failed}
def extract_object_key_from_url(url: str) -> Optional[str]:
"""
从 OSS URL 中提取 object_key
参数:
url: OSS 文件的完整 URL
返回:
object_key 或 None如果不是有效的 OSS URL
"""
if not url:
return None
# 支持两种 URL 格式:
# 1. 自定义域名: OSS_URL_PREFIX/object_key
# 2. 默认域名: https://bucket.endpoint/object_key
try:
# 移除查询参数
url_path = url.split("?")[0]
if OSS_URL_PREFIX:
# 自定义域名格式
prefix = OSS_URL_PREFIX.rstrip("/")
if url_path.startswith(prefix):
return url_path[len(prefix) + 1:] # +1 去掉开头的 /
# 默认域名格式: https://bucket.endpoint/object_key
endpoint = OSS_ENDPOINT.replace("https://", "").replace("http://", "")
default_prefix = f"https://{OSS_BUCKET_NAME}.{endpoint}/"
if url_path.startswith(default_prefix):
return url_path[len(default_prefix):]
# 也尝试匹配 http 版本
http_prefix = f"http://{OSS_BUCKET_NAME}.{endpoint}/"
if url_path.startswith(http_prefix):
return url_path[len(http_prefix):]
return None
except Exception:
return None
# ────────────────────────────────────────────────────────────────
# 命令行入口python -m utils.oss_uploader --file <路径>
# ────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="阿里云 OSS 简单上传工具")
parser.add_argument("--file", required=True, help="要上传的本地文件路径")
parser.add_argument("--key", default=None, help="自定义 OSS 对象路径(可选)")
parser.add_argument(
"--prefix", default="uploads", help="对象 Key 前缀(默认: uploads"
)
args = parser.parse_args()
result = upload_file(args.file, object_key=args.key, prefix=args.prefix)
print("✅ 上传成功!")
print(f" 访问地址: {result['url']}")
print(f" 对象路径: {result['object_key']}")
print(f" ETag: {result['etag']}")
print(f" 状态码: {result['status_code']}")