feat: 导出功能重构

This commit is contained in:
liailing1026
2026-03-11 17:46:42 +08:00
parent 14b79bc282
commit 26c42697e8
20 changed files with 4070 additions and 126 deletions

View File

@@ -4,9 +4,16 @@
"""
import json
import os
from typing import Dict, Any, Optional
from datetime import datetime
# 导入 LLM 导出器
from .docx_llm import DocxLLMExporter
from .xlsx_llm import XlsxLLMExporter
from .infographic_llm import InfographicLLMExporter
from .markdown_llm import MarkdownLLMExporter
class BaseExporter:
"""导出器基类"""
@@ -22,6 +29,11 @@ class BaseExporter:
class MarkdownExporter(BaseExporter):
"""Markdown 导出器"""
# 需要过滤掉的任务大纲字段
OUTLINE_FIELDS_TO_SKIP = {
'Collaboration_Brief_FrontEnd', 'data', 'color', 'template', 'Collaboration_Brief'
}
def generate(self, file_path: str) -> bool:
"""生成 Markdown 文件"""
try:
@@ -43,23 +55,11 @@ class MarkdownExporter(BaseExporter):
content_lines.append("## 任务大纲\n")
content_lines.append(self._format_outline(task_outline))
# 执行结果
result = self.task_data.get('result')
if result:
content_lines.append("## 执行结果\n")
if isinstance(result, list):
for idx, item in enumerate(result, 1):
content_lines.append(f"### 步骤 {idx}\n")
content_lines.append(f"{json.dumps(item, ensure_ascii=False, indent=2)}\n")
else:
content_lines.append(f"{json.dumps(result, ensure_ascii=False, indent=2)}\n")
# 参与智能体 - 从 task_outline 的 Collaboration Process 中提取
# 参与智能体
task_outline = self.task_data.get('task_outline')
if task_outline and isinstance(task_outline, dict):
collaboration_process = task_outline.get('Collaboration Process', [])
if collaboration_process and isinstance(collaboration_process, list):
# 收集所有参与步骤的智能体
all_agents = set()
for step in collaboration_process:
if isinstance(step, dict):
@@ -69,10 +69,33 @@ class MarkdownExporter(BaseExporter):
if agent:
all_agents.add(agent)
if all_agents:
content_lines.append("## 参与智能体\n")
content_lines.append("\n## 参与智能体\n")
for agent_name in sorted(all_agents):
content_lines.append(f"- {agent_name}")
# 智能体评分信息
agent_scores = self.task_data.get('agent_scores')
if agent_scores:
content_lines.append("\n## 智能体评分\n")
content_lines.append(self._format_agent_scores(agent_scores))
# 执行结果 - 合并 rehearsal_log 和 result取最完整的数据
rehearsal_log = self.task_data.get('rehearsal_log')
result = self.task_data.get('result')
# 优先使用 rehearsal_log如果某个步骤数据不完整用 result 补充
if rehearsal_log:
content_lines.append("\n## 执行结果\n")
content_lines.append(self._format_with_fallback(rehearsal_log, result))
elif result:
content_lines.append("\n## 执行结果\n")
if isinstance(result, list):
for idx, item in enumerate(result, 1):
content_lines.append(f"### 步骤 {idx}\n")
content_lines.append(self._format_result_item(item))
else:
content_lines.append(self._format_result_item(result))
# 写入文件
with open(file_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(content_lines))
@@ -80,8 +103,242 @@ class MarkdownExporter(BaseExporter):
return True
except Exception as e:
print(f"Markdown 导出失败: {e}")
import traceback
traceback.print_exc()
return False
def _format_result_item(self, item: Any, fallback: dict = None) -> str:
"""格式化执行结果项
Args:
item: 主数据字典
fallback: 可选的备用数据字典,用于补充 item 中缺失的字段
"""
lines = []
if not isinstance(item, dict):
lines.append(str(item))
return '\n'.join(lines)
# 如果有 fallback用 fallback 补充缺失的字段
if fallback:
data = {
'OutputName': item.get('OutputName', fallback.get('OutputName', '')),
'NodeId': item.get('NodeId', fallback.get('NodeId', '')),
'TaskContent': item.get('TaskContent', fallback.get('TaskContent', '')),
'InputName_List': item.get('InputName_List', fallback.get('InputName_List', [])),
'inputObject_Record': item.get('inputObject_Record', fallback.get('inputObject_Record', [])),
'ActionHistory': item.get('ActionHistory', fallback.get('ActionHistory', [])),
'content': item.get('content', fallback.get('content', '')),
}
else:
data = item
# 提取关键字段
output_name = data.get('OutputName', '')
node_id = data.get('NodeId', '')
task_content = data.get('TaskContent', '')
action_history = data.get('ActionHistory', [])
content = data.get('content', '')
# 输出产物
if output_name:
lines.append(f"**输出产物**: {output_name}")
if node_id and node_id != output_name:
lines.append(f"**步骤名称**: {node_id}")
# 产物级:产物执行结果 - content
# content 是产物级别的,独立显示
if content:
if isinstance(content, str):
decoded_content = content.replace('\\n', '\n').replace('\\t', '\t').replace('\\r', '\r')
lines.append(f"\n**产物执行结果**:")
for line in decoded_content.split('\n'):
lines.append(line)
else:
lines.append(f"\n**产物执行结果**: {content}")
# 步骤级:动作执行过程 - ActionHistory
# 只有当没有 content 时才显示执行过程
elif action_history and isinstance(action_history, list) and len(action_history) > 0:
lines.append("\n### 执行过程\n")
for action_idx, action in enumerate(action_history, 1):
if not isinstance(action, dict):
continue
agent_name = action.get('AgentName', '未知智能体')
action_type = action.get('ActionType', '')
description = action.get('Description', '')
action_result = action.get('Action_Result', '')
lines.append(f"#### 动作 {action_idx}: {agent_name} ({action_type})")
if description:
lines.append(f"**任务描述**: {description}")
if action_result:
if isinstance(action_result, str):
decoded_result = action_result.replace('\\n', '\n').replace('\\t', '\t').replace('\\r', '\r')
lines.append(f"\n**执行结果**:\n{decoded_result}")
else:
lines.append(f"\n**执行结果**: {action_result}")
lines.append("")
# 任务内容
if task_content:
lines.append(f"\n**任务内容**: {task_content}")
# 输入产物 - 包含详细内容
input_name_list = data.get('InputName_List', [])
input_object_record = data.get('inputObject_Record', [])
if input_name_list:
lines.append(f"\n**输入产物**:")
for inp_name in input_name_list:
lines.append(f" - {inp_name}")
# 查找对应的详细内容
if isinstance(input_object_record, list):
for record in input_object_record:
if isinstance(record, dict) and inp_name in record:
inp_content = record[inp_name]
if isinstance(inp_content, str):
decoded_inp = inp_content.replace('\\n', '\n').replace('\\t', '\t').replace('\\r', '\r')
lines.append(f" 内容:")
for line in decoded_inp.split('\n'):
lines.append(f" {line}")
break
lines.append("")
return '\n'.join(lines)
def _should_skip_field(self, key: str) -> bool:
"""判断是否应该跳过某个字段"""
key_lower = key.lower()
skip_patterns = ['brief', 'data', 'color', 'template', '_front']
return any(pattern in key_lower for pattern in skip_patterns)
def _format_agent_scores(self, agent_scores: Any) -> str:
"""格式化智能体评分信息"""
lines = []
if not agent_scores:
return "无评分信息"
if isinstance(agent_scores, list):
for item in agent_scores:
if isinstance(item, dict):
aspect_list = item.get('aspectList', [])
agent_scores_dict = item.get('agentScores', {})
if aspect_list:
lines.append(f"\n**评估维度**: {', '.join(aspect_list)}\n")
if isinstance(agent_scores_dict, dict):
for agent_name, scores in agent_scores_dict.items():
lines.append(f"\n### {agent_name}")
if isinstance(scores, dict):
for aspect, score_info in scores.items():
if isinstance(score_info, dict):
score = score_info.get('score', '')
reason = score_info.get('reason', '')
lines.append(f"- **{aspect}**:")
if score:
lines.append(f" - 评分: {score}")
if reason:
lines.append(f" - 理由: {reason}")
else:
lines.append(f"- **{aspect}**: {score_info}")
else:
lines.append(f"- {scores}")
elif isinstance(agent_scores, dict):
for agent_name, score in agent_scores.items():
if isinstance(score, dict):
lines.append(f"\n### {agent_name}")
for k, v in score.items():
lines.append(f"- **{k}**: {v}")
else:
lines.append(f"\n### {agent_name}: {score}")
else:
lines.append(str(agent_scores))
return '\n'.join(lines) if lines else "无评分信息"
def _format_with_fallback(self, rehearsal_log: Any, result: Any) -> str:
    """Merge rehearsal_log and result, preferring the most complete data.

    ``rehearsal_log`` drives the output. Each log entry is matched to a
    ``result`` item (by NodeId/OutputName, exact first, then fuzzy
    substring match); the matched item is passed to
    ``_format_result_item`` as a fallback so missing fields can be filled
    in. When no usable log exists, ``result`` alone is rendered.
    """
    lines = []
    if not rehearsal_log and not result:
        return "无执行日志"
    # Index result items for lookup.
    # Each item is registered under multiple keys: NodeId and OutputName.
    result_dict = {}
    if result and isinstance(result, list):
        for item in result:
            if isinstance(item, dict):
                # Use several fields as lookup keys.
                keys = []
                if item.get('NodeId'):
                    keys.append(item['NodeId'])
                if item.get('OutputName'):
                    keys.append(item['OutputName'])
                for key in keys:
                    result_dict[key] = item
    if rehearsal_log and isinstance(rehearsal_log, list):
        for idx, entry in enumerate(rehearsal_log, 1):
            if isinstance(entry, dict):
                # Key this entry by NodeId, falling back to OutputName.
                step_key = entry.get('NodeId', entry.get('OutputName', ''))
                lines.append(f"### 步骤 {idx}\n")
                # If the log entry is incomplete, supplement it from result.
                found_in_result = False
                if step_key and step_key in result_dict:
                    result_item = result_dict[step_key]
                    lines.append(self._format_result_item(entry, result_item))
                    found_in_result = True
                elif step_key:
                    # Fuzzy match - a result NodeId may be contained in the
                    # step key (or vice versa).
                    for k, v in result_dict.items():
                        if k in step_key or step_key in k:
                            lines.append(self._format_result_item(entry, v))
                            found_in_result = True
                            break
                if not found_in_result:
                    lines.append(self._format_result_item(entry))
                lines.append("")
            # NOTE(review): non-dict log entries are skipped silently but
            # still consume a step index — confirm that is intended.
    elif rehearsal_log and isinstance(rehearsal_log, dict):
        lines.append(self._format_result_item(rehearsal_log))
    else:
        if result and isinstance(result, list):
            for idx, item in enumerate(result, 1):
                lines.append(f"### 步骤 {idx}\n")
                lines.append(self._format_result_item(item))
                lines.append("")
        elif result:
            lines.append(self._format_result_item(result))
    return '\n'.join(lines)
def _format_rehearsal_log(self, rehearsal_log: Any) -> str:
"""格式化详细执行日志 - 使用和 result 相同的格式"""
lines = []
if not rehearsal_log:
return "无执行日志"
if isinstance(rehearsal_log, list):
for idx, entry in enumerate(rehearsal_log, 1):
lines.append(f"### 步骤 {idx}\n")
lines.append(self._format_result_item(entry))
lines.append("")
elif isinstance(rehearsal_log, dict):
# 如果是单个步骤
lines.append(self._format_result_item(rehearsal_log))
else:
lines.append(str(rehearsal_log))
return '\n'.join(lines)
def _format_outline(self, outline: Any, level: int = 2) -> str:
"""格式化大纲内容"""
lines = []
@@ -89,14 +346,24 @@ class MarkdownExporter(BaseExporter):
if isinstance(outline, dict):
for key, value in outline.items():
# 跳过不需要的字段
if self._should_skip_field(key):
continue
# 如果值是简单类型,直接显示
if isinstance(value, (str, int, float, bool)) or value is None:
lines.append(f"**{key}**: {value}")
# 如果值是列表或字典,递归处理
elif isinstance(value, list):
lines.append(f"**{key}**:")
lines.append(self._format_outline(value, level + 1))
if len(value) == 0:
lines.append(f"**{key}**: (无)")
else:
lines.append(f"**{key}**:")
lines.append(self._format_outline(value, level + 1))
elif isinstance(value, dict):
# 检查是否整个字典都是要跳过的字段
if all(self._should_skip_field(k) for k in value.keys()):
continue
lines.append(f"**{key}**:")
lines.append(self._format_outline(value, level + 1))
else:
@@ -104,20 +371,43 @@ class MarkdownExporter(BaseExporter):
elif isinstance(outline, list):
for idx, item in enumerate(outline, 1):
if isinstance(item, dict):
# 列表中的每个字典作为一个整体项
lines.append(f"{prefix} 步骤 {idx}")
for key, value in item.items():
if isinstance(value, (str, int, float, bool)) or value is None:
lines.append(f" - **{key}**: {value}")
elif isinstance(value, list):
lines.append(f" - **{key}**:")
for v in value:
lines.append(f" - {v}")
elif isinstance(value, dict):
lines.append(f" - **{key}**:")
lines.append(self._format_outline(value, level + 2))
else:
lines.append(f" - **{key}**: {value}")
# 检查是否是 Collaboration Process 中的步骤
step_name = item.get('StepName', f'步骤 {idx}')
task_content = item.get('TaskContent', '')
agent_selection = item.get('AgentSelection', [])
output_object = item.get('OutputObject', '')
input_object_list = item.get('InputObject_List', [])
# 步骤标题
lines.append(f"{prefix} {step_name}")
# 任务内容
if task_content:
lines.append(f" - **任务内容**: {task_content}")
# 输入产物
if input_object_list:
lines.append(f" - **输入产物**: {', '.join(input_object_list)}")
# 输出产物
if output_object:
lines.append(f" - **输出产物**: {output_object}")
# 参与智能体
if agent_selection:
lines.append(f" - **参与智能体**: {', '.join(agent_selection)}")
# 任务流程 - 显示所有动作
task_process = item.get('TaskProcess', [])
if task_process and isinstance(task_process, list):
lines.append(f" - **执行流程**: 共 {len(task_process)} 个动作")
for proc_idx, proc in enumerate(task_process, 1):
if isinstance(proc, dict):
proc_agent = proc.get('AgentName', '')
proc_type = proc.get('ActionType', '')
proc_desc = proc.get('Description', '')
lines.append(f" {proc_idx}. {proc_agent} ({proc_type}): {proc_desc}")
lines.append("")
else:
lines.append(f"- {item}")
@@ -163,16 +453,15 @@ class DocxExporter(BaseExporter):
if isinstance(result, list):
for idx, item in enumerate(result, 1):
doc.add_heading(f'步骤 {idx}', level=2)
doc.add_paragraph(json.dumps(item, ensure_ascii=False, indent=2))
self._add_result_item_to_doc(doc, item)
else:
doc.add_paragraph(json.dumps(result, ensure_ascii=False, indent=2))
self._add_result_item_to_doc(doc, result)
# 参与智能体 - 从 task_outline 的 Collaboration Process 中提取
task_outline = self.task_data.get('task_outline')
if task_outline and isinstance(task_outline, dict):
collaboration_process = task_outline.get('Collaboration Process', [])
if collaboration_process and isinstance(collaboration_process, list):
# 收集所有参与步骤的智能体
all_agents = set()
for step in collaboration_process:
if isinstance(step, dict):
@@ -196,20 +485,82 @@ class DocxExporter(BaseExporter):
return False
except Exception as e:
print(f"Word 导出失败: {e}")
import traceback
traceback.print_exc()
return False
def _add_result_item_to_doc(self, doc, item: Any):
"""格式化执行结果项并添加到 Word 文档"""
if not isinstance(item, dict):
doc.add_paragraph(str(item))
return
output_name = item.get('OutputName', '')
node_id = item.get('NodeId', '')
task_content = item.get('TaskContent', '')
if output_name:
doc.add_paragraph(f"输出产物: {output_name}")
if node_id and node_id != output_name:
doc.add_paragraph(f"步骤名称: {node_id}")
if task_content:
doc.add_paragraph(f"\n任务内容: {task_content}")
input_name_list = item.get('InputName_List', [])
if input_name_list:
doc.add_paragraph(f"\n输入产物:")
for inp in input_name_list:
doc.add_paragraph(f" - {inp}")
action_history = item.get('ActionHistory', [])
if action_history and isinstance(action_history, list):
doc.add_heading('执行过程', level=3)
for action_idx, action in enumerate(action_history, 1):
if not isinstance(action, dict):
continue
agent_name = action.get('AgentName', '未知智能体')
action_type = action.get('ActionType', '')
description = action.get('Description', '')
action_result = action.get('Action_Result', '')
doc.add_paragraph(f"动作 {action_idx}: {agent_name} ({action_type})")
if description:
doc.add_paragraph(f" 任务描述: {description}")
if action_result:
if isinstance(action_result, str):
decoded_result = action_result.replace('\\n', '\n').replace('\\t', '\t').replace('\\r', '\r')
doc.add_paragraph(f"\n执行结果:\n{decoded_result}")
else:
doc.add_paragraph(f"\n执行结果: {action_result}")
doc.add_paragraph("")
def _should_skip_field(self, key: str) -> bool:
"""判断是否应该跳过某个字段"""
key_lower = key.lower()
skip_patterns = ['brief', 'data', 'color', 'template', '_front']
return any(pattern in key_lower for pattern in skip_patterns)
def _add_outline_to_doc(self, doc, outline: Any, level: int = 2):
"""递归添加大纲到文档"""
if isinstance(outline, dict):
for key, value in outline.items():
# 如果值是简单类型,直接显示为段落
if self._should_skip_field(key):
continue
if isinstance(value, (str, int, float, bool)) or value is None:
doc.add_paragraph(f"**{key}**: {value}")
# 如果值是列表或字典,递归处理
elif isinstance(value, list):
doc.add_paragraph(f"**{key}**:")
self._add_outline_to_doc(doc, value, level + 1)
if len(value) == 0:
doc.add_paragraph(f"**{key}**: (无)")
else:
doc.add_paragraph(f"**{key}**:")
self._add_outline_to_doc(doc, value, level + 1)
elif isinstance(value, dict):
if all(self._should_skip_field(k) for k in value.keys()):
continue
doc.add_paragraph(f"**{key}**:")
self._add_outline_to_doc(doc, value, level + 1)
else:
@@ -217,20 +568,37 @@ class DocxExporter(BaseExporter):
elif isinstance(outline, list):
for idx, item in enumerate(outline, 1):
if isinstance(item, dict):
# 列表中的每个字典作为一个整体项
doc.add_heading(f"步骤 {idx}", level=min(level, 3))
for key, value in item.items():
if isinstance(value, (str, int, float, bool)) or value is None:
doc.add_paragraph(f" - **{key}**: {value}")
elif isinstance(value, list):
doc.add_paragraph(f" - **{key}**:")
for v in value:
doc.add_paragraph(f" - {v}")
elif isinstance(value, dict):
doc.add_paragraph(f" - **{key}**:")
self._add_outline_to_doc(doc, value, level + 2)
else:
doc.add_paragraph(f" - **{key}**: {value}")
step_name = item.get('StepName', f'步骤 {idx}')
task_content = item.get('TaskContent', '')
agent_selection = item.get('AgentSelection', [])
output_object = item.get('OutputObject', '')
input_object_list = item.get('InputObject_List', [])
doc.add_heading(step_name, level=min(level, 3))
if task_content:
doc.add_paragraph(f" 任务内容: {task_content}")
if input_object_list:
doc.add_paragraph(f" 输入产物: {', '.join(input_object_list)}")
if output_object:
doc.add_paragraph(f" 输出产物: {output_object}")
if agent_selection:
doc.add_paragraph(f" 参与智能体: {', '.join(agent_selection)}")
task_process = item.get('TaskProcess', [])
if task_process and isinstance(task_process, list):
doc.add_paragraph(f" 执行流程: 共 {len(task_process)} 个动作")
for proc_idx, proc in enumerate(task_process, 1):
if isinstance(proc, dict):
proc_agent = proc.get('AgentName', '')
proc_type = proc.get('ActionType', '')
proc_desc = proc.get('Description', '')
doc.add_paragraph(f" {proc_idx}. {proc_agent} ({proc_type}): {proc_desc}")
doc.add_paragraph("")
else:
doc.add_paragraph(str(item))
else:
@@ -737,8 +1105,9 @@ class InfographicExporter(BaseExporter):
task_content = self.task_data.get('task_content', '')
task_outline = self.task_data.get('task_outline')
result = self.task_data.get('result')
rehearsal_log = self.task_data.get('rehearsal_log')
agent_scores = self.task_data.get('agent_scores')
# 从 task_outline 中提取参与智能体
all_agents = []
if task_outline and isinstance(task_outline, dict):
collaboration_process = task_outline.get('Collaboration Process', [])
@@ -753,13 +1122,104 @@ class InfographicExporter(BaseExporter):
agents_set.add(agent)
all_agents = sorted(agents_set)
# 统计执行步骤数
step_count = 0
if task_outline and isinstance(task_outline, dict):
collaboration_process = task_outline.get('Collaboration Process', [])
if isinstance(collaboration_process, list):
step_count = len(collaboration_process)
infographic_data = None
try:
llm_exporter = InfographicLLMExporter()
infographic_data = llm_exporter.generate(self.task_data)
except Exception as e:
print(f"调用 LLM 生成信息图内容失败: {e}")
summary = infographic_data.get('summary', '') if infographic_data else ''
highlights = infographic_data.get('highlights', []) if infographic_data else []
statistics = infographic_data.get('statistics', {}) if infographic_data else {}
key_insights = infographic_data.get('key_insights', []) if infographic_data else []
timeline = infographic_data.get('timeline', []) if infographic_data else []
agent_performance = infographic_data.get('agent_performance', []) if infographic_data else []
llm_step_count = statistics.get('total_steps', step_count)
llm_agent_count = statistics.get('agent_count', len(all_agents))
completion_rate = statistics.get('completion_rate', 0)
quality_score = statistics.get('quality_score', 0)
summary_html = f'''
<div class="section">
<h2>执行摘要</h2>
<div class="summary-box">
<p>{summary or ''}</p>
</div>
</div>
''' if summary else ''
stats_extra_html = f'''
<div class="stat-item">
<div class="value">{completion_rate}%</div>
<div class="label">完成率</div>
</div>
<div class="stat-item">
<div class="value">{quality_score}</div>
<div class="label">质量评分</div>
</div>
''' if infographic_data else ''
highlights_html = f'''
<div class="section">
<h2>执行亮点</h2>
<div class="highlights">
{''.join(f'<div class="highlight-item"><p>{h}</p></div>' for h in highlights) if highlights else '<p>无</p>'}
</div>
</div>
''' if highlights else ''
insights_html = f'''
<div class="section">
<h2>关键洞察</h2>
<div class="insights">
{''.join(f'<div class="insight-item"><p>{insight}</p></div>' for insight in key_insights) if key_insights else '<p>无</p>'}
</div>
</div>
''' if key_insights else ''
agent_performance_items = ''.join('''
<div class="agent-card">
<div class="name">''' + ap.get('name', '') + '''</div>
<div class="score">''' + str(ap.get('score', 0)) + '''</div>
<div class="score-label">分</div>
<div class="contribution">''' + ap.get('contribution', '') + '''</div>
</div>
''' for ap in agent_performance) if agent_performance else '<p>无</p>'
agent_performance_html = f'''
<div class="section">
<h2>智能体表现</h2>
<div class="stats">
{agent_performance_items}
</div>
</div>
''' if agent_performance else ''
timeline_items = ''.join('''
<div class="timeline-item ''' + tl.get('status', 'pending') + '''">
<div class="step-name">''' + tl.get('step', '') + '''</div>
<div class="step-status">状态: ''' + tl.get('status', '') + '''</div>
<div class="step-result">''' + tl.get('key_result', '') + '''</div>
</div>
''' for tl in timeline) if timeline else '<p>无</p>'
timeline_html = f'''
<div class="section">
<h2>执行时间线</h2>
<div class="timeline">
{timeline_items}
</div>
</div>
''' if timeline else ''
html = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
@@ -769,7 +1229,7 @@ class InfographicExporter(BaseExporter):
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; padding: 40px 20px; }}
.container {{ max-width: 800px; margin: 0 auto; }}
.container {{ max-width: 900px; margin: 0 auto; }}
.card {{ background: white; border-radius: 20px; box-shadow: 0 20px 60px rgba(0,0,0,0.3); overflow: hidden; }}
.header {{ background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); color: white; padding: 40px; text-align: center; }}
.header h1 {{ font-size: 32px; margin-bottom: 10px; }}
@@ -778,12 +1238,36 @@ class InfographicExporter(BaseExporter):
.section {{ margin-bottom: 30px; }}
.section h2 {{ color: #667eea; font-size: 20px; margin-bottom: 15px; padding-bottom: 10px; border-bottom: 2px solid #f0f0f0; }}
.section p {{ color: #666; line-height: 1.8; }}
.stats {{ display: flex; gap: 20px; flex-wrap: wrap; }}
.summary-box {{ background: linear-gradient(135deg, #f5f7fa 0%, #e4e8ec 100%); border-radius: 16px; padding: 24px; margin-bottom: 20px; }}
.summary-box p {{ color: #333; font-size: 16px; line-height: 1.8; font-weight: 500; }}
.stats {{ display: flex; gap: 20px; flex-wrap: wrap; justify-content: center; }}
.stat-item {{ flex: 1; min-width: 150px; background: #f8f9fa; padding: 20px; border-radius: 12px; text-align: center; }}
.stat-item .value {{ font-size: 28px; font-weight: bold; color: #667eea; }}
.stat-item .label {{ font-size: 14px; color: #999; margin-top: 5px; }}
.highlights {{ display: flex; flex-direction: column; gap: 12px; }}
.highlight-item {{ background: linear-gradient(135deg, #fff9e6 0%, #fff3cc 100%); border-left: 4px solid #ffc107; padding: 16px 20px; border-radius: 8px; }}
.highlight-item p {{ color: #856404; margin: 0; font-weight: 500; }}
.insights {{ display: flex; flex-direction: column; gap: 12px; }}
.insight-item {{ background: #e8f5e9; border-left: 4px solid #4caf50; padding: 16px 20px; border-radius: 8px; }}
.insight-item p {{ color: #2e7d32; margin: 0; }}
.agent-list {{ display: flex; flex-wrap: wrap; gap: 10px; }}
.agent-tag {{ background: #667eea; color: white; padding: 8px 16px; border-radius: 20px; font-size: 14px; }}
.agent-card {{ background: #f8f9fa; border-radius: 12px; padding: 16px; text-align: center; }}
.agent-card .name {{ font-weight: bold; color: #333; margin-bottom: 8px; }}
.agent-card .score {{ font-size: 24px; color: #667eea; font-weight: bold; }}
.agent-card .score-label {{ font-size: 12px; color: #999; }}
.agent-card .contribution {{ font-size: 12px; color: #666; margin-top: 8px; }}
.timeline {{ position: relative; padding-left: 30px; }}
.timeline::before {{ content: ''; position: absolute; left: 8px; top: 0; bottom: 0; width: 2px; background: #e0e0e0; }}
.timeline-item {{ position: relative; margin-bottom: 20px; padding-bottom: 20px; border-bottom: 1px solid #f0f0f0; }}
.timeline-item:last-child {{ border-bottom: none; margin-bottom: 0; padding-bottom: 0; }}
.timeline-item::before {{ content: ''; position: absolute; left: -26px; top: 4px; width: 12px; height: 12px; border-radius: 50%; background: #667eea; }}
.timeline-item.completed::before {{ background: #4caf50; }}
.timeline-item.in-progress::before {{ background: #ff9800; }}
.timeline-item.pending::before {{ background: #9e9e9e; }}
.timeline-item .step-name {{ font-weight: bold; color: #333; margin-bottom: 4px; }}
.timeline-item .step-status {{ font-size: 12px; color: #666; margin-bottom: 4px; }}
.timeline-item .step-result {{ font-size: 13px; color: #999; }}
.steps-list {{ background: #f8f9fa; border-radius: 12px; padding: 20px; }}
.step-item {{ margin-bottom: 15px; padding-bottom: 15px; border-bottom: 1px solid #eee; }}
.step-item:last-child {{ margin-bottom: 0; padding-bottom: 0; border-bottom: none; }}
@@ -804,6 +1288,7 @@ class InfographicExporter(BaseExporter):
<div class="time">导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</div>
</div>
<div class="content">
{summary_html}
<div class="section">
<h2>任务描述</h2>
<p>{task_content or ''}</p>
@@ -812,33 +1297,32 @@ class InfographicExporter(BaseExporter):
<h2>执行统计</h2>
<div class="stats">
<div class="stat-item">
<div class="value">{step_count}</div>
<div class="value">{llm_step_count}</div>
<div class="label">执行步骤</div>
</div>
<div class="stat-item">
<div class="value">{len(all_agents)}</div>
<div class="value">{llm_agent_count}</div>
<div class="label">参与智能体</div>
</div>
{stats_extra_html}
</div>
</div>
{highlights_html}
{insights_html}
<div class="section">
<h2>参与智能体</h2>
<div class="agent-list">
{''.join(f'<span class="agent-tag">{agent}</span>' for agent in all_agents) if all_agents else ''}
</div>
</div>
{agent_performance_html}
{timeline_html}
<div class="section">
<h2>任务步骤</h2>
<div class="steps-list">
{self._format_steps_html(task_outline)}
</div>
</div>
<div class="section">
<h2>执行结果</h2>
<div class="result-content">
{self._format_result_html(result)}
</div>
</div>
</div>
</div>
</div>
@@ -927,7 +1411,7 @@ class ExportFactory:
_exporters = {
'markdown': MarkdownExporter,
'doc': DocxExporter,
'doc': DocxLLMExporter,
'excel': ExcelExporter,
'ppt': PptExporter,
'mindmap': MindmapExporter,
@@ -939,12 +1423,41 @@ class ExportFactory:
"""创建导出器实例"""
exporter_class = cls._exporters.get(export_type)
if exporter_class:
# doc 类型使用 LLM 导出器,不需要 task_data
if export_type == 'doc':
return exporter_class()
return exporter_class(task_data)
return None
@classmethod
def export(cls, export_type: str, task_data: Dict[str, Any], file_path: str) -> bool:
    """Dispatch an export job to the matching exporter.

    LLM-backed exporters (doc/ppt/mindmap/excel/markdown) are constructed
    without arguments and take ``(task_data, file_path)`` in ``generate``;
    the remaining legacy exporters are constructed with ``task_data`` and
    take only ``file_path``.

    NOTE(review): for an unknown ``export_type`` this falls through and
    implicitly returns None, not False — confirm callers only test
    truthiness (this view of the file may be cut before a trailing return).
    """
    # doc and ppt style types go through LLM-based export.
    if export_type == 'doc':
        exporter = cls._exporters.get('doc')
        if exporter:
            return exporter().generate(task_data, file_path)
    if export_type == 'ppt':
        from .ppt_llm import PptLLMExporter
        exporter = PptLLMExporter()
        return exporter.generate(task_data, file_path)
    if export_type == 'mindmap':
        from .mindmap_llm import MindmapLLMExporter
        exporter = MindmapLLMExporter()
        return exporter.generate(task_data, file_path)
    if export_type == 'excel':
        from .xlsx_llm import XlsxLLMExporter
        exporter = XlsxLLMExporter()
        return exporter.generate(task_data, file_path)
    if export_type == 'markdown':
        exporter = MarkdownLLMExporter()
        return exporter.generate(task_data, file_path)
    # Plain (non-LLM) exporters.
    exporter = cls.create(export_type, task_data)
    if exporter:
        return exporter.generate(file_path)

View File

@@ -0,0 +1,434 @@
"""
Word 文档 LLM 报告导出器
调用大模型生成专业的任务执行报告
"""
import json
import os
import re
from datetime import datetime
from typing import Dict, Any, Optional
class DocxLLMExporter:
    """Word document LLM report exporter - calls a large model to write the report."""

    # LLM connection settings (populated from config.yaml by _load_llm_config).
    LLM_CONFIG = {
        'OPENAI_API_BASE': None,
        'OPENAI_API_KEY': None,
        'OPENAI_API_MODEL': None,
    }

    # Prompt template (Chinese); placeholders are filled via str.format in
    # generate(). Instructs the model to produce a structured Markdown report.
    PROMPT_TEMPLATE = """你是一位专业的项目管理顾问和报告分析师。你的任务是将以下任务执行数据生成一份详细、专业、结构化的执行报告。
