From 8cd6e5d614b3f5c711d65f4fc82881e4c8391d1d Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Wed, 29 Jun 2016 14:20:40 +0200 Subject: [PATCH 1/5] Made queue declaration retry count configurable. Signed-off-by: Kai Zimmermann --- .../hawkbit/amqp/AmqpConfiguration.java | 13 ++--- .../amqp/AmqpMessageHandlerService.java | 34 ++++++------- .../eclipse/hawkbit/amqp/AmqpProperties.java | 23 +++++++++ ...gurableRabbitListenerContainerFactory.java | 49 +++++++++++++++++++ 4 files changed, 89 insertions(+), 30 deletions(-) create mode 100644 hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/ConfigurableRabbitListenerContainerFactory.java diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 538f82213..11f892654 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -22,11 +22,11 @@ import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; -import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; @@ -268,15 +268,8 @@ public class AmqpConfiguration { * AMQP messages */ @Bean(name = { "listenerContainerFactory" }) - public SimpleRabbitListenerContainerFactory listenerContainerFactory() { - final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory(); - containerFactory.setDefaultRequeueRejected(true); - containerFactory.setConnectionFactory(rabbitConnectionFactory); - containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal()); - containerFactory.setConcurrentConsumers(amqpProperties.getInitialConcurrentConsumers()); - containerFactory.setMaxConcurrentConsumers(amqpProperties.getMaxConcurrentConsumers()); - containerFactory.setPrefetchCount(amqpProperties.getPrefetchCount()); - return containerFactory; + public RabbitListenerContainerFactory listenerContainerFactory() { + return new ConfigurableRabbitListenerContainerFactory(amqpProperties, rabbitConnectionFactory); } private static Map getTTLMaxArgsAuthenticationQueue() { diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index b003d316a..9636de5c7 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -138,9 +138,18 @@ public class AmqpMessageHandlerService extends BaseAmqpService { } @RabbitListener(queues = "${hawkbit.dmf.rabbitmq.authenticationReceiverQueue}", containerFactory = "listenerContainerFactory") - public Message onAuthenticationRequest(final Message message, - @Header(MessageHeaderKey.TENANT) final String tenant) { - return onAuthenticationRequest(message); + public Message onAuthenticationRequest(final Message message) { + checkContentTypeJson(message); + final SecurityContext oldContext = SecurityContextHolder.getContext(); + try { + return handleAuthentifiactionMessage(message); + } catch (final IllegalArgumentException ex) { + throw new AmqpRejectAndDontRequeueException("Invalid message!", ex); + } catch (final TenantNotExistException teex) { + throw new AmqpRejectAndDontRequeueException(teex); + } finally { + SecurityContextHolder.setContext(oldContext); + } } public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) { @@ -159,7 +168,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService { final EventTopic eventTopic = EventTopic.valueOf(topicValue); handleIncomingEvent(message, eventTopic); break; - default: logAndThrowMessageError(message, "No handle method was found for the given message type."); } @@ -173,20 +181,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService { return null; } - public Message onAuthenticationRequest(final Message message) { - checkContentTypeJson(message); - final SecurityContext oldContext = SecurityContextHolder.getContext(); - try { - return handleAuthentifiactionMessage(message); - } catch (final IllegalArgumentException ex) { - throw new AmqpRejectAndDontRequeueException("Invalid message!", ex); - } catch (final TenantNotExistException teex) { - throw new AmqpRejectAndDontRequeueException(teex); - } finally { - SecurityContextHolder.setContext(oldContext); - } - } - private Message handleAuthentifiactionMessage(final Message message) { final DownloadResponse authentificationResponse = new DownloadResponse(); final MessageProperties messageProperties = message.getMessageProperties(); @@ -414,7 +408,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { if (ArrayUtils.isNotEmpty(message.getMessageProperties().getCorrelationId())) { actionStatus.addMessage(RepositoryConstants.SERVER_MESSAGE_PREFIX + "DMF message correlation-id " - + message.getMessageProperties().getCorrelationId()); + + new String(message.getMessageProperties().getCorrelationId())); } actionStatus.setAction(action); @@ -466,7 +460,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { if (messageProperties.getContentType() != null && messageProperties.getContentType().contains("json")) { return; } - throw new IllegalArgumentException("Content-Type is not JSON compatible"); + throw new AmqpRejectAndDontRequeueException("Content-Type is not JSON compatible"); } void setControllerManagement(final ControllerManagement controllerManagement) { diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java index 888b204e7..56aa37772 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java @@ -68,6 +68,29 @@ public class AmqpProperties { */ private int initialConcurrentConsumers = 3; + /** + * The number of retry attempts when passive queue declaration fails. + * Passive queue declaration occurs when the consumer starts or, when + * consuming from multiple queues, when not all queues were available during + * initialization. + */ + private int declarationRetries = 50; + + /** + * @return the declarationRetries + */ + public int getDeclarationRetries() { + return declarationRetries; + } + + /** + * @param declarationRetries + * the declarationRetries to set + */ + public void setDeclarationRetries(final int declarationRetries) { + this.declarationRetries = declarationRetries; + } + public String getAuthenticationReceiverQueue() { return authenticationReceiverQueue; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/ConfigurableRabbitListenerContainerFactory.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/ConfigurableRabbitListenerContainerFactory.java new file mode 100644 index 000000000..fd0a1d2a1 --- /dev/null +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/ConfigurableRabbitListenerContainerFactory.java @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2015 Bosch Software Innovations GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + */ +package org.eclipse.hawkbit.amqp; + +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; + +/** + * {@link RabbitListenerContainerFactory} that can be configured through + * hawkBit's {@link AmqpProperties}. + * + */ +public class ConfigurableRabbitListenerContainerFactory extends SimpleRabbitListenerContainerFactory { + private final AmqpProperties amqpProperties; + + /** + * Constructor. + * + * @param rabbitConnectionFactory + * for the container factory + * @param amqpProperties + * to configure the container factory + */ + public ConfigurableRabbitListenerContainerFactory(final AmqpProperties amqpProperties, + final ConnectionFactory rabbitConnectionFactory) { + this.amqpProperties = amqpProperties; + setDefaultRequeueRejected(true); + setConnectionFactory(rabbitConnectionFactory); + setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal()); + setConcurrentConsumers(amqpProperties.getInitialConcurrentConsumers()); + setMaxConcurrentConsumers(amqpProperties.getMaxConcurrentConsumers()); + setPrefetchCount(amqpProperties.getPrefetchCount()); + + } + + @Override + protected void initializeContainer(final SimpleMessageListenerContainer instance) { + super.initializeContainer(instance); + instance.setDeclarationRetries(amqpProperties.getDeclarationRetries()); + } +} From 0c4d88a8f4714f7087bc10702eec6e7b2a600bd6 Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Wed, 29 Jun 2016 15:15:06 +0200 Subject: [PATCH 2/5] Fixed test. Signed-off-by: Kai Zimmermann --- .../org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java index 6e61d1642..0dcf6cd85 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java @@ -141,7 +141,7 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted due to worng content type"); - } catch (final IllegalArgumentException e) { + } catch (final AmqpRejectAndDontRequeueException e) { } } From 202054e129631d820440d0d1b1a8ee59565ad739 Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Wed, 29 Jun 2016 15:20:23 +0200 Subject: [PATCH 3/5] Fixed sonar issue. Signed-off-by: Kai Zimmermann --- .../amqp/ConfigurableRabbitListenerContainerFactory.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/ConfigurableRabbitListenerContainerFactory.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/ConfigurableRabbitListenerContainerFactory.java index fd0a1d2a1..14e6f8fcb 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/ConfigurableRabbitListenerContainerFactory.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/ConfigurableRabbitListenerContainerFactory.java @@ -42,6 +42,9 @@ public class ConfigurableRabbitListenerContainerFactory extends SimpleRabbitList } @Override + // Exception squid:UnusedProtectedMethod - called by + // AbstractRabbitListenerContainerFactory + @SuppressWarnings("squid:UnusedProtectedMethod") protected void initializeContainer(final SimpleMessageListenerContainer instance) { super.initializeContainer(instance); instance.setDeclarationRetries(amqpProperties.getDeclarationRetries()); From c3b999792b5190f943171c45e04aec2618d302a7 Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Wed, 29 Jun 2016 16:18:35 +0200 Subject: [PATCH 4/5] Clarified string conversion. Signed-off-by: Kai Zimmermann --- .../org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 9636de5c7..d1bf85103 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -408,7 +408,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { if (ArrayUtils.isNotEmpty(message.getMessageProperties().getCorrelationId())) { actionStatus.addMessage(RepositoryConstants.SERVER_MESSAGE_PREFIX + "DMF message correlation-id " - + new String(message.getMessageProperties().getCorrelationId())); + + convertCorrelationId(message)); } actionStatus.setAction(action); @@ -416,6 +416,10 @@ public class AmqpMessageHandlerService extends BaseAmqpService { return actionStatus; } + private static String convertCorrelationId(final Message message) { + return new String(message.getMessageProperties().getCorrelationId()); + } + private Action getUpdateActionStatus(final ActionStatus actionStatus) { if (actionStatus.getStatus().equals(Status.CANCELED)) { return controllerManagement.addCancelActionStatus(actionStatus); From 8d6526ae8fe25c164d46597b3966c74e0731571c Mon Sep 17 00:00:00 2001 From: Kai Zimmermann Date: Wed, 29 Jun 2016 17:01:24 +0200 Subject: [PATCH 5/5] Fixed fail messages. Signed-off-by: Kai Zimmermann --- .../amqp/AmqpMessageHandlerServiceTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java index 0dcf6cd85..2627731c6 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java @@ -140,7 +140,7 @@ public class AmqpMessageHandlerServiceTest { final Message message = new Message(new byte[0], messageProperties); try { amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost"); - fail("IllegalArgumentException was excepeted due to worng content type"); + fail("AmqpRejectAndDontRequeueException was excepeted due to worng content type"); } catch (final AmqpRejectAndDontRequeueException e) { } } @@ -175,7 +175,7 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost"); - fail("IllegalArgumentException was excepeted since no replyTo header was set"); + fail("AmqpRejectAndDontRequeueException was excepeted since no replyTo header was set"); } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted } @@ -189,7 +189,7 @@ public class AmqpMessageHandlerServiceTest { final Message message = messageConverter.toMessage(new byte[0], messageProperties); try { amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost"); - fail("IllegalArgumentException was excepeted since no thingID was set"); + fail("AmqpRejectAndDontRequeueException was excepeted since no thingID was set"); } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted } @@ -205,7 +205,7 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, type, TENANT, "vHost"); - fail("IllegalArgumentException was excepeted due to unknown message type"); + fail("AmqpRejectAndDontRequeueException was excepeted due to unknown message type"); } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted } @@ -218,21 +218,21 @@ public class AmqpMessageHandlerServiceTest { final Message message = new Message(new byte[0], messageProperties); try { amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); - fail("IllegalArgumentException was excepeted due to unknown message type"); + fail("AmqpRejectAndDontRequeueException was excepeted due to unknown message type"); } catch (final AmqpRejectAndDontRequeueException e) { } try { messageProperties.setHeader(MessageHeaderKey.TOPIC, "wrongTopic"); amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); - fail("IllegalArgumentException was excepeted due to unknown topic"); + fail("AmqpRejectAndDontRequeueException was excepeted due to unknown topic"); } catch (final AmqpRejectAndDontRequeueException e) { } messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.CANCEL_DOWNLOAD.name()); try { amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); - fail("IllegalArgumentException was excepeted because there was no event topic"); + fail("AmqpRejectAndDontRequeueException was excepeted because there was no event topic"); } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted } @@ -251,7 +251,7 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); - fail("IllegalArgumentException was excepeted since no action id was set"); + fail("AmqpRejectAndDontRequeueException was excepeted since no action id was set"); } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted } @@ -268,7 +268,7 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); - fail("IllegalArgumentException was excepeted since no action id was set"); + fail("AmqpRejectAndDontRequeueException was excepeted since no action id was set"); } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted }