Feature dmf target delete aware (#474)

* Implemented new function to create and send a thing created message if a
delete event is published

- added mock and integration tests 

Signed-off-by: Jonathan Philip Knoblauch <JonathanPhilip.Knoblauch@bosch-si.com>

* Switched target address form URI to String and fixed test

Signed-off-by: Jonathan Philip Knoblauch <JonathanPhilip.Knoblauch@bosch-si.com>

* Refactoring and removed TODOs

Signed-off-by: Jonathan Philip Knoblauch <JonathanPhilip.Knoblauch@bosch-si.com>

* Fixed javadoc and description

Signed-off-by: Jonathan Philip Knoblauch <JonathanPhilip.Knoblauch@bosch-si.com>

* Used Target from API instead from JpaTarget 

Signed-off-by: Jonathan Philip Knoblauch <JonathanPhilip.Knoblauch@bosch-si.com>

* Refactoring after review

Signed-off-by: Jonathan Philip Knoblauch <JonathanPhilip.Knoblauch@bosch-si.com>

* Small refactoring - fixed typos

Signed-off-by: Jonathan Philip Knoblauch <JonathanPhilip.Knoblauch@bosch-si.com>

* Resolve Merge conflicts

Signed-off-by: Melanie Retter <melanie.retter@bosch-si.com>
This commit is contained in:
Jonathan Knoblauch
2017-04-25 16:38:18 +02:00
committed by Michael Hirsch
parent a19364c635
commit 574fda1101
11 changed files with 222 additions and 51 deletions

View File

@@ -29,6 +29,7 @@ import org.eclipse.hawkbit.dmf.json.model.SoftwareModule;
import org.eclipse.hawkbit.repository.SystemManagement;
import org.eclipse.hawkbit.repository.TargetManagement;
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.model.Target;
import org.eclipse.hawkbit.security.SystemSecurityContext;
@@ -102,7 +103,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
*/
@EventListener(classes = TargetAssignDistributionSetEvent.class)
public void targetAssignDistributionSet(final TargetAssignDistributionSetEvent assignedEvent) {
if (isFromSelf(assignedEvent)) {
if (isNotFromSelf(assignedEvent)) {
return;
}
@@ -135,7 +136,8 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
}
final Message message = getMessageConverter().toMessage(downloadAndUpdateRequest,
createConnectorMessageProperties(tenant, target.getControllerId(), EventTopic.DOWNLOAD_AND_INSTALL));
createConnectorMessagePropertiesEvent(tenant, target.getControllerId(),
EventTopic.DOWNLOAD_AND_INSTALL));
amqpSenderService.sendMessage(message, targetAdress);
}
@@ -148,7 +150,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
*/
@EventListener(classes = CancelTargetAssignmentEvent.class)
public void targetCancelAssignmentToDistributionSet(final CancelTargetAssignmentEvent cancelEvent) {
if (isFromSelf(cancelEvent)) {
if (isNotFromSelf(cancelEvent)) {
return;
}
@@ -156,7 +158,37 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
cancelEvent.getActionId(), cancelEvent.getEntity().getAddress());
}
private boolean isFromSelf(final RemoteApplicationEvent event) {
/**
* Method to send a message to a RabbitMQ Exchange after a Target was
* deleted.
*
* @param deleteEvent
* the TargetDeletedEvent which holds the necessary data for
* sending a target delete message.
*/
@EventListener(classes = TargetDeletedEvent.class)
public void targetDelete(final TargetDeletedEvent deleteEvent) {
if (isNotFromSelf(deleteEvent)) {
return;
}
sendDeleteMessage(deleteEvent.getTenant(), deleteEvent.getControllerId(), deleteEvent.getTargetAddress());
}
void sendDeleteMessage(final String tenant, final String controllerId, final String targetAddress) {
if (!hasValidAddress(targetAddress)) {
return;
}
final Message message = new Message(null, createConnectorMessagePropertiesDeleteThing(tenant, controllerId));
amqpSenderService.sendMessage(message, URI.create(targetAddress));
}
private boolean hasValidAddress(final String targetAddress) {
return targetAddress != null && IpUtil.isAmqpUri(URI.create(targetAddress));
}
private boolean isNotFromSelf(final RemoteApplicationEvent event) {
return serviceMatcher != null && !serviceMatcher.isFromSelf(event);
}
@@ -166,26 +198,33 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
return;
}
final Message message = getMessageConverter().toMessage(actionId,
createConnectorMessageProperties(tenant, controllerId, EventTopic.CANCEL_DOWNLOAD));
createConnectorMessagePropertiesEvent(tenant, controllerId, EventTopic.CANCEL_DOWNLOAD));
amqpSenderService.sendMessage(message, address);
}
private static MessageProperties createConnectorMessageProperties(final String tenant, final String controllerId,
final EventTopic topic) {
final MessageProperties messageProperties = createMessageProperties();
private static MessageProperties createConnectorMessagePropertiesEvent(final String tenant,
final String controllerId, final EventTopic topic) {
final MessageProperties messageProperties = createConnectorMessageProperties(tenant, controllerId);
messageProperties.setHeader(MessageHeaderKey.TOPIC, topic);
messageProperties.setHeader(MessageHeaderKey.THING_ID, controllerId);
messageProperties.setHeader(MessageHeaderKey.TENANT, tenant);
messageProperties.setHeader(MessageHeaderKey.TYPE, MessageType.EVENT);
return messageProperties;
}
private static MessageProperties createMessageProperties() {
private static MessageProperties createConnectorMessagePropertiesDeleteThing(final String tenant,
final String controllerId) {
final MessageProperties messageProperties = createConnectorMessageProperties(tenant, controllerId);
messageProperties.setHeader(MessageHeaderKey.TYPE, MessageType.THING_DELETED);
return messageProperties;
}
private static MessageProperties createConnectorMessageProperties(final String tenant, final String controllerId) {
final MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setHeader(MessageHeaderKey.CONTENT_TYPE, MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setHeader(MessageHeaderKey.THING_ID, controllerId);
messageProperties.setHeader(MessageHeaderKey.TENANT, tenant);
return messageProperties;
}

View File

@@ -32,6 +32,7 @@ import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
import org.eclipse.hawkbit.dmf.json.model.DownloadAndUpdateRequest;
import org.eclipse.hawkbit.repository.SystemManagement;
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.jpa.RepositoryApplicationConfiguration;
import org.eclipse.hawkbit.repository.model.Action;
@@ -110,7 +111,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTest {
}
@Test
@Description("Verfies that download and install event with no software modul works")
@Description("Verifies that download and install event with no software modul works")
public void testSendDownloadRequesWithEmptySoftwareModules() {
final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent(
@@ -136,7 +137,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTest {
}
@Test
@Description("Verfies that download and install event with 3 software moduls and no artifacts works")
@Description("Verifies that download and install event with 3 software moduls and no artifacts works")
public void testSendDownloadRequesWithSoftwareModulesAndNoArtifacts() {
final DistributionSet createDistributionSet = testdataFactory
.createDistributionSet(UUID.randomUUID().toString());
@@ -169,7 +170,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTest {
}
@Test
@Description("Verfies that download and install event with software moduls and artifacts works")
@Description("Verifies that download and install event with software moduls and artifacts works")
public void testSendDownloadRequest() {
DistributionSet dsA = testdataFactory.createDistributionSet(UUID.randomUUID().toString());
SoftwareModule module = dsA.getModules().iterator().next();
@@ -216,7 +217,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTest {
}
@Test
@Description("Verfies that send cancel event works")
@Description("Verifies that send cancel event works")
public void testSendCancelRequest() {
final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent = new CancelTargetAssignmentEvent(
testTarget, 1L, serviceMatcher.getServiceId());
@@ -228,13 +229,71 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTest {
}
@Test
@Description("Verifies that sending a delete message when receiving a delete event works.")
public void sendDeleteRequest() {
// setup
final String amqpUri = "amqp://anyhost";
final TargetDeletedEvent targetDeletedEvent = new TargetDeletedEvent(TENANT, 1L, CONTROLLER_ID, amqpUri,
Target.class.getName(), serviceMatcher.getServiceId());
// test
amqpMessageDispatcherService.targetDelete(targetDeletedEvent);
// verify
final Message sendMessage = createArgumentCapture(URI.create(amqpUri));
assertDeleteMessage(sendMessage);
}
@Test
@Description("Verifies that a delete message is not send if the address is not an amqp address.")
public void sendDeleteRequestWithNoAmqpAdress() {
// setup
final String noAmqpUri = "http://anyhost";
final TargetDeletedEvent targetDeletedEvent = new TargetDeletedEvent(TENANT, 1L, CONTROLLER_ID, noAmqpUri,
Target.class.getName(), serviceMatcher.getServiceId());
// test
amqpMessageDispatcherService.targetDelete(targetDeletedEvent);
// verify
Mockito.verifyZeroInteractions(senderService);
}
@Test
@Description("Verfies that a delete message is not send if the address is null.")
public void sendDeleteRequestWithNullAdress() {
// setup
final String noAmqpUri = null;
final TargetDeletedEvent targetDeletedEvent = new TargetDeletedEvent(TENANT, 1L, CONTROLLER_ID, noAmqpUri,
Target.class.getName(), serviceMatcher.getServiceId());
// test
amqpMessageDispatcherService.targetDelete(targetDeletedEvent);
// verify
Mockito.verifyZeroInteractions(senderService);
}
private void assertCancelMessage(final Message sendMessage) {
assertEventMessage(sendMessage);
final Long actionId = convertMessage(sendMessage, Long.class);
assertEquals("Action ID should be 1", actionId, Long.valueOf(1));
assertEquals("The topc in the message should be a CANCEL_DOWNLOAD value", EventTopic.CANCEL_DOWNLOAD,
sendMessage.getMessageProperties().getHeaders().get(MessageHeaderKey.TOPIC));
}
private void assertDeleteMessage(final Message sendMessage) {
assertNotNull(sendMessage);
assertThat(sendMessage.getMessageProperties().getHeaders().get(MessageHeaderKey.THING_ID))
.isEqualTo(CONTROLLER_ID);
assertThat(sendMessage.getMessageProperties().getHeaders().get(MessageHeaderKey.TENANT)).isEqualTo(TENANT);
assertThat(sendMessage.getMessageProperties().getHeaders().get(MessageHeaderKey.TYPE))
.isEqualTo(MessageType.THING_DELETED);
}
private DownloadAndUpdateRequest assertDownloadAndInstallMessage(final Message sendMessage, final Long action) {

View File

@@ -13,6 +13,7 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetPollEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionUpdatedEvent;
@@ -91,7 +92,17 @@ public class AmqpMessageDispatcherServiceIntegrationTest extends AmqpServiceInte
createAndSendTarget(TENANT_EXIST);
waitUntilTargetStatusIsPending();
assertCancelActionMessage(actionId);
}
@Test
@Description("Verify that when a target is deleted a target delete message is send.")
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 1),
@Expect(type = TargetPollEvent.class, count = 1), @Expect(type = TargetDeletedEvent.class, count = 1) })
public void sendDeleteMessage() {
registerAndAssertTargetWithExistingTenant(REGISTER_TARGET, 1);
targetManagement.deleteTarget(REGISTER_TARGET);
assertDeleteMessage(REGISTER_TARGET);
}
private void waitUntilTargetStatusIsPending() {

View File

@@ -41,7 +41,7 @@ import org.springframework.amqp.rabbit.test.RabbitListenerTestHarness;
import org.springframework.beans.factory.annotation.Autowired;
/**
*
*
* Common class for {@link AmqpMessageHandlerServiceIntegrationTest} and
* {@link AmqpMessageDispatcherServiceIntegrationTest}.
*/
@@ -121,6 +121,17 @@ public abstract class AmqpServiceIntegrationTest extends AbstractAmqpIntegration
assertThat(actionUpdateStatus).isEqualTo(actionId);
}
protected void assertDeleteMessage(final String target) {
verifyReplyToListener();
final Message replyMessage = replyToListener.getDeleteMessages().get(target);
assertAllTargetsCount(0);
final Map<String, Object> headers = replyMessage.getMessageProperties().getHeaders();
assertThat(headers.get(MessageHeaderKey.THING_ID)).isEqualTo(target);
assertThat(headers.get(MessageHeaderKey.TENANT)).isEqualTo(TENANT_EXIST);
assertThat(headers.get(MessageHeaderKey.TYPE)).isEqualTo(MessageType.THING_DELETED.toString());
}
protected void assertDownloadAndInstallMessage(
final Set<org.eclipse.hawkbit.repository.model.SoftwareModule> dsModules) {
final Message replyMessage = assertReplyMessageHeader(EventTopic.DOWNLOAD_AND_INSTALL);
@@ -166,7 +177,7 @@ public abstract class AmqpServiceIntegrationTest extends AbstractAmqpIntegration
private Message assertReplyMessageHeader(final EventTopic eventTopic) {
verifyReplyToListener();
final Message replyMessage = replyToListener.getMessages().get(eventTopic);
final Message replyMessage = replyToListener.getEventTopicMessages().get(eventTopic);
assertAllTargetsCount(1);
final Map<String, Object> headers = replyMessage.getMessageProperties().getHeaders();
assertThat(headers.get(MessageHeaderKey.TOPIC)).isEqualTo(eventTopic.toString());

View File

@@ -8,12 +8,15 @@
*/
package org.eclipse.hawkbit.integration.listener;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.hawkbit.AmqpTestConfiguration;
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -21,17 +24,42 @@ public class ReplyToListener implements TestRabbitListener {
public static final String LISTENER_ID = "replyto";
private final Map<EventTopic, Message> messages = new HashMap<>();
private final Map<EventTopic, Message> eventTopicMessages = new HashMap<>();
private final Map<String, Message> deleteMessages = new HashMap<>();
@Override
@RabbitListener(id = LISTENER_ID, queues = AmqpTestConfiguration.REPLY_TO_QUEUE)
public void handleMessage(Message message) {
final EventTopic eventTopic = EventTopic
.valueOf(message.getMessageProperties().getHeaders().get(MessageHeaderKey.TOPIC).toString());
messages.put(eventTopic, message);
public void handleMessage(final Message message) {
final MessageType messageType = MessageType
.valueOf(message.getMessageProperties().getHeaders().get(MessageHeaderKey.TYPE).toString());
if (messageType == MessageType.EVENT) {
final EventTopic eventTopic = EventTopic
.valueOf(message.getMessageProperties().getHeaders().get(MessageHeaderKey.TOPIC).toString());
eventTopicMessages.put(eventTopic, message);
return;
}
if (messageType == MessageType.THING_DELETED) {
final String targetName = message.getMessageProperties().getHeaders().get(MessageHeaderKey.THING_ID)
.toString();
deleteMessages.put(targetName, message);
return;
}
// if message type is not EVENT or THING_DELETED something unexpected
// happened
fail("Unexpected message type");
}
public Map<EventTopic, Message> getMessages() {
return messages;
public Map<EventTopic, Message> getEventTopicMessages() {
return eventTopicMessages;
}
public Map<String, Message> getDeleteMessages() {
return deleteMessages;
}
}

View File

@@ -10,8 +10,6 @@ package org.eclipse.hawkbit.dmf.amqp.api;
/**
* The amqp message types which can be handled.
*
*
*
*/
public enum MessageType {
@@ -22,8 +20,13 @@ public enum MessageType {
EVENT,
/**
* the thing created type.
* The thing created type.
*/
THING_CREATED,
/**
* The thing deleted type.
*/
THING_DELETED,
}

View File

@@ -16,7 +16,9 @@ import org.eclipse.hawkbit.repository.model.Target;
*/
public class TargetDeletedEvent extends RemoteIdEvent {
private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 2L;
private String controllerId;
private String targetAddress;
/**
* Default constructor.
@@ -26,20 +28,33 @@ public class TargetDeletedEvent extends RemoteIdEvent {
}
/**
* Constructor for json serialization.
*
*
* @param tenant
* the tenant
* @param entityId
* the entity id
* @param controllerId
* the controllerId of the target
* @param targetAddress
* the target address
* @param entityClass
* the entity class
* @param applicationId
* the origin application id
*/
public TargetDeletedEvent(final String tenant, final Long entityId, final String entityClass,
final String applicationId) {
public TargetDeletedEvent(final String tenant, final Long entityId, final String controllerId,
final String targetAddress, final String entityClass, final String applicationId) {
super(entityId, tenant, entityClass, applicationId);
this.controllerId = controllerId;
this.targetAddress = targetAddress;
}
public String getControllerId() {
return controllerId;
}
public String getTargetAddress() {
return targetAddress;
}
}

View File

@@ -8,6 +8,7 @@
*/
package org.eclipse.hawkbit.repository.jpa;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -174,8 +175,10 @@ public class JpaTargetManagement implements TargetManagement {
targetRepository.deleteByIdIn(targetIDs);
targetIDs.forEach(targetId -> eventPublisher.publishEvent(new TargetDeletedEvent(tenantAware.getCurrentTenant(),
targetId, JpaTarget.class.getName(), applicationContext.getId())));
targets.forEach(target -> eventPublisher.publishEvent(
new TargetDeletedEvent(tenantAware.getCurrentTenant(), target.getId(), target.getControllerId(),
Optional.ofNullable(target.getAddress()).map(URI::toString).orElse(null),
JpaTarget.class.getName(), applicationContext.getId())));
}
@Override

View File

@@ -396,7 +396,8 @@ public class JpaTarget extends AbstractJpaNamedEntity implements Target, EventAw
@Override
public void fireDeleteEvent(final DescriptorEvent descriptorEvent) {
EventPublisherHolder.getInstance().getEventPublisher().publishEvent(new TargetDeletedEvent(getTenant(), getId(),
getClass().getName(), EventPublisherHolder.getInstance().getApplicationId()));
EventPublisherHolder.getInstance().getEventPublisher()
.publishEvent(new TargetDeletedEvent(getTenant(), getId(), getControllerId(), address,
getClass().getName(), EventPublisherHolder.getInstance().getApplicationId()));
}
}

View File

@@ -35,8 +35,12 @@ public class RemoteIdEventTest extends AbstractRemoteEventTest {
private static String NODE = "Node";
private static String CONTROLLER_ID = "controller911";
private static String ADDRESS = "amqp://anyhost";
@Test
@Description("Verifies that the is ds id correct reloaded")
@Description("Verifies that the ds id is correct reloaded")
public void testDistributionSetDeletedEvent() {
assertAndCreateRemoteEvent(DistributionSetDeletedEvent.class);
}
@@ -50,7 +54,9 @@ public class RemoteIdEventTest extends AbstractRemoteEventTest {
@Test
@Description("Verifies that the target id is correct reloaded")
public void testTargetDeletedEvent() {
assertAndCreateRemoteEvent(TargetDeletedEvent.class);
final TargetDeletedEvent deletedEvent = new TargetDeletedEvent(TENANT, ENTITY_ID, CONTROLLER_ID, ADDRESS,
ENTIY_CLASS, NODE);
assertEntity(deletedEvent);
}
@Test
@@ -85,23 +91,19 @@ public class RemoteIdEventTest extends AbstractRemoteEventTest {
}
}
protected RemoteIdEvent assertEntity(final RemoteIdEvent event) {
protected void assertEntity(final RemoteIdEvent event) {
assertThat(event.getEntityId()).isSameAs(ENTITY_ID);
RemoteIdEvent underTestCreatedEvent = (RemoteIdEvent) createProtoStuffEvent(event);
assertDeserializeEvent(underTestCreatedEvent);
final RemoteIdEvent protoStuffEvent = (RemoteIdEvent) createProtoStuffEvent(event);
assertDeserializeEvent(protoStuffEvent, event);
underTestCreatedEvent = (RemoteIdEvent) createJacksonEvent(event);
assertDeserializeEvent(underTestCreatedEvent);
return underTestCreatedEvent;
final RemoteIdEvent jacksonEvent = (RemoteIdEvent) createJacksonEvent(event);
assertDeserializeEvent(jacksonEvent, event);
}
private void assertDeserializeEvent(final RemoteIdEvent underTestCreatedEvent) {
assertThat(underTestCreatedEvent.getEntityId()).isEqualTo(ENTITY_ID);
assertThat(underTestCreatedEvent.getTenant()).isEqualTo(TENANT);
assertThat(underTestCreatedEvent.getEntityClass()).isEqualTo(ENTIY_CLASS);
assertThat(underTestCreatedEvent.getOriginService()).isEqualTo(NODE);
private void assertDeserializeEvent(final RemoteIdEvent underTestCreatedEvent, final RemoteIdEvent event) {
// gets added because events inherit from of java.util.EventObject
assertThat(underTestCreatedEvent).isEqualToIgnoringGivenFields(event, "source");
}
}

View File

@@ -1,4 +1,3 @@
/**
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
*