## 任务基本信息
- 任务名称:{task_name}
## 任务大纲(规划阶段)
{task_outline}
## 执行结果
{rehearsal_log}
## 参与智能体
{agents}
## 智能体评分
{agent_scores}
---
## 报告要求
请生成一份完整的任务执行报告,包含以下章节:
### 1. 执行摘要
用 2-3 句话概括本次任务的整体执行情况。
### 2. 任务概述
- 任务背景与目标
- 任务范围与边界
### 3. 任务规划分析
- 任务拆解的合理性
- 智能体角色分配的优化建议
- 工作流程设计
### 4. 执行过程回顾
- 各阶段的完成情况
- 关键决策点
- 遇到的问题及解决方案
### 5. 成果产出分析
- 产出物的质量评估
- 产出与预期目标的匹配度
### 6. 团队协作分析
- 智能体之间的协作模式
- 信息传递效率
### 7. 质量评估
- 整体完成质量评分1-10分
- 各维度的具体评分及理由
### 8. 经验教训与改进建议
- 成功经验
- 存在的问题与不足
- 改进建议
---
## 输出格式要求
- 使用 Markdown 格式输出
- 语言:简体中文
- 适当使用列表、表格增强可读性
- 报告长度必须达到 4000-6000 字,每个章节都要详细展开,不要遗漏任何章节
- 每个章节的内容要充实,提供具体的分析和建议
- 注意:所有加粗标记必须成对出现,如 **文本**,不要单独使用 ** 或缺少结束标记
- 禁止使用 mermaid、graph TD、flowchart 等图表代码,如果需要描述流程请用纯文字描述
- 不要生成附录章节(如有关键参数对照表、工艺流程图等),如果确实需要附录再生成
- 不要在报告中显示"报告总字数"这样的统计信息
"""
def __init__(self):
    """Load LLM connection settings from config.yaml before first use."""
    self._load_llm_config()
def _load_llm_config(self):
    """Load LLM settings (base URL, key, model) from config.yaml.

    Tries several candidate locations and stops at the first existing
    file. Any failure (missing PyYAML, unreadable file) is printed and
    swallowed, leaving the settings as None.

    NOTE(review): writes into the class-level LLM_CONFIG dict, so loaded
    settings are shared by all instances — confirm that is intended.
    """
    try:
        import yaml
        # Candidate config-file locations, nearest to this package first.
        possible_paths = [
            os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config', 'config.yaml'),
            os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'backend', 'config', 'config.yaml'),
            os.path.join(os.getcwd(), 'config', 'config.yaml'),
        ]
        for config_path in possible_paths:
            if os.path.exists(config_path):
                with open(config_path, 'r', encoding='utf-8') as f:
                    config = yaml.safe_load(f)
                    if config:
                        self.LLM_CONFIG['OPENAI_API_BASE'] = config.get('OPENAI_API_BASE')
                        self.LLM_CONFIG['OPENAI_API_KEY'] = config.get('OPENAI_API_KEY')
                        self.LLM_CONFIG['OPENAI_API_MODEL'] = config.get('OPENAI_API_MODEL')
                        print(f"已加载 LLM 配置: {self.LLM_CONFIG['OPENAI_API_MODEL']}")
                        return
    except Exception as e:
        print(f"加载 LLM 配置失败: {e}")
def generate(self, task_data: Dict[str, Any], file_path: str) -> bool:
    """Generate a Word report for *task_data* at *file_path* via the LLM.

    Returns True on success, False on any failure (missing config, LLM
    error, conversion error); errors are printed, never raised.
    """
    try:
        # 1. Gather the raw inputs.
        task_name = task_data.get('task_name', '未命名任务')
        task_outline = task_data.get('task_outline')
        rehearsal_log = task_data.get('rehearsal_log')
        agent_scores = task_data.get('agent_scores')
        # 2. Extract participating agents (from the outline's Collaboration Process).
        agents = self._extract_agents(task_outline)
        # 3. Keep only scores for agents that took part in this task.
        filtered_agent_scores = self._filter_agent_scores(agent_scores, agents)
        # 4. Serialize the pieces as JSON strings for prompt interpolation.
        task_outline_str = json.dumps(task_outline, ensure_ascii=False, indent=2) if task_outline else ''
        rehearsal_log_str = json.dumps(rehearsal_log, ensure_ascii=False, indent=2) if rehearsal_log else ''
        agents_str = ', '.join(agents) if agents else ''
        agent_scores_str = json.dumps(filtered_agent_scores, ensure_ascii=False, indent=2) if filtered_agent_scores else ''
        # 5. Build the prompt.
        prompt = self.PROMPT_TEMPLATE.format(
            task_name=task_name,
            task_outline=task_outline_str,
            rehearsal_log=rehearsal_log_str,
            agents=agents_str,
            agent_scores=agent_scores_str
        )
        # 6. Ask the LLM for the report body (Markdown).
        print("正在调用大模型生成报告...")
        report_content = self._call_llm(prompt)
        if not report_content:
            print("LLM 生成报告失败")
            return False
        # 7. Strip a duplicated "任务执行报告" title the model may prepend.
        report_content = self._clean_report_title(report_content)
        print(f"报告生成成功,长度: {len(report_content)} 字符")
        # 8. Convert the Markdown report into a Word document.
        self._save_as_word(report_content, file_path)
        return True
    except Exception as e:
        print(f"Word LLM 导出失败: {e}")
        import traceback
        traceback.print_exc()
        return False
def _clean_report_title(self, content: str) -> str:
"""清理报告开头的重复标题"""
lines = content.split('\n')
if not lines:
return content
# 检查第一行是否是"任务执行报告"
first_line = lines[0].strip()
if first_line == '任务执行报告' or first_line == '# 任务执行报告':
# 去掉第一行
lines = lines[1:]
# 去掉可能的空行
while lines and not lines[0].strip():
lines.pop(0)
return '\n'.join(lines)
def _extract_agents(self, task_outline: Any) -> list:
"""从 task_outline 中提取参与智能体列表"""
agents = set()
if not task_outline or not isinstance(task_outline, dict):
return []
collaboration_process = task_outline.get('Collaboration Process', [])
if not collaboration_process or not isinstance(collaboration_process, list):
return []
for step in collaboration_process:
if isinstance(step, dict):
agent_selection = step.get('AgentSelection', [])
if isinstance(agent_selection, list):
for agent in agent_selection:
if agent:
agents.add(agent)
return list(agents)
def _filter_agent_scores(self, agent_scores: Any, agents: list) -> dict:
"""过滤 agent_scores只保留参与当前任务的智能体评分"""
if not agent_scores or not isinstance(agent_scores, dict):
return {}
if not agents:
return {}
filtered = {}
for step_id, step_data in agent_scores.items():
if not isinstance(step_data, dict):
continue
aspect_list = step_data.get('aspectList', [])
agent_scores_data = step_data.get('agentScores', {})
if not agent_scores_data:
continue
# 只保留在 agents 列表中的智能体评分
filtered_scores = {}
for agent_name, scores in agent_scores_data.items():
if agent_name in agents and isinstance(scores, dict):
filtered_scores[agent_name] = scores
if filtered_scores:
filtered[step_id] = {
'aspectList': aspect_list,
'agentScores': filtered_scores
}
return filtered
def _call_llm(self, prompt: str) -> str:
"""调用大模型 API 生成报告"""
try:
import openai
# 验证配置
if not self.LLM_CONFIG['OPENAI_API_KEY']:
print("错误: OPENAI_API_KEY 未配置")
return ""
if not self.LLM_CONFIG['OPENAI_API_BASE']:
print("错误: OPENAI_API_BASE 未配置")
return ""
if not self.LLM_CONFIG['OPENAI_API_MODEL']:
print("错误: OPENAI_API_MODEL 未配置")
return ""
# 配置 OpenAI 客户端
client = openai.OpenAI(
api_key=self.LLM_CONFIG['OPENAI_API_KEY'],
base_url=self.LLM_CONFIG['OPENAI_API_BASE']
)
# 调用 API
response = client.chat.completions.create(
model=self.LLM_CONFIG['OPENAI_API_MODEL'],
messages=[
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=12000,
)
if response and response.choices:
return response.choices[0].message.content
return ""
except ImportError:
print("请安装 openai 库: pip install openai")
return ""
except Exception as e:
print(f"调用 LLM 失败: {e}")
return ""
    def _save_as_word(self, markdown_content: str, file_path: str):
        """Render the Markdown report into a .docx file at *file_path*.

        Re-raises ImportError (python-docx missing) and save errors so the
        caller's outer handler can report the failure.
        """
        try:
            from docx import Document
            from docx.shared import Pt, Inches
            from docx.enum.text import WD_ALIGN_PARAGRAPH
            doc = Document()
            # Use the first '# ' heading, if any, as the centred title.
            lines = markdown_content.split('\n')
            first_title = None
            content_start = 0
            for i, line in enumerate(lines):
                line = line.strip()
                if line.startswith('# '):
                    first_title = line[2:].strip()
                    content_start = i + 1
                    break
            # Add the document title.
            if first_title:
                title = doc.add_heading(first_title, level=0)
                title.alignment = WD_ALIGN_PARAGRAPH.CENTER
            # Render the rest of the Markdown body.
            remaining_content = '\n'.join(lines[content_start:])
            self._parse_markdown_to_doc(remaining_content, doc)
            # Trailing export timestamp.
            doc.add_paragraph(f"\n\n导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            doc.save(file_path)
            print(f"Word 文档已保存: {file_path}")
        except ImportError:
            print("请安装 python-docx 库: pip install python-docx")
            raise
        except Exception as e:
            print(f"保存 Word 文档失败: {e}")
            raise
def _parse_markdown_to_doc(self, markdown_content: str, doc):
"""解析 Markdown 内容并添加到 Word 文档"""
lines = markdown_content.split('\n')
i = 0
table_rows = []
in_table = False
while i < len(lines):
line = lines[i].rstrip()
# 空行处理
if not line:
in_table = False
if table_rows:
self._add_table_to_doc(table_rows, doc)
table_rows = []
i += 1
continue
# 表格分隔线检测(跳过 |---| 或 |:---| 等格式的行)
stripped = line.strip()
if stripped.startswith('|') and stripped.endswith('|') and '---' in stripped:
i += 1
continue
# 表格检测:检查是否是表格行
if '|' in line and line.strip().startswith('|'):
# 收集表格行
cells = [cell.strip() for cell in line.split('|')[1:-1]]
if cells and any(cells): # 跳过空行
table_rows.append(cells)
in_table = True
i += 1
continue
else:
# 如果之前在表格中,现在不是表格行了,添加表格
if in_table and table_rows:
self._add_table_to_doc(table_rows, doc)
table_rows = []
in_table = False
# 标题处理
if line.startswith('### '):
doc.add_heading(line[4:].strip(), level=3)
elif line.startswith('## '):
doc.add_heading(line[3:].strip(), level=1)
elif line.startswith('# '):
doc.add_heading(line[2:].strip(), level=0)
# 无序列表处理(去掉 • 或 - 符号)
elif line.startswith('- ') or line.startswith('* ') or line.startswith(''):
# 去掉列表符号,保留内容
text = line[2:].strip() if line.startswith(('- ', '* ')) else line[1:].strip()
self._add_formatted_paragraph(text, doc, 'List Bullet')
# 普通段落(处理加粗)
else:
# 使用格式化方法处理加粗
self._add_formatted_paragraph(line, doc)
i += 1
# 处理最后的表格
if table_rows:
self._add_table_to_doc(table_rows, doc)
def _add_table_to_doc(self, table_rows: list, doc):
"""将表格行添加到 Word 文档"""
if not table_rows:
return
# 创建表格
table = doc.add_table(rows=len(table_rows), cols=len(table_rows[0]))
table.style = 'Light Grid Accent 1'
for i, row_data in enumerate(table_rows):
row = table.rows[i]
for j, cell_text in enumerate(row_data):
cell = row.cells[j]
cell.text = ''
# 处理加粗
parts = re.split(r'(\*\*.+?\*\*)', cell_text)
for part in parts:
if part.startswith('**') and part.endswith('**'):
run = cell.paragraphs[0].add_run(part[2:-2])
run.bold = True
elif part:
cell.paragraphs[0].add_run(part)
def _add_formatted_paragraph(self, text: str, doc, style: str = None):
"""添加带格式的段落"""
# 处理加粗文本
para = doc.add_paragraph(style=style)
# 分割文本处理加粗
parts = re.split(r'(\*\*.+?\*\*)', text)
for part in parts:
if part.startswith('**') and part.endswith('**'):
# 加粗文本
run = para.add_run(part[2:-2])
run.bold = True
else:
para.add_run(part)

View File

@@ -0,0 +1,269 @@
"""
信息图 LLM 报告导出器
调用大模型生成信息图展示的格式化内容
"""
import json
import os
import re
from datetime import datetime
from typing import Dict, Any, Optional
class InfographicLLMExporter:
"""信息图 LLM 报告导出器 - 调用大模型生成信息图内容"""
LLM_CONFIG = {
'OPENAI_API_BASE': None,
'OPENAI_API_KEY': None,
'OPENAI_API_MODEL': None,
}
PROMPT_TEMPLATE = """你是一位专业的可视化设计师和数据分析专家。你的任务是将以下任务执行数据生成适合信息图展示的格式化内容。
## 任务基本信息
- 任务名称:{task_name}
## 任务大纲(规划阶段)
{task_outline}
## 执行结果
{rehearsal_log}
## 参与智能体
{agents}
## 智能体评分
{agent_scores}
---
## 信息图内容要求
请生成以下信息图展示内容JSON 格式输出):
{{
"summary": "执行摘要 - 2-3句话概括整体执行情况",
"highlights": [
"亮点1 - 取得的显著成果",
"亮点2 - 关键突破或创新",
"亮点3 - 重要的里程碑"
],
"statistics": {{
"total_steps": 执行总步骤数,
"agent_count": 参与智能体数量,
"completion_rate": 完成率(百分比),
"quality_score": 质量评分(1-10)
}},
"key_insights": [
"关键洞察1 - 从执行过程中总结的洞见",
"关键洞察2 - 值得关注的趋势或模式",
"关键洞察3 - 对未来工作的建议"
],
"timeline": [
{{"step": "步骤名称", "status": "完成/进行中/未完成", "key_result": "关键产出"}},
...
],
"agent_performance": [
{{"name": "智能体名称", "score": 评分, "contribution": "主要贡献"}},
...
]
}}
---
## 输出要求
- 输出必须是有效的 JSON 格式
- 语言:简体中文
- 所有字符串值使用中文
- statistics 中的数值必须是整数或浮点数
- 确保 JSON 格式正确,不要有语法错误
- 不要输出 JSON 之外的任何内容
- **重要**"执行结果"rehearsal_log只是参考数据用于帮助你分析整体执行情况生成摘要、亮点、统计数据等。**不要在任何输出字段中直接复制或输出原始执行结果数据**,而应该对这些数据进行分析和提炼,生成适合信息图展示的格式化内容。
"""
    def __init__(self):
        """Resolve the OpenAI endpoint/key/model from config.yaml on creation."""
        self._load_llm_config()
    def _load_llm_config(self):
        """Populate LLM_CONFIG from the first readable config.yaml among the
        candidate paths; the None defaults are kept when nothing is found."""
        try:
            import yaml
            # Candidate locations, checked in order: package config,
            # backend config, then the current working directory.
            possible_paths = [
                os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config', 'config.yaml'),
                os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'backend', 'config', 'config.yaml'),
                os.path.join(os.getcwd(), 'config', 'config.yaml'),
            ]
            for config_path in possible_paths:
                if os.path.exists(config_path):
                    with open(config_path, 'r', encoding='utf-8') as f:
                        config = yaml.safe_load(f)
                        if config:
                            self.LLM_CONFIG['OPENAI_API_BASE'] = config.get('OPENAI_API_BASE')
                            self.LLM_CONFIG['OPENAI_API_KEY'] = config.get('OPENAI_API_KEY')
                            self.LLM_CONFIG['OPENAI_API_MODEL'] = config.get('OPENAI_API_MODEL')
                            print(f"已加载 LLM 配置: {self.LLM_CONFIG['OPENAI_API_MODEL']}")
                            return
        except Exception as e:
            print(f"加载 LLM 配置失败: {e}")
    def generate(self, task_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Produce the infographic payload for *task_data* via the LLM.

        Returns the parsed JSON dict on success, or None when the LLM call
        or result parsing fails (annotation widened to Optional to match
        the None-returning error paths).
        """
        try:
            task_name = task_data.get('task_name', '未命名任务')
            task_outline = task_data.get('task_outline')
            rehearsal_log = task_data.get('rehearsal_log')
            agent_scores = task_data.get('agent_scores')
            # Agents come from the outline; scores are filtered down to them.
            agents = self._extract_agents(task_outline)
            filtered_agent_scores = self._filter_agent_scores(agent_scores, agents)
            # Serialize everything for prompt interpolation.
            task_outline_str = json.dumps(task_outline, ensure_ascii=False, indent=2) if task_outline else ''
            rehearsal_log_str = json.dumps(rehearsal_log, ensure_ascii=False, indent=2) if rehearsal_log else ''
            agents_str = ', '.join(agents) if agents else ''
            agent_scores_str = json.dumps(filtered_agent_scores, ensure_ascii=False, indent=2) if filtered_agent_scores else ''
            prompt = self.PROMPT_TEMPLATE.format(
                task_name=task_name,
                task_outline=task_outline_str,
                rehearsal_log=rehearsal_log_str,
                agents=agents_str,
                agent_scores=agent_scores_str
            )
            print("正在调用大模型生成信息图内容...")
            llm_result = self._call_llm(prompt)
            if not llm_result:
                print("LLM 生成信息图内容失败")
                return None
            infographic_data = self._parse_llm_result(llm_result)
            if not infographic_data:
                print("解析 LLM 结果失败")
                return None
            print(f"信息图内容生成成功")
            return infographic_data
        except Exception as e:
            print(f"信息图 LLM 生成失败: {e}")
            import traceback
            traceback.print_exc()
            return None
