From 977b3fe40c6c9d3642fc3e6b6a5fc17a3aa3647e Mon Sep 17 00:00:00 2001 From: Vasil Ilchev Date: Tue, 2 Dec 2025 15:40:55 +0200 Subject: [PATCH] AutoAssign scheduler to obtain lock on Tenant level. Configurable to run separate tenant auto-assign checks in separate threads (#2843) Co-authored-by: vasilchev --- .../jpa/JpaRepositoryConfiguration.java | 5 +- .../jpa/scheduler/AutoAssignScheduler.java | 60 ++++++++++--------- .../jpa/scheduler/RolloutScheduler.java | 24 +------- .../jpa/scheduler/SchedulerUtils.java | 46 ++++++++++++++ .../repository/jpa/acm/AutoAssignTest.java | 2 +- 5 files changed, 85 insertions(+), 52 deletions(-) create mode 100644 hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/SchedulerUtils.java diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaRepositoryConfiguration.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaRepositoryConfiguration.java index 9a7e6b70e..14348e388 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaRepositoryConfiguration.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaRepositoryConfiguration.java @@ -342,8 +342,9 @@ public class JpaRepositoryConfiguration { @ConditionalOnProperty(prefix = "hawkbit.autoassign.scheduler", name = "enabled", matchIfMissing = true) AutoAssignScheduler autoAssignScheduler( final SystemManagement systemManagement, final AutoAssignExecutor autoAssignExecutor, + @Value("${hawkbit.autoassign.executor.thread-pool.size:1}") final int threadPoolSize, final LockRegistry lockRegistry, final Optional meterRegistry) { - return new AutoAssignScheduler(systemManagement, autoAssignExecutor, lockRegistry, meterRegistry); + return new AutoAssignScheduler(systemManagement, autoAssignExecutor, threadPoolSize, lockRegistry, meterRegistry); } /** @@ -446,4 +447,4 @@ public class JpaRepositoryConfiguration { : new AuthorizationDeniedException("Access Denied", authorizationResult)); } } -} \ No newline at end of file +} diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/AutoAssignScheduler.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/AutoAssignScheduler.java index 57585086d..3905f37c7 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/AutoAssignScheduler.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/AutoAssignScheduler.java @@ -10,6 +10,7 @@ package org.eclipse.hawkbit.repository.jpa.scheduler; import static org.eclipse.hawkbit.context.AccessContext.asSystem; +import static org.eclipse.hawkbit.context.AccessContext.asSystemAsTenant; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -22,6 +23,7 @@ import org.eclipse.hawkbit.repository.SystemManagement; import org.eclipse.hawkbit.tenancy.DefaultTenantConfiguration; import org.springframework.integration.support.locks.LockRegistry; import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * Scheduler to check target filters for auto assignment of distribution sets @@ -29,20 +31,23 @@ import org.springframework.scheduling.annotation.Scheduled; @Slf4j public class AutoAssignScheduler { - private static final String PROP_SCHEDULER_DELAY_PLACEHOLDER = "${hawkbit.scheduler.scheduler.fixedDelay:2000}"; + private static final String PROP_SCHEDULER_DELAY_PLACEHOLDER = "${hawkbit.autoassign.scheduler.fixedDelay:2000}"; private final SystemManagement systemManagement; private final AutoAssignExecutor autoAssignExecutor; private final LockRegistry lockRegistry; private final Optional meterRegistry; + private final ThreadPoolTaskExecutor autoAssignTaskExecutor; public AutoAssignScheduler( final SystemManagement systemManagement, final AutoAssignExecutor autoAssignExecutor, + final int threadPoolSize, final LockRegistry lockRegistry, final Optional meterRegistry) { this.systemManagement = systemManagement; this.autoAssignExecutor = autoAssignExecutor; this.lockRegistry = lockRegistry; this.meterRegistry = meterRegistry; + autoAssignTaskExecutor = SchedulerUtils.threadPoolTaskExecutor("auto-assign-exec-", threadPoolSize); } /** @@ -52,41 +57,42 @@ public class AutoAssignScheduler { @Scheduled(initialDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER, fixedDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER) public void autoAssignScheduler() { // run this code in system code privileged to have the necessary permission to query and create entities. - asSystem(this::executeAutoAssign); + log.debug("Triggered auto-assign scheduler."); + final long startNano = java.lang.System.nanoTime(); + asSystem(() -> + systemManagement.forEachTenantAsSystem(tenant -> { + if (autoAssignTaskExecutor == null) {// sync + handleAll(tenant); + } else {// async + autoAssignTaskExecutor.execute(() -> asSystemAsTenant(tenant, () -> handleAll(tenant))); + } + }) + ); + meterRegistry + .map(mReg -> mReg.timer("hawkbit.autoassign.scheduler.all")) + .ifPresent(timer -> timer.record(java.lang.System.nanoTime() - startNano, TimeUnit.NANOSECONDS)); + log.debug("Finished auto-assign scheduler run."); } - @SuppressWarnings("squid:S3516") - private Object executeAutoAssign() { - // workaround eclipselink that is currently not possible to execute a query without multitenancy if MultiTenant - // annotation is used - https://bugs.eclipse.org/bugs/show_bug.cgi?id=355458. So iterate through all tenants and execute the rollout - // check for each tenant separately. - final Lock lock = lockRegistry.obtain("scheduler"); + private void handleAll(final String tenant) { + final Lock lock = lockRegistry.obtain(createAutoAssignmentLockKey(tenant)); if (!lock.tryLock()) { - return null; + return; } - - final long startNano = java.lang.System.nanoTime(); + final long startNanoT = System.nanoTime(); try { - log.debug("Auto assign scheduled execution has acquired lock and started for each tenant."); - systemManagement.forEachTenantAsSystem(tenant -> { - final long startNanoT = java.lang.System.nanoTime(); - - autoAssignExecutor.checkAllTargets(); - - meterRegistry - .map(mReg -> mReg.timer( - "hawkbit.scheduler.executor", - DefaultTenantConfiguration.TENANT_TAG, tenant)) - .ifPresent(timer -> timer.record(java.lang.System.nanoTime() - startNanoT, TimeUnit.NANOSECONDS)); - }); + autoAssignExecutor.checkAllTargets(); } finally { lock.unlock(); meterRegistry - .map(mReg -> mReg.timer("hawkbit.scheduler.executor.all")) - .ifPresent(timer -> timer.record(java.lang.System.nanoTime() - startNano, TimeUnit.NANOSECONDS)); - log.debug("Auto assign scheduled execution has released lock and finished."); + .map(mReg -> mReg.timer( + "hawkbit.autoassign.scheduler", + DefaultTenantConfiguration.TENANT_TAG, tenant)) + .ifPresent(timer -> timer.record(System.nanoTime() - startNanoT, TimeUnit.NANOSECONDS)); } + } - return null; + private static String createAutoAssignmentLockKey(final String tenant) { + return tenant + "-auto-assign"; } } \ No newline at end of file diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/RolloutScheduler.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/RolloutScheduler.java index 35e5e41ab..0ef68d840 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/RolloutScheduler.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/RolloutScheduler.java @@ -19,7 +19,6 @@ import io.micrometer.core.instrument.MeterRegistry; import lombok.extern.slf4j.Slf4j; import org.eclipse.hawkbit.repository.RolloutHandler; import org.eclipse.hawkbit.repository.SystemManagement; -import org.eclipse.hawkbit.repository.jpa.rollout.BlockWhenFullPolicy; import org.eclipse.hawkbit.tenancy.DefaultTenantConfiguration; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -45,7 +44,7 @@ public class RolloutScheduler { this.systemManagement = systemManagement; this.rolloutHandler = rolloutHandler; this.meterRegistry = meterRegistry; - rolloutTaskExecutor = threadPoolTaskExecutor(threadPoolSize); + rolloutTaskExecutor = SchedulerUtils.threadPoolTaskExecutor("rollout-exec-", threadPoolSize); } @@ -67,7 +66,7 @@ public class RolloutScheduler { if (rolloutTaskExecutor == null) { handleAll(tenant); } else { - handleAllAsync(tenant); + rolloutTaskExecutor.submit(() -> asSystemAsTenant(tenant, () -> handleAll(tenant))); } })); @@ -91,23 +90,4 @@ public class RolloutScheduler { .ifPresent(timer -> timer.record(java.lang.System.nanoTime() - startNano, TimeUnit.NANOSECONDS)); } - private void handleAllAsync(final String tenant) { - rolloutTaskExecutor.submit(() -> asSystemAsTenant(tenant, () -> handleAll(tenant))); - } - - private ThreadPoolTaskExecutor threadPoolTaskExecutor(final int threadPoolSize) { - if (threadPoolSize <= 1) { - return null; - } - - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(threadPoolSize); - executor.setMaxPoolSize(threadPoolSize); - executor.setQueueCapacity(0); // forces a Synchronous Queue - // This policy will block the submitter until a worker thread is free - executor.setRejectedExecutionHandler(new BlockWhenFullPolicy()); - executor.setThreadNamePrefix("rollout-exec-"); - executor.initialize(); - return executor; - } } \ No newline at end of file diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/SchedulerUtils.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/SchedulerUtils.java new file mode 100644 index 000000000..f9434a5ba --- /dev/null +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/scheduler/SchedulerUtils.java @@ -0,0 +1,46 @@ +/** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hawkbit.repository.jpa.scheduler; + +import org.eclipse.hawkbit.repository.jpa.rollout.BlockWhenFullPolicy; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +public final class SchedulerUtils { + + private SchedulerUtils() { + } + + /** + * Creates a new {@link ThreadPoolTaskExecutor} with the given thread name + * prefix and pool size. + * + * @param threadNamePrefix + * the prefix for the thread name + * @param threadPoolSize + * the size of the pool + * @return the new scheduler + */ + public static ThreadPoolTaskExecutor threadPoolTaskExecutor(final String threadNamePrefix, final int threadPoolSize) { + if (threadPoolSize <= 1) { + return null; + } + + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(threadPoolSize); + executor.setMaxPoolSize(threadPoolSize); + executor.setQueueCapacity(0); // forces a Synchronous Queue + // This policy will block the submitter until a worker thread is free + executor.setRejectedExecutionHandler(new BlockWhenFullPolicy()); + executor.setThreadNamePrefix(threadNamePrefix); + executor.initialize(); + return executor; + } +} + diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/acm/AutoAssignTest.java b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/acm/AutoAssignTest.java index c415ee05e..47fbf74a3 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/acm/AutoAssignTest.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/acm/AutoAssignTest.java @@ -41,7 +41,7 @@ class AutoAssignTest extends AbstractAccessControllerManagementTest { void verifyOnlyUpdatableTargetsArePartOfAutoAssignmentByScheduler() throws Exception { // auto assign scheduler apply stored access control context and the context is correctly applied verifyOnlyUpdatableTargetsArePartOfAutoAssignment( - () -> new AutoAssignScheduler(systemManagement, autoAssignExecutor, lockRegistry, Optional.empty()).autoAssignScheduler()); + () -> new AutoAssignScheduler(systemManagement, autoAssignExecutor, 1, lockRegistry, Optional.empty()).autoAssignScheduler()); } @Test