Configurable prefetch and consumer pool.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>
This commit is contained in:
kaizimmerm
2016-06-19 20:43:12 +02:00
parent 58b883d773
commit cc1dc09516
5 changed files with 65 additions and 27 deletions

View File

@@ -49,6 +49,11 @@
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.2</version>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-web</artifactId>
@@ -154,6 +159,11 @@
<artifactId>spring-context-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
</dependencies>
</project>

View File

@@ -32,7 +32,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;;
@@ -59,9 +58,6 @@ public class AmqpConfiguration {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Autowired
private TaskExecutor taskExecutor;
@Configuration
protected static class RabbitConnectionFactoryCreator {
@@ -201,8 +197,8 @@ public class AmqpConfiguration {
}
/**
* Create the Binding {@link AmqpConfiguration#receiverQueueFromSp()} to
* {@link AmqpConfiguration#senderConnectorToSpExchange()}.
* Create the Binding {@link AmqpConfiguration#receiverQueue()} to
* {@link AmqpConfiguration#senderExchange()}.
*
* @return the binding and create the queue and exchange
*/
@@ -244,9 +240,9 @@ public class AmqpConfiguration {
containerFactory.setDefaultRequeueRejected(true);
containerFactory.setConnectionFactory(rabbitConnectionFactory);
containerFactory.setMissingQueuesFatal(amqpProperties.isMissingQueuesFatal());
containerFactory.setTaskExecutor(taskExecutor);
containerFactory.setConcurrentConsumers(3);
containerFactory.setConcurrentConsumers(amqpProperties.getInitialConcurrentConsumers());
containerFactory.setMaxConcurrentConsumers(amqpProperties.getMaxConcurrentConsumers());
containerFactory.setPrefetchCount(amqpProperties.getPrefetchCount());
return containerFactory;
}

View File

@@ -37,6 +37,7 @@ import org.eclipse.hawkbit.repository.ControllerManagement;
import org.eclipse.hawkbit.repository.EntityFactory;
import org.eclipse.hawkbit.repository.eventbus.event.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
import org.eclipse.hawkbit.repository.exception.TenantNotExistException;
import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.Action.Status;
import org.eclipse.hawkbit.repository.model.ActionStatus;
@@ -47,6 +48,7 @@ import org.eclipse.hawkbit.repository.model.Target;
import org.eclipse.hawkbit.util.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -111,12 +113,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
super(defaultTemplate);
}
@RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory")
private Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type,
@Header(MessageHeaderKey.TENANT) final String tenant) {
return onMessage(message, type, tenant, getRabbitTemplate().getConnectionFactory().getVirtualHost());
}
/**
* Method to handle all incoming amqp messages.
*
@@ -124,14 +120,17 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
* incoming message
* @param type
* the message type
* @param contentType
* the contentType of the message
* @param tenant
* the contentType of the message
* @param virtualHost
* the virtual host
*
* @return a message if <null> no message is send back to sender
*/
@RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory")
public Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type,
@Header(MessageHeaderKey.TENANT) final String tenant) {
return onMessage(message, type, tenant, getRabbitTemplate().getConnectionFactory().getVirtualHost());
}
public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) {
checkContentTypeJson(message);
final SecurityContext oldContext = SecurityContextHolder.getContext();
@@ -153,6 +152,10 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
default:
logAndThrowMessageError(message, "No handle method was found for the given message type.");
}
} catch (final IllegalArgumentException ex) {
throw new AmqpRejectAndDontRequeueException("Invalid message!", ex);
} catch (final TenantNotExistException teex) {
throw new AmqpRejectAndDontRequeueException(teex);
} finally {
SecurityContextHolder.setContext(oldContext);
}
@@ -421,7 +424,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
}
}
private void checkContentTypeJson(final Message message) {
private static void checkContentTypeJson(final Message message) {
final MessageProperties messageProperties = message.getMessageProperties();
if (messageProperties.getContentType() != null && messageProperties.getContentType().contains("json")) {
return;

View File

@@ -50,6 +50,34 @@ public class AmqpProperties {
*/
private int maxConcurrentConsumers = 10;
/**
* Tells the broker how many messages to send to each consumer in a single
* request. Often this can be set quite high to improve throughput.
*/
private int prefetchCount = 10;
/**
* Initial number of consumers. Is scaled up if necessary up to
* {@link #maxConcurrentConsumers}.
*/
private int initialConcurrentConsumers = 3;
public int getPrefetchCount() {
return prefetchCount;
}
public void setPrefetchCount(final int prefetchCount) {
this.prefetchCount = prefetchCount;
}
public int getInitialConcurrentConsumers() {
return initialConcurrentConsumers;
}
public void setInitialConcurrentConsumers(final int initialConcurrentConsumers) {
this.initialConcurrentConsumers = initialConcurrentConsumers;
}
public int getMaxConcurrentConsumers() {
return maxConcurrentConsumers;
}

View File

@@ -59,6 +59,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -170,7 +171,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no replyTo header was set");
} catch (final IllegalArgumentException exception) {
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
@@ -184,7 +185,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no thingID was set");
} catch (final IllegalArgumentException exception) {
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
}
@@ -200,7 +201,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, type, TENANT, "vHost");
fail("IllegalArgumentException was excepeted due to unknown message type");
} catch (final IllegalArgumentException exception) {
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
}
@@ -213,21 +214,21 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted due to unknown message type");
} catch (final IllegalArgumentException e) {
} catch (final AmqpRejectAndDontRequeueException e) {
}
try {
messageProperties.setHeader(MessageHeaderKey.TOPIC, "wrongTopic");
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted due to unknown topic");
} catch (final IllegalArgumentException e) {
} catch (final AmqpRejectAndDontRequeueException e) {
}
messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.CANCEL_DOWNLOAD.name());
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted because there was no event topic");
} catch (final IllegalArgumentException exception) {
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
@@ -246,7 +247,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no action id was set");
} catch (final IllegalArgumentException exception) {
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}
}
@@ -263,7 +264,7 @@ public class AmqpMessageHandlerServiceTest {
try {
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
fail("IllegalArgumentException was excepeted since no action id was set");
} catch (final IllegalArgumentException exception) {
} catch (final AmqpRejectAndDontRequeueException exception) {
// test ok - exception was excepted
}