Files
AgentCoord/backend/AgentCoord/RehearsalEngine_V2/dynamic_execution_manager.py

242 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
动态执行管理器
用于在任务执行过程中动态追加新步骤
"""
import asyncio
from typing import Dict, List, Optional, Any
from threading import Lock
class DynamicExecutionManager:
"""
动态执行管理器
管理正在执行的任务,支持动态追加新步骤
"""
def __init__(self):
# 执行状态: goal -> execution_info
self._executions: Dict[str, Dict] = {}
# 线程锁
self._lock = Lock()
# 步骤队列: goal -> List[step]
self._step_queues: Dict[str, List] = {}
# 已执行的步骤索引: goal -> Set[step_index]
self._executed_steps: Dict[str, set] = {}
# 待执行的步骤索引: goal -> List[step_index]
self._pending_steps: Dict[str, List[int]] = {}
def start_execution(self, goal: str, initial_steps: List[Dict], execution_id: str = None) -> str:
"""
开始执行一个新的任务
Args:
goal: 任务目标
initial_steps: 初始步骤列表
execution_id: 执行ID如果不提供则自动生成
Returns:
执行ID
"""
with self._lock:
# 如果未提供execution_id则生成一个
if execution_id is None:
execution_id = f"{goal}_{asyncio.get_event_loop().time()}"
self._executions[execution_id] = {
"goal": goal,
"status": "running",
"total_steps": len(initial_steps),
"completed_steps": 0
}
# 初始化步骤队列
self._step_queues[execution_id] = initial_steps.copy()
# 初始化已执行步骤集合
self._executed_steps[execution_id] = set()
# 初始化待执行步骤索引
self._pending_steps[execution_id] = list(range(len(initial_steps)))
print(f"🚀 启动执行: {execution_id}")
print(f"📊 初始步骤数: {len(initial_steps)}")
print(f"📋 待执行步骤索引: {self._pending_steps[execution_id]}")
return execution_id
def add_steps(self, execution_id: str, new_steps: List[Dict]) -> int:
"""
向执行中追加新步骤
Args:
execution_id: 执行ID
new_steps: 新步骤列表
Returns:
追加的步骤数量
"""
with self._lock:
if execution_id not in self._step_queues:
print(f"⚠️ 警告: 执行ID {execution_id} 不存在,无法追加步骤")
return 0
current_count = len(self._step_queues[execution_id])
# 追加新步骤到队列
self._step_queues[execution_id].extend(new_steps)
# 添加新步骤的索引到待执行列表
new_indices = list(range(current_count, current_count + len(new_steps)))
self._pending_steps[execution_id].extend(new_indices)
# 更新总步骤数
old_total = self._executions[execution_id]["total_steps"]
self._executions[execution_id]["total_steps"] = len(self._step_queues[execution_id])
new_total = self._executions[execution_id]["total_steps"]
print(f" 追加了 {len(new_steps)} 个步骤到 {execution_id}")
print(f"📊 步骤总数: {old_total} -> {new_total}")
print(f"📋 待执行步骤索引: {self._pending_steps[execution_id]}")
return len(new_steps)
def get_next_step(self, execution_id: str) -> Optional[Dict]:
"""
获取下一个待执行的步骤
Args:
execution_id: 执行ID
Returns:
下一个步骤如果没有则返回None
"""
with self._lock:
if execution_id not in self._pending_steps:
print(f"⚠️ 警告: 执行ID {execution_id} 不存在")
return None
# 获取第一个待执行步骤的索引
if not self._pending_steps[execution_id]:
return None
step_index = self._pending_steps[execution_id].pop(0)
# 从队列中获取步骤
if step_index >= len(self._step_queues[execution_id]):
print(f"⚠️ 警告: 步骤索引 {step_index} 超出范围")
return None
step = self._step_queues[execution_id][step_index]
# 标记为已执行
self._executed_steps[execution_id].add(step_index)
step_name = step.get("StepName", "未知")
print(f"🎯 获取下一个步骤: {step_name} (索引: {step_index})")
print(f"📋 剩余待执行步骤: {len(self._pending_steps[execution_id])}")
return step
def mark_step_completed(self, execution_id: str):
"""
标记一个步骤完成
Args:
execution_id: 执行ID
"""
with self._lock:
if execution_id in self._executions:
self._executions[execution_id]["completed_steps"] += 1
completed = self._executions[execution_id]["completed_steps"]
total = self._executions[execution_id]["total_steps"]
print(f"📊 步骤完成进度: {completed}/{total}")
else:
print(f"⚠️ 警告: 执行ID {execution_id} 不存在")
def get_execution_info(self, execution_id: str) -> Optional[Dict]:
"""
获取执行信息
Args:
execution_id: 执行ID
Returns:
执行信息字典
"""
with self._lock:
return self._executions.get(execution_id)
def get_pending_count(self, execution_id: str) -> int:
"""
获取待执行步骤数量
Args:
execution_id: 执行ID
Returns:
待执行步骤数量
"""
with self._lock:
if execution_id not in self._pending_steps:
return 0
return len(self._pending_steps[execution_id])
def has_more_steps(self, execution_id: str) -> bool:
"""
检查是否还有更多步骤待执行
Args:
execution_id: 执行ID
Returns:
是否还有待执行步骤
"""
with self._lock:
if execution_id not in self._pending_steps:
return False
return len(self._pending_steps[execution_id]) > 0
def finish_execution(self, execution_id: str):
"""
完成执行
Args:
execution_id: 执行ID
"""
with self._lock:
if execution_id in self._executions:
self._executions[execution_id]["status"] = "completed"
def cancel_execution(self, execution_id: str):
"""
取消执行
Args:
execution_id: 执行ID
"""
with self._lock:
if execution_id in self._executions:
self._executions[execution_id]["status"] = "cancelled"
def cleanup(self, execution_id: str):
"""
清理执行记录
Args:
execution_id: 执行ID
"""
with self._lock:
self._executions.pop(execution_id, None)
self._step_queues.pop(execution_id, None)
self._executed_steps.pop(execution_id, None)
self._pending_steps.pop(execution_id, None)
# 全局单例
dynamic_execution_manager = DynamicExecutionManager()