DMF health check (#577)

* DMF support PING message for health checks.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Device simulator supports PING.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Code cleanup.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Revert accidental checkin.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Fix tests.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Simplify API.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Remove simulator dead letter.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Remove dead code. 

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Reduce code.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Add message for one more error case.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>
This commit is contained in:
Kai Zimmermann
2017-09-15 12:50:27 +02:00
committed by GitHub
parent 2818c8505a
commit da13bd79d9
18 changed files with 245 additions and 133 deletions

View File

@@ -14,6 +14,9 @@ import java.util.concurrent.ScheduledExecutorService;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus; import com.google.common.eventbus.EventBus;
@@ -25,6 +28,7 @@ import com.vaadin.spring.annotation.EnableVaadin;
*/ */
@SpringBootApplication @SpringBootApplication
@EnableVaadin @EnableVaadin
@EnableScheduling
public class DeviceSimulator { public class DeviceSimulator {
public DeviceSimulator() { public DeviceSimulator() {
@@ -35,7 +39,7 @@ public class DeviceSimulator {
* @return an asynchronous event bus to publish and retrieve events. * @return an asynchronous event bus to publish and retrieve events.
*/ */
@Bean @Bean
public EventBus eventBus() { EventBus eventBus() {
return new AsyncEventBus(Executors.newFixedThreadPool(4)); return new AsyncEventBus(Executors.newFixedThreadPool(4));
} }
@@ -43,10 +47,15 @@ public class DeviceSimulator {
* @return central ScheduledExecutorService * @return central ScheduledExecutorService
*/ */
@Bean @Bean
public ScheduledExecutorService threadPool() { ScheduledExecutorService threadPool() {
return Executors.newScheduledThreadPool(8); return Executors.newScheduledThreadPool(8);
} }
@Bean
TaskScheduler taskScheduler() {
return new ConcurrentTaskScheduler(threadPool());
}
/** /**
* Start the Spring Boot Application. * Start the Spring Boot Application.
* *

View File

@@ -9,10 +9,11 @@
package org.eclipse.hawkbit.simulator; package org.eclipse.hawkbit.simulator;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
@@ -27,8 +28,7 @@ public class DeviceSimulatorRepository {
private final Map<DeviceKey, AbstractSimulatedDevice> devices = new ConcurrentHashMap<>(); private final Map<DeviceKey, AbstractSimulatedDevice> devices = new ConcurrentHashMap<>();
@Autowired private final Set<String> tenants = new HashSet<>();
private SimulatedDeviceFactory deviceFactory;
/** /**
* Adds a simulated device to the repository. * Adds a simulated device to the repository.
@@ -39,6 +39,7 @@ public class DeviceSimulatorRepository {
*/ */
public AbstractSimulatedDevice add(final AbstractSimulatedDevice simulatedDevice) { public AbstractSimulatedDevice add(final AbstractSimulatedDevice simulatedDevice) {
devices.put(new DeviceKey(simulatedDevice.getTenant().toLowerCase(), simulatedDevice.getId()), simulatedDevice); devices.put(new DeviceKey(simulatedDevice.getTenant().toLowerCase(), simulatedDevice.getId()), simulatedDevice);
tenants.add(simulatedDevice.getTenant().toLowerCase());
return simulatedDevice; return simulatedDevice;
} }
@@ -78,12 +79,17 @@ public class DeviceSimulatorRepository {
return devices.remove(new DeviceKey(tenant.toLowerCase(), id)); return devices.remove(new DeviceKey(tenant.toLowerCase(), id));
} }
public Set<String> getTenants() {
return tenants;
}
/** /**
* Clears all stored devices. * Clears all stored devices.
*/ */
public void clear() { public void clear() {
devices.values().forEach(device -> device.clean()); devices.values().forEach(AbstractSimulatedDevice::clean);
devices.clear(); devices.clear();
tenants.clear();
} }
private static final class DeviceKey { private static final class DeviceKey {

View File

@@ -122,8 +122,7 @@ public class AmqpConfiguration {
*/ */
@Bean @Bean
public Queue receiverConnectorQueueFromHawkBit() { public Queue receiverConnectorQueueFromHawkBit() {
final Map<String, Object> arguments = getDeadLetterExchangeArgs(); final Map<String, Object> arguments = getTTLMaxArgs();
arguments.putAll(getTTLMaxArgs());
return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete() return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete()
.withArguments(arguments).build(); .withArguments(arguments).build();
@@ -151,36 +150,6 @@ public class AmqpConfiguration {
return BindingBuilder.bind(receiverConnectorQueueFromHawkBit()).to(exchangeQueueToConnector()); return BindingBuilder.bind(receiverConnectorQueueFromHawkBit()).to(exchangeQueueToConnector());
} }
/**
* Create dead letter queue.
*
* @return the queue
*/
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.nonDurable(amqpProperties.getDeadLetterQueue()).withArguments(getTTLMaxArgs()).build();
}
/**
* Create the dead letter fanout exchange.
*
* @return the fanout exchange
*/
@Bean
public FanoutExchange exchangeDeadLetter() {
return new FanoutExchange(amqpProperties.getDeadLetterExchange(), false, true);
}
/**
* Create the Binding deadLetterQueue to exchangeDeadLetter.
*
* @return the binding
*/
@Bean
public Binding bindDeadLetterQueueToLwm2mExchange() {
return BindingBuilder.bind(deadLetterQueue()).to(exchangeDeadLetter());
}
/** /**
* Returns the Listener factory. * Returns the Listener factory.
* *
@@ -198,12 +167,6 @@ public class AmqpConfiguration {
return containerFactory; return containerFactory;
} }
private Map<String, Object> getDeadLetterExchangeArgs() {
final Map<String, Object> args = Maps.newHashMapWithExpectedSize(1);
args.put("x-dead-letter-exchange", amqpProperties.getDeadLetterExchange());
return args;
}
private static Map<String, Object> getTTLMaxArgs() { private static Map<String, Object> getTTLMaxArgs() {
final Map<String, Object> args = Maps.newHashMapWithExpectedSize(2); final Map<String, Object> args = Maps.newHashMapWithExpectedSize(2);
args.put("x-message-ttl", Duration.ofDays(1).toMillis()); args.put("x-message-ttl", Duration.ofDays(1).toMillis());

View File

@@ -30,6 +30,11 @@ public class AmqpProperties {
*/ */
private boolean enabled; private boolean enabled;
/**
* Set to true for the simulator run DMF health check.
*/
private boolean checkDmfHealth = false;
/** /**
* Queue for receiving DMF messages from update server. * Queue for receiving DMF messages from update server.
*/ */
@@ -40,22 +45,20 @@ public class AmqpProperties {
*/ */
private String senderForSpExchange = "simulator.replyTo"; private String senderForSpExchange = "simulator.replyTo";
/**
* Simulator dead letter queue.
*/
private String deadLetterQueue = "simulator_deadletter";
/**
* Simulator dead letter exchange.
*/
private String deadLetterExchange = "simulator.deadletter";
/** /**
* Message time to live (ttl) for the deadletter queue. Default ttl is 1 * Message time to live (ttl) for the deadletter queue. Default ttl is 1
* hour. * hour.
*/ */
private int deadLetterTtl = 60_000; private int deadLetterTtl = 60_000;
public boolean isCheckDmfHealth() {
return checkDmfHealth;
}
public void setCheckDmfHealth(final boolean checkDmfHealth) {
this.checkDmfHealth = checkDmfHealth;
}
public String getReceiverConnectorQueueFromSp() { public String getReceiverConnectorQueueFromSp() {
return receiverConnectorQueueFromSp; return receiverConnectorQueueFromSp;
} }
@@ -64,22 +67,6 @@ public class AmqpProperties {
this.receiverConnectorQueueFromSp = receiverConnectorQueueFromSp; this.receiverConnectorQueueFromSp = receiverConnectorQueueFromSp;
} }
public String getDeadLetterExchange() {
return deadLetterExchange;
}
public void setDeadLetterExchange(final String deadLetterExchange) {
this.deadLetterExchange = deadLetterExchange;
}
public String getDeadLetterQueue() {
return deadLetterQueue;
}
public void setDeadLetterQueue(final String deadLetterQueue) {
this.deadLetterQueue = deadLetterQueue;
}
public String getSenderForSpExchange() { public String getSenderForSpExchange() {
return senderForSpExchange; return senderForSpExchange;
} }

View File

@@ -58,8 +58,12 @@ public abstract class SenderService extends MessageService {
return; return;
} }
message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME); message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME);
final String correlationId = UUID.randomUUID().toString(); final String correlationId = UUID.randomUUID().toString();
message.getMessageProperties().setCorrelationId(correlationId.getBytes(StandardCharsets.UTF_8));
if (isCorrelationIdEmpty(message)) {
message.getMessageProperties().setCorrelationId(correlationId.getBytes(StandardCharsets.UTF_8));
}
if (LOGGER.isTraceEnabled()) { if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, address, correlationId); LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, address, correlationId);
@@ -70,6 +74,11 @@ public abstract class SenderService extends MessageService {
rabbitTemplate.send(address, null, message, new CorrelationData(correlationId)); rabbitTemplate.send(address, null, message, new CorrelationData(correlationId));
} }
private static boolean isCorrelationIdEmpty(final Message message) {
return message.getMessageProperties().getCorrelationId() == null
|| message.getMessageProperties().getCorrelationId().length <= 0;
}
/** /**
* Convert object and message properties to message. * Convert object and message properties to message.
* *

View File

@@ -8,8 +8,11 @@
*/ */
package org.eclipse.hawkbit.simulator.amqp; package org.eclipse.hawkbit.simulator.amqp;
import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic; import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey; import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
@@ -17,6 +20,7 @@ import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
import org.eclipse.hawkbit.dmf.json.model.DmfDownloadAndUpdateRequest; import org.eclipse.hawkbit.dmf.json.model.DmfDownloadAndUpdateRequest;
import org.eclipse.hawkbit.simulator.DeviceSimulatorRepository; import org.eclipse.hawkbit.simulator.DeviceSimulatorRepository;
import org.eclipse.hawkbit.simulator.DeviceSimulatorUpdater; import org.eclipse.hawkbit.simulator.DeviceSimulatorUpdater;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
@@ -26,6 +30,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
@@ -43,6 +48,8 @@ public class SpReceiverService extends ReceiverService {
private final DeviceSimulatorRepository repository; private final DeviceSimulatorRepository repository;
private final Set<String> openPings = new ConcurrentHashSet<>();
/** /**
* Constructor. * Constructor.
* *
@@ -82,28 +89,60 @@ public class SpReceiverService extends ReceiverService {
*/ */
@RabbitListener(queues = "${hawkbit.device.simulator.amqp.receiverConnectorQueueFromSp}", containerFactory = "listenerContainerFactory") @RabbitListener(queues = "${hawkbit.device.simulator.amqp.receiverConnectorQueueFromSp}", containerFactory = "listenerContainerFactory")
public void recieveMessageSp(final Message message, @Header(MessageHeaderKey.TYPE) final String type, public void recieveMessageSp(final Message message, @Header(MessageHeaderKey.TYPE) final String type,
@Header(MessageHeaderKey.THING_ID) final String thingId, @Header(name = MessageHeaderKey.THING_ID, required = false) final String thingId,
@Header(MessageHeaderKey.TENANT) final String tenant) { @Header(MessageHeaderKey.TENANT) final String tenant) {
checkContentTypeJson(message);
delegateMessage(message, type, thingId, tenant);
}
private void delegateMessage(final Message message, final String type, final String thingId, final String tenant) {
final MessageType messageType = MessageType.valueOf(type); final MessageType messageType = MessageType.valueOf(type);
if (MessageType.EVENT.equals(messageType)) { if (MessageType.EVENT.equals(messageType)) {
checkContentTypeJson(message);
handleEventMessage(message, thingId); handleEventMessage(message, thingId);
return; return;
} }
if (MessageType.THING_DELETED.equals(messageType)) { if (MessageType.THING_DELETED.equals(messageType)) {
checkContentTypeJson(message);
repository.remove(tenant, thingId); repository.remove(tenant, thingId);
return; return;
} }
if (MessageType.PING_RESPONSE.equals(messageType)) {
final String correlationId = new String(message.getMessageProperties().getCorrelationId(),
StandardCharsets.UTF_8);
if (!openPings.remove(correlationId)) {
LOGGER.error("Unknown PING_RESPONSE received for correlationId: {}.", correlationId);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Got ping response from tenant {} with correlationId {} with timestamp {}", tenant,
correlationId, new String(message.getBody(), StandardCharsets.UTF_8));
}
return;
}
LOGGER.info("No valid message type property."); LOGGER.info("No valid message type property.");
} }
@Scheduled(fixedDelay = 5_000, initialDelay = 5_000)
void checkDmfHealth() {
if (!amqpProperties.isCheckDmfHealth()) {
return;
}
if (openPings.size() > 5) {
LOGGER.error("Currently {} open pings! DMF does not seem to be reachable.", openPings.size());
} else {
LOGGER.debug("Currently {} open pings", openPings.size());
}
repository.getTenants().forEach(tenant -> {
final String correlationId = UUID.randomUUID().toString();
spSenderService.ping(tenant, correlationId);
openPings.add(correlationId);
LOGGER.debug("Ping tenant {} with correlationId {}", tenant, correlationId);
});
}
private void handleEventMessage(final Message message, final String thingId) { private void handleEventMessage(final Message message, final String thingId) {
final Object eventHeader = message.getMessageProperties().getHeaders().get(MessageHeaderKey.TOPIC); final Object eventHeader = message.getMessageProperties().getHeaders().get(MessageHeaderKey.TOPIC);
if (eventHeader == null) { if (eventHeader == null) {

View File

@@ -57,6 +57,17 @@ public class SpSenderService extends SenderService {
this.simulationProperties = simulationProperties; this.simulationProperties = simulationProperties;
} }
public void ping(final String tenant, final String correlationId) {
final MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put(MessageHeaderKey.TENANT, tenant);
messageProperties.getHeaders().put(MessageHeaderKey.TYPE, MessageType.PING.toString());
messageProperties.setCorrelationId(correlationId.getBytes());
messageProperties.setReplyTo(amqpProperties.getSenderForSpExchange());
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
sendMessage(spExchange, new Message(null, messageProperties));
}
/** /**
* Finish the update process. This will send a action status to SP. * Finish the update process. This will send a action status to SP.
* *
@@ -96,7 +107,8 @@ public class SpSenderService extends SenderService {
* the ID of the action for the error message * the ID of the action for the error message
*/ */
public void sendErrorMessage(final String tenant, final List<String> updateResultMessages, final Long actionId) { public void sendErrorMessage(final String tenant, final List<String> updateResultMessages, final Long actionId) {
final Message message = createActionStatusMessage(tenant, DmfActionStatus.ERROR, updateResultMessages, actionId); final Message message = createActionStatusMessage(tenant, DmfActionStatus.ERROR, updateResultMessages,
actionId);
sendMessage(spExchange, message); sendMessage(spExchange, message);
} }
@@ -240,7 +252,8 @@ public class SpSenderService extends SenderService {
final List<String> updateResultMessages) { final List<String> updateResultMessages) {
final MessageProperties messageProperties = new MessageProperties(); final MessageProperties messageProperties = new MessageProperties();
final Map<String, Object> headers = messageProperties.getHeaders(); final Map<String, Object> headers = messageProperties.getHeaders();
final DmfActionUpdateStatus actionUpdateStatus = new DmfActionUpdateStatus(cacheValue.getActionId(), actionStatus); final DmfActionUpdateStatus actionUpdateStatus = new DmfActionUpdateStatus(cacheValue.getActionId(),
actionStatus);
headers.put(MessageHeaderKey.TYPE, MessageType.EVENT.name()); headers.put(MessageHeaderKey.TYPE, MessageType.EVENT.name());
headers.put(MessageHeaderKey.TENANT, cacheValue.getTenant()); headers.put(MessageHeaderKey.TENANT, cacheValue.getTenant());
headers.put(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ACTION_STATUS.name()); headers.put(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ACTION_STATUS.name());

View File

@@ -264,8 +264,8 @@ public class AmqpConfiguration {
*/ */
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public AmqpSenderService amqpSenderServiceBean() { public AmqpMessageSenderService amqpSenderServiceBean() {
return new DefaultAmqpSenderService(rabbitTemplate()); return new DefaultAmqpMessageSenderService(rabbitTemplate());
} }
@Bean @Bean
@@ -325,7 +325,7 @@ public class AmqpConfiguration {
@Bean @Bean
@ConditionalOnMissingBean(AmqpMessageDispatcherService.class) @ConditionalOnMissingBean(AmqpMessageDispatcherService.class)
public AmqpMessageDispatcherService amqpMessageDispatcherService(final RabbitTemplate rabbitTemplate, public AmqpMessageDispatcherService amqpMessageDispatcherService(final RabbitTemplate rabbitTemplate,
final AmqpSenderService amqpSenderService, final ArtifactUrlHandler artifactUrlHandler, final AmqpMessageSenderService amqpSenderService, final ArtifactUrlHandler artifactUrlHandler,
final SystemSecurityContext systemSecurityContext, final SystemManagement systemManagement, final SystemSecurityContext systemSecurityContext, final SystemManagement systemManagement,
final TargetManagement targetManagement) { final TargetManagement targetManagement) {
return new AmqpMessageDispatcherService(rabbitTemplate, amqpSenderService, artifactUrlHandler, return new AmqpMessageDispatcherService(rabbitTemplate, amqpSenderService, artifactUrlHandler,

View File

@@ -39,6 +39,7 @@ import org.eclipse.hawkbit.util.IpUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.cloud.bus.ServiceMatcher; import org.springframework.cloud.bus.ServiceMatcher;
@@ -47,7 +48,7 @@ import org.springframework.context.event.EventListener;
/** /**
* {@link AmqpMessageDispatcherService} create all outgoing AMQP messages and * {@link AmqpMessageDispatcherService} create all outgoing AMQP messages and
* delegate the messages to a {@link AmqpSenderService}. * delegate the messages to a {@link AmqpMessageSenderService}.
* *
* Additionally the dispatcher listener/subscribe for some target events e.g. * Additionally the dispatcher listener/subscribe for some target events e.g.
* assignment. * assignment.
@@ -58,7 +59,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageDispatcherService.class); private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageDispatcherService.class);
private final ArtifactUrlHandler artifactUrlHandler; private final ArtifactUrlHandler artifactUrlHandler;
private final AmqpSenderService amqpSenderService; private final AmqpMessageSenderService amqpSenderService;
private final SystemSecurityContext systemSecurityContext; private final SystemSecurityContext systemSecurityContext;
private final SystemManagement systemManagement; private final SystemManagement systemManagement;
private final TargetManagement targetManagement; private final TargetManagement targetManagement;
@@ -83,10 +84,10 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
* to check in cluster case if the message is from the same * to check in cluster case if the message is from the same
* cluster node * cluster node
*/ */
public AmqpMessageDispatcherService(final RabbitTemplate rabbitTemplate, final AmqpSenderService amqpSenderService, public AmqpMessageDispatcherService(final RabbitTemplate rabbitTemplate,
final ArtifactUrlHandler artifactUrlHandler, final SystemSecurityContext systemSecurityContext, final AmqpMessageSenderService amqpSenderService, final ArtifactUrlHandler artifactUrlHandler,
final SystemManagement systemManagement, final TargetManagement targetManagement, final SystemSecurityContext systemSecurityContext, final SystemManagement systemManagement,
final ServiceMatcher serviceMatcher) { final TargetManagement targetManagement, final ServiceMatcher serviceMatcher) {
super(rabbitTemplate); super(rabbitTemplate);
this.artifactUrlHandler = artifactUrlHandler; this.artifactUrlHandler = artifactUrlHandler;
this.amqpSenderService = amqpSenderService; this.amqpSenderService = amqpSenderService;
@@ -143,6 +144,16 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
amqpSenderService.sendMessage(message, targetAdress); amqpSenderService.sendMessage(message, targetAdress);
} }
void sendPingReponseToDmfReceiver(final Message ping, final String tenant) {
final Message message = MessageBuilder.withBody(String.valueOf(System.currentTimeMillis()).getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setCorrelationId(ping.getMessageProperties().getCorrelationId())
.setHeader(MessageHeaderKey.TYPE, MessageType.PING_RESPONSE).setHeader(MessageHeaderKey.TENANT, tenant)
.build();
amqpSenderService.sendMessage(message, ping.getMessageProperties().getReplyTo());
}
/** /**
* Method to send a message to a RabbitMQ Exchange after the assignment of * Method to send a message to a RabbitMQ Exchange after the assignment of
* the Distribution set to a Target has been canceled. * the Distribution set to a Target has been canceled.

View File

@@ -115,20 +115,25 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
* @return the rpc message back to supplier. * @return the rpc message back to supplier.
*/ */
public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) { public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) {
checkContentTypeJson(message);
final SecurityContext oldContext = SecurityContextHolder.getContext(); final SecurityContext oldContext = SecurityContextHolder.getContext();
try { try {
final MessageType messageType = MessageType.valueOf(type); final MessageType messageType = MessageType.valueOf(type);
switch (messageType) { switch (messageType) {
case THING_CREATED: case THING_CREATED:
checkContentTypeJson(message);
setTenantSecurityContext(tenant); setTenantSecurityContext(tenant);
registerTarget(message, virtualHost); registerTarget(message, virtualHost);
break; break;
case EVENT: case EVENT:
checkContentTypeJson(message);
setTenantSecurityContext(tenant); setTenantSecurityContext(tenant);
final String topicValue = getStringHeaderKey(message, MessageHeaderKey.TOPIC, "EventTopic is null"); handleIncomingEvent(message);
final EventTopic eventTopic = EventTopic.valueOf(topicValue); break;
handleIncomingEvent(message, eventTopic); case PING:
if (isCorrelationIdNotEmpty(message)) {
amqpMessageDispatcherService.sendPingReponseToDmfReceiver(message, tenant);
}
break; break;
default: default:
logAndThrowMessageError(message, "No handle method was found for the given message type."); logAndThrowMessageError(message, "No handle method was found for the given message type.");
@@ -206,8 +211,8 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
* @param topic * @param topic
* the topic of the event. * the topic of the event.
*/ */
private void handleIncomingEvent(final Message message, final EventTopic topic) { private void handleIncomingEvent(final Message message) {
switch (topic) { switch (EventTopic.valueOf(getStringHeaderKey(message, MessageHeaderKey.TOPIC, "EventTopic is null"))) {
case UPDATE_ACTION_STATUS: case UPDATE_ACTION_STATUS:
updateActionStatus(message); updateActionStatus(message);
break; break;

View File

@@ -12,13 +12,14 @@ import java.net.URI;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import org.eclipse.hawkbit.util.IpUtil;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
/** /**
* Interface to send a amqp message. * Interface to send a amqp message.
*/ */
@FunctionalInterface @FunctionalInterface
public interface AmqpSenderService { public interface AmqpMessageSenderService {
/** /**
* Send the given message to the given uri. The uri contains the (virtual) * Send the given message to the given uri. The uri contains the (virtual)
@@ -29,18 +30,24 @@ public interface AmqpSenderService {
* @param replyTo * @param replyTo
* the reply to uri * the reply to uri
*/ */
void sendMessage(@NotNull Message message, @NotNull URI replyTo); default void sendMessage(@NotNull final Message message, @NotNull final URI replyTo) {
if (!IpUtil.isAmqpUri(replyTo)) {
return;
}
/** sendMessage(message, replyTo.getPath().substring(1));
* Extract the exchange from the uri. Default implementation removes the
* first /.
*
* @param amqpUri
* the amqp uri
* @return the exchange.
*/
default String extractExchange(final URI amqpUri) {
return amqpUri.getPath().substring(1);
} }
/**
* Send the given message to the given host and exchange.
*
* @param message
* the amqp message
* @param exchange
* to send to
* @param virtualHost
* to send to
*/
void sendMessage(@NotNull final Message message, @NotNull final String exchange);
} }

View File

@@ -8,11 +8,9 @@
*/ */
package org.eclipse.hawkbit.amqp; package org.eclipse.hawkbit.amqp;
import java.net.URI;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.UUID; import java.util.UUID;
import org.eclipse.hawkbit.util.IpUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
@@ -24,30 +22,27 @@ import org.springframework.amqp.rabbit.support.CorrelationData;
* message to the configured spring rabbitmq connections. The exchange is * message to the configured spring rabbitmq connections. The exchange is
* extracted from the uri. * extracted from the uri.
*/ */
public class DefaultAmqpSenderService implements AmqpSenderService { public class DefaultAmqpMessageSenderService extends BaseAmqpService implements AmqpMessageSenderService {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpSenderService.class); private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpMessageSenderService.class);
private final RabbitTemplate internalAmqpTemplate;
/** /**
* Constructor. * Constructor.
* *
* @param internalAmqpTemplate * @param rabbitTemplate
* the amqp template * the AMQP template
*/ */
public DefaultAmqpSenderService(final RabbitTemplate internalAmqpTemplate) { public DefaultAmqpMessageSenderService(final RabbitTemplate rabbitTemplate) {
this.internalAmqpTemplate = internalAmqpTemplate; super(rabbitTemplate);
} }
@Override @Override
public void sendMessage(final Message message, final URI replyTo) { public void sendMessage(final Message message, final String exchange) {
if (!IpUtil.isAmqpUri(replyTo)) {
return;
}
final String correlationId = UUID.randomUUID().toString(); final String correlationId = UUID.randomUUID().toString();
final String exchange = extractExchange(replyTo);
message.getMessageProperties().setCorrelationId(correlationId.getBytes(StandardCharsets.UTF_8)); if (isCorrelationIdEmpty(message)) {
message.getMessageProperties().setCorrelationId(correlationId.getBytes(StandardCharsets.UTF_8));
}
if (LOGGER.isTraceEnabled()) { if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, exchange, correlationId); LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, exchange, correlationId);
@@ -55,7 +50,12 @@ public class DefaultAmqpSenderService implements AmqpSenderService {
LOGGER.debug("Sending message to exchange {} with correlationId {}", exchange, correlationId); LOGGER.debug("Sending message to exchange {} with correlationId {}", exchange, correlationId);
} }
internalAmqpTemplate.send(exchange, null, message, new CorrelationData(correlationId)); getRabbitTemplate().send(exchange, null, message, new CorrelationData(correlationId));
}
protected static boolean isCorrelationIdEmpty(final Message message) {
return message.getMessageProperties().getCorrelationId() == null
|| message.getMessageProperties().getCorrelationId().length <= 0;
} }
} }

View File

@@ -81,7 +81,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTest {
private RabbitTemplate rabbitTemplate; private RabbitTemplate rabbitTemplate;
private DefaultAmqpSenderService senderService; private DefaultAmqpMessageSenderService senderService;
private Target testTarget; private Target testTarget;
@@ -94,7 +94,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTest {
this.rabbitTemplate = Mockito.mock(RabbitTemplate.class); this.rabbitTemplate = Mockito.mock(RabbitTemplate.class);
when(rabbitTemplate.getMessageConverter()).thenReturn(new Jackson2JsonMessageConverter()); when(rabbitTemplate.getMessageConverter()).thenReturn(new Jackson2JsonMessageConverter());
senderService = Mockito.mock(DefaultAmqpSenderService.class); senderService = Mockito.mock(DefaultAmqpMessageSenderService.class);
final ArtifactUrlHandler artifactUrlHandlerMock = Mockito.mock(ArtifactUrlHandler.class); final ArtifactUrlHandler artifactUrlHandlerMock = Mockito.mock(ArtifactUrlHandler.class);
when(artifactUrlHandlerMock.getUrls(anyObject(), anyObject())) when(artifactUrlHandlerMock.getUrls(anyObject(), anyObject()))

View File

@@ -59,6 +59,17 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AmqpServiceIntegra
@Autowired @Autowired
private AmqpProperties amqpProperties; private AmqpProperties amqpProperties;
@Test
@Description("Tests DMF PING request and expected reponse.")
public void pingDmfInterface() {
final Message pingMessage = createPingMessage(CORRELATION_ID, TENANT_EXIST);
getDmfClient().send(pingMessage);
assertPingReplyMessage(CORRELATION_ID);
Mockito.verifyZeroInteractions(getDeadletterListener());
}
@Test @Test
@Description("Tests register target") @Description("Tests register target")
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 2), @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 2),
@@ -450,7 +461,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AmqpServiceIntegra
final String controllerId = TARGET_PREFIX + "receiveDownLoadAndInstallMessageAfterAssignment"; final String controllerId = TARGET_PREFIX + "receiveDownLoadAndInstallMessageAfterAssignment";
// setup // setup
controllerManagement.findOrRegisterTargetIfItDoesNotexist(controllerId, TEST_URI); createAndSendTarget(controllerId, TENANT_EXIST);
final DistributionSet distributionSet = testdataFactory.createDistributionSet(UUID.randomUUID().toString()); final DistributionSet distributionSet = testdataFactory.createDistributionSet(UUID.randomUUID().toString());
assignDistributionSet(distributionSet.getId(), controllerId); assignDistributionSet(distributionSet.getId(), controllerId);
@@ -476,7 +487,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AmqpServiceIntegra
final String controllerId = TARGET_PREFIX + "receiveCancelUpdateMessageAfterAssignmentWasCanceled"; final String controllerId = TARGET_PREFIX + "receiveCancelUpdateMessageAfterAssignmentWasCanceled";
// Setup // Setup
controllerManagement.findOrRegisterTargetIfItDoesNotexist(controllerId, TEST_URI); createAndSendTarget(controllerId, TENANT_EXIST);
final DistributionSet distributionSet = testdataFactory.createDistributionSet(UUID.randomUUID().toString()); final DistributionSet distributionSet = testdataFactory.createDistributionSet(UUID.randomUUID().toString());
final DistributionSetAssignmentResult distributionSetAssignmentResult = assignDistributionSet( final DistributionSetAssignmentResult distributionSetAssignmentResult = assignDistributionSet(
distributionSet.getId(), controllerId); distributionSet.getId(), controllerId);

View File

@@ -10,6 +10,7 @@ package org.eclipse.hawkbit.integration;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import java.nio.charset.StandardCharsets;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
@@ -138,6 +139,22 @@ public abstract class AmqpServiceIntegrationTest extends AbstractAmqpIntegration
assertThat(headers.get(MessageHeaderKey.TYPE)).isEqualTo(MessageType.THING_DELETED.toString()); assertThat(headers.get(MessageHeaderKey.TYPE)).isEqualTo(MessageType.THING_DELETED.toString());
} }
protected void assertPingReplyMessage(final String correlationId) {
verifyReplyToListener();
final Message replyMessage = replyToListener.getPingResponseMessages().get(correlationId);
final Map<String, Object> headers = replyMessage.getMessageProperties().getHeaders();
assertThat(headers.get(MessageHeaderKey.TENANT)).isEqualTo(TENANT_EXIST);
assertThat(correlationId)
.isEqualTo(new String(replyMessage.getMessageProperties().getCorrelationId(), StandardCharsets.UTF_8));
assertThat(headers.get(MessageHeaderKey.TYPE)).isEqualTo(MessageType.PING_RESPONSE.toString());
assertThat(Long.valueOf(new String(replyMessage.getBody(), StandardCharsets.UTF_8)))
.isLessThanOrEqualTo(System.currentTimeMillis());
}
protected void assertDownloadAndInstallMessage(final Set<SoftwareModule> dsModules, final String controllerId) { protected void assertDownloadAndInstallMessage(final Set<SoftwareModule> dsModules, final String controllerId) {
final Message replyMessage = assertReplyMessageHeader(EventTopic.DOWNLOAD_AND_INSTALL, controllerId); final Message replyMessage = assertReplyMessageHeader(EventTopic.DOWNLOAD_AND_INSTALL, controllerId);
assertAllTargetsCount(1); assertAllTargetsCount(1);
@@ -158,6 +175,12 @@ public abstract class AmqpServiceIntegrationTest extends AbstractAmqpIntegration
getDmfClient().send(message); getDmfClient().send(message);
} }
protected Message createAndSendPingMessage(final String correlationId, final String tenant) {
final Message message = createPingMessage(correlationId, tenant);
getDmfClient().send(message);
return message;
}
protected void verifyReplyToListener() { protected void verifyReplyToListener() {
createConditionFactory().until(() -> { createConditionFactory().until(() -> {
Mockito.verify(replyToListener, Mockito.atLeast(1)).handleMessage(Mockito.any()); Mockito.verify(replyToListener, Mockito.atLeast(1)).handleMessage(Mockito.any());
@@ -244,6 +267,15 @@ public abstract class AmqpServiceIntegrationTest extends AbstractAmqpIntegration
return createMessage(null, messageProperties); return createMessage(null, messageProperties);
} }
protected Message createPingMessage(final String correlationId, final String tenant) {
final MessageProperties messageProperties = createMessagePropertiesWithTenant(tenant);
messageProperties.getHeaders().put(MessageHeaderKey.TYPE, MessageType.PING.toString());
messageProperties.setCorrelationId(correlationId.getBytes());
messageProperties.setReplyTo(DmfTestConfiguration.REPLY_TO_EXCHANGE);
return createMessage(null, messageProperties);
}
protected MessageProperties createMessagePropertiesWithTenant(final String tenant) { protected MessageProperties createMessagePropertiesWithTenant(final String tenant) {
final MessageProperties messageProperties = new MessageProperties(); final MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put(MessageHeaderKey.TENANT, tenant); messageProperties.getHeaders().put(MessageHeaderKey.TENANT, tenant);

View File

@@ -10,6 +10,8 @@ package org.eclipse.hawkbit.integration.listener;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@@ -25,8 +27,9 @@ public class ReplyToListener implements TestRabbitListener {
public static final String LISTENER_ID = "replyto"; public static final String LISTENER_ID = "replyto";
public static final String REPLY_TO_QUEUE = "reply_queue"; public static final String REPLY_TO_QUEUE = "reply_queue";
private final Map<EventTopic, Message> eventTopicMessages = new HashMap<>(); private final Map<EventTopic, Message> eventTopicMessages = new EnumMap<>(EventTopic.class);
private final Map<String, Message> deleteMessages = new HashMap<>(); private final Map<String, Message> deleteMessages = new HashMap<>();
private final Map<String, Message> pingResponseMessages = new HashMap<>();
@Override @Override
@RabbitListener(id = LISTENER_ID, queues = REPLY_TO_QUEUE) @RabbitListener(id = LISTENER_ID, queues = REPLY_TO_QUEUE)
@@ -49,6 +52,13 @@ public class ReplyToListener implements TestRabbitListener {
return; return;
} }
if (messageType == MessageType.PING_RESPONSE) {
final String correlationId = new String(message.getMessageProperties().getCorrelationId(),
StandardCharsets.UTF_8);
pingResponseMessages.put(correlationId, message);
return;
}
// if message type is not EVENT or THING_DELETED something unexpected // if message type is not EVENT or THING_DELETED something unexpected
// happened // happened
fail("Unexpected message type"); fail("Unexpected message type");
@@ -63,4 +73,8 @@ public class ReplyToListener implements TestRabbitListener {
return deleteMessages; return deleteMessages;
} }
public Map<String, Message> getPingResponseMessages() {
return pingResponseMessages;
}
} }

View File

@@ -15,7 +15,7 @@ package org.eclipse.hawkbit.dmf.amqp.api;
public enum MessageType { public enum MessageType {
/** /**
* The event type. * The event type related to interaction with a thing.
*/ */
EVENT, EVENT,
@@ -29,4 +29,14 @@ public enum MessageType {
*/ */
THING_DELETED, THING_DELETED,
/**
* DMF receiver health check type.
*/
PING,
/**
* DMF receiver health check reponse type.
*/
PING_RESPONSE;
} }

View File

@@ -8,12 +8,10 @@
*/ */
package org.eclipse.hawkbit.rabbitmq.test; package org.eclipse.hawkbit.rabbitmq.test;
import java.net.URI;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.hawkbit.repository.jpa.RepositoryApplicationConfiguration; import org.eclipse.hawkbit.repository.jpa.RepositoryApplicationConfiguration;
import org.eclipse.hawkbit.repository.test.util.AbstractIntegrationTest; import org.eclipse.hawkbit.repository.test.util.AbstractIntegrationTest;
import org.eclipse.hawkbit.util.IpUtil;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
@@ -37,8 +35,6 @@ import com.jayway.awaitility.core.ConditionFactory;
@DirtiesContext(classMode = ClassMode.AFTER_CLASS) @DirtiesContext(classMode = ClassMode.AFTER_CLASS)
public abstract class AbstractAmqpIntegrationTest extends AbstractIntegrationTest { public abstract class AbstractAmqpIntegrationTest extends AbstractIntegrationTest {
protected static final URI TEST_URI = IpUtil.createAmqpUri("testHost", "testExcange");
@Rule @Rule
@Autowired @Autowired
public BrokerRunning brokerRunning; public BrokerRunning brokerRunning;