001/** 002 * Copyright (c) 2025-2026, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.gnu.org/licenses/lgpl-3.0.txt 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package dev.tinyflow.core.chain; 017 018import com.alibaba.fastjson.JSON; 019import com.alibaba.fastjson.parser.DefaultJSONParser; 020import com.alibaba.fastjson.parser.Feature; 021import com.alibaba.fastjson.parser.ParserConfig; 022import com.alibaba.fastjson.parser.deserializer.ObjectDeserializer; 023import com.alibaba.fastjson.serializer.JSONSerializer; 024import com.alibaba.fastjson.serializer.ObjectSerializer; 025import com.alibaba.fastjson.serializer.SerializeConfig; 026import com.alibaba.fastjson.serializer.SerializerFeature; 027import dev.tinyflow.core.util.MapUtil; 028import dev.tinyflow.core.util.StringUtil; 029import dev.tinyflow.core.util.TextTemplate; 030 031import java.io.IOException; 032import java.io.Serializable; 033import java.lang.reflect.Type; 034import java.util.*; 035import java.util.concurrent.ConcurrentHashMap; 036import java.util.stream.Collectors; 037 038public class ChainState implements Serializable { 039 040 private String instanceId; 041 private String parentInstanceId; 042 private String chainDefinitionId; 043 private ConcurrentHashMap<String, Object> memory = new ConcurrentHashMap<>(); 044 045 private Map<String, Object> executeResult; 046 private Map<String, Object> environment; 047 048 private List<String> triggerEdgeIds; 049 private List<String> triggerNodeIds; 050 051 private List<String> uncheckedEdgeIds; 052 private List<String> uncheckedNodeIds; 053 054 // 算力消耗定义,积分消耗 055 private long computeCost; 056 private Set<String> suspendNodeIds; 057 private List<Parameter> suspendForParameters; 058 private ChainStatus status; 059 private String message; 060 private ExceptionSummary error; 061 private long version; 062 063 public ChainState() { 064 this.instanceId = UUID.randomUUID().toString(); 065 this.status = ChainStatus.READY; 066 this.computeCost = 0; 067 } 068 069 public String getInstanceId() { 070 return instanceId; 071 } 072 073 public void setInstanceId(String instanceId) { 074 this.instanceId = instanceId; 075 } 076 077 public String getParentInstanceId() { 078 return parentInstanceId; 079 } 080 081 public void setParentInstanceId(String parentInstanceId) { 082 this.parentInstanceId = parentInstanceId; 083 } 084 085 public String getChainDefinitionId() { 086 return chainDefinitionId; 087 } 088 089 public void setChainDefinitionId(String chainDefinitionId) { 090 this.chainDefinitionId = chainDefinitionId; 091 } 092 093 public ConcurrentHashMap<String, Object> getMemory() { 094 return memory; 095 } 096 097 public void setMemory(ConcurrentHashMap<String, Object> memory) { 098 this.memory = memory; 099 } 100 101 public Map<String, Object> getExecuteResult() { 102 return executeResult; 103 } 104 105 public void setExecuteResult(Map<String, Object> executeResult) { 106 this.executeResult = executeResult; 107 } 108 109 public Map<String, Object> getEnvironment() { 110 return environment; 111 } 112 113 public void setEnvironment(Map<String, Object> environment) { 114 this.environment = environment; 115 } 116 117 118 public List<String> getTriggerEdgeIds() { 119 return triggerEdgeIds; 120 } 121 122 public void setTriggerEdgeIds(List<String> triggerEdgeIds) { 123 this.triggerEdgeIds = triggerEdgeIds; 124 } 125 126 public void addTriggerEdgeId(String edgeId) { 127 if (triggerEdgeIds == null) { 128 triggerEdgeIds = new ArrayList<>(); 129 } 130 triggerEdgeIds.add(edgeId); 131 } 132 133 public List<String> getTriggerNodeIds() { 134 return triggerNodeIds; 135 } 136 137 public void setTriggerNodeIds(List<String> triggerNodeIds) { 138 this.triggerNodeIds = triggerNodeIds; 139 } 140 141 public void addTriggerNodeId(String nodeId) { 142 if (triggerNodeIds == null) { 143 triggerNodeIds = new ArrayList<>(); 144 } 145 triggerNodeIds.add(nodeId); 146 } 147 148 public List<String> getUncheckedEdgeIds() { 149 return uncheckedEdgeIds; 150 } 151 152 public void setUncheckedEdgeIds(List<String> uncheckedEdgeIds) { 153 this.uncheckedEdgeIds = uncheckedEdgeIds; 154 } 155 156 public void addUncheckedEdgeId(String edgeId) { 157 if (uncheckedEdgeIds == null) { 158 uncheckedEdgeIds = new ArrayList<>(); 159 } 160 uncheckedEdgeIds.add(edgeId); 161 } 162 163 public boolean removeUncheckedEdgeId(String edgeId) { 164 if (uncheckedEdgeIds == null) { 165 return false; 166 } 167 return uncheckedEdgeIds.remove(edgeId); 168 } 169 170 public List<String> getUncheckedNodeIds() { 171 return uncheckedNodeIds; 172 } 173 174 public void setUncheckedNodeIds(List<String> uncheckedNodeIds) { 175 this.uncheckedNodeIds = uncheckedNodeIds; 176 } 177 178 public void addUncheckedNodeId(String nodeId) { 179 if (uncheckedNodeIds == null) { 180 uncheckedNodeIds = new ArrayList<>(); 181 } 182 uncheckedNodeIds.add(nodeId); 183 } 184 185 public boolean removeUncheckedNodeId(String nodeId) { 186 if (uncheckedNodeIds == null) { 187 return false; 188 } 189 return uncheckedNodeIds.remove(nodeId); 190 } 191 192 public Long getComputeCost() { 193 return computeCost; 194 } 195 196 public void setComputeCost(Long computeCost) { 197 this.computeCost = computeCost; 198 } 199 200 public void setComputeCost(long computeCost) { 201 this.computeCost = computeCost; 202 } 203 204 public Set<String> getSuspendNodeIds() { 205 return suspendNodeIds; 206 } 207 208 public void setSuspendNodeIds(Set<String> suspendNodeIds) { 209 this.suspendNodeIds = suspendNodeIds; 210 } 211 212 public void addSuspendNodeId(String nodeId) { 213 if (suspendNodeIds == null) { 214 suspendNodeIds = new HashSet<>(); 215 } 216 suspendNodeIds.add(nodeId); 217 } 218 219 public void removeSuspendNodeId(String nodeId) { 220 if (suspendNodeIds == null) { 221 return; 222 } 223 suspendNodeIds.remove(nodeId); 224 } 225 226 public List<Parameter> getSuspendForParameters() { 227 return suspendForParameters; 228 } 229 230 public void setSuspendForParameters(List<Parameter> suspendForParameters) { 231 this.suspendForParameters = suspendForParameters; 232 } 233 234 public void addSuspendForParameter(Parameter parameter) { 235 if (suspendForParameters == null) { 236 suspendForParameters = new ArrayList<>(); 237 } 238 suspendForParameters.add(parameter); 239 } 240 241 public void addSuspendForParameters(List<Parameter> parameters) { 242 if (parameters == null) { 243 return; 244 } 245 if (suspendForParameters == null) { 246 suspendForParameters = new ArrayList<>(); 247 } 248 suspendForParameters.addAll(parameters); 249 } 250 251 public ChainStatus getStatus() { 252 return status; 253 } 254 255 public void setStatus(ChainStatus status) { 256 this.status = status; 257 } 258 259 public String getMessage() { 260 return message; 261 } 262 263 public void setMessage(String message) { 264 this.message = message; 265 } 266 267 public ExceptionSummary getError() { 268 return error; 269 } 270 271 public void setError(ExceptionSummary error) { 272 this.error = error; 273 } 274 275 276 public long getVersion() { 277 return version; 278 } 279 280 public void setVersion(long version) { 281 this.version = version; 282 } 283 284 public static ChainState fromJSON(String jsonString) { 285 ParserConfig config = new ParserConfig(); 286 config.putDeserializer(ChainState.class, new ChainDeserializer()); 287 return JSON.parseObject(jsonString, ChainState.class, config, Feature.SupportAutoType); 288 } 289 290 public String toJSON() { 291 SerializeConfig config = new SerializeConfig(); 292 config.put(ChainState.class, new ChainSerializer()); 293 return JSON.toJSONString(this, config, SerializerFeature.WriteClassName); 294 } 295 296 public void reset() { 297 this.instanceId = null; 298 this.chainDefinitionId = null; 299 this.memory.clear(); 300 this.executeResult = null; 301 this.environment = null; 302 this.computeCost = 0; 303 this.suspendNodeIds = null; 304 this.suspendForParameters = null; 305 this.status = ChainStatus.READY; 306 this.message = null; 307 this.error = null; 308 } 309 310 311 public void addComputeCost(Long value) { 312 if (value == null) { 313 value = 0L; 314 } 315 this.computeCost += value; 316 } 317 318 319 public Map<String, Object> getNodeExecuteResult(String nodeId) { 320 if (memory == null || memory.isEmpty()) { 321 return Collections.emptyMap(); 322 } 323 Map<String, Object> result = new HashMap<>(); 324 memory.forEach((k, v) -> { 325 if (k.startsWith(nodeId + ".")) { 326 String newKey = k.substring(nodeId.length() + 1); 327 result.put(newKey, v); 328 } 329 }); 330 return result; 331 } 332 333// public Map<String, Object> getTriggerVariables() { 334// Trigger trigger = TriggerContext.getCurrentTrigger(); 335// if (trigger != null) { 336// return trigger.getVariables(); 337// } 338// return Collections.emptyMap(); 339// } 340 341 342 public Object resolveValue(String path) { 343 Object result = MapUtil.getByPath(getMemory(), path); 344 if (result == null) result = MapUtil.getByPath(getEnvironment(), path); 345// if (result == null) result = MapUtil.getByPath(getTriggerVariables(), path); 346 return result; 347 } 348 349 public Map<String, Object> resolveParameters(Node node) { 350 return resolveParameters(node, node.getParameters()); 351 } 352 353 public Map<String, Object> resolveParameters(Node node, List<? extends Parameter> parameters) { 354 return resolveParameters(node, parameters, null); 355 } 356 357 public Map<String, Object> resolveParameters(Node node, List<? extends Parameter> parameters, Map<String, Object> formatArgs) { 358 return resolveParameters(node, parameters, formatArgs, false); 359 } 360 361 private boolean isNullOrBlank(Object value) { 362 return value == null || value instanceof String && StringUtil.noText((String) value); 363 } 364 365 366 public Map<String, Object> getEnvMap() { 367 Map<String, Object> formatArgsMap = new HashMap<>(); 368 formatArgsMap.put("env", getEnvironment()); 369 formatArgsMap.put("env.sys", System.getenv()); 370 return formatArgsMap; 371 } 372 373 public Map<String, Object> getStartParameters() { 374 Map<String, Object> startParameters = new LinkedHashMap<>(); 375 ConcurrentHashMap<String, Object> memory = getMemory(); 376 if (memory != null) { 377 memory.forEach((s, o) -> { 378 if (!s.contains(".")) { 379 startParameters.put(s, o); 380 } 381 }); 382 } 383 return startParameters; 384 } 385 386 387 public Map<String, Object> resolveParameters(Node node, List<? extends Parameter> parameters, Map<String, Object> formatArgs, boolean ignoreRequired) { 388 if (parameters == null || parameters.isEmpty()) { 389 return Collections.emptyMap(); 390 } 391 Map<String, Object> variables = new LinkedHashMap<>(); 392 List<Parameter> suspendParameters = null; 393 for (Parameter parameter : parameters) { 394 RefType refType = parameter.getRefType(); 395 Object value = null; 396 if (refType == RefType.FIXED) { 397 value = TextTemplate.of(parameter.getValue()) 398 .formatToString(Arrays.asList(formatArgs, getEnvMap(), getStartParameters())); 399 } else if (refType == RefType.REF) { 400 value = this.resolveValue(parameter.getRef()); 401 } 402 // 单节点执行时,参数只会传入 name 内容。 403 if (value == null) { 404 value = this.resolveValue(parameter.getName()); 405 } 406 407 if (value == null && parameter.getDefaultValue() != null) { 408 value = parameter.getDefaultValue(); 409 } 410 411 if (refType == RefType.INPUT && isNullOrBlank(value)) { 412 if (!ignoreRequired && parameter.isRequired()) { 413 if (suspendParameters == null) { 414 suspendParameters = new ArrayList<>(); 415 } 416 suspendParameters.add(parameter); 417 continue; 418 } 419 } 420 421 if (parameter.isRequired() && isNullOrBlank(value)) { 422 if (!ignoreRequired) { 423 throw new ChainException(node.getName() + " Missing required parameter:" + parameter.getName()); 424 } 425 } 426 427 if (value instanceof String) { 428 value = ((String) value).trim(); 429 if (parameter.getDataType() == DataType.Boolean) { 430 value = "true".equalsIgnoreCase((String) value) || "1".equalsIgnoreCase((String) value); 431 } else if (parameter.getDataType() == DataType.Number) { 432 value = Long.parseLong((String) value); 433 } else if (parameter.getDataType() == DataType.Array) { 434 value = JSON.parseArray((String) value); 435 } 436 } 437 438 variables.put(parameter.getName(), value); 439 } 440 441 if (suspendParameters != null && !suspendParameters.isEmpty()) { 442 // 构建参数名称列表 443 String missingParams = suspendParameters.stream() 444 .map(Parameter::getName) 445 .collect(Collectors.joining("', '", "'", "'")); 446 447 String errorMessage = String.format( 448 "Node '%s' (type: %s) is suspended. Waiting for input parameters: %s.", 449 StringUtil.getFirstWithText(node.getName(), node.getId()), 450 node.getClass().getSimpleName(), 451 missingParams 452 ); 453 454 throw new ChainSuspendException(errorMessage, suspendParameters); 455 } 456 457 return variables; 458 } 459 460 461 public static class ChainSerializer implements ObjectSerializer { 462 @Override 463 public void write(JSONSerializer serializer, Object object, Object fieldName, Type fieldType, int features) throws IOException { 464 if (object == null) { 465 serializer.writeNull(); 466 return; 467 } 468 ChainState chain = (ChainState) object; 469 serializer.write(chain.toJSON()); 470 } 471 } 472 473 public static class ChainDeserializer implements ObjectDeserializer { 474 @Override 475 public <T> T deserialze(DefaultJSONParser parser, Type type, Object fieldName) { 476 String value = parser.parseObject(String.class); 477 //noinspection unchecked 478 return (T) ChainState.fromJSON(value); 479 } 480 } 481 482 483 @Override 484 public String toString() { 485 return "ChainState{" + 486 "instanceId='" + instanceId + '\'' + 487 ", chainDefinitionId='" + chainDefinitionId + '\'' + 488 ", memory=" + memory + 489 ", executeResult=" + executeResult + 490 ", environment=" + environment + 491 ", computeCost=" + computeCost + 492 ", suspendNodeIds=" + suspendNodeIds + 493 ", suspendForParameters=" + suspendForParameters + 494 ", status=" + status + 495 ", message='" + message + '\'' + 496 ", error=" + error + 497 ", version=" + version + 498 '}'; 499 } 500}