diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java index fc8e259ea..89caf018c 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java @@ -153,12 +153,17 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { return; } - final List filteredTargetList = getTargetsWithoutPendingCancellations(assignedEvent.getActions().keySet()); + systemSecurityContext.runAsSystemAsTenant(() -> { + final List filteredTargetList = getTargetsWithoutPendingCancellations( + assignedEvent.getTenant(), assignedEvent.getActions().keySet()); - if (!filteredTargetList.isEmpty()) { - log.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker."); - sendUpdateMessageToTargets(assignedEvent.getDistributionSetId(), assignedEvent.getActions(), filteredTargetList); - } + if (!filteredTargetList.isEmpty()) { + log.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker."); + sendUpdateMessageToTargets(assignedEvent.getDistributionSetId(), assignedEvent.getActions(), filteredTargetList); + } + + return null; + }, assignedEvent.getTenant()); } /** @@ -171,8 +176,12 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { if (!shouldBeProcessed(multiActionEvent)) { return; } - log.debug("MultiActionEvent received for {}", multiActionEvent.getControllerIds()); - sendMultiActionRequestMessages(multiActionEvent.getTenant(), multiActionEvent.getControllerIds()); + + systemSecurityContext.runAsSystemAsTenant(() -> { + log.debug("MultiActionEvent received for {}", multiActionEvent.getControllerIds()); + sendMultiActionRequestMessages(multiActionEvent.getTenant(), multiActionEvent.getControllerIds()); + return null; + }, multiActionEvent.getTenant()); } protected void sendUpdateMessageToTarget( @@ -229,16 +238,20 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { return; } - final List eventTargets = partitionedParallelExecution( - cancelEvent.getActions().keySet(), targetManagement::getByControllerID); + systemSecurityContext.runAsSystemAsTenant(() -> { + final List eventTargets = partitionedParallelExecution( + cancelEvent.getActions().keySet(), targetManagement::getByControllerID); - eventTargets.forEach(target -> - cancelEvent.getActionPropertiesForController(target.getControllerId()) - .map(ActionProperties::getId) - .ifPresent(actionId -> - sendCancelMessageToTarget(cancelEvent.getTenant(), target.getControllerId(), actionId, target.getAddress()) - ) - ); + eventTargets.forEach(target -> + cancelEvent.getActionPropertiesForController(target.getControllerId()) + .map(ActionProperties::getId) + .ifPresent(actionId -> + sendCancelMessageToTarget(cancelEvent.getTenant(), target.getControllerId(), actionId, target.getAddress()) + ) + ); + + return null; + }, cancelEvent.getTenant()); } /** @@ -251,6 +264,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { if (!shouldBeProcessed(deleteEvent)) { return; } + sendDeleteMessage(deleteEvent.getTenant(), deleteEvent.getControllerId(), deleteEvent.getTargetAddress()); } @@ -427,12 +441,13 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { : EventTopic.BATCH_DOWNLOAD_AND_INSTALL; } - private List getTargetsWithoutPendingCancellations(final Set controllerIds) { + private List getTargetsWithoutPendingCancellations(final String tenant, final Set controllerIds) { return partitionedParallelExecution(controllerIds, partition -> targetManagement.getByControllerID(partition).stream() .filter(target -> { if (hasPendingCancellations(target.getId())) { - log.debug("Target {} has pending cancellations. Will not send update message to it.", target.getControllerId()); + log.debug("Target {} has pending cancellations. Will not send update message to it.", + target.getControllerId()); return false; } return true; diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 973e43800..df02c8ede 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -213,11 +213,9 @@ public class AmqpMessageHandlerService extends BaseAmqpService { return StringUtils.hasLength(message.getMessageProperties().getCorrelationId()); } - // Exception squid:MethodCyclomaticComplexity - false positive, is a simple - // mapping + // Exception squid:MethodCyclomaticComplexity - false positive, is a simple mapping @SuppressWarnings("squid:MethodCyclomaticComplexity") - private static Status mapStatus(final Message message, final DmfActionUpdateStatus actionUpdateStatus, - final Action action) { + private static Status mapStatus(final Message message, final DmfActionUpdateStatus actionUpdateStatus, final Action action) { Status status = null; switch (actionUpdateStatus.getActionStatus()) { case DOWNLOAD: {