- 升级openai依赖至2.x版本并替换旧版SDK调用方式 - 引入OpenAI和AsyncOpenAI客户端实例替代全局配置 - 更新所有聊天完成请求方法以适配新版API格式 - 为异步流式响应处理添加异常捕获和错误提示 - 统一超时时间和最大token数等默认参数设置 - 修复部分变量命名冲突和潜在的空值引用问题 - 添加打印彩色日志的辅助函数避免循环导入问题
152 lines
4.8 KiB
Python
152 lines
4.8 KiB
Python
import re
|
|
import json
|
|
from AgentCoord.LLMAPI.LLMAPI import LLM_Completion
|
|
|
|
|
|
def create_agent_dict(agent_list):
    """Build a name -> profile lookup table from a list of agent records.

    :param agent_list: iterable of mappings, each providing a ``"Name"`` and a
        ``"Profile"`` entry.
    :return: dict mapping every agent's ``"Name"`` to its ``"Profile"``. When
        two records share a name, the later record wins.
    """
    profiles_by_name = {}
    for record in agent_list:
        # Read the name before the profile, matching key-then-value order.
        name = record["Name"]
        profile = record["Profile"]
        profiles_by_name[name] = profile
    return profiles_by_name
|
|
|
|
|
|
def is_camel_case(s):
    """Report whether ``s`` could be a camel-case identifier.

    The test is deliberately loose: any value that contains no space character
    is treated as camel case; letter casing is not inspected.

    :param s: the name to inspect.
    :return: ``True`` when ``s`` contains no ``" "``, otherwise ``False``.
    """
    contains_space = " " in s
    return not contains_space
|
|
|
|
|
|
def camel_case_to_normal(s):
    """Insert a space before every uppercase character of ``s``.

    Leading whitespace of the result is stripped, so a name that starts with a
    capital letter does not gain a leading space.

    :param s: camel-case string, e.g. ``"storyOutline"``.
    :return: the spaced-out string, e.g. ``"story Outline"``.
    """
    pieces = []
    for char in s:
        if char.isupper():
            pieces.append(" ")
        pieces.append(char)
    return "".join(pieces).lstrip()
|
|
|
|
|
|
def _join_with_and(items):
    """Join a non-empty list as "a, b and c"; a lone item is returned as-is."""
    if len(items) == 1:
        return items[0]
    return " and ".join([", ".join(items[:-1]), items[-1]])


def generate_template_sentence_for_CollaborationBrief(
    input_object_list, output_object, agent_list, step_task
):
    """Compose a one-sentence English summary of a collaboration step.

    Names without spaces are treated as camel case and spaced out with
    ``camel_case_to_normal`` before being placed in the sentence.

    :param input_object_list: names of the input objects, or ``None``/empty
        when the step has no inputs. ``None`` elements are ignored; if every
        element is ``None`` the inputs are described as "unknown inputs".
    :param output_object: name of the produced object; ``None`` becomes
        "unknown output".
    :param agent_list: names of the participating agents. ``None`` elements
        are ignored; ``None``, empty, or all-``None`` becomes "Unknown agents".
    :param step_task: description of the task; ``None`` becomes
        "perform the task".
    :return: the sentence, prefixed with "Based on <inputs>, " only when
        ``input_object_list`` is non-empty.
    """
    if step_task is None:
        step_task = "perform the task"

    if output_object is None:
        output_object = "unknown output"
    elif is_camel_case(output_object):
        output_object = camel_case_to_normal(output_object)

    # Dropping None up front covers the all-valid, partly-None and all-None cases.
    named_agents = (
        [] if agent_list is None else [agent for agent in agent_list if agent is not None]
    )
    agent_str = _join_with_and(named_agents) if named_agents else "Unknown agents"

    if not input_object_list:
        return f"{agent_str} perform the task of {step_task} to obtain {output_object}."

    # Filter None *before* the camel-case conversion: is_camel_case(None) would
    # raise TypeError and the "unknown inputs" fallback could never be reached.
    named_inputs = [
        camel_case_to_normal(obj) if is_camel_case(obj) else obj
        for obj in input_object_list
        if obj is not None
    ]
    input_str = _join_with_and(named_inputs) if named_inputs else "unknown inputs"

    return f"Based on {input_str}, {agent_str} perform the task of {step_task} to obtain {output_object}."
