From ac035d1237856b103f94e5bf4cc652799a3fdc2f Mon Sep 17 00:00:00 2001 From: liailing1026 <1815388873@qq.com> Date: Fri, 23 Jan 2026 15:38:09 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E4=BB=BB=E5=8A=A1=E5=A4=A7=E7=BA=B2?= =?UTF-8?q?=E5=81=9C=E6=AD=A2=E4=BB=A5=E5=8F=8A=E6=89=A7=E8=A1=8C=E7=BB=93?= =?UTF-8?q?=E6=9E=9C=E6=9A=82=E5=81=9C=E7=BB=A7=E7=BB=AD=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/AgentCoord/LLMAPI/LLMAPI.py | 106 +- .../ExecutePlan_Optimized.py | 298 ++++-- .../dynamic_execution_manager.py | 241 +++++ backend/server.py | 145 ++- frontend/src/api/index.ts | 62 +- .../components/Notification/Notification.vue | 260 +++++ frontend/src/composables/useNotification.ts | 154 +++ frontend/src/layout/components/Main/Task.vue | 4 + .../Main/TaskTemplate/TaskResult/index.vue | 951 +++++++++++++----- .../Main/TaskTemplate/TaskSyllabus/index.vue | 100 +- frontend/src/stores/modules/agents.ts | 12 + 11 files changed, 1904 insertions(+), 429 deletions(-) create mode 100644 backend/AgentCoord/RehearsalEngine_V2/dynamic_execution_manager.py create mode 100644 frontend/src/components/Notification/Notification.vue create mode 100644 frontend/src/composables/useNotification.ts diff --git a/backend/AgentCoord/LLMAPI/LLMAPI.py b/backend/AgentCoord/LLMAPI/LLMAPI.py index 96ff3d9..1809749 100644 --- a/backend/AgentCoord/LLMAPI/LLMAPI.py +++ b/backend/AgentCoord/LLMAPI/LLMAPI.py @@ -112,7 +112,16 @@ def _call_with_custom_config(messages: list[dict], stream: bool, model_config: d timeout=180 ) + # 检查响应是否有效 + if not response.choices or len(response.choices) == 0: + raise Exception(f"API returned empty response for model {api_model}") + if not response.choices[0] or not response.choices[0].message: + raise Exception(f"API returned invalid response format for model {api_model}") + full_reply_content = response.choices[0].message.content + if full_reply_content is None: + raise Exception(f"API returned None content for model {api_model}") + print(colored(full_reply_content, "blue", "on_white"), end="") return full_reply_content except Exception as e: @@ -138,15 +147,21 @@ async def _achat_completion_stream_custom(messages:list[dict], temp_async_client async for chunk in response: collected_chunks.append(chunk) choices = chunk.choices - if len(choices) > 0: - chunk_message = chunk.choices[0].delta - collected_messages.append(chunk_message) - if chunk_message.content: - print(colored(chunk_message.content, "blue", "on_white"), end="") + if len(choices) > 0 and choices[0] is not None: + chunk_message = choices[0].delta + if chunk_message is not None: + collected_messages.append(chunk_message) + if chunk_message.content: + print(colored(chunk_message.content, "blue", "on_white"), end="") print() full_reply_content = "".join( [m.content or "" for m in collected_messages if m is not None] ) + + # 检查最终结果是否为空 + if not full_reply_content or full_reply_content.strip() == "": + raise Exception(f"Stream API returned empty content for model {api_model}") + return full_reply_content except httpx.RemoteProtocolError as e: if attempt < max_retries - 1: @@ -184,7 +199,16 @@ async def _achat_completion_stream_groq(messages: list[dict]) -> str: else: raise Exception("failed") + # 检查响应是否有效 + if not response.choices or len(response.choices) == 0: + raise Exception("Groq API returned empty response") + if not response.choices[0] or not response.choices[0].message: + raise Exception("Groq API returned invalid response format") + full_reply_content = response.choices[0].message.content + if full_reply_content is None: + raise Exception("Groq API returned None content") + print(colored(full_reply_content, "blue", "on_white"), end="") print() return full_reply_content @@ -217,7 +241,16 @@ async def _achat_completion_stream_mixtral(messages: list[dict]) -> str: else: raise Exception("failed") + # 检查响应是否有效 + if not stream.choices or len(stream.choices) == 0: + raise Exception("Mistral API returned empty response") + if not stream.choices[0] or not stream.choices[0].message: + raise Exception("Mistral API returned invalid response format") + full_reply_content = stream.choices[0].message.content + if full_reply_content is None: + raise Exception("Mistral API returned None content") + print(colored(full_reply_content, "blue", "on_white"), end="") print() return full_reply_content @@ -240,19 +273,25 @@ async def _achat_completion_stream_gpt35(messages: list[dict]) -> str: async for chunk in response: collected_chunks.append(chunk) # save the event response choices = chunk.choices - if len(choices) > 0: - chunk_message = chunk.choices[0].delta - collected_messages.append(chunk_message) # save the message - if chunk_message.content: - print( - colored(chunk_message.content, "blue", "on_white"), - end="", - ) + if len(choices) > 0 and choices[0] is not None: + chunk_message = choices[0].delta + if chunk_message is not None: + collected_messages.append(chunk_message) # save the message + if chunk_message.content: + print( + colored(chunk_message.content, "blue", "on_white"), + end="", + ) print() full_reply_content = "".join( [m.content or "" for m in collected_messages if m is not None] ) + + # 检查最终结果是否为空 + if not full_reply_content or full_reply_content.strip() == "": + raise Exception("Stream API (gpt-3.5) returned empty content") + return full_reply_content @@ -275,7 +314,16 @@ def _achat_completion_json(messages: list[dict] ) -> str: else: raise Exception("failed") + # 检查响应是否有效 + if not response.choices or len(response.choices) == 0: + raise Exception("OpenAI API returned empty response") + if not response.choices[0] or not response.choices[0].message: + raise Exception("OpenAI API returned invalid response format") + full_reply_content = response.choices[0].message.content + if full_reply_content is None: + raise Exception("OpenAI API returned None content") + print(colored(full_reply_content, "blue", "on_white"), end="") print() return full_reply_content @@ -294,19 +342,25 @@ async def _achat_completion_stream(messages: list[dict]) -> str: async for chunk in response: collected_chunks.append(chunk) # save the event response choices = chunk.choices - if len(choices) > 0: - chunk_message = chunk.choices[0].delta - collected_messages.append(chunk_message) # save the message - if chunk_message.content: - print( - colored(chunk_message.content, "blue", "on_white"), - end="", - ) + if len(choices) > 0 and choices[0] is not None: + chunk_message = choices[0].delta + if chunk_message is not None: + collected_messages.append(chunk_message) # save the message + if chunk_message.content: + print( + colored(chunk_message.content, "blue", "on_white"), + end="", + ) print() full_reply_content = "".join( [m.content or "" for m in collected_messages if m is not None] ) + + # 检查最终结果是否为空 + if not full_reply_content or full_reply_content.strip() == "": + raise Exception("Stream API returned empty content") + return full_reply_content except Exception as e: print_colored(f"OpenAI API error in _achat_completion_stream: {str(e)}", "red") @@ -316,7 +370,17 @@ async def _achat_completion_stream(messages: list[dict]) -> str: def _chat_completion(messages: list[dict]) -> str: try: rsp = client.chat.completions.create(**_cons_kwargs(messages)) + + # 检查响应是否有效 + if not rsp.choices or len(rsp.choices) == 0: + raise Exception("OpenAI API returned empty response") + if not rsp.choices[0] or not rsp.choices[0].message: + raise Exception("OpenAI API returned invalid response format") + content = rsp.choices[0].message.content + if content is None: + raise Exception("OpenAI API returned None content") + return content except Exception as e: print_colored(f"OpenAI API error in _chat_completion: {str(e)}", "red") diff --git a/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py b/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py index 02b763b..44ae5af 100644 --- a/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py +++ b/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py @@ -1,3 +1,8 @@ +""" +优化版执行计划 - 支持动态追加步骤 +在执行过程中可以接收新的步骤并追加到执行队列 +""" + import asyncio import json import time @@ -6,22 +11,26 @@ import AgentCoord.RehearsalEngine_V2.Action as Action import AgentCoord.util as util from termcolor import colored from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager +from AgentCoord.RehearsalEngine_V2.dynamic_execution_manager import dynamic_execution_manager # ==================== 配置参数 ==================== -# 最大并发请求数(避免触发 OpenAI API 速率限制) +# 最大并发请求数 MAX_CONCURRENT_REQUESTS = 2 -# 批次之间的延迟(秒) +# 批次之间的延迟 BATCH_DELAY = 1.0 -# 429 错误重试次数和延迟 +# 429错误重试次数和延迟 MAX_RETRIES = 3 -RETRY_DELAY = 5.0 # 秒 +RETRY_DELAY = 5.0 + # ==================== 限流器 ==================== class RateLimiter: - """异步限流器,控制并发请求数量""" + """ + 异步限流器,控制并发请求数量 + """ def __init__(self, max_concurrent: int = MAX_CONCURRENT_REQUESTS): self.semaphore = asyncio.Semaphore(max_concurrent) @@ -42,7 +51,12 @@ rate_limiter = RateLimiter() def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int]]: """ 构建动作依赖图 - 返回: {action_index: [dependent_action_indices]} + + Args: + TaskProcess: 任务流程列表 + + Returns: + 依赖映射字典 {action_index: [dependent_action_indices]} """ dependency_map = {i: [] for i in range(len(TaskProcess))} @@ -51,7 +65,7 @@ def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int if not important_inputs: continue - # 检查是否依赖其他动作的 ActionResult + # 检查是否依赖其他动作的ActionResult for j, prev_action in enumerate(TaskProcess): if i == j: continue @@ -70,7 +84,13 @@ def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int def get_parallel_batches(TaskProcess: List[Dict], dependency_map: Dict[int, List[int]]) -> List[List[int]]: """ 将动作分为多个批次,每批内部可以并行执行 - 返回: [[batch1_indices], [batch2_indices], ...] + + Args: + TaskProcess: 任务流程列表 + dependency_map: 依赖图 + + Returns: + 批次列表 [[batch1_indices], [batch2_indices], ...] """ batches = [] completed: Set[int] = set() @@ -84,11 +104,11 @@ def get_parallel_batches(TaskProcess: List[Dict], dependency_map: Dict[int, List ] if not ready_to_run: - # 避免死循环(循环依赖情况) + # 避免死循环 remaining = [i for i in range(len(TaskProcess)) if i not in completed] if remaining: print(colored(f"警告: 检测到循环依赖,强制串行执行: {remaining}", "yellow")) - ready_to_run = remaining[:1] # 每次只执行一个 + ready_to_run = remaining[:1] else: break @@ -111,6 +131,20 @@ async def execute_single_action_async( ) -> Dict: """ 异步执行单个动作 + + Args: + ActionInfo: 动作信息 + General_Goal: 总体目标 + TaskDescription: 任务描述 + OutputName: 输出对象名称 + KeyObjects: 关键对象字典 + ActionHistory: 动作历史 + agentName: 智能体名称 + AgentProfile_Dict: 智能体配置字典 + InputName_List: 输入名称列表 + + Returns: + 动作执行结果 """ actionType = ActionInfo["ActionType"] @@ -128,7 +162,7 @@ async def execute_single_action_async( KeyObjects=KeyObjects, ) - # 执行动作(在线程池中运行,避免阻塞事件循环) + # 在线程池中运行,避免阻塞事件循环 loop = asyncio.get_event_loop() ActionInfo_with_Result = await loop.run_in_executor( None, @@ -156,8 +190,18 @@ async def execute_step_async_streaming( total_steps: int ) -> Generator[Dict, None, None]: """ - 异步执行单个步骤(支持流式返回) - 返回生成器,每完成一个动作就 yield 一次 + 异步执行单个步骤,支持流式返回 + + Args: + stepDescrip: 步骤描述 + General_Goal: 总体目标 + AgentProfile_Dict: 智能体配置字典 + KeyObjects: 关键对象字典 + step_index: 步骤索引 + total_steps: 总步骤数 + + Yields: + 执行事件字典 """ # 准备步骤信息 StepName = ( @@ -213,7 +257,7 @@ async def execute_step_async_streaming( "content": None, } - # 先返回步骤开始信息 + # 返回步骤开始事件 yield { "type": "step_start", "step_index": step_index, @@ -238,10 +282,8 @@ async def execute_step_async_streaming( # 分批执行动作 for batch_index, batch_indices in enumerate(batches): # 在每个批次执行前检查暂停状态 - print(f"🔍 [DEBUG] 步骤 {StepName}: 批次 {batch_index+1}/{len(batches)} 执行前,检查暂停状态...") should_continue = await execution_state_manager.async_check_pause() if not should_continue: - # 用户请求停止,中断执行 util.print_colored("🛑 用户请求停止执行", "red") return @@ -277,7 +319,7 @@ async def execute_step_async_streaming( # 等待当前批次完成 batch_results = await asyncio.gather(*tasks) - # 逐个返回结果(流式) + # 逐个返回结果 for i, result in enumerate(batch_results): action_index_in_batch = batch_indices[i] completed_actions += 1 @@ -289,7 +331,7 @@ async def execute_step_async_streaming( ActionHistory.append(result) - # 立即返回该动作结果(流式) + # 立即返回该动作结果 yield { "type": "action_complete", "step_index": step_index, @@ -318,19 +360,27 @@ async def execute_step_async_streaming( } -def executePlan_streaming( +def executePlan_streaming_dynamic( plan: Dict, num_StepToRun: int, RehearsalLog: List, - AgentProfile_Dict: Dict + AgentProfile_Dict: Dict, + existingKeyObjects: Dict = None, + execution_id: str = None ) -> Generator[str, None, None]: """ - 执行计划(流式返回版本,支持暂停/恢复) - 返回生成器,每次返回 SSE 格式的字符串 + 动态执行计划,支持在执行过程中追加新步骤 - 使用方式: - for event in executePlan_streaming(...): - yield event + Args: + plan: 执行计划 + num_StepToRun: 要运行的步骤数 + RehearsalLog: 已执行的历史记录 + AgentProfile_Dict: 智能体配置 + existingKeyObjects: 已存在的KeyObjects + execution_id: 执行ID(用于动态追加步骤) + + Yields: + SSE格式的事件字符串 """ # 初始化执行状态 general_goal = plan.get("General Goal", "") @@ -339,7 +389,7 @@ def executePlan_streaming( print(colored(f"⏸️ 执行状态管理器已启动,支持暂停/恢复", "green")) # 准备执行 - KeyObjects = {} + KeyObjects = existingKeyObjects.copy() if existingKeyObjects else {} finishedStep_index = -1 for logNode in RehearsalLog: @@ -348,6 +398,9 @@ def executePlan_streaming( if logNode["LogNodeType"] == "object": KeyObjects[logNode["NodeId"]] = logNode["content"] + if existingKeyObjects: + print(colored(f"📦 使用已存在的 KeyObjects: {list(existingKeyObjects.keys())}", "cyan")) + # 确定要运行的步骤范围 if num_StepToRun is None: run_to = len(plan["Collaboration Process"]) @@ -355,78 +408,173 @@ def executePlan_streaming( run_to = (finishedStep_index + 1) + num_StepToRun steps_to_run = plan["Collaboration Process"][(finishedStep_index + 1): run_to] - total_steps = len(steps_to_run) - print(colored(f"🚀 开始执行计划(流式推送),共 {total_steps} 个步骤", "cyan", attrs=["bold"])) + # 使用动态执行管理器 + if execution_id: + # 初始化执行管理器,使用传入的execution_id + actual_execution_id = dynamic_execution_manager.start_execution(general_goal, steps_to_run, execution_id) + print(colored(f"🚀 开始执行计划(动态模式),共 {len(steps_to_run)} 个步骤,执行ID: {actual_execution_id}", "cyan")) + else: + print(colored(f"🚀 开始执行计划(流式推送),共 {len(steps_to_run)} 个步骤", "cyan")) + + total_steps = len(steps_to_run) # 使用队列实现流式推送 async def produce_events(queue: asyncio.Queue): - """异步生产者:每收到一个事件就放入队列""" + """异步生产者""" try: - for step_index, stepDescrip in enumerate(steps_to_run): - # 在每个步骤执行前检查暂停状态 - should_continue = await execution_state_manager.async_check_pause() - if not should_continue: - # 用户请求停止,中断执行 - print(colored("🛑 用户请求停止执行", "red")) - await queue.put({ - "type": "error", - "message": "执行已被用户停止" - }) - return + step_index = 0 - # 执行单个步骤(流式) - async for event in execute_step_async_streaming( - stepDescrip, - plan["General Goal"], - AgentProfile_Dict, - KeyObjects, - step_index, - total_steps - ): - # 检查是否需要停止 - if execution_state_manager.is_stopped(): + if execution_id: + # 动态模式:循环获取下一个步骤 + # 等待新步骤的最大次数(避免无限等待) + max_empty_wait_cycles = 60 # 最多等待60次,每次等待1秒 + empty_wait_count = 0 + + while True: + # 检查暂停状态 + should_continue = await execution_state_manager.async_check_pause() + if not should_continue: + print(colored("🛑 用户请求停止执行", "red")) + await queue.put({ + "type": "error", + "message": "执行已被用户停止" + }) + break + + # 获取下一个步骤 + stepDescrip = dynamic_execution_manager.get_next_step(execution_id) + + if stepDescrip is None: + # 没有更多步骤了,检查是否应该继续等待 + empty_wait_count += 1 + + # 获取执行信息 + execution_info = dynamic_execution_manager.get_execution_info(execution_id) + + if execution_info: + queue_total_steps = execution_info.get("total_steps", 0) + completed_steps = execution_info.get("completed_steps", 0) + + # 如果没有步骤在队列中(queue_total_steps为0),立即退出 + if queue_total_steps == 0: + print(colored(f"⚠️ 没有步骤在队列中,退出执行", "yellow")) + break + + # 如果所有步骤都已完成,等待可能的新步骤 + if completed_steps >= queue_total_steps: + if empty_wait_count >= max_empty_wait_cycles: + # 等待超时,退出执行 + print(colored(f"✅ 所有步骤执行完成,等待超时", "green")) + break + else: + # 等待新步骤追加 + print(colored(f"⏳ 等待新步骤追加... ({empty_wait_count}/{max_empty_wait_cycles})", "cyan")) + await asyncio.sleep(1) + continue + else: + # 还有步骤未完成,继续尝试获取 + print(colored(f"⏳ 等待步骤就绪... ({completed_steps}/{queue_total_steps})", "cyan")) + await asyncio.sleep(0.5) + empty_wait_count = 0 # 重置等待计数 + continue + else: + # 执行信息不存在,退出 + print(colored(f"⚠️ 执行信息不存在,退出执行", "yellow")) + break + + # 重置等待计数 + empty_wait_count = 0 + + # 获取最新的总步骤数(用于显示) + execution_info = dynamic_execution_manager.get_execution_info(execution_id) + current_total_steps = execution_info.get("total_steps", total_steps) if execution_info else total_steps + + # 执行步骤 + async for event in execute_step_async_streaming( + stepDescrip, + plan["General Goal"], + AgentProfile_Dict, + KeyObjects, + step_index, + current_total_steps # 使用动态更新的总步骤数 + ): + if execution_state_manager.is_stopped(): + await queue.put({ + "type": "error", + "message": "执行已被用户停止" + }) + return + + await queue.put(event) + + # 标记步骤完成 + dynamic_execution_manager.mark_step_completed(execution_id) + + # 更新KeyObjects + OutputName = stepDescrip.get("OutputObject", "") + if OutputName and OutputName in KeyObjects: + # 对象日志节点会在step_complete中发送 + pass + + step_index += 1 + + else: + # 非动态模式:按顺序执行所有步骤 + for step_index, stepDescrip in enumerate(steps_to_run): + should_continue = await execution_state_manager.async_check_pause() + if not should_continue: + print(colored("🛑 用户请求停止执行", "red")) await queue.put({ "type": "error", "message": "执行已被用户停止" }) return - await queue.put(event) # ← 立即放入队列 + async for event in execute_step_async_streaming( + stepDescrip, + plan["General Goal"], + AgentProfile_Dict, + KeyObjects, + step_index, + total_steps + ): + if execution_state_manager.is_stopped(): + await queue.put({ + "type": "error", + "message": "执行已被用户停止" + }) + return + + await queue.put(event) + except Exception as e: - # 发送错误信息 await queue.put({ "type": "error", "message": f"执行出错: {str(e)}" }) finally: - await queue.put(None) # ← 发送完成信号 + await queue.put(None) - # 运行异步任务并实时 yield + # 运行异步任务并实时yield loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: - # 创建队列 queue = asyncio.Queue(maxsize=10) - - # 启动异步生产者任务 producer_task = loop.create_task(produce_events(queue)) - # 同步消费者:从队列读取并 yield(使用 run_until_complete) while True: - event = loop.run_until_complete(queue.get()) # ← 从队列获取事件 - if event is None: # ← 收到完成信号 + event = loop.run_until_complete(queue.get()) + if event is None: break - # 立即转换为 SSE 格式并发送 + # 立即转换为SSE格式并发送 event_str = json.dumps(event, ensure_ascii=False) - yield f"data: {event_str}\n\n" # ← 立即发送给前端 + yield f"data: {event_str}\n\n" - # 等待生产者任务完成 loop.run_until_complete(producer_task) - # 如果不是被停止的,发送完成信号 if not execution_state_manager.is_stopped(): complete_event = json.dumps({ "type": "execution_complete", @@ -435,10 +583,26 @@ def executePlan_streaming( yield f"data: {complete_event}\n\n" finally: - # 清理任务 + # 在关闭事件循环之前先清理执行记录 + if execution_id: + # 清理执行记录 + dynamic_execution_manager.cleanup(execution_id) + if 'producer_task' in locals(): if not producer_task.done(): producer_task.cancel() + + # 确保所有任务都完成后再关闭事件循环 + try: + pending = asyncio.all_tasks(loop) + for task in pending: + task.cancel() + loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True)) + except Exception: + pass # 忽略清理过程中的错误 + loop.close() - \ No newline at end of file + +# 保留旧版本函数以保持兼容性 +executePlan_streaming = executePlan_streaming_dynamic diff --git a/backend/AgentCoord/RehearsalEngine_V2/dynamic_execution_manager.py b/backend/AgentCoord/RehearsalEngine_V2/dynamic_execution_manager.py new file mode 100644 index 0000000..4f7ac5e --- /dev/null +++ b/backend/AgentCoord/RehearsalEngine_V2/dynamic_execution_manager.py @@ -0,0 +1,241 @@ +""" +动态执行管理器 +用于在任务执行过程中动态追加新步骤 +""" + +import asyncio +from typing import Dict, List, Optional, Any +from threading import Lock + + +class DynamicExecutionManager: + """ + 动态执行管理器 + 管理正在执行的任务,支持动态追加新步骤 + """ + + def __init__(self): + # 执行状态: goal -> execution_info + self._executions: Dict[str, Dict] = {} + + # 线程锁 + self._lock = Lock() + + # 步骤队列: goal -> List[step] + self._step_queues: Dict[str, List] = {} + + # 已执行的步骤索引: goal -> Set[step_index] + self._executed_steps: Dict[str, set] = {} + + # 待执行的步骤索引: goal -> List[step_index] + self._pending_steps: Dict[str, List[int]] = {} + + def start_execution(self, goal: str, initial_steps: List[Dict], execution_id: str = None) -> str: + """ + 开始执行一个新的任务 + + Args: + goal: 任务目标 + initial_steps: 初始步骤列表 + execution_id: 执行ID,如果不提供则自动生成 + + Returns: + 执行ID + """ + with self._lock: + # 如果未提供execution_id,则生成一个 + if execution_id is None: + execution_id = f"{goal}_{asyncio.get_event_loop().time()}" + + self._executions[execution_id] = { + "goal": goal, + "status": "running", + "total_steps": len(initial_steps), + "completed_steps": 0 + } + + # 初始化步骤队列 + self._step_queues[execution_id] = initial_steps.copy() + + # 初始化已执行步骤集合 + self._executed_steps[execution_id] = set() + + # 初始化待执行步骤索引 + self._pending_steps[execution_id] = list(range(len(initial_steps))) + + print(f"🚀 启动执行: {execution_id}") + print(f"📊 初始步骤数: {len(initial_steps)}") + print(f"📋 待执行步骤索引: {self._pending_steps[execution_id]}") + + return execution_id + + def add_steps(self, execution_id: str, new_steps: List[Dict]) -> int: + """ + 向执行中追加新步骤 + + Args: + execution_id: 执行ID + new_steps: 新步骤列表 + + Returns: + 追加的步骤数量 + """ + with self._lock: + if execution_id not in self._step_queues: + print(f"⚠️ 警告: 执行ID {execution_id} 不存在,无法追加步骤") + return 0 + + current_count = len(self._step_queues[execution_id]) + + # 追加新步骤到队列 + self._step_queues[execution_id].extend(new_steps) + + # 添加新步骤的索引到待执行列表 + new_indices = list(range(current_count, current_count + len(new_steps))) + self._pending_steps[execution_id].extend(new_indices) + + # 更新总步骤数 + old_total = self._executions[execution_id]["total_steps"] + self._executions[execution_id]["total_steps"] = len(self._step_queues[execution_id]) + new_total = self._executions[execution_id]["total_steps"] + + print(f"➕ 追加了 {len(new_steps)} 个步骤到 {execution_id}") + print(f"📊 步骤总数: {old_total} -> {new_total}") + print(f"📋 待执行步骤索引: {self._pending_steps[execution_id]}") + + return len(new_steps) + + def get_next_step(self, execution_id: str) -> Optional[Dict]: + """ + 获取下一个待执行的步骤 + + Args: + execution_id: 执行ID + + Returns: + 下一个步骤,如果没有则返回None + """ + with self._lock: + if execution_id not in self._pending_steps: + print(f"⚠️ 警告: 执行ID {execution_id} 不存在") + return None + + # 获取第一个待执行步骤的索引 + if not self._pending_steps[execution_id]: + return None + + step_index = self._pending_steps[execution_id].pop(0) + + # 从队列中获取步骤 + if step_index >= len(self._step_queues[execution_id]): + print(f"⚠️ 警告: 步骤索引 {step_index} 超出范围") + return None + + step = self._step_queues[execution_id][step_index] + + # 标记为已执行 + self._executed_steps[execution_id].add(step_index) + + step_name = step.get("StepName", "未知") + print(f"🎯 获取下一个步骤: {step_name} (索引: {step_index})") + print(f"📋 剩余待执行步骤: {len(self._pending_steps[execution_id])}") + + return step + + def mark_step_completed(self, execution_id: str): + """ + 标记一个步骤完成 + + Args: + execution_id: 执行ID + """ + with self._lock: + if execution_id in self._executions: + self._executions[execution_id]["completed_steps"] += 1 + completed = self._executions[execution_id]["completed_steps"] + total = self._executions[execution_id]["total_steps"] + print(f"📊 步骤完成进度: {completed}/{total}") + else: + print(f"⚠️ 警告: 执行ID {execution_id} 不存在") + + def get_execution_info(self, execution_id: str) -> Optional[Dict]: + """ + 获取执行信息 + + Args: + execution_id: 执行ID + + Returns: + 执行信息字典 + """ + with self._lock: + return self._executions.get(execution_id) + + def get_pending_count(self, execution_id: str) -> int: + """ + 获取待执行步骤数量 + + Args: + execution_id: 执行ID + + Returns: + 待执行步骤数量 + """ + with self._lock: + if execution_id not in self._pending_steps: + return 0 + return len(self._pending_steps[execution_id]) + + def has_more_steps(self, execution_id: str) -> bool: + """ + 检查是否还有更多步骤待执行 + + Args: + execution_id: 执行ID + + Returns: + 是否还有待执行步骤 + """ + with self._lock: + if execution_id not in self._pending_steps: + return False + return len(self._pending_steps[execution_id]) > 0 + + def finish_execution(self, execution_id: str): + """ + 完成执行 + + Args: + execution_id: 执行ID + """ + with self._lock: + if execution_id in self._executions: + self._executions[execution_id]["status"] = "completed" + + def cancel_execution(self, execution_id: str): + """ + 取消执行 + + Args: + execution_id: 执行ID + """ + with self._lock: + if execution_id in self._executions: + self._executions[execution_id]["status"] = "cancelled" + + def cleanup(self, execution_id: str): + """ + 清理执行记录 + + Args: + execution_id: 执行ID + """ + with self._lock: + self._executions.pop(execution_id, None) + self._step_queues.pop(execution_id, None) + self._executed_steps.pop(execution_id, None) + self._pending_steps.pop(execution_id, None) + + +# 全局单例 +dynamic_execution_manager = DynamicExecutionManager() diff --git a/backend/server.py b/backend/server.py index 3a4482f..0859717 100644 --- a/backend/server.py +++ b/backend/server.py @@ -270,6 +270,12 @@ def Handle_executePlanOptimized(): - 无依赖关系的动作并行执行 - 有依赖关系的动作串行执行 + 支持参数: + plan: 执行计划 + num_StepToRun: 要运行的步骤数 + RehearsalLog: 已执行的历史记录 + existingKeyObjects: 已存在的KeyObjects(用于重新执行时传递中间结果) + 前端使用 EventSource 接收 """ incoming_data = request.get_json() @@ -281,6 +287,7 @@ def Handle_executePlanOptimized(): num_StepToRun=incoming_data.get("num_StepToRun"), RehearsalLog=incoming_data.get("RehearsalLog", []), AgentProfile_Dict=AgentProfile_Dict, + existingKeyObjects=incoming_data.get("existingKeyObjects"), ): yield chunk except Exception as e: @@ -405,7 +412,7 @@ def handle_ping(): def handle_execute_plan_optimized_ws(data): """ WebSocket版本:优化版流式执行计划 - 支持步骤级流式 + 动作级智能并行 + 支持步骤级流式 + 动作级智能并行 + 动态追加步骤 请求格式: { @@ -414,7 +421,8 @@ def handle_execute_plan_optimized_ws(data): "data": { "plan": {...}, "num_StepToRun": null, - "RehearsalLog": [] + "RehearsalLog": [], + "enable_dynamic": true # 是否启用动态追加步骤 } } """ @@ -425,27 +433,66 @@ def handle_execute_plan_optimized_ws(data): plan = incoming_data.get("plan") num_StepToRun = incoming_data.get("num_StepToRun") RehearsalLog = incoming_data.get("RehearsalLog", []) + enable_dynamic = incoming_data.get("enable_dynamic", False) - # 使用原有的流式执行函数 - for chunk in executePlan_streaming( - plan=plan, - num_StepToRun=num_StepToRun, - RehearsalLog=RehearsalLog, - AgentProfile_Dict=AgentProfile_Dict, - ): - # 通过WebSocket推送进度 + # 如果前端传入了execution_id,使用前端的;否则生成新的 + execution_id = incoming_data.get("execution_id") + if not execution_id: + import time + execution_id = f"{plan.get('General Goal', '').replace(' ', '_')}_{int(time.time() * 1000)}" + + if enable_dynamic: + # 动态模式:使用executePlan_streaming_dynamic + from AgentCoord.RehearsalEngine_V2.ExecutePlan_Optimized import executePlan_streaming_dynamic + + # 发送执行ID(确认使用的ID) emit('progress', { 'id': request_id, - 'status': 'streaming', - 'data': chunk.replace('data: ', '').replace('\n\n', '') + 'status': 'execution_started', + 'execution_id': execution_id, + 'message': '执行已启动,支持动态追加步骤' }) - # 发送完成信号 - emit('progress', { - 'id': request_id, - 'status': 'complete', - 'data': None - }) + for chunk in executePlan_streaming_dynamic( + plan=plan, + num_StepToRun=num_StepToRun, + RehearsalLog=RehearsalLog, + AgentProfile_Dict=AgentProfile_Dict, + execution_id=execution_id + ): + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'data': chunk.replace('data: ', '').replace('\n\n', '') + }) + + # 发送完成信号 + emit('progress', { + 'id': request_id, + 'status': 'complete', + 'data': None + }) + + else: + # 非动态模式:使用原有方式 + for chunk in executePlan_streaming( + plan=plan, + num_StepToRun=num_StepToRun, + RehearsalLog=RehearsalLog, + AgentProfile_Dict=AgentProfile_Dict, + ): + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'data': chunk.replace('data: ', '').replace('\n\n', '') + }) + + # 发送完成信号 + emit('progress', { + 'id': request_id, + 'status': 'complete', + 'data': None + }) except Exception as e: # 发送错误信息 @@ -456,6 +503,68 @@ def handle_execute_plan_optimized_ws(data): }) +@socketio.on('add_steps_to_execution') +def handle_add_steps_to_execution(data): + """ + WebSocket版本:向正在执行的任务追加新步骤 + + 请求格式: + { + "id": "request-id", + "action": "add_steps_to_execution", + "data": { + "execution_id": "execution_id", + "new_steps": [...] + } + } + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + from AgentCoord.RehearsalEngine_V2.dynamic_execution_manager import dynamic_execution_manager + + execution_id = incoming_data.get('execution_id') + new_steps = incoming_data.get('new_steps', []) + + if not execution_id: + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': '缺少execution_id参数' + }) + return + + # 追加新步骤到执行队列 + added_count = dynamic_execution_manager.add_steps(execution_id, new_steps) + + if added_count > 0: + print(f"✅ 成功追加 {added_count} 个步骤到执行队列: {execution_id}") + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': { + 'message': f'成功追加 {added_count} 个步骤', + 'added_count': added_count + } + }) + else: + print(f"⚠️ 无法追加步骤,执行ID不存在或已结束: {execution_id}") + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': '执行ID不存在或已结束' + }) + + except Exception as e: + print(f"❌ 追加步骤失败: {str(e)}") + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + @socketio.on('generate_base_plan') def handle_generate_base_plan_ws(data): """ diff --git a/frontend/src/api/index.ts b/frontend/src/api/index.ts index ce3b605..75099c9 100644 --- a/frontend/src/api/index.ts +++ b/frontend/src/api/index.ts @@ -167,10 +167,8 @@ class Api { } /** - * 优化版流式执行计划(阶段1+2:步骤级流式 + 动作级智能并行) - * 无依赖关系的动作并行执行,有依赖关系的动作串行执行 - * - * 默认使用WebSocket,如果连接失败则降级到SSE + * 优化版流式执行计划(支持动态追加步骤) + * 步骤级流式 + 动作级智能并行 + 动态追加步骤 */ executePlanOptimized = ( plan: IRawPlanResponse, @@ -178,12 +176,19 @@ class Api { onError?: (error: Error) => void, onComplete?: () => void, useWebSocket?: boolean, + existingKeyObjects?: Record, + enableDynamic?: boolean, + onExecutionStarted?: (executionId: string) => void, + executionId?: string, ) => { const useWs = useWebSocket !== undefined ? useWebSocket : this.useWebSocketDefault const data = { RehearsalLog: [], num_StepToRun: null, + existingKeyObjects: existingKeyObjects || {}, + enable_dynamic: enableDynamic || false, + execution_id: executionId || null, plan: { 'Initial Input Object': plan['Initial Input Object'], 'General Goal': plan['General Goal'], @@ -213,14 +218,26 @@ class Api { // onProgress (progressData) => { try { - // progressData 应该已经是解析后的对象了 - // 如果是字符串,说明后端发送的是 JSON 字符串,需要解析 let event: StreamingEvent + + // 处理不同类型的progress数据 if (typeof progressData === 'string') { event = JSON.parse(progressData) } else { event = progressData as StreamingEvent } + + // 处理特殊事件类型 + if (event && typeof event === 'object') { + // 检查是否是execution_started事件 + if ('status' in event && event.status === 'execution_started') { + if ('execution_id' in event && onExecutionStarted) { + onExecutionStarted(event.execution_id as string) + } + return + } + } + onMessage(event) } catch (e) { // Failed to parse WebSocket data @@ -848,6 +865,39 @@ class Api { return response } + + /** + * 向正在执行的任务追加新步骤 + * @param executionId 执行ID + * @param newSteps 新步骤列表 + * @returns 追加的步骤数量 + */ + addStepsToExecution = async (executionId: string, newSteps: IRawStepTask[]): Promise => { + if (!websocket.connected) { + throw new Error('WebSocket未连接') + } + + const response = await websocket.send('add_steps_to_execution', { + execution_id: executionId, + new_steps: newSteps.map(step => ({ + StepName: step.StepName, + TaskContent: step.TaskContent, + InputObject_List: step.InputObject_List, + OutputObject: step.OutputObject, + AgentSelection: step.AgentSelection, + Collaboration_Brief_frontEnd: step.Collaboration_Brief_frontEnd, + TaskProcess: step.TaskProcess.map(action => ({ + ActionType: action.ActionType, + AgentName: action.AgentName, + Description: action.Description, + ID: action.ID, + ImportantInput: action.ImportantInput, + })), + })), + }) as { added_count: number } + + return response?.added_count || 0 + } } export default new Api() diff --git a/frontend/src/components/Notification/Notification.vue b/frontend/src/components/Notification/Notification.vue new file mode 100644 index 0000000..5531e20 --- /dev/null +++ b/frontend/src/components/Notification/Notification.vue @@ -0,0 +1,260 @@ + + + + + diff --git a/frontend/src/composables/useNotification.ts b/frontend/src/composables/useNotification.ts new file mode 100644 index 0000000..b71694c --- /dev/null +++ b/frontend/src/composables/useNotification.ts @@ -0,0 +1,154 @@ +import { ref } from 'vue' + +export interface NotificationItem { + id: string + title: string + message?: string + type?: 'success' | 'warning' | 'info' | 'error' + duration?: number + showProgress?: boolean + progress?: number + zIndex?: number + onClose?: () => void + // 详细进度信息 + detailTitle?: string + detailMessage?: string +} + +const notifications = ref([]) +let notificationIdCounter = 0 +let zIndexCounter = 1000 + +export function useNotification() { + const addNotification = (notification: Omit) => { + const id = `notification-${notificationIdCounter++}` + const newNotification: NotificationItem = { + ...notification, + id, + zIndex: ++zIndexCounter, + } + + notifications.value.push(newNotification) + + // 自动关闭 + if (notification.duration && notification.duration > 0) { + setTimeout(() => { + removeNotification(id) + }, notification.duration) + } + + return id + } + + const removeNotification = (id: string) => { + const index = notifications.value.findIndex((n) => n.id === id) + if (index !== -1) { + const notification = notifications.value[index] + notifications.value.splice(index, 1) + notification.onClose?.() + } + } + + const success = (title: string, message?: string, options?: Partial) => { + return addNotification({ + title, + message, + type: 'success', + duration: 3000, + ...options, + }) + } + + const warning = (title: string, message?: string, options?: Partial) => { + return addNotification({ + title, + message, + type: 'warning', + duration: 3000, + ...options, + }) + } + + const info = (title: string, message?: string, options?: Partial) => { + return addNotification({ + title, + message, + type: 'info', + duration: 3000, + ...options, + }) + } + + const error = (title: string, message?: string, options?: Partial) => { + return addNotification({ + title, + message, + type: 'error', + duration: 5000, + ...options, + }) + } + + const progress = ( + title: string, + current: number, + total: number, + options?: Partial, + ) => { + const progressPercent = Math.round((current / total) * 100) + return addNotification({ + title, + message: `${current}/${total}`, + type: 'info', + showProgress: true, + progress: progressPercent, + duration: 0, // 不自动关闭 + ...options, + }) + } + + const updateProgress = (id: string, current: number, total: number) => { + const notification = notifications.value.find((n) => n.id === id) + if (notification) { + notification.progress = Math.round((current / total) * 100) + notification.message = `${current}/${total}` + } + } + + const updateProgressDetail = ( + id: string, + detailTitle: string, + detailMessage: string, + current?: number, + total?: number + ) => { + const notification = notifications.value.find((n) => n.id === id) + if (notification) { + notification.detailTitle = detailTitle + notification.detailMessage = detailMessage + if (current !== undefined && total !== undefined) { + notification.progress = Math.round((current / total) * 100) + notification.message = `${current}/${total}` + } + } + } + + const clear = () => { + notifications.value.forEach((n) => n.onClose?.()) + notifications.value = [] + } + + return { + notifications, + addNotification, + removeNotification, + success, + warning, + info, + error, + progress, + updateProgress, + updateProgressDetail, + clear, + } +} diff --git a/frontend/src/layout/components/Main/Task.vue b/frontend/src/layout/components/Main/Task.vue index 1a3d8f6..eff3ba1 100644 --- a/frontend/src/layout/components/Main/Task.vue +++ b/frontend/src/layout/components/Main/Task.vue @@ -107,6 +107,8 @@ async function handleStop() { // 无论后端是否成功停止,都重置状态 isFillingSteps.value = false currentStepAbortController.value = null + // 标记用户已停止填充 + agentsStore.setHasStoppedFilling(true) } } @@ -134,6 +136,8 @@ async function handleSearch() { emit('search-start') agentsStore.resetAgent() agentsStore.setAgentRawPlan({ loading: true }) + // 重置停止状态 + agentsStore.setHasStoppedFilling(false) // 获取大纲 const outlineData = await api.generateBasePlan({ diff --git a/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue b/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue index 2b8aabe..ca4fc6d 100644 --- a/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue +++ b/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue @@ -2,7 +2,7 @@ import { computed, onUnmounted, ref, reactive, nextTick, watch, onMounted } from 'vue' import { throttle } from 'lodash' import { AnchorLocations, BezierConnector } from '@jsplumb/browser-ui' -import { ElMessage, ElMessageBox } from 'element-plus' +import { ElMessageBox } from 'element-plus' import AdditionalOutputCard from './AdditionalOutputCard.vue' import SvgIcon from '@/components/SvgIcon/index.vue' import { getActionTypeDisplay, getAgentMapIcon } from '@/layout/components/config.ts' @@ -13,6 +13,8 @@ import api, { type StreamingEvent } from '@/api' import ProcessCard from '../TaskProcess/ProcessCard.vue' import ExecutePlan from './ExecutePlan.vue' import websocket from '@/utils/websocket' +import Notification from '@/components/Notification/Notification.vue' +import { useNotification } from '@/composables/useNotification' const emit = defineEmits<{ (e: 'refreshLine'): void @@ -38,6 +40,34 @@ enum StepExecutionStatus { // Execution status for each step const stepExecutionStatus = ref>({}) +// 用于标记暂停时的"最后动作完成"状态 +const isPausing = ref(false) // 正在请求暂停(等待当前动作完成) + +// ==================== 步骤版本追踪 ==================== +// 步骤版本信息接口 +interface StepVersionInfo { + stepId: string // 步骤ID + stepIndex: number // 步骤索引(0-based) + originalHash: string // 原始配置hash(初始化时生成) + currentHash: string // 当前配置hash(编辑后更新) + isModified: boolean // 是否已修改 +} + +// 重执行配置接口 +interface ReExecuteConfig { + shouldReExecute: boolean // 是否需要重新执行 + startFromStepIndex: number // 从哪个步骤开始(-1表示从头开始) + modifiedSteps: string[] // 被修改的步骤ID列表 +} + +// 步骤版本追踪 +const stepVersions = ref>({}) +const reExecuteConfig = ref({ + shouldReExecute: false, + startFromStepIndex: -1, + modifiedSteps: [] +}) + // Check if step is ready to execute (has TaskProcess data) const isStepReady = (step: IRawStepTask) => { return step.TaskProcess && step.TaskProcess.length > 0 @@ -100,31 +130,231 @@ const stepsReadyStatus = computed(() => { } }) -// Watch step data changes, update waiting step status +// ==================== 步骤版本追踪函数 ==================== +/** + * 生成步骤配置的hash(用于检测修改) + * @param step 步骤对象 + * @returns hash字符串 + */ +function generateStepHash(step: IRawStepTask): string { + // 只考虑TaskProcess中的Description字段 + // 因为这是用户可以编辑的部分 + const processDescriptions = step.TaskProcess.map(p => `${p.ID}:${p.Description}`).join('|') + + // 简单hash算法 + let hash = 0 + for (let i = 0; i < processDescriptions.length; i++) { + const char = processDescriptions.charCodeAt(i) + hash = (hash << 5) - hash + char + hash = hash & hash // Convert to 32bit integer + } + return Math.abs(hash).toString(36) +} + +/** + * 初始化步骤版本(在任务加载完成后调用) + */ +function initializeStepVersions() { + const steps = collaborationProcess.value + stepVersions.value = {} + + steps.forEach((step, index) => { + const stepId = step.Id || step.StepName || `step-${index}` + const hash = generateStepHash(step) + + stepVersions.value[stepId] = { + stepId, + stepIndex: index, + originalHash: hash, + currentHash: hash, + isModified: false + } + }) + + // 重置重执行配置 + reExecuteConfig.value = { + shouldReExecute: false, + startFromStepIndex: -1, + modifiedSteps: [] + } +} + +/** + * 更新步骤版本(编辑保存后调用) + */ +function updateStepVersion(stepId: string) { + const step = collaborationProcess.value.find(s => (s.Id || s.StepName) === stepId) + + if (step && stepVersions.value[stepId]) { + const newHash = generateStepHash(step) + const versionInfo = stepVersions.value[stepId] + + versionInfo.currentHash = newHash + versionInfo.isModified = newHash !== versionInfo.originalHash + + // 重新计算重执行配置 + calculateReExecuteConfig() + } +} + +/** + * 计算重执行配置 + */ +function calculateReExecuteConfig() { + const modifiedSteps: string[] = [] + let minModifiedIndex = Infinity + + // 找出所有被修改的步骤 + Object.values(stepVersions.value).forEach(versionInfo => { + if (versionInfo.isModified) { + modifiedSteps.push(versionInfo.stepId) + minModifiedIndex = Math.min(minModifiedIndex, versionInfo.stepIndex) + } + }) + + // 设置重执行配置 + if (modifiedSteps.length > 0) { + reExecuteConfig.value = { + shouldReExecute: true, + startFromStepIndex: minModifiedIndex, + modifiedSteps + } + } else { + reExecuteConfig.value = { + shouldReExecute: false, + startFromStepIndex: -1, + modifiedSteps: [] + } + } +} + +/** + * 重置步骤版本标记 + */ +function resetStepVersions() { + Object.values(stepVersions.value).forEach(versionInfo => { + versionInfo.originalHash = versionInfo.currentHash + versionInfo.isModified = false + }) + reExecuteConfig.value = { + shouldReExecute: false, + startFromStepIndex: -1, + modifiedSteps: [] + } +} + +/** + * 清理指定步骤索引之后的所有执行结果 + * @param fromStepIndex 起始步骤索引(该索引之后的结果将被清理) + */ +function clearExecutionResultsAfter(fromStepIndex: number) { + const steps = collaborationProcess.value + const currentResults = agentsStore.executePlan + + // 找出需要清理的步骤名称 + const stepsToClear = new Set() + for (let i = fromStepIndex; i < steps.length; i++) { + const step = steps[i] + if (!step) continue + if (step.StepName) { + stepsToClear.add(step.StepName) + } + if (step.OutputObject) { + stepsToClear.add(step.OutputObject) + } + } + + // 过滤掉需要清理的结果 + const filteredResults = currentResults.filter(result => !stepsToClear.has(result.NodeId)) + + // 更新store + agentsStore.setExecutePlan(filteredResults) + + // 重置步骤执行状态 + Object.keys(stepExecutionStatus.value).forEach(stepName => { + const stepIndex = steps.findIndex(s => s.StepName === stepName) + if (stepIndex >= fromStepIndex) { + stepExecutionStatus.value[stepName] = StepExecutionStatus.READY + } + }) +} + +/** + * 监听步骤数据变化,更新步骤状态并动态追加新步骤 + */ watch( () => collaborationProcess.value, newSteps => { newSteps.forEach(step => { + const stepId = step.Id || step.StepName || '' const stepName = step.StepName || step.Id || '' const currentStatus = stepExecutionStatus.value[stepName] - // If step was waiting and now has data, set to ready - if (currentStatus === StepExecutionStatus.WAITING && isStepReady(step)) { - stepExecutionStatus.value[stepName] = StepExecutionStatus.READY + if (isStepReady(step)) { + // 步骤数据已就绪,更新状态 + if (!currentStatus || currentStatus === StepExecutionStatus.WAITING) { + stepExecutionStatus.value[stepName] = StepExecutionStatus.READY + } - // 如果正在执行中,自动执行下一批就绪的步骤 - if (autoExecuteEnabled.value && loading.value) { - executeNextReadyBatch() + // 动态追加新步骤到执行队列 + if (loading.value && isStreaming.value && currentExecutionId.value) { + if (!sentStepIds.value.has(stepId)) { + console.log(`🔄 Watch监听到新就绪步骤: ${stepName}, 准备追加到执行队列`) + console.log(` - loading: ${loading.value}`) + console.log(` - isStreaming: ${isStreaming.value}`) + console.log(` - currentExecutionId: ${currentExecutionId.value}`) + + sentStepIds.value.add(stepId) + + // 异步追加步骤到后端执行队列 + api + .addStepsToExecution(currentExecutionId.value, [step]) + .then(addedCount => { + if (addedCount > 0) { + console.log(`✅ 成功追加步骤: ${stepName}`) + + // 更新总步骤数显示 + const totalStepsCount = collaborationProcess.value.length + const currentStep = executionProgress.value.currentStep || 1 + executionProgress.value.totalSteps = totalStepsCount + + // 使用 Notification 显示追加成功通知 + // info('步骤追加成功', `${stepName} (${currentStep}/${totalStepsCount})`, { + // duration: 2000 + // }) + } else { + console.warn(`⚠️ 追加步骤失败: ${stepName} - 执行ID不存在或已结束`) + // 追加失败,移除标记以便重试 + sentStepIds.value.delete(stepId) + } + }) + .catch(error => { + console.error(`❌ 追加步骤失败: ${stepName}`, error) + // 追加失败,移除标记以便重试 + sentStepIds.value.delete(stepId) + }) + } + } else if (loading.value && !isStreaming.value) { + console.log(`⚠️ 步骤 ${stepName} 已就绪,但尚未开始流式传输`) + } else if (loading.value && isStreaming.value && !currentExecutionId.value) { + console.log(`⚠️ 步骤 ${stepName} 已就绪,但currentExecutionId为空`) + } + } else { + // 步骤未就绪,设置为WAITING + if (!currentStatus) { + stepExecutionStatus.value[stepName] = StepExecutionStatus.WAITING } } }) + + // 初始化步骤版本(首次加载时) + if (newSteps.length > 0 && Object.keys(stepVersions.value).length === 0) { + initializeStepVersions() + } }, { deep: true } ) -// Enable auto-execution (auto-execute when new steps are ready) -const autoExecuteEnabled = ref(true) - // Watch additional outputs changes watch( () => agentsStore.additionalOutputs, @@ -166,6 +396,15 @@ function handleSaveEdit(stepId: string, processId: string, value: string) { const process = step.TaskProcess.find(p => p.ID === processId) if (process) { process.Description = value + + // 更新步骤版本 + updateStepVersion(stepId) + + // 显示修改提示 + const versionInfo = stepVersions.value[stepId] + if (versionInfo?.isModified) { + warning('步骤已修改', `步骤 "${step.StepName}" 已修改,继续执行时将从该步骤重新开始`) + } } } editMap[key] = false @@ -288,230 +527,353 @@ const executionProgress = ref({ totalSteps: 0, currentAction: 0, totalActions: 0, - currentStepName: '', - message: '准备执行任务...' + currentStepName: '' }) +// Notification system +const { + notifications, + progress: showProgress, + updateProgress, + updateProgressDetail, + success, + info, + warning, + error +} = useNotification() +const currentProgressNotificationId = ref(null) + // Pause functionality state -const isPaused = ref(false) // Whether paused -const isStreaming = ref(false) // Whether streaming data (backend started returning) -const isButtonLoading = ref(false) // Button brief loading state (prevent double-click) +const isPaused = ref(false) +const isStreaming = ref(false) +const isButtonLoading = ref(false) -// Store current step execution index (for sequential execution) -const currentExecutionIndex = ref(0) +// Dynamic execution state +const currentExecutionId = ref(null) +const sentStepIds = ref>(new Set()) -// Execute next batch of ready steps (batch execution to maintain dependencies) +// Flag to prevent duplicate execution calls +const isExecutingNextBatch = ref(false) + +/** + * 执行下一批已就绪的步骤(使用动态追加模式) + * 支持在执行过程中动态追加新步骤 + */ async function executeNextReadyBatch() { - const steps = collaborationProcess.value - - // Collect all ready but unexecuted steps (in order, until hitting unready step) - const readySteps: IRawStepTask[] = [] - - for (let i = 0; i < steps.length; i++) { - const step = steps[i] - if (!step) continue - - // 如果步骤已就绪,加入批量执行列表 - if (isStepReady(step)) { - const stepName = step.StepName || step.Id || '' - const status = stepExecutionStatus.value[stepName] - - // Only collect unexecuted steps - if (!status || status === StepExecutionStatus.READY) { - readySteps.push(step) - } - } else { - // Stop at first unready step (maintain step order) - break - } + if (isExecutingNextBatch.value) { + console.log('executeNextReadyBatch already running, skipping duplicate call') + return } - if (readySteps.length > 0) { - try { - // Mark all steps to be executed as running - readySteps.forEach(step => { - const stepName = step.StepName || step.Id || '' - stepExecutionStatus.value[stepName] = StepExecutionStatus.RUNNING - }) + isExecutingNextBatch.value = true - // 构建包含所有已就绪步骤的计划数据(批量发送,保持依赖关系) - const batchPlan: IRawPlanResponse = { - 'General Goal': agentsStore.agentRawPlan.data?.['General Goal'] || '', - 'Initial Input Object': agentsStore.agentRawPlan.data?.['Initial Input Object'] || [], - 'Collaboration Process': readySteps // Key: batch send steps + try { + const steps = collaborationProcess.value + + // 收集所有已就绪但未执行的步骤 + const readySteps: IRawStepTask[] = [] + + for (let i = 0; i < steps.length; i++) { + const step = steps[i] + if (!step) continue + + const stepId = step.Id || step.StepName || '' + const stepName = step.StepName || step.Id || '' + + // 调试日志 + const isReady = isStepReady(step) + const wasSent = sentStepIds.value.has(stepId) + const status = stepExecutionStatus.value[stepId] + + console.log( + `[步骤检查] ${stepName}: isReady=${isReady}, wasSent=${wasSent}, status=${status}` + ) + + // 只收集未发送的已就绪步骤 + if (isStepReady(step) && !sentStepIds.value.has(stepId)) { + if (!status || status === StepExecutionStatus.READY) { + readySteps.push(step) + sentStepIds.value.add(stepId) + console.log(`✅ 收集步骤: ${stepName}, 当前总数: ${readySteps.length}`) + } + } + } + + console.log(`📊 总共收集到 ${readySteps.length} 个已就绪步骤`) + + if (readySteps.length > 0) { + // 如果还没有executionId,生成一个 + if (!currentExecutionId.value) { + const generalGoal = agentsStore.agentRawPlan.data?.['General Goal'] || '' + const timestamp = Date.now() + currentExecutionId.value = `${generalGoal.replace(/\s+/g, '_')}_${timestamp}` + console.log('🆔 生成执行ID:', currentExecutionId.value) } - const tempResults: any[] = [] + try { + // 标记所有要执行的步骤为运行中 + readySteps.forEach(step => { + const stepName = step.StepName || step.Id || '' + stepExecutionStatus.value[stepName] = StepExecutionStatus.RUNNING + }) - // Execute these steps in batch - await new Promise((resolve, reject) => { - api.executePlanOptimized( - batchPlan, - // onMessage: handle each event - (event: StreamingEvent) => { - // When backend starts returning data, set isStreaming (only once) - if (!isStreaming.value) { - isStreaming.value = true - } + // 构建批量执行计划 + const batchPlan: IRawPlanResponse = { + 'General Goal': agentsStore.agentRawPlan.data?.['General Goal'] || '', + 'Initial Input Object': agentsStore.agentRawPlan.data?.['Initial Input Object'] || [], + 'Collaboration Process': readySteps + } - // If paused, ignore events - if (isPaused.value) { - return - } + // 执行批量步骤(启用动态追加模式) + await new Promise((resolve, reject) => { + api.executePlanOptimized( + batchPlan, + // onMessage: 处理每个事件 + (event: StreamingEvent) => { + // 当后端开始返回数据时,设置isStreaming + if (!isStreaming.value) { + isStreaming.value = true + } - switch (event.type) { - case 'step_start': - // 使用后端返回的 step_index 和 total_steps - executionProgress.value = { - currentStep: (event.step_index || 0) + 1, - totalSteps: event.total_steps || collaborationProcess.value.length, - currentAction: 0, - totalActions: 0, - currentStepName: event.step_name, - message: `正在执行步骤 ${event.step_index + 1}/${ - event.total_steps || collaborationProcess.value.length - }: ${event.step_name}` + // 如果正在暂停(isPausing)或已暂停(isPaused),只允许特定事件 + // 这样可以确保当前正在完成的动作的结果能够被正确保存 + if (isPausing.value || isPaused.value) { + if ( + event.type !== 'action_complete' && + event.type !== 'step_complete' && + event.type !== 'error' + ) { + return } - break + } - case 'action_complete': - const parallelInfo = event.batch_info?.is_parallel - ? ` [并行 ${event.batch_info!.batch_size} 个动作]` - : '' + switch (event.type) { + case 'step_start': + executionProgress.value = { + currentStep: (event.step_index || 0) + 1, + totalSteps: event.total_steps || collaborationProcess.value.length, + currentAction: 0, + totalActions: 0, + currentStepName: event.step_name + } - // 使用后端返回的 step_index,total_steps 使用当前进度中的值 - const stepIndexForAction = event.step_index || 0 - const totalStepsValue = - executionProgress.value.totalSteps || collaborationProcess.value.length - executionProgress.value = { - ...executionProgress.value, - currentAction: event.completed_actions, - totalActions: event.total_actions, - message: `步骤 ${stepIndexForAction + 1}/${totalStepsValue}: ${ - event.step_name - } - 动作 ${event.completed_actions}/${event.total_actions} 完成${parallelInfo}` - } + // 创建或更新进度通知,显示详细步骤信息 + if (!currentProgressNotificationId.value) { + currentProgressNotificationId.value = showProgress( + '任务执行中', + executionProgress.value.currentStep, + executionProgress.value.totalSteps || 1 + ) + updateProgressDetail( + currentProgressNotificationId.value, + `步骤 ${executionProgress.value.currentStep}/${ + executionProgress.value.totalSteps || 1 + }`, + `正在执行: ${event.step_name}` + ) + } else { + updateProgressDetail( + currentProgressNotificationId.value, + `步骤 ${executionProgress.value.currentStep}/${ + executionProgress.value.totalSteps || 1 + }`, + `正在执行: ${event.step_name}`, + executionProgress.value.currentStep, + executionProgress.value.totalSteps || 1 + ) + } + break - // Update store in real-time - const existingStep = collaborationProcess.value.find( - s => s.StepName === event.step_name - ) - if (existingStep) { + case 'action_complete': + const parallelInfo = event.batch_info?.is_parallel + ? ` [并行 ${event.batch_info!.batch_size} 个动作]` + : '' + + const stepIndexForAction = event.step_index || 0 + const totalStepsValue = + executionProgress.value.totalSteps || collaborationProcess.value.length + executionProgress.value = { + ...executionProgress.value, + currentAction: event.completed_actions, + totalActions: event.total_actions + } + + // 更新详细进度信息,显示动作级别进度 + if (currentProgressNotificationId.value) { + updateProgressDetail( + currentProgressNotificationId.value, + `步骤 ${stepIndexForAction + 1}/${totalStepsValue}`, + `${event.step_name} - 动作 ${event.completed_actions}/${event.total_actions} 完成${parallelInfo}`, + stepIndexForAction + 1, + totalStepsValue + ) + } + + // 实时更新store + const existingStep = collaborationProcess.value.find( + s => s.StepName === event.step_name + ) + if (existingStep) { + const currentResults = agentsStore.executePlan + const stepLogNode = currentResults.find( + r => r.NodeId === event.step_name && r.LogNodeType === 'step' + ) + if (!stepLogNode) { + const newStepLog = { + LogNodeType: 'step', + NodeId: event.step_name, + InputName_List: existingStep.InputObject_List || [], + OutputName: existingStep.OutputObject || '', + chatLog: [], + inputObject_Record: [], + ActionHistory: [event.action_result] + } + agentsStore.setExecutePlan([...currentResults, newStepLog]) + } else { + stepLogNode.ActionHistory.push(event.action_result) + agentsStore.setExecutePlan([...currentResults]) + } + } + break + + case 'step_complete': + stepExecutionStatus.value[event.step_name] = StepExecutionStatus.COMPLETED + + // 更新完整步骤日志 const currentResults = agentsStore.executePlan - const stepLogNode = currentResults.find( + const existingLog = currentResults.find( r => r.NodeId === event.step_name && r.LogNodeType === 'step' ) - if (!stepLogNode) { - const newStepLog = { - LogNodeType: 'step', - NodeId: event.step_name, - InputName_List: existingStep.InputObject_List || [], - OutputName: existingStep.OutputObject || '', - chatLog: [], - inputObject_Record: [], - ActionHistory: [event.action_result] - } - tempResults.push(newStepLog) - agentsStore.setExecutePlan([...currentResults, newStepLog]) - } else { - stepLogNode.ActionHistory.push(event.action_result) + if (existingLog) { + existingLog.ActionHistory = event.step_log_node.ActionHistory agentsStore.setExecutePlan([...currentResults]) + } else if (event.step_log_node) { + agentsStore.setExecutePlan([...currentResults, event.step_log_node]) } - } - break - - case 'step_complete': - stepExecutionStatus.value[event.step_name] = StepExecutionStatus.COMPLETED - - // Update complete step log - const currentResults = agentsStore.executePlan - const existingLog = currentResults.find( - r => r.NodeId === event.step_name && r.LogNodeType === 'step' - ) - if (existingLog) { - existingLog.ActionHistory = event.step_log_node.ActionHistory - // 触发响应式更新 - agentsStore.setExecutePlan([...currentResults]) - } else if (event.step_log_node) { - // 添加新的 step_log_node - agentsStore.setExecutePlan([...currentResults, event.step_log_node]) - } - // 添加 object_log_node - const updatedResults = agentsStore.executePlan - if (event.object_log_node) { - agentsStore.setExecutePlan([...updatedResults, event.object_log_node]) - } - break - - case 'execution_complete': - // 所有步骤都标记为完成 - readySteps.forEach(step => { - const stepName = step.StepName || step.Id || '' - if (stepExecutionStatus.value[stepName] !== StepExecutionStatus.COMPLETED) { - stepExecutionStatus.value[stepName] = StepExecutionStatus.COMPLETED + // 添加object_log_node + const updatedResults = agentsStore.executePlan + if (event.object_log_node) { + agentsStore.setExecutePlan([...updatedResults, event.object_log_node]) } - }) + break - resolve() - break + case 'execution_complete': + readySteps.forEach(step => { + const stepName = step.StepName || step.Id || '' + if (stepExecutionStatus.value[stepName] !== StepExecutionStatus.COMPLETED) { + stepExecutionStatus.value[stepName] = StepExecutionStatus.COMPLETED + } + }) - case 'error': - console.error(' 执行错误:', event.message) - executionProgress.value.message = `执行错误: ${event.message}` - readySteps.forEach(step => { - const stepName = step.StepName || step.Id || '' - stepExecutionStatus.value[stepName] = StepExecutionStatus.FAILED - }) - reject(new Error(event.message)) - break - } - }, - // onError - (error: Error) => { - console.error(' 流式执行错误:', error) - executionProgress.value.message = `执行失败: ${error.message}` - readySteps.forEach(step => { - const stepName = step.StepName || step.Id || '' - stepExecutionStatus.value[stepName] = StepExecutionStatus.FAILED - }) - reject(error) - }, - // onComplete - () => { - resolve() - } - ) - }) + // 关闭进度通知并显示完成通知 + if (currentProgressNotificationId.value) { + removeNotification(currentProgressNotificationId.value) + currentProgressNotificationId.value = null + } - // 批量执行成功后,递归执行下一批 - await executeNextReadyBatch() - } catch (error) { - ElMessage.error('批量执行失败') - // 重置所有执行状态 + success('任务执行完成', `所有步骤已执行完成`, { duration: 3000 }) + resolve() + break + + case 'error': + const errorMessage = event.message || event.error || '未知错误' + console.error('执行错误:', errorMessage) + + // 关闭进度通知并显示错误通知 + if (currentProgressNotificationId.value) { + removeNotification(currentProgressNotificationId.value) + currentProgressNotificationId.value = null + } + + error('执行错误', errorMessage, { duration: 5000 }) + + readySteps.forEach(step => { + const stepName = step.StepName || step.Id || '' + stepExecutionStatus.value[stepName] = StepExecutionStatus.FAILED + }) + reject(new Error(errorMessage)) + break + } + }, + // onError: 处理错误 + (err: Error) => { + console.error('流式执行错误:', err) + + // 关闭进度通知并显示错误通知 + if (currentProgressNotificationId.value) { + removeNotification(currentProgressNotificationId.value) + currentProgressNotificationId.value = null + } + + error('执行失败', err.message, { duration: 5000 }) + + readySteps.forEach(step => { + const stepName = step.StepName || step.Id || '' + stepExecutionStatus.value[stepName] = StepExecutionStatus.FAILED + }) + reject(err) + }, + // onComplete: 完成回调 + () => { + // 关闭进度通知 + if (currentProgressNotificationId.value) { + removeNotification(currentProgressNotificationId.value) + currentProgressNotificationId.value = null + } + resolve() + }, + // useWebSocket: 使用WebSocket + true, + // existingKeyObjects: 已存在的KeyObjects + {}, + // enableDynamic: 启用动态追加模式 + true, + // onExecutionStarted: 接收执行ID + (executionId: string) => { + console.log('动态执行已启动,执行ID:', executionId) + }, + // executionId: 前端生成的执行ID + currentExecutionId.value || undefined + ) + }) + + // 不再递归调用,而是通过watch监听追加新步骤 + } catch (err) { + error('执行失败', '批量执行失败') + loading.value = false + isPaused.value = false + isStreaming.value = false + } + } else { + // 没有更多已就绪的步骤 loading.value = false isPaused.value = false isStreaming.value = false - } - } else { - // No more ready steps - loading.value = false - // 重置暂停和流式状态 - isPaused.value = false - isStreaming.value = false - // Check if there are still waiting steps - const hasWaitingSteps = steps.some(step => step && !isStepReady(step)) + // 检查是否还有等待填充的步骤 + const hasWaitingSteps = steps.some(step => step && !isStepReady(step)) - if (hasWaitingSteps) { - const waitingStepNames = steps - .filter(step => step && !isStepReady(step)) - .map(step => step?.StepName || '未知') - executionProgress.value.message = `等待 ${waitingStepNames.length} 个步骤数据填充中...` - ElMessage.info(`等待 ${waitingStepNames.length} 个步骤数据填充中...`) - } else { - executionProgress.value.message = '所有步骤已完成' - ElMessage.success('所有步骤已完成') + if (hasWaitingSteps) { + const waitingStepNames = steps + .filter(step => step && !isStepReady(step)) + .map(step => step?.StepName || '未知') + info('等待数据填充', `等待 ${waitingStepNames.length} 个步骤数据填充中...`) + } else { + success('执行完成', '所有步骤已完成') + } } + } finally { + isExecutingNextBatch.value = false + } +} + +/** + * 移除通知 + */ +function removeNotification(id: string) { + const index = notifications.value.findIndex(n => n.id === id) + if (index !== -1) { + notifications.value.splice(index, 1) } } @@ -519,37 +881,86 @@ async function executeNextReadyBatch() { async function handlePauseResume() { if (isPaused.value) { // Resume execution - try { - if (websocket.connected) { - await websocket.send('resume_execution', { - goal: agentsStore.agentRawPlan.data?.['General Goal'] || '' - }) - // 只有在收到成功响应后才更新状态 + + // 检查是否需要重新执行(有步骤被修改) + if (reExecuteConfig.value.shouldReExecute) { + const startStepIndex = reExecuteConfig.value.startFromStepIndex + const startStep = collaborationProcess.value[startStepIndex] + + try { + // 确认对话框 + await ElMessageBox.confirm( + `检测到 ${reExecuteConfig.value.modifiedSteps.length} 个步骤已被修改\n` + + `将从步骤 "${startStep?.StepName}" 重新开始执行\n\n` + + `是否继续?`, + '检测到步骤修改', + { + confirmButtonText: '从修改步骤重新执行', + cancelButtonText: '取消', + type: 'warning' + } + ) + + // 清理执行结果 + clearExecutionResultsAfter(startStepIndex) + + // 重置暂停状态 isPaused.value = false - ElMessage.success('已恢复执行') - } else { - ElMessage.warning('WebSocket未连接,无法恢复执行') + isPausing.value = false + isStreaming.value = false + + // 从指定步骤重新执行 + await reExecuteFromStep(startStepIndex) + + // 重置修改标记 + resetStepVersions() + } catch { + // 用户取消 + info('已取消', '已取消重新执行') + } + } else { + // 没有修改,正常恢复执行 + try { + if (websocket.connected) { + await websocket.send('resume_execution', { + goal: agentsStore.agentRawPlan.data?.['General Goal'] || '' + }) + // 只有在收到成功响应后才更新状态 + isPaused.value = false + isPausing.value = false + success('已恢复', '已恢复执行') + } else { + warning('无法恢复', 'WebSocket未连接,无法恢复执行') + } + } catch (error) { + error('恢复失败', '恢复执行失败') + // 恢复失败时,保持原状态不变(仍然是暂停状态) } - } catch (error) { - ElMessage.error('恢复执行失败') - // 恢复失败时,保持原状态不变(仍然是暂停状态) } } else { // Pause execution try { if (websocket.connected) { + // 先设置 isPausing,允许接收当前正在执行的动作的结果 + isPausing.value = true + info('暂停中', '正在等待当前动作完成...') + await websocket.send('pause_execution', { goal: agentsStore.agentRawPlan.data?.['General Goal'] || '' }) - // 只有在收到成功响应后才更新状态 + + // 收到成功响应后,设置完全暂停状态 isPaused.value = true - ElMessage.success('已暂停执行,可稍后继续') + isPausing.value = false + success('已暂停', '已暂停执行,可稍后继续') } else { - ElMessage.warning('WebSocket未连接,无法暂停') + warning('无法暂停', 'WebSocket未连接,无法暂停') + isPausing.value = false } } catch (error) { - ElMessage.error('暂停执行失败') - // 暂停失败时,保持原状态不变(仍然是非暂停状态) + error('暂停失败', '暂停执行失败') + // 暂停失败时,重置状态 + isPausing.value = false } } } @@ -566,6 +977,35 @@ async function handleExecuteButtonClick() { await handleRun() } +/** + * 从指定步骤重新执行(支持边填充边执行) + * @param fromStepIndex 起始步骤索引 + */ +async function reExecuteFromStep(fromStepIndex: number) { + const steps = collaborationProcess.value + + // 设置执行状态 + loading.value = true + isStreaming.value = true + + // 重置将要执行的步骤的状态 + for (let i = fromStepIndex; i < steps.length; i++) { + const step = steps[i] + if (step) { + const stepName = step.StepName || step.Id || '' + stepExecutionStatus.value[stepName] = StepExecutionStatus.READY + } + } + + // 使用批量执行模式,会自动处理依赖和边填充边执行 + await executeNextReadyBatch() + + success('重新执行完成', '所有步骤已重新执行完成') + loading.value = false + isPaused.value = false + isStreaming.value = false +} + async function handleRun() { // Check if there are ready steps const readySteps = stepsReadyStatus.value.ready @@ -598,11 +1038,12 @@ async function handleRun() { // Start execution loading.value = true - currentExecutionIndex.value = 0 // Clear previous execution results and status agentsStore.setExecutePlan([]) stepExecutionStatus.value = {} + sentStepIds.value.clear() + currentExecutionId.value = null // Start batch executing first batch of ready steps await executeNextReadyBatch() @@ -796,14 +1237,8 @@ defineExpose({ id="task-results" :class="{ 'is-running': agentsStore.executePlan.length > 0 }" > - -
- - {{ executionProgress.message }} - - {{ executionProgress.currentStep }}/{{ executionProgress.totalSteps }} - -
+ +
@@ -899,7 +1334,7 @@ defineExpose({
- +
- -