feat:任务执行结果性能优化
This commit is contained in:
401
backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py
Normal file
401
backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py
Normal file
@@ -0,0 +1,401 @@
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from typing import List, Dict, Set, Generator, Any
|
||||
import AgentCoord.RehearsalEngine_V2.Action as Action
|
||||
import AgentCoord.util as util
|
||||
from termcolor import colored
|
||||
|
||||
|
||||
# ==================== 配置参数 ====================
|
||||
# 最大并发请求数(避免触发 OpenAI API 速率限制)
|
||||
MAX_CONCURRENT_REQUESTS = 2
|
||||
|
||||
# 批次之间的延迟(秒)
|
||||
BATCH_DELAY = 1.0
|
||||
|
||||
# 429 错误重试次数和延迟
|
||||
MAX_RETRIES = 3
|
||||
RETRY_DELAY = 5.0 # 秒
|
||||
|
||||
# ==================== 限流器 ====================
|
||||
class RateLimiter:
|
||||
"""异步限流器,控制并发请求数量"""
|
||||
|
||||
def __init__(self, max_concurrent: int = MAX_CONCURRENT_REQUESTS):
|
||||
self.semaphore = asyncio.Semaphore(max_concurrent)
|
||||
self.max_concurrent = max_concurrent
|
||||
|
||||
async def __aenter__(self):
|
||||
await self.semaphore.acquire()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *args):
|
||||
self.semaphore.release()
|
||||
|
||||
|
||||
# 全局限流器实例
|
||||
rate_limiter = RateLimiter()
|
||||
|
||||
|
||||
def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int]]:
|
||||
"""
|
||||
构建动作依赖图
|
||||
返回: {action_index: [dependent_action_indices]}
|
||||
"""
|
||||
dependency_map = {i: [] for i in range(len(TaskProcess))}
|
||||
|
||||
for i, action in enumerate(TaskProcess):
|
||||
important_inputs = action.get('ImportantInput', [])
|
||||
if not important_inputs:
|
||||
continue
|
||||
|
||||
# 检查是否依赖其他动作的 ActionResult
|
||||
for j, prev_action in enumerate(TaskProcess):
|
||||
if i == j:
|
||||
continue
|
||||
|
||||
# 判断是否依赖前一个动作的结果
|
||||
if any(
|
||||
inp.startswith('ActionResult:') and
|
||||
inp == f'ActionResult:{prev_action["ID"]}'
|
||||
for inp in important_inputs
|
||||
):
|
||||
dependency_map[i].append(j)
|
||||
|
||||
return dependency_map
|
||||
|
||||
|
||||
def get_parallel_batches(TaskProcess: List[Dict], dependency_map: Dict[int, List[int]]) -> List[List[int]]:
|
||||
"""
|
||||
将动作分为多个批次,每批内部可以并行执行
|
||||
返回: [[batch1_indices], [batch2_indices], ...]
|
||||
"""
|
||||
batches = []
|
||||
completed: Set[int] = set()
|
||||
|
||||
while len(completed) < len(TaskProcess):
|
||||
# 找出所有依赖已满足的动作
|
||||
ready_to_run = [
|
||||
i for i in range(len(TaskProcess))
|
||||
if i not in completed and
|
||||
all(dep in completed for dep in dependency_map[i])
|
||||
]
|
||||
|
||||
if not ready_to_run:
|
||||
# 避免死循环(循环依赖情况)
|
||||
remaining = [i for i in range(len(TaskProcess)) if i not in completed]
|
||||
if remaining:
|
||||
print(colored(f"警告: 检测到循环依赖,强制串行执行: {remaining}", "yellow"))
|
||||
ready_to_run = remaining[:1] # 每次只执行一个
|
||||
else:
|
||||
break
|
||||
|
||||
batches.append(ready_to_run)
|
||||
completed.update(ready_to_run)
|
||||
|
||||
return batches
|
||||
|
||||
|
||||
async def execute_single_action_async(
|
||||
ActionInfo: Dict,
|
||||
General_Goal: str,
|
||||
TaskDescription: str,
|
||||
OutputName: str,
|
||||
KeyObjects: Dict,
|
||||
ActionHistory: List,
|
||||
agentName: str,
|
||||
AgentProfile_Dict: Dict,
|
||||
InputName_List: List[str]
|
||||
) -> Dict:
|
||||
"""
|
||||
异步执行单个动作
|
||||
"""
|
||||
actionType = ActionInfo["ActionType"]
|
||||
|
||||
# 创建动作实例
|
||||
if actionType in Action.customAction_Dict:
|
||||
currentAction = Action.customAction_Dict[actionType](
|
||||
info=ActionInfo,
|
||||
OutputName=OutputName,
|
||||
KeyObjects=KeyObjects,
|
||||
)
|
||||
else:
|
||||
currentAction = Action.BaseAction(
|
||||
info=ActionInfo,
|
||||
OutputName=OutputName,
|
||||
KeyObjects=KeyObjects,
|
||||
)
|
||||
|
||||
# 执行动作(在线程池中运行,避免阻塞事件循环)
|
||||
loop = asyncio.get_event_loop()
|
||||
ActionInfo_with_Result = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: currentAction.run(
|
||||
General_Goal=General_Goal,
|
||||
TaskDescription=TaskDescription,
|
||||
agentName=agentName,
|
||||
AgentProfile_Dict=AgentProfile_Dict,
|
||||
InputName_List=InputName_List,
|
||||
OutputName=OutputName,
|
||||
KeyObjects=KeyObjects,
|
||||
ActionHistory=ActionHistory,
|
||||
)
|
||||
)
|
||||
|
||||
return ActionInfo_with_Result
|
||||
|
||||
|
||||
async def execute_step_async_streaming(
|
||||
stepDescrip: Dict,
|
||||
General_Goal: str,
|
||||
AgentProfile_Dict: Dict,
|
||||
KeyObjects: Dict,
|
||||
step_index: int,
|
||||
total_steps: int
|
||||
) -> Generator[Dict, None, None]:
|
||||
"""
|
||||
异步执行单个步骤(支持流式返回)
|
||||
返回生成器,每完成一个动作就 yield 一次
|
||||
"""
|
||||
# 准备步骤信息
|
||||
StepName = (
|
||||
util.camel_case_to_normal(stepDescrip["StepName"])
|
||||
if util.is_camel_case(stepDescrip["StepName"])
|
||||
else stepDescrip["StepName"]
|
||||
)
|
||||
TaskContent = stepDescrip["TaskContent"]
|
||||
InputName_List = (
|
||||
[
|
||||
(
|
||||
util.camel_case_to_normal(obj)
|
||||
if util.is_camel_case(obj)
|
||||
else obj
|
||||
)
|
||||
for obj in stepDescrip["InputObject_List"]
|
||||
]
|
||||
if stepDescrip["InputObject_List"] is not None
|
||||
else None
|
||||
)
|
||||
OutputName = (
|
||||
util.camel_case_to_normal(stepDescrip["OutputObject"])
|
||||
if util.is_camel_case(stepDescrip["OutputObject"])
|
||||
else stepDescrip["OutputObject"]
|
||||
)
|
||||
Agent_List = stepDescrip["AgentSelection"]
|
||||
TaskProcess = stepDescrip["TaskProcess"]
|
||||
|
||||
TaskDescription = (
|
||||
util.converter.generate_template_sentence_for_CollaborationBrief(
|
||||
input_object_list=InputName_List,
|
||||
output_object=OutputName,
|
||||
agent_list=Agent_List,
|
||||
step_task=TaskContent,
|
||||
)
|
||||
)
|
||||
|
||||
# 初始化日志节点
|
||||
inputObject_Record = [
|
||||
{InputName: KeyObjects[InputName]} for InputName in InputName_List
|
||||
]
|
||||
stepLogNode = {
|
||||
"LogNodeType": "step",
|
||||
"NodeId": StepName,
|
||||
"InputName_List": InputName_List,
|
||||
"OutputName": OutputName,
|
||||
"chatLog": [],
|
||||
"inputObject_Record": inputObject_Record,
|
||||
}
|
||||
objectLogNode = {
|
||||
"LogNodeType": "object",
|
||||
"NodeId": OutputName,
|
||||
"content": None,
|
||||
}
|
||||
|
||||
# 先返回步骤开始信息
|
||||
yield {
|
||||
"type": "step_start",
|
||||
"step_index": step_index,
|
||||
"total_steps": total_steps,
|
||||
"step_name": StepName,
|
||||
"task_description": TaskDescription,
|
||||
}
|
||||
|
||||
# 构建动作依赖图
|
||||
dependency_map = build_action_dependency_graph(TaskProcess)
|
||||
batches = get_parallel_batches(TaskProcess, dependency_map)
|
||||
|
||||
ActionHistory = []
|
||||
total_actions = len(TaskProcess)
|
||||
completed_actions = 0
|
||||
|
||||
util.print_colored(
|
||||
f"📋 步骤 {step_index + 1}/{total_steps}: {StepName} ({total_actions} 个动作, 分 {len(batches)} 批并行执行)",
|
||||
text_color="cyan"
|
||||
)
|
||||
|
||||
# 分批执行动作
|
||||
for batch_index, batch_indices in enumerate(batches):
|
||||
batch_size = len(batch_indices)
|
||||
|
||||
if batch_size > 1:
|
||||
util.print_colored(
|
||||
f"🚦 批次 {batch_index + 1}/{len(batches)}: 并行执行 {batch_size} 个动作",
|
||||
text_color="blue"
|
||||
)
|
||||
else:
|
||||
util.print_colored(
|
||||
f"🔄 动作 {completed_actions + 1}/{total_actions}: 串行执行",
|
||||
text_color="yellow"
|
||||
)
|
||||
|
||||
# 并行执行当前批次的所有动作
|
||||
tasks = [
|
||||
execute_single_action_async(
|
||||
TaskProcess[i],
|
||||
General_Goal=General_Goal,
|
||||
TaskDescription=TaskDescription,
|
||||
OutputName=OutputName,
|
||||
KeyObjects=KeyObjects,
|
||||
ActionHistory=ActionHistory,
|
||||
agentName=TaskProcess[i]["AgentName"],
|
||||
AgentProfile_Dict=AgentProfile_Dict,
|
||||
InputName_List=InputName_List
|
||||
)
|
||||
for i in batch_indices
|
||||
]
|
||||
|
||||
# 等待当前批次完成
|
||||
batch_results = await asyncio.gather(*tasks)
|
||||
|
||||
# 逐个返回结果(流式)
|
||||
for i, result in enumerate(batch_results):
|
||||
action_index_in_batch = batch_indices[i]
|
||||
completed_actions += 1
|
||||
|
||||
util.print_colored(
|
||||
f"✅ 动作 {completed_actions}/{total_actions} 完成: {result['ActionType']} by {result['AgentName']}",
|
||||
text_color="green"
|
||||
)
|
||||
|
||||
ActionHistory.append(result)
|
||||
|
||||
# 立即返回该动作结果(流式)
|
||||
yield {
|
||||
"type": "action_complete",
|
||||
"step_index": step_index,
|
||||
"step_name": StepName,
|
||||
"action_index": action_index_in_batch,
|
||||
"total_actions": total_actions,
|
||||
"completed_actions": completed_actions,
|
||||
"action_result": result,
|
||||
"batch_info": {
|
||||
"batch_index": batch_index,
|
||||
"batch_size": batch_size,
|
||||
"is_parallel": batch_size > 1
|
||||
}
|
||||
}
|
||||
|
||||
# 步骤完成
|
||||
objectLogNode["content"] = KeyObjects[OutputName]
|
||||
stepLogNode["ActionHistory"] = ActionHistory
|
||||
|
||||
yield {
|
||||
"type": "step_complete",
|
||||
"step_index": step_index,
|
||||
"step_name": StepName,
|
||||
"step_log_node": stepLogNode,
|
||||
"object_log_node": objectLogNode,
|
||||
}
|
||||
|
||||
|
||||
def executePlan_streaming(
|
||||
plan: Dict,
|
||||
num_StepToRun: int,
|
||||
RehearsalLog: List,
|
||||
AgentProfile_Dict: Dict
|
||||
) -> Generator[str, None, None]:
|
||||
"""
|
||||
执行计划(流式返回版本)
|
||||
返回生成器,每次返回 SSE 格式的字符串
|
||||
|
||||
使用方式:
|
||||
for event in executePlan_streaming(...):
|
||||
yield event
|
||||
"""
|
||||
# 准备执行
|
||||
KeyObjects = {}
|
||||
finishedStep_index = -1
|
||||
|
||||
for logNode in RehearsalLog:
|
||||
if logNode["LogNodeType"] == "step":
|
||||
finishedStep_index += 1
|
||||
if logNode["LogNodeType"] == "object":
|
||||
KeyObjects[logNode["NodeId"]] = logNode["content"]
|
||||
|
||||
# 确定要运行的步骤范围
|
||||
if num_StepToRun is None:
|
||||
run_to = len(plan["Collaboration Process"])
|
||||
else:
|
||||
run_to = (finishedStep_index + 1) + num_StepToRun
|
||||
|
||||
steps_to_run = plan["Collaboration Process"][(finishedStep_index + 1): run_to]
|
||||
total_steps = len(steps_to_run)
|
||||
|
||||
print(colored(f"🚀 开始执行计划(流式推送),共 {total_steps} 个步骤", "cyan", attrs=["bold"]))
|
||||
|
||||
# 使用队列实现流式推送
|
||||
async def produce_events(queue: asyncio.Queue):
|
||||
"""异步生产者:每收到一个事件就放入队列"""
|
||||
try:
|
||||
for step_index, stepDescrip in enumerate(steps_to_run):
|
||||
# 执行单个步骤(流式)
|
||||
async for event in execute_step_async_streaming(
|
||||
stepDescrip,
|
||||
plan["General Goal"],
|
||||
AgentProfile_Dict,
|
||||
KeyObjects,
|
||||
step_index,
|
||||
total_steps
|
||||
):
|
||||
await queue.put(event) # ← 立即放入队列
|
||||
finally:
|
||||
await queue.put(None) # ← 发送完成信号
|
||||
|
||||
# 运行异步任务并实时 yield
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
try:
|
||||
# 创建队列
|
||||
queue = asyncio.Queue(maxsize=10)
|
||||
|
||||
# 启动异步生产者任务
|
||||
producer_task = loop.create_task(produce_events(queue))
|
||||
|
||||
# 同步消费者:从队列读取并 yield(使用 run_until_complete)
|
||||
while True:
|
||||
event = loop.run_until_complete(queue.get()) # ← 从队列获取事件
|
||||
if event is None: # ← 收到完成信号
|
||||
break
|
||||
|
||||
# 立即转换为 SSE 格式并发送
|
||||
event_str = json.dumps(event, ensure_ascii=False)
|
||||
yield f"data: {event_str}\n\n" # ← 立即发送给前端
|
||||
|
||||
# 等待生产者任务完成
|
||||
loop.run_until_complete(producer_task)
|
||||
|
||||
# 发送完成信号
|
||||
complete_event = json.dumps({
|
||||
"type": "execution_complete",
|
||||
"total_steps": total_steps
|
||||
}, ensure_ascii=False)
|
||||
yield f"data: {complete_event}\n\n"
|
||||
|
||||
finally:
|
||||
# 清理任务
|
||||
if 'producer_task' in locals():
|
||||
if not producer_task.done():
|
||||
producer_task.cancel()
|
||||
loop.close()
|
||||
Reference in New Issue
Block a user