feat: ds搜索功能初步完成,但是时间不是实时的

This commit is contained in:
肖应宇 2026-03-12 13:49:36 +08:00
parent ecff6edd61
commit 7569e588b9
5 changed files with 359 additions and 81 deletions

View File

@ -10,6 +10,7 @@ from typing import Dict, List
from fastapi.responses import JSONResponse, StreamingResponse
from .base import BaseAdapter, ChatCompletionRequest, ModelInfo
from .plugins import get_web_search_mode
from core import get_logger
logger = get_logger()
@ -29,7 +30,7 @@ DASHSCOPE_MODELS = [
max_tokens=8192,
provider="Aliyun",
supports_thinking=True,
supports_web_search=False,
supports_web_search=True,
supports_vision=False,
supports_files=False,
),
@ -40,7 +41,7 @@ DASHSCOPE_MODELS = [
max_tokens=8192,
provider="Aliyun",
supports_thinking=True,
supports_web_search=False,
supports_web_search=True,
supports_vision=True,
supports_files=False,
),
@ -51,7 +52,7 @@ DASHSCOPE_MODELS = [
max_tokens=8192,
provider="Aliyun",
supports_thinking=False,
supports_web_search=False,
supports_web_search=True,
supports_vision=False,
supports_files=False,
),
@ -188,7 +189,7 @@ class DashScopeAdapter(BaseAdapter):
chunk_count = 0
error_occurred = False
# 构建 API 调用参数
# 打印 API 调用参数
api_params = {
"model": request.model,
"messages": messages,
@ -197,6 +198,13 @@ class DashScopeAdapter(BaseAdapter):
"max_tokens": request.max_tokens,
"result_format": "message",
}
# 使用统一网络搜索配置
web_search_mode = get_web_search_mode(request)
if web_search_mode:
api_params["enable_search"] = True
if web_search_mode == "deep":
api_params["search_options"] = {"enable_search_extension": True}
# 添加深度思考参数
if thinking_enabled:
@ -330,6 +338,13 @@ class DashScopeAdapter(BaseAdapter):
"max_tokens": request.max_tokens,
"result_format": "message",
}
# 使用统一网络搜索配置
web_search_mode = get_web_search_mode(request)
if web_search_mode:
api_params["enable_search"] = True
if web_search_mode == "deep":
api_params["search_options"] = {"enable_search_extension": True}
# 添加深度思考参数
if thinking_enabled:
@ -531,6 +546,13 @@ class DashScopeAdapter(BaseAdapter):
"max_tokens": request.max_tokens,
"temperature": request.temperature,
}
# 使用统一网络搜索配置
web_search_mode = get_web_search_mode(request)
if web_search_mode:
api_params["enable_search"] = True
if web_search_mode == "deep":
api_params["search_options"] = {"enable_search_extension": True}
# 添加深度思考参数
if thinking_enabled:
@ -679,6 +701,13 @@ class DashScopeAdapter(BaseAdapter):
"enable_thinking": False,
"temperature": request.temperature,
}
# 使用统一网络搜索配置
web_search_mode = get_web_search_mode(request)
if web_search_mode:
api_params["enable_search"] = True
if web_search_mode == "deep":
api_params["search_options"] = {"enable_search_extension": True}
# 添加深度思考参数
if thinking_enabled:

View File