def _extract_agents(self, task_outline: Any) -> list:
"""从 task_outline 中提取参与智能体列表"""
agents = set()
if not task_outline or not isinstance(task_outline, dict):
return []
collaboration_process = task_outline.get('Collaboration Process', [])
if not collaboration_process or not isinstance(collaboration_process, list):
return []
for step in collaboration_process:
if isinstance(step, dict):
agent_selection = step.get('AgentSelection', [])
if isinstance(agent_selection, list):
for agent in agent_selection:
if agent:
agents.add(agent)
return list(agents)
def _filter_agent_scores(self, agent_scores: Any, agents: list) -> dict:
"""过滤 agent_scores只保留参与当前任务的智能体评分"""
if not agent_scores or not isinstance(agent_scores, dict):
return {}
if not agents:
return {}
filtered = {}
for step_id, step_data in agent_scores.items():
if not isinstance(step_data, dict):
continue
aspect_list = step_data.get('aspectList', [])
agent_scores_data = step_data.get('agentScores', {})
if not agent_scores_data:
continue
filtered_scores = {}
for agent_name, scores in agent_scores_data.items():
if agent_name in agents and isinstance(scores, dict):
filtered_scores[agent_name] = scores
if filtered_scores:
filtered[step_id] = {
'aspectList': aspect_list,
'agentScores': filtered_scores
}
return filtered
def _call_llm(self, prompt: str) -> str:
"""调用大模型 API 生成内容"""
try:
import openai
if not self.LLM_CONFIG['OPENAI_API_KEY']:
print("错误: OPENAI_API_KEY 未配置")
return ""
if not self.LLM_CONFIG['OPENAI_API_BASE']:
print("错误: OPENAI_API_BASE 未配置")
return ""
if not self.LLM_CONFIG['OPENAI_API_MODEL']:
print("错误: OPENAI_API_MODEL 未配置")
return ""
client = openai.OpenAI(
api_key=self.LLM_CONFIG['OPENAI_API_KEY'],
base_url=self.LLM_CONFIG['OPENAI_API_BASE']
)
response = client.chat.completions.create(
model=self.LLM_CONFIG['OPENAI_API_MODEL'],
messages=[
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=8000,
)
if response and response.choices:
return response.choices[0].message.content
return ""
except ImportError:
print("请安装 openai 库: pip install openai")
return ""
except Exception as e:
print(f"调用 LLM 失败: {e}")
return ""
def _parse_llm_result(self, llm_result: str) -> Optional[Dict[str, Any]]:
"""解析 LLM 返回的 JSON 字符串"""
try:
json_str = llm_result.strip()
if json_str.startswith("```json"):
json_str = json_str[7:]
if json_str.startswith("```"):
json_str = json_str[3:]
if json_str.endswith("```"):
json_str = json_str[:-3]
json_str = json_str.strip()
return json.loads(json_str)
except json.JSONDecodeError as e:
print(f"JSON 解析失败: {e}")
print(f"原始结果: {llm_result[:500]}...")
return None
except Exception as e:
print(f"解析失败: {e}")
return None

View File

@@ -0,0 +1,315 @@
"""
Markdown LLM 报告导出器
调用大模型生成专业的任务执行报告Markdown 格式)
"""
import json
import os
import re
from datetime import datetime
from typing import Dict, Any, Optional
class MarkdownLLMExporter:
"""Markdown LLM 报告导出器 - 调用大模型生成报告"""
# LLM 配置(从 config.yaml 加载)
LLM_CONFIG = {
'OPENAI_API_BASE': None,
'OPENAI_API_KEY': None,
'OPENAI_API_MODEL': None,
}
# Prompt 模板
PROMPT_TEMPLATE = """你是一位专业的项目管理顾问和报告分析师。你的任务是将以下任务执行数据生成一份详细、专业、结构化的执行报告。
## 任务基本信息
- 任务名称:{task_name}
## 任务大纲(规划阶段)
{task_outline}
## 执行结果
{rehearsal_log}
## 参与智能体
{agents}
## 智能体评分
{agent_scores}
---
## 报告要求
请生成一份完整的任务执行报告,包含以下章节:
### 1. 执行摘要
用 2-3 句话概括本次任务的整体执行情况。
### 2. 任务概述
- 任务背景与目标
- 任务范围与边界
### 3. 任务规划分析
- 任务拆解的合理性
- 智能体角色分配的优化建议
- 工作流程设计
### 4. 执行过程回顾
- 各阶段的完成情况
- 关键决策点
- 遇到的问题及解决方案
### 5. 成果产出分析
- 产出物的质量评估
- 产出与预期目标的匹配度
### 6. 团队协作分析
- 智能体之间的协作模式
- 信息传递效率
### 7. 质量评估
- 整体完成质量评分1-10分
- 各维度的具体评分及理由
### 8. 经验教训与改进建议
- 成功经验
- 存在的问题与不足
- 改进建议
---
## 输出格式要求
- 使用 Markdown 格式输出
- 语言:简体中文
- 适当使用列表、表格增强可读性
- 报告长度必须达到 4000-6000 字,每个章节都要详细展开,不要遗漏任何章节
- 每个章节的内容要充实,提供具体的分析和建议
- 注意:所有加粗标记必须成对出现,如 **文本**,不要单独使用 ** 或缺少结束标记
- 禁止使用 mermaid、graph TD、flowchart 等图表代码,如果需要描述流程请用纯文字描述
- 不要生成附录章节(如有关键参数对照表、工艺流程图等),如果确实需要附录再生成
- 不要在报告中显示"报告总字数"这样的统计信息
"""
    def __init__(self):
        """Resolve the OpenAI endpoint/key/model from config.yaml on creation."""
        self._load_llm_config()
    def _load_llm_config(self):
        """Populate LLM_CONFIG from the first readable config.yaml among the
        candidate paths; the None defaults are kept when nothing is found."""
        try:
            import yaml
            # Try several likely config-file locations in order.
            possible_paths = [
                os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config', 'config.yaml'),
                os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'backend', 'config', 'config.yaml'),
                os.path.join(os.getcwd(), 'config', 'config.yaml'),
            ]
            for config_path in possible_paths:
                if os.path.exists(config_path):
                    with open(config_path, 'r', encoding='utf-8') as f:
                        config = yaml.safe_load(f)
                        if config:
                            self.LLM_CONFIG['OPENAI_API_BASE'] = config.get('OPENAI_API_BASE')
                            self.LLM_CONFIG['OPENAI_API_KEY'] = config.get('OPENAI_API_KEY')
                            self.LLM_CONFIG['OPENAI_API_MODEL'] = config.get('OPENAI_API_MODEL')
                            print(f"已加载 LLM 配置: {self.LLM_CONFIG['OPENAI_API_MODEL']}")
                            return
        except Exception as e:
            print(f"加载 LLM 配置失败: {e}")
    def generate(self, task_data: Dict[str, Any], file_path: str) -> bool:
        """Generate the Markdown report for *task_data* via the LLM and save
        it to *file_path*.  Returns True on success, False on any failure."""
        try:
            # 1. Pull the raw task fields.
            task_name = task_data.get('task_name', '未命名任务')
            task_outline = task_data.get('task_outline')
            rehearsal_log = task_data.get('rehearsal_log')
            agent_scores = task_data.get('agent_scores')
            # 2. Agents are derived from the outline's Collaboration Process.
            agents = self._extract_agents(task_outline)
            # 3. Keep only the scores belonging to the participating agents.
            filtered_agent_scores = self._filter_agent_scores(agent_scores, agents)
            # 4. Serialize everything for prompt interpolation.
            task_outline_str = json.dumps(task_outline, ensure_ascii=False, indent=2) if task_outline else ''
            rehearsal_log_str = json.dumps(rehearsal_log, ensure_ascii=False, indent=2) if rehearsal_log else ''
            agents_str = ', '.join(agents) if agents else ''
            agent_scores_str = json.dumps(filtered_agent_scores, ensure_ascii=False, indent=2) if filtered_agent_scores else ''
            # 5. Build the prompt.
            prompt = self.PROMPT_TEMPLATE.format(
                task_name=task_name,
                task_outline=task_outline_str,
                rehearsal_log=rehearsal_log_str,
                agents=agents_str,
                agent_scores=agent_scores_str
            )
            # 6. Ask the LLM for the report body.
            print("正在调用大模型生成 Markdown 报告...")
            report_content = self._call_llm(prompt)
            if not report_content:
                print("LLM 生成报告失败")
                return False
            # 7. Drop a duplicated leading title if the model emitted one.
            report_content = self._clean_report_title(report_content)
            print(f"Markdown 报告生成成功,长度: {len(report_content)} 字符")
            # 8. Persist to disk.
            self._save_as_markdown(report_content, task_name, file_path)
            return True
        except Exception as e:
            print(f"Markdown LLM 导出失败: {e}")
            import traceback
            traceback.print_exc()
            return False
