/**
 * Copyright (c) 2025-2026, Michael Yang 杨福海 (fuhai999@gmail.com).
 * <p>
 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.gnu.org/licenses/lgpl-3.0.txt
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.tinyflow.core.chain;

import dev.tinyflow.core.chain.event.*;
import dev.tinyflow.core.chain.repository.*;
import dev.tinyflow.core.chain.runtime.*;
import dev.tinyflow.core.util.CollectionUtil;
import dev.tinyflow.core.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;


/**
 * Drives the execution of a {@link ChainDefinition} against the persisted state identified by
 * {@link #getStateInstanceId()}.
 * <p>
 * All chain and node state is read from and written to the configured repositories; this class
 * keeps no state snapshot of its own. Every state change goes through
 * {@link #updateStateSafely(ChainStateModifier)} or
 * {@link #updateNodeStateSafely(String, NodeStateModifier)}, which reload the state and retry
 * with backoff whenever the repository rejects the update.
 * <p>
 * The repositories and the event manager must be injected through their setters before the
 * chain is used.
 */
public class Chain {

    private static final Logger log = LoggerFactory.getLogger(Chain.class);

    /** Chain whose node is currently being executed on the calling thread. */
    private static final ThreadLocal<Chain> EXECUTION_THREAD_LOCAL = new ThreadLocal<>();

    /** Total time budget for one "safe" state update, including all retries. */
    private static final long STATE_UPDATE_TIMEOUT_MS = 30_000L;

    /** Upper bound for the delay between two update attempts. */
    private static final long MAX_RETRY_DELAY_MS = 100L;

    /** Delay used for the first retry; doubled on every further attempt. */
    private static final long BASE_RETRY_DELAY_MS = 10L;

    /**
     * Largest exponent used by the backoff. Keeps {@code BASE_RETRY_DELAY_MS << exponent}
     * far away from {@code long} overflow and from shift-count wrap-around.
     */
    private static final int MAX_BACKOFF_EXPONENT = 20;


    protected final ChainDefinition definition;
    protected String stateInstanceId;

    protected ChainStateRepository chainStateRepository;
    protected NodeStateRepository nodeStateRepository;
    protected EventManager eventManager;
    protected TriggerScheduler triggerScheduler;

    /**
     * Returns the chain that is executing a node on the current thread.
     *
     * @return the executing chain, or {@code null} when the current thread is not inside
     * {@link #executeNode(Node, Trigger)}
     */
    public static Chain currentChain() {
        return EXECUTION_THREAD_LOCAL.get();
    }

    /**
     * Creates a chain bound to a definition and a persisted state instance.
     *
     * @param definition      the definition that describes nodes and edges
     * @param stateInstanceId the id under which the chain state is stored
     */
    public Chain(ChainDefinition definition, String stateInstanceId) {
        this.definition = definition;
        this.stateInstanceId = stateInstanceId;
    }

    /**
     * Forwards an event to the event manager with this chain as the source.
     *
     * @param event the event to publish
     */
    public void notifyEvent(Event event) {
        eventManager.notifyEvent(event, this);
    }

    /**
     * Persists a new status for this chain and publishes a {@link ChainStatusChangeEvent}
     * carrying the new and the previous status.
     *
     * @param status the status to store
     */
    public void setStatusAndNotifyEvent(ChainStatus status) {
        AtomicReference<ChainStatus> before = new AtomicReference<>();
        updateStateSafely(state -> {
            // The modifier may run several times on retry; the last run wins.
            before.set(state.getStatus());
            state.setStatus(status);
            return EnumSet.of(ChainStateField.STATUS);
        });
        notifyEvent(new ChainStatusChangeEvent(this, status, before.get()));
    }

    /**
     * Persists a new status for the given state instance and publishes a
     * {@link ChainStatusChangeEvent} carrying the new and the previous status.
     *
     * @param stateInstanceId the state instance to update
     * @param status          the status to store
     */
    public void setStatusAndNotifyEvent(String stateInstanceId, ChainStatus status) {
        AtomicReference<ChainStatus> before = new AtomicReference<>();
        updateStateSafely(stateInstanceId, state -> {
            before.set(state.getStatus());
            state.setStatus(status);
            return EnumSet.of(ChainStateField.STATUS);
        });
        notifyEvent(new ChainStatusChangeEvent(this, status, before.get()));
    }

    /**
     * Updates the state of this chain, retrying when the repository rejects the update.
     *
     * @param modifier applies the changes and returns the set of modified fields
     * @return the state instance the modifier was last applied to
     * @throws IllegalStateException       if no state exists for this chain
     * @throws ChainUpdateTimeoutException if the update does not succeed within the timeout
     */
    public ChainState updateStateSafely(ChainStateModifier modifier) {
        return updateStateSafely(this.stateInstanceId, modifier);
    }


