feat:RESTful API架构改WebSocket架构-执行结果可以分步显示版本

This commit is contained in:
liailing1026
2026-01-22 17:22:30 +08:00
parent 1c8036adf1
commit 786c674d21
15 changed files with 2591 additions and 308 deletions

View File

@@ -5,6 +5,7 @@ from typing import List, Dict, Set, Generator, Any
import AgentCoord.RehearsalEngine_V2.Action as Action
import AgentCoord.util as util
from termcolor import colored
from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager
# ==================== 配置参数 ====================
@@ -236,6 +237,14 @@ async def execute_step_async_streaming(
# 分批执行动作
for batch_index, batch_indices in enumerate(batches):
# 在每个批次执行前检查暂停状态
print(f"🔍 [DEBUG] 步骤 {StepName}: 批次 {batch_index+1}/{len(batches)} 执行前,检查暂停状态...")
should_continue = await execution_state_manager.async_check_pause()
if not should_continue:
# 用户请求停止,中断执行
util.print_colored("🛑 用户请求停止执行", "red")
return
batch_size = len(batch_indices)
if batch_size > 1:
@@ -316,13 +325,19 @@ def executePlan_streaming(
AgentProfile_Dict: Dict
) -> Generator[str, None, None]:
"""
执行计划(流式返回版本)
执行计划(流式返回版本,支持暂停/恢复
返回生成器,每次返回 SSE 格式的字符串
使用方式:
for event in executePlan_streaming(...):
yield event
"""
# 初始化执行状态
general_goal = plan.get("General Goal", "")
execution_state_manager.start_execution(general_goal)
print(colored(f"⏸️ 执行状态管理器已启动,支持暂停/恢复", "green"))
# 准备执行
KeyObjects = {}
finishedStep_index = -1
@@ -349,6 +364,17 @@ def executePlan_streaming(
"""异步生产者:每收到一个事件就放入队列"""
try:
for step_index, stepDescrip in enumerate(steps_to_run):
# 在每个步骤执行前检查暂停状态
should_continue = await execution_state_manager.async_check_pause()
if not should_continue:
# 用户请求停止,中断执行
print(colored("🛑 用户请求停止执行", "red"))
await queue.put({
"type": "error",
"message": "执行已被用户停止"
})
return
# 执行单个步骤(流式)
async for event in execute_step_async_streaming(
stepDescrip,
@@ -358,7 +384,21 @@ def executePlan_streaming(
step_index,
total_steps
):
# 检查是否需要停止
if execution_state_manager.is_stopped():
await queue.put({
"type": "error",
"message": "执行已被用户停止"
})
return
await queue.put(event) # ← 立即放入队列
except Exception as e:
# 发送错误信息
await queue.put({
"type": "error",
"message": f"执行出错: {str(e)}"
})
finally:
await queue.put(None) # ← 发送完成信号
@@ -386,12 +426,13 @@ def executePlan_streaming(
# 等待生产者任务完成
loop.run_until_complete(producer_task)
# 发送完成信号
complete_event = json.dumps({
"type": "execution_complete",
"total_steps": total_steps
}, ensure_ascii=False)
yield f"data: {complete_event}\n\n"
# 如果不是被停止的,发送完成信号
if not execution_state_manager.is_stopped():
complete_event = json.dumps({
"type": "execution_complete",
"total_steps": total_steps
}, ensure_ascii=False)
yield f"data: {complete_event}\n\n"
finally:
# 清理任务
@@ -399,3 +440,5 @@ def executePlan_streaming(
if not producer_task.done():
producer_task.cancel()
loop.close()

View File

@@ -0,0 +1,190 @@
"""
全局执行状态管理器
用于支持任务的暂停、恢复和停止功能
使用轮询检查机制,确保线程安全
"""
import threading
import asyncio
import time
from typing import Optional
from enum import Enum
class ExecutionStatus(Enum):
"""执行状态枚举"""
RUNNING = "running" # 正在运行
PAUSED = "paused" # 已暂停
STOPPED = "stopped" # 已停止
IDLE = "idle" # 空闲
class ExecutionStateManager:
"""
全局执行状态管理器(单例模式)
功能:
- 管理任务执行状态(运行/暂停/停止)
- 使用轮询检查机制,避免异步事件的线程问题
- 提供线程安全的状态查询和修改接口
"""
_instance: Optional['ExecutionStateManager'] = None
_lock = threading.Lock()
def __new__(cls):
"""单例模式"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
"""初始化状态管理器"""
if self._initialized:
return
self._initialized = True
self._status = ExecutionStatus.IDLE
self._current_goal: Optional[str] = None # 当前执行的任务目标
# 使用简单的布尔标志,而不是 asyncio.Event
self._should_pause = False
self._should_stop = False
def get_status(self) -> ExecutionStatus:
"""获取当前执行状态"""
with self._lock:
return self._status
def set_goal(self, goal: str):
"""设置当前执行的任务目标"""
with self._lock:
self._current_goal = goal
def get_goal(self) -> Optional[str]:
"""获取当前执行的任务目标"""
with self._lock:
return self._current_goal
def start_execution(self, goal: str):
"""开始执行"""
with self._lock:
self._status = ExecutionStatus.RUNNING
self._current_goal = goal
self._should_pause = False
self._should_stop = False
print(f"🚀 [DEBUG] start_execution: 状态设置为 RUNNING, goal={goal}")
def pause_execution(self) -> bool:
"""
暂停执行
Returns:
bool: 是否成功暂停
"""
with self._lock:
if self._status != ExecutionStatus.RUNNING:
print(f"⚠️ [DEBUG] pause_execution: 当前状态不是RUNNING而是 {self._status}")
return False
self._status = ExecutionStatus.PAUSED
self._should_pause = True
print(f"⏸️ [DEBUG] pause_execution: 状态设置为PAUSED, should_pause=True")
return True
def resume_execution(self) -> bool:
"""
恢复执行
Returns:
bool: 是否成功恢复
"""
with self._lock:
if self._status != ExecutionStatus.PAUSED:
print(f"⚠️ [DEBUG] resume_execution: 当前状态不是PAUSED而是 {self._status}")
return False
self._status = ExecutionStatus.RUNNING
self._should_pause = False
print(f"▶️ [DEBUG] resume_execution: 状态设置为RUNNING, should_pause=False")
return True
def stop_execution(self) -> bool:
"""
停止执行
Returns:
bool: 是否成功停止
"""
with self._lock:
if self._status in [ExecutionStatus.IDLE, ExecutionStatus.STOPPED]:
return False
self._status = ExecutionStatus.STOPPED
self._should_stop = True
self._should_pause = False
print(f"🛑 [DEBUG] stop_execution: 状态设置为STOPPED")
return True
def reset(self):
"""重置状态为空闲"""
with self._lock:
self._status = ExecutionStatus.IDLE
self._current_goal = None
self._should_pause = False
self._should_stop = False
print(f"🔄 [DEBUG] reset: 状态重置为IDLE")
async def async_check_pause(self):
"""
异步检查是否需要暂停(轮询方式)
如果处于暂停状态,会阻塞当前协程直到恢复或停止
应该在执行循环的关键点调用此方法
Returns:
bool: 如果返回True表示应该继续执行False表示应该停止
"""
# 使用轮询检查,避免异步事件问题
while True:
# 检查停止标志
if self._should_stop:
print("🛑 [DEBUG] async_check_pause: 检测到停止信号")
return False
# 检查暂停状态
if self._should_pause:
# 处于暂停状态,等待恢复
print("⏸️ [DEBUG] async_check_pause: 检测到暂停,等待恢复...")
await asyncio.sleep(0.1) # 短暂睡眠避免占用CPU
# 如果恢复,继续执行
if not self._should_pause:
print("▶️ [DEBUG] async_check_pause: 从暂停中恢复!")
continue
# 如果停止了,返回
if self._should_stop:
return False
# 继续等待
continue
# 既没有停止也没有暂停,可以继续执行
return True
def is_paused(self) -> bool:
"""检查是否处于暂停状态"""
with self._lock:
return self._status == ExecutionStatus.PAUSED
def is_running(self) -> bool:
"""检查是否正在运行"""
with self._lock:
return self._status == ExecutionStatus.RUNNING
def is_stopped(self) -> bool:
"""检查是否已停止"""
with self._lock:
return self._status == ExecutionStatus.STOPPED
# 全局单例实例
execution_state_manager = ExecutionStateManager()

File diff suppressed because it is too large Load Diff