diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java index 85cb693b1..4339d3926 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java @@ -61,6 +61,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -139,14 +140,14 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { if (!shouldBeProcessed(assignedEvent)) { return; } - LOG.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker."); - distributionSetManagement.get(assignedEvent.getDistributionSetId()).ifPresent(ds -> { - final Map> softwareModules = getSoftwareModulesWithMetadata( - ds); - targetManagement.getByControllerID(assignedEvent.getActions().keySet()).forEach( - target -> sendUpdateMessageToTarget(assignedEvent.getActions().get(target.getControllerId()), - target, softwareModules)); - }); + + final List filteredTargetList = getTargetsWithoutPendingCancellations( + assignedEvent.getActions().keySet()); + + if (!filteredTargetList.isEmpty()) { + LOG.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker."); + sendUpdateMessageToTarget(assignedEvent, filteredTargetList); + } } /** @@ -164,6 +165,27 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { sendMultiActionRequestMessages(multiActionEvent.getTenant(), multiActionEvent.getControllerIds()); } + private List getTargetsWithoutPendingCancellations(final Set controllerIds) { + return targetManagement.getByControllerID(controllerIds).stream().filter(target -> { + if (hasPendingCancellations(target.getControllerId())) { + LOG.debug("Target {} has pending cancellations. Will not send update message to it.", + target.getControllerId()); + return false; + } + return true; + }).collect(Collectors.toList()); + } + + private void sendUpdateMessageToTarget(final TargetAssignDistributionSetEvent assignedEvent, + final List targets) { + distributionSetManagement.get(assignedEvent.getDistributionSetId()).ifPresent(ds -> { + final Map> softwareModules = getSoftwareModulesWithMetadata( + ds); + targets.forEach(target -> sendUpdateMessageToTarget( + assignedEvent.getActions().get(target.getControllerId()), target, softwareModules)); + }); + } + private void sendMultiActionRequestMessages(final String tenant, final List controllerIds) { final Map> softwareModuleMetadata = new HashMap<>(); @@ -361,6 +383,10 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { return serviceMatcher == null || serviceMatcher.isFromSelf(event); } + private boolean hasPendingCancellations(final String controllerId) { + return deploymentManagement.hasPendingCancellations(controllerId); + } + protected void sendCancelMessageToTarget(final String tenant, final String controllerId, final Long actionId, final URI address) { if (!IpUtil.isAmqpUri(address)) { diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AbstractAmqpServiceIntegrationTest.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AbstractAmqpServiceIntegrationTest.java index 4aebee725..bb3dc748f 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AbstractAmqpServiceIntegrationTest.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AbstractAmqpServiceIntegrationTest.java @@ -50,7 +50,6 @@ import org.junit.jupiter.api.BeforeEach; import org.mockito.Mockito; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; -import org.springframework.amqp.rabbit.junit.BrokerRunningSupport; import org.springframework.amqp.rabbit.test.RabbitListenerTestHarness; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration; @@ -70,6 +69,7 @@ public abstract class AbstractAmqpServiceIntegrationTest extends AbstractAmqpInt protected static final String TENANT_EXIST = "DEFAULT"; protected static final String CREATED_BY = "CONTROLLER_PLUG_AND_PLAY"; + protected static final String CORRELATION_ID = UUID.randomUUID().toString(); protected ReplyToListener replyToListener; private DeadletterListener deadletterListener; @@ -199,7 +199,6 @@ public abstract class AbstractAmqpServiceIntegrationTest extends AbstractAmqpInt protected void assertDownloadAndInstallMessage(final Set softwareModules, final String controllerId) { assertAssignmentMessage(softwareModules, controllerId, EventTopic.DOWNLOAD_AND_INSTALL); - } protected void assertDownloadMessage(final Set dsModules, final String controllerId) { @@ -319,15 +318,23 @@ public abstract class AbstractAmqpServiceIntegrationTest extends AbstractAmqpInt return createMessage(null, messageProperties); } - protected void createAndSendActionStatusUpdateMessage(final String target, final String tenant, final long actionId, + protected void createAndSendActionStatusUpdateMessage(final String target, final long actionId, final DmfActionStatus status) { + final DmfActionUpdateStatus dmfActionUpdateStatus = new DmfActionUpdateStatus(actionId, status); + + final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, dmfActionUpdateStatus); + eventMessage.getMessageProperties().getHeaders().put(MessageHeaderKey.THING_ID, target); + + getDmfClient().send(eventMessage); + } + + protected Message createUpdateActionEventMessage(final String tenant, final Object payload) { final MessageProperties messageProperties = createMessagePropertiesWithTenant(tenant); - messageProperties.getHeaders().put(MessageHeaderKey.THING_ID, target); messageProperties.getHeaders().put(MessageHeaderKey.TYPE, MessageType.EVENT.toString()); messageProperties.getHeaders().put(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ACTION_STATUS.toString()); + messageProperties.setCorrelationId(CORRELATION_ID); - final DmfActionUpdateStatus dmfActionUpdateStatus = new DmfActionUpdateStatus(actionId, status); - getDmfClient().send(createMessage(dmfActionUpdateStatus, messageProperties)); + return createMessage(payload, messageProperties); } protected MessageProperties createMessagePropertiesWithTenant(final String tenant) { diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageDispatcherServiceIntegrationTest.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageDispatcherServiceIntegrationTest.java index 8f3fa2421..568548c4c 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageDispatcherServiceIntegrationTest.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageDispatcherServiceIntegrationTest.java @@ -143,7 +143,7 @@ public class AmqpMessageDispatcherServiceIntegrationTest extends AbstractAmqpSer @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 1), @Expect(type = TargetAssignDistributionSetEvent.class, count = 2), @Expect(type = CancelTargetAssignmentEvent.class, count = 1), - @Expect(type = ActionCreatedEvent.class, count = 2), @Expect(type = ActionUpdatedEvent.class, count = 1), + @Expect(type = ActionCreatedEvent.class, count = 2), @Expect(type = ActionUpdatedEvent.class, count = 2), @Expect(type = SoftwareModuleCreatedEvent.class, count = 6), @Expect(type = SoftwareModuleUpdatedEvent.class, count = 12), @Expect(type = DistributionSetCreatedEvent.class, count = 2), @@ -153,14 +153,27 @@ public class AmqpMessageDispatcherServiceIntegrationTest extends AbstractAmqpSer final DistributionSetAssignmentResult assignmentResult = registerTargetAndAssignDistributionSet(controllerId); final DistributionSet distributionSet2 = testdataFactory.createDistributionSet(); + testdataFactory.addSoftwareModuleMetadata(distributionSet2); + + // first assignment will be canceled -> Open cancellations -> No message through the DMF assignDistributionSet(distributionSet2.getId(), controllerId); - assertDownloadAndInstallMessage(distributionSet2.getModules(), controllerId); + + // should not get the message of the second assignment + assertDownloadAndInstallMessage(assignmentResult.getDistributionSet().getModules(), controllerId); + assertCancelActionMessage(getFirstAssignedActionId(assignmentResult), controllerId); createAndSendThingCreated(controllerId, TENANT_EXIST); waitUntilTargetHasStatus(controllerId, TargetUpdateStatus.PENDING); assertCancelActionMessage(getFirstAssignedActionId(assignmentResult), controllerId); + + // confirm the cancel of the first action should lead to expose the latest action + createAndSendActionStatusUpdateMessage(controllerId, getFirstAssignedActionId(assignmentResult), + DmfActionStatus.CANCELED); + + // verify latest action is exposed + assertDownloadAndInstallMessage(distributionSet2.getModules(), controllerId); } @Test @@ -337,7 +350,7 @@ public class AmqpMessageDispatcherServiceIntegrationTest extends AbstractAmqpSer private void updateActionViaDmfClient(final String controllerId, final long actionId, final DmfActionStatus status) { - createAndSendActionStatusUpdateMessage(controllerId, TENANT_EXIST, actionId, status); + createAndSendActionStatusUpdateMessage(controllerId, actionId, status); } private Long assignNewDsToTarget(final String controllerId) { diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java index 709284dc8..bfdb18f8f 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java @@ -30,7 +30,6 @@ import org.eclipse.hawkbit.amqp.AmqpMessageHandlerService; import org.eclipse.hawkbit.amqp.AmqpProperties; import org.eclipse.hawkbit.dmf.amqp.api.EventTopic; import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey; -import org.eclipse.hawkbit.dmf.amqp.api.MessageType; import org.eclipse.hawkbit.dmf.json.model.DmfActionStatus; import org.eclipse.hawkbit.dmf.json.model.DmfActionUpdateStatus; import org.eclipse.hawkbit.dmf.json.model.DmfAttributeUpdate; @@ -63,7 +62,6 @@ import org.eclipse.hawkbit.repository.test.util.WithSpringAuthorityRule; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.springframework.amqp.core.Message; -import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.beans.factory.annotation.Autowired; @@ -78,7 +76,6 @@ import io.qameta.allure.Story; @Feature("Component Tests - Device Management Federation API") @Story("Amqp Message Handler Service") public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServiceIntegrationTest { - private static final String CORRELATION_ID = UUID.randomUUID().toString(); private static final String TARGET_PREFIX = "Dmf_hand_"; @Autowired @@ -299,7 +296,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Description("Tests null topic message header. This message should forwarded to the deadletter queue") @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) }) public void nullTopicHeader() { - final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, ""); + final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, ""); eventMessage.getMessageProperties().getHeaders().put(MessageHeaderKey.TOPIC, null); getDmfClient().send(eventMessage); @@ -310,7 +307,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Description("Tests null topic message header. This message should forwarded to the deadletter queue") @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) }) public void emptyTopicHeader() { - final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, ""); + final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, ""); eventMessage.getMessageProperties().getHeaders().put(MessageHeaderKey.TOPIC, ""); getDmfClient().send(eventMessage); @@ -321,7 +318,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Description("Tests null topic message header. This message should forwarded to the deadletter queue") @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) }) public void invalidTopicHeader() { - final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, ""); + final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, ""); eventMessage.getMessageProperties().getHeaders().put(MessageHeaderKey.TOPIC, "NotExist"); getDmfClient().send(eventMessage); @@ -332,7 +329,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Description("Tests missing topic message header. This message should forwarded to the deadletter queue") @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) }) public void missingTopicHeader() { - final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, ""); + final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, ""); eventMessage.getMessageProperties().getHeaders().remove(MessageHeaderKey.TOPIC); getDmfClient().send(eventMessage); @@ -343,7 +340,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Description("Tests invalid null message content. This message should forwarded to the deadletter queue") @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) }) public void updateActionStatusWithNullContent() { - final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, null); + final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, null); getDmfClient().send(eventMessage); verifyOneDeadLetterMessage(); } @@ -352,7 +349,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Description("Tests invalid empty message content. This message should forwarded to the deadletter queue") @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) }) public void updateActionStatusWithEmptyContent() { - final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, ""); + final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, ""); getDmfClient().send(eventMessage); verifyOneDeadLetterMessage(); } @@ -361,8 +358,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Description("Tests invalid json message content. This message should forwarded to the deadletter queue") @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) }) public void updateActionStatusWithInvalidJsonContent() { - final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, - "Invalid Content"); + final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, "Invalid Content"); getDmfClient().send(eventMessage); verifyOneDeadLetterMessage(); } @@ -372,8 +368,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) }) public void updateActionStatusWithInvalidActionId() { final DmfActionUpdateStatus actionUpdateStatus = new DmfActionUpdateStatus(1L, DmfActionStatus.RUNNING); - final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, - actionUpdateStatus); + final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, actionUpdateStatus); getDmfClient().send(eventMessage); verifyOneDeadLetterMessage(); } @@ -423,7 +418,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Test @Description("Register a target and send a update action status (download). Verify if the updated action status is correct.") - @ExpectEvents({@Expect(type = TargetCreatedEvent.class, count = 1), + @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 1), @Expect(type = TargetAssignDistributionSetEvent.class, count = 1), @Expect(type = ActionCreatedEvent.class, count = 1), @Expect(type = SoftwareModuleCreatedEvent.class, count = 3), @@ -437,7 +432,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Test @Description("Register a target and send a update action status (error). Verify if the updated action status is correct.") - @ExpectEvents({@Expect(type = TargetCreatedEvent.class, count = 1), + @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 1), @Expect(type = TargetAssignDistributionSetEvent.class, count = 1), @Expect(type = ActionUpdatedEvent.class, count = 1), @Expect(type = ActionCreatedEvent.class, count = 1), @Expect(type = SoftwareModuleCreatedEvent.class, count = 3), @@ -520,7 +515,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Test @Description("Verify receiving a download message if a deployment is done with window configured but before maintenance window start time.") - @ExpectEvents({@Expect(type = TargetCreatedEvent.class, count = 1), + @ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 1), @Expect(type = TargetAssignDistributionSetEvent.class, count = 1), @Expect(type = ActionCreatedEvent.class, count = 1), @Expect(type = SoftwareModuleCreatedEvent.class, count = 3), @@ -616,7 +611,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic final Long actionId = registerTargetAndCancelActionId(controllerId); final Long actionNotExist = actionId + 1; - sendActionUpdateStatus(new DmfActionUpdateStatus(actionNotExist, DmfActionStatus.CANCELED)); + createAndSendActionStatusUpdateMessage(controllerId, actionNotExist, DmfActionStatus.CANCELED); verifyOneDeadLetterMessage(); } @@ -650,7 +645,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic final Long actionId = registerTargetAndCancelActionId(controllerId); - sendActionUpdateStatus(new DmfActionUpdateStatus(actionId, DmfActionStatus.CANCEL_REJECTED)); + createAndSendActionStatusUpdateMessage(controllerId, actionId, DmfActionStatus.CANCEL_REJECTED); assertAction(actionId, 1, Status.RUNNING, Status.CANCELING, Status.CANCEL_REJECTED); } @@ -847,7 +842,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic Long actionId = Long.parseLong(getJsonFieldFromBody(message.getBody(), "actionId")); // Send DOWNLOADED message - sendActionUpdateStatus(new DmfActionUpdateStatus(actionId, DmfActionStatus.DOWNLOADED)); + createAndSendActionStatusUpdateMessage(controllerId, actionId, DmfActionStatus.DOWNLOADED); assertAction(actionId, 1, Status.RUNNING, Status.DOWNLOADED); Mockito.verifyZeroInteractions(getDeadletterListener()); @@ -879,14 +874,14 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic Long actionId = Long.parseLong(getJsonFieldFromBody(message.getBody(), "actionId")); // Send DOWNLOADED message, should result in the action being closed - sendActionUpdateStatus(new DmfActionUpdateStatus(actionId, DmfActionStatus.DOWNLOADED)); + createAndSendActionStatusUpdateMessage(controllerId, actionId, DmfActionStatus.DOWNLOADED); assertAction(actionId, 1, Status.RUNNING, Status.DOWNLOADED); Mockito.verifyZeroInteractions(getDeadletterListener()); verifyAssignedDsAndInstalledDs(controllerId, distributionSet.getId(), null); // Send FINISHED message - sendActionUpdateStatus(new DmfActionUpdateStatus(actionId, DmfActionStatus.FINISHED)); + createAndSendActionStatusUpdateMessage(controllerId, actionId, DmfActionStatus.FINISHED); assertAction(actionId, 2, Status.RUNNING, Status.DOWNLOADED, Status.FINISHED); Mockito.verifyZeroInteractions(getDeadletterListener()); @@ -949,15 +944,10 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic private Long registerTargetAndSendActionStatus(final DmfActionStatus sendActionStatus, final String controllerId) { final DistributionSetAssignmentResult assignmentResult = registerTargetAndAssignDistributionSet(controllerId); final Long actionId = getFirstAssignedActionId(assignmentResult); - sendActionUpdateStatus(new DmfActionUpdateStatus(actionId, sendActionStatus)); + createAndSendActionStatusUpdateMessage(controllerId, actionId, sendActionStatus); return actionId; } - private void sendActionUpdateStatus(final DmfActionUpdateStatus actionStatus) { - final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, actionStatus); - getDmfClient().send(eventMessage); - } - private void registerTargetAndSendAndAssertUpdateActionStatus(final DmfActionStatus sendActionStatus, final Status expectedActionStatus, final String controllerId) { final Long actionId = registerTargetAndSendActionStatus(sendActionStatus, controllerId); @@ -995,15 +985,6 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic }); } - private Message createEventMessage(final String tenant, final EventTopic eventTopic, final Object payload) { - final MessageProperties messageProperties = createMessagePropertiesWithTenant(tenant); - messageProperties.getHeaders().put(MessageHeaderKey.TYPE, MessageType.EVENT.toString()); - messageProperties.getHeaders().put(MessageHeaderKey.TOPIC, eventTopic.toString()); - messageProperties.setCorrelationId(CORRELATION_ID); - - return createMessage(payload, messageProperties); - } - private void sendUpdateAttributeMessage(final String target, final String tenant, final DmfAttributeUpdate attributeUpdate) { final Message updateMessage = createUpdateAttributesMessage(target, tenant, attributeUpdate); diff --git a/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/DeploymentManagement.java b/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/DeploymentManagement.java index 88b6a127b..f863a613c 100644 --- a/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/DeploymentManagement.java +++ b/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/DeploymentManagement.java @@ -505,4 +505,15 @@ public interface DeploymentManagement { @PreAuthorize(SpringEvalExpressions.IS_SYSTEM_CODE) int deleteActionsByStatusAndLastModifiedBefore(@NotNull Set status, long lastModified); + /** + * Checks if there is an action for the device with the given controller ID that + * is in the {@link Action.Status#CANCELING} state. + * + * @param controllerId + * of target + * @return if actions in CANCELING state are present + */ + @PreAuthorize(SpringEvalExpressions.HAS_AUTH_READ_TARGET) + boolean hasPendingCancellations(@NotEmpty String controllerId); + } diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDeploymentManagement.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDeploymentManagement.java index 6fc3b8bef..82d5b4fc1 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDeploymentManagement.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaDeploymentManagement.java @@ -830,6 +830,12 @@ public class JpaDeploymentManagement extends JpaActionManagement implements Depl return deleteQuery.executeUpdate(); } + @Override + public boolean hasPendingCancellations(String controllerId) { + return actionRepository.existsByTargetControllerIdAndStatusAndActiveIsTrue(controllerId, + Action.Status.CANCELING); + } + private static String getQueryForDeleteActionsByStatusAndLastModifiedBeforeString(final Database database) { return QUERY_DELETE_ACTIONS_BY_STATE_AND_LAST_MODIFIED.getOrDefault(database, QUERY_DELETE_ACTIONS_BY_STATE_AND_LAST_MODIFIED_DEFAULT); diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/OnlineDsAssignmentStrategy.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/OnlineDsAssignmentStrategy.java index 8a3ab91e3..7c67ab117 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/OnlineDsAssignmentStrategy.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/OnlineDsAssignmentStrategy.java @@ -167,7 +167,7 @@ public class OnlineDsAssignmentStrategy extends AbstractDsAssignmentStrategy { private DistributionSetAssignmentResult sendDistributionSetAssignedEvent( final DistributionSetAssignmentResult assignmentResult) { final List filteredActions = filterCancellations(assignmentResult.getAssignedEntity()) - .filter(action -> !hasPendingCancellations(action.getTarget())).collect(Collectors.toList()); + .collect(Collectors.toList()); final DistributionSet set = assignmentResult.getDistributionSet(); sendTargetAssignDistributionSetEvent(set.getTenant(), set.getId(), filteredActions); return assignmentResult; @@ -184,11 +184,6 @@ public class OnlineDsAssignmentStrategy extends AbstractDsAssignmentStrategy { eventPublisherHolder.getApplicationId(), actions.get(0).isMaintenanceWindowAvailable()))); } - private boolean hasPendingCancellations(final Target target) { - return actionRepository.existsByTargetControllerIdAndStatusAndActiveIsTrue(target.getControllerId(), - Status.CANCELING); - } - /** * Helper to fire a {@link MultiActionCancelEvent}. This method may only be * called if the Multi-Assignments feature is enabled. diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DeploymentManagementTest.java b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DeploymentManagementTest.java index 78001338b..23f8c2971 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DeploymentManagementTest.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/jpa/DeploymentManagementTest.java @@ -945,8 +945,8 @@ public class DeploymentManagementTest extends AbstractJpaIntegrationTest { @Expect(type = CancelTargetAssignmentEvent.class, count = 4 * 2), @Expect(type = DistributionSetCreatedEvent.class, count = 3), @Expect(type = SoftwareModuleCreatedEvent.class, count = 9), - @Expect(type = TargetAssignDistributionSetEvent.class, count = 2) }) - public void mutipleDeployments() throws InterruptedException { + @Expect(type = TargetAssignDistributionSetEvent.class, count = 3) }) + public void multipleDeployments() throws InterruptedException { final String undeployedTargetPrefix = "undep-T"; final int noOfUndeployedTargets = 5;