001/**
002 * Copyright (c) 2025-2026, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.gnu.org/licenses/lgpl-3.0.txt
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package dev.tinyflow.core.chain.runtime;
017
018import dev.tinyflow.core.chain.*;
019import dev.tinyflow.core.chain.event.ChainStatusChangeEvent;
020import dev.tinyflow.core.chain.listener.ChainErrorListener;
021import dev.tinyflow.core.chain.listener.ChainEventListener;
022import dev.tinyflow.core.chain.listener.ChainOutputListener;
023import dev.tinyflow.core.chain.listener.NodeErrorListener;
024import dev.tinyflow.core.chain.repository.ChainDefinitionRepository;
025import dev.tinyflow.core.chain.repository.ChainStateField;
026import dev.tinyflow.core.chain.repository.ChainStateRepository;
027import dev.tinyflow.core.chain.repository.NodeStateRepository;
028
029import java.util.*;
030import java.util.concurrent.CompletableFuture;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.TimeoutException;
034
035/**
036 * TinyFlow 最新 ChainExecutor
037 * <p>
038 * 说明:
039 * * 负责触发 Chain 执行 / 恢复
040 * * 不持有长时间运行的 Chain 实例
041 * * 支持 async-only 架构
042 */
043public class ChainExecutor {
044
045    private final ChainDefinitionRepository definitionRepository;
046    private final ChainStateRepository chainStateRepository;
047    private final NodeStateRepository nodeStateRepository;
048    private final TriggerScheduler triggerScheduler;
049    private final EventManager eventManager = new EventManager();
050
051    public ChainExecutor(ChainDefinitionRepository definitionRepository
052            , ChainStateRepository chainStateRepository
053            , NodeStateRepository nodeStateRepository
054    ) {
055        this.definitionRepository = definitionRepository;
056        this.chainStateRepository = chainStateRepository;
057        this.nodeStateRepository = nodeStateRepository;
058        this.triggerScheduler = ChainRuntime.triggerScheduler();
059        this.triggerScheduler.registerConsumer(this::accept);
060    }
061
062
063    public ChainExecutor(ChainDefinitionRepository definitionRepository
064            , ChainStateRepository chainStateRepository
065            , NodeStateRepository nodeStateRepository
066            , TriggerScheduler triggerScheduler) {
067        this.definitionRepository = definitionRepository;
068        this.chainStateRepository = chainStateRepository;
069        this.nodeStateRepository = nodeStateRepository;
070        this.triggerScheduler = triggerScheduler;
071        this.triggerScheduler.registerConsumer(this::accept);
072    }
073
074
075    public Map<String, Object> execute(String definitionId, Map<String, Object> variables) {
076        return execute(definitionId, variables, Long.MAX_VALUE, TimeUnit.SECONDS);
077    }
078
079
080    public Map<String, Object> execute(String definitionId, Map<String, Object> variables, long timeout, TimeUnit unit) {
081        Chain chain = createChain(definitionId);
082        String stateInstanceId = chain.getStateInstanceId();
083        CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
084
085        ChainEventListener listener = (event, c) -> {
086            if (event instanceof ChainStatusChangeEvent) {
087                if (((ChainStatusChangeEvent) event).getStatus().isTerminal()
088                        && c.getStateInstanceId().equals(stateInstanceId)) {
089                    ChainState state = chainStateRepository.load(stateInstanceId);
090                    Map<String, Object> execResult = state.getExecuteResult();
091                    future.complete(execResult != null ? execResult : Collections.emptyMap());
092                }
093            }
094        };
095
096        ChainErrorListener errorListener = (error, c) -> {
097            if (c.getStateInstanceId().equals(stateInstanceId)) {
098                future.completeExceptionally(error);
099            }
100        };
101
102        try {
103            this.addEventListener(listener);
104            this.addErrorListener(errorListener);
105            chain.start(variables);
106            Map<String, Object> result = future.get(timeout, unit);
107            clearDefaultStates(result);
108            return result;
109        } catch (TimeoutException e) {
110            future.cancel(true);
111            throw new RuntimeException("Execution timed out", e);
112        } catch (InterruptedException e) {
113            Thread.currentThread().interrupt();
114            future.cancel(true);
115            throw new RuntimeException("Execution interrupted", e);
116        } catch (Throwable e) {
117            future.cancel(true);
118            throw new RuntimeException("Execution failed", e.getCause());
119        } finally {
120            this.removeEventListener(listener);
121            this.removeErrorListener(errorListener);
122        }
123    }
124
125    /**
126     * 清理默认状态
127     *
128     * @param result 执行结果
129     */
130    public void clearDefaultStates(Map<String, Object> result) {
131        if (result == null || result.isEmpty()) {
132            return;
133        }
134        result.remove(ChainConsts.SCHEDULE_NEXT_NODE_DISABLED_KEY);
135        result.remove(ChainConsts.NODE_STATE_STATUS_KEY);
136        result.remove(ChainConsts.CHAIN_STATE_STATUS_KEY);
137        result.remove(ChainConsts.CHAIN_STATE_MESSAGE_KEY);
138    }
139
140    public String executeAsync(String definitionId, Map<String, Object> variables) {
141        Chain chain = createChain(definitionId);
142        chain.start(variables);
143        return chain.getStateInstanceId();
144    }
145
146
147    /**
148     * 执行指定节点的业务逻辑
149     *
150     * @param definitionId 流程定义ID,用于标识哪个流程定义
151     * @param nodeId       节点ID,用于标识要执行的具体节点
152     * @param variables    执行上下文变量集合,包含节点执行所需的参数和数据
153     * @return 执行结果映射表,包含节点执行后的输出数据
154     */
155    public Map<String, Object> executeNode(String definitionId, String nodeId, Map<String, Object> variables) {
156        ChainDefinition chainDefinitionById = definitionRepository.getChainDefinitionById(definitionId);
157        Node node = chainDefinitionById.getNodeById(nodeId);
158        Chain temp = createChain(definitionId);
159        if (variables != null && !variables.isEmpty()) {
160            temp.updateStateSafely(s -> {
161                s.getMemory().putAll(variables);
162                return EnumSet.of(ChainStateField.MEMORY);
163            });
164        }
165        return node.execute(temp);
166    }
167
168
169    /**
170     * 获取指定节点的参数列表
171     *
172     * @param definitionId 链定义ID,用于定位具体的链定义
173     * @param nodeId       节点ID,用于在链定义中定位具体节点
174     * @return 返回指定节点的参数列表
175     */
176    public List<Parameter> getNodeParameters(String definitionId, String nodeId) {
177        ChainDefinition chainDefinitionById = definitionRepository.getChainDefinitionById(definitionId);
178        Node node = chainDefinitionById.getNodeById(nodeId);
179        return node.getParameters();
180    }
181
182
183    public void resumeAsync(String stateInstanceId) {
184        this.resumeAsync(stateInstanceId, Collections.emptyMap());
185    }
186
187
188    public void resumeAsync(String stateInstanceId, Map<String, Object> variables) {
189        ChainState state = chainStateRepository.load(stateInstanceId);
190        if (state == null) {
191            return;
192        }
193
194        ChainDefinition definition = definitionRepository.getChainDefinitionById(state.getChainDefinitionId());
195        if (definition == null) {
196            return;
197        }
198
199        Chain chain = new Chain(definition, state.getInstanceId());
200        chain.setTriggerScheduler(triggerScheduler);
201        chain.setChainStateRepository(chainStateRepository);
202        chain.setNodeStateRepository(nodeStateRepository);
203        chain.setEventManager(eventManager);
204
205        chain.resume(variables);
206    }
207
208
209    private Chain createChain(String definitionId) {
210        ChainDefinition definition = definitionRepository.getChainDefinitionById(definitionId);
211        if (definition == null) {
212            throw new RuntimeException("Chain definition not found");
213        }
214
215        String stateInstanceId = UUID.randomUUID().toString();
216        Chain chain = new Chain(definition, stateInstanceId);
217        chain.setTriggerScheduler(triggerScheduler);
218        chain.setChainStateRepository(chainStateRepository);
219        chain.setNodeStateRepository(nodeStateRepository);
220        chain.setEventManager(eventManager);
221
222        return chain;
223    }
224
225
226    private void accept(Trigger trigger, ExecutorService worker) {
227        ChainState state = chainStateRepository.load(trigger.getStateInstanceId());
228        if (state == null) {
229            throw new ChainException("Chain state not found");
230        }
231
232
233        ChainDefinition definition = definitionRepository.getChainDefinitionById(state.getChainDefinitionId());
234        if (definition == null) {
235            throw new ChainException("Chain definition not found");
236        }
237
238        Chain chain = new Chain(definition, trigger.getStateInstanceId());
239        chain.setTriggerScheduler(triggerScheduler);
240        chain.setChainStateRepository(chainStateRepository);
241        chain.setNodeStateRepository(nodeStateRepository);
242        chain.setEventManager(eventManager);
243
244        String nodeId = trigger.getNodeId();
245        if (nodeId == null) {
246            throw new ChainException("Node ID not found in trigger.");
247        }
248
249        Node node = definition.getNodeById(nodeId);
250        if (node == null) {
251            throw new ChainException("Node not found in definition(id: " + definition.getId() + ")");
252        }
253
254        chain.executeNode(node, trigger);
255    }
256
257
258    public synchronized void addEventListener(Class<? extends Event> eventClass, ChainEventListener listener) {
259        eventManager.addEventListener(eventClass, listener);
260    }
261
262    public synchronized void addEventListener(ChainEventListener listener) {
263        eventManager.addEventListener(listener);
264    }
265
266    public synchronized void removeEventListener(ChainEventListener listener) {
267        eventManager.removeEventListener(listener);
268    }
269
270    public synchronized void removeEventListener(Class<? extends Event> eventClass, ChainEventListener listener) {
271        eventManager.removeEventListener(eventClass, listener);
272    }
273
274    public synchronized void addErrorListener(ChainErrorListener listener) {
275        eventManager.addChainErrorListener(listener);
276    }
277
278    public synchronized void removeErrorListener(ChainErrorListener listener) {
279        eventManager.removeChainErrorListener(listener);
280    }
281
282    public synchronized void addNodeErrorListener(NodeErrorListener listener) {
283        eventManager.addNodeErrorListener(listener);
284    }
285
286    public synchronized void removeNodeErrorListener(NodeErrorListener listener) {
287        eventManager.removeNodeErrorListener(listener);
288    }
289
290    public void addOutputListener(ChainOutputListener outputListener) {
291        eventManager.addOutputListener(outputListener);
292    }
293
294    public ChainDefinitionRepository getDefinitionRepository() {
295        return definitionRepository;
296    }
297
298    public ChainStateRepository getChainStateRepository() {
299        return chainStateRepository;
300    }
301
302    public NodeStateRepository getNodeStateRepository() {
303        return nodeStateRepository;
304    }
305
306    public TriggerScheduler getTriggerScheduler() {
307        return triggerScheduler;
308    }
309
310    public EventManager getEventManager() {
311        return eventManager;
312    }
313}