diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfController.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfController.java index 70d868785..c610b83de 100644 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfController.java +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfController.java @@ -14,11 +14,12 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.eclipse.hawkbit.dmf.amqp.api.EventTopic; import org.eclipse.hawkbit.dmf.json.model.DmfDownloadAndUpdateRequest; +import org.eclipse.hawkbit.dmf.json.model.DmfUpdateMode; import org.eclipse.hawkbit.sdk.Controller; import org.eclipse.hawkbit.sdk.Tenant; import org.eclipse.hawkbit.sdk.dmf.amqp.DmfSender; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; /** @@ -42,8 +43,9 @@ public class DmfController { private final boolean downloadAuthenticationEnabled; @Getter @Setter - private volatile Map attributes = Collections.emptyMap(); + private volatile Map attributes = new HashMap<>(); + @Setter private volatile Long currentActionId; private volatile Long lastActionId; @@ -83,10 +85,25 @@ public class DmfController { } public void processUpdate(final EventTopic actionType, final DmfDownloadAndUpdateRequest updateRequest) { - updateHandler.getUpdateProcessor(this, actionType, updateRequest); + log.info(LOG_PREFIX + "Processing update for action {} .", getTenantId(), controllerId, updateRequest.getActionId()); + updateHandler.getUpdateProcessor(this, actionType, updateRequest).run(); } public void sendFeedback(final UpdateStatus updateStatus) { + log.info(LOG_PREFIX + "Sending UPDATE_ACTION_STATUS for action : {}", getTenantId(), controllerId, currentActionId); dmfSender.sendFeedback(tenantId, currentActionId, updateStatus); } + + public void sendUpdateAttributes() { + log.info(LOG_PREFIX + "Sending UPDATE_ATTRIBUTES", getTenantId(), controllerId); + dmfSender.updateAttributes(tenantId, controllerId, DmfUpdateMode.MERGE, attributes); + } + + public void setAttribute(final String key, final String value) { + this.attributes.put(key, value); + } + + public void removeAttribute(final String key) { + this.attributes.remove(key); + } } \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfTenant.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfTenant.java index c1044f0d0..3368ba788 100644 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfTenant.java +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/DmfTenant.java @@ -35,11 +35,14 @@ public class DmfTenant { private final VHost vHost; public DmfTenant(final Tenant tenant, final Amqp amqp) { + this(tenant, amqp, true); + } + + public DmfTenant(final Tenant tenant, final Amqp amqp, final boolean initVHost) { this.tenant = tenant; this.amqp = amqp; - this.vHost = amqp.getVhost(tenant.getDmf()); - // register as tenant in order received events to access the tenant's devices - vHost.register(this); + this.vHost = amqp.getVhost(tenant.getDmf(), initVHost); + this.vHost.register(this); } public void destroy() { diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/Amqp.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/Amqp.java index 82bc1e48f..60fd163e9 100644 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/Amqp.java +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/Amqp.java @@ -10,13 +10,14 @@ package org.eclipse.hawkbit.sdk.dmf.amqp; import lombok.extern.slf4j.Slf4j; -import org.eclipse.hawkbit.sdk.Tenant; import org.eclipse.hawkbit.sdk.Tenant.DMF; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; -import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.util.ObjectUtils; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; import java.util.concurrent.ConcurrentHashMap; /** @@ -38,20 +39,31 @@ public class Amqp { vHosts.values().forEach(VHost::stop); } - public VHost getVhost(final DMF dmf) { + public VHost getVhost(final DMF dmf, final boolean initVHost) { final String vHost = dmf == null || ObjectUtils.isEmpty(dmf.getVirtualHost()) ? (rabbitProperties.getVirtualHost() == null ? "/" :rabbitProperties.getVirtualHost()) : dmf.getVirtualHost(); - return vHosts.computeIfAbsent(vHost, vh -> { - final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitProperties.getHost()); - connectionFactory.setUsername( - dmf == null || ObjectUtils.isEmpty(dmf.getUsername()) ? - rabbitProperties.getUsername() : dmf.getUsername()); - connectionFactory.setPassword( - dmf == null || ObjectUtils.isEmpty(dmf.getPassword()) ? - rabbitProperties.getPassword() : dmf.getPassword()); - connectionFactory.setVirtualHost(vHost); - return new VHost(connectionFactory, amqpProperties); - }); + return vHosts.computeIfAbsent(vHost, vh -> new VHost(getConnectionFactory(dmf, vHost), amqpProperties, initVHost)); + } + + private ConnectionFactory getConnectionFactory(final DMF dmf, final String vHost) { + final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); + connectionFactory.setHost(rabbitProperties.getHost()); + connectionFactory.setPort(rabbitProperties.determinePort()); + if (rabbitProperties.getSsl().determineEnabled()) { + try { + connectionFactory.getRabbitConnectionFactory().useSslProtocol(); + } catch (NoSuchAlgorithmException | KeyManagementException e) { + throw new RuntimeException(e); + } + } + connectionFactory.setUsername( + dmf == null || ObjectUtils.isEmpty(dmf.getUsername()) ? + rabbitProperties.getUsername() : dmf.getUsername()); + connectionFactory.setPassword( + dmf == null || ObjectUtils.isEmpty(dmf.getPassword()) ? + rabbitProperties.getPassword() : dmf.getPassword()); + connectionFactory.setVirtualHost(vHost); + return connectionFactory; } } \ No newline at end of file diff --git a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/VHost.java b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/VHost.java index 92c6d243c..8be269338 100644 --- a/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/VHost.java +++ b/hawkbit-sdk/hawkbit-sdk-dmf/src/main/java/org/eclipse/hawkbit/sdk/dmf/amqp/VHost.java @@ -10,7 +10,6 @@ package org.eclipse.hawkbit.sdk.dmf.amqp; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.units.qual.C; import org.eclipse.hawkbit.dmf.amqp.api.EventTopic; import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey; import org.eclipse.hawkbit.dmf.amqp.api.MessageType; @@ -27,15 +26,12 @@ import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; -import org.springframework.lang.Nullable; -import org.springframework.messaging.handler.annotation.Header; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -57,7 +53,11 @@ public class VHost extends DmfSender implements MessageListener { private final Set openActions = Collections.synchronizedSet(new HashSet<>()); - VHost(final ConnectionFactory connectionFactory, final AmqpProperties amqpProperties) { + public VHost(final ConnectionFactory connectionFactory, final AmqpProperties amqpProperties) { + this(connectionFactory, amqpProperties, true); + } + + public VHost(final ConnectionFactory connectionFactory, final AmqpProperties amqpProperties, final boolean initVHost) { super(new RabbitTemplate(connectionFactory), amqpProperties); // It is necessary to define rabbitTemplate as a Bean and set @@ -68,16 +68,18 @@ public class VHost extends DmfSender implements MessageListener { // SimpleMessageConverter is used instead per default. rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); - final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); - final Queue queue = QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete() - .withArguments(Map.of( - "x-message-ttl", Duration.ofDays(1).toMillis(), - "x-max-length", 100_000)) - .build(); - rabbitAdmin.declareQueue(queue); - final FanoutExchange exchange = new FanoutExchange(amqpProperties.getSenderForSpExchange(), false, true); - rabbitAdmin.declareExchange(exchange); - rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange)); + if (initVHost) { + final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); + final Queue queue = QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete() + .withArguments(Map.of( + "x-message-ttl", Duration.ofDays(1).toMillis(), + "x-max-length", 100_000)) + .build(); + rabbitAdmin.declareQueue(queue); + final FanoutExchange exchange = new FanoutExchange(amqpProperties.getSenderForSpExchange(), false, true); + rabbitAdmin.declareExchange(exchange); + rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange)); + } container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); @@ -101,6 +103,7 @@ public class VHost extends DmfSender implements MessageListener { final String controllerId = (String)message.getMessageProperties().getHeaders().get(MessageHeaderKey.THING_ID); final String type = (String)message.getMessageProperties().getHeaders().get(MessageHeaderKey.TYPE); + log.info("Message received for target {}, value : {}", controllerId, message.toString()); switch (MessageType.valueOf(type)) { case EVENT: { checkContentTypeJson(message); @@ -141,8 +144,7 @@ public class VHost extends DmfSender implements MessageListener { case CONFIRM: handleConfirmation(message, thingId); break; - case DOWNLOAD_AND_INSTALL: - case DOWNLOAD: + case DOWNLOAD_AND_INSTALL, DOWNLOAD: handleUpdateProcess(message, thingId, eventTopic); break; case CANCEL_DOWNLOAD: @@ -205,7 +207,7 @@ public class VHost extends DmfSender implements MessageListener { } } - private void handleAttributeUpdateRequest(final Message message, final String controllerId) { + protected void handleAttributeUpdateRequest(final Message message, final String controllerId) { final String tenantId = getTenant(message); Optional.ofNullable(dmfTenants.get(tenantId)) .flatMap(dmfTenant -> dmfTenant.getController(controllerId)) @@ -216,10 +218,10 @@ public class VHost extends DmfSender implements MessageListener { private static String getTenant(final Message message) { final MessageProperties messageProperties = message.getMessageProperties(); final Map headers = messageProperties.getHeaders(); - return (String) headers.get(MessageHeaderKey.TENANT); + return ((String) headers.get(MessageHeaderKey.TENANT)).toLowerCase(); } - private void handleCancelDownloadAction(final Message message, final String thingId) { + protected void handleCancelDownloadAction(final Message message, final String thingId) { final String tenant = getTenant(message); final Long actionId = extractActionIdFrom(message); @@ -231,10 +233,11 @@ public class VHost extends DmfSender implements MessageListener { openActions.remove(actionId); } - private void handleUpdateProcess(final Message message, final String controllerId, final EventTopic actionType) { + protected void handleUpdateProcess(final Message message, final String controllerId, final EventTopic actionType) { final String tenant = getTenant(message); final DmfDownloadAndUpdateRequest downloadAndUpdateRequest = convertMessage(message, DmfDownloadAndUpdateRequest.class); + dmfTenants.get(tenant).getController(controllerId).get().setCurrentActionId(downloadAndUpdateRequest.getActionId()); processUpdate(tenant, controllerId, actionType, downloadAndUpdateRequest); }