diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulator.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulator.java index 2ed3980d8..6cfb19bc2 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulator.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulator.java @@ -14,6 +14,9 @@ import java.util.concurrent.ScheduledExecutorService; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; @@ -25,6 +28,7 @@ import com.vaadin.spring.annotation.EnableVaadin; */ @SpringBootApplication @EnableVaadin +@EnableScheduling public class DeviceSimulator { public DeviceSimulator() { @@ -35,7 +39,7 @@ public class DeviceSimulator { * @return an asynchronous event bus to publish and retrieve events. */ @Bean - public EventBus eventBus() { + EventBus eventBus() { return new AsyncEventBus(Executors.newFixedThreadPool(4)); } @@ -43,10 +47,15 @@ public class DeviceSimulator { * @return central ScheduledExecutorService */ @Bean - public ScheduledExecutorService threadPool() { + ScheduledExecutorService threadPool() { return Executors.newScheduledThreadPool(8); } + @Bean + TaskScheduler taskScheduler() { + return new ConcurrentTaskScheduler(threadPool()); + } + /** * Start the Spring Boot Application. * diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulatorRepository.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulatorRepository.java index 05e650323..8b70c7469 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulatorRepository.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulatorRepository.java @@ -9,10 +9,11 @@ package org.eclipse.hawkbit.simulator; import java.util.Collection; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** @@ -27,8 +28,7 @@ public class DeviceSimulatorRepository { private final Map devices = new ConcurrentHashMap<>(); - @Autowired - private SimulatedDeviceFactory deviceFactory; + private final Set tenants = new HashSet<>(); /** * Adds a simulated device to the repository. @@ -39,6 +39,7 @@ public class DeviceSimulatorRepository { */ public AbstractSimulatedDevice add(final AbstractSimulatedDevice simulatedDevice) { devices.put(new DeviceKey(simulatedDevice.getTenant().toLowerCase(), simulatedDevice.getId()), simulatedDevice); + tenants.add(simulatedDevice.getTenant().toLowerCase()); return simulatedDevice; } @@ -78,12 +79,17 @@ public class DeviceSimulatorRepository { return devices.remove(new DeviceKey(tenant.toLowerCase(), id)); } + public Set getTenants() { + return tenants; + } + /** * Clears all stored devices. */ public void clear() { - devices.values().forEach(device -> device.clean()); + devices.values().forEach(AbstractSimulatedDevice::clean); devices.clear(); + tenants.clear(); } private static final class DeviceKey { diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java index ad827bc05..6dc47059c 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpConfiguration.java @@ -122,8 +122,7 @@ public class AmqpConfiguration { */ @Bean public Queue receiverConnectorQueueFromHawkBit() { - final Map arguments = getDeadLetterExchangeArgs(); - arguments.putAll(getTTLMaxArgs()); + final Map arguments = getTTLMaxArgs(); return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete() .withArguments(arguments).build(); @@ -151,36 +150,6 @@ public class AmqpConfiguration { return BindingBuilder.bind(receiverConnectorQueueFromHawkBit()).to(exchangeQueueToConnector()); } - /** - * Create dead letter queue. - * - * @return the queue - */ - @Bean - public Queue deadLetterQueue() { - return QueueBuilder.nonDurable(amqpProperties.getDeadLetterQueue()).withArguments(getTTLMaxArgs()).build(); - } - - /** - * Create the dead letter fanout exchange. - * - * @return the fanout exchange - */ - @Bean - public FanoutExchange exchangeDeadLetter() { - return new FanoutExchange(amqpProperties.getDeadLetterExchange(), false, true); - } - - /** - * Create the Binding deadLetterQueue to exchangeDeadLetter. - * - * @return the binding - */ - @Bean - public Binding bindDeadLetterQueueToLwm2mExchange() { - return BindingBuilder.bind(deadLetterQueue()).to(exchangeDeadLetter()); - } - /** * Returns the Listener factory. * @@ -198,12 +167,6 @@ public class AmqpConfiguration { return containerFactory; } - private Map getDeadLetterExchangeArgs() { - final Map args = Maps.newHashMapWithExpectedSize(1); - args.put("x-dead-letter-exchange", amqpProperties.getDeadLetterExchange()); - return args; - } - private static Map getTTLMaxArgs() { final Map args = Maps.newHashMapWithExpectedSize(2); args.put("x-message-ttl", Duration.ofDays(1).toMillis()); diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpProperties.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpProperties.java index 3730a3350..9bafd4ff0 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpProperties.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/AmqpProperties.java @@ -30,6 +30,11 @@ public class AmqpProperties { */ private boolean enabled; + /** + * Set to true for the simulator run DMF health check. + */ + private boolean checkDmfHealth = false; + /** * Queue for receiving DMF messages from update server. */ @@ -40,22 +45,20 @@ public class AmqpProperties { */ private String senderForSpExchange = "simulator.replyTo"; - /** - * Simulator dead letter queue. - */ - private String deadLetterQueue = "simulator_deadletter"; - - /** - * Simulator dead letter exchange. - */ - private String deadLetterExchange = "simulator.deadletter"; - /** * Message time to live (ttl) for the deadletter queue. Default ttl is 1 * hour. */ private int deadLetterTtl = 60_000; + public boolean isCheckDmfHealth() { + return checkDmfHealth; + } + + public void setCheckDmfHealth(final boolean checkDmfHealth) { + this.checkDmfHealth = checkDmfHealth; + } + public String getReceiverConnectorQueueFromSp() { return receiverConnectorQueueFromSp; } @@ -64,22 +67,6 @@ public class AmqpProperties { this.receiverConnectorQueueFromSp = receiverConnectorQueueFromSp; } - public String getDeadLetterExchange() { - return deadLetterExchange; - } - - public void setDeadLetterExchange(final String deadLetterExchange) { - this.deadLetterExchange = deadLetterExchange; - } - - public String getDeadLetterQueue() { - return deadLetterQueue; - } - - public void setDeadLetterQueue(final String deadLetterQueue) { - this.deadLetterQueue = deadLetterQueue; - } - public String getSenderForSpExchange() { return senderForSpExchange; } diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SenderService.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SenderService.java index d7f81be4e..3fd86c4a0 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SenderService.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SenderService.java @@ -58,8 +58,12 @@ public abstract class SenderService extends MessageService { return; } message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME); + final String correlationId = UUID.randomUUID().toString(); - message.getMessageProperties().setCorrelationId(correlationId.getBytes(StandardCharsets.UTF_8)); + + if (isCorrelationIdEmpty(message)) { + message.getMessageProperties().setCorrelationId(correlationId.getBytes(StandardCharsets.UTF_8)); + } if (LOGGER.isTraceEnabled()) { LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, address, correlationId); @@ -70,6 +74,11 @@ public abstract class SenderService extends MessageService { rabbitTemplate.send(address, null, message, new CorrelationData(correlationId)); } + private static boolean isCorrelationIdEmpty(final Message message) { + return message.getMessageProperties().getCorrelationId() == null + || message.getMessageProperties().getCorrelationId().length <= 0; + } + /** * Convert object and message properties to message. * diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SpReceiverService.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SpReceiverService.java index b17a3a888..ad432d023 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SpReceiverService.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SpReceiverService.java @@ -8,8 +8,11 @@ */ package org.eclipse.hawkbit.simulator.amqp; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Map; +import java.util.Set; +import java.util.UUID; import org.eclipse.hawkbit.dmf.amqp.api.EventTopic; import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey; @@ -17,6 +20,7 @@ import org.eclipse.hawkbit.dmf.amqp.api.MessageType; import org.eclipse.hawkbit.dmf.json.model.DmfDownloadAndUpdateRequest; import org.eclipse.hawkbit.simulator.DeviceSimulatorRepository; import org.eclipse.hawkbit.simulator.DeviceSimulatorUpdater; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; @@ -26,6 +30,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.messaging.handler.annotation.Header; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** @@ -43,6 +48,8 @@ public class SpReceiverService extends ReceiverService { private final DeviceSimulatorRepository repository; + private final Set openPings = new ConcurrentHashSet<>(); + /** * Constructor. * @@ -82,28 +89,60 @@ public class SpReceiverService extends ReceiverService { */ @RabbitListener(queues = "${hawkbit.device.simulator.amqp.receiverConnectorQueueFromSp}", containerFactory = "listenerContainerFactory") public void recieveMessageSp(final Message message, @Header(MessageHeaderKey.TYPE) final String type, - @Header(MessageHeaderKey.THING_ID) final String thingId, + @Header(name = MessageHeaderKey.THING_ID, required = false) final String thingId, @Header(MessageHeaderKey.TENANT) final String tenant) { - checkContentTypeJson(message); - delegateMessage(message, type, thingId, tenant); - } - - private void delegateMessage(final Message message, final String type, final String thingId, final String tenant) { final MessageType messageType = MessageType.valueOf(type); if (MessageType.EVENT.equals(messageType)) { + checkContentTypeJson(message); handleEventMessage(message, thingId); return; } if (MessageType.THING_DELETED.equals(messageType)) { + checkContentTypeJson(message); repository.remove(tenant, thingId); return; } + if (MessageType.PING_RESPONSE.equals(messageType)) { + final String correlationId = new String(message.getMessageProperties().getCorrelationId(), + StandardCharsets.UTF_8); + if (!openPings.remove(correlationId)) { + LOGGER.error("Unknown PING_RESPONSE received for correlationId: {}.", correlationId); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Got ping response from tenant {} with correlationId {} with timestamp {}", tenant, + correlationId, new String(message.getBody(), StandardCharsets.UTF_8)); + } + + return; + } + LOGGER.info("No valid message type property."); } + @Scheduled(fixedDelay = 5_000, initialDelay = 5_000) + void checkDmfHealth() { + if (!amqpProperties.isCheckDmfHealth()) { + return; + } + + if (openPings.size() > 5) { + LOGGER.error("Currently {} open pings! DMF does not seem to be reachable.", openPings.size()); + } else { + LOGGER.debug("Currently {} open pings", openPings.size()); + } + + repository.getTenants().forEach(tenant -> { + final String correlationId = UUID.randomUUID().toString(); + spSenderService.ping(tenant, correlationId); + openPings.add(correlationId); + LOGGER.debug("Ping tenant {} with correlationId {}", tenant, correlationId); + }); + } + private void handleEventMessage(final Message message, final String thingId) { final Object eventHeader = message.getMessageProperties().getHeaders().get(MessageHeaderKey.TOPIC); if (eventHeader == null) { diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SpSenderService.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SpSenderService.java index 94806cf44..41b2fcca1 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SpSenderService.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/amqp/SpSenderService.java @@ -57,6 +57,17 @@ public class SpSenderService extends SenderService { this.simulationProperties = simulationProperties; } + public void ping(final String tenant, final String correlationId) { + final MessageProperties messageProperties = new MessageProperties(); + messageProperties.getHeaders().put(MessageHeaderKey.TENANT, tenant); + messageProperties.getHeaders().put(MessageHeaderKey.TYPE, MessageType.PING.toString()); + messageProperties.setCorrelationId(correlationId.getBytes()); + messageProperties.setReplyTo(amqpProperties.getSenderForSpExchange()); + messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); + + sendMessage(spExchange, new Message(null, messageProperties)); + } + /** * Finish the update process. This will send a action status to SP. * @@ -96,7 +107,8 @@ public class SpSenderService extends SenderService { * the ID of the action for the error message */ public void sendErrorMessage(final String tenant, final List updateResultMessages, final Long actionId) { - final Message message = createActionStatusMessage(tenant, DmfActionStatus.ERROR, updateResultMessages, actionId); + final Message message = createActionStatusMessage(tenant, DmfActionStatus.ERROR, updateResultMessages, + actionId); sendMessage(spExchange, message); } @@ -240,7 +252,8 @@ public class SpSenderService extends SenderService { final List updateResultMessages) { final MessageProperties messageProperties = new MessageProperties(); final Map headers = messageProperties.getHeaders(); - final DmfActionUpdateStatus actionUpdateStatus = new DmfActionUpdateStatus(cacheValue.getActionId(), actionStatus); + final DmfActionUpdateStatus actionUpdateStatus = new DmfActionUpdateStatus(cacheValue.getActionId(), + actionStatus); headers.put(MessageHeaderKey.TYPE, MessageType.EVENT.name()); headers.put(MessageHeaderKey.TENANT, cacheValue.getTenant()); headers.put(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ACTION_STATUS.name()); diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 4eb95a26f..66b12f9cb 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -264,8 +264,8 @@ public class AmqpConfiguration { */ @Bean @ConditionalOnMissingBean - public AmqpSenderService amqpSenderServiceBean() { - return new DefaultAmqpSenderService(rabbitTemplate()); + public AmqpMessageSenderService amqpSenderServiceBean() { + return new DefaultAmqpMessageSenderService(rabbitTemplate()); } @Bean @@ -325,7 +325,7 @@ public class AmqpConfiguration { @Bean @ConditionalOnMissingBean(AmqpMessageDispatcherService.class) public AmqpMessageDispatcherService amqpMessageDispatcherService(final RabbitTemplate rabbitTemplate, - final AmqpSenderService amqpSenderService, final ArtifactUrlHandler artifactUrlHandler, + final AmqpMessageSenderService amqpSenderService, final ArtifactUrlHandler artifactUrlHandler, final SystemSecurityContext systemSecurityContext, final SystemManagement systemManagement, final TargetManagement targetManagement) { return new AmqpMessageDispatcherService(rabbitTemplate, amqpSenderService, artifactUrlHandler, diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java index 71dcfcd2c..609fa9698 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java @@ -39,6 +39,7 @@ import org.eclipse.hawkbit.util.IpUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.cloud.bus.ServiceMatcher; @@ -47,7 +48,7 @@ import org.springframework.context.event.EventListener; /** * {@link AmqpMessageDispatcherService} create all outgoing AMQP messages and - * delegate the messages to a {@link AmqpSenderService}. + * delegate the messages to a {@link AmqpMessageSenderService}. * * Additionally the dispatcher listener/subscribe for some target events e.g. * assignment. @@ -58,7 +59,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageDispatcherService.class); private final ArtifactUrlHandler artifactUrlHandler; - private final AmqpSenderService amqpSenderService; + private final AmqpMessageSenderService amqpSenderService; private final SystemSecurityContext systemSecurityContext; private final SystemManagement systemManagement; private final TargetManagement targetManagement; @@ -83,10 +84,10 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { * to check in cluster case if the message is from the same * cluster node */ - public AmqpMessageDispatcherService(final RabbitTemplate rabbitTemplate, final AmqpSenderService amqpSenderService, - final ArtifactUrlHandler artifactUrlHandler, final SystemSecurityContext systemSecurityContext, - final SystemManagement systemManagement, final TargetManagement targetManagement, - final ServiceMatcher serviceMatcher) { + public AmqpMessageDispatcherService(final RabbitTemplate rabbitTemplate, + final AmqpMessageSenderService amqpSenderService, final ArtifactUrlHandler artifactUrlHandler, + final SystemSecurityContext systemSecurityContext, final SystemManagement systemManagement, + final TargetManagement targetManagement, final ServiceMatcher serviceMatcher) { super(rabbitTemplate); this.artifactUrlHandler = artifactUrlHandler; this.amqpSenderService = amqpSenderService; @@ -143,6 +144,16 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { amqpSenderService.sendMessage(message, targetAdress); } + void sendPingReponseToDmfReceiver(final Message ping, final String tenant) { + final Message message = MessageBuilder.withBody(String.valueOf(System.currentTimeMillis()).getBytes()) + .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) + .setCorrelationId(ping.getMessageProperties().getCorrelationId()) + .setHeader(MessageHeaderKey.TYPE, MessageType.PING_RESPONSE).setHeader(MessageHeaderKey.TENANT, tenant) + .build(); + + amqpSenderService.sendMessage(message, ping.getMessageProperties().getReplyTo()); + } + /** * Method to send a message to a RabbitMQ Exchange after the assignment of * the Distribution set to a Target has been canceled. diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 6a5e45d16..84f76214f 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -115,20 +115,25 @@ public class AmqpMessageHandlerService extends BaseAmqpService { * @return the rpc message back to supplier. */ public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) { - checkContentTypeJson(message); + final SecurityContext oldContext = SecurityContextHolder.getContext(); try { final MessageType messageType = MessageType.valueOf(type); switch (messageType) { case THING_CREATED: + checkContentTypeJson(message); setTenantSecurityContext(tenant); registerTarget(message, virtualHost); break; case EVENT: + checkContentTypeJson(message); setTenantSecurityContext(tenant); - final String topicValue = getStringHeaderKey(message, MessageHeaderKey.TOPIC, "EventTopic is null"); - final EventTopic eventTopic = EventTopic.valueOf(topicValue); - handleIncomingEvent(message, eventTopic); + handleIncomingEvent(message); + break; + case PING: + if (isCorrelationIdNotEmpty(message)) { + amqpMessageDispatcherService.sendPingReponseToDmfReceiver(message, tenant); + } break; default: logAndThrowMessageError(message, "No handle method was found for the given message type."); @@ -206,8 +211,8 @@ public class AmqpMessageHandlerService extends BaseAmqpService { * @param topic * the topic of the event. */ - private void handleIncomingEvent(final Message message, final EventTopic topic) { - switch (topic) { + private void handleIncomingEvent(final Message message) { + switch (EventTopic.valueOf(getStringHeaderKey(message, MessageHeaderKey.TOPIC, "EventTopic is null"))) { case UPDATE_ACTION_STATUS: updateActionStatus(message); break; diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageSenderService.java similarity index 57% rename from hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java rename to hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageSenderService.java index cb0b024b6..2a1d01794 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageSenderService.java @@ -12,13 +12,14 @@ import java.net.URI; import javax.validation.constraints.NotNull; +import org.eclipse.hawkbit.util.IpUtil; import org.springframework.amqp.core.Message; /** * Interface to send a amqp message. */ @FunctionalInterface -public interface AmqpSenderService { +public interface AmqpMessageSenderService { /** * Send the given message to the given uri. The uri contains the (virtual) @@ -29,18 +30,24 @@ public interface AmqpSenderService { * @param replyTo * the reply to uri */ - void sendMessage(@NotNull Message message, @NotNull URI replyTo); + default void sendMessage(@NotNull final Message message, @NotNull final URI replyTo) { + if (!IpUtil.isAmqpUri(replyTo)) { + return; + } - /** - * Extract the exchange from the uri. Default implementation removes the - * first /. - * - * @param amqpUri - * the amqp uri - * @return the exchange. - */ - default String extractExchange(final URI amqpUri) { - return amqpUri.getPath().substring(1); + sendMessage(message, replyTo.getPath().substring(1)); } + /** + * Send the given message to the given host and exchange. + * + * @param message + * the amqp message + * @param exchange + * to send to + * @param virtualHost + * to send to + */ + void sendMessage(@NotNull final Message message, @NotNull final String exchange); + } diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpMessageSenderService.java similarity index 60% rename from hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java rename to hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpMessageSenderService.java index 1eb6d31f2..7824b6706 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpMessageSenderService.java @@ -8,11 +8,9 @@ */ package org.eclipse.hawkbit.amqp; -import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.UUID; -import org.eclipse.hawkbit.util.IpUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; @@ -24,30 +22,27 @@ import org.springframework.amqp.rabbit.support.CorrelationData; * message to the configured spring rabbitmq connections. The exchange is * extracted from the uri. */ -public class DefaultAmqpSenderService implements AmqpSenderService { - private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpSenderService.class); - - private final RabbitTemplate internalAmqpTemplate; +public class DefaultAmqpMessageSenderService extends BaseAmqpService implements AmqpMessageSenderService { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpMessageSenderService.class); /** * Constructor. * - * @param internalAmqpTemplate - * the amqp template + * @param rabbitTemplate + * the AMQP template */ - public DefaultAmqpSenderService(final RabbitTemplate internalAmqpTemplate) { - this.internalAmqpTemplate = internalAmqpTemplate; + public DefaultAmqpMessageSenderService(final RabbitTemplate rabbitTemplate) { + super(rabbitTemplate); } @Override - public void sendMessage(final Message message, final URI replyTo) { - if (!IpUtil.isAmqpUri(replyTo)) { - return; - } + public void sendMessage(final Message message, final String exchange) { final String correlationId = UUID.randomUUID().toString(); - final String exchange = extractExchange(replyTo); - message.getMessageProperties().setCorrelationId(correlationId.getBytes(StandardCharsets.UTF_8)); + + if (isCorrelationIdEmpty(message)) { + message.getMessageProperties().setCorrelationId(correlationId.getBytes(StandardCharsets.UTF_8)); + } if (LOGGER.isTraceEnabled()) { LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, exchange, correlationId); @@ -55,7 +50,12 @@ public class DefaultAmqpSenderService implements AmqpSenderService { LOGGER.debug("Sending message to exchange {} with correlationId {}", exchange, correlationId); } - internalAmqpTemplate.send(exchange, null, message, new CorrelationData(correlationId)); + getRabbitTemplate().send(exchange, null, message, new CorrelationData(correlationId)); + } + + protected static boolean isCorrelationIdEmpty(final Message message) { + return message.getMessageProperties().getCorrelationId() == null + || message.getMessageProperties().getCorrelationId().length <= 0; } } diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java index 82cf3eadc..dcc110dcb 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java @@ -81,7 +81,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTest { private RabbitTemplate rabbitTemplate; - private DefaultAmqpSenderService senderService; + private DefaultAmqpMessageSenderService senderService; private Target testTarget; @@ -94,7 +94,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTest { this.rabbitTemplate = Mockito.mock(RabbitTemplate.class); when(rabbitTemplate.getMessageConverter()).thenReturn(new Jackson2JsonMessageConverter()); - senderService = Mockito.mock(DefaultAmqpSenderService.class); + senderService = Mockito.mock(DefaultAmqpMessageSenderService.class); final ArtifactUrlHandler artifactUrlHandlerMock = Mockito.mock(ArtifactUrlHandler.class); when(artifactUrlHandlerMock.getUrls(anyObject(), anyObject())) diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java index 9a655ff69..7bb7e4515 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java @@ -59,6 +59,17 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AmqpServiceIntegra @Autowired private AmqpProperties amqpProperties; + @Test + @Description("Tests DMF PING request and expected reponse.") + public void pingDmfInterface() { + final Message pingMessage = createPingMessage(CORRELATION_ID, TENANT_EXIST); + getDmfClient().send(pingMessage); + + assertPingReplyMessage(CORRELATION_ID); + + Mockito.verifyZeroInteractions(getDeadletterListener()); + } + @Test @Description("Tests register target") @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 2), @@ -450,7 +461,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AmqpServiceIntegra final String controllerId = TARGET_PREFIX + "receiveDownLoadAndInstallMessageAfterAssignment"; // setup - controllerManagement.findOrRegisterTargetIfItDoesNotexist(controllerId, TEST_URI); + createAndSendTarget(controllerId, TENANT_EXIST); final DistributionSet distributionSet = testdataFactory.createDistributionSet(UUID.randomUUID().toString()); assignDistributionSet(distributionSet.getId(), controllerId); @@ -476,7 +487,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AmqpServiceIntegra final String controllerId = TARGET_PREFIX + "receiveCancelUpdateMessageAfterAssignmentWasCanceled"; // Setup - controllerManagement.findOrRegisterTargetIfItDoesNotexist(controllerId, TEST_URI); + createAndSendTarget(controllerId, TENANT_EXIST); final DistributionSet distributionSet = testdataFactory.createDistributionSet(UUID.randomUUID().toString()); final DistributionSetAssignmentResult distributionSetAssignmentResult = assignDistributionSet( distributionSet.getId(), controllerId); diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpServiceIntegrationTest.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpServiceIntegrationTest.java index 96c6207d2..39dac2dab 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpServiceIntegrationTest.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpServiceIntegrationTest.java @@ -10,6 +10,7 @@ package org.eclipse.hawkbit.integration; import static org.assertj.core.api.Assertions.assertThat; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -138,6 +139,22 @@ public abstract class AmqpServiceIntegrationTest extends AbstractAmqpIntegration assertThat(headers.get(MessageHeaderKey.TYPE)).isEqualTo(MessageType.THING_DELETED.toString()); } + protected void assertPingReplyMessage(final String correlationId) { + + verifyReplyToListener(); + final Message replyMessage = replyToListener.getPingResponseMessages().get(correlationId); + + final Map headers = replyMessage.getMessageProperties().getHeaders(); + + assertThat(headers.get(MessageHeaderKey.TENANT)).isEqualTo(TENANT_EXIST); + assertThat(correlationId) + .isEqualTo(new String(replyMessage.getMessageProperties().getCorrelationId(), StandardCharsets.UTF_8)); + assertThat(headers.get(MessageHeaderKey.TYPE)).isEqualTo(MessageType.PING_RESPONSE.toString()); + assertThat(Long.valueOf(new String(replyMessage.getBody(), StandardCharsets.UTF_8))) + .isLessThanOrEqualTo(System.currentTimeMillis()); + + } + protected void assertDownloadAndInstallMessage(final Set dsModules, final String controllerId) { final Message replyMessage = assertReplyMessageHeader(EventTopic.DOWNLOAD_AND_INSTALL, controllerId); assertAllTargetsCount(1); @@ -158,6 +175,12 @@ public abstract class AmqpServiceIntegrationTest extends AbstractAmqpIntegration getDmfClient().send(message); } + protected Message createAndSendPingMessage(final String correlationId, final String tenant) { + final Message message = createPingMessage(correlationId, tenant); + getDmfClient().send(message); + return message; + } + protected void verifyReplyToListener() { createConditionFactory().until(() -> { Mockito.verify(replyToListener, Mockito.atLeast(1)).handleMessage(Mockito.any()); @@ -244,6 +267,15 @@ public abstract class AmqpServiceIntegrationTest extends AbstractAmqpIntegration return createMessage(null, messageProperties); } + protected Message createPingMessage(final String correlationId, final String tenant) { + final MessageProperties messageProperties = createMessagePropertiesWithTenant(tenant); + messageProperties.getHeaders().put(MessageHeaderKey.TYPE, MessageType.PING.toString()); + messageProperties.setCorrelationId(correlationId.getBytes()); + messageProperties.setReplyTo(DmfTestConfiguration.REPLY_TO_EXCHANGE); + + return createMessage(null, messageProperties); + } + protected MessageProperties createMessagePropertiesWithTenant(final String tenant) { final MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put(MessageHeaderKey.TENANT, tenant); diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/listener/ReplyToListener.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/listener/ReplyToListener.java index 1034520d0..3367912c7 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/listener/ReplyToListener.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/listener/ReplyToListener.java @@ -10,6 +10,8 @@ package org.eclipse.hawkbit.integration.listener; import static org.junit.Assert.fail; +import java.nio.charset.StandardCharsets; +import java.util.EnumMap; import java.util.HashMap; import java.util.Map; @@ -25,8 +27,9 @@ public class ReplyToListener implements TestRabbitListener { public static final String LISTENER_ID = "replyto"; public static final String REPLY_TO_QUEUE = "reply_queue"; - private final Map eventTopicMessages = new HashMap<>(); + private final Map eventTopicMessages = new EnumMap<>(EventTopic.class); private final Map deleteMessages = new HashMap<>(); + private final Map pingResponseMessages = new HashMap<>(); @Override @RabbitListener(id = LISTENER_ID, queues = REPLY_TO_QUEUE) @@ -49,6 +52,13 @@ public class ReplyToListener implements TestRabbitListener { return; } + if (messageType == MessageType.PING_RESPONSE) { + final String correlationId = new String(message.getMessageProperties().getCorrelationId(), + StandardCharsets.UTF_8); + pingResponseMessages.put(correlationId, message); + return; + } + // if message type is not EVENT or THING_DELETED something unexpected // happened fail("Unexpected message type"); @@ -63,4 +73,8 @@ public class ReplyToListener implements TestRabbitListener { return deleteMessages; } + public Map getPingResponseMessages() { + return pingResponseMessages; + } + } diff --git a/hawkbit-dmf/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/MessageType.java b/hawkbit-dmf/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/MessageType.java index 982acf787..6c60f8c1b 100644 --- a/hawkbit-dmf/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/MessageType.java +++ b/hawkbit-dmf/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/MessageType.java @@ -15,7 +15,7 @@ package org.eclipse.hawkbit.dmf.amqp.api; public enum MessageType { /** - * The event type. + * The event type related to interaction with a thing. */ EVENT, @@ -29,4 +29,14 @@ public enum MessageType { */ THING_DELETED, + /** + * DMF receiver health check type. + */ + PING, + + /** + * DMF receiver health check reponse type. + */ + PING_RESPONSE; + } diff --git a/hawkbit-dmf/hawkbit-dmf-rabbitmq-test/src/main/java/org/eclipse/hawkbit/rabbitmq/test/AbstractAmqpIntegrationTest.java b/hawkbit-dmf/hawkbit-dmf-rabbitmq-test/src/main/java/org/eclipse/hawkbit/rabbitmq/test/AbstractAmqpIntegrationTest.java index 1f0a8e767..8f8dd5a5a 100644 --- a/hawkbit-dmf/hawkbit-dmf-rabbitmq-test/src/main/java/org/eclipse/hawkbit/rabbitmq/test/AbstractAmqpIntegrationTest.java +++ b/hawkbit-dmf/hawkbit-dmf-rabbitmq-test/src/main/java/org/eclipse/hawkbit/rabbitmq/test/AbstractAmqpIntegrationTest.java @@ -8,12 +8,10 @@ */ package org.eclipse.hawkbit.rabbitmq.test; -import java.net.URI; import java.util.concurrent.TimeUnit; import org.eclipse.hawkbit.repository.jpa.RepositoryApplicationConfiguration; import org.eclipse.hawkbit.repository.test.util.AbstractIntegrationTest; -import org.eclipse.hawkbit.util.IpUtil; import org.junit.Before; import org.junit.Rule; import org.springframework.amqp.core.Message; @@ -37,8 +35,6 @@ import com.jayway.awaitility.core.ConditionFactory; @DirtiesContext(classMode = ClassMode.AFTER_CLASS) public abstract class AbstractAmqpIntegrationTest extends AbstractIntegrationTest { - protected static final URI TEST_URI = IpUtil.createAmqpUri("testHost", "testExcange"); - @Rule @Autowired public BrokerRunning brokerRunning;