Merge pull request #203 from bsinno/feature_asynchronous_health

Upgraded Spring AMQP. Added task scheduler. Improved executor shutdown.
This commit is contained in:
Kai Zimmermann
2016-06-13 13:21:56 +02:00
committed by GitHub
13 changed files with 254 additions and 45 deletions

View File

@@ -8,6 +8,7 @@
*/
package org.eclipse.hawkbit.simulator.amqp;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -15,6 +16,7 @@ import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -65,9 +67,12 @@ public class AmqpConfiguration {
* @return the queue
*/
@Bean
public Queue receiverConnectorQueueFromSp() {
return new Queue(amqpProperties.getReceiverConnectorQueueFromSp(), true, false, false,
getDeadLetterExchangeArgs());
public Queue receiverConnectorQueueFromHawkBit() {
final Map<String, Object> arguments = getDeadLetterExchangeArgs();
arguments.putAll(getTTLMaxArgs());
return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).withArguments(arguments)
.build();
}
/**
@@ -89,7 +94,7 @@ public class AmqpConfiguration {
*/
@Bean
public Binding bindReceiverQueueToSpExchange() {
return BindingBuilder.bind(receiverConnectorQueueFromSp()).to(exchangeQueueToConnector());
return BindingBuilder.bind(receiverConnectorQueueFromHawkBit()).to(exchangeQueueToConnector());
}
/**
@@ -99,7 +104,7 @@ public class AmqpConfiguration {
*/
@Bean
public Queue deadLetterQueue() {
return new Queue(amqpProperties.getDeadLetterQueue(), true, false, true);
return QueueBuilder.nonDurable(amqpProperties.getDeadLetterQueue()).withArguments(getTTLMaxArgs()).build();
}
/**
@@ -145,4 +150,11 @@ public class AmqpConfiguration {
return args;
}
private static Map<String, Object> getTTLMaxArgs() {
final Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", Duration.ofDays(1).toMillis());
args.put("x-max-length", 100_000);
return args;
}
}

View File

@@ -82,21 +82,4 @@ public class MessageService {
clazz.getTypeName());
return (T) rabbitTemplate.getMessageConverter().fromMessage(message);
}
/**
* Method to verify if lwm2m header is set.
*
* @param message
* the message with the header
* @param header
* the header to verify
*/
public void checkIfLwm2mHeaderEmpty(final Message message, final String header) {
final Object headerObject = message.getMessageProperties().getHeaders().get(header);
if (null == headerObject) {
logAndThrowMessageError(message, "Header of " + header + "empty.");
}
}
}

View File

@@ -19,7 +19,6 @@
<Logger name="org.apache.coyote.http11.Http11NioProtocol" level="WARN" />
<Logger name="org.apache.tomcat.util.net.NioSelectorPool" level="WARN" />
<Logger name="org.apache.tomcat.jdbc.pool.ConnectionPool" level="DEBUG" />
<Logger name="org.apache.catalina.startup.DigesterFactory" level="ERROR" />
<!-- Security Log with hints on potential attacks -->

View File

@@ -23,7 +23,6 @@ import org.springframework.scheduling.annotation.EnableAsync;
*
*
*/
@Configuration
@EnableAsync
@ConditionalOnMissingBean(AsyncConfigurer.class)

View File

