From 12ce60f72a2835da02c3306d536c64f0533a6347 Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Sun, 12 Jun 2016 08:09:28 +0200 Subject: [PATCH] Rabbit publisherConfirms Signed-off-by: Kai Zimmermann --- .../simulator/amqp/AmqpConfiguration.java | 22 ++++++-- .../src/main/resources/logback.xml | 1 - .../AsyncConfigurerAutoConfiguration.java | 1 - .../hawkbit/amqp/AmqpConfiguration.java | 55 +++++++++++++++---- .../amqp/DefaultAmqpSenderService.java | 16 +++++- 5 files changed, 76 insertions(+), 19 deletions(-) diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java index bf5d723a8..e954cec96 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java @@ -8,6 +8,7 @@ */ package org.eclipse.hawkbit.simulator.amqp; +import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -15,6 +16,7 @@ import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -65,9 +67,12 @@ public class AmqpConfiguration { * @return the queue */ @Bean - public Queue receiverConnectorQueueFromSp() { - return new Queue(amqpProperties.getReceiverConnectorQueueFromSp(), true, false, false, - getDeadLetterExchangeArgs()); + public Queue receiverConnectorQueueFromHawkBit() { + final Map arguments = getDeadLetterExchangeArgs(); + arguments.putAll(getTTLMaxArgs()); + + return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).withArguments(arguments) + .build(); } /** @@ -89,7 +94,7 @@ public class AmqpConfiguration { */ @Bean public Binding bindReceiverQueueToSpExchange() { - return BindingBuilder.bind(receiverConnectorQueueFromSp()).to(exchangeQueueToConnector()); + return BindingBuilder.bind(receiverConnectorQueueFromHawkBit()).to(exchangeQueueToConnector()); } /** @@ -99,7 +104,7 @@ public class AmqpConfiguration { */ @Bean public Queue deadLetterQueue() { - return new Queue(amqpProperties.getDeadLetterQueue(), true, false, true); + return QueueBuilder.nonDurable(amqpProperties.getDeadLetterQueue()).withArguments(getTTLMaxArgs()).build(); } /** @@ -145,4 +150,11 @@ public class AmqpConfiguration { return args; } + private static Map getTTLMaxArgs() { + final Map args = new HashMap<>(); + args.put("x-message-ttl", Duration.ofDays(1).toMillis()); + args.put("x-max-length", 100_000); + return args; + } + } diff --git a/examples/hawkbit-device-simulator/src/main/resources/logback.xml b/examples/hawkbit-device-simulator/src/main/resources/logback.xml index da64e62d1..469c7bde3 100644 --- a/examples/hawkbit-device-simulator/src/main/resources/logback.xml +++ b/examples/hawkbit-device-simulator/src/main/resources/logback.xml @@ -19,7 +19,6 @@ - diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java index cdbbcba5b..7e2b780b4 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerAutoConfiguration.java @@ -23,7 +23,6 @@ import org.springframework.scheduling.annotation.EnableAsync; * * */ - @Configuration @EnableAsync @ConditionalOnMissingBean(AsyncConfigurer.class) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index adf56e10f..541ae414c 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -12,6 +12,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; @@ -30,6 +32,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.support.RetryTemplate;; /** * The spring AMQP configuration which is enabled by using the profile @@ -39,17 +43,27 @@ import org.springframework.context.annotation.Configuration; @EnableConfigurationProperties({ AmqpProperties.class, AmqpDeadletterProperties.class }) public class AmqpConfiguration { - @Autowired - protected AmqpProperties amqpProperties; + private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfiguration.class); @Autowired - protected AmqpDeadletterProperties amqpDeadletterProperties; + private AmqpProperties amqpProperties; + + @Autowired + private AmqpDeadletterProperties amqpDeadletterProperties; + + @Autowired + @Qualifier("threadPoolExecutor") + private ThreadPoolExecutor threadPoolExecutor; @Autowired private ConnectionFactory rabbitConnectionFactory; @Configuration - protected static class HawkBitRabbitConnectionFactoryCreator { + protected static class RabbitConnectionFactoryCreator { + + @Autowired + private AmqpProperties amqpProperties; + @Autowired @Qualifier("threadPoolExecutor") private ThreadPoolExecutor threadPoolExecutor; @@ -57,15 +71,21 @@ public class AmqpConfiguration { @Autowired private ScheduledExecutorService scheduledExecutorService; - @Autowired - protected AmqpProperties amqpProperties; - + /** + * {@link ConnectionFactory} with enabled publisher confirms and + * heartbeat. + * + * @param config + * with standard {@link RabbitProperties} + * @return {@link ConnectionFactory} + */ @Bean public ConnectionFactory rabbitConnectionFactory(final RabbitProperties config) { final CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setRequestedHeartBeat(amqpProperties.getRequestedHeartBeat()); factory.setExecutor(threadPoolExecutor); factory.getRabbitConnectionFactory().setHeartbeatExecutor(scheduledExecutorService); + factory.setPublisherConfirms(true); final String addresses = config.getAddresses(); factory.setAddresses(addresses); @@ -84,7 +104,6 @@ public class AmqpConfiguration { } return factory; } - } /** @@ -101,14 +120,28 @@ public class AmqpConfiguration { } /** - * Method to set the Jackson2JsonMessageConverter. - * - * @return the Jackson2JsonMessageConverter + * @return {@link RabbitTemplate} with automatic retry, published confirms + * and {@link Jackson2JsonMessageConverter}. */ @Bean public RabbitTemplate rabbitTemplate() { final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); + + final RetryTemplate retryTemplate = new RetryTemplate(); + retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy()); + rabbitTemplate.setRetryTemplate(retryTemplate); + + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { + if (ack) { + LOGGER.debug("Message with correlation ID {} confirmed by broker.", correlationData.getId()); + } else { + LOGGER.error("Broker is unable to handle message with correlation ID {} : {}", correlationData.getId(), + cause); + } + + }); + return rabbitTemplate; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java index 244544b64..afb3e7ff2 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java @@ -9,10 +9,14 @@ package org.eclipse.hawkbit.amqp; import java.net.URI; +import java.util.UUID; import org.eclipse.hawkbit.util.IpUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.support.CorrelationData; /** * A default implementation for the sender service. The service sends all amqp @@ -20,6 +24,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; * extracted from the uri. */ public class DefaultAmqpSenderService implements AmqpSenderService { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpSenderService.class); private final RabbitTemplate internalAmqpTemplate; @@ -39,7 +44,16 @@ public class DefaultAmqpSenderService implements AmqpSenderService { return; } - internalAmqpTemplate.send(extractExchange(replyTo), null, message); + final String correlationId = UUID.randomUUID().toString(); + final String exchange = extractExchange(replyTo); + + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, exchange, correlationId); + } else { + LOGGER.debug("Sending message to exchange {} with correlationId {}", exchange, correlationId); + } + + internalAmqpTemplate.send(exchange, null, message, new CorrelationData(correlationId)); } }