AutoAssign scheduler to obtain lock on Tenant level. Configurable to run separate tenant auto-assign checks in separate threads (#2843)
Co-authored-by: vasilchev <vasil.ilchev@bosch.com>
This commit is contained in:
@@ -342,8 +342,9 @@ public class JpaRepositoryConfiguration {
|
||||
@ConditionalOnProperty(prefix = "hawkbit.autoassign.scheduler", name = "enabled", matchIfMissing = true)
|
||||
AutoAssignScheduler autoAssignScheduler(
|
||||
final SystemManagement systemManagement, final AutoAssignExecutor autoAssignExecutor,
|
||||
@Value("${hawkbit.autoassign.executor.thread-pool.size:1}") final int threadPoolSize,
|
||||
final LockRegistry lockRegistry, final Optional<MeterRegistry> meterRegistry) {
|
||||
return new AutoAssignScheduler(systemManagement, autoAssignExecutor, lockRegistry, meterRegistry);
|
||||
return new AutoAssignScheduler(systemManagement, autoAssignExecutor, threadPoolSize, lockRegistry, meterRegistry);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -446,4 +447,4 @@ public class JpaRepositoryConfiguration {
|
||||
: new AuthorizationDeniedException("Access Denied", authorizationResult));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
package org.eclipse.hawkbit.repository.jpa.scheduler;
|
||||
|
||||
import static org.eclipse.hawkbit.context.AccessContext.asSystem;
|
||||
import static org.eclipse.hawkbit.context.AccessContext.asSystemAsTenant;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -22,6 +23,7 @@ import org.eclipse.hawkbit.repository.SystemManagement;
|
||||
import org.eclipse.hawkbit.tenancy.DefaultTenantConfiguration;
|
||||
import org.springframework.integration.support.locks.LockRegistry;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
/**
|
||||
* Scheduler to check target filters for auto assignment of distribution sets
|
||||
@@ -29,20 +31,23 @@ import org.springframework.scheduling.annotation.Scheduled;
|
||||
@Slf4j
|
||||
public class AutoAssignScheduler {
|
||||
|
||||
private static final String PROP_SCHEDULER_DELAY_PLACEHOLDER = "${hawkbit.scheduler.scheduler.fixedDelay:2000}";
|
||||
private static final String PROP_SCHEDULER_DELAY_PLACEHOLDER = "${hawkbit.autoassign.scheduler.fixedDelay:2000}";
|
||||
|
||||
private final SystemManagement systemManagement;
|
||||
private final AutoAssignExecutor autoAssignExecutor;
|
||||
private final LockRegistry lockRegistry;
|
||||
private final Optional<MeterRegistry> meterRegistry;
|
||||
private final ThreadPoolTaskExecutor autoAssignTaskExecutor;
|
||||
|
||||
public AutoAssignScheduler(
|
||||
final SystemManagement systemManagement, final AutoAssignExecutor autoAssignExecutor,
|
||||
final int threadPoolSize,
|
||||
final LockRegistry lockRegistry, final Optional<MeterRegistry> meterRegistry) {
|
||||
this.systemManagement = systemManagement;
|
||||
this.autoAssignExecutor = autoAssignExecutor;
|
||||
this.lockRegistry = lockRegistry;
|
||||
this.meterRegistry = meterRegistry;
|
||||
autoAssignTaskExecutor = SchedulerUtils.threadPoolTaskExecutor("auto-assign-exec-", threadPoolSize);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -52,41 +57,42 @@ public class AutoAssignScheduler {
|
||||
@Scheduled(initialDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER, fixedDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER)
|
||||
public void autoAssignScheduler() {
|
||||
// run this code in system code privileged to have the necessary permission to query and create entities.
|
||||
asSystem(this::executeAutoAssign);
|
||||
log.debug("Triggered auto-assign scheduler.");
|
||||
final long startNano = java.lang.System.nanoTime();
|
||||
asSystem(() ->
|
||||
systemManagement.forEachTenantAsSystem(tenant -> {
|
||||
if (autoAssignTaskExecutor == null) {// sync
|
||||
handleAll(tenant);
|
||||
} else {// async
|
||||
autoAssignTaskExecutor.execute(() -> asSystemAsTenant(tenant, () -> handleAll(tenant)));
|
||||
}
|
||||
})
|
||||
);
|
||||
meterRegistry
|
||||
.map(mReg -> mReg.timer("hawkbit.autoassign.scheduler.all"))
|
||||
.ifPresent(timer -> timer.record(java.lang.System.nanoTime() - startNano, TimeUnit.NANOSECONDS));
|
||||
log.debug("Finished auto-assign scheduler run.");
|
||||
}
|
||||
|
||||
@SuppressWarnings("squid:S3516")
|
||||
private Object executeAutoAssign() {
|
||||
// workaround eclipselink that is currently not possible to execute a query without multitenancy if MultiTenant
|
||||
// annotation is used - https://bugs.eclipse.org/bugs/show_bug.cgi?id=355458. So iterate through all tenants and execute the rollout
|
||||
// check for each tenant separately.
|
||||
final Lock lock = lockRegistry.obtain("scheduler");
|
||||
private void handleAll(final String tenant) {
|
||||
final Lock lock = lockRegistry.obtain(createAutoAssignmentLockKey(tenant));
|
||||
if (!lock.tryLock()) {
|
||||
return null;
|
||||
return;
|
||||
}
|
||||
|
||||
final long startNano = java.lang.System.nanoTime();
|
||||
final long startNanoT = System.nanoTime();
|
||||
try {
|
||||
log.debug("Auto assign scheduled execution has acquired lock and started for each tenant.");
|
||||
systemManagement.forEachTenantAsSystem(tenant -> {
|
||||
final long startNanoT = java.lang.System.nanoTime();
|
||||
|
||||
autoAssignExecutor.checkAllTargets();
|
||||
|
||||
meterRegistry
|
||||
.map(mReg -> mReg.timer(
|
||||
"hawkbit.scheduler.executor",
|
||||
DefaultTenantConfiguration.TENANT_TAG, tenant))
|
||||
.ifPresent(timer -> timer.record(java.lang.System.nanoTime() - startNanoT, TimeUnit.NANOSECONDS));
|
||||
});
|
||||
autoAssignExecutor.checkAllTargets();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
meterRegistry
|
||||
.map(mReg -> mReg.timer("hawkbit.scheduler.executor.all"))
|
||||
.ifPresent(timer -> timer.record(java.lang.System.nanoTime() - startNano, TimeUnit.NANOSECONDS));
|
||||
log.debug("Auto assign scheduled execution has released lock and finished.");
|
||||
.map(mReg -> mReg.timer(
|
||||
"hawkbit.autoassign.scheduler",
|
||||
DefaultTenantConfiguration.TENANT_TAG, tenant))
|
||||
.ifPresent(timer -> timer.record(System.nanoTime() - startNanoT, TimeUnit.NANOSECONDS));
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
private static String createAutoAssignmentLockKey(final String tenant) {
|
||||
return tenant + "-auto-assign";
|
||||
}
|
||||
}
|
||||
@@ -19,7 +19,6 @@ import io.micrometer.core.instrument.MeterRegistry;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.hawkbit.repository.RolloutHandler;
|
||||
import org.eclipse.hawkbit.repository.SystemManagement;
|
||||
import org.eclipse.hawkbit.repository.jpa.rollout.BlockWhenFullPolicy;
|
||||
import org.eclipse.hawkbit.tenancy.DefaultTenantConfiguration;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
@@ -45,7 +44,7 @@ public class RolloutScheduler {
|
||||
this.systemManagement = systemManagement;
|
||||
this.rolloutHandler = rolloutHandler;
|
||||
this.meterRegistry = meterRegistry;
|
||||
rolloutTaskExecutor = threadPoolTaskExecutor(threadPoolSize);
|
||||
rolloutTaskExecutor = SchedulerUtils.threadPoolTaskExecutor("rollout-exec-", threadPoolSize);
|
||||
|
||||
}
|
||||
|
||||
@@ -67,7 +66,7 @@ public class RolloutScheduler {
|
||||
if (rolloutTaskExecutor == null) {
|
||||
handleAll(tenant);
|
||||
} else {
|
||||
handleAllAsync(tenant);
|
||||
rolloutTaskExecutor.submit(() -> asSystemAsTenant(tenant, () -> handleAll(tenant)));
|
||||
}
|
||||
}));
|
||||
|
||||
@@ -91,23 +90,4 @@ public class RolloutScheduler {
|
||||
.ifPresent(timer -> timer.record(java.lang.System.nanoTime() - startNano, TimeUnit.NANOSECONDS));
|
||||
}
|
||||
|
||||
private void handleAllAsync(final String tenant) {
|
||||
rolloutTaskExecutor.submit(() -> asSystemAsTenant(tenant, () -> handleAll(tenant)));
|
||||
}
|
||||
|
||||
private ThreadPoolTaskExecutor threadPoolTaskExecutor(final int threadPoolSize) {
|
||||
if (threadPoolSize <= 1) {
|
||||
return null;
|
||||
}
|
||||
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(threadPoolSize);
|
||||
executor.setMaxPoolSize(threadPoolSize);
|
||||
executor.setQueueCapacity(0); // forces a Synchronous Queue
|
||||
// This policy will block the submitter until a worker thread is free
|
||||
executor.setRejectedExecutionHandler(new BlockWhenFullPolicy());
|
||||
executor.setThreadNamePrefix("rollout-exec-");
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
/**
|
||||
* Copyright (c) 2025 Contributors to the Eclipse Foundation
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.repository.jpa.scheduler;
|
||||
|
||||
import org.eclipse.hawkbit.repository.jpa.rollout.BlockWhenFullPolicy;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
public final class SchedulerUtils {
|
||||
|
||||
private SchedulerUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link ThreadPoolTaskExecutor} with the given thread name
|
||||
* prefix and pool size.
|
||||
*
|
||||
* @param threadNamePrefix
|
||||
* the prefix for the thread name
|
||||
* @param threadPoolSize
|
||||
* the size of the pool
|
||||
* @return the new scheduler
|
||||
*/
|
||||
public static ThreadPoolTaskExecutor threadPoolTaskExecutor(final String threadNamePrefix, final int threadPoolSize) {
|
||||
if (threadPoolSize <= 1) {
|
||||
return null;
|
||||
}
|
||||
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(threadPoolSize);
|
||||
executor.setMaxPoolSize(threadPoolSize);
|
||||
executor.setQueueCapacity(0); // forces a Synchronous Queue
|
||||
// This policy will block the submitter until a worker thread is free
|
||||
executor.setRejectedExecutionHandler(new BlockWhenFullPolicy());
|
||||
executor.setThreadNamePrefix(threadNamePrefix);
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ class AutoAssignTest extends AbstractAccessControllerManagementTest {
|
||||
void verifyOnlyUpdatableTargetsArePartOfAutoAssignmentByScheduler() throws Exception {
|
||||
// auto assign scheduler apply stored access control context and the context is correctly applied
|
||||
verifyOnlyUpdatableTargetsArePartOfAutoAssignment(
|
||||
() -> new AutoAssignScheduler(systemManagement, autoAssignExecutor, lockRegistry, Optional.empty()).autoAssignScheduler());
|
||||
() -> new AutoAssignScheduler(systemManagement, autoAssignExecutor, 1, lockRegistry, Optional.empty()).autoAssignScheduler());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user