DMF SDK fixes and improvements (#1730)

* Extend dmf sdk to support additional message handlers

Signed-off-by: TRS1SF3 <Stanislav.Trailov@bosch.io>

* make changes after review

Signed-off-by: TRS1SF3 <Stanislav.Trailov@bosch.io>

* refactoring and bugfixing of dmf sdk

Signed-off-by: TRS1SF3 <Stanislav.Trailov@bosch.io>

* make get connection factory private

Signed-off-by: TRS1SF3 <Stanislav.Trailov@bosch.io>

* changes after review

Signed-off-by: TRS1SF3 <Stanislav.Trailov@bosch.io>

* make handle cancel and update attributes protected

Signed-off-by: TRS1SF3 <Stanislav.Trailov@bosch.io>

* rename isEnvLocal to initVHost

Signed-off-by: TRS1SF3 <Stanislav.Trailov@bosch.io>

---------

Signed-off-by: TRS1SF3 <Stanislav.Trailov@bosch.io>
This commit is contained in:
Stanislav Trailov
2024-05-13 10:55:18 +03:00
committed by GitHub
parent 9b5c4851c5
commit d8c92cb5e1
4 changed files with 76 additions and 41 deletions

View File

@@ -14,11 +14,12 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
import org.eclipse.hawkbit.dmf.json.model.DmfDownloadAndUpdateRequest;
import org.eclipse.hawkbit.dmf.json.model.DmfUpdateMode;
import org.eclipse.hawkbit.sdk.Controller;
import org.eclipse.hawkbit.sdk.Tenant;
import org.eclipse.hawkbit.sdk.dmf.amqp.DmfSender;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
@@ -42,8 +43,9 @@ public class DmfController {
private final boolean downloadAuthenticationEnabled;
@Getter @Setter
private volatile Map<String, String> attributes = Collections.emptyMap();
private volatile Map<String, String> attributes = new HashMap<>();
@Setter
private volatile Long currentActionId;
private volatile Long lastActionId;
@@ -83,10 +85,25 @@ public class DmfController {
}
public void processUpdate(final EventTopic actionType, final DmfDownloadAndUpdateRequest updateRequest) {
updateHandler.getUpdateProcessor(this, actionType, updateRequest);
log.info(LOG_PREFIX + "Processing update for action {} .", getTenantId(), controllerId, updateRequest.getActionId());
updateHandler.getUpdateProcessor(this, actionType, updateRequest).run();
}
public void sendFeedback(final UpdateStatus updateStatus) {
log.info(LOG_PREFIX + "Sending UPDATE_ACTION_STATUS for action : {}", getTenantId(), controllerId, currentActionId);
dmfSender.sendFeedback(tenantId, currentActionId, updateStatus);
}
public void sendUpdateAttributes() {
log.info(LOG_PREFIX + "Sending UPDATE_ATTRIBUTES", getTenantId(), controllerId);
dmfSender.updateAttributes(tenantId, controllerId, DmfUpdateMode.MERGE, attributes);
}
public void setAttribute(final String key, final String value) {
this.attributes.put(key, value);
}
public void removeAttribute(final String key) {
this.attributes.remove(key);
}
}

View File

@@ -35,11 +35,14 @@ public class DmfTenant {
private final VHost vHost;
public DmfTenant(final Tenant tenant, final Amqp amqp) {
this(tenant, amqp, true);
}
public DmfTenant(final Tenant tenant, final Amqp amqp, final boolean initVHost) {
this.tenant = tenant;
this.amqp = amqp;
this.vHost = amqp.getVhost(tenant.getDmf());
// register as tenant in order received events to access the tenant's devices
vHost.register(this);
this.vHost = amqp.getVhost(tenant.getDmf(), initVHost);
this.vHost.register(this);
}
public void destroy() {

View File

@@ -10,13 +10,14 @@
package org.eclipse.hawkbit.sdk.dmf.amqp;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.sdk.Tenant;
import org.eclipse.hawkbit.sdk.Tenant.DMF;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.util.ObjectUtils;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -38,20 +39,31 @@ public class Amqp {
vHosts.values().forEach(VHost::stop);
}
public VHost getVhost(final DMF dmf) {
public VHost getVhost(final DMF dmf, final boolean initVHost) {
final String vHost = dmf == null || ObjectUtils.isEmpty(dmf.getVirtualHost()) ?
(rabbitProperties.getVirtualHost() == null ? "/" :rabbitProperties.getVirtualHost()) :
dmf.getVirtualHost();
return vHosts.computeIfAbsent(vHost, vh -> {
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitProperties.getHost());
connectionFactory.setUsername(
dmf == null || ObjectUtils.isEmpty(dmf.getUsername()) ?
rabbitProperties.getUsername() : dmf.getUsername());
connectionFactory.setPassword(
dmf == null || ObjectUtils.isEmpty(dmf.getPassword()) ?
rabbitProperties.getPassword() : dmf.getPassword());
connectionFactory.setVirtualHost(vHost);
return new VHost(connectionFactory, amqpProperties);
});
return vHosts.computeIfAbsent(vHost, vh -> new VHost(getConnectionFactory(dmf, vHost), amqpProperties, initVHost));
}
private ConnectionFactory getConnectionFactory(final DMF dmf, final String vHost) {
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitProperties.getHost());
connectionFactory.setPort(rabbitProperties.determinePort());
if (rabbitProperties.getSsl().determineEnabled()) {
try {
connectionFactory.getRabbitConnectionFactory().useSslProtocol();
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new RuntimeException(e);
}
}
connectionFactory.setUsername(
dmf == null || ObjectUtils.isEmpty(dmf.getUsername()) ?
rabbitProperties.getUsername() : dmf.getUsername());
connectionFactory.setPassword(
dmf == null || ObjectUtils.isEmpty(dmf.getPassword()) ?
rabbitProperties.getPassword() : dmf.getPassword());
connectionFactory.setVirtualHost(vHost);
return connectionFactory;
}
}

View File

@@ -10,7 +10,6 @@
package org.eclipse.hawkbit.sdk.dmf.amqp;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.C;
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
@@ -27,15 +26,12 @@ import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.handler.annotation.Header;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
@@ -57,7 +53,11 @@ public class VHost extends DmfSender implements MessageListener {
private final Set<Long> openActions = Collections.synchronizedSet(new HashSet<>());
VHost(final ConnectionFactory connectionFactory, final AmqpProperties amqpProperties) {
public VHost(final ConnectionFactory connectionFactory, final AmqpProperties amqpProperties) {
this(connectionFactory, amqpProperties, true);
}
public VHost(final ConnectionFactory connectionFactory, final AmqpProperties amqpProperties, final boolean initVHost) {
super(new RabbitTemplate(connectionFactory), amqpProperties);
// It is necessary to define rabbitTemplate as a Bean and set
@@ -68,16 +68,18 @@ public class VHost extends DmfSender implements MessageListener {
// SimpleMessageConverter is used instead per default.
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
final Queue queue = QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete()
.withArguments(Map.of(
"x-message-ttl", Duration.ofDays(1).toMillis(),
"x-max-length", 100_000))
.build();
rabbitAdmin.declareQueue(queue);
final FanoutExchange exchange = new FanoutExchange(amqpProperties.getSenderForSpExchange(), false, true);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange));
if (initVHost) {
final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
final Queue queue = QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete()
.withArguments(Map.of(
"x-message-ttl", Duration.ofDays(1).toMillis(),
"x-max-length", 100_000))
.build();
rabbitAdmin.declareQueue(queue);
final FanoutExchange exchange = new FanoutExchange(amqpProperties.getSenderForSpExchange(), false, true);
rabbitAdmin.declareExchange(exchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange));
}
container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
@@ -101,6 +103,7 @@ public class VHost extends DmfSender implements MessageListener {
final String controllerId = (String)message.getMessageProperties().getHeaders().get(MessageHeaderKey.THING_ID);
final String type = (String)message.getMessageProperties().getHeaders().get(MessageHeaderKey.TYPE);
log.info("Message received for target {}, value : {}", controllerId, message.toString());
switch (MessageType.valueOf(type)) {
case EVENT: {
checkContentTypeJson(message);
@@ -141,8 +144,7 @@ public class VHost extends DmfSender implements MessageListener {
case CONFIRM:
handleConfirmation(message, thingId);
break;
case DOWNLOAD_AND_INSTALL:
case DOWNLOAD:
case DOWNLOAD_AND_INSTALL, DOWNLOAD:
handleUpdateProcess(message, thingId, eventTopic);
break;
case CANCEL_DOWNLOAD:
@@ -205,7 +207,7 @@ public class VHost extends DmfSender implements MessageListener {
}
}
private void handleAttributeUpdateRequest(final Message message, final String controllerId) {
protected void handleAttributeUpdateRequest(final Message message, final String controllerId) {
final String tenantId = getTenant(message);
Optional.ofNullable(dmfTenants.get(tenantId))
.flatMap(dmfTenant -> dmfTenant.getController(controllerId))
@@ -216,10 +218,10 @@ public class VHost extends DmfSender implements MessageListener {
private static String getTenant(final Message message) {
final MessageProperties messageProperties = message.getMessageProperties();
final Map<String, Object> headers = messageProperties.getHeaders();
return (String) headers.get(MessageHeaderKey.TENANT);
return ((String) headers.get(MessageHeaderKey.TENANT)).toLowerCase();
}
private void handleCancelDownloadAction(final Message message, final String thingId) {
protected void handleCancelDownloadAction(final Message message, final String thingId) {
final String tenant = getTenant(message);
final Long actionId = extractActionIdFrom(message);
@@ -231,10 +233,11 @@ public class VHost extends DmfSender implements MessageListener {
openActions.remove(actionId);
}
private void handleUpdateProcess(final Message message, final String controllerId, final EventTopic actionType) {
protected void handleUpdateProcess(final Message message, final String controllerId, final EventTopic actionType) {
final String tenant = getTenant(message);
final DmfDownloadAndUpdateRequest downloadAndUpdateRequest = convertMessage(message,
DmfDownloadAndUpdateRequest.class);
dmfTenants.get(tenant).getController(controllerId).get().setCurrentActionId(downloadAndUpdateRequest.getActionId());
processUpdate(tenant, controllerId, actionType, downloadAndUpdateRequest);
}