    /**
     * Updates the state of the given instance, retrying when the repository rejects the update.
     * <p>
     * On every attempt the state is reloaded and the modifier is applied again, so the modifier
     * must be safe to run more than once. When the modifier returns {@code null} or an empty
     * set, nothing is written.
     *
     * @param stateInstanceId the state instance to update
     * @param modifier        applies the changes and returns the set of modified fields
     * @return the state instance the modifier was last applied to
     * @throws IllegalStateException       if no state exists for {@code stateInstanceId}
     * @throws ChainUpdateTimeoutException if the update does not succeed within the timeout
     */
    public ChainState updateStateSafely(String stateInstanceId, ChainStateModifier modifier) {
        // nanoTime is monotonic, so wall-clock adjustments cannot shorten or extend the timeout.
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(STATE_UPDATE_TIMEOUT_MS);
        int attempt = 0;

        do {
            ChainState current = chainStateRepository.load(stateInstanceId);
            if (current == null) {
                throw new IllegalStateException("Chain state not found: " + stateInstanceId);
            }

            EnumSet<ChainStateField> updatedFields = modifier.modify(current);
            if (updatedFields == null || updatedFields.isEmpty()) {
                return current; // No actual changes, exit early
            }

            if (chainStateRepository.tryUpdate(current, updatedFields)) {
                return current;
            }

            // Update was rejected: back off, then reload and try again.
            attempt++;
            sleepUninterruptibly(calculateNextRetryDelay(attempt, MAX_RETRY_DELAY_MS));
        } while (System.nanoTime() - deadlineNanos < 0);

        String msg = String.format(
                "Chain state update timeout after %d ms (instanceId: %s)",
                STATE_UPDATE_TIMEOUT_MS, stateInstanceId
        );
        log.warn(msg);
        throw new ChainUpdateTimeoutException(msg);
    }


    /**
     * Updates the state of a node of this chain, retrying when the repository rejects the update.
     *
     * @param nodeId   the node whose state is updated
     * @param modifier applies the changes and returns the set of modified fields
     * @return the node state the modifier was last applied to
     * @throws IllegalStateException       if no chain state exists for this chain
     * @throws ChainUpdateTimeoutException if the update does not succeed within the timeout
     */
    public NodeState updateNodeStateSafely(String nodeId, NodeStateModifier modifier) {
        return this.updateNodeStateSafely(this.stateInstanceId, nodeId, modifier);
    }

    /**
     * Updates the state of a node, retrying when the repository rejects the update.
     * <p>
     * When no node state is stored yet, a new one is created for the node before the modifier
     * is applied. The current chain state version is passed along with the update.
     *
     * @param stateInstanceId the chain state instance the node belongs to
     * @param nodeId          the node whose state is updated
     * @param modifier        applies the changes and returns the set of modified fields
     * @return the node state the modifier was last applied to
     * @throws IllegalStateException       if no chain state exists for {@code stateInstanceId}
     * @throws ChainUpdateTimeoutException if the update does not succeed within the timeout
     */
    public NodeState updateNodeStateSafely(String stateInstanceId, String nodeId, NodeStateModifier modifier) {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(STATE_UPDATE_TIMEOUT_MS);
        int attempt = 0;

        do {
            // 1. Load the latest chain state (provides the chain version).
            ChainState chainState = chainStateRepository.load(stateInstanceId);
            if (chainState == null) {
                throw new IllegalStateException("Chain state not found");
            }

            // 2. Load the node state, creating it on first use.
            NodeState nodeState = nodeStateRepository.load(stateInstanceId, nodeId);
            if (nodeState == null) {
                nodeState = new NodeState();
                nodeState.setChainInstanceId(chainState.getInstanceId());
                nodeState.setNodeId(nodeId);
            }

            // 3. Apply the modification.
            EnumSet<NodeStateField> updatedFields = modifier.modify(nodeState);
            if (updatedFields == null || updatedFields.isEmpty()) {
                return nodeState;
            }

            // 4. Try to persist, passing the chain version along.
            if (nodeStateRepository.tryUpdate(nodeState, updatedFields, chainState.getVersion())) {
                return nodeState;
            }

            // 5. Back off and retry.
            attempt++;
            sleepUninterruptibly(calculateNextRetryDelay(attempt, MAX_RETRY_DELAY_MS));
        } while (System.nanoTime() - deadlineNanos < 0);

        throw new ChainUpdateTimeoutException("Node state update timeout");
    }


