Compare commits
48 Commits
631aa6dcdc
...
web
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1749ae4f1e | ||
|
|
418b2e5f8f | ||
|
|
641d70033d | ||
|
|
b287867069 | ||
|
|
5699635d1a | ||
|
|
ac035d1237 | ||
|
|
53add0431e | ||
|
|
786c674d21 | ||
|
|
1c8036adf1 | ||
|
|
45314b7be6 | ||
|
|
c5848410c1 | ||
|
|
571b5101ff | ||
|
|
029df6b5a5 | ||
|
|
edb39d4c1f | ||
|
|
0e87777ae8 | ||
|
|
244deceb91 | ||
|
|
69587c0481 | ||
|
|
e0cc11647f | ||
|
|
59fd94e783 | ||
|
|
3ff70463ca | ||
|
|
82e92f12aa | ||
|
|
920588b063 | ||
|
|
5847365eee | ||
|
|
d42554ce03 | ||
|
|
bcc0c53ba1 | ||
|
|
7da5e82d40 | ||
|
|
cc22655a1e | ||
|
|
f0db3c88e4 | ||
|
|
b987fe70ad | ||
|
|
b42ab5aedd | ||
|
|
5ef86c6fa9 | ||
|
|
907310365a | ||
|
|
5dace5f788 | ||
|
|
77530c49f8 | ||
|
|
6392301833 | ||
|
|
ab8c9e294d | ||
|
|
1aa9e280b0 | ||
|
|
4fa5504697 | ||
|
|
041986f5cd | ||
|
|
00ef22505e | ||
|
|
b73419b7a0 | ||
|
|
974af053ca | ||
|
|
0c571dec21 | ||
|
|
e40cdd1dee | ||
|
|
4604d70848 | ||
|
|
408cd2a384 | ||
|
|
a465fc8fbe | ||
|
|
c779d7756b |
@@ -2,7 +2,8 @@
|
||||
<p align="center">
|
||||
<a ><img src="static/icon.png" alt=" AgentCoord: Visually Exploring Coordination Strategy for LLM-based Multi-Agent Collaboration" width="200px"></a>
|
||||
</p>
|
||||
AgentCoord is an experimental open-source system to help general users design coordination strategies for multiple LLM-based agents (Research paper forthcoming).
|
||||
AgentCoord is an experimental open-source system to help general users design coordination strategies for multiple LLM-based agents.
|
||||
[Research paper](https://arxiv.org/abs/2404.11943)
|
||||
|
||||
## System Usage
|
||||
<a href="https://youtu.be/s56rHJx-eqY" target="_blank"><img src="static/videoPreview.png"
|
||||
|
||||
1
backend/.gitignore
vendored
1
backend/.gitignore
vendored
@@ -124,6 +124,7 @@ venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
.idea/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
|
||||
@@ -1,24 +1,35 @@
|
||||
import asyncio
|
||||
import openai
|
||||
|
||||
import httpx
|
||||
from openai import OpenAI, AsyncOpenAI, max_retries
|
||||
import yaml
|
||||
from termcolor import colored
|
||||
import os
|
||||
|
||||
# Helper function to avoid circular import
|
||||
def print_colored(text, text_color="green", background="on_white"):
|
||||
print(colored(text, text_color, background))
|
||||
|
||||
# load config (apikey, apibase, model)
|
||||
yaml_file = os.path.join(os.getcwd(), "config", "config.yaml")
|
||||
try:
|
||||
with open(yaml_file, "r", encoding="utf-8") as file:
|
||||
yaml_data = yaml.safe_load(file)
|
||||
except Exception:
|
||||
yaml_file = {}
|
||||
yaml_data = {}
|
||||
OPENAI_API_BASE = os.getenv("OPENAI_API_BASE") or yaml_data.get(
|
||||
"OPENAI_API_BASE", "https://api.openai.com"
|
||||
)
|
||||
openai.api_base = OPENAI_API_BASE
|
||||
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or yaml_data.get(
|
||||
"OPENAI_API_KEY", ""
|
||||
)
|
||||
openai.api_key = OPENAI_API_KEY
|
||||
OPENAI_API_MODEL = os.getenv("OPENAI_API_MODEL") or yaml_data.get(
|
||||
"OPENAI_API_MODEL", ""
|
||||
)
|
||||
|
||||
# Initialize OpenAI clients
|
||||
client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_API_BASE)
|
||||
async_client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_API_BASE)
|
||||
MODEL: str = os.getenv("OPENAI_API_MODEL") or yaml_data.get(
|
||||
"OPENAI_API_MODEL", "gpt-4-turbo-preview"
|
||||
)
|
||||
@@ -35,8 +46,11 @@ MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY") or yaml_data.get(
|
||||
|
||||
# for LLM completion
|
||||
def LLM_Completion(
|
||||
messages: list[dict], stream: bool = True, useGroq: bool = True
|
||||
messages: list[dict], stream: bool = True, useGroq: bool = True,model_config: dict = None
|
||||
) -> str:
|
||||
if model_config:
|
||||
print_colored(f"Using model config: {model_config}", "blue")
|
||||
return _call_with_custom_config(messages,stream,model_config)
|
||||
if not useGroq or not FAST_DESIGN_MODE:
|
||||
force_gpt4 = True
|
||||
useGroq = False
|
||||
@@ -69,16 +83,107 @@ def LLM_Completion(
|
||||
return _chat_completion(messages=messages)
|
||||
|
||||
|
||||
def _call_with_custom_config(messages: list[dict], stream: bool, model_config: dict) ->str:
|
||||
"使用自定义配置调用API"
|
||||
api_url = model_config.get("apiUrl", OPENAI_API_BASE)
|
||||
api_key = model_config.get("apiKey", OPENAI_API_KEY)
|
||||
api_model = model_config.get("apiModel", OPENAI_API_MODEL)
|
||||
|
||||
temp_client = OpenAI(api_key=api_key, base_url=api_url)
|
||||
temp_async_client = AsyncOpenAI(api_key=api_key, base_url=api_url)
|
||||
|
||||
try:
|
||||
if stream:
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
except RuntimeError as ex:
|
||||
if "There is no current event loop in thread" in str(ex):
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
return loop.run_until_complete(
|
||||
_achat_completion_stream_custom(messages=messages, temp_async_client=temp_async_client, api_model=api_model)
|
||||
)
|
||||
else:
|
||||
response = temp_client.chat.completions.create(
|
||||
messages=messages,
|
||||
model=api_model,
|
||||
temperature=0.3,
|
||||
max_tokens=4096,
|
||||
timeout=180
|
||||
|
||||
)
|
||||
# 检查响应是否有效
|
||||
if not response.choices or len(response.choices) == 0:
|
||||
raise Exception(f"API returned empty response for model {api_model}")
|
||||
if not response.choices[0] or not response.choices[0].message:
|
||||
raise Exception(f"API returned invalid response format for model {api_model}")
|
||||
|
||||
full_reply_content = response.choices[0].message.content
|
||||
if full_reply_content is None:
|
||||
raise Exception(f"API returned None content for model {api_model}")
|
||||
|
||||
print(colored(full_reply_content, "blue", "on_white"), end="")
|
||||
return full_reply_content
|
||||
except Exception as e:
|
||||
print_colored(f"Custom API error for model {api_model} :{str(e)}","red")
|
||||
raise
|
||||
|
||||
|
||||
async def _achat_completion_stream_custom(messages:list[dict], temp_async_client, api_model: str ) -> str:
|
||||
max_retries=3
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
response = await temp_async_client.chat.completions.create(
|
||||
messages=messages,
|
||||
model=api_model,
|
||||
temperature=0.3,
|
||||
max_tokens=4096,
|
||||
stream=True,
|
||||
timeout=180
|
||||
)
|
||||
|
||||
collected_chunks = []
|
||||
collected_messages = []
|
||||
async for chunk in response:
|
||||
collected_chunks.append(chunk)
|
||||
choices = chunk.choices
|
||||
if len(choices) > 0 and choices[0] is not None:
|
||||
chunk_message = choices[0].delta
|
||||
if chunk_message is not None:
|
||||
collected_messages.append(chunk_message)
|
||||
if chunk_message.content:
|
||||
print(colored(chunk_message.content, "blue", "on_white"), end="")
|
||||
print()
|
||||
full_reply_content = "".join(
|
||||
[m.content or "" for m in collected_messages if m is not None]
|
||||
)
|
||||
|
||||
# 检查最终结果是否为空
|
||||
if not full_reply_content or full_reply_content.strip() == "":
|
||||
raise Exception(f"Stream API returned empty content for model {api_model}")
|
||||
|
||||
return full_reply_content
|
||||
except httpx.RemoteProtocolError as e:
|
||||
if attempt < max_retries - 1:
|
||||
wait_time = (attempt + 1) *2
|
||||
print_colored(f"⚠️ Stream connection interrupted (attempt {attempt+1}/{max_retries}). Retrying in {wait_time}s...", text_color="yellow")
|
||||
await asyncio.sleep(wait_time)
|
||||
continue
|
||||
except Exception as e:
|
||||
print_colored(f"Custom API stream error for model {api_model} :{str(e)}","red")
|
||||
raise
|
||||
|
||||
|
||||
async def _achat_completion_stream_groq(messages: list[dict]) -> str:
|
||||
from groq import AsyncGroq
|
||||
client = AsyncGroq(api_key=GROQ_API_KEY)
|
||||
groq_client = AsyncGroq(api_key=GROQ_API_KEY)
|
||||
|
||||
max_attempts = 5
|
||||
|
||||
for attempt in range(max_attempts):
|
||||
print("Attempt to use Groq (Fase Design Mode):")
|
||||
try:
|
||||
stream = await client.chat.completions.create(
|
||||
response = await groq_client.chat.completions.create(
|
||||
messages=messages,
|
||||
# model='gemma-7b-it',
|
||||
model="mixtral-8x7b-32768",
|
||||
@@ -92,9 +197,18 @@ async def _achat_completion_stream_groq(messages: list[dict]) -> str:
|
||||
if attempt < max_attempts - 1: # i is zero indexed
|
||||
continue
|
||||
else:
|
||||
raise "failed"
|
||||
raise Exception("failed")
|
||||
|
||||
# 检查响应是否有效
|
||||
if not response.choices or len(response.choices) == 0:
|
||||
raise Exception("Groq API returned empty response")
|
||||
if not response.choices[0] or not response.choices[0].message:
|
||||
raise Exception("Groq API returned invalid response format")
|
||||
|
||||
full_reply_content = response.choices[0].message.content
|
||||
if full_reply_content is None:
|
||||
raise Exception("Groq API returned None content")
|
||||
|
||||
full_reply_content = stream.choices[0].message.content
|
||||
print(colored(full_reply_content, "blue", "on_white"), end="")
|
||||
print()
|
||||
return full_reply_content
|
||||
@@ -103,14 +217,14 @@ async def _achat_completion_stream_groq(messages: list[dict]) -> str:
|
||||
async def _achat_completion_stream_mixtral(messages: list[dict]) -> str:
|
||||
from mistralai.client import MistralClient
|
||||
from mistralai.models.chat_completion import ChatMessage
|
||||
client = MistralClient(api_key=MISTRAL_API_KEY)
|
||||
mistral_client = MistralClient(api_key=MISTRAL_API_KEY)
|
||||
# client=AsyncGroq(api_key=GROQ_API_KEY)
|
||||
max_attempts = 5
|
||||
|
||||
for attempt in range(max_attempts):
|
||||
try:
|
||||
messages[len(messages) - 1]["role"] = "user"
|
||||
stream = client.chat(
|
||||
stream = mistral_client.chat(
|
||||
messages=[
|
||||
ChatMessage(
|
||||
role=message["role"], content=message["content"]
|
||||
@@ -119,31 +233,35 @@ async def _achat_completion_stream_mixtral(messages: list[dict]) -> str:
|
||||
],
|
||||
# model = "mistral-small-latest",
|
||||
model="open-mixtral-8x7b",
|
||||
# response_format={"type": "json_object"},
|
||||
)
|
||||
break # If the operation is successful, break the loop
|
||||
except Exception:
|
||||
if attempt < max_attempts - 1: # i is zero indexed
|
||||
continue
|
||||
else:
|
||||
raise "failed"
|
||||
raise Exception("failed")
|
||||
|
||||
# 检查响应是否有效
|
||||
if not stream.choices or len(stream.choices) == 0:
|
||||
raise Exception("Mistral API returned empty response")
|
||||
if not stream.choices[0] or not stream.choices[0].message:
|
||||
raise Exception("Mistral API returned invalid response format")
|
||||
|
||||
full_reply_content = stream.choices[0].message.content
|
||||
if full_reply_content is None:
|
||||
raise Exception("Mistral API returned None content")
|
||||
|
||||
print(colored(full_reply_content, "blue", "on_white"), end="")
|
||||
print()
|
||||
return full_reply_content
|
||||
|
||||
|
||||
async def _achat_completion_stream_gpt35(messages: list[dict]) -> str:
|
||||
openai.api_key = OPENAI_API_KEY
|
||||
openai.api_base = OPENAI_API_BASE
|
||||
response = await openai.ChatCompletion.acreate(
|
||||
response = await async_client.chat.completions.create(
|
||||
messages=messages,
|
||||
max_tokens=4096,
|
||||
n=1,
|
||||
stop=None,
|
||||
temperature=0.3,
|
||||
timeout=3,
|
||||
timeout=600,
|
||||
model="gpt-3.5-turbo-16k",
|
||||
stream=True,
|
||||
)
|
||||
@@ -154,40 +272,38 @@ async def _achat_completion_stream_gpt35(messages: list[dict]) -> str:
|
||||
# iterate through the stream of events
|
||||
async for chunk in response:
|
||||
collected_chunks.append(chunk) # save the event response
|
||||
choices = chunk["choices"]
|
||||
if len(choices) > 0:
|
||||
chunk_message = chunk["choices"][0].get(
|
||||
"delta", {}
|
||||
) # extract the message
|
||||
choices = chunk.choices
|
||||
if len(choices) > 0 and choices[0] is not None:
|
||||
chunk_message = choices[0].delta
|
||||
if chunk_message is not None:
|
||||
collected_messages.append(chunk_message) # save the message
|
||||
if "content" in chunk_message:
|
||||
if chunk_message.content:
|
||||
print(
|
||||
colored(chunk_message["content"], "blue", "on_white"),
|
||||
colored(chunk_message.content, "blue", "on_white"),
|
||||
end="",
|
||||
)
|
||||
print()
|
||||
|
||||
full_reply_content = "".join(
|
||||
[m.get("content", "") for m in collected_messages]
|
||||
[m.content or "" for m in collected_messages if m is not None]
|
||||
)
|
||||
|
||||
# 检查最终结果是否为空
|
||||
if not full_reply_content or full_reply_content.strip() == "":
|
||||
raise Exception("Stream API (gpt-3.5) returned empty content")
|
||||
|
||||
return full_reply_content
|
||||
|
||||
|
||||
async def _achat_completion_json(messages: list[dict]) -> str:
|
||||
openai.api_key = OPENAI_API_KEY
|
||||
openai.api_base = OPENAI_API_BASE
|
||||
|
||||
def _achat_completion_json(messages: list[dict] ) -> str:
|
||||
max_attempts = 5
|
||||
|
||||
for attempt in range(max_attempts):
|
||||
try:
|
||||
stream = await openai.ChatCompletion.acreate(
|
||||
response = async_client.chat.completions.create(
|
||||
messages=messages,
|
||||
max_tokens=4096,
|
||||
n=1,
|
||||
stop=None,
|
||||
temperature=0.3,
|
||||
timeout=3,
|
||||
timeout=600,
|
||||
model=MODEL,
|
||||
response_format={"type": "json_object"},
|
||||
)
|
||||
@@ -196,18 +312,26 @@ async def _achat_completion_json(messages: list[dict]) -> str:
|
||||
if attempt < max_attempts - 1: # i is zero indexed
|
||||
continue
|
||||
else:
|
||||
raise "failed"
|
||||
raise Exception("failed")
|
||||
|
||||
# 检查响应是否有效
|
||||
if not response.choices or len(response.choices) == 0:
|
||||
raise Exception("OpenAI API returned empty response")
|
||||
if not response.choices[0] or not response.choices[0].message:
|
||||
raise Exception("OpenAI API returned invalid response format")
|
||||
|
||||
full_reply_content = response.choices[0].message.content
|
||||
if full_reply_content is None:
|
||||
raise Exception("OpenAI API returned None content")
|
||||
|
||||
full_reply_content = stream.choices[0].message.content
|
||||
print(colored(full_reply_content, "blue", "on_white"), end="")
|
||||
print()
|
||||
return full_reply_content
|
||||
|
||||
|
||||
async def _achat_completion_stream(messages: list[dict]) -> str:
|
||||
openai.api_key = OPENAI_API_KEY
|
||||
openai.api_base = OPENAI_API_BASE
|
||||
response = await openai.ChatCompletion.acreate(
|
||||
try:
|
||||
response = await async_client.chat.completions.create(
|
||||
**_cons_kwargs(messages), stream=True
|
||||
)
|
||||
|
||||
@@ -217,39 +341,58 @@ async def _achat_completion_stream(messages: list[dict]) -> str:
|
||||
# iterate through the stream of events
|
||||
async for chunk in response:
|
||||
collected_chunks.append(chunk) # save the event response
|
||||
choices = chunk["choices"]
|
||||
if len(choices) > 0:
|
||||
chunk_message = chunk["choices"][0].get(
|
||||
"delta", {}
|
||||
) # extract the message
|
||||
choices = chunk.choices
|
||||
if len(choices) > 0 and choices[0] is not None:
|
||||
chunk_message = choices[0].delta
|
||||
if chunk_message is not None:
|
||||
collected_messages.append(chunk_message) # save the message
|
||||
if "content" in chunk_message:
|
||||
if chunk_message.content:
|
||||
print(
|
||||
colored(chunk_message["content"], "blue", "on_white"),
|
||||
colored(chunk_message.content, "blue", "on_white"),
|
||||
end="",
|
||||
)
|
||||
print()
|
||||
|
||||
full_reply_content = "".join(
|
||||
[m.get("content", "") for m in collected_messages]
|
||||
[m.content or "" for m in collected_messages if m is not None]
|
||||
)
|
||||
|
||||
# 检查最终结果是否为空
|
||||
if not full_reply_content or full_reply_content.strip() == "":
|
||||
raise Exception("Stream API returned empty content")
|
||||
|
||||
return full_reply_content
|
||||
except Exception as e:
|
||||
print_colored(f"OpenAI API error in _achat_completion_stream: {str(e)}", "red")
|
||||
raise
|
||||
|
||||
|
||||
def _chat_completion(messages: list[dict]) -> str:
|
||||
rsp = openai.ChatCompletion.create(**_cons_kwargs(messages))
|
||||
content = rsp["choices"][0]["message"]["content"]
|
||||
try:
|
||||
rsp = client.chat.completions.create(**_cons_kwargs(messages))
|
||||
|
||||
# 检查响应是否有效
|
||||
if not rsp.choices or len(rsp.choices) == 0:
|
||||
raise Exception("OpenAI API returned empty response")
|
||||
if not rsp.choices[0] or not rsp.choices[0].message:
|
||||
raise Exception("OpenAI API returned invalid response format")
|
||||
|
||||
content = rsp.choices[0].message.content
|
||||
if content is None:
|
||||
raise Exception("OpenAI API returned None content")
|
||||
|
||||
return content
|
||||
except Exception as e:
|
||||
print_colored(f"OpenAI API error in _chat_completion: {str(e)}", "red")
|
||||
raise
|
||||
|
||||
|
||||
def _cons_kwargs(messages: list[dict]) -> dict:
|
||||
kwargs = {
|
||||
"messages": messages,
|
||||
"max_tokens": 4096,
|
||||
"n": 1,
|
||||
"stop": None,
|
||||
"temperature": 0.5,
|
||||
"timeout": 3,
|
||||
"max_tokens": 2000,
|
||||
"temperature": 0.3,
|
||||
"timeout": 600,
|
||||
}
|
||||
kwargs_mode = {"model": MODEL}
|
||||
kwargs.update(kwargs_mode)
|
||||
|
||||
@@ -7,6 +7,8 @@ PROMPT_ABILITY_REQUIREMENT_GENERATION = """
|
||||
## Instruction
|
||||
Based on "General Goal" and "Current Task", output a formatted "Ability Requirement" which lists at least 3 different ability requirement that is required by the "Current Task". The ability should be summarized concisely within a few words.
|
||||
|
||||
**IMPORTANT LANGUAGE REQUIREMENT: You must respond in Chinese (中文) for all ability requirements.**
|
||||
|
||||
## General Goal (The general goal for the collaboration plan, "Current Task" is just one of its substep)
|
||||
{General_Goal}
|
||||
|
||||
@@ -49,6 +51,8 @@ PROMPT_AGENT_ABILITY_SCORING = """
|
||||
## Instruction
|
||||
Based on "Agent Board" and "Ability Requirement", output a score for each agent to estimate the possibility that the agent can fulfil the "Ability Requirement". The score should be 1-5. Provide a concise reason before you assign the score.
|
||||
|
||||
**IMPORTANT LANGUAGE REQUIREMENT: You must respond in Chinese (中文) for all reasons and explanations.**
|
||||
|
||||
## AgentBoard
|
||||
{Agent_Board}
|
||||
|
||||
@@ -133,5 +137,6 @@ def AgentSelectModify_init(stepTask, General_Goal, Agent_Board):
|
||||
|
||||
|
||||
def AgentSelectModify_addAspect(aspectList, Agent_Board):
|
||||
scoreTable = agentAbilityScoring(Agent_Board, aspectList)
|
||||
newAspect = aspectList[-1]
|
||||
scoreTable = agentAbilityScoring(Agent_Board, [newAspect])
|
||||
return scoreTable
|
||||
|
||||
@@ -35,6 +35,8 @@ PROMPT_AGENT_SELECTION_GENERATION = """
|
||||
## Instruction
|
||||
Based on "General Goal", "Current Task" and "Agent Board", output a formatted "Agent Selection Plan". Your selection should consider the ability needed for "Current Task" and the profile of each agent in "Agent Board".
|
||||
|
||||
**IMPORTANT LANGUAGE REQUIREMENT: You must respond in Chinese (中文) for all explanations and reasoning, though agent names should remain in their original form.**
|
||||
|
||||
## General Goal (Specify the general goal for the collaboration plan)
|
||||
{General_Goal}
|
||||
|
||||
@@ -80,6 +82,10 @@ def generate_AbilityRequirement(General_Goal, Current_Task):
|
||||
|
||||
|
||||
def generate_AgentSelection(General_Goal, Current_Task, Agent_Board):
|
||||
# Check if Agent_Board is None or empty
|
||||
if Agent_Board is None or len(Agent_Board) == 0:
|
||||
raise ValueError("Agent_Board cannot be None or empty. Please ensure agents are set via /setAgents endpoint before generating a plan.")
|
||||
|
||||
messages = [
|
||||
{
|
||||
"role": "system",
|
||||
|
||||
@@ -1,55 +1,53 @@
|
||||
from AgentCoord.PlanEngine.planOutline_Generator import generate_PlanOutline
|
||||
from AgentCoord.PlanEngine.AgentSelection_Generator import (
|
||||
generate_AgentSelection,
|
||||
)
|
||||
from AgentCoord.PlanEngine.taskProcess_Generator import generate_TaskProcess
|
||||
import AgentCoord.util as util
|
||||
# from AgentCoord.PlanEngine.AgentSelection_Generator import (
|
||||
# generate_AgentSelection,
|
||||
# )
|
||||
|
||||
|
||||
def generate_basePlan(
|
||||
General_Goal, Agent_Board, AgentProfile_Dict, InitialObject_List
|
||||
):
|
||||
basePlan = {
|
||||
"Initial Input Object": InitialObject_List,
|
||||
"Collaboration Process": [],
|
||||
}
|
||||
"""
|
||||
优化模式:生成大纲 + 智能体选择,但不生成任务流程
|
||||
优化用户体验:
|
||||
1. 快速生成大纲和分配智能体
|
||||
2. 用户可以看到完整的大纲和智能体图标
|
||||
3. TaskProcess由前端通过 fillStepTask API 异步填充
|
||||
|
||||
"""
|
||||
# 参数保留以保持接口兼容性
|
||||
_ = AgentProfile_Dict
|
||||
PlanOutline = generate_PlanOutline(
|
||||
InitialObject_List=[], General_Goal=General_Goal
|
||||
InitialObject_List=InitialObject_List, General_Goal=General_Goal
|
||||
)
|
||||
|
||||
basePlan = {
|
||||
"General Goal": General_Goal,
|
||||
"Initial Input Object": InitialObject_List,
|
||||
"Collaboration Process": []
|
||||
}
|
||||
|
||||
for stepItem in PlanOutline:
|
||||
Current_Task = {
|
||||
"TaskName": stepItem["StepName"],
|
||||
"InputObject_List": stepItem["InputObject_List"],
|
||||
"OutputObject": stepItem["OutputObject"],
|
||||
"TaskContent": stepItem["TaskContent"],
|
||||
# # 为每个步骤分配智能体
|
||||
# Current_Task = {
|
||||
# "TaskName": stepItem["StepName"],
|
||||
# "InputObject_List": stepItem["InputObject_List"],
|
||||
# "OutputObject": stepItem["OutputObject"],
|
||||
# "TaskContent": stepItem["TaskContent"],
|
||||
# }
|
||||
# AgentSelection = generate_AgentSelection(
|
||||
# General_Goal=General_Goal,
|
||||
# Current_Task=Current_Task,
|
||||
# Agent_Board=Agent_Board,
|
||||
# )
|
||||
|
||||
# 添加智能体选择,但不添加任务流程
|
||||
stepItem["AgentSelection"] = []
|
||||
stepItem["TaskProcess"] = [] # 空数组,由前端异步填充
|
||||
stepItem["Collaboration_Brief_frontEnd"] = {
|
||||
"template": "",
|
||||
"data": {}
|
||||
}
|
||||
AgentSelection = generate_AgentSelection(
|
||||
General_Goal=General_Goal,
|
||||
Current_Task=Current_Task,
|
||||
Agent_Board=Agent_Board,
|
||||
)
|
||||
Current_Task_Description = {
|
||||
"TaskName": stepItem["StepName"],
|
||||
"AgentInvolved": [
|
||||
{"Name": name, "Profile": AgentProfile_Dict[name]}
|
||||
for name in AgentSelection
|
||||
],
|
||||
"InputObject_List": stepItem["InputObject_List"],
|
||||
"OutputObject": stepItem["OutputObject"],
|
||||
"CurrentTaskDescription": util.generate_template_sentence_for_CollaborationBrief(
|
||||
stepItem["InputObject_List"],
|
||||
stepItem["OutputObject"],
|
||||
AgentSelection,
|
||||
stepItem["TaskContent"],
|
||||
),
|
||||
}
|
||||
TaskProcess = generate_TaskProcess(
|
||||
General_Goal=General_Goal,
|
||||
Current_Task_Description=Current_Task_Description,
|
||||
)
|
||||
# add the generated AgentSelection and TaskProcess to the stepItem
|
||||
stepItem["AgentSelection"] = AgentSelection
|
||||
stepItem["TaskProcess"] = TaskProcess
|
||||
basePlan["Collaboration Process"].append(stepItem)
|
||||
basePlan["General Goal"] = General_Goal
|
||||
|
||||
return basePlan
|
||||
@@ -9,6 +9,8 @@ PROMPT_PLAN_OUTLINE_BRANCHING = """
|
||||
Based on "Existing Steps", your task is to comeplete the "Remaining Steps" for the plan for "General Goal".
|
||||
Note: "Modification Requirement" specifies how to modify the "Baseline Completion" for a better/alternative solution.
|
||||
|
||||
**IMPORTANT LANGUAGE REQUIREMENT: You must respond in Chinese (中文) for all content, including StepName, TaskContent, and OutputObject fields.**
|
||||
|
||||
## General Goal (Specify the general goal for the plan)
|
||||
{General_Goal}
|
||||
|
||||
|
||||
@@ -29,6 +29,8 @@ PROMPT_TASK_PROCESS_BRANCHING = """
|
||||
Based on "Existing Steps", your task is to comeplete the "Remaining Steps" for the "Task for Current Step".
|
||||
Note: "Modification Requirement" specifies how to modify the "Baseline Completion" for a better/alternative solution.
|
||||
|
||||
**IMPORTANT LANGUAGE REQUIREMENT: You must respond in Chinese (中文) for the Description field and all explanations, while keeping ID, ActionType, and AgentName in their original format.**
|
||||
|
||||
## General Goal (The general goal for the collaboration plan, you just design the plan for one of its step (i.e. "Task for Current Step"))
|
||||
{General_Goal}
|
||||
|
||||
@@ -55,27 +57,40 @@ Note: "Modification Requirement" specifies how to modify the "Baseline Completio
|
||||
"ID": "Action4",
|
||||
"ActionType": "Propose",
|
||||
"AgentName": "Mia",
|
||||
"Description": "Propose psychological theories on love and attachment that could be applied to AI's emotional development.",
|
||||
"Description": "提议关于人工智能情感发展的心理学理论,重点关注爱与依恋的概念。",
|
||||
"ImportantInput": [
|
||||
"InputObject:Story Outline"
|
||||
]
|
||||
}},
|
||||
{{
|
||||
"ID": "Action5",
|
||||
"ActionType": "Propose",
|
||||
"ActionType": "Critique",
|
||||
"AgentName": "Noah",
|
||||
"Description": "Propose ethical considerations and philosophical questions regarding AI's capacity for love.",
|
||||
"ImportantInput": []
|
||||
"Description": "对Mia提出的心理学理论进行批判性评估,分析其在AI情感发展场景中的适用性和局限性。",
|
||||
"ImportantInput": [
|
||||
"ActionResult:Action4"
|
||||
]
|
||||
}},
|
||||
{{
|
||||
"ID": "Action6",
|
||||
"ActionType": "Finalize",
|
||||
"ActionType": "Improve",
|
||||
"AgentName": "Liam",
|
||||
"Description": "Combine the poetic elements and ethical considerations into a cohesive set of core love elements for the story.",
|
||||
"Description": "基于Noah的批判性反馈,改进和完善心理学理论框架,使其更贴合AI情感发展的实际需求。",
|
||||
"ImportantInput": [
|
||||
"ActionResult:Action1",
|
||||
"ActionResult:Action4",
|
||||
"ActionResult:Action5"
|
||||
]
|
||||
}},
|
||||
{{
|
||||
"ID": "Action7",
|
||||
"ActionType": "Finalize",
|
||||
"AgentName": "Mia",
|
||||
"Description": "综合所有提议、批判和改进意见,整合并提交最终的AI情感发展心理学理论框架。",
|
||||
"ImportantInput": [
|
||||
"ActionResult:Action4",
|
||||
"ActionResult:Action5",
|
||||
"ActionResult:Action6"
|
||||
]
|
||||
}}
|
||||
]
|
||||
}}
|
||||
@@ -84,7 +99,12 @@ Note: "Modification Requirement" specifies how to modify the "Baseline Completio
|
||||
ImportantInput: Specify if there is any previous result that should be taken special consideration during the execution the action. Should be of format "InputObject:xx" or "ActionResult:xx".
|
||||
InputObject_List: List existing objects that should be utilized in current step.
|
||||
AgentName: Specify the agent who will perform the action, You CAN ONLY USE THE NAME APPEARS IN "AgentInvolved".
|
||||
ActionType: Specify the type of action, note that only the last action can be of type "Finalize", and the last action must be "Finalize".
|
||||
ActionType: Specify the type of action. **CRITICAL REQUIREMENTS:**
|
||||
1. The "Remaining Steps" MUST include ALL FOUR action types in the following order: Propose -> Critique -> Improve -> Finalize
|
||||
2. Each action type (Propose, Critique, Improve, Finalize) MUST appear at least once
|
||||
3. The actions must follow the sequence: Propose actions first, then Critique actions, then Improve actions, and Finalize must be the last action
|
||||
4. Even if only one agent is involved in a phase, that phase must still have its corresponding action type
|
||||
5. The last action must ALWAYS be of type "Finalize"
|
||||
|
||||
"""
|
||||
|
||||
|
||||
@@ -7,6 +7,8 @@ PROMPT_PLAN_OUTLINE_GENERATION = """
|
||||
## Instruction
|
||||
Based on "Output Format Example", "General Goal", and "Initial Key Object List", output a formatted "Plan_Outline".
|
||||
|
||||
**IMPORTANT LANGUAGE REQUIREMENT: You must respond in Chinese (中文) for all content, including StepName, TaskContent, and OutputObject fields.**
|
||||
|
||||
## Initial Key Object List (Specify the list of initial key objects available, each initial key object should be the input object of at least one Step)
|
||||
{InitialObject_List}
|
||||
|
||||
@@ -83,4 +85,16 @@ def generate_PlanOutline(InitialObject_List, General_Goal):
|
||||
),
|
||||
},
|
||||
]
|
||||
return read_LLM_Completion(messages)["Plan_Outline"]
|
||||
result = read_LLM_Completion(messages)
|
||||
if isinstance(result, dict) and "Plan_Outline" in result:
|
||||
return result["Plan_Outline"]
|
||||
else:
|
||||
# 如果格式不正确,返回默认的计划大纲
|
||||
return [
|
||||
{
|
||||
"StepName": "Default Step",
|
||||
"TaskContent": "Generated default plan step due to format error",
|
||||
"InputObject_List": [],
|
||||
"OutputObject": "Default Output"
|
||||
}
|
||||
]
|
||||
|
||||
@@ -25,6 +25,8 @@ PROMPT_TASK_PROCESS_GENERATION = """
|
||||
## Instruction
|
||||
Based on "General Goal", "Task for Current Step", "Action Set" and "Output Format Example", design a plan for "Task for Current Step", output a formatted "Task_Process_Plan".
|
||||
|
||||
**IMPORTANT LANGUAGE REQUIREMENT: You must respond in Chinese (中文) for the Description field and all explanations, while keeping ID, ActionType, and AgentName in their original format.**
|
||||
|
||||
## General Goal (The general goal for the collaboration plan, you just design the plan for one of its step (i.e. "Task for Current Step"))
|
||||
{General_Goal}
|
||||
|
||||
|
||||
@@ -9,6 +9,8 @@ You are within a multi-agent collaboration for the "Current Task".
|
||||
Now it's your turn to take action. Read the "Context Information" and take your action following "Instruction for Your Current Action".
|
||||
Note: Important Input for your action are marked with *Important Input*
|
||||
|
||||
**IMPORTANT LANGUAGE REQUIREMENT: You must respond in Chinese (中文) for all your answers and outputs.**
|
||||
|
||||
## Context Information
|
||||
|
||||
### General Goal (The "Current Task" is indeed a substep of the general goal)
|
||||
@@ -80,10 +82,34 @@ class BaseAction():
|
||||
Important_Mark = ""
|
||||
action_Record += PROMPT_TEMPLATE_ACTION_RECORD.format(AgentName = actionInfo["AgentName"], Action_Description = actionInfo["AgentName"], Action_Result = actionInfo["Action_Result"], Important_Mark = Important_Mark)
|
||||
|
||||
prompt = PROMPT_TEMPLATE_TAKE_ACTION_BASE.format(agentName = agentName, agentProfile = AgentProfile_Dict[agentName], General_Goal = General_Goal, Current_Task_Description = TaskDescription, Input_Objects = inputObject_Record, History_Action = action_Record, Action_Description = self.info["Description"], Action_Custom_Note = self.Action_Custom_Note)
|
||||
# Handle missing agent profiles gracefully
|
||||
model_config = None
|
||||
if agentName not in AgentProfile_Dict:
|
||||
print_colored(text=f"Warning: Agent '{agentName}' not found in AgentProfile_Dict. Using default profile.", text_color="yellow")
|
||||
agentProfile = f"AI Agent named {agentName}"
|
||||
else:
|
||||
# agentProfile = AgentProfile_Dict[agentName]
|
||||
agent_config = AgentProfile_Dict[agentName]
|
||||
agentProfile = agent_config.get("profile",f"AI Agent named {agentName}")
|
||||
if agent_config.get("useCustomAPI",False):
|
||||
model_config = {
|
||||
"apiModel":agent_config.get("apiModel"),
|
||||
"apiUrl":agent_config.get("apiUrl"),
|
||||
"apiKey":agent_config.get("apiKey"),
|
||||
}
|
||||
prompt = PROMPT_TEMPLATE_TAKE_ACTION_BASE.format(
|
||||
agentName = agentName,
|
||||
agentProfile = agentProfile,
|
||||
General_Goal = General_Goal,
|
||||
Current_Task_Description = TaskDescription,
|
||||
Input_Objects = inputObject_Record,
|
||||
History_Action = action_Record,
|
||||
Action_Description = self.info["Description"],
|
||||
Action_Custom_Note = self.Action_Custom_Note
|
||||
)
|
||||
print_colored(text = prompt, text_color="red")
|
||||
messages = [{"role":"system", "content": prompt}]
|
||||
ActionResult = LLM_Completion(messages,True,False)
|
||||
ActionResult = LLM_Completion(messages,True,False,model_config=model_config)
|
||||
ActionInfo_with_Result = copy.deepcopy(self.info)
|
||||
ActionInfo_with_Result["Action_Result"] = ActionResult
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from AgentCoord.RehearsalEngine_V2.Action import BaseAction
|
||||
|
||||
ACTION_CUSTOM_NOTE = '''
|
||||
Note: Since you are in a conversation, your critique must be concise, clear and easy to read, don't overwhelm others. If you want to list some points, list at most 2 points.
|
||||
注意:由于你在对话中,你的批评必须简洁、清晰且易于阅读,不要让人感到压力过大。如果你要列出一些观点,最多列出2点。
|
||||
|
||||
'''
|
||||
|
||||
|
||||
@@ -2,9 +2,9 @@ from AgentCoord.util.converter import read_outputObject_content
|
||||
from AgentCoord.RehearsalEngine_V2.Action import BaseAction
|
||||
|
||||
ACTION_CUSTOM_NOTE = '''
|
||||
Note: You can say something before you give the final content of {OutputName}. When you decide to give the final content of {OutputName}, it should be enclosed like this:
|
||||
注意:你可以在给出{OutputName}的最终内容之前先说一些话。当你决定给出{OutputName}的最终内容时,应该这样包含:
|
||||
```{OutputName}
|
||||
(the content of {OutputName})
|
||||
({OutputName}的内容)
|
||||
```
|
||||
'''
|
||||
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
from AgentCoord.RehearsalEngine_V2.Action import BaseAction
|
||||
|
||||
ACTION_CUSTOM_NOTE = '''
|
||||
Note: You can say something before you provide the improved version of the content.
|
||||
The improved version you provide must be a completed version (e.g. if you provide a improved story, you should give completed story content, rather than just reporting where you have improved).
|
||||
When you decide to give the improved version of the content, it should be start like this:
|
||||
注意:你可以在提供改进版本的内容之前先说一些话。
|
||||
你提供的改进版本必须是完整的版本(例如,如果你提供改进的故事,你应该给出完整的故事内容,而不仅仅是报告你在哪里改进了)。
|
||||
当你决定提供内容的改进版本时,应该这样开始:
|
||||
|
||||
## Improved version of xxx
|
||||
(the improved version of the content)
|
||||
## xxx的改进版本
|
||||
(改进版本的内容)
|
||||
```
|
||||
|
||||
'''
|
||||
|
||||
@@ -85,9 +85,17 @@ def executePlan(plan, num_StepToRun, RehearsalLog, AgentProfile_Dict):
|
||||
# start the group chat
|
||||
util.print_colored(TaskDescription, text_color="green")
|
||||
ActionHistory = []
|
||||
action_count = 0
|
||||
total_actions = len(TaskProcess)
|
||||
|
||||
for ActionInfo in TaskProcess:
|
||||
action_count += 1
|
||||
actionType = ActionInfo["ActionType"]
|
||||
agentName = ActionInfo["AgentName"]
|
||||
|
||||
# 添加进度日志
|
||||
util.print_colored(f"🔄 Executing action {action_count}/{total_actions}: {actionType} by {agentName}", text_color="yellow")
|
||||
|
||||
if actionType in Action.customAction_Dict:
|
||||
currentAction = Action.customAction_Dict[actionType](
|
||||
info=ActionInfo,
|
||||
|
||||
608
backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py
Normal file
608
backend/AgentCoord/RehearsalEngine_V2/ExecutePlan_Optimized.py
Normal file
@@ -0,0 +1,608 @@
|
||||
"""
|
||||
优化版执行计划 - 支持动态追加步骤
|
||||
在执行过程中可以接收新的步骤并追加到执行队列
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import time
|
||||
from typing import List, Dict, Set, Generator, Any
|
||||
import AgentCoord.RehearsalEngine_V2.Action as Action
|
||||
import AgentCoord.util as util
|
||||
from termcolor import colored
|
||||
from AgentCoord.RehearsalEngine_V2.execution_state import execution_state_manager
|
||||
from AgentCoord.RehearsalEngine_V2.dynamic_execution_manager import dynamic_execution_manager
|
||||
|
||||
|
||||
# ==================== 配置参数 ====================
|
||||
# 最大并发请求数
|
||||
MAX_CONCURRENT_REQUESTS = 2
|
||||
|
||||
# 批次之间的延迟
|
||||
BATCH_DELAY = 1.0
|
||||
|
||||
# 429错误重试次数和延迟
|
||||
MAX_RETRIES = 3
|
||||
RETRY_DELAY = 5.0
|
||||
|
||||
|
||||
# ==================== 限流器 ====================
|
||||
class RateLimiter:
|
||||
"""
|
||||
异步限流器,控制并发请求数量
|
||||
"""
|
||||
|
||||
def __init__(self, max_concurrent: int = MAX_CONCURRENT_REQUESTS):
|
||||
self.semaphore = asyncio.Semaphore(max_concurrent)
|
||||
self.max_concurrent = max_concurrent
|
||||
|
||||
async def __aenter__(self):
|
||||
await self.semaphore.acquire()
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *args):
|
||||
self.semaphore.release()
|
||||
|
||||
|
||||
# 全局限流器实例
|
||||
rate_limiter = RateLimiter()
|
||||
|
||||
|
||||
def build_action_dependency_graph(TaskProcess: List[Dict]) -> Dict[int, List[int]]:
|
||||
"""
|
||||
构建动作依赖图
|
||||
|
||||
Args:
|
||||
TaskProcess: 任务流程列表
|
||||
|
||||
Returns:
|
||||
依赖映射字典 {action_index: [dependent_action_indices]}
|
||||
"""
|
||||
dependency_map = {i: [] for i in range(len(TaskProcess))}
|
||||
|
||||
for i, action in enumerate(TaskProcess):
|
||||
important_inputs = action.get('ImportantInput', [])
|
||||
if not important_inputs:
|
||||
continue
|
||||
|
||||
# 检查是否依赖其他动作的ActionResult
|
||||
for j, prev_action in enumerate(TaskProcess):
|
||||
if i == j:
|
||||
continue
|
||||
|
||||
# 判断是否依赖前一个动作的结果
|
||||
if any(
|
||||
inp.startswith('ActionResult:') and
|
||||
inp == f'ActionResult:{prev_action["ID"]}'
|
||||
for inp in important_inputs
|
||||
):
|
||||
dependency_map[i].append(j)
|
||||
|
||||
return dependency_map
|
||||
|
||||
|
||||
def get_parallel_batches(TaskProcess: List[Dict], dependency_map: Dict[int, List[int]]) -> List[List[int]]:
|
||||
"""
|
||||
将动作分为多个批次,每批内部可以并行执行
|
||||
|
||||
Args:
|
||||
TaskProcess: 任务流程列表
|
||||
dependency_map: 依赖图
|
||||
|
||||
Returns:
|
||||
批次列表 [[batch1_indices], [batch2_indices], ...]
|
||||
"""
|
||||
batches = []
|
||||
completed: Set[int] = set()
|
||||
|
||||
while len(completed) < len(TaskProcess):
|
||||
# 找出所有依赖已满足的动作
|
||||
ready_to_run = [
|
||||
i for i in range(len(TaskProcess))
|
||||
if i not in completed and
|
||||
all(dep in completed for dep in dependency_map[i])
|
||||
]
|
||||
|
||||
if not ready_to_run:
|
||||
# 避免死循环
|
||||
remaining = [i for i in range(len(TaskProcess)) if i not in completed]
|
||||
if remaining:
|
||||
print(colored(f"警告: 检测到循环依赖,强制串行执行: {remaining}", "yellow"))
|
||||
ready_to_run = remaining[:1]
|
||||
else:
|
||||
break
|
||||
|
||||
batches.append(ready_to_run)
|
||||
completed.update(ready_to_run)
|
||||
|
||||
return batches
|
||||
|
||||
|
||||
async def execute_single_action_async(
|
||||
ActionInfo: Dict,
|
||||
General_Goal: str,
|
||||
TaskDescription: str,
|
||||
OutputName: str,
|
||||
KeyObjects: Dict,
|
||||
ActionHistory: List,
|
||||
agentName: str,
|
||||
AgentProfile_Dict: Dict,
|
||||
InputName_List: List[str]
|
||||
) -> Dict:
|
||||
"""
|
||||
异步执行单个动作
|
||||
|
||||
Args:
|
||||
ActionInfo: 动作信息
|
||||
General_Goal: 总体目标
|
||||
TaskDescription: 任务描述
|
||||
OutputName: 输出对象名称
|
||||
KeyObjects: 关键对象字典
|
||||
ActionHistory: 动作历史
|
||||
agentName: 智能体名称
|
||||
AgentProfile_Dict: 智能体配置字典
|
||||
InputName_List: 输入名称列表
|
||||
|
||||
Returns:
|
||||
动作执行结果
|
||||
"""
|
||||
actionType = ActionInfo["ActionType"]
|
||||
|
||||
# 创建动作实例
|
||||
if actionType in Action.customAction_Dict:
|
||||
currentAction = Action.customAction_Dict[actionType](
|
||||
info=ActionInfo,
|
||||
OutputName=OutputName,
|
||||
KeyObjects=KeyObjects,
|
||||
)
|
||||
else:
|
||||
currentAction = Action.BaseAction(
|
||||
info=ActionInfo,
|
||||
OutputName=OutputName,
|
||||
KeyObjects=KeyObjects,
|
||||
)
|
||||
|
||||
# 在线程池中运行,避免阻塞事件循环
|
||||
loop = asyncio.get_event_loop()
|
||||
ActionInfo_with_Result = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: currentAction.run(
|
||||
General_Goal=General_Goal,
|
||||
TaskDescription=TaskDescription,
|
||||
agentName=agentName,
|
||||
AgentProfile_Dict=AgentProfile_Dict,
|
||||
InputName_List=InputName_List,
|
||||
OutputName=OutputName,
|
||||
KeyObjects=KeyObjects,
|
||||
ActionHistory=ActionHistory,
|
||||
)
|
||||
)
|
||||
|
||||
return ActionInfo_with_Result
|
||||
|
||||
|
||||
async def execute_step_async_streaming(
|
||||
stepDescrip: Dict,
|
||||
General_Goal: str,
|
||||
AgentProfile_Dict: Dict,
|
||||
KeyObjects: Dict,
|
||||
step_index: int,
|
||||
total_steps: int
|
||||
) -> Generator[Dict, None, None]:
|
||||
"""
|
||||
异步执行单个步骤,支持流式返回
|
||||
|
||||
Args:
|
||||
stepDescrip: 步骤描述
|
||||
General_Goal: 总体目标
|
||||
AgentProfile_Dict: 智能体配置字典
|
||||
KeyObjects: 关键对象字典
|
||||
step_index: 步骤索引
|
||||
total_steps: 总步骤数
|
||||
|
||||
Yields:
|
||||
执行事件字典
|
||||
"""
|
||||
# 准备步骤信息
|
||||
StepName = (
|
||||
util.camel_case_to_normal(stepDescrip["StepName"])
|
||||
if util.is_camel_case(stepDescrip["StepName"])
|
||||
else stepDescrip["StepName"]
|
||||
)
|
||||
TaskContent = stepDescrip["TaskContent"]
|
||||
InputName_List = (
|
||||
[
|
||||
(
|
||||
util.camel_case_to_normal(obj)
|
||||
if util.is_camel_case(obj)
|
||||
else obj
|
||||
)
|
||||
for obj in stepDescrip["InputObject_List"]
|
||||
]
|
||||
if stepDescrip["InputObject_List"] is not None
|
||||
else None
|
||||
)
|
||||
OutputName = (
|
||||
util.camel_case_to_normal(stepDescrip["OutputObject"])
|
||||
if util.is_camel_case(stepDescrip["OutputObject"])
|
||||
else stepDescrip["OutputObject"]
|
||||
)
|
||||
Agent_List = stepDescrip["AgentSelection"]
|
||||
TaskProcess = stepDescrip["TaskProcess"]
|
||||
|
||||
TaskDescription = (
|
||||
util.converter.generate_template_sentence_for_CollaborationBrief(
|
||||
input_object_list=InputName_List,
|
||||
output_object=OutputName,
|
||||
agent_list=Agent_List,
|
||||
step_task=TaskContent,
|
||||
)
|
||||
)
|
||||
|
||||
# 初始化日志节点
|
||||
inputObject_Record = [
|
||||
{InputName: KeyObjects[InputName]} for InputName in InputName_List
|
||||
]
|
||||
stepLogNode = {
|
||||
"LogNodeType": "step",
|
||||
"NodeId": StepName,
|
||||
"InputName_List": InputName_List,
|
||||
"OutputName": OutputName,
|
||||
"chatLog": [],
|
||||
"inputObject_Record": inputObject_Record,
|
||||
}
|
||||
objectLogNode = {
|
||||
"LogNodeType": "object",
|
||||
"NodeId": OutputName,
|
||||
"content": None,
|
||||
}
|
||||
|
||||
# 返回步骤开始事件
|
||||
yield {
|
||||
"type": "step_start",
|
||||
"step_index": step_index,
|
||||
"total_steps": total_steps,
|
||||
"step_name": StepName,
|
||||
"task_description": TaskDescription,
|
||||
}
|
||||
|
||||
# 构建动作依赖图
|
||||
dependency_map = build_action_dependency_graph(TaskProcess)
|
||||
batches = get_parallel_batches(TaskProcess, dependency_map)
|
||||
|
||||
ActionHistory = []
|
||||
total_actions = len(TaskProcess)
|
||||
completed_actions = 0
|
||||
|
||||
util.print_colored(
|
||||
f"📋 步骤 {step_index + 1}/{total_steps}: {StepName} ({total_actions} 个动作, 分 {len(batches)} 批并行执行)",
|
||||
text_color="cyan"
|
||||
)
|
||||
|
||||
# 分批执行动作
|
||||
for batch_index, batch_indices in enumerate(batches):
|
||||
# 在每个批次执行前检查暂停状态
|
||||
should_continue = await execution_state_manager.async_check_pause()
|
||||
if not should_continue:
|
||||
util.print_colored("🛑 用户请求停止执行", "red")
|
||||
return
|
||||
|
||||
batch_size = len(batch_indices)
|
||||
|
||||
if batch_size > 1:
|
||||
util.print_colored(
|
||||
f"🚦 批次 {batch_index + 1}/{len(batches)}: 并行执行 {batch_size} 个动作",
|
||||
text_color="blue"
|
||||
)
|
||||
else:
|
||||
util.print_colored(
|
||||
f"🔄 动作 {completed_actions + 1}/{total_actions}: 串行执行",
|
||||
text_color="yellow"
|
||||
)
|
||||
|
||||
# 并行执行当前批次的所有动作
|
||||
tasks = [
|
||||
execute_single_action_async(
|
||||
TaskProcess[i],
|
||||
General_Goal=General_Goal,
|
||||
TaskDescription=TaskDescription,
|
||||
OutputName=OutputName,
|
||||
KeyObjects=KeyObjects,
|
||||
ActionHistory=ActionHistory,
|
||||
agentName=TaskProcess[i]["AgentName"],
|
||||
AgentProfile_Dict=AgentProfile_Dict,
|
||||
InputName_List=InputName_List
|
||||
)
|
||||
for i in batch_indices
|
||||
]
|
||||
|
||||
# 等待当前批次完成
|
||||
batch_results = await asyncio.gather(*tasks)
|
||||
|
||||
# 逐个返回结果
|
||||
for i, result in enumerate(batch_results):
|
||||
action_index_in_batch = batch_indices[i]
|
||||
completed_actions += 1
|
||||
|
||||
util.print_colored(
|
||||
f"✅ 动作 {completed_actions}/{total_actions} 完成: {result['ActionType']} by {result['AgentName']}",
|
||||
text_color="green"
|
||||
)
|
||||
|
||||
ActionHistory.append(result)
|
||||
|
||||
# 立即返回该动作结果
|
||||
yield {
|
||||
"type": "action_complete",
|
||||
"step_index": step_index,
|
||||
"step_name": StepName,
|
||||
"action_index": action_index_in_batch,
|
||||
"total_actions": total_actions,
|
||||
"completed_actions": completed_actions,
|
||||
"action_result": result,
|
||||
"batch_info": {
|
||||
"batch_index": batch_index,
|
||||
"batch_size": batch_size,
|
||||
"is_parallel": batch_size > 1
|
||||
}
|
||||
}
|
||||
|
||||
# 步骤完成
|
||||
objectLogNode["content"] = KeyObjects[OutputName]
|
||||
stepLogNode["ActionHistory"] = ActionHistory
|
||||
|
||||
yield {
|
||||
"type": "step_complete",
|
||||
"step_index": step_index,
|
||||
"step_name": StepName,
|
||||
"step_log_node": stepLogNode,
|
||||
"object_log_node": objectLogNode,
|
||||
}
|
||||
|
||||
|
||||
def executePlan_streaming_dynamic(
|
||||
plan: Dict,
|
||||
num_StepToRun: int,
|
||||
RehearsalLog: List,
|
||||
AgentProfile_Dict: Dict,
|
||||
existingKeyObjects: Dict = None,
|
||||
execution_id: str = None
|
||||
) -> Generator[str, None, None]:
|
||||
"""
|
||||
动态执行计划,支持在执行过程中追加新步骤
|
||||
|
||||
Args:
|
||||
plan: 执行计划
|
||||
num_StepToRun: 要运行的步骤数
|
||||
RehearsalLog: 已执行的历史记录
|
||||
AgentProfile_Dict: 智能体配置
|
||||
existingKeyObjects: 已存在的KeyObjects
|
||||
execution_id: 执行ID(用于动态追加步骤)
|
||||
|
||||
Yields:
|
||||
SSE格式的事件字符串
|
||||
"""
|
||||
# 初始化执行状态
|
||||
general_goal = plan.get("General Goal", "")
|
||||
execution_state_manager.start_execution(general_goal)
|
||||
|
||||
print(colored(f"⏸️ 执行状态管理器已启动,支持暂停/恢复", "green"))
|
||||
|
||||
# 准备执行
|
||||
KeyObjects = existingKeyObjects.copy() if existingKeyObjects else {}
|
||||
finishedStep_index = -1
|
||||
|
||||
for logNode in RehearsalLog:
|
||||
if logNode["LogNodeType"] == "step":
|
||||
finishedStep_index += 1
|
||||
if logNode["LogNodeType"] == "object":
|
||||
KeyObjects[logNode["NodeId"]] = logNode["content"]
|
||||
|
||||
if existingKeyObjects:
|
||||
print(colored(f"📦 使用已存在的 KeyObjects: {list(existingKeyObjects.keys())}", "cyan"))
|
||||
|
||||
# 确定要运行的步骤范围
|
||||
if num_StepToRun is None:
|
||||
run_to = len(plan["Collaboration Process"])
|
||||
else:
|
||||
run_to = (finishedStep_index + 1) + num_StepToRun
|
||||
|
||||
steps_to_run = plan["Collaboration Process"][(finishedStep_index + 1): run_to]
|
||||
|
||||
# 使用动态执行管理器
|
||||
if execution_id:
|
||||
# 初始化执行管理器,使用传入的execution_id
|
||||
actual_execution_id = dynamic_execution_manager.start_execution(general_goal, steps_to_run, execution_id)
|
||||
print(colored(f"🚀 开始执行计划(动态模式),共 {len(steps_to_run)} 个步骤,执行ID: {actual_execution_id}", "cyan"))
|
||||
else:
|
||||
print(colored(f"🚀 开始执行计划(流式推送),共 {len(steps_to_run)} 个步骤", "cyan"))
|
||||
|
||||
total_steps = len(steps_to_run)
|
||||
|
||||
# 使用队列实现流式推送
|
||||
async def produce_events(queue: asyncio.Queue):
|
||||
"""异步生产者"""
|
||||
try:
|
||||
step_index = 0
|
||||
|
||||
if execution_id:
|
||||
# 动态模式:循环获取下一个步骤
|
||||
# 等待新步骤的最大次数(避免无限等待)
|
||||
max_empty_wait_cycles = 5 # 最多等待60次,每次等待1秒
|
||||
empty_wait_count = 0
|
||||
|
||||
while True:
|
||||
# 检查暂停状态
|
||||
should_continue = await execution_state_manager.async_check_pause()
|
||||
if not should_continue:
|
||||
print(colored("🛑 用户请求停止执行", "red"))
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": "执行已被用户停止"
|
||||
})
|
||||
break
|
||||
|
||||
# 获取下一个步骤
|
||||
stepDescrip = dynamic_execution_manager.get_next_step(execution_id)
|
||||
|
||||
if stepDescrip is None:
|
||||
# 没有更多步骤了,检查是否应该继续等待
|
||||
empty_wait_count += 1
|
||||
|
||||
# 获取执行信息
|
||||
execution_info = dynamic_execution_manager.get_execution_info(execution_id)
|
||||
|
||||
if execution_info:
|
||||
queue_total_steps = execution_info.get("total_steps", 0)
|
||||
completed_steps = execution_info.get("completed_steps", 0)
|
||||
|
||||
# 如果没有步骤在队列中(queue_total_steps为0),立即退出
|
||||
if queue_total_steps == 0:
|
||||
print(colored(f"⚠️ 没有步骤在队列中,退出执行", "yellow"))
|
||||
break
|
||||
|
||||
# 如果所有步骤都已完成,等待可能的新步骤
|
||||
if completed_steps >= queue_total_steps:
|
||||
if empty_wait_count >= max_empty_wait_cycles:
|
||||
# 等待超时,退出执行
|
||||
print(colored(f"✅ 所有步骤执行完成,等待超时", "green"))
|
||||
break
|
||||
else:
|
||||
# 等待新步骤追加
|
||||
print(colored(f"⏳ 等待新步骤追加... ({empty_wait_count}/{max_empty_wait_cycles})", "cyan"))
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
else:
|
||||
# 还有步骤未完成,继续尝试获取
|
||||
print(colored(f"⏳ 等待步骤就绪... ({completed_steps}/{queue_total_steps})", "cyan"))
|
||||
await asyncio.sleep(0.5)
|
||||
empty_wait_count = 0 # 重置等待计数
|
||||
continue
|
||||
else:
|
||||
# 执行信息不存在,退出
|
||||
print(colored(f"⚠️ 执行信息不存在,退出执行", "yellow"))
|
||||
break
|
||||
|
||||
# 重置等待计数
|
||||
empty_wait_count = 0
|
||||
|
||||
# 获取最新的总步骤数(用于显示)
|
||||
execution_info = dynamic_execution_manager.get_execution_info(execution_id)
|
||||
current_total_steps = execution_info.get("total_steps", total_steps) if execution_info else total_steps
|
||||
|
||||
# 执行步骤
|
||||
async for event in execute_step_async_streaming(
|
||||
stepDescrip,
|
||||
plan["General Goal"],
|
||||
AgentProfile_Dict,
|
||||
KeyObjects,
|
||||
step_index,
|
||||
current_total_steps # 使用动态更新的总步骤数
|
||||
):
|
||||
if execution_state_manager.is_stopped():
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": "执行已被用户停止"
|
||||
})
|
||||
return
|
||||
|
||||
await queue.put(event)
|
||||
|
||||
# 标记步骤完成
|
||||
dynamic_execution_manager.mark_step_completed(execution_id)
|
||||
|
||||
# 更新KeyObjects
|
||||
OutputName = stepDescrip.get("OutputObject", "")
|
||||
if OutputName and OutputName in KeyObjects:
|
||||
# 对象日志节点会在step_complete中发送
|
||||
pass
|
||||
|
||||
step_index += 1
|
||||
|
||||
else:
|
||||
# 非动态模式:按顺序执行所有步骤
|
||||
for step_index, stepDescrip in enumerate(steps_to_run):
|
||||
should_continue = await execution_state_manager.async_check_pause()
|
||||
if not should_continue:
|
||||
print(colored("🛑 用户请求停止执行", "red"))
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": "执行已被用户停止"
|
||||
})
|
||||
return
|
||||
|
||||
async for event in execute_step_async_streaming(
|
||||
stepDescrip,
|
||||
plan["General Goal"],
|
||||
AgentProfile_Dict,
|
||||
KeyObjects,
|
||||
step_index,
|
||||
total_steps
|
||||
):
|
||||
if execution_state_manager.is_stopped():
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": "执行已被用户停止"
|
||||
})
|
||||
return
|
||||
|
||||
await queue.put(event)
|
||||
|
||||
except Exception as e:
|
||||
await queue.put({
|
||||
"type": "error",
|
||||
"message": f"执行出错: {str(e)}"
|
||||
})
|
||||
finally:
|
||||
await queue.put(None)
|
||||
|
||||
# 运行异步任务并实时yield
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
try:
|
||||
queue = asyncio.Queue(maxsize=10)
|
||||
producer_task = loop.create_task(produce_events(queue))
|
||||
|
||||
while True:
|
||||
event = loop.run_until_complete(queue.get())
|
||||
if event is None:
|
||||
break
|
||||
|
||||
# 立即转换为SSE格式并发送
|
||||
event_str = json.dumps(event, ensure_ascii=False)
|
||||
yield f"data: {event_str}\n\n"
|
||||
|
||||
loop.run_until_complete(producer_task)
|
||||
|
||||
if not execution_state_manager.is_stopped():
|
||||
complete_event = json.dumps({
|
||||
"type": "execution_complete",
|
||||
"total_steps": total_steps
|
||||
}, ensure_ascii=False)
|
||||
yield f"data: {complete_event}\n\n"
|
||||
|
||||
finally:
|
||||
# 在关闭事件循环之前先清理执行记录
|
||||
if execution_id:
|
||||
# 清理执行记录
|
||||
dynamic_execution_manager.cleanup(execution_id)
|
||||
|
||||
if 'producer_task' in locals():
|
||||
if not producer_task.done():
|
||||
producer_task.cancel()
|
||||
|
||||
# 确保所有任务都完成后再关闭事件循环
|
||||
try:
|
||||
pending = asyncio.all_tasks(loop)
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||
except Exception:
|
||||
pass # 忽略清理过程中的错误
|
||||
|
||||
loop.close()
|
||||
|
||||
|
||||
# 保留旧版本函数以保持兼容性
|
||||
executePlan_streaming = executePlan_streaming_dynamic
|
||||
@@ -0,0 +1,241 @@
|
||||
"""
|
||||
动态执行管理器
|
||||
用于在任务执行过程中动态追加新步骤
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import Dict, List, Optional, Any
|
||||
from threading import Lock
|
||||
|
||||
|
||||
class DynamicExecutionManager:
|
||||
"""
|
||||
动态执行管理器
|
||||
管理正在执行的任务,支持动态追加新步骤
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
# 执行状态: goal -> execution_info
|
||||
self._executions: Dict[str, Dict] = {}
|
||||
|
||||
# 线程锁
|
||||
self._lock = Lock()
|
||||
|
||||
# 步骤队列: goal -> List[step]
|
||||
self._step_queues: Dict[str, List] = {}
|
||||
|
||||
# 已执行的步骤索引: goal -> Set[step_index]
|
||||
self._executed_steps: Dict[str, set] = {}
|
||||
|
||||
# 待执行的步骤索引: goal -> List[step_index]
|
||||
self._pending_steps: Dict[str, List[int]] = {}
|
||||
|
||||
def start_execution(self, goal: str, initial_steps: List[Dict], execution_id: str = None) -> str:
|
||||
"""
|
||||
开始执行一个新的任务
|
||||
|
||||
Args:
|
||||
goal: 任务目标
|
||||
initial_steps: 初始步骤列表
|
||||
execution_id: 执行ID,如果不提供则自动生成
|
||||
|
||||
Returns:
|
||||
执行ID
|
||||
"""
|
||||
with self._lock:
|
||||
# 如果未提供execution_id,则生成一个
|
||||
if execution_id is None:
|
||||
execution_id = f"{goal}_{asyncio.get_event_loop().time()}"
|
||||
|
||||
self._executions[execution_id] = {
|
||||
"goal": goal,
|
||||
"status": "running",
|
||||
"total_steps": len(initial_steps),
|
||||
"completed_steps": 0
|
||||
}
|
||||
|
||||
# 初始化步骤队列
|
||||
self._step_queues[execution_id] = initial_steps.copy()
|
||||
|
||||
# 初始化已执行步骤集合
|
||||
self._executed_steps[execution_id] = set()
|
||||
|
||||
# 初始化待执行步骤索引
|
||||
self._pending_steps[execution_id] = list(range(len(initial_steps)))
|
||||
|
||||
print(f"🚀 启动执行: {execution_id}")
|
||||
print(f"📊 初始步骤数: {len(initial_steps)}")
|
||||
print(f"📋 待执行步骤索引: {self._pending_steps[execution_id]}")
|
||||
|
||||
return execution_id
|
||||
|
||||
def add_steps(self, execution_id: str, new_steps: List[Dict]) -> int:
|
||||
"""
|
||||
向执行中追加新步骤
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
new_steps: 新步骤列表
|
||||
|
||||
Returns:
|
||||
追加的步骤数量
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id not in self._step_queues:
|
||||
print(f"⚠️ 警告: 执行ID {execution_id} 不存在,无法追加步骤")
|
||||
return 0
|
||||
|
||||
current_count = len(self._step_queues[execution_id])
|
||||
|
||||
# 追加新步骤到队列
|
||||
self._step_queues[execution_id].extend(new_steps)
|
||||
|
||||
# 添加新步骤的索引到待执行列表
|
||||
new_indices = list(range(current_count, current_count + len(new_steps)))
|
||||
self._pending_steps[execution_id].extend(new_indices)
|
||||
|
||||
# 更新总步骤数
|
||||
old_total = self._executions[execution_id]["total_steps"]
|
||||
self._executions[execution_id]["total_steps"] = len(self._step_queues[execution_id])
|
||||
new_total = self._executions[execution_id]["total_steps"]
|
||||
|
||||
print(f"➕ 追加了 {len(new_steps)} 个步骤到 {execution_id}")
|
||||
print(f"📊 步骤总数: {old_total} -> {new_total}")
|
||||
print(f"📋 待执行步骤索引: {self._pending_steps[execution_id]}")
|
||||
|
||||
return len(new_steps)
|
||||
|
||||
def get_next_step(self, execution_id: str) -> Optional[Dict]:
|
||||
"""
|
||||
获取下一个待执行的步骤
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
|
||||
Returns:
|
||||
下一个步骤,如果没有则返回None
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id not in self._pending_steps:
|
||||
print(f"⚠️ 警告: 执行ID {execution_id} 不存在")
|
||||
return None
|
||||
|
||||
# 获取第一个待执行步骤的索引
|
||||
if not self._pending_steps[execution_id]:
|
||||
return None
|
||||
|
||||
step_index = self._pending_steps[execution_id].pop(0)
|
||||
|
||||
# 从队列中获取步骤
|
||||
if step_index >= len(self._step_queues[execution_id]):
|
||||
print(f"⚠️ 警告: 步骤索引 {step_index} 超出范围")
|
||||
return None
|
||||
|
||||
step = self._step_queues[execution_id][step_index]
|
||||
|
||||
# 标记为已执行
|
||||
self._executed_steps[execution_id].add(step_index)
|
||||
|
||||
step_name = step.get("StepName", "未知")
|
||||
print(f"🎯 获取下一个步骤: {step_name} (索引: {step_index})")
|
||||
print(f"📋 剩余待执行步骤: {len(self._pending_steps[execution_id])}")
|
||||
|
||||
return step
|
||||
|
||||
def mark_step_completed(self, execution_id: str):
|
||||
"""
|
||||
标记一个步骤完成
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id in self._executions:
|
||||
self._executions[execution_id]["completed_steps"] += 1
|
||||
completed = self._executions[execution_id]["completed_steps"]
|
||||
total = self._executions[execution_id]["total_steps"]
|
||||
print(f"📊 步骤完成进度: {completed}/{total}")
|
||||
else:
|
||||
print(f"⚠️ 警告: 执行ID {execution_id} 不存在")
|
||||
|
||||
def get_execution_info(self, execution_id: str) -> Optional[Dict]:
|
||||
"""
|
||||
获取执行信息
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
|
||||
Returns:
|
||||
执行信息字典
|
||||
"""
|
||||
with self._lock:
|
||||
return self._executions.get(execution_id)
|
||||
|
||||
def get_pending_count(self, execution_id: str) -> int:
|
||||
"""
|
||||
获取待执行步骤数量
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
|
||||
Returns:
|
||||
待执行步骤数量
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id not in self._pending_steps:
|
||||
return 0
|
||||
return len(self._pending_steps[execution_id])
|
||||
|
||||
def has_more_steps(self, execution_id: str) -> bool:
|
||||
"""
|
||||
检查是否还有更多步骤待执行
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
|
||||
Returns:
|
||||
是否还有待执行步骤
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id not in self._pending_steps:
|
||||
return False
|
||||
return len(self._pending_steps[execution_id]) > 0
|
||||
|
||||
def finish_execution(self, execution_id: str):
|
||||
"""
|
||||
完成执行
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id in self._executions:
|
||||
self._executions[execution_id]["status"] = "completed"
|
||||
|
||||
def cancel_execution(self, execution_id: str):
|
||||
"""
|
||||
取消执行
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
"""
|
||||
with self._lock:
|
||||
if execution_id in self._executions:
|
||||
self._executions[execution_id]["status"] = "cancelled"
|
||||
|
||||
def cleanup(self, execution_id: str):
|
||||
"""
|
||||
清理执行记录
|
||||
|
||||
Args:
|
||||
execution_id: 执行ID
|
||||
"""
|
||||
with self._lock:
|
||||
self._executions.pop(execution_id, None)
|
||||
self._step_queues.pop(execution_id, None)
|
||||
self._executed_steps.pop(execution_id, None)
|
||||
self._pending_steps.pop(execution_id, None)
|
||||
|
||||
|
||||
# 全局单例
|
||||
dynamic_execution_manager = DynamicExecutionManager()
|
||||
190
backend/AgentCoord/RehearsalEngine_V2/execution_state.py
Normal file
190
backend/AgentCoord/RehearsalEngine_V2/execution_state.py
Normal file
@@ -0,0 +1,190 @@
|
||||
"""
|
||||
全局执行状态管理器
|
||||
用于支持任务的暂停、恢复和停止功能
|
||||
使用轮询检查机制,确保线程安全
|
||||
"""
|
||||
|
||||
import threading
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Optional
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class ExecutionStatus(Enum):
|
||||
"""执行状态枚举"""
|
||||
RUNNING = "running" # 正在运行
|
||||
PAUSED = "paused" # 已暂停
|
||||
STOPPED = "stopped" # 已停止
|
||||
IDLE = "idle" # 空闲
|
||||
|
||||
|
||||
class ExecutionStateManager:
|
||||
"""
|
||||
全局执行状态管理器(单例模式)
|
||||
|
||||
功能:
|
||||
- 管理任务执行状态(运行/暂停/停止)
|
||||
- 使用轮询检查机制,避免异步事件的线程问题
|
||||
- 提供线程安全的状态查询和修改接口
|
||||
"""
|
||||
|
||||
_instance: Optional['ExecutionStateManager'] = None
|
||||
_lock = threading.Lock()
|
||||
|
||||
def __new__(cls):
|
||||
"""单例模式"""
|
||||
if cls._instance is None:
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
"""初始化状态管理器"""
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
self._initialized = True
|
||||
self._status = ExecutionStatus.IDLE
|
||||
self._current_goal: Optional[str] = None # 当前执行的任务目标
|
||||
# 使用简单的布尔标志,而不是 asyncio.Event
|
||||
self._should_pause = False
|
||||
self._should_stop = False
|
||||
|
||||
def get_status(self) -> ExecutionStatus:
|
||||
"""获取当前执行状态"""
|
||||
with self._lock:
|
||||
return self._status
|
||||
|
||||
def set_goal(self, goal: str):
|
||||
"""设置当前执行的任务目标"""
|
||||
with self._lock:
|
||||
self._current_goal = goal
|
||||
|
||||
def get_goal(self) -> Optional[str]:
|
||||
"""获取当前执行的任务目标"""
|
||||
with self._lock:
|
||||
return self._current_goal
|
||||
|
||||
def start_execution(self, goal: str):
|
||||
"""开始执行"""
|
||||
with self._lock:
|
||||
self._status = ExecutionStatus.RUNNING
|
||||
self._current_goal = goal
|
||||
self._should_pause = False
|
||||
self._should_stop = False
|
||||
print(f"🚀 [DEBUG] start_execution: 状态设置为 RUNNING, goal={goal}")
|
||||
|
||||
def pause_execution(self) -> bool:
|
||||
"""
|
||||
暂停执行
|
||||
|
||||
Returns:
|
||||
bool: 是否成功暂停
|
||||
"""
|
||||
with self._lock:
|
||||
if self._status != ExecutionStatus.RUNNING:
|
||||
print(f"⚠️ [DEBUG] pause_execution: 当前状态不是RUNNING,而是 {self._status}")
|
||||
return False
|
||||
self._status = ExecutionStatus.PAUSED
|
||||
self._should_pause = True
|
||||
print(f"⏸️ [DEBUG] pause_execution: 状态设置为PAUSED, should_pause=True")
|
||||
return True
|
||||
|
||||
def resume_execution(self) -> bool:
|
||||
"""
|
||||
恢复执行
|
||||
|
||||
Returns:
|
||||
bool: 是否成功恢复
|
||||
"""
|
||||
with self._lock:
|
||||
if self._status != ExecutionStatus.PAUSED:
|
||||
print(f"⚠️ [DEBUG] resume_execution: 当前状态不是PAUSED,而是 {self._status}")
|
||||
return False
|
||||
self._status = ExecutionStatus.RUNNING
|
||||
self._should_pause = False
|
||||
print(f"▶️ [DEBUG] resume_execution: 状态设置为RUNNING, should_pause=False")
|
||||
return True
|
||||
|
||||
def stop_execution(self) -> bool:
|
||||
"""
|
||||
停止执行
|
||||
|
||||
Returns:
|
||||
bool: 是否成功停止
|
||||
"""
|
||||
with self._lock:
|
||||
if self._status in [ExecutionStatus.IDLE, ExecutionStatus.STOPPED]:
|
||||
return False
|
||||
self._status = ExecutionStatus.STOPPED
|
||||
self._should_stop = True
|
||||
self._should_pause = False
|
||||
print(f"🛑 [DEBUG] stop_execution: 状态设置为STOPPED")
|
||||
return True
|
||||
|
||||
def reset(self):
|
||||
"""重置状态为空闲"""
|
||||
with self._lock:
|
||||
self._status = ExecutionStatus.IDLE
|
||||
self._current_goal = None
|
||||
self._should_pause = False
|
||||
self._should_stop = False
|
||||
print(f"🔄 [DEBUG] reset: 状态重置为IDLE")
|
||||
|
||||
async def async_check_pause(self):
|
||||
"""
|
||||
异步检查是否需要暂停(轮询方式)
|
||||
|
||||
如果处于暂停状态,会阻塞当前协程直到恢复或停止
|
||||
应该在执行循环的关键点调用此方法
|
||||
|
||||
Returns:
|
||||
bool: 如果返回True表示应该继续执行,False表示应该停止
|
||||
"""
|
||||
# 使用轮询检查,避免异步事件问题
|
||||
while True:
|
||||
# 检查停止标志
|
||||
if self._should_stop:
|
||||
print("🛑 [DEBUG] async_check_pause: 检测到停止信号")
|
||||
return False
|
||||
|
||||
# 检查暂停状态
|
||||
if self._should_pause:
|
||||
# 处于暂停状态,等待恢复
|
||||
print("⏸️ [DEBUG] async_check_pause: 检测到暂停,等待恢复...")
|
||||
await asyncio.sleep(0.1) # 短暂睡眠,避免占用CPU
|
||||
|
||||
# 如果恢复,继续执行
|
||||
if not self._should_pause:
|
||||
print("▶️ [DEBUG] async_check_pause: 从暂停中恢复!")
|
||||
continue
|
||||
# 如果停止了,返回
|
||||
if self._should_stop:
|
||||
return False
|
||||
# 继续等待
|
||||
continue
|
||||
|
||||
# 既没有停止也没有暂停,可以继续执行
|
||||
return True
|
||||
|
||||
def is_paused(self) -> bool:
|
||||
"""检查是否处于暂停状态"""
|
||||
with self._lock:
|
||||
return self._status == ExecutionStatus.PAUSED
|
||||
|
||||
def is_running(self) -> bool:
|
||||
"""检查是否正在运行"""
|
||||
with self._lock:
|
||||
return self._status == ExecutionStatus.RUNNING
|
||||
|
||||
def is_stopped(self) -> bool:
|
||||
"""检查是否已停止"""
|
||||
with self._lock:
|
||||
return self._status == ExecutionStatus.STOPPED
|
||||
|
||||
|
||||
# 全局单例实例
|
||||
execution_state_manager = ExecutionStateManager()
|
||||
@@ -20,6 +20,8 @@ def camel_case_to_normal(s):
|
||||
def generate_template_sentence_for_CollaborationBrief(
|
||||
input_object_list, output_object, agent_list, step_task
|
||||
):
|
||||
# Ensure step_task is not None
|
||||
step_task = step_task if step_task is not None else "perform the task"
|
||||
# Check if the names are in camel case (no spaces) and convert them to normal naming convention
|
||||
input_object_list = (
|
||||
[
|
||||
@@ -31,29 +33,48 @@ def generate_template_sentence_for_CollaborationBrief(
|
||||
)
|
||||
output_object = (
|
||||
camel_case_to_normal(output_object)
|
||||
if is_camel_case(output_object)
|
||||
else output_object
|
||||
if output_object is not None and is_camel_case(output_object)
|
||||
else (output_object if output_object is not None else "unknown output")
|
||||
)
|
||||
|
||||
# Format the agents into a string with proper grammar
|
||||
if agent_list is None or len(agent_list) == 0:
|
||||
agent_str = "Unknown agents"
|
||||
elif all(agent is not None for agent in agent_list):
|
||||
agent_str = (
|
||||
" and ".join([", ".join(agent_list[:-1]), agent_list[-1]])
|
||||
if len(agent_list) > 1
|
||||
else agent_list[0]
|
||||
)
|
||||
else:
|
||||
# Filter out None values
|
||||
filtered_agents = [agent for agent in agent_list if agent is not None]
|
||||
if filtered_agents:
|
||||
agent_str = (
|
||||
" and ".join([", ".join(filtered_agents[:-1]), filtered_agents[-1]])
|
||||
if len(filtered_agents) > 1
|
||||
else filtered_agents[0]
|
||||
)
|
||||
else:
|
||||
agent_str = "Unknown agents"
|
||||
|
||||
if input_object_list is None or len(input_object_list) == 0:
|
||||
# Combine all the parts into the template sentence
|
||||
template_sentence = f"{agent_str} perform the task of {step_task} to obtain {output_object}."
|
||||
else:
|
||||
# Format the input objects into a string with proper grammar
|
||||
# Filter out None values from input_object_list
|
||||
filtered_input_list = [obj for obj in input_object_list if obj is not None]
|
||||
if filtered_input_list:
|
||||
input_str = (
|
||||
" and ".join(
|
||||
[", ".join(input_object_list[:-1]), input_object_list[-1]]
|
||||
[", ".join(filtered_input_list[:-1]), filtered_input_list[-1]]
|
||||
)
|
||||
if len(input_object_list) > 1
|
||||
else input_object_list[0]
|
||||
if len(filtered_input_list) > 1
|
||||
else filtered_input_list[0]
|
||||
)
|
||||
else:
|
||||
input_str = "unknown inputs"
|
||||
# Combine all the parts into the template sentence
|
||||
template_sentence = f"Based on {input_str}, {agent_str} perform the task of {step_task} to obtain {output_object}."
|
||||
|
||||
@@ -90,7 +111,7 @@ def read_LLM_Completion(messages, useGroq=True):
|
||||
return json.loads(match.group(0).strip())
|
||||
except Exception:
|
||||
pass
|
||||
raise ("bad format!")
|
||||
return {} # 返回空对象而不是抛出异常
|
||||
|
||||
|
||||
def read_json_content(text):
|
||||
@@ -111,7 +132,7 @@ def read_json_content(text):
|
||||
if match:
|
||||
return json.loads(match.group(0).strip())
|
||||
|
||||
raise ("bad format!")
|
||||
return {} # 返回空对象而不是抛出异常
|
||||
|
||||
|
||||
def read_outputObject_content(text, keyword):
|
||||
@@ -127,4 +148,4 @@ def read_outputObject_content(text, keyword):
|
||||
if match:
|
||||
return match.group(1).strip()
|
||||
else:
|
||||
raise ("bad format!")
|
||||
return "" # 返回空字符串而不是抛出异常
|
||||
|
||||
@@ -13,6 +13,8 @@ colorMap = {
|
||||
"agent": [0, 0, 80],
|
||||
}
|
||||
|
||||
|
||||
|
||||
def OperateCamelStr(_str: str):
|
||||
if " " in _str:
|
||||
return _str
|
||||
|
||||
@@ -18,4 +18,3 @@ python ./server.py --port 8017
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1 +1 @@
|
||||
This folder is for request chache so that we can reproduce the error.
|
||||
This folder is for request cache so that we can reproduce the error.
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
## config for default LLM
|
||||
OPENAI_API_BASE: ""
|
||||
OPENAI_API_KEY: ""
|
||||
OPENAI_API_MODEL: "gpt-4-turbo-preview"
|
||||
OPENAI_API_BASE: "https://ai.gitee.com/v1"
|
||||
OPENAI_API_KEY: "HYCNGM39GGFNSB1F8MBBMI9QYJR3P1CRSYS2PV1A"
|
||||
OPENAI_API_MODEL: "DeepSeek-V3"
|
||||
|
||||
## config for fast mode
|
||||
FAST_DESIGN_MODE: True
|
||||
FAST_DESIGN_MODE: False
|
||||
GROQ_API_KEY: ""
|
||||
MISTRAL_API_KEY: ""
|
||||
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
Flask==3.0.2
|
||||
openai==0.28.1
|
||||
openai==2.8.1
|
||||
PyYAML==6.0.1
|
||||
termcolor==2.4.0
|
||||
groq==0.4.2
|
||||
mistralai==0.1.6
|
||||
socksio==1.0.0
|
||||
flask-socketio==5.3.6
|
||||
python-socketio==5.11.0
|
||||
simple-websocket==1.0.0
|
||||
|
||||
1485
backend/server.py
1485
backend/server.py
File diff suppressed because it is too large
Load Diff
31
frontend-react/.dockerignore
Normal file
31
frontend-react/.dockerignore
Normal file
@@ -0,0 +1,31 @@
|
||||
.DS_Store
|
||||
|
||||
.pnp
|
||||
.pnp.js
|
||||
.env.local
|
||||
.env.*.local
|
||||
.history
|
||||
*.log*
|
||||
|
||||
node_modules/
|
||||
.yarn-integrity
|
||||
.pnpm-store/
|
||||
*.tsbuildinfo
|
||||
.eslintcache
|
||||
.changeset/pre.json
|
||||
|
||||
dist/
|
||||
coverage/
|
||||
release/
|
||||
output/
|
||||
output_resource/
|
||||
log/
|
||||
|
||||
.vscode/**/*
|
||||
!.vscode/settings.json
|
||||
!.vscode/extensions.json
|
||||
.idea/
|
||||
|
||||
**/*/typings/auto-generated
|
||||
|
||||
modern.config.local.*
|
||||
32
frontend-react/.gitignore
vendored
Normal file
32
frontend-react/.gitignore
vendored
Normal file
@@ -0,0 +1,32 @@
|
||||
.DS_Store
|
||||
.env
|
||||
|
||||
.pnp
|
||||
.pnp.js
|
||||
.env.local
|
||||
.env.*.local
|
||||
.history
|
||||
*.log*
|
||||
|
||||
node_modules/
|
||||
.yarn-integrity
|
||||
.pnpm-store/
|
||||
*.tsbuildinfo
|
||||
.eslintcache
|
||||
.changeset/pre.json
|
||||
|
||||
dist/
|
||||
coverage/
|
||||
release/
|
||||
output/
|
||||
output_resource/
|
||||
log/
|
||||
|
||||
.vscode/**/*
|
||||
!.vscode/settings.json
|
||||
!.vscode/extensions.json
|
||||
.idea/
|
||||
|
||||
**/*/typings/auto-generated
|
||||
|
||||
modern.config.local.*
|
||||
22
frontend-react/Dockerfile
Normal file
22
frontend-react/Dockerfile
Normal file
@@ -0,0 +1,22 @@
|
||||
FROM node:20-alpine AS base
|
||||
|
||||
FROM base AS installer
|
||||
WORKDIR /app
|
||||
COPY package.json .npmrc ./
|
||||
RUN npm install
|
||||
|
||||
FROM base AS builder
|
||||
WORKDIR /app
|
||||
COPY --from=installer /app/node_modules ./node_modules
|
||||
COPY . .
|
||||
RUN npm run build
|
||||
|
||||
FROM base AS runner
|
||||
WORKDIR /app
|
||||
EXPOSE 8080/tcp
|
||||
COPY .npmrc ./
|
||||
RUN npm install @modern-js/app-tools @modern-js/runtime --no-optional --no-shrinkwrap && mkdir src
|
||||
COPY modern.config.ts package.json ./
|
||||
COPY --from=builder /app/dist ./dist
|
||||
ENV API_BASE=
|
||||
CMD ["npm", "run", "serve"]
|
||||
39
frontend-react/README.md
Normal file
39
frontend-react/README.md
Normal file
@@ -0,0 +1,39 @@
|
||||
# AgentCoord Frontend
|
||||
|
||||
This is the frontend for the AgentCoord project. Root project is located [here](https://github.com/AgentCoord/AgentCoord)
|
||||
|
||||
## Installation
|
||||
|
||||
You can launch the frontend by simply using `docker-compose` in the root directory of the project.
|
||||
|
||||
Or, you can launch the frontend manually by following the steps below.
|
||||
|
||||
1. Install the dependencies by running `npm install`.
|
||||
|
||||
```bash
|
||||
npm install
|
||||
```
|
||||
|
||||
2. Build the frontend by running `npm run build`.
|
||||
|
||||
```bash
|
||||
npm run build
|
||||
```
|
||||
|
||||
3. Start the frontend by running `npm run serve`.
|
||||
|
||||
```bash
|
||||
npm run serve
|
||||
```
|
||||
|
||||
Then you can access the frontend by visiting `http://localhost:8080`.
|
||||
|
||||
## Development
|
||||
|
||||
You can run the frontend in development mode by running `npm run dev`.
|
||||
|
||||
```bash
|
||||
npm run dev
|
||||
```
|
||||
|
||||
The frontend website requires the backend server to be running. You can configure the backend server address by copying the `.env.example` file to `.env` and changing the `API_BASE` value to the backend server address.
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user