001/**
002 * Copyright (c) 2025-2026, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.gnu.org/licenses/lgpl-3.0.txt
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package dev.tinyflow.core.chain;
017
018import dev.tinyflow.core.chain.event.*;
019import dev.tinyflow.core.chain.repository.*;
020import dev.tinyflow.core.chain.runtime.*;
021import dev.tinyflow.core.util.CollectionUtil;
022import dev.tinyflow.core.util.StringUtil;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
026import java.util.*;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicReference;
029import java.util.function.Supplier;
030
031
032public class Chain {
033
034    private static final Logger log = LoggerFactory.getLogger(Chain.class);
035    private static final ThreadLocal<Chain> EXECUTION_THREAD_LOCAL = new ThreadLocal<>();
036
037
038    protected final ChainDefinition definition;
039    protected String stateInstanceId;
040
041    //    protected final ChainState state;
042    protected ChainStateRepository chainStateRepository;
043    protected NodeStateRepository nodeStateRepository;
044    protected EventManager eventManager;
045    protected TriggerScheduler triggerScheduler;
046
047    public static Chain currentChain() {
048        return EXECUTION_THREAD_LOCAL.get();
049    }
050
051    public Chain(ChainDefinition definition, String stateInstanceId) {
052        this.definition = definition;
053        this.stateInstanceId = stateInstanceId;
054    }
055
056    public void notifyEvent(Event event) {
057        eventManager.notifyEvent(event, this);
058    }
059
060    public void setStatusAndNotifyEvent(ChainStatus status) {
061        AtomicReference<ChainStatus> before = new AtomicReference<>();
062        updateStateSafely(state -> {
063            before.set(state.getStatus());
064            state.setStatus(status);
065            return EnumSet.of(ChainStateField.STATUS);
066        });
067        notifyEvent(new ChainStatusChangeEvent(this, status, before.get()));
068    }
069
070    public void setStatusAndNotifyEvent(String stateInstanceId, ChainStatus status) {
071        AtomicReference<ChainStatus> before = new AtomicReference<>();
072        updateStateSafely(stateInstanceId, state -> {
073            before.set(state.getStatus());
074            state.setStatus(status);
075            return EnumSet.of(ChainStateField.STATUS);
076        });
077        notifyEvent(new ChainStatusChangeEvent(this, status, before.get()));
078    }
079
080    /**
081     * Safely updates the chain state with optimistic locking and retry-on-conflict.
082     *
083     * @param modifier the modifier that applies changes and declares updated fields
084     * @throws ChainUpdateTimeoutException if update cannot succeed within timeout
085     */
086    public ChainState updateStateSafely(ChainStateModifier modifier) {
087        return updateStateSafely(this.stateInstanceId, modifier);
088    }
089
090
091    public ChainState updateStateSafely(String stateInstanceId, ChainStateModifier modifier) {
092        final long timeoutMs = 30_000; // 30 seconds total timeout
093        final long maxRetryDelayMs = 100; // Maximum delay between retries
094
095        long startTime = System.currentTimeMillis();
096        int attempt = 0;
097        ChainState current = null;
098        while (System.currentTimeMillis() - startTime < timeoutMs) {
099            current = chainStateRepository.load(stateInstanceId);
100            if (current == null) {
101                throw new IllegalStateException("Chain state not found: " + stateInstanceId);
102            }
103
104            EnumSet<ChainStateField> updatedFields = modifier.modify(current);
105            if (updatedFields == null || updatedFields.isEmpty()) {
106                return current; // No actual changes, exit early
107            }
108
109            if (chainStateRepository.tryUpdate(current, updatedFields)) {
110                return current;
111            }
112
113            // Prepare next retry
114            attempt++;
115            long nextDelay = calculateNextRetryDelay(attempt, maxRetryDelayMs);
116            sleepUninterruptibly(nextDelay);
117        }
118
119        // Timeout reached
120        assert current != null;
121        String msg = String.format(
122                "Chain state update timeout after %d ms (instanceId: %s)",
123                timeoutMs, current.getInstanceId()
124        );
125        log.warn(msg);
126        throw new ChainUpdateTimeoutException(msg);
127    }
128
129
130    public NodeState updateNodeStateSafely(String nodeId, NodeStateModifier modifier) {
131        return this.updateNodeStateSafely(this.stateInstanceId, nodeId, modifier);
132    }
133
134    public NodeState updateNodeStateSafely(String stateInstanceId, String nodeId, NodeStateModifier modifier) {
135        final long timeoutMs = 30_000;
136        final long maxRetryDelayMs = 100;
137        long startTime = System.currentTimeMillis();
138        int attempt = 0;
139
140        while (System.currentTimeMillis() - startTime < timeoutMs) {
141            // 1. 加载最新 ChainState(获取 chainVersion)
142            ChainState chainState = chainStateRepository.load(stateInstanceId);
143            if (chainState == null) {
144                throw new IllegalStateException("Chain state not found");
145            }
146
147            // 2. 加载 NodeState
148            NodeState nodeState = nodeStateRepository.load(stateInstanceId, nodeId);
149            if (nodeState == null) {
150                nodeState = new NodeState();
151                nodeState.setChainInstanceId(chainState.getInstanceId());
152                nodeState.setNodeId(nodeId);
153            }
154
155            // 3. 应用修改
156            EnumSet<NodeStateField> updatedFields = modifier.modify(nodeState);
157
158            if (updatedFields == null || updatedFields.isEmpty()) {
159                return nodeState;
160            }
161
162            // 4. 尝试更新(传入 chainVersion 保证一致性)
163            if (nodeStateRepository.tryUpdate(nodeState, updatedFields, chainState.getVersion())) {
164                return nodeState;
165            }
166
167            // 5. 退避重试
168            attempt++;
169            sleepUninterruptibly(calculateNextRetryDelay(attempt, maxRetryDelayMs));
170        }
171
172        throw new ChainUpdateTimeoutException("Node state update timeout");
173    }
174
175
176    /**
177     * Calculates the next retry delay using exponential backoff with jitter.
178     *
179     * @param attempt    the current retry attempt (1-based)
180     * @param maxDelayMs the maximum delay in milliseconds
181     * @return the delay in milliseconds to wait before next retry
182     */
183    private long calculateNextRetryDelay(int attempt, long maxDelayMs) {
184        // Base delay: 10ms * (2^(attempt-1))
185        long baseDelay = 10L * (1L << (attempt - 1));
186
187        // Add jitter: ±25% randomness to avoid thundering herd
188        double jitterFactor = 0.75 + (Math.random() * 0.5); // [0.75, 1.25)
189        long delayWithJitter = (long) (baseDelay * jitterFactor);
190
191        // Clamp between 1ms and maxDelayMs
192        return Math.max(1L, Math.min(delayWithJitter, maxDelayMs));
193    }
194
195    /**
196     * Sleeps for the specified duration, silently ignoring interrupts
197     * but preserving the interrupt status.
198     *
199     * @param millis the length of time to sleep in milliseconds
200     */
201    private void sleepUninterruptibly(long millis) {
202        try {
203            Thread.sleep(millis);
204        } catch (InterruptedException e) {
205            Thread.currentThread().interrupt(); // Preserve interrupt status
206            // Do NOT throw here — we want to continue retrying
207        }
208    }
209
210
211    public void start(Map<String, Object> variables) {
212        Trigger prev = TriggerContext.getCurrentTrigger();
213        try {
214            // start 可能在 node 里执行一个新的 chain 的情况,
215            // 需要清空父级 chain 的 Trigger
216            TriggerContext.setCurrentTrigger(null);
217            updateStateSafely(state -> {
218                EnumSet<ChainStateField> fields = EnumSet.of(ChainStateField.STATUS);
219                state.setStatus(ChainStatus.RUNNING);
220
221                if (variables != null && !variables.isEmpty()) {
222                    state.getMemory().putAll(variables);
223                    fields.add(ChainStateField.MEMORY);
224                }
225
226                if (StringUtil.noText(state.getChainDefinitionId())) {
227                    state.setChainDefinitionId(definition.getId());
228                    fields.add(ChainStateField.CHAIN_DEFINITION_ID);
229                }
230
231                return fields;
232            });
233
234            notifyEvent(new ChainStartEvent(this, variables));
235            setStatusAndNotifyEvent(ChainStatus.RUNNING);
236
237            // 调度入口节点
238            List<Node> startNodes = definition.getStartNodes();
239            for (Node startNode : startNodes) {
240                scheduleNode(startNode, null, TriggerType.START, 0);
241            }
242        } finally {
243            // 恢复父级 chain 的 Trigger
244            TriggerContext.setCurrentTrigger(prev);
245        }
246    }
247
248    public void executeNode(Node node, Trigger trigger) {
249        try {
250            EXECUTION_THREAD_LOCAL.set(this);
251            ChainState chainState = getState();
252
253            // 当前处于挂起状态
254            if (chainState.getStatus() == ChainStatus.SUSPEND) {
255                updateStateSafely(state -> {
256                    chainState.addSuspendNodeId(node.getId());
257                    return EnumSet.of(ChainStateField.SUSPEND_NODE_IDS);
258                });
259                return;
260            }
261            // 处于非运行状态,比如错误状态
262            else if (chainState.getStatus() != ChainStatus.RUNNING) {
263                return;
264            }
265
266            String triggerEdgeId = trigger.getEdgeId();
267            if (shouldSkipNode(node, triggerEdgeId)) {
268                return;
269            }
270
271            Map<String, Object> nodeResult = null;
272            Throwable error = null;
273            try {
274                NodeState nodeState = getNodeState(node.id);
275
276                // 如果节点状态不是运行中,则更新为运行中
277                // 目前只有 Loop 节点会处于 Running 状态,因为它会多次触发
278                if (nodeState.getStatus() != NodeStatus.RUNNING) {
279                    updateNodeStateSafely(node.id, s -> {
280                        s.setStatus(NodeStatus.RUNNING);
281                        s.recordExecute(triggerEdgeId);
282                        return EnumSet.of(NodeStateField.EXECUTE_COUNT, NodeStateField.EXECUTE_EDGE_IDS, NodeStateField.STATUS);
283                    });
284                    TriggerType type = trigger.getType();
285                    notifyEvent(new NodeStartEvent(this, node));
286                }
287                // 只需记录执行次数
288                else {
289                    updateNodeStateSafely(node.id, s -> {
290                        s.recordExecute(triggerEdgeId);
291                        return EnumSet.of(NodeStateField.EXECUTE_COUNT, NodeStateField.EXECUTE_EDGE_IDS);
292                    });
293                }
294
295                updateStateSafely(state -> {
296                    state.addTriggerNodeId(node.id);
297                    return EnumSet.of(ChainStateField.TRIGGER_NODE_IDS);
298                });
299
300                nodeResult = node.execute(this);
301            } catch (Throwable throwable) {
302                log.error("Node execute error", throwable);
303                error = throwable;
304            }
305            handleNodeResult(node, nodeResult, triggerEdgeId, error);
306        } finally {
307            EXECUTION_THREAD_LOCAL.remove();
308        }
309    }
310
311    public NodeState getNodeState(String nodeId) {
312        return getNodeState(this.stateInstanceId, nodeId);
313    }
314
315    public NodeState getNodeState(String stateInstanceId, String nodeId) {
316        return nodeStateRepository.load(stateInstanceId, nodeId);
317    }
318
319    public <T> T executeWithLock(String instanceId, long timeout, TimeUnit unit, Supplier<T> action) {
320        try (ChainLock lock = chainStateRepository.getLock(instanceId, timeout, unit)) {
321            if (!lock.isAcquired()) {
322                throw new ChainLockTimeoutException("Failed to acquire lock for instance: " + instanceId);
323            }
324            return action.get();
325        }
326    }
327
328    private boolean shouldSkipNode(Node node, String edgeId) {
329        return executeWithLock(stateInstanceId, 10, TimeUnit.SECONDS, () -> {
330            NodeState newState = updateNodeStateSafely(node.id, s -> {
331                s.recordTrigger(edgeId);
332                return EnumSet.of(NodeStateField.TRIGGER_COUNT, NodeStateField.TRIGGER_EDGE_IDS);
333            });
334
335            NodeCondition condition = node.getCondition();
336            if (condition == null) {
337                return false;
338            }
339            Map<String, Object> prevResult = Collections.emptyMap();
340            boolean shouldSkipNode = !condition.check(this, newState, prevResult);
341            if (shouldSkipNode) {
342                updateStateSafely(state -> {
343                    state.addUncheckedNodeId(node.id);
344                    return EnumSet.of(ChainStateField.UNCHECKED_NODE_IDS);
345                });
346            } else {
347                updateStateSafely(state -> {
348                    if (state.removeUncheckedNodeId(node.id)) {
349                        return EnumSet.of(ChainStateField.UNCHECKED_NODE_IDS);
350                    } else {
351                        return null;
352                    }
353                });
354            }
355            return shouldSkipNode;
356        });
357    }
358
359
360    private void handleNodeResult(Node node, Map<String, Object> prevNodeResult, String triggerEdgeId, Throwable error) {
361        ChainStatus finalChainStatus = null;
362        NodeStatus finalNodeStatus = null;
363        try {
364            if (error == null) {
365                // 更新 state 数据
366                updateStateSafely(state -> {
367                    EnumSet<ChainStateField> fields = EnumSet.of(ChainStateField.EXECUTE_RESULT);
368                    state.setExecuteResult(prevNodeResult);
369
370                    if (prevNodeResult != null && !prevNodeResult.isEmpty()) {
371                        prevNodeResult.forEach((k, v) -> {
372                            if (v != null) {
373                                state.getMemory().put(node.getId() + "." + k, v);
374                            }
375                        });
376                        fields.add(ChainStateField.MEMORY);
377                    }
378
379                    return fields;
380                });
381
382                if (node.isRetryEnable() && node.isResetRetryCountAfterNormal()) {
383                    updateNodeStateSafely(node.id, state -> {
384                        state.setRetryCount(0);
385                        return EnumSet.of(NodeStateField.RETRY_COUNT);
386                    });
387                }
388
389                finalNodeStatus = prevNodeResult == null ? null : (NodeStatus) prevNodeResult.get(ChainConsts.NODE_STATE_STATUS_KEY);
390
391                // 不调度下一个节点,由 node 自行调度,比如 Loop 循环
392                Boolean scheduleNextNodeDisabled = prevNodeResult == null ? null : (Boolean) prevNodeResult.get(ChainConsts.SCHEDULE_NEXT_NODE_DISABLED_KEY);
393                if (scheduleNextNodeDisabled != null && scheduleNextNodeDisabled) {
394                    return;
395                }
396
397                // 结束节点
398                finalChainStatus = prevNodeResult != null ? (ChainStatus) prevNodeResult.get(ChainConsts.CHAIN_STATE_STATUS_KEY) : null;
399                if (finalChainStatus != null && finalChainStatus.isTerminal()) {
400                    return;
401                }
402
403                // 调度下一个节点
404                scheduleNextForNode(node, prevNodeResult, triggerEdgeId);
405            } else {
406                // 挂起
407                if (error instanceof ChainSuspendException) {
408                    updateNodeStateSafely(node.id, s -> {
409                        s.setStatus(NodeStatus.SUSPEND);
410                        return EnumSet.of(NodeStateField.STATUS);
411                    });
412
413                    updateStateSafely(s -> {
414                        s.addSuspendNodeId(node.getId());
415                        s.addSuspendForParameters(((ChainSuspendException) error).getSuspendParameters());
416                        return EnumSet.of(ChainStateField.SUSPEND_NODE_IDS, ChainStateField.SUSPEND_FOR_PARAMETERS);
417                    });
418
419                    finalNodeStatus = NodeStatus.SUSPEND;
420                    finalChainStatus = ChainStatus.SUSPEND;
421                }
422                // 失败
423                else {
424                    NodeState newState = updateNodeStateSafely(node.getId(), s -> {
425                        s.setStatus(NodeStatus.ERROR);
426                        s.setError(new ExceptionSummary(error));
427                        return EnumSet.of(NodeStateField.ERROR, NodeStateField.STATUS);
428                    });
429
430                    eventManager.notifyNodeError(error, node, prevNodeResult, this);
431
432                    if (node.isRetryEnable()
433                            && node.getMaxRetryCount() > 0
434                            && newState.getRetryCount() < node.getMaxRetryCount()) {
435
436                        updateNodeStateSafely(node.getId(), s -> {
437                            s.setRetryCount(s.getRetryCount() + 1);
438                            return EnumSet.of(NodeStateField.RETRY_COUNT);
439                        });
440
441                        scheduleNode(node, triggerEdgeId, TriggerType.RETRY, node.getRetryIntervalMs());
442                    } else {
443                        // close https://gitee.com/tinyflow-ai/tinyflow-java/issues/IEZAK0
444                        finalNodeStatus = NodeStatus.FAILED;
445                        finalChainStatus = handleNodeError(node.id, error);
446                    }
447                }
448            }
449        } catch (Exception e) {
450            // 在 scheduleNextForNode 执行时, js 判断等可能会出错。
451            finalNodeStatus = NodeStatus.FAILED;
452            finalChainStatus = handleNodeError(node.id, e);
453        } finally {
454            // 如果当前的工作流正在执行中,则不发送 NodeEndEvent 事件
455            if (finalNodeStatus != NodeStatus.RUNNING) {
456                NodeStatus nodeStatus = finalNodeStatus == null ? NodeStatus.SUCCEEDED : finalNodeStatus;
457                updateNodeStateSafely(node.id, state -> {
458                    state.setStatus(nodeStatus);
459                    return EnumSet.of(NodeStateField.STATUS);
460                });
461                notifyEvent(new NodeEndEvent(this, node, prevNodeResult, error));
462            }
463
464            if (finalChainStatus != null) {
465                setStatusAndNotifyEvent(finalChainStatus);
466
467                // chain 执行结束
468                if (finalChainStatus.isTerminal()) {
469                    eventManager.notifyEvent(new ChainEndEvent(this), this);
470
471                    // 执行结束,但是未执行成功,失败和取消等
472                    // 更新父级链的状态
473                    if (!finalChainStatus.isSuccess()) {
474                        ChainState currentState = getState();
475                        ChainStatus currentStatus = finalChainStatus;
476                        while (currentState != null && StringUtil.hasText(currentState.getParentInstanceId())) {
477                            updateStateSafely(currentState.getParentInstanceId(), state -> {
478                                state.setStatus(currentStatus);
479                                return EnumSet.of(ChainStateField.STATUS);
480                            });
481                            setStatusAndNotifyEvent(currentState.getParentInstanceId(), currentStatus);
482                            currentState = getState(currentState.getParentInstanceId());
483                        }
484                    }
485                }
486            }
487        }
488    }
489
490    /**
491     * 为指定节点调度下一次执行
492     *
493     * @param node      要调度的节点
494     * @param result    节点执行结果
495     * @param byEdigeId 触发边的ID
496     */
497    private void scheduleNextForNode(Node node, Map<String, Object> result, String byEdigeId) {
498        // 如果节点不支持循环,则直接调度向外的节点
499        if (!node.isLoopEnable()) {
500            scheduleOutwardNodes(node, result);
501            return;
502        }
503
504        NodeState nodeState = getNodeState(node.getId());
505        // 如果达到最大循环次数限制,则调度向外的节点
506        if (node.getMaxLoopCount() > 0 && nodeState.getLoopCount() >= node.getMaxLoopCount()) {
507            scheduleOutwardNodes(node, result);
508            return;
509        }
510
511        // 检查循环中断条件,如果满足则调度向外的节点
512        NodeCondition breakCondition = node.getLoopBreakCondition();
513        if (breakCondition != null && breakCondition.check(this, nodeState, result)) {
514            scheduleOutwardNodes(node, result);
515            return;
516        }
517
518        // 增加循环计数并重新调度当前节点
519        updateNodeStateSafely(node.getId(), s -> {
520            s.setLoopCount(s.getLoopCount() + 1);
521            return EnumSet.of(NodeStateField.LOOP_COUNT);
522        });
523
524        scheduleNode(node, byEdigeId, TriggerType.LOOP, node.getLoopIntervalMs());
525    }
526
527
528    private void scheduleOutwardNodes(Node node, Map<String, Object> result) {
529        List<Edge> edges = definition.getOutwardEdge(node.getId());
530        if (!CollectionUtil.hasItems(edges)) {
531            // 当前节点没有向外的边,则调度父节点(自动回归父节点) 用在 Loop 循环等场景
532            if (StringUtil.hasText(node.getParentId())) {
533                Node parent = definition.getNodeById(node.getParentId());
534                scheduleNode(parent, null, TriggerType.PARENT, 0L);
535            }
536            return;
537        }
538
539        // 检查所有向外的边是不是同一个父节点
540        boolean allNotSameParent = false;
541        boolean scheduleSuccess = false;
542        for (Edge edge : edges) {
543            Node nextNode = definition.getNodeById(edge.getTarget());
544            if (nextNode == null) {
545                throw new ChainException("Invalid edge target: " + edge.getTarget());
546            }
547
548            // 如果存在不同父节点的边,则跳过, 比如 Loop 节点可能只有子节点,没有后续的节点
549            if (!isSameParent(node, nextNode)) {
550                allNotSameParent = true;
551                continue;
552            }
553
554            allNotSameParent = false;
555            EdgeCondition edgeCondition = edge.getCondition();
556            if (edgeCondition == null) {
557                scheduleNode(nextNode, edge.getId(), TriggerType.NEXT, 0L);
558                scheduleSuccess = true;
559                continue;
560            }
561
562            if (edgeCondition.check(this, edge, result)) {
563                updateStateSafely(state -> {
564                    if (state.removeUncheckedEdgeId(edge.getId())) {
565                        return EnumSet.of(ChainStateField.UNCHECKED_EDGE_IDS);
566                    } else {
567                        return null;
568                    }
569                });
570                scheduleNode(nextNode, edge.getId(), TriggerType.NEXT, 0L);
571                scheduleSuccess = true;
572            } else {
573                updateStateSafely(state -> {
574                    state.addUncheckedEdgeId(edge.getId());
575                    return EnumSet.of(ChainStateField.UNCHECKED_EDGE_IDS);
576                });
577                eventManager.notifyEvent(new EdgeConditionCheckFailedEvent(this, edge, node, result), this);
578            }
579        }
580
581        // 如果所有向外的边都不满足条件,则调度父节点(自动回归父节点) 用在 Loop 循环嵌套等场景(Loop 下的第一个节点是 Loop)
582        if (allNotSameParent && !scheduleSuccess) {
583            if (StringUtil.hasText(node.getParentId())) {
584                Node parent = definition.getNodeById(node.getParentId());
585                scheduleNode(parent, null, TriggerType.PARENT, 0L);
586            }
587        }
588    }
589
590    /**
591     * 判断两个节点是否具有相同的父节点
592     *
593     * @param node 第一个节点
594     * @param next 第二个节点
595     * @return 如果两个节点的父节点ID相同则返回true,否则返回false
596     */
597    private boolean isSameParent(Node node, Node next) {
598        // 如果两个节点的父节点ID都为空或空白,则认为是相同父节点
599        if (StringUtil.noText(node.getParentId()) && StringUtil.noText(next.getParentId())) {
600            return true;
601        }
602
603        // 比较两个节点的父节点ID是否相等
604        return node.getParentId() != null && node.getParentId().equals(next.getParentId());
605    }
606
607
608    public void scheduleNode(Node node, String edgeId, TriggerType type, long delayMs) {
609        Trigger trigger = new Trigger();
610        trigger.setStateInstanceId(stateInstanceId);
611        trigger.setEdgeId(edgeId);
612        trigger.setNodeId(node.getId());
613        trigger.setType(type);
614        trigger.setTriggerAt(System.currentTimeMillis() + delayMs);
615
616        if (edgeId != null) {
617            updateStateSafely(state -> {
618                state.addTriggerEdgeId(edgeId);
619                return EnumSet.of(ChainStateField.TRIGGER_EDGE_IDS);
620            });
621
622            eventManager.notifyEvent(new EdgeTriggerEvent(this, trigger), this);
623        }
624
625        getTriggerScheduler().schedule(trigger);
626    }
627
628
629    private ChainStatus handleNodeError(String nodeId, Throwable throwable) {
630        updateNodeStateSafely(nodeId, s -> {
631            s.setStatus(NodeStatus.FAILED);
632            s.setError(new ExceptionSummary(throwable));
633            return EnumSet.of(NodeStateField.ERROR, NodeStateField.STATUS);
634        });
635
636        updateStateSafely(state -> {
637            state.setError(new ExceptionSummary(throwable));
638            return EnumSet.of(ChainStateField.ERROR);
639        });
640
641        setStatusAndNotifyEvent(ChainStatus.FAILED);
642        eventManager.notifyChainError(throwable, this);
643        return ChainStatus.FAILED;
644    }
645
646    public void suspend() {
647        setStatusAndNotifyEvent(ChainStatus.SUSPEND);
648    }
649
650    public void suspend(Node node) {
651        updateStateSafely(state -> {
652            state.addSuspendNodeId(node.getId());
653            return EnumSet.of(ChainStateField.SUSPEND_NODE_IDS);
654        });
655        setStatusAndNotifyEvent(ChainStatus.SUSPEND);
656    }
657
658
659    public void resume(Map<String, Object> variables) {
660        ChainState newState = updateStateSafely(state -> {
661            if (variables != null) {
662                state.getMemory().putAll(variables);
663                return EnumSet.of(ChainStateField.MEMORY);
664            } else {
665                return null;
666            }
667        });
668
669        notifyEvent(new ChainResumeEvent(this, variables));
670        setStatusAndNotifyEvent(ChainStatus.RUNNING);
671
672        Set<String> suspendNodeIds = newState.getSuspendNodeIds();
673        if (suspendNodeIds != null && !suspendNodeIds.isEmpty()) {
674            // 移除 suspend 状态,方便二次 suspend 时,不带有旧数据
675            updateStateSafely(state -> {
676                state.setSuspendNodeIds(null);
677                state.setSuspendForParameters(null);
678                return EnumSet.of(ChainStateField.SUSPEND_NODE_IDS, ChainStateField.SUSPEND_FOR_PARAMETERS);
679            });
680
681            for (String id : suspendNodeIds) {
682                Node node = definition.getNodeById(id);
683                if (node == null) {
684                    throw new ChainException("Node not found: " + id);
685                }
686                scheduleNode(node, null, TriggerType.RESUME, 0L);
687            }
688        }
689    }
690
691    public void resume() {
692        resume(Collections.emptyMap());
693    }
694
695    public void output(Node node, Object response) {
696        eventManager.notifyOutput(this, node, response);
697    }
698
699
700    public EventManager getEventManager() {
701        return eventManager;
702    }
703
704    public void setEventManager(EventManager eventManager) {
705        this.eventManager = eventManager;
706    }
707
708    public ChainStateRepository getChainStateRepository() {
709        return chainStateRepository;
710    }
711
712    public void setChainStateRepository(ChainStateRepository chainStateRepository) {
713        this.chainStateRepository = chainStateRepository;
714    }
715
716    public ChainDefinition getDefinition() {
717        return definition;
718    }
719
720    public TriggerScheduler getTriggerScheduler() {
721        if (this.triggerScheduler == null) {
722            this.triggerScheduler = ChainRuntime.triggerScheduler();
723        }
724        return triggerScheduler;
725    }
726
727    public void setTriggerScheduler(TriggerScheduler triggerScheduler) {
728        this.triggerScheduler = triggerScheduler;
729    }
730
731    public NodeStateRepository getNodeStateRepository() {
732        return nodeStateRepository;
733    }
734
735    public void setNodeStateRepository(NodeStateRepository nodeStateRepository) {
736        this.nodeStateRepository = nodeStateRepository;
737    }
738
739    public String getStateInstanceId() {
740        return stateInstanceId;
741    }
742
743    public ChainState getState() {
744        return chainStateRepository.load(stateInstanceId);
745    }
746
747    public ChainState getState(String stateInstanceId) {
748        return chainStateRepository.load(stateInstanceId);
749    }
750
751    public void setStateInstanceId(String stateInstanceId) {
752        this.stateInstanceId = stateInstanceId;
753    }
754}