diff --git a/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py b/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py new file mode 100644 index 0000000..ea102fa --- /dev/null +++ b/backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py @@ -0,0 +1,401 @@ +import asyncio +import json +import time +from typing import List, Dict, Set, Generator, Any +import AgentCoord.RehearsalEngine_V2.Action as Action +import AgentCoord.util as util +from termcolor import colored + + +# ==================== 配置参数 ==================== +# 最大并发请求数(避免触发 OpenAI API 速率限制) +MAX_CONCURRENT_REQUESTS = 2 + +# 批次之间的延迟(秒) +BATCH_DELAY = 1.0 + +# 429 错误重试次数和延迟 +MAX_RETRIES = 3 +RETRY_DELAY = 5.0 # 秒 + +# ==================== 限流器 ==================== +class RateLimiter: + """异步限流器,控制并发请求数量""" + + def __init__(self, max_concurrent: int = MAX_CONCURRENT_REQUESTS): + self.semaphore = asyncio.Semaphore(max_concurrent) + self.max_concurrent = max_concurrent + + async def __aenter__(self): + await self.semaphore.acquire() + return self + + async def __aexit__(self, *args): + self.semaphore.release() + + +# 全局限流器实例 +rate_limiter = RateLimiter() + + +def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int]]: + """ + 构建动作依赖图 + 返回: {action_index: [dependent_action_indices]} + """ + dependency_map = {i: [] for i in range(len(TaskProcess))} + + for i, action in enumerate(TaskProcess): + important_inputs = action.get('ImportantInput', []) + if not important_inputs: + continue + + # 检查是否依赖其他动作的 ActionResult + for j, prev_action in enumerate(TaskProcess): + if i == j: + continue + + # 判断是否依赖前一个动作的结果 + if any( + inp.startswith('ActionResult:') and + inp == f'ActionResult:{prev_action["ID"]}' + for inp in important_inputs + ): + dependency_map[i].append(j) + + return dependency_map + + +def get_parallel_batches(TaskProcess: List[Dict], dependency_map: Dict[int, List[int]]) -> List[List[int]]: + """ + 将动作分为多个批次,每批内部可以并行执行 + 返回: [[batch1_indices], [batch2_indices], ...] + """ + batches = [] + completed: Set[int] = set() + + while len(completed) < len(TaskProcess): + # 找出所有依赖已满足的动作 + ready_to_run = [ + i for i in range(len(TaskProcess)) + if i not in completed and + all(dep in completed for dep in dependency_map[i]) + ] + + if not ready_to_run: + # 避免死循环(循环依赖情况) + remaining = [i for i in range(len(TaskProcess)) if i not in completed] + if remaining: + print(colored(f"警告: 检测到循环依赖,强制串行执行: {remaining}", "yellow")) + ready_to_run = remaining[:1] # 每次只执行一个 + else: + break + + batches.append(ready_to_run) + completed.update(ready_to_run) + + return batches + + +async def execute_single_action_async( + ActionInfo: Dict, + General_Goal: str, + TaskDescription: str, + OutputName: str, + KeyObjects: Dict, + ActionHistory: List, + agentName: str, + AgentProfile_Dict: Dict, + InputName_List: List[str] +) -> Dict: + """ + 异步执行单个动作 + """ + actionType = ActionInfo["ActionType"] + + # 创建动作实例 + if actionType in Action.customAction_Dict: + currentAction = Action.customAction_Dict[actionType]( + info=ActionInfo, + OutputName=OutputName, + KeyObjects=KeyObjects, + ) + else: + currentAction = Action.BaseAction( + info=ActionInfo, + OutputName=OutputName, + KeyObjects=KeyObjects, + ) + + # 执行动作(在线程池中运行,避免阻塞事件循环) + loop = asyncio.get_event_loop() + ActionInfo_with_Result = await loop.run_in_executor( + None, + lambda: currentAction.run( + General_Goal=General_Goal, + TaskDescription=TaskDescription, + agentName=agentName, + AgentProfile_Dict=AgentProfile_Dict, + InputName_List=InputName_List, + OutputName=OutputName, + KeyObjects=KeyObjects, + ActionHistory=ActionHistory, + ) + ) + + return ActionInfo_with_Result + + +async def execute_step_async_streaming( + stepDescrip: Dict, + General_Goal: str, + AgentProfile_Dict: Dict, + KeyObjects: Dict, + step_index: int, + total_steps: int +) -> Generator[Dict, None, None]: + """ + 异步执行单个步骤(支持流式返回) + 返回生成器,每完成一个动作就 yield 一次 + """ + # 准备步骤信息 + StepName = ( + util.camel_case_to_normal(stepDescrip["StepName"]) + if util.is_camel_case(stepDescrip["StepName"]) + else stepDescrip["StepName"] + ) + TaskContent = stepDescrip["TaskContent"] + InputName_List = ( + [ + ( + util.camel_case_to_normal(obj) + if util.is_camel_case(obj) + else obj + ) + for obj in stepDescrip["InputObject_List"] + ] + if stepDescrip["InputObject_List"] is not None + else None + ) + OutputName = ( + util.camel_case_to_normal(stepDescrip["OutputObject"]) + if util.is_camel_case(stepDescrip["OutputObject"]) + else stepDescrip["OutputObject"] + ) + Agent_List = stepDescrip["AgentSelection"] + TaskProcess = stepDescrip["TaskProcess"] + + TaskDescription = ( + util.converter.generate_template_sentence_for_CollaborationBrief( + input_object_list=InputName_List, + output_object=OutputName, + agent_list=Agent_List, + step_task=TaskContent, + ) + ) + + # 初始化日志节点 + inputObject_Record = [ + {InputName: KeyObjects[InputName]} for InputName in InputName_List + ] + stepLogNode = { + "LogNodeType": "step", + "NodeId": StepName, + "InputName_List": InputName_List, + "OutputName": OutputName, + "chatLog": [], + "inputObject_Record": inputObject_Record, + } + objectLogNode = { + "LogNodeType": "object", + "NodeId": OutputName, + "content": None, + } + + # 先返回步骤开始信息 + yield { + "type": "step_start", + "step_index": step_index, + "total_steps": total_steps, + "step_name": StepName, + "task_description": TaskDescription, + } + + # 构建动作依赖图 + dependency_map = build_action_dependency_graph(TaskProcess) + batches = get_parallel_batches(TaskProcess, dependency_map) + + ActionHistory = [] + total_actions = len(TaskProcess) + completed_actions = 0 + + util.print_colored( + f"📋 步骤 {step_index + 1}/{total_steps}: {StepName} ({total_actions} 个动作, 分 {len(batches)} 批并行执行)", + text_color="cyan" + ) + + # 分批执行动作 + for batch_index, batch_indices in enumerate(batches): + batch_size = len(batch_indices) + + if batch_size > 1: + util.print_colored( + f"🚦 批次 {batch_index + 1}/{len(batches)}: 并行执行 {batch_size} 个动作", + text_color="blue" + ) + else: + util.print_colored( + f"🔄 动作 {completed_actions + 1}/{total_actions}: 串行执行", + text_color="yellow" + ) + + # 并行执行当前批次的所有动作 + tasks = [ + execute_single_action_async( + TaskProcess[i], + General_Goal=General_Goal, + TaskDescription=TaskDescription, + OutputName=OutputName, + KeyObjects=KeyObjects, + ActionHistory=ActionHistory, + agentName=TaskProcess[i]["AgentName"], + AgentProfile_Dict=AgentProfile_Dict, + InputName_List=InputName_List + ) + for i in batch_indices + ] + + # 等待当前批次完成 + batch_results = await asyncio.gather(*tasks) + + # 逐个返回结果(流式) + for i, result in enumerate(batch_results): + action_index_in_batch = batch_indices[i] + completed_actions += 1 + + util.print_colored( + f"✅ 动作 {completed_actions}/{total_actions} 完成: {result['ActionType']} by {result['AgentName']}", + text_color="green" + ) + + ActionHistory.append(result) + + # 立即返回该动作结果(流式) + yield { + "type": "action_complete", + "step_index": step_index, + "step_name": StepName, + "action_index": action_index_in_batch, + "total_actions": total_actions, + "completed_actions": completed_actions, + "action_result": result, + "batch_info": { + "batch_index": batch_index, + "batch_size": batch_size, + "is_parallel": batch_size > 1 + } + } + + # 步骤完成 + objectLogNode["content"] = KeyObjects[OutputName] + stepLogNode["ActionHistory"] = ActionHistory + + yield { + "type": "step_complete", + "step_index": step_index, + "step_name": StepName, + "step_log_node": stepLogNode, + "object_log_node": objectLogNode, + } + + +def executePlan_streaming( + plan: Dict, + num_StepToRun: int, + RehearsalLog: List, + AgentProfile_Dict: Dict +) -> Generator[str, None, None]: + """ + 执行计划(流式返回版本) + 返回生成器,每次返回 SSE 格式的字符串 + + 使用方式: + for event in executePlan_streaming(...): + yield event + """ + # 准备执行 + KeyObjects = {} + finishedStep_index = -1 + + for logNode in RehearsalLog: + if logNode["LogNodeType"] == "step": + finishedStep_index += 1 + if logNode["LogNodeType"] == "object": + KeyObjects[logNode["NodeId"]] = logNode["content"] + + # 确定要运行的步骤范围 + if num_StepToRun is None: + run_to = len(plan["Collaboration Process"]) + else: + run_to = (finishedStep_index + 1) + num_StepToRun + + steps_to_run = plan["Collaboration Process"][(finishedStep_index + 1): run_to] + total_steps = len(steps_to_run) + + print(colored(f"🚀 开始执行计划(流式推送),共 {total_steps} 个步骤", "cyan", attrs=["bold"])) + + # 使用队列实现流式推送 + async def produce_events(queue: asyncio.Queue): + """异步生产者:每收到一个事件就放入队列""" + try: + for step_index, stepDescrip in enumerate(steps_to_run): + # 执行单个步骤(流式) + async for event in execute_step_async_streaming( + stepDescrip, + plan["General Goal"], + AgentProfile_Dict, + KeyObjects, + step_index, + total_steps + ): + await queue.put(event) # ← 立即放入队列 + finally: + await queue.put(None) # ← 发送完成信号 + + # 运行异步任务并实时 yield + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + # 创建队列 + queue = asyncio.Queue(maxsize=10) + + # 启动异步生产者任务 + producer_task = loop.create_task(produce_events(queue)) + + # 同步消费者:从队列读取并 yield(使用 run_until_complete) + while True: + event = loop.run_until_complete(queue.get()) # ← 从队列获取事件 + if event is None: # ← 收到完成信号 + break + + # 立即转换为 SSE 格式并发送 + event_str = json.dumps(event, ensure_ascii=False) + yield f"data: {event_str}\n\n" # ← 立即发送给前端 + + # 等待生产者任务完成 + loop.run_until_complete(producer_task) + + # 发送完成信号 + complete_event = json.dumps({ + "type": "execution_complete", + "total_steps": total_steps + }, ensure_ascii=False) + yield f"data: {complete_event}\n\n" + + finally: + # 清理任务 + if 'producer_task' in locals(): + if not producer_task.done(): + producer_task.cancel() + loop.close()