Made queue declaration retry count configurable.

Signed-off-by: Kai Zimmermann <kai.zimmermann@bosch-si.com>
This commit is contained in:
Kai Zimmermann
2016-06-29 14:20:40 +02:00
parent 5fb86bbb2f
commit 8cd6e5d614
4 changed files with 89 additions and 30 deletions

View File

@@ -22,11 +22,11 @@ import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
@@ -268,15 +268,8 @@ public class AmqpConfiguration {
* AMQP messages
*/
@Bean(name = { "listenerContainerFactory" })
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setDefaultRequeueRejected(true);
containerFactory.setConnectionFactory(rabbitConnectionFactory);
containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal());
containerFactory.setConcurrentConsumers(amqpProperties.getInitialConcurrentConsumers());
containerFactory.setMaxConcurrentConsumers(amqpProperties.getMaxConcurrentConsumers());
containerFactory.setPrefetchCount(amqpProperties.getPrefetchCount());
return containerFactory;
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> listenerContainerFactory() {
return new ConfigurableRabbitListenerContainerFactory(amqpProperties, rabbitConnectionFactory);
}
private static Map<String, Object> getTTLMaxArgsAuthenticationQueue() {

View File

@@ -138,9 +138,18 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
}
@RabbitListener(queues = "${hawkbit.dmf.rabbitmq.authenticationReceiverQueue}", containerFactory = "listenerContainerFactory")
public Message onAuthenticationRequest(final Message message,
@Header(MessageHeaderKey.TENANT) final String tenant) {
return onAuthenticationRequest(message);
public Message onAuthenticationRequest(final Message message) {
checkContentTypeJson(message);
final SecurityContext oldContext = SecurityContextHolder.getContext();
try {
return handleAuthentifiactionMessage(message);
} catch (final IllegalArgumentException ex) {
throw new AmqpRejectAndDontRequeueException("Invalid message!", ex);
} catch (final TenantNotExistException teex) {
throw new AmqpRejectAndDontRequeueException(teex);
} finally {
SecurityContextHolder.setContext(oldContext);
}
}
public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) {
@@ -159,7 +168,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
final EventTopic eventTopic = EventTopic.valueOf(topicValue);
handleIncomingEvent(message, eventTopic);
break;
default:
logAndThrowMessageError(message, "No handle method was found for the given message type.");
}
@@ -173,20 +181,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
return null;
}
public Message onAuthenticationRequest(final Message message) {
checkContentTypeJson(message);
final SecurityContext oldContext = SecurityContextHolder.getContext();
try {
return handleAuthentifiactionMessage(message);
} catch (final IllegalArgumentException ex) {
throw new AmqpRejectAndDontRequeueException("Invalid message!", ex);
} catch (final TenantNotExistException teex) {
throw new AmqpRejectAndDontRequeueException(teex);
} finally {
SecurityContextHolder.setContext(oldContext);
}
}
private Message handleAuthentifiactionMessage(final Message message) {
final DownloadResponse authentificationResponse = new DownloadResponse();
final MessageProperties messageProperties = message.getMessageProperties();
@@ -414,7 +408,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
if (ArrayUtils.isNotEmpty(message.getMessageProperties().getCorrelationId())) {
actionStatus.addMessage(RepositoryConstants.SERVER_MESSAGE_PREFIX + "DMF message correlation-id "
+ message.getMessageProperties().getCorrelationId());
+ new String(message.getMessageProperties().getCorrelationId()));
}
actionStatus.setAction(action);
@@ -466,7 +460,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
if (messageProperties.getContentType() != null && messageProperties.getContentType().contains("json")) {
return;
}
throw new IllegalArgumentException("Content-Type is not JSON compatible");
throw new AmqpRejectAndDontRequeueException("Content-Type is not JSON compatible");
}
void setControllerManagement(final ControllerManagement controllerManagement) {

View File

@@ -68,6 +68,29 @@ public class AmqpProperties {
*/
private int initialConcurrentConsumers = 3;
/**
* The number of retry attempts when passive queue declaration fails.
* Passive queue declaration occurs when the consumer starts or, when
* consuming from multiple queues, when not all queues were available during
* initialization.
*/
private int declarationRetries = 50;
/**
* @return the declarationRetries
*/
public int getDeclarationRetries() {
return declarationRetries;
}
/**
* @param declarationRetries
* the declarationRetries to set
*/
public void setDeclarationRetries(final int declarationRetries) {
this.declarationRetries = declarationRetries;
}
public String getAuthenticationReceiverQueue() {
return authenticationReceiverQueue;
}

View File

@@ -0,0 +1,49 @@
/**
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
/**
* {@link RabbitListenerContainerFactory} that can be configured through
* hawkBit's {@link AmqpProperties}.
*
*/
public class ConfigurableRabbitListenerContainerFactory extends SimpleRabbitListenerContainerFactory {
private final AmqpProperties amqpProperties;
/**
* Constructor.
*
* @param rabbitConnectionFactory
* for the container factory
* @param amqpProperties
* to configure the container factory
*/
public ConfigurableRabbitListenerContainerFactory(final AmqpProperties amqpProperties,
final ConnectionFactory rabbitConnectionFactory) {
this.amqpProperties = amqpProperties;
setDefaultRequeueRejected(true);
setConnectionFactory(rabbitConnectionFactory);
setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal());
setConcurrentConsumers(amqpProperties.getInitialConcurrentConsumers());
setMaxConcurrentConsumers(amqpProperties.getMaxConcurrentConsumers());
setPrefetchCount(amqpProperties.getPrefetchCount());
}
@Override
protected void initializeContainer(final SimpleMessageListenerContainer instance) {
super.initializeContainer(instance);
instance.setDeclarationRetries(amqpProperties.getDeclarationRetries());
}
}