|
|
|
|
|
|
def remove_render_spec(duty_spec):
    """Return a copy of ``duty_spec`` with every ``"renderSpec"`` key removed.

    Dicts and lists are walked recursively and rebuilt; any other value is
    returned unchanged.

    :param duty_spec: arbitrarily nested structure of dicts, lists and leaves.
    :return: the same structure without any ``"renderSpec"`` dict entries.
    """
    if isinstance(duty_spec, dict):
        cleaned = {}
        for key, value in duty_spec.items():
            if key != "renderSpec":
                cleaned[key] = remove_render_spec(value)
        return cleaned

    if isinstance(duty_spec, list):
        return [remove_render_spec(element) for element in duty_spec]

    # Leaf value (or an unsupported container such as a tuple): pass through.
    return duty_spec
|
|
|
|
|
|
def read_LLM_Completion(messages, useGroq=True):
    """Query the LLM and parse a JSON value out of its reply, retrying on failure.

    Up to three completions are requested through ``LLM_Completion``. For each
    reply, the content of the first ```json fenced block is parsed; if there is
    no such block, or it is not valid JSON, the span from the first ``{`` to
    the last ``}`` is tried instead. A reply that yields no valid JSON triggers
    the next attempt.

    :param messages: passed through unchanged to ``LLM_Completion``.
    :param useGroq: forwarded to ``LLM_Completion`` as its ``useGroq`` keyword.
    :return: the parsed JSON value of the first reply that parses, or ``{}``
        when none of the three replies did.
    """
    # (pattern, group to parse), in order of preference. The fence pattern
    # needs no leading ".*?": re.search already scans forward, and the prefix
    # only made non-matching scans quadratic.
    extractors = (
        (r"```json(.*?)```", 1),
        (r"\{.*\}", 0),
    )

    for _ in range(3):
        text = LLM_Completion(messages, useGroq=useGroq)
        if not isinstance(text, str):
            # Nothing to search in (e.g. None); ask again.
            continue

        for pattern, group in extractors:
            match = re.search(pattern, text, re.DOTALL)
            if match is None:
                continue
            try:
                return json.loads(match.group(group).strip())
            except json.JSONDecodeError:
                # Malformed candidate: deliberately ignored so the next
                # extractor, and then the next attempt, get a chance.
                pass

    return {}  # Return an empty object instead of raising.
|
|
|
|
|
|
def read_json_content(text):
    """
    Extract and parse the JSON embedded in ``text``.

    The content between the first ```json and the following ``` is parsed
    first. If there is no such block, or it is not valid JSON, the span from
    the first ``{`` to the last ``}`` is tried instead.

    :param text: The string containing the content enclosed by ```json and ```
    :return: The parsed JSON value, or ``{}`` when ``text`` is not a string or
        contains no valid JSON.
    """
    if not isinstance(text, str):
        return {}

    # (pattern, group to parse), in order of preference. The fence pattern
    # needs no leading ".*?": re.search already scans forward, and the prefix
    # only made non-matching scans quadratic.
    extractors = (
        (r"```json(.*?)```", 1),
        (r"\{.*\}", 0),
    )

    for pattern, group in extractors:
        match = re.search(pattern, text, re.DOTALL)
        if match is None:
            continue
        try:
            return json.loads(match.group(group).strip())
        except json.JSONDecodeError:
            # Malformed candidate: fall through to the next extractor so the
            # function honours its "empty object instead of raising" contract.
            continue

    return {}  # Return an empty object instead of raising.
|
|
|
|
|
|
def read_outputObject_content(text, keyword):
    """
    Extract and return the content between ```{keyword} and ```.

    ``keyword`` is matched literally: it is regex-escaped before being placed
    in the pattern, so names containing characters such as ``+``, ``.`` or
    parentheses are neither misread as regex syntax nor able to raise
    ``re.error``.

    :param text: The string containing the content enclosed by ```{keyword} and ```
    :param keyword: The label that follows the opening fence.
    :return: The enclosed content with surrounding whitespace stripped, or an
        empty string when no such block exists.
    """
    # str() keeps non-string keywords working, as str.format() did.
    pattern = rf"```{re.escape(str(keyword))}(.*?)```"
    match = re.search(pattern, text, re.DOTALL)

    if match is None:
        return ""  # Return an empty string instead of raising.
    return match.group(1).strip()
|