@ -11,6 +11,7 @@ from typing import Dict, List, Optional
from fastapi.responses import JSONResponse, StreamingResponse
from .base import BaseAdapter, ChatCompletionRequest, ModelInfo
from .plugins import get_web_search_mode, build_glm_search_tool
from core import get_logger
logger = get_logger()
@ -24,7 +25,7 @@ GLM_MODELS = [
max_tokens=128000,
provider="ZhipuAI",
supports_thinking=True,
supports_web_search=False,
supports_web_search=True,
supports_vision=False,
supports_files=False,
),
@ -64,11 +65,11 @@ GLM_MODELS = [
ModelInfo(
id="glm-z1-flash",
name="GLM-Z1 Flash",
description="深度思考推理模型",
description="深度思考推理模型,默认开启深度思考",
max_tokens=128000,
provider="ZhipuAI",
supports_thinking=True,
supports_web_search=False,
supports_web_search=True,
supports_vision=False,
supports_files=False,
),
@ -129,10 +130,10 @@ class GLMAdapter(BaseAdapter):
# 构建额外参数
extra_kwargs = {}
web_search = self._get_web_search_mode(request)
web_search_mode = get_web_search_mode(request)
if web_search:
extra_kwargs["tools"] = [self._build_web_search_tool(web_search)]
if web_search_mode:
extra_kwargs["tools"] = [build_glm_search_tool(web_search_mode)]
extra_kwargs["tool_choice"] = "auto"
# 深度思考正向选择True 时启用False 时禁用)
@ -260,46 +261,6 @@ class GLMAdapter(BaseAdapter):
"""检查模型是否支持深度思考"""
return model.lower() in THINKING_MODELS
def _get_web_search_mode(self, request: ChatCompletionRequest) -> str:
"""获取联网搜索模式"""
if request.deep_search:
return "deep"
elif request.web_search:
return "simple"
return ""
def _build_web_search_tool(self, mode: str) -> Dict:
"""构建联网搜索工具"""
from datetime import datetime
today = datetime.now().strftime("%Y年%m月%d")
if mode == "deep":
# 深度搜索:返回搜索结果详情
return {
"type": "web_search",
"web_search": {
"enable": True,
"search_engine": "search_pro",
"search_result": True,
"search_prompt": f"你是一位智能助手。请用简洁的语言总结网络搜索{{search_result}}中的关键信息,按重要性排序并引用来源日期。今天的日期是{today}",
"count": 5,
"search_recency_filter": "noLimit",
"content_size": "high",
},
}
else:
# 简单搜索
return {
"type": "web_search",
"web_search": {
"enable": True,
"search_engine": "search_pro",
"search_result": True,
"count": 5,
},
}
def _stream_chat(
self, client, messages, model, request, extra_kwargs
) -> StreamingResponse:

View File

@ -10,6 +10,7 @@ from typing import Dict, List, Optional
from fastapi.responses import JSONResponse, StreamingResponse
from .base import BaseAdapter, ChatCompletionRequest, ModelInfo
from .plugins import get_web_search_mode, build_openai_search_tool, execute_tavily_search
from core import get_logger
logger = get_logger()
@ -167,6 +168,12 @@ class OpenAIAdapter(BaseAdapter):
"max_tokens": request.max_tokens,
"stream": request.stream,
}
# 统一添加联网搜索插件参数
web_search_mode = get_web_search_mode(request)
if web_search_mode:
search_tool = build_openai_search_tool(web_search_mode)
kwargs["tools"] = [search_tool]
# DeepSeek 深度思考支持
extra_body = None
@ -219,17 +226,27 @@ class OpenAIAdapter(BaseAdapter):
def generator():
from utils.helpers import generate_unique_id, get_current_timestamp
nonlocal kwargs
resp = client.chat.completions.create(**kwargs)
full_content = ""
full_reasoning = ""
chunk_count = 0
for chunk in resp:
if chunk.choices:
# 可能需要执行多轮对话(当发生工具调用时)
while True:
resp = client.chat.completions.create(**kwargs)
full_content = ""
full_reasoning = ""
chunk_count = 0
tool_calls = []
current_tool_call = None
for chunk in resp:
if not chunk.choices:
continue
chunk_count += 1
delta = chunk.choices[0].delta
# 1. 收集可能有内容/推理
delta_content = {}
if hasattr(delta, "content") and delta.content:
delta_content["content"] = delta.content
@ -238,7 +255,27 @@ class OpenAIAdapter(BaseAdapter):
delta_content["reasoning_content"] = delta.reasoning_content
full_reasoning += delta.reasoning_content
if delta_content:
# 2. 收集可能产生的 tool_calls (流式)
if hasattr(delta, "tool_calls") and delta.tool_calls:
for tool_call_chunk in delta.tool_calls:
idx = tool_call_chunk.index
# 确保 tool_calls 列表足够长
while len(tool_calls) <= idx:
tool_calls.append({"id": "", "type": "function", "function": {"name": "", "arguments": ""}})
if tool_call_chunk.id:
tool_calls[idx]["id"] += tool_call_chunk.id
if tool_call_chunk.type:
# 对于 type, 因为 OpenAI 可能会传 chunks, 但通常只在第一块或者每块传, 为了避免 functionfunction, 使用赋值而非累加
tool_calls[idx]["type"] = tool_call_chunk.type
if tool_call_chunk.function:
if tool_call_chunk.function.name:
tool_calls[idx]["function"]["name"] += tool_call_chunk.function.name
if tool_call_chunk.function.arguments:
tool_calls[idx]["function"]["arguments"] += tool_call_chunk.function.arguments
# 3. 输出给前端普通文本
if delta_content and not tool_calls:
data = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
@ -253,28 +290,72 @@ class OpenAIAdapter(BaseAdapter):
],
}
yield f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
# 检查此轮请求是否收到了完整工具调用,若是则执行搜索逻辑并追加继续请求,不再让外部函数退出
if tool_calls:
logger.info(f"[{provider_name}] 检测到流式中包含了工具调用进行拦截并处理: {json.dumps(tool_calls, ensure_ascii=False)}")
# 把大模型的工具调用请求也追加进去
assistant_msg = {
"role": "assistant",
"content": full_content or None, # 如果工具和普通内容同时存在也保留
"tool_calls": tool_calls
}
if full_reasoning:
assistant_msg["reasoning_content"] = full_reasoning
elif self._provider_type == "deepseek" and self._supports_thinking(kwargs["model"]):
# DeepSeek 推理模型在有工具调用时必须有 reasoning_content 字段
assistant_msg["reasoning_content"] = ""
kwargs["messages"].append(assistant_msg)
for tc in tool_calls:
if tc["function"]["name"] == "web_search":
try:
args = json.loads(tc["function"]["arguments"])
query = args.get("query", "")
mode = "deep" if "advanced" in str(kwargs.get("tools", [])) else "simple"
logger.info(f"[{provider_name}] 执行搜索插件: {query}")
search_result = execute_tavily_search(query, mode=mode)
except Exception as e:
search_result = f"获取搜索参数或执行搜索失败: {str(e)}"
logger.error(search_result)
# 把执行结果告诉大模型
kwargs["messages"].append({
"role": "tool",
"tool_call_id": tc["id"],
"name": "web_search",
"content": search_result
})
# 工具执行完毕,继续发起下一轮请求大模型归纳总结输出
continue
finish = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": kwargs["model"],
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(finish, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
# 如果没有工具调用或者全部分发完毕,正常结束给前端
finish = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion.chunk",
"created": get_current_timestamp(),
"model": kwargs["model"],
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(finish, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
# 打印流式响应结果
logger.info(f"[{provider_name}] 流式响应完成:")
logger.info(f" - chunks: {chunk_count}")
logger.info(f" - content_length: {len(full_content)} 字符")
if full_reasoning:
logger.info(f" - reasoning_length: {len(full_reasoning)} 字符")
logger.info(
f" - content_preview: {full_content[:200]}..."
if len(full_content) > 200
else f" - content: {full_content}"
)
# 打印流式响应结果
logger.info(f"[{provider_name}] 流式响应完成:")
logger.info(f" - chunks: {chunk_count}")
logger.info(f" - content_length: {len(full_content)} 字符")
if full_reasoning:
logger.info(f" - reasoning_length: {len(full_reasoning)} 字符")
logger.info(
f" - content_preview: {full_content[:200]}..."
if len(full_content) > 200
else f" - content: {full_content}"
)
# 结束外层循环退出生成器
break
return StreamingResponse(generator(), media_type="text/event-stream")
@ -284,10 +365,58 @@ class OpenAIAdapter(BaseAdapter):
"""非流式聊天"""
from utils.helpers import generate_unique_id, get_current_timestamp
resp = client.chat.completions.create(**kwargs)
while True:
resp = client.chat.completions.create(**kwargs)
message = resp.choices[0].message
content = message.content or ""
message = resp.choices[0].message
# 判断是否涉及工具调用
if hasattr(message, "tool_calls") and message.tool_calls:
# 记录这轮的助手回复
assistant_msg = {"role": "assistant", "content": message.content or None}
# openai sdk 对象转 dict 存储 tool_calls
tool_calls_dict = []
for tc in message.tool_calls:
tc_dict = {
"id": tc.id,
"type": tc.type,
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments
}
}
tool_calls_dict.append(tc_dict)
assistant_msg["tool_calls"] = tool_calls_dict
if hasattr(message, "reasoning_content") and message.reasoning_content:
assistant_msg["reasoning_content"] = message.reasoning_content
elif self._provider_type == "deepseek" and self._supports_thinking(kwargs["model"]):
# DeepSeek 推理模型在有工具调用时必须有 reasoning_content 字段
assistant_msg["reasoning_content"] = ""
kwargs["messages"].append(assistant_msg)
# 执行所有的工具调用
for tc in tool_calls_dict:
if tc["function"]["name"] == "web_search":
try:
args = json.loads(tc["function"]["arguments"])
query = args.get("query", "")
mode = "deep" if "advanced" in str(kwargs.get("tools", [])) else "simple"
search_result = execute_tavily_search(query, mode=mode)
except Exception as e:
search_result = f"执行搜索失败: {str(e)}"
# 把执行结果追加到消息中
kwargs["messages"].append({
"role": "tool",
"tool_call_id": tc["id"],
"name": "web_search",
"content": search_result
})
# 工具调用完成,发起下一轮请求获取归纳答案
continue
# 处理普通的文本回复
content = message.content or ""
response = {
"id": f"chatcmpl-{generate_unique_id()}",
"object": "chat.completion",

113
server/adapters/plugins.py Normal file
View File

@ -0,0 +1,113 @@
import os
import urllib.request
import json
from typing import Dict
from datetime import datetime
from .base import ChatCompletionRequest
def get_web_search_mode(request: "ChatCompletionRequest") -> str:
    """Return the requested web-search mode.

    Args:
        request: Incoming chat request; only its optional ``deep_search``
            and ``web_search`` flags are inspected.

    Returns:
        ``"deep"`` when deep search is requested, ``"simple"`` for plain
        web search, or ``""`` when search is disabled.
    """
    # deep_search takes precedence over web_search when both flags are set.
    wants_deep = getattr(request, 'deep_search', False)
    if wants_deep:
        return "deep"
    wants_simple = getattr(request, 'web_search', False)
    return "simple" if wants_simple else ""
def execute_tavily_search(query: str, mode: str = "simple") -> str:
    """Call the Tavily search API and return a human-readable result digest.

    Args:
        query: Search keywords or phrase.
        mode: ``"deep"`` uses advanced search depth with 5 results;
            any other value uses basic depth with 3 results.

    Returns:
        Result blocks formatted as ``[n] title\\ncontent\\n链接: url`` joined by
        blank lines, or a Chinese status/error message when the API key is
        missing, no results come back, or the request fails. This function
        never raises; errors are folded into the returned string so the
        caller can feed it straight back to the model as a tool result.
    """
    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        return "本地环境变量 TAVILY_API_KEY 未配置,无法进行搜索。"
    url = "https://api.tavily.com/search"
    headers = {"Content-Type": "application/json"}
    data = {
        "api_key": api_key,
        "query": query,
        "search_depth": "advanced" if mode == "deep" else "basic",
        "include_answer": False,
        "max_results": 5 if mode == "deep" else 3,
    }
    req = urllib.request.Request(
        url, data=json.dumps(data).encode('utf-8'), headers=headers, method='POST'
    )
    try:
        # Bound the call: without a timeout a stalled connection would hang
        # the whole chat stream indefinitely.
        with urllib.request.urlopen(req, timeout=15) as response:
            result = json.loads(response.read().decode('utf-8'))
        results = result.get("results", [])
        if not results:
            return "搜索未返回结果。"
        formatted_res = [
            f"[{i+1}] {res.get('title')}\n{res.get('content')}\n链接: {res.get('url')}"
            for i, res in enumerate(results)
        ]
        return "\n\n".join(formatted_res)
    except Exception as e:
        return f"搜索请求失败,错误: {str(e)}"
def build_openai_search_tool(mode: str) -> Dict:
    """Build an OpenAI/DeepSeek function-calling tool spec for web search.

    Args:
        mode: ``"deep"`` for the deep-search variant, anything else for the
            simple variant.

    Returns:
        A standard ``{"type": "function", ...}`` tool definition whose single
        required parameter is ``query``.

    Note:
        The adapters recover the search depth later by checking whether the
        substring ``"advanced"`` appears in ``str(tools)``, so the deep
        variant's description deliberately contains that token.
    """
    if mode == "deep":
        # "advanced" must appear here so the stream/non-stream handlers can
        # detect deep mode when executing the tool call.
        description = "深度互联网搜索插件(查找并阅读网页内容,advanced 深度检索)"
    else:
        description = "进行互联网搜索并获取实时信息或资料以辅助回答。"
    return {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": description,
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "要搜索的准确关键词或短语",
                    }
                },
                # Without "required" some models emit empty tool-call
                # arguments; the deep variant previously omitted it.
                "required": ["query"],
            },
        },
    }
