feat:用户修改步骤后重新执行实现

This commit is contained in:
liailing1026
2026-01-26 15:06:17 +08:00
parent b287867069
commit 641d70033d
7 changed files with 581 additions and 24 deletions

View File

@@ -2,7 +2,6 @@
import { computed, onUnmounted, ref, reactive, nextTick, watch, onMounted } from 'vue'
import { throttle } from 'lodash'
import { AnchorLocations, BezierConnector } from '@jsplumb/browser-ui'
import { ElMessageBox } from 'element-plus'
import AdditionalOutputCard from './AdditionalOutputCard.vue'
import SvgIcon from '@/components/SvgIcon/index.vue'
import { getActionTypeDisplay, getAgentMapIcon } from '@/layout/components/config.ts'
@@ -42,6 +41,7 @@ const stepExecutionStatus = ref<Record<string, StepExecutionStatus>>({})
// 用于标记暂停时的"最后动作完成"状态
const isPausing = ref(false) // 正在请求暂停(等待当前动作完成)
const isRestarting = ref(false) // 正在重新执行(停止旧执行导致的错误不应显示)
// Check if step is ready to execute (has TaskProcess data)
const isStepReady = (step: IRawStepTask) => {
@@ -211,7 +211,23 @@ function handleSaveEdit(stepId: string, processId: string, value: string) {
if (step) {
const process = step.TaskProcess.find(p => p.ID === processId)
if (process) {
process.Description = value
// 保存旧值用于比较
const oldValue = process.Description
// 只有内容真正变化时才更新和记录修改
if (value !== oldValue) {
process.Description = value
// 记录修改过的步骤索引到 store
const stepIndex = collaborationProcess.value.findIndex(s => s.Id === stepId)
if (stepIndex >= 0) {
agentsStore.addModifiedStep(stepIndex)
console.log(
`📝 步骤 ${stepIndex + 1} (${step.StepName}) 的描述已被修改,将从该步骤重新执行`
)
}
} else {
console.log(` 步骤 ${step.StepName} 的描述未发生变化,不记录修改`)
}
}
}
editMap[key] = false
@@ -341,7 +357,6 @@ const executionProgress = ref({
const {
notifications,
progress: showProgress,
updateProgress,
updateProgressDetail,
success,
info,
@@ -457,9 +472,12 @@ async function executeNextReadyBatch() {
switch (event.type) {
case 'step_start':
// 使用全局步骤索引计算当前步骤
const globalStepIndex = collaborationProcess.value.findIndex(s => s.StepName === event.step_name)
const globalStepIndex = collaborationProcess.value.findIndex(
s => s.StepName === event.step_name
)
executionProgress.value = {
currentStep: globalStepIndex >= 0 ? globalStepIndex + 1 : (event.step_index || 0) + 1,
currentStep:
globalStepIndex >= 0 ? globalStepIndex + 1 : (event.step_index || 0) + 1,
totalSteps: collaborationProcess.value.length,
currentAction: 0,
totalActions: 0,
@@ -495,8 +513,13 @@ async function executeNextReadyBatch() {
: ''
// 使用全局步骤索引
const globalStepIndexForAction = collaborationProcess.value.findIndex(s => s.StepName === event.step_name)
const stepIndexForAction = globalStepIndexForAction >= 0 ? globalStepIndexForAction + 1 : (event.step_index || 0) + 1
const globalStepIndexForAction = collaborationProcess.value.findIndex(
s => s.StepName === event.step_name
)
const stepIndexForAction =
globalStepIndexForAction >= 0
? globalStepIndexForAction + 1
: (event.step_index || 0) + 1
const totalStepsValue = collaborationProcess.value.length
executionProgress.value = {
@@ -594,6 +617,13 @@ async function executeNextReadyBatch() {
const errorMessage = event.message || event.error || '未知错误'
console.error('执行错误:', errorMessage)
// 如果正在重新执行,不显示错误通知(因为旧执行的步骤中断是预期的)
if (isRestarting.value) {
console.log(' 正在重新执行,忽略旧执行的中断错误')
resolve() // 正常完成,不显示错误
return
}
// 关闭进度通知并显示错误通知
if (currentProgressNotificationId.value) {
removeNotification(currentProgressNotificationId.value)
@@ -614,6 +644,13 @@ async function executeNextReadyBatch() {
(err: Error) => {
console.error('流式执行错误:', err)
// 如果正在重新执行,不显示错误通知(因为旧执行的步骤中断是预期的)
if (isRestarting.value) {
console.log(' 正在重新执行,忽略旧执行的中断错误')
resolve() // 正常完成,不显示错误
return
}
// 关闭进度通知并显示错误通知
if (currentProgressNotificationId.value) {
removeNotification(currentProgressNotificationId.value)
@@ -695,7 +732,25 @@ function removeNotification(id: string) {
// Pause/Resume handler
async function handlePauseResume() {
if (isPaused.value) {
// Resume execution - 正常恢复执行
// Resume execution - 检查是否有修改的步骤
if (agentsStore.hasModifiedSteps()) {
// 有修改的步骤,必须从最早修改的步骤重新执行
// 因为步骤之间是串行依赖的修改了步骤2会导致步骤2-6都需要重新执行
const earliestModifiedIndex = Math.min(...agentsStore.modifiedSteps)
const modifiedStepName =
collaborationProcess.value[earliestModifiedIndex]?.StepName ||
`步骤${earliestModifiedIndex + 1}`
console.log(`🔄 从步骤 ${earliestModifiedIndex + 1} (${modifiedStepName}) 重新执行`)
success('开始重新执行', `检测到步骤修改,从步骤 ${earliestModifiedIndex + 1} 重新执行`)
// 直接调用重新执行函数
await restartFromStep(earliestModifiedIndex)
return
}
// 正常恢复执行(没有修改)
try {
if (websocket.connected) {
await websocket.send('resume_execution', {
@@ -739,6 +794,371 @@ async function handlePauseResume() {
}
}
// ==================== 重新执行功能 ====================
/**
* 从RehearsalLog中提取KeyObjects
* @param rehearsalLog RehearsalLog数组
* @returns KeyObjects字典
*/
function buildKeyObjectsFromLog(rehearsalLog: any[]): Record<string, any> {
const keyObjects: Record<string, any> = {}
for (const logNode of rehearsalLog) {
if (logNode.LogNodeType === 'object' && logNode.content) {
keyObjects[logNode.NodeId] = logNode.content
}
}
return keyObjects
}
/**
* 构建截断后的 RehearsalLog使用步骤名称匹配
* @param restartFromStepIndex 重新执行的起始步骤索引例如1 表示从步骤2重新执行
* @returns 截断后的 RehearsalLog
*/
function buildTruncatedRehearsalLog(restartFromStepIndex: number) {
const steps = agentsStore.agentRawPlan.data?.['Collaboration Process'] || []
const truncatedLog: any[] = []
for (const logNode of agentsStore.executePlan) {
if (logNode.LogNodeType === 'step') {
const stepIndex = steps.findIndex((s: any) => s.StepName === logNode.NodeId)
if (stepIndex >= 0 && stepIndex < restartFromStepIndex) {
truncatedLog.push(logNode)
}
} else if (logNode.LogNodeType === 'object') {
const stepIndex = steps.findIndex((s: any) => s.OutputObject === logNode.NodeId)
if (stepIndex >= 0 && stepIndex < restartFromStepIndex) {
truncatedLog.push(logNode)
}
}
}
return truncatedLog
}
/**
* 清除执行结果中指定步骤及之后的记录
* @param fromStepIndex 起始步骤索引
*/
function clearExecutionResults(fromStepIndex: number) {
const steps = agentsStore.agentRawPlan.data?.['Collaboration Process'] || []
// 过滤掉要重新执行的步骤及其之后的所有步骤
agentsStore.executePlan = agentsStore.executePlan.filter(logNode => {
if (logNode.LogNodeType === 'step') {
// 找到该步骤在原始步骤列表中的索引
const stepIndex = steps.findIndex((s: any) => s.StepName === logNode.NodeId)
return stepIndex < fromStepIndex
}
// 对于 object 节点,也需要判断
if (logNode.LogNodeType === 'object') {
// 找到该 object 对应的步骤索引
const stepIndex = steps.findIndex((s: any) => s.OutputObject === logNode.NodeId)
return stepIndex < fromStepIndex
}
return true
})
}
/**
* 重置步骤执行状态
* @param fromStepIndex 起始步骤索引
*/
function resetStepStatus(fromStepIndex: number) {
const steps = agentsStore.agentRawPlan.data?.['Collaboration Process'] || []
for (let i = fromStepIndex; i < steps.length; i++) {
const step = steps[i]
if (!step) continue
const stepName = step.StepName || step.Id || ''
stepExecutionStatus.value[stepName] = StepExecutionStatus.READY
}
}
/**
* 从指定步骤重新执行
* @param stepIndex 要重新执行的步骤索引例如1 表示从步骤2重新执行
*/
async function restartFromStep(stepIndex: number) {
try {
loading.value = true
isRestarting.value = true // 标记正在重新执行
// 清空修改记录(避免重复执行)
agentsStore.clearModifiedSteps()
// 0. 先停止旧的执行(避免双重执行)
if (websocket.connected && currentExecutionId.value) {
try {
console.log('🛑 停止旧的执行任务,避免双重执行')
const stopResponse = await websocket.send('stop_execution', {
goal: agentsStore.agentRawPlan.data?.['General Goal'] || ''
})
console.log('✅ 后端确认停止:', stopResponse)
// 等待一下确保后端完全停止
await new Promise(resolve => setTimeout(resolve, 1000))
} catch (err) {
console.warn('⚠️ 停止旧执行失败(可能已经停止):', err)
}
}
// 1. 构建截断后的 RehearsalLog
const truncatedLog = buildTruncatedRehearsalLog(stepIndex)
// 2. 从截断日志中提取 KeyObjects
const existingKeyObjects = buildKeyObjectsFromLog(truncatedLog)
console.log('📦 传递给后端的数据:', {
truncatedLogLength: truncatedLog.length,
keyObjectsCount: Object.keys(existingKeyObjects).length,
keyObjectsKeys: Object.keys(existingKeyObjects)
})
// 3. 清除前端执行结果中指定步骤及之后的记录
clearExecutionResults(stepIndex)
// 3.5 关闭旧的进度通知(避免通知混乱)
if (currentProgressNotificationId.value) {
removeNotification(currentProgressNotificationId.value)
currentProgressNotificationId.value = null
}
// 4. 重置步骤状态
resetStepStatus(stepIndex)
// 4.5 强制触发 Vue 响应式更新
nextTick(() => {
// 触发 UI 更新
})
// 5. 重置执行状态为执行中
isPaused.value = false
isStreaming.value = true
loading.value = true
// 6. 调用执行 API传递截断后的 RehearsalLog 和 KeyObjects
api.executePlanOptimized(
agentsStore.agentRawPlan.data!,
// onMessage - 使用内联函数处理事件
(event: StreamingEvent) => {
// 如果正在暂停isPausing或已暂停isPaused只允许特定事件
// 这样可以确保当前正在完成的动作的结果能够被正确保存
if (isPausing.value || isPaused.value) {
if (
event.type !== 'action_complete' &&
event.type !== 'step_complete' &&
event.type !== 'error'
) {
return
}
}
switch (event.type) {
case 'step_start':
stepExecutionStatus.value[event.step_name] = StepExecutionStatus.RUNNING
// 使用全局步骤索引计算当前步骤
const globalStepIndex = collaborationProcess.value.findIndex(
s => s.StepName === event.step_name
)
const currentStepNumber =
globalStepIndex >= 0 ? globalStepIndex + 1 : (event.step_index || 0) + 1
// 创建或更新进度通知
if (!currentProgressNotificationId.value) {
currentProgressNotificationId.value = showProgress(
'重新执行中',
currentStepNumber,
collaborationProcess.value.length
)
updateProgressDetail(
currentProgressNotificationId.value,
`步骤 ${currentStepNumber}/${collaborationProcess.value.length}`,
`正在执行: ${event.step_name}`
)
} else {
updateProgressDetail(
currentProgressNotificationId.value,
`步骤 ${currentStepNumber}/${collaborationProcess.value.length}`,
`正在执行: ${event.step_name}`,
currentStepNumber,
collaborationProcess.value.length
)
}
break
case 'action_complete':
// 检测是否正在暂停(等待当前动作完成)
if (isPausing.value) {
// 当前动作完成,完成暂停
isPaused.value = true
isPausing.value = false
success('已暂停', '已暂停执行,可稍后继续')
}
// 实时更新store
const existingStep = collaborationProcess.value.find(
s => s.StepName === event.step_name
)
if (existingStep) {
const currentResults = agentsStore.executePlan
const stepLogNode = currentResults.find(
r => r.NodeId === event.step_name && r.LogNodeType === 'step'
)
if (!stepLogNode) {
const newStepLog = {
LogNodeType: 'step',
NodeId: event.step_name,
InputName_List: existingStep.InputObject_List || [],
OutputName: existingStep.OutputObject || '',
chatLog: [],
inputObject_Record: [],
ActionHistory: [event.action_result]
}
agentsStore.setExecutePlan([...currentResults, newStepLog])
} else {
stepLogNode.ActionHistory.push(event.action_result)
agentsStore.setExecutePlan([...currentResults])
}
}
// 使用全局步骤索引
const globalStepIndexForAction = collaborationProcess.value.findIndex(
s => s.StepName === event.step_name
)
const stepNumberForAction =
globalStepIndexForAction >= 0
? globalStepIndexForAction + 1
: (event.step_index || 0) + 1
// 更新进度通知
if (currentProgressNotificationId.value) {
const parallelInfo = event.batch_info?.is_parallel
? ` [并行 ${event.batch_info!.batch_size} 个动作]`
: ''
updateProgressDetail(
currentProgressNotificationId.value,
`步骤 ${stepNumberForAction}/${collaborationProcess.value.length}`,
`${event.step_name} - 动作 ${event.completed_actions}/${event.total_actions} 完成${parallelInfo}`,
stepNumberForAction,
collaborationProcess.value.length
)
}
break
case 'step_complete':
stepExecutionStatus.value[event.step_name] = StepExecutionStatus.COMPLETED
// 更新完整步骤日志
const currentResults = agentsStore.executePlan
const existingLog = currentResults.find(
r => r.NodeId === event.step_name && r.LogNodeType === 'step'
)
if (existingLog) {
existingLog.ActionHistory = event.step_log_node.ActionHistory
agentsStore.setExecutePlan([...currentResults])
} else if (event.step_log_node) {
agentsStore.setExecutePlan([...currentResults, event.step_log_node])
}
// 添加object_log_node
const updatedResults = agentsStore.executePlan
if (event.object_log_node) {
agentsStore.setExecutePlan([...updatedResults, event.object_log_node])
}
break
case 'execution_complete':
const steps = collaborationProcess.value
steps.forEach(step => {
const stepName = step.StepName || step.Id || ''
if (stepExecutionStatus.value[stepName] !== StepExecutionStatus.COMPLETED) {
stepExecutionStatus.value[stepName] = StepExecutionStatus.COMPLETED
}
})
// 关闭进度通知并显示完成通知
if (currentProgressNotificationId.value) {
removeNotification(currentProgressNotificationId.value)
currentProgressNotificationId.value = null
}
success('重新执行完成', `所有步骤已执行完成`, { duration: 3000 })
loading.value = false
isPaused.value = false
isStreaming.value = false
break
case 'error':
const errorMessage = event.message || '未知错误'
console.error('重新执行错误:', errorMessage)
// 关闭进度通知
if (currentProgressNotificationId.value) {
removeNotification(currentProgressNotificationId.value)
currentProgressNotificationId.value = null
}
// 静默处理所有错误,不显示任何通知
loading.value = false
isPaused.value = false
isStreaming.value = false
isRestarting.value = false // 重新执行结束
break
}
},
// onError
(err: Error) => {
console.error('重新执行错误:', err)
// 关闭进度通知
if (currentProgressNotificationId.value) {
removeNotification(currentProgressNotificationId.value)
currentProgressNotificationId.value = null
}
// 静默处理所有错误,不显示任何通知
loading.value = false
isPaused.value = false
isStreaming.value = false
isRestarting.value = false // 重新执行结束
},
// onComplete
() => {
// 关闭进度通知
if (currentProgressNotificationId.value) {
removeNotification(currentProgressNotificationId.value)
currentProgressNotificationId.value = null
}
loading.value = false
isPaused.value = false
isStreaming.value = false
isRestarting.value = false // 重新执行结束
},
true, // useWebSocket
existingKeyObjects, // ✅ 传递从截断日志中提取的KeyObjects
true, // enableDynamic
(executionId: string) => {
currentExecutionId.value = executionId
},
undefined, // executionId
stepIndex, // restartFromStepIndex
truncatedLog // ✅ 传递截断后的 RehearsalLog
)
success('重新执行', `正在从步骤 ${stepIndex + 1} 重新执行...`)
} catch (err) {
console.error('重新执行失败:', err)
error('重新执行失败', '无法启动重新执行')
loading.value = false
isRestarting.value = false // 重新执行失败,也要重置标记
}
}
// Handle execute button click
async function handleExecuteButtonClick() {
// If streaming, show pause/resume functionality
@@ -757,16 +1177,12 @@ async function handleRun() {
const waitingSteps = stepsReadyStatus.value.waiting
if (readySteps.length === 0 && waitingSteps.length > 0) {
ElMessageBox.confirm(
`All ${waitingSteps.length} steps的数据还在填充中\n\n${waitingSteps.join(
warning(
'步骤数据未就绪',
`${waitingSteps.length} 个步骤的数据还在填充中:${waitingSteps.join(
'、'
)}\n\n建议等待数据填充完成后再执行。`,
'Step data not ready',
{
confirmButtonText: 'I Understand',
cancelButtonText: 'Close',
type: 'warning'
}
)}建议等待数据填充完成后再执行。`,
{ duration: 5000 }
)
return
}

View File

@@ -201,7 +201,23 @@ const saveEditing = () => {
if (editingTaskId.value && editingContent.value.trim()) {
const taskToUpdate = collaborationProcess.value.find(item => item.Id === editingTaskId.value)
if (taskToUpdate) {
taskToUpdate.TaskContent = editingContent.value.trim()
// 保存旧值用于比较
const oldValue = taskToUpdate.TaskContent
const newValue = editingContent.value.trim()
// 只有内容真正变化时才更新和记录修改
if (newValue !== oldValue) {
taskToUpdate.TaskContent = newValue
// 记录修改过的步骤索引到 store用于重新执行
const stepIndex = collaborationProcess.value.findIndex(item => item.Id === editingTaskId.value)
if (stepIndex >= 0) {
agentsStore.addModifiedStep(stepIndex)
console.log(`📝 步骤 ${stepIndex + 1} (${taskToUpdate.StepName}) 的 TaskContent 已被修改,将从该步骤重新执行`)
}
} else {
console.log(` 步骤 ${taskToUpdate.StepName} 的 TaskContent 未发生变化,不记录修改`)
}
}
}
editingTaskId.value = null