    /**
     * Calculates the next retry delay using exponential backoff with jitter.
     *
     * @param attempt    the current retry attempt (1-based); values below 1 are treated as 1
     * @param maxDelayMs the maximum delay in milliseconds
     * @return the delay in milliseconds to wait before the next retry, at least 1
     */
    private long calculateNextRetryDelay(int attempt, long maxDelayMs) {
        // Clamp the exponent: Java only uses the low 6 bits of a long shift count, so an
        // unclamped "1L << (attempt - 1)" wraps around (and overflows) for large attempts.
        int exponent = Math.max(0, Math.min(attempt - 1, MAX_BACKOFF_EXPONENT));

        // Base delay: 10ms * 2^exponent
        long baseDelay = BASE_RETRY_DELAY_MS << exponent;

        // Add jitter: ±25% randomness so competing writers do not retry in lockstep.
        double jitterFactor = 0.75 + (Math.random() * 0.5); // [0.75, 1.25)
        long delayWithJitter = (long) (baseDelay * jitterFactor);

        // Clamp between 1ms and maxDelayMs
        return Math.max(1L, Math.min(delayWithJitter, maxDelayMs));
    }

    /**
     * Sleeps for the full duration even when the thread is interrupted, and restores the
     * interrupt status before returning.
     * <p>
     * The interrupt flag is only re-set once the whole duration has elapsed. Re-setting it
     * and returning early would make every following sleep fail immediately and turn the
     * retry loop into a busy spin.
     *
     * @param millis the length of time to sleep in milliseconds
     */
    private void sleepUninterruptibly(long millis) {
        boolean interrupted = false;
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(millis);
            final long wakeUpAt = System.nanoTime() + remainingNanos;
            while (true) {
                try {
                    // TimeUnit.sleep is a no-op for non-positive durations.
                    TimeUnit.NANOSECONDS.sleep(remainingNanos);
                    return;
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep sleeping for the rest of the delay.
                    interrupted = true;
                    remainingNanos = wakeUpAt - System.nanoTime();
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt(); // Preserve interrupt status
            }
        }
    }


    /**
     * Marks the chain as running, stores the initial variables and schedules the start nodes.
     *
     * @param variables initial variables merged into the chain memory; may be {@code null}
     */
    public void start(Map<String, Object> variables) {
        Trigger prev = TriggerContext.getCurrentTrigger();
        try {
            // start() may be called from inside a node that launches another chain,
            // so the trigger of the parent chain must be cleared first.
            TriggerContext.setCurrentTrigger(null);
            updateStateSafely(state -> {
                EnumSet<ChainStateField> fields = EnumSet.of(ChainStateField.STATUS);
                state.setStatus(ChainStatus.RUNNING);

                if (variables != null && !variables.isEmpty()) {
                    state.getMemory().putAll(variables);
                    fields.add(ChainStateField.MEMORY);
                }

                if (StringUtil.noText(state.getChainDefinitionId())) {
                    state.setChainDefinitionId(definition.getId());
                    fields.add(ChainStateField.CHAIN_DEFINITION_ID);
                }

                return fields;
            });

            notifyEvent(new ChainStartEvent(this, variables));
            setStatusAndNotifyEvent(ChainStatus.RUNNING);

            // Schedule the entry nodes.
            List<Node> startNodes = definition.getStartNodes();
            for (Node startNode : startNodes) {
                scheduleNode(startNode, null, TriggerType.START, 0);
            }
        } finally {
            // Restore the trigger of the parent chain.
            TriggerContext.setCurrentTrigger(prev);
        }
    }