@@ -20,7 +20,7 @@ public class AsyncConfigurerThreadpoolProperties {
/**
* Max queue size for central event executor.
*/
private Integer queuesize = 250;
private Integer queuesize = 5_000;
/**
* Core processing threads for central event executor.
@@ -30,7 +30,7 @@ public class AsyncConfigurerThreadpoolProperties {
/**
* Maximum thread pool size for central event executor.
*/
private Integer maxthreads = 50;
private Integer maxthreads = 20;
/**
* When the number of threads is greater than the core, this is the maximum

View File

@@ -11,6 +11,7 @@ package org.eclipse.hawkbit.autoconfigure.scheduling;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -21,6 +22,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutor;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -39,11 +43,21 @@ public class ExecutorAutoConfiguration {
private AsyncConfigurerThreadpoolProperties asyncConfigurerProperties;
/**
* @return ExecutorService for general purpose multi threaded operations
* @return ExecutorService with security context availability in thread
* execution..
*/
@Bean
@ConditionalOnMissingBean
public Executor asyncExecutor() {
return new DelegatingSecurityContextExecutor(threadPoolExecutor());
}
/**
* @return central ThreadPoolExecutor for general purpose multi threaded
* operations. Tries an orderly shutdown when destroyed.
*/
@Bean(destroyMethod = "shutdown")
public ThreadPoolExecutor threadPoolExecutor() {
final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(
asyncConfigurerProperties.getQueuesize());
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(asyncConfigurerProperties.getCorethreads(),
@@ -53,7 +67,8 @@ public class ExecutorAutoConfiguration {
threadPoolExecutor.setRejectedExecutionHandler((r, executor) -> LOGGER.warn(
"Reject runnable for centralExecutorService, reached limit of queue size {}",
executor.getQueue().size()));
return new DelegatingSecurityContextExecutor(threadPoolExecutor);
return threadPoolExecutor;
}
/**
@@ -69,4 +84,32 @@ public class ExecutorAutoConfiguration {
return new DelegatingSecurityContextExecutor(threadPoolExecutor);
}
/**
* @return {@link TaskExecutor} for task execution
*/
@Bean
@ConditionalOnMissingBean
public TaskExecutor taskExecutor() {
return new ConcurrentTaskExecutor(asyncExecutor());
}
/**
* @return {@link ScheduledExecutorService} based on
* {@link #threadPoolTaskScheduler()}.
*/
@Bean
@ConditionalOnMissingBean
public ScheduledExecutorService scheduledExecutorService() {
return threadPoolTaskScheduler().getScheduledExecutor();
}
/**
* @return {@link ThreadPoolTaskScheduler} for scheduled operations.
*/
@Bean
@ConditionalOnMissingBean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
return new ThreadPoolTaskScheduler();
}
}

View File

@@ -27,12 +27,6 @@ vaadin.servlet.urlMapping=/UI/*
vaadin.servlet.heartbeatInterval=60
vaadin.servlet.closeIdleSessions=false
# Defines the thread pool executor
hawkbit.threadpool.corethreads=5
hawkbit.threadpool.maxthreads=20
hawkbit.threadpool.idletimeout=10000
hawkbit.threadpool.queuesize=20000
# Defines the polling time for the controllers in HH:MM:SS notation
hawkbit.controller.pollingTime=00:05:00
hawkbit.controller.pollingOverdueTime=00:05:00

View File

@@ -8,21 +8,32 @@
*/
package org.eclipse.hawkbit.amqp;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;;
/**
* The spring AMQP configuration which is enabled by using the profile
@@ -32,14 +43,68 @@ import org.springframework.context.annotation.Bean;
@EnableConfigurationProperties({ AmqpProperties.class, AmqpDeadletterProperties.class })
public class AmqpConfiguration {
@Autowired
protected AmqpProperties amqpProperties;
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfiguration.class);
@Autowired
protected AmqpDeadletterProperties amqpDeadletterProperties;
private AmqpProperties amqpProperties;
@Autowired
private ConnectionFactory connectionFactory;
private AmqpDeadletterProperties amqpDeadletterProperties;
@Autowired
@Qualifier("threadPoolExecutor")
private ThreadPoolExecutor threadPoolExecutor;
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Configuration
protected static class RabbitConnectionFactoryCreator {
@Autowired
private AmqpProperties amqpProperties;
@Autowired
@Qualifier("threadPoolExecutor")
private ThreadPoolExecutor threadPoolExecutor;
@Autowired
private ScheduledExecutorService scheduledExecutorService;
/**
* {@link ConnectionFactory} with enabled publisher confirms and
* heartbeat.
*
* @param config
* with standard {@link RabbitProperties}
* @return {@link ConnectionFactory}
*/
@Bean
public ConnectionFactory rabbitConnectionFactory(final RabbitProperties config) {
final CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setRequestedHeartBeat(amqpProperties.getRequestedHeartBeat());
factory.setExecutor(threadPoolExecutor);
factory.getRabbitConnectionFactory().setHeartbeatExecutor(scheduledExecutorService);
factory.setPublisherConfirms(true);
final String addresses = config.getAddresses();
factory.setAddresses(addresses);
if (config.getHost() != null) {
factory.setHost(config.getHost());
factory.setPort(config.getPort());
}
if (config.getUsername() != null) {
factory.setUsername(config.getUsername());
}
if (config.getPassword() != null) {
factory.setPassword(config.getPassword());
}
if (config.getVirtualHost() != null) {
factory.setVirtualHost(config.getVirtualHost());
}
return factory;
}
}
/**
* Create a {@link RabbitAdmin} and ignore declaration exceptions.
@@ -49,20 +114,34 @@ public class AmqpConfiguration {
*/
@Bean
public RabbitAdmin rabbitAdmin() {
final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
rabbitAdmin.setIgnoreDeclarationExceptions(true);
return rabbitAdmin;
}
/**
* Method to set the Jackson2JsonMessageConverter.
*
* @return the Jackson2JsonMessageConverter
* @return {@link RabbitTemplate} with automatic retry, published confirms
* and {@link Jackson2JsonMessageConverter}.
*/
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
final RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
rabbitTemplate.setRetryTemplate(retryTemplate);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.debug("Message with correlation ID {} confirmed by broker.", correlationData.getId());
} else {
LOGGER.error("Broker is unable to handle message with correlation ID {} : {}", correlationData.getId(),
cause);
}
});
return rabbitTemplate;
}
@@ -159,7 +238,7 @@ public class AmqpConfiguration {
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setDefaultRequeueRejected(false);
containerFactory.setConnectionFactory(connectionFactory);
containerFactory.setConnectionFactory(rabbitConnectionFactory);
containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal());
return containerFactory;
}

View File

