Publish target assign event (#1136)

* Throw the TargetAssignDistributionSetEvent even if there are actions in CANCELING state present. Filter the actions on the receiver side. In this case at the DMF.

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* Fix deploymentManagementTest

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* Add debug logs and fix tests to verify correct DMF message behaviour

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* Extend test case for the DMF in case the cancel is confirmed by the device

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* Remove unsued import

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* Fix review findings by filtering the list of targets first before querying the database for distribution set and software module.

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* flip list verification logic

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>

* Refactor amqp tests

Signed-off-by: Michael Herdt <Michael.Herdt@bosch.io>
This commit is contained in:
Michael Herdt
2021-07-02 19:50:04 +02:00
committed by GitHub
parent a71ab68330
commit 2bdab157cf
8 changed files with 100 additions and 61 deletions

View File

@@ -61,6 +61,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -139,14 +140,14 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
if (!shouldBeProcessed(assignedEvent)) {
return;
}
LOG.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
distributionSetManagement.get(assignedEvent.getDistributionSetId()).ifPresent(ds -> {
final Map<SoftwareModule, List<SoftwareModuleMetadata>> softwareModules = getSoftwareModulesWithMetadata(
ds);
targetManagement.getByControllerID(assignedEvent.getActions().keySet()).forEach(
target -> sendUpdateMessageToTarget(assignedEvent.getActions().get(target.getControllerId()),
target, softwareModules));
});
final List<Target> filteredTargetList = getTargetsWithoutPendingCancellations(
assignedEvent.getActions().keySet());
if (!filteredTargetList.isEmpty()) {
LOG.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
sendUpdateMessageToTarget(assignedEvent, filteredTargetList);
}
}
/**
@@ -164,6 +165,27 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
sendMultiActionRequestMessages(multiActionEvent.getTenant(), multiActionEvent.getControllerIds());
}
private List<Target> getTargetsWithoutPendingCancellations(final Set<String> controllerIds) {
return targetManagement.getByControllerID(controllerIds).stream().filter(target -> {
if (hasPendingCancellations(target.getControllerId())) {
LOG.debug("Target {} has pending cancellations. Will not send update message to it.",
target.getControllerId());
return false;
}
return true;
}).collect(Collectors.toList());
}
private void sendUpdateMessageToTarget(final TargetAssignDistributionSetEvent assignedEvent,
final List<Target> targets) {
distributionSetManagement.get(assignedEvent.getDistributionSetId()).ifPresent(ds -> {
final Map<SoftwareModule, List<SoftwareModuleMetadata>> softwareModules = getSoftwareModulesWithMetadata(
ds);
targets.forEach(target -> sendUpdateMessageToTarget(
assignedEvent.getActions().get(target.getControllerId()), target, softwareModules));
});
}
private void sendMultiActionRequestMessages(final String tenant, final List<String> controllerIds) {
final Map<SoftwareModule, List<SoftwareModuleMetadata>> softwareModuleMetadata = new HashMap<>();
@@ -361,6 +383,10 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
return serviceMatcher == null || serviceMatcher.isFromSelf(event);
}
private boolean hasPendingCancellations(final String controllerId) {
return deploymentManagement.hasPendingCancellations(controllerId);
}
protected void sendCancelMessageToTarget(final String tenant, final String controllerId, final Long actionId,
final URI address) {
if (!IpUtil.isAmqpUri(address)) {

View File

@@ -50,7 +50,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.mockito.Mockito;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.junit.BrokerRunningSupport;
import org.springframework.amqp.rabbit.test.RabbitListenerTestHarness;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration;
@@ -70,6 +69,7 @@ public abstract class AbstractAmqpServiceIntegrationTest extends AbstractAmqpInt
protected static final String TENANT_EXIST = "DEFAULT";
protected static final String CREATED_BY = "CONTROLLER_PLUG_AND_PLAY";
protected static final String CORRELATION_ID = UUID.randomUUID().toString();
protected ReplyToListener replyToListener;
private DeadletterListener deadletterListener;
@@ -199,7 +199,6 @@ public abstract class AbstractAmqpServiceIntegrationTest extends AbstractAmqpInt
protected void assertDownloadAndInstallMessage(final Set<SoftwareModule> softwareModules,
final String controllerId) {
assertAssignmentMessage(softwareModules, controllerId, EventTopic.DOWNLOAD_AND_INSTALL);
}
protected void assertDownloadMessage(final Set<SoftwareModule> dsModules, final String controllerId) {
@@ -319,15 +318,23 @@ public abstract class AbstractAmqpServiceIntegrationTest extends AbstractAmqpInt
return createMessage(null, messageProperties);
}
protected void createAndSendActionStatusUpdateMessage(final String target, final String tenant, final long actionId,
protected void createAndSendActionStatusUpdateMessage(final String target, final long actionId,
final DmfActionStatus status) {
final DmfActionUpdateStatus dmfActionUpdateStatus = new DmfActionUpdateStatus(actionId, status);
final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, dmfActionUpdateStatus);
eventMessage.getMessageProperties().getHeaders().put(MessageHeaderKey.THING_ID, target);
getDmfClient().send(eventMessage);
}
protected Message createUpdateActionEventMessage(final String tenant, final Object payload) {
final MessageProperties messageProperties = createMessagePropertiesWithTenant(tenant);
messageProperties.getHeaders().put(MessageHeaderKey.THING_ID, target);
messageProperties.getHeaders().put(MessageHeaderKey.TYPE, MessageType.EVENT.toString());
messageProperties.getHeaders().put(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ACTION_STATUS.toString());
messageProperties.setCorrelationId(CORRELATION_ID);
final DmfActionUpdateStatus dmfActionUpdateStatus = new DmfActionUpdateStatus(actionId, status);
getDmfClient().send(createMessage(dmfActionUpdateStatus, messageProperties));
return createMessage(payload, messageProperties);
}
protected MessageProperties createMessagePropertiesWithTenant(final String tenant) {

View File

@@ -143,7 +143,7 @@ public class AmqpMessageDispatcherServiceIntegrationTest extends AbstractAmqpSer
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 1),
@Expect(type = TargetAssignDistributionSetEvent.class, count = 2),
@Expect(type = CancelTargetAssignmentEvent.class, count = 1),
@Expect(type = ActionCreatedEvent.class, count = 2), @Expect(type = ActionUpdatedEvent.class, count = 1),
@Expect(type = ActionCreatedEvent.class, count = 2), @Expect(type = ActionUpdatedEvent.class, count = 2),
@Expect(type = SoftwareModuleCreatedEvent.class, count = 6),
@Expect(type = SoftwareModuleUpdatedEvent.class, count = 12),
@Expect(type = DistributionSetCreatedEvent.class, count = 2),
@@ -153,14 +153,27 @@ public class AmqpMessageDispatcherServiceIntegrationTest extends AbstractAmqpSer
final DistributionSetAssignmentResult assignmentResult = registerTargetAndAssignDistributionSet(controllerId);
final DistributionSet distributionSet2 = testdataFactory.createDistributionSet();
testdataFactory.addSoftwareModuleMetadata(distributionSet2);
// first assignment will be canceled -> Open cancellations -> No message through the DMF
assignDistributionSet(distributionSet2.getId(), controllerId);
assertDownloadAndInstallMessage(distributionSet2.getModules(), controllerId);
// should not get the message of the second assignment
assertDownloadAndInstallMessage(assignmentResult.getDistributionSet().getModules(), controllerId);
assertCancelActionMessage(getFirstAssignedActionId(assignmentResult), controllerId);
createAndSendThingCreated(controllerId, TENANT_EXIST);
waitUntilTargetHasStatus(controllerId, TargetUpdateStatus.PENDING);
assertCancelActionMessage(getFirstAssignedActionId(assignmentResult), controllerId);
// confirm the cancel of the first action should lead to expose the latest action
createAndSendActionStatusUpdateMessage(controllerId, getFirstAssignedActionId(assignmentResult),
DmfActionStatus.CANCELED);
// verify latest action is exposed
assertDownloadAndInstallMessage(distributionSet2.getModules(), controllerId);
}
@Test
@@ -337,7 +350,7 @@ public class AmqpMessageDispatcherServiceIntegrationTest extends AbstractAmqpSer
private void updateActionViaDmfClient(final String controllerId, final long actionId,
final DmfActionStatus status) {
createAndSendActionStatusUpdateMessage(controllerId, TENANT_EXIST, actionId, status);
createAndSendActionStatusUpdateMessage(controllerId, actionId, status);
}
private Long assignNewDsToTarget(final String controllerId) {

View File

@@ -30,7 +30,6 @@ import org.eclipse.hawkbit.amqp.AmqpMessageHandlerService;
import org.eclipse.hawkbit.amqp.AmqpProperties;
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
import org.eclipse.hawkbit.dmf.json.model.DmfActionStatus;
import org.eclipse.hawkbit.dmf.json.model.DmfActionUpdateStatus;
import org.eclipse.hawkbit.dmf.json.model.DmfAttributeUpdate;
@@ -63,7 +62,6 @@ import org.eclipse.hawkbit.repository.test.util.WithSpringAuthorityRule;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
@@ -78,7 +76,6 @@ import io.qameta.allure.Story;
@Feature("Component Tests - Device Management Federation API")
@Story("Amqp Message Handler Service")
public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServiceIntegrationTest {
private static final String CORRELATION_ID = UUID.randomUUID().toString();
private static final String TARGET_PREFIX = "Dmf_hand_";
@Autowired
@@ -299,7 +296,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@Description("Tests null topic message header. This message should forwarded to the deadletter queue")
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) })
public void nullTopicHeader() {
final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, "");
final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, "");
eventMessage.getMessageProperties().getHeaders().put(MessageHeaderKey.TOPIC, null);
getDmfClient().send(eventMessage);
@@ -310,7 +307,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@Description("Tests null topic message header. This message should forwarded to the deadletter queue")
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) })
public void emptyTopicHeader() {
final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, "");
final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, "");
eventMessage.getMessageProperties().getHeaders().put(MessageHeaderKey.TOPIC, "");
getDmfClient().send(eventMessage);
@@ -321,7 +318,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@Description("Tests null topic message header. This message should forwarded to the deadletter queue")
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) })
public void invalidTopicHeader() {
final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, "");
final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, "");
eventMessage.getMessageProperties().getHeaders().put(MessageHeaderKey.TOPIC, "NotExist");
getDmfClient().send(eventMessage);
@@ -332,7 +329,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@Description("Tests missing topic message header. This message should forwarded to the deadletter queue")
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) })
public void missingTopicHeader() {
final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, "");
final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, "");
eventMessage.getMessageProperties().getHeaders().remove(MessageHeaderKey.TOPIC);
getDmfClient().send(eventMessage);
@@ -343,7 +340,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@Description("Tests invalid null message content. This message should forwarded to the deadletter queue")
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) })
public void updateActionStatusWithNullContent() {
final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, null);
final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, null);
getDmfClient().send(eventMessage);
verifyOneDeadLetterMessage();
}
@@ -352,7 +349,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@Description("Tests invalid empty message content. This message should forwarded to the deadletter queue")
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) })
public void updateActionStatusWithEmptyContent() {
final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, "");
final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, "");
getDmfClient().send(eventMessage);
verifyOneDeadLetterMessage();
}
@@ -361,8 +358,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@Description("Tests invalid json message content. This message should forwarded to the deadletter queue")
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) })
public void updateActionStatusWithInvalidJsonContent() {
final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS,
"Invalid Content");
final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, "Invalid Content");
getDmfClient().send(eventMessage);
verifyOneDeadLetterMessage();
}
@@ -372,8 +368,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) })
public void updateActionStatusWithInvalidActionId() {
final DmfActionUpdateStatus actionUpdateStatus = new DmfActionUpdateStatus(1L, DmfActionStatus.RUNNING);
final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS,
actionUpdateStatus);
final Message eventMessage = createUpdateActionEventMessage(TENANT_EXIST, actionUpdateStatus);
getDmfClient().send(eventMessage);
verifyOneDeadLetterMessage();
}
@@ -423,7 +418,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@Test
@Description("Register a target and send a update action status (download). Verify if the updated action status is correct.")
@ExpectEvents({@Expect(type = TargetCreatedEvent.class, count = 1),
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 1),
@Expect(type = TargetAssignDistributionSetEvent.class, count = 1),
@Expect(type = ActionCreatedEvent.class, count = 1),
@Expect(type = SoftwareModuleCreatedEvent.class, count = 3),
@@ -437,7 +432,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@Test
@Description("Register a target and send a update action status (error). Verify if the updated action status is correct.")
@ExpectEvents({@Expect(type = TargetCreatedEvent.class, count = 1),
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 1),
@Expect(type = TargetAssignDistributionSetEvent.class, count = 1),
@Expect(type = ActionUpdatedEvent.class, count = 1), @Expect(type = ActionCreatedEvent.class, count = 1),
@Expect(type = SoftwareModuleCreatedEvent.class, count = 3),
@@ -520,7 +515,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
@Test
@Description("Verify receiving a download message if a deployment is done with window configured but before maintenance window start time.")
@ExpectEvents({@Expect(type = TargetCreatedEvent.class, count = 1),
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 1),
@Expect(type = TargetAssignDistributionSetEvent.class, count = 1),
@Expect(type = ActionCreatedEvent.class, count = 1),
@Expect(type = SoftwareModuleCreatedEvent.class, count = 3),
@@ -616,7 +611,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
final Long actionId = registerTargetAndCancelActionId(controllerId);
final Long actionNotExist = actionId + 1;
sendActionUpdateStatus(new DmfActionUpdateStatus(actionNotExist, DmfActionStatus.CANCELED));
createAndSendActionStatusUpdateMessage(controllerId, actionNotExist, DmfActionStatus.CANCELED);
verifyOneDeadLetterMessage();
}
@@ -650,7 +645,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
final Long actionId = registerTargetAndCancelActionId(controllerId);
sendActionUpdateStatus(new DmfActionUpdateStatus(actionId, DmfActionStatus.CANCEL_REJECTED));
createAndSendActionStatusUpdateMessage(controllerId, actionId, DmfActionStatus.CANCEL_REJECTED);
assertAction(actionId, 1, Status.RUNNING, Status.CANCELING, Status.CANCEL_REJECTED);
}
@@ -847,7 +842,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
Long actionId = Long.parseLong(getJsonFieldFromBody(message.getBody(), "actionId"));
// Send DOWNLOADED message
sendActionUpdateStatus(new DmfActionUpdateStatus(actionId, DmfActionStatus.DOWNLOADED));
createAndSendActionStatusUpdateMessage(controllerId, actionId, DmfActionStatus.DOWNLOADED);
assertAction(actionId, 1, Status.RUNNING, Status.DOWNLOADED);
Mockito.verifyZeroInteractions(getDeadletterListener());
@@ -879,14 +874,14 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
Long actionId = Long.parseLong(getJsonFieldFromBody(message.getBody(), "actionId"));
// Send DOWNLOADED message, should result in the action being closed
sendActionUpdateStatus(new DmfActionUpdateStatus(actionId, DmfActionStatus.DOWNLOADED));
createAndSendActionStatusUpdateMessage(controllerId, actionId, DmfActionStatus.DOWNLOADED);
assertAction(actionId, 1, Status.RUNNING, Status.DOWNLOADED);
Mockito.verifyZeroInteractions(getDeadletterListener());
verifyAssignedDsAndInstalledDs(controllerId, distributionSet.getId(), null);
// Send FINISHED message
sendActionUpdateStatus(new DmfActionUpdateStatus(actionId, DmfActionStatus.FINISHED));
createAndSendActionStatusUpdateMessage(controllerId, actionId, DmfActionStatus.FINISHED);
assertAction(actionId, 2, Status.RUNNING, Status.DOWNLOADED, Status.FINISHED);
Mockito.verifyZeroInteractions(getDeadletterListener());
@@ -949,15 +944,10 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
private Long registerTargetAndSendActionStatus(final DmfActionStatus sendActionStatus, final String controllerId) {
final DistributionSetAssignmentResult assignmentResult = registerTargetAndAssignDistributionSet(controllerId);
final Long actionId = getFirstAssignedActionId(assignmentResult);
sendActionUpdateStatus(new DmfActionUpdateStatus(actionId, sendActionStatus));
createAndSendActionStatusUpdateMessage(controllerId, actionId, sendActionStatus);
return actionId;
}
private void sendActionUpdateStatus(final DmfActionUpdateStatus actionStatus) {
final Message eventMessage = createEventMessage(TENANT_EXIST, EventTopic.UPDATE_ACTION_STATUS, actionStatus);
getDmfClient().send(eventMessage);
}
private void registerTargetAndSendAndAssertUpdateActionStatus(final DmfActionStatus sendActionStatus,
final Status expectedActionStatus, final String controllerId) {
final Long actionId = registerTargetAndSendActionStatus(sendActionStatus, controllerId);
@@ -995,15 +985,6 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
});
}
private Message createEventMessage(final String tenant, final EventTopic eventTopic, final Object payload) {
final MessageProperties messageProperties = createMessagePropertiesWithTenant(tenant);
messageProperties.getHeaders().put(MessageHeaderKey.TYPE, MessageType.EVENT.toString());
messageProperties.getHeaders().put(MessageHeaderKey.TOPIC, eventTopic.toString());
messageProperties.setCorrelationId(CORRELATION_ID);
return createMessage(payload, messageProperties);
}
private void sendUpdateAttributeMessage(final String target, final String tenant,
final DmfAttributeUpdate attributeUpdate) {
final Message updateMessage = createUpdateAttributesMessage(target, tenant, attributeUpdate);