Added rabbit connection heartbeat for DMF.

Signed-off-by: Kai Zimmermann <kai.zimmermann@bosch-si.com>
This commit is contained in:
Kai Zimmermann
2016-06-09 11:57:44 +02:00
parent 4f23fb1377
commit 0beef57df7
3 changed files with 80 additions and 7 deletions

View File

@@ -11,6 +11,7 @@ package org.eclipse.hawkbit.autoconfigure.scheduling;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -22,7 +23,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutor;
@@ -94,11 +94,22 @@ public class ExecutorAutoConfiguration {
}
/**
* @return {@link TaskScheduler} for scheduled tasks
* @return {@link ScheduledExecutorService} based on
* {@link #threadPoolTaskScheduler()}.
*/
@Bean
@ConditionalOnMissingBean
public TaskScheduler taskScheduler() {
public ScheduledExecutorService scheduledExecutorService() {
return threadPoolTaskScheduler().getScheduledExecutor();
}
/**
* @return {@link ThreadPoolTaskScheduler} for scheduled operations.
*/
@Bean
@ConditionalOnMissingBean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
return new ThreadPoolTaskScheduler();
}
}

View File

@@ -8,21 +8,28 @@
*/
package org.eclipse.hawkbit.amqp;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* The spring AMQP configuration which is enabled by using the profile
@@ -39,7 +46,46 @@ public class AmqpConfiguration {
protected AmqpDeadletterProperties amqpDeadletterProperties;
@Autowired
private ConnectionFactory connectionFactory;
private ConnectionFactory rabbitConnectionFactory;
@Configuration
protected static class HawkBitRabbitConnectionFactoryCreator {
@Autowired
@Qualifier("threadPoolExecutor")
private ThreadPoolExecutor threadPoolExecutor;
@Autowired
private ScheduledExecutorService scheduledExecutorService;
@Autowired
protected AmqpProperties amqpProperties;
@Bean
public ConnectionFactory rabbitConnectionFactory(final RabbitProperties config) {
final CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setRequestedHeartBeat(amqpProperties.getRequestedHeartBeat());
factory.setExecutor(threadPoolExecutor);
factory.getRabbitConnectionFactory().setHeartbeatExecutor(scheduledExecutorService);
final String addresses = config.getAddresses();
factory.setAddresses(addresses);
if (config.getHost() != null) {
factory.setHost(config.getHost());
factory.setPort(config.getPort());
}
if (config.getUsername() != null) {
factory.setUsername(config.getUsername());
}
if (config.getPassword() != null) {
factory.setPassword(config.getPassword());
}
if (config.getVirtualHost() != null) {
factory.setVirtualHost(config.getVirtualHost());
}
return factory;
}
}
/**
* Create a {@link RabbitAdmin} and ignore declaration exceptions.
@@ -49,7 +95,7 @@ public class AmqpConfiguration {
*/
@Bean
public RabbitAdmin rabbitAdmin() {
final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
rabbitAdmin.setIgnoreDeclarationExceptions(true);
return rabbitAdmin;
}
@@ -61,7 +107,7 @@ public class AmqpConfiguration {
*/
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
@@ -159,7 +205,7 @@ public class AmqpConfiguration {
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setDefaultRequeueRejected(false);
containerFactory.setConnectionFactory(connectionFactory);
containerFactory.setConnectionFactory(rabbitConnectionFactory);
containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal());
return containerFactory;
}

View File

@@ -8,6 +8,8 @@
*/
package org.eclipse.hawkbit.amqp;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -38,6 +40,11 @@ public class AmqpProperties {
*/
private boolean missingQueuesFatal = false;
/**
* Requested heartbeat interval from broker in {@link TimeUnit#SECONDS}.
*/
private int requestedHeartBeat = 60;
/**
* Is missingQueuesFatal enabled
*
@@ -102,4 +109,13 @@ public class AmqpProperties {
public void setReceiverQueue(final String receiverQueue) {
this.receiverQueue = receiverQueue;
}
public int getRequestedHeartBeat() {
return requestedHeartBeat;
}
public void setRequestedHeartBeat(final int requestedHeartBeat) {
this.requestedHeartBeat = requestedHeartBeat;
}
}