001/** 002 * Copyright (c) 2025-2026, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.gnu.org/licenses/lgpl-3.0.txt 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package dev.tinyflow.core.chain.runtime; 017 018import dev.tinyflow.core.chain.*; 019import dev.tinyflow.core.chain.event.ChainStatusChangeEvent; 020import dev.tinyflow.core.chain.listener.ChainErrorListener; 021import dev.tinyflow.core.chain.listener.ChainEventListener; 022import dev.tinyflow.core.chain.listener.ChainOutputListener; 023import dev.tinyflow.core.chain.listener.NodeErrorListener; 024import dev.tinyflow.core.chain.repository.ChainDefinitionRepository; 025import dev.tinyflow.core.chain.repository.ChainStateField; 026import dev.tinyflow.core.chain.repository.ChainStateRepository; 027import dev.tinyflow.core.chain.repository.NodeStateRepository; 028 029import java.util.*; 030import java.util.concurrent.CompletableFuture; 031import java.util.concurrent.ExecutorService; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.TimeoutException; 034 035/** 036 * TinyFlow 最新 ChainExecutor 037 * <p> 038 * 说明: 039 * * 负责触发 Chain 执行 / 恢复 040 * * 不持有长时间运行的 Chain 实例 041 * * 支持 async-only 架构 042 */ 043public class ChainExecutor { 044 045 private final ChainDefinitionRepository definitionRepository; 046 private final ChainStateRepository chainStateRepository; 047 private final NodeStateRepository nodeStateRepository; 048 private final TriggerScheduler triggerScheduler; 049 private final EventManager eventManager = new EventManager(); 050 051 public ChainExecutor(ChainDefinitionRepository definitionRepository 052 , ChainStateRepository chainStateRepository 053 , NodeStateRepository nodeStateRepository 054 ) { 055 this.definitionRepository = definitionRepository; 056 this.chainStateRepository = chainStateRepository; 057 this.nodeStateRepository = nodeStateRepository; 058 this.triggerScheduler = ChainRuntime.triggerScheduler(); 059 this.triggerScheduler.registerConsumer(this::accept); 060 } 061 062 063 public ChainExecutor(ChainDefinitionRepository definitionRepository 064 , ChainStateRepository chainStateRepository 065 , NodeStateRepository nodeStateRepository 066 , TriggerScheduler triggerScheduler) { 067 this.definitionRepository = definitionRepository; 068 this.chainStateRepository = chainStateRepository; 069 this.nodeStateRepository = nodeStateRepository; 070 this.triggerScheduler = triggerScheduler; 071 this.triggerScheduler.registerConsumer(this::accept); 072 } 073 074 075 public Map<String, Object> execute(String definitionId, Map<String, Object> variables) { 076 return execute(definitionId, variables, Long.MAX_VALUE, TimeUnit.SECONDS); 077 } 078 079 080 public Map<String, Object> execute(String definitionId, Map<String, Object> variables, long timeout, TimeUnit unit) { 081 Chain chain = createChain(definitionId); 082 String stateInstanceId = chain.getStateInstanceId(); 083 CompletableFuture<Map<String, Object>> future = new CompletableFuture<>(); 084 085 ChainEventListener listener = (event, c) -> { 086 if (event instanceof ChainStatusChangeEvent) { 087 if (((ChainStatusChangeEvent) event).getStatus().isTerminal() 088 && c.getStateInstanceId().equals(stateInstanceId)) { 089 ChainState state = chainStateRepository.load(stateInstanceId); 090 Map<String, Object> execResult = state.getExecuteResult(); 091 future.complete(execResult != null ? execResult : Collections.emptyMap()); 092 } 093 } 094 }; 095 096 ChainErrorListener errorListener = (error, c) -> { 097 if (c.getStateInstanceId().equals(stateInstanceId)) { 098 future.completeExceptionally(error); 099 } 100 }; 101 102 try { 103 this.addEventListener(listener); 104 this.addErrorListener(errorListener); 105 chain.start(variables); 106 Map<String, Object> result = future.get(timeout, unit); 107 clearDefaultStates(result); 108 return result; 109 } catch (TimeoutException e) { 110 future.cancel(true); 111 throw new RuntimeException("Execution timed out", e); 112 } catch (InterruptedException e) { 113 Thread.currentThread().interrupt(); 114 future.cancel(true); 115 throw new RuntimeException("Execution interrupted", e); 116 } catch (Throwable e) { 117 future.cancel(true); 118 throw new RuntimeException("Execution failed", e.getCause()); 119 } finally { 120 this.removeEventListener(listener); 121 this.removeErrorListener(errorListener); 122 } 123 } 124 125 /** 126 * 清理默认状态 127 * 128 * @param result 执行结果 129 */ 130 public void clearDefaultStates(Map<String, Object> result) { 131 if (result == null || result.isEmpty()) { 132 return; 133 } 134 result.remove(ChainConsts.SCHEDULE_NEXT_NODE_DISABLED_KEY); 135 result.remove(ChainConsts.NODE_STATE_STATUS_KEY); 136 result.remove(ChainConsts.CHAIN_STATE_STATUS_KEY); 137 result.remove(ChainConsts.CHAIN_STATE_MESSAGE_KEY); 138 } 139 140 public String executeAsync(String definitionId, Map<String, Object> variables) { 141 Chain chain = createChain(definitionId); 142 chain.start(variables); 143 return chain.getStateInstanceId(); 144 } 145 146 147 /** 148 * 执行指定节点的业务逻辑 149 * 150 * @param definitionId 流程定义ID,用于标识哪个流程定义 151 * @param nodeId 节点ID,用于标识要执行的具体节点 152 * @param variables 执行上下文变量集合,包含节点执行所需的参数和数据 153 * @return 执行结果映射表,包含节点执行后的输出数据 154 */ 155 public Map<String, Object> executeNode(String definitionId, String nodeId, Map<String, Object> variables) { 156 ChainDefinition chainDefinitionById = definitionRepository.getChainDefinitionById(definitionId); 157 Node node = chainDefinitionById.getNodeById(nodeId); 158 Chain temp = createChain(definitionId); 159 if (variables != null && !variables.isEmpty()) { 160 temp.updateStateSafely(s -> { 161 s.getMemory().putAll(variables); 162 return EnumSet.of(ChainStateField.MEMORY); 163 }); 164 } 165 return node.execute(temp); 166 } 167 168 169 /** 170 * 获取指定节点的参数列表 171 * 172 * @param definitionId 链定义ID,用于定位具体的链定义 173 * @param nodeId 节点ID,用于在链定义中定位具体节点 174 * @return 返回指定节点的参数列表 175 */ 176 public List<Parameter> getNodeParameters(String definitionId, String nodeId) { 177 ChainDefinition chainDefinitionById = definitionRepository.getChainDefinitionById(definitionId); 178 Node node = chainDefinitionById.getNodeById(nodeId); 179 return node.getParameters(); 180 } 181 182 183 public void resumeAsync(String stateInstanceId) { 184 this.resumeAsync(stateInstanceId, Collections.emptyMap()); 185 } 186 187 188 public void resumeAsync(String stateInstanceId, Map<String, Object> variables) { 189 ChainState state = chainStateRepository.load(stateInstanceId); 190 if (state == null) { 191 return; 192 } 193 194 ChainDefinition definition = definitionRepository.getChainDefinitionById(state.getChainDefinitionId()); 195 if (definition == null) { 196 return; 197 } 198 199 Chain chain = new Chain(definition, state.getInstanceId()); 200 chain.setTriggerScheduler(triggerScheduler); 201 chain.setChainStateRepository(chainStateRepository); 202 chain.setNodeStateRepository(nodeStateRepository); 203 chain.setEventManager(eventManager); 204 205 chain.resume(variables); 206 } 207 208 209 private Chain createChain(String definitionId) { 210 ChainDefinition definition = definitionRepository.getChainDefinitionById(definitionId); 211 if (definition == null) { 212 throw new RuntimeException("Chain definition not found"); 213 } 214 215 String stateInstanceId = UUID.randomUUID().toString(); 216 Chain chain = new Chain(definition, stateInstanceId); 217 chain.setTriggerScheduler(triggerScheduler); 218 chain.setChainStateRepository(chainStateRepository); 219 chain.setNodeStateRepository(nodeStateRepository); 220 chain.setEventManager(eventManager); 221 222 return chain; 223 } 224 225 226 private void accept(Trigger trigger, ExecutorService worker) { 227 ChainState state = chainStateRepository.load(trigger.getStateInstanceId()); 228 if (state == null) { 229 throw new ChainException("Chain state not found"); 230 } 231 232 233 ChainDefinition definition = definitionRepository.getChainDefinitionById(state.getChainDefinitionId()); 234 if (definition == null) { 235 throw new ChainException("Chain definition not found"); 236 } 237 238 Chain chain = new Chain(definition, trigger.getStateInstanceId()); 239 chain.setTriggerScheduler(triggerScheduler); 240 chain.setChainStateRepository(chainStateRepository); 241 chain.setNodeStateRepository(nodeStateRepository); 242 chain.setEventManager(eventManager); 243 244 String nodeId = trigger.getNodeId(); 245 if (nodeId == null) { 246 throw new ChainException("Node ID not found in trigger."); 247 } 248 249 Node node = definition.getNodeById(nodeId); 250 if (node == null) { 251 throw new ChainException("Node not found in definition(id: " + definition.getId() + ")"); 252 } 253 254 chain.executeNode(node, trigger); 255 } 256 257 258 public synchronized void addEventListener(Class<? extends Event> eventClass, ChainEventListener listener) { 259 eventManager.addEventListener(eventClass, listener); 260 } 261 262 public synchronized void addEventListener(ChainEventListener listener) { 263 eventManager.addEventListener(listener); 264 } 265 266 public synchronized void removeEventListener(ChainEventListener listener) { 267 eventManager.removeEventListener(listener); 268 } 269 270 public synchronized void removeEventListener(Class<? extends Event> eventClass, ChainEventListener listener) { 271 eventManager.removeEventListener(eventClass, listener); 272 } 273 274 public synchronized void addErrorListener(ChainErrorListener listener) { 275 eventManager.addChainErrorListener(listener); 276 } 277 278 public synchronized void removeErrorListener(ChainErrorListener listener) { 279 eventManager.removeChainErrorListener(listener); 280 } 281 282 public synchronized void addNodeErrorListener(NodeErrorListener listener) { 283 eventManager.addNodeErrorListener(listener); 284 } 285 286 public synchronized void removeNodeErrorListener(NodeErrorListener listener) { 287 eventManager.removeNodeErrorListener(listener); 288 } 289 290 public void addOutputListener(ChainOutputListener outputListener) { 291 eventManager.addOutputListener(outputListener); 292 } 293 294 public ChainDefinitionRepository getDefinitionRepository() { 295 return definitionRepository; 296 } 297 298 public ChainStateRepository getChainStateRepository() { 299 return chainStateRepository; 300 } 301 302 public NodeStateRepository getNodeStateRepository() { 303 return nodeStateRepository; 304 } 305 306 public TriggerScheduler getTriggerScheduler() { 307 return triggerScheduler; 308 } 309 310 public EventManager getEventManager() { 311 return eventManager; 312 } 313}