Send DMF batch message on thing_created event (#1284)

* send dmf batch message on thing created event
* fix tests

Signed-off-by: Stefan Klotz <stefan.klotz@bosch.io>
This commit is contained in:
Stefan Klotz
2022-10-05 18:39:32 +02:00
committed by GitHub
parent c5afc6b17e
commit 06e8ef4c15

View File

@@ -24,7 +24,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import com.google.common.collect.Iterables;
import org.eclipse.hawkbit.api.ApiType;
import org.eclipse.hawkbit.api.ArtifactUrl;
import org.eclipse.hawkbit.api.ArtifactUrlHandler;
@@ -49,11 +48,11 @@ import org.eclipse.hawkbit.repository.SoftwareModuleManagement;
import org.eclipse.hawkbit.repository.SystemManagement;
import org.eclipse.hawkbit.repository.TargetManagement;
import org.eclipse.hawkbit.repository.TenantConfigurationManagement;
import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.MultiActionEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetAttributesRequestedEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.ActionProperties;
import org.eclipse.hawkbit.repository.model.Artifact;
@@ -77,6 +76,8 @@ import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.util.CollectionUtils;
import com.google.common.collect.Iterables;
/**
* {@link AmqpMessageDispatcherService} create all outgoing AMQP messages and
* delegate the messages to a {@link AmqpMessageSenderService}.
@@ -90,7 +91,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageDispatcherService.class);
private static final int MAX_PROCESSING_SIZE = 1000;
private final ArtifactUrlHandler artifactUrlHandler;
private final AmqpMessageSenderService amqpSenderService;
private final SystemSecurityContext systemSecurityContext;
@@ -127,12 +128,12 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
*
*/
protected AmqpMessageDispatcherService(final RabbitTemplate rabbitTemplate,
final AmqpMessageSenderService amqpSenderService, final ArtifactUrlHandler artifactUrlHandler,
final SystemSecurityContext systemSecurityContext, final SystemManagement systemManagement,
final TargetManagement targetManagement, final ServiceMatcher serviceMatcher,
final DistributionSetManagement distributionSetManagement,
final SoftwareModuleManagement softwareModuleManagement, final DeploymentManagement deploymentManagement,
final TenantConfigurationManagement tenantConfigurationManagement) {
final AmqpMessageSenderService amqpSenderService, final ArtifactUrlHandler artifactUrlHandler,
final SystemSecurityContext systemSecurityContext, final SystemManagement systemManagement,
final TargetManagement targetManagement, final ServiceMatcher serviceMatcher,
final DistributionSetManagement distributionSetManagement,
final SoftwareModuleManagement softwareModuleManagement, final DeploymentManagement deploymentManagement,
final TenantConfigurationManagement tenantConfigurationManagement) {
super(rabbitTemplate);
this.artifactUrlHandler = artifactUrlHandler;
this.amqpSenderService = amqpSenderService;
@@ -164,7 +165,8 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
if (!filteredTargetList.isEmpty()) {
LOG.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
sendUpdateMessageToTarget(assignedEvent, filteredTargetList);
sendUpdateMessageToTargets(assignedEvent.getDistributionSetId(), assignedEvent.getActions(),
filteredTargetList);
}
}
@@ -196,21 +198,35 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
});
}
private void sendUpdateMessageToTarget(final TargetAssignDistributionSetEvent assignedEvent,
private void sendUpdateMessageToTargets(final Long dsId, final Map<String, ActionProperties> actionsPropsByTargetId,
final List<Target> targets) {
distributionSetManagement.get(assignedEvent.getDistributionSetId()).ifPresent(ds -> {
distributionSetManagement.get(dsId).ifPresent(ds -> {
final Map<SoftwareModule, List<SoftwareModuleMetadata>> softwareModules = getSoftwareModulesWithMetadata(
ds);
if (!targets.isEmpty() && isBatchAssignmentsEnabled()) {
sendUpdateMessageToTargets(assignedEvent.getActions(), targets, softwareModules);
} else {
targets.forEach(target -> sendUpdateMessageToTarget(
assignedEvent.getActions().get(target.getControllerId()), target, softwareModules));
}
sendUpdateMessageToTargets(actionsPropsByTargetId, targets, softwareModules);
});
}
protected void sendUpdateMessageToTarget(final ActionProperties actionsProps, final Target target,
final Map<SoftwareModule, List<SoftwareModuleMetadata>> softwareModules) {
final Map<String, ActionProperties> actionProp = new HashMap<>();
actionProp.put(target.getControllerId(), actionsProps);
sendUpdateMessageToTargets(actionProp, Collections.singletonList(target), softwareModules);
}
private void sendUpdateMessageToTargets(final Map<String, ActionProperties> actionsPropsByTargetId,
final List<Target> targets, final Map<SoftwareModule, List<SoftwareModuleMetadata>> softwareModules) {
if (!targets.isEmpty() && isBatchAssignmentsEnabled()) {
sendBatchUpdateMessage(actionsPropsByTargetId, targets, softwareModules);
} else {
targets.forEach(target -> {
final ActionProperties actionProp = actionsPropsByTargetId.get(target.getControllerId());
sendSingleUpdateMessage(actionProp, target, softwareModules);
});
}
}
private void sendMultiActionRequestMessages(final String tenant, final List<String> controllerIds) {
final Map<SoftwareModule, List<SoftwareModuleMetadata>> softwareModuleMetadata = new HashMap<>();
@@ -329,7 +345,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
}
final List<Target> eventTargets = partitionedParallelExecution(cancelEvent.getActions().keySet(),
targetManagement::getByControllerID);
targetManagement::getByControllerID);
eventTargets.forEach(target -> {
cancelEvent.getActionPropertiesForController(target.getControllerId()).map(ActionProperties::getId)
@@ -366,7 +382,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
SecurityContextHolder.setContext(oldContext);
}
}
/**
* Method to send a message to a RabbitMQ Exchange after a Target was
* deleted.
@@ -389,7 +405,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
updateAttributesEvent.getTargetAddress());
}
protected void sendUpdateMessageToTarget(final ActionProperties action, final Target target,
private void sendSingleUpdateMessage(final ActionProperties action, final Target target,
final Map<SoftwareModule, List<SoftwareModuleMetadata>> modules) {
final String tenant = action.getTenant();
@@ -551,34 +567,36 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
PageRequest.of(0, RepositoryConstants.MAX_META_DATA_COUNT), module.getId()).getContent();
}
private void sendUpdateMessageToTargets(final Map<String, ActionProperties> actions, final List<Target> targets,
final Map<SoftwareModule, List<SoftwareModuleMetadata>> modules) {
private void sendBatchUpdateMessage(final Map<String, ActionProperties> actions, final List<Target> targets,
final Map<SoftwareModule, List<SoftwareModuleMetadata>> modules) {
List<DmfTarget> dmfTargets = targets.stream().filter(target -> IpUtil.isAmqpUri(target.getAddress()))
final List<DmfTarget> dmfTargets = targets.stream().filter(target -> IpUtil.isAmqpUri(target.getAddress()))
.map(t -> convertToDmfTarget(t, actions.get(t.getControllerId()).getId())).collect(Collectors.toList());
final DmfBatchDownloadAndUpdateRequest batchRequest = new DmfBatchDownloadAndUpdateRequest();
batchRequest.setTimestamp(System.currentTimeMillis());
batchRequest.addTargets(dmfTargets);
//due to the fact that all targets in a batch use the same set of software modules we don't generate
// due to the fact that all targets in a batch use the same set of
// software modules we don't generate
// target-specific urls
Target firstTarget = targets.get(0);
final Target firstTarget = targets.get(0);
if (modules != null) {
modules.entrySet().forEach(entry ->
batchRequest.addSoftwareModule(convertToAmqpSoftwareModule(firstTarget, entry)));
modules.entrySet()
.forEach(entry -> batchRequest.addSoftwareModule(convertToAmqpSoftwareModule(firstTarget, entry)));
}
// we use only the first action when constructing message as Tenant and action type are the same
// we use only the first action when constructing message as Tenant and
// action type are the same
// since all actions have the same trigger
final ActionProperties firstAction = actions.values().iterator().next();
final Message message = getMessageConverter().toMessage(batchRequest,
createMessagePropertiesBatch(firstAction.getTenant(), getBatchEventTopicForAction(firstAction)));
amqpSenderService.sendMessage(message, firstTarget.getAddress());
createMessagePropertiesBatch(firstAction.getTenant(), getBatchEventTopicForAction(firstAction)));
amqpSenderService.sendMessage(message, firstTarget.getAddress());
}
protected DmfTarget convertToDmfTarget(final Target target, final Long actionId) {
DmfTarget dmfTarget = new DmfTarget();
final DmfTarget dmfTarget = new DmfTarget();
dmfTarget.setActionId(actionId);
dmfTarget.setControllerId(target.getControllerId());
dmfTarget.setTargetSecurityToken(systemSecurityContext.runAsSystem(target::getSecurityToken));