diff --git a/hawkbit-sdk/hawkbit-sdk-commons/src/main/java/org/eclipse/hawkbit/sdk/Tenant.java b/hawkbit-sdk/hawkbit-sdk-commons/src/main/java/org/eclipse/hawkbit/sdk/Tenant.java index 07672915d..6fd3542d1 100644 --- a/hawkbit-sdk/hawkbit-sdk-commons/src/main/java/org/eclipse/hawkbit/sdk/Tenant.java +++ b/hawkbit-sdk/hawkbit-sdk-commons/src/main/java/org/eclipse/hawkbit/sdk/Tenant.java @@ -34,5 +34,22 @@ public class Tenant { @Nullable private String gatewayToken; + // amqp settings (if DMF is used) + @Nullable + private DMF dmf; + private boolean downloadAuthenticationEnabled = true; + + @Data + @ToString + public static class DMF { + + @Nullable + private String virtualHost; + @Nullable + private String username; + @Nullable + @ToString.Exclude + private String password; + } } diff --git a/hawkbit-sdk/hawkbit-sdk-device/src/main/java/org/eclipse/hawkbit/sdk/device/DdiTenant.java b/hawkbit-sdk/hawkbit-sdk-device/src/main/java/org/eclipse/hawkbit/sdk/device/DdiTenant.java new file mode 100644 index 000000000..4df82b989 --- /dev/null +++ b/hawkbit-sdk/hawkbit-sdk-device/src/main/java/org/eclipse/hawkbit/sdk/device/DdiTenant.java @@ -0,0 +1,56 @@ +/** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hawkbit.sdk.device; + +import lombok.Getter; +import org.eclipse.hawkbit.sdk.Controller; +import org.eclipse.hawkbit.sdk.HawkbitClient; +import org.eclipse.hawkbit.sdk.Tenant; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** + * An in-memory simulated DMF Tenant to hold the controller twins in + * memory and be able to retrieve them again. + */ +public class DdiTenant { + + @Getter + private final Tenant tenant; + + private final Map controllers = new ConcurrentHashMap<>(); + private final HawkbitClient hawkbitClient; + + public DdiTenant(final Tenant tenant, final HawkbitClient hawkbitClient) { + this.tenant = tenant; + this.hawkbitClient = hawkbitClient; + } + + public void destroy() { + controllers.values().forEach(DdiController::stop); + controllers.clear(); + } + + public DdiController create(final Controller controller, final UpdateHandler updateHandler) { + final DdiController ddiController = new DdiController(tenant, controller, updateHandler, hawkbitClient); + controllers.put(controller.getControllerId(), ddiController); + return ddiController; + } + + public void remove(final String controllerId) { + Optional.ofNullable(controllers.remove(controllerId)).ifPresent(DdiController::stop); + } + + public Optional getController(final String controllerId) { + return Optional.ofNullable(controllers.get(controllerId)); + } +} \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DeviceManagement.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DeviceManagement.java deleted file mode 100644 index 3adf778a1..000000000 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DeviceManagement.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Copyright (c) 2024 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.hawkbit.sdk.dmf; - -import org.springframework.stereotype.Service; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * An in-memory simulated device management to hold the multi-tenants and controller twins in - * memory and be able to retrieve them again. - */ -@Service -public class DeviceManagement { - - private final Set tenants = new HashSet<>(); - - private final Map controllers = new ConcurrentHashMap<>(); - - public DmfController add(final DmfController controller) { - controllers.put(new DeviceKey(controller.getTenantId().toLowerCase(), controller.getControllerId()), controller); - tenants.add(controller.getTenantId().toLowerCase()); - return controller; - } - - public Set getTenants() { - return tenants; - } - - public Collection getControllers() { - return controllers.values(); - } - - public Optional getController(final String tenantId, final String controllerId) { - return Optional.ofNullable(controllers.get(new DeviceKey(tenantId.toLowerCase(), controllerId))); - } - - public void remove(final String tenant, final String id) { - final DmfController controller = controllers.remove(new DeviceKey(tenant.toLowerCase(), id)); - if (controller != null) { - controller.stop(); - } - } - - /** - * Clears all stored devices. - */ - public void destroy() { - controllers.values().forEach(DmfController::stop); - controllers.clear(); - tenants.clear(); - } - - private record DeviceKey(String tenant, String id) {} -} \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfController.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfController.java index f6aadc663..70d868785 100644 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfController.java +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfController.java @@ -16,7 +16,7 @@ import org.eclipse.hawkbit.dmf.amqp.api.EventTopic; import org.eclipse.hawkbit.dmf.json.model.DmfDownloadAndUpdateRequest; import org.eclipse.hawkbit.sdk.Controller; import org.eclipse.hawkbit.sdk.Tenant; -import org.eclipse.hawkbit.sdk.dmf.amqp.DmfSenderService; +import org.eclipse.hawkbit.sdk.dmf.amqp.DmfSender; import java.util.Collections; import java.util.Map; @@ -36,7 +36,7 @@ public class DmfController { private final String tenantId; private final String controllerId; private final UpdateHandler updateHandler; - private final DmfSenderService dmfSenderService; + private final DmfSender dmfSender; // configuration private final boolean downloadAuthenticationEnabled; @@ -54,26 +54,26 @@ public class DmfController { * @param tenant the tenant of the device belongs to * @param controller the controller */ - public DmfController( + DmfController( final Tenant tenant, final Controller controller, final UpdateHandler updateHandler, - final DmfSenderService dmfSenderService) { + final DmfSender dmfSender) { this.tenantId = tenant.getTenantId(); downloadAuthenticationEnabled = tenant.isDownloadAuthenticationEnabled(); this.controllerId = controller.getControllerId(); this.updateHandler = updateHandler == null ? UpdateHandler.SKIP : updateHandler; - this.dmfSenderService = dmfSenderService; + this.dmfSender = dmfSender; } public void connect() { log.debug(LOG_PREFIX + "Connecting/Polling ...", getTenantId(), getControllerId()); - dmfSenderService.createOrUpdateThing(getTenantId(), getControllerId()); + dmfSender.createOrUpdateThing(getTenantId(), getControllerId()); log.debug(LOG_PREFIX + "Done. Create thing sent.", getTenantId(), getControllerId()); } public void poll() { log.debug(LOG_PREFIX + "Polling ..", getTenantId(), getControllerId()); - dmfSenderService.createOrUpdateThing(getTenantId(), getControllerId()); + dmfSender.createOrUpdateThing(getTenantId(), getControllerId()); log.debug(LOG_PREFIX + "Done. Create thing sent.", getTenantId(), getControllerId()); } @@ -87,6 +87,6 @@ public class DmfController { } public void sendFeedback(final UpdateStatus updateStatus) { - dmfSenderService.sendFeedback(tenantId, currentActionId, updateStatus); + dmfSender.sendFeedback(tenantId, currentActionId, updateStatus); } } \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfProperties.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfProperties.java deleted file mode 100644 index 29a584ba1..000000000 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfProperties.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Copyright (c) 2024 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.hawkbit.sdk.dmf; - -import lombok.Data; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -/** - * Bean which holds the necessary properties for configuring the AMQP connection. - */ -@Data -@ToString -@Component -@ConfigurationProperties(DmfProperties.CONFIGURATION_PREFIX) -public class DmfProperties { - - /** - * The prefix for this configuration. - */ - public static final String CONFIGURATION_PREFIX = "hawkbit.sdk.dmf"; - - /** - * The property string of ~.amqp.enabled - */ - public static final String CONFIGURATION_ENABLED_PROPERTY = CONFIGURATION_PREFIX + ".enabled"; - - /** - * Indicates if the AMQP interface is enabled for the device simulator. - */ - private boolean enabled; - - /** - * Set to true for the simulator run DMF health check. - */ - private boolean healthCheckEnabled; -} \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfSDKConfiguration.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfSDKConfiguration.java deleted file mode 100644 index 7af9cc3b5..000000000 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfSDKConfiguration.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Copyright (c) 2024 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.hawkbit.sdk.dmf; - -import java.time.Duration; -import java.util.Map; - -import org.eclipse.hawkbit.sdk.dmf.amqp.AmqpProperties; -import org.eclipse.hawkbit.sdk.dmf.amqp.DmfReceiverService; -import org.eclipse.hawkbit.sdk.dmf.amqp.DmfSenderService; -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.FanoutExchange; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.core.QueueBuilder; -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * The spring AMQP configuration to use a AMQP for communication with SP update server. - */ -@Configuration -@EnableConfigurationProperties({DmfProperties.class, AmqpProperties.class}) -@ConditionalOnProperty(prefix = DmfProperties.CONFIGURATION_PREFIX, name = "enabled", matchIfMissing = true) -public class DmfSDKConfiguration { - - @Bean - DeviceManagement deviceManagement() { - return new DeviceManagement(); - } - - @Bean - DmfSenderService dmfSenderService( - final RabbitTemplate rabbitTemplate, - final AmqpProperties amqpProperties) { - return new DmfSenderService(rabbitTemplate, amqpProperties); - } - - @Bean - DmfReceiverService dmfReceiverService( - final RabbitTemplate rabbitTemplate, - final DmfSenderService dmfSenderService, - final DeviceManagement deviceManagement, - final AmqpProperties amqpProperties) { - return new DmfReceiverService(rabbitTemplate, dmfSenderService, deviceManagement, amqpProperties); - } - - @Bean - public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { - final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); - - // It is necessary to define rabbitTemplate as a Bean and set - // Jackson2JsonMessageConverter explicitly here in order to convert only - // OUTCOMING messages to json. In case of INCOMING messages, - // Jackson2JsonMessageConverter can not handle messages with NULL - // payload (e.g. REQUEST_ATTRIBUTES_UPDATE), so the - // SimpleMessageConverter is used instead per default. - rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); - - return rabbitTemplate; - } - - /** - * Creates the receiver queue from update server for receiving message from update server. - */ - @Bean - Queue receiverConnectorQueueFromHawkBit(final AmqpProperties amqpProperties) { - return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete() - .withArguments(Map.of( - "x-message-ttl", Duration.ofDays(1).toMillis(), - "x-max-length", 100_000)) - .build(); - } - - /** - * Creates the receiver exchange for sending messages to update server. - */ - @Bean - FanoutExchange exchangeQueueToConnector(final AmqpProperties amqpProperties) { - return new FanoutExchange(amqpProperties.getSenderForSpExchange(), false, true); - } - - /** - * Create the Binding - * - * @return the binding and create the queue and exchange - */ - @Bean - Binding bindReceiverQueueToSpExchange(final AmqpProperties amqpProperties) { - return BindingBuilder.bind(receiverConnectorQueueFromHawkBit(amqpProperties)) - .to(exchangeQueueToConnector(amqpProperties)); - } - - @Configuration - @ConditionalOnProperty(prefix = DmfProperties.CONFIGURATION_PREFIX, name = "customVhost") - protected static class CachingConnectionFactoryInitializer { - - CachingConnectionFactoryInitializer( - final CachingConnectionFactory connectionFactory, final AmqpProperties amqpProperties) { - connectionFactory.setVirtualHost(amqpProperties.getCustomVhost()); - } - } - - @ConditionalOnProperty(prefix = DmfProperties.CONFIGURATION_PREFIX, name = "healthCheckEnabled") - HealthService healthService(final DeviceManagement deviceManagement, final DmfSenderService dmfSenderService) { - return new HealthService(deviceManagement, dmfSenderService); - } -} \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfTenant.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfTenant.java new file mode 100644 index 000000000..c1044f0d0 --- /dev/null +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfTenant.java @@ -0,0 +1,67 @@ +/** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hawkbit.sdk.dmf; + +import lombok.Getter; +import org.eclipse.hawkbit.sdk.Controller; +import org.eclipse.hawkbit.sdk.Tenant; +import org.eclipse.hawkbit.sdk.dmf.amqp.Amqp; +import org.eclipse.hawkbit.sdk.dmf.amqp.VHost; +import org.springframework.amqp.core.Message; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; + +/** + * An in-memory simulated DMF Tenant to hold the controller twins in + * memory and be able to retrieve them again. + */ +public class DmfTenant { + + @Getter + private final Tenant tenant; + + private final Map controllers = new ConcurrentHashMap<>(); + private final Amqp amqp; + private final VHost vHost; + + public DmfTenant(final Tenant tenant, final Amqp amqp) { + this.tenant = tenant; + this.amqp = amqp; + this.vHost = amqp.getVhost(tenant.getDmf()); + // register as tenant in order received events to access the tenant's devices + vHost.register(this); + } + + public void destroy() { + controllers.values().forEach(DmfController::stop); + controllers.clear(); + } + + public DmfController create(final Controller controller, final UpdateHandler updateHandler) { + final DmfController dmfController = new DmfController(tenant, controller, updateHandler, vHost); + controllers.put(controller.getControllerId(), dmfController); + return dmfController; + } + + public void remove(final String controllerId) { + Optional.ofNullable(controllers.remove(controllerId)).ifPresent(DmfController::stop); + } + + public Optional getController(final String controllerId) { + return Optional.ofNullable(controllers.get(controllerId)); + } + + public void ping(final String correlationId, final BiConsumer listener) { + vHost.ping(tenant.getTenantId(), correlationId, listener); + } +} \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/UpdateInfo.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/UpdateInfo.java deleted file mode 100644 index bf81d43a1..000000000 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/UpdateInfo.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Copyright (c) 2024 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.hawkbit.sdk.dmf; - -import lombok.Data; -import lombok.ToString; - -import java.io.Serial; -import java.io.Serializable; -import java.time.LocalDateTime; - -/** - * Object for holding attributes for a simulated update for the device simulator. - */ -@Data -@ToString -public class UpdateInfo implements Serializable { - - @Serial - private static final long serialVersionUID = 1L; - - private final String tenant; - private final String thingId; - private final Long actionId; - - private final transient LocalDateTime startCacheTime; - - /** - * @param tenant the tenant for this thing and for this simulated update - * @param thingId the thing id that this simulated update correlates to - * @param actionId the id of the action related to this simulated update - */ - public UpdateInfo(final String tenant, final String thingId, final Long actionId) { - this.tenant = tenant; - this.thingId = thingId; - this.actionId = actionId; - this.startCacheTime = LocalDateTime.now(); - } -} \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/Amqp.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/Amqp.java new file mode 100644 index 000000000..300264965 --- /dev/null +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/Amqp.java @@ -0,0 +1,55 @@ +/** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made + * available under the terms of the Eclipse Public License 2.0 + * which is available at https://www.eclipse.org/legal/epl-2.0/ + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.hawkbit.sdk.dmf.amqp; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.hawkbit.sdk.Tenant; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.boot.autoconfigure.amqp.RabbitProperties; +import org.springframework.util.ObjectUtils; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * Abstract class for connecting to AMQP host. + */ +@Slf4j +public class Amqp { + + private final RabbitProperties rabbitProperties; + private final AmqpProperties amqpProperties; + private final ConcurrentHashMap vHosts = new ConcurrentHashMap<>(); + + public Amqp(final RabbitProperties rabbitProperties, final AmqpProperties amqpProperties) { + this.rabbitProperties = rabbitProperties; + this.amqpProperties = amqpProperties; + } + + public void stop() { + vHosts.values().forEach(VHost::stop); + } + + public VHost getVhost(final Tenant.DMF dmf) { + final String vHost = ObjectUtils.isEmpty(dmf.getVirtualHost()) ? + rabbitProperties.getVirtualHost() : dmf.getVirtualHost(); + return vHosts.computeIfAbsent(vHost, vh -> { + final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitProperties.getHost()); + connectionFactory.setUsername( + ObjectUtils.isEmpty(dmf.getUsername()) ? + rabbitProperties.getUsername() : dmf.getUsername()); + connectionFactory.setPassword( + ObjectUtils.isEmpty(dmf.getPassword()) ? + rabbitProperties.getPassword() : dmf.getPassword()); + connectionFactory.setVirtualHost(vHost); + return new VHost(connectionFactory, amqpProperties); + }); + } +} \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/AmqpProperties.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/AmqpProperties.java index 73f01c777..902550092 100644 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/AmqpProperties.java +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/AmqpProperties.java @@ -19,7 +19,6 @@ import org.springframework.stereotype.Component; */ @Data @ToString -@Component @ConfigurationProperties(AmqpProperties.CONFIGURATION_PREFIX) public class AmqpProperties { @@ -42,6 +41,4 @@ public class AmqpProperties { * Message time to live (ttl) for the deadletter queue. Default ttl is 1 hour. */ private int deadLetterTtl = 60_000; - - private String customVhost; } \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/DmfSenderService.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/DmfSender.java similarity index 76% rename from hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/DmfSenderService.java rename to hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/DmfSender.java index 6aa7296a2..4e4ff3bf0 100644 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/DmfSenderService.java +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/DmfSender.java @@ -9,7 +9,6 @@ */ package org.eclipse.hawkbit.sdk.dmf.amqp; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -17,7 +16,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import lombok.extern.slf4j.Slf4j; -import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings; import org.eclipse.hawkbit.dmf.amqp.api.EventTopic; import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey; import org.eclipse.hawkbit.dmf.amqp.api.MessageType; @@ -25,7 +23,6 @@ import org.eclipse.hawkbit.dmf.json.model.DmfActionStatus; import org.eclipse.hawkbit.dmf.json.model.DmfActionUpdateStatus; import org.eclipse.hawkbit.dmf.json.model.DmfAttributeUpdate; import org.eclipse.hawkbit.dmf.json.model.DmfUpdateMode; -import org.eclipse.hawkbit.sdk.dmf.UpdateInfo; import org.eclipse.hawkbit.sdk.dmf.UpdateStatus; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; @@ -34,20 +31,23 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.util.ObjectUtils; +import static org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings.DMF_EXCHANGE; + /** * Sender service to send messages to update server. */ @Slf4j -public class DmfSenderService extends MessageService { +public class DmfSender { private static final byte[] EMPTY_BODY = new byte[0]; - private final String spExchange; + protected final RabbitTemplate rabbitTemplate; + private final AmqpProperties amqpProperties; private final ConcurrentHashMap> pingListeners = new ConcurrentHashMap<>(); - public DmfSenderService(final RabbitTemplate rabbitTemplate, final AmqpProperties amqpProperties) { - super(rabbitTemplate, amqpProperties); - spExchange = AmqpSettings.DMF_EXCHANGE; + public DmfSender(final RabbitTemplate rabbitTemplate, final AmqpProperties amqpProperties) { + this.rabbitTemplate = rabbitTemplate; + this.amqpProperties = amqpProperties; } public void createOrUpdateThing(final String tenant, final String controllerId) { @@ -59,17 +59,13 @@ public class DmfSenderService extends MessageService { messagePropertiesForSP.setContentType(MessageProperties.CONTENT_TYPE_JSON); messagePropertiesForSP.setReplyTo(amqpProperties.getSenderForSpExchange()); - sendMessage(spExchange, new Message(EMPTY_BODY, messagePropertiesForSP)); + sendMessage(DMF_EXCHANGE, new Message(EMPTY_BODY, messagePropertiesForSP)); } - /** - * Finish the update process. This will send a action status to SP. - * - * @param update the simulated update object - * @param updateResultMessages a description according the update process - */ - public void finishUpdateProcess(final UpdateInfo update, final List updateResultMessages) { - sendMessage(spExchange, createActionStatusMessage(update, updateResultMessages, DmfActionStatus.FINISHED)); + public void finishUpdateProcess(final String tenantId, final long actionId, final List updateResultMessages) { + sendMessage( + DMF_EXCHANGE, + createActionStatusMessage(tenantId, actionId, DmfActionStatus.FINISHED, updateResultMessages)); } /** @@ -86,9 +82,9 @@ public class DmfSenderService extends MessageService { } message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME); - final String correlationId = UUID.randomUUID().toString(); - - if (ObjectUtils.isEmpty(message.getMessageProperties().getCorrelationId())) { + String correlationId = message.getMessageProperties().getCorrelationId(); + if (ObjectUtils.isEmpty(correlationId)) { + correlationId = UUID.randomUUID().toString(); message.getMessageProperties().setCorrelationId(correlationId); } @@ -101,23 +97,13 @@ public class DmfSenderService extends MessageService { rabbitTemplate.send(address, null, message, new CorrelationData(correlationId)); } - public Message convertMessage(final Object object, final MessageProperties messageProperties) { - return rabbitTemplate.getMessageConverter().toMessage(object, messageProperties); - } - public void sendFeedback( final String tenant, final Long actionId, final UpdateStatus updateStatus) { final Message message = createActionStatusMessage(tenant, actionId, updateStatus.status(), updateStatus.messages()); - sendMessage(spExchange, message); + sendMessage(DMF_EXCHANGE, message); } - public void updateAttributes( - final String tenant, final String controllerId, - final DmfUpdateMode mode, - final String key, final String value) { - updateAttributes(tenant, controllerId, mode, Collections.singletonMap(key, value)); - } public void updateAttributes( final String tenant, final String controllerId, final DmfUpdateMode mode, @@ -134,7 +120,7 @@ public class DmfSenderService extends MessageService { attributeUpdate.setMode(mode); attributeUpdate.getAttributes().putAll(attributes); - sendMessage(spExchange, convertMessage(attributeUpdate, messagePropertiesForSP)); + sendMessage(DMF_EXCHANGE, convertMessage(attributeUpdate, messagePropertiesForSP)); } public void ping(final String tenant, final String correlationId, final BiConsumer listener) { @@ -149,7 +135,7 @@ public class DmfSenderService extends MessageService { pingListeners.put(correlationId, listener); } - sendMessage(spExchange, new Message(EMPTY_BODY, messageProperties)); + sendMessage(DMF_EXCHANGE, new Message(EMPTY_BODY, messageProperties)); } void pingResponse(final String controllerId, final Message message) { @@ -159,9 +145,8 @@ public class DmfSenderService extends MessageService { } } - private Message createActionStatusMessage(final UpdateInfo update, - final List updateResultMessages, final DmfActionStatus status) { - return createActionStatusMessage(update.getTenant(), update.getActionId(), status, updateResultMessages); + private Message convertMessage(final Object object, final MessageProperties messageProperties) { + return rabbitTemplate.getMessageConverter().toMessage(object, messageProperties); } private Message createActionStatusMessage(final String tenant, final Long actionId, diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/MessageService.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/MessageService.java deleted file mode 100644 index 2d20462ac..000000000 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/MessageService.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (c) 2024 Contributors to the Eclipse Foundation - * - * This program and the accompanying materials are made - * available under the terms of the Eclipse Public License 2.0 - * which is available at https://www.eclipse.org/legal/epl-2.0/ - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.hawkbit.sdk.dmf.amqp; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; - -/** - * Abstract class for sender and receiver service. - */ -@Slf4j -class MessageService { - - protected final RabbitTemplate rabbitTemplate; - protected final AmqpProperties amqpProperties; - - MessageService(final RabbitTemplate rabbitTemplate, final AmqpProperties amqpProperties) { - this.rabbitTemplate = rabbitTemplate; - this.amqpProperties = amqpProperties; - } - - /** - * Convert a message body to a given class and set the message header - * AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME for Jackson converter. - */ - @SuppressWarnings("unchecked") - T convertMessage(final Message message, final Class clazz) { - message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, - clazz.getTypeName()); - return (T) rabbitTemplate.getMessageConverter().fromMessage(message); - } -} \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/DmfReceiverService.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/VHost.java similarity index 51% rename from hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/DmfReceiverService.java rename to hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/VHost.java index b6a216c61..92c6d243c 100644 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/DmfReceiverService.java +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/VHost.java @@ -9,12 +9,8 @@ */ package org.eclipse.hawkbit.sdk.dmf.amqp; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - +import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.units.qual.C; import org.eclipse.hawkbit.dmf.amqp.api.EventTopic; import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey; import org.eclipse.hawkbit.dmf.amqp.api.MessageType; @@ -22,46 +18,89 @@ import org.eclipse.hawkbit.dmf.json.model.DmfActionRequest; import org.eclipse.hawkbit.dmf.json.model.DmfDownloadAndUpdateRequest; import org.eclipse.hawkbit.dmf.json.model.DmfMultiActionRequest; import org.eclipse.hawkbit.dmf.json.model.DmfUpdateMode; -import org.eclipse.hawkbit.sdk.dmf.DeviceManagement; -import org.eclipse.hawkbit.sdk.dmf.DmfProperties; -import org.eclipse.hawkbit.sdk.dmf.UpdateInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.eclipse.hawkbit.sdk.dmf.DmfTenant; import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.core.MessageProperties; -import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.lang.Nullable; import org.springframework.messaging.handler.annotation.Header; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + /** - * Handle all incoming Messages from hawkBit update server. - * + * Abstract class for sender and receiver service. */ -public class DmfReceiverService extends MessageService { +@Slf4j +public class VHost extends DmfSender implements MessageListener { - private static final Logger LOGGER = LoggerFactory.getLogger(DmfReceiverService.class); - - private static final String REGEX_EXTRACT_ACTION_ID = "[^0-9]"; - - private final DmfSenderService dmfSenderService; - private final DeviceManagement deviceManagement; + private final SimpleMessageListenerContainer container; + private final ConcurrentHashMap dmfTenants = new ConcurrentHashMap<>(); private final Set openActions = Collections.synchronizedSet(new HashSet<>()); - public DmfReceiverService(final RabbitTemplate rabbitTemplate, final DmfSenderService dmfSenderService, - final DeviceManagement deviceManagement, final AmqpProperties amqpProperties) { - super(rabbitTemplate, amqpProperties); - this.dmfSenderService = dmfSenderService; - this.deviceManagement = deviceManagement; + VHost(final ConnectionFactory connectionFactory, final AmqpProperties amqpProperties) { + super(new RabbitTemplate(connectionFactory), amqpProperties); + + // It is necessary to define rabbitTemplate as a Bean and set + // Jackson2JsonMessageConverter explicitly here in order to convert only + // OUTCOMING messages to json. In case of INCOMING messages, + // Jackson2JsonMessageConverter can not handle messages with NULL + // payload (e.g. REQUEST_ATTRIBUTES_UPDATE), so the + // SimpleMessageConverter is used instead per default. + rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); + + final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + final Queue queue = QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete() + .withArguments(Map.of( + "x-message-ttl", Duration.ofDays(1).toMillis(), + "x-max-length", 100_000)) + .build(); + rabbitAdmin.declareQueue(queue); + final FanoutExchange exchange = new FanoutExchange(amqpProperties.getSenderForSpExchange(), false, true); + rabbitAdmin.declareExchange(exchange); + rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange)); + + container = new SimpleMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.setQueueNames(amqpProperties.getReceiverConnectorQueueFromSp()); + container.setMessageListener(this); + container.start(); } - @RabbitListener(queues = "${" + DmfProperties.CONFIGURATION_PREFIX + ".receiverConnectorQueueFromSp}") - public void receiveMessageSp( - @Header(MessageHeaderKey.TENANT) final String tenant, - @Header(name = MessageHeaderKey.THING_ID, required = false) final String controllerId, - @Header(MessageHeaderKey.TYPE) final String type, - final Message message) { + void stop() { + container.stop(); + rabbitTemplate.destroy(); + } + + public void register(final DmfTenant dmfTenant) { + dmfTenants.put(dmfTenant.getTenant().getTenantId(), dmfTenant); + } + + @Override + public void onMessage(final Message message) { + final String tenantId = (String)message.getMessageProperties().getHeaders().get(MessageHeaderKey.TENANT); + final String controllerId = (String)message.getMessageProperties().getHeaders().get(MessageHeaderKey.THING_ID); + final String type = (String)message.getMessageProperties().getHeaders().get(MessageHeaderKey.TYPE); + switch (MessageType.valueOf(type)) { case EVENT: { checkContentTypeJson(message); @@ -70,20 +109,20 @@ public class DmfReceiverService extends MessageService { } case THING_DELETED: { checkContentTypeJson(message); - deviceManagement.remove(tenant, controllerId); + Optional.ofNullable(dmfTenants.get(tenantId)).ifPresent(dmfTenant -> dmfTenant.remove(controllerId)); break; } case PING_RESPONSE: { final String correlationId = message.getMessageProperties().getCorrelationId(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Got ping response from tenant {} with correlationId {} with timestamp {}", tenant, + if (log.isDebugEnabled()) { + log.debug("Got ping response from tenantId {} with correlationId {} with timestamp {}", tenantId, correlationId, new String(message.getBody(), StandardCharsets.UTF_8)); } - dmfSenderService.pingResponse(controllerId, message); + pingResponse(controllerId, message); break; } default: { - LOGGER.info("No valid message type property."); + log.info("No valid message type property."); } } } @@ -91,7 +130,7 @@ public class DmfReceiverService extends MessageService { private void handleEventMessage(final Message message, final String thingId) { final Object eventHeader = message.getMessageProperties().getHeaders().get(MessageHeaderKey.TOPIC); if (eventHeader == null) { - LOGGER.error("Error \"Event Topic is not set\" reported by message {}", message.getMessageProperties().getMessageId()); + log.error("Error \"Event Topic is not set\" reported by message {}", message.getMessageProperties().getMessageId()); throw new IllegalArgumentException("Event Topic is not set"); } @@ -99,32 +138,33 @@ public class DmfReceiverService extends MessageService { @SuppressWarnings({ "squid:S2259" }) final EventTopic eventTopic = EventTopic.valueOf(eventHeader.toString()); switch (eventTopic) { - case CONFIRM: - handleConfirmation(message, thingId); - break; - case DOWNLOAD_AND_INSTALL: - case DOWNLOAD: - handleUpdateProcess(message, thingId, eventTopic); - break; - case CANCEL_DOWNLOAD: - handleCancelDownloadAction(message, thingId); - break; - case REQUEST_ATTRIBUTES_UPDATE: - handleAttributeUpdateRequest(message, thingId); - break; - case MULTI_ACTION: - handleMultiActionRequest(message, thingId); - break; - default: - LOGGER.info("No valid event property: {}", eventTopic); - break; + case CONFIRM: + handleConfirmation(message, thingId); + break; + case DOWNLOAD_AND_INSTALL: + case DOWNLOAD: + handleUpdateProcess(message, thingId, eventTopic); + break; + case CANCEL_DOWNLOAD: + handleCancelDownloadAction(message, thingId); + break; + case REQUEST_ATTRIBUTES_UPDATE: + handleAttributeUpdateRequest(message, thingId); + break; + case MULTI_ACTION: + handleMultiActionRequest(message, thingId); + break; + default: + log.info("No valid event property: {}", eventTopic); + break; } } private void handleConfirmation(final Message message, final String controllerId) { - LOGGER.warn("Handle confirmed received for {}! Skip it!", controllerId); + log.warn("Handle confirmed received for {}! Skip it!", controllerId); } + private static final String REGEX_EXTRACT_ACTION_ID = "[^0-9]"; private long extractActionIdFrom(final Message message) { final String messageAsString = message.toString(); final String requiredMessageContent = messageAsString @@ -160,15 +200,17 @@ public class DmfReceiverService extends MessageService { break; default: openActions.remove(actionId); - LOGGER.info("No valid event property in MULTI_ACTION."); + log.info("No valid event property in MULTI_ACTION."); break; } } private void handleAttributeUpdateRequest(final Message message, final String controllerId) { final String tenantId = getTenant(message); - deviceManagement.getController(tenantId, controllerId).ifPresent(controller -> - dmfSenderService.updateAttributes(tenantId, controllerId, DmfUpdateMode.MERGE, controller.getAttributes())); + Optional.ofNullable(dmfTenants.get(tenantId)) + .flatMap(dmfTenant -> dmfTenant.getController(controllerId)) + .ifPresent(controller -> + updateAttributes(tenantId, controllerId, DmfUpdateMode.MERGE, controller.getAttributes())); } private static String getTenant(final Message message) { @@ -185,8 +227,7 @@ public class DmfReceiverService extends MessageService { } private void processCancelDownloadAction(final String thingId, final String tenant, final Long actionId) { - final UpdateInfo update = new UpdateInfo(tenant, thingId, actionId); - dmfSenderService.finishUpdateProcess(update, Collections.singletonList("Simulation canceled")); + finishUpdateProcess(thingId, actionId, Collections.singletonList("Simulation canceled")); openActions.remove(actionId); } @@ -197,9 +238,21 @@ public class DmfReceiverService extends MessageService { processUpdate(tenant, controllerId, actionType, downloadAndUpdateRequest); } - private void processUpdate(final String tenant, final String thingId, final EventTopic actionType, final DmfDownloadAndUpdateRequest updateRequest) { - deviceManagement.getController(tenant, thingId).ifPresent(controller -> - controller.processUpdate(actionType, updateRequest)); + private void processUpdate(final String tenantId, final String controllerId, final EventTopic actionType, final DmfDownloadAndUpdateRequest updateRequest) { + Optional.ofNullable(dmfTenants.get(tenantId)) + .flatMap(dmfTenant -> dmfTenant.getController(controllerId)) + .ifPresent(controller -> controller.processUpdate(actionType, updateRequest)); + } + + /** + * Convert a message body to a given class and set the message header + * AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME for Jackson converter. + */ + @SuppressWarnings("unchecked") + private T convertMessage(final Message message, final Class clazz) { + message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, + clazz.getTypeName()); + return (T) rabbitTemplate.getMessageConverter().fromMessage(message); } /** diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/HealthService.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/health/HealthService.java similarity index 74% rename from hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/HealthService.java rename to hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/health/HealthService.java index 9fca84b0a..110451976 100644 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/HealthService.java +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/health/HealthService.java @@ -7,14 +7,15 @@ * * SPDX-License-Identifier: EPL-2.0 */ -package org.eclipse.hawkbit.sdk.dmf; +package org.eclipse.hawkbit.sdk.dmf.health; -import org.eclipse.hawkbit.sdk.dmf.amqp.DmfSenderService; +import org.eclipse.hawkbit.sdk.dmf.DmfTenant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.scheduling.annotation.Scheduled; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -23,18 +24,16 @@ import java.util.UUID; /** * Handle all incoming Messages from hawkBit update server. */ -class HealthService { +public class HealthService { private static final Logger LOGGER = LoggerFactory.getLogger(HealthService.class); - private final DeviceManagement deviceManagement; - private final DmfSenderService dmfSenderService; + private final Collection dmfTenants; private final Set openPings = Collections.synchronizedSet(new HashSet<>()); - HealthService(final DeviceManagement deviceManagement, final DmfSenderService dmfSenderService) { - this.deviceManagement = deviceManagement; - this.dmfSenderService = dmfSenderService; + HealthService(final Collection dmfTenants) { + this.dmfTenants = dmfTenants; } @Scheduled(fixedDelay = 5_000, initialDelay = 5_000) @@ -45,11 +44,11 @@ class HealthService { LOGGER.debug("Currently {} open pings", openPings.size()); } - deviceManagement.getTenants().forEach(tenant -> { + dmfTenants.forEach(tenant -> { final String correlationId = UUID.randomUUID().toString(); openPings.add(correlationId); LOGGER.debug("Ping tenant {} with correlationId {}", tenant, correlationId); - dmfSenderService.ping(tenant, correlationId, this::pingReceived); + tenant.ping(correlationId, this::pingReceived); }); }