001/**
002 * Copyright (c) 2025-2026, Michael Yang 杨福海 (fuhai999@gmail.com).
003 * <p>
004 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 * <p>
008 * http://www.gnu.org/licenses/lgpl-3.0.txt
009 * <p>
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package dev.tinyflow.core.chain.runtime;
017
018import org.slf4j.Logger;
019import org.slf4j.LoggerFactory;
020
021import java.util.List;
022import java.util.Map;
023import java.util.Objects;
024import java.util.UUID;
025import java.util.concurrent.*;
026import java.util.concurrent.atomic.AtomicBoolean;
027
028/**
029 *
030 * 功能:
031 * - schedule trigger (持久化到 TriggerStore 并 schedule)
032 * - cancel trigger
033 * - fire(triggerId) 用于 webhook/event/manual 主动触发
034 * - recoverAndSchedulePending() 启动时恢复未执行的 trigger
035 * - periodical scan findDue(upto) 以保证重启/宕机后补偿触发
036 * <p>
037 * 注意: 分布式环境下需要在 TriggerStore 层提供抢占/锁逻辑(例如 lease/owner 字段)。
038 */
039public class TriggerScheduler {
040
041    private static final Logger log = LoggerFactory.getLogger(TriggerScheduler.class);
042    private final TriggerStore store;
043    private final ScheduledExecutorService scheduler;
044    private final ExecutorService worker;
045    private final AtomicBoolean closed = new AtomicBoolean(false);
046
047    // map 用于管理取消:triggerId -> ScheduledFuture
048    private final ConcurrentMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
049
050    // consumer 来把 trigger 交给 ChainExecutor(或 ChainRuntime)去处理
051    private volatile TriggerConsumer consumer;
052
053    // 周期扫查间隔(ms)
054    private final long scanIntervalMs;
055
056    // 扫描任务 future
057    private ScheduledFuture<?> scanFuture;
058
059    public interface TriggerConsumer {
060        void accept(Trigger trigger, ExecutorService worker);
061    }
062
063    public TriggerScheduler(TriggerStore store, ScheduledExecutorService scheduler, ExecutorService worker, long scanIntervalMs) {
064        this.store = Objects.requireNonNull(store, "TriggerStore required");
065        this.scheduler = Objects.requireNonNull(scheduler, "ScheduledExecutorService required");
066        this.worker = Objects.requireNonNull(worker, "ExecutorService required");
067        this.scanIntervalMs = Math.max(1000, scanIntervalMs);
068
069        // 恢复并 schedule
070        recoverAndSchedulePending();
071
072        // 启动周期扫查 findDue
073        startPeriodicScan();
074    }
075
076
077    public void registerConsumer(TriggerConsumer consumer) {
078        this.consumer = consumer;
079    }
080
081    /**
082     * schedule a trigger: persist -> schedule (单机语义)
083     */
084    public Trigger schedule(Trigger trigger) {
085        if (closed.get()) throw new IllegalStateException("TriggerScheduler closed");
086        if (trigger.getId() == null) {
087            trigger.setId(UUID.randomUUID().toString());
088        }
089        store.save(trigger);
090        scheduleInternal(trigger);
091        return trigger;
092    }
093
094    /**
095     * cancel trigger (从 store 删除并尝试取消已 schedule 的 future)
096     */
097    public boolean cancel(String triggerId) {
098        boolean removed = store.remove(triggerId);
099        ScheduledFuture<?> f = scheduledFutures.remove(triggerId);
100        if (f != null) {
101            f.cancel(false);
102        }
103        return removed;
104    }
105
106    /**
107     * 主动触发(webhook/event/manual 场景)
108     */
109    public boolean fire(String triggerId) {
110        if (closed.get()) return false;
111        Trigger t = store.find(triggerId);
112        if (t == null) return false;
113        if (consumer == null) {
114            // 无 consumer,仍从 store 中移除
115            store.remove(triggerId);
116            return false;
117        }
118        // 在 worker 线程触发 consumer
119        worker.submit(() -> {
120            try {
121                consumer.accept(t, worker);
122            } catch (Exception e) {
123                log.error(e.toString(), e);
124            } finally {
125                // 默认语义:触发后移除
126                store.remove(triggerId);
127                ScheduledFuture<?> sf = scheduledFutures.remove(triggerId);
128                if (sf != null) sf.cancel(false);
129            }
130        });
131        return true;
132    }
133
134    /**
135     * internal scheduling for a trigger (单机 scheduled semantics)
136     */
137    private void scheduleInternal(Trigger trigger) {
138        if (closed.get()) return;
139
140        long delay = Math.max(0, trigger.getTriggerAt() - System.currentTimeMillis());
141
142        // cancel any existing scheduled future for same id
143        ScheduledFuture<?> prev = scheduledFutures.remove(trigger.getId());
144        if (prev != null) {
145            prev.cancel(false);
146        }
147
148        ScheduledFuture<?> future = scheduler.schedule(() -> {
149            // double-check existence in store (可能已被 cancel)
150            Trigger existing = store.find(trigger.getId());
151            if (existing == null) {
152                scheduledFutures.remove(trigger.getId());
153                return;
154            }
155
156            if (consumer != null) {
157                worker.submit(() -> {
158                    try {
159                        TriggerContext.setCurrentTrigger(existing);
160                        consumer.accept(existing, worker);
161                    } catch (Throwable e) {
162                        log.error(e.toString(), e);
163                    } finally {
164                        TriggerContext.clearCurrentTrigger();
165                        store.remove(existing.getId());
166                        scheduledFutures.remove(existing.getId());
167                    }
168                });
169            } else {
170                // 无 consumer,则移除
171                store.remove(existing.getId());
172                scheduledFutures.remove(existing.getId());
173            }
174        }, delay, TimeUnit.MILLISECONDS);
175
176        scheduledFutures.put(trigger.getId(), future);
177    }
178
179    private void recoverAndSchedulePending() {
180        try {
181            List<Trigger> list = store.findAllPending();
182            if (list == null || list.isEmpty()) return;
183            for (Trigger t : list) {
184                scheduleInternal(t);
185            }
186        } catch (Throwable t) {
187            // 忽略单次恢复错误,继续运行
188            t.printStackTrace();
189        }
190    }
191
192    private void startPeriodicScan() {
193        if (closed.get()) return;
194        scanFuture = scheduler.scheduleAtFixedRate(() -> {
195            try {
196                long upto = System.currentTimeMillis();
197                List<Trigger> due = store.findDue(upto);
198                if (due == null || due.isEmpty()) return;
199                for (Trigger t : due) {
200                    // 如果已被 schedule 到未来(scheduledFutures 包含且尚未到期),跳过
201                    ScheduledFuture<?> sf = scheduledFutures.get(t.getId());
202                    if (sf != null && !sf.isDone() && !sf.isCancelled()) {
203                        continue;
204                    }
205                    // 直接提交到 worker,让 consumer 处理;并从 store 中移除
206                    if (consumer != null) {
207                        worker.submit(() -> {
208                            try {
209                                consumer.accept(t, worker);
210                            } finally {
211                                store.remove(t.getId());
212                                ScheduledFuture<?> f2 = scheduledFutures.remove(t.getId());
213                                if (f2 != null) f2.cancel(false);
214                            }
215                        });
216                    } else {
217                        store.remove(t.getId());
218                    }
219                }
220            } catch (Throwable tt) {
221                tt.printStackTrace();
222            }
223        }, scanIntervalMs, scanIntervalMs, TimeUnit.MILLISECONDS);
224    }
225
226    public void shutdown() {
227        if (closed.compareAndSet(false, true)) {
228            if (scanFuture != null) scanFuture.cancel(false);
229            // cancel scheduled futures
230            for (Map.Entry<String, ScheduledFuture<?>> e : scheduledFutures.entrySet()) {
231                try {
232                    e.getValue().cancel(false);
233                } catch (Throwable ignored) {
234                }
235            }
236            scheduledFutures.clear();
237
238            try {
239                scheduler.shutdownNow();
240            } catch (Throwable ignored) {
241            }
242            try {
243                worker.shutdownNow();
244            } catch (Throwable ignored) {
245            }
246        }
247    }
248}