extends mgmt simulator. Extended scalabaility of dmf listener.

Signed-off-by: Kai Zimmermann <kai.zimmermann@bosch-si.com>
This commit is contained in:
Kai Zimmermann
2016-06-15 10:45:34 +02:00
parent 239e9ff514
commit ecd5647438
15 changed files with 315 additions and 121 deletions

View File

@@ -12,6 +12,8 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
@@ -22,42 +24,53 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
/**
* The spring AMQP configuration to use a AMQP for communication with SP update
* server.
*
*
*
*/
@Configuration
@EnableConfigurationProperties(AmqpProperties.class)
public class AmqpConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfiguration.class);
@Autowired
protected AmqpProperties amqpProperties;
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* Create jackson message converter bean.
*
* @return the jackson message converter
* @return {@link RabbitTemplate} with automatic retry, published confirms
* and {@link Jackson2JsonMessageConverter}.
*/
@Bean
public MessageConverter jsonMessageConverter() {
final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
return jackson2JsonMessageConverter;
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
final RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
rabbitTemplate.setRetryTemplate(retryTemplate);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.debug("Message with correlation ID {} confirmed by broker.", correlationData.getId());
} else {
LOGGER.error("Broker is unable to handle message with correlation ID {} : {}", correlationData.getId(),
cause);
}
});
return rabbitTemplate;
}
/**
@@ -138,8 +151,8 @@ public class AmqpConfiguration {
final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setDefaultRequeueRejected(false);
containerFactory.setConnectionFactory(connectionFactory);
containerFactory.setConcurrentConsumers(20);
containerFactory.setMaxConcurrentConsumers(20);
containerFactory.setConcurrentConsumers(3);
containerFactory.setMaxConcurrentConsumers(10);
containerFactory.setPrefetchCount(20);
return containerFactory;
}

View File

@@ -8,9 +8,14 @@
*/
package org.eclipse.hawkbit.simulator.amqp;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.beans.factory.annotation.Autowired;
@@ -22,6 +27,8 @@ import org.springframework.beans.factory.annotation.Autowired;
*/
public abstract class SenderService extends MessageService {
private static final Logger LOGGER = LoggerFactory.getLogger(SenderService.class);
/**
* Constructor for sender service.
*
@@ -40,18 +47,25 @@ public abstract class SenderService extends MessageService {
/**
* Send a message if the message is not null.
*
* @param adress
* @param address
* the exchange name
* @param message
* the amqp message which will be send if its not null
*/
public void sendMessage(final String adress, final Message message) {
public void sendMessage(final String address, final Message message) {
if (message == null) {
return;
}
message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME);
rabbitTemplate.setExchange(adress);
rabbitTemplate.send(message);
final String correlationId = UUID.randomUUID().toString();
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, address, correlationId);
} else {
LOGGER.debug("Sending message to exchange {} with correlationId {}", address, correlationId);
}
rabbitTemplate.send(address, null, message, new CorrelationData(correlationId));
}
/**

View File

@@ -35,14 +35,21 @@ import com.google.common.collect.Lists;
public class SpReceiverService extends ReceiverService {
private static final Logger LOGGER = LoggerFactory.getLogger(ReceiverService.class);
public static final String SOFTWARE_MODULE_FIRMWARE = "firmware";
private final SpSenderService spSenderService;
private final DeviceSimulatorUpdater deviceUpdater;
/**
* Constructor.
*
* @param rabbitTemplate
* for sending messages
* @param amqpProperties
* for amqp configuration
* @param spSenderService
* to send messages
* @param deviceUpdater
* simulator service for updates
*/
@Autowired
public SpReceiverService(final RabbitTemplate rabbitTemplate, final AmqpProperties amqpProperties,

View File

@@ -19,7 +19,9 @@
<Logger name="org.apache.coyote.http11.Http11NioProtocol" level="WARN" />
<Logger name="org.apache.tomcat.util.net.NioSelectorPool" level="WARN" />
<Logger name="org.apache.catalina.startup.DigesterFactory" level="ERROR" />
<Logger name="org.apache.catalina.startup.DigesterFactory" level="ERROR" />
<Logger name="org.eclipse" level="DEBUG" />
<!-- Security Log with hints on potential attacks -->
<logger name="server-security" level="INFO" />