    /**
     * Executes a node in response to a trigger and processes its result.
     * <p>
     * When the chain is suspended the node id is recorded as a suspended node instead of being
     * executed; when the chain is in any other non-running status the call is a no-op. While the
     * node runs, {@link #currentChain()} returns this chain on the calling thread.
     *
     * @param node    the node to execute
     * @param trigger the trigger that caused the execution
     */
    public void executeNode(Node node, Trigger trigger) {
        // Remember the chain already bound to this thread (nested execution) so it can be
        // restored afterwards instead of being wiped.
        Chain previousChain = EXECUTION_THREAD_LOCAL.get();
        EXECUTION_THREAD_LOCAL.set(this);
        try {
            ChainState chainState = getState();
            ChainStatus chainStatus = chainState.getStatus();

            // The chain is currently suspended: remember the node for resume().
            if (chainStatus == ChainStatus.SUSPEND) {
                updateStateSafely(state -> {
                    // Must modify the freshly loaded state passed to the modifier,
                    // not the snapshot read above.
                    state.addSuspendNodeId(node.getId());
                    return EnumSet.of(ChainStateField.SUSPEND_NODE_IDS);
                });
                return;
            }

            // Any other non-running status (for example an error status): do nothing.
            if (chainStatus != ChainStatus.RUNNING) {
                return;
            }

            String triggerEdgeId = trigger.getEdgeId();
            if (shouldSkipNode(node, triggerEdgeId)) {
                return;
            }

            Map<String, Object> nodeResult = null;
            Throwable error = null;
            try {
                NodeState nodeState = getNodeState(node.id);

                // Switch the node to RUNNING unless it already is.
                // Currently only Loop nodes can already be RUNNING, because they are
                // triggered multiple times.
                if (nodeState == null || nodeState.getStatus() != NodeStatus.RUNNING) {
                    updateNodeStateSafely(node.id, s -> {
                        s.setStatus(NodeStatus.RUNNING);
                        s.recordExecute(triggerEdgeId);
                        return EnumSet.of(NodeStateField.EXECUTE_COUNT, NodeStateField.EXECUTE_EDGE_IDS, NodeStateField.STATUS);
                    });
                    notifyEvent(new NodeStartEvent(this, node));
                } else {
                    // Already running: only record the execution.
                    updateNodeStateSafely(node.id, s -> {
                        s.recordExecute(triggerEdgeId);
                        return EnumSet.of(NodeStateField.EXECUTE_COUNT, NodeStateField.EXECUTE_EDGE_IDS);
                    });
                }

                updateStateSafely(state -> {
                    state.addTriggerNodeId(node.id);
                    return EnumSet.of(ChainStateField.TRIGGER_NODE_IDS);
                });

                nodeResult = node.execute(this);
            } catch (Throwable throwable) {
                log.error("Node execute error", throwable);
                error = throwable;
            }
            handleNodeResult(node, nodeResult, triggerEdgeId, error);
        } finally {
            if (previousChain == null) {
                EXECUTION_THREAD_LOCAL.remove();
            } else {
                EXECUTION_THREAD_LOCAL.set(previousChain);
            }
        }
    }

    /**
     * Loads the state of a node of this chain.
     *
     * @param nodeId the node id
     * @return whatever the node state repository returns for the node
     */
    public NodeState getNodeState(String nodeId) {
        return getNodeState(this.stateInstanceId, nodeId);
    }

    /**
     * Loads the state of a node of the given chain state instance.
     *
     * @param stateInstanceId the chain state instance id
     * @param nodeId          the node id
     * @return whatever the node state repository returns for the node
     */
    public NodeState getNodeState(String stateInstanceId, String nodeId) {
        return nodeStateRepository.load(stateInstanceId, nodeId);
    }

    /**
     * Runs an action while holding the repository lock of a chain instance.
     * The lock is closed when the action completes, normally or exceptionally.
     *
     * @param instanceId the chain state instance to lock
     * @param timeout    how long to wait for the lock
     * @param unit       unit of {@code timeout}
     * @param action     the action to run under the lock
     * @param <T>        result type of the action
     * @return the result of the action
     * @throws ChainLockTimeoutException if the lock was not acquired
     */
    public <T> T executeWithLock(String instanceId, long timeout, TimeUnit unit, Supplier<T> action) {
        try (ChainLock lock = chainStateRepository.getLock(instanceId, timeout, unit)) {
            if (!lock.isAcquired()) {
                throw new ChainLockTimeoutException("Failed to acquire lock for instance: " + instanceId);
            }
            return action.get();
        }
    }

    /**
     * Records the trigger on the node and evaluates the node condition under the chain lock.
     *
     * @param node   the triggered node
     * @param edgeId the edge that triggered the node; may be {@code null}
     * @return {@code true} when the node has a condition and the condition check fails
     */
    private boolean shouldSkipNode(Node node, String edgeId) {
        return executeWithLock(stateInstanceId, 10, TimeUnit.SECONDS, () -> {
            NodeState newState = updateNodeStateSafely(node.id, s -> {
                s.recordTrigger(edgeId);
                return EnumSet.of(NodeStateField.TRIGGER_COUNT, NodeStateField.TRIGGER_EDGE_IDS);
            });

            NodeCondition condition = node.getCondition();
            if (condition == null) {
                return false;
            }

            Map<String, Object> prevResult = Collections.emptyMap();
            boolean shouldSkipNode = !condition.check(this, newState, prevResult);
            if (shouldSkipNode) {
                updateStateSafely(state -> {
                    state.addUncheckedNodeId(node.id);
                    return EnumSet.of(ChainStateField.UNCHECKED_NODE_IDS);
                });
            } else {
                updateStateSafely(state -> {
                    // Only write when the id was actually present.
                    if (state.removeUncheckedNodeId(node.id)) {
                        return EnumSet.of(ChainStateField.UNCHECKED_NODE_IDS);
                    } else {
                        return null;
                    }
                });
            }
            return shouldSkipNode;
        });
    }


