From cc1dc0951602b02a8f7f5d25bff13ecedd62c576 Mon Sep 17 00:00:00 2001 From: kaizimmerm Date: Sun, 19 Jun 2016 20:43:12 +0200 Subject: [PATCH] Configurable prefetch and consumer pool. Signed-off-by: kaizimmerm --- hawkbit-dmf-amqp/pom.xml | 10 +++++++ .../hawkbit/amqp/AmqpConfiguration.java | 12 +++----- .../amqp/AmqpMessageHandlerService.java | 25 +++++++++-------- .../eclipse/hawkbit/amqp/AmqpProperties.java | 28 +++++++++++++++++++ .../amqp/AmqpMessageHandlerServiceTest.java | 17 +++++------ 5 files changed, 65 insertions(+), 27 deletions(-) diff --git a/hawkbit-dmf-amqp/pom.xml b/hawkbit-dmf-amqp/pom.xml index c2ed7c213..322dd5047 100644 --- a/hawkbit-dmf-amqp/pom.xml +++ b/hawkbit-dmf-amqp/pom.xml @@ -49,6 +49,11 @@ org.springframework.amqp spring-rabbit + + com.rabbitmq + amqp-client + 3.6.2 + org.springframework.security spring-security-web @@ -154,6 +159,11 @@ spring-context-support test + + org.scala-lang + scala-library + 2.10.4 + \ No newline at end of file diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 9435e8860..0e7e766e8 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -32,7 +32,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.task.TaskExecutor; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.support.RetryTemplate;; @@ -59,9 +58,6 @@ public class AmqpConfiguration { @Autowired private ConnectionFactory rabbitConnectionFactory; - @Autowired - private TaskExecutor taskExecutor; - @Configuration protected static class RabbitConnectionFactoryCreator { @@ -201,8 +197,8 @@ public class AmqpConfiguration { } /** - * Create the Binding {@link AmqpConfiguration#receiverQueueFromSp()} to - * {@link AmqpConfiguration#senderConnectorToSpExchange()}. + * Create the Binding {@link AmqpConfiguration#receiverQueue()} to + * {@link AmqpConfiguration#senderExchange()}. * * @return the binding and create the queue and exchange */ @@ -244,9 +240,9 @@ public class AmqpConfiguration { containerFactory.setDefaultRequeueRejected(true); containerFactory.setConnectionFactory(rabbitConnectionFactory); containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal()); - containerFactory.setTaskExecutor(taskExecutor); - containerFactory.setConcurrentConsumers(3); + containerFactory.setConcurrentConsumers(amqpProperties.getInitialConcurrentConsumers()); containerFactory.setMaxConcurrentConsumers(amqpProperties.getMaxConcurrentConsumers()); + containerFactory.setPrefetchCount(amqpProperties.getPrefetchCount()); return containerFactory; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 107850fc1..caa11ab3d 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -37,6 +37,7 @@ import org.eclipse.hawkbit.repository.ControllerManagement; import org.eclipse.hawkbit.repository.EntityFactory; import org.eclipse.hawkbit.repository.eventbus.event.TargetAssignDistributionSetEvent; import org.eclipse.hawkbit.repository.exception.EntityNotFoundException; +import org.eclipse.hawkbit.repository.exception.TenantNotExistException; import org.eclipse.hawkbit.repository.model.Action; import org.eclipse.hawkbit.repository.model.Action.Status; import org.eclipse.hawkbit.repository.model.ActionStatus; @@ -47,6 +48,7 @@ import org.eclipse.hawkbit.repository.model.Target; import org.eclipse.hawkbit.util.IpUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; @@ -111,12 +113,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService { super(defaultTemplate); } - @RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory") - private Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type, - @Header(MessageHeaderKey.TENANT) final String tenant) { - return onMessage(message, type, tenant, getRabbitTemplate().getConnectionFactory().getVirtualHost()); - } - /** * Method to handle all incoming amqp messages. * @@ -124,14 +120,17 @@ public class AmqpMessageHandlerService extends BaseAmqpService { * incoming message * @param type * the message type - * @param contentType - * the contentType of the message * @param tenant * the contentType of the message - * @param virtualHost - * the virtual host + * * @return a message if no message is send back to sender */ + @RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory") + public Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type, + @Header(MessageHeaderKey.TENANT) final String tenant) { + return onMessage(message, type, tenant, getRabbitTemplate().getConnectionFactory().getVirtualHost()); + } + public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) { checkContentTypeJson(message); final SecurityContext oldContext = SecurityContextHolder.getContext(); @@ -153,6 +152,10 @@ public class AmqpMessageHandlerService extends BaseAmqpService { default: logAndThrowMessageError(message, "No handle method was found for the given message type."); } + } catch (final IllegalArgumentException ex) { + throw new AmqpRejectAndDontRequeueException("Invalid message!", ex); + } catch (final TenantNotExistException teex) { + throw new AmqpRejectAndDontRequeueException(teex); } finally { SecurityContextHolder.setContext(oldContext); } @@ -421,7 +424,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { } } - private void checkContentTypeJson(final Message message) { + private static void checkContentTypeJson(final Message message) { final MessageProperties messageProperties = message.getMessageProperties(); if (messageProperties.getContentType() != null && messageProperties.getContentType().contains("json")) { return; diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java index a20c59792..ace1fefa2 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java @@ -50,6 +50,34 @@ public class AmqpProperties { */ private int maxConcurrentConsumers = 10; + /** + * Tells the broker how many messages to send to each consumer in a single + * request. Often this can be set quite high to improve throughput. + */ + private int prefetchCount = 10; + + /** + * Initial number of consumers. Is scaled up if necessary up to + * {@link #maxConcurrentConsumers}. + */ + private int initialConcurrentConsumers = 3; + + public int getPrefetchCount() { + return prefetchCount; + } + + public void setPrefetchCount(final int prefetchCount) { + this.prefetchCount = prefetchCount; + } + + public int getInitialConcurrentConsumers() { + return initialConcurrentConsumers; + } + + public void setInitialConcurrentConsumers(final int initialConcurrentConsumers) { + this.initialConcurrentConsumers = initialConcurrentConsumers; + } + public int getMaxConcurrentConsumers() { return maxConcurrentConsumers; } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java index a0515a871..338feea62 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java @@ -59,6 +59,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import org.springframework.amqp.AmqpRejectAndDontRequeueException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -170,7 +171,7 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted since no replyTo header was set"); - } catch (final IllegalArgumentException exception) { + } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted } @@ -184,7 +185,7 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted since no thingID was set"); - } catch (final IllegalArgumentException exception) { + } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted } } @@ -200,7 +201,7 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, type, TENANT, "vHost"); fail("IllegalArgumentException was excepeted due to unknown message type"); - } catch (final IllegalArgumentException exception) { + } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted } } @@ -213,21 +214,21 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted due to unknown message type"); - } catch (final IllegalArgumentException e) { + } catch (final AmqpRejectAndDontRequeueException e) { } try { messageProperties.setHeader(MessageHeaderKey.TOPIC, "wrongTopic"); amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted due to unknown topic"); - } catch (final IllegalArgumentException e) { + } catch (final AmqpRejectAndDontRequeueException e) { } messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.CANCEL_DOWNLOAD.name()); try { amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted because there was no event topic"); - } catch (final IllegalArgumentException exception) { + } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted } @@ -246,7 +247,7 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted since no action id was set"); - } catch (final IllegalArgumentException exception) { + } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted } } @@ -263,7 +264,7 @@ public class AmqpMessageHandlerServiceTest { try { amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted since no action id was set"); - } catch (final IllegalArgumentException exception) { + } catch (final AmqpRejectAndDontRequeueException exception) { // test ok - exception was excepted }