Files
AgentCoord/backend/AgentCoord/Export/xlsx_llm.py
2026-03-11 17:46:42 +08:00

482 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Excel 文档 LLM 报告导出器
调用大模型生成专业的任务执行报告并保存为Excel格式
"""
import json
import os
from datetime import datetime
from typing import Dict, Any
try:
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
except ImportError:
pass
class XlsxLLMExporter:
    """Export a task run as an LLM-written report inside an Excel workbook.

    ``generate`` builds a prompt from the task data, asks the configured
    chat-completion endpoint for a Markdown report, and renders that Markdown
    into a two-column (A:B) worksheet saved with openpyxl.
    """

    # Endpoint settings filled in by _load_llm_config(). The dict lives on the
    # class, so every instance reads and overwrites the same values.
    LLM_CONFIG = {
        'OPENAI_API_BASE': None,
        'OPENAI_API_KEY': None,
        'OPENAI_API_MODEL': None,
    }

    # The sheet layout is two columns wide; wider Markdown tables are folded
    # into this many columns (see _fit_table_row) instead of being truncated.
    MAX_TABLE_COLUMNS = 2

    # Title lines that _clean_report_title removes from the top of the report.
    _REPORT_TITLES = ('任务执行报告', '# 任务执行报告')

    # Bullet glyph, written as an escape so it survives tools that strip
    # "ambiguous" Unicode characters.
    # NOTE(review): the original literal was an empty string, presumably this
    # glyph lost in transit - confirm against the upstream file.
    _BULLET = '\u2022'

    # The template body is kept at column 0 on purpose: it is a runtime string
    # and any leading whitespace would be sent to the model.
    PROMPT_TEMPLATE = """你是一位专业的项目管理顾问和数据分析专家。你的任务是将以下任务执行数据生成一份详细、专业、结构化的执行报告。