def _clean_report_title(self, content: str) -> str:
"""清理报告开头的重复标题"""
lines = content.split('\n')
if not lines:
return content
# 检查第一行是否是"任务执行报告"
first_line = lines[0].strip()
if first_line == '任务执行报告' or first_line == '# 任务执行报告':
# 去掉第一行
lines = lines[1:]
# 去掉可能的空行
while lines and not lines[0].strip():
lines.pop(0)
return '\n'.join(lines)
def _extract_agents(self, task_outline: Any) -> list:
"""从 task_outline 中提取参与智能体列表"""
agents = set()
if not task_outline or not isinstance(task_outline, dict):
return []
collaboration_process = task_outline.get('Collaboration Process', [])
if not collaboration_process or not isinstance(collaboration_process, list):
return []
for step in collaboration_process:
if isinstance(step, dict):
agent_selection = step.get('AgentSelection', [])
if isinstance(agent_selection, list):
for agent in agent_selection:
if agent:
agents.add(agent)
return list(agents)
def _filter_agent_scores(self, agent_scores: Any, agents: list) -> dict:
"""过滤 agent_scores只保留参与当前任务的智能体评分"""
if not agent_scores or not isinstance(agent_scores, dict):
return {}
if not agents:
return {}
filtered = {}
for step_id, step_data in agent_scores.items():
if not isinstance(step_data, dict):
continue
aspect_list = step_data.get('aspectList', [])
agent_scores_data = step_data.get('agentScores', {})
if not agent_scores_data:
continue
# 只保留在 agents 列表中的智能体评分
filtered_scores = {}
for agent_name, scores in agent_scores_data.items():
if agent_name in agents and isinstance(scores, dict):
filtered_scores[agent_name] = scores
if filtered_scores:
filtered[step_id] = {
'aspectList': aspect_list,
'agentScores': filtered_scores
}
return filtered
def _call_llm(self, prompt: str) -> str:
"""调用大模型 API 生成报告"""
try:
import openai
# 验证配置
if not self.LLM_CONFIG['OPENAI_API_KEY']:
print("错误: OPENAI_API_KEY 未配置")
return ""
if not self.LLM_CONFIG['OPENAI_API_BASE']:
print("错误: OPENAI_API_BASE 未配置")
return ""
if not self.LLM_CONFIG['OPENAI_API_MODEL']:
print("错误: OPENAI_API_MODEL 未配置")
return ""
# 配置 OpenAI 客户端
client = openai.OpenAI(
api_key=self.LLM_CONFIG['OPENAI_API_KEY'],
base_url=self.LLM_CONFIG['OPENAI_API_BASE']
)
# 调用 API
response = client.chat.completions.create(
model=self.LLM_CONFIG['OPENAI_API_MODEL'],
messages=[
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=12000,
)
if response and response.choices:
return response.choices[0].message.content
return ""
except ImportError:
print("请安装 openai 库: pip install openai")
return ""
except Exception as e:
print(f"调用 LLM 失败: {e}")
return ""
def _save_as_markdown(self, markdown_content: str, task_name: str, file_path: str):
"""将 Markdown 内容保存为文件"""
try:
# 在内容前添加标题(如果还没有的话)
if not markdown_content.strip().startswith('# '):
markdown_content = f"# {task_name}\n\n{markdown_content}"
# 添加时间戳
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
footer = f"\n\n---\n*导出时间: {timestamp}*"
# 如果内容末尾没有明显的分隔符,添加分隔符
if not markdown_content.rstrip().endswith('---'):
markdown_content = markdown_content.rstrip() + footer
else:
# 已有分隔符,在它之前添加时间戳
markdown_content = markdown_content.rstrip() + footer
with open(file_path, 'w', encoding='utf-8') as f:
f.write(markdown_content)
print(f"Markdown 文件已保存: {file_path}")
except Exception as e:
print(f"保存 Markdown 文件失败: {e}")
raise

View File

@@ -0,0 +1,374 @@
"""
思维导图 LLM 导出器
调用大模型生成专业的任务执行思维导图
"""
import json
import os
from datetime import datetime
from typing import Dict, Any
class MindmapLLMExporter:
"""思维导图 LLM 导出器 - 调用大模型生成思维导图"""
LLM_CONFIG = {
'OPENAI_API_BASE': None,
'OPENAI_API_KEY': None,
'OPENAI_API_MODEL': None,
}
PROMPT_TEMPLATE = """你是一位专业的项目管理顾问和可视化设计师。你的任务是将以下任务执行数据生成一份结构清晰、层次分明的思维导图。
## 任务基本信息
- 任务名称:{task_name}
## 任务大纲(规划阶段)
{task_outline}
## 执行结果
{rehearsal_log}
## 参与智能体
{agents}
## 智能体评分
{agent_scores}
---
## 思维导图要求
请生成一份完整的思维导图,包含以下核心分支:
### 1. 任务概述
- 任务背景与目标
- 任务范围
### 2. 任务规划
- 任务拆解
- 智能体角色分配
- 工作流程设计
### 3. 执行过程
- 各阶段的完成情况
- 关键决策点
- 遇到的问题及解决方案
### 4. 成果产出
- 产出物列表
- 质量评估
### 5. 团队协作
- 智能体之间的协作模式
- 信息传递效率
### 6. 质量评估
- 整体评分
- 各维度评分
### 7. 经验与改进
- 成功经验
- 改进建议
---
## 输出格式要求
请直接输出 JSON 格式,不要包含任何 Markdown 标记。JSON 结构如下:
```json
{{
"title": "思维导图标题",
"root": "中心主题",
"branches": [
{{
"name": "分支主题名称",
"children": [
{{
"name": "子主题名称",
"children": [
{{"name": "具体内容1"}},
{{"name": "具体内容2"}}
]
}}
]
}},
{{
"name": "另一个分支主题",
"children": [
{{"name": "子主题A"}},
{{"name": "子主题B"}}
]
}}
]
}}
```
注意:
- 思维导图层次分明,每个分支至少有 2-3 层深度
- 内容简洁,每个节点字数控制在 50-100 字以内
- 使用简体中文
- 确保 JSON 格式正确,不要有语法错误
- 不要生成"报告总字数"这样的统计信息
"""
    def __init__(self):
        """Resolve the OpenAI endpoint/key/model from config.yaml on creation."""
        self._load_llm_config()
    def _load_llm_config(self):
        """Populate LLM_CONFIG from the first readable config.yaml among the
        candidate paths; the None defaults are kept when nothing is found."""
        try:
            import yaml
            # Candidate locations, checked in order.
            possible_paths = [
                os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config', 'config.yaml'),
                os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'backend', 'config', 'config.yaml'),
                os.path.join(os.getcwd(), 'config', 'config.yaml'),
            ]
            for config_path in possible_paths:
                if os.path.exists(config_path):
                    with open(config_path, 'r', encoding='utf-8') as f:
                        config = yaml.safe_load(f)
                        if config:
                            self.LLM_CONFIG['OPENAI_API_BASE'] = config.get('OPENAI_API_BASE')
                            self.LLM_CONFIG['OPENAI_API_KEY'] = config.get('OPENAI_API_KEY')
                            self.LLM_CONFIG['OPENAI_API_MODEL'] = config.get('OPENAI_API_MODEL')
                            print(f"已加载 LLM 配置: {self.LLM_CONFIG['OPENAI_API_MODEL']}")
                            return
        except Exception as e:
            print(f"加载 LLM 配置失败: {e}")
    def generate(self, task_data: Dict[str, Any], file_path: str) -> bool:
        """Generate the mind map for *task_data* via the LLM and write it to
        *file_path* as Markdown.  Returns True on success, False otherwise."""
        try:
            task_name = task_data.get('task_name', '未命名任务')
            task_outline = task_data.get('task_outline')
            rehearsal_log = task_data.get('rehearsal_log')
            agent_scores = task_data.get('agent_scores')
            # Agents come from the outline; scores are filtered down to them.
            agents = self._extract_agents(task_outline)
            filtered_agent_scores = self._filter_agent_scores(agent_scores, agents)
            # Serialize everything for prompt interpolation.
            task_outline_str = json.dumps(task_outline, ensure_ascii=False, indent=2) if task_outline else ''
            rehearsal_log_str = json.dumps(rehearsal_log, ensure_ascii=False, indent=2) if rehearsal_log else ''
            agents_str = ', '.join(agents) if agents else ''
            agent_scores_str = json.dumps(filtered_agent_scores, ensure_ascii=False, indent=2) if filtered_agent_scores else ''
            prompt = self.PROMPT_TEMPLATE.format(
                task_name=task_name,
                task_outline=task_outline_str,
                rehearsal_log=rehearsal_log_str,
                agents=agents_str,
                agent_scores=agent_scores_str
            )
            print("正在调用大模型生成思维导图...")
            mindmap_json_str = self._call_llm(prompt)
            if not mindmap_json_str:
                print("LLM 生成思维导图失败")
                return False
            print(f"思维导图内容生成成功,长度: {len(mindmap_json_str)} 字符")
            self._create_mindmap_from_json(mindmap_json_str, file_path, task_name)
            return True
        except Exception as e:
            print(f"思维导图 LLM 导出失败: {e}")
            import traceback
            traceback.print_exc()
            return False
