Merge pull request #232 from bsinno/feature_config_rabbit_factory

Made queue declaration retry count configurable.
This commit is contained in:
Kai Zimmermann
2016-06-29 20:26:57 +02:00
committed by GitHub
5 changed files with 106 additions and 40 deletions

View File

@@ -22,11 +22,11 @@ import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
@@ -268,15 +268,8 @@ public class AmqpConfiguration {
* AMQP messages
*/
@Bean(name = { "listenerContainerFactory" })
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setDefaultRequeueRejected(true);
containerFactory.setConnectionFactory(rabbitConnectionFactory);
containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal());
containerFactory.setConcurrentConsumers(amqpProperties.getInitialConcurrentConsumers());
containerFactory.setMaxConcurrentConsumers(amqpProperties.getMaxConcurrentConsumers());
containerFactory.setPrefetchCount(amqpProperties.getPrefetchCount());
return containerFactory;
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> listenerContainerFactory() {
return new ConfigurableRabbitListenerContainerFactory(amqpProperties, rabbitConnectionFactory);
}
private static Map<String, Object> getTTLMaxArgsAuthenticationQueue() {

View File

@@ -138,9 +138,18 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
}
@RabbitListener(queues = "${hawkbit.dmf.rabbitmq.authenticationReceiverQueue}", containerFactory = "listenerContainerFactory")
public Message onAuthenticationRequest(final Message message,
@Header(MessageHeaderKey.TENANT) final String tenant) {
return onAuthenticationRequest(message);
public Message onAuthenticationRequest(final Message message) {
checkContentTypeJson(message);
final SecurityContext oldContext = SecurityContextHolder.getContext();
try {
return handleAuthentifiactionMessage(message);
} catch (final IllegalArgumentException ex) {
throw new AmqpRejectAndDontRequeueException("Invalid message!", ex);
} catch (final TenantNotExistException teex) {
throw new AmqpRejectAndDontRequeueException(teex);
} finally {
SecurityContextHolder.setContext(oldContext);
}
}
public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) {
@@ -159,7 +168,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
final EventTopic eventTopic = EventTopic.valueOf(topicValue);
handleIncomingEvent(message, eventTopic);
break;
default:
logAndThrowMessageError(message, "No handle method was found for the given message type.");
}
@@ -173,20 +181,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
return null;
}
public Message onAuthenticationRequest(final Message message) {
checkContentTypeJson(message);
final SecurityContext oldContext = SecurityContextHolder.getContext();
try {
return handleAuthentifiactionMessage(message);
} catch (final IllegalArgumentException ex) {
throw new AmqpRejectAndDontRequeueException("Invalid message!", ex);
} catch (final TenantNotExistException teex) {
throw new AmqpRejectAndDontRequeueException(teex);
} finally {
SecurityContextHolder.setContext(oldContext);
}
}
private Message handleAuthentifiactionMessage(final Message message) {
final DownloadResponse authentificationResponse = new DownloadResponse();
final MessageProperties messageProperties = message.getMessageProperties();
@@ -414,7 +408,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
if (ArrayUtils.isNotEmpty(message.getMessageProperties().getCorrelationId())) {
actionStatus.addMessage(RepositoryConstants.SERVER_MESSAGE_PREFIX + "DMF message correlation-id "
+ message.getMessageProperties().getCorrelationId());
+ convertCorrelationId(message));
}
actionStatus.setAction(action);
@@ -422,6 +416,10 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
return actionStatus;
}
private static String convertCorrelationId(final Message message) {
return new String(message.getMessageProperties().getCorrelationId());
}
private Action getUpdateActionStatus(final ActionStatus actionStatus) {
if (actionStatus.getStatus().equals(Status.CANCELED)) {
return controllerManagement.addCancelActionStatus(actionStatus);
@@ -466,7 +464,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
if (messageProperties.getContentType() != null && messageProperties.getContentType().contains("json")) {
return;
}
throw new IllegalArgumentException("Content-Type is not JSON compatible");
throw new AmqpRejectAndDontRequeueException("Content-Type is not JSON compatible");
}
void setControllerManagement(final ControllerManagement controllerManagement) {

View File

@@ -68,6 +68,29 @@ public class AmqpProperties {
*/
private int initialConcurrentConsumers = 3;
/**
* The number of retry attempts when passive queue declaration fails.
* Passive queue declaration occurs when the consumer starts or, when
* consuming from multiple queues, when not all queues were available during
* initialization.
*/
private int declarationRetries = 50;
/**
* @return the declarationRetries
*/
public int getDeclarationRetries() {
return declarationRetries;
}
/**
* @param declarationRetries
* the declarationRetries to set
*/
public void setDeclarationRetries(final int declarationRetries) {
this.declarationRetries = declarationRetries;
}
public String getAuthenticationReceiverQueue() {
return authenticationReceiverQueue;
}

View File

@@ -0,0 +1,52 @@
/**
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
/**
* {@link RabbitListenerContainerFactory} that can be configured through
* hawkBit's {@link AmqpProperties}.
*
*/
public class ConfigurableRabbitListenerContainerFactory extends SimpleRabbitListenerContainerFactory {
private final AmqpProperties amqpProperties;
/**
* Constructor.
*
* @param rabbitConnectionFactory
* for the container factory
* @param amqpProperties
* to configure the container factory
*/
public ConfigurableRabbitListenerContainerFactory(final AmqpProperties amqpProperties,
final ConnectionFactory rabbitConnectionFactory) {
this.amqpProperties = amqpProperties;
setDefaultRequeueRejected(true);
setConnectionFactory(rabbitConnectionFactory);
setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal());
setConcurrentConsumers(amqpProperties.getInitialConcurrentConsumers());
setMaxConcurrentConsumers(amqpProperties.getMaxConcurrentConsumers());
setPrefetchCount(amqpProperties.getPrefetchCount());
}
@Override
// Exception squid:UnusedProtectedMethod - called by
// AbstractRabbitListenerContainerFactory
@SuppressWarnings("squid:UnusedProtectedMethod")
protected void initializeContainer(final SimpleMessageListenerContainer instance) {
super.initializeContainer(instance);
instance.setDeclarationRetries(amqpProperties.getDeclarationRetries());
}
}

View File

@@ -140,8 +140,8 @@ public class AmqpMessageHandlerServiceTest {
final Message message = new Message(new byte[0], messageProperties);
try {
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted due to worng content type");
} catch (final IllegalArgumentException e) {
fail("AmqpRejectAndDontRequeueException was excepeted due to worng content type");
} catch (final AmqpRejectAndDontRequeueException e) {
}
}
@@ -175,7 +175,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no replyTo header was set");
fail("AmqpRejectAndDontRequeueException was excepeted since no replyTo header was set");
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
@@ -189,7 +189,7 @@ public class AmqpMessageHandlerServiceTest {
final Message message = messageConverter.toMessage(new byte[0], messageProperties);
try {
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no thingID was set");
fail("AmqpRejectAndDontRequeueException was excepeted since no thingID was set");
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
@@ -205,7 +205,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, type, TENANT, "vHost");
fail("IllegalArgumentException was excepeted due to unknown message type");
fail("AmqpRejectAndDontRequeueException was excepeted due to unknown message type");
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
@@ -218,21 +218,21 @@ public class AmqpMessageHandlerServiceTest {
final Message message = new Message(new byte[0], messageProperties);
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted due to unknown message type");
fail("AmqpRejectAndDontRequeueException was excepeted due to unknown message type");
} catch (final AmqpRejectAndDontRequeueException e) {
}
try {
messageProperties.setHeader(MessageHeaderKey.TOPIC, "wrongTopic");
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted due to unknown topic");
fail("AmqpRejectAndDontRequeueException was excepeted due to unknown topic");
} catch (final AmqpRejectAndDontRequeueException e) {
}
messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.CANCEL_DOWNLOAD.name());
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted because there was no event topic");
fail("AmqpRejectAndDontRequeueException was excepeted because there was no event topic");
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
@@ -251,7 +251,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no action id was set");
fail("AmqpRejectAndDontRequeueException was excepeted since no action id was set");
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
@@ -268,7 +268,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no action id was set");
fail("AmqpRejectAndDontRequeueException was excepeted since no action id was set");
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}