From 0beef57df77e37346bf7ad5a79aa30330e27f9b8 Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Thu, 9 Jun 2016 11:57:44 +0200 Subject: [PATCH] Added rabbit connection heartbeat for DMF. Signed-off-by: Kai Zimmermann --- .../scheduling/ExecutorAutoConfiguration.java | 17 ++++-- .../hawkbit/amqp/AmqpConfiguration.java | 54 +++++++++++++++++-- .../eclipse/hawkbit/amqp/AmqpProperties.java | 16 ++++++ 3 files changed, 80 insertions(+), 7 deletions(-) diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java index 4fd55cbaa..4f606f1b1 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java @@ -11,6 +11,7 @@ package org.eclipse.hawkbit.autoconfigure.scheduling; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -22,7 +23,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; -import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; @@ -94,11 +94,22 @@ public class ExecutorAutoConfiguration { } /** - * @return {@link TaskScheduler} for scheduled tasks + * @return {@link ScheduledExecutorService} based on + * {@link #threadPoolTaskScheduler()}. */ @Bean @ConditionalOnMissingBean - public TaskScheduler taskScheduler() { + public ScheduledExecutorService scheduledExecutorService() { + return threadPoolTaskScheduler().getScheduledExecutor(); + } + + /** + * @return {@link ThreadPoolTaskScheduler} for scheduled operations. + */ + @Bean + @ConditionalOnMissingBean + public ThreadPoolTaskScheduler threadPoolTaskScheduler() { return new ThreadPoolTaskScheduler(); } + } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 20a11713f..adf56e10f 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -8,21 +8,28 @@ */ package org.eclipse.hawkbit.amqp; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; /** * The spring AMQP configuration which is enabled by using the profile @@ -39,7 +46,46 @@ public class AmqpConfiguration { protected AmqpDeadletterProperties amqpDeadletterProperties; @Autowired - private ConnectionFactory connectionFactory; + private ConnectionFactory rabbitConnectionFactory; + + @Configuration + protected static class HawkBitRabbitConnectionFactoryCreator { + @Autowired + @Qualifier("threadPoolExecutor") + private ThreadPoolExecutor threadPoolExecutor; + + @Autowired + private ScheduledExecutorService scheduledExecutorService; + + @Autowired + protected AmqpProperties amqpProperties; + + @Bean + public ConnectionFactory rabbitConnectionFactory(final RabbitProperties config) { + final CachingConnectionFactory factory = new CachingConnectionFactory(); + factory.setRequestedHeartBeat(amqpProperties.getRequestedHeartBeat()); + factory.setExecutor(threadPoolExecutor); + factory.getRabbitConnectionFactory().setHeartbeatExecutor(scheduledExecutorService); + + final String addresses = config.getAddresses(); + factory.setAddresses(addresses); + if (config.getHost() != null) { + factory.setHost(config.getHost()); + factory.setPort(config.getPort()); + } + if (config.getUsername() != null) { + factory.setUsername(config.getUsername()); + } + if (config.getPassword() != null) { + factory.setPassword(config.getPassword()); + } + if (config.getVirtualHost() != null) { + factory.setVirtualHost(config.getVirtualHost()); + } + return factory; + } + + } /** * Create a {@link RabbitAdmin} and ignore declaration exceptions. @@ -49,7 +95,7 @@ public class AmqpConfiguration { */ @Bean public RabbitAdmin rabbitAdmin() { - final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory); rabbitAdmin.setIgnoreDeclarationExceptions(true); return rabbitAdmin; } @@ -61,7 +107,7 @@ public class AmqpConfiguration { */ @Bean public RabbitTemplate rabbitTemplate() { - final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } @@ -159,7 +205,7 @@ public class AmqpConfiguration { public SimpleRabbitListenerContainerFactory listenerContainerFactory() { final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); containerFactory.setDefaultRequeueRejected(false); - containerFactory.setConnectionFactory(connectionFactory); + containerFactory.setConnectionFactory(rabbitConnectionFactory); containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal()); return containerFactory; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java index ce6068ce8..5b06c3318 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java @@ -8,6 +8,8 @@ */ package org.eclipse.hawkbit.amqp; +import java.util.concurrent.TimeUnit; + import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -38,6 +40,11 @@ public class AmqpProperties { */ private boolean missingQueuesFatal = false; + /** + * Requested heartbeat interval from broker in {@link TimeUnit#SECONDS}. + */ + private int requestedHeartBeat = 60; + /** * Is missingQueuesFatal enabled * @@ -102,4 +109,13 @@ public class AmqpProperties { public void setReceiverQueue(final String receiverQueue) { this.receiverQueue = receiverQueue; } + + public int getRequestedHeartBeat() { + return requestedHeartBeat; + } + + public void setRequestedHeartBeat(final int requestedHeartBeat) { + this.requestedHeartBeat = requestedHeartBeat; + } + }