From 2d9073723d2d8043a573d9e09e8fe12b9a2940be Mon Sep 17 00:00:00 2001 From: Denislav Prinov Date: Thu, 6 Feb 2025 09:22:06 +0200 Subject: [PATCH] Introduce parallel rollout processing (#2248) * Introduce parallel rollout processing Signed-off-by: Denislav Prinov * Moving the ThreadPoolTaskExecutor initialization in RolloutScheduler. Changing to previous default behaviour when the thread pool size is <=1 Signed-off-by: Denislav Prinov * Refactoring Signed-off-by: Denislav Prinov * Refactoring based on review comments Signed-off-by: Denislav Prinov * License header fix Signed-off-by: Denislav Prinov --------- Signed-off-by: Denislav Prinov --- .../RepositoryApplicationConfiguration.java | 5 +- .../jpa/rollout/BlockWhenFullPolicy.java | 30 +++++++++ .../jpa/rollout/RolloutScheduler.java | 63 ++++++++++++++++--- 3 files changed, 89 insertions(+), 9 deletions(-) create mode 100644 hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/BlockWhenFullPolicy.java diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java index 51ea38ad3..45f766260 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/RepositoryApplicationConfiguration.java @@ -167,6 +167,7 @@ import org.eclipse.hawkbit.tenancy.configuration.TenantConfigurationProperties; import org.eclipse.hawkbit.utils.TenantConfigHelper; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -992,8 +993,8 @@ public class RepositoryApplicationConfiguration { @Profile("!test") @ConditionalOnProperty(prefix = "hawkbit.rollout.scheduler", name = "enabled", matchIfMissing = true) RolloutScheduler rolloutScheduler(final SystemManagement systemManagement, - final RolloutHandler rolloutHandler, final SystemSecurityContext systemSecurityContext) { - return new RolloutScheduler(rolloutHandler, systemManagement, systemSecurityContext); + final RolloutHandler rolloutHandler, final SystemSecurityContext systemSecurityContext, @Value("${hawkbit.rollout.executor.thread-pool.size:1}") final int threadPoolSize) { + return new RolloutScheduler(rolloutHandler, systemManagement, systemSecurityContext, threadPoolSize); } /** diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/BlockWhenFullPolicy.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/BlockWhenFullPolicy.java new file mode 100644 index 000000000..d8d4ebbec --- /dev/null +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/BlockWhenFullPolicy.java @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2025 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ + +package org.eclipse.hawkbit.repository.jpa.rollout; + +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +public class BlockWhenFullPolicy implements RejectedExecutionHandler { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + // Because queueCapacity=0 => SynchronousQueue + // This put(...) call blocks if both threads are busy, + // until a thread is free + executor.getQueue().put(r); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Interrupted while waiting to queue task", e); + } + } +} \ No newline at end of file diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/RolloutScheduler.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/RolloutScheduler.java index 7b69dcf54..ad356c244 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/RolloutScheduler.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/rollout/RolloutScheduler.java @@ -14,6 +14,7 @@ import org.eclipse.hawkbit.repository.RolloutHandler; import org.eclipse.hawkbit.repository.SystemManagement; import org.eclipse.hawkbit.security.SystemSecurityContext; import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** * Scheduler to schedule the {@link RolloutHandler#handleAll()}. The @@ -28,12 +29,16 @@ public class RolloutScheduler { private final SystemManagement systemManagement; private final RolloutHandler rolloutHandler; private final SystemSecurityContext systemSecurityContext; + private final ThreadPoolTaskExecutor rolloutTaskExecutor; public RolloutScheduler( - final RolloutHandler rolloutHandler, final SystemManagement systemManagement, final SystemSecurityContext systemSecurityContext) { + final RolloutHandler rolloutHandler, final SystemManagement systemManagement, final SystemSecurityContext systemSecurityContext, + final int threadPoolSize) { this.systemManagement = systemManagement; this.rolloutHandler = rolloutHandler; this.systemSecurityContext = systemSecurityContext; + rolloutTaskExecutor = threadPoolTaskExecutor(threadPoolSize); + } /** @@ -43,14 +48,58 @@ public class RolloutScheduler { @Scheduled(initialDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER, fixedDelayString = PROP_SCHEDULER_DELAY_PLACEHOLDER) public void runningRolloutScheduler() { log.debug("rollout schedule checker has been triggered."); - - // run this code in system code privileged to have the necessary permission to query and create entities. + // run this code in system code privileged to have the necessary + // permission to query and create entities. systemSecurityContext.runAsSystem(() -> { - // workaround eclipselink that is currently not possible to execute a query without multi-tenancy if MultiTenant - // annotation is used. https://bugs.eclipse.org/bugs/show_bug.cgi?id=355458. So, iterate through all tenants and execute the rollout - // check for each tenant separately. - systemManagement.forEachTenant(tenant -> rolloutHandler.handleAll()); + // workaround eclipselink that is currently not possible to + // execute a query without multi-tenancy if MultiTenant + // annotation is used. + // https://bugs.eclipse.org/bugs/show_bug.cgi?id=355458. So + // iterate through all tenants and execute the rollout check for + // each tenant seperately. + + systemManagement.forEachTenant(tenant -> { + if (rolloutTaskExecutor == null) { + handleAll(tenant); + } else { + handleAllAsync(tenant); + } + }); return null; }); } + + private void handleAll(final String tenant) { + log.trace("Handling rollout for tenant: {}", tenant); + try { + rolloutHandler.handleAll(); + } catch (Exception e) { + log.error("Error processing rollout for tenant {}", tenant, e); + } + } + + private void handleAllAsync(final String tenant) { + rolloutTaskExecutor.submit(() -> systemSecurityContext.runAsSystemAsTenant(() -> { + handleAll(tenant); + return null; + }, tenant)); + + } + + private ThreadPoolTaskExecutor threadPoolTaskExecutor (final int threadPoolSize) { + if (threadPoolSize <= 1) { + return null; + } + + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(threadPoolSize); + executor.setMaxPoolSize(threadPoolSize); + executor.setQueueCapacity(0); // forces a Synchronous Queue + // This policy will block the submitter until a worker thread is free + executor.setRejectedExecutionHandler(new BlockWhenFullPolicy()); + executor.setThreadNamePrefix("rollout-exec-"); + executor.initialize(); + return executor; + } + } \ No newline at end of file