Adapt cancel flow (#1274)

* Adapt assignment events to communicate mass cancel operations within one event.

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* Fix edge cases identified by test failures. Adapt tests and reduce amount of published cancel events.

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* Fix license header

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* Refactor visibility of methods in assignment strategy classes. Avoid having empty action status messages.

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* Fix api docs

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>
Co-authored-by: Bogdan Bondar <Bogdan.Bondar@bosch.io>
This commit is contained in:
Michael Herdt
2022-09-21 17:04:08 +02:00
committed by GitHub
parent 5e963f8308
commit ea5a3b3d30
20 changed files with 329 additions and 180 deletions

View File

@@ -12,16 +12,19 @@ import static org.eclipse.hawkbit.repository.RepositoryConstants.MAX_ACTION_COUN
import static org.eclipse.hawkbit.tenancy.configuration.TenantConfigurationProperties.TenantConfigurationKey.BATCH_ASSIGNMENTS_ENABLED;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import com.google.common.collect.Iterables;
import org.eclipse.hawkbit.api.ApiType;
import org.eclipse.hawkbit.api.ArtifactUrl;
import org.eclipse.hawkbit.api.ArtifactUrlHandler;
@@ -50,7 +53,7 @@ import org.eclipse.hawkbit.repository.event.remote.MultiActionEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetAttributesRequestedEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.ActionProperties;
import org.eclipse.hawkbit.repository.model.Artifact;
@@ -70,6 +73,8 @@ import org.springframework.cloud.bus.ServiceMatcher;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.event.EventListener;
import org.springframework.data.domain.PageRequest;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.util.CollectionUtils;
/**
@@ -84,6 +89,8 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageDispatcherService.class);
private static final int MAX_PROCESSING_SIZE = 1000;
private final ArtifactUrlHandler artifactUrlHandler;
private final AmqpMessageSenderService amqpSenderService;
private final SystemSecurityContext systemSecurityContext;
@@ -177,14 +184,16 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
}
private List<Target> getTargetsWithoutPendingCancellations(final Set<String> controllerIds) {
return targetManagement.getByControllerID(controllerIds).stream().filter(target -> {
if (hasPendingCancellations(target.getControllerId())) {
LOG.debug("Target {} has pending cancellations. Will not send update message to it.",
target.getControllerId());
return false;
}
return true;
}).collect(Collectors.toList());
return partitionedParallelExecution(controllerIds, partition -> {
return targetManagement.getByControllerID(partition).stream().filter(target -> {
if (hasPendingCancellations(target.getControllerId())) {
LOG.debug("Target {} has pending cancellations. Will not send update message to it.",
target.getControllerId());
return false;
}
return true;
}).collect(Collectors.toList());
});
}
private void sendUpdateMessageToTarget(final TargetAssignDistributionSetEvent assignedEvent,
@@ -319,18 +328,45 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
return;
}
final Optional<Target> eventEntity = cancelEvent.getEntity();
if (eventEntity.isPresent()) {
final Target target = eventEntity.get();
sendCancelMessageToTarget(cancelEvent.getTenant(), target.getControllerId(), cancelEvent.getActionId(),
target.getAddress());
} else {
LOG.warn(
"Cannot process the received CancelTargetAssignmentEvent with action ID {} because the referenced target with ID {} does no longer exist.",
cancelEvent.getActionId(), cancelEvent.getEntityId());
}
final List<Target> eventTargets = partitionedParallelExecution(cancelEvent.getActions().keySet(),
targetManagement::getByControllerID);
eventTargets.forEach(target -> {
cancelEvent.getActionPropertiesForController(target.getControllerId()).map(ActionProperties::getId)
.ifPresent(actionId -> {
sendCancelMessageToTarget(cancelEvent.getTenant(), target.getControllerId(), actionId,
target.getAddress());
});
});
}
private static <T, R> List<R> partitionedParallelExecution(final Collection<T> controllerIds,
final Function<Collection<T>, List<R>> loadingFunction) {
// Ensure not exceeding the max value of MAX_PROCESSING_SIZE
if (controllerIds.size() > MAX_PROCESSING_SIZE) {
// Split the provided collection
final Iterable<List<T>> partitions = Iterables.partition(controllerIds, MAX_PROCESSING_SIZE);
// Preserve the security context because it gets lost when executing
// loading calls in new threads
final SecurityContext context = SecurityContextHolder.getContext();
// Handling remote request in parallel streams
return StreamSupport.stream(partitions.spliterator(), true) //
.flatMap(partition -> withSecurityContext(() -> loadingFunction.apply(partition), context).stream())
.collect(Collectors.toList());
}
return loadingFunction.apply(controllerIds);
}
private static <T> T withSecurityContext(final Supplier<T> callable, final SecurityContext securityContext) {
final SecurityContext oldContext = SecurityContextHolder.getContext();
try {
SecurityContextHolder.setContext(securityContext);
return callable.get();
} finally {
SecurityContextHolder.setContext(oldContext);
}
}
/**
* Method to send a message to a RabbitMQ Exchange after a Target was
* deleted.