Compare commits


2 Commits

Author SHA1 Message Date
liailing1026
c00c0072b8 feat 2025-12-11 14:05:08 +08:00
zhaoweijie
6392301833 refactor(LLMAPI): Refactor the LLM interface to support the new OpenAI SDK
- Upgrade the openai dependency to 2.x and replace legacy SDK call patterns
- Introduce OpenAI and AsyncOpenAI client instances in place of global configuration
- Update all chat-completion request methods to match the new API format
- Add exception handling and error reporting to async streaming response processing
- Unify default settings such as timeouts and maximum token counts
- Fix several variable-name collisions and potential null-reference issues
- Add a colored-log helper function to avoid a circular import
2025-11-22 17:01:25 +08:00
11 changed files with 448 additions and 463 deletions
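A minimal sketch of the SDK migration this commit describes. The client construction matches the diff below; the model name and message content are illustrative, and the old-style call is shown only for contrast:

```python
from openai import OpenAI, AsyncOpenAI

OPENAI_API_KEY = "your-key"                 # placeholder credentials
OPENAI_API_BASE = "https://api.openai.com"

# Old (openai==0.28.x): module-level globals plus openai.ChatCompletion.acreate(...).
# New (openai>=2.x): explicit client instances with attribute-style responses.
client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_API_BASE)
async_client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_API_BASE)

rsp = client.chat.completions.create(
    model="gpt-3.5-turbo",                  # illustrative model
    messages=[{"role": "user", "content": "ping"}],
)
print(rsp.choices[0].message.content)       # attribute access replaces dict indexing
```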

View File

@@ -0,0 +1,24 @@
{
"permissions": {
"allow": [
"Read(//Users/zhaoweijie/Desktop/agent/AgentCoord/**)",
"Bash(python3:*)",
"Bash(source:*)",
"Bash(pip install:*)",
"Bash(python:*)",
"Bash(tree:*)",
"Bash(export FAST_DESIGN_MODE=True:*)",
"Bash(echo:*)",
"Bash(chmod:*)",
"Bash(lsof:*)",
"Bash(curl:*)",
"Bash(xargs kill:*)",
"Bash(pip:*)",
"WebSearch",
"WebFetch(domain:pypi.org)",
"Bash(cp:*)"
],
"deny": [],
"ask": []
}
}

View File

@@ -1,12 +1,15 @@
import asyncio
import openai
from openai import OpenAI, AsyncOpenAI
import yaml
from termcolor import colored
import os
# Helper function to avoid circular import
def print_colored(text, text_color="green", background="on_white"):
print(colored(text, text_color, background))
# load config (apikey, apibase, model)
yaml_file = os.path.join(os.getcwd(), "config", "config.yaml")
yaml_data = {}
try:
with open(yaml_file, "r", encoding="utf-8") as file:
yaml_data = yaml.safe_load(file)
@@ -15,11 +18,13 @@ except Exception:
OPENAI_API_BASE = os.getenv("OPENAI_API_BASE") or yaml_data.get(
"OPENAI_API_BASE", "https://api.openai.com"
)
openai.api_base = OPENAI_API_BASE
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") or yaml_data.get(
"OPENAI_API_KEY", ""
)
openai.api_key = OPENAI_API_KEY
# Initialize OpenAI clients
client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_API_BASE)
async_client = AsyncOpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_API_BASE)
MODEL: str = os.getenv("OPENAI_API_MODEL") or yaml_data.get(
"OPENAI_API_MODEL", "gpt-4-turbo-preview"
)
@@ -38,28 +43,7 @@ MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY") or yaml_data.get(
def LLM_Completion(
messages: list[dict], stream: bool = True, useGroq: bool = True
) -> str:
# Enhanced message validation: ensure every message's role and content are non-empty and not whitespace-only
if not messages or len(messages) == 0:
raise ValueError("Messages list is empty")
# print(f"[DEBUG] LLM_Completion received {len(messages)} messages", flush=True)
for i, msg in enumerate(messages):
if not isinstance(msg, dict):
raise ValueError(f"Message at index {i} is not a dictionary")
if not msg.get("role") or str(msg.get("role")).strip() == "":
raise ValueError(f"Message at index {i} has empty 'role'")
if not msg.get("content") or str(msg.get("content")).strip() == "":
raise ValueError(f"Message at index {i} has empty 'content'")
# Extra check to ensure content does not become empty due to formatting issues
content = str(msg.get("content")).strip()
if len(content) < 10: # minimum-length threshold
print(f"[WARNING] Message at index {i} has very short content: '{content}'", flush=True)
# Change 1
if not GROQ_API_KEY:
useGroq = False
elif not useGroq or not FAST_DESIGN_MODE:
if not useGroq or not FAST_DESIGN_MODE:
force_gpt4 = True
useGroq = False
else:
@@ -93,14 +77,14 @@ def LLM_Completion(
async def _achat_completion_stream_groq(messages: list[dict]) -> str:
from groq import AsyncGroq
client = AsyncGroq(api_key=GROQ_API_KEY)
groq_client = AsyncGroq(api_key=GROQ_API_KEY)
max_attempts = 5
for attempt in range(max_attempts):
print("Attempt to use Groq (Fase Design Mode):")
try:
stream = await client.chat.completions.create(
response = await groq_client.chat.completions.create(
messages=messages,
# model='gemma-7b-it',
model="mixtral-8x7b-32768",
@@ -114,9 +98,9 @@ async def _achat_completion_stream_groq(messages: list[dict]) -> str:
if attempt < max_attempts - 1: # i is zero indexed
continue
else:
raise "failed"
raise Exception("failed")
full_reply_content = stream.choices[0].message.content
full_reply_content = response.choices[0].message.content
print(colored(full_reply_content, "blue", "on_white"), end="")
print()
return full_reply_content
@@ -125,14 +109,14 @@ async def _achat_completion_stream_groq(messages: list[dict]) -> str:
async def _achat_completion_stream_mixtral(messages: list[dict]) -> str:
from mistralai.client import MistralClient
from mistralai.models.chat_completion import ChatMessage
client = MistralClient(api_key=MISTRAL_API_KEY)
mistral_client = MistralClient(api_key=MISTRAL_API_KEY)
# client=AsyncGroq(api_key=GROQ_API_KEY)
max_attempts = 5
for attempt in range(max_attempts):
try:
messages[len(messages) - 1]["role"] = "user"
stream = client.chat(
stream = mistral_client.chat(
messages=[
ChatMessage(
role=message["role"], content=message["content"]
@@ -141,14 +125,13 @@ async def _achat_completion_stream_mixtral(messages: list[dict]) -> str:
],
# model = "mistral-small-latest",
model="open-mixtral-8x7b",
# response_format={"type": "json_object"},
)
break # If the operation is successful, break the loop
except Exception:
if attempt < max_attempts - 1: # i is zero indexed
continue
else:
raise "failed"
raise Exception("failed")
full_reply_content = stream.choices[0].message.content
print(colored(full_reply_content, "blue", "on_white"), end="")
@@ -157,25 +140,14 @@
async def _achat_completion_stream_gpt35(messages: list[dict]) -> str:
openai.api_key = OPENAI_API_KEY
openai.api_base = OPENAI_API_BASE
kwargs = {
"messages": messages,
"max_tokens": 4096,
"n": 1,
"stop": None,
"temperature": 0.3,
"timeout": 3,
"model": "gpt-3.5-turbo-16k",
"stream": True,
}
# print("[DEBUG] about to call acreate with kwargs:", type(kwargs), kwargs)
assert kwargs is not None, "kwargs is None right before acreate!"
assert isinstance(kwargs, dict), "kwargs must be dict!"
response = await openai.ChatCompletion.acreate(**kwargs)
response = await async_client.chat.completions.create(
messages=messages,
max_tokens=4096,
temperature=0.3,
timeout=30,
model="gpt-3.5-turbo-16k",
stream=True,
)
# create variables to collect the stream of chunks
collected_chunks = []
@@ -183,40 +155,33 @@ async def _achat_completion_stream_gpt35(messages: list[dict]) -> str:
# iterate through the stream of events
async for chunk in response:
collected_chunks.append(chunk) # save the event response
choices = chunk["choices"]
choices = chunk.choices
if len(choices) > 0:
chunk_message = chunk["choices"][0].get(
"delta", {}
) # extract the message
chunk_message = chunk.choices[0].delta
collected_messages.append(chunk_message) # save the message
if "content" in chunk_message:
if chunk_message.content:
print(
colored(chunk_message["content"], "blue", "on_white"),
colored(chunk_message.content, "blue", "on_white"),
end="",
)
print()
full_reply_content = "".join(
[m.get("content", "") for m in collected_messages]
[m.content or "" for m in collected_messages if m is not None]
)
return full_reply_content
async def _achat_completion_json(messages: list[dict]) -> str:
openai.api_key = OPENAI_API_KEY
openai.api_base = OPENAI_API_BASE
max_attempts = 5
for attempt in range(max_attempts):
try:
stream = await openai.ChatCompletion.acreate(
response = await async_client.chat.completions.create(
messages=messages,
max_tokens=4096,
n=1,
stop=None,
temperature=0.3,
timeout=3,
timeout=30,
model=MODEL,
response_format={"type": "json_object"},
)
@@ -225,107 +190,63 @@ async def _achat_completion_json(messages: list[dict]) -> str:
if attempt < max_attempts - 1: # i is zero indexed
continue
else:
raise "failed"
raise Exception("failed")
full_reply_content = stream.choices[0].message.content
full_reply_content = response.choices[0].message.content
print(colored(full_reply_content, "blue", "on_white"), end="")
print()
return full_reply_content
async def _achat_completion_stream(messages: list[dict]) -> str:
# print(">>>> _achat_completion_stream 被调用", flush=True)
# print(">>>> messages 实参 =", messages, flush=True)
# print(">>>> messages 类型 =", type(messages), flush=True)
openai.api_key = OPENAI_API_KEY
openai.api_base = OPENAI_API_BASE
response = await openai.ChatCompletion.acreate(
**_cons_kwargs(messages), stream=True
)
try:
response = await async_client.chat.completions.create(
**_cons_kwargs(messages), stream=True
)
# create variables to collect the stream of chunks
collected_chunks = []
collected_messages = []
# iterate through the stream of events
async for chunk in response:
collected_chunks.append(chunk) # save the event response
choices = chunk["choices"]
if len(choices) > 0:
chunk_message = chunk["choices"][0].get(
"delta", {}
) # extract the message
collected_messages.append(chunk_message) # save the message
if "content" in chunk_message:
print(
colored(chunk_message["content"], "blue", "on_white"),
end="",
)
print()
# create variables to collect the stream of chunks
collected_chunks = []
collected_messages = []
# iterate through the stream of events
async for chunk in response:
collected_chunks.append(chunk) # save the event response
choices = chunk.choices
if len(choices) > 0:
chunk_message = chunk.choices[0].delta
collected_messages.append(chunk_message) # save the message
if chunk_message.content:
print(
colored(chunk_message.content, "blue", "on_white"),
end="",
)
print()
full_reply_content = "".join(
[m.get("content", "") for m in collected_messages]
)
return full_reply_content
full_reply_content = "".join(
[m.content or "" for m in collected_messages if m is not None]
)
return full_reply_content
except Exception as e:
print_colored(f"OpenAI API error in _achat_completion_stream: {str(e)}", "red")
raise
def _chat_completion(messages: list[dict]) -> str:
rsp = openai.ChatCompletion.create(**_cons_kwargs(messages))
content = rsp["choices"][0]["message"]["content"]
return content
try:
rsp = client.chat.completions.create(**_cons_kwargs(messages))
content = rsp.choices[0].message.content
return content
except Exception as e:
print_colored(f"OpenAI API error in _chat_completion: {str(e)}", "red")
raise
def _cons_kwargs(messages: list[dict]) -> dict:
kwargs = {
"messages": messages,
"max_tokens": 4096,
"temperature": 0.5,
"max_tokens": 2000,
"temperature": 0.3,
"timeout": 15,
}
print("[DEBUG] kwargs =", kwargs)
assert isinstance(kwargs, dict), f"_cons_kwargs returned {type(kwargs)}, must be dict"
# Add debug info
print(f'[DEBUG] _cons_kwargs messages: {messages}', flush=True)
# Check and repair null values in the messages
for i, msg in enumerate(messages):
# Ensure msg is a dictionary
if not isinstance(msg, dict):
print(f"[ERROR] Message {i} is not a dictionary: {msg}", flush=True)
messages[i] = {"role": "user", "content": str(msg) if msg is not None else ""}
continue
# Ensure role and content exist and are not None
if "role" not in msg or msg["role"] is None:
print(f"[ERROR] Message {i} missing role, setting to 'user'", flush=True)
msg["role"] = "user"
else:
msg["role"] = str(msg["role"]).strip()
if "content" not in msg or msg["content"] is None:
print(f"[ERROR] Message {i} missing content, setting to empty string", flush=True)
msg["content"] = ""
else:
msg["content"] = str(msg["content"]).strip()
# Adjust parameters for different API providers
if "deepseek" in MODEL.lower():
# Special handling for the DeepSeek API
print("[DEBUG] DeepSeek API detected, adjusting parameters", flush=True)
kwargs.pop("n", None) # remove the n parameter; DeepSeek may not support it
if "timeout" in kwargs:
kwargs.pop("timeout", None)
# DeepSeek may not support the stop parameter
kwargs.pop("stop", None)
else:
# OpenAI-compatible API
kwargs["n"] = 1
kwargs["stop"] = None
kwargs["timeout"] = 3
kwargs["model"] = MODEL
# Ensure every element of the messages list has a valid role and content
kwargs["messages"] = [msg for msg in messages if msg["role"] and msg["content"]]
print(f"[DEBUG] Final kwargs for API call: {kwargs.keys()}", flush=True)
return kwargs
kwargs_mode = {"model": MODEL}
kwargs.update(kwargs_mode)
return kwargs

View File

@@ -79,6 +79,10 @@ def generate_AbilityRequirement(General_Goal, Current_Task):
return read_LLM_Completion(messages)["AbilityRequirement"]
def generate_AgentSelection(General_Goal, Current_Task, Agent_Board):
# Check if Agent_Board is None or empty
if Agent_Board is None or len(Agent_Board) == 0:
raise ValueError("Agent_Board cannot be None or empty. Please ensure agents are set via /setAgents endpoint before generating a plan.")
messages = [
{
"role": "system",

View File

@@ -2,7 +2,7 @@ from AgentCoord.util.converter import read_LLM_Completion
from typing import List
from pydantic import BaseModel
import json
import asyncio
PROMPT_PLAN_OUTLINE_GENERATION = """
## Instruction
Based on "Output Format Example", "General Goal", and "Initial Key Object List", output a formatted "Plan_Outline".
@@ -69,13 +69,6 @@ class PlanOutline(BaseModel):
def generate_PlanOutline(InitialObject_List, General_Goal):
# New: validate that General_Goal has meaningful content
if not isinstance(General_Goal, str) or len(General_Goal.strip()) == 0:
raise ValueError("General_Goal cannot be empty! A concrete goal description must be provided")
# Handle an empty InitialObject_List (optional, but friendlier)
if not InitialObject_List:
InitialObject_List = ["No initial objects"] # avoid ambiguity from an empty list
messages = [
{
"role": "system",
@@ -90,11 +83,16 @@ def generate_PlanOutline(InitialObject_List, General_Goal):
),
},
]
# Re-validate the messages content (guard against unexpected empty values)
for msg in messages:
content = msg.get("content", "").strip()
if not content:
raise ValueError("Generated LLM request message content is empty; please check the parameters")
return read_LLM_Completion(messages)["Plan_Outline"]
result = read_LLM_Completion(messages)
if isinstance(result, dict) and "Plan_Outline" in result:
return result["Plan_Outline"]
else:
# If the format is incorrect, return a default plan outline
return [
{
"StepName": "Default Step",
"TaskContent": "Generated default plan step due to format error",
"InputObject_List": [],
"OutputObject": "Default Output"
}
]

View File

@@ -80,7 +80,14 @@ class BaseAction():
Important_Mark = ""
action_Record += PROMPT_TEMPLATE_ACTION_RECORD.format(AgentName = actionInfo["AgentName"], Action_Description = actionInfo["AgentName"], Action_Result = actionInfo["Action_Result"], Important_Mark = Important_Mark)
prompt = PROMPT_TEMPLATE_TAKE_ACTION_BASE.format(agentName = agentName, agentProfile = AgentProfile_Dict[agentName], General_Goal = General_Goal, Current_Task_Description = TaskDescription, Input_Objects = inputObject_Record, History_Action = action_Record, Action_Description = self.info["Description"], Action_Custom_Note = self.Action_Custom_Note)
# Handle missing agent profiles gracefully
if agentName not in AgentProfile_Dict:
print_colored(text=f"Warning: Agent '{agentName}' not found in AgentProfile_Dict. Using default profile.", text_color="yellow")
agentProfile = f"AI Agent named {agentName}"
else:
agentProfile = AgentProfile_Dict[agentName]
prompt = PROMPT_TEMPLATE_TAKE_ACTION_BASE.format(agentName = agentName, agentProfile = agentProfile, General_Goal = General_Goal, Current_Task_Description = TaskDescription, Input_Objects = inputObject_Record, History_Action = action_Record, Action_Description = self.info["Description"], Action_Custom_Note = self.Action_Custom_Note)
print_colored(text = prompt, text_color="red")
messages = [{"role":"system", "content": prompt}]
ActionResult = LLM_Completion(messages,stream=False)

View File

@@ -85,9 +85,17 @@ def executePlan(plan, num_StepToRun, RehearsalLog, AgentProfile_Dict):
# start the group chat
util.print_colored(TaskDescription, text_color="green")
ActionHistory = []
action_count = 0
total_actions = len(TaskProcess)
for ActionInfo in TaskProcess:
action_count += 1
actionType = ActionInfo["ActionType"]
agentName = ActionInfo["AgentName"]
# Progress logging
util.print_colored(f"🔄 Executing action {action_count}/{total_actions}: {actionType} by {agentName}", text_color="yellow")
if actionType in Action.customAction_Dict:
currentAction = Action.customAction_Dict[actionType](
info=ActionInfo,

View File

@@ -1,6 +1,6 @@
import re
import json
from AgentCoord.LLMAPI.LLMAPI import LLM_Completion,GROQ_API_KEY
from AgentCoord.LLMAPI.LLMAPI import LLM_Completion
def create_agent_dict(agent_list):
@@ -20,6 +20,8 @@ def camel_case_to_normal(s):
def generate_template_sentence_for_CollaborationBrief(
input_object_list, output_object, agent_list, step_task
):
# Ensure step_task is not None
step_task = step_task if step_task is not None else "perform the task"
# Check if the names are in camel case (no spaces) and convert them to normal naming convention
input_object_list = (
[
@@ -31,29 +33,48 @@
)
output_object = (
camel_case_to_normal(output_object)
if is_camel_case(output_object)
else output_object
if output_object is not None and is_camel_case(output_object)
else (output_object if output_object is not None else "unknown output")
)
# Format the agents into a string with proper grammar
agent_str = (
" and ".join([", ".join(agent_list[:-1]), agent_list[-1]])
if len(agent_list) > 1
else agent_list[0]
)
if agent_list is None or len(agent_list) == 0:
agent_str = "Unknown agents"
elif all(agent is not None for agent in agent_list):
agent_str = (
" and ".join([", ".join(agent_list[:-1]), agent_list[-1]])
if len(agent_list) > 1
else agent_list[0]
)
else:
# Filter out None values
filtered_agents = [agent for agent in agent_list if agent is not None]
if filtered_agents:
agent_str = (
" and ".join([", ".join(filtered_agents[:-1]), filtered_agents[-1]])
if len(filtered_agents) > 1
else filtered_agents[0]
)
else:
agent_str = "Unknown agents"
if input_object_list is None or len(input_object_list) == 0:
# Combine all the parts into the template sentence
template_sentence = f"{agent_str} perform the task of {step_task} to obtain {output_object}."
else:
# Format the input objects into a string with proper grammar
input_str = (
" and ".join(
[", ".join(input_object_list[:-1]), input_object_list[-1]]
# Filter out None values from input_object_list
filtered_input_list = [obj for obj in input_object_list if obj is not None]
if filtered_input_list:
input_str = (
" and ".join(
[", ".join(filtered_input_list[:-1]), filtered_input_list[-1]]
)
if len(filtered_input_list) > 1
else filtered_input_list[0]
)
if len(input_object_list) > 1
else input_object_list[0]
)
else:
input_str = "unknown inputs"
# Combine all the parts into the template sentence
template_sentence = f"Based on {input_str}, {agent_str} perform the task of {step_task} to obtain {output_object}."
@@ -73,80 +94,24 @@ def remove_render_spec(duty_spec):
return duty_spec
def read_LLM_Completion(messages, useGroq=None):
if useGroq is None:
useGroq = bool(GROQ_API_KEY)
# Add debug info and input validation
print(f"[DEBUG] read_LLM_Completion called with {len(messages)} messages", flush=True)
if not messages or len(messages) == 0:
raise ValueError("No messages provided to read_LLM_Completion")
# Ensure every element in messages is valid
for i, msg in enumerate(messages):
if not isinstance(msg, dict):
print(f"[ERROR] Message {i} is not a dictionary: {type(msg)}", flush=True)
raise ValueError(f"Message {i} is not a dictionary")
if 'content' not in msg or msg['content'] is None:
print(f"[ERROR] Message {i} has no content or content is None", flush=True)
msg['content'] = "" # 提供默认空字符串
for attempt in range(3):
try:
print(f"[DEBUG] Attempt {attempt + 1}/3 to get LLM response", flush=True)
text = LLM_Completion(messages, useGroq=useGroq)
# 确保text是字符串类型
if text is None:
print(f"[ERROR] Null response from LLM on attempt {attempt + 1}", flush=True)
continue
text = str(text).strip()
if not text:
print(f"[ERROR] Empty response from LLM on attempt {attempt + 1}", flush=True)
continue
print(f"[DEBUG] LLM response length: {len(text)} characters", flush=True)
def read_LLM_Completion(messages, useGroq=True):
for _ in range(3):
text = LLM_Completion(messages, useGroq=useGroq)
# Try to extract JSON from a code block
pattern = r"(?:.*?```json)(.*?)(?:```.*?)"
match = re.search(pattern, text, re.DOTALL)
pattern = r"(?:.*?```json)(.*?)(?:```.*?)"
match = re.search(pattern, text, re.DOTALL)
if match:
json_content = match.group(1).strip()
print(f"[DEBUG] Found JSON in code block, length: {len(json_content)}", flush=True)
try:
result = json.loads(json_content)
print(f"[DEBUG] Successfully parsed JSON from code block", flush=True)
return result
except json.JSONDecodeError as e:
print(f"[ERROR] JSON decode error in code block: {e}", flush=True)
print(f"[ERROR] JSON content was: {json_content}", flush=True)
if match:
return json.loads(match.group(1).strip())
# Try to extract a JSON object directly
pattern = r"\{.*\}"
match = re.search(pattern, text, re.DOTALL)
if match:
json_content = match.group(0).strip()
print(f"[DEBUG] Found JSON in plain text, length: {len(json_content)}", flush=True)
try:
result = json.loads(json_content)
print(f"[DEBUG] Successfully parsed JSON from plain text", flush=True)
return result
except json.JSONDecodeError as e:
print(f"[ERROR] JSON decode error in plain text: {e}", flush=True)
print(f"[ERROR] JSON content was: {json_content}", flush=True)
print(f"[ERROR] No valid JSON found in response on attempt {attempt + 1}", flush=True)
print(f"[ERROR] Full response was: {text[:200]}..." if len(text) > 200 else f"[ERROR] Full response was: {text}", flush=True)
except Exception as e:
print(f"[ERROR] Exception on attempt {attempt + 1}: {e}", flush=True)
import traceback
traceback.print_exc()
continue
raise ValueError(f"Failed to get valid JSON response after 3 attempts. Last error: bad format or empty response")
pattern = r"\{.*\}"
match = re.search(pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(0).strip())
except Exception:
pass
return {} # return an empty object instead of raising an exception
def read_json_content(text):
@@ -167,7 +132,7 @@
if match:
return json.loads(match.group(0).strip())
raise ("bad format!")
return {} # return an empty object instead of raising an exception
def read_outputObject_content(text, keyword):
@@ -183,4 +148,4 @@
if match:
return match.group(1).strip()
else:
raise ("bad format!")
return "" # 返回空字符串而不是抛出异常

View File

@@ -1,5 +1,5 @@
Flask==3.0.2
openai==0.28.1
openai==2.8.1
PyYAML==6.0.1
termcolor==2.4.0
groq==0.4.2

View File

@@ -1,10 +1,3 @@
import nest_asyncio
nest_asyncio.apply()
import os, sys, functools
print = functools.partial(print, flush=True) # flush globally
sys.stdout.reconfigure(line_buffering=True) # effective on Python 3.7+
import asyncio
from flask import Flask, request, jsonify
import json
from DataProcess import Add_Collaboration_Brief_FrontEnd
@@ -26,12 +19,11 @@ import argparse
# initialize global variables
yaml_file = os.path.join(os.getcwd(), "config", "config.yaml")
yaml_data = {}
try:
with open(yaml_file, "r", encoding="utf-8") as file:
yaml_data = yaml.safe_load(file)
except Exception:
yaml_data = {}
yaml_file = {}
USE_CACHE: bool = os.getenv("USE_CACHE")
if USE_CACHE is None:
USE_CACHE = yaml_data.get("USE_CACHE", False)
@@ -43,98 +35,14 @@ Request_Cache: dict[str, str] = {}
app = Flask(__name__)
from jsonschema import validate, ValidationError
AGENT_SELECTION_SCHEMA = {
"type": "object",
"properties": {
"AgentSelectionPlan": {
"type": "array",
"items": {
"type": "string",
"minLength": 1, # 不允许空字符串
"pattern": r"^\S+$" # 不允许仅空白
},
"minItems": 1 # 至少选一个
}
},
"required": ["AgentSelectionPlan"],
"additionalProperties": False
}
BASE_PLAN_SCHEMA = {
"type": "object",
"properties": {
"Plan_Outline": {
"type": "array",
"items": {
"type": "object",
"properties": {
"StepName": {"type": "string"},
"TaskContent": {"type": "string"},
"InputObject_List":{"type": "array", "items": {"type": "string"}},
"OutputObject": {"type": "string"},
},
"required": ["StepName", "TaskContent", "InputObject_List", "OutputObject"],
"additionalProperties": False,
},
}
},
"required": ["Plan_Outline"],
"additionalProperties": False,
}
def safe_join(iterable, sep=""):
"""保证 join 前全是 strNone 变空串"""
return sep.join("" if x is None else str(x) for x in iterable)
def clean_agent_board(board):
"""把 AgentBoard 洗成只含 str 的字典列表"""
if not board:
return []
return [
{"Name": (a.get("Name") or "").strip(),
"Profile": (a.get("Profile") or "").strip()}
for a in board
if a and a.get("Name")
]
def clean_plan_outline(outline):
"""清洗 Plan_Outline 里的 None"""
if not isinstance(outline, list):
return []
for step in outline:
if not isinstance(step, dict):
continue
step["InputObject_List"] = [
str(i) for i in step.get("InputObject_List", []) if i is not None
]
step["OutputObject"] = str(step.get("OutputObject") or "")
step["StepName"] = str(step.get("StepName") or "")
step["TaskContent"] = str(step.get("TaskContent") or "")
return outline
@app.route("/fill_stepTask_TaskProcess", methods=["post"])
def Handle_fill_stepTask_TaskProcess():
incoming_data = request.get_json()
# print(f"[DEBUG] fill_stepTask_TaskProcess received data: {incoming_data}", flush=True)
# Validate required parameters
General_Goal = incoming_data.get("General Goal", "").strip()
stepTask_lackTaskProcess = incoming_data.get("stepTask_lackTaskProcess")
if not General_Goal:
return jsonify({"error": "General Goal is required and cannot be empty"}), 400
if not stepTask_lackTaskProcess:
return jsonify({"error": "stepTask_lackTaskProcess is required"}), 400
requestIdentifier = str(
(
"/fill_stepTask_TaskProcess",
General_Goal,
stepTask_lackTaskProcess,
incoming_data["General Goal"],
incoming_data["stepTask_lackTaskProcess"],
)
)
@@ -142,54 +50,40 @@ def Handle_fill_stepTask_TaskProcess():
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
try:
filled_stepTask = fill_stepTask_TaskProcess(
General_Goal=General_Goal,
stepTask=stepTask_lackTaskProcess,
AgentProfile_Dict=AgentProfile_Dict,
)
except Exception as e:
print(f"[ERROR] fill_stepTask_TaskProcess failed: {e}", flush=True)
return jsonify({"error": str(e)}), 500
filled_stepTask = fill_stepTask_TaskProcess(
General_Goal=incoming_data["General Goal"],
stepTask=incoming_data["stepTask_lackTaskProcess"],
AgentProfile_Dict=AgentProfile_Dict,
)
filled_stepTask = Add_Collaboration_Brief_FrontEnd(filled_stepTask)
Request_Cache[requestIdentifier] = filled_stepTask
response = jsonify(filled_stepTask)
return response
@app.route("/agentSelectModify_init", methods=["POST"])
@app.route("/agentSelectModify_init", methods=["post"])
def Handle_agentSelectModify_init():
incoming = request.get_json(silent=True) or {}
general_goal = (incoming.get("General Goal") or "").strip()
step_task = incoming.get("stepTask")
if not general_goal or not step_task:
return jsonify({"error": "Missing field"}), 400
if not AgentBoard: # empty board: return immediately
return jsonify({"AgentSelectionPlan": []})
req_id = str(("/agentSelectModify_init", general_goal, step_task))
if USE_CACHE and req_id in Request_Cache:
return jsonify(Request_Cache[req_id])
try:
clean_board = clean_agent_board(AgentBoard)
raw = AgentSelectModify_init(stepTask=step_task,
General_Goal=general_goal,
Agent_Board=clean_board)
if not isinstance(raw, dict):
raise ValueError("model returned non-dict")
plan = raw.get("AgentSelectionPlan") or []
cleaned = [str(x).strip() for x in plan if x is not None and str(x).strip()]
raw["AgentSelectionPlan"] = cleaned
validate(instance=raw, schema=AGENT_SELECTION_SCHEMA)
except Exception as exc:
print(f"[ERROR] AgentSelectModify_init: {exc}")
return jsonify({"error": str(exc)}), 500
incoming_data = request.get_json()
requestIdentifier = str(
(
"/agentSelectModify_init",
incoming_data["General Goal"],
incoming_data["stepTask"],
)
)
if USE_CACHE:
Request_Cache[req_id] = raw
return jsonify(raw)
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
scoreTable = AgentSelectModify_init(
stepTask=incoming_data["stepTask"],
General_Goal=incoming_data["General Goal"],
Agent_Board=AgentBoard,
)
Request_Cache[requestIdentifier] = scoreTable
response = jsonify(scoreTable)
return response
@app.route("/agentSelectModify_addAspect", methods=["post"])
@@ -204,7 +98,7 @@ def Handle_agentSelectModify_addAspect():
return jsonify(Request_Cache[requestIdentifier])
scoreTable = AgentSelectModify_addAspect(
aspectList=incoming_data["aspectList"], Agent_Board=AgentBoard or []
aspectList=incoming_data["aspectList"], Agent_Board=AgentBoard
)
Request_Cache[requestIdentifier] = scoreTable
response = jsonify(scoreTable)
@@ -214,22 +108,11 @@ def Handle_agentSelectModify_addAspect():
@app.route("/fill_stepTask", methods=["post"])
def Handle_fill_stepTask():
incoming_data = request.get_json()
# print(f"[DEBUG] fill_stepTask received data: {incoming_data}", flush=True)
# Validate required parameters
General_Goal = incoming_data.get("General Goal", "").strip()
stepTask = incoming_data.get("stepTask")
if not General_Goal:
return jsonify({"error": "General Goal is required and cannot be empty"}), 400
if not stepTask:
return jsonify({"error": "stepTask is required"}), 400
requestIdentifier = str(
(
"/fill_stepTask",
General_Goal,
stepTask,
incoming_data["General Goal"],
incoming_data["stepTask"],
)
)
@@ -237,16 +120,12 @@ def Handle_fill_stepTask():
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
try:
filled_stepTask = fill_stepTask(
General_Goal=General_Goal,
stepTask=stepTask,
Agent_Board=AgentBoard,
AgentProfile_Dict=AgentProfile_Dict,
)
except Exception as e:
print(f"[ERROR] fill_stepTask failed: {e}", flush=True)
return jsonify({"error": str(e)}), 500
filled_stepTask = fill_stepTask(
General_Goal=incoming_data["General Goal"],
stepTask=incoming_data["stepTask"],
Agent_Board=AgentBoard,
AgentProfile_Dict=AgentProfile_Dict,
)
filled_stepTask = Add_Collaboration_Brief_FrontEnd(filled_stepTask)
Request_Cache[requestIdentifier] = filled_stepTask
response = jsonify(filled_stepTask)
@@ -319,46 +198,36 @@ def Handle_branch_TaskProcess():
return response
@app.route("/generate_basePlan", methods=["POST"])
@app.route("/generate_basePlan", methods=["post"])
def Handle_generate_basePlan():
incoming = request.get_json(silent=True) or {}
general_goal = (incoming.get("General Goal") or "").strip()
initial_objs = incoming.get("Initial Input Object") or []
incoming_data = request.get_json()
requestIdentifier = str(
(
"/generate_basePlan",
incoming_data["General Goal"],
incoming_data["Initial Input Object"],
)
)
if not general_goal:
return jsonify({"error": "General Goal is required"}), 400
# 1. Short-circuit immediately on an empty AgentBoard
if not AgentBoard:
print("[SKIP] AgentBoard empty")
out = Add_Collaboration_Brief_FrontEnd({"Plan_Outline": []})
return jsonify(out)
req_id = str(("/generate_basePlan", general_goal, initial_objs))
if USE_CACHE and req_id in Request_Cache:
return jsonify(Request_Cache[req_id])
if USE_CACHE:
if requestIdentifier in Request_Cache:
return jsonify(Request_Cache[requestIdentifier])
try:
# 2. Sanitize the board → call the model → sanitize the response
clean_board = clean_agent_board(AgentBoard)
raw_plan = asyncio.run(
generate_basePlan(
General_Goal=general_goal,
Agent_Board=clean_board,
AgentProfile_Dict=AgentProfile_Dict,
InitialObject_List=initial_objs,
)
basePlan = generate_basePlan(
General_Goal=incoming_data["General Goal"],
Agent_Board=AgentBoard,
AgentProfile_Dict=AgentProfile_Dict,
InitialObject_List=incoming_data["Initial Input Object"],
)
raw_plan["Plan_Outline"] = clean_plan_outline(raw_plan.get("Plan_Outline"))
validate(instance=raw_plan, schema=BASE_PLAN_SCHEMA) # optional second validation
except Exception as exc:
print(f"[ERROR] generate_basePlan failed: {exc}")
return jsonify({"error": "model call failed", "detail": str(exc)}), 500
out = Add_Collaboration_Brief_FrontEnd(raw_plan)
if USE_CACHE:
Request_Cache[req_id] = out
return jsonify(out)
except ValueError as e:
return jsonify({"error": str(e)}), 400
except Exception as e:
return jsonify({"error": f"An unexpected error occurred: {str(e)}"}), 500
basePlan_withRenderSpec = Add_Collaboration_Brief_FrontEnd(basePlan)
Request_Cache[requestIdentifier] = basePlan_withRenderSpec
response = jsonify(basePlan_withRenderSpec)
return response
@app.route("/executePlan", methods=["post"])
@@ -403,19 +272,49 @@ def Handle_saveRequestCashe():
@app.route("/setAgents", methods=["POST"])
def set_agents():
global AgentBoard, AgentProfile_Dict
board_in = request.json or []
# Sanitize before assigning
AgentBoard = clean_agent_board(board_in)
AgentProfile_Dict = {a["Name"]: a["Profile"] for a in AgentBoard}
return jsonify({"code": 200, "content": "AgentBoard set successfully"})
AgentBoard = request.json
AgentProfile_Dict = {}
for item in AgentBoard:
name = item["Name"]
profile = item["Profile"]
AgentProfile_Dict[name] = profile
return jsonify({"code": 200, "content": "set agentboard successfully"})
def init():
global AgentBoard, AgentProfile_Dict, Request_Cache
with open(
os.path.join(os.getcwd(), "RequestCache", "Request_Cache.json"), "r"
) as json_file:
Request_Cache = json.load(json_file)
# Load Request Cache
try:
with open(
os.path.join(os.getcwd(), "RequestCache", "Request_Cache.json"), "r"
) as json_file:
Request_Cache = json.load(json_file)
print(f"✅ Loaded Request_Cache with {len(Request_Cache)} entries")
except Exception as e:
print(f"⚠️ Failed to load Request_Cache: {e}")
Request_Cache = {}
# Load Agent Board
try:
with open(
os.path.join(os.getcwd(), "AgentRepo", "agentBoard_v1.json"), "r", encoding="utf-8"
) as json_file:
AgentBoard = json.load(json_file)
print(f"✅ Loaded AgentBoard with {len(AgentBoard)} agents")
# Build AgentProfile_Dict
AgentProfile_Dict = {}
for item in AgentBoard:
name = item["Name"]
profile = item["Profile"]
AgentProfile_Dict[name] = profile
print(f"✅ Built AgentProfile_Dict with {len(AgentProfile_Dict)} profiles")
except Exception as e:
print(f"⚠️ Failed to load AgentBoard: {e}")
AgentBoard = []
AgentProfile_Dict = {}
if __name__ == "__main__":
@@ -425,8 +324,8 @@ if __name__ == "__main__":
parser.add_argument(
"--port",
type=int,
default=8017,
help="set the port number, 8017 by default.",
default=8000,
help="set the port number, 8000 by default.",
)
args = parser.parse_args()
init()

View File

@@ -0,0 +1,159 @@
# AgentCoord Backend Code Architecture Analysis
## 🏗️ Backend Code Architecture
### 📁 Core Directory Structure
```
backend/
├── server.py # Flask server entry point
├── config/config.yaml # LLM API configuration
├── AgentRepo/agentBoard_v1.json # Agent definition library
├── DataProcess/ # Data processing layer
├── RequestCache/ # Caching mechanism
└── AgentCoord/ # Core business logic
├── LLMAPI/ # LLM API wrapper
├── PlanEngine/ # Plan generation engine
├── RehearsalEngine_V2/ # Plan execution engine
└── util/ # Utility modules
```
### 🔄 Main Workflows
#### 1. **Plan Generation Flow** (PlanEngine)
```
User goal → generate plan outline → select agents → generate task process → output complete plan
```
**Core modules:**
- `basePlan_Generator.py` - integrates all plan-generation components
- `planOutline_Generator.py` - generates the high-level plan outline
- `taskProcess_Generator.py` - generates the detailed task execution process
- `AgentSelection_Generator.py` - selects the best-suited agents
#### 2. **Plan Execution Flow** (RehearsalEngine_V2)
```
Collaboration plan → initialize execution environment → execute step by step → agents collaborate → record execution log
```
**Action types:**
- **Propose** - put forward suggestions and solutions
- **Critique** - provide feedback and criticism
- **Improve** - refine results based on feedback
- **Finalize** - settle the final output
#### 3. **LLM API Layer** (LLMAPI)
Supported models:
- OpenAI GPT-4/GPT-3.5
- Groq Mixtral-8x7b (fast mode)
- Mistral Open-Mixtral-8x7b
### 🌐 API Endpoint Architecture
#### **Plan Generation APIs**
- `POST /generate_basePlan` - generate a base collaboration plan (usage sketched below)
- `POST /fill_stepTask` - fill in step-task details
- `POST /branch_PlanOutline` - handle plan branching
#### **Plan Execution APIs**
- `POST /executePlan` - execute a collaboration plan
- `POST /agentSelectModify_init` - initialize agent selection
#### **System Management APIs**
- `POST /setAgents` - set the agent board
- `POST /_saveRequestCache` - save the request cache
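A hedged usage sketch for two of these endpoints. It assumes the server is running locally on port 8017 and that the `requests` package is available; the field names are the ones read by the handlers in `server.py`, while the values are illustrative:

```python
import requests

BASE = "http://localhost:8017"  # assumed local port

# Agents must be set before plans can be generated (see /setAgents in server.py).
agents = [{"Name": "Alice", "Profile": "Data analyst agent"}]  # illustrative agent
requests.post(f"{BASE}/setAgents", json=agents)

# "General Goal" and "Initial Input Object" are the fields read by
# Handle_generate_basePlan.
resp = requests.post(
    f"{BASE}/generate_basePlan",
    json={
        "General Goal": "Write a short market report",
        "Initial Input Object": ["Raw sales data"],
    },
)
resp.raise_for_status()
print(resp.json().get("Plan_Outline"))
```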
### 💾 Data Flow Design
#### **Input Layer**
- HTTP request validation
- Parameter extraction and formatting
- Cache lookup
#### **Business Logic Layer**
- LLM API calls
- Multi-agent coordination
- Task state management
#### **Output Layer**
- Result formatting
- Render-template generation for the front end
- JSON responses
### ⚙️ Configuration and Optimization
#### **Configuration File (config.yaml)**
```yaml
OPENAI_API_BASE: "https://api.openai.com"
OPENAI_API_KEY: "your-key"
OPENAI_API_MODEL: "gpt-4-turbo-preview"
FAST_DESIGN_MODE: True # enable fast mode
USE_CACHE: False # cache switch
```
#### **Performance Optimizations**
- Request caching mechanism (sketched below)
- Fast mode via Groq
- Asynchronous LLM calls
- Retries and error handling
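The request cache is keyed by a stringified tuple of the endpoint name and its parameters. A minimal sketch of the pattern the `server.py` handlers repeat; the helper function itself is illustrative (the real handlers inline this logic):

```python
Request_Cache: dict[str, str] = {}

def cached(endpoint: str, params: tuple, compute):
    # Handlers build keys like str(("/generate_basePlan", goal, initial_objects)).
    request_identifier = str((endpoint, *params))
    if request_identifier in Request_Cache:
        return Request_Cache[request_identifier]
    result = compute()  # only call the LLM pipeline on a cache miss
    Request_Cache[request_identifier] = result
    return result
```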
### 🔧 Key Technical Features
1. **Modular architecture** - clear separation of responsibilities
2. **Multi-LLM support** - flexible model switching
3. **Agent collaboration** - complex multi-agent workflows
4. **Front-end adaptation** - automatic render-template generation
5. **Extensibility** - supports custom agents and actions
## 📋 Detailed Module Descriptions
### 1. Server Entry Point (server.py)
- **Function**: main entry point of the Flask app, exposing the RESTful API
- **Features**: request caching, global state management, configurable port
### 2. Plan Engine (PlanEngine)
**Core function**: generate multi-agent collaboration plans (a call sketch follows the module list)
- **basePlan_Generator.py**: integrates all generators to produce a complete collaboration plan
- **planOutline_Generator.py**: generates a plan outline from the goal
- **taskProcess_Generator.py**: generates an execution process for each task step
- **AgentSelection_Generator.py**: selects suitable agents to execute each task
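A hedged call sketch matching the keyword arguments used in `server.py`; the import path is assumed from the directory layout above, and the argument values are illustrative:

```python
# Import path assumed from the directory layout above.
from AgentCoord.PlanEngine.basePlan_Generator import generate_basePlan

AgentBoard = [{"Name": "Alice", "Profile": "Data analyst agent"}]
AgentProfile_Dict = {a["Name"]: a["Profile"] for a in AgentBoard}

basePlan = generate_basePlan(
    General_Goal="Write a short market report",
    Agent_Board=AgentBoard,
    AgentProfile_Dict=AgentProfile_Dict,
    InitialObject_List=["Raw sales data"],
)
```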
### 3. Rehearsal Engine (RehearsalEngine_V2)
**Core function**: execute the generated collaboration plans
- **ExecutePlan.py**: plan execution controller
- **Action modules**: implement the collaboration actions (Propose, Critique, Improve, Finalize)
### 4. LLM API Layer (LLMAPI)
**Core function**: wrap multiple large-language-model APIs (usage sketched below)
- Streaming responses
- Asynchronous processing
- Fast-mode switching
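A hedged usage sketch of the central entry point. The import path and the `LLM_Completion(messages, stream, useGroq)` signature both appear in the diff above; the message content is illustrative:

```python
from AgentCoord.LLMAPI.LLMAPI import LLM_Completion

messages = [{"role": "system", "content": "Draft a one-paragraph project summary."}]
# stream=False returns the full reply at once; useGroq=True takes the Groq
# fast path only when FAST_DESIGN_MODE and a GROQ_API_KEY are configured.
reply = LLM_Completion(messages, stream=False, useGroq=False)
print(reply)
```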
### 5. Data Processing (DataProcess)
**Core function**: format conversion and front-end adaptation
- Color mapping: visually distinguish the different element types
- Template generation: produce render templates for the front end
- Formatting: convert camel-case names to natural language (sketched below)
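A minimal sketch of the camel-case conversion this layer relies on. `is_camel_case` and `camel_case_to_normal` exist in `util/converter.py`; this standalone reimplementation is illustrative:

```python
import re

def camel_case_to_normal(s: str) -> str:
    # Insert a space before each interior capital letter:
    # "MarketReportDraft" -> "Market Report Draft"
    return re.sub(r"(?<!^)(?=[A-Z])", " ", s)

print(camel_case_to_normal("MarketReportDraft"))  # Market Report Draft
```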
## 🚀 Startup and Debugging
### Development Startup
```bash
cd backend
source venv/bin/activate
python server.py --port 8017
```
### Debug Mode
Flask runs with debug=True built in, which provides:
- an interactive debugger
- auto-reload
- detailed error pages
### Docker Deployment
```bash
docker-compose up
```
This backend implements a complete multi-agent collaboration platform whose carefully designed modular architecture supports planning and executing complex tasks.

View File

@@ -17,7 +17,7 @@ import _ from 'lodash';
// fakeAgentSelections,
// fakeCurrentAgentSelection,
// } from './data/fakeAgentAssignment';
import CheckIcon from '@/icons/CheckIcon';
import CheckIcon from '@/icons/checkIcon';
import AgentIcon from '@/components/AgentIcon';
import { globalStorage } from '@/storage';
import SendIcon from '@/icons/SendIcon';