001/** 002 * Copyright (c) 2025-2026, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.gnu.org/licenses/lgpl-3.0.txt 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package dev.tinyflow.core.chain; 017 018import java.io.Serializable; 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.ConcurrentHashMap; 022import java.util.concurrent.atomic.AtomicInteger; 023import java.util.stream.Collectors; 024 025public class NodeState implements Serializable { 026 027 private String nodeId; 028 private String chainInstanceId; 029 030 protected ConcurrentHashMap<String, Object> memory = new ConcurrentHashMap<>(); 031 protected NodeStatus status = NodeStatus.READY; 032 033 private int retryCount = 0; 034 private int loopCount = 0; 035 036 private AtomicInteger triggerCount = new AtomicInteger(0); 037 private List<String> triggerEdgeIds = new ArrayList<>(); 038 039 private AtomicInteger executeCount = new AtomicInteger(0); 040 private List<String> executeEdgeIds = new ArrayList<>(); 041 042 ExceptionSummary error; 043 044 private long version; 045 046 public NodeState() { 047 } 048 049 public String getNodeId() { 050 return nodeId; 051 } 052 053 public void setNodeId(String nodeId) { 054 this.nodeId = nodeId; 055 } 056 057 public String getChainInstanceId() { 058 return chainInstanceId; 059 } 060 061 public void setChainInstanceId(String chainInstanceId) { 062 this.chainInstanceId = chainInstanceId; 063 } 064 065 public ConcurrentHashMap<String, Object> getMemory() { 066 return memory; 067 } 068 069 public void setMemory(ConcurrentHashMap<String, Object> memory) { 070 this.memory = memory; 071 } 072 073 public void addMemory(String key, Object value) { 074 memory.put(key, value); 075 } 076 077 public <T> T getMemoryOrDefault(String key, T defaultValue) { 078 Object value = memory.get(key); 079 if (value == null) { 080 return defaultValue; 081 } 082 //noinspection unchecked 083 return (T) value; 084 } 085 086 public NodeStatus getStatus() { 087 return status; 088 } 089 090 public int getRetryCount() { 091 return retryCount; 092 } 093 094 public void setRetryCount(int retryCount) { 095 this.retryCount = retryCount; 096 } 097 098 public int getLoopCount() { 099 return loopCount; 100 } 101 102 public void setLoopCount(int loopCount) { 103 this.loopCount = loopCount; 104 } 105 106 public AtomicInteger getTriggerCount() { 107 return triggerCount; 108 } 109 110 public void setTriggerCount(AtomicInteger triggerCount) { 111 this.triggerCount = triggerCount; 112 } 113 114 public List<String> getTriggerEdgeIds() { 115 return triggerEdgeIds; 116 } 117 118 public void setTriggerEdgeIds(List<String> triggerEdgeIds) { 119 this.triggerEdgeIds = triggerEdgeIds; 120 } 121 122 public AtomicInteger getExecuteCount() { 123 return executeCount; 124 } 125 126 public void setExecuteCount(AtomicInteger executeCount) { 127 this.executeCount = executeCount; 128 } 129 130 public List<String> getExecuteEdgeIds() { 131 return executeEdgeIds; 132 } 133 134 public void setExecuteEdgeIds(List<String> executeEdgeIds) { 135 this.executeEdgeIds = executeEdgeIds; 136 } 137 138 public ExceptionSummary getError() { 139 return error; 140 } 141 142 public void setError(ExceptionSummary error) { 143 this.error = error; 144 } 145 146 public long getVersion() { 147 return version; 148 } 149 150 public void setVersion(long version) { 151 this.version = version; 152 } 153 154 public boolean isUpstreamFullyExecuted() { 155 ChainDefinition definition = Chain.currentChain().getDefinition(); 156 List<Edge> inwardEdges = definition.getInwardEdge(nodeId); 157 if (inwardEdges == null || inwardEdges.isEmpty()) { 158 return true; 159 } 160 161 List<String> shouldBeTriggerIds = inwardEdges.stream().map(Edge::getId).collect(Collectors.toList()); 162 List<String> triggerEdgeIds = this.triggerEdgeIds; 163 return triggerEdgeIds.size() >= shouldBeTriggerIds.size() 164 && shouldBeTriggerIds.parallelStream().allMatch(triggerEdgeIds::contains); 165 } 166 167 public void recordTrigger(String fromEdgeId) { 168 triggerCount.incrementAndGet(); 169 if (fromEdgeId == null) { 170 fromEdgeId = "none"; 171 } 172 triggerEdgeIds.add(fromEdgeId); 173 } 174 175 public void recordExecute(String fromEdgeId) { 176 executeCount.incrementAndGet(); 177 if (fromEdgeId == null) { 178 fromEdgeId = "none"; 179 } 180 executeEdgeIds.add(fromEdgeId); 181 } 182 183 public void setStatus(NodeStatus status) { 184 this.status = status; 185 } 186 187 public String getLastExecuteEdgeId() { 188 if (!executeEdgeIds.isEmpty()) { 189 return executeEdgeIds.get(executeEdgeIds.size() - 1); 190 } 191 return null; 192 } 193}