Clustering: Add distributed lock (#2333)

To support sync of activities in cluster setups

Signed-off-by: Avgustin Marinov <Avgustin.Marinov@bosch.com>
This commit is contained in:
Avgustin Marinov
2025-04-02 12:25:08 +03:00
committed by GitHub
parent 32990ab2ea
commit 2af5439b39
12 changed files with 484 additions and 11 deletions

View File

@@ -17,8 +17,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
/**
* Auto-Configuration for enabling JPA repository.
@@ -36,10 +34,4 @@ public class JpaRepositoryAutoConfiguration {
public VirtualPropertyReplacer virtualPropertyReplacer() {
return new VirtualPropertyResolver();
}
@Bean
@ConditionalOnMissingBean
public LockRegistry lockRegistry() {
return new DefaultLockRegistry();
}
}

View File

@@ -0,0 +1,8 @@
-- Table and fields in upper case as Spring queries it that way
CREATE TABLE SP_LOCK (
LOCK_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint SP_LOCK_PK primary key (LOCK_KEY, REGION)
);

View File

@@ -0,0 +1,8 @@
-- Table and fields in upper case as Spring queries it that way
CREATE TABLE SP_LOCK (
LOCK_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CLIENT_ID CHAR(36),
CREATED_DATE DATETIME(6) NOT NULL,
constraint SP_LOCK_PK primary key (LOCK_KEY, REGION)
);

View File

@@ -0,0 +1,8 @@
-- Table and fields in upper case as Spring queries it that way
CREATE TABLE SP_LOCK (
LOCK_KEY CHAR(36) NOT NULL,
REGION VARCHAR(100) NOT NULL,
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint SP_LOCK_PK primary key (LOCK_KEY, REGION)
);

View File

@@ -122,6 +122,10 @@
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>cz.jirutka.rsql</groupId>
<artifactId>rsql-parser</artifactId>

View File

@@ -13,6 +13,8 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import javax.sql.DataSource;
import jakarta.persistence.EntityManager;
import jakarta.validation.Validation;
@@ -78,6 +80,8 @@ import org.eclipse.hawkbit.repository.jpa.builder.JpaSoftwareModuleMetadataBuild
import org.eclipse.hawkbit.repository.jpa.builder.JpaTargetBuilder;
import org.eclipse.hawkbit.repository.jpa.builder.JpaTargetFilterQueryBuilder;
import org.eclipse.hawkbit.repository.jpa.builder.JpaTargetTypeBuilder;
import org.eclipse.hawkbit.repository.jpa.cluster.LockProperties;
import org.eclipse.hawkbit.repository.jpa.cluster.DistributedLockRepository;
import org.eclipse.hawkbit.repository.jpa.event.JpaEventEntityManager;
import org.eclipse.hawkbit.repository.jpa.executor.AfterTransactionCommitDefaultServiceExecutor;
import org.eclipse.hawkbit.repository.jpa.executor.AfterTransactionCommitExecutor;
@@ -186,6 +190,9 @@ import org.springframework.context.annotation.PropertySource;
import org.springframework.data.domain.AuditorAware;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.LockRepository;
import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.lang.NonNull;
import org.springframework.retry.annotation.EnableRetry;
@@ -206,7 +213,7 @@ import org.springframework.validation.beanvalidation.MethodValidationPostProcess
@EnableRetry
@EntityScan("org.eclipse.hawkbit.repository.jpa.model")
@PropertySource("classpath:/hawkbit-jpa-defaults.properties")
@Import({ JpaConfiguration.class, RepositoryDefaultConfiguration.class, DataSourceAutoConfiguration.class, SystemManagementCacheKeyGenerator.class })
@Import({ JpaConfiguration.class, RepositoryDefaultConfiguration.class, LockProperties.class, DataSourceAutoConfiguration.class, SystemManagementCacheKeyGenerator.class })
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
public class RepositoryApplicationConfiguration {
@@ -265,6 +272,21 @@ public class RepositoryApplicationConfiguration {
};
}
@Bean
@ConditionalOnMissingBean
LockRepository lockRepository(
final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
final DefaultLockRepository repository = new DistributedLockRepository(dataSource, lockProperties, txManager);
repository.setPrefix("SP_");
return repository;
}
@Bean
@ConditionalOnMissingBean
public LockRegistry lockRegistry() {
return new DefaultLockRegistry();
}
@Bean
@ConditionalOnMissingBean
PauseRolloutGroupAction pauseRolloutGroupAction(final RolloutManagement rolloutManagement,

View File

@@ -0,0 +1,165 @@
/**
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hawkbit.repository.jpa.cluster;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.repository.jpa.utils.DeploymentHelper;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.DeadlockLoserDataAccessException;
import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionTimedOutException;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
/**
* Repository for {@link JdbcLockRegistry}. This class is not thread safe. Adds support for keeping lock longer then ttl
* if really used by the instance.
*/
@Slf4j
public class DistributedLockRepository extends DefaultLockRepository {
private static final int MAX_DELETE_RETRY = 10;
// period between successive refresh tics
private static final String TIC_PERIOD_MS = "${hawkbit.repository.cluster.lock.ticPeriodMS:2000}";
private final PlatformTransactionManager txManager;
private final int refreshOnRemainMS;
private final int refreshOnRemainPercent;
// if empty refresh is effectively disabled (when both REFRESH_ON_REMAINS_MS and REFRESH_ON_REMAINS_PERCENT are non-positive)
// otherwise, a refresh is triggered at refreshAfterMillis after lock acquisition or last refresh
private Optional<Integer> refreshAfterMillis;
// lock <-> next refresh time
private final Map<String, Instant> lockToRefreshTime = new ConcurrentHashMap<>();
/**
* @param dataSource to use for managing the locks
*/
public DistributedLockRepository(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
super(dataSource);
this.txManager = txManager;
this.refreshOnRemainMS = lockProperties.getRefreshOnRemainMS();
this.refreshOnRemainPercent = lockProperties.getRefreshOnRemainPercent();
setTimeToLive(lockProperties.getTtl());
}
// interceptor that handles refreshAfterMillis update when time to live is changed
@Override
public void setTimeToLive(final int timeToLive) {
super.setTimeToLive(timeToLive);
refreshAfterMillis = refreshAfterMillis(timeToLive);
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public boolean delete(final String lock) {
synchronized (this) {
lockToRefreshTime.remove(lock);
}
return delete(lock, 0);
}
private boolean delete(final String lock, final int count) {
try {
return super.delete(lock);
} catch (final PessimisticLockingFailureException e) {
if (count < MAX_DELETE_RETRY) {
log.debug("Failed to delete cluster lock {}. We try again.", lock, e);
return delete(lock, count + 1);
} else {
log.warn("Failed to delete cluster lock {}!", lock, e);
return false;
}
}
}
@Transactional(propagation=Propagation.NOT_SUPPORTED)
@Override
public boolean acquire(final String lock) {
try {
// real acquisition (made by super.acquire) is made in a new transaction
// because we need to know real (after transaction commit) result Ïto know if it is really successful.
// otherwise the super.acquire will return result before been committed and could be false positive
final boolean acquired = DeploymentHelper.runInNewTransaction(
txManager, "lock-acquire", Isolation.READ_COMMITTED.value(), status -> super.acquire(lock));
if (acquired) {
// update next refresh time
refreshAfterMillis.ifPresent(
afterMillis -> lockToRefreshTime.put(lock, Instant.now().plus(afterMillis, ChronoUnit.MILLIS)));
}
return acquired;
} catch (final DataIntegrityViolationException | DeadlockLoserDataAccessException e) {
log.debug("Could not acquire cluster lock {}. I guess another node has it.", lock, e);
return false;
} catch (final QueryTimeoutException e) {
log.debug("Query timed out for lock {}.", lock, e);
throw new TransactionTimedOutException("DB query timed out for lock " + lock, e);
}
}
@SuppressWarnings({"java:S1066"})
@Scheduled(initialDelayString = TIC_PERIOD_MS, fixedDelayString = TIC_PERIOD_MS)
public void refresh() {
refreshAfterMillis.ifPresentOrElse(afterMillis -> {
final Instant now = Instant.now();
lockToRefreshTime.forEach((lock, refreshTime) -> {
if (now.isAfter(refreshTime)) {
synchronized (this) {
// if delete is called while iterating we must skip record update
// otherwise, the lock will be unavailable for everyone until expiration
if (lockToRefreshTime.containsKey(lock)) {
if (!acquire(lock)) { // try to update record in lock table
log.warn("Failed to refresh cluster lock {}!", lock);
}
}
}
}
});
}, lockToRefreshTime::clear);
}
private Optional<Integer> refreshAfterMillis(final int timeToLive) {
final int triggerOnRemainMS = Math.max(refreshOnRemainMS, timeToLive * refreshOnRemainPercent / 100);
final int refreshAfterMS = timeToLive - triggerOnRemainMS;
return refreshAfterMS <= 0 ? Optional.empty() : Optional.of(refreshAfterMS);
}
// * May be required if the super class doesn't execute in new transactions
// * See https://github.com/spring-projects/spring-integration/issues/3683
// * may be not needed anymore
// @Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = true)
// @Override
// public boolean isAcquired(final String lock) {
// return super.isAcquired(lock);
// }
//
// @Transactional(propagation = Propagation.REQUIRES_NEW)
// @Override
// public void close() {
// super.close();
// }
}

View File

@@ -0,0 +1,27 @@
/**
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hawkbit.repository.jpa.cluster;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
@Data
@ConfigurationProperties(prefix = "hawkbit.repository.cluster.lock")
@Validated
public class LockProperties {
private int ttl = 5 * 60_000; // 5 minutes
// when less than that time (in milliseconds) remains to lock expiration a refresh is triggered
private int refreshOnRemainMS = 4 * 60_000; // refresh after a minute, 4 minutes before expiration
// when less than that time (in percent of expiration) remains to lock expiration a refresh is triggered
private int refreshOnRemainPercent = 80; // refresh after a minute, 4 minutes before expiration
}

View File

@@ -22,3 +22,6 @@ spring.jpa.properties.eclipselink.logging.level=off
spring.jpa.properties.eclipselink.query-results-cache=false
spring.jpa.properties.eclipselink.cache.shared.default=false
### JPA / Datasource - END
# disables RolloutsLockRepository warnings when other instance has already obtained the lock
logging.level.org.mariadb.jdbc.message.server.ErrorPacket=ERROR

View File

@@ -0,0 +1,231 @@
/**
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.hawkbit.repository.jpa.cluster;
import static org.assertj.core.api.Assertions.assertThat;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import javax.sql.DataSource;
import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.repository.jpa.AbstractJpaIntegrationTest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.jdbc.lock.DefaultLockRepository;
import org.springframework.integration.jdbc.lock.JdbcLockRegistry;
import org.springframework.integration.jdbc.lock.LockRepository;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.util.UUIDConverter;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.provisioning.InMemoryUserDetailsManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@Feature("Component Tests - Repository")
@Story("Distributed Lock")
@SpringBootTest(classes = { DistributedLockTest.Config.class }, webEnvironment = SpringBootTest.WebEnvironment.NONE)
@Slf4j
class DistributedLockTest extends AbstractJpaIntegrationTest {
@Autowired
private LockProperties lockProperties;
@Autowired
@Qualifier("lockRepository0")
private LockRepository lockRepository0;
@Autowired
@Qualifier("lockRepository1")
private LockRepository lockRepository1;
@EnableTransactionManagement
@Configuration
@EnableConfigurationProperties({ LockProperties.class })
@PropertySource("classpath:/jpa-test.properties")
static class Config {
@Bean
@ConditionalOnMissingBean
public UserDetailsService userDetailsService() {
return new InMemoryUserDetailsManager();
}
@Bean
LockProperties lockProperties() {
return new LockProperties();
}
@Bean
LockRepository lockRepository0(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
return lockRepository(dataSource, lockProperties, txManager);
}
@Bean
LockRepository lockRepository1(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
return lockRepository(dataSource, lockProperties, txManager);
}
private LockRepository lockRepository(final DataSource dataSource, final LockProperties lockProperties, final PlatformTransactionManager txManager) {
final DefaultLockRepository repository = new DistributedLockRepository(dataSource, lockProperties, txManager);
repository.setPrefix("SP_");
return repository;
}
}
@SuppressWarnings({"java:S2925"})
@Test
@Description("Test to verify that lock is kept while ping runs")
void keepLockAlive() {
final LockRegistry lockRegistry0 = new JdbcLockRegistry(lockRepository0);
final LockRegistry lockRegistry1 = new JdbcLockRegistry(lockRepository1);
final String lockKey0 = "test-lock0";
final String lockKey1 = "test-lock1";
// JDBCLockRegistry#pathFor
final String path0 = UUIDConverter.getUUID(lockKey0).toString();
final String path1 = UUIDConverter.getUUID(lockKey1).toString();
// lock{i}{j} -> lockKey{i} obtained by lockRegistry{j}
final Lock lock00 = lockRegistry0.obtain(lockKey0);
final Lock lock01 = lockRegistry1.obtain(lockKey0);
final Lock lock10 = lockRegistry0.obtain(lockKey1);
final Lock lock11 = lockRegistry1.obtain(lockKey1);
final AtomicBoolean lock01Obtained = new AtomicBoolean();
final AtomicBoolean lock11Obtained = new AtomicBoolean();
final AtomicBoolean lock11Locked = new AtomicBoolean(); // state of the lock11
log.info("Starting test");
// service 0 must be able to lock lockKey0
assertThat(lock00.tryLock()).isTrue();
try {
assertThat(lockRepository0.isAcquired(path0)).isTrue(); // check db state
final Thread lockThread1 = new Thread(() -> {
// asserts lockKey1 is free and could be locked
assertThat(lock11.tryLock()).isTrue();
assertThat(lockRepository1.isAcquired(path1)).isTrue(); // check db state
try {
lock11Obtained.set(true);
lock11Locked.set(true);
// asserts lockKey0 is kept by lock00 and could not be locked via lockRepository1
try {
final Instant timeout = Instant.now().plus(4 * lockProperties.getTtl(), ChronoUnit.MILLIS);
while (Instant.now().isBefore(timeout)) {
log.info("lockThread1: loop, timeout: {}", new Date(timeout.toEpochMilli()));
assertThat(lock01.tryLock()).isFalse();
assertThat(lockRepository1.isAcquired(path0)).isFalse(); // check db state
try {
Thread.sleep(Math.min(1, lockProperties.getTtl() / 4));
} catch (final InterruptedException e) {
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
}
}
}
} catch (final AssertionError e) {
log.error("lockRepository1 has locked lockKey0 which has to be in lockRepository0 possession!", e);
lock01Obtained.set(true);
lock01.unlock();
}
assertThat(lockRepository0.isAcquired(path1)).isFalse(); // check db state
assertThat(lockRepository1.isAcquired(path1)).isTrue(); // check db state
} finally {
lock11Locked.set(false);
lock11.unlock();
assertThat(lockRepository1.isAcquired(path1)).isFalse(); // check db state
}
});
lockThread1.start();
// asserts lockKey1 is kept by lock11 and could not be locked via lockRepository0
final Instant timeout = Instant.now().plus(4 * lockProperties.getTtl(), ChronoUnit.MILLIS);
while (Instant.now().isBefore(timeout)) {
log.info("main thread: loop, timeout: {}", new Date(timeout.toEpochMilli()));
if (lock11Locked.get()) {
try {
assertThat(lock10.tryLock()).isFalse();
assertThat(lockRepository0.isAcquired(path1)).isFalse(); // check db state
} catch (final AssertionError e) {
log.error("lockRepository0 has locked lockKey1 which has to be in lockRepository1 possession!");
lock10.unlock();
if (lock11Locked.get()) {
throw e;
} else {
// otherwise the lock has been released
break;
}
}
try {
Thread.sleep(Math.min(1, lockProperties.getTtl() / 4));
} catch (final InterruptedException e) {
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
}
}
}
}
try {
lockThread1.join();
} catch (final InterruptedException e) {
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
}
}
// assert that service 1 hasn't been able to acquire the lock 0
assertThat(lock01Obtained).isFalse();
// assert that service 1 has been able to acquire the lock 1
assertThat(lock11Obtained).isTrue();
assertThat(lockRepository0.isAcquired(path0)).isTrue(); // check db state
assertThat(lockRepository1.isAcquired(path0)).isFalse(); // check db state
} finally {
lock00.unlock();
assertThat(lockRepository0.isAcquired(path0)).isFalse(); // check db state
}
try {
// assert that lockKey1 has been released by lock11 and could be got again
// and in different thread
assertThat(lock10.tryLock()).isTrue();
// and can't be locked by it while locked by lock01
assertThat(lock11.tryLock()).isFalse();
} finally {
lock10.unlock();
}
// assert that db is clean
assertThat(lockRepository0.isAcquired(path0)).isFalse();
assertThat(lockRepository1.isAcquired(path0)).isFalse();
assertThat(lockRepository0.isAcquired(path1)).isFalse();
assertThat(lockRepository1.isAcquired(path1)).isFalse();
}
}

View File

@@ -50,3 +50,8 @@ logging.level.org.eclipse.persistence=ERROR
# enable / disable case sensitiveness of the DB when playing around
#hawkbit.rsql.caseInsensitiveDB=true
hawkbit.repository.cluster.lock.ttl=1000
hawkbit.repository.cluster.lock.refreshOnRemainMS=200
hawkbit.repository.cluster.lock.refreshOnRemainPercent=10
hawkbit.repository.cluster.lock.ticPeriodMS=10

View File

@@ -4,7 +4,7 @@ parent: Guides
weight: 33
---
hawkBit is able to run in a cluster with some constraints. This guide provides insights in the basic concepts and how to
hawkBit is able to run in a cluster. This guide provides insights in the basic concepts and how to
setup your own cluster. You can find additional information in
the [hawkBit runtimes's README](https://github.com/eclipse-hawkbit/hawkbit/blob/master/hawkbit-monolith/hawkbit-update-server/README.md).
<!--more-->
@@ -39,7 +39,7 @@ See [CacheAutoConfiguration](https://github.com/eclipse-hawkbit/hawkbit/blob/mas
## Schedulers
Every node has multiple schedulers which run after a defined period of time. All schedulers always run on every node.
This has to be kept in mind e.g. if the scheduler executes critical code which has to be executed only once.
This has to be kept in mind e.g. if the scheduler executes critical code which has to be executed only once. This is implemented via using, by default, distributed database based lock.
## Known constraints