def build_glm_search_tool(mode: str) -> Dict:
    """Build a GLM (ZhipuAI) built-in web-search tool definition.

    Args:
        mode: ``"deep"`` returns the detailed variant with a summarization
            prompt and high content size; anything else returns the simple
            variant.

    Returns:
        A ``{"type": "web_search", "web_search": {...}}`` tool dict accepted
        by the GLM API.
    """
    # Fix: the format string previously ended at "%d" (e.g. "2026年03月12"),
    # dropping the trailing 日; the prompt now carries a complete date.
    today = datetime.now().strftime("%Y年%m月%d日")
    if mode == "deep":
        # Deep search: return result details and ask the model to summarize
        # with source dates, anchored to today's date.
        return {
            "type": "web_search",
            "web_search": {
                "enable": True,
                "search_engine": "search_pro",
                "search_result": True,
                "search_prompt": f"你是一位智能助手。请用简洁的语言总结网络搜索{{search_result}}中的关键信息,按重要性排序并引用来源日期。今天的日期是{today}",
                "count": 5,
                "search_recency_filter": "noLimit",
                "content_size": "high",
            },
        }
    else:
        # Simple search: default summarization, no custom prompt.
        return {
            "type": "web_search",
            "web_search": {
                "enable": True,
                "search_engine": "search_pro",
                "search_result": True,
                "count": 5,
            },
        }

