diff --git a/hawkbit-dmf-amqp/pom.xml b/hawkbit-dmf-amqp/pom.xml
index c2ed7c213..322dd5047 100644
--- a/hawkbit-dmf-amqp/pom.xml
+++ b/hawkbit-dmf-amqp/pom.xml
@@ -49,6 +49,11 @@
org.springframework.amqp
spring-rabbit
+
+ com.rabbitmq
+ amqp-client
+ 3.6.2
+
org.springframework.security
spring-security-web
@@ -154,6 +159,11 @@
spring-context-support
test
+
+ org.scala-lang
+ scala-library
+ 2.10.4
+
\ No newline at end of file
diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java
index 9435e8860..0e7e766e8 100644
--- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java
+++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java
@@ -32,7 +32,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;;
@@ -59,9 +58,6 @@ public class AmqpConfiguration {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
- @Autowired
- private TaskExecutor taskExecutor;
-
@Configuration
protected static class RabbitConnectionFactoryCreator {
@@ -201,8 +197,8 @@ public class AmqpConfiguration {
}
/**
- * Create the Binding {@link AmqpConfiguration#receiverQueueFromSp()} to
- * {@link AmqpConfiguration#senderConnectorToSpExchange()}.
+ * Create the Binding {@link AmqpConfiguration#receiverQueue()} to
+ * {@link AmqpConfiguration#senderExchange()}.
*
* @return the binding and create the queue and exchange
*/
@@ -244,9 +240,9 @@ public class AmqpConfiguration {
containerFactory.setDefaultRequeueRejected(true);
containerFactory.setConnectionFactory(rabbitConnectionFactory);
containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal());
- containerFactory.setTaskExecutor(taskExecutor);
- containerFactory.setConcurrentConsumers(3);
+ containerFactory.setConcurrentConsumers(amqpProperties.getInitialConcurrentConsumers());
containerFactory.setMaxConcurrentConsumers(amqpProperties.getMaxConcurrentConsumers());
+ containerFactory.setPrefetchCount(amqpProperties.getPrefetchCount());
return containerFactory;
}
diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java
index 107850fc1..caa11ab3d 100644
--- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java
+++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java
@@ -37,6 +37,7 @@ import org.eclipse.hawkbit.repository.ControllerManagement;
import org.eclipse.hawkbit.repository.EntityFactory;
import org.eclipse.hawkbit.repository.eventbus.event.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
+import org.eclipse.hawkbit.repository.exception.TenantNotExistException;
import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.Action.Status;
import org.eclipse.hawkbit.repository.model.ActionStatus;
@@ -47,6 +48,7 @@ import org.eclipse.hawkbit.repository.model.Target;
import org.eclipse.hawkbit.util.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -111,12 +113,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
super(defaultTemplate);
}
- @RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory")
- private Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type,
- @Header(MessageHeaderKey.TENANT) final String tenant) {
- return onMessage(message, type, tenant, getRabbitTemplate().getConnectionFactory().getVirtualHost());
- }
-
/**
* Method to handle all incoming amqp messages.
*
@@ -124,14 +120,17 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
* incoming message
* @param type
* the message type
- * @param contentType
- * the contentType of the message
* @param tenant
* the contentType of the message
- * @param virtualHost
- * the virtual host
+ *
* @return a message if no message is send back to sender
*/
+ @RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory")
+ public Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type,
+ @Header(MessageHeaderKey.TENANT) final String tenant) {
+ return onMessage(message, type, tenant, getRabbitTemplate().getConnectionFactory().getVirtualHost());
+ }
+
public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) {
checkContentTypeJson(message);
final SecurityContext oldContext = SecurityContextHolder.getContext();
@@ -153,6 +152,10 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
default:
logAndThrowMessageError(message, "No handle method was found for the given message type.");
}
+ } catch (final IllegalArgumentException ex) {
+ throw new AmqpRejectAndDontRequeueException("Invalid message!", ex);
+ } catch (final TenantNotExistException teex) {
+ throw new AmqpRejectAndDontRequeueException(teex);
} finally {
SecurityContextHolder.setContext(oldContext);
}
@@ -421,7 +424,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
}
}
- private void checkContentTypeJson(final Message message) {
+ private static void checkContentTypeJson(final Message message) {
final MessageProperties messageProperties = message.getMessageProperties();
if (messageProperties.getContentType() != null && messageProperties.getContentType().contains("json")) {
return;
diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java
index a20c59792..ace1fefa2 100644
--- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java
+++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java
@@ -50,6 +50,34 @@ public class AmqpProperties {
*/
private int maxConcurrentConsumers = 10;
+ /**
+ * Tells the broker how many messages to send to each consumer in a single
+ * request. Often this can be set quite high to improve throughput.
+ */
+ private int prefetchCount = 10;
+
+ /**
+ * Initial number of consumers. Is scaled up if necessary up to
+ * {@link #maxConcurrentConsumers}.
+ */
+ private int initialConcurrentConsumers = 3;
+
+ public int getPrefetchCount() {
+ return prefetchCount;
+ }
+
+ public void setPrefetchCount(final int prefetchCount) {
+ this.prefetchCount = prefetchCount;
+ }
+
+ public int getInitialConcurrentConsumers() {
+ return initialConcurrentConsumers;
+ }
+
+ public void setInitialConcurrentConsumers(final int initialConcurrentConsumers) {
+ this.initialConcurrentConsumers = initialConcurrentConsumers;
+ }
+
public int getMaxConcurrentConsumers() {
return maxConcurrentConsumers;
}
diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java
index a0515a871..338feea62 100644
--- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java
+++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java
@@ -59,6 +59,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -170,7 +171,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no replyTo header was set");
- } catch (final IllegalArgumentException exception) {
+ } catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
@@ -184,7 +185,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no thingID was set");
- } catch (final IllegalArgumentException exception) {
+ } catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
}
@@ -200,7 +201,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, type, TENANT, "vHost");
fail("IllegalArgumentException was excepeted due to unknown message type");
- } catch (final IllegalArgumentException exception) {
+ } catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
}
@@ -213,21 +214,21 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted due to unknown message type");
- } catch (final IllegalArgumentException e) {
+ } catch (final AmqpRejectAndDontRequeueException e) {
}
try {
messageProperties.setHeader(MessageHeaderKey.TOPIC, "wrongTopic");
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted due to unknown topic");
- } catch (final IllegalArgumentException e) {
+ } catch (final AmqpRejectAndDontRequeueException e) {
}
messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.CANCEL_DOWNLOAD.name());
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted because there was no event topic");
- } catch (final IllegalArgumentException exception) {
+ } catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
@@ -246,7 +247,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no action id was set");
- } catch (final IllegalArgumentException exception) {
+ } catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
}
@@ -263,7 +264,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no action id was set");
- } catch (final IllegalArgumentException exception) {
+ } catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}