    /**
     * Processes the outcome of a node execution: stores the result, schedules follow-up work
     * (next nodes, loop iteration or retry), and finally persists the node and chain status and
     * publishes the corresponding events.
     *
     * @param node           the executed node
     * @param prevNodeResult the result returned by the node; {@code null} when it failed
     * @param triggerEdgeId  the edge that triggered the execution; may be {@code null}
     * @param error          the error thrown by the node, or {@code null} on success
     */
    private void handleNodeResult(Node node, Map<String, Object> prevNodeResult, String triggerEdgeId, Throwable error) {
        ChainStatus finalChainStatus = null;
        NodeStatus finalNodeStatus = null;
        try {
            if (error == null) {
                // Store the result in the chain state.
                updateStateSafely(state -> {
                    EnumSet<ChainStateField> fields = EnumSet.of(ChainStateField.EXECUTE_RESULT);
                    state.setExecuteResult(prevNodeResult);

                    if (prevNodeResult != null && !prevNodeResult.isEmpty()) {
                        prevNodeResult.forEach((k, v) -> {
                            if (v != null) {
                                state.getMemory().put(node.getId() + "." + k, v);
                            }
                        });
                        fields.add(ChainStateField.MEMORY);
                    }

                    return fields;
                });

                if (node.isRetryEnable() && node.isResetRetryCountAfterNormal()) {
                    updateNodeStateSafely(node.id, state -> {
                        state.setRetryCount(0);
                        return EnumSet.of(NodeStateField.RETRY_COUNT);
                    });
                }

                finalNodeStatus = prevNodeResult == null ? null : (NodeStatus) prevNodeResult.get(ChainConsts.NODE_STATE_STATUS_KEY);

                // The node schedules its successors itself (for example a Loop node).
                Boolean scheduleNextNodeDisabled = prevNodeResult == null ? null : (Boolean) prevNodeResult.get(ChainConsts.SCHEDULE_NEXT_NODE_DISABLED_KEY);
                if (scheduleNextNodeDisabled != null && scheduleNextNodeDisabled) {
                    return;
                }

                // The node requested a terminal chain status (end node).
                finalChainStatus = prevNodeResult != null ? (ChainStatus) prevNodeResult.get(ChainConsts.CHAIN_STATE_STATUS_KEY) : null;
                if (finalChainStatus != null && finalChainStatus.isTerminal()) {
                    return;
                }

                // Schedule the next node(s).
                scheduleNextForNode(node, prevNodeResult, triggerEdgeId);
            } else if (error instanceof ChainSuspendException) {
                // Suspend.
                updateNodeStateSafely(node.id, s -> {
                    s.setStatus(NodeStatus.SUSPEND);
                    return EnumSet.of(NodeStateField.STATUS);
                });

                updateStateSafely(s -> {
                    s.addSuspendNodeId(node.getId());
                    s.addSuspendForParameters(((ChainSuspendException) error).getSuspendParameters());
                    return EnumSet.of(ChainStateField.SUSPEND_NODE_IDS, ChainStateField.SUSPEND_FOR_PARAMETERS);
                });

                finalNodeStatus = NodeStatus.SUSPEND;
                finalChainStatus = ChainStatus.SUSPEND;
            } else {
                // Failure.
                NodeState newState = updateNodeStateSafely(node.getId(), s -> {
                    s.setStatus(NodeStatus.ERROR);
                    s.setError(new ExceptionSummary(error));
                    return EnumSet.of(NodeStateField.ERROR, NodeStateField.STATUS);
                });

                eventManager.notifyNodeError(error, node, prevNodeResult, this);

                if (node.isRetryEnable()
                        && node.getMaxRetryCount() > 0
                        && newState.getRetryCount() < node.getMaxRetryCount()) {

                    // Keep the node in ERROR while it waits for the retry; leaving the final
                    // status unset would make the finally block persist SUCCEEDED.
                    finalNodeStatus = NodeStatus.ERROR;

                    updateNodeStateSafely(node.getId(), s -> {
                        s.setRetryCount(s.getRetryCount() + 1);
                        return EnumSet.of(NodeStateField.RETRY_COUNT);
                    });

                    scheduleNode(node, triggerEdgeId, TriggerType.RETRY, node.getRetryIntervalMs());
                } else {
                    // close https://gitee.com/tinyflow-ai/tinyflow-java/issues/IEZAK0
                    finalNodeStatus = NodeStatus.FAILED;
                    finalChainStatus = handleNodeError(node.id, error);
                }
            }
        } catch (Exception e) {
            // scheduleNextForNode may fail, for example while evaluating a JS condition.
            finalNodeStatus = NodeStatus.FAILED;
            finalChainStatus = handleNodeError(node.id, e);
        } finally {
            // A node that is still running (for example a Loop) gets no NodeEndEvent.
            if (finalNodeStatus != NodeStatus.RUNNING) {
                NodeStatus nodeStatus = finalNodeStatus == null ? NodeStatus.SUCCEEDED : finalNodeStatus;
                updateNodeStateSafely(node.id, state -> {
                    state.setStatus(nodeStatus);
                    return EnumSet.of(NodeStateField.STATUS);
                });
                notifyEvent(new NodeEndEvent(this, node, prevNodeResult, error));
            }

            if (finalChainStatus != null) {
                // NOTE(review): when the status came from handleNodeError, FAILED has already
                // been stored and announced there, so this publishes a second status event.
                setStatusAndNotifyEvent(finalChainStatus);

                // The chain has finished.
                if (finalChainStatus.isTerminal()) {
                    eventManager.notifyEvent(new ChainEndEvent(this), this);

                    // Finished without success (failed, cancelled, ...):
                    // propagate the status to the parent chains.
                    if (!finalChainStatus.isSuccess()) {
                        propagateStatusToParents(finalChainStatus);
                    }
                }
            }
        }
    }

