Propose SDK Refactor (#1821)
* Propose SDK Refactor * Added ExecutorService for DMF Devices * After review, Created MgmtApi inside sdk-mgmt * Removed direct dependency to halkbit-mgmt-api all mgmt related calls now goes through hawkbit-sdk-mgmt * Added copyright header * Removed redundant paramters for deleteController * Fixed File Copyright Headers
This commit is contained in:
@@ -21,6 +21,7 @@ import org.eclipse.hawkbit.sdk.dmf.amqp.DmfSender;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
/**
|
||||
* Class representing DMF device twin connecting to hawkBit via DMF.
|
||||
@@ -50,6 +51,9 @@ public class DmfController {
|
||||
|
||||
private volatile Long lastActionId;
|
||||
|
||||
@SuppressWarnings("java:S3077") // volatile used only for the reference as expected
|
||||
private volatile ScheduledExecutorService executorService;
|
||||
|
||||
/**
|
||||
* Creates a new device instance.
|
||||
*
|
||||
@@ -67,26 +71,32 @@ public class DmfController {
|
||||
this.dmfSender = dmfSender;
|
||||
}
|
||||
|
||||
public void connect() {
|
||||
public void start(ScheduledExecutorService executorService) {
|
||||
stop();
|
||||
this.executorService = executorService;
|
||||
log.debug(LOG_PREFIX + "Connecting/Polling ...", getTenantId(), getControllerId());
|
||||
dmfSender.createOrUpdateThing(getTenantId(), getControllerId());
|
||||
log.debug(LOG_PREFIX + "Done. Create thing sent.", getTenantId(), getControllerId());
|
||||
}
|
||||
|
||||
public void poll() {
|
||||
log.debug(LOG_PREFIX + "Polling ..", getTenantId(), getControllerId());
|
||||
dmfSender.createOrUpdateThing(getTenantId(), getControllerId());
|
||||
public void unregisterThing() {
|
||||
log.debug(LOG_PREFIX + "Removing Controller...", getTenantId(), getControllerId());
|
||||
dmfSender.removeThing(getTenantId(), getControllerId());
|
||||
log.debug(LOG_PREFIX + "Done. Create thing sent.", getTenantId(), getControllerId());
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
if (executorService != null) {
|
||||
executorService.shutdown();
|
||||
}
|
||||
executorService = null;
|
||||
lastActionId = null;
|
||||
currentActionId = null;
|
||||
}
|
||||
|
||||
public void processUpdate(final EventTopic actionType, final DmfDownloadAndUpdateRequest updateRequest) {
|
||||
log.info(LOG_PREFIX + "Processing update for action {} .", getTenantId(), controllerId, updateRequest.getActionId());
|
||||
updateHandler.getUpdateProcessor(this, actionType, updateRequest).run();
|
||||
executorService.submit(updateHandler.getUpdateProcessor(this, actionType, updateRequest));
|
||||
}
|
||||
|
||||
public void sendFeedback(final UpdateStatus updateStatus) {
|
||||
|
||||
@@ -46,17 +46,21 @@ public class DmfTenant {
|
||||
}
|
||||
|
||||
public void destroy() {
|
||||
controllers.values().forEach(DmfController::stop);
|
||||
controllers.values().forEach(DmfController::unregisterThing);
|
||||
controllers.clear();
|
||||
}
|
||||
|
||||
public DmfController create(final Controller controller, final UpdateHandler updateHandler) {
|
||||
public DmfController createController(final Controller controller, final UpdateHandler updateHandler) {
|
||||
final DmfController dmfController = new DmfController(tenant, controller, updateHandler, vHost);
|
||||
controllers.put(controller.getControllerId(), dmfController);
|
||||
return dmfController;
|
||||
}
|
||||
|
||||
public void remove(final String controllerId) {
|
||||
public void removeController(final String controllerId) {
|
||||
Optional.ofNullable(controllers.remove(controllerId)).ifPresent(DmfController::unregisterThing);
|
||||
}
|
||||
|
||||
public void handleThingDeleted(final String controllerId) {
|
||||
Optional.ofNullable(controllers.remove(controllerId)).ifPresent(DmfController::stop);
|
||||
}
|
||||
|
||||
|
||||
@@ -51,8 +51,16 @@ public class DmfSender {
|
||||
}
|
||||
|
||||
public void createOrUpdateThing(final String tenant, final String controllerId) {
|
||||
sendThingMessage(tenant, controllerId, MessageType.THING_CREATED.name());
|
||||
}
|
||||
|
||||
public void removeThing(final String tenant, final String controllerId) {
|
||||
sendThingMessage(tenant, controllerId, MessageType.THING_REMOVED.name());
|
||||
}
|
||||
|
||||
public void sendThingMessage(final String tenant, final String controllerId, String thingStatusChange) {
|
||||
final MessageProperties messagePropertiesForSP = new MessageProperties();
|
||||
messagePropertiesForSP.setHeader(MessageHeaderKey.TYPE, MessageType.THING_CREATED.name());
|
||||
messagePropertiesForSP.setHeader(MessageHeaderKey.TYPE, thingStatusChange);
|
||||
messagePropertiesForSP.setHeader(MessageHeaderKey.TENANT, tenant);
|
||||
messagePropertiesForSP.setHeader(MessageHeaderKey.THING_ID, controllerId);
|
||||
messagePropertiesForSP.setHeader(MessageHeaderKey.SENDER, "hawkBit-sdk");
|
||||
|
||||
@@ -103,7 +103,7 @@ public class VHost extends DmfSender implements MessageListener {
|
||||
final String controllerId = (String)message.getMessageProperties().getHeaders().get(MessageHeaderKey.THING_ID);
|
||||
final String type = (String)message.getMessageProperties().getHeaders().get(MessageHeaderKey.TYPE);
|
||||
|
||||
log.info("Message received for target {}, value : {}", controllerId, message.toString());
|
||||
log.info("Message received for target {}, value : {}", controllerId, message);
|
||||
switch (MessageType.valueOf(type)) {
|
||||
case EVENT: {
|
||||
checkContentTypeJson(message);
|
||||
@@ -112,7 +112,7 @@ public class VHost extends DmfSender implements MessageListener {
|
||||
}
|
||||
case THING_DELETED: {
|
||||
checkContentTypeJson(message);
|
||||
Optional.ofNullable(dmfTenants.get(tenantId)).ifPresent(dmfTenant -> dmfTenant.remove(controllerId));
|
||||
Optional.ofNullable(dmfTenants.get(tenantId)).ifPresent(dmfTenant -> dmfTenant.handleThingDeleted(controllerId));
|
||||
break;
|
||||
}
|
||||
case PING_RESPONSE: {
|
||||
@@ -237,7 +237,7 @@ public class VHost extends DmfSender implements MessageListener {
|
||||
final String tenant = getTenant(message);
|
||||
final DmfDownloadAndUpdateRequest downloadAndUpdateRequest = convertMessage(message,
|
||||
DmfDownloadAndUpdateRequest.class);
|
||||
dmfTenants.get(tenant).getController(controllerId).get().setCurrentActionId(downloadAndUpdateRequest.getActionId());
|
||||
dmfTenants.get(tenant).getController(controllerId).ifPresent(controller -> controller.setCurrentActionId(downloadAndUpdateRequest.getActionId()));
|
||||
processUpdate(tenant, controllerId, actionType, downloadAndUpdateRequest);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user