feat:任务大纲停止以及执行结果暂停继续逻辑完善
This commit is contained in:
@@ -112,7 +112,16 @@ def _call_with_custom_config(messages: list[dict], stream: bool, model_config: d
|
||||
timeout=180
|
||||
|
||||
)
|
||||
# 检查响应是否有效
|
||||
if not response.choices or len(response.choices) == 0:
|
||||
raise Exception(f"API returned empty response for model {api_model}")
|
||||
if not response.choices[0] or not response.choices[0].message:
|
||||
raise Exception(f"API returned invalid response format for model {api_model}")
|
||||
|
||||
full_reply_content = response.choices[0].message.content
|
||||
if full_reply_content is None:
|
||||
raise Exception(f"API returned None content for model {api_model}")
|
||||
|
||||
print(colored(full_reply_content, "blue", "on_white"), end="")
|
||||
return full_reply_content
|
||||
except Exception as e:
|
||||
@@ -138,15 +147,21 @@ async def _achat_completion_stream_custom(messages:list[dict], temp_async_client
|
||||
async for chunk in response:
|
||||
collected_chunks.append(chunk)
|
||||
choices = chunk.choices
|
||||
if len(choices) > 0:
|
||||
chunk_message = chunk.choices[0].delta
|
||||
collected_messages.append(chunk_message)
|
||||
if chunk_message.content:
|
||||
print(colored(chunk_message.content, "blue", "on_white"), end="")
|
||||
if len(choices) > 0 and choices[0] is not None:
|
||||
chunk_message = choices[0].delta
|
||||
if chunk_message is not None:
|
||||
collected_messages.append(chunk_message)
|
||||
if chunk_message.content:
|
||||
print(colored(chunk_message.content, "blue", "on_white"), end="")
|
||||
print()
|
||||
full_reply_content = "".join(
|
||||
[m.content or "" for m in collected_messages if m is not None]
|
||||
)
|
||||
|
||||
# 检查最终结果是否为空
|
||||
if not full_reply_content or full_reply_content.strip() == "":
|
||||
raise Exception(f"Stream API returned empty content for model {api_model}")
|
||||
|
||||
return full_reply_content
|
||||
except httpx.RemoteProtocolError as e:
|
||||
if attempt < max_retries - 1:
|
||||
@@ -184,7 +199,16 @@ async def _achat_completion_stream_groq(messages: list[dict]) -> str:
|
||||
else:
|
||||
raise Exception("failed")
|
||||
|
||||
# 检查响应是否有效
|
||||
if not response.choices or len(response.choices) == 0:
|
||||
raise Exception("Groq API returned empty response")
|
||||
if not response.choices[0] or not response.choices[0].message:
|
||||
raise Exception("Groq API returned invalid response format")
|
||||
|
||||
full_reply_content = response.choices[0].message.content
|
||||
if full_reply_content is None:
|
||||
raise Exception("Groq API returned None content")
|
||||
|
||||
print(colored(full_reply_content, "blue", "on_white"), end="")
|
||||
print()
|
||||
return full_reply_content
|
||||
@@ -217,7 +241,16 @@ async def _achat_completion_stream_mixtral(messages: list[dict]) -> str:
|
||||
else:
|
||||
raise Exception("failed")
|
||||
|
||||
# 检查响应是否有效
|
||||
if not stream.choices or len(stream.choices) == 0:
|
||||
raise Exception("Mistral API returned empty response")
|
||||
if not stream.choices[0] or not stream.choices[0].message:
|
||||
raise Exception("Mistral API returned invalid response format")
|
||||
|
||||
full_reply_content = stream.choices[0].message.content
|
||||
if full_reply_content is None:
|
||||
raise Exception("Mistral API returned None content")
|
||||
|
||||
print(colored(full_reply_content, "blue", "on_white"), end="")
|
||||
print()
|
||||
return full_reply_content
|
||||
@@ -240,19 +273,25 @@ async def _achat_completion_stream_gpt35(messages: list[dict]) -> str:
|
||||
async for chunk in response:
|
||||
collected_chunks.append(chunk) # save the event response
|
||||
choices = chunk.choices
|
||||
if len(choices) > 0:
|
||||
chunk_message = chunk.choices[0].delta
|
||||
collected_messages.append(chunk_message) # save the message
|
||||
if chunk_message.content:
|
||||
print(
|
||||
colored(chunk_message.content, "blue", "on_white"),
|
||||
end="",
|
||||
)
|
||||
if len(choices) > 0 and choices[0] is not None:
|
||||
chunk_message = choices[0].delta
|
||||
if chunk_message is not None:
|
||||
collected_messages.append(chunk_message) # save the message
|
||||
if chunk_message.content:
|
||||
print(
|
||||
colored(chunk_message.content, "blue", "on_white"),
|
||||
end="",
|
||||
)
|
||||
print()
|
||||
|
||||
full_reply_content = "".join(
|
||||
[m.content or "" for m in collected_messages if m is not None]
|
||||
)
|
||||
|
||||
# 检查最终结果是否为空
|
||||
if not full_reply_content or full_reply_content.strip() == "":
|
||||
raise Exception("Stream API (gpt-3.5) returned empty content")
|
||||
|
||||
return full_reply_content
|
||||
|
||||
|
||||
@@ -275,7 +314,16 @@ def _achat_completion_json(messages: list[dict] ) -> str:
|
||||
else:
|
||||
raise Exception("failed")
|
||||
|
||||
# 检查响应是否有效
|
||||
if not response.choices or len(response.choices) == 0:
|
||||
raise Exception("OpenAI API returned empty response")
|
||||
if not response.choices[0] or not response.choices[0].message:
|
||||
raise Exception("OpenAI API returned invalid response format")
|
||||
|
||||
full_reply_content = response.choices[0].message.content
|
||||
if full_reply_content is None:
|
||||
raise Exception("OpenAI API returned None content")
|
||||
|
||||
print(colored(full_reply_content, "blue", "on_white"), end="")
|
||||
print()
|
||||
return full_reply_content
|
||||
@@ -294,19 +342,25 @@ async def _achat_completion_stream(messages: list[dict]) -> str:
|
||||
async for chunk in response:
|
||||
collected_chunks.append(chunk) # save the event response
|
||||
choices = chunk.choices
|
||||
if len(choices) > 0:
|
||||
chunk_message = chunk.choices[0].delta
|
||||
collected_messages.append(chunk_message) # save the message
|
||||
if chunk_message.content:
|
||||
print(
|
||||
colored(chunk_message.content, "blue", "on_white"),
|
||||
end="",
|
||||
)
|
||||
if len(choices) > 0 and choices[0] is not None:
|
||||
chunk_message = choices[0].delta
|
||||
if chunk_message is not None:
|
||||
collected_messages.append(chunk_message) # save the message
|
||||
if chunk_message.content:
|
||||
print(
|
||||
colored(chunk_message.content, "blue", "on_white"),
|
||||
end="",
|
||||
)
|
||||
print()
|
||||
|
||||
full_reply_content = "".join(
|
||||
[m.content or "" for m in collected_messages if m is not None]
|
||||
)
|
||||
|
||||
# 检查最终结果是否为空
|
||||
if not full_reply_content or full_reply_content.strip() == "":
|
||||
raise Exception("Stream API returned empty content")
|
||||
|
||||
return full_reply_content
|
||||
except Exception as e:
|
||||
print_colored(f"OpenAI API error in _achat_completion_stream: {str(e)}", "red")
|
||||
@@ -316,7 +370,17 @@ async def _achat_completion_stream(messages: list[dict]) -> str:
|
||||
def _chat_completion(messages: list[dict]) -> str:
|
||||
try:
|
||||
rsp = client.chat.completions.create(**_cons_kwargs(messages))
|
||||
|
||||
# 检查响应是否有效
|
||||
if not rsp.choices or len(rsp.choices) == 0:
|
||||
raise Exception("OpenAI API returned empty response")
|
||||
if not rsp.choices[0] or not rsp.choices[0].message:
|
||||
raise Exception("OpenAI API returned invalid response format")
|
||||
|
||||
content = rsp.choices[0].message.content
|
||||
if content is None:
|
||||
raise Exception("OpenAI API returned None content")
|
||||
|
||||
return content
|
||||
except Exception as e:
|
||||
print_colored(f"OpenAI API error in _chat_completion: {str(e)}", "red")
|
||||
|
||||
@@ -1,3 +1,8 @@
|
||||
"""
|
||||
优化版执行计划 - 支持动态追加步骤
|
||||
在执行过程中可以接收新的步骤并追加到执行队列
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
@@ -6,22 +11,26 @@ import AgentCoord.RehearsalEngine_V2.Action as Action
|
||||
import AgentCoord.util as util
|
||||
from termcolor import colored
|
||||
from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager
|
||||
from AgentCoord.RehearsalEngine_V2.dynamic_execution_manager import dynamic_execution_manager
|
||||
|
||||
|
||||
# ==================== 配置参数 ====================
|
||||
# 最大并发请求数(避免触发 OpenAI API 速率限制)
|
||||
# 最大并发请求数
|
||||
MAX_CONCURRENT_REQUESTS = 2
|
||||
|
||||
# 批次之间的延迟(秒)
|
||||
# 批次之间的延迟
|
||||
BATCH_DELAY = 1.0
|
||||
|
||||
# 429 错误重试次数和延迟
|
||||
# 429错误重试次数和延迟
|
||||
MAX_RETRIES = 3
|
||||
RETRY_DELAY = 5.0 # 秒
|
||||
RETRY_DELAY = 5.0
|
||||
|
||||
|
||||
# ==================== 限流器 ====================
|
||||
class RateLimiter:
|
||||
"""异步限流器,控制并发请求数量"""
|
||||
"""
|
||||
异步限流器,控制并发请求数量
|
||||
"""
|
||||
|
||||
def __init__(self, max_concurrent: int = MAX_CONCURRENT_REQUESTS):
|
||||
self.semaphore = asyncio.Semaphore(max_concurrent)
|
||||
@@ -42,7 +51,12 @@ rate_limiter = RateLimiter()
|
||||
def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int]]:
|
||||
"""
|
||||
构建动作依赖图
|
||||
返回: {action_index: [dependent_action_indices]}
|
||||
|
||||
Args:
|
||||
TaskProcess: 任务流程列表
|
||||
|
||||
Returns:
|
||||
依赖映射字典 {action_index: [dependent_action_indices]}
|
||||
"""
|
||||
dependency_map = {i: [] for i in range(len(TaskProcess))}
|
||||
|
||||
@@ -51,7 +65,7 @@ def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int
|
||||
if not important_inputs:
|
||||
continue
|
||||
|
||||
# 检查是否依赖其他动作的 ActionResult
|
||||
# 检查是否依赖其他动作的ActionResult
|
||||
for j, prev_action in enumerate(TaskProcess):
|
||||
if i == j:
|
||||
continue
|
||||
@@ -70,7 +84,13 @@ def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int
|
||||
def get_parallel_batches(TaskProcess: List[Dict], dependency_map: Dict[int, List[int]]) -> List[List[int]]:
|
||||
"""
|
||||
将动作分为多个批次,每批内部可以并行执行
|
||||
返回: [[batch1_indices], [batch2_indices], ...]
|
||||
|
||||
Args:
|
||||
TaskProcess: 任务流程列表
|
||||
dependency_map: 依赖图
|
||||
|
||||
Returns:
|
||||
批次列表 [[batch1_indices], [batch2_indices], ...]
|
||||
"""
|
||||
batches = []
|
||||
completed: Set[int] = set()
|
||||
@@ -84,11 +104,11 @@ def get_parallel_batches(TaskProcess: List[Dict], dependency_map: Dict[int, List
|
||||
]
|
||||
|
||||
if not ready_to_run:
|
||||
# 避免死循环(循环依赖情况)
|
||||
# 避免死循环
|
||||
remaining = [i for i in range(len(TaskProcess)) if i not in completed]
|
||||
if remaining:
|
||||
print(colored(f"警告: 检测到循环依赖,强制串行执行: {remaining}", "yellow"))
|
||||
ready_to_run = remaining[:1] # 每次只执行一个
|
||||
ready_to_run = remaining[:1]
|
||||
else:
|
||||
break
|
||||
|
||||
@@ -111,6 +131,20 @@ async def execute_single_action_async(
|
||||
) -> Dict:
|
||||
"""
|
||||
异步执行单个动作
|
||||
|
||||
Args:
|
||||
ActionInfo: 动作信息
|
||||
General_Goal: 总体目标
|
||||
TaskDescription: 任务描述
|
||||
OutputName: 输出对象名称
|
||||
KeyObjects: 关键对象字典
|
||||
ActionHistory: 动作历史
|
||||
agentName: 智能体名称
|
||||
AgentProfile_Dict: 智能体配置字典
|
||||
InputName_List: 输入名称列表
|
||||
|
||||
Returns:
|
||||
动作执行结果
|
||||
"""
|
||||
actionType = ActionInfo["ActionType"]
|
||||
|
||||
@@ -128,7 +162,7 @@ async def execute_single_action_async(
|
||||
KeyObjects=KeyObjects,
|
||||
)
|
||||
|
||||
# 执行动作(在线程池中运行,避免阻塞事件循环)
|
||||
# 在线程池中运行,避免阻塞事件循环
|
||||
loop = asyncio.get_event_loop()
|
||||
ActionInfo_with_Result = await loop.run_in_executor(
|
||||
None,
|
||||
@@ -156,8 +190,18 @@ async def execute_step_async_streaming(
|
||||
total_steps: int
|
||||
) -> Generator[Dict, None, None]:
|
||||
"""
|
||||
异步执行单个步骤(支持流式返回)
|
||||
返回生成器,每完成一个动作就 yield 一次
|
||||
异步执行单个步骤,支持流式返回
|
||||
|
||||
Args:
|
||||
stepDescrip: 步骤描述
|
||||
General_Goal: 总体目标
|
||||
AgentProfile_Dict: 智能体配置字典
|
||||
KeyObjects: 关键对象字典
|
||||
step_index: 步骤索引
|
||||
total_steps: 总步骤数
|
||||
|
||||
Yields:
|
||||
执行事件字典
|
||||
"""
|
||||
# 准备步骤信息
|
||||
StepName = (
|
||||
@@ -213,7 +257,7 @@ async def execute_step_async_streaming(
|
||||
"content": None,
|
||||
}
|
||||
|
||||
# 先返回步骤开始信息
|
||||
# 返回步骤开始事件
|
||||
yield {
|
||||
"type": "step_start",
|
||||
"step_index": step_index,
|
||||
@@ -238,10 +282,8 @@ async def execute_step_async_streaming(
|
||||
# 分批执行动作
|
||||
for batch_index, batch_indices in enumerate(batches):
|
||||
# 在每个批次执行前检查暂停状态
|
||||
print(f"🔍 [DEBUG] 步骤 {StepName}: 批次 {batch_index+1}/{len(batches)} 执行前,检查暂停状态...")
|
||||
should_continue = await execution_state_manager.async_check_pause()
|
||||
if not should_continue:
|
||||
# 用户请求停止,中断执行
|
||||
util.print_colored("🛑 用户请求停止执行", "red")
|
||||
return
|
||||
|
||||
@@ -277,7 +319,7 @@ async def execute_step_async_streaming(
|
||||
# 等待当前批次完成
|
||||
batch_results = await asyncio.gather(*tasks)
|
||||
|
||||
# 逐个返回结果(流式)
|
||||
# 逐个返回结果
|
||||
for i, result in enumerate(batch_results):
|
||||
action_index_in_batch = batch_indices[i]
|
||||
completed_actions += 1
|
||||
@@ -289,7 +331,7 @@ async def execute_step_async_streaming(
|
||||
|
||||
ActionHistory.append(result)
|
||||
|
||||
# 立即返回该动作结果(流式)
|
||||
# 立即返回该动作结果
|
||||
yield {
|
||||
"type": "action_complete",
|
||||
"step_index": step_index,
|
||||
@@ -318,19 +360,27 @@ async def execute_step_async_streaming(
|
||||
}
|
||||
|
||||
|
||||
def executePlan_streaming(
|
||||
def executePlan_streaming_dynamic(
|
||||
plan: Dict,
|
||||
num_StepToRun: int,
|
||||
RehearsalLog: List,
|
||||
AgentProfile_Dict: Dict
|
||||
AgentProfile_Dict: Dict,
|
||||
existingKeyObjects: Dict = None,
|
||||
execution_id: str = None
|
||||
) -> Generator[str, None, None]:
|
||||
"""
|
||||
执行计划(流式返回版本,支持暂停/恢复)
|
||||
返回生成器,每次返回 SSE 格式的字符串
|
||||
动态执行计划,支持在执行过程中追加新步骤
|
||||
|
||||
使用方式:
|
||||
for event in executePlan_streaming(...):
|
||||
yield event
|
||||
Args:
|
||||
plan: 执行计划
|
||||
num_StepToRun: 要运行的步骤数
|
||||
RehearsalLog: 已执行的历史记录
|
||||
AgentProfile_Dict: 智能体配置
|
||||
existingKeyObjects: 已存在的KeyObjects
|
||||
execution_id: 执行ID(用于动态追加步骤)
|
||||
|
||||
Yields:
|
||||
SSE格式的事件字符串
|
||||
"""
|
||||
# 初始化执行状态
|
||||
general_goal = plan.get("General Goal", "")
|
||||
@@ -339,7 +389,7 @@ def executePlan_streaming(
|
||||
print(colored(f"⏸️ 执行状态管理器已启动,支持暂停/恢复", "green"))
|
||||
|
||||
# 准备执行
|
||||
KeyObjects = {}
|
||||
KeyObjects = existingKeyObjects.copy() if existingKeyObjects else {}
|
||||
finishedStep_index = -1
|
||||
|
||||
for logNode in RehearsalLog:
|
||||
@@ -348,6 +398,9 @@ def executePlan_streaming(
|
||||
if logNode["LogNodeType"] == "object":
|
||||
KeyObjects[logNode["NodeId"]] = logNode["content"]
|
||||
|
||||
if existingKeyObjects:
|
||||
print(colored(f"📦 使用已存在的 KeyObjects: {list(existingKeyObjects.keys())}", "cyan"))
|
||||
|
||||
# 确定要运行的步骤范围
|
||||
if num_StepToRun is None:
|
||||
run_to = len(plan["Collaboration Process"])
|
||||
@@ -355,78 +408,173 @@ def executePlan_streaming(
|
||||
run_to = (finishedStep_index + 1) + num_StepToRun
|
||||
|
||||
steps_to_run = plan["Collaboration Process"][(finishedStep_index + 1): run_to]
|
||||
total_steps = len(steps_to_run)
|
||||
|
||||
print(colored(f"🚀 开始执行计划(流式推送),共 {total_steps} 个步骤", "cyan", attrs=["bold"]))
|
||||
# 使用动态执行管理器
|
||||
if execution_id:
|
||||
# 初始化执行管理器,使用传入的execution_id
|
||||
actual_execution_id = dynamic_execution_manager.start_execution(general_goal, steps_to_run, execution_id)
|
||||
print(colored(f"🚀 开始执行计划(动态模式),共 {len(steps_to_run)} 个步骤,执行ID: {actual_execution_id}", "cyan"))
|
||||
else:
|
||||
print(colored(f"🚀 开始执行计划(流式推送),共 {len(steps_to_run)} 个步骤", "cyan"))
|
||||
|
||||
total_steps = len(steps_to_run)
|
||||
|
||||
# 使用队列实现流式推送
|
||||
async def produce_events(queue: asyncio.Queue):
|
||||
"""异步生产者:每收到一个事件就放入队列"""
|
||||
"""异步生产者"""
|
||||
try:
|
||||
for step_index, stepDescrip in enumerate(steps_to_run):
|
||||
# 在每个步骤执行前检查暂停状态
|
||||
should_continue = await execution_state_manager.async_check_pause()
|
||||
if not should_continue:
|
||||
# 用户请求停止,中断执行
|
||||
print(colored("🛑 用户请求停止执行", "red"))
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": "执行已被用户停止"
|
||||
})
|
||||
return
|
||||
step_index = 0
|
||||
|
||||
# 执行单个步骤(流式)
|
||||
async for event in execute_step_async_streaming(
|
||||
stepDescrip,
|
||||
plan["General Goal"],
|
||||
AgentProfile_Dict,
|
||||
KeyObjects,
|
||||
step_index,
|
||||
total_steps
|
||||
):
|
||||
# 检查是否需要停止
|
||||
if execution_state_manager.is_stopped():
|
||||
if execution_id:
|
||||
# 动态模式:循环获取下一个步骤
|
||||
# 等待新步骤的最大次数(避免无限等待)
|
||||
max_empty_wait_cycles = 60 # 最多等待60次,每次等待1秒
|
||||
empty_wait_count = 0
|
||||
|
||||
while True:
|
||||
# 检查暂停状态
|
||||
should_continue = await execution_state_manager.async_check_pause()
|
||||
if not should_continue:
|
||||
print(colored("🛑 用户请求停止执行", "red"))
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": "执行已被用户停止"
|
||||
})
|
||||
break
|
||||
|
||||
# 获取下一个步骤
|
||||
stepDescrip = dynamic_execution_manager.get_next_step(execution_id)
|
||||
|
||||
if stepDescrip is None:
|
||||
# 没有更多步骤了,检查是否应该继续等待
|
||||
empty_wait_count += 1
|
||||
|
||||
# 获取执行信息
|
||||
execution_info = dynamic_execution_manager.get_execution_info(execution_id)
|
||||
|
||||
if execution_info:
|
||||
queue_total_steps = execution_info.get("total_steps", 0)
|
||||
completed_steps = execution_info.get("completed_steps", 0)
|
||||
|
||||
# 如果没有步骤在队列中(queue_total_steps为0),立即退出
|
||||
if queue_total_steps == 0:
|
||||
print(colored(f"⚠️ 没有步骤在队列中,退出执行", "yellow"))
|
||||
break
|
||||
|
||||
# 如果所有步骤都已完成,等待可能的新步骤
|
||||
if completed_steps >= queue_total_steps:
|
||||
if empty_wait_count >= max_empty_wait_cycles:
|
||||
# 等待超时,退出执行
|
||||
print(colored(f"✅ 所有步骤执行完成,等待超时", "green"))
|
||||
break
|
||||
else:
|
||||
# 等待新步骤追加
|
||||
print(colored(f"⏳ 等待新步骤追加... ({empty_wait_count}/{max_empty_wait_cycles})", "cyan"))
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
else:
|
||||
# 还有步骤未完成,继续尝试获取
|
||||
print(colored(f"⏳ 等待步骤就绪... ({completed_steps}/{queue_total_steps})", "cyan"))
|
||||
await asyncio.sleep(0.5)
|
||||
empty_wait_count = 0 # 重置等待计数
|
||||
continue
|
||||
else:
|
||||
# 执行信息不存在,退出
|
||||
print(colored(f"⚠️ 执行信息不存在,退出执行", "yellow"))
|
||||
break
|
||||
|
||||
# 重置等待计数
|
||||
empty_wait_count = 0
|
||||
|
||||
# 获取最新的总步骤数(用于显示)
|
||||
execution_info = dynamic_execution_manager.get_execution_info(execution_id)
|
||||
current_total_steps = execution_info.get("total_steps", total_steps) if execution_info else total_steps
|
||||
|
||||
# 执行步骤
|
||||
async for event in execute_step_async_streaming(
|
||||
stepDescrip,
|
||||
plan["General Goal"],
|
||||
AgentProfile_Dict,
|
||||
KeyObjects,
|
||||
step_index,
|
||||
current_total_steps # 使用动态更新的总步骤数
|
||||
):
|
||||
if execution_state_manager.is_stopped():
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": "执行已被用户停止"
|
||||
})
|
||||
return
|
||||
|
||||
await queue.put(event)
|
||||
|
||||
# 标记步骤完成
|
||||
dynamic_execution_manager.mark_step_completed(execution_id)
|
||||
|
||||
# 更新KeyObjects
|
||||
OutputName = stepDescrip.get("OutputObject", "")
|
||||
if OutputName and OutputName in KeyObjects:
|
||||
# 对象日志节点会在step_complete中发送
|
||||
pass
|
||||
|
||||
step_index += 1
|
||||
|
||||
else:
|
||||
# 非动态模式:按顺序执行所有步骤
|
||||
for step_index, stepDescrip in enumerate(steps_to_run):
|
||||
should_continue = await execution_state_manager.async_check_pause()
|
||||
if not should_continue:
|
||||
print(colored("🛑 用户请求停止执行", "red"))
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": "执行已被用户停止"
|
||||
})
|
||||
return
|
||||
|
||||
await queue.put(event) # ← 立即放入队列
|
||||
async for event in execute_step_async_streaming(
|
||||
stepDescrip,
|
||||
plan["General Goal"],
|
||||
AgentProfile_Dict,
|
||||
KeyObjects,
|
||||
step_index,
|
||||
total_steps
|
||||
):
|
||||
if execution_state_manager.is_stopped():
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": "执行已被用户停止"
|
||||
})
|
||||
return
|
||||
|
||||
await queue.put(event)
|
||||
|
||||
except Exception as e:
|
||||
# 发送错误信息
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": f"执行出错: {str(e)}"
|
||||
})
|
||||
finally:
|
||||
await queue.put(None) # ← 发送完成信号
|
||||
await queue.put(None)
|
||||
|
||||
# 运行异步任务并实时 yield
|
||||
# 运行异步任务并实时yield
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
try:
|
||||
# 创建队列
|
||||
queue = asyncio.Queue(maxsize=10)
|
||||
|
||||
# 启动异步生产者任务
|
||||
producer_task = loop.create_task(produce_events(queue))
|
||||
|
||||
# 同步消费者:从队列读取并 yield(使用 run_until_complete)
|
||||
while True:
|
||||
event = loop.run_until_complete(queue.get()) # ← 从队列获取事件
|
||||
if event is None: # ← 收到完成信号
|
||||
event = loop.run_until_complete(queue.get())
|
||||
if event is None:
|
||||
break
|
||||
|
||||
# 立即转换为 SSE 格式并发送
|
||||
# 立即转换为SSE格式并发送
|
||||
event_str = json.dumps(event, ensure_ascii=False)
|
||||
yield f"data: {event_str}\n\n" # ← 立即发送给前端
|
||||
yield f"data: {event_str}\n\n"
|
||||
|
||||
# 等待生产者任务完成
|
||||
loop.run_until_complete(producer_task)
|
||||
|
||||
# 如果不是被停止的,发送完成信号
|
||||
if not execution_state_manager.is_stopped():
|
||||
complete_event = json.dumps({
|
||||
"type": "execution_complete",
|
||||
@@ -435,10 +583,26 @@ def executePlan_streaming(
|
||||
yield f"data: {complete_event}\n\n"
|
||||
|
||||
finally:
|
||||
# 清理任务
|
||||
# 在关闭事件循环之前先清理执行记录
|
||||
if execution_id:
|
||||
# 清理执行记录
|
||||
dynamic_execution_manager.cleanup(execution_id)
|
||||
|
||||
if 'producer_task' in locals():
|
||||
if not producer_task.done():
|
||||
producer_task.cancel()
|
||||
|
||||
# 确保所有任务都完成后再关闭事件循环
|
||||
try:
|
||||
pending = asyncio.all_tasks(loop)
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||
except Exception:
|
||||
pass # 忽略清理过程中的错误
|
||||
|
||||
loop.close()
|
||||
|
||||
|
||||
|
||||
# 保留旧版本函数以保持兼容性
|
||||
executePlan_streaming = executePlan_streaming_dynamic
|
||||
|
||||
@@ -0,0 +1,241 @@
|
||||
"""
|
||||
动态执行管理器
|
||||
用于在任务执行过程中动态追加新步骤
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import Dict, List, Optional, Any
|
||||
from threading import Lock
|
||||
|
||||
|
||||
class DynamicExecutionManager:
|
||||
"""
|
||||
动态执行管理器
|
||||
管理正在执行的任务,支持动态追加新步骤
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# 执行状态: goal -> execution_info
|
||||
self._executions: Dict[str, Dict] = {}
|
||||
|
||||
# 线程锁
|
||||
self._lock = Lock()
|
||||
|
||||
# 步骤队列: goal -> List[step]
|
||||
self._step_queues: Dict[str, List] = {}
|
||||
|
||||
# 已执行的步骤索引: goal -> Set[step_index]
|
||||
self._executed_steps: Dict[str, set] = {}
|
||||
|
||||
# 待执行的步骤索引: goal -> List[step_index]
|
||||
self._pending_steps: Dict[str, List[int]] = {}
|
||||
|
||||
def start_execution(self, goal: str, initial_steps: List[Dict], execution_id: str = None) -> str:
|
||||
"""
|
||||
开始执行一个新的任务
|
||||
|
||||
Args:
|
||||
goal: 任务目标
|
||||
initial_steps: 初始步骤列表
|
||||
execution_id: 执行ID,如果不提供则自动生成
|
||||
|
||||
Returns:
|
||||
执行ID
|
||||
"""
|
||||
with self._lock:
|
||||
# 如果未提供execution_id,则生成一个
|
||||
if execution_id is None:
|
||||
execution_id = f"{goal}_{asyncio.get_event_loop().time()}"
|
||||
|
||||
self._executions[execution_id] = {
|
||||
"goal": goal,
|
||||
"status": "running",
|
||||
"total_steps": len(initial_steps),
|
||||
"completed_steps": 0
|
||||
}
|
||||
|
||||
# 初始化步骤队列
|
||||
self._step_queues[execution_id] = initial_steps.copy()
|
||||
|
||||
# 初始化已执行步骤集合
|
||||
self._executed_steps[execution_id] = set()
|
||||
|
||||
# 初始化待执行步骤索引
|
||||
self._pending_steps[execution_id] = list(range(len(initial_steps)))
|
||||
|
||||
print(f"🚀 启动执行: {execution_id}")
|
||||
print(f"📊 初始步骤数: {len(initial_steps)}")
|
||||
print(f"📋 待执行步骤索引: {self._pending_steps[execution_id]}")
|
||||
|
||||
return execution_id
|
||||
|
||||
def add_steps(self, execution_id: str, new_steps: List[Dict]) -> int:
|
||||
"""
|
||||
向执行中追加新步骤
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
new_steps: 新步骤列表
|
||||
|
||||
Returns:
|
||||
追加的步骤数量
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id not in self._step_queues:
|
||||
print(f"⚠️ 警告: 执行ID {execution_id} 不存在,无法追加步骤")
|
||||
return 0
|
||||
|
||||
current_count = len(self._step_queues[execution_id])
|
||||
|
||||
# 追加新步骤到队列
|
||||
self._step_queues[execution_id].extend(new_steps)
|
||||
|
||||
# 添加新步骤的索引到待执行列表
|
||||
new_indices = list(range(current_count, current_count + len(new_steps)))
|
||||
self._pending_steps[execution_id].extend(new_indices)
|
||||
|
||||
# 更新总步骤数
|
||||
old_total = self._executions[execution_id]["total_steps"]
|
||||
self._executions[execution_id]["total_steps"] = len(self._step_queues[execution_id])
|
||||
new_total = self._executions[execution_id]["total_steps"]
|
||||
|
||||
print(f"➕ 追加了 {len(new_steps)} 个步骤到 {execution_id}")
|
||||
print(f"📊 步骤总数: {old_total} -> {new_total}")
|
||||
print(f"📋 待执行步骤索引: {self._pending_steps[execution_id]}")
|
||||
|
||||
return len(new_steps)
|
||||
|
||||
def get_next_step(self, execution_id: str) -> Optional[Dict]:
|
||||
"""
|
||||
获取下一个待执行的步骤
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
|
||||
Returns:
|
||||
下一个步骤,如果没有则返回None
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id not in self._pending_steps:
|
||||
print(f"⚠️ 警告: 执行ID {execution_id} 不存在")
|
||||
return None
|
||||
|
||||
# 获取第一个待执行步骤的索引
|
||||
if not self._pending_steps[execution_id]:
|
||||
return None
|
||||
|
||||
step_index = self._pending_steps[execution_id].pop(0)
|
||||
|
||||
# 从队列中获取步骤
|
||||
if step_index >= len(self._step_queues[execution_id]):
|
||||
print(f"⚠️ 警告: 步骤索引 {step_index} 超出范围")
|
||||
return None
|
||||
|
||||
step = self._step_queues[execution_id][step_index]
|
||||
|
||||
# 标记为已执行
|
||||
self._executed_steps[execution_id].add(step_index)
|
||||
|
||||
step_name = step.get("StepName", "未知")
|
||||
print(f"🎯 获取下一个步骤: {step_name} (索引: {step_index})")
|
||||
print(f"📋 剩余待执行步骤: {len(self._pending_steps[execution_id])}")
|
||||
|
||||
return step
|
||||
|
||||
def mark_step_completed(self, execution_id: str):
|
||||
"""
|
||||
标记一个步骤完成
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id in self._executions:
|
||||
self._executions[execution_id]["completed_steps"] += 1
|
||||
completed = self._executions[execution_id]["completed_steps"]
|
||||
total = self._executions[execution_id]["total_steps"]
|
||||
print(f"📊 步骤完成进度: {completed}/{total}")
|
||||
else:
|
||||
print(f"⚠️ 警告: 执行ID {execution_id} 不存在")
|
||||
|
||||
def get_execution_info(self, execution_id: str) -> Optional[Dict]:
|
||||
"""
|
||||
获取执行信息
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
|
||||
Returns:
|
||||
执行信息字典
|
||||
"""
|
||||
with self._lock:
|
||||
return self._executions.get(execution_id)
|
||||
|
||||
def get_pending_count(self, execution_id: str) -> int:
|
||||
"""
|
||||
获取待执行步骤数量
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
|
||||
Returns:
|
||||
待执行步骤数量
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id not in self._pending_steps:
|
||||
return 0
|
||||
return len(self._pending_steps[execution_id])
|
||||
|
||||
def has_more_steps(self, execution_id: str) -> bool:
|
||||
"""
|
||||
检查是否还有更多步骤待执行
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
|
||||
Returns:
|
||||
是否还有待执行步骤
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id not in self._pending_steps:
|
||||
return False
|
||||
return len(self._pending_steps[execution_id]) > 0
|
||||
|
||||
def finish_execution(self, execution_id: str):
|
||||
"""
|
||||
完成执行
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id in self._executions:
|
||||
self._executions[execution_id]["status"] = "completed"
|
||||
|
||||
def cancel_execution(self, execution_id: str):
|
||||
"""
|
||||
取消执行
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id in self._executions:
|
||||
self._executions[execution_id]["status"] = "cancelled"
|
||||
|
||||
def cleanup(self, execution_id: str):
|
||||
"""
|
||||
清理执行记录
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
"""
|
||||
with self._lock:
|
||||
self._executions.pop(execution_id, None)
|
||||
self._step_queues.pop(execution_id, None)
|
||||
self._executed_steps.pop(execution_id, None)
|
||||
self._pending_steps.pop(execution_id, None)
|
||||
|
||||
|
||||
# 全局单例
|
||||
dynamic_execution_manager = DynamicExecutionManager()
|
||||
@@ -270,6 +270,12 @@ def Handle_executePlanOptimized():
|
||||
- 无依赖关系的动作并行执行
|
||||
- 有依赖关系的动作串行执行
|
||||
|
||||
支持参数:
|
||||
plan: 执行计划
|
||||
num_StepToRun: 要运行的步骤数
|
||||
RehearsalLog: 已执行的历史记录
|
||||
existingKeyObjects: 已存在的KeyObjects(用于重新执行时传递中间结果)
|
||||
|
||||
前端使用 EventSource 接收
|
||||
"""
|
||||
incoming_data = request.get_json()
|
||||
@@ -281,6 +287,7 @@ def Handle_executePlanOptimized():
|
||||
num_StepToRun=incoming_data.get("num_StepToRun"),
|
||||
RehearsalLog=incoming_data.get("RehearsalLog", []),
|
||||
AgentProfile_Dict=AgentProfile_Dict,
|
||||
existingKeyObjects=incoming_data.get("existingKeyObjects"),
|
||||
):
|
||||
yield chunk
|
||||
except Exception as e:
|
||||
@@ -405,7 +412,7 @@ def handle_ping():
|
||||
def handle_execute_plan_optimized_ws(data):
|
||||
"""
|
||||
WebSocket版本:优化版流式执行计划
|
||||
支持步骤级流式 + 动作级智能并行
|
||||
支持步骤级流式 + 动作级智能并行 + 动态追加步骤
|
||||
|
||||
请求格式:
|
||||
{
|
||||
@@ -414,7 +421,8 @@ def handle_execute_plan_optimized_ws(data):
|
||||
"data": {
|
||||
"plan": {...},
|
||||
"num_StepToRun": null,
|
||||
"RehearsalLog": []
|
||||
"RehearsalLog": [],
|
||||
"enable_dynamic": true # 是否启用动态追加步骤
|
||||
}
|
||||
}
|
||||
"""
|
||||
@@ -425,27 +433,66 @@ def handle_execute_plan_optimized_ws(data):
|
||||
plan = incoming_data.get("plan")
|
||||
num_StepToRun = incoming_data.get("num_StepToRun")
|
||||
RehearsalLog = incoming_data.get("RehearsalLog", [])
|
||||
enable_dynamic = incoming_data.get("enable_dynamic", False)
|
||||
|
||||
# 使用原有的流式执行函数
|
||||
for chunk in executePlan_streaming(
|
||||
plan=plan,
|
||||
num_StepToRun=num_StepToRun,
|
||||
RehearsalLog=RehearsalLog,
|
||||
AgentProfile_Dict=AgentProfile_Dict,
|
||||
):
|
||||
# 通过WebSocket推送进度
|
||||
# 如果前端传入了execution_id,使用前端的;否则生成新的
|
||||
execution_id = incoming_data.get("execution_id")
|
||||
if not execution_id:
|
||||
import time
|
||||
execution_id = f"{plan.get('General Goal', '').replace(' ', '_')}_{int(time.time() * 1000)}"
|
||||
|
||||
if enable_dynamic:
|
||||
# 动态模式:使用executePlan_streaming_dynamic
|
||||
from AgentCoord.RehearsalEngine_V2.ExecutePlan_Optimized import executePlan_streaming_dynamic
|
||||
|
||||
# 发送执行ID(确认使用的ID)
|
||||
emit('progress', {
|
||||
'id': request_id,
|
||||
'status': 'streaming',
|
||||
'data': chunk.replace('data: ', '').replace('\n\n', '')
|
||||
'status': 'execution_started',
|
||||
'execution_id': execution_id,
|
||||
'message': '执行已启动,支持动态追加步骤'
|
||||
})
|
||||
|
||||
# 发送完成信号
|
||||
emit('progress', {
|
||||
'id': request_id,
|
||||
'status': 'complete',
|
||||
'data': None
|
||||
})
|
||||
for chunk in executePlan_streaming_dynamic(
|
||||
plan=plan,
|
||||
num_StepToRun=num_StepToRun,
|
||||
RehearsalLog=RehearsalLog,
|
||||
AgentProfile_Dict=AgentProfile_Dict,
|
||||
execution_id=execution_id
|
||||
):
|
||||
emit('progress', {
|
||||
'id': request_id,
|
||||
'status': 'streaming',
|
||||
'data': chunk.replace('data: ', '').replace('\n\n', '')
|
||||
})
|
||||
|
||||
# 发送完成信号
|
||||
emit('progress', {
|
||||
'id': request_id,
|
||||
'status': 'complete',
|
||||
'data': None
|
||||
})
|
||||
|
||||
else:
|
||||
# 非动态模式:使用原有方式
|
||||
for chunk in executePlan_streaming(
|
||||
plan=plan,
|
||||
num_StepToRun=num_StepToRun,
|
||||
RehearsalLog=RehearsalLog,
|
||||
AgentProfile_Dict=AgentProfile_Dict,
|
||||
):
|
||||
emit('progress', {
|
||||
'id': request_id,
|
||||
'status': 'streaming',
|
||||
'data': chunk.replace('data: ', '').replace('\n\n', '')
|
||||
})
|
||||
|
||||
# 发送完成信号
|
||||
emit('progress', {
|
||||
'id': request_id,
|
||||
'status': 'complete',
|
||||
'data': None
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
# 发送错误信息
|
||||
@@ -456,6 +503,68 @@ def handle_execute_plan_optimized_ws(data):
|
||||
})
|
||||
|
||||
|
||||
@socketio.on('add_steps_to_execution')
|
||||
def handle_add_steps_to_execution(data):
|
||||
"""
|
||||
WebSocket版本:向正在执行的任务追加新步骤
|
||||
|
||||
请求格式:
|
||||
{
|
||||
"id": "request-id",
|
||||
"action": "add_steps_to_execution",
|
||||
"data": {
|
||||
"execution_id": "execution_id",
|
||||
"new_steps": [...]
|
||||
}
|
||||
}
|
||||
"""
|
||||
request_id = data.get('id')
|
||||
incoming_data = data.get('data', {})
|
||||
|
||||
try:
|
||||
from AgentCoord.RehearsalEngine_V2.dynamic_execution_manager import dynamic_execution_manager
|
||||
|
||||
execution_id = incoming_data.get('execution_id')
|
||||
new_steps = incoming_data.get('new_steps', [])
|
||||
|
||||
if not execution_id:
|
||||
emit('response', {
|
||||
'id': request_id,
|
||||
'status': 'error',
|
||||
'error': '缺少execution_id参数'
|
||||
})
|
||||
return
|
||||
|
||||
# 追加新步骤到执行队列
|
||||
added_count = dynamic_execution_manager.add_steps(execution_id, new_steps)
|
||||
|
||||
if added_count > 0:
|
||||
print(f"✅ 成功追加 {added_count} 个步骤到执行队列: {execution_id}")
|
||||
emit('response', {
|
||||
'id': request_id,
|
||||
'status': 'success',
|
||||
'data': {
|
||||
'message': f'成功追加 {added_count} 个步骤',
|
||||
'added_count': added_count
|
||||
}
|
||||
})
|
||||
else:
|
||||
print(f"⚠️ 无法追加步骤,执行ID不存在或已结束: {execution_id}")
|
||||
emit('response', {
|
||||
'id': request_id,
|
||||
'status': 'error',
|
||||
'error': '执行ID不存在或已结束'
|
||||
})
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 追加步骤失败: {str(e)}")
|
||||
emit('response', {
|
||||
'id': request_id,
|
||||
'status': 'error',
|
||||
'error': str(e)
|
||||
})
|
||||
|
||||
|
||||
@socketio.on('generate_base_plan')
|
||||
def handle_generate_base_plan_ws(data):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user