    /**
     * Walks up the parent chain links starting at this chain and stores the given status on
     * every ancestor, publishing a status change event for each.
     *
     * @param status the status to store on the ancestors
     */
    private void propagateStatusToParents(ChainStatus status) {
        ChainState currentState = getState();
        while (currentState != null && StringUtil.hasText(currentState.getParentInstanceId())) {
            String parentInstanceId = currentState.getParentInstanceId();
            // A single update: it both stores the status and captures the real previous
            // status for the event.
            setStatusAndNotifyEvent(parentInstanceId, status);
            currentState = getState(parentInstanceId);
        }
    }

    /**
     * Decides what runs after a node: either its outward nodes or, for loop-enabled nodes,
     * another iteration of the node itself.
     *
     * @param node      the node that just finished
     * @param result    the result of the node
     * @param byEdigeId the id of the edge that triggered the node
     */
    private void scheduleNextForNode(Node node, Map<String, Object> result, String byEdigeId) {
        // Nodes without loop support go straight to their outward nodes.
        if (!node.isLoopEnable()) {
            scheduleOutwardNodes(node, result);
            return;
        }

        NodeState nodeState = getNodeState(node.getId());
        // Maximum loop count reached: leave the loop.
        if (node.getMaxLoopCount() > 0 && nodeState.getLoopCount() >= node.getMaxLoopCount()) {
            scheduleOutwardNodes(node, result);
            return;
        }

        // Break condition satisfied: leave the loop.
        NodeCondition breakCondition = node.getLoopBreakCondition();
        if (breakCondition != null && breakCondition.check(this, nodeState, result)) {
            scheduleOutwardNodes(node, result);
            return;
        }

        // Otherwise count the iteration and schedule the node again.
        updateNodeStateSafely(node.getId(), s -> {
            s.setLoopCount(s.getLoopCount() + 1);
            return EnumSet.of(NodeStateField.LOOP_COUNT);
        });

        scheduleNode(node, byEdigeId, TriggerType.LOOP, node.getLoopIntervalMs());
    }


