"""
优化版执行计划 - 支持动态追加步骤
在执行过程中可以接收新的步骤并追加到执行队列
"""
import asyncio
import json
import time
from typing import List, Dict, Set, Generator, AsyncGenerator, Any
import AgentCoord.RehearsalEngine_V2.Action as Action
import AgentCoord.util as util
from termcolor import colored
from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager
from AgentCoord.RehearsalEngine_V2.dynamic_execution_manager import dynamic_execution_manager
# ==================== Configuration ====================
# Maximum number of concurrent requests
MAX_CONCURRENT_REQUESTS = 2
# Delay between batches
BATCH_DELAY = 1.0
# Retry count and delay for HTTP 429 errors
MAX_RETRIES = 3
RETRY_DELAY = 5.0
# ==================== Rate limiter ====================
class RateLimiter:
"""
异步限流器,控制并发请求数量
"""
def __init__(self, max_concurrent: int = MAX_CONCURRENT_REQUESTS):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.max_concurrent = max_concurrent
async def __aenter__(self):
await self.semaphore.acquire()
return self
async def __aexit__(self, *args):
self.semaphore.release()
# Global rate limiter instance
rate_limiter = RateLimiter()
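# Intended usage of the limiter (a sketch; `call_llm` is a hypothetical placeholder,
# not a function defined in this codebase):
#
#     async with rate_limiter:
#         result = await call_llm(prompt)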
def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int]]:
"""
构建动作依赖图
Args:
TaskProcess: 任务流程列表
Returns:
依赖映射字典 {action_index: [dependent_action_indices]}
"""
dependency_map = {i: [] for i in range(len(TaskProcess))}
for i, action in enumerate(TaskProcess):
important_inputs = action.get('ImportantInput', [])
if not important_inputs:
continue
        # Check whether this action depends on another action's ActionResult
for j, prev_action in enumerate(TaskProcess):
if i == j:
continue
            # Does this action depend on the other action's result?
if any(
inp.startswith('ActionResult:') and
inp == f'ActionResult:{prev_action["ID"]}'
for inp in important_inputs
):
dependency_map[i].append(j)
return dependency_map
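# Worked example (hypothetical data): if action 0 has ID "A1" and action 2 lists
# "ActionResult:A1" in its ImportantInput, the function returns {0: [], 1: [], 2: [0]},
# i.e. action 2 must wait for action 0.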
def get_parallel_batches(TaskProcess: List[Dict], dependency_map: Dict[int, List[int]]) -> List[List[int]]:
"""
将动作分为多个批次,每批内部可以并行执行
Args:
TaskProcess: 任务流程列表
dependency_map: 依赖图
Returns:
批次列表 [[batch1_indices], [batch2_indices], ...]
"""
batches = []
completed: Set[int] = set()
while len(completed) < len(TaskProcess):
        # Find all actions whose dependencies are already satisfied
ready_to_run = [
i for i in range(len(TaskProcess))
if i not in completed and
all(dep in completed for dep in dependency_map[i])
]
if not ready_to_run:
            # Avoid an infinite loop on circular dependencies
remaining = [i for i in range(len(TaskProcess)) if i not in completed]
if remaining:
                print(colored(f"Warning: circular dependency detected, forcing serial execution: {remaining}", "yellow"))
ready_to_run = remaining[:1]
else:
break
batches.append(ready_to_run)
completed.update(ready_to_run)
return batches
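# Worked example (hypothetical data): for dependency_map {0: [], 1: [], 2: [0, 1], 3: [2]}
# the function returns [[0, 1], [2], [3]]: actions 0 and 1 run in parallel first,
# then action 2, then action 3.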
async def execute_single_action_async(
ActionInfo: Dict,
General_Goal: str,
TaskDescription: str,
OutputName: str,
KeyObjects: Dict,
ActionHistory: List,
agentName: str,
AgentProfile_Dict: Dict,
InputName_List: List[str]
) -> Dict:
"""
异步执行单个动作
Args:
ActionInfo: 动作信息
General_Goal: 总体目标
TaskDescription: 任务描述
OutputName: 输出对象名称
KeyObjects: 关键对象字典
ActionHistory: 动作历史
agentName: 智能体名称
AgentProfile_Dict: 智能体配置字典
InputName_List: 输入名称列表
Returns:
动作执行结果
"""
actionType = ActionInfo["ActionType"]
    # Create the action instance
if actionType in Action.customAction_Dict:
currentAction = Action.customAction_Dict[actionType](
info=ActionInfo,
OutputName=OutputName,
KeyObjects=KeyObjects,
)
else:
currentAction = Action.BaseAction(
info=ActionInfo,
OutputName=OutputName,
KeyObjects=KeyObjects,
)
    # Run the blocking Action.run in a thread pool so it does not block the event loop
    loop = asyncio.get_running_loop()
ActionInfo_with_Result = await loop.run_in_executor(
None,
lambda: currentAction.run(
General_Goal=General_Goal,
TaskDescription=TaskDescription,
agentName=agentName,
AgentProfile_Dict=AgentProfile_Dict,
InputName_List=InputName_List,
OutputName=OutputName,
KeyObjects=KeyObjects,
ActionHistory=ActionHistory,
)
)
return ActionInfo_with_Result
async def execute_step_async_streaming(
stepDescrip: Dict,
General_Goal: str,
AgentProfile_Dict: Dict,
KeyObjects: Dict,
step_index: int,
total_steps: int
) -> AsyncGenerator[Dict, None]:
"""
异步执行单个步骤,支持流式返回
Args:
stepDescrip: 步骤描述
General_Goal: 总体目标
AgentProfile_Dict: 智能体配置字典
KeyObjects: 关键对象字典
step_index: 步骤索引
total_steps: 总步骤数
Yields:
执行事件字典
"""
    # Prepare step information
StepName = (
util.camel_case_to_normal(stepDescrip["StepName"])
if util.is_camel_case(stepDescrip["StepName"])
else stepDescrip["StepName"]
)
TaskContent = stepDescrip["TaskContent"]
InputName_List = (
[
(
util.camel_case_to_normal(obj)
if util.is_camel_case(obj)
else obj
)
for obj in stepDescrip["InputObject_List"]
]
if stepDescrip["InputObject_List"] is not None
else None
)
OutputName = (
util.camel_case_to_normal(stepDescrip["OutputObject"])
if util.is_camel_case(stepDescrip["OutputObject"])
else stepDescrip["OutputObject"]
)
Agent_List = stepDescrip["AgentSelection"]
TaskProcess = stepDescrip["TaskProcess"]
TaskDescription = (
util.converter.generate_template_sentence_for_CollaborationBrief(
input_object_list=InputName_List,
output_object=OutputName,
agent_list=Agent_List,
step_task=TaskContent,
)
)
    # Initialize log nodes
inputObject_Record = [
        {InputName: KeyObjects[InputName]} for InputName in (InputName_List or [])  # tolerate InputName_List being None
]
stepLogNode = {
"LogNodeType": "step",
"NodeId": StepName,
"InputName_List": InputName_List,
"OutputName": OutputName,
"chatLog": [],
"inputObject_Record": inputObject_Record,
}
objectLogNode = {
"LogNodeType": "object",
"NodeId": OutputName,
"content": None,
}
    # Emit the step-start event
yield {
"type": "step_start",
"step_index": step_index,
"total_steps": total_steps,
"step_name": StepName,
"task_description": TaskDescription,
}
    # Build the action dependency graph
dependency_map = build_action_dependency_graph(TaskProcess)
batches = get_parallel_batches(TaskProcess, dependency_map)
ActionHistory = []
total_actions = len(TaskProcess)
completed_actions = 0
util.print_colored(
f"📋 步骤 {step_index + 1}/{total_steps}: {StepName} ({total_actions} 个动作, 分 {len(batches)} 批并行执行)",
text_color="cyan"
)
    # Execute the actions batch by batch
for batch_index, batch_indices in enumerate(batches):
        # Check the pause state before running each batch
should_continue = await execution_state_manager.async_check_pause()
if not should_continue:
util.print_colored("🛑 用户请求停止执行", "red")
return
batch_size = len(batch_indices)
if batch_size > 1:
util.print_colored(
f"🚦 批次 {batch_index + 1}/{len(batches)}: 并行执行 {batch_size} 个动作",
text_color="blue"
)
else:
util.print_colored(
f"🔄 动作 {completed_actions + 1}/{total_actions}: 串行执行",
text_color="yellow"
)
        # Run all actions in the current batch in parallel
tasks = [
execute_single_action_async(
TaskProcess[i],
General_Goal=General_Goal,
TaskDescription=TaskDescription,
OutputName=OutputName,
KeyObjects=KeyObjects,
ActionHistory=ActionHistory,
agentName=TaskProcess[i]["AgentName"],
AgentProfile_Dict=AgentProfile_Dict,
InputName_List=InputName_List
)
for i in batch_indices
]
        # Wait for the current batch to finish
batch_results = await asyncio.gather(*tasks)
        # Emit the results one by one
for i, result in enumerate(batch_results):
action_index_in_batch = batch_indices[i]
completed_actions += 1
util.print_colored(
f"✅ 动作 {completed_actions}/{total_actions} 完成: {result['ActionType']} by {result['AgentName']}",
text_color="green"
)
ActionHistory.append(result)
            # Emit this action's result immediately
yield {
"type": "action_complete",
"step_index": step_index,
"step_name": StepName,
"action_index": action_index_in_batch,
"total_actions": total_actions,
"completed_actions": completed_actions,
"action_result": result,
"batch_info": {
"batch_index": batch_index,
"batch_size": batch_size,
"is_parallel": batch_size > 1
}
}
    # Step finished
objectLogNode["content"] = KeyObjects[OutputName]
stepLogNode["ActionHistory"] = ActionHistory
yield {
"type": "step_complete",
"step_index": step_index,
"step_name": StepName,
"step_log_node": stepLogNode,
"object_log_node": objectLogNode,
}
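# Event sequence emitted per step: one "step_start", then one "action_complete" per
# action (in batch completion order), then one "step_complete" carrying the step and
# object log nodes.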
def executePlan_streaming_dynamic(
plan: Dict,
num_StepToRun: int,
RehearsalLog: List,
AgentProfile_Dict: Dict,
existingKeyObjects: Dict = None,
execution_id: str = None
) -> Generator[str, None, None]:
"""
动态执行计划,支持在执行过程中追加新步骤
Args:
plan: 执行计划
num_StepToRun: 要运行的步骤数
RehearsalLog: 已执行的历史记录
AgentProfile_Dict: 智能体配置
existingKeyObjects: 已存在的KeyObjects
execution_id: 执行ID用于动态追加步骤
Yields:
SSE格式的事件字符串
"""
    # Initialize execution state
general_goal = plan.get("General Goal", "")
execution_state_manager.start_execution(general_goal)
print(colored(f"⏸️ 执行状态管理器已启动,支持暂停/恢复", "green"))
    # Prepare for execution
KeyObjects = existingKeyObjects.copy() if existingKeyObjects else {}
finishedStep_index = -1
for logNode in RehearsalLog:
if logNode["LogNodeType"] == "step":
finishedStep_index += 1
if logNode["LogNodeType"] == "object":
KeyObjects[logNode["NodeId"]] = logNode["content"]
if existingKeyObjects:
print(colored(f"📦 使用已存在的 KeyObjects: {list(existingKeyObjects.keys())}", "cyan"))
    # Determine the range of steps to run
if num_StepToRun is None:
run_to = len(plan["Collaboration Process"])
else:
run_to = (finishedStep_index + 1) + num_StepToRun
steps_to_run = plan["Collaboration Process"][(finishedStep_index + 1): run_to]
    # Use the dynamic execution manager
if execution_id:
        # Initialize the execution manager with the provided execution_id
actual_execution_id = dynamic_execution_manager.start_execution(general_goal, steps_to_run, execution_id)
print(colored(f"🚀 开始执行计划(动态模式),共 {len(steps_to_run)} 个步骤执行ID: {actual_execution_id}", "cyan"))
else:
print(colored(f"🚀 开始执行计划(流式推送),共 {len(steps_to_run)} 个步骤", "cyan"))
total_steps = len(steps_to_run)
    # Use a queue to stream events as they are produced
async def produce_events(queue: asyncio.Queue):
"""异步生产者"""
try:
step_index = 0
if execution_id:
                # Dynamic mode: keep fetching the next step in a loop
                # Maximum number of empty-wait cycles before giving up (avoids waiting forever)
                max_empty_wait_cycles = 5  # wait at most 5 cycles of 1 second each
empty_wait_count = 0
while True:
                    # Check the pause state
should_continue = await execution_state_manager.async_check_pause()
if not should_continue:
print(colored("🛑 用户请求停止执行", "red"))
await queue.put({
"type": "error",
"message": "执行已被用户停止"
})
break
                    # Fetch the next step
stepDescrip = dynamic_execution_manager.get_next_step(execution_id)
if stepDescrip is None:
                        # No more steps available; decide whether to keep waiting
empty_wait_count += 1
                        # Fetch execution info
execution_info = dynamic_execution_manager.get_execution_info(execution_id)
if execution_info:
queue_total_steps = execution_info.get("total_steps", 0)
completed_steps = execution_info.get("completed_steps", 0)
                            # If the queue holds no steps (queue_total_steps is 0), exit immediately
if queue_total_steps == 0:
print(colored(f"⚠️ 没有步骤在队列中,退出执行", "yellow"))
break
                            # If all steps are done, wait in case new steps get appended
if completed_steps >= queue_total_steps:
if empty_wait_count >= max_empty_wait_cycles:
                                    # Wait timed out; exit execution
                                    print(colored("✅ All steps completed; wait for new steps timed out", "green"))
break
else:
                                    # Wait for new steps to be appended
                                    print(colored(f"⏳ Waiting for new steps to be appended... ({empty_wait_count}/{max_empty_wait_cycles})", "cyan"))
await asyncio.sleep(1)
continue
else:
                                # Some steps are still pending; keep trying to fetch
                                print(colored(f"⏳ Waiting for steps to become ready... ({completed_steps}/{queue_total_steps})", "cyan"))
await asyncio.sleep(0.5)
                                empty_wait_count = 0  # reset the wait counter
continue
else:
                            # Execution info not found; exit
                            print(colored("⚠️ Execution info not found, exiting execution", "yellow"))
break
                    # Reset the wait counter
empty_wait_count = 0
                    # Fetch the latest total step count (for display)
execution_info = dynamic_execution_manager.get_execution_info(execution_id)
current_total_steps = execution_info.get("total_steps", total_steps) if execution_info else total_steps
                    # Execute the step
async for event in execute_step_async_streaming(
stepDescrip,
plan["General Goal"],
AgentProfile_Dict,
KeyObjects,
step_index,
                        current_total_steps  # use the dynamically updated total step count
):
if execution_state_manager.is_stopped():
await queue.put({
"type": "error",
"message": "执行已被用户停止"
})
return
await queue.put(event)
                    # Mark the step as completed
dynamic_execution_manager.mark_step_completed(execution_id)
                    # Update KeyObjects
OutputName = stepDescrip.get("OutputObject", "")
if OutputName and OutputName in KeyObjects:
                        # The object log node is emitted with the step_complete event
pass
step_index += 1
else:
                # Non-dynamic mode: run all steps in order
for step_index, stepDescrip in enumerate(steps_to_run):
should_continue = await execution_state_manager.async_check_pause()
if not should_continue:
print(colored("🛑 用户请求停止执行", "red"))
await queue.put({
"type": "error",
"message": "执行已被用户停止"
})
return
async for event in execute_step_async_streaming(
stepDescrip,
plan["General Goal"],
AgentProfile_Dict,
KeyObjects,
step_index,
total_steps
):
if execution_state_manager.is_stopped():
await queue.put({
"type": "error",
"message": "执行已被用户停止"
})
return
await queue.put(event)
except Exception as e:
await queue.put({
"type": "error",
"message": f"执行出错: {str(e)}"
})
finally:
await queue.put(None)
    # Drive the async producer on a dedicated event loop and yield events in real time
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
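    # Sync/async bridge: this synchronous generator drives a private event loop and
    # pulls events from an asyncio.Queue that produce_events fills. The bounded queue
    # (maxsize=10) applies back-pressure so the producer cannot run far ahead of the
    # SSE consumer.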
try:
queue = asyncio.Queue(maxsize=10)
producer_task = loop.create_task(produce_events(queue))
while True:
event = loop.run_until_complete(queue.get())
if event is None:
break
            # Convert to SSE format and emit immediately
event_str = json.dumps(event, ensure_ascii=False)
yield f"data: {event_str}\n\n"
loop.run_until_complete(producer_task)
if not execution_state_manager.is_stopped():
complete_event = json.dumps({
"type": "execution_complete",
"total_steps": total_steps
}, ensure_ascii=False)
yield f"data: {complete_event}\n\n"
finally:
        # Clean up the execution record before closing the event loop
if execution_id:
            # Clean up the execution record
dynamic_execution_manager.cleanup(execution_id)
if 'producer_task' in locals():
if not producer_task.done():
producer_task.cancel()
        # Make sure all tasks are finished before closing the event loop
try:
pending = asyncio.all_tasks(loop)
for task in pending:
task.cancel()
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
except Exception:
            pass  # ignore errors during cleanup
loop.close()
# Keep the old name as an alias for backward compatibility
executePlan_streaming = executePlan_streaming_dynamic
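# Usage sketch (assumptions: `plan` and `AgentProfile_Dict` are loaded by the caller;
# in practice the generator is wrapped in a streaming/SSE HTTP response):
#
#     for sse_chunk in executePlan_streaming_dynamic(
#         plan=plan,
#         num_StepToRun=None,            # run all remaining steps
#         RehearsalLog=[],
#         AgentProfile_Dict=AgentProfile_Dict,
#     ):
#         print(sse_chunk, end="")       # each chunk is already "data: {...}\n\n"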