Extend and improve DMF SDK (#2096)

Signed-off-by: Avgustin Marinov <Avgustin.Marinov@bosch.com>
This commit is contained in:
Avgustin Marinov
2024-11-22 11:40:10 +02:00
committed by GitHub
parent 40c76e49de
commit 49d27b1cd1
5 changed files with 21 additions and 16 deletions

View File

@@ -116,4 +116,9 @@ public class DmfController {
public void removeAttribute(final String key) { public void removeAttribute(final String key) {
this.attributes.remove(key); this.attributes.remove(key);
} }
public void thingCreated() {
log.debug(LOG_PREFIX + "Thing created.", getTenantId(), getControllerId());
dmfSender.createOrUpdateThing(getTenantId(), getControllerId());
}
} }

View File

@@ -9,6 +9,7 @@
*/ */
package org.eclipse.hawkbit.sdk.dmf; package org.eclipse.hawkbit.sdk.dmf;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -22,8 +23,7 @@ import org.eclipse.hawkbit.sdk.dmf.amqp.VHost;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
/** /**
* An in-memory simulated DMF Tenant to hold the controller twins in * An in-memory simulated DMF Tenant to hold the controller twins in memory and be able to retrieve them again.
* memory and be able to retrieve them again.
*/ */
public class DmfTenant { public class DmfTenant {
@@ -31,7 +31,6 @@ public class DmfTenant {
private final Tenant tenant; private final Tenant tenant;
private final Map<String, DmfController> controllers = new ConcurrentHashMap<>(); private final Map<String, DmfController> controllers = new ConcurrentHashMap<>();
private final Amqp amqp;
private final VHost vHost; private final VHost vHost;
public DmfTenant(final Tenant tenant, final Amqp amqp) { public DmfTenant(final Tenant tenant, final Amqp amqp) {
@@ -40,7 +39,6 @@ public class DmfTenant {
public DmfTenant(final Tenant tenant, final Amqp amqp, final boolean initVHost) { public DmfTenant(final Tenant tenant, final Amqp amqp, final boolean initVHost) {
this.tenant = tenant; this.tenant = tenant;
this.amqp = amqp;
this.vHost = amqp.getVhost(tenant.getDmf(), initVHost); this.vHost = amqp.getVhost(tenant.getDmf(), initVHost);
this.vHost.register(this); this.vHost.register(this);
} }
@@ -68,6 +66,10 @@ public class DmfTenant {
return Optional.ofNullable(controllers.get(controllerId)); return Optional.ofNullable(controllers.get(controllerId));
} }
public Map<String, DmfController> controllers() {
return Collections.unmodifiableMap(controllers);
}
public void ping(final String correlationId, final BiConsumer<String, Message> listener) { public void ping(final String correlationId, final BiConsumer<String, Message> listener) {
vHost.ping(tenant.getTenantId(), correlationId, listener); vHost.ping(tenant.getTenantId(), correlationId, listener);
} }

View File

@@ -40,6 +40,7 @@ import org.springframework.util.ObjectUtils;
public class DmfSender { public class DmfSender {
protected final RabbitTemplate rabbitTemplate; protected final RabbitTemplate rabbitTemplate;
private static final byte[] EMPTY_BODY = new byte[0]; private static final byte[] EMPTY_BODY = new byte[0];
private final AmqpProperties amqpProperties; private final AmqpProperties amqpProperties;
private final ConcurrentHashMap<String, BiConsumer<String, Message>> pingListeners = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, BiConsumer<String, Message>> pingListeners = new ConcurrentHashMap<>();

View File

@@ -49,6 +49,7 @@ import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
public class VHost extends DmfSender implements MessageListener { public class VHost extends DmfSender implements MessageListener {
private static final String REGEX_EXTRACT_ACTION_ID = "[^0-9]"; private static final String REGEX_EXTRACT_ACTION_ID = "[^0-9]";
private final SimpleMessageListenerContainer container; private final SimpleMessageListenerContainer container;
private final ConcurrentHashMap<String, DmfTenant> dmfTenants = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, DmfTenant> dmfTenants = new ConcurrentHashMap<>();
private final Set<Long> openActions = Collections.synchronizedSet(new HashSet<>()); private final Set<Long> openActions = Collections.synchronizedSet(new HashSet<>());
@@ -269,13 +270,11 @@ public class VHost extends DmfSender implements MessageListener {
} }
/** /**
* Convert a message body to a given class and set the message header * Convert a message body to a given class and set the message header AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME for Jackson converter.
* AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME for Jackson converter.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private <T> T convertMessage(final Message message, final Class<T> clazz) { private <T> T convertMessage(final Message message, final Class<T> clazz) {
message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, clazz.getTypeName());
clazz.getTypeName());
return (T) rabbitTemplate.getMessageConverter().fromMessage(message); return (T) rabbitTemplate.getMessageConverter().fromMessage(message);
} }
} }

View File

@@ -15,19 +15,17 @@ import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.sdk.dmf.DmfTenant; import org.eclipse.hawkbit.sdk.dmf.DmfTenant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Message;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
/** /**
* Handle all incoming Messages from hawkBit update server. * Handle all incoming Messages from hawkBit update server.
*/ */
@Slf4j
public class HealthService { public class HealthService {
private static final Logger LOGGER = LoggerFactory.getLogger(HealthService.class);
private final Collection<DmfTenant> dmfTenants; private final Collection<DmfTenant> dmfTenants;
private final Set<String> openPings = Collections.synchronizedSet(new HashSet<>()); private final Set<String> openPings = Collections.synchronizedSet(new HashSet<>());
@@ -39,22 +37,22 @@ public class HealthService {
@Scheduled(fixedDelay = 5_000, initialDelay = 5_000) @Scheduled(fixedDelay = 5_000, initialDelay = 5_000)
void checkDmfHealth() { void checkDmfHealth() {
if (openPings.size() > 5) { if (openPings.size() > 5) {
LOGGER.error("Currently {} open pings! DMF does not seem to be reachable.", openPings.size()); log.error("Currently {} open pings! DMF does not seem to be reachable.", openPings.size());
} else { } else {
LOGGER.debug("Currently {} open pings", openPings.size()); log.debug("Currently {} open pings", openPings.size());
} }
dmfTenants.forEach(tenant -> { dmfTenants.forEach(tenant -> {
final String correlationId = UUID.randomUUID().toString(); final String correlationId = UUID.randomUUID().toString();
openPings.add(correlationId); openPings.add(correlationId);
LOGGER.debug("Ping tenant {} with correlationId {}", tenant, correlationId); log.debug("Ping tenant {} with correlationId {}", tenant, correlationId);
tenant.ping(correlationId, this::pingReceived); tenant.ping(correlationId, this::pingReceived);
}); });
} }
void pingReceived(final String correlationId, final Message message) { void pingReceived(final String correlationId, final Message message) {
if (!openPings.remove(correlationId)) { if (!openPings.remove(correlationId)) {
LOGGER.error("Unknown PING_RESPONSE received for correlationId: {}.", correlationId); log.error("Unknown PING_RESPONSE received for correlationId: {}.", correlationId);
} }
} }
} }