From 328f8e7ec604e63817b86043be9a262964944c2c Mon Sep 17 00:00:00 2001 From: liailing1026 <1815388873@qq.com> Date: Mon, 2 Feb 2026 17:09:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=89=A7=E8=A1=8C=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E5=8D=95=E4=BE=8B=E7=8A=B6=E6=80=81bug=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../PlanEngine/AgentSelectModify.py | 2 +- .../ExecutePlan_Optimized.py | 34 ++- .../RehearsalEngine_V2/execution_state.py | 262 +++++++++++++----- .../RehearsalEngine_V2/generation_state.py | 228 +++++++++++++++ backend/AgentRepo/agentBoard_v1.json | 216 ++++++++------- backend/AgentRepo/agentBoard_v2.json | 102 +++++++ frontend/src/api/index.ts | 54 +++- frontend/src/layout/components/Main/Task.vue | 153 ++++++---- .../TaskProcess/components/PlanTask.vue | 14 +- .../Main/TaskTemplate/TaskResult/index.vue | 70 ++++- .../TaskSyllabus/Branch/PlanModification.vue | 22 +- frontend/src/utils/websocket.ts | 20 +- 12 files changed, 899 insertions(+), 278 deletions(-) create mode 100644 backend/AgentCoord/RehearsalEngine_V2/generation_state.py create mode 100644 backend/AgentRepo/agentBoard_v2.json diff --git a/backend/AgentCoord/PlanEngine/AgentSelectModify.py b/backend/AgentCoord/PlanEngine/AgentSelectModify.py index 474b6ff..d55479f 100644 --- a/backend/AgentCoord/PlanEngine/AgentSelectModify.py +++ b/backend/AgentCoord/PlanEngine/AgentSelectModify.py @@ -43,7 +43,7 @@ def generate_AbilityRequirement(General_Goal, Current_Task): ), }, ] - print(messages[1]["content"]) + #print(messages[1]["content"]) return read_LLM_Completion(messages)["AbilityRequirement"] diff --git a/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py b/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py index c8150f5..a5ff460 100644 --- a/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py +++ b/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py @@ -187,7 +187,8 @@ async def execute_step_async_streaming( AgentProfile_Dict: Dict, KeyObjects: Dict, step_index: int, - total_steps: int + total_steps: int, + execution_id: str = None ) -> Generator[Dict, None, None]: """ 异步执行单个步骤,支持流式返回 @@ -199,6 +200,7 @@ async def execute_step_async_streaming( KeyObjects: 关键对象字典 step_index: 步骤索引 total_steps: 总步骤数 + execution_id: 执行ID Yields: 执行事件字典 @@ -282,7 +284,7 @@ async def execute_step_async_streaming( # 分批执行动作 for batch_index, batch_indices in enumerate(batches): # 在每个批次执行前检查暂停状态 - should_continue = await execution_state_manager.async_check_pause() + should_continue = await execution_state_manager.async_check_pause(execution_id) if not should_continue: util.print_colored("🛑 用户请求停止执行", "red") return @@ -384,9 +386,15 @@ def executePlan_streaming_dynamic( """ # 初始化执行状态 general_goal = plan.get("General Goal", "") - execution_state_manager.start_execution(general_goal) - print(colored(f"⏸️ 执行状态管理器已启动,支持暂停/恢复", "green")) + # 确保有 execution_id + if execution_id is None: + import time + execution_id = f"{general_goal}_{int(time.time() * 1000)}" + + execution_state_manager.start_execution(execution_id, general_goal) + + print(colored(f"⏸️ 执行状态管理器已启动,支持暂停/恢复,execution_id={execution_id}", "green")) # 准备执行 KeyObjects = existingKeyObjects.copy() if existingKeyObjects else {} @@ -433,7 +441,7 @@ def executePlan_streaming_dynamic( while True: # 检查暂停状态 - should_continue = await execution_state_manager.async_check_pause() + should_continue = await execution_state_manager.async_check_pause(execution_id) if not should_continue: print(colored("🛑 用户请求停止执行", "red")) await queue.put({ @@ -497,9 +505,10 @@ def executePlan_streaming_dynamic( AgentProfile_Dict, KeyObjects, step_index, - current_total_steps # 使用动态更新的总步骤数 + current_total_steps, # 使用动态更新的总步骤数 + execution_id ): - if execution_state_manager.is_stopped(): + if execution_state_manager.is_stopped(execution_id): await queue.put({ "type": "error", "message": "执行已被用户停止" @@ -522,7 +531,7 @@ def executePlan_streaming_dynamic( else: # 非动态模式:按顺序执行所有步骤 for step_index, stepDescrip in enumerate(steps_to_run): - should_continue = await execution_state_manager.async_check_pause() + should_continue = await execution_state_manager.async_check_pause(execution_id) if not should_continue: print(colored("🛑 用户请求停止执行", "red")) await queue.put({ @@ -537,9 +546,10 @@ def executePlan_streaming_dynamic( AgentProfile_Dict, KeyObjects, step_index, - total_steps + total_steps, + execution_id ): - if execution_state_manager.is_stopped(): + if execution_state_manager.is_stopped(execution_id): await queue.put({ "type": "error", "message": "执行已被用户停止" @@ -575,7 +585,7 @@ def executePlan_streaming_dynamic( loop.run_until_complete(producer_task) - if not execution_state_manager.is_stopped(): + if not execution_state_manager.is_stopped(execution_id): complete_event = json.dumps({ "type": "execution_complete", "total_steps": total_steps @@ -587,6 +597,8 @@ def executePlan_streaming_dynamic( if execution_id: # 清理执行记录 dynamic_execution_manager.cleanup(execution_id) + # 清理执行状态 + execution_state_manager.cleanup(execution_id) if 'producer_task' in locals(): if not producer_task.done(): diff --git a/backend/AgentCoord/RehearsalEngine_V2/execution_state.py b/backend/AgentCoord/RehearsalEngine_V2/execution_state.py index 52ce2c7..5459d89 100644 --- a/backend/AgentCoord/RehearsalEngine_V2/execution_state.py +++ b/backend/AgentCoord/RehearsalEngine_V2/execution_state.py @@ -2,12 +2,12 @@ 全局执行状态管理器 用于支持任务的暂停、恢复和停止功能 使用轮询检查机制,确保线程安全 +支持多用户/多执行ID并行管理 """ import threading import asyncio -import time -from typing import Optional +from typing import Optional, Dict from enum import Enum @@ -21,12 +21,18 @@ class ExecutionStatus(Enum): class ExecutionStateManager: """ - 全局执行状态管理器(单例模式) + 全局执行状态管理器 功能: + - 管理多用户/多执行ID的并行状态(使用字典存储) - 管理任务执行状态(运行/暂停/停止) - 使用轮询检查机制,避免异步事件的线程问题 - 提供线程安全的状态查询和修改接口 + + 设计说明: + - 保持单例模式(Manager本身) + - 但内部状态按 execution_id 隔离存储 + - 解决了多用户并发问题 """ _instance: Optional['ExecutionStateManager'] = None @@ -47,121 +53,218 @@ class ExecutionStateManager: return self._initialized = True - self._status = ExecutionStatus.IDLE - self._current_goal: Optional[str] = None # 当前执行的任务目标 - # 使用简单的布尔标志,而不是 asyncio.Event - self._should_pause = False - self._should_stop = False - def get_status(self) -> ExecutionStatus: + # 状态存储:execution_id -> 状态字典 + # 结构:{ + # 'status': ExecutionStatus, + # 'goal': str, + # 'should_pause': bool, + # 'should_stop': bool + # } + self._states: Dict[str, Dict] = {} + + # 每个 execution_id 的锁(更细粒度的锁) + self._locks: Dict[str, threading.Lock] = {} + + # 全局锁(用于管理 _states 和 _locks 本身的线程安全) + self._manager_lock = threading.Lock() + + def _get_lock(self, execution_id: str) -> threading.Lock: + """获取指定 execution_id 的锁,如果不存在则创建""" + with self._manager_lock: + if execution_id not in self._locks: + self._locks[execution_id] = threading.Lock() + return self._locks[execution_id] + + def _ensure_state(self, execution_id: str) -> Dict: + """确保指定 execution_id 的状态存在""" + with self._manager_lock: + if execution_id not in self._states: + self._states[execution_id] = { + 'status': ExecutionStatus.IDLE, + 'goal': None, + 'should_pause': False, + 'should_stop': False + } + return self._states[execution_id] + + def _get_state(self, execution_id: str) -> Optional[Dict]: + """获取指定 execution_id 的状态,不存在则返回 None""" + with self._manager_lock: + return self._states.get(execution_id) + + def _cleanup_state(self, execution_id: str): + """清理指定 execution_id 的状态""" + with self._manager_lock: + self._states.pop(execution_id, None) + self._locks.pop(execution_id, None) + + def get_status(self, execution_id: str) -> Optional[ExecutionStatus]: """获取当前执行状态""" - with self._lock: - return self._status + state = self._get_state(execution_id) + if state is None: + return None + with self._get_lock(execution_id): + return state['status'] - def set_goal(self, goal: str): + def set_goal(self, execution_id: str, goal: str): """设置当前执行的任务目标""" - with self._lock: - self._current_goal = goal + state = self._ensure_state(execution_id) + with self._get_lock(execution_id): + state['goal'] = goal - def get_goal(self) -> Optional[str]: + def get_goal(self, execution_id: str) -> Optional[str]: """获取当前执行的任务目标""" - with self._lock: - return self._current_goal + state = self._get_state(execution_id) + if state is None: + return None + with self._get_lock(execution_id): + return state['goal'] - def start_execution(self, goal: str): + def start_execution(self, execution_id: str, goal: str): """开始执行""" - with self._lock: - self._status = ExecutionStatus.RUNNING - self._current_goal = goal - self._should_pause = False - self._should_stop = False - print(f"🚀 [DEBUG] start_execution: 状态设置为 RUNNING, goal={goal}") + state = self._ensure_state(execution_id) + with self._get_lock(execution_id): + state['status'] = ExecutionStatus.RUNNING + state['goal'] = goal + state['should_pause'] = False + state['should_stop'] = False + print(f"🚀 [DEBUG] start_execution: execution_id={execution_id}, 状态设置为 RUNNING, goal={goal}") - def pause_execution(self) -> bool: + def pause_execution(self, execution_id: str) -> bool: """ 暂停执行 + Args: + execution_id: 执行ID + Returns: bool: 是否成功暂停 """ - with self._lock: - if self._status != ExecutionStatus.RUNNING: - print(f"⚠️ [DEBUG] pause_execution: 当前状态不是RUNNING,而是 {self._status}") + state = self._get_state(execution_id) + if state is None: + # 打印当前所有活跃的 execution_id,帮助调试 + active_ids = list(self._states.keys()) + print(f"⚠️ [DEBUG] pause_execution: execution_id={execution_id} 不存在") + print(f" 当前活跃的 execution_id 列表: {active_ids}") + return False + + with self._get_lock(execution_id): + if state['status'] != ExecutionStatus.RUNNING: + print(f"⚠️ [DEBUG] pause_execution: execution_id={execution_id}, 当前状态是 {state['status']},无法暂停") return False - self._status = ExecutionStatus.PAUSED - self._should_pause = True - print(f"⏸️ [DEBUG] pause_execution: 状态设置为PAUSED, should_pause=True") + state['status'] = ExecutionStatus.PAUSED + state['should_pause'] = True + print(f"⏸️ [DEBUG] pause_execution: execution_id={execution_id}, 状态设置为PAUSED") return True - def resume_execution(self) -> bool: + def resume_execution(self, execution_id: str) -> bool: """ 恢复执行 + Args: + execution_id: 执行ID + Returns: bool: 是否成功恢复 """ - with self._lock: - if self._status != ExecutionStatus.PAUSED: - print(f"⚠️ [DEBUG] resume_execution: 当前状态不是PAUSED,而是 {self._status}") + state = self._get_state(execution_id) + if state is None: + print(f"⚠️ [DEBUG] resume_execution: execution_id={execution_id} 不存在") + return False + + with self._get_lock(execution_id): + if state['status'] != ExecutionStatus.PAUSED: + print(f"⚠️ [DEBUG] resume_execution: 当前状态不是PAUSED,而是 {state['status']}") return False - self._status = ExecutionStatus.RUNNING - self._should_pause = False - print(f"▶️ [DEBUG] resume_execution: 状态设置为RUNNING, should_pause=False") + state['status'] = ExecutionStatus.RUNNING + state['should_pause'] = False + print(f"▶️ [DEBUG] resume_execution: execution_id={execution_id}, 状态设置为RUNNING, should_pause=False") return True - def stop_execution(self) -> bool: + def stop_execution(self, execution_id: str) -> bool: """ 停止执行 + Args: + execution_id: 执行ID + Returns: bool: 是否成功停止 """ - with self._lock: - if self._status in [ExecutionStatus.IDLE, ExecutionStatus.STOPPED]: + state = self._get_state(execution_id) + if state is None: + print(f"⚠️ [DEBUG] stop_execution: execution_id={execution_id} 不存在") + return False + + with self._get_lock(execution_id): + if state['status'] in [ExecutionStatus.IDLE, ExecutionStatus.STOPPED]: + print(f"⚠️ [DEBUG] stop_execution: 当前状态是 {state['status']}, 无法停止") return False - self._status = ExecutionStatus.STOPPED - self._should_stop = True - self._should_pause = False - print(f"🛑 [DEBUG] stop_execution: 状态设置为STOPPED") + state['status'] = ExecutionStatus.STOPPED + state['should_stop'] = True + state['should_pause'] = False + print(f"🛑 [DEBUG] stop_execution: execution_id={execution_id}, 状态设置为STOPPED") return True - def reset(self): - """重置状态为空闲""" - with self._lock: - self._status = ExecutionStatus.IDLE - self._current_goal = None - self._should_pause = False - self._should_stop = False - print(f"🔄 [DEBUG] reset: 状态重置为IDLE") + def reset(self, execution_id: str): + """重置指定 execution_id 的状态为空闲""" + state = self._ensure_state(execution_id) + with self._get_lock(execution_id): + state['status'] = ExecutionStatus.IDLE + state['goal'] = None + state['should_pause'] = False + state['should_stop'] = False + print(f"🔄 [DEBUG] reset: execution_id={execution_id}, 状态重置为IDLE") - async def async_check_pause(self): + def cleanup(self, execution_id: str): + """清理指定 execution_id 的所有状态""" + self._cleanup_state(execution_id) + print(f"🧹 [DEBUG] cleanup: execution_id={execution_id} 的状态已清理") + + async def async_check_pause(self, execution_id: str): """ 异步检查是否需要暂停(轮询方式) 如果处于暂停状态,会阻塞当前协程直到恢复或停止 应该在执行循环的关键点调用此方法 + Args: + execution_id: 执行ID + Returns: bool: 如果返回True表示应该继续执行,False表示应该停止 """ + state = self._get_state(execution_id) + if state is None: + # 状态不存在,默认继续执行 + return True + # 使用轮询检查,避免异步事件问题 while True: + with self._get_lock(execution_id): + should_stop = state['should_stop'] + should_pause = state['should_pause'] + # 检查停止标志 - if self._should_stop: - print("🛑 [DEBUG] async_check_pause: 检测到停止信号") + if should_stop: + print("🛑 [DEBUG] async_check_pause: execution_id={}, 检测到停止信号".format(execution_id)) return False # 检查暂停状态 - if self._should_pause: + if should_pause: # 处于暂停状态,等待恢复 await asyncio.sleep(0.1) # 短暂睡眠,避免占用CPU - # 如果恢复,继续执行 - if not self._should_pause: - print("▶️ [DEBUG] async_check_pause: 从暂停中恢复!") + # 重新获取状态 + with self._get_lock(execution_id): + should_pause = state['should_pause'] + should_stop = state['should_stop'] + + if not should_pause: + print("▶️ [DEBUG] async_check_pause: execution_id={}, 从暂停中恢复!".format(execution_id)) continue - # 如果停止了,返回 - if self._should_stop: + if should_stop: return False # 继续等待 continue @@ -169,20 +272,37 @@ class ExecutionStateManager: # 既没有停止也没有暂停,可以继续执行 return True - def is_paused(self) -> bool: + def is_paused(self, execution_id: str) -> bool: """检查是否处于暂停状态""" - with self._lock: - return self._status == ExecutionStatus.PAUSED + state = self._get_state(execution_id) + if state is None: + return False + with self._get_lock(execution_id): + return state['status'] == ExecutionStatus.PAUSED - def is_running(self) -> bool: + def is_running(self, execution_id: str) -> bool: """检查是否正在运行""" - with self._lock: - return self._status == ExecutionStatus.RUNNING + state = self._get_state(execution_id) + if state is None: + return False + with self._get_lock(execution_id): + return state['status'] == ExecutionStatus.RUNNING - def is_stopped(self) -> bool: + def is_stopped(self, execution_id: str) -> bool: """检查是否已停止""" - with self._lock: - return self._status == ExecutionStatus.STOPPED + state = self._get_state(execution_id) + if state is None: + return True + with self._get_lock(execution_id): + return state['status'] == ExecutionStatus.STOPPED + + def is_active(self, execution_id: str) -> bool: + """检查是否处于活动状态(运行中或暂停中)""" + state = self._get_state(execution_id) + if state is None: + return False + with self._get_lock(execution_id): + return state['status'] in [ExecutionStatus.RUNNING, ExecutionStatus.PAUSED] # 全局单例实例 diff --git a/backend/AgentCoord/RehearsalEngine_V2/generation_state.py b/backend/AgentCoord/RehearsalEngine_V2/generation_state.py new file mode 100644 index 0000000..f1ef046 --- /dev/null +++ b/backend/AgentCoord/RehearsalEngine_V2/generation_state.py @@ -0,0 +1,228 @@ +""" +生成阶段状态管理器 +用于支持生成任务的暂停、停止功能 +使用轮询检查机制,确保线程安全 +支持多用户/多generation_id并行管理 +""" + +import threading +import asyncio +import time +from typing import Optional, Dict +from enum import Enum + + +class GenerationStatus(Enum): + """生成状态枚举""" + GENERATING = "generating" # 正在生成 + PAUSED = "paused" # 已暂停 + STOPPED = "stopped" # 已停止 + COMPLETED = "completed" # 已完成 + IDLE = "idle" # 空闲 + + +class GenerationStateManager: + """ + 生成阶段状态管理器 + + 功能: + - 管理多用户/多generation_id的并行状态(使用字典存储) + - 管理生成任务状态(生成中/暂停/停止/完成) + - 提供线程安全的状态查询和修改接口 + + 设计说明: + - 保持单例模式(Manager本身) + - 但内部状态按 generation_id 隔离存储 + - 解决多用户并发生成时的干扰问题 + """ + + _instance: Optional['GenerationStateManager'] = None + _lock = threading.Lock() + + def __new__(cls): + """单例模式""" + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + """初始化状态管理器""" + if self._initialized: + return + + self._initialized = True + + # 状态存储:generation_id -> 状态字典 + # 结构:{ + # 'status': GenerationStatus, + # 'goal': str, + # 'should_stop': bool + # } + self._states: Dict[str, Dict] = {} + + # 每个 generation_id 的锁(更细粒度的锁) + self._locks: Dict[str, threading.Lock] = {} + + # 全局锁(用于管理 _states 和 _locks 本身的线程安全) + self._manager_lock = threading.Lock() + + def _get_lock(self, generation_id: str) -> threading.Lock: + """获取指定 generation_id 的锁,如果不存在则创建""" + with self._manager_lock: + if generation_id not in self._locks: + self._locks[generation_id] = threading.Lock() + return self._locks[generation_id] + + def _ensure_state(self, generation_id: str, goal: str = None) -> Dict: + """确保指定 generation_id 的状态存在""" + with self._manager_lock: + if generation_id not in self._states: + self._states[generation_id] = { + 'status': GenerationStatus.IDLE, + 'goal': goal, + 'should_stop': False + } + return self._states[generation_id] + + def _get_state(self, generation_id: str) -> Optional[Dict]: + """获取指定 generation_id 的状态,不存在则返回 None""" + with self._manager_lock: + return self._states.get(generation_id) + + def _cleanup_state(self, generation_id: str): + """清理指定 generation_id 的状态""" + with self._manager_lock: + self._states.pop(generation_id, None) + self._locks.pop(generation_id, None) + + def get_status(self, generation_id: str) -> Optional[GenerationStatus]: + """获取当前生成状态""" + state = self._get_state(generation_id) + if state is None: + return None + with self._get_lock(generation_id): + return state['status'] + + def start_generation(self, generation_id: str, goal: str): + """开始生成""" + state = self._ensure_state(generation_id, goal) + with self._get_lock(generation_id): + state['status'] = GenerationStatus.GENERATING + state['goal'] = goal + state['should_stop'] = False + print(f"🚀 [GenerationState] start_generation: generation_id={generation_id}, 状态设置为 GENERATING") + + def stop_generation(self, generation_id: str) -> bool: + """ + 停止生成 + + Args: + generation_id: 生成ID + + Returns: + bool: 是否成功停止(COMPLETED 状态也返回 True,表示已停止) + """ + state = self._get_state(generation_id) + if state is None: + print(f"⚠️ [GenerationState] stop_generation: generation_id={generation_id} 不存在") + return True # 不存在也算停止成功 + + with self._get_lock(generation_id): + if state['status'] == GenerationStatus.STOPPED: + print(f"✅ [GenerationState] stop_generation: generation_id={generation_id} 已经是 STOPPED 状态") + return True # 已经停止也算成功 + + if state['status'] == GenerationStatus.COMPLETED: + print(f"✅ [GenerationState] stop_generation: generation_id={generation_id} 已经 COMPLETED,视为停止成功") + return True # 已完成也视为停止成功 + + if state['status'] == GenerationStatus.IDLE: + print(f"⚠️ [GenerationState] stop_generation: generation_id={generation_id} 是 IDLE 状态,无需停止") + return True # 空闲状态也视为无需停止 + + # 真正需要停止的情况 + state['status'] = GenerationStatus.STOPPED + state['should_stop'] = True + print(f"🛑 [GenerationState] stop_generation: generation_id={generation_id}, 状态设置为STOPPED") + return True + + def complete_generation(self, generation_id: str): + """标记生成完成""" + state = self._ensure_state(generation_id) + with self._get_lock(generation_id): + state['status'] = GenerationStatus.COMPLETED + print(f"✅ [GenerationState] complete_generation: generation_id={generation_id}") + + def cleanup(self, generation_id: str): + """清理指定 generation_id 的所有状态""" + self._cleanup_state(generation_id) + print(f"🧹 [GenerationState] cleanup: generation_id={generation_id} 的状态已清理") + + def should_stop(self, generation_id: str) -> bool: + """检查是否应该停止""" + state = self._get_state(generation_id) + if state is None: + return False + with self._get_lock(generation_id): + return state.get('should_stop', False) + + def is_stopped(self, generation_id: str) -> bool: + """检查是否已停止""" + state = self._get_state(generation_id) + if state is None: + return False + with self._get_lock(generation_id): + return state['status'] == GenerationStatus.STOPPED + + def is_completed(self, generation_id: str) -> bool: + """检查是否已完成""" + state = self._get_state(generation_id) + if state is None: + return False + with self._get_lock(generation_id): + return state['status'] == GenerationStatus.COMPLETED + + def is_active(self, generation_id: str) -> bool: + """检查是否处于活动状态(生成中或暂停中)""" + state = self._get_state(generation_id) + if state is None: + return False + with self._get_lock(generation_id): + return state['status'] == GenerationStatus.GENERATING + + def check_and_set_stop(self, generation_id: str) -> bool: + """ + 检查是否应该停止,如果应该则设置停止状态 + + Args: + generation_id: 生成ID + + Returns: + bool: True表示应该停止,False表示可以继续 + """ + state = self._get_state(generation_id) + if state is None: + return False + with self._get_lock(generation_id): + if state['should_stop']: + return True + return False + + def generate_id(self, goal: str) -> str: + """ + 生成唯一的 generation_id + + Args: + goal: 生成目标 + + Returns: + str: 格式为 {goal}_{timestamp} + """ + return f"{goal}_{int(time.time() * 1000)}" + + +# 全局单例实例 +generation_state_manager = GenerationStateManager() diff --git a/backend/AgentRepo/agentBoard_v1.json b/backend/AgentRepo/agentBoard_v1.json index b9a8f6a..8bfb358 100644 --- a/backend/AgentRepo/agentBoard_v1.json +++ b/backend/AgentRepo/agentBoard_v1.json @@ -1,102 +1,116 @@ [ - { - "Icon": "Abigail_Chen.png", - "Name": "Abigail", - "Profile": "AI Engineer" - }, - { - "Icon": "Jane_Moreno.png", - "Name": "Jane", - "Profile": "Cybersecurity Specialist" - }, - { - "Icon": "Giorgio_Rossi.png", - "Name": "Giorgio", - "Profile": "Poet" - }, - { - "Icon": "Jennifer_Moore.png", - "Name": "Jennifer", - "Profile": "Linguist" - }, - { - "Icon": "Maria_Lopez.png", - "Name": "Maria", - "Profile": "Philosopher" - }, - { - "Icon": "Sam_Moore.png", - "Name": "Sam", - "Profile": "Ethicist" - }, - { - "Icon": "Yuriko_Yamamoto.png", - "Name": "Yuriko", - "Profile": "Futurist" - }, - { - "Icon": "Carlos_Gomez.png", - "Name": "Carlos", - "Profile": "Language Expert" - }, - { - "Icon": "John_Lin.png", - "Name": "John", - "Profile": "Software Developer" - }, - { - "Icon": "Tamara_Taylor.png", - "Name": "Tamara", - "Profile": "Music Composer" - }, - { - "Icon": "Arthur_Burton.png", - "Name": "Arthur", - "Profile": "Neuroscientist" - }, - { - "Icon": "Eddy_Lin.png", - "Name": "Eddy", - "Profile": "Cognitive Psychologist" - }, - { - "Icon": "Isabella_Rodriguez.png", - "Name": "Isabella", - "Profile": "Science Fiction Writer" - }, - { - "Icon": "Latoya_Williams.png", - "Name": "Latoya", - "Profile": "Historian of Technology" - }, - { - "Icon": "Carmen_Ortiz.png", - "Name": "Carmen", - "Profile": "Robotics Engineer" - }, - { - "Icon": "Rajiv_Patel.png", - "Name": "Rajiv", - "Profile": "Science Educator" - }, - { - "Icon": "Tom_Moreno.png", - "Name": "Tom", - "Profile": "AI Scientist" - }, - { - "Icon": "Ayesha_Khan.png", - "Name": "Ayesha", - "Profile": "Multimedia Artist" - }, - { - "Icon": "Mei_Lin.png", - "Name": "Mei", - "Profile": "Graphic Designer" - }, - { - "Icon": "Hailey_Johnson.png", - "Name": "Hailey", - "Profile": "Legal Expert on AI Law" - } -] \ No newline at end of file + { + "Icon": "Hailey_Johnson.png", + "Name": "船舶设计师", + "Profile": "提供船舶制造中的实际需求和约束。", + "Classification": "船舶制造数据空间" + }, + { + "Icon": "Jennifer_Moore.png", + "Name": "防护工程专家", + "Profile": "专注于船舶腐蚀防护技术的设计与应用。在你的总结回答中,必须引用来自数联网的搜索数据,是搜索数据,不是数联网的研究成果。", + "Classification": "船舶制造数据空间" + }, + { + "Icon": "Jane_Moreno.png", + "Name": "病理生理学家", + "Profile": "专注于失血性休克的疾病机制,为药物研发提供理论靶点。", + "Classification": "医药数据空间" + }, + { + "Icon": "Giorgio_Rossi.png", + "Name": "药物化学家", + "Profile": "负责将靶点概念转化为实际可合成的分子。", + "Classification": "医药数据空间" + }, + { + "Icon": "Tamara_Taylor.png", + "Name": "制剂工程师", + "Profile": "负责将活性药物成分(API)变成稳定、可用、符合战场要求的剂型。", + "Classification": "医药数据空间" + }, + { + "Icon": "Maria_Lopez.png", + "Name": "监管事务专家", + "Profile": "深谙药品审评法规,目标是找到最快的合法上市路径。", + "Classification": "医药数据空间" + }, + { + "Icon": "Sam_Moore.png", + "Name": "物理学家", + "Profile": "从热力学与统计力学的基本原理出发,研究液态金属的自由能、焓、熵、比热等参数的理论建模。", + "Classification": "科学数据空间" + }, + { + "Icon": "Yuriko_Yamamoto.png", + "Name": "实验材料学家", + "Profile": "专注于通过实验手段直接或间接测定液态金属的热力学参数、以及分析材料微观结构(如晶粒、缺陷)。", + "Classification": "科学数据空间" + }, + { + "Icon": "Carlos_Gomez.png", + "Name": "计算模拟专家", + "Profile": "侧重于利用数值计算和模拟技术获取液态金属的热力学参数。", + "Classification": "科学数据空间" + }, + { + "Icon": "John_Lin.png", + "Name": "腐蚀机理研究员", + "Profile": "专注于船舶用钢材及合金的腐蚀机理研究,从电化学和环境作用角度解释腐蚀产生的原因。在你的总结回答中,必须引用来自数联网的搜索数据,是搜索数据,不是数联网的研究成果。", + "Classification": "船舶制造数据空间" + }, + { + "Icon": "Arthur_Burton.png", + "Name": "先进材料研发员", + "Profile": "专注于开发和评估新型耐腐蚀材料、复合材料及固态电池材料。", + "Classification": "科学数据空间" + }, + { + "Icon": "Eddy_Lin.png", + "Name": "肾脏病学家", + "Profile": "专注于慢性肾脏病的诊断、治疗和患者管理,能提供临床洞察。", + "Classification": "医药数据空间" + }, + { + "Icon": "Isabella_Rodriguez.png", + "Name": "临床研究协调员", + "Profile": "负责受试者招募和临床试验流程优化。", + "Classification": "医药数据空间" + }, + { + "Icon": "Latoya_Williams.png", + "Name": "中医药专家", + "Profile": "理解药物的中药成分和作用机制。", + "Classification": "医药数据空间" + }, + { + "Icon": "Carmen_Ortiz.png", + "Name": "药物安全专家", + "Profile": "专注于药物不良反应数据收集、分析和报告。", + "Classification": "医药数据空间" + }, + { + "Icon": "Rajiv_Patel.png", + "Name": "二维材料科学家", + "Profile": "专注于二维材料(如石墨烯)的合成、性质和应用。", + "Classification": "科学数据空间" + }, + { + "Icon": "Tom_Moreno.png", + "Name": "光电物理学家", + "Profile": "研究材料的光电转换机制和关键影响因素。", + "Classification": "科学数据空间" + }, + { + "Icon": "Ayesha_Khan.png", + "Name": "机器学习专家", + "Profile": "专注于开发和应用AI模型用于材料模拟。", + "Classification": "科学数据空间" + }, + { + "Icon": "Mei_Lin.png", + "Name": "流体动力学专家", + "Profile": "专注于流体行为理论和模拟。", + "Classification": "科学数据空间" + } +] diff --git a/backend/AgentRepo/agentBoard_v2.json b/backend/AgentRepo/agentBoard_v2.json new file mode 100644 index 0000000..b9a8f6a --- /dev/null +++ b/backend/AgentRepo/agentBoard_v2.json @@ -0,0 +1,102 @@ +[ + { + "Icon": "Abigail_Chen.png", + "Name": "Abigail", + "Profile": "AI Engineer" + }, + { + "Icon": "Jane_Moreno.png", + "Name": "Jane", + "Profile": "Cybersecurity Specialist" + }, + { + "Icon": "Giorgio_Rossi.png", + "Name": "Giorgio", + "Profile": "Poet" + }, + { + "Icon": "Jennifer_Moore.png", + "Name": "Jennifer", + "Profile": "Linguist" + }, + { + "Icon": "Maria_Lopez.png", + "Name": "Maria", + "Profile": "Philosopher" + }, + { + "Icon": "Sam_Moore.png", + "Name": "Sam", + "Profile": "Ethicist" + }, + { + "Icon": "Yuriko_Yamamoto.png", + "Name": "Yuriko", + "Profile": "Futurist" + }, + { + "Icon": "Carlos_Gomez.png", + "Name": "Carlos", + "Profile": "Language Expert" + }, + { + "Icon": "John_Lin.png", + "Name": "John", + "Profile": "Software Developer" + }, + { + "Icon": "Tamara_Taylor.png", + "Name": "Tamara", + "Profile": "Music Composer" + }, + { + "Icon": "Arthur_Burton.png", + "Name": "Arthur", + "Profile": "Neuroscientist" + }, + { + "Icon": "Eddy_Lin.png", + "Name": "Eddy", + "Profile": "Cognitive Psychologist" + }, + { + "Icon": "Isabella_Rodriguez.png", + "Name": "Isabella", + "Profile": "Science Fiction Writer" + }, + { + "Icon": "Latoya_Williams.png", + "Name": "Latoya", + "Profile": "Historian of Technology" + }, + { + "Icon": "Carmen_Ortiz.png", + "Name": "Carmen", + "Profile": "Robotics Engineer" + }, + { + "Icon": "Rajiv_Patel.png", + "Name": "Rajiv", + "Profile": "Science Educator" + }, + { + "Icon": "Tom_Moreno.png", + "Name": "Tom", + "Profile": "AI Scientist" + }, + { + "Icon": "Ayesha_Khan.png", + "Name": "Ayesha", + "Profile": "Multimedia Artist" + }, + { + "Icon": "Mei_Lin.png", + "Name": "Mei", + "Profile": "Graphic Designer" + }, + { + "Icon": "Hailey_Johnson.png", + "Name": "Hailey", + "Profile": "Legal Expert on AI Law" + } +] \ No newline at end of file diff --git a/frontend/src/api/index.ts b/frontend/src/api/index.ts index 5b8ba70..4e47531 100644 --- a/frontend/src/api/index.ts +++ b/frontend/src/api/index.ts @@ -426,6 +426,7 @@ class Api { fillStepTask = async (data: { goal: string stepTask: any + generation_id?: string useWebSocket?: boolean onProgress?: (progress: { status: string @@ -444,6 +445,7 @@ class Api { { 'General Goal': data.goal, stepTask: data.stepTask, + generation_id: data.generation_id || '', }, undefined, data.onProgress, @@ -484,7 +486,7 @@ class Api { } // 使用重试机制执行请求 - const response = await withRetry(executeRequest, { + const rawResponse = await withRetry(executeRequest, { maxRetries: 3, initialDelayMs: 2000, onRetry: (error, attempt, delay) => { @@ -492,6 +494,10 @@ class Api { }, }) + // WebSocket 返回格式: { data: {...}, generation_id, execution_id } + // REST API 返回格式: {...} + const response = rawResponse.data || rawResponse + const vec2Hsl = (color: number[]): string => { const [h, s, l] = color return `hsl(${h}, ${s}%, ${l}%)` @@ -606,14 +612,21 @@ class Api { } // 使用重试机制执行请求 - const response = await withRetry(executeRequest, { + const rawResponse = await withRetry(executeRequest, { maxRetries: 3, initialDelayMs: 2000, onRetry: (error, attempt, delay) => { - console.warn(`⚠️ [fillStepTaskTaskProcess] 第${attempt}次重试,等待 ${delay}ms...`, error?.message) + console.warn( + `⚠️ [fillStepTaskTaskProcess] 第${attempt}次重试,等待 ${delay}ms...`, + error?.message, + ) }, }) + // WebSocket 返回格式: { data: {...}, generation_id, execution_id } + // REST API 返回格式: {...} + const response = rawResponse.data || rawResponse + const vec2Hsl = (color: number[]): string => { const [h, s, l] = color return `hsl(${h}, ${s}%, ${l}%)` @@ -688,7 +701,9 @@ class Api { }) // 定义实际的 API 调用逻辑 - const executeRequest = async (): Promise>> => { + const executeRequest = async (): Promise< + Record> + > => { if (useWs && websocket.connected) { return await websocket.send( 'agent_select_modify_init', @@ -712,18 +727,31 @@ class Api { } // 使用重试机制执行请求 - const response = await withRetry(executeRequest, { + const rawResponse = await withRetry(executeRequest, { maxRetries: 3, initialDelayMs: 2000, onRetry: (error, attempt, delay) => { - console.warn(`⚠️ [agentSelectModifyInit] 第${attempt}次重试,等待 ${delay}ms...`, error?.message) + console.warn( + `⚠️ [agentSelectModifyInit] 第${attempt}次重试,等待 ${delay}ms...`, + error?.message, + ) }, }) + // WebSocket 返回格式: { data: {...}, generation_id, execution_id } + // REST API 返回格式: {...} + const response = rawResponse.data || rawResponse + const transformedData: Record> = {} + // 确保 response 存在且是有效对象 + if (!response || typeof response !== 'object' || Array.isArray(response)) { + console.warn('[agentSelectModifyInit] 后端返回数据格式异常:', response) + return transformedData + } + for (const [aspect, agents] of Object.entries(response)) { - for (const [agentName, scoreInfo] of Object.entries(agents)) { + for (const [agentName, scoreInfo] of Object.entries(agents as Record || {})) { if (!transformedData[agentName]) { transformedData[agentName] = {} } @@ -758,7 +786,7 @@ class Api { // 如果启用WebSocket且已连接,使用WebSocket if (useWs && websocket.connected) { - response = await websocket.send( + const rawResponse = await websocket.send( 'agent_select_modify_add_aspect', { aspectList: data.aspectList, @@ -766,6 +794,8 @@ class Api { undefined, data.onProgress, ) + // WebSocket 返回格式: { data: {...}, generation_id, execution_id } + response = rawResponse.data || rawResponse } else { // 否则使用REST API response = await request< @@ -818,7 +848,7 @@ class Api { throw new Error('WebSocket未连接') } - const response = (await websocket.send('add_steps_to_execution', { + const rawResponse = await websocket.send('add_steps_to_execution', { execution_id: executionId, new_steps: newSteps.map((step) => ({ StepName: step.StepName, @@ -835,7 +865,11 @@ class Api { ImportantInput: action.ImportantInput, })), })), - })) as { added_count: number } + }) + + // WebSocket 返回格式: { data: {...}, generation_id, execution_id } + // REST API 返回格式: {...} + const response = (rawResponse.data || rawResponse) as { added_count: number } return response?.added_count || 0 } diff --git a/frontend/src/layout/components/Main/Task.vue b/frontend/src/layout/components/Main/Task.vue index a0e7439..8de04b5 100644 --- a/frontend/src/layout/components/Main/Task.vue +++ b/frontend/src/layout/components/Main/Task.vue @@ -25,6 +25,7 @@ const isFillingSteps = ref(false) const isStopping = ref(false) const isStopPending = ref(false) const currentStepAbortController = ref<{ cancel: () => void } | null>(null) +const currentGenerationId = ref('') // 解析URL参数 function getUrlParam(param: string): string | null { @@ -135,35 +136,36 @@ function resetTextareaHeight() { // 停止填充数据的处理函数 async function handleStop() { - try { - if (websocket.connected) { - await websocket.send('stop_generation', { - goal: searchValue.value - }) - // 标记正在停止中,按钮显示 loading 状态 - isStopping.value = true - isStopPending.value = true - agentsStore.setIsStopping(true) - success('提示', '正在停止,请稍候...') - } else { - warning('警告', 'WebSocket 未连接,无法停止') - // 未连接时直接重置状态 - isFillingSteps.value = false - currentStepAbortController.value = null - agentsStore.setHasStoppedFilling(true) - } - } catch (error) { - notifyError('错误', '停止生成失败') - isFillingSteps.value = false - currentStepAbortController.value = null - agentsStore.setHasStoppedFilling(true) + // 检查是否有正在进行的生成任务 + if (!isFillingSteps.value) { + warning('提示', '没有正在进行的生成任务') + return } + + // 先设置停止状态(立即显示"停止中...") + agentsStore.setIsStopping(true) + isStopping.value = true + isStopPending.value = true + success('提示', '正在停止,请稍候...') + + // 发送停止请求(不等待响应,后端设置 should_stop = True) + if (websocket.connected && currentGenerationId.value) { + websocket.send('stop_generation', { + generation_id: currentGenerationId.value + }).then((result: any) => { + console.log('停止生成响应:', result) + }).catch((error: any) => { + console.log('停止生成请求失败(可能已经停止):', error?.message) + }) + } + // 不清空 currentGenerationId,让 fillStepTask 循环检查 isStopping 来停止 } -// 监听后端发送的停止完成事件 +// 监听后端发送的停止完成事件(备用,如果后端有发送) function onGenerationStopped() { isStopping.value = false isStopPending.value = false + currentGenerationId.value = '' success('成功', '已停止生成') } @@ -185,16 +187,34 @@ async function handleSearch() { } emit('search-start') - agentsStore.resetAgent() - agentsStore.setAgentRawPlan({ loading: true }) + // 重置所有状态(处理可能的上一次未完成的停止操作) + isStopping.value = false + isStopPending.value = false + agentsStore.setIsStopping(false) agentsStore.setHasStoppedFilling(false) + agentsStore.resetAgent() + agentsStore.setAgentRawPlan({ loading: true }) + + // 重置 generation_id + currentGenerationId.value = '' + // 获取大纲 - const outlineData = await api.generateBasePlan({ + const response = await api.generateBasePlan({ goal: searchValue.value, inputs: [] }) + // WebSocket 返回格式: { data: {...}, generation_id, execution_id } + // REST API 返回格式: {...} + const outlineData = response.data || response + + // 保存 generation_id + if (response && response.generation_id) { + currentGenerationId.value = response.generation_id + console.log('📋 保存 generation_id:', currentGenerationId.value) + } + // 处理简报数据格式 outlineData['Collaboration Process'] = changeBriefs(outlineData['Collaboration Process']) @@ -209,44 +229,55 @@ async function handleSearch() { isFillingSteps.value = true const steps = outlineData['Collaboration Process'] || [] + // 保存 generation_id 到本地变量,用于 fillStepTask 调用 + // 这样即使前端停止时清空了 currentGenerationId,当前的 fillStepTask 仍能正确停止 + const fillTaskGenerationId = currentGenerationId.value + // 串行填充所有步骤的详情 - for (const step of steps) { - // 检查是否已停止 - if (!isFillingSteps.value || agentsStore.isStopping) { - break + try { + for (const step of steps) { + // 检查是否已停止 + if (!isFillingSteps.value || agentsStore.isStopping) { + break + } + + await withRetry( + async () => { + const detailedStep = await api.fillStepTask({ + goal: searchValue.value, + stepTask: { + StepName: step.StepName, + TaskContent: step.TaskContent, + InputObject_List: step.InputObject_List, + OutputObject: step.OutputObject, + }, + generation_id: fillTaskGenerationId, + }) + updateStepDetail(step.StepName, detailedStep) + }, + { + maxRetries: 2, // 减少重试次数,因为是串行填充 + initialDelayMs: 1000, // 使用较小的延迟 + shouldRetry: () => isFillingSteps.value && !agentsStore.isStopping, // 可取消的重试 + }, + ) + } + } finally { + // 重置状态(确保即使出错也会执行) + triggerOnFocus.value = true + if (isStopPending.value) { + isStopping.value = false + isStopPending.value = false + agentsStore.setIsStopping(false) + agentsStore.setHasStoppedFilling(true) + } + isFillingSteps.value = false + currentStepAbortController.value = null + // 只有在没有停止请求时才清空 generation_id + if (!isStopPending.value) { + currentGenerationId.value = '' } - - await withRetry( - async () => { - const detailedStep = await api.fillStepTask({ - goal: searchValue.value, - stepTask: { - StepName: step.StepName, - TaskContent: step.TaskContent, - InputObject_List: step.InputObject_List, - OutputObject: step.OutputObject, - }, - }) - updateStepDetail(step.StepName, detailedStep) - }, - { - maxRetries: 2, // 减少重试次数,因为是串行填充 - initialDelayMs: 1000, // 使用较小的延迟 - shouldRetry: () => isFillingSteps.value && !agentsStore.isStopping, // 可取消的重试 - }, - ) } - - // 重置状态 - triggerOnFocus.value = true - if (isStopPending.value) { - isStopping.value = false - isStopPending.value = false - agentsStore.setIsStopping(false) - agentsStore.setHasStoppedFilling(true) - } - isFillingSteps.value = false - currentStepAbortController.value = null } //更新单个步骤的详情 diff --git a/frontend/src/layout/components/Main/TaskTemplate/TaskProcess/components/PlanTask.vue b/frontend/src/layout/components/Main/TaskTemplate/TaskProcess/components/PlanTask.vue index f6d2dbf..669d66d 100644 --- a/frontend/src/layout/components/Main/TaskTemplate/TaskProcess/components/PlanTask.vue +++ b/frontend/src/layout/components/Main/TaskTemplate/TaskProcess/components/PlanTask.vue @@ -679,10 +679,13 @@ const submitBranch = async () => { goal: generalGoal }) + // WebSocket 返回格式: { data: [[action1, action2], [action3, action4]], ... } + // REST API 返回格式: [[action1, action2], [action3, action4]] + const responseData = response.data || response // 后端返回格式: [[action1, action2], [action3, action4]] // 取第一个分支 - if (response && response.length > 0) { - const firstBranch = response[0] + if (responseData && responseData.length > 0) { + const firstBranch = responseData[0] // 直接遍历 action 数组 firstBranch.forEach((action: any) => { @@ -974,10 +977,13 @@ const submitBranch = async () => { goal: generalGoal }) + // WebSocket 返回格式: { data: [[action1, action2], [action3, action4]], ... } + // REST API 返回格式: [[action1, action2], [action3, action4]] + const responseData = response.data || response // 后端返回格式: [[action1, action2], [action3, action4]] // 取第一个分支 - if (response && response.length > 0) { - const firstBranch = response[0] + if (responseData && responseData.length > 0) { + const firstBranch = responseData[0] // 直接遍历 action 数组 firstBranch.forEach((action: any) => { diff --git a/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue b/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue index e4a39d8..aff5731 100644 --- a/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue +++ b/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue @@ -480,8 +480,14 @@ async function handlePauseResume() { // 正常恢复执行 try { if (websocket.connected) { + // 检查 execution_id 是否存在 + if (!currentExecutionId.value) { + warning('无法恢复', '执行ID不存在,请等待执行开始') + return + } + await websocket.send('resume_execution', { - execution_id: currentExecutionId.value || '' + execution_id: currentExecutionId.value }) isPaused.value = false @@ -498,12 +504,19 @@ async function handlePauseResume() { // 暂停执行 try { if (websocket.connected) { + // 检查 execution_id 是否存在 + if (!currentExecutionId.value) { + warning('无法暂停', '执行ID不存在,请等待执行开始') + isPausing.value = false + return + } + // 先设置 isPausing,允许接收当前正在执行的动作的结果 isPausing.value = true info('暂停中', '正在等待当前动作完成') await websocket.send('pause_execution', { - execution_id: currentExecutionId.value || '' + execution_id: currentExecutionId.value }) /*不立即设置 isPaused = true @@ -881,11 +894,14 @@ async function restartFromStep(stepIndex: number) { // 清空修改记录 agentsStore.clearModifiedSteps() + // 保存旧的 execution_id 用于停止 + const oldExecutionId = currentExecutionId.value + // 停止旧的执行 - if (websocket.connected && currentExecutionId.value) { + if (websocket.connected && oldExecutionId) { try { - const stopResponse = await websocket.send('stop_execution', { - execution_id: currentExecutionId.value || '' + await websocket.send('stop_execution', { + execution_id: oldExecutionId }) // 等待一下确保后端完全停止 await new Promise(resolve => setTimeout(resolve, 1000)) @@ -893,6 +909,13 @@ async function restartFromStep(stepIndex: number) { console.warn('⚠️ 停止旧执行失败(可能已经停止):', err) } } + + // 前端生成新的 execution_id(确保前端和后端使用同一个 ID) + const generalGoal = agentsStore.agentRawPlan.data?.['General Goal'] || '' + const newExecutionId = `${generalGoal.replace(/\s+/g, '_')}_${Date.now()}` + currentExecutionId.value = newExecutionId + console.log('🔄 [DEBUG] restartFromStep: 生成新的 execution_id =', newExecutionId) + // 构建截断后的 RehearsalLog const truncatedLog = buildTruncatedRehearsalLog(stepIndex) @@ -936,7 +959,7 @@ async function restartFromStep(stepIndex: number) { isStreaming.value = true currentExecutionId.value = executionId }, - undefined, + newExecutionId, // 传入前端生成的 execution_id stepIndex, truncatedLog ) @@ -1007,8 +1030,41 @@ async function handleTaskProcess() { } // 重置执行结果 -function handleRefresh() { +async function handleRefresh() { + // 如果有正在执行的任务,先通知后端停止 + if (websocket.connected && currentExecutionId.value) { + try { + await websocket.send('stop_execution', { + execution_id: currentExecutionId.value + }) + // 等待一下确保后端完全停止 + await new Promise(resolve => setTimeout(resolve, 500)) + } catch (err) { + console.warn('⚠️ 停止执行失败(可能已经停止):', err) + } + } + + // 重置所有状态 agentsStore.setExecutePlan([]) + stepExecutionStatus.value = {} + sentStepIds.value.clear() + currentExecutionId.value = null + isPaused.value = false + isStreaming.value = false + isPausing.value = false + loading.value = false + isRestarting.value = false + + // 重置进度通知标题 + currentProgressTitle.value = '任务执行中' + + // 关闭进度通知 + if (currentProgressNotificationId.value) { + removeNotification(currentProgressNotificationId.value) + currentProgressNotificationId.value = null + } + + success('已重置', '执行状态已重置') } // 添加滚动状态指示器 diff --git a/frontend/src/layout/components/Main/TaskTemplate/TaskSyllabus/Branch/PlanModification.vue b/frontend/src/layout/components/Main/TaskTemplate/TaskSyllabus/Branch/PlanModification.vue index f7f97fe..805a457 100644 --- a/frontend/src/layout/components/Main/TaskTemplate/TaskSyllabus/Branch/PlanModification.vue +++ b/frontend/src/layout/components/Main/TaskTemplate/TaskSyllabus/Branch/PlanModification.vue @@ -797,13 +797,16 @@ const handleAddBranch = async (taskId: string, branchContent: string) => { goal: generalGoal }) + // WebSocket 返回格式: { data: [[{...}]], ... } + // REST API 返回格式: [[{...}]] + const responseData = response.data || response // 直接获取协作流程数据 - if (Array.isArray(response)) { + if (Array.isArray(responseData)) { // 可能是二维数组 - newTasks = (response as any[])[0] || [] - } else if (response && (response as any)['Collaboration Process']) { + newTasks = responseData[0] || [] + } else if (responseData && responseData['Collaboration Process']) { // 如果返回的是对象,尝试读取 Collaboration Process 字段 - newTasks = (response as any)['Collaboration Process'] || [] + newTasks = responseData['Collaboration Process'] || [] } else { newTasks = [] } @@ -1136,14 +1139,17 @@ const handleAddBranch = async (taskId: string, branchContent: string) => { initialInputs: Array.isArray(initialInput) ? initialInput : [initialInput], goal: generalGoal }) + // WebSocket 返回格式: { data: [[{...}]], ... } + // REST API 返回格式: [[{...}]] + const responseData = response.data || response // 直接获取协作流程数据 // newTasks = response?.[0] || [] - if (Array.isArray(response)) { + if (Array.isArray(responseData)) { // 可能是二维数组 - newTasks = (response as any[])[0] || [] - } else if (response && (response as any)['Collaboration Process']) { + newTasks = responseData[0] || [] + } else if (responseData && responseData['Collaboration Process']) { // 如果返回的是对象,尝试读取 Collaboration Process 字段 - newTasks = (response as any)['Collaboration Process'] || [] + newTasks = responseData['Collaboration Process'] || [] } else { newTasks = [] } diff --git a/frontend/src/utils/websocket.ts b/frontend/src/utils/websocket.ts index 4d8bf65..593618a 100644 --- a/frontend/src/utils/websocket.ts +++ b/frontend/src/utils/websocket.ts @@ -78,17 +78,17 @@ class WebSocketClient { reject(error) }) - this.socket.on('disconnect', (reason) => { + this.socket.on('disconnect', () => { this.isConnected = false }) - this.socket.on('connected', (data) => { + this.socket.on('connected', () => { // Server connected message }) // 监听响应消息 this.socket.on('response', (response: ResponseMessage) => { - const { id, status, data, error } = response + const { id, status, data, error, generation_id, execution_id } = response const handler = this.requestHandlers.get(id) if (handler) { @@ -98,7 +98,19 @@ class WebSocketClient { } if (status === 'success') { - handler.resolve(data) + // 返回完整响应,包含 data、generation_id、execution_id 等 + // 注意:需要检查 data 是否为 null,因为 typeof null === 'object' + // generation_id/execution_id 可能放在 data 中,需要兼容处理 + // 注意:如果 data 是数组,不能展开,否则会破坏数组结构 + const resolvedGenerationId = generation_id || (data && typeof data === 'object' && !Array.isArray(data) && data.generation_id) + const resolvedExecutionId = execution_id || (data && typeof data === 'object' && !Array.isArray(data) && data.execution_id) + + // 直接返回 data,保持原始数据结构(数组或对象) + handler.resolve({ + data, + generation_id: resolvedGenerationId, + execution_id: resolvedExecutionId + }) } else { handler.reject(new Error(error || 'Unknown error')) }