任务描述
+{task_content or '无'}
+diff --git a/backend/AgentCoord/Export/__init__.py b/backend/AgentCoord/Export/__init__.py new file mode 100644 index 0000000..4e87363 --- /dev/null +++ b/backend/AgentCoord/Export/__init__.py @@ -0,0 +1,951 @@ +""" +导出工具模块 +支持多种格式的文件生成:Word、Markdown、Excel、PPT、思维导图、信息图 +""" + +import json +from typing import Dict, Any, Optional +from datetime import datetime + + +class BaseExporter: + """导出器基类""" + + def __init__(self, task_data: Dict[str, Any]): + self.task_data = task_data + + def generate(self, file_path: str) -> bool: + """生成导出文件,子类实现具体逻辑""" + raise NotImplementedError + + +class MarkdownExporter(BaseExporter): + """Markdown 导出器""" + + def generate(self, file_path: str) -> bool: + """生成 Markdown 文件""" + try: + content_lines = [] + + # 标题 + task_name = self.task_data.get('task_name', '未命名任务') + content_lines.append(f"# {task_name}\n") + + # 任务描述 + task_content = self.task_data.get('task_content', '') + if task_content: + content_lines.append("## 任务描述\n") + content_lines.append(f"{task_content}\n") + + # 任务大纲 + task_outline = self.task_data.get('task_outline') + if task_outline: + content_lines.append("## 任务大纲\n") + content_lines.append(self._format_outline(task_outline)) + + # 执行结果 + result = self.task_data.get('result') + if result: + content_lines.append("## 执行结果\n") + if isinstance(result, list): + for idx, item in enumerate(result, 1): + content_lines.append(f"### 步骤 {idx}\n") + content_lines.append(f"{json.dumps(item, ensure_ascii=False, indent=2)}\n") + else: + content_lines.append(f"{json.dumps(result, ensure_ascii=False, indent=2)}\n") + + # 参与智能体 - 从 task_outline 的 Collaboration Process 中提取 + task_outline = self.task_data.get('task_outline') + if task_outline and isinstance(task_outline, dict): + collaboration_process = task_outline.get('Collaboration Process', []) + if collaboration_process and isinstance(collaboration_process, list): + # 收集所有参与步骤的智能体 + all_agents = set() + for step in collaboration_process: + if isinstance(step, dict): + agent_selection = step.get('AgentSelection', []) + if isinstance(agent_selection, list): + for agent in agent_selection: + if agent: + all_agents.add(agent) + if all_agents: + content_lines.append("## 参与智能体\n") + for agent_name in sorted(all_agents): + content_lines.append(f"- {agent_name}") + + # 写入文件 + with open(file_path, 'w', encoding='utf-8') as f: + f.write('\n'.join(content_lines)) + + return True + except Exception as e: + print(f"Markdown 导出失败: {e}") + return False + + def _format_outline(self, outline: Any, level: int = 2) -> str: + """格式化大纲内容""" + lines = [] + prefix = '#' * level + + if isinstance(outline, dict): + for key, value in outline.items(): + # 如果值是简单类型,直接显示 + if isinstance(value, (str, int, float, bool)) or value is None: + lines.append(f"**{key}**: {value}") + # 如果值是列表或字典,递归处理 + elif isinstance(value, list): + lines.append(f"**{key}**:") + lines.append(self._format_outline(value, level + 1)) + elif isinstance(value, dict): + lines.append(f"**{key}**:") + lines.append(self._format_outline(value, level + 1)) + else: + lines.append(f"**{key}**: {value}") + elif isinstance(outline, list): + for idx, item in enumerate(outline, 1): + if isinstance(item, dict): + # 列表中的每个字典作为一个整体项 + lines.append(f"{prefix} 步骤 {idx}") + for key, value in item.items(): + if isinstance(value, (str, int, float, bool)) or value is None: + lines.append(f" - **{key}**: {value}") + elif isinstance(value, list): + lines.append(f" - **{key}**:") + for v in value: + lines.append(f" - {v}") + elif isinstance(value, dict): + lines.append(f" - **{key}**:") + lines.append(self._format_outline(value, level + 2)) + else: + lines.append(f" - **{key}**: {value}") + lines.append("") + else: + lines.append(f"- {item}") + else: + lines.append(str(outline)) + + return '\n'.join(lines) + + +class DocxExporter(BaseExporter): + """Word 文档导出器""" + + def generate(self, file_path: str) -> bool: + """生成 Word 文档""" + try: + from docx import Document + from docx.shared import Inches, Pt + from docx.enum.text import WD_ALIGN_PARAGRAPH + + doc = Document() + + # 标题 + task_name = self.task_data.get('task_name', '未命名任务') + title = doc.add_heading(task_name, level=0) + title.alignment = WD_ALIGN_PARAGRAPH.CENTER + + # 任务描述 + task_content = self.task_data.get('task_content', '') + if task_content: + doc.add_heading('任务描述', level=1) + doc.add_paragraph(task_content) + + # 任务大纲 + task_outline = self.task_data.get('task_outline') + if task_outline: + doc.add_heading('任务大纲', level=1) + self._add_outline_to_doc(doc, task_outline) + + # 执行结果 + result = self.task_data.get('result') + if result: + doc.add_heading('执行结果', level=1) + if isinstance(result, list): + for idx, item in enumerate(result, 1): + doc.add_heading(f'步骤 {idx}', level=2) + doc.add_paragraph(json.dumps(item, ensure_ascii=False, indent=2)) + else: + doc.add_paragraph(json.dumps(result, ensure_ascii=False, indent=2)) + + # 参与智能体 - 从 task_outline 的 Collaboration Process 中提取 + task_outline = self.task_data.get('task_outline') + if task_outline and isinstance(task_outline, dict): + collaboration_process = task_outline.get('Collaboration Process', []) + if collaboration_process and isinstance(collaboration_process, list): + # 收集所有参与步骤的智能体 + all_agents = set() + for step in collaboration_process: + if isinstance(step, dict): + agent_selection = step.get('AgentSelection', []) + if isinstance(agent_selection, list): + for agent in agent_selection: + if agent: + all_agents.add(agent) + if all_agents: + doc.add_heading('参与智能体', level=1) + for agent_name in sorted(all_agents): + doc.add_paragraph(f"- {agent_name}") + + # 添加时间戳 + doc.add_paragraph(f"\n\n导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + doc.save(file_path) + return True + except ImportError: + print("请安装 python-docx 库: pip install python-docx") + return False + except Exception as e: + print(f"Word 导出失败: {e}") + return False + + def _add_outline_to_doc(self, doc, outline: Any, level: int = 2): + """递归添加大纲到文档""" + if isinstance(outline, dict): + for key, value in outline.items(): + # 如果值是简单类型,直接显示为段落 + if isinstance(value, (str, int, float, bool)) or value is None: + doc.add_paragraph(f"**{key}**: {value}") + # 如果值是列表或字典,递归处理 + elif isinstance(value, list): + doc.add_paragraph(f"**{key}**:") + self._add_outline_to_doc(doc, value, level + 1) + elif isinstance(value, dict): + doc.add_paragraph(f"**{key}**:") + self._add_outline_to_doc(doc, value, level + 1) + else: + doc.add_paragraph(f"**{key}**: {value}") + elif isinstance(outline, list): + for idx, item in enumerate(outline, 1): + if isinstance(item, dict): + # 列表中的每个字典作为一个整体项 + doc.add_heading(f"步骤 {idx}", level=min(level, 3)) + for key, value in item.items(): + if isinstance(value, (str, int, float, bool)) or value is None: + doc.add_paragraph(f" - **{key}**: {value}") + elif isinstance(value, list): + doc.add_paragraph(f" - **{key}**:") + for v in value: + doc.add_paragraph(f" - {v}") + elif isinstance(value, dict): + doc.add_paragraph(f" - **{key}**:") + self._add_outline_to_doc(doc, value, level + 2) + else: + doc.add_paragraph(f" - **{key}**: {value}") + else: + doc.add_paragraph(str(item)) + else: + doc.add_paragraph(str(outline)) + + +class ExcelExporter(BaseExporter): + """Excel 导出器""" + + def generate(self, file_path: str) -> bool: + """生成 Excel 文件""" + try: + from openpyxl import Workbook + from openpyxl.styles import Font, Alignment + from openpyxl.utils import get_column_letter + + wb = Workbook() + ws = wb.active + ws.title = "任务执行结果" + + # 标题 + task_name = self.task_data.get('task_name', '未命名任务') + ws['A1'] = task_name + ws['A1'].font = Font(size=14, bold=True) + ws.merge_cells('A1:E1') + + row = 3 + + # 任务描述 + ws[f'A{row}'] = "任务描述" + ws[f'A{row}'].font = Font(bold=True) + ws[f'B{row}'] = self.task_data.get('task_content', '') + row += 1 + + # 导出时间 + ws[f'A{row}'] = "导出时间" + ws[f'A{row}'].font = Font(bold=True) + ws[f'B{row}'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + row += 2 + + # 参与智能体 + task_outline = self.task_data.get('task_outline') + all_agents = [] + if task_outline and isinstance(task_outline, dict): + collaboration_process = task_outline.get('Collaboration Process', []) + if collaboration_process and isinstance(collaboration_process, list): + agents_set = set() + for step in collaboration_process: + if isinstance(step, dict): + agent_selection = step.get('AgentSelection', []) + if isinstance(agent_selection, list): + for agent in agent_selection: + if agent: + agents_set.add(agent) + all_agents = sorted(agents_set) + + if all_agents: + ws[f'A{row}'] = "参与智能体" + ws[f'A{row}'].font = Font(bold=True) + ws[f'B{row}'] = '、'.join(all_agents) + row += 2 + + # 任务大纲 + if task_outline: + ws[f'A{row}'] = "任务大纲" + ws[f'A{row}'].font = Font(size=12, bold=True) + row += 1 + + if isinstance(task_outline, dict): + # 写入任务大纲的键值对 + for key, value in task_outline.items(): + if key == 'Collaboration Process': + # 单独处理协作步骤 + if isinstance(value, list): + for idx, step in enumerate(value, 1): + if isinstance(step, dict): + step_name = step.get('StepName', f'步骤{idx}') + task_content = step.get('TaskContent', '') + ws[f'A{row}'] = f"步骤{idx}: {step_name}" + ws[f'A{row}'].font = Font(bold=True) + ws[f'B{row}'] = task_content + row += 1 + + # 参与智能体 + agents = step.get('AgentSelection', []) + if agents: + ws[f'B{row}'] = f"参与智能体: {'、'.join(agents)}" + row += 1 + else: + ws[f'A{row}'] = key + ws[f'A{row}'].font = Font(bold=True) + if isinstance(value, (str, int, float, bool)): + ws[f'B{row}'] = str(value) + elif isinstance(value, list): + ws[f'B{row}'] = json.dumps(value, ensure_ascii=False) + elif isinstance(value, dict): + ws[f'B{row}'] = json.dumps(value, ensure_ascii=False) + row += 1 + row += 1 + + # 执行结果 + ws[f'A{row}'] = "执行结果" + ws[f'A{row}'].font = Font(size=12, bold=True) + row += 1 + + ws[f'A{row}'] = "步骤" + ws[f'B{row}'] = "结果" + for col in ['A', 'B']: + ws[f'{col}{row}'].font = Font(bold=True) + row += 1 + + result = self.task_data.get('result') + if isinstance(result, list): + for idx, item in enumerate(result, 1): + if isinstance(item, dict): + # 完整的执行结果,包含 ActionHistory 和每个 action 的结果 + ws[f'A{row}'] = f"步骤 {idx}" + ws[f'A{row}'].font = Font(bold=True) + row += 1 + + # 遍历所有字段 + for key, value in item.items(): + if key == 'ActionHistory' and isinstance(value, list): + # ActionHistory 需要展开显示 + ws[f'A{row}'] = "动作历史:" + ws[f'A{row}'].font = Font(bold=True) + row += 1 + + for action_idx, action in enumerate(value, 1): + if isinstance(action, dict): + # 每个 action 显示 AgentName, ActionType, Description, Action_Result + ws[f'A{row}'] = f" 动作{action_idx}" + ws[f'A{row}'].font = Font(bold=True) + + action_details = [] + if action.get('AgentName'): + action_details.append(f"智能体: {action.get('AgentName')}") + if action.get('ActionType'): + action_details.append(f"类型: {action.get('ActionType')}") + if action.get('Description'): + action_details.append(f"描述: {action.get('Description')}") + if action.get('Action_Result'): + # 执行结果是长文本,全部显示 + action_details.append(f"结果: {action.get('Action_Result')}") + + ws[f'B{row}'] = '\n'.join(action_details) + ws.column_dimensions['B'].width = 100 + row += 1 + else: + # 其他字段 + ws[f'A{row}'] = key + ws[f'A{row}'].font = Font(bold=True) + if isinstance(value, (str, int, float, bool)): + ws[f'B{row}'] = str(value) + else: + ws[f'B{row}'] = json.dumps(value, ensure_ascii=False, indent=2) + row += 1 + row += 1 # 步骤之间空一行 + else: + ws[f'A{row}'] = f"步骤 {idx}" + ws[f'A{row}'].font = Font(bold=True) + ws[f'B{row}'] = str(item) + row += 1 + + # 设置列宽 + ws.column_dimensions['A'].width = 20 + + wb.save(file_path) + return True + except ImportError: + print("请安装 openpyxl 库: pip install openpyxl") + return False + except Exception as e: + print(f"Excel 导出失败: {e}") + import traceback + traceback.print_exc() + return False + + +class PptExporter(BaseExporter): + """PPT 导出器""" + + def generate(self, file_path: str) -> bool: + """生成 PowerPoint 文件""" + try: + from pptx import Presentation + from pptx.util import Inches, Pt + + prs = Presentation() + + # 标题页 + title_slide_layout = prs.slide_layouts[0] + slide = prs.slides.add_slide(title_slide_layout) + task_name = self.task_data.get('task_name', '未命名任务') + title = slide.shapes.title + subtitle = slide.placeholders[1] + title.text = task_name + subtitle.text = f"导出时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + + # 任务描述 + task_content = self.task_data.get('task_content', '') + if task_content: + slide = prs.slides.add_slide(prs.slide_layouts[1]) + title = slide.shapes.title + title.text = '任务描述' + body = slide.placeholders[1] + tf = body.text_frame + tf.text = task_content[:500] + '...' if len(task_content) > 500 else task_content + + # 参与智能体 + task_outline = self.task_data.get('task_outline') + all_agents = [] + if task_outline and isinstance(task_outline, dict): + collaboration_process = task_outline.get('Collaboration Process', []) + if collaboration_process and isinstance(collaboration_process, list): + agents_set = set() + for step in collaboration_process: + if isinstance(step, dict): + agent_selection = step.get('AgentSelection', []) + if isinstance(agent_selection, list): + for agent in agent_selection: + if agent: + agents_set.add(agent) + all_agents = sorted(agents_set) + + if all_agents: + slide = prs.slides.add_slide(prs.slide_layouts[1]) + title = slide.shapes.title + title.text = '参与智能体' + body = slide.placeholders[1] + tf = body.text_frame + for agent in all_agents: + p = tf.add_paragraph() + p.text = f"• {agent}" + + # 任务步骤 + if task_outline and isinstance(task_outline, dict): + collaboration_process = task_outline.get('Collaboration Process', []) + if collaboration_process and isinstance(collaboration_process, list): + for idx, step in enumerate(collaboration_process, 1): + if isinstance(step, dict): + step_name = step.get('StepName', f'步骤 {idx}') + task_content_step = step.get('TaskContent', '') + + slide = prs.slides.add_slide(prs.slide_layouts[1]) + title = slide.shapes.title + title.text = f'{idx}. {step_name}' + body = slide.placeholders[1] + tf = body.text_frame + tf.text = task_content_step[:300] + '...' if len(task_content_step) > 300 else task_content_step + + # 执行结果 - 包含步骤信息、输入产物、动作、输出产物 + result = self.task_data.get('result') + max_lines_per_slide = 20 # 内容区最大行数 + + def create_slide(prs, title_text, content_lines): + """创建幻灯片,处理超过最大行数的情况""" + total_lines = len(content_lines) + total_pages = (total_lines + max_lines_per_slide - 1) // max_lines_per_slide + + for page_idx in range(total_pages): + start_idx = page_idx * max_lines_per_slide + end_idx = min(start_idx + max_lines_per_slide, total_lines) + page_content = content_lines[start_idx:end_idx] + + slide = prs.slides.add_slide(prs.slide_layouts[1]) + title = slide.shapes.title + if total_pages > 1: + title.text = f'{title_text} - {page_idx+1}/{total_pages}' + else: + title.text = title_text + title.text_frame.paragraphs[0].font.size = Pt(24) + + body = slide.placeholders[1] + tf = body.text_frame + + for line in page_content: + p = tf.add_paragraph() + p.text = line['text'] + p.font.size = line.get('font_size', Pt(10)) + if line.get('bold'): + p.font.bold = True + + def add_content_lines(content_lines, text, font_size=Pt(10), bold=False): + """将文本添加到内容行,按原始换行符分割""" + if text: + lines = text.split('\n') + for line in lines: + line = line.strip() + if line: + content_lines.append({'text': line, 'font_size': font_size, 'bold': bold}) + + if isinstance(result, list): + for idx, item in enumerate(result, 1): + if not isinstance(item, dict): + continue + + # 获取步骤基本信息 + step_task_content = item.get('TaskContent', '') + output_name = item.get('OutputName', '') + + # 获取输入产物信息 + input_names = item.get('InputName_List', []) + input_records = item.get('inputObject_Record', []) + + # 调试:打印步骤的 keys 和 input_names + print(f"[PPT导出] 步骤 {idx}: input_names={input_names}, input_records类型={type(input_records)}, input_records长度={len(input_records) if isinstance(input_records, list) else 'N/A'}") + + if not isinstance(input_records, list): + input_records = [] + + # 调试:打印 input_records 内容 + if input_names and input_records: + print(f"[PPT导出] 步骤 {idx}: input_records内容={input_records[:2]}...") # 只打印前2个 + + # 步骤信息页:包含任务内容、输入产物 + content_lines = [] + if step_task_content: + content_lines.append({'text': '任务内容:', 'font_size': Pt(14), 'bold': True}) + add_content_lines(content_lines, step_task_content, Pt(12)) + + if input_names: + content_lines.append({'text': '输入产物:', 'font_size': Pt(14), 'bold': True}) + for i, input_name in enumerate(input_names): + content_lines.append({'text': f'{i+1}. {input_name}', 'font_size': Pt(12), 'bold': True}) + + # 从 input_records 中查找匹配的产物内容 + # inputObject_Record 格式: [{"产物名称": "产物内容"}] + if isinstance(input_records, list): + print(f"[PPT导出] 步骤 {idx}: 开始查找 input_name='{input_name}'") + for record_idx, record in enumerate(input_records): + print(f"[PPT导出] 步骤 {idx}: record[{record_idx}]={type(record)}, keys={list(record.keys()) if isinstance(record, dict) else 'N/A'}") + if isinstance(record, dict) and input_name in record: + value = record[input_name] + print(f"[PPT导出] 步骤 {idx}: 找到匹配! value类型={type(value)}, value前20字={str(value)[:20]}") + if isinstance(value, str): + add_content_lines(content_lines, f' 内容: {value}', Pt(10)) + print(f"[PPT导出] 步骤 {idx}: 已添加内容到content_lines") + break + else: + print(f"[PPT导出] 步骤 {idx}: 未找到匹配 '{input_name}'") + + if output_name: + content_lines.append({'text': '输出产物:', 'font_size': Pt(14), 'bold': True}) + content_lines.append({'text': str(output_name), 'font_size': Pt(12)}) + + if content_lines: + create_slide(prs, f'步骤 {idx} - 概述', content_lines) + + # 动作历史页:每个动作一个幻灯片 + actions = item.get('ActionHistory', []) + if isinstance(actions, list): + for action in actions: + if not isinstance(action, dict): + continue + + agent = action.get('AgentName', '未知') + action_type = action.get('ActionType', '') + + content_lines = [] + + # 动作描述 + desc = action.get('Description', '') + if desc: + content_lines.append({'text': '动作描述:', 'font_size': Pt(14), 'bold': True}) + add_content_lines(content_lines, desc, Pt(12)) + + # 执行结果 + action_result = action.get('Action_Result', '') + if action_result: + content_lines.append({'text': '执行结果:', 'font_size': Pt(14), 'bold': True}) + add_content_lines(content_lines, action_result, Pt(10)) + + # 创建幻灯片(自动分页) + if content_lines: + create_slide(prs, f'{agent} ({action_type})', content_lines) + # 没有 ActionHistory,显示整个结果 + slide = prs.slides.add_slide(prs.slide_layouts[1]) + title = slide.shapes.title + title.text = f'步骤 {idx}' + body = slide.placeholders[1] + tf = body.text_frame + tf.text = json.dumps(item, ensure_ascii=False, indent=2)[:2000] + + prs.save(file_path) + return True + except ImportError: + print("请安装 python-pptx 库: pip install python-pptx") + return False + except Exception as e: + print(f"PPT 导出失败: {e}") + import traceback + traceback.print_exc() + return False + + +class MindmapExporter(BaseExporter): + """思维导图导出器(输出 Markdown 格式,可通过 markmap 渲染)""" + + def generate(self, file_path: str) -> bool: + """生成思维导图(Markdown 格式)""" + try: + content_lines = [] + + # 标题 + task_name = self.task_data.get('task_name', '未命名任务') + content_lines.append(f"# {task_name}\n") + + # 使用 Markmap 兼容的 Markdown 格式 + content_lines.append("```mindmap") + content_lines.append(f" - {task_name}") + + # 任务描述 + task_content = self.task_data.get('task_content', '') + if task_content: + content_lines.append(f" - 任务描述") + # 限制内容长度 + short_content = task_content[:100] + '...' if len(task_content) > 100 else task_content + content_lines.append(f" - {short_content}") + + # 任务大纲 + task_outline = self.task_data.get('task_outline') + if task_outline: + content_lines.append(f" - 任务大纲") + content_lines.append(self._format_mindmap(task_outline, indent=4)) + + # 执行结果 + result = self.task_data.get('result') + if result: + content_lines.append(f" - 执行结果") + if isinstance(result, list): + content_lines.append(f" - 共 {len(result)} 个步骤") + + content_lines.append("```") + + with open(file_path, 'w', encoding='utf-8') as f: + f.write('\n'.join(content_lines)) + + return True + except Exception as e: + print(f"思维导图导出失败: {e}") + return False + + def _format_mindmap(self, data: Any, indent: int = 4) -> str: + """格式化思维导图""" + lines = [] + prefix = ' ' * indent + + if isinstance(data, dict): + for key, value in data.items(): + # 如果值是简单类型,直接显示 + if isinstance(value, (str, int, float, bool)) or value is None: + lines.append(f"{prefix}- {key}: {value}") + elif isinstance(value, list): + lines.append(f"{prefix}- {key}") + for v in value: + if isinstance(v, dict): + lines.append(self._format_mindmap(v, indent + 2)) + else: + lines.append(f"{prefix} - {v}") + elif isinstance(value, dict): + lines.append(f"{prefix}- {key}") + lines.append(self._format_mindmap(value, indent + 2)) + else: + lines.append(f"{prefix}- {key}: {value}") + elif isinstance(data, list): + for idx, item in enumerate(data, 1): + if isinstance(item, dict): + # 列表中的每个字典作为一个步骤 + step_name = item.get('StepName', f'步骤{idx}') + lines.append(f"{prefix}- {step_name}") + for key, value in item.items(): + if key == 'StepName': + continue + if isinstance(value, (str, int, float, bool)) or value is None: + lines.append(f"{prefix} - {key}: {value}") + elif isinstance(value, list): + lines.append(f"{prefix} - {key}:") + for v in value: + if isinstance(v, dict): + lines.append(self._format_mindmap(v, indent + 4)) + else: + lines.append(f"{prefix} - {v}") + elif isinstance(value, dict): + lines.append(f"{prefix} - {key}:") + lines.append(self._format_mindmap(value, indent + 4)) + else: + lines.append(f"{prefix}- {item}") + else: + lines.append(f"{prefix}- {data}") + + return '\n'.join(lines) + + return '\n'.join(lines) + + +class InfographicExporter(BaseExporter): + """信息图导出器(输出 HTML)""" + + def generate(self, file_path: str) -> bool: + """生成信息图(HTML 格式)""" + try: + task_name = self.task_data.get('task_name', '未命名任务') + task_content = self.task_data.get('task_content', '') + task_outline = self.task_data.get('task_outline') + result = self.task_data.get('result') + + # 从 task_outline 中提取参与智能体 + all_agents = [] + if task_outline and isinstance(task_outline, dict): + collaboration_process = task_outline.get('Collaboration Process', []) + if collaboration_process and isinstance(collaboration_process, list): + agents_set = set() + for step in collaboration_process: + if isinstance(step, dict): + agent_selection = step.get('AgentSelection', []) + if isinstance(agent_selection, list): + for agent in agent_selection: + if agent: + agents_set.add(agent) + all_agents = sorted(agents_set) + + # 统计执行步骤数 + step_count = 0 + if task_outline and isinstance(task_outline, dict): + collaboration_process = task_outline.get('Collaboration Process', []) + if isinstance(collaboration_process, list): + step_count = len(collaboration_process) + + html = f""" + +
+ + +{task_content or '无'}
+无
' + + collaboration_process = task_outline.get('Collaboration Process', []) + if not collaboration_process or not isinstance(collaboration_process, list): + return '无
' + + steps_html = [] + for idx, step in enumerate(collaboration_process, 1): + if isinstance(step, dict): + step_name = step.get('StepName', f'步骤 {idx}') + task_content = step.get('TaskContent', '') + agent_selection = step.get('AgentSelection', []) + agents_str = '、'.join(agent_selection) if isinstance(agent_selection, list) else '' + + steps_html.append(f""" +无
' + + def _format_result_html(self, result): + """格式化执行结果为 HTML""" + if not result: + return '无执行结果
' + + try: + if isinstance(result, list): + # 遍历每个步骤的结果 + result_html = [] + for idx, item in enumerate(result, 1): + result_html.append(f'{key}: {value}
') + else: + result_html.append(f'{item}
') + result_html.append('无执行结果
' + elif isinstance(result, dict): + # 字典格式的结果 + result_html = [] + for key, value in result.items(): + if isinstance(value, str) and len(value) > 500: + value = value[:500] + '...' + result_html.append(f'{key}: {value}
') + return ''.join(result_html) if result_html else '无执行结果
' + else: + # 其他类型直接转字符串 + result_str = str(result) + if len(result_str) > 1000: + result_str = result_str[:1000] + '...' + return f'{result_str}
' + except Exception as e: + return f'解析执行结果失败: {str(e)}
' + + +# 导出器工厂 +class ExportFactory: + """导出器工厂类""" + + _exporters = { + 'markdown': MarkdownExporter, + 'doc': DocxExporter, + 'excel': ExcelExporter, + 'ppt': PptExporter, + 'mindmap': MindmapExporter, + 'infographic': InfographicExporter, + } + + @classmethod + def create(cls, export_type: str, task_data: Dict[str, Any]) -> Optional[BaseExporter]: + """创建导出器实例""" + exporter_class = cls._exporters.get(export_type) + if exporter_class: + return exporter_class(task_data) + return None + + @classmethod + def export(cls, export_type: str, task_data: Dict[str, Any], file_path: str) -> bool: + """执行导出""" + exporter = cls.create(export_type, task_data) + if exporter: + return exporter.generate(file_path) + return False diff --git a/backend/db/__init__.py b/backend/db/__init__.py index b50209d..8ab49cc 100644 --- a/backend/db/__init__.py +++ b/backend/db/__init__.py @@ -5,8 +5,8 @@ AgentCoord 数据库模块 """ from .database import get_db, get_db_context, test_connection, engine, text -from .models import MultiAgentTask, UserAgent, TaskStatus -from .crud import MultiAgentTaskCRUD, UserAgentCRUD +from .models import MultiAgentTask, UserAgent, ExportRecord, TaskStatus +from .crud import MultiAgentTaskCRUD, UserAgentCRUD, ExportRecordCRUD __all__ = [ # 连接管理 @@ -18,8 +18,10 @@ __all__ = [ # 模型 "MultiAgentTask", "UserAgent", + "ExportRecord", "TaskStatus", # CRUD "MultiAgentTaskCRUD", "UserAgentCRUD", + "ExportRecordCRUD", ] diff --git a/backend/db/crud.py b/backend/db/crud.py index 7c1f996..e726321 100644 --- a/backend/db/crud.py +++ b/backend/db/crud.py @@ -9,7 +9,7 @@ from datetime import datetime, timezone from typing import List, Optional from sqlalchemy.orm import Session -from .models import MultiAgentTask, UserAgent +from .models import MultiAgentTask, UserAgent, ExportRecord class MultiAgentTaskCRUD: @@ -414,3 +414,85 @@ class UserAgentCRUD: db.commit() db.refresh(agent) return agent + + +class ExportRecordCRUD: + """导出记录 CRUD 操作""" + + @staticmethod + def create( + db: Session, + task_id: str, + user_id: str, + export_type: str, + file_name: str, + file_path: str, + file_url: str = "", + file_size: int = 0, + ) -> ExportRecord: + """创建导出记录""" + record = ExportRecord( + task_id=task_id, + user_id=user_id, + export_type=export_type, + file_name=file_name, + file_path=file_path, + file_url=file_url, + file_size=file_size, + ) + db.add(record) + db.commit() + db.refresh(record) + return record + + @staticmethod + def get_by_id(db: Session, record_id: int) -> Optional[ExportRecord]: + """根据 ID 获取记录""" + return db.query(ExportRecord).filter(ExportRecord.id == record_id).first() + + @staticmethod + def get_by_task_id( + db: Session, task_id: str, limit: int = 50 + ) -> List[ExportRecord]: + """根据任务 ID 获取导出记录列表""" + return ( + db.query(ExportRecord) + .filter(ExportRecord.task_id == task_id) + .order_by(ExportRecord.created_at.desc()) + .limit(limit) + .all() + ) + + @staticmethod + def get_by_user_id( + db: Session, user_id: str, limit: int = 50 + ) -> List[ExportRecord]: + """根据用户 ID 获取导出记录列表""" + return ( + db.query(ExportRecord) + .filter(ExportRecord.user_id == user_id) + .order_by(ExportRecord.created_at.desc()) + .limit(limit) + .all() + ) + + @staticmethod + def delete(db: Session, record_id: int) -> bool: + """删除导出记录""" + record = db.query(ExportRecord).filter(ExportRecord.id == record_id).first() + if record: + db.delete(record) + db.commit() + return True + return False + + @staticmethod + def delete_by_task_id(db: Session, task_id: str) -> bool: + """删除任务的所有导出记录""" + records = db.query(ExportRecord).filter(ExportRecord.task_id == task_id).all() + if records: + for record in records: + db.delete(record) + db.commit() + return True + return False diff --git a/backend/db/models.py b/backend/db/models.py index 2d20d53..33968d8 100644 --- a/backend/db/models.py +++ b/backend/db/models.py @@ -81,6 +81,39 @@ class MultiAgentTask(Base): } +class ExportRecord(Base): + """导出记录模型""" + __tablename__ = "export_records" + + id = Column(Integer, primary_key=True, autoincrement=True) + task_id = Column(String(64), nullable=False, index=True) # 关联任务ID + user_id = Column(String(64), nullable=False, index=True) # 用户ID + export_type = Column(String(32), nullable=False) # 导出类型: doc/markdown/mindmap/infographic/excel/ppt + file_name = Column(String(256), nullable=False) # 文件名 + file_path = Column(String(512), nullable=False) # 服务器存储路径 + file_url = Column(String(512)) # 访问URL + file_size = Column(Integer, default=0) # 文件大小(字节) + created_at = Column(DateTime(timezone=True), default=utc_now) + + __table_args__ = ( + Index("idx_export_records_task_user", "task_id", "user_id"), + ) + + def to_dict(self) -> dict: + """转换为字典""" + return { + "id": self.id, + "task_id": self.task_id, + "user_id": self.user_id, + "export_type": self.export_type, + "file_name": self.file_name, + "file_path": self.file_path, + "file_url": self.file_url, + "file_size": self.file_size, + "created_at": self.created_at.isoformat() if self.created_at else None, + } + + class UserAgent(Base): """用户保存的智能体配置模型 (可选表)""" __tablename__ = "user_agents" diff --git a/backend/requirements.txt b/backend/requirements.txt index 2707c9c..c31e75d 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -7,4 +7,7 @@ mistralai==0.1.6 flask-socketio==5.3.6 python-socketio==5.11.0 simple-websocket==1.0.0 -eventlet==0.40.4 \ No newline at end of file +eventlet==0.40.4 +python-docx==1.1.2 +openpyxl==3.1.5 +python-pptx==1.0.2 \ No newline at end of file diff --git a/backend/server.py b/backend/server.py index 4849aac..6efcc39 100644 --- a/backend/server.py +++ b/backend/server.py @@ -1,4 +1,5 @@ -from flask import Flask, request, jsonify, Response, stream_with_context +from flask import Flask, request, jsonify, Response, stream_with_context, send_file +from flask_cors import CORS from flask_socketio import SocketIO, emit, join_room, leave_room import json from DataProcess import Add_Collaboration_Brief_FrontEnd @@ -27,10 +28,14 @@ from db import ( get_db_context, MultiAgentTaskCRUD, UserAgentCRUD, + ExportRecordCRUD, TaskStatus, text, ) +# 导出模块导入 +from AgentCoord.Export import ExportFactory + # initialize global variables yaml_file = os.path.join(os.getcwd(), "config", "config.yaml") try: @@ -48,8 +53,18 @@ AgentProfile_Dict = {} Request_Cache: dict[str, str] = {} app = Flask(__name__) app.config['SECRET_KEY'] = 'agentcoord-secret-key' +CORS(app) # 启用 CORS 支持 socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading') +# 配置静态文件服务(用于导出文件访问) +EXPORT_DIR = os.path.join(os.getcwd(), "uploads", "exports") + +@app.route('/uploads/