def _extract_agents(self, task_outline: Any) -> list:
"""从 task_outline 中提取参与智能体列表"""
agents = set()
if not task_outline or not isinstance(task_outline, dict):
return []
collaboration_process = task_outline.get('Collaboration Process', [])
if not collaboration_process or not isinstance(collaboration_process, list):
return []
for step in collaboration_process:
if isinstance(step, dict):
agent_selection = step.get('AgentSelection', [])
if isinstance(agent_selection, list):
for agent in agent_selection:
if agent:
agents.add(agent)
return list(agents)
def _filter_agent_scores(self, agent_scores: Any, agents: list) -> dict:
"""过滤 agent_scores只保留参与当前任务的智能体评分"""
if not agent_scores or not isinstance(agent_scores, dict):
return {}
if not agents:
return {}
filtered = {}
for step_id, step_data in agent_scores.items():
if not isinstance(step_data, dict):
continue
aspect_list = step_data.get('aspectList', [])
agent_scores_data = step_data.get('agentScores', {})
if not agent_scores_data:
continue
filtered_scores = {}
for agent_name, scores in agent_scores_data.items():
if agent_name in agents and isinstance(scores, dict):
filtered_scores[agent_name] = scores
if filtered_scores:
filtered[step_id] = {'aspectList': aspect_list, 'agentScores': filtered_scores}
return filtered
def _call_llm(self, prompt: str) -> str:
"""调用大模型 API 生成思维导图"""
try:
import openai
if not self.LLM_CONFIG['OPENAI_API_KEY']:
print("错误: OPENAI_API_KEY 未配置")
return ""
if not self.LLM_CONFIG['OPENAI_API_BASE']:
print("错误: OPENAI_API_BASE 未配置")
return ""
if not self.LLM_CONFIG['OPENAI_API_MODEL']:
print("错误: OPENAI_API_MODEL 未配置")
return ""
client = openai.OpenAI(
api_key=self.LLM_CONFIG['OPENAI_API_KEY'],
base_url=self.LLM_CONFIG['OPENAI_API_BASE']
)
response = client.chat.completions.create(
model=self.LLM_CONFIG['OPENAI_API_MODEL'],
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=8000,
)
if response and response.choices:
return response.choices[0].message.content
return ""
except ImportError:
print("请安装 openai 库: pip install openai")
return ""
except Exception as e:
print(f"调用 LLM 失败: {e}")
return ""
def _fix_json_string(self, json_str: str) -> str:
"""修复常见的 JSON 语法错误"""
import re
json_str = json_str.strip()
json_str = re.sub(r',\s*}', '}', json_str)
json_str = re.sub(r',\s*]', ']', json_str)
json_str = re.sub(r'""', '"', json_str)
single_quotes = re.findall(r"'[^']*'", json_str)
for sq in single_quotes:
if '"' not in sq:
json_str = json_str.replace(sq, sq.replace("'", '"'))
trailing_comma_pattern = re.compile(r',(\s*[}\]])')
json_str = trailing_comma_pattern.sub(r'\1', json_str)
unquoted_key_pattern = re.compile(r'([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)\s*:')
def fix_unquoted_key(match):
prefix = match.group(1)
key = match.group(2)
return f'{prefix}"{key}":'
json_str = unquoted_key_pattern.sub(fix_unquoted_key, json_str)
unquoted_value_pattern = re.compile(r':\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*([,}\]])')
def replace_unquoted(match):
value = match.group(1)
end = match.group(2)
if value.lower() in ('true', 'false', 'null'):
return f': {value}{end}'
else:
return f': "{value}"{end}'
json_str = unquoted_value_pattern.sub(replace_unquoted, json_str)
print(f"JSON 修复后长度: {len(json_str)} 字符")
return json_str
    def _create_mindmap_from_json(self, json_str: str, file_path: str, task_name: str):
        """Parse the LLM's (possibly fenced/dirty) JSON and write the mind
        map to *file_path* as Markdown.  Raises on unrecoverable JSON."""
        try:
            json_str = json_str.strip()
            # Strip a Markdown code fence if the model wrapped its output.
            if '```json' in json_str:
                json_str = json_str.split('```json')[1].split('```')[0]
            elif '```' in json_str:
                json_str = json_str.split('```')[1].split('```')[0]
            json_str = self._fix_json_string(json_str)
            mindmap_data = json.loads(json_str)
            # Render as markmap-compatible Markdown.
            markdown_content = self._generate_markdown(mindmap_data, task_name)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(markdown_content)
            print(f"思维导图已保存: {file_path}")
        except json.JSONDecodeError as e:
            print(f"JSON 解析失败: {e}")
            raise
        except Exception as e:
            print(f"创建思维导图失败: {e}")
            raise
def _generate_markdown(self, mindmap_data: dict, task_name: str) -> str:
"""将思维导图数据转换为 Markdown 格式markmap 兼容)"""
lines = []
# 直接使用无序列表,不需要标题和代码块包裹
# 根节点使用 2 个空格缩进
lines.append(f" - {task_name}")
branches = mindmap_data.get('branches', [])
for branch in branches:
# 一级分支使用 4 个空格缩进(比根节点多 2 个空格,是根节点的子节点)
self._add_branch(branch, lines, indent=4)
return '\n'.join(lines)
def _add_branch(self, branch: dict, lines: list, indent: int = 2):
"""递归添加分支到思维导图"""
name = branch.get('name', '')
if not name:
return
prefix = ' ' * indent
lines.append(f"{prefix}- {name}")
children = branch.get('children', [])
if children:
for child in children:
if isinstance(child, dict):
self._add_child_node(child, lines, indent + 2)
else:
lines.append(f"{' ' * (indent + 2)}- {child}")
def _add_child_node(self, child: dict, lines: list, indent: int):
"""添加子节点"""
name = child.get('name', '')
if not name:
return
prefix = ' ' * indent
lines.append(f"{prefix}- {name}")
children = child.get('children', [])
if children:
for grandchild in children:
if isinstance(grandchild, dict):
self._add_child_node(grandchild, lines, indent + 2)
else:
lines.append(f"{' ' * (indent + 2)}- {grandchild}")

View File

@@ -0,0 +1,772 @@
"""
PowerPoint 演示文稿 LLM 导出器
调用大模型生成专业的任务执行演示文稿
"""
import json
import os
from datetime import datetime
from typing import Dict, Any
from pptx import Presentation
from pptx.util import Inches, Pt
from pptx.enum.text import PP_ALIGN
from pptx.chart.data import CategoryChartData
from pptx.enum.chart import XL_CHART_TYPE
class PptLLMExporter:
"""PowerPoint 演示文稿 LLM 导出器 - 调用大模型生成 PPT"""
# LLM 配置(从 config.yaml 加载)
LLM_CONFIG = {
'OPENAI_API_BASE': None,
'OPENAI_API_KEY': None,
'OPENAI_API_MODEL': None,
}
# Prompt 模板 - 与 docx_llm.py 内容对齐
PROMPT_TEMPLATE = """你是一位专业的项目管理顾问和演示文稿设计师。你的任务是将以下任务执行数据生成一份详细、专业的 PowerPoint 演示文稿内容。
## 任务基本信息
- 任务名称:{task_name}
## 任务大纲(规划阶段)
{task_outline}
## 执行结果
{execution_results}
## 参与智能体
{agents}
## 智能体评分
{agent_scores}
---
## 演示文稿要求
请生成一份完整的演示文稿包含以下幻灯片共11页
1. 封面页标题、用户ID、日期
2. 目录页5个主要章节其他作为小点
3. 执行摘要:任务整体执行情况概述
4. 任务概述:任务背景与目标、任务范围与边界
5. 任务规划分析:任务拆解的合理性、智能体角色分配的优化建议、工作流程设计
6. 执行过程回顾:各阶段的完成情况、关键决策点、遇到的问题及解决方案
7. 成果产出分析:产出物的质量评估、产出与预期目标的匹配度
8. 团队协作分析:智能体之间的协作模式、信息传递效率
9. 质量评估整体完成质量评分1-10分、各维度的具体评分及理由
10. 经验教训与改进建议:成功经验、存在的问题与不足、改进建议
11. 结束页:感谢语
---
## 输出格式要求
请直接输出 JSON 格式,不要包含任何 Markdown 标记。JSON 结构如下:
```json
{{
"title": "{task_name}",
"user_id": "{user_id}",
"date": "{date}",
"slides": [
{{
"type": "title",
"title": "主标题",
"user_id": "用户ID",
"date": "日期"
}},
{{
"type": "toc",
"title": "目录",
"sections": [
{{"title": "一、执行摘要", "subs": []}},
{{"title": "二、任务概述与规划", "subs": ["任务背景与目标", "任务范围与边界", "任务拆解", "智能体角色"]}},
{{"title": "三、执行过程与成果", "subs": ["阶段完成情况", "关键决策点", "产出质量", "目标匹配度"]}},
{{"title": "四、团队协作与质量评估", "subs": ["协作模式", "信息传递", "质量评分"]}},
{{"title": "五、经验建议与总结", "subs": ["成功经验", "问题与不足", "改进建议"]}}
]
}},
{{
"type": "content",
"title": "执行摘要",
"bullets": ["要点1", "要点2", "要点3"]
}},
{{
"type": "content",
"title": "任务概述",
"bullets": ["任务背景与目标", "任务范围与边界"]
}},
{{
"type": "content",
"title": "任务规划分析",
"bullets": ["任务拆解的合理性", "智能体角色分配", "工作流程设计"]
}},
{{
"type": "content",
"title": "执行过程回顾",
"bullets": ["各阶段完成情况", "关键决策点", "遇到的问题及解决方案"]
}},
{{
"type": "content",
"title": "成果产出分析",
"bullets": ["产出物质量评估", "与预期目标匹配度"]
}},
{{
"type": "content",
"title": "团队协作分析",
"bullets": ["智能体协作模式", "信息传递效率"]
}},
{{
"type": "content",
"title": "质量评估",
"bullets": ["整体评分1-10分", "各维度评分及理由"]
}},
{{
"type": "content",
"title": "经验教训与改进建议",
"bullets": ["成功经验", "存在的问题与不足", "改进建议"]
}},
{{
"type": "ending",
"title": "感谢聆听"
}},
{{
"type": "table",
"title": "质量评分表",
"headers": ["维度", "评分", "说明"],
"rows": [["整体质量", "8.5", "表现优秀"], ["协作效率", "7.5", "有待提升"]]
}},
{{
"type": "chart",
"title": "任务完成进度",
"chart_type": "bar",
"categories": ["规划阶段", "执行阶段", "评估阶段"],
"series": [
{{"name": "完成度", "values": [100, 85, 90]}}
]
}}
]
}}
```
**重要说明:**
1. **幻灯片顺序**slides 数组中的幻灯片顺序必须严格按照以下顺序:
- 第1页type="title" 封面页
- 第2页type="toc" 目录页
- 第3-10页type="content" 内容页8页左右
- 第11-14页type="table""chart" 表格/图表页
- 最后一页type="ending" 结束页
2. **每张幻灯片必须包含完整的字段**
- `content` 类型必须有 `title` 和 `bullets` 数组
- `table` 类型必须有 `title`、`headers` 数组和 `rows` 数组
- `chart` 类型必须有 `title`、`chart_type`、`categories` 数组和 `series` 数组
- `ending` 类型必须有 `title`
3. **禁止出现"单击此处添加标题"**:所有幻灯片都必须填写完整内容,不能留空
注意:
- 幻灯片数量为11-15 页
- 每页内容详细一点,使用要点式呈现,需要表格或者图表增强可读性
- 不要生成"报告总字数"这样的统计信息
- 所有文字使用简体中文
- 根据实际任务数据生成内容,不是编造
- **禁止使用中文引号「」『』""** ,所有引号必须是英文双引号 ""
- user_id 填写:{user_id}
- date 填写:{date}
"""
    def __init__(self):
        """Resolve the OpenAI endpoint/key/model from config.yaml on creation."""
        self._load_llm_config()
    def _load_llm_config(self):
        """Populate LLM_CONFIG from the first readable config.yaml among the
        candidate paths; the None defaults are kept when nothing is found."""
        try:
            import yaml
            # Candidate locations, checked in order.
            possible_paths = [
                os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config', 'config.yaml'),
                os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'backend', 'config', 'config.yaml'),
                os.path.join(os.getcwd(), 'config', 'config.yaml'),
            ]
            for config_path in possible_paths:
                if os.path.exists(config_path):
                    with open(config_path, 'r', encoding='utf-8') as f:
                        config = yaml.safe_load(f)
                        if config:
                            self.LLM_CONFIG['OPENAI_API_BASE'] = config.get('OPENAI_API_BASE')
                            self.LLM_CONFIG['OPENAI_API_KEY'] = config.get('OPENAI_API_KEY')
                            self.LLM_CONFIG['OPENAI_API_MODEL'] = config.get('OPENAI_API_MODEL')
                            print(f"已加载 LLM 配置: {self.LLM_CONFIG['OPENAI_API_MODEL']}")
                            return
        except Exception as e:
            print(f"加载 LLM 配置失败: {e}")
    def generate(self, task_data: Dict[str, Any], file_path: str) -> bool:
        """Generate the presentation for *task_data* via the LLM and write
        the .pptx to *file_path*.  Returns True on success, else False."""
        try:
            task_name = task_data.get('task_name', '未命名任务')
            task_content = task_data.get('task_content', '')
            task_outline = task_data.get('task_outline')
            rehearsal_log = task_data.get('rehearsal_log')
            agent_scores = task_data.get('agent_scores')
            user_id = task_data.get('user_id', '')
            date = task_data.get('date', '')
            # Agents come from the outline; scores are filtered down to them.
            agents = self._extract_agents(task_outline)
            filtered_agent_scores = self._filter_agent_scores(agent_scores, agents)
            task_outline_str = json.dumps(task_outline, ensure_ascii=False, indent=2) if task_outline else ''
            rehearsal_log_str = json.dumps(rehearsal_log, ensure_ascii=False, indent=2) if rehearsal_log else ''
            agents_str = ', '.join(agents) if agents else ''
            agent_scores_str = json.dumps(filtered_agent_scores, ensure_ascii=False, indent=2) if filtered_agent_scores else ''
            # NOTE(review): task_content is passed but the visible template
            # defines no {task_content} placeholder; str.format ignores the
            # extra keyword — confirm whether the template should use it.
            prompt = self.PROMPT_TEMPLATE.format(
                task_name=task_name,
                task_content=task_content,
                task_outline=task_outline_str,
                execution_results=rehearsal_log_str,
                agents=agents_str,
                agent_scores=agent_scores_str,
                user_id=user_id,
                date=date
            )
            print("正在调用大模型生成 PPT 内容...")
            ppt_json_str = self._call_llm(prompt)
            if not ppt_json_str:
                print("LLM 生成 PPT 失败")
                return False
            print(f"PPT 内容生成成功,长度: {len(ppt_json_str)} 字符")
            self._create_ppt_from_json(ppt_json_str, file_path, task_name)
            return True
        except Exception as e:
            print(f"PPT LLM 导出失败: {e}")
            import traceback
            traceback.print_exc()
            return False
