/**
 * Copyright (c) 2025-2026, Michael Yang 杨福海 (fuhai999@gmail.com).
 * <p>
 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.gnu.org/licenses/lgpl-3.0.txt
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package dev.tinyflow.core.chain.repository;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A {@link ChainLock} backed by a {@link ReentrantLock} that is shared, per
 * {@code instanceId}, through a static registry.
 * <p>
 * The lock is acquired in the constructor and released by {@link #close()}.
 * Registry entries are reference counted: every instance that is waiting for or
 * holding the lock of a given id counts as one reference, and the entry is removed
 * once the count drops to zero so that unused locks can be garbage collected.
 * <p>
 * Because the underlying lock is a {@link ReentrantLock}, {@link #close()} must be
 * invoked by the same thread that constructed the instance.
 */
public class LocalChainLock implements ChainLock {

    /** Registry of the locks currently in use, keyed by instance id. */
    private static final Map<String, LockRef> GLOBAL_LOCKS = new ConcurrentHashMap<>();

    private final String instanceId;
    private final ReentrantLock lock;
    private final boolean acquired;

    /** Set once {@link #close()} has released the lock, making later calls no-ops. */
    private volatile boolean released;

    /**
     * Creates the lock handle and tries to acquire the lock for {@code instanceId}.
     *
     * @param instanceId id whose lock should be acquired; must not be {@code null} or empty
     * @param timeout    maximum time to wait; a value {@code <= 0} waits without a time limit
     * @param unit       unit of {@code timeout}; only used when {@code timeout > 0}
     * @throws IllegalArgumentException if {@code instanceId} is {@code null} or empty
     * @throws NullPointerException     if {@code timeout > 0} and {@code unit} is {@code null}
     */
    public LocalChainLock(String instanceId, long timeout, TimeUnit unit) {
        if (instanceId == null || instanceId.isEmpty()) {
            throw new IllegalArgumentException("instanceId must not be blank");
        }
        this.instanceId = instanceId;

        // Fetch the shared lock for this id, or create it; either way this instance
        // now owns one reference (a new LockRef starts with a count of 1).
        LockRef lockRef = GLOBAL_LOCKS.compute(instanceId, (key, ref) -> {
            if (ref == null) {
                return new LockRef(new ReentrantLock());
            }
            ref.refCount.incrementAndGet();
            return ref;
        });

        this.lock = lockRef.lock;
        boolean locked = false;
        try {
            if (timeout <= 0) {
                lock.lock();
                locked = true;
            } else {
                locked = lock.tryLock(timeout, unit);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; the lock counts as not acquired.
            Thread.currentThread().interrupt();
        } finally {
            // Give the reference back whenever the lock was not obtained: on timeout,
            // on interruption, and also when acquisition throws (e.g. a null unit),
            // which would otherwise leave the registry entry behind forever.
            if (!locked) {
                releaseRef();
            }
        }
        this.acquired = locked;
    }

    /**
     * Tells whether the constructor obtained the lock.
     *
     * @return {@code true} if the lock was acquired, {@code false} on timeout or interruption
     */
    @Override
    public boolean isAcquired() {
        return acquired;
    }

    /**
     * Releases the lock and this instance's registry reference.
     * <p>
     * Does nothing if the lock was never acquired or has already been released by an
     * earlier call, so calling it more than once is safe.
     *
     * @throws IllegalMonitorStateException if the lock is still held and the calling
     *                                      thread is not the one that acquired it
     */
    @Override
    public void close() {
        if (!acquired || released) {
            return;
        }
        // unlock() throws when the caller is not the owning thread. In that case no
        // state has been changed yet, so the owner can still close this instance.
        lock.unlock();
        released = true;
        releaseRef();
    }

    /** Drops one reference and removes the registry entry when none are left. */
    private void releaseRef() {
        GLOBAL_LOCKS.computeIfPresent(instanceId, (key, ref) -> {
            if (ref.refCount.decrementAndGet() <= 0) {
                return null; // returning null removes the mapping so the lock can be collected
            }
            return ref;
        });
    }

    /** A lock together with the number of {@link LocalChainLock} instances referencing it. */
    private static class LockRef {
        final ReentrantLock lock;
        final AtomicInteger refCount = new AtomicInteger(1);

        LockRef(ReentrantLock lock) {
            this.lock = lock;
        }
    }
}