diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/repository/JpaRepositoryAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/repository/JpaRepositoryAutoConfiguration.java
index 5dd99f45e..00db0e5ce 100644
--- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/repository/JpaRepositoryAutoConfiguration.java
+++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/repository/JpaRepositoryAutoConfiguration.java
@@ -17,8 +17,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
-import org.springframework.integration.support.locks.DefaultLockRegistry;
-import org.springframework.integration.support.locks.LockRegistry;
/**
* Auto-Configuration for enabling JPA repository.
@@ -36,10 +34,4 @@ public class JpaRepositoryAutoConfiguration {
public VirtualPropertyReplacer virtualPropertyReplacer() {
return new VirtualPropertyResolver();
}
-
- @Bean
- @ConditionalOnMissingBean
- public LockRegistry lockRegistry() {
- return new DefaultLockRegistry();
- }
}
\ No newline at end of file
diff --git a/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/H2/V1_12_30__add_distrubuted_lock___H2.sql b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/H2/V1_12_30__add_distrubuted_lock___H2.sql
new file mode 100644
index 000000000..e5fc064a9
--- /dev/null
+++ b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/H2/V1_12_30__add_distrubuted_lock___H2.sql
@@ -0,0 +1,8 @@
+-- Table and fields in upper case as Spring queries it that way
+CREATE TABLE SP_LOCK (
+ LOCK_KEY CHAR(36) NOT NULL,
+ REGION VARCHAR(100) NOT NULL,
+ CLIENT_ID CHAR(36),
+ CREATED_DATE TIMESTAMP NOT NULL,
+ constraint SP_LOCK_PK primary key (LOCK_KEY, REGION)
+);
\ No newline at end of file
diff --git a/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/MYSQL/V1_12_30__add_distrubuted_lock___MYSQL.sql b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/MYSQL/V1_12_30__add_distrubuted_lock___MYSQL.sql
new file mode 100644
index 000000000..ed8f9c835
--- /dev/null
+++ b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/MYSQL/V1_12_30__add_distrubuted_lock___MYSQL.sql
@@ -0,0 +1,8 @@
+-- Table and fields in upper case as Spring queries it that way
+CREATE TABLE SP_LOCK (
+ LOCK_KEY CHAR(36) NOT NULL,
+ REGION VARCHAR(100) NOT NULL,
+ CLIENT_ID CHAR(36),
+ CREATED_DATE DATETIME(6) NOT NULL,
+ constraint SP_LOCK_PK primary key (LOCK_KEY, REGION)
+);
\ No newline at end of file
diff --git a/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/POSTGRESQL/V1_12_31__add_distrubuted_lock___POSTGRESQL.sql b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/POSTGRESQL/V1_12_31__add_distrubuted_lock___POSTGRESQL.sql
new file mode 100644
index 000000000..e5fc064a9
--- /dev/null
+++ b/hawkbit-repository/hawkbit-repository-jpa-flyway/src/main/resources/db/migration/POSTGRESQL/V1_12_31__add_distrubuted_lock___POSTGRESQL.sql
@@ -0,0 +1,8 @@
+-- Table and fields in upper case as Spring queries it that way
+CREATE TABLE SP_LOCK (
+ LOCK_KEY CHAR(36) NOT NULL,
+ REGION VARCHAR(100) NOT NULL,
+ CLIENT_ID CHAR(36),
+ CREATED_DATE TIMESTAMP NOT NULL,
+ constraint SP_LOCK_PK primary key (LOCK_KEY, REGION)
+);
\ No newline at end of file
diff --git a/hawkbit-repository/hawkbit-repository-jpa/pom.xml b/hawkbit-repository/hawkbit-repository-jpa/pom.xml
index 046934c84..6ca8135c9 100644
--- a/hawkbit-repository/hawkbit-repository-jpa/pom.xml
+++ b/hawkbit-repository/hawkbit-repository-jpa/pom.xml
@@ -122,6 +122,10 @@
org.springframework.security
spring-security-core
+
+ org.springframework.integration
+ spring-integration-jdbc
+
cz.jirutka.rsql
rsql-parser
diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java
index 45f766260..daebb0a07 100644
--- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java
+++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java
@@ -13,6 +13,8 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
+import javax.sql.DataSource;
+
import jakarta.persistence.EntityManager;
import jakarta.validation.Validation;
@@ -78,6 +80,8 @@ import org.eclipse.hawkbit.repository.jpa.builder.JpaSoftwareModuleMetadataBuild
import org.eclipse.hawkbit.repository.jpa.builder.JpaTargetBuilder;
import org.eclipse.hawkbit.repository.jpa.builder.JpaTargetFilterQueryBuilder;
import org.eclipse.hawkbit.repository.jpa.builder.JpaTargetTypeBuilder;
+import org.eclipse.hawkbit.repository.jpa.cluster.LockProperties;
+import org.eclipse.hawkbit.repository.jpa.cluster.DistributedLockRepository;
import org.eclipse.hawkbit.repository.jpa.event.JpaEventEntityManager;
import org.eclipse.hawkbit.repository.jpa.executor.AfterTransactionCommitDefaultServiceExecutor;
import org.eclipse.hawkbit.repository.jpa.executor.AfterTransactionCommitExecutor;
@@ -186,6 +190,9 @@ import org.springframework.context.annotation.PropertySource;
import org.springframework.data.domain.AuditorAware;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
+import org.springframework.integration.jdbc.lock.DefaultLockRepository;
+import org.springframework.integration.jdbc.lock.LockRepository;
+import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.lang.NonNull;
import org.springframework.retry.annotation.EnableRetry;
@@ -206,7 +213,7 @@ import org.springframework.validation.beanvalidation.MethodValidationPostProcess
@EnableRetry
@EntityScan("org.eclipse.hawkbit.repository.jpa.model")
@PropertySource("classpath:/hawkbit-jpa-defaults.properties")
-@Import({ JpaConfiguration.class, RepositoryDefaultConfiguration.class, DataSourceAutoConfiguration.class, SystemManagementCacheKeyGenerator.class })
+@Import({ JpaConfiguration.class, RepositoryDefaultConfiguration.class, LockProperties.class, DataSourceAutoConfiguration.class, SystemManagementCacheKeyGenerator.class })
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
public class RepositoryApplicationConfiguration {
@@ -265,6 +272,21 @@ public class RepositoryApplicationConfiguration {
};
}
+ @Bean
+ @ConditionalOnMissingBean
+ LockRepository lockRepository(
+ final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
+ final DefaultLockRepository repository = new DistributedLockRepository(dataSource, lockProperties, txManager);
+ repository.setPrefix("SP_");
+ return repository;
+ }
+
+ @Bean
+ @ConditionalOnMissingBean
+ public LockRegistry lockRegistry() {
+ return new DefaultLockRegistry();
+ }
+
@Bean
@ConditionalOnMissingBean
PauseRolloutGroupAction pauseRolloutGroupAction(final RolloutManagement rolloutManagement,
diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockRepository.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockRepository.java
new file mode 100644
index 000000000..e3a46775c
--- /dev/null
+++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockRepository.java
@@ -0,0 +1,165 @@
+/**
+ * Copyright (c) 2025 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.hawkbit.repository.jpa.cluster;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.sql.DataSource;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.hawkbit.repository.jpa.utils.DeploymentHelper;
+import org.springframework.dao.DataIntegrityViolationException;
+import org.springframework.dao.DeadlockLoserDataAccessException;
+import org.springframework.dao.PessimisticLockingFailureException;
+import org.springframework.dao.QueryTimeoutException;
+import org.springframework.integration.jdbc.lock.DefaultLockRepository;
+import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionTimedOutException;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Repository for {@link JdbcLockRegistry}. This class is not thread safe. Adds support for keeping lock longer then ttl
+ * if really used by the instance.
+ */
+@Slf4j
+public class DistributedLockRepository extends DefaultLockRepository {
+
+ private static final int MAX_DELETE_RETRY = 10;
+
+ // period between successive refresh tics
+ private static final String TIC_PERIOD_MS = "${hawkbit.repository.cluster.lock.ticPeriodMS:2000}";
+
+ private final PlatformTransactionManager txManager;
+
+ private final int refreshOnRemainMS;
+ private final int refreshOnRemainPercent;
+
+ // if empty refresh is effectively disabled (when both REFRESH_ON_REMAINS_MS and REFRESH_ON_REMAINS_PERCENT are non-positive)
+ // otherwise, a refresh is triggered at refreshAfterMillis after lock acquisition or last refresh
+ private Optional refreshAfterMillis;
+ // lock <-> next refresh time
+ private final Map lockToRefreshTime = new ConcurrentHashMap<>();
+
+ /**
+ * @param dataSource to use for managing the locks
+ */
+ public DistributedLockRepository(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
+ super(dataSource);
+ this.txManager = txManager;
+
+ this.refreshOnRemainMS = lockProperties.getRefreshOnRemainMS();
+ this.refreshOnRemainPercent = lockProperties.getRefreshOnRemainPercent();
+
+ setTimeToLive(lockProperties.getTtl());
+ }
+
+ // interceptor that handles refreshAfterMillis update when time to live is changed
+ @Override
+ public void setTimeToLive(final int timeToLive) {
+ super.setTimeToLive(timeToLive);
+ refreshAfterMillis = refreshAfterMillis(timeToLive);
+ }
+
+ @Transactional(propagation = Propagation.REQUIRES_NEW)
+ @Override
+ public boolean delete(final String lock) {
+ synchronized (this) {
+ lockToRefreshTime.remove(lock);
+ }
+ return delete(lock, 0);
+ }
+
+ private boolean delete(final String lock, final int count) {
+ try {
+ return super.delete(lock);
+ } catch (final PessimisticLockingFailureException e) {
+ if (count < MAX_DELETE_RETRY) {
+ log.debug("Failed to delete cluster lock {}. We try again.", lock, e);
+ return delete(lock, count + 1);
+ } else {
+ log.warn("Failed to delete cluster lock {}!", lock, e);
+ return false;
+ }
+ }
+ }
+
+ @Transactional(propagation=Propagation.NOT_SUPPORTED)
+ @Override
+ public boolean acquire(final String lock) {
+ try {
+ // real acquisition (made by super.acquire) is made in a new transaction
+ // because we need to know real (after transaction commit) result Ïto know if it is really successful.
+ // otherwise the super.acquire will return result before been committed and could be false positive
+ final boolean acquired = DeploymentHelper.runInNewTransaction(
+ txManager, "lock-acquire", Isolation.READ_COMMITTED.value(), status -> super.acquire(lock));
+ if (acquired) {
+ // update next refresh time
+ refreshAfterMillis.ifPresent(
+ afterMillis -> lockToRefreshTime.put(lock, Instant.now().plus(afterMillis, ChronoUnit.MILLIS)));
+ }
+ return acquired;
+ } catch (final DataIntegrityViolationException | DeadlockLoserDataAccessException e) {
+ log.debug("Could not acquire cluster lock {}. I guess another node has it.", lock, e);
+ return false;
+ } catch (final QueryTimeoutException e) {
+ log.debug("Query timed out for lock {}.", lock, e);
+ throw new TransactionTimedOutException("DB query timed out for lock " + lock, e);
+ }
+ }
+
+ @SuppressWarnings({"java:S1066"})
+ @Scheduled(initialDelayString = TIC_PERIOD_MS, fixedDelayString = TIC_PERIOD_MS)
+ public void refresh() {
+ refreshAfterMillis.ifPresentOrElse(afterMillis -> {
+ final Instant now = Instant.now();
+ lockToRefreshTime.forEach((lock, refreshTime) -> {
+ if (now.isAfter(refreshTime)) {
+ synchronized (this) {
+ // if delete is called while iterating we must skip record update
+ // otherwise, the lock will be unavailable for everyone until expiration
+ if (lockToRefreshTime.containsKey(lock)) {
+ if (!acquire(lock)) { // try to update record in lock table
+ log.warn("Failed to refresh cluster lock {}!", lock);
+ }
+ }
+ }
+ }
+ });
+ }, lockToRefreshTime::clear);
+ }
+
+ private Optional refreshAfterMillis(final int timeToLive) {
+ final int triggerOnRemainMS = Math.max(refreshOnRemainMS, timeToLive * refreshOnRemainPercent / 100);
+ final int refreshAfterMS = timeToLive - triggerOnRemainMS;
+ return refreshAfterMS <= 0 ? Optional.empty() : Optional.of(refreshAfterMS);
+ }
+// * May be required if the super class doesn't execute in new transactions
+// * See https://github.com/spring-projects/spring-integration/issues/3683
+// * may be not needed anymore
+// @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
+// @Override
+// public boolean isAcquired(final String lock) {
+// return super.isAcquired(lock);
+// }
+//
+// @Transactional(propagation = Propagation.REQUIRES_NEW)
+// @Override
+// public void close() {
+// super.close();
+// }
+}
\ No newline at end of file
diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/LockProperties.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/LockProperties.java
new file mode 100644
index 000000000..96b7703ba
--- /dev/null
+++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/cluster/LockProperties.java
@@ -0,0 +1,27 @@
+/**
+ * Copyright (c) 2025 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.hawkbit.repository.jpa.cluster;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+@Data
+@ConfigurationProperties(prefix = "hawkbit.repository.cluster.lock")
+@Validated
+public class LockProperties {
+
+ private int ttl = 5 * 60_000; // 5 minutes
+
+ // when less than that time (in milliseconds) remains to lock expiration a refresh is triggered
+ private int refreshOnRemainMS = 4 * 60_000; // refresh after a minute, 4 minutes before expiration
+ // when less than that time (in percent of expiration) remains to lock expiration a refresh is triggered
+ private int refreshOnRemainPercent = 80; // refresh after a minute, 4 minutes before expiration
+}
\ No newline at end of file
diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/resources/hawkbit-jpa-defaults.properties b/hawkbit-repository/hawkbit-repository-jpa/src/main/resources/hawkbit-jpa-defaults.properties
index 9413648d7..b00c9d560 100644
--- a/hawkbit-repository/hawkbit-repository-jpa/src/main/resources/hawkbit-jpa-defaults.properties
+++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/resources/hawkbit-jpa-defaults.properties
@@ -22,3 +22,6 @@ spring.jpa.properties.eclipselink.logging.level=off
spring.jpa.properties.eclipselink.query-results-cache=false
spring.jpa.properties.eclipselink.cache.shared.default=false
### JPA / Datasource - END
+
+# disables RolloutsLockRepository warnings when other instance has already obtained the lock
+logging.level.org.mariadb.jdbc.message.server.ErrorPacket=ERROR
diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockTest.java b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockTest.java
new file mode 100644
index 000000000..d52b0150a
--- /dev/null
+++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/cluster/DistributedLockTest.java
@@ -0,0 +1,231 @@
+/**
+ * Copyright (c) 2025 Contributors to the Eclipse Foundation
+ *
+ * This program and the accompanying materials are made
+ * available under the terms of the Eclipse Public License 2.0
+ * which is available at https://www.eclipse.org/legal/epl-2.0/
+ *
+ * SPDX-License-Identifier: EPL-2.0
+ */
+package org.eclipse.hawkbit.repository.jpa.cluster;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+
+import javax.sql.DataSource;
+
+import io.qameta.allure.Description;
+import io.qameta.allure.Feature;
+import io.qameta.allure.Story;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.hawkbit.repository.jpa.AbstractJpaIntegrationTest;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.integration.jdbc.lock.DefaultLockRepository;
+import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
+import org.springframework.integration.jdbc.lock.LockRepository;
+import org.springframework.integration.support.locks.LockRegistry;
+import org.springframework.integration.util.UUIDConverter;
+import org.springframework.security.core.userdetails.UserDetailsService;
+import org.springframework.security.provisioning.InMemoryUserDetailsManager;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
+
+@Feature("Component Tests - Repository")
+@Story("Distributed Lock")
+@SpringBootTest(classes = { DistributedLockTest.Config.class }, webEnvironment = SpringBootTest.WebEnvironment.NONE)
+@Slf4j
+class DistributedLockTest extends AbstractJpaIntegrationTest {
+
+ @Autowired
+ private LockProperties lockProperties;
+
+ @Autowired
+ @Qualifier("lockRepository0")
+ private LockRepository lockRepository0;
+
+ @Autowired
+ @Qualifier("lockRepository1")
+ private LockRepository lockRepository1;
+
+ @EnableTransactionManagement
+ @Configuration
+ @EnableConfigurationProperties({ LockProperties.class })
+ @PropertySource("classpath:/jpa-test.properties")
+ static class Config {
+
+ @Bean
+ @ConditionalOnMissingBean
+ public UserDetailsService userDetailsService() {
+ return new InMemoryUserDetailsManager();
+ }
+
+ @Bean
+ LockProperties lockProperties() {
+ return new LockProperties();
+ }
+
+ @Bean
+ LockRepository lockRepository0(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
+ return lockRepository(dataSource, lockProperties, txManager);
+ }
+
+ @Bean
+ LockRepository lockRepository1(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
+ return lockRepository(dataSource, lockProperties, txManager);
+ }
+
+ private LockRepository lockRepository(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
+ final DefaultLockRepository repository = new DistributedLockRepository(dataSource, lockProperties, txManager);
+ repository.setPrefix("SP_");
+ return repository;
+ }
+ }
+
+ @SuppressWarnings({"java:S2925"})
+ @Test
+ @Description("Test to verify that lock is kept while ping runs")
+ void keepLockAlive() {
+ final LockRegistry lockRegistry0 = new JdbcLockRegistry(lockRepository0);
+ final LockRegistry lockRegistry1 = new JdbcLockRegistry(lockRepository1);
+
+ final String lockKey0 = "test-lock0";
+ final String lockKey1 = "test-lock1";
+ // JDBCLockRegistry#pathFor
+ final String path0 = UUIDConverter.getUUID(lockKey0).toString();
+ final String path1 = UUIDConverter.getUUID(lockKey1).toString();
+ // lock{i}{j} -> lockKey{i} obtained by lockRegistry{j}
+ final Lock lock00 = lockRegistry0.obtain(lockKey0);
+ final Lock lock01 = lockRegistry1.obtain(lockKey0);
+ final Lock lock10 = lockRegistry0.obtain(lockKey1);
+ final Lock lock11 = lockRegistry1.obtain(lockKey1);
+
+ final AtomicBoolean lock01Obtained = new AtomicBoolean();
+ final AtomicBoolean lock11Obtained = new AtomicBoolean();
+
+ final AtomicBoolean lock11Locked = new AtomicBoolean(); // state of the lock11
+ log.info("Starting test");
+ // service 0 must be able to lock lockKey0
+ assertThat(lock00.tryLock()).isTrue();
+ try {
+ assertThat(lockRepository0.isAcquired(path0)).isTrue(); // check db state
+
+ final Thread lockThread1 = new Thread(() -> {
+ // asserts lockKey1 is free and could be locked
+ assertThat(lock11.tryLock()).isTrue();
+ assertThat(lockRepository1.isAcquired(path1)).isTrue(); // check db state
+
+ try {
+ lock11Obtained.set(true);
+ lock11Locked.set(true);
+
+ // asserts lockKey0 is kept by lock00 and could not be locked via lockRepository1
+ try {
+ final Instant timeout = Instant.now().plus(4 * lockProperties.getTtl(), ChronoUnit.MILLIS);
+ while (Instant.now().isBefore(timeout)) {
+ log.info("lockThread1: loop, timeout: {}", new Date(timeout.toEpochMilli()));
+ assertThat(lock01.tryLock()).isFalse();
+ assertThat(lockRepository1.isAcquired(path0)).isFalse(); // check db state
+
+ try {
+ Thread.sleep(Math.min(1, lockProperties.getTtl() / 4));
+ } catch (final InterruptedException e) {
+ if (Thread.interrupted()) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ } catch (final AssertionError e) {
+ log.error("lockRepository1 has locked lockKey0 which has to be in lockRepository0 possession!", e);
+ lock01Obtained.set(true);
+ lock01.unlock();
+ }
+
+ assertThat(lockRepository0.isAcquired(path1)).isFalse(); // check db state
+ assertThat(lockRepository1.isAcquired(path1)).isTrue(); // check db state
+ } finally {
+ lock11Locked.set(false);
+ lock11.unlock();
+ assertThat(lockRepository1.isAcquired(path1)).isFalse(); // check db state
+ }
+ });
+ lockThread1.start();
+
+ // asserts lockKey1 is kept by lock11 and could not be locked via lockRepository0
+ final Instant timeout = Instant.now().plus(4 * lockProperties.getTtl(), ChronoUnit.MILLIS);
+ while (Instant.now().isBefore(timeout)) {
+ log.info("main thread: loop, timeout: {}", new Date(timeout.toEpochMilli()));
+ if (lock11Locked.get()) {
+ try {
+ assertThat(lock10.tryLock()).isFalse();
+ assertThat(lockRepository0.isAcquired(path1)).isFalse(); // check db state
+ } catch (final AssertionError e) {
+ log.error("lockRepository0 has locked lockKey1 which has to be in lockRepository1 possession!");
+ lock10.unlock();
+ if (lock11Locked.get()) {
+ throw e;
+ } else {
+ // otherwise the lock has been released
+ break;
+ }
+ }
+
+ try {
+ Thread.sleep(Math.min(1, lockProperties.getTtl() / 4));
+ } catch (final InterruptedException e) {
+ if (Thread.interrupted()) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ try {
+ lockThread1.join();
+ } catch (final InterruptedException e) {
+ if (Thread.interrupted()) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ // assert that service 1 hasn't been able to acquire the lock 0
+ assertThat(lock01Obtained).isFalse();
+ // assert that service 1 has been able to acquire the lock 1
+ assertThat(lock11Obtained).isTrue();
+
+ assertThat(lockRepository0.isAcquired(path0)).isTrue(); // check db state
+ assertThat(lockRepository1.isAcquired(path0)).isFalse(); // check db state
+ } finally {
+ lock00.unlock();
+ assertThat(lockRepository0.isAcquired(path0)).isFalse(); // check db state
+ }
+
+ try {
+ // assert that lockKey1 has been released by lock11 and could be got again
+ // and in different thread
+ assertThat(lock10.tryLock()).isTrue();
+ // and can't be locked by it while locked by lock01
+ assertThat(lock11.tryLock()).isFalse();
+ } finally {
+ lock10.unlock();
+ }
+
+ // assert that db is clean
+ assertThat(lockRepository0.isAcquired(path0)).isFalse();
+ assertThat(lockRepository1.isAcquired(path0)).isFalse();
+ assertThat(lockRepository0.isAcquired(path1)).isFalse();
+ assertThat(lockRepository1.isAcquired(path1)).isFalse();
+ }
+}
diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/resources/jpa-test.properties b/hawkbit-repository/hawkbit-repository-jpa/src/test/resources/jpa-test.properties
index b10c73419..176de3681 100644
--- a/hawkbit-repository/hawkbit-repository-jpa/src/test/resources/jpa-test.properties
+++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/resources/jpa-test.properties
@@ -50,3 +50,8 @@ logging.level.org.eclipse.persistence=ERROR
# enable / disable case sensitiveness of the DB when playing around
#hawkbit.rsql.caseInsensitiveDB=true
+
+hawkbit.repository.cluster.lock.ttl=1000
+hawkbit.repository.cluster.lock.refreshOnRemainMS=200
+hawkbit.repository.cluster.lock.refreshOnRemainPercent=10
+hawkbit.repository.cluster.lock.ticPeriodMS=10
\ No newline at end of file
diff --git a/site/content/guides/clustering.md b/site/content/guides/clustering.md
index 0294a8a40..111b9f365 100644
--- a/site/content/guides/clustering.md
+++ b/site/content/guides/clustering.md
@@ -4,7 +4,7 @@ parent: Guides
weight: 33
---
-hawkBit is able to run in a cluster with some constraints. This guide provides insights in the basic concepts and how to
+hawkBit is able to run in a cluster. This guide provides insights in the basic concepts and how to
setup your own cluster. You can find additional information in
the [hawkBit runtimes's README](https://github.com/eclipse-hawkbit/hawkbit/blob/master/hawkbit-monolith/hawkbit-update-server/README.md).
@@ -39,7 +39,7 @@ See [CacheAutoConfiguration](https://github.com/eclipse-hawkbit/hawkbit/blob/mas
## Schedulers
Every node has multiple schedulers which run after a defined period of time. All schedulers always run on every node.
-This has to be kept in mind e.g. if the scheduler executes critical code which has to be executed only once.
+This has to be kept in mind e.g. if the scheduler executes critical code which has to be executed only once. This is implemented via using, by default, distributed database based lock.
## Known constraints