001/** 002 * Copyright (c) 2025-2026, Michael Yang 杨福海 (fuhai999@gmail.com). 003 * <p> 004 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * <p> 008 * http://www.gnu.org/licenses/lgpl-3.0.txt 009 * <p> 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package dev.tinyflow.core.chain.runtime; 017 018import org.slf4j.Logger; 019import org.slf4j.LoggerFactory; 020 021import java.util.List; 022import java.util.Map; 023import java.util.Objects; 024import java.util.UUID; 025import java.util.concurrent.*; 026import java.util.concurrent.atomic.AtomicBoolean; 027 028/** 029 * 030 * 功能: 031 * - schedule trigger (持久化到 TriggerStore 并 schedule) 032 * - cancel trigger 033 * - fire(triggerId) 用于 webhook/event/manual 主动触发 034 * - recoverAndSchedulePending() 启动时恢复未执行的 trigger 035 * - periodical scan findDue(upto) 以保证重启/宕机后补偿触发 036 * <p> 037 * 注意: 分布式环境下需要在 TriggerStore 层提供抢占/锁逻辑(例如 lease/owner 字段)。 038 */ 039public class TriggerScheduler { 040 041 private static final Logger log = LoggerFactory.getLogger(TriggerScheduler.class); 042 private final TriggerStore store; 043 private final ScheduledExecutorService scheduler; 044 private final ExecutorService worker; 045 private final AtomicBoolean closed = new AtomicBoolean(false); 046 047 // map 用于管理取消:triggerId -> ScheduledFuture 048 private final ConcurrentMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>(); 049 050 // consumer 来把 trigger 交给 ChainExecutor(或 ChainRuntime)去处理 051 private volatile TriggerConsumer consumer; 052 053 // 周期扫查间隔(ms) 054 private final long scanIntervalMs; 055 056 // 扫描任务 future 057 private ScheduledFuture<?> scanFuture; 058 059 public interface TriggerConsumer { 060 void accept(Trigger trigger, ExecutorService worker); 061 } 062 063 public TriggerScheduler(TriggerStore store, ScheduledExecutorService scheduler, ExecutorService worker, long scanIntervalMs) { 064 this.store = Objects.requireNonNull(store, "TriggerStore required"); 065 this.scheduler = Objects.requireNonNull(scheduler, "ScheduledExecutorService required"); 066 this.worker = Objects.requireNonNull(worker, "ExecutorService required"); 067 this.scanIntervalMs = Math.max(1000, scanIntervalMs); 068 069 // 恢复并 schedule 070 recoverAndSchedulePending(); 071 072 // 启动周期扫查 findDue 073 startPeriodicScan(); 074 } 075 076 077 public void registerConsumer(TriggerConsumer consumer) { 078 this.consumer = consumer; 079 } 080 081 /** 082 * schedule a trigger: persist -> schedule (单机语义) 083 */ 084 public Trigger schedule(Trigger trigger) { 085 if (closed.get()) throw new IllegalStateException("TriggerScheduler closed"); 086 if (trigger.getId() == null) { 087 trigger.setId(UUID.randomUUID().toString()); 088 } 089 store.save(trigger); 090 scheduleInternal(trigger); 091 return trigger; 092 } 093 094 /** 095 * cancel trigger (从 store 删除并尝试取消已 schedule 的 future) 096 */ 097 public boolean cancel(String triggerId) { 098 boolean removed = store.remove(triggerId); 099 ScheduledFuture<?> f = scheduledFutures.remove(triggerId); 100 if (f != null) { 101 f.cancel(false); 102 } 103 return removed; 104 } 105 106 /** 107 * 主动触发(webhook/event/manual 场景) 108 */ 109 public boolean fire(String triggerId) { 110 if (closed.get()) return false; 111 Trigger t = store.find(triggerId); 112 if (t == null) return false; 113 if (consumer == null) { 114 // 无 consumer,仍从 store 中移除 115 store.remove(triggerId); 116 return false; 117 } 118 // 在 worker 线程触发 consumer 119 worker.submit(() -> { 120 try { 121 consumer.accept(t, worker); 122 } catch (Exception e) { 123 log.error(e.toString(), e); 124 } finally { 125 // 默认语义:触发后移除 126 store.remove(triggerId); 127 ScheduledFuture<?> sf = scheduledFutures.remove(triggerId); 128 if (sf != null) sf.cancel(false); 129 } 130 }); 131 return true; 132 } 133 134 /** 135 * internal scheduling for a trigger (单机 scheduled semantics) 136 */ 137 private void scheduleInternal(Trigger trigger) { 138 if (closed.get()) return; 139 140 long delay = Math.max(0, trigger.getTriggerAt() - System.currentTimeMillis()); 141 142 // cancel any existing scheduled future for same id 143 ScheduledFuture<?> prev = scheduledFutures.remove(trigger.getId()); 144 if (prev != null) { 145 prev.cancel(false); 146 } 147 148 ScheduledFuture<?> future = scheduler.schedule(() -> { 149 // double-check existence in store (可能已被 cancel) 150 Trigger existing = store.find(trigger.getId()); 151 if (existing == null) { 152 scheduledFutures.remove(trigger.getId()); 153 return; 154 } 155 156 if (consumer != null) { 157 worker.submit(() -> { 158 try { 159 TriggerContext.setCurrentTrigger(existing); 160 consumer.accept(existing, worker); 161 } catch (Throwable e) { 162 log.error(e.toString(), e); 163 } finally { 164 TriggerContext.clearCurrentTrigger(); 165 store.remove(existing.getId()); 166 scheduledFutures.remove(existing.getId()); 167 } 168 }); 169 } else { 170 // 无 consumer,则移除 171 store.remove(existing.getId()); 172 scheduledFutures.remove(existing.getId()); 173 } 174 }, delay, TimeUnit.MILLISECONDS); 175 176 scheduledFutures.put(trigger.getId(), future); 177 } 178 179 private void recoverAndSchedulePending() { 180 try { 181 List<Trigger> list = store.findAllPending(); 182 if (list == null || list.isEmpty()) return; 183 for (Trigger t : list) { 184 scheduleInternal(t); 185 } 186 } catch (Throwable t) { 187 // 忽略单次恢复错误,继续运行 188 t.printStackTrace(); 189 } 190 } 191 192 private void startPeriodicScan() { 193 if (closed.get()) return; 194 scanFuture = scheduler.scheduleAtFixedRate(() -> { 195 try { 196 long upto = System.currentTimeMillis(); 197 List<Trigger> due = store.findDue(upto); 198 if (due == null || due.isEmpty()) return; 199 for (Trigger t : due) { 200 // 如果已被 schedule 到未来(scheduledFutures 包含且尚未到期),跳过 201 ScheduledFuture<?> sf = scheduledFutures.get(t.getId()); 202 if (sf != null && !sf.isDone() && !sf.isCancelled()) { 203 continue; 204 } 205 // 直接提交到 worker,让 consumer 处理;并从 store 中移除 206 if (consumer != null) { 207 worker.submit(() -> { 208 try { 209 consumer.accept(t, worker); 210 } finally { 211 store.remove(t.getId()); 212 ScheduledFuture<?> f2 = scheduledFutures.remove(t.getId()); 213 if (f2 != null) f2.cancel(false); 214 } 215 }); 216 } else { 217 store.remove(t.getId()); 218 } 219 } 220 } catch (Throwable tt) { 221 tt.printStackTrace(); 222 } 223 }, scanIntervalMs, scanIntervalMs, TimeUnit.MILLISECONDS); 224 } 225 226 public void shutdown() { 227 if (closed.compareAndSet(false, true)) { 228 if (scanFuture != null) scanFuture.cancel(false); 229 // cancel scheduled futures 230 for (Map.Entry<String, ScheduledFuture<?>> e : scheduledFutures.entrySet()) { 231 try { 232 e.getValue().cancel(false); 233 } catch (Throwable ignored) { 234 } 235 } 236 scheduledFutures.clear(); 237 238 try { 239 scheduler.shutdownNow(); 240 } catch (Throwable ignored) { 241 } 242 try { 243 worker.shutdownNow(); 244 } catch (Throwable ignored) { 245 } 246 } 247 } 248}