Clean up distribution lock (#3081)
Signed-off-by: Avgustin Marinov <Avgustin.Marinov@bosch.com>
This commit is contained in:
@@ -202,9 +202,8 @@ public class JpaRepositoryConfiguration {
|
||||
@Bean
|
||||
@ConditionalOnProperty(name = "hawkbit.lock", havingValue = "distributed", matchIfMissing = true)
|
||||
@ConditionalOnMissingBean
|
||||
LockRepository lockRepository(final DataSource dataSource, final LockProperties lockProperties,
|
||||
final PlatformTransactionManager txManager) {
|
||||
final DefaultLockRepository repository = new DistributedLockRepository(dataSource, lockProperties, txManager);
|
||||
LockRepository lockRepository(final DataSource dataSource, final LockProperties lockProperties) {
|
||||
final DefaultLockRepository repository = new DistributedLockRepository(dataSource, lockProperties);
|
||||
repository.setPrefix("SP_");
|
||||
return repository;
|
||||
}
|
||||
|
||||
@@ -19,24 +19,21 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import javax.sql.DataSource;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.hawkbit.repository.jpa.utils.DeploymentHelper;
|
||||
import org.springframework.dao.DataIntegrityViolationException;
|
||||
import org.jspecify.annotations.NullMarked;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import org.springframework.dao.PessimisticLockingFailureException;
|
||||
import org.springframework.dao.QueryTimeoutException;
|
||||
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
|
||||
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionTimedOutException;
|
||||
import org.springframework.transaction.annotation.Isolation;
|
||||
import org.springframework.transaction.annotation.Propagation;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
/**
|
||||
* Repository for {@link JdbcLockRegistry}. This class is not thread safe. Adds support for keeping lock longer then ttl
|
||||
* if really used by the instance.
|
||||
* Repository for {@link JdbcLockRegistry}. This class is not thread safe.
|
||||
* Adds support for keeping lock longer than ttl if really used by the instance (renew).
|
||||
*/
|
||||
@Slf4j
|
||||
@NullMarked
|
||||
public class DistributedLockRepository extends DefaultLockRepository {
|
||||
|
||||
private static final int MAX_DELETE_RETRY = 10;
|
||||
@@ -44,35 +41,32 @@ public class DistributedLockRepository extends DefaultLockRepository {
|
||||
// period between successive refresh tics
|
||||
private static final String TIC_PERIOD_MS = "${hawkbit.repository.cluster.lock.ticPeriodMS:2000}";
|
||||
|
||||
private final PlatformTransactionManager txManager;
|
||||
private final Duration renewTtl;
|
||||
|
||||
private final int refreshOnRemainMS;
|
||||
private final int refreshOnRemainPercent;
|
||||
|
||||
// if empty refresh is effectively disabled (when both REFRESH_ON_REMAINS_MS and REFRESH_ON_REMAINS_PERCENT are non-positive)
|
||||
// if null refresh is effectively disabled (when both REFRESH_ON_REMAINS_MS and REFRESH_ON_REMAINS_PERCENT are non-positive)
|
||||
// otherwise, a refresh is triggered at refreshAfterMillis after lock acquisition or last refresh
|
||||
private Optional<Integer> refreshAfterMillis;
|
||||
@Nullable
|
||||
private final Integer refreshAfterMillis;
|
||||
// lock <-> next refresh time
|
||||
private final Map<String, Instant> lockToRefreshTime = new ConcurrentHashMap<>();
|
||||
|
||||
public DistributedLockRepository(
|
||||
final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
|
||||
public DistributedLockRepository(final DataSource dataSource, final LockProperties lockProperties) {
|
||||
super(dataSource);
|
||||
this.txManager = txManager;
|
||||
|
||||
this.refreshOnRemainMS = lockProperties.getRefreshOnRemainMS();
|
||||
this.refreshOnRemainPercent = lockProperties.getRefreshOnRemainPercent();
|
||||
renewTtl = Duration.ofMillis(lockProperties.getTtl());
|
||||
|
||||
final int timeToLive = lockProperties.getTtl();
|
||||
final int triggerOnRemainMS = Math.max(
|
||||
lockProperties.getRefreshOnRemainMS(),
|
||||
timeToLive * lockProperties.getRefreshOnRemainPercent() / 100);
|
||||
final int refreshAfterMS = timeToLive - triggerOnRemainMS;
|
||||
refreshAfterMillis = refreshAfterMS <= 0 ? null : refreshAfterMS;
|
||||
|
||||
// to ensure that deprecated acquire and renew will use the lockProperties ttl. none shall call them but anyway - for sure
|
||||
// to be removed when the #setTimeToLive method is removed from DefaultLockRepository
|
||||
setTimeToLive(lockProperties.getTtl());
|
||||
}
|
||||
|
||||
// interceptor that handles refreshAfterMillis update when time to live is changed
|
||||
@Override
|
||||
public void setTimeToLive(final int timeToLive) {
|
||||
super.setTimeToLive(timeToLive);
|
||||
refreshAfterMillis = refreshAfterMillis(timeToLive);
|
||||
}
|
||||
|
||||
@Transactional(propagation = Propagation.REQUIRES_NEW)
|
||||
@Override
|
||||
public boolean delete(final String lock) {
|
||||
@@ -98,30 +92,20 @@ public class DistributedLockRepository extends DefaultLockRepository {
|
||||
|
||||
// Spring Integration 7.0 calls acquire(String, Duration) directly; the deprecated acquire(String) is no longer invoked
|
||||
// by JdbcLockRegistry. Override the new method to populate lockToRefreshTime for the refresh mechanism.
|
||||
@Transactional(propagation = Propagation.NOT_SUPPORTED)
|
||||
@Override
|
||||
public boolean acquire(final String lock, final Duration ttl) {
|
||||
try {
|
||||
// run in a new transaction so we know the committed result before updating lockToRefreshTime
|
||||
final boolean acquired = DeploymentHelper.runInNewTransaction(
|
||||
txManager, "lock-acquire", Isolation.READ_COMMITTED.value(), status -> super.acquire(lock, ttl));
|
||||
if (acquired) {
|
||||
refreshAfterMillis.ifPresent(afterMillis -> lockToRefreshTime.put(lock, Instant.now().plus(afterMillis, ChronoUnit.MILLIS)));
|
||||
}
|
||||
return acquired;
|
||||
} catch (final DataIntegrityViolationException | PessimisticLockingFailureException e) {
|
||||
log.debug("Could not acquire cluster lock {}. I guess another node has it.", lock, e);
|
||||
return false;
|
||||
} catch (final QueryTimeoutException e) {
|
||||
log.debug("Query timed out for lock {}.", lock, e);
|
||||
throw new TransactionTimedOutException("DB query timed out for lock " + lock, e);
|
||||
final boolean acquired = super.acquire(lock, ttl);
|
||||
if (acquired) {
|
||||
Optional.ofNullable(refreshAfterMillis).ifPresent(afterMillis ->
|
||||
lockToRefreshTime.put(lock, Instant.now().plus(afterMillis, ChronoUnit.MILLIS)));
|
||||
}
|
||||
return acquired;
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "java:S1066" })
|
||||
@Scheduled(initialDelayString = TIC_PERIOD_MS, fixedDelayString = TIC_PERIOD_MS)
|
||||
public void refresh() {
|
||||
refreshAfterMillis.ifPresentOrElse(afterMillis -> {
|
||||
public void renew() {
|
||||
Optional.ofNullable(refreshAfterMillis).ifPresentOrElse(afterMillis -> {
|
||||
final Instant now = Instant.now();
|
||||
lockToRefreshTime.forEach((lock, refreshTime) -> {
|
||||
if (now.isAfter(refreshTime)) {
|
||||
@@ -129,8 +113,8 @@ public class DistributedLockRepository extends DefaultLockRepository {
|
||||
// if delete is called while iterating we must skip record update
|
||||
// otherwise, the lock will be unavailable for everyone until expiration
|
||||
if (lockToRefreshTime.containsKey(lock)) {
|
||||
if (!acquire(lock)) { // try to update record in lock table
|
||||
log.warn("Failed to refresh cluster lock {}!", lock);
|
||||
if (!renew(lock, renewTtl)) { // try to update record in lock table
|
||||
log.warn("Failed to renew cluster lock {}!", lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -138,24 +122,4 @@ public class DistributedLockRepository extends DefaultLockRepository {
|
||||
});
|
||||
}, lockToRefreshTime::clear);
|
||||
}
|
||||
|
||||
private Optional<Integer> refreshAfterMillis(final int timeToLive) {
|
||||
final int triggerOnRemainMS = Math.max(refreshOnRemainMS, timeToLive * refreshOnRemainPercent / 100);
|
||||
final int refreshAfterMS = timeToLive - triggerOnRemainMS;
|
||||
return refreshAfterMS <= 0 ? Optional.empty() : Optional.of(refreshAfterMS);
|
||||
}
|
||||
// * May be required if the super class doesn't execute in new transactions
|
||||
// * See https://github.com/spring-projects/spring-integration/issues/3683
|
||||
// * may be not needed anymore
|
||||
// @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
|
||||
// @Override
|
||||
// public boolean isAcquired(final String lock) {
|
||||
// return super.isAcquired(lock);
|
||||
// }
|
||||
//
|
||||
// @Transactional(propagation = Propagation.REQUIRES_NEW)
|
||||
// @Override
|
||||
// public void close() {
|
||||
// super.close();
|
||||
// }
|
||||
}
|
||||
@@ -37,7 +37,6 @@ import org.springframework.integration.support.locks.LockRegistry;
|
||||
import org.springframework.integration.util.UUIDConverter;
|
||||
import org.springframework.security.core.userdetails.UserDetailsService;
|
||||
import org.springframework.security.provisioning.InMemoryUserDetailsManager;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.annotation.EnableTransactionManagement;
|
||||
|
||||
/**
|
||||
@@ -77,20 +76,17 @@ class DistributedLockTest extends AbstractJpaIntegrationTest {
|
||||
}
|
||||
|
||||
@Bean
|
||||
LockRepository lockRepository0(final DataSource dataSource, final LockProperties lockProperties,
|
||||
final PlatformTransactionManager txManager) {
|
||||
return lockRepository(dataSource, lockProperties, txManager);
|
||||
LockRepository lockRepository0(final DataSource dataSource, final LockProperties lockProperties) {
|
||||
return lockRepository(dataSource, lockProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
LockRepository lockRepository1(final DataSource dataSource, final LockProperties lockProperties,
|
||||
final PlatformTransactionManager txManager) {
|
||||
return lockRepository(dataSource, lockProperties, txManager);
|
||||
LockRepository lockRepository1(final DataSource dataSource, final LockProperties lockProperties) {
|
||||
return lockRepository(dataSource, lockProperties);
|
||||
}
|
||||
|
||||
private LockRepository lockRepository(final DataSource dataSource, final LockProperties lockProperties,
|
||||
final PlatformTransactionManager txManager) {
|
||||
final DefaultLockRepository repository = new DistributedLockRepository(dataSource, lockProperties, txManager);
|
||||
private LockRepository lockRepository(final DataSource dataSource, final LockProperties lockProperties) {
|
||||
final DefaultLockRepository repository = new DistributedLockRepository(dataSource, lockProperties);
|
||||
repository.setPrefix("SP_");
|
||||
return repository;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user