diff --git a/config.yaml b/config.yaml index 52369c0b00058cd925c4f64d3824e38e9c67cb63..6cb9a52c4f1a6867560c58e51c74123b5acb2dff 100644 --- a/config.yaml +++ b/config.yaml @@ -106,9 +106,7 @@ scenarios: clarify_with_user: true write_research_brief: true compress_research: true - generate_report_outline: true tool_call: clarify_with_user: true write_research_brief: true compress_research: true - generate_report_outline: true diff --git a/deepinsight/core/agent/conference_qa/supervisor.py b/deepinsight/core/agent/conference_qa/supervisor.py index 4ea24bb8c0bd7ddee3ea1341032731bcbbdf5f4d..32a497607f5382fdb9a98c6e7f6ede1558a12381 100644 --- a/deepinsight/core/agent/conference_qa/supervisor.py +++ b/deepinsight/core/agent/conference_qa/supervisor.py @@ -372,7 +372,9 @@ async def deep_research_team_node(state: SupervisorState, config: RunnableConfig for key, usage in all_keys_usage.items(): logging.error(f"API Key: {key} - Plan Limit: {usage['plan_limit']}, Plan Usage: {usage['plan_usage']}") writer = get_stream_writer() - writer({"result": "no tavily key can be used, please set first."}) + writer(FinalResult( + final_report="no tavily key can be used, please set first." + )) return Command(goto=END) parent_configurable = config.get("configurable", {}) diff --git a/deepinsight/core/agent/conference_research/conf_stat_value_mining.py b/deepinsight/core/agent/conference_research/conf_stat_value_mining.py index cf24b00310cd5386d80ac083c0db1bee35660a06..7d5b36ddb12d538aea3b05ea3137596e8481ce01 100644 --- a/deepinsight/core/agent/conference_research/conf_stat_value_mining.py +++ b/deepinsight/core/agent/conference_research/conf_stat_value_mining.py @@ -20,7 +20,6 @@ from deepinsight.core.tools.wordcloud_tool import generate_wordcloud from deepinsight.core.utils.progress_utils import progress_stage from deepinsight.core.utils.research_utils import parse_research_config from deepinsight.core.tools.tavily_search import tavily_search -from deepinsight.core.utils.context_utils import DefaultSummarizationMiddleware from deepinsight.utils.db_schema_utils import get_db_models_source_markdown from integrations.mcps.generate_chart import generate_column_chart, generate_bar_chart, generate_pie_chart @@ -61,7 +60,7 @@ async def get_deep_agents(config: RunnableConfig, prompt_template_name, extent_t tools=tools, system_prompt=system_prompt, backend=mem_file_system_instance, - middleware=[ModelFallbackMiddleware(llm_model, llm_model), DefaultSummarizationMiddleware(model=llm_model)] + middleware=[ModelFallbackMiddleware(llm_model, llm_model)] ) return agent diff --git a/deepinsight/core/agent/conference_research/supervisor.py b/deepinsight/core/agent/conference_research/supervisor.py index 2961d29b6cab55f93580997ef8b5d2398687bf9c..67fddfc515ce69bf9802bedc0e03552e0f1326ca 100644 --- a/deepinsight/core/agent/conference_research/supervisor.py +++ b/deepinsight/core/agent/conference_research/supervisor.py @@ -15,6 +15,7 @@ from langgraph.config import get_stream_writer from langgraph.constants import END from langgraph.graph import StateGraph, add_messages from langgraph.types import Command, interrupt +from deepinsight.core.utils.progress_utils import progress_stage from deepinsight.core.tools.best_paper_analysis import batch_analyze_papers from deepinsight.core.tools.paper_statistic import ( @@ -27,7 +28,7 @@ from deepinsight.core.tools.paper_statistic import ( ) from deepinsight.core.utils.mcp_utils import MCPClientUtils from deepinsight.core.utils.research_utils import parse_research_config -from deepinsight.core.types.graph_config import ResearchConfig +from deepinsight.core.types.graph_config import ResearchConfig, SearchAPI from deepinsight.core.types.research import FinalResult from deepinsight.core.agent.deep_research.supervisor import graph as deep_research_graph @@ -152,10 +153,7 @@ async def wait_user_clarify_node(state: ConferenceState): async def construct_sub_config(config, prompt_group: ConferenceGraphNodeType): - parent_configurable = config.get("configurable", {}) tools = [] - if parent_configurable.get("tools"): - tools.extend(parent_configurable["tools"]) if prompt_group == ConferenceGraphNodeType.CONFERENCE_BEST_PAPER: tools.append(batch_analyze_papers) elif prompt_group == ConferenceGraphNodeType.CONFERENCE_SUBMISSION: @@ -182,9 +180,10 @@ async def construct_sub_config(config, prompt_group: ConferenceGraphNodeType): "allow_edit_report_outline": False, "allow_publish_result": False, "tools": tools, + "search_api": [SearchAPI.TAVILY], } - +@progress_stage("会议概览信息收集") async def conference_overview_node(state: ConferenceState, config: RunnableConfig): result = await deep_research_graph.with_config( configurable=await construct_sub_config(config, ConferenceGraphNodeType.CONFERENCE_OVERVIEW) @@ -195,7 +194,7 @@ async def conference_overview_node(state: ConferenceState, config: RunnableConfi "conference_overview": result["final_report"] } - +@progress_stage("会议统计分析") async def conference_submission_node(state: ConferenceState, config: RunnableConfig): result = await conf_stat_graph.with_config( configurable=await construct_sub_config(config, ConferenceGraphNodeType.CONFERENCE_SUBMISSION) @@ -206,7 +205,7 @@ async def conference_submission_node(state: ConferenceState, config: RunnableCon "conference_submission": result["static_summary"] } - +@progress_stage("会议keynotes分析") async def conference_keynotes_node(state: ConferenceState, config: RunnableConfig): result = await deep_research_graph.with_config( configurable=await construct_sub_config(config, ConferenceGraphNodeType.CONFERENCE_KEYNOTE) @@ -217,7 +216,7 @@ async def conference_keynotes_node(state: ConferenceState, config: RunnableConfi "conference_keynotes": result["final_report"] } - +@progress_stage("会议主题分析") async def conference_topic_node(state: ConferenceState, config: RunnableConfig): result = await deep_research_graph.with_config( configurable=await construct_sub_config(config, ConferenceGraphNodeType.CONFERENCE_TOPIC) @@ -228,7 +227,7 @@ async def conference_topic_node(state: ConferenceState, config: RunnableConfig): "conference_topic": result["final_report"] } - +@progress_stage("会议最佳论文分析") async def conference_best_paper_node(state: ConferenceState, config: RunnableConfig): result = await deep_research_graph.with_config( configurable=await construct_sub_config(config, ConferenceGraphNodeType.CONFERENCE_BEST_PAPER) @@ -244,7 +243,7 @@ async def conference_best_paper_node(state: ConferenceState, config: RunnableCon "conference_best_papers": paper_file_content, } - +@progress_stage("洞察总结") async def insight_summary_node(state: ConferenceState, config: RunnableConfig): rc = parse_research_config(config) model = rc.get_model() diff --git a/deepinsight/core/agent/deep_research/researcher.py b/deepinsight/core/agent/deep_research/researcher.py index f743e4d995505309b335c284ddcdb57964f6c373..392617394f024be67cff5ec9d17f73b6870d6aff 100644 --- a/deepinsight/core/agent/deep_research/researcher.py +++ b/deepinsight/core/agent/deep_research/researcher.py @@ -11,6 +11,7 @@ from langgraph.constants import START, END from langgraph.config import get_stream_writer from langgraph.graph import StateGraph from langgraph.types import Command +from deepinsight.core.utils.progress_utils import progress_stage from deepinsight.core.types.research import ( ErrorResult, @@ -105,7 +106,7 @@ async def execute_tool_safely(tool, args, config, name): writer(error_response) return f"Error executing tool: {str(e)}" - +@progress_stage("规划主题研究") async def topic_researcher(state: ResearcherState, config: RunnableConfig) -> Command[Literal["researcher_tools"]]: """Individual researcher that conducts focused research on specific topics. @@ -173,7 +174,7 @@ async def topic_researcher(state: ResearcherState, config: RunnableConfig) -> Co } ) - +@progress_stage("执行主题研究") async def topic_tools_exec(state: ResearcherState, config: RunnableConfig) -> Command[ Literal["researcher", "compress_research"]]: """Execute tools called by the researcher, including search tools and strategic thinking. @@ -304,7 +305,7 @@ async def topic_tools_exec(state: ResearcherState, config: RunnableConfig) -> Co logging.error(f"Exception traceback: {traceback.format_exc()}") raise - +@progress_stage("搜索结果压缩") async def topic_results_compress(state: ResearcherState, config: RunnableConfig): """Compress and synthesize research findings into a concise, structured summary. diff --git a/deepinsight/core/agent/deep_research/supervisor.py b/deepinsight/core/agent/deep_research/supervisor.py index 1359df8a885f92e23b90055ce2a5158482322039..6e79e23ac9af9b74d83f628ebc8dd43ffecf134f 100644 --- a/deepinsight/core/agent/deep_research/supervisor.py +++ b/deepinsight/core/agent/deep_research/supervisor.py @@ -39,6 +39,7 @@ from deepinsight.core.utils.llm_token_utils import ( is_token_limit_exceeded, get_model_token_limit, ) +from deepinsight.core.utils.progress_utils import progress_stage class ConductResearch(BaseModel): @@ -150,7 +151,7 @@ async def wait_user_clarification(state: AgentState): "messages": [HumanMessage(content=user_reply)] } - +@progress_stage("制定研究概要") async def write_research_brief(state: AgentState, config: RunnableConfig): """Transform user messages into a structured research brief and initialize supervisor.""" # Step 1: Set up the research model (without structured output) @@ -230,7 +231,7 @@ async def wait_user_confirm_research_brief(state: AgentState, config: RunnableCo } } - +@progress_stage("生成研究大纲") async def generate_report_outline(state: AgentState, config: RunnableConfig): # Step 1: Extract research findings and prepare state cleanup notes = state.get("notes", []) @@ -512,7 +513,7 @@ async def publish_result(state: AgentState, config: RunnableConfig): )) return state - +@progress_stage("生成研究主题指令") async def supervisor(state: SupervisorState, config: RunnableConfig) -> Command[Literal["supervisor_tools"]]: """Lead research supervisor that plans research strategy and delegates to researchers. @@ -555,7 +556,7 @@ async def supervisor(state: SupervisorState, config: RunnableConfig) -> Command[ } ) - +@progress_stage("执行研究主题") async def supervisor_tools(state: SupervisorState, config: RunnableConfig) -> Command[Literal["supervisor", "__end__"]]: """Execute tools called by the supervisor, including research delegation and strategic thinking. diff --git a/deepinsight/core/tools/best_paper_analysis.py b/deepinsight/core/tools/best_paper_analysis.py index 92329a5ed627f0b8d72ef6879380d6d6ad87c839..bab0b0f77a0c91e05dd7a06a96a0b583de6fab2d 100644 --- a/deepinsight/core/tools/best_paper_analysis.py +++ b/deepinsight/core/tools/best_paper_analysis.py @@ -11,11 +11,9 @@ from langchain_tavily import TavilySearch from deepinsight.core.tools.file_system import register_fs_tools, MemoryMCPFilesystem from deepinsight.core.utils.tool_utils import create_retrieval_tool, CoerceToolOutput -from deepinsight.core.utils.context_utils import SummarizationMiddleware from deepinsight.core.types.graph_config import RetrievalType from deepinsight.core.utils.research_utils import parse_research_config from deepinsight.utils.db_schema_utils import get_db_models_source_markdown -from deepinsight.core.utils.context_utils import DefaultSummarizationMiddleware # ----------------- 单篇论文解析函数 ----------------- @@ -83,7 +81,6 @@ async def analyze_single_paper(paper_info: str, output_dir: str, config: Runnabl } middleware = [ CoerceToolOutput(), - SummarizationMiddleware(model=rc.default_model), ModelFallbackMiddleware( rc.default_model, # Try first on error rc.default_model, # Then this diff --git a/deepinsight/core/tools/keynote_analysis.py b/deepinsight/core/tools/keynote_analysis.py index 484439403dba5d4e11fb95d7d10c193be6ffc297..24df1847fefbd88dd3b2bd204b2f377733dd98d8 100644 --- a/deepinsight/core/tools/keynote_analysis.py +++ b/deepinsight/core/tools/keynote_analysis.py @@ -13,7 +13,6 @@ from langchain_tavily import TavilySearch from deepinsight.core.utils.research_utils import parse_research_config from deepinsight.core.utils.tool_utils import CoerceToolOutput -from deepinsight.core.utils.context_utils import SummarizationMiddleware from deepinsight.core.tools.file_system import register_fs_tools, fs_instance @@ -100,7 +99,6 @@ async def analyze_single_keynote(keynote_info: str, output_dir: str, config: Run from deepagents import create_deep_agent middleware = [ CoerceToolOutput(), - SummarizationMiddleware(model=rc.default_model), ModelFallbackMiddleware( rc.default_model, rc.default_model, diff --git a/deepinsight/core/tools/tavily_search.py b/deepinsight/core/tools/tavily_search.py index 6af315af250033c8df4e22564205f5d08a2fc4aa..5576427b3d6a5ee94ca25669ece2e4e73cd23fa2 100644 --- a/deepinsight/core/tools/tavily_search.py +++ b/deepinsight/core/tools/tavily_search.py @@ -15,6 +15,7 @@ from langchain_core.messages import HumanMessage from deepinsight.core.types.graph_config import ResearchConfig from deepinsight.core.utils.research_utils import parse_research_config from deepinsight.core.types.research import ( + ErrorResult, WebSearchResult, ToolType, ToolUnifiedResponse, @@ -160,6 +161,16 @@ async def tavily_search( except Exception as e: error_message = f"Tavily search failed with error: {type(e).__name__}: {e}" logging.error(error_message) + writer = get_stream_writer() + writer(ToolUnifiedResponse( + parent_message_id=config.get("metadata", {}).get("parent_message_id", None), + type=ToolType.web_search, + name="tavily_search", + args={"queries": queries}, + result=ErrorResult( + error=error_message + ) + )) return error_message # Step 2: Deduplicate results by URL to avoid processing the same content multiple times diff --git a/deepinsight/core/utils/tool_utils.py b/deepinsight/core/utils/tool_utils.py index 4c49ed101f4098f2987bfcde2b8b4901ad1994cd..7a4f19839da237bf34bd4d25b860f518014e8c9d 100644 --- a/deepinsight/core/utils/tool_utils.py +++ b/deepinsight/core/utils/tool_utils.py @@ -1,6 +1,6 @@ import logging import json -from typing import Callable +from typing import Callable, Awaitable from google.ai.generativelanguage_v1beta.types import Tool as GenAITool @@ -249,4 +249,50 @@ class CoerceToolOutput(AgentMiddleware): # 如果无法序列化,则转为通用的字符串表示 result.content = str(result.content) - return result \ No newline at end of file + return result + + async def awrap_tool_call( + self, + request: ToolCallRequest, + handler: Callable[[ToolCallRequest], Awaitable[ToolMessage]], + ) -> ToolMessage: + # 1. 先执行实际的工具调用,获取结果 + result = await handler(request) + + # 2. 确保 tool_call['args']['messages'] 每项的 content 都是字符串 + if isinstance(request.tool_call.get("args", {}).get("messages"), list): + for message in request.tool_call["args"]["messages"]: + if not isinstance(message.get("content"), str): + # 添加更细致的类型转换,确保每个 message 的内容都被处理为字符串 + if isinstance(message["content"], (dict, list)): + message["content"] = json.dumps(message["content"], ensure_ascii=False) + else: + message["content"] = str(message["content"]) + + # 3. 处理 ToolMessage 中的消息 + if isinstance(result, ToolMessage): + if isinstance(result.content, dict): + # 提取消息列表,如果 content 是字典 + messages = result.content.get("messages", []) + else: + messages = [] + + # 遍历 messages,确保每个 message 的 content 都是字符串 + for message in messages: + if not isinstance(message.content, str): + if isinstance(message.content, (dict, list)): + message.content = json.dumps(message.content, ensure_ascii=False) + else: + message.content = str(message.content) + + # 4. 最终处理 ToolMessage 的顶层 content + if not isinstance(result.content, str): + try: + # 尝试将整个内容对象序列化为 JSON 字符串 + result.content = json.dumps(result.content, ensure_ascii=False) + except TypeError: + # 如果无法序列化,则转为通用的字符串表示 + result.content = str(result.content) + + return result + diff --git a/deepinsight/service/research/research.py b/deepinsight/service/research/research.py index f3986428df0fed10b3ab776bba24c0d126ef1ba1..1843809f2330244572ed97a5da927973ee145ab8 100644 --- a/deepinsight/service/research/research.py +++ b/deepinsight/service/research/research.py @@ -99,9 +99,11 @@ class ResearchService: # Additional filters for conference scenes (only new ones not in config.yaml) conference_additional_filters = { "researcher_tools": True, + "researcher": True, "publish_result": True, + "generate_report": True, + "generate_report_outline": True, "tools": True, - "researcher": True, "model": True, "agent": True, } diff --git a/deepinsight/service/schemas/streaming.py b/deepinsight/service/schemas/streaming.py index 310b85594c6dfd820be0e5d9c91c2f147ca536e9..ef69b649ac25df2b851a051087e3f8d1aa70649b 100644 --- a/deepinsight/service/schemas/streaming.py +++ b/deepinsight/service/schemas/streaming.py @@ -34,6 +34,9 @@ class EventType(str, Enum): thinking_step_topic = "thinking_step_topic" thinking_step_report_generating = "thinking_step_report_generating" + # progress + progress = "progress" + class MessageToolCallContent(BaseModel): index: Optional[int] = Field(None, description="Tool call index") diff --git a/deepinsight/service/streaming/stream_adapter.py b/deepinsight/service/streaming/stream_adapter.py index 93a4acca1322bae1e5e591ec89fc1ff70b7ddefd..636cf954449b31f6df1f849d53cdfd659f5fe702 100644 --- a/deepinsight/service/streaming/stream_adapter.py +++ b/deepinsight/service/streaming/stream_adapter.py @@ -11,6 +11,7 @@ from langchain_core.runnables import RunnableConfig from langchain_core.messages import HumanMessage, AIMessage, AIMessageChunk, ToolMessage, ToolMessageChunk from langgraph.types import StateSnapshot, Interrupt, Command from langgraph.graph.state import CompiledStateGraph +from deepinsight.core.utils.progress_utils import ProgressEvent from deepinsight.service.schemas.streaming import ( EventType, @@ -260,6 +261,18 @@ class StreamEventAdapter: ) ], ) + elif isinstance(message_chunk, ProgressEvent): + yield StreamEvent( + event=EventType.progress, + run_id=run_id, + conversation_id=conversation_id, + messages=[ + ResponseMessage( + content=ResponseMessageContent(text=message_chunk.description), + content_type=ResponseMessageContentType.plain_text, + ) + ] + ) elif mode == "updates": if isinstance(data, dict) and "__interrupt__" in data: