DMF API supports target attributes update (#402)

* DMF attributes

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Simluator sends attributes

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Added sonar exception

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Fix typos.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>

* Generics for captor.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>
This commit is contained in:
Kai Zimmermann
2017-01-18 12:20:53 +01:00
committed by GitHub
parent 63ab80ab7b
commit 8288904b1e
11 changed files with 267 additions and 30 deletions

View File

@@ -57,6 +57,38 @@ All messages have to be sent to the exchange **dmf.exchange**.
| type=THING\_CREATED <br /> tenant=tenant123 <br /> thingId=abc <br /> sender=Lwm2m | content\_type=application/json <br /> reply_to (optional) =sp.connector.replyTo
### Message to update target attributes
| Message Header | Description | Type | Mandatory
|-----------------------------|----------------------------------|-------------------------------------|----------------
| type | Type of the message | Fixed string "EVENT" | true
| topic | Topic to handle events different | Fixed string "UPDATE_ATTRIBUTES" | true
| thingId | The ID of the registered thing | String | true
| tenant | The tenant this thing belongs to | String | false
| Message Properties | Description | Type | Mandatory
|-----------------------------|----------------------------------|-------------------------------------|----------------
| content_type | The content type of the payload | String | true
**Example Header and Payload**
| Headers | MessageProperties |
|---------------------------------------|---------------------------------|
| type=EVENT <br /> tenant=tenant123 <br /> thingId=abc <br /> topic=UPDATE\_ATTRIBUTES | content\_type=application/json <br />
Payload Template
```json
{
"attributes": {
"exampleKey1" : "exampleValue1",
"exampleKey2" : "exampleValue2"
}
}
```
### Message to send an action status event to _hawkBit_
The Java representation is ActionUpdateStatus:

View File

@@ -8,7 +8,10 @@
*/
package org.eclipse.hawkbit.simulator;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -38,6 +41,14 @@ public class DeviceSimulator {
return new AsyncEventBus(Executors.newFixedThreadPool(4));
}
/**
* @return central ScheduledExecutorService
*/
@Bean
public ScheduledExecutorService threadPool() {
return newScheduledThreadPool(8);
}
/**
* Start the Spring Boot Application.
*

View File

@@ -8,8 +8,6 @@
*/
package org.eclipse.hawkbit.simulator;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
@@ -60,7 +58,8 @@ public class DeviceSimulatorUpdater {
private static final Logger LOGGER = LoggerFactory.getLogger(DeviceSimulatorUpdater.class);
private static final ScheduledExecutorService threadPool = newScheduledThreadPool(8);
@Autowired
private ScheduledExecutorService threadPool;
@Autowired
private SpSenderService spSenderService;
@@ -109,14 +108,14 @@ public class DeviceSimulatorUpdater {
if (modules == null || modules.isEmpty()) {
device.setSwversion(swVersion);
} else {
device.setSwversion(modules.stream().map(sm -> sm.getModuleVersion()).collect(Collectors.joining(", ")));
device.setSwversion(
modules.stream().map(SoftwareModule::getModuleVersion).collect(Collectors.joining(", ")));
}
device.setTargetSecurityToken(targetSecurityToken);
eventbus.post(new InitUpdate(device));
threadPool.schedule(
new DeviceSimulatorUpdateThread(device, spSenderService, actionId, eventbus, callback, modules), 2_000,
TimeUnit.MILLISECONDS);
threadPool.schedule(new DeviceSimulatorUpdateThread(device, spSenderService, actionId, eventbus, threadPool,
callback, modules), 2_000, TimeUnit.MILLISECONDS);
}
private static final class DeviceSimulatorUpdateThread implements Runnable {
@@ -133,18 +132,20 @@ public class DeviceSimulatorUpdater {
private final SpSenderService spSenderService;
private final long actionId;
private final EventBus eventbus;
private final ScheduledExecutorService threadPool;
private final UpdaterCallback callback;
private final List<SoftwareModule> modules;
private DeviceSimulatorUpdateThread(final AbstractSimulatedDevice device, final SpSenderService spSenderService,
final long actionId, final EventBus eventbus, final UpdaterCallback callback,
final List<SoftwareModule> modules) {
final long actionId, final EventBus eventbus, final ScheduledExecutorService threadPool,
final UpdaterCallback callback, final List<SoftwareModule> modules) {
this.device = device;
this.spSenderService = spSenderService;
this.actionId = actionId;
this.eventbus = eventbus;
this.callback = callback;
this.modules = modules;
this.threadPool = threadPool;
}
@Override
@@ -163,9 +164,8 @@ public class DeviceSimulatorUpdater {
final double newProgress = device.getProgress() + 0.2;
device.setProgress(newProgress);
if (newProgress < 1.0) {
threadPool.schedule(
new DeviceSimulatorUpdateThread(device, spSenderService, actionId, eventbus, callback, modules),
rndSleep.nextInt(5_000), TimeUnit.MILLISECONDS);
threadPool.schedule(new DeviceSimulatorUpdateThread(device, spSenderService, actionId, eventbus,
threadPool, callback, modules), rndSleep.nextInt(5_000), TimeUnit.MILLISECONDS);
} else {
callback.updateFinished(device, actionId);
}

View File

@@ -9,6 +9,8 @@
package org.eclipse.hawkbit.simulator;
import java.net.URL;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.hawkbit.simulator.AbstractSimulatedDevice.Protocol;
import org.eclipse.hawkbit.simulator.amqp.SpSenderService;
@@ -32,6 +34,9 @@ public class SimulatedDeviceFactory {
@Autowired
private SpSenderService spSenderService;
@Autowired
private ScheduledExecutorService threadPool;
/**
* Creating a simulated device.
*
@@ -84,6 +89,9 @@ public class SimulatedDeviceFactory {
if (pollImmediatly) {
spSenderService.createOrUpdateThing(tenant, id);
}
threadPool.schedule(() -> spSenderService.updateAttributesOfThing(tenant, id), 2_000, TimeUnit.MILLISECONDS);
return device;
}

View File

@@ -10,6 +10,8 @@ package org.eclipse.hawkbit.simulator;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.eclipse.hawkbit.simulator.AbstractSimulatedDevice.Protocol;
@@ -17,13 +19,19 @@ import org.hibernate.validator.constraints.NotEmpty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import com.google.common.base.Splitter;
/**
* General simulator service properties.
*
*/
@Component
@ConfigurationProperties("hawkbit.device.simulator")
// Exception for squid:S2245 : not security relevant random number generation
@SuppressWarnings("squid:S2245")
public class SimulationProperties {
private static final Splitter SPLITTER = Splitter.on(',').omitEmptyStrings().trimResults();
private static final Random RANDOM = new Random();
/**
* List of tenants where the simulator should auto start simulations after
@@ -31,10 +39,55 @@ public class SimulationProperties {
*/
private final List<Autostart> autostarts = new ArrayList<>();
private final List<Attribute> attributes = new ArrayList<>();
public List<Attribute> getAttributes() {
return attributes;
}
public List<Autostart> getAutostarts() {
return this.autostarts;
}
/**
* Properties for target attributes set as part of simulation.
*
*/
public static class Attribute {
private String key;
private String value;
private String random;
public String getKey() {
return key;
}
public String getValue() {
return Optional.ofNullable(value).orElseGet(this::getRandomElement);
}
public void setKey(final String key) {
this.key = key;
}
public void setValue(final String value) {
this.value = value;
}
public void setRandom(final String random) {
this.random = random;
}
public String getRandom() {
return random;
}
private String getRandomElement() {
final List<String> options = SPLITTER.splitToList(random);
return options.get(RANDOM.nextInt(options.size()));
}
}
/**
* Auto start configuration for simulation setups that the simulator begins
* after startup.

View File

@@ -10,6 +10,7 @@ package org.eclipse.hawkbit.simulator.amqp;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings;
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
@@ -17,6 +18,8 @@ import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
import org.eclipse.hawkbit.dmf.json.model.ActionStatus;
import org.eclipse.hawkbit.dmf.json.model.ActionUpdateStatus;
import org.eclipse.hawkbit.dmf.json.model.AttributeUpdate;
import org.eclipse.hawkbit.simulator.SimulationProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
@@ -35,17 +38,23 @@ public class SpSenderService extends SenderService {
private final String spExchange;
private final SimulationProperties simulationProperties;
/**
*
* @param rabbitTemplate
* the rabbit template
* @param amqpProperties
* the amqp properties
* @param simulationProperties
* for attributes update class
*/
@Autowired
public SpSenderService(final RabbitTemplate rabbitTemplate, final AmqpProperties amqpProperties) {
public SpSenderService(final RabbitTemplate rabbitTemplate, final AmqpProperties amqpProperties,
final SimulationProperties simulationProperties) {
super(rabbitTemplate, amqpProperties);
this.spExchange = AmqpSettings.DMF_EXCHANGE;
this.simulationProperties = simulationProperties;
}
/**
@@ -53,7 +62,7 @@ public class SpSenderService extends SenderService {
*
* @param update
* the simulated update object
* @param description
* @param updateResultMessages
* a description according the update process
*/
public void finishUpdateProcess(final SimulatedUpdate update, final List<String> updateResultMessages) {
@@ -67,8 +76,8 @@ public class SpSenderService extends SenderService {
*
* @param update
* the simulated update object
* @param messageDescription
* a description according the update process
* @param updateResultMessages
* list of messages for error
*/
public void finishUpdateProcessWithError(final SimulatedUpdate update, final List<String> updateResultMessages) {
sendErrorgMessage(update, updateResultMessages);
@@ -81,7 +90,7 @@ public class SpSenderService extends SenderService {
*
* @param tenant
* the tenant
* @param messageDescription
* @param updateResultMessages
* the error message description to send
* @param actionId
* the ID of the action for the error message
@@ -96,7 +105,7 @@ public class SpSenderService extends SenderService {
*
* @param update
* the simulated update object
* @param warningMessage
* @param updateResultMessages
* a warning description
*/
public void sendWarningMessage(final SimulatedUpdate update, final List<String> updateResultMessages) {
@@ -111,7 +120,7 @@ public class SpSenderService extends SenderService {
* the tenant
* @param actionStatus
* the action status
* @param actionMessage
* @param updateResultMessages
* the message to get send
* @param actionId
* the cached value
@@ -124,7 +133,7 @@ public class SpSenderService extends SenderService {
}
/**
* Create new thing created message and send to SP.
* Create new thing created message and send to udpate server.
*
* @param tenant
* the tenant to create the target
@@ -132,7 +141,26 @@ public class SpSenderService extends SenderService {
* the ID of the target to create or update
*/
public void createOrUpdateThing(final String tenant, final String targetId) {
sendMessage(spExchange, thingCreatedMessage(tenant, targetId));
LOGGER.debug("Created thing created message and send to update server for Thing \"{}\"", targetId);
}
/**
* Create new attribute update message and send to update server.
*
* @param tenant
* the tenant to create the target
* @param targetId
* the ID of the target to create or update
*/
public void updateAttributesOfThing(final String tenant, final String targetId) {
sendMessage(spExchange, attributeUpdateMessage(tenant, targetId));
LOGGER.debug("Create update attributes message and send to update server for Thing \"{}\"", targetId);
}
private Message thingCreatedMessage(final String tenant, final String targetId) {
final MessageProperties messagePropertiesForSP = new MessageProperties();
messagePropertiesForSP.setHeader(MessageHeaderKey.TYPE, MessageType.THING_CREATED.name());
messagePropertiesForSP.setHeader(MessageHeaderKey.TENANT, tenant);
@@ -140,9 +168,23 @@ public class SpSenderService extends SenderService {
messagePropertiesForSP.setHeader(MessageHeaderKey.SENDER, "simulator");
messagePropertiesForSP.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messagePropertiesForSP.setReplyTo(amqpProperties.getSenderForSpExchange());
final Message convertedMessage = new Message(null, messagePropertiesForSP);
sendMessage(spExchange, convertedMessage);
LOGGER.debug("Created thing created message and send to SP for Thing \"{}\"", targetId);
return new Message(null, messagePropertiesForSP);
}
private Message attributeUpdateMessage(final String tenant, final String targetId) {
final MessageProperties messagePropertiesForSP = new MessageProperties();
messagePropertiesForSP.setHeader(MessageHeaderKey.TYPE, MessageType.EVENT.name());
messagePropertiesForSP.setHeader(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ATTRIBUTES);
messagePropertiesForSP.setHeader(MessageHeaderKey.TENANT, tenant);
messagePropertiesForSP.setHeader(MessageHeaderKey.THING_ID, targetId);
messagePropertiesForSP.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messagePropertiesForSP.setReplyTo(amqpProperties.getSenderForSpExchange());
final AttributeUpdate attributeUpdate = new AttributeUpdate();
attributeUpdate.getAttributes().putAll(simulationProperties.getAttributes().stream().collect(
Collectors.toMap(SimulationProperties.Attribute::getKey, SimulationProperties.Attribute::getValue)));
return convertMessage(attributeUpdate, messagePropertiesForSP);
}
/**

View File

@@ -17,6 +17,12 @@ hawkbit.device.simulator.amqp.senderForSpExchange=simulator.replyTo
## Configuration for simulations
hawkbit.device.simulator.autostarts.[0].tenant=DEFAULT
hawkbit.device.simulator.attributes[0].key=isoCode
hawkbit.device.simulator.attributes[0].random=DE,US,AU,FR,DK,CA
hawkbit.device.simulator.attributes[1].key=hwRevision
hawkbit.device.simulator.attributes[1].value=1.1
hawkbit.device.simulator.attributes[2].key=serial
hawkbit.device.simulator.attributes[2].value=${random.value}
## Configuration for local RabbitMQ integration
spring.rabbitmq.username=guest

View File

@@ -21,6 +21,7 @@ import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
import org.eclipse.hawkbit.dmf.json.model.ActionUpdateStatus;
import org.eclipse.hawkbit.dmf.json.model.AttributeUpdate;
import org.eclipse.hawkbit.im.authentication.SpPermission.SpringEvalExpressions;
import org.eclipse.hawkbit.im.authentication.TenantAwareAuthenticationDetails;
import org.eclipse.hawkbit.repository.ControllerManagement;
@@ -206,11 +207,26 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
* the topic of the event.
*/
private void handleIncomingEvent(final Message message, final EventTopic topic) {
if (EventTopic.UPDATE_ACTION_STATUS.equals(topic)) {
switch (topic) {
case UPDATE_ACTION_STATUS:
updateActionStatus(message);
return;
break;
case UPDATE_ATTRIBUTES:
updateAttributes(message);
break;
default:
logAndThrowMessageError(message, "Got event without appropriate topic.");
break;
}
logAndThrowMessageError(message, "Got event without appropriate topic.");
}
private void updateAttributes(final Message message) {
final AttributeUpdate attributeUpdate = convertMessage(message, AttributeUpdate.class);
final String thingId = getStringHeaderKey(message, MessageHeaderKey.THING_ID, "ThingId is null");
controllerManagement.updateControllerAttributes(thingId, attributeUpdate.getAttributes());
}
/**

View File

@@ -23,6 +23,7 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import org.eclipse.hawkbit.api.HostnameResolver;
@@ -35,6 +36,7 @@ import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
import org.eclipse.hawkbit.dmf.json.model.ActionStatus;
import org.eclipse.hawkbit.dmf.json.model.ActionUpdateStatus;
import org.eclipse.hawkbit.dmf.json.model.AttributeUpdate;
import org.eclipse.hawkbit.dmf.json.model.DownloadResponse;
import org.eclipse.hawkbit.dmf.json.model.TenantSecurityToken;
import org.eclipse.hawkbit.dmf.json.model.TenantSecurityToken.FileResource;
@@ -56,6 +58,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
@@ -113,6 +116,12 @@ public class AmqpMessageHandlerServiceTest {
@Mock
private RabbitTemplate rabbitTemplate;
@Captor
private ArgumentCaptor<Map<String, String>> attributesCaptor;
@Captor
private ArgumentCaptor<String> targetIdCaptor;
@Before
public void before() throws Exception {
messageConverter = new Jackson2JsonMessageConverter();
@@ -164,6 +173,32 @@ public class AmqpMessageHandlerServiceTest {
}
@Test
@Description("Tests the target attribute update by calling the same method that incoming RabbitMQ messages would access.")
public void updateAttributes() {
final String knownThingId = "1";
final MessageProperties messageProperties = createMessageProperties(MessageType.EVENT);
messageProperties.setHeader(MessageHeaderKey.THING_ID, "1");
messageProperties.setHeader(MessageHeaderKey.TOPIC, "UPDATE_ATTRIBUTES");
final AttributeUpdate attributeUpdate = new AttributeUpdate();
attributeUpdate.getAttributes().put("testKey1", "testValue1");
attributeUpdate.getAttributes().put("testKey2", "testValue2");
final Message message = amqpMessageHandlerService.getMessageConverter().toMessage(attributeUpdate,
messageProperties);
when(controllerManagementMock.updateControllerAttributes(targetIdCaptor.capture(), attributesCaptor.capture()))
.thenReturn(null);
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
// verify
assertThat(targetIdCaptor.getValue()).as("Thing id is wrong").isEqualTo(knownThingId);
assertThat(attributesCaptor.getValue()).as("Attributes is not right")
.isEqualTo(attributeUpdate.getAttributes());
}
@Test
@Description("Tests the creation of a thing without a 'reply to' header in message.")
public void createThingWitoutReplyTo() {

View File

@@ -10,9 +10,6 @@ package org.eclipse.hawkbit.dmf.amqp.api;
/**
* The event topics for the message headers.
*
*
*
*/
public enum EventTopic {
@@ -27,6 +24,10 @@ public enum EventTopic {
/**
* Topic when sending and receiving a cancel download task.
*/
CANCEL_DOWNLOAD;
CANCEL_DOWNLOAD,
/**
* Topic when updating device attributes.
*/
UPDATE_ATTRIBUTES;
}

View File

@@ -0,0 +1,33 @@
/**
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.dmf.json.model;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Map of {@link Target} attributes.
*
*/
@JsonInclude(Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
public class AttributeUpdate {
@JsonProperty
private final Map<String, String> attributes = new HashMap<>();
public Map<String, String> getAttributes() {
return attributes;
}
}