## 任务基本信息
- 任务名称:{task_name}
## 任务大纲(规划阶段)
{task_outline}
## 执行结果
{rehearsal_log}
## 参与智能体
{agents}
## 智能体评分
{agent_scores}
---
## 报告要求
请生成一份完整的任务执行报告,包含以下章节:
### 1. 执行摘要
用 2-3 句话概括本次任务的整体执行情况。
### 2. 任务概述
- 任务背景与目标
- 任务范围与边界
### 3. 任务规划分析
- 任务拆解的合理性
- 智能体角色分配的优化建议
- 工作流程设计
### 4. 执行过程回顾
- 各阶段的完成情况
- 关键决策点
- 遇到的问题及解决方案
### 5. 成果产出分析
- 产出物的质量评估
- 产出与预期目标的匹配度
### 6. 团队协作分析
- 智能体之间的协作模式
- 信息传递效率
### 7. 质量评估
- 整体完成质量评分1-10分
- 各维度的具体评分及理由
### 8. 经验教训与改进建议
- 成功经验
- 存在的问题与不足
- 改进建议
---
## 输出格式要求
- 使用 Markdown 格式输出
- 语言:简体中文
- 适当使用列表、表格增强可读性
- 报告长度必须达到 3000-5000 字,每个章节都要详细展开,不要遗漏任何章节
- 每个章节的内容要充实,提供具体的分析和建议
- 注意:所有加粗标记必须成对出现,如 **文本**,不要单独使用 ** 或缺少结束标记
- 禁止使用 mermaid、graph TD、flowchart 等图表代码,如果需要描述流程请用纯文字描述
- 不要生成附录章节
- 不要在报告中显示"报告总字数"这样的统计信息
"""

    def __init__(self):
        """Create the exporter and load the LLM settings from config.yaml."""
        self._load_llm_config()

    def _load_llm_config(self):
        """Populate ``LLM_CONFIG`` from the first usable ``config.yaml``.

        Three candidate locations are probed in order. A file that is missing,
        empty, or not a YAML mapping is skipped. Any error (including a
        missing ``yaml`` package) is printed and swallowed, leaving the
        settings unset so that ``_call_llm`` reports what is missing.
        """
        try:
            import yaml

            package_dir = os.path.dirname(os.path.dirname(__file__))
            candidates = [
                os.path.join(package_dir, 'config', 'config.yaml'),
                os.path.join(os.path.dirname(package_dir), 'backend', 'config', 'config.yaml'),
                os.path.join(os.getcwd(), 'config', 'config.yaml'),
            ]
            for config_path in candidates:
                if not os.path.exists(config_path):
                    continue
                with open(config_path, 'r', encoding='utf-8') as f:
                    config = yaml.safe_load(f)
                # A non-mapping document cannot hold the settings; try the
                # next candidate instead of failing on ``.get``.
                if not config or not isinstance(config, dict):
                    continue
                for key in ('OPENAI_API_BASE', 'OPENAI_API_KEY', 'OPENAI_API_MODEL'):
                    self.LLM_CONFIG[key] = config.get(key)
                print(f"已加载 LLM 配置: {self.LLM_CONFIG['OPENAI_API_MODEL']}")
                return
        except Exception as e:
            print(f"加载 LLM 配置失败: {e}")

    def generate(self, task_data: Dict[str, Any], file_path: str) -> bool:
        """Generate the report with the LLM and save it as an Excel file.

        Args:
            task_data: Mapping read for the keys ``task_name``,
                ``task_outline``, ``rehearsal_log`` and ``agent_scores``.
            file_path: Destination path of the workbook.

        Returns:
            True when the workbook was written; False when the LLM returned
            nothing or any step raised (the traceback is printed).
        """
        try:
            task_name = task_data.get('task_name', '未命名任务')
            task_outline = task_data.get('task_outline')
            rehearsal_log = task_data.get('rehearsal_log')
            agent_scores = task_data.get('agent_scores')

            agents = self._extract_agents(task_outline)
            filtered_agent_scores = self._filter_agent_scores(agent_scores, agents)

            def to_json(value):
                # Empty / missing sections are rendered as an empty string.
                return json.dumps(value, ensure_ascii=False, indent=2) if value else ''

            prompt = self.PROMPT_TEMPLATE.format(
                task_name=task_name,
                task_outline=to_json(task_outline),
                rehearsal_log=to_json(rehearsal_log),
                agents=', '.join(agents) if agents else '',
                agent_scores=to_json(filtered_agent_scores),
            )

            print("正在调用大模型生成 Excel 报告...")
            report_content = self._call_llm(prompt)
            if not report_content:
                print("LLM 生成报告失败")
                return False

            report_content = self._clean_report_title(report_content)
            print(f"报告生成成功,长度: {len(report_content)} 字符")
            self._save_as_excel(report_content, file_path, task_name)
            return True
        except Exception as e:
            print(f"Excel LLM 导出失败: {e}")
            import traceback
            traceback.print_exc()
            return False

    def _clean_report_title(self, content: str) -> str:
        """Remove a duplicated report title from the top of ``content``.

        The first non-blank line is dropped when it is one of
        ``_REPORT_TITLES``, together with the blank lines around it. Content
        without such a title is returned unchanged.
        """
        if not content:
            return content
        lines = content.split('\n')

        # Skip blank lines first: a title preceded by an empty line must
        # still be recognised.
        title_idx = 0
        while title_idx < len(lines) and not lines[title_idx].strip():
            title_idx += 1
        if title_idx == len(lines) or lines[title_idx].strip() not in self._REPORT_TITLES:
            return content

        body_idx = title_idx + 1
        while body_idx < len(lines) and not lines[body_idx].strip():
            body_idx += 1
        return '\n'.join(lines[body_idx:])

    def _extract_agents(self, task_outline: Any) -> list:
        """Return the agent names used by the outline's collaboration steps.

        Names are read from each step's ``AgentSelection`` list under the
        ``Collaboration Process`` key. The result keeps first-seen order and
        contains each name once, so the prompt text is stable between runs.
        Entries that are empty or not strings are ignored.
        """
        if not task_outline or not isinstance(task_outline, dict):
            return []
        collaboration_process = task_outline.get('Collaboration Process', [])
        if not collaboration_process or not isinstance(collaboration_process, list):
            return []

        ordered = {}
        for step in collaboration_process:
            if not isinstance(step, dict):
                continue
            agent_selection = step.get('AgentSelection', [])
            if not isinstance(agent_selection, list):
                continue
            for agent in agent_selection:
                # Only non-empty strings can be joined into the prompt later.
                if agent and isinstance(agent, str):
                    ordered.setdefault(agent, None)
        return list(ordered)

    def _filter_agent_scores(self, agent_scores: Any, agents: list) -> dict:
        """Keep only the scores of agents that took part in the task.

        Returns a dict shaped like the input (``aspectList`` plus
        ``agentScores`` per step) restricted to names found in ``agents``;
        steps left without any score are omitted.
        """
        if not agent_scores or not isinstance(agent_scores, dict):
            return {}
        if not agents:
            return {}

        filtered = {}
        for step_id, step_data in agent_scores.items():
            if not isinstance(step_data, dict):
                continue
            per_agent = step_data.get('agentScores', {})
            if not per_agent:
                continue
            kept = {
                name: scores
                for name, scores in per_agent.items()
                if name in agents and isinstance(scores, dict)
            }
            if kept:
                filtered[step_id] = {
                    'aspectList': step_data.get('aspectList', []),
                    'agentScores': kept,
                }
        return filtered

    def _call_llm(self, prompt: str) -> str:
        """Send ``prompt`` to the configured chat endpoint.

        Returns the text of the first choice, or an empty string when a
        setting is missing, the ``openai`` package is unavailable, the call
        fails, or the response carries no content.
        """
        try:
            import openai

            if not self.LLM_CONFIG['OPENAI_API_KEY']:
                print("错误: OPENAI_API_KEY 未配置")
                return ""
            if not self.LLM_CONFIG['OPENAI_API_BASE']:
                print("错误: OPENAI_API_BASE 未配置")
                return ""
            if not self.LLM_CONFIG['OPENAI_API_MODEL']:
                print("错误: OPENAI_API_MODEL 未配置")
                return ""

            client = openai.OpenAI(
                api_key=self.LLM_CONFIG['OPENAI_API_KEY'],
                base_url=self.LLM_CONFIG['OPENAI_API_BASE']
            )
            response = client.chat.completions.create(
                model=self.LLM_CONFIG['OPENAI_API_MODEL'],
                messages=[
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=10000,
            )
            if response and response.choices:
                # ``content`` may be None; honour the ``-> str`` contract.
                return response.choices[0].message.content or ""
            return ""
        except ImportError:
            print("请安装 openai 库: pip install openai")
            return ""
        except Exception as e:
            print(f"调用 LLM 失败: {e}")
            return ""

    @staticmethod
    def _parse_table_row(line: str) -> list:
        """Split one Markdown table row into stripped cell texts.

        The outer pipes are optional, so a row without a trailing ``|`` keeps
        its last cell.
        """
        text = line.strip()
        if text.startswith('|'):
            text = text[1:]
        if text.endswith('|'):
            text = text[:-1]
        return [cell.strip() for cell in text.split('|')]

    @staticmethod
    def _is_table_separator(line: str) -> bool:
        """Return True for a Markdown header separator such as ``|---|:-:|``."""
        import re
        return re.fullmatch(
            r'\|?\s*:?-+:?\s*(?:\|\s*:?-+:?\s*)*\|?', line.strip()
        ) is not None

    @staticmethod
    def _is_horizontal_rule(line: str) -> bool:
        """Return True for a Markdown rule line (``---``, ``***`` or ``___``)."""
        import re
        return re.fullmatch(r'-{3,}|\*{3,}|_{3,}', line.strip()) is not None

    @staticmethod
    def _strip_bullet(line: str):
        """Return the text of a list item, or None when ``line`` is not one.

        Recognises ``- ``, ``* `` and the bullet glyph, with any indentation.
        """
        text = line.strip()
        if text.startswith(('- ', '* ')):
            return text[2:].strip()
        if text.startswith(XlsxLLMExporter._BULLET):
            return text[1:].strip()
        return None

    @staticmethod
    def _fit_table_row(cells, max_cols: int) -> list:
        """Fit ``cells`` into ``max_cols`` columns without losing content.

        Cells beyond the limit are joined with `` | `` into the last column.
        """
        if max_cols <= 0:
            return []
        cells = list(cells)
        if len(cells) <= max_cols:
            return cells
        return cells[:max_cols - 1] + [' | '.join(cells[max_cols - 1:])]

    def _write_merged_row(self, ws, row, text, font, wrap_text=None):
        """Write ``text`` centred into ``A{row}`` and merge it across A:B."""
        from openpyxl.styles import Alignment

        cell = ws[f'A{row}']
        cell.value = text
        cell.font = font
        cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=wrap_text)
        ws.merge_cells(f'A{row}:B{row}')

    def _save_as_excel(self, markdown_content: str, file_path: str, task_name: str):
        """Render the Markdown report into a worksheet and save it.

        Headings, list items, ``**key**: value`` lines, paragraphs and tables
        each get their own row styling; ``# `` titles and rule lines are
        skipped.

        Raises:
            ImportError: openpyxl is not installed.
            Exception: anything raised while building or saving the workbook
                is printed and re-raised.
        """
        try:
            import re
            from openpyxl import Workbook
            from openpyxl.styles import Font, PatternFill, Border, Side

            wb = Workbook()
            ws = wb.active
            ws.title = "任务执行报告"

            header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
            header_font = Font(bold=True, color="FFFFFF", size=12)
            normal_font = Font(size=10)
            key_value_font = Font(bold=True, size=10)
            thin_border = Border(
                left=Side(style='thin'),
                right=Side(style='thin'),
                top=Side(style='thin'),
                bottom=Side(style='thin')
            )
            # Heading prefix -> font. The prefixes end with a space, so none
            # of them is a prefix of another and lookup order is irrelevant.
            heading_fonts = {
                '#### ': Font(bold=True, size=11, color="4472C4"),
                '### ': Font(bold=True, size=12, color="4472C4"),
                '## ': Font(bold=True, size=13),
            }
            # "**key**: value" with an ASCII or full-width colon.
            key_value_re = re.compile(r'\*\*(.+?)\*\*[:\uff1a](.*)')

            ws.column_dimensions['A'].width = 20
            ws.column_dimensions['B'].width = 80

            row = 1
            self._write_merged_row(ws, row, task_name, Font(bold=True, size=16))
            row += 1
            self._write_merged_row(
                ws, row,
                f"导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
                Font(size=9, italic=True),
            )
            row += 2

            pending_table = []

            def flush_table(at_row):
                """Write buffered table rows, if any; return the next free row."""
                if not pending_table:
                    return at_row
                next_row = self._write_table_to_excel(
                    ws, list(pending_table), at_row,
                    header_fill, header_font, thin_border, normal_font,
                )
                pending_table.clear()
                return next_row

            for raw_line in markdown_content.split('\n'):
                line = raw_line.rstrip()
                stripped = line.strip()

                # A blank line ends the current table and produces no row.
                if not stripped:
                    row = flush_table(row)
                    continue

                # Table lines are buffered until the table ends.
                if stripped.startswith('|'):
                    if self._is_table_separator(stripped):
                        continue
                    cells = self._parse_table_row(stripped)
                    if any(cells):
                        pending_table.append(cells)
                    continue

                row = flush_table(row)

                if self._is_horizontal_rule(stripped):
                    continue
                if line.startswith('# '):
                    # Top-level titles are not rendered; row 1 already holds
                    # the task name.
                    continue

                heading_prefix = next(
                    (prefix for prefix in heading_fonts if line.startswith(prefix)), None
                )
                if heading_prefix is not None:
                    self._write_merged_row(
                        ws, row, line[len(heading_prefix):].strip(), heading_fonts[heading_prefix]
                    )
                    row += 1
                    continue

                bullet_text = self._strip_bullet(line)
                if bullet_text is not None:
                    self._write_merged_row(
                        ws, row,
                        self._BULLET + ' ' + self._clean_markdown(bullet_text),
                        normal_font, wrap_text=True,
                    )
                    row += 1
                    continue

                key_value = key_value_re.fullmatch(line)
                if key_value is not None:
                    key = self._clean_markdown(key_value.group(1))
                    value = self._clean_markdown(key_value.group(2))
                    self._write_merged_row(ws, row, f"{key}: {value}", key_value_font, wrap_text=True)
                    row += 1
                    continue

                clean_line = self._clean_markdown(line)
                if clean_line:
                    self._write_merged_row(ws, row, clean_line, normal_font, wrap_text=True)
                    row += 1

            row = flush_table(row)

            # Table rendering narrows column B; restore the report width.
            ws.column_dimensions['B'].width = 80

            # NOTE(review): presumably meant to request auto-fit row heights -
            # confirm that openpyxl persists this flag for rows.
            for r in range(1, row + 1):
                ws.row_dimensions[r].bestFit = True

            # Give cells that still carry the default border an explicit
            # no-line border.
            for col in ['A', 'B']:
                for r in range(1, row + 1):
                    cell = ws[f'{col}{r}']
                    if cell.border is None or cell.border == Border():
                        cell.border = Border(
                            left=Side(style='none'),
                            right=Side(style='none'),
                            top=Side(style='none'),
                            bottom=Side(style='none')
                        )

            wb.save(file_path)
            print(f"Excel 文档已保存: {file_path}")
        except ImportError:
            print("请安装 openpyxl 库: pip install openpyxl")
            raise
        except Exception as e:
            print(f"保存 Excel 文档失败: {e}")
            raise

    def _write_table_to_excel(self, ws, table_data, row, header_fill, header_font, border, normal_font):
        """Write buffered Markdown table rows starting at ``row``.

        The first row is styled as the header. A single-row "table" is written
        as one merged text row. Rows wider than ``MAX_TABLE_COLUMNS`` have
        their extra cells folded into the last column.

        Returns:
            The next free row: one past a single merged row, or past the
            table plus one spacer row.
        """
        if not table_data:
            return row

        # Imported here so the method does not depend on the module-level
        # import, which is silently skipped when openpyxl is missing.
        from openpyxl.styles import Alignment, PatternFill
        from openpyxl.utils import get_column_letter

        if len(table_data) == 1:
            text = ' | '.join(cell for cell in table_data[0] if cell)
            self._write_merged_row(ws, row, self._clean_markdown(text), normal_font, wrap_text=True)
            return row + 1

        max_cols = min(max(len(row_data) for row_data in table_data), self.MAX_TABLE_COLUMNS)
        for col_idx in range(max_cols):
            col_letter = get_column_letter(col_idx + 1)
            ws.column_dimensions[col_letter].width = 25 if max_cols > 1 else 80

        for row_idx, row_data in enumerate(table_data):
            is_header = row_idx == 0
            fitted = self._fit_table_row(row_data, max_cols)
            for col_idx, cell_text in enumerate(fitted):
                cell = ws[f'{get_column_letter(col_idx + 1)}{row + row_idx}']
                cell.value = self._clean_markdown(cell_text)
                cell.font = header_font if is_header else normal_font
                cell.fill = header_fill if is_header else PatternFill()
                cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
                cell.border = border
            if max_cols > 1:
                # Short rows still get a bordered (empty) cell so the grid
                # stays rectangular.
                for col_idx in range(len(fitted), max_cols):
                    ws[f'{get_column_letter(col_idx + 1)}{row + row_idx}'].border = border

        return row + len(table_data) + 1

    def _clean_markdown(self, text: str) -> str:
        """Strip inline Markdown markers (bold, italic, strike, code).

        Emphasis markers are only removed when they hug their content, and
        underscore markers only at word boundaries, so identifiers such as
        ``agent_scores`` and expressions such as ``2 * 3 * 4`` are left
        intact. Literal ``\\n`` / ``\\t`` sequences become real whitespace.
        """
        import re

        substitutions = (
            r'\*\*(.+?)\*\*',
            r'\*(?!\s)(.+?)(?<!\s)\*',
            r'(?<!\w)__(?!\s)(.+?)(?<!\s)__(?!\w)',
            r'(?<!\w)_(?!\s)(.+?)(?<!\s)_(?!\w)',
            r'~~(.+?)~~',
            r'`(.+?)`',
        )
        for pattern in substitutions:
            text = re.sub(pattern, r'\1', text)
        text = text.replace('\\n', '\n').replace('\\t', '\t')
        return text.strip()