def _extract_agents(self, task_outline: Any) -> list:
agents = set()
if not task_outline or not isinstance(task_outline, dict):
return []
collaboration_process = task_outline.get('Collaboration Process', [])
if not collaboration_process or not isinstance(collaboration_process, list):
return []
for step in collaboration_process:
if isinstance(step, dict):
agent_selection = step.get('AgentSelection', [])
if isinstance(agent_selection, list):
for agent in agent_selection:
if agent:
agents.add(agent)
return list(agents)
def _filter_agent_scores(self, agent_scores: Any, agents: list) -> dict:
if not agent_scores or not isinstance(agent_scores, dict):
return {}
if not agents:
return {}
filtered = {}
for step_id, step_data in agent_scores.items():
if not isinstance(step_data, dict):
continue
aspect_list = step_data.get('aspectList', [])
agent_scores_data = step_data.get('agentScores', {})
if not agent_scores_data:
continue
filtered_scores = {}
for agent_name, scores in agent_scores_data.items():
if agent_name in agents and isinstance(scores, dict):
filtered_scores[agent_name] = scores
if filtered_scores:
filtered[step_id] = {'aspectList': aspect_list, 'agentScores': filtered_scores}
return filtered
def _call_llm(self, prompt: str) -> str:
    """Send ``prompt`` to the configured OpenAI-compatible chat API.

    Returns the model's text response, or "" on any failure (missing
    configuration, missing ``openai`` package, or an API error). Never
    raises: all errors are reported via ``print``.
    """
    try:
        import openai
        # Fail fast with a distinct message for each missing setting.
        if not self.LLM_CONFIG['OPENAI_API_KEY']:
            print("错误: OPENAI_API_KEY 未配置")
            return ""
        if not self.LLM_CONFIG['OPENAI_API_BASE']:
            print("错误: OPENAI_API_BASE 未配置")
            return ""
        if not self.LLM_CONFIG['OPENAI_API_MODEL']:
            print("错误: OPENAI_API_MODEL 未配置")
            return ""
        client = openai.OpenAI(
            api_key=self.LLM_CONFIG['OPENAI_API_KEY'],
            base_url=self.LLM_CONFIG['OPENAI_API_BASE']
        )
        response = client.chat.completions.create(
            model=self.LLM_CONFIG['OPENAI_API_MODEL'],
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=8000,
        )
        if response and response.choices:
            return response.choices[0].message.content
        return ""
    except ImportError:
        print("请安装 openai 库: pip install openai")
        return ""
    except Exception as e:
        # Best-effort: report the failure and let the caller bail out on "".
        print(f"调用 LLM 失败: {e}")
        return ""
def _fix_json_string(self, json_str: str) -> str:
"""修复常见的 JSON 语法错误"""
import re
json_str = json_str.strip()
# 首先尝试提取 JSON 代码块
if '```json' in json_str:
json_str = json_str.split('```json')[1].split('```')[0]
elif '```' in json_str:
# 检查是否有结束标记
parts = json_str.split('```')
if len(parts) >= 3:
json_str = parts[1]
else:
# 没有结束标记,可能是代码块内容
json_str = parts[1] if len(parts) > 1 else json_str
json_str = json_str.strip()
# 关键修复:处理中文双引号包含英文双引号的情况
# 例如:"工作流程采用"提出-评估-改进"的模式"
# 需要:找到中文引号对,去除它们,同时转义内部的英文引号
import re
# 中文引号 Unicode
CHINESE_LEFT = '\u201c' # "
CHINESE_RIGHT = '\u201d' # "
# 使用正则匹配中文引号对之间的内容,并处理内部的英文引号
# 匹配 "内容" 格式(中文引号对)
pattern = re.compile(r'(\u201c)(.*?)(\u201d)')
def replace_chinese_quotes(match):
content = match.group(2)
# 将内部未转义的英文双引号转义
content = re.sub(r'(?<!\\)"', r'\\"', content)
return content
json_str = pattern.sub(replace_chinese_quotes, json_str)
# 现在移除所有中文引号字符(已经被处理过了)
json_str = json_str.replace(CHINESE_LEFT, '')
json_str = json_str.replace(CHINESE_RIGHT, '')
json_str = json_str.replace('\u2018', "'") # 中文左单引号 '
json_str = json_str.replace('\u2019', "'") # 中文右单引号 '
# 移除行尾的逗号trailing comma
json_str = re.sub(r',\s*}', '}', json_str)
json_str = re.sub(r',\s*]', ']', json_str)
# 修复空字符串
json_str = re.sub(r'""', '"', json_str)
# 修复单引号
single_quotes = re.findall(r"'[^']*'", json_str)
for sq in single_quotes:
if '"' not in sq:
json_str = json_str.replace(sq, sq.replace("'", '"'))
# 移除 trailing comma
trailing_comma_pattern = re.compile(r',(\s*[}\]])')
json_str = trailing_comma_pattern.sub(r'\1', json_str)
# 修复未加引号的 key
unquoted_key_pattern = re.compile(r'([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)\s*:')
def fix_unquoted_key(match):
prefix = match.group(1)
key = match.group(2)
return f'{prefix}"{key}":'
json_str = unquoted_key_pattern.sub(fix_unquoted_key, json_str)
# 修复未加引号的值(但保留 true/false/null
unquoted_value_pattern = re.compile(r':\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*([,}\]])')
def replace_unquoted(match):
value = match.group(1)
end = match.group(2)
if value.lower() in ('true', 'false', 'null'):
return f': {value}{end}'
else:
return f': "{value}"{end}'
json_str = unquoted_value_pattern.sub(replace_unquoted, json_str)
# 新增:修复多余的逗号(连续逗号)
json_str = re.sub(r',,+,', ',', json_str)
# 新增修复缺少冒号的情况key 后面没有冒号)
# 匹配 "key" value 而不是 "key": value
json_str = re.sub(r'(\"[^\"]+\")\s+([\[{])', r'\1: \2', json_str)
# 新增:尝试修复换行符问题,将单个 \n 替换为 \\n
# 但这可能会导致问题,所以先注释掉
# json_str = json_str.replace('\n', '\\n')
# 新增:移除可能的 BOM 或不可见字符
json_str = json_str.replace('\ufeff', '')
print(f"JSON 修复后长度: {len(json_str)} 字符")
return json_str
def _create_ppt_from_json(self, json_str: str, file_path: str, task_name: str):
    """Build and save the ``.pptx`` from the LLM's JSON slide description.

    Creates a blank 16:9 presentation (no template), renders every entry in
    ``slides`` according to its ``type``, and forces any ``ending`` slides
    to the end of the deck regardless of their position in the JSON.

    Args:
        json_str: Raw LLM output (possibly fenced / slightly malformed JSON).
        file_path: Destination ``.pptx`` path.
        task_name: Task name (unused here; kept for the call contract).

    Raises:
        Re-raises JSON decoding and pptx errors after printing diagnostics.
    """
    try:
        # _fix_json_string already strips markdown code fences, so the
        # former duplicate pre-stripping pass here was removed.
        json_str = self._fix_json_string(json_str.strip())
        try:
            replace_data = json.loads(json_str)
        except json.JSONDecodeError as e:
            # Print context around the failure position to ease debugging.
            print(f"JSON 解析失败: {e}")
            # BUGFIX: the column message was truncated ("第 N" without 列).
            print(f"错误位置: 第 {e.lineno} 行, 第 {e.colno} 列")
            start = max(0, e.pos - 100)
            end = min(len(json_str), e.pos + 100)
            context = json_str[start:end]
            print(f"错误上下文: ...{context}...")
            raise
        # Blank 13.333" x 7.5" (16:9) presentation; no template is used.
        prs = Presentation()
        prs.slide_width = Inches(13.333)
        prs.slide_height = Inches(7.5)
        slides_data = replace_data.get('slides', [])
        # Separate ending slides so they always come last.
        ending_slides = [s for s in slides_data if s.get('type') == 'ending']
        other_slides = [s for s in slides_data if s.get('type') != 'ending']
        for slide_data in other_slides:
            slide_type = slide_data.get('type', 'content')
            if slide_type == 'title':
                self._add_title_slide(prs, slide_data)
            elif slide_type == 'toc':
                self._add_toc_slide(prs, slide_data)
            elif slide_type == 'content':
                self._add_content_slide(prs, slide_data)
            elif slide_type == 'two_column':
                self._add_two_column_slide(prs, slide_data)
            elif slide_type == 'table':
                self._add_table_slide(prs, slide_data)
            elif slide_type == 'chart':
                self._add_chart_slide(prs, slide_data)
        for slide_data in ending_slides:
            self._add_ending_slide(prs, slide_data)
        prs.save(file_path)
        print(f"PowerPoint 已保存: {file_path}")
    except ImportError:
        print("请安装 python-pptx 库: pip install python-pptx")
        raise
    except json.JSONDecodeError as e:
        print(f"JSON 解析失败: {e}")
        raise
    except Exception as e:
        print(f"创建 PPT 失败: {e}")
        raise
def _add_title_slide(self, prs, data):
    """Add the cover slide: centered title plus user-id and date lines."""
    slide = prs.slides.add_slide(prs.slide_layouts[6])  # blank layout
    # Drop any default placeholder shapes left by the layout.
    placeholder_texts = ['单击此处添加标题', '单击此处添加内容', 'Click to add title', 'Click to add text']
    for shape in list(slide.shapes):
        if shape.has_text_frame and shape.text_frame.text.strip() in placeholder_texts:
            slide.shapes.remove(shape)
    # (top, height, text, size, bold) for each centered line of the cover.
    cover_rows = [
        (2.5, 1.5, data.get('title', ''), 44, True),
        (4.2, 0.5, f"用户ID: {data.get('user_id', '')}", 18, False),
        (4.8, 0.5, f"日期: {data.get('date', '')}", 18, False),
    ]
    for top, height, text, size, bold in cover_rows:
        box = slide.shapes.add_textbox(Inches(0.5), Inches(top), Inches(12.333), Inches(height))
        para = box.text_frame.paragraphs[0]
        para.text = text
        para.font.size = Pt(size)
        if bold:
            para.font.bold = True
        para.alignment = PP_ALIGN.CENTER
def _add_toc_slide(self, prs, data):
    """Add the table-of-contents slide.

    ``data`` keys: ``title`` (heading, defaults to '目录') and ``sections``
    — a list of ``{'title': str, 'subs': [str, ...]}`` entries.
    """
    slide_layout = prs.slide_layouts[6]  # blank layout
    slide = prs.slides.add_slide(slide_layout)
    # Remove default placeholder shapes so no "click to add" prompts remain.
    for shape in list(slide.shapes):
        if shape.has_text_frame and shape.text_frame.text.strip() in ['单击此处添加标题', '单击此处添加内容', 'Click to add title', 'Click to add text']:
            slide.shapes.remove(shape)
    # Slide heading
    title_box = slide.shapes.add_textbox(Inches(0.5), Inches(0.4), Inches(12.333), Inches(0.8))
    tf = title_box.text_frame
    p = tf.paragraphs[0]
    p.text = data.get('title', '目录')
    p.font.size = Pt(36)
    p.font.bold = True
    # TOC body. NOTE(review): the original comment said "two-column layout"
    # but this is a single full-width text box — confirm the intent.
    content_box = slide.shapes.add_textbox(Inches(0.5), Inches(1.5), Inches(12.333), Inches(5.5))
    tf = content_box.text_frame
    tf.word_wrap = True
    sections = data.get('sections', [])
    for i, section in enumerate(sections):
        section_title = section.get('title', '')
        subs = section.get('subs', [])
        # A fresh text frame already contains one paragraph; reuse it first.
        if i == 0:
            p = tf.paragraphs[0]
        else:
            p = tf.add_paragraph()
        # Section heading
        p.text = section_title
        p.level = 0
        p.font.size = Pt(20)
        p.font.bold = True
        # Sub-items, indented one level
        for sub in subs:
            p_sub = tf.add_paragraph()
            p_sub.text = sub
            p_sub.level = 1  # second-level indent
            p_sub.font.size = Pt(16)
