Fix DMF context (as system) (#2097)
Signed-off-by: Avgustin Marinov <Avgustin.Marinov@bosch.com>
This commit is contained in:
@@ -153,12 +153,17 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
return;
|
||||
}
|
||||
|
||||
final List<Target> filteredTargetList = getTargetsWithoutPendingCancellations(assignedEvent.getActions().keySet());
|
||||
systemSecurityContext.runAsSystemAsTenant(() -> {
|
||||
final List<Target> filteredTargetList = getTargetsWithoutPendingCancellations(
|
||||
assignedEvent.getTenant(), assignedEvent.getActions().keySet());
|
||||
|
||||
if (!filteredTargetList.isEmpty()) {
|
||||
log.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
|
||||
sendUpdateMessageToTargets(assignedEvent.getDistributionSetId(), assignedEvent.getActions(), filteredTargetList);
|
||||
}
|
||||
if (!filteredTargetList.isEmpty()) {
|
||||
log.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
|
||||
sendUpdateMessageToTargets(assignedEvent.getDistributionSetId(), assignedEvent.getActions(), filteredTargetList);
|
||||
}
|
||||
|
||||
return null;
|
||||
}, assignedEvent.getTenant());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -171,8 +176,12 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
if (!shouldBeProcessed(multiActionEvent)) {
|
||||
return;
|
||||
}
|
||||
log.debug("MultiActionEvent received for {}", multiActionEvent.getControllerIds());
|
||||
sendMultiActionRequestMessages(multiActionEvent.getTenant(), multiActionEvent.getControllerIds());
|
||||
|
||||
systemSecurityContext.runAsSystemAsTenant(() -> {
|
||||
log.debug("MultiActionEvent received for {}", multiActionEvent.getControllerIds());
|
||||
sendMultiActionRequestMessages(multiActionEvent.getTenant(), multiActionEvent.getControllerIds());
|
||||
return null;
|
||||
}, multiActionEvent.getTenant());
|
||||
}
|
||||
|
||||
protected void sendUpdateMessageToTarget(
|
||||
@@ -229,16 +238,20 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
return;
|
||||
}
|
||||
|
||||
final List<Target> eventTargets = partitionedParallelExecution(
|
||||
cancelEvent.getActions().keySet(), targetManagement::getByControllerID);
|
||||
systemSecurityContext.runAsSystemAsTenant(() -> {
|
||||
final List<Target> eventTargets = partitionedParallelExecution(
|
||||
cancelEvent.getActions().keySet(), targetManagement::getByControllerID);
|
||||
|
||||
eventTargets.forEach(target ->
|
||||
cancelEvent.getActionPropertiesForController(target.getControllerId())
|
||||
.map(ActionProperties::getId)
|
||||
.ifPresent(actionId ->
|
||||
sendCancelMessageToTarget(cancelEvent.getTenant(), target.getControllerId(), actionId, target.getAddress())
|
||||
)
|
||||
);
|
||||
eventTargets.forEach(target ->
|
||||
cancelEvent.getActionPropertiesForController(target.getControllerId())
|
||||
.map(ActionProperties::getId)
|
||||
.ifPresent(actionId ->
|
||||
sendCancelMessageToTarget(cancelEvent.getTenant(), target.getControllerId(), actionId, target.getAddress())
|
||||
)
|
||||
);
|
||||
|
||||
return null;
|
||||
}, cancelEvent.getTenant());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -251,6 +264,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
if (!shouldBeProcessed(deleteEvent)) {
|
||||
return;
|
||||
}
|
||||
|
||||
sendDeleteMessage(deleteEvent.getTenant(), deleteEvent.getControllerId(), deleteEvent.getTargetAddress());
|
||||
}
|
||||
|
||||
@@ -427,12 +441,13 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
: EventTopic.BATCH_DOWNLOAD_AND_INSTALL;
|
||||
}
|
||||
|
||||
private List<Target> getTargetsWithoutPendingCancellations(final Set<String> controllerIds) {
|
||||
private List<Target> getTargetsWithoutPendingCancellations(final String tenant, final Set<String> controllerIds) {
|
||||
return partitionedParallelExecution(controllerIds, partition ->
|
||||
targetManagement.getByControllerID(partition).stream()
|
||||
.filter(target -> {
|
||||
if (hasPendingCancellations(target.getId())) {
|
||||
log.debug("Target {} has pending cancellations. Will not send update message to it.", target.getControllerId());
|
||||
log.debug("Target {} has pending cancellations. Will not send update message to it.",
|
||||
target.getControllerId());
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
||||
@@ -213,11 +213,9 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
return StringUtils.hasLength(message.getMessageProperties().getCorrelationId());
|
||||
}
|
||||
|
||||
// Exception squid:MethodCyclomaticComplexity - false positive, is a simple
|
||||
// mapping
|
||||
// Exception squid:MethodCyclomaticComplexity - false positive, is a simple mapping
|
||||
@SuppressWarnings("squid:MethodCyclomaticComplexity")
|
||||
private static Status mapStatus(final Message message, final DmfActionUpdateStatus actionUpdateStatus,
|
||||
final Action action) {
|
||||
private static Status mapStatus(final Message message, final DmfActionUpdateStatus actionUpdateStatus, final Action action) {
|
||||
Status status = null;
|
||||
switch (actionUpdateStatus.getActionStatus()) {
|
||||
case DOWNLOAD: {
|
||||
|
||||
Reference in New Issue
Block a user