diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java index c8e74339a..933b67012 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java @@ -24,7 +24,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import com.google.common.collect.Iterables; import org.eclipse.hawkbit.api.ApiType; import org.eclipse.hawkbit.api.ArtifactUrl; import org.eclipse.hawkbit.api.ArtifactUrlHandler; @@ -49,11 +48,11 @@ import org.eclipse.hawkbit.repository.SoftwareModuleManagement; import org.eclipse.hawkbit.repository.SystemManagement; import org.eclipse.hawkbit.repository.TargetManagement; import org.eclipse.hawkbit.repository.TenantConfigurationManagement; +import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent; import org.eclipse.hawkbit.repository.event.remote.MultiActionEvent; import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent; import org.eclipse.hawkbit.repository.event.remote.TargetAttributesRequestedEvent; import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent; -import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent; import org.eclipse.hawkbit.repository.model.Action; import org.eclipse.hawkbit.repository.model.ActionProperties; import org.eclipse.hawkbit.repository.model.Artifact; @@ -77,6 +76,8 @@ import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.util.CollectionUtils; +import com.google.common.collect.Iterables; + /** * {@link AmqpMessageDispatcherService} create all outgoing AMQP messages and * delegate the messages to a {@link AmqpMessageSenderService}. @@ -90,7 +91,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageDispatcherService.class); private static final int MAX_PROCESSING_SIZE = 1000; - + private final ArtifactUrlHandler artifactUrlHandler; private final AmqpMessageSenderService amqpSenderService; private final SystemSecurityContext systemSecurityContext; @@ -127,12 +128,12 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { * */ protected AmqpMessageDispatcherService(final RabbitTemplate rabbitTemplate, - final AmqpMessageSenderService amqpSenderService, final ArtifactUrlHandler artifactUrlHandler, - final SystemSecurityContext systemSecurityContext, final SystemManagement systemManagement, - final TargetManagement targetManagement, final ServiceMatcher serviceMatcher, - final DistributionSetManagement distributionSetManagement, - final SoftwareModuleManagement softwareModuleManagement, final DeploymentManagement deploymentManagement, - final TenantConfigurationManagement tenantConfigurationManagement) { + final AmqpMessageSenderService amqpSenderService, final ArtifactUrlHandler artifactUrlHandler, + final SystemSecurityContext systemSecurityContext, final SystemManagement systemManagement, + final TargetManagement targetManagement, final ServiceMatcher serviceMatcher, + final DistributionSetManagement distributionSetManagement, + final SoftwareModuleManagement softwareModuleManagement, final DeploymentManagement deploymentManagement, + final TenantConfigurationManagement tenantConfigurationManagement) { super(rabbitTemplate); this.artifactUrlHandler = artifactUrlHandler; this.amqpSenderService = amqpSenderService; @@ -164,7 +165,8 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { if (!filteredTargetList.isEmpty()) { LOG.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker."); - sendUpdateMessageToTarget(assignedEvent, filteredTargetList); + sendUpdateMessageToTargets(assignedEvent.getDistributionSetId(), assignedEvent.getActions(), + filteredTargetList); } } @@ -196,21 +198,35 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { }); } - private void sendUpdateMessageToTarget(final TargetAssignDistributionSetEvent assignedEvent, + private void sendUpdateMessageToTargets(final Long dsId, final Map actionsPropsByTargetId, final List targets) { - distributionSetManagement.get(assignedEvent.getDistributionSetId()).ifPresent(ds -> { + distributionSetManagement.get(dsId).ifPresent(ds -> { final Map> softwareModules = getSoftwareModulesWithMetadata( ds); - - if (!targets.isEmpty() && isBatchAssignmentsEnabled()) { - sendUpdateMessageToTargets(assignedEvent.getActions(), targets, softwareModules); - } else { - targets.forEach(target -> sendUpdateMessageToTarget( - assignedEvent.getActions().get(target.getControllerId()), target, softwareModules)); - } + sendUpdateMessageToTargets(actionsPropsByTargetId, targets, softwareModules); }); } + protected void sendUpdateMessageToTarget(final ActionProperties actionsProps, final Target target, + final Map> softwareModules) { + final Map actionProp = new HashMap<>(); + actionProp.put(target.getControllerId(), actionsProps); + sendUpdateMessageToTargets(actionProp, Collections.singletonList(target), softwareModules); + } + + private void sendUpdateMessageToTargets(final Map actionsPropsByTargetId, + final List targets, final Map> softwareModules) { + + if (!targets.isEmpty() && isBatchAssignmentsEnabled()) { + sendBatchUpdateMessage(actionsPropsByTargetId, targets, softwareModules); + } else { + targets.forEach(target -> { + final ActionProperties actionProp = actionsPropsByTargetId.get(target.getControllerId()); + sendSingleUpdateMessage(actionProp, target, softwareModules); + }); + } + } + private void sendMultiActionRequestMessages(final String tenant, final List controllerIds) { final Map> softwareModuleMetadata = new HashMap<>(); @@ -329,7 +345,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { } final List eventTargets = partitionedParallelExecution(cancelEvent.getActions().keySet(), - targetManagement::getByControllerID); + targetManagement::getByControllerID); eventTargets.forEach(target -> { cancelEvent.getActionPropertiesForController(target.getControllerId()).map(ActionProperties::getId) @@ -366,7 +382,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { SecurityContextHolder.setContext(oldContext); } } - + /** * Method to send a message to a RabbitMQ Exchange after a Target was * deleted. @@ -389,7 +405,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { updateAttributesEvent.getTargetAddress()); } - protected void sendUpdateMessageToTarget(final ActionProperties action, final Target target, + private void sendSingleUpdateMessage(final ActionProperties action, final Target target, final Map> modules) { final String tenant = action.getTenant(); @@ -551,34 +567,36 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { PageRequest.of(0, RepositoryConstants.MAX_META_DATA_COUNT), module.getId()).getContent(); } - private void sendUpdateMessageToTargets(final Map actions, final List targets, - final Map> modules) { + private void sendBatchUpdateMessage(final Map actions, final List targets, + final Map> modules) { - List dmfTargets = targets.stream().filter(target -> IpUtil.isAmqpUri(target.getAddress())) + final List dmfTargets = targets.stream().filter(target -> IpUtil.isAmqpUri(target.getAddress())) .map(t -> convertToDmfTarget(t, actions.get(t.getControllerId()).getId())).collect(Collectors.toList()); final DmfBatchDownloadAndUpdateRequest batchRequest = new DmfBatchDownloadAndUpdateRequest(); batchRequest.setTimestamp(System.currentTimeMillis()); batchRequest.addTargets(dmfTargets); - //due to the fact that all targets in a batch use the same set of software modules we don't generate + // due to the fact that all targets in a batch use the same set of + // software modules we don't generate // target-specific urls - Target firstTarget = targets.get(0); + final Target firstTarget = targets.get(0); if (modules != null) { - modules.entrySet().forEach(entry -> - batchRequest.addSoftwareModule(convertToAmqpSoftwareModule(firstTarget, entry))); + modules.entrySet() + .forEach(entry -> batchRequest.addSoftwareModule(convertToAmqpSoftwareModule(firstTarget, entry))); } - // we use only the first action when constructing message as Tenant and action type are the same + // we use only the first action when constructing message as Tenant and + // action type are the same // since all actions have the same trigger final ActionProperties firstAction = actions.values().iterator().next(); final Message message = getMessageConverter().toMessage(batchRequest, - createMessagePropertiesBatch(firstAction.getTenant(), getBatchEventTopicForAction(firstAction))); - amqpSenderService.sendMessage(message, firstTarget.getAddress()); + createMessagePropertiesBatch(firstAction.getTenant(), getBatchEventTopicForAction(firstAction))); + amqpSenderService.sendMessage(message, firstTarget.getAddress()); } protected DmfTarget convertToDmfTarget(final Target target, final Long actionId) { - DmfTarget dmfTarget = new DmfTarget(); + final DmfTarget dmfTarget = new DmfTarget(); dmfTarget.setActionId(actionId); dmfTarget.setControllerId(target.getControllerId()); dmfTarget.setTargetSecurityToken(systemSecurityContext.runAsSystem(target::getSecurityToken));