Upgraded Spring AMQP. Added task scheduler. Improved executor shutdown.

Signed-off-by: Kai Zimmermann <kai.zimmermann@bosch-si.com>
This commit is contained in:
Kai Zimmermann
2016-06-07 18:38:39 +02:00
parent 487cb73caf
commit d9046bb9cc
4 changed files with 89 additions and 24 deletions

View File

@@ -82,21 +82,4 @@ public class MessageService {
clazz.getTypeName());
return (T) rabbitTemplate.getMessageConverter().fromMessage(message);
}
/**
* Method to verify if lwm2m header is set.
*
* @param message
* the message with the header
* @param header
* the header to verify
*/
public void checkIfLwm2mHeaderEmpty(final Message message, final String header) {
final Object headerObject = message.getMessageProperties().getHeaders().get(header);
if (null == headerObject) {
logAndThrowMessageError(message, "Header of " + header + "empty.");
}
}
}

View File

@@ -0,0 +1,59 @@
/**
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.autoconfigure.scheduling;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutor;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
/**
* Extension for {@link DelegatingSecurityContextExecutor} to allow proper
* shutdown at {@link Bean} destruction time.
*
*/
public class CloseableDelegatingSecurityContextExecutor extends DelegatingSecurityContextExecutor {
private final ThreadPoolExecutor executor;
/**
* Creates a new {@link CloseableDelegatingSecurityContextExecutor} that
* uses the current {@link SecurityContext} from the
* {@link SecurityContextHolder} at the time the task is submitted.
*
* @param delegate
* the {@link Executor} to delegate to. Cannot be null.
*/
public CloseableDelegatingSecurityContextExecutor(final ThreadPoolExecutor delegate) {
super(delegate);
executor = delegate;
}
/**
* Initiates an orderly shutdown in which previously submitted tasks are
* executed, but no new tasks will be accepted.
*/
public void shutdown() {
executor.shutdown();
}
/**
* Initiates an immediate shutdown.
*
* @return a list of the tasks that were awaiting execution
*/
public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}
}

View File

@@ -21,7 +21,10 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -39,9 +42,10 @@ public class ExecutorAutoConfiguration {
private AsyncConfigurerThreadpoolProperties asyncConfigurerProperties;
/**
* @return ExecutorService for general purpose multi threaded operations
* @return ExecutorService for general purpose multi threaded operations.
* Tries an orderly shutdown when destroyed.
*/
@Bean
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean
public Executor asyncExecutor() {
final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(
@@ -53,20 +57,38 @@ public class ExecutorAutoConfiguration {
threadPoolExecutor.setRejectedExecutionHandler((r, executor) -> LOGGER.warn(
"Reject runnable for centralExecutorService, reached limit of queue size {}",
executor.getQueue().size()));
return new DelegatingSecurityContextExecutor(threadPoolExecutor);
return new CloseableDelegatingSecurityContextExecutor(threadPoolExecutor);
}
/**
* @return the executor for UI background processes.
* @return the executor for UI background processes. Run immediate shutdown
* when destroyed.
*/
@Bean(name = "uiExecutor")
@Bean(name = "uiExecutor", destroyMethod = "shutdownNow")
@ConditionalOnMissingBean(name = "uiExecutor")
public Executor uiExecutor() {
final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(20);
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 20, 10000, TimeUnit.MILLISECONDS,
blockingQueue, new ThreadFactoryBuilder().setNameFormat("ui-executor-pool-%d").build());
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return new DelegatingSecurityContextExecutor(threadPoolExecutor);
return new CloseableDelegatingSecurityContextExecutor(threadPoolExecutor);
}
/**
* @return {@link TaskExecutor} for task execution
*/
@Bean
@ConditionalOnMissingBean
public TaskExecutor taskExecutor() {
return new ConcurrentTaskExecutor(asyncExecutor());
}
/**
* @return {@link TaskScheduler} for scheduled tasks
*/
@Bean
@ConditionalOnMissingBean
public TaskScheduler taskScheduler() {
return new ThreadPoolTaskScheduler();
}
}

View File

@@ -69,6 +69,7 @@
<jackson.version>2.5.5</jackson.version>
<hibernate-validator.version>5.2.4.Final</hibernate-validator.version>
<spring-cloud-connectors.version>1.2.0.RELEASE</spring-cloud-connectors.version>
<spring-amqp.version>1.6.0.RELEASE</spring-amqp.version>
<!-- Support for MongoDB 3 -->
<spring-data-releasetrain.version>Fowler-SR1</spring-data-releasetrain.version>
<mongodb.version>3.2.2</mongodb.version>