@@ -8,6 +8,8 @@
*/
package org.eclipse.hawkbit.amqp;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -38,6 +40,11 @@ public class AmqpProperties {
*/
private boolean missingQueuesFatal = false;
/**
* Requested heartbeat interval from broker in {@link TimeUnit#SECONDS}.
*/
private int requestedHeartBeat = (int) TimeUnit.SECONDS.toSeconds(60);
/**
* Is missingQueuesFatal enabled
*
@@ -102,4 +109,13 @@ public class AmqpProperties {
public void setReceiverQueue(final String receiverQueue) {
this.receiverQueue = receiverQueue;
}
public int getRequestedHeartBeat() {
return requestedHeartBeat;
}
public void setRequestedHeartBeat(final int requestedHeartBeat) {
this.requestedHeartBeat = requestedHeartBeat;
}
}

View File

@@ -9,10 +9,14 @@
package org.eclipse.hawkbit.amqp;
import java.net.URI;
import java.util.UUID;
import org.eclipse.hawkbit.util.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
* A default implementation for the sender service. The service sends all amqp
@@ -20,6 +24,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
* extracted from the uri.
*/
public class DefaultAmqpSenderService implements AmqpSenderService {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpSenderService.class);
private final RabbitTemplate internalAmqpTemplate;
@@ -39,7 +44,16 @@ public class DefaultAmqpSenderService implements AmqpSenderService {
return;
}
internalAmqpTemplate.send(extractExchange(replyTo), null, message);
final String correlationId = UUID.randomUUID().toString();
final String exchange = extractExchange(replyTo);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, exchange, correlationId);
} else {
LOGGER.debug("Sending message to exchange {} with correlationId {}", exchange, correlationId);
}
internalAmqpTemplate.send(exchange, null, message, new CorrelationData(correlationId));
}
}

View File

@@ -8,6 +8,14 @@
*/
package org.eclipse.hawkbit;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.hawkbit.amqp.AmqpProperties;
import org.eclipse.hawkbit.amqp.AmqpSenderService;
import org.eclipse.hawkbit.amqp.DefaultAmqpSenderService;
import org.eclipse.hawkbit.repository.jpa.model.helper.SystemSecurityContextHolder;
@@ -16,13 +24,22 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutor;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
*
*/
@Configuration
@EnableConfigurationProperties({ AmqpProperties.class })
public class AmqpTestConfiguration {
/**
* @return the {@link SystemSecurityContext} singleton bean which make it
@@ -56,4 +73,55 @@ public class AmqpTestConfiguration {
public AmqpSenderService amqpSenderServiceBean(final RabbitTemplate rabbitTemplate) {
return new DefaultAmqpSenderService(rabbitTemplate);
}
/**
* @return ExecutorService with security context availability in thread
* execution..
*/
@Bean
@ConditionalOnMissingBean
public Executor asyncExecutor() {
return new DelegatingSecurityContextExecutor(threadPoolExecutor());
}
/**
* @return central ThreadPoolExecutor for general purpose multi threaded
* operations. Tries an orderly shutdown when destroyed.
*/
@Bean(destroyMethod = "shutdown")
public ThreadPoolExecutor threadPoolExecutor() {
final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(10);
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10, 1000, TimeUnit.MILLISECONDS,
blockingQueue, new ThreadFactoryBuilder().setNameFormat("central-executor-pool-%d").build());
return threadPoolExecutor;
}
/**
* @return {@link TaskExecutor} for task execution
*/
@Bean
@ConditionalOnMissingBean
public TaskExecutor taskExecutor() {
return new ConcurrentTaskExecutor(asyncExecutor());
}
/**
* @return {@link ScheduledExecutorService} based on
* {@link #threadPoolTaskScheduler()}.
*/
@Bean
@ConditionalOnMissingBean
public ScheduledExecutorService scheduledExecutorService() {
return threadPoolTaskScheduler().getScheduledExecutor();
}
/**
* @return {@link ThreadPoolTaskScheduler} for scheduled operations.
*/
@Bean
@ConditionalOnMissingBean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
return new ThreadPoolTaskScheduler();
}
}

View File

@@ -33,6 +33,7 @@ import ru.yandex.qatools.allure.annotations.Stories;
@Stories("Test to generate the artifact download URL")
@SpringApplicationConfiguration(classes = { AmqpTestConfiguration.class,
org.eclipse.hawkbit.RepositoryApplicationConfiguration.class })
public class PropertyBasedArtifactUrlHandlerTest extends AbstractIntegrationTestWithMongoDB {
private static final String HTTPS_LOCALHOST = "https://localhost:8080/";

View File

@@ -69,6 +69,7 @@
<jackson.version>2.5.5</jackson.version>
<hibernate-validator.version>5.2.4.Final</hibernate-validator.version>
<spring-cloud-connectors.version>1.2.0.RELEASE</spring-cloud-connectors.version>
<spring-amqp.version>1.6.0.RELEASE</spring-amqp.version>
<spring-hateoas.version>0.18.0.RELEASE</spring-hateoas.version>
<!-- Support for MongoDB 3 -->
<spring-data-releasetrain.version>Fowler-SR1</spring-data-releasetrain.version>