View File

@ -0,0 +1,46 @@
import os
import sys
import urllib.request
import json
def test_tavily(api_key: str):
    """Send one fixed test query to the Tavily search API and print the outcome.

    Args:
        api_key: Tavily API key (normally starts with ``tvly-``).

    Prints the titles/contents/URLs of the returned results on success, or a
    diagnostic message on HTTP failure (typically an invalid key) or any
    other error. Returns nothing.
    """
    url = "https://api.tavily.com/search"
    headers = {
        "Content-Type": "application/json"
    }
    data = {
        "api_key": api_key,
        "query": "武汉明天的天气",
        "search_depth": "basic",
        "include_answer": False,
        "max_results": 3
    }
    req = urllib.request.Request(url, data=json.dumps(data).encode('utf-8'), headers=headers, method='POST')
    try:
        # Fix: add a timeout so an unreachable endpoint cannot hang the script.
        with urllib.request.urlopen(req, timeout=15) as response:
            result = json.loads(response.read().decode('utf-8'))
            print("✅ Tavily API Key 测试成功!成功获取以下搜索结果:\n")
            for i, res in enumerate(result.get("results", [])):
                print(f"[{i+1}] 标题: {res.get('title')}")
                print(f"    内容: {res.get('content')}")
                print(f"    链接: {res.get('url')}\n")
    except urllib.error.HTTPError as e:
        # An HTTP error from Tavily almost always means a bad/expired key.
        print(f"❌ 请求失败HTTP 错误代码: {e.code}")
        print("这通常意味着您的 API Key 错误或无效。详细信息:")
        error_msg = e.read().decode('utf-8')
        print(error_msg)
    except Exception as e:
        print(f"❌ 发生其他错误: {str(e)}")
# Script entry point: prompt for a Tavily API key and run a single test search.
if __name__ == "__main__":
    key = input("请输入您的 Tavily API Key (以 tvly- 开头): ").strip()
    if not key:
        # Nothing to test without a key — exit with a non-zero status.
        print("未输入 Key程序退出。")
        sys.exit(1)
    print("\n正在连接 Tavily 进行测试搜索...")
    test_tavily(key)