    /**
     * Schedules the targets of the outward edges of a node whose edge condition passes, or
     * falls back to the parent node when there is nothing to schedule.
     *
     * @param node   the node that just finished
     * @param result the result of the node, passed to the edge conditions
     * @throws ChainException if an edge points to an unknown node
     */
    private void scheduleOutwardNodes(Node node, Map<String, Object> result) {
        List<Edge> edges = definition.getOutwardEdge(node.getId());
        if (!CollectionUtil.hasItems(edges)) {
            // No outward edge: return to the parent node (used by Loop and similar nodes).
            scheduleParentNode(node);
            return;
        }

        // NOTE(review): this flag reflects only the LAST edge visited (it is reset by every
        // same-parent edge), so the fallback below depends on edge order when same-parent and
        // other-parent edges are mixed. Kept as is; confirm the intended behaviour.
        boolean allNotSameParent = false;
        boolean scheduleSuccess = false;
        for (Edge edge : edges) {
            Node nextNode = definition.getNodeById(edge.getTarget());
            if (nextNode == null) {
                throw new ChainException("Invalid edge target: " + edge.getTarget());
            }

            // Skip edges leading to a node with a different parent, e.g. a Loop node that
            // only has child nodes and no successor.
            if (!isSameParent(node, nextNode)) {
                allNotSameParent = true;
                continue;
            }

            allNotSameParent = false;
            EdgeCondition edgeCondition = edge.getCondition();
            if (edgeCondition == null) {
                scheduleNode(nextNode, edge.getId(), TriggerType.NEXT, 0L);
                scheduleSuccess = true;
                continue;
            }

            if (edgeCondition.check(this, edge, result)) {
                updateStateSafely(state -> {
                    // Only write when the id was actually present.
                    if (state.removeUncheckedEdgeId(edge.getId())) {
                        return EnumSet.of(ChainStateField.UNCHECKED_EDGE_IDS);
                    } else {
                        return null;
                    }
                });
                scheduleNode(nextNode, edge.getId(), TriggerType.NEXT, 0L);
                scheduleSuccess = true;
            } else {
                updateStateSafely(state -> {
                    state.addUncheckedEdgeId(edge.getId());
                    return EnumSet.of(ChainStateField.UNCHECKED_EDGE_IDS);
                });
                eventManager.notifyEvent(new EdgeConditionCheckFailedEvent(this, edge, node, result), this);
            }
        }

        // Nothing was scheduled and the edges lead outside the parent: return to the parent
        // node (nested loops, e.g. the first node inside a Loop is itself a Loop).
        if (allNotSameParent && !scheduleSuccess) {
            scheduleParentNode(node);
        }
    }

    /**
     * Schedules the parent of a node with a {@link TriggerType#PARENT} trigger.
     * Does nothing when the node has no parent id.
     *
     * @param node the child node
     * @throws ChainException if the parent id does not resolve to a node of the definition
     */
    private void scheduleParentNode(Node node) {
        String parentId = node.getParentId();
        if (!StringUtil.hasText(parentId)) {
            return;
        }
        Node parent = definition.getNodeById(parentId);
        if (parent == null) {
            // Fail with a descriptive error instead of a NullPointerException in scheduleNode.
            throw new ChainException("Invalid parent node: " + parentId);
        }
        scheduleNode(parent, null, TriggerType.PARENT, 0L);
    }

    /**
     * Tells whether two nodes have the same parent node.
     *
     * @param node the first node
     * @param next the second node
     * @return {@code true} when both parent ids are blank or when both are equal
     */
    private boolean isSameParent(Node node, Node next) {
        // Two nodes without a parent id are considered siblings.
        if (StringUtil.noText(node.getParentId()) && StringUtil.noText(next.getParentId())) {
            return true;
        }

        return node.getParentId() != null && node.getParentId().equals(next.getParentId());
    }


    /**
     * Builds a trigger for a node and hands it to the trigger scheduler. When an edge id is
     * given, the edge is first recorded as triggered and an {@link EdgeTriggerEvent} is published.
     *
     * @param node    the node to schedule
     * @param edgeId  the edge that leads to the node; may be {@code null}
     * @param type    the trigger type
     * @param delayMs delay in milliseconds added to the current time to form the trigger time
     */
    public void scheduleNode(Node node, String edgeId, TriggerType type, long delayMs) {
        Trigger trigger = new Trigger();
        trigger.setStateInstanceId(stateInstanceId);
        trigger.setEdgeId(edgeId);
        trigger.setNodeId(node.getId());
        trigger.setType(type);
        trigger.setTriggerAt(System.currentTimeMillis() + delayMs);

        if (edgeId != null) {
            updateStateSafely(state -> {
                state.addTriggerEdgeId(edgeId);
                return EnumSet.of(ChainStateField.TRIGGER_EDGE_IDS);
            });

            eventManager.notifyEvent(new EdgeTriggerEvent(this, trigger), this);
        }

        getTriggerScheduler().schedule(trigger);
    }


    /**
     * Marks a node and the chain as failed, stores the error on both and notifies listeners.
     *
     * @param nodeId    the failed node
     * @param throwable the cause of the failure
     * @return always {@link ChainStatus#FAILED}
     */
    private ChainStatus handleNodeError(String nodeId, Throwable throwable) {
        updateNodeStateSafely(nodeId, s -> {
            s.setStatus(NodeStatus.FAILED);
            s.setError(new ExceptionSummary(throwable));
            return EnumSet.of(NodeStateField.ERROR, NodeStateField.STATUS);
        });

        updateStateSafely(state -> {
            state.setError(new ExceptionSummary(throwable));
            return EnumSet.of(ChainStateField.ERROR);
        });

        setStatusAndNotifyEvent(ChainStatus.FAILED);
        eventManager.notifyChainError(throwable, this);
        return ChainStatus.FAILED;
    }

