From d9046bb9cc24e3caa5a3fdb37d48f8ae8dc9707f Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Tue, 7 Jun 2016 18:38:39 +0200 Subject: [PATCH] Upgraded Spring AMQP. Added task scheduler. Improved executor shutdown. Signed-off-by: Kai Zimmermann --- .../simulator/amqp/MessageService.java | 17 ------ ...ableDelegatingSecurityContextExecutor.java | 59 +++++++++++++++++++ .../scheduling/ExecutorAutoConfiguration.java | 36 ++++++++--- pom.xml | 1 + 4 files changed, 89 insertions(+), 24 deletions(-) create mode 100644 hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java index bd48bbe12..8bb1e1292 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/MessageService.java @@ -82,21 +82,4 @@ public class MessageService { clazz.getTypeName()); return (T) rabbitTemplate.getMessageConverter().fromMessage(message); } - - /** - * Method to verify if lwm2m header is set. - * - * @param message - * the message with the header - * @param header - * the header to verify - */ - public void checkIfLwm2mHeaderEmpty(final Message message, final String header) { - final Object headerObject = message.getMessageProperties().getHeaders().get(header); - if (null == headerObject) { - logAndThrowMessageError(message, "Header of " + header + "empty."); - } - - } - } diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java new file mode 100644 index 000000000..1160e12e0 --- /dev/null +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/CloseableDelegatingSecurityContextExecutor.java @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2015 Bosch Software Innovations GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + */ +package org.eclipse.hawkbit.autoconfigure.scheduling; + +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +import org.springframework.context.annotation.Bean; +import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; +import org.springframework.security.core.context.SecurityContext; +import org.springframework.security.core.context.SecurityContextHolder; + +/** + * Extension for {@link DelegatingSecurityContextExecutor} to allow proper + * shutdown at {@link Bean} destruction time. + * + */ +public class CloseableDelegatingSecurityContextExecutor extends DelegatingSecurityContextExecutor { + + private final ThreadPoolExecutor executor; + + /** + * Creates a new {@link CloseableDelegatingSecurityContextExecutor} that + * uses the current {@link SecurityContext} from the + * {@link SecurityContextHolder} at the time the task is submitted. + * + * @param delegate + * the {@link Executor} to delegate to. Cannot be null. + */ + public CloseableDelegatingSecurityContextExecutor(final ThreadPoolExecutor delegate) { + super(delegate); + executor = delegate; + } + + /** + * Initiates an orderly shutdown in which previously submitted tasks are + * executed, but no new tasks will be accepted. + */ + public void shutdown() { + executor.shutdown(); + } + + /** + * Initiates an immediate shutdown. + * + * @return a list of the tasks that were awaiting execution + */ + public List shutdownNow() { + return executor.shutdownNow(); + } + +} diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java index 43a096e7d..d5bcdd0f1 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java @@ -21,7 +21,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -39,9 +42,10 @@ public class ExecutorAutoConfiguration { private AsyncConfigurerThreadpoolProperties asyncConfigurerProperties; /** - * @return ExecutorService for general purpose multi threaded operations + * @return ExecutorService for general purpose multi threaded operations. + * Tries an orderly shutdown when destroyed. */ - @Bean + @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean public Executor asyncExecutor() { final BlockingQueue blockingQueue = new ArrayBlockingQueue<>( @@ -53,20 +57,38 @@ public class ExecutorAutoConfiguration { threadPoolExecutor.setRejectedExecutionHandler((r, executor) -> LOGGER.warn( "Reject runnable for centralExecutorService, reached limit of queue size {}", executor.getQueue().size())); - return new DelegatingSecurityContextExecutor(threadPoolExecutor); + return new CloseableDelegatingSecurityContextExecutor(threadPoolExecutor); } /** - * @return the executor for UI background processes. + * @return the executor for UI background processes. Run immediate shutdown + * when destroyed. */ - @Bean(name = "uiExecutor") + @Bean(name = "uiExecutor", destroyMethod = "shutdownNow") @ConditionalOnMissingBean(name = "uiExecutor") public Executor uiExecutor() { final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(20); final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 10000, TimeUnit.MILLISECONDS, blockingQueue, new ThreadFactoryBuilder().setNameFormat("ui-executor-pool-%d").build()); threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); - return new DelegatingSecurityContextExecutor(threadPoolExecutor); + return new CloseableDelegatingSecurityContextExecutor(threadPoolExecutor); } + /** + * @return {@link TaskExecutor} for task execution + */ + @Bean + @ConditionalOnMissingBean + public TaskExecutor taskExecutor() { + return new ConcurrentTaskExecutor(asyncExecutor()); + } + + /** + * @return {@link TaskScheduler} for scheduled tasks + */ + @Bean + @ConditionalOnMissingBean + public TaskScheduler taskScheduler() { + return new ThreadPoolTaskScheduler(); + } } diff --git a/pom.xml b/pom.xml index 33ad1f81e..c535ba2d8 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,7 @@ 2.5.5 5.2.4.Final 1.2.0.RELEASE + 1.6.0.RELEASE Fowler-SR1 3.2.2