Rabbit publisherConfirms

Signed-off-by: Kai Zimmermann <kai.zimmermann@bosch-si.com>
This commit is contained in:
Kai Zimmermann
2016-06-12 08:09:28 +02:00
parent 64f3acfa2b
commit 12ce60f72a
5 changed files with 76 additions and 19 deletions

View File

@@ -8,6 +8,7 @@
*/
package org.eclipse.hawkbit.simulator.amqp;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -15,6 +16,7 @@ import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -65,9 +67,12 @@ public class AmqpConfiguration {
* @return the queue
*/
@Bean
public Queue receiverConnectorQueueFromSp() {
return new Queue(amqpProperties.getReceiverConnectorQueueFromSp(), true, false, false,
getDeadLetterExchangeArgs());
public Queue receiverConnectorQueueFromHawkBit() {
final Map<String, Object> arguments = getDeadLetterExchangeArgs();
arguments.putAll(getTTLMaxArgs());
return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).withArguments(arguments)
.build();
}
/**
@@ -89,7 +94,7 @@ public class AmqpConfiguration {
*/
@Bean
public Binding bindReceiverQueueToSpExchange() {
return BindingBuilder.bind(receiverConnectorQueueFromSp()).to(exchangeQueueToConnector());
return BindingBuilder.bind(receiverConnectorQueueFromHawkBit()).to(exchangeQueueToConnector());
}
/**
@@ -99,7 +104,7 @@ public class AmqpConfiguration {
*/
@Bean
public Queue deadLetterQueue() {
return new Queue(amqpProperties.getDeadLetterQueue(), true, false, true);
return QueueBuilder.nonDurable(amqpProperties.getDeadLetterQueue()).withArguments(getTTLMaxArgs()).build();
}
/**
@@ -145,4 +150,11 @@ public class AmqpConfiguration {
return args;
}
private static Map<String, Object> getTTLMaxArgs() {
final Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", Duration.ofDays(1).toMillis());
args.put("x-max-length", 100_000);
return args;
}
}

View File

@@ -19,7 +19,6 @@
<Logger name="org.apache.coyote.http11.Http11NioProtocol" level="WARN" />
<Logger name="org.apache.tomcat.util.net.NioSelectorPool" level="WARN" />
<Logger name="org.apache.tomcat.jdbc.pool.ConnectionPool" level="DEBUG" />
<Logger name="org.apache.catalina.startup.DigesterFactory" level="ERROR" />
<!-- Security Log with hints on potential attacks -->

View File

@@ -23,7 +23,6 @@ import org.springframework.scheduling.annotation.EnableAsync;
*
*
*/
@Configuration
@EnableAsync
@ConditionalOnMissingBean(AsyncConfigurer.class)

View File

@@ -12,6 +12,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
@@ -30,6 +32,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;;
/**
* The spring AMQP configuration which is enabled by using the profile
@@ -39,17 +43,27 @@ import org.springframework.context.annotation.Configuration;
@EnableConfigurationProperties({ AmqpProperties.class, AmqpDeadletterProperties.class })
public class AmqpConfiguration {
@Autowired
protected AmqpProperties amqpProperties;
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfiguration.class);
@Autowired
protected AmqpDeadletterProperties amqpDeadletterProperties;
private AmqpProperties amqpProperties;
@Autowired
private AmqpDeadletterProperties amqpDeadletterProperties;
@Autowired
@Qualifier("threadPoolExecutor")
private ThreadPoolExecutor threadPoolExecutor;
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Configuration
protected static class HawkBitRabbitConnectionFactoryCreator {
protected static class RabbitConnectionFactoryCreator {
@Autowired
private AmqpProperties amqpProperties;
@Autowired
@Qualifier("threadPoolExecutor")
private ThreadPoolExecutor threadPoolExecutor;
@@ -57,15 +71,21 @@ public class AmqpConfiguration {
@Autowired
private ScheduledExecutorService scheduledExecutorService;
@Autowired
protected AmqpProperties amqpProperties;
/**
* {@link ConnectionFactory} with enabled publisher confirms and
* heartbeat.
*
* @param config
* with standard {@link RabbitProperties}
* @return {@link ConnectionFactory}
*/
@Bean
public ConnectionFactory rabbitConnectionFactory(final RabbitProperties config) {
final CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setRequestedHeartBeat(amqpProperties.getRequestedHeartBeat());
factory.setExecutor(threadPoolExecutor);
factory.getRabbitConnectionFactory().setHeartbeatExecutor(scheduledExecutorService);
factory.setPublisherConfirms(true);
final String addresses = config.getAddresses();
factory.setAddresses(addresses);
@@ -84,7 +104,6 @@ public class AmqpConfiguration {
}
return factory;
}
}
/**
@@ -101,14 +120,28 @@ public class AmqpConfiguration {
}
/**
* Method to set the Jackson2JsonMessageConverter.
*
* @return the Jackson2JsonMessageConverter
* @return {@link RabbitTemplate} with automatic retry, published confirms
* and {@link Jackson2JsonMessageConverter}.
*/
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
final RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
rabbitTemplate.setRetryTemplate(retryTemplate);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.debug("Message with correlation ID {} confirmed by broker.", correlationData.getId());
} else {
LOGGER.error("Broker is unable to handle message with correlation ID {} : {}", correlationData.getId(),
cause);
}
});
return rabbitTemplate;
}

View File

@@ -9,10 +9,14 @@
package org.eclipse.hawkbit.amqp;
import java.net.URI;
import java.util.UUID;
import org.eclipse.hawkbit.util.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
/**
* A default implementation for the sender service. The service sends all amqp
@@ -20,6 +24,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
* extracted from the uri.
*/
public class DefaultAmqpSenderService implements AmqpSenderService {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpSenderService.class);
private final RabbitTemplate internalAmqpTemplate;
@@ -39,7 +44,16 @@ public class DefaultAmqpSenderService implements AmqpSenderService {
return;
}
internalAmqpTemplate.send(extractExchange(replyTo), null, message);
final String correlationId = UUID.randomUUID().toString();
final String exchange = extractExchange(replyTo);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, exchange, correlationId);
} else {
LOGGER.debug("Sending message to exchange {} with correlationId {}", exchange, correlationId);
}
internalAmqpTemplate.send(exchange, null, message, new CorrelationData(correlationId));
}
}