    /**
     * Sets the chain status to {@link ChainStatus#SUSPEND}.
     */
    public void suspend() {
        setStatusAndNotifyEvent(ChainStatus.SUSPEND);
    }

    /**
     * Records the node as suspended and sets the chain status to {@link ChainStatus#SUSPEND}.
     *
     * @param node the node to remember for {@link #resume(Map)}
     */
    public void suspend(Node node) {
        updateStateSafely(state -> {
            state.addSuspendNodeId(node.getId());
            return EnumSet.of(ChainStateField.SUSPEND_NODE_IDS);
        });
        setStatusAndNotifyEvent(ChainStatus.SUSPEND);
    }


    /**
     * Resumes a suspended chain: merges the variables into the chain memory, sets the status to
     * running, clears the suspend bookkeeping and re-schedules every suspended node.
     *
     * @param variables variables merged into the chain memory; may be {@code null}
     * @throws ChainException if a suspended node id is not part of the definition
     */
    public void resume(Map<String, Object> variables) {
        ChainState newState = updateStateSafely(state -> {
            if (variables != null) {
                state.getMemory().putAll(variables);
                return EnumSet.of(ChainStateField.MEMORY);
            } else {
                return null;
            }
        });

        notifyEvent(new ChainResumeEvent(this, variables));
        setStatusAndNotifyEvent(ChainStatus.RUNNING);

        Set<String> suspendNodeIds = newState.getSuspendNodeIds();
        if (suspendNodeIds != null && !suspendNodeIds.isEmpty()) {
            // Clear the suspend data so a later suspend does not carry stale entries.
            updateStateSafely(state -> {
                state.setSuspendNodeIds(null);
                state.setSuspendForParameters(null);
                return EnumSet.of(ChainStateField.SUSPEND_NODE_IDS, ChainStateField.SUSPEND_FOR_PARAMETERS);
            });

            for (String id : suspendNodeIds) {
                Node node = definition.getNodeById(id);
                if (node == null) {
                    throw new ChainException("Node not found: " + id);
                }
                scheduleNode(node, null, TriggerType.RESUME, 0L);
            }
        }
    }

    /**
     * Resumes a suspended chain without adding variables.
     */
    public void resume() {
        resume(Collections.emptyMap());
    }

    /**
     * Forwards an output produced by a node to the event manager.
     *
     * @param node     the node that produced the output
     * @param response the output payload
     */
    public void output(Node node, Object response) {
        eventManager.notifyOutput(this, node, response);
    }


    public EventManager getEventManager() {
        return eventManager;
    }

    public void setEventManager(EventManager eventManager) {
        this.eventManager = eventManager;
    }

    public ChainStateRepository getChainStateRepository() {
        return chainStateRepository;
    }

    public void setChainStateRepository(ChainStateRepository chainStateRepository) {
        this.chainStateRepository = chainStateRepository;
    }

    public ChainDefinition getDefinition() {
        return definition;
    }

    /**
     * Returns the trigger scheduler, falling back to {@code ChainRuntime.triggerScheduler()}
     * (and caching it) when none has been set.
     *
     * @return the scheduler used by {@link #scheduleNode(Node, String, TriggerType, long)}
     */
    public TriggerScheduler getTriggerScheduler() {
        if (this.triggerScheduler == null) {
            this.triggerScheduler = ChainRuntime.triggerScheduler();
        }
        return triggerScheduler;
    }

    public void setTriggerScheduler(TriggerScheduler triggerScheduler) {
        this.triggerScheduler = triggerScheduler;
    }

    public NodeStateRepository getNodeStateRepository() {
        return nodeStateRepository;
    }

    public void setNodeStateRepository(NodeStateRepository nodeStateRepository) {
        this.nodeStateRepository = nodeStateRepository;
    }

    public String getStateInstanceId() {
        return stateInstanceId;
    }

    /**
     * Loads the current state of this chain from the repository.
     *
     * @return whatever the chain state repository returns for this chain's instance id
     */
    public ChainState getState() {
        return chainStateRepository.load(stateInstanceId);
    }

    /**
     * Loads the current state of the given instance from the repository.
     *
     * @param stateInstanceId the state instance id
     * @return whatever the chain state repository returns for the id
     */
    public ChainState getState(String stateInstanceId) {
        return chainStateRepository.load(stateInstanceId);
    }

    public void setStateInstanceId(String stateInstanceId) {
        this.stateInstanceId = stateInstanceId;
    }
}