diff --git a/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py b/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py index ea102fa..02b763b 100644 --- a/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py +++ b/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py @@ -5,6 +5,7 @@ from typing import List, Dict, Set, Generator, Any import AgentCoord.RehearsalEngine_V2.Action as Action import AgentCoord.util as util from termcolor import colored +from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager # ==================== 配置参数 ==================== @@ -236,6 +237,14 @@ async def execute_step_async_streaming( # 分批执行动作 for batch_index, batch_indices in enumerate(batches): + # 在每个批次执行前检查暂停状态 + print(f"🔍 [DEBUG] 步骤 {StepName}: 批次 {batch_index+1}/{len(batches)} 执行前,检查暂停状态...") + should_continue = await execution_state_manager.async_check_pause() + if not should_continue: + # 用户请求停止,中断执行 + util.print_colored("🛑 用户请求停止执行", "red") + return + batch_size = len(batch_indices) if batch_size > 1: @@ -316,13 +325,19 @@ def executePlan_streaming( AgentProfile_Dict: Dict ) -> Generator[str, None, None]: """ - 执行计划(流式返回版本) + 执行计划(流式返回版本,支持暂停/恢复) 返回生成器,每次返回 SSE 格式的字符串 使用方式: for event in executePlan_streaming(...): yield event """ + # 初始化执行状态 + general_goal = plan.get("General Goal", "") + execution_state_manager.start_execution(general_goal) + + print(colored(f"⏸️ 执行状态管理器已启动,支持暂停/恢复", "green")) + # 准备执行 KeyObjects = {} finishedStep_index = -1 @@ -349,6 +364,17 @@ def executePlan_streaming( """异步生产者:每收到一个事件就放入队列""" try: for step_index, stepDescrip in enumerate(steps_to_run): + # 在每个步骤执行前检查暂停状态 + should_continue = await execution_state_manager.async_check_pause() + if not should_continue: + # 用户请求停止,中断执行 + print(colored("🛑 用户请求停止执行", "red")) + await queue.put({ + "type": "error", + "message": "执行已被用户停止" + }) + return + # 执行单个步骤(流式) async for event in execute_step_async_streaming( stepDescrip, @@ -358,7 +384,21 @@ def executePlan_streaming( step_index, total_steps ): + # 检查是否需要停止 + if execution_state_manager.is_stopped(): + await queue.put({ + "type": "error", + "message": "执行已被用户停止" + }) + return + await queue.put(event) # ← 立即放入队列 + except Exception as e: + # 发送错误信息 + await queue.put({ + "type": "error", + "message": f"执行出错: {str(e)}" + }) finally: await queue.put(None) # ← 发送完成信号 @@ -386,12 +426,13 @@ def executePlan_streaming( # 等待生产者任务完成 loop.run_until_complete(producer_task) - # 发送完成信号 - complete_event = json.dumps({ - "type": "execution_complete", - "total_steps": total_steps - }, ensure_ascii=False) - yield f"data: {complete_event}\n\n" + # 如果不是被停止的,发送完成信号 + if not execution_state_manager.is_stopped(): + complete_event = json.dumps({ + "type": "execution_complete", + "total_steps": total_steps + }, ensure_ascii=False) + yield f"data: {complete_event}\n\n" finally: # 清理任务 @@ -399,3 +440,5 @@ def executePlan_streaming( if not producer_task.done(): producer_task.cancel() loop.close() + + \ No newline at end of file diff --git a/backend/AgentCoord/RehearsalEngine_V2/execution_state.py b/backend/AgentCoord/RehearsalEngine_V2/execution_state.py new file mode 100644 index 0000000..fa71914 --- /dev/null +++ b/backend/AgentCoord/RehearsalEngine_V2/execution_state.py @@ -0,0 +1,190 @@ +""" +全局执行状态管理器 +用于支持任务的暂停、恢复和停止功能 +使用轮询检查机制,确保线程安全 +""" + +import threading +import asyncio +import time +from typing import Optional +from enum import Enum + + +class ExecutionStatus(Enum): + """执行状态枚举""" + RUNNING = "running" # 正在运行 + PAUSED = "paused" # 已暂停 + STOPPED = "stopped" # 已停止 + IDLE = "idle" # 空闲 + + +class ExecutionStateManager: + """ + 全局执行状态管理器(单例模式) + + 功能: + - 管理任务执行状态(运行/暂停/停止) + - 使用轮询检查机制,避免异步事件的线程问题 + - 提供线程安全的状态查询和修改接口 + """ + + _instance: Optional['ExecutionStateManager'] = None + _lock = threading.Lock() + + def __new__(cls): + """单例模式""" + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + """初始化状态管理器""" + if self._initialized: + return + + self._initialized = True + self._status = ExecutionStatus.IDLE + self._current_goal: Optional[str] = None # 当前执行的任务目标 + # 使用简单的布尔标志,而不是 asyncio.Event + self._should_pause = False + self._should_stop = False + + def get_status(self) -> ExecutionStatus: + """获取当前执行状态""" + with self._lock: + return self._status + + def set_goal(self, goal: str): + """设置当前执行的任务目标""" + with self._lock: + self._current_goal = goal + + def get_goal(self) -> Optional[str]: + """获取当前执行的任务目标""" + with self._lock: + return self._current_goal + + def start_execution(self, goal: str): + """开始执行""" + with self._lock: + self._status = ExecutionStatus.RUNNING + self._current_goal = goal + self._should_pause = False + self._should_stop = False + print(f"🚀 [DEBUG] start_execution: 状态设置为 RUNNING, goal={goal}") + + def pause_execution(self) -> bool: + """ + 暂停执行 + + Returns: + bool: 是否成功暂停 + """ + with self._lock: + if self._status != ExecutionStatus.RUNNING: + print(f"⚠️ [DEBUG] pause_execution: 当前状态不是RUNNING,而是 {self._status}") + return False + self._status = ExecutionStatus.PAUSED + self._should_pause = True + print(f"⏸️ [DEBUG] pause_execution: 状态设置为PAUSED, should_pause=True") + return True + + def resume_execution(self) -> bool: + """ + 恢复执行 + + Returns: + bool: 是否成功恢复 + """ + with self._lock: + if self._status != ExecutionStatus.PAUSED: + print(f"⚠️ [DEBUG] resume_execution: 当前状态不是PAUSED,而是 {self._status}") + return False + self._status = ExecutionStatus.RUNNING + self._should_pause = False + print(f"▶️ [DEBUG] resume_execution: 状态设置为RUNNING, should_pause=False") + return True + + def stop_execution(self) -> bool: + """ + 停止执行 + + Returns: + bool: 是否成功停止 + """ + with self._lock: + if self._status in [ExecutionStatus.IDLE, ExecutionStatus.STOPPED]: + return False + self._status = ExecutionStatus.STOPPED + self._should_stop = True + self._should_pause = False + print(f"🛑 [DEBUG] stop_execution: 状态设置为STOPPED") + return True + + def reset(self): + """重置状态为空闲""" + with self._lock: + self._status = ExecutionStatus.IDLE + self._current_goal = None + self._should_pause = False + self._should_stop = False + print(f"🔄 [DEBUG] reset: 状态重置为IDLE") + + async def async_check_pause(self): + """ + 异步检查是否需要暂停(轮询方式) + + 如果处于暂停状态,会阻塞当前协程直到恢复或停止 + 应该在执行循环的关键点调用此方法 + + Returns: + bool: 如果返回True表示应该继续执行,False表示应该停止 + """ + # 使用轮询检查,避免异步事件问题 + while True: + # 检查停止标志 + if self._should_stop: + print("🛑 [DEBUG] async_check_pause: 检测到停止信号") + return False + + # 检查暂停状态 + if self._should_pause: + # 处于暂停状态,等待恢复 + print("⏸️ [DEBUG] async_check_pause: 检测到暂停,等待恢复...") + await asyncio.sleep(0.1) # 短暂睡眠,避免占用CPU + + # 如果恢复,继续执行 + if not self._should_pause: + print("▶️ [DEBUG] async_check_pause: 从暂停中恢复!") + continue + # 如果停止了,返回 + if self._should_stop: + return False + # 继续等待 + continue + + # 既没有停止也没有暂停,可以继续执行 + return True + + def is_paused(self) -> bool: + """检查是否处于暂停状态""" + with self._lock: + return self._status == ExecutionStatus.PAUSED + + def is_running(self) -> bool: + """检查是否正在运行""" + with self._lock: + return self._status == ExecutionStatus.RUNNING + + def is_stopped(self) -> bool: + """检查是否已停止""" + with self._lock: + return self._status == ExecutionStatus.STOPPED + + +# 全局单例实例 +execution_state_manager = ExecutionStateManager() diff --git a/backend/server.py b/backend/server.py index 8cd9417..3a4482f 100644 --- a/backend/server.py +++ b/backend/server.py @@ -1,4 +1,5 @@ from flask import Flask, request, jsonify, Response, stream_with_context +from flask_socketio import SocketIO, emit, join_room, leave_room import json from DataProcess import Add_Collaboration_Brief_FrontEnd from AgentCoord.RehearsalEngine_V2.ExecutePlan import executePlan @@ -34,6 +35,8 @@ AgentBoard = None AgentProfile_Dict = {} Request_Cache: dict[str, str] = {} app = Flask(__name__) +app.config['SECRET_KEY'] = 'agentcoord-secret-key' +socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading') @app.route("/fill_stepTask_TaskProcess", methods=["post"]) @@ -373,6 +376,1172 @@ def init(): AgentProfile_Dict = {} +# ==================== WebSocket 连接管理 ==================== +@socketio.on('connect') +def handle_connect(): + """客户端连接""" + print(f"✅ WebSocket client connected: {request.sid}") + emit('connected', {'sid': request.sid, 'message': 'WebSocket连接成功'}) + + +@socketio.on('disconnect') +def handle_disconnect(): + """客户端断开连接""" + print(f"❌ WebSocket client disconnected: {request.sid}") + + +@socketio.on('ping') +def handle_ping(): + """心跳检测""" + emit('pong') + + +# ==================== WebSocket 事件处理 ==================== +# 注:以下为WebSocket版本的接口,与REST API并存 +# 逐步迁移核心接口到WebSocket + + +@socketio.on('execute_plan_optimized') +def handle_execute_plan_optimized_ws(data): + """ + WebSocket版本:优化版流式执行计划 + 支持步骤级流式 + 动作级智能并行 + + 请求格式: + { + "id": "request-id", + "action": "execute_plan_optimized", + "data": { + "plan": {...}, + "num_StepToRun": null, + "RehearsalLog": [] + } + } + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + plan = incoming_data.get("plan") + num_StepToRun = incoming_data.get("num_StepToRun") + RehearsalLog = incoming_data.get("RehearsalLog", []) + + # 使用原有的流式执行函数 + for chunk in executePlan_streaming( + plan=plan, + num_StepToRun=num_StepToRun, + RehearsalLog=RehearsalLog, + AgentProfile_Dict=AgentProfile_Dict, + ): + # 通过WebSocket推送进度 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'data': chunk.replace('data: ', '').replace('\n\n', '') + }) + + # 发送完成信号 + emit('progress', { + 'id': request_id, + 'status': 'complete', + 'data': None + }) + + except Exception as e: + # 发送错误信息 + emit('progress', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + +@socketio.on('generate_base_plan') +def handle_generate_base_plan_ws(data): + """ + WebSocket版本:生成基础计划(支持流式/分步返回) + + 请求格式: + { + "id": "request-id", + "action": "generate_base_plan", + "data": { + "General Goal": "...", + "Initial Input Object": [...] + } + } + + 流式事件: + - progress: {"id": request_id, "status": "streaming", "stage": "generating_outline", "message": "正在生成计划大纲..."} + - progress: {"id": request_id, "status": "streaming", "stage": "processing_steps", "step": 1, "total": 3, "message": "正在处理步骤 1/3..."} + - response: {"id": request_id, "status": "success", "data": basePlan} + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + # 检查缓存 + requestIdentifier = str(( + "/generate_basePlan", + incoming_data.get("General Goal"), + incoming_data.get("Initial Input Object"), + )) + + if USE_CACHE and requestIdentifier in Request_Cache: + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': Request_Cache[requestIdentifier] + }) + return + + # 阶段1:生成计划大纲 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'generating_outline', + 'message': '📋 正在生成计划大纲...' + }) + + from AgentCoord.PlanEngine.planOutline_Generator import generate_PlanOutline + PlanOutline = generate_PlanOutline( + InitialObject_List=incoming_data.get("Initial Input Object"), + General_Goal=incoming_data.get("General Goal") + ) + + # 阶段2:构建基础计划(逐步添加步骤) + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'building_plan', + 'total_steps': len(PlanOutline), + 'message': f'🔨 正在构建计划,共 {len(PlanOutline)} 个步骤...' + }) + + basePlan = { + "General Goal": incoming_data.get("General Goal"), + "Initial Input Object": incoming_data.get("Initial Input Object"), + "Collaboration Process": [] + } + + for idx, stepItem in enumerate(PlanOutline, 1): + # 添加智能体选择和任务流程字段 + stepItem["AgentSelection"] = [] + stepItem["TaskProcess"] = [] + stepItem["Collaboration_Brief_frontEnd"] = { + "template": "", + "data": {} + } + basePlan["Collaboration Process"].append(stepItem) + + # 发送进度更新 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'adding_step', + 'step': idx, + 'total': len(PlanOutline), + 'step_name': stepItem.get("StepName", ""), + 'message': f'✅ 已添加步骤 {idx}/{len(PlanOutline)}: {stepItem.get("StepName", "")}' + }) + + # 阶段3:处理渲染规范 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'rendering', + 'message': '🎨 正在处理渲染规范...' + }) + + basePlan_withRenderSpec = Add_Collaboration_Brief_FrontEnd(basePlan) + + # 缓存结果 + if USE_CACHE: + Request_Cache[requestIdentifier] = basePlan_withRenderSpec + + # 发送完成信号 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'complete', + 'message': '✅ 计划生成完成' + }) + + # 返回最终结果 + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': basePlan_withRenderSpec + }) + + except ValueError as e: + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + except Exception as e: + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': f"An unexpected error occurred: {str(e)}" + }) + + +@socketio.on('fill_step_task') +def handle_fill_step_task_ws(data): + """ + WebSocket版本:填充步骤任务(支持流式/分步返回) + + 流式事件: + - progress: {"id": request_id, "status": "streaming", "stage": "starting", "message": "开始填充步骤任务..."} + - progress: {"id": request_id, "status": "streaming", "stage": "agent_selection", "message": "正在生成智能体选择..."} + - progress: {"id": request_id, "status": "streaming", "stage": "task_process", "message": "正在生成任务流程..."} + - progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "任务填充完成"} + - response: {"id": request_id, "status": "success", "data": filled_stepTask} + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + print(f"📥 [WS] 收到 fill_step_task 请求: {request_id}") + + try: + # 检查缓存 + requestIdentifier = str(( + "/fill_stepTask", + incoming_data.get("General Goal"), + incoming_data.get("stepTask"), + )) + + if USE_CACHE and requestIdentifier in Request_Cache: + print(f"✅ [WS] 使用缓存返回: {request_id}") + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': Request_Cache[requestIdentifier] + }) + return + + # 开始处理 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'starting', + 'message': f'🚀 开始填充步骤任务: {incoming_data.get("stepTask", {}).get("StepName", "")}' + }) + + print(f"⏳ [WS] 开始处理 fill_step_task...") + + # 阶段1:生成智能体选择 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'agent_selection', + 'message': '👥 正在生成智能体选择...' + }) + + from AgentCoord.PlanEngine.AgentSelection_Generator import generate_AgentSelection + stepTask = incoming_data.get("stepTask") + Current_Task = { + "TaskName": stepTask.get("StepName"), + "InputObject_List": stepTask.get("InputObject_List"), + "OutputObject": stepTask.get("OutputObject"), + "TaskContent": stepTask.get("TaskContent"), + } + AgentSelection = generate_AgentSelection( + General_Goal=incoming_data.get("General Goal"), + Current_Task=Current_Task, + Agent_Board=AgentBoard, + ) + + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'agent_selection_done', + 'message': f'✅ 智能体选择完成: {", ".join(AgentSelection)}' + }) + + # 阶段2:生成任务流程 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'task_process', + 'message': '📝 正在生成任务流程...' + }) + + import AgentCoord.util as util + from AgentCoord.PlanEngine.taskProcess_Generator import generate_TaskProcess + Current_Task_Description = { + "TaskName": stepTask.get("StepName"), + "AgentInvolved": [ + {"Name": name, "Profile": AgentProfile_Dict[name]} + for name in AgentSelection + ], + "InputObject_List": stepTask.get("InputObject_List"), + "OutputObject": stepTask.get("OutputObject"), + "CurrentTaskDescription": util.generate_template_sentence_for_CollaborationBrief( + stepTask.get("InputObject_List"), + stepTask.get("OutputObject"), + AgentSelection, + stepTask.get("TaskContent"), + ), + } + TaskProcess = generate_TaskProcess( + General_Goal=incoming_data.get("General Goal"), + Current_Task_Description=Current_Task_Description, + ) + + # 构建结果 + stepTask["AgentSelection"] = AgentSelection + stepTask["TaskProcess"] = TaskProcess + + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'task_process_done', + 'message': f'✅ 任务流程生成完成,共 {len(TaskProcess)} 个动作' + }) + + # 阶段3:处理渲染规范 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'rendering', + 'message': '🎨 正在处理渲染规范...' + }) + + filled_stepTask = Add_Collaboration_Brief_FrontEnd(stepTask) + + # 缓存结果 + if USE_CACHE: + Request_Cache[requestIdentifier] = filled_stepTask + + # 发送完成信号 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'complete', + 'message': '✅ 任务填充完成' + }) + + # 返回结果 + print(f"✅ [WS] fill_step_task 处理完成: {request_id}") + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': filled_stepTask + }) + + except Exception as e: + print(f"❌ [WS] fill_step_task 处理失败: {request_id}, 错误: {str(e)}") + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + +@socketio.on('fill_step_task_process') +def handle_fill_step_task_process_ws(data): + """ + WebSocket版本:填充步骤任务流程(支持流式/分步返回) + + 流式事件: + - progress: {"id": request_id, "status": "streaming", "stage": "starting", "message": "开始生成任务流程..."} + - progress: {"id": request_id, "status": "streaming", "stage": "generating", "message": "正在生成任务流程..."} + - progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "任务流程生成完成"} + - response: {"id": request_id, "status": "success", "data": filled_stepTask} + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + # 检查缓存 + requestIdentifier = str(( + "/fill_stepTask_TaskProcess", + incoming_data.get("General Goal"), + incoming_data.get("stepTask_lackTaskProcess"), + )) + + if USE_CACHE and requestIdentifier in Request_Cache: + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': Request_Cache[requestIdentifier] + }) + return + + # 开始处理 + stepTask = incoming_data.get("stepTask_lackTaskProcess") + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'starting', + 'message': f'🚀 开始生成任务流程: {stepTask.get("StepName", "")}' + }) + + # 生成任务流程 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'generating', + 'message': '📝 正在生成任务流程...' + }) + + filled_stepTask = fill_stepTask_TaskProcess( + General_Goal=incoming_data.get("General Goal"), + stepTask=stepTask, + AgentProfile_Dict=AgentProfile_Dict, + ) + + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'generated', + 'message': f'✅ 任务流程生成完成,共 {len(filled_stepTask.get("TaskProcess", []))} 个动作' + }) + + # 处理渲染规范 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'rendering', + 'message': '🎨 正在处理渲染规范...' + }) + + filled_stepTask = Add_Collaboration_Brief_FrontEnd(filled_stepTask) + + # 缓存结果 + if USE_CACHE: + Request_Cache[requestIdentifier] = filled_stepTask + + # 发送完成信号 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'complete', + 'message': '✅ 任务流程生成完成' + }) + + # 返回结果 + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': filled_stepTask + }) + + except Exception as e: + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + +@socketio.on('branch_plan_outline') +def handle_branch_plan_outline_ws(data): + """ + WebSocket版本:分支任务大纲(支持流式/分步返回) + + 流式事件: + - progress: {"id": request_id, "status": "streaming", "stage": "starting", "branch": 1, "total": 3, "message": "正在生成分支 1/3..."} + - progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "分支大纲生成完成"} + - response: {"id": request_id, "status": "success", "data": branchList} + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + # 检查缓存 + requestIdentifier = str(( + "/branch_PlanOutline", + incoming_data.get("branch_Number"), + incoming_data.get("Modification_Requirement"), + incoming_data.get("Existing_Steps"), + incoming_data.get("Baseline_Completion"), + incoming_data.get("Initial Input Object"), + incoming_data.get("General Goal"), + )) + + if USE_CACHE and requestIdentifier in Request_Cache: + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': Request_Cache[requestIdentifier] + }) + return + + # 开始处理 + branch_Number = incoming_data.get("branch_Number") + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'starting', + 'total_branches': branch_Number, + 'message': f'🚀 开始生成分支大纲,共 {branch_Number} 个分支...' + }) + + # 生成大纲分支(逐步生成) + from AgentCoord.util.converter import read_LLM_Completion + from AgentCoord.PlanEngine.branch_PlanOutline import JSON_PLAN_OUTLINE_BRANCHING + import json + + prompt = f""" +## Instruction +Based on "Existing Steps", your task is to comeplete the "Remaining Steps" for the plan for "General Goal". +Note: "Modification Requirement" specifies how to modify the "Baseline Completion" for a better/alternative solution. + +**IMPORTANT LANGUAGE REQUIREMENT: You must respond in Chinese (中文) for all content, including StepName, TaskContent, and OutputObject fields.** + +## General Goal (Specify the general goal for the plan) +{incoming_data.get("General Goal")} + +## Initial Key Object List (Specify the list of initial key objects available for use as the input object of a Step) +{incoming_data.get("Initial Input Object")} + +## Existing Steps +{json.dumps(incoming_data.get("Existing_Steps"), indent=4)} + +## Baseline Completion +{json.dumps(incoming_data.get("Baseline_Completion"), indent=4)} + +## Modification Requirement +{incoming_data.get("Modification_Requirement")} +""" + + branch_List = [] + for i in range(branch_Number): + # 发送进度更新 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'generating_branch', + 'branch': i + 1, + 'total': branch_Number, + 'message': f'🌿 正在生成分支大纲 {i+1}/{branch_Number}...' + }) + + messages = [ + { + "role": "system", + "content": f" The JSON object must use the schema: {json.dumps(JSON_PLAN_OUTLINE_BRANCHING.model_json_schema(), indent=2)}", + }, + {"role": "system", "content": prompt}, + ] + Remaining_Steps = read_LLM_Completion(messages, useGroq=False)["Remaining Steps"] + branch_List.append(Remaining_Steps) + + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'branch_done', + 'branch': i + 1, + 'total': branch_Number, + 'steps_count': len(Remaining_Steps), + 'message': f'✅ 分支 {i+1}/{branch_Number} 生成完成,包含 {len(Remaining_Steps)} 个步骤' + }) + + # 处理渲染规范 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'rendering', + 'message': '🎨 正在处理渲染规范...' + }) + + branchList = Add_Collaboration_Brief_FrontEnd(branch_List) + + # 缓存结果 + if USE_CACHE: + Request_Cache[requestIdentifier] = branchList + + # 发送完成信号 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'complete', + 'message': f'✅ 分支大纲生成完成,共 {branch_Number} 个分支' + }) + + # 返回结果 + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': branchList + }) + + except Exception as e: + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + +@socketio.on('branch_task_process') +def handle_branch_task_process_ws(data): + """ + WebSocket版本:分支任务流程(支持流式/分步返回) + + 流式事件: + - progress: {"id": request_id, "status": "streaming", "stage": "starting", "branch": 1, "total": 3, "message": "正在生成分支任务流程 1/3..."} + - progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "分支任务流程生成完成"} + - response: {"id": request_id, "status": "success", "data": branchList} + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + # 检查缓存 + requestIdentifier = str(( + "/branch_TaskProcess", + incoming_data.get("branch_Number"), + incoming_data.get("Modification_Requirement"), + incoming_data.get("Existing_Steps"), + incoming_data.get("Baseline_Completion"), + incoming_data.get("stepTaskExisting"), + incoming_data.get("General Goal"), + )) + + if USE_CACHE and requestIdentifier in Request_Cache: + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': Request_Cache[requestIdentifier] + }) + return + + # 开始处理 + branch_Number = incoming_data.get("branch_Number") + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'starting', + 'total_branches': branch_Number, + 'message': f'🚀 开始生成分支任务流程,共 {branch_Number} 个分支...' + }) + + # 生成任务流程分支(逐步生成) + from AgentCoord.util.converter import read_LLM_Completion + from AgentCoord.PlanEngine.branch_TaskProcess import ( + JSON_TASK_PROCESS_BRANCHING, + ACT_SET, + PROMPT_TASK_PROCESS_BRANCHING + ) + import AgentCoord.util as util + import json + + stepTaskExisting = incoming_data.get("stepTaskExisting") + Current_Task_Description = { + "TaskName": stepTaskExisting.get("StepName"), + "AgentInvolved": [ + {"Name": name, "Profile": AgentProfile_Dict[name]} + for name in stepTaskExisting.get("AgentSelection", []) + ], + "InputObject_List": stepTaskExisting.get("InputObject_List"), + "OutputObject": stepTaskExisting.get("OutputObject"), + "CurrentTaskDescription": util.generate_template_sentence_for_CollaborationBrief( + stepTaskExisting.get("InputObject_List"), + stepTaskExisting.get("OutputObject"), + stepTaskExisting.get("AgentSelection"), + stepTaskExisting.get("TaskContent"), + ), + } + + prompt = PROMPT_TASK_PROCESS_BRANCHING.format( + Modification_Requirement=incoming_data.get("Modification_Requirement"), + Current_Task_Description=json.dumps(Current_Task_Description, indent=4), + Existing_Steps=json.dumps(incoming_data.get("Existing_Steps"), indent=4), + Baseline_Completion=json.dumps(incoming_data.get("Baseline_Completion"), indent=4), + General_Goal=incoming_data.get("General Goal"), + Act_Set=ACT_SET, + ) + + branch_List = [] + for i in range(branch_Number): + # 发送进度更新 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'generating_branch', + 'branch': i + 1, + 'total': branch_Number, + 'message': f'🌿 正在生成分支任务流程 {i+1}/{branch_Number}...' + }) + + messages = [ + { + "role": "system", + "content": f" The JSON object must use the schema: {json.dumps(JSON_TASK_PROCESS_BRANCHING.model_json_schema(), indent=2)}", + }, + {"role": "system", "content": prompt}, + ] + Remaining_Steps = read_LLM_Completion(messages, useGroq=False)["Remaining Steps"] + branch_List.append(Remaining_Steps) + + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'branch_done', + 'branch': i + 1, + 'total': branch_Number, + 'actions_count': len(Remaining_Steps), + 'message': f'✅ 分支 {i+1}/{branch_Number} 生成完成,包含 {len(Remaining_Steps)} 个动作' + }) + + # 缓存结果 + if USE_CACHE: + Request_Cache[requestIdentifier] = branch_List + + # 发送完成信号 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'complete', + 'message': f'✅ 分支任务流程生成完成,共 {branch_Number} 个分支' + }) + + # 返回结果 + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': branch_List + }) + + except Exception as e: + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + +@socketio.on('agent_select_modify_init') +def handle_agent_select_modify_init_ws(data): + """ + WebSocket版本:智能体选择评分初始化(支持流式/分步返回) + + 流式事件: + - progress: {"id": request_id, "status": "streaming", "stage": "starting", "message": "开始生成能力需求..."} + - progress: {"id": request_id, "status": "streaming", "stage": "requirements", "message": "能力需求: [xxx, yyy, zzz]"} + - progress: {"id": request_id, "status": "streaming", "stage": "scoring", "aspect": 1, "total": 3, "message": "正在评分能力 1/3..."} + - progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "智能体评分完成"} + - response: {"id": request_id, "status": "success", "data": scoreTable} + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + # 检查缓存 + requestIdentifier = str(( + "/agentSelectModify_init", + incoming_data.get("General Goal"), + incoming_data.get("stepTask"), + )) + + if USE_CACHE and requestIdentifier in Request_Cache: + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': Request_Cache[requestIdentifier] + }) + return + + # 开始处理 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'starting', + 'message': '🚀 开始生成智能体能力需求...' + }) + + from AgentCoord.util.converter import read_LLM_Completion + from AgentCoord.PlanEngine.AgentSelectModify import ( + JSON_ABILITY_REQUIREMENT_GENERATION, + PROMPT_ABILITY_REQUIREMENT_GENERATION, + agentAbilityScoring + ) + import json + + # 阶段1:生成能力需求列表 + stepTask = incoming_data.get("stepTask") + Current_Task = { + "TaskName": stepTask.get("StepName"), + "InputObject_List": stepTask.get("InputObject_List"), + "OutputObject": stepTask.get("OutputObject"), + "TaskContent": stepTask.get("TaskContent"), + } + + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'generating_requirements', + 'message': '📋 正在生成能力需求列表...' + }) + + messages = [ + { + "role": "system", + "content": f" The JSON object must use the schema: {json.dumps(JSON_ABILITY_REQUIREMENT_GENERATION.model_json_schema(), indent=2)}", + }, + { + "role": "system", + "content": PROMPT_ABILITY_REQUIREMENT_GENERATION.format( + General_Goal=incoming_data.get("General Goal"), + Current_Task=json.dumps(Current_Task, indent=4), + ), + }, + ] + Ability_Requirement_List = read_LLM_Completion(messages)["AbilityRequirement"] + + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'requirements_generated', + 'requirements': Ability_Requirement_List, + 'message': f'✅ 能力需求生成完成: {", ".join(Ability_Requirement_List)}' + }) + + # 阶段2:为每个能力需求进行智能体评分 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'scoring', + 'total_aspects': len(Ability_Requirement_List), + 'message': f'📊 开始为 {len(Ability_Requirement_List)} 个能力需求评分...' + }) + + scoreTable = agentAbilityScoring(AgentBoard, Ability_Requirement_List) + + # 逐步报告评分进度 + for idx, (ability, scores) in enumerate(scoreTable.items(), 1): + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'aspect_scored', + 'aspect': idx, + 'total': len(Ability_Requirement_List), + 'ability': ability, + 'message': f'✅ 能力 "{ability}" 评分完成 ({idx}/{len(Ability_Requirement_List)})' + }) + + # 缓存结果 + if USE_CACHE: + Request_Cache[requestIdentifier] = scoreTable + + # 发送完成信号 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'complete', + 'message': f'✅ 智能体评分完成,共 {len(Ability_Requirement_List)} 个能力维度' + }) + + # 返回结果 + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': scoreTable + }) + + except Exception as e: + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + +@socketio.on('agent_select_modify_add_aspect') +def handle_agent_select_modify_add_aspect_ws(data): + """ + WebSocket版本:添加新的评估维度(支持流式/分步返回) + + 流式事件: + - progress: {"id": request_id, "status": "streaming", "stage": "starting", "aspect": "新能力", "message": "开始为新能力评分..."} + - progress: {"id": request_id, "status": "streaming", "stage": "scoring", "message": "正在评分..."} + - progress: {"id": request_id, "status": "streaming", "stage": "complete", "message": "评分完成"} + - response: {"id": request_id, "status": "success", "data": scoreTable} + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + # 检查缓存 + aspectList = incoming_data.get("aspectList") + newAspect = aspectList[-1] if aspectList else None + requestIdentifier = str(( + "/agentSelectModify_addAspect", + aspectList, + )) + + if USE_CACHE and requestIdentifier in Request_Cache: + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': Request_Cache[requestIdentifier] + }) + return + + # 开始处理 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'starting', + 'message': f'🚀 开始为新能力维度评分: {newAspect or "Unknown"}' + }) + + # 添加新维度并评分 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'scoring', + 'aspect': newAspect, + 'message': f'📊 正在为能力 "{newAspect}" 评分...' + }) + + scoreTable = AgentSelectModify_addAspect( + aspectList=aspectList, + Agent_Board=AgentBoard + ) + + # 发送完成信号 + emit('progress', { + 'id': request_id, + 'status': 'streaming', + 'stage': 'complete', + 'message': f'✅ 能力 "{newAspect}" 评分完成' + }) + + # 缓存结果 + if USE_CACHE: + Request_Cache[requestIdentifier] = scoreTable + + # 返回结果 + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': scoreTable + }) + + except Exception as e: + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + +@socketio.on('set_agents') +def handle_set_agents_ws(data): + """ + WebSocket版本:设置智能体 + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + global AgentBoard, AgentProfile_Dict, yaml_data + + try: + AgentBoard = incoming_data + AgentProfile_Dict = {} + + for item in AgentBoard: + name = item["Name"] + if all(item.get(field) for field in ["apiUrl", "apiKey", "apiModel"]): + agent_config = { + "profile": item["Profile"], + "apiUrl": item["apiUrl"], + "apiKey": item["apiKey"], + "apiModel": item["apiModel"], + "useCustomAPI": True + } + else: + agent_config = { + "profile": item["Profile"], + "apiUrl": yaml_data.get("OPENAI_API_BASE"), + "apiKey": yaml_data.get("OPENAI_API_KEY"), + "apiModel": yaml_data.get("OPENAI_API_MODEL"), + "useCustomAPI": False + } + AgentProfile_Dict[name] = agent_config + + # 返回结果 + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': {"code": 200, "content": "set agentboard successfully"} + }) + + except Exception as e: + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + +@socketio.on('stop_generation') +def handle_stop_generation(data): + """ + WebSocket版本:停止生成任务 + + 请求格式: + { + "id": "request-id", + "action": "stop_generation", + "data": { + "goal": "任务描述" + } + } + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + goal = incoming_data.get('goal', '') + + # TODO: 这里可以添加实际的停止逻辑 + # 例如:设置全局停止标志,通知所有正在运行的生成任务停止 + print(f"🛑 收到停止生成请求: goal={goal}") + + # 返回成功响应 + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': {"message": "已发送停止信号"} + }) + + except Exception as e: + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + +@socketio.on('pause_execution') +def handle_pause_execution(data): + """ + WebSocket版本:暂停任务执行 + + 请求格式: + { + "id": "request-id", + "action": "pause_execution", + "data": { + "goal": "任务描述" + } + } + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager + + goal = incoming_data.get('goal', '') + + # 检查当前执行的任务是否匹配 + current_goal = execution_state_manager.get_goal() + if current_goal and current_goal != goal: + print(f"⚠️ 任务目标不匹配: 当前={current_goal}, 请求={goal}") + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': '任务目标不匹配' + }) + return + + # 调用执行状态管理器暂停 + success = execution_state_manager.pause_execution() + + if success: + print(f"⏸️ [DEBUG] 暂停成功! 当前状态: {execution_state_manager.get_status().value}") + print(f"⏸️ [DEBUG] should_pause: {execution_state_manager._should_pause}") + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': {"message": "已暂停执行,可随时继续"} + }) + else: + print(f"⚠️ [DEBUG] 暂停失败,当前状态: {execution_state_manager.get_status().value}") + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': f'无法暂停,当前状态: {execution_state_manager.get_status().value}' + }) + + except Exception as e: + print(f"❌ 暂停执行失败: {str(e)}") + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + +@socketio.on('resume_execution') +def handle_resume_execution(data): + """ + WebSocket版本:恢复任务执行 + + 请求格式: + { + "id": "request-id", + "action": "resume_execution", + "data": { + "goal": "任务描述" + } + } + """ + request_id = data.get('id') + incoming_data = data.get('data', {}) + + try: + from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager + + goal = incoming_data.get('goal', '') + + # 检查当前执行的任务是否匹配 + current_goal = execution_state_manager.get_goal() + if current_goal and current_goal != goal: + print(f"⚠️ 任务目标不匹配: 当前={current_goal}, 请求={goal}") + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': '任务目标不匹配' + }) + return + + # 调用执行状态管理器恢复 + success = execution_state_manager.resume_execution() + + if success: + print(f"▶️ 已恢复执行: goal={goal}") + emit('response', { + 'id': request_id, + 'status': 'success', + 'data': {"message": "已恢复执行"} + }) + else: + print(f"⚠️ 恢复失败,当前状态: {execution_state_manager.get_status()}") + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': f'无法恢复,当前状态: {execution_state_manager.get_status().value}' + }) + + except Exception as e: + print(f"❌ 恢复执行失败: {str(e)}") + emit('response', { + 'id': request_id, + 'status': 'error', + 'error': str(e) + }) + + + if __name__ == "__main__": parser = argparse.ArgumentParser( description="start the backend for AgentCoord" @@ -381,8 +1550,9 @@ if __name__ == "__main__": "--port", type=int, default=8000, - help="set the port number, 8000 by defaul.", + help="set the port number, 8000 by default.", ) args = parser.parse_args() init() - app.run(host="0.0.0.0", port=args.port, debug=True) + # 使用 socketio.run 替代 app.run,支持WebSocket + socketio.run(app, host="0.0.0.0", port=args.port, debug=True, allow_unsafe_werkzeug=True) diff --git a/frontend/src/api/index.ts b/frontend/src/api/index.ts index 65ca112..ce3b605 100644 --- a/frontend/src/api/index.ts +++ b/frontend/src/api/index.ts @@ -1,4 +1,5 @@ import request from '@/utils/request' +import websocket from '@/utils/websocket' import type { Agent, IApiStepTask, IRawPlanResponse, IRawStepTask } from '@/stores' import { mockBackendAgentSelectModifyInit, @@ -82,7 +83,16 @@ export interface IFillAgentSelectionRequest { } class Api { - setAgents = (data: Pick[]) => { + // 默认使用WebSocket + private useWebSocketDefault = true + + setAgents = (data: Pick[], useWebSocket: boolean = this.useWebSocketDefault) => { + // 如果启用WebSocket且已连接,使用WebSocket + if (useWebSocket && websocket.connected) { + return websocket.send('set_agents', data) + } + + // 否则使用REST API return request({ url: '/setAgents', data, @@ -96,7 +106,23 @@ class Api { apiUrl?: string apiKey?: string apiModel?: string + useWebSocket?: boolean + onProgress?: (progress: { status: string; stage?: string; message?: string; [key: string]: any }) => void }) => { + const useWs = data.useWebSocket !== undefined ? data.useWebSocket : this.useWebSocketDefault + + // 如果启用WebSocket且已连接,使用WebSocket + if (useWs && websocket.connected) { + return websocket.send('generate_base_plan', { + 'General Goal': data.goal, + 'Initial Input Object': data.inputs, + apiUrl: data.apiUrl, + apiKey: data.apiKey, + apiModel: data.apiModel, + }, undefined, data.onProgress) + } + + // 否则使用REST API return request({ url: '/generate_basePlan', method: 'POST', @@ -143,13 +169,18 @@ class Api { /** * 优化版流式执行计划(阶段1+2:步骤级流式 + 动作级智能并行) * 无依赖关系的动作并行执行,有依赖关系的动作串行执行 + * + * 默认使用WebSocket,如果连接失败则降级到SSE */ executePlanOptimized = ( plan: IRawPlanResponse, onMessage: (event: StreamingEvent) => void, onError?: (error: Error) => void, onComplete?: () => void, + useWebSocket?: boolean, ) => { + const useWs = useWebSocket !== undefined ? useWebSocket : this.useWebSocketDefault + const data = { RehearsalLog: [], num_StepToRun: null, @@ -174,6 +205,41 @@ class Api { }, } + // 如果启用WebSocket且已连接,使用WebSocket + if (useWs && websocket.connected) { + websocket.subscribe( + 'execute_plan_optimized', + data, + // onProgress + (progressData) => { + try { + // progressData 应该已经是解析后的对象了 + // 如果是字符串,说明后端发送的是 JSON 字符串,需要解析 + let event: StreamingEvent + if (typeof progressData === 'string') { + event = JSON.parse(progressData) + } else { + event = progressData as StreamingEvent + } + onMessage(event) + } catch (e) { + // Failed to parse WebSocket data + } + }, + // onComplete + () => { + onComplete?.() + }, + // onError + (error) => { + onError?.(error) + } + ) + return + } + + // 否则使用原有的SSE方式 + fetch('/api/executePlanOptimized', { method: 'POST', headers: { @@ -215,7 +281,7 @@ class Api { const event = JSON.parse(data) onMessage(event) } catch (e) { - console.error('Failed to parse SSE data:', e) + // Failed to parse SSE data } } } @@ -236,7 +302,24 @@ class Api { Baseline_Completion: number initialInputs: string[] goal: string + useWebSocket?: boolean + onProgress?: (progress: { status: string; stage?: string; message?: string; [key: string]: any }) => void }) => { + const useWs = data.useWebSocket !== undefined ? data.useWebSocket : this.useWebSocketDefault + + // 如果启用WebSocket且已连接,使用WebSocket + if (useWs && websocket.connected) { + return websocket.send('branch_plan_outline', { + branch_Number: data.branch_Number, + Modification_Requirement: data.Modification_Requirement, + Existing_Steps: data.Existing_Steps, + Baseline_Completion: data.Baseline_Completion, + 'Initial Input Object': data.initialInputs, + 'General Goal': data.goal, + }, undefined, data.onProgress) + } + + // 否则使用REST API return request({ url: '/branch_PlanOutline', method: 'POST', @@ -261,7 +344,24 @@ class Api { Baseline_Completion: number stepTaskExisting: any goal: string + useWebSocket?: boolean + onProgress?: (progress: { status: string; stage?: string; message?: string; [key: string]: any }) => void }) => { + const useWs = data.useWebSocket !== undefined ? data.useWebSocket : this.useWebSocketDefault + + // 如果启用WebSocket且已连接,使用WebSocket + if (useWs && websocket.connected) { + return websocket.send('branch_task_process', { + branch_Number: data.branch_Number, + Modification_Requirement: data.Modification_Requirement, + Existing_Steps: data.Existing_Steps, + Baseline_Completion: data.Baseline_Completion, + stepTaskExisting: data.stepTaskExisting, + 'General Goal': data.goal, + }, undefined, data.onProgress) + } + + // 否则使用REST API return request({ url: '/branch_TaskProcess', method: 'POST', @@ -276,38 +376,55 @@ class Api { }) } - fillStepTask = async (data: { goal: string; stepTask: any }): Promise => { - const response = await request< - { - 'General Goal': string - stepTask: any - }, - { - AgentSelection?: string[] - Collaboration_Brief_FrontEnd?: { - template: string - data: Record - } - InputObject_List?: string[] - OutputObject?: string - StepName?: string - TaskContent?: string - TaskProcess?: Array<{ - ID: string - ActionType: string - AgentName: string - Description: string - ImportantInput: string[] - }> - } - >({ - url: '/fill_stepTask', - method: 'POST', - data: { + fillStepTask = async (data: { + goal: string + stepTask: any + useWebSocket?: boolean + onProgress?: (progress: { status: string; stage?: string; message?: string; [key: string]: any }) => void + }): Promise => { + const useWs = data.useWebSocket !== undefined ? data.useWebSocket : this.useWebSocketDefault + let response: any + + // 如果启用WebSocket且已连接,使用WebSocket + if (useWs && websocket.connected) { + response = await websocket.send('fill_step_task', { 'General Goal': data.goal, stepTask: data.stepTask, - }, - }) + }, undefined, data.onProgress) + } else { + // 否则使用REST API + response = await request< + { + 'General Goal': string + stepTask: any + }, + { + AgentSelection?: string[] + Collaboration_Brief_FrontEnd?: { + template: string + data: Record + } + InputObject_List?: string[] + OutputObject?: string + StepName?: string + TaskContent?: string + TaskProcess?: Array<{ + ID: string + ActionType: string + AgentName: string + Description: string + ImportantInput: string[] + }> + } + >({ + url: '/fill_stepTask', + method: 'POST', + data: { + 'General Goal': data.goal, + stepTask: data.stepTask, + }, + }) + } const vec2Hsl = (color: number[]): string => { const [h, s, l] = color @@ -347,40 +464,15 @@ class Api { goal: string stepTask: IApiStepTask agents: string[] + useWebSocket?: boolean + onProgress?: (progress: { status: string; stage?: string; message?: string; [key: string]: any }) => void }): Promise => { - const response = await request< - { - 'General Goal': string - stepTask_lackTaskProcess: { - StepName: string - TaskContent: string - InputObject_List: string[] - OutputObject: string - AgentSelection: string[] - } - }, - { - StepName?: string - TaskContent?: string - InputObject_List?: string[] - OutputObject?: string - AgentSelection?: string[] - TaskProcess?: Array<{ - ID: string - ActionType: string - AgentName: string - Description: string - ImportantInput: string[] - }> - Collaboration_Brief_FrontEnd?: { - template: string - data: Record - } - } - >({ - url: '/fill_stepTask_TaskProcess', - method: 'POST', - data: { + const useWs = data.useWebSocket !== undefined ? data.useWebSocket : this.useWebSocketDefault + let response: any + + // 如果启用WebSocket且已连接,使用WebSocket + if (useWs && websocket.connected) { + response = await websocket.send('fill_step_task_process', { 'General Goal': data.goal, stepTask_lackTaskProcess: { StepName: data.stepTask.name, @@ -389,8 +481,53 @@ class Api { OutputObject: data.stepTask.output, AgentSelection: data.agents, }, - }, - }) + }, undefined, data.onProgress) + } else { + // 否则使用REST API + response = await request< + { + 'General Goal': string + stepTask_lackTaskProcess: { + StepName: string + TaskContent: string + InputObject_List: string[] + OutputObject: string + AgentSelection: string[] + } + }, + { + StepName?: string + TaskContent?: string + InputObject_List?: string[] + OutputObject?: string + AgentSelection?: string[] + TaskProcess?: Array<{ + ID: string + ActionType: string + AgentName: string + Description: string + ImportantInput: string[] + }> + Collaboration_Brief_FrontEnd?: { + template: string + data: Record + } + } + >({ + url: '/fill_stepTask_TaskProcess', + method: 'POST', + data: { + 'General Goal': data.goal, + stepTask_lackTaskProcess: { + StepName: data.stepTask.name, + TaskContent: data.stepTask.content, + InputObject_List: data.stepTask.inputs, + OutputObject: data.stepTask.output, + AgentSelection: data.agents, + }, + }, + }) + } const vec2Hsl = (color: number[]): string => { const [h, s, l] = color @@ -409,7 +546,7 @@ class Api { } } - const process = (response.TaskProcess || []).map((action) => ({ + const process = (response.TaskProcess || []).map((action: any) => ({ id: action.ID, type: action.ActionType, agent: action.AgentName, @@ -437,17 +574,15 @@ class Api { agentSelectModifyInit = async (data: { goal: string stepTask: any + useWebSocket?: boolean + onProgress?: (progress: { status: string; stage?: string; message?: string; [key: string]: any }) => void }): Promise>> => { - const response = await request< - { - 'General Goal': string - stepTask: any - }, - Record> - >({ - url: '/agentSelectModify_init', - method: 'POST', - data: { + const useWs = data.useWebSocket !== undefined ? data.useWebSocket : this.useWebSocketDefault + let response: Record> + + // 如果启用WebSocket且已连接,使用WebSocket + if (useWs && websocket.connected) { + response = await websocket.send('agent_select_modify_init', { 'General Goal': data.goal, stepTask: { StepName: data.stepTask.StepName || data.stepTask.name, @@ -455,8 +590,29 @@ class Api { InputObject_List: data.stepTask.InputObject_List || data.stepTask.inputs, OutputObject: data.stepTask.OutputObject || data.stepTask.output, }, - }, - }) + }, undefined, data.onProgress) + } else { + // 否则使用REST API + response = await request< + { + 'General Goal': string + stepTask: any + }, + Record> + >({ + url: '/agentSelectModify_init', + method: 'POST', + data: { + 'General Goal': data.goal, + stepTask: { + StepName: data.stepTask.StepName || data.stepTask.name, + TaskContent: data.stepTask.TaskContent || data.stepTask.content, + InputObject_List: data.stepTask.InputObject_List || data.stepTask.inputs, + OutputObject: data.stepTask.OutputObject || data.stepTask.output, + }, + }, + }) + } const transformedData: Record> = {} @@ -480,22 +636,35 @@ class Api { */ agentSelectModifyAddAspect = async (data: { aspectList: string[] + useWebSocket?: boolean + onProgress?: (progress: { status: string; stage?: string; message?: string; [key: string]: any }) => void }): Promise<{ aspectName: string agentScores: Record }> => { - const response = await request< - { - aspectList: string[] - }, - Record> - >({ - url: '/agentSelectModify_addAspect', - method: 'POST', - data: { + const useWs = data.useWebSocket !== undefined ? data.useWebSocket : this.useWebSocketDefault + let response: Record> + + // 如果启用WebSocket且已连接,使用WebSocket + if (useWs && websocket.connected) { + response = await websocket.send('agent_select_modify_add_aspect', { aspectList: data.aspectList, - }, - }) + }, undefined, data.onProgress) + } else { + // 否则使用REST API + response = await request< + { + aspectList: string[] + }, + Record> + >({ + url: '/agentSelectModify_addAspect', + method: 'POST', + data: { + aspectList: data.aspectList, + }, + }) + } /** * 获取新添加的维度 diff --git a/frontend/src/assets/icons/Pause.svg b/frontend/src/assets/icons/Pause.svg new file mode 100644 index 0000000..9a3afac --- /dev/null +++ b/frontend/src/assets/icons/Pause.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/frontend/src/assets/icons/action.svg b/frontend/src/assets/icons/action.svg index c094e4e..7e64f72 100644 --- a/frontend/src/assets/icons/action.svg +++ b/frontend/src/assets/icons/action.svg @@ -1 +1,5 @@ - + + + + diff --git a/frontend/src/assets/icons/paper-plane.svg b/frontend/src/assets/icons/paper-plane.svg index a4b91ad..874ad41 100644 --- a/frontend/src/assets/icons/paper-plane.svg +++ b/frontend/src/assets/icons/paper-plane.svg @@ -1 +1,6 @@ - + + + + diff --git a/frontend/src/assets/icons/stoprunning.svg b/frontend/src/assets/icons/stoprunning.svg new file mode 100644 index 0000000..6a1e5a3 --- /dev/null +++ b/frontend/src/assets/icons/stoprunning.svg @@ -0,0 +1,7 @@ + + + + \ No newline at end of file diff --git a/frontend/src/assets/icons/video-play.svg b/frontend/src/assets/icons/video-play.svg new file mode 100644 index 0000000..0fa966f --- /dev/null +++ b/frontend/src/assets/icons/video-play.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/frontend/src/layout/components/Main/Task.vue b/frontend/src/layout/components/Main/Task.vue index 8e85a7f..1a3d8f6 100644 --- a/frontend/src/layout/components/Main/Task.vue +++ b/frontend/src/layout/components/Main/Task.vue @@ -3,6 +3,7 @@ import { ref, onMounted, computed, reactive, nextTick } from 'vue' import SvgIcon from '@/components/SvgIcon/index.vue' import { useAgentsStore, useConfigStore } from '@/stores' import api from '@/api' +import websocket from '@/utils/websocket' import { changeBriefs } from '@/utils/collaboration_Brief_FrontEnd.ts' import { ElMessage } from 'element-plus' import AssignmentButton from './TaskTemplate/TaskSyllabus/components/AssignmentButton.vue' @@ -18,6 +19,10 @@ const triggerOnFocus = ref(true) const isFocus = ref(false) const hasAutoSearched = ref(false) const isExpanded = ref(false) +// 添加一个状态来跟踪是否正在填充步骤数据 +const isFillingSteps = ref(false) +// 存储当前填充任务的取消函数 +const currentStepAbortController = ref<{ cancel: () => void } | null>(null) // 解析URL参数 function getUrlParam(param: string): string | null { @@ -83,6 +88,39 @@ function resetTextareaHeight() { }) } +// 停止填充数据的处理函数 +async function handleStop() { + try { + // 通过 WebSocket 发送停止信号 + if (websocket.connected) { + await websocket.send('stop_generation', { + goal: searchValue.value + }) + ElMessage.success('已发送停止信号,正在停止生成...') + } else { + ElMessage.warning('WebSocket 未连接,无法停止') + } + } catch (error) { + console.error('停止生成失败:', error) + ElMessage.error('停止生成失败') + } finally { + // 无论后端是否成功停止,都重置状态 + isFillingSteps.value = false + currentStepAbortController.value = null + } +} + +// 处理按钮点击事件 +function handleButtonClick() { + if (isFillingSteps.value) { + // 如果正在填充数据,点击停止 + handleStop() + } else { + // 否则开始搜索 + handleSearch() + } +} + async function handleSearch() { // 用于标记大纲是否成功加载 let outlineLoaded = false @@ -103,6 +141,11 @@ async function handleSearch() { inputs: [] }) + // 检查是否已被停止 + if (!isFillingSteps.value && currentStepAbortController.value) { + return + } + // 处理简报数据格式 outlineData['Collaboration Process'] = changeBriefs(outlineData['Collaboration Process']) @@ -111,6 +154,9 @@ async function handleSearch() { outlineLoaded = true emit('search', searchValue.value) + // 开始填充步骤详情,设置状态 + isFillingSteps.value = true + // 并行填充所有步骤的详情 const steps = outlineData['Collaboration Process'] || [] @@ -118,6 +164,12 @@ async function handleSearch() { const fillStepWithRetry = async (step: any, retryCount = 0): Promise => { const maxRetries = 2 // 最多重试2次 + // 检查是否已停止 + if (!isFillingSteps.value) { + console.log('检测到停止信号,跳过步骤填充') + return + } + try { if (!step.StepName) { console.warn('步骤缺少 StepName,跳过填充详情') @@ -135,6 +187,12 @@ async function handleSearch() { } }) + // 再次检查是否已停止(在 API 调用后) + if (!isFillingSteps.value) { + console.log('检测到停止信号,跳过更新步骤详情') + return + } + // 更新该步骤的详情到 store updateStepDetail(step.StepName, detailedStep) } catch (error) { @@ -166,6 +224,9 @@ async function handleSearch() { } } finally { triggerOnFocus.value = true + // 完成填充,重置状态 + isFillingSteps.value = false + currentStepAbortController.value = null // 如果大纲加载失败,确保关闭loading if (!outlineLoaded) { agentsStore.setAgentRawPlan({ loading: false }) @@ -255,18 +316,24 @@ onMounted(() => { class="task-button" color="linear-gradient(to right, #00C7D2, #315AB4)" size="large" - title="点击搜索任务" + :title="isFillingSteps ? '点击停止生成' : '点击搜索任务'" circle :loading="agentsStore.agentRawPlan.loading" :disabled="!searchValue" - @click.stop="handleSearch" + @click.stop="handleButtonClick" > + diff --git a/frontend/src/layout/components/Main/TaskTemplate/TaskResult/ExecutePlan.vue b/frontend/src/layout/components/Main/TaskTemplate/TaskResult/ExecutePlan.vue index 1f40c70..1c19c0a 100644 --- a/frontend/src/layout/components/Main/TaskTemplate/TaskResult/ExecutePlan.vue +++ b/frontend/src/layout/components/Main/TaskTemplate/TaskResult/ExecutePlan.vue @@ -38,11 +38,12 @@ const data = computed(() => { if (result.NodeId === props.nodeId) { // LogNodeType 为 object直接渲染Content if (result.LogNodeType === 'object') { - return { + const data = { Description: props.nodeId, Content: sanitize(result.content), LogNodeType: result.LogNodeType } + return data } if (!result.ActionHistory) { diff --git a/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue b/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue index df0f751..2b8aabe 100644 --- a/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue +++ b/frontend/src/layout/components/Main/TaskTemplate/TaskResult/index.vue @@ -2,15 +2,17 @@ import { computed, onUnmounted, ref, reactive, nextTick, watch, onMounted } from 'vue' import { throttle } from 'lodash' import { AnchorLocations, BezierConnector } from '@jsplumb/browser-ui' +import { ElMessage, ElMessageBox } from 'element-plus' import AdditionalOutputCard from './AdditionalOutputCard.vue' import SvgIcon from '@/components/SvgIcon/index.vue' import { getActionTypeDisplay, getAgentMapIcon } from '@/layout/components/config.ts' import { type ConnectArg, Jsplumb } from '@/layout/components/Main/TaskTemplate/utils.ts' import variables from '@/styles/variables.module.scss' -import { type IRawStepTask, useAgentsStore } from '@/stores' +import { type IRawStepTask, useAgentsStore, type IRawPlanResponse } from '@/stores' import api, { type StreamingEvent } from '@/api' import ProcessCard from '../TaskProcess/ProcessCard.vue' import ExecutePlan from './ExecutePlan.vue' +import websocket from '@/utils/websocket' const emit = defineEmits<{ (e: 'refreshLine'): void @@ -24,7 +26,106 @@ const collaborationProcess = computed(() => { return agentsStore.agentRawPlan.data?.['Collaboration Process'] ?? [] }) -// 监听额外产物变化 +// Step execution status enum +enum StepExecutionStatus { + WAITING = 'waiting', // Waiting for data + READY = 'ready', // Ready to execute + RUNNING = 'running', // Currently running + COMPLETED = 'completed', // Execution completed + FAILED = 'failed' // Execution failed +} + +// Execution status for each step +const stepExecutionStatus = ref>({}) + +// Check if step is ready to execute (has TaskProcess data) +const isStepReady = (step: IRawStepTask) => { + return step.TaskProcess && step.TaskProcess.length > 0 +} + +// 判断动作是否有执行结果 +const hasActionResult = (step: IRawStepTask, actionId: string) => { + const stepResult = agentsStore.executePlan.find( + r => r.NodeId === step.StepName && r.LogNodeType === 'step' + ) + if (!stepResult || !stepResult.ActionHistory) { + return false + } + return stepResult.ActionHistory.some(action => action.ID === actionId) +} + +// 判断 OutputObject 是否有执行结果 +const hasObjectResult = (outputObject?: string) => { + if (!outputObject) return false + return agentsStore.executePlan.some(r => r.NodeId === outputObject && r.LogNodeType === 'object') +} + +// Get execution status of a step +const getStepStatus = (step: IRawStepTask): StepExecutionStatus => { + const stepName = step.StepName || step.Id || '' + + // If status is already recorded, return it + if (stepExecutionStatus.value[stepName]) { + return stepExecutionStatus.value[stepName] + } + + // Check if has TaskProcess data + if (isStepReady(step)) { + return StepExecutionStatus.READY + } else { + return StepExecutionStatus.WAITING + } +} + +// Calculate preparation status of all steps +const stepsReadyStatus = computed(() => { + const steps = collaborationProcess.value + const readySteps: string[] = [] + const waitingSteps: string[] = [] + + steps.forEach(step => { + if (isStepReady(step)) { + readySteps.push(step.StepName || 'Unknown step') + } else { + waitingSteps.push(step.StepName || 'Unknown step') + } + }) + + return { + ready: readySteps, + waiting: waitingSteps, + allReady: waitingSteps.length === 0, + totalCount: steps.length, + readyCount: readySteps.length + } +}) + +// Watch step data changes, update waiting step status +watch( + () => collaborationProcess.value, + newSteps => { + newSteps.forEach(step => { + const stepName = step.StepName || step.Id || '' + const currentStatus = stepExecutionStatus.value[stepName] + + // If step was waiting and now has data, set to ready + if (currentStatus === StepExecutionStatus.WAITING && isStepReady(step)) { + stepExecutionStatus.value[stepName] = StepExecutionStatus.READY + + // 如果正在执行中,自动执行下一批就绪的步骤 + if (autoExecuteEnabled.value && loading.value) { + executeNextReadyBatch() + } + } + }) + }, + { deep: true } +) + +// Enable auto-execution (auto-execute when new steps are ready) +const autoExecuteEnabled = ref(true) + +// Watch additional outputs changes watch( () => agentsStore.additionalOutputs, () => { @@ -37,7 +138,7 @@ watch( { deep: true } ) -// 编辑逻辑 +// Edit logic const editMode = ref(false) const editMap = reactive>({}) const editBuffer = reactive>({}) @@ -76,7 +177,7 @@ const jsplumb = new Jsplumb('task-results-main', { } }) -// 操作折叠面板时要实时的刷新连线 +// Refresh connections in real-time when collapsing panels let timer: ReturnType | null = null function handleCollapse() { if (timer) { @@ -87,7 +188,7 @@ function handleCollapse() { emit('refreshLine') }, 1) as ReturnType - // 默认三秒后已经完全打开 + // Default fully open after 3 seconds const timer1 = setTimeout(() => { if (timer) { clearInterval(timer) @@ -105,7 +206,7 @@ function handleCollapse() { }) } -// 创建内部连线 +// Create internal connections function createInternalLine(id?: string) { const arr: ConnectArg[] = [] jsplumb.reset() @@ -188,160 +289,340 @@ const executionProgress = ref({ currentAction: 0, totalActions: 0, currentStepName: '', - message: '正在执行...' + message: '准备执行任务...' }) -async function handleRun() { - // 清空之前的执行结果 - agentsStore.setExecutePlan([]) - const tempResults: any[] = [] +// Pause functionality state +const isPaused = ref(false) // Whether paused +const isStreaming = ref(false) // Whether streaming data (backend started returning) +const isButtonLoading = ref(false) // Button brief loading state (prevent double-click) - try { - loading.value = true +// Store current step execution index (for sequential execution) +const currentExecutionIndex = ref(0) - // 使用优化版流式API(阶段1+2:步骤级流式 + 动作级智能并行) - api.executePlanOptimized( - agentsStore.agentRawPlan.data!, - // onMessage: 处理每个事件 - (event: StreamingEvent) => { - switch (event.type) { - case 'step_start': - // 步骤开始 - executionProgress.value = { - currentStep: event.step_index + 1, - totalSteps: event.total_steps, - currentAction: 0, - totalActions: 0, - currentStepName: event.step_name, - message: `正在执行步骤 ${event.step_index + 1}/${event.total_steps}: ${ - event.step_name - }` - } - console.log( - `📋 步骤 ${event.step_index + 1}/${event.total_steps} 开始: ${event.step_name}` - ) - break +// Execute next batch of ready steps (batch execution to maintain dependencies) +async function executeNextReadyBatch() { + const steps = collaborationProcess.value - case 'action_complete': - // 动作完成 - const parallelInfo = event.batch_info?.is_parallel - ? ` [批次 ${event.batch_info!.batch_index + 1}, 并行 ${ - event.batch_info!.batch_size - } 个]` - : '' + // Collect all ready but unexecuted steps (in order, until hitting unready step) + const readySteps: IRawStepTask[] = [] - executionProgress.value = { - ...executionProgress.value, - currentAction: event.completed_actions, - totalActions: event.total_actions, - message: `步骤 ${event.step_index + 1}/${executionProgress.value.totalSteps}: ${ - event.step_name - } - 动作 ${event.completed_actions}/${event.total_actions} 完成${parallelInfo}` - } + for (let i = 0; i < steps.length; i++) { + const step = steps[i] + if (!step) continue - console.log( - `✅ 动作 ${event.completed_actions}/${event.total_actions} 完成${parallelInfo}: ${event.action_result.ActionType} by ${event.action_result.AgentName}` - ) + // 如果步骤已就绪,加入批量执行列表 + if (isStepReady(step)) { + const stepName = step.StepName || step.Id || '' + const status = stepExecutionStatus.value[stepName] - // 实时更新到 store(找到对应的步骤并添加 ActionHistory) - const step = collaborationProcess.value.find(s => s.StepName === event.step_name) - if (step) { - const stepLogNode = tempResults.find( - r => r.NodeId === event.step_name && r.LogNodeType === 'step' - ) - if (!stepLogNode) { - // 创建步骤日志节点 - const newStepLog = { - LogNodeType: 'step', - NodeId: event.step_name, - InputName_List: step.InputObject_List || [], - OutputName: step.OutputObject || '', - chatLog: [], - inputObject_Record: [], - ActionHistory: [event.action_result] - } - tempResults.push(newStepLog) - } else { - // 追加动作结果 - stepLogNode.ActionHistory.push(event.action_result) - } - - // 更新 store - agentsStore.setExecutePlan([...tempResults]) - } - break - - case 'step_complete': - // 步骤完成 - console.log(`🎯 步骤完成: ${event.step_name}`) - - // 更新步骤日志节点 - const existingStepLog = tempResults.find( - r => r.NodeId === event.step_name && r.LogNodeType === 'step' - ) - if (existingStepLog) { - existingStepLog.ActionHistory = event.step_log_node.ActionHistory - } else { - tempResults.push(event.step_log_node) - } - - // 添加对象日志节点 - tempResults.push(event.object_log_node) - - // 更新 store - agentsStore.setExecutePlan([...tempResults]) - break - - case 'execution_complete': - // 执行完成 - executionProgress.value.message = `执行完成!共 ${event.total_steps} 个步骤` - console.log(`🎉 执行完成,共 ${event.total_steps} 个步骤`) - - // 确保所有结果都保存到 store - agentsStore.setExecutePlan([...tempResults]) - break - - case 'error': - // 错误 - console.error('❌ 执行错误:', event.message) - executionProgress.value.message = `执行错误: ${event.message}` - break - } - }, - // onError: 处理错误 - (error: Error) => { - console.error('❌ 流式执行错误:', error) - executionProgress.value.message = `执行失败: ${error.message}` - }, - // onComplete: 执行完成 - () => { - console.log('✅ 流式执行完成') - loading.value = false + // Only collect unexecuted steps + if (!status || status === StepExecutionStatus.READY) { + readySteps.push(step) } - ) - } catch (error) { - console.error('执行失败:', error) - executionProgress.value.message = '执行失败,请重试' - } finally { - // loading 会在 onComplete 中设置为 false + } else { + // Stop at first unready step (maintain step order) + break + } + } + + if (readySteps.length > 0) { + try { + // Mark all steps to be executed as running + readySteps.forEach(step => { + const stepName = step.StepName || step.Id || '' + stepExecutionStatus.value[stepName] = StepExecutionStatus.RUNNING + }) + + // 构建包含所有已就绪步骤的计划数据(批量发送,保持依赖关系) + const batchPlan: IRawPlanResponse = { + 'General Goal': agentsStore.agentRawPlan.data?.['General Goal'] || '', + 'Initial Input Object': agentsStore.agentRawPlan.data?.['Initial Input Object'] || [], + 'Collaboration Process': readySteps // Key: batch send steps + } + + const tempResults: any[] = [] + + // Execute these steps in batch + await new Promise((resolve, reject) => { + api.executePlanOptimized( + batchPlan, + // onMessage: handle each event + (event: StreamingEvent) => { + // When backend starts returning data, set isStreaming (only once) + if (!isStreaming.value) { + isStreaming.value = true + } + + // If paused, ignore events + if (isPaused.value) { + return + } + + switch (event.type) { + case 'step_start': + // 使用后端返回的 step_index 和 total_steps + executionProgress.value = { + currentStep: (event.step_index || 0) + 1, + totalSteps: event.total_steps || collaborationProcess.value.length, + currentAction: 0, + totalActions: 0, + currentStepName: event.step_name, + message: `正在执行步骤 ${event.step_index + 1}/${ + event.total_steps || collaborationProcess.value.length + }: ${event.step_name}` + } + break + + case 'action_complete': + const parallelInfo = event.batch_info?.is_parallel + ? ` [并行 ${event.batch_info!.batch_size} 个动作]` + : '' + + // 使用后端返回的 step_index,total_steps 使用当前进度中的值 + const stepIndexForAction = event.step_index || 0 + const totalStepsValue = + executionProgress.value.totalSteps || collaborationProcess.value.length + executionProgress.value = { + ...executionProgress.value, + currentAction: event.completed_actions, + totalActions: event.total_actions, + message: `步骤 ${stepIndexForAction + 1}/${totalStepsValue}: ${ + event.step_name + } - 动作 ${event.completed_actions}/${event.total_actions} 完成${parallelInfo}` + } + + // Update store in real-time + const existingStep = collaborationProcess.value.find( + s => s.StepName === event.step_name + ) + if (existingStep) { + const currentResults = agentsStore.executePlan + const stepLogNode = currentResults.find( + r => r.NodeId === event.step_name && r.LogNodeType === 'step' + ) + if (!stepLogNode) { + const newStepLog = { + LogNodeType: 'step', + NodeId: event.step_name, + InputName_List: existingStep.InputObject_List || [], + OutputName: existingStep.OutputObject || '', + chatLog: [], + inputObject_Record: [], + ActionHistory: [event.action_result] + } + tempResults.push(newStepLog) + agentsStore.setExecutePlan([...currentResults, newStepLog]) + } else { + stepLogNode.ActionHistory.push(event.action_result) + agentsStore.setExecutePlan([...currentResults]) + } + } + break + + case 'step_complete': + stepExecutionStatus.value[event.step_name] = StepExecutionStatus.COMPLETED + + // Update complete step log + const currentResults = agentsStore.executePlan + const existingLog = currentResults.find( + r => r.NodeId === event.step_name && r.LogNodeType === 'step' + ) + if (existingLog) { + existingLog.ActionHistory = event.step_log_node.ActionHistory + // 触发响应式更新 + agentsStore.setExecutePlan([...currentResults]) + } else if (event.step_log_node) { + // 添加新的 step_log_node + agentsStore.setExecutePlan([...currentResults, event.step_log_node]) + } + // 添加 object_log_node + const updatedResults = agentsStore.executePlan + if (event.object_log_node) { + agentsStore.setExecutePlan([...updatedResults, event.object_log_node]) + } + break + + case 'execution_complete': + // 所有步骤都标记为完成 + readySteps.forEach(step => { + const stepName = step.StepName || step.Id || '' + if (stepExecutionStatus.value[stepName] !== StepExecutionStatus.COMPLETED) { + stepExecutionStatus.value[stepName] = StepExecutionStatus.COMPLETED + } + }) + + resolve() + break + + case 'error': + console.error(' 执行错误:', event.message) + executionProgress.value.message = `执行错误: ${event.message}` + readySteps.forEach(step => { + const stepName = step.StepName || step.Id || '' + stepExecutionStatus.value[stepName] = StepExecutionStatus.FAILED + }) + reject(new Error(event.message)) + break + } + }, + // onError + (error: Error) => { + console.error(' 流式执行错误:', error) + executionProgress.value.message = `执行失败: ${error.message}` + readySteps.forEach(step => { + const stepName = step.StepName || step.Id || '' + stepExecutionStatus.value[stepName] = StepExecutionStatus.FAILED + }) + reject(error) + }, + // onComplete + () => { + resolve() + } + ) + }) + + // 批量执行成功后,递归执行下一批 + await executeNextReadyBatch() + } catch (error) { + ElMessage.error('批量执行失败') + // 重置所有执行状态 + loading.value = false + isPaused.value = false + isStreaming.value = false + } + } else { + // No more ready steps + loading.value = false + // 重置暂停和流式状态 + isPaused.value = false + isStreaming.value = false + + // Check if there are still waiting steps + const hasWaitingSteps = steps.some(step => step && !isStepReady(step)) + + if (hasWaitingSteps) { + const waitingStepNames = steps + .filter(step => step && !isStepReady(step)) + .map(step => step?.StepName || '未知') + executionProgress.value.message = `等待 ${waitingStepNames.length} 个步骤数据填充中...` + ElMessage.info(`等待 ${waitingStepNames.length} 个步骤数据填充中...`) + } else { + executionProgress.value.message = '所有步骤已完成' + ElMessage.success('所有步骤已完成') + } } } -// 查看任务过程 +// Pause/Resume handler +async function handlePauseResume() { + if (isPaused.value) { + // Resume execution + try { + if (websocket.connected) { + await websocket.send('resume_execution', { + goal: agentsStore.agentRawPlan.data?.['General Goal'] || '' + }) + // 只有在收到成功响应后才更新状态 + isPaused.value = false + ElMessage.success('已恢复执行') + } else { + ElMessage.warning('WebSocket未连接,无法恢复执行') + } + } catch (error) { + ElMessage.error('恢复执行失败') + // 恢复失败时,保持原状态不变(仍然是暂停状态) + } + } else { + // Pause execution + try { + if (websocket.connected) { + await websocket.send('pause_execution', { + goal: agentsStore.agentRawPlan.data?.['General Goal'] || '' + }) + // 只有在收到成功响应后才更新状态 + isPaused.value = true + ElMessage.success('已暂停执行,可稍后继续') + } else { + ElMessage.warning('WebSocket未连接,无法暂停') + } + } catch (error) { + ElMessage.error('暂停执行失败') + // 暂停失败时,保持原状态不变(仍然是非暂停状态) + } + } +} + +// Handle execute button click +async function handleExecuteButtonClick() { + // If streaming, show pause/resume functionality + if (isStreaming.value) { + await handlePauseResume() + return + } + + // Otherwise, execute normal task execution logic + await handleRun() +} + +async function handleRun() { + // Check if there are ready steps + const readySteps = stepsReadyStatus.value.ready + const waitingSteps = stepsReadyStatus.value.waiting + + if (readySteps.length === 0 && waitingSteps.length > 0) { + ElMessageBox.confirm( + `All ${waitingSteps.length} steps的数据还在填充中:\n\n${waitingSteps.join( + '、' + )}\n\n建议等待数据填充完成后再执行。`, + 'Step data not ready', + { + confirmButtonText: 'I Understand', + cancelButtonText: 'Close', + type: 'warning' + } + ) + return + } + + // Set button brief loading state (prevent double-click) + isButtonLoading.value = true + setTimeout(() => { + isButtonLoading.value = false + }, 1000) + + // Reset pause and streaming state + isPaused.value = false + isStreaming.value = false + + // Start execution + loading.value = true + currentExecutionIndex.value = 0 + + // Clear previous execution results and status + agentsStore.setExecutePlan([]) + stepExecutionStatus.value = {} + + // Start batch executing first batch of ready steps + await executeNextReadyBatch() +} + +// View task process async function handleTaskProcess() { drawerVisible.value = true } -// 重置执行结果 +// Reset execution results function handleRefresh() { agentsStore.setExecutePlan([]) } -// 添加滚动状态标识 +// Add scroll state indicator const isScrolling = ref(false) let scrollTimer: ReturnType | null = null -// 修改滚动处理函数 +// Modify scroll handler function handleScroll() { isScrolling.value = true emit('refreshLine') @@ -357,7 +638,7 @@ function handleScroll() { }, 300) as ReturnType } -// 修改鼠标事件处理函数 +// Modify mouse event handler const handleMouseEnter = throttle(id => { if (!isScrolling.value) { createInternalLine(id) @@ -374,21 +655,21 @@ function clear() { jsplumb.reset() } -//封装连线重绘方法 +// Encapsulate line redraw method const redrawInternalLines = (highlightId?: string) => { - // 等待 DOM 更新完成 + // Waiting DOM 更新完成 nextTick(() => { // 清除旧连线 jsplumb.reset() - // 等待 DOM 稳定后重新绘制 + // Waiting DOM 稳定后重新绘制 setTimeout(() => { createInternalLine(highlightId) }, 100) }) } -//监听 collaborationProcess 变化,自动重绘连线 +// Watch collaborationProcess changes, auto redraw connections watch( () => collaborationProcess, () => { @@ -397,7 +678,7 @@ watch( { deep: true } ) -// 组件挂载后初始化连线 +// Initialize connections after component mount onMounted(() => { // 初始化时绘制连线 nextTick(() => { @@ -407,7 +688,7 @@ onMounted(() => { }) }) -//按钮交互状态管理 +// Button interaction state management const buttonHoverState = ref<'process' | 'execute' | 'refresh' | null>(null) let buttonHoverTimer: ReturnType | null = null const handleProcessMouseEnter = () => { @@ -448,13 +729,13 @@ const handleButtonMouseLeave = () => { }, 50) // 适当减少延迟时间 } -// 添加离开组件时的清理 +// Cleanup when leaving component onUnmounted(() => { if (buttonHoverTimer) { clearTimeout(buttonHoverTimer) } }) -// 计算按钮类名 +// Calculate button class names const processBtnClass = computed(() => { if (buttonHoverState.value === 'refresh' || buttonHoverState.value === 'execute') { return 'circle' @@ -476,7 +757,7 @@ const refreshBtnClass = computed(() => { return agentsStore.executePlan.length > 0 ? 'ellipse' : 'circle' }) -// 计算按钮是否显示文字 +// Calculate whether to show button text const showProcessText = computed(() => { return buttonHoverState.value === 'process' }) @@ -490,17 +771,17 @@ const showRefreshText = computed(() => { return buttonHoverState.value === 'refresh' }) -// 计算按钮标题 +// Calculate button titles const processBtnTitle = computed(() => { - return buttonHoverState.value === 'process' ? '任务过程' : '点击查看任务过程' + return buttonHoverState.value === 'process' ? '查看任务流程' : '点击查看任务流程' }) const executeBtnTitle = computed(() => { - return showExecuteText.value ? '任务执行' : '点击运行' + return showExecuteText.value ? '任务执行' : '点击执行任务' }) const refreshBtnTitle = computed(() => { - return showRefreshText.value ? '重置执行结果' : '点击重置执行状态' + return showRefreshText.value ? '重置结果' : '点击重置执行状态' }) defineExpose({ @@ -544,7 +825,7 @@ defineExpose({ 重置 - + 任务过程 - + - + + + + + + + + + + + + + - 任务执行 + + 任务执行 + 继续执行 + 暂停执行 @@ -597,7 +899,7 @@ defineExpose({ - +
-