def _add_ending_slide(self, prs, data):
    """Add the closing slide: one large centered line (default '感谢聆听')."""
    slide_layout = prs.slide_layouts[6]  # blank layout
    slide = prs.slides.add_slide(slide_layout)
    # Remove default placeholder shapes so no "click to add" prompts remain.
    for shape in list(slide.shapes):
        if shape.has_text_frame and shape.text_frame.text.strip() in ['单击此处添加标题', '单击此处添加内容', 'Click to add title', 'Click to add text']:
            slide.shapes.remove(shape)
    # Closing message, vertically centered
    title_box = slide.shapes.add_textbox(Inches(0.5), Inches(3), Inches(12.333), Inches(1.5))
    tf = title_box.text_frame
    p = tf.paragraphs[0]
    p.text = data.get('title', '感谢聆听')
    p.font.size = Pt(44)
    p.font.bold = True
    p.alignment = PP_ALIGN.CENTER
def _add_content_slide(self, prs, data):
    """Add a standard content slide: heading plus a bullet list.

    ``data`` keys: ``title`` and ``bullets`` (list of strings).
    """
    slide_layout = prs.slide_layouts[6]  # blank layout
    slide = prs.slides.add_slide(slide_layout)
    # Remove default placeholder shapes (avoids "click to add title" text).
    for shape in list(slide.shapes):
        if shape.has_text_frame and shape.text_frame.text.strip() in ['单击此处添加标题', '单击此处添加内容', 'Click to add title', 'Click to add text']:
            slide.shapes.remove(shape)
    # Slide heading
    title_box = slide.shapes.add_textbox(Inches(0.5), Inches(0.3), Inches(12.333), Inches(0.8))
    tf = title_box.text_frame
    p = tf.paragraphs[0]
    p.text = data.get('title', '')
    p.font.size = Pt(32)
    p.font.bold = True
    # Bullet body
    content_box = slide.shapes.add_textbox(Inches(0.5), Inches(1.3), Inches(12.333), Inches(5.8))
    tf = content_box.text_frame
    tf.word_wrap = True
    bullets = data.get('bullets', [])
    for i, bullet in enumerate(bullets):
        # Reuse the implicit first paragraph, then append the rest.
        if i == 0:
            p = tf.paragraphs[0]
        else:
            p = tf.add_paragraph()
        p.text = bullet
        p.level = 0
        p.font.size = Pt(18)
def _add_two_column_slide(self, prs, data):
    """Add a two-column slide: a heading plus left/right titled bullet lists.

    ``data`` keys: ``title``, ``left_title``, ``left_bullets``,
    ``right_title``, ``right_bullets``.
    """
    slide_layout = prs.slide_layouts[6]  # blank layout
    slide = prs.slides.add_slide(slide_layout)
    # Remove default placeholder shapes so no "click to add" prompts remain.
    for shape in list(slide.shapes):
        if shape.has_text_frame and shape.text_frame.text.strip() in ['单击此处添加标题', '单击此处添加内容', 'Click to add title', 'Click to add text']:
            slide.shapes.remove(shape)
    # Slide heading
    title_box = slide.shapes.add_textbox(Inches(0.5), Inches(0.3), Inches(12.333), Inches(0.8))
    tf = title_box.text_frame
    p = tf.paragraphs[0]
    p.text = data.get('title', '')
    p.font.size = Pt(32)
    p.font.bold = True
    # Left column
    left_box = slide.shapes.add_textbox(Inches(0.5), Inches(1.3), Inches(5.8), Inches(5.8))
    tf = left_box.text_frame
    tf.word_wrap = True
    left_title = data.get('left_title', '')
    p = tf.paragraphs[0]
    p.text = left_title
    p.font.size = Pt(22)
    p.font.bold = True
    left_bullets = data.get('left_bullets', [])
    for bullet in left_bullets:
        p = tf.add_paragraph()
        p.text = bullet
        p.level = 1
        p.font.size = Pt(16)
    # Right column
    right_box = slide.shapes.add_textbox(Inches(7.0), Inches(1.3), Inches(5.8), Inches(5.8))
    tf = right_box.text_frame
    tf.word_wrap = True
    right_title = data.get('right_title', '')
    p = tf.paragraphs[0]
    p.text = right_title
    p.font.size = Pt(22)
    p.font.bold = True
    right_bullets = data.get('right_bullets', [])
    for bullet in right_bullets:
        p = tf.add_paragraph()
        p.text = bullet
        p.level = 1
        p.font.size = Pt(16)
def _add_table_slide(self, prs, data):
    """Add a slide containing a heading and a data table.

    ``data`` keys: ``title``, ``headers`` (column names) and ``rows``
    (list of row-lists). The table is only added when both are non-empty.
    """
    slide_layout = prs.slide_layouts[6]  # blank layout
    slide = prs.slides.add_slide(slide_layout)
    # Remove default placeholder shapes so no "click to add" prompts remain.
    for shape in list(slide.shapes):
        if shape.has_text_frame and shape.text_frame.text.strip() in ['单击此处添加标题', '单击此处添加内容', 'Click to add title', 'Click to add text']:
            slide.shapes.remove(shape)
    # Slide heading
    title_box = slide.shapes.add_textbox(Inches(0.5), Inches(0.3), Inches(12), Inches(0.6))
    tf = title_box.text_frame
    p = tf.paragraphs[0]
    p.text = data.get('title', '')
    p.font.size = Pt(32)
    p.font.bold = True
    # Data table (one extra row for the header)
    headers = data.get('headers', [])
    rows = data.get('rows', [])
    if headers and rows:
        x, y, cx, cy = Inches(0.5), Inches(1.5), Inches(12), Inches(3)
        table = slide.shapes.add_table(len(rows) + 1, len(headers), x, y, cx, cy).table
        # Header row
        for i, header in enumerate(headers):
            table.cell(0, i).text = header
        # Data rows. NOTE(review): assumes each row has at least as many
        # cells as there are headers — short/long rows are not handled.
        for row_idx, row_data in enumerate(rows):
            for col_idx, cell_data in enumerate(row_data):
                table.cell(row_idx + 1, col_idx).text = cell_data
def _add_chart_slide(self, prs, data):
    """Add a slide with a native PowerPoint chart.

    ``data`` keys: ``title``, ``chart_type`` (bar/column/line/pie, English
    or Chinese), ``categories`` and ``series`` (list of
    ``{'name': str, 'values': [...]}``). Unknown types fall back to a
    clustered column chart.
    """
    slide_layout = prs.slide_layouts[6]  # blank layout
    slide = prs.slides.add_slide(slide_layout)
    # Remove default placeholder shapes so no "click to add" prompts remain.
    for shape in list(slide.shapes):
        if shape.has_text_frame and shape.text_frame.text.strip() in ['单击此处添加标题', '单击此处添加内容', 'Click to add title', 'Click to add text']:
            slide.shapes.remove(shape)
    # Slide heading
    title_box = slide.shapes.add_textbox(Inches(0.5), Inches(0.3), Inches(12.333), Inches(0.6))
    tf = title_box.text_frame
    p = tf.paragraphs[0]
    p.text = data.get('title', '')
    p.font.size = Pt(32)
    p.font.bold = True
    # Chart data
    chart_type_str = data.get('chart_type', 'bar').lower()
    categories = data.get('categories', [])
    series_list = data.get('series', [])
    # Map the textual chart type (English or Chinese) to a pptx enum.
    chart_type_map = {
        'bar': XL_CHART_TYPE.COLUMN_CLUSTERED,
        'column': XL_CHART_TYPE.COLUMN_CLUSTERED,
        'line': XL_CHART_TYPE.LINE,
        'pie': XL_CHART_TYPE.PIE,
        '饼图': XL_CHART_TYPE.PIE,
        '柱状图': XL_CHART_TYPE.COLUMN_CLUSTERED,
        '折线图': XL_CHART_TYPE.LINE,
    }
    chart_type = chart_type_map.get(chart_type_str, XL_CHART_TYPE.COLUMN_CLUSTERED)
    # Build the chart data object
    chart_data = CategoryChartData()
    chart_data.categories = categories
    for series in series_list:
        series_name = series.get('name', '数据')
        values = series.get('values', [])
        chart_data.add_series(series_name, values)
    # Place the chart below the heading (the returned chart object is unused)
    x, y, cx, cy = Inches(0.5), Inches(1.2), Inches(12.333), Inches(5.5)
    chart = slide.shapes.add_chart(chart_type, x, y, cx, cy, chart_data).chart
# Backward compatibility with older call sites.
def _load_llm_config_old(self):
    """Deprecated alias; delegates to ``_load_llm_config``."""
    self._load_llm_config()

View File

