001/**
002 * Copyright (c) 2025-2026, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.gnu.org/licenses/lgpl-3.0.txt
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package dev.tinyflow.core.chain;
017
018import java.io.Serializable;
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.atomic.AtomicInteger;
023import java.util.stream.Collectors;
024
025public class NodeState implements Serializable {
026
027    private String nodeId;
028    private String chainInstanceId;
029
030    protected ConcurrentHashMap<String, Object> memory = new ConcurrentHashMap<>();
031    protected NodeStatus status = NodeStatus.READY;
032
033    private int retryCount = 0;
034    private int loopCount = 0;
035
036    private AtomicInteger triggerCount = new AtomicInteger(0);
037    private List<String> triggerEdgeIds = new ArrayList<>();
038
039    private AtomicInteger executeCount = new AtomicInteger(0);
040    private List<String> executeEdgeIds = new ArrayList<>();
041
042    ExceptionSummary error;
043
044    private long version;
045
046    public NodeState() {
047    }
048
049    public String getNodeId() {
050        return nodeId;
051    }
052
053    public void setNodeId(String nodeId) {
054        this.nodeId = nodeId;
055    }
056
057    public String getChainInstanceId() {
058        return chainInstanceId;
059    }
060
061    public void setChainInstanceId(String chainInstanceId) {
062        this.chainInstanceId = chainInstanceId;
063    }
064
065    public ConcurrentHashMap<String, Object> getMemory() {
066        return memory;
067    }
068
069    public void setMemory(ConcurrentHashMap<String, Object> memory) {
070        this.memory = memory;
071    }
072
073    public void addMemory(String key, Object value) {
074        memory.put(key, value);
075    }
076
077    public <T> T getMemoryOrDefault(String key, T defaultValue) {
078        Object value = memory.get(key);
079        if (value == null) {
080            return defaultValue;
081        }
082        //noinspection unchecked
083        return (T) value;
084    }
085
086    public NodeStatus getStatus() {
087        return status;
088    }
089
090    public int getRetryCount() {
091        return retryCount;
092    }
093
094    public void setRetryCount(int retryCount) {
095        this.retryCount = retryCount;
096    }
097
098    public int getLoopCount() {
099        return loopCount;
100    }
101
102    public void setLoopCount(int loopCount) {
103        this.loopCount = loopCount;
104    }
105
106    public AtomicInteger getTriggerCount() {
107        return triggerCount;
108    }
109
110    public void setTriggerCount(AtomicInteger triggerCount) {
111        this.triggerCount = triggerCount;
112    }
113
114    public List<String> getTriggerEdgeIds() {
115        return triggerEdgeIds;
116    }
117
118    public void setTriggerEdgeIds(List<String> triggerEdgeIds) {
119        this.triggerEdgeIds = triggerEdgeIds;
120    }
121
122    public AtomicInteger getExecuteCount() {
123        return executeCount;
124    }
125
126    public void setExecuteCount(AtomicInteger executeCount) {
127        this.executeCount = executeCount;
128    }
129
130    public List<String> getExecuteEdgeIds() {
131        return executeEdgeIds;
132    }
133
134    public void setExecuteEdgeIds(List<String> executeEdgeIds) {
135        this.executeEdgeIds = executeEdgeIds;
136    }
137
138    public ExceptionSummary getError() {
139        return error;
140    }
141
142    public void setError(ExceptionSummary error) {
143        this.error = error;
144    }
145
146    public long getVersion() {
147        return version;
148    }
149
150    public void setVersion(long version) {
151        this.version = version;
152    }
153
154    public boolean isUpstreamFullyExecuted() {
155        ChainDefinition definition = Chain.currentChain().getDefinition();
156        List<Edge> inwardEdges = definition.getInwardEdge(nodeId);
157        if (inwardEdges == null || inwardEdges.isEmpty()) {
158            return true;
159        }
160
161        List<String> shouldBeTriggerIds = inwardEdges.stream().map(Edge::getId).collect(Collectors.toList());
162        List<String> triggerEdgeIds = this.triggerEdgeIds;
163        return triggerEdgeIds.size() >= shouldBeTriggerIds.size()
164                && shouldBeTriggerIds.parallelStream().allMatch(triggerEdgeIds::contains);
165    }
166
167    public void recordTrigger(String fromEdgeId) {
168        triggerCount.incrementAndGet();
169        if (fromEdgeId == null) {
170            fromEdgeId = "none";
171        }
172        triggerEdgeIds.add(fromEdgeId);
173    }
174
175    public void recordExecute(String fromEdgeId) {
176        executeCount.incrementAndGet();
177        if (fromEdgeId == null) {
178            fromEdgeId = "none";
179        }
180        executeEdgeIds.add(fromEdgeId);
181    }
182
183    public void setStatus(NodeStatus status) {
184        this.status = status;
185    }
186
187    public String getLastExecuteEdgeId() {
188        if (!executeEdgeIds.isEmpty()) {
189            return executeEdgeIds.get(executeEdgeIds.size() - 1);
190        }
191        return null;
192    }
193}