From 2af5439b3922042f232ec8d86d49a31904b32e9b Mon Sep 17 00:00:00 2001 From: Avgustin Marinov Date: Wed, 2 Apr 2025 12:25:08 +0300 Subject: [PATCH] Clustering: Add distributed lock (#2333) To support sync of activities in cluster setups Signed-off-by: Avgustin Marinov --- .../JpaRepositoryAutoConfiguration.java | 8 - .../V1_12_30__add_distrubuted_lock___H2.sql | 8 + ...V1_12_30__add_distrubuted_lock___MYSQL.sql | 8 + ..._31__add_distrubuted_lock___POSTGRESQL.sql | 8 + .../hawkbit-repository-jpa/pom.xml | 4 + .../RepositoryApplicationConfiguration.java | 24 +- .../cluster/DistributedLockRepository.java | 165 +++++++++++++ .../jpa/cluster/LockProperties.java | 27 ++ .../resources/hawkbit-jpa-defaults.properties | 3 + .../jpa/cluster/DistributedLockTest.java | 231 ++++++++++++++++++ .../src/test/resources/jpa-test.properties | 5 + site/content/guides/clustering.md | 4 +- 12 files changed, 484 insertions(+), 11 deletions(-) create mode 100644 hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/H2/V1_12_30__add_distrubuted_lock___H2.sql create mode 100644 hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/MYSQL/V1_12_30__add_distrubuted_lock___MYSQL.sql create mode 100644 hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/POSTGRESQL/V1_12_31__add_distrubuted_lock___POSTGRESQL.sql create mode 100644 hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockRepository.java create mode 100644 hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/LockProperties.java create mode 100644 hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockTest.java diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/repository/JpaRepositoryAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/repository/JpaRepositoryAutoConfiguration.java index 5dd99f45e..00db0e5ce 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/repository/JpaRepositoryAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/repository/JpaRepositoryAutoConfiguration.java @@ -17,8 +17,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import org.springframework.integration.support.locks.DefaultLockRegistry; -import org.springframework.integration.support.locks.LockRegistry; /** * Auto-Configuration for enabling JPA repository. @@ -36,10 +34,4 @@ public class JpaRepositoryAutoConfiguration { public VirtualPropertyReplacer virtualPropertyReplacer() { return new VirtualPropertyResolver(); } - - @Bean - @ConditionalOnMissingBean - public LockRegistry lockRegistry() { - return new DefaultLockRegistry(); - } } \ No newline at end of file diff --git a/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/H2/V1_12_30__add_distrubuted_lock___H2.sql b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/H2/V1_12_30__add_distrubuted_lock___H2.sql new file mode 100644 index 000000000..e5fc064a9 --- /dev/null +++ b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/H2/V1_12_30__add_distrubuted_lock___H2.sql @@ -0,0 +1,8 @@ +-- Table and fields in upper case as Spring queries it that way +CREATE TABLE SP_LOCK ( + LOCK_KEY CHAR(36) NOT NULL, + REGION VARCHAR(100) NOT NULL, + CLIENT_ID CHAR(36), + CREATED_DATE TIMESTAMP NOT NULL, + constraint SP_LOCK_PK primary key (LOCK_KEY, REGION) +); \ No newline at end of file diff --git a/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/MYSQL/V1_12_30__add_distrubuted_lock___MYSQL.sql b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/MYSQL/V1_12_30__add_distrubuted_lock___MYSQL.sql new file mode 100644 index 000000000..ed8f9c835 --- /dev/null +++ b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/MYSQL/V1_12_30__add_distrubuted_lock___MYSQL.sql @@ -0,0 +1,8 @@ +-- Table and fields in upper case as Spring queries it that way +CREATE TABLE SP_LOCK ( + LOCK_KEY CHAR(36) NOT NULL, + REGION VARCHAR(100) NOT NULL, + CLIENT_ID CHAR(36), + CREATED_DATE DATETIME(6) NOT NULL, + constraint SP_LOCK_PK primary key (LOCK_KEY, REGION) +); \ No newline at end of file diff --git a/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/POSTGRESQL/V1_12_31__add_distrubuted_lock___POSTGRESQL.sql b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/POSTGRESQL/V1_12_31__add_distrubuted_lock___POSTGRESQL.sql new file mode 100644 index 000000000..e5fc064a9 --- /dev/null +++ b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/POSTGRESQL/V1_12_31__add_distrubuted_lock___POSTGRESQL.sql @@ -0,0 +1,8 @@ +-- Table and fields in upper case as Spring queries it that way +CREATE TABLE SP_LOCK ( + LOCK_KEY CHAR(36) NOT NULL, + REGION VARCHAR(100) NOT NULL, + CLIENT_ID CHAR(36), + CREATED_DATE TIMESTAMP NOT NULL, + constraint SP_LOCK_PK primary key (LOCK_KEY, REGION) +); \ No newline at end of file diff --git a/hawkbit-repository/hawkbit-repository-jpa/pom.xml b/hawkbit-repository/hawkbit-repository-jpa/pom.xml index 046934c84..6ca8135c9 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/pom.xml +++ b/hawkbit-repository/hawkbit-repository-jpa/pom.xml @@ -122,6 +122,10 @@ org.springframework.security spring-security-core + + org.springframework.integration + spring-integration-jdbc + cz.jirutka.rsql rsql-parser diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java index 45f766260..daebb0a07 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java @@ -13,6 +13,8 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; +import javax.sql.DataSource; + import jakarta.persistence.EntityManager; import jakarta.validation.Validation; @@ -78,6 +80,8 @@ import org.eclipse.hawkbit.repository.jpa.builder.JpaSoftwareModuleMetadataBuild import org.eclipse.hawkbit.repository.jpa.builder.JpaTargetBuilder; import org.eclipse.hawkbit.repository.jpa.builder.JpaTargetFilterQueryBuilder; import org.eclipse.hawkbit.repository.jpa.builder.JpaTargetTypeBuilder; +import org.eclipse.hawkbit.repository.jpa.cluster.LockProperties; +import org.eclipse.hawkbit.repository.jpa.cluster.DistributedLockRepository; import org.eclipse.hawkbit.repository.jpa.event.JpaEventEntityManager; import org.eclipse.hawkbit.repository.jpa.executor.AfterTransactionCommitDefaultServiceExecutor; import org.eclipse.hawkbit.repository.jpa.executor.AfterTransactionCommitExecutor; @@ -186,6 +190,9 @@ import org.springframework.context.annotation.PropertySource; import org.springframework.data.domain.AuditorAware; import org.springframework.data.jpa.repository.config.EnableJpaAuditing; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; +import org.springframework.integration.jdbc.lock.DefaultLockRepository; +import org.springframework.integration.jdbc.lock.LockRepository; +import org.springframework.integration.support.locks.DefaultLockRegistry; import org.springframework.integration.support.locks.LockRegistry; import org.springframework.lang.NonNull; import org.springframework.retry.annotation.EnableRetry; @@ -206,7 +213,7 @@ import org.springframework.validation.beanvalidation.MethodValidationPostProcess @EnableRetry @EntityScan("org.eclipse.hawkbit.repository.jpa.model") @PropertySource("classpath:/hawkbit-jpa-defaults.properties") -@Import({ JpaConfiguration.class, RepositoryDefaultConfiguration.class, DataSourceAutoConfiguration.class, SystemManagementCacheKeyGenerator.class }) +@Import({ JpaConfiguration.class, RepositoryDefaultConfiguration.class, LockProperties.class, DataSourceAutoConfiguration.class, SystemManagementCacheKeyGenerator.class }) @AutoConfigureAfter(DataSourceAutoConfiguration.class) public class RepositoryApplicationConfiguration { @@ -265,6 +272,21 @@ public class RepositoryApplicationConfiguration { }; } + @Bean + @ConditionalOnMissingBean + LockRepository lockRepository( + final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) { + final DefaultLockRepository repository = new DistributedLockRepository(dataSource, lockProperties, txManager); + repository.setPrefix("SP_"); + return repository; + } + + @Bean + @ConditionalOnMissingBean + public LockRegistry lockRegistry() { + return new DefaultLockRegistry(); + } + @Bean @ConditionalOnMissingBean PauseRolloutGroupAction pauseRolloutGroupAction(final RolloutManagement rolloutManagement, diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockRepository.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockRepository.java new file mode 100644 index 000000000..e3a46775c --- /dev/null +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockRepository.java @@ -0,0 +1,165 @@ +/** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hawkbit.repository.jpa.cluster; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import javax.sql.DataSource; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.hawkbit.repository.jpa.utils.DeploymentHelper; +import org.springframework.dao.DataIntegrityViolationException; +import org.springframework.dao.DeadlockLoserDataAccessException; +import org.springframework.dao.PessimisticLockingFailureException; +import org.springframework.dao.QueryTimeoutException; +import org.springframework.integration.jdbc.lock.DefaultLockRepository; +import org.springframework.integration.jdbc.lock.JdbcLockRegistry; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionTimedOutException; +import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +/** + * Repository for {@link JdbcLockRegistry}. This class is not thread safe. Adds support for keeping lock longer then ttl + * if really used by the instance. + */ +@Slf4j +public class DistributedLockRepository extends DefaultLockRepository { + + private static final int MAX_DELETE_RETRY = 10; + + // period between successive refresh tics + private static final String TIC_PERIOD_MS = "${hawkbit.repository.cluster.lock.ticPeriodMS:2000}"; + + private final PlatformTransactionManager txManager; + + private final int refreshOnRemainMS; + private final int refreshOnRemainPercent; + + // if empty refresh is effectively disabled (when both REFRESH_ON_REMAINS_MS and REFRESH_ON_REMAINS_PERCENT are non-positive) + // otherwise, a refresh is triggered at refreshAfterMillis after lock acquisition or last refresh + private Optional refreshAfterMillis; + // lock <-> next refresh time + private final Map lockToRefreshTime = new ConcurrentHashMap<>(); + + /** + * @param dataSource to use for managing the locks + */ + public DistributedLockRepository(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) { + super(dataSource); + this.txManager = txManager; + + this.refreshOnRemainMS = lockProperties.getRefreshOnRemainMS(); + this.refreshOnRemainPercent = lockProperties.getRefreshOnRemainPercent(); + + setTimeToLive(lockProperties.getTtl()); + } + + // interceptor that handles refreshAfterMillis update when time to live is changed + @Override + public void setTimeToLive(final int timeToLive) { + super.setTimeToLive(timeToLive); + refreshAfterMillis = refreshAfterMillis(timeToLive); + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) + @Override + public boolean delete(final String lock) { + synchronized (this) { + lockToRefreshTime.remove(lock); + } + return delete(lock, 0); + } + + private boolean delete(final String lock, final int count) { + try { + return super.delete(lock); + } catch (final PessimisticLockingFailureException e) { + if (count < MAX_DELETE_RETRY) { + log.debug("Failed to delete cluster lock {}. We try again.", lock, e); + return delete(lock, count + 1); + } else { + log.warn("Failed to delete cluster lock {}!", lock, e); + return false; + } + } + } + + @Transactional(propagation=Propagation.NOT_SUPPORTED) + @Override + public boolean acquire(final String lock) { + try { + // real acquisition (made by super.acquire) is made in a new transaction + // because we need to know real (after transaction commit) result Ïto know if it is really successful. + // otherwise the super.acquire will return result before been committed and could be false positive + final boolean acquired = DeploymentHelper.runInNewTransaction( + txManager, "lock-acquire", Isolation.READ_COMMITTED.value(), status -> super.acquire(lock)); + if (acquired) { + // update next refresh time + refreshAfterMillis.ifPresent( + afterMillis -> lockToRefreshTime.put(lock, Instant.now().plus(afterMillis, ChronoUnit.MILLIS))); + } + return acquired; + } catch (final DataIntegrityViolationException | DeadlockLoserDataAccessException e) { + log.debug("Could not acquire cluster lock {}. I guess another node has it.", lock, e); + return false; + } catch (final QueryTimeoutException e) { + log.debug("Query timed out for lock {}.", lock, e); + throw new TransactionTimedOutException("DB query timed out for lock " + lock, e); + } + } + + @SuppressWarnings({"java:S1066"}) + @Scheduled(initialDelayString = TIC_PERIOD_MS, fixedDelayString = TIC_PERIOD_MS) + public void refresh() { + refreshAfterMillis.ifPresentOrElse(afterMillis -> { + final Instant now = Instant.now(); + lockToRefreshTime.forEach((lock, refreshTime) -> { + if (now.isAfter(refreshTime)) { + synchronized (this) { + // if delete is called while iterating we must skip record update + // otherwise, the lock will be unavailable for everyone until expiration + if (lockToRefreshTime.containsKey(lock)) { + if (!acquire(lock)) { // try to update record in lock table + log.warn("Failed to refresh cluster lock {}!", lock); + } + } + } + } + }); + }, lockToRefreshTime::clear); + } + + private Optional refreshAfterMillis(final int timeToLive) { + final int triggerOnRemainMS = Math.max(refreshOnRemainMS, timeToLive * refreshOnRemainPercent / 100); + final int refreshAfterMS = timeToLive - triggerOnRemainMS; + return refreshAfterMS <= 0 ? Optional.empty() : Optional.of(refreshAfterMS); + } +// * May be required if the super class doesn't execute in new transactions +// * See https://github.com/spring-projects/spring-integration/issues/3683 +// * may be not needed anymore +// @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true) +// @Override +// public boolean isAcquired(final String lock) { +// return super.isAcquired(lock); +// } +// +// @Transactional(propagation = Propagation.REQUIRES_NEW) +// @Override +// public void close() { +// super.close(); +// } +} \ No newline at end of file diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/LockProperties.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/LockProperties.java new file mode 100644 index 000000000..96b7703ba --- /dev/null +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/LockProperties.java @@ -0,0 +1,27 @@ +/** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hawkbit.repository.jpa.cluster; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +@Data +@ConfigurationProperties(prefix = "hawkbit.repository.cluster.lock") +@Validated +public class LockProperties { + + private int ttl = 5 * 60_000; // 5 minutes + + // when less than that time (in milliseconds) remains to lock expiration a refresh is triggered + private int refreshOnRemainMS = 4 * 60_000; // refresh after a minute, 4 minutes before expiration + // when less than that time (in percent of expiration) remains to lock expiration a refresh is triggered + private int refreshOnRemainPercent = 80; // refresh after a minute, 4 minutes before expiration +} \ No newline at end of file diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/resources/hawkbit-jpa-defaults.properties b/hawkbit-repository/hawkbit-repository-jpa/src/main/resources/hawkbit-jpa-defaults.properties index 9413648d7..b00c9d560 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/resources/hawkbit-jpa-defaults.properties +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/resources/hawkbit-jpa-defaults.properties @@ -22,3 +22,6 @@ spring.jpa.properties.eclipselink.logging.level=off spring.jpa.properties.eclipselink.query-results-cache=false spring.jpa.properties.eclipselink.cache.shared.default=false ### JPA / Datasource - END + +# disables RolloutsLockRepository warnings when other instance has already obtained the lock +logging.level.org.mariadb.jdbc.message.server.ErrorPacket=ERROR diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockTest.java b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockTest.java new file mode 100644 index 000000000..d52b0150a --- /dev/null +++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockTest.java @@ -0,0 +1,231 @@ +/** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hawkbit.repository.jpa.cluster; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Date; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; + +import javax.sql.DataSource; + +import io.qameta.allure.Description; +import io.qameta.allure.Feature; +import io.qameta.allure.Story; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.hawkbit.repository.jpa.AbstractJpaIntegrationTest; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; +import org.springframework.integration.jdbc.lock.DefaultLockRepository; +import org.springframework.integration.jdbc.lock.JdbcLockRegistry; +import org.springframework.integration.jdbc.lock.LockRepository; +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.integration.util.UUIDConverter; +import org.springframework.security.core.userdetails.UserDetailsService; +import org.springframework.security.provisioning.InMemoryUserDetailsManager; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.annotation.EnableTransactionManagement; + +@Feature("Component Tests - Repository") +@Story("Distributed Lock") +@SpringBootTest(classes = { DistributedLockTest.Config.class }, webEnvironment = SpringBootTest.WebEnvironment.NONE) +@Slf4j +class DistributedLockTest extends AbstractJpaIntegrationTest { + + @Autowired + private LockProperties lockProperties; + + @Autowired + @Qualifier("lockRepository0") + private LockRepository lockRepository0; + + @Autowired + @Qualifier("lockRepository1") + private LockRepository lockRepository1; + + @EnableTransactionManagement + @Configuration + @EnableConfigurationProperties({ LockProperties.class }) + @PropertySource("classpath:/jpa-test.properties") + static class Config { + + @Bean + @ConditionalOnMissingBean + public UserDetailsService userDetailsService() { + return new InMemoryUserDetailsManager(); + } + + @Bean + LockProperties lockProperties() { + return new LockProperties(); + } + + @Bean + LockRepository lockRepository0(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) { + return lockRepository(dataSource, lockProperties, txManager); + } + + @Bean + LockRepository lockRepository1(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) { + return lockRepository(dataSource, lockProperties, txManager); + } + + private LockRepository lockRepository(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) { + final DefaultLockRepository repository = new DistributedLockRepository(dataSource, lockProperties, txManager); + repository.setPrefix("SP_"); + return repository; + } + } + + @SuppressWarnings({"java:S2925"}) + @Test + @Description("Test to verify that lock is kept while ping runs") + void keepLockAlive() { + final LockRegistry lockRegistry0 = new JdbcLockRegistry(lockRepository0); + final LockRegistry lockRegistry1 = new JdbcLockRegistry(lockRepository1); + + final String lockKey0 = "test-lock0"; + final String lockKey1 = "test-lock1"; + // JDBCLockRegistry#pathFor + final String path0 = UUIDConverter.getUUID(lockKey0).toString(); + final String path1 = UUIDConverter.getUUID(lockKey1).toString(); + // lock{i}{j} -> lockKey{i} obtained by lockRegistry{j} + final Lock lock00 = lockRegistry0.obtain(lockKey0); + final Lock lock01 = lockRegistry1.obtain(lockKey0); + final Lock lock10 = lockRegistry0.obtain(lockKey1); + final Lock lock11 = lockRegistry1.obtain(lockKey1); + + final AtomicBoolean lock01Obtained = new AtomicBoolean(); + final AtomicBoolean lock11Obtained = new AtomicBoolean(); + + final AtomicBoolean lock11Locked = new AtomicBoolean(); // state of the lock11 + log.info("Starting test"); + // service 0 must be able to lock lockKey0 + assertThat(lock00.tryLock()).isTrue(); + try { + assertThat(lockRepository0.isAcquired(path0)).isTrue(); // check db state + + final Thread lockThread1 = new Thread(() -> { + // asserts lockKey1 is free and could be locked + assertThat(lock11.tryLock()).isTrue(); + assertThat(lockRepository1.isAcquired(path1)).isTrue(); // check db state + + try { + lock11Obtained.set(true); + lock11Locked.set(true); + + // asserts lockKey0 is kept by lock00 and could not be locked via lockRepository1 + try { + final Instant timeout = Instant.now().plus(4 * lockProperties.getTtl(), ChronoUnit.MILLIS); + while (Instant.now().isBefore(timeout)) { + log.info("lockThread1: loop, timeout: {}", new Date(timeout.toEpochMilli())); + assertThat(lock01.tryLock()).isFalse(); + assertThat(lockRepository1.isAcquired(path0)).isFalse(); // check db state + + try { + Thread.sleep(Math.min(1, lockProperties.getTtl() / 4)); + } catch (final InterruptedException e) { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + } + } + } + } catch (final AssertionError e) { + log.error("lockRepository1 has locked lockKey0 which has to be in lockRepository0 possession!", e); + lock01Obtained.set(true); + lock01.unlock(); + } + + assertThat(lockRepository0.isAcquired(path1)).isFalse(); // check db state + assertThat(lockRepository1.isAcquired(path1)).isTrue(); // check db state + } finally { + lock11Locked.set(false); + lock11.unlock(); + assertThat(lockRepository1.isAcquired(path1)).isFalse(); // check db state + } + }); + lockThread1.start(); + + // asserts lockKey1 is kept by lock11 and could not be locked via lockRepository0 + final Instant timeout = Instant.now().plus(4 * lockProperties.getTtl(), ChronoUnit.MILLIS); + while (Instant.now().isBefore(timeout)) { + log.info("main thread: loop, timeout: {}", new Date(timeout.toEpochMilli())); + if (lock11Locked.get()) { + try { + assertThat(lock10.tryLock()).isFalse(); + assertThat(lockRepository0.isAcquired(path1)).isFalse(); // check db state + } catch (final AssertionError e) { + log.error("lockRepository0 has locked lockKey1 which has to be in lockRepository1 possession!"); + lock10.unlock(); + if (lock11Locked.get()) { + throw e; + } else { + // otherwise the lock has been released + break; + } + } + + try { + Thread.sleep(Math.min(1, lockProperties.getTtl() / 4)); + } catch (final InterruptedException e) { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + } + } + } + } + + try { + lockThread1.join(); + } catch (final InterruptedException e) { + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + } + } + + // assert that service 1 hasn't been able to acquire the lock 0 + assertThat(lock01Obtained).isFalse(); + // assert that service 1 has been able to acquire the lock 1 + assertThat(lock11Obtained).isTrue(); + + assertThat(lockRepository0.isAcquired(path0)).isTrue(); // check db state + assertThat(lockRepository1.isAcquired(path0)).isFalse(); // check db state + } finally { + lock00.unlock(); + assertThat(lockRepository0.isAcquired(path0)).isFalse(); // check db state + } + + try { + // assert that lockKey1 has been released by lock11 and could be got again + // and in different thread + assertThat(lock10.tryLock()).isTrue(); + // and can't be locked by it while locked by lock01 + assertThat(lock11.tryLock()).isFalse(); + } finally { + lock10.unlock(); + } + + // assert that db is clean + assertThat(lockRepository0.isAcquired(path0)).isFalse(); + assertThat(lockRepository1.isAcquired(path0)).isFalse(); + assertThat(lockRepository0.isAcquired(path1)).isFalse(); + assertThat(lockRepository1.isAcquired(path1)).isFalse(); + } +} diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/resources/jpa-test.properties b/hawkbit-repository/hawkbit-repository-jpa/src/test/resources/jpa-test.properties index b10c73419..176de3681 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/test/resources/jpa-test.properties +++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/resources/jpa-test.properties @@ -50,3 +50,8 @@ logging.level.org.eclipse.persistence=ERROR # enable / disable case sensitiveness of the DB when playing around #hawkbit.rsql.caseInsensitiveDB=true + +hawkbit.repository.cluster.lock.ttl=1000 +hawkbit.repository.cluster.lock.refreshOnRemainMS=200 +hawkbit.repository.cluster.lock.refreshOnRemainPercent=10 +hawkbit.repository.cluster.lock.ticPeriodMS=10 \ No newline at end of file diff --git a/site/content/guides/clustering.md b/site/content/guides/clustering.md index 0294a8a40..111b9f365 100644 --- a/site/content/guides/clustering.md +++ b/site/content/guides/clustering.md @@ -4,7 +4,7 @@ parent: Guides weight: 33 --- -hawkBit is able to run in a cluster with some constraints. This guide provides insights in the basic concepts and how to +hawkBit is able to run in a cluster. This guide provides insights in the basic concepts and how to setup your own cluster. You can find additional information in the [hawkBit runtimes's README](https://github.com/eclipse-hawkbit/hawkbit/blob/master/hawkbit-monolith/hawkbit-update-server/README.md). @@ -39,7 +39,7 @@ See [CacheAutoConfiguration](https://github.com/eclipse-hawkbit/hawkbit/blob/mas ## Schedulers Every node has multiple schedulers which run after a defined period of time. All schedulers always run on every node. -This has to be kept in mind e.g. if the scheduler executes critical code which has to be executed only once. +This has to be kept in mind e.g. if the scheduler executes critical code which has to be executed only once. This is implemented via using, by default, distributed database based lock. ## Known constraints