@@ -0,0 +1,481 @@
"""
Excel 文档 LLM 报告导出器
调用大模型生成专业的任务执行报告并保存为Excel格式
"""
import json
import os
from datetime import datetime
from typing import Dict, Any
try:
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
except ImportError:
pass
class XlsxLLMExporter:
    """Excel report exporter backed by an LLM.

    Asks an OpenAI-compatible chat model to write a Markdown execution
    report for a task, then renders that Markdown into a styled ``.xlsx``
    workbook (headings, bullet rows and real Excel tables).
    """

    # Connection settings, populated from config.yaml at construction time.
    LLM_CONFIG = {
        'OPENAI_API_BASE': None,
        'OPENAI_API_KEY': None,
        'OPENAI_API_MODEL': None,
    }

    # Report-generation prompt sent verbatim to the model (Chinese).
    PROMPT_TEMPLATE = """你是一位专业的项目管理顾问和数据分析专家。你的任务是将以下任务执行数据生成一份详细、专业、结构化的执行报告。
## 任务基本信息
- 任务名称:{task_name}
## 任务大纲(规划阶段)
{task_outline}
## 执行结果
{rehearsal_log}
## 参与智能体
{agents}
## 智能体评分
{agent_scores}
---
## 报告要求
请生成一份完整的任务执行报告,包含以下章节:
### 1. 执行摘要
用 2-3 句话概括本次任务的整体执行情况。
### 2. 任务概述
- 任务背景与目标
- 任务范围与边界
### 3. 任务规划分析
- 任务拆解的合理性
- 智能体角色分配的优化建议
- 工作流程设计
### 4. 执行过程回顾
- 各阶段的完成情况
- 关键决策点
- 遇到的问题及解决方案
### 5. 成果产出分析
- 产出物的质量评估
- 产出与预期目标的匹配度
### 6. 团队协作分析
- 智能体之间的协作模式
- 信息传递效率
### 7. 质量评估
- 整体完成质量评分1-10分
- 各维度的具体评分及理由
### 8. 经验教训与改进建议
- 成功经验
- 存在的问题与不足
- 改进建议
---
## 输出格式要求
- 使用 Markdown 格式输出
- 语言:简体中文
- 适当使用列表、表格增强可读性
- 报告长度必须达到 3000-5000 字,每个章节都要详细展开,不要遗漏任何章节
- 每个章节的内容要充实,提供具体的分析和建议
- 注意:所有加粗标记必须成对出现,如 **文本**,不要单独使用 ** 或缺少结束标记
- 禁止使用 mermaid、graph TD、flowchart 等图表代码,如果需要描述流程请用纯文字描述
- 不要生成附录章节
- 不要在报告中显示"报告总字数"这样的统计信息
"""

    def __init__(self):
        # Eagerly load the LLM connection settings.
        self._load_llm_config()

    def _load_llm_config(self):
        """Load the LLM settings from the first config.yaml that exists.

        Failures (missing yaml, unreadable file) are reported and ignored so
        construction never raises.
        """
        try:
            import yaml
            # Candidate locations, most specific first.
            possible_paths = [
                os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config', 'config.yaml'),
                os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'backend', 'config', 'config.yaml'),
                os.path.join(os.getcwd(), 'config', 'config.yaml'),
            ]
            for config_path in possible_paths:
                if os.path.exists(config_path):
                    with open(config_path, 'r', encoding='utf-8') as f:
                        config = yaml.safe_load(f)
                    if config:
                        self.LLM_CONFIG['OPENAI_API_BASE'] = config.get('OPENAI_API_BASE')
                        self.LLM_CONFIG['OPENAI_API_KEY'] = config.get('OPENAI_API_KEY')
                        self.LLM_CONFIG['OPENAI_API_MODEL'] = config.get('OPENAI_API_MODEL')
                        print(f"已加载 LLM 配置: {self.LLM_CONFIG['OPENAI_API_MODEL']}")
                        return
        except Exception as e:
            print(f"加载 LLM 配置失败: {e}")

    def generate(self, task_data: Dict[str, Any], file_path: str) -> bool:
        """Generate the Excel report for ``task_data`` at ``file_path``.

        Returns:
            True on success; on any failure the error is printed and False
            is returned.
        """
        try:
            task_name = task_data.get('task_name', '未命名任务')
            task_outline = task_data.get('task_outline')
            rehearsal_log = task_data.get('rehearsal_log')
            agent_scores = task_data.get('agent_scores')
            agents = self._extract_agents(task_outline)
            filtered_agent_scores = self._filter_agent_scores(agent_scores, agents)
            # Serialize structured inputs so they can be embedded in the prompt.
            task_outline_str = json.dumps(task_outline, ensure_ascii=False, indent=2) if task_outline else ''
            rehearsal_log_str = json.dumps(rehearsal_log, ensure_ascii=False, indent=2) if rehearsal_log else ''
            agents_str = ', '.join(agents) if agents else ''
            agent_scores_str = json.dumps(filtered_agent_scores, ensure_ascii=False, indent=2) if filtered_agent_scores else ''
            prompt = self.PROMPT_TEMPLATE.format(
                task_name=task_name,
                task_outline=task_outline_str,
                rehearsal_log=rehearsal_log_str,
                agents=agents_str,
                agent_scores=agent_scores_str
            )
            print("正在调用大模型生成 Excel 报告...")
            report_content = self._call_llm(prompt)
            if not report_content:
                print("LLM 生成报告失败")
                return False
            report_content = self._clean_report_title(report_content)
            print(f"报告生成成功,长度: {len(report_content)} 字符")
            self._save_as_excel(report_content, file_path, task_name)
            return True
        except Exception as e:
            print(f"Excel LLM 导出失败: {e}")
            import traceback
            traceback.print_exc()
            return False

    def _clean_report_title(self, content: str) -> str:
        """Drop a duplicated '任务执行报告' title (and following blank lines)
        from the start of the report, since the sheet carries its own title."""
        lines = content.split('\n')
        if not lines:
            return content
        first_line = lines[0].strip()
        if first_line == '任务执行报告' or first_line == '# 任务执行报告':
            lines = lines[1:]
            while lines and not lines[0].strip():
                lines.pop(0)
        return '\n'.join(lines)

    def _extract_agents(self, task_outline: Any) -> list:
        """Collect the unique agent names referenced by the outline's
        'Collaboration Process' steps; returns [] on malformed input."""
        agents = set()
        if not task_outline or not isinstance(task_outline, dict):
            return []
        collaboration_process = task_outline.get('Collaboration Process', [])
        if not collaboration_process or not isinstance(collaboration_process, list):
            return []
        for step in collaboration_process:
            if isinstance(step, dict):
                agent_selection = step.get('AgentSelection', [])
                if isinstance(agent_selection, list):
                    for agent in agent_selection:
                        if agent:
                            agents.add(agent)
        return list(agents)

    def _filter_agent_scores(self, agent_scores: Any, agents: list) -> dict:
        """Keep only the score entries of agents that participated in the
        task; steps left with no scores are dropped entirely."""
        if not agent_scores or not isinstance(agent_scores, dict):
            return {}
        if not agents:
            return {}
        filtered = {}
        for step_id, step_data in agent_scores.items():
            if not isinstance(step_data, dict):
                continue
            aspect_list = step_data.get('aspectList', [])
            agent_scores_data = step_data.get('agentScores', {})
            if not agent_scores_data:
                continue
            filtered_scores = {}
            for agent_name, scores in agent_scores_data.items():
                if agent_name in agents and isinstance(scores, dict):
                    filtered_scores[agent_name] = scores
            if filtered_scores:
                filtered[step_id] = {
                    'aspectList': aspect_list,
                    'agentScores': filtered_scores
                }
        return filtered

    def _call_llm(self, prompt: str) -> str:
        """Send ``prompt`` to the configured OpenAI-compatible chat API.

        Returns the model's text response, or "" on any failure (missing
        configuration, missing ``openai`` package, or an API error).
        """
        try:
            import openai
            # Fail fast with a distinct message for each missing setting.
            if not self.LLM_CONFIG['OPENAI_API_KEY']:
                print("错误: OPENAI_API_KEY 未配置")
                return ""
            if not self.LLM_CONFIG['OPENAI_API_BASE']:
                print("错误: OPENAI_API_BASE 未配置")
                return ""
            if not self.LLM_CONFIG['OPENAI_API_MODEL']:
                print("错误: OPENAI_API_MODEL 未配置")
                return ""
            client = openai.OpenAI(
                api_key=self.LLM_CONFIG['OPENAI_API_KEY'],
                base_url=self.LLM_CONFIG['OPENAI_API_BASE']
            )
            response = client.chat.completions.create(
                model=self.LLM_CONFIG['OPENAI_API_MODEL'],
                messages=[
                    {"role": "user", "content": prompt}
                ],
                temperature=0.7,
                max_tokens=10000,
            )
            if response and response.choices:
                return response.choices[0].message.content
            return ""
        except ImportError:
            print("请安装 openai 库: pip install openai")
            return ""
        except Exception as e:
            print(f"调用 LLM 失败: {e}")
            return ""

    def _save_as_excel(self, markdown_content: str, file_path: str, task_name: str):
        """Render the Markdown report into a two-column styled workbook.

        Headings become merged emphasised rows, Markdown tables become real
        Excel tables (capped at two columns), everything else becomes
        merged, wrapped text rows.

        Raises:
            Re-raises ImportError (missing openpyxl) and any save failure.
        """
        try:
            from openpyxl import Workbook
            from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
            from openpyxl.utils import get_column_letter
            wb = Workbook()
            ws = wb.active
            ws.title = "任务执行报告"
            # Shared styles for the sheet (unused style constants removed).
            header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
            header_font = Font(bold=True, color="FFFFFF", size=12)
            normal_font = Font(size=10)
            thin_border = Border(
                left=Side(style='thin'),
                right=Side(style='thin'),
                top=Side(style='thin'),
                bottom=Side(style='thin')
            )
            ws.column_dimensions['A'].width = 20
            ws.column_dimensions['B'].width = 80
            # Title row.
            row = 1
            ws[f'A{row}'] = task_name
            ws[f'A{row}'].font = Font(bold=True, size=16)
            ws[f'A{row}'].alignment = Alignment(horizontal='center', vertical='center')
            ws.merge_cells(f'A{row}:B{row}')
            row += 1
            # Export timestamp row.
            ws[f'A{row}'] = f"导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            ws[f'A{row}'].font = Font(size=9, italic=True)
            ws[f'A{row}'].alignment = Alignment(horizontal='center', vertical='center')
            ws.merge_cells(f'A{row}:B{row}')
            row += 2
            lines = markdown_content.split('\n')
            current_section = ""
            table_data = []   # accumulated rows of the Markdown table being read
            in_table = False
            for line in lines:
                line = line.rstrip()
                if not line:
                    # A blank line terminates any table in progress.
                    if in_table and table_data:
                        row = self._write_table_to_excel(ws, table_data, row, header_fill, header_font, thin_border, normal_font)
                        table_data = []
                        in_table = False
                    continue
                stripped = line.strip()
                # Skip Markdown table separator rows like |---|---|.
                if stripped.startswith('|') and stripped.endswith('|') and '---' in stripped:
                    continue
                if '|' in line and line.strip().startswith('|'):
                    # Table data row: collect the cells between the pipes.
                    cells = [cell.strip() for cell in line.split('|')[1:-1]]
                    if cells and any(cells):
                        table_data.append(cells)
                        in_table = True
                    continue
                else:
                    # Any non-table line flushes a finished table first.
                    if in_table and table_data:
                        row = self._write_table_to_excel(ws, table_data, row, header_fill, header_font, thin_border, normal_font)
                        table_data = []
                        in_table = False
                if line.startswith('### '):
                    if table_data:
                        row = self._write_table_to_excel(ws, table_data, row, header_fill, header_font, thin_border, normal_font)
                        table_data = []
                    current_section = line[4:].strip()
                    ws[f'A{row}'] = current_section
                    ws[f'A{row}'].font = Font(bold=True, size=12, color="4472C4")
                    ws[f'A{row}'].alignment = Alignment(horizontal='center', vertical='center')
                    ws.merge_cells(f'A{row}:B{row}')
                    row += 1
                elif line.startswith('## '):
                    if table_data:
                        row = self._write_table_to_excel(ws, table_data, row, header_fill, header_font, thin_border, normal_font)
                        table_data = []
                    section_title = line[2:].strip()
                    ws[f'A{row}'] = section_title
                    ws[f'A{row}'].font = Font(bold=True, size=13)
                    ws[f'A{row}'].alignment = Alignment(horizontal='center', vertical='center')
                    ws.merge_cells(f'A{row}:B{row}')
                    row += 1
                elif line.startswith('# '):
                    # Top-level heading would duplicate the sheet title; skip it.
                    pass
                elif line.startswith('#### '):
                    if table_data:
                        row = self._write_table_to_excel(ws, table_data, row, header_fill, header_font, thin_border, normal_font)
                        table_data = []
                    current_section = line[5:].strip()
                    ws[f'A{row}'] = current_section
                    ws[f'A{row}'].font = Font(bold=True, size=11, color="4472C4")
                    ws[f'A{row}'].alignment = Alignment(horizontal='center', vertical='center')
                    ws.merge_cells(f'A{row}:B{row}')
                    row += 1
                elif line.startswith('- ') or line.startswith('* ') or line.startswith('•'):
                    # BUGFIX: the bullet literal had degenerated to
                    # startswith('') — vacuously true, which also chopped
                    # the first character of every plain line. Restored as
                    # '•'. NOTE(review): confirm the original bullet glyph.
                    text = line[2:].strip() if line.startswith(('- ', '* ')) else line[1:].strip()
                    text = self._clean_markdown(text)
                    ws[f'A{row}'] = "• " + text
                    ws[f'A{row}'].font = normal_font
                    ws[f'A{row}'].alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
                    ws.merge_cells(f'A{row}:B{row}')
                    row += 1
                elif line.startswith('**') and '**:' in line:
                    # "**key**:value" pairs become a single bold row.
                    parts = line.split(':', 1)
                    if len(parts) == 2:
                        key = self._clean_markdown(parts[0])
                        value = self._clean_markdown(parts[1])
                        ws[f'A{row}'] = f"{key}: {value}"
                        ws[f'A{row}'].font = Font(bold=True, size=10)
                        ws[f'A{row}'].alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
                        ws.merge_cells(f'A{row}:B{row}')
                        row += 1
                else:
                    clean_line = self._clean_markdown(line)
                    if clean_line:
                        ws[f'A{row}'] = clean_line
                        ws[f'A{row}'].font = normal_font
                        ws[f'A{row}'].alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
                        ws.merge_cells(f'A{row}:B{row}')
                        row += 1
            # Flush a table that ran to the end of the report.
            if table_data:
                row = self._write_table_to_excel(ws, table_data, row, header_fill, header_font, thin_border, normal_font)
            ws.column_dimensions['B'].width = 80
            # NOTE(review): bestFit is documented for column dimensions; its
            # effect on RowDimension is questionable — confirm it does anything.
            for r in range(1, row + 1):
                ws.row_dimensions[r].bestFit = True
            # Explicitly blank out borders on untouched cells (openpyxl
            # normalises the 'none' style string to no border).
            for col in ['A', 'B']:
                for r in range(1, row + 1):
                    cell = ws[f'{col}{r}']
                    if cell.border is None or cell.border == Border():
                        cell.border = Border(
                            left=Side(style='none'),
                            right=Side(style='none'),
                            top=Side(style='none'),
                            bottom=Side(style='none')
                        )
            wb.save(file_path)
            print(f"Excel 文档已保存: {file_path}")
        except ImportError:
            print("请安装 openpyxl 库: pip install openpyxl")
            raise
        except Exception as e:
            print(f"保存 Excel 文档失败: {e}")
            raise

    def _write_table_to_excel(self, ws, table_data, row, header_fill, header_font, border, normal_font):
        """Write the accumulated Markdown-table rows starting at ``row``.

        The first row is styled as a header and the column count is capped
        at two to match the sheet layout. Returns the next free row index
        (one blank row is left after the table).
        """
        if not table_data:
            return row
        if len(table_data) == 1:
            # A single-row "table" is rendered as one merged text row.
            ws[f'A{row}'] = table_data[0][0] if table_data[0] else ""
            ws[f'A{row}'].font = normal_font
            ws[f'A{row}'].alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
            ws.merge_cells(f'A{row}:B{row}')
            return row + 1
        max_cols = max(len(row_data) for row_data in table_data)
        max_cols = min(max_cols, 2)  # the sheet layout only has two columns
        for col_idx in range(max_cols):
            col_letter = get_column_letter(col_idx + 1)
            ws.column_dimensions[col_letter].width = 25 if max_cols > 1 else 80
        for row_idx, row_data in enumerate(table_data):
            for col_idx in range(min(len(row_data), max_cols)):
                col_letter = get_column_letter(col_idx + 1)
                cell = ws[f'{col_letter}{row + row_idx}']
                cell.value = self._clean_markdown(row_data[col_idx])
                cell.font = header_font if row_idx == 0 else normal_font
                cell.fill = header_fill if row_idx == 0 else PatternFill()
                cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
                cell.border = border
            if max_cols > 1:
                # Border the trailing cells of short rows too.
                for col_idx in range(len(row_data), max_cols):
                    col_letter = get_column_letter(col_idx + 1)
                    ws[f'{col_letter}{row + row_idx}'].border = border
        return row + len(table_data) + 1

    def _clean_markdown(self, text: str) -> str:
        """Strip inline Markdown markup (bold/italic/strike/code) and
        unescape literal ``\\n`` / ``\\t`` sequences."""
        import re
        text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
        text = re.sub(r'\*(.+?)\*', r'\1', text)
        text = re.sub(r'__(.+?)__', r'\1', text)
        text = re.sub(r'_(.+?)_', r'\1', text)
        text = re.sub(r'~~(.+?)~~', r'\1', text)
        text = re.sub(r'`(.+?)`', r'\1', text)
        text = text.replace('\\n', '\n').replace('\\t', '\t')
        return text.strip()

View File

@@ -2,6 +2,13 @@
OPENAI_API_BASE: "https://ai.gitee.com/v1"
OPENAI_API_KEY: "HYCNGM39GGFNSB1F8MBBMI9QYJR3P1CRSYS2PV1A"
OPENAI_API_MODEL: "DeepSeek-V3"
# OPENAI_API_BASE: "https://qianfan.baidubce.com/v2"
# OPENAI_API_KEY: "bce-v3/ALTAK-Rp1HuLgqIdXM1rywQHRxr/96c5a0e99ccddf91193157e7d42d89979981bf05"
# OPENAI_API_MODEL: "deepseek-v3"
#OPENAI_API_BASE: "https://api.yyds168.net/v1"
#OPENAI_API_KEY: "sk-5SJms7yWhGtb0YuwayqaDPjU95s4gapMqkaX5q9opWn3Ati3"
#OPENAI_API_MODEL: "deepseek-v3"
## config for fast mode
FAST_DESIGN_MODE: False
@@ -10,3 +17,13 @@ MISTRAL_API_KEY: ""
## options under experimentation, leave them as False unless you know what it is for
USE_CACHE: False
# PostgreSQL 数据库配置
database:
host: "localhost"
port: 5432
username: "postgres"
password: "123456"
name: "agentcoord"
pool_size: 10
max_overflow: 20

View File

@@ -21,6 +21,7 @@ import yaml
import argparse
import uuid
import copy
import base64
from typing import List, Dict, Optional
# 数据库模块导入
@@ -2816,7 +2817,7 @@ EXPORT_TYPE_CONFIG = {
"markdown": {"ext": ".md", "mime": "text/markdown"},
"excel": {"ext": ".xlsx", "mime": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},
"ppt": {"ext": ".pptx", "mime": "application/vnd.openxmlformats-officedocument.presentationml.presentation"},
"mindmap": {"ext": ".md", "mime": "text/markdown"}, # 思维导图先用 markdown
"mindmap": {"ext": ".md", "mime": "text/markdown"}, # 思维导图导出为 Markdown 格式
"infographic": {"ext": ".html", "mime": "text/html"}, # 信息图先用 html
}
@@ -2889,13 +2890,19 @@ def handle_export(data):
return
# 准备导出数据
from datetime import datetime
current_date = datetime.now().strftime('%Y年%m月%d')
export_data = {
'task_name': task.query or '未命名任务',
'task_content': task.query or '', # 使用 query 作为任务描述
'task_content': task.query or '',
'task_outline': task.task_outline,
'result': task.result,
'agents_info': task.agents_info,
'assigned_agents': task.assigned_agents,
'rehearsal_log': task.rehearsal_log,
'agent_scores': task.agent_scores,
'user_id': user_id,
'date': current_date,
}
# 生成文件名
@@ -3066,6 +3073,35 @@ def preview_export(record_id: int):
with open(record.file_path, 'r', encoding='utf-8') as f:
content = f.read()
return jsonify({'content': content, 'type': 'markdown'})
elif record.export_type == 'mindmap':
# Markdown 格式的思维导图
with open(record.file_path, 'r', encoding='utf-8') as f:
content = f.read()
return jsonify({'content': content, 'type': 'mindmap'})
elif record.export_type in ['doc', 'docx']:
# Word 文件,直接返回文件流(使用 Flask 的 send_file
return send_file(
record.file_path,
mimetype='application/vnd.openxmlformats-officedocument.wordprocessingml.document',
as_attachment=False,
download_name=record.file_name
)
elif record.export_type in ['excel', 'xlsx', 'xls']:
# Excel 文件,直接返回文件流
return send_file(
record.file_path,
mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
as_attachment=False,
download_name=record.file_name
)
elif record.export_type in ['ppt', 'pptx']:
# PPT 文件,直接返回文件流
return send_file(
record.file_path,
mimetype='application/vnd.openxmlformats-officedocument.presentationml.presentation',
as_attachment=False,
download_name=record.file_name
)
else:
# 其他类型返回文件路径,前端自行处理
return jsonify({
@@ -3101,6 +3137,26 @@ def share_export(record_id: int):
return jsonify({'error': str(e)}), 500
@app.route('/api/export/<int:record_id>/share/info', methods=['GET'])
def get_share_info(record_id: int):
    """Return public metadata for a shared export file (no auth required).

    Args:
        record_id: Primary key of the export record.

    Returns:
        JSON with file_name / export_type / created_at / file_size on
        success, a 404 JSON error when the record is missing, or a 500
        JSON error on unexpected failures.
    """
    try:
        with get_db_context() as db:
            record = ExportRecordCRUD.get_by_id(db, record_id)
            if not record:
                return jsonify({'error': '文件不存在或已失效'}), 404
            return jsonify({
                'file_name': record.file_name,
                'export_type': record.export_type,
                # ISO-format timestamp, or null when never set.
                'created_at': record.created_at.isoformat() if record.created_at else None,
                'file_size': record.file_size or 0,
            })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/export/<int:record_id>', methods=['DELETE'])
def delete_export(record_id: int):
"""删除导出记录"""