diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index c8617b4cd..9c36c5d7c 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -35,6 +35,7 @@ import org.eclipse.hawkbit.repository.RepositoryConstants; import org.eclipse.hawkbit.repository.TenantConfigurationManagement; import org.eclipse.hawkbit.repository.UpdateMode; import org.eclipse.hawkbit.repository.builder.ActionStatusCreate; +import org.eclipse.hawkbit.repository.exception.EntityAlreadyExistsException; import org.eclipse.hawkbit.repository.model.Action; import org.eclipse.hawkbit.repository.model.Action.Status; import org.eclipse.hawkbit.repository.model.ActionProperties; @@ -73,7 +74,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { private final AmqpMessageDispatcherService amqpMessageDispatcherService; - private final ControllerManagement controllerManagement; + private ControllerManagement controllerManagement; private final EntityFactory entityFactory; @@ -92,6 +93,10 @@ public class AmqpMessageHandlerService extends BaseAmqpService { * for target repo access * @param entityFactory * to create entities + * @param systemSecurityContext + * the system Security Context + * @param tenantConfigurationManagement + * the tenant configuration Management */ public AmqpMessageHandlerService(final RabbitTemplate rabbitTemplate, final AmqpMessageDispatcherService amqpMessageDispatcherService, @@ -202,11 +207,14 @@ public class AmqpMessageHandlerService extends BaseAmqpService { logAndThrowMessageError(message, "No ReplyTo was set for the createThing message."); } - final URI amqpUri = IpUtil.createAmqpUri(virtualHost, replyTo); - final Target target = controllerManagement.findOrRegisterTargetIfItDoesNotExist(thingId, amqpUri); - LOG.debug("Target {} reported online state.", thingId); - - sendUpdateCommandToTarget(target); + try { + final URI amqpUri = IpUtil.createAmqpUri(virtualHost, replyTo); + final Target target = controllerManagement.findOrRegisterTargetIfItDoesNotExist(thingId, amqpUri); + LOG.debug("Target {} reported online state.", thingId); + sendUpdateCommandToTarget(target); + } catch (EntityAlreadyExistsException e) { + throw new AmqpRejectAndDontRequeueException("Target already registered, message will be ignored!", e); + } } private void sendUpdateCommandToTarget(final Target target) { @@ -333,7 +341,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { // Exception squid:MethodCyclomaticComplexity - false positive, is a simple // mapping @SuppressWarnings("squid:MethodCyclomaticComplexity") - private Status mapStatus(final Message message, final DmfActionUpdateStatus actionUpdateStatus, + private static Status mapStatus(final Message message, final DmfActionUpdateStatus actionUpdateStatus, final Action action) { Status status = null; switch (actionUpdateStatus.getActionStatus()) { @@ -371,7 +379,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { return status; } - private Status handleCancelRejectedState(final Message message, final Action action) { + private static Status handleCancelRejectedState(final Message message, final Action action) { if (action.isCancelingOrCanceled()) { return Status.CANCEL_REJECTED; } @@ -418,4 +426,8 @@ public class AmqpMessageHandlerService extends BaseAmqpService { .runAsSystem(() -> tenantConfigurationManagement.getConfigurationValue(key, valueType).getValue()); } -} \ No newline at end of file + // for testing + public void setControllerManagement(final ControllerManagement controllerManagement) { + this.controllerManagement = controllerManagement; + } +} diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java index 44305a3ab..c4ab51da0 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/integration/AmqpMessageHandlerServiceIntegrationTest.java @@ -10,9 +10,13 @@ package org.eclipse.hawkbit.integration; import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.hawkbit.repository.model.Action.ActionType.DOWNLOAD_ONLY; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; import java.io.IOException; import java.nio.charset.Charset; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -22,6 +26,7 @@ import java.util.UUID; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; +import org.eclipse.hawkbit.amqp.AmqpMessageHandlerService; import org.eclipse.hawkbit.amqp.AmqpProperties; import org.eclipse.hawkbit.dmf.amqp.api.EventTopic; import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey; @@ -30,6 +35,7 @@ import org.eclipse.hawkbit.dmf.json.model.DmfActionStatus; import org.eclipse.hawkbit.dmf.json.model.DmfActionUpdateStatus; import org.eclipse.hawkbit.dmf.json.model.DmfAttributeUpdate; import org.eclipse.hawkbit.dmf.json.model.DmfUpdateMode; +import org.eclipse.hawkbit.repository.ControllerManagement; import org.eclipse.hawkbit.repository.RepositoryConstants; import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent; import org.eclipse.hawkbit.repository.event.remote.TargetAttributesRequestedEvent; @@ -42,6 +48,7 @@ import org.eclipse.hawkbit.repository.event.remote.entity.SoftwareModuleCreatedE import org.eclipse.hawkbit.repository.event.remote.entity.SoftwareModuleUpdatedEvent; import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent; import org.eclipse.hawkbit.repository.event.remote.entity.TargetUpdatedEvent; +import org.eclipse.hawkbit.repository.exception.EntityAlreadyExistsException; import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSet; import org.eclipse.hawkbit.repository.jpa.model.JpaTarget; import org.eclipse.hawkbit.repository.model.Action.Status; @@ -75,6 +82,9 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic @Autowired private AmqpProperties amqpProperties; + + @Autowired + private AmqpMessageHandlerService amqpMessageHandlerService; @Test @Description("Tests DMF PING request and expected reponse.") @@ -882,6 +892,31 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic verifyAssignedDsAndInstalledDs(controllerId, distributionSet.getId(), distributionSet.getId()); } + @Test + @Description("Messages that result into certain exceptions being raised should not be requeued. This message should forwarded to the deadletter queue") + @ExpectEvents({@Expect(type = TargetCreatedEvent.class, count = 0)}) + public void ignoredExceptionTypesShouldNotBeRequeued() { + final ControllerManagement mockedControllerManagement = Mockito.mock(ControllerManagement.class); + + final List> exceptionsThatShouldNotBeRequeued = Arrays + .asList(IllegalArgumentException.class, EntityAlreadyExistsException.class); + final String controllerId = "dummy_target"; + + try { + for (Class exceptionClass : exceptionsThatShouldNotBeRequeued) { + doThrow(exceptionClass).when(mockedControllerManagement) + .findOrRegisterTargetIfItDoesNotExist(eq(controllerId), any()); + + amqpMessageHandlerService.setControllerManagement(mockedControllerManagement); + createAndSendThingCreated(controllerId, TENANT_EXIST); + verifyOneDeadLetterMessage(); + assertThat(targetManagement.getByControllerID(controllerId)).isEmpty(); + } + } finally { + amqpMessageHandlerService.setControllerManagement(controllerManagement); + } + } + @Step private void verifyAssignedDsAndInstalledDs(final String controllerId, final Long assignedDsId, final Long installedDsId) { @@ -991,6 +1026,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic assertEmptyReceiverQueueCount(); createConditionFactory().untilAsserted(() -> Mockito .verify(getDeadletterListener(), Mockito.times(numberOfInvocations)).handleMessage(Mockito.any())); + Mockito.reset(getDeadletterListener()); } private static String getJsonFieldFromBody(final byte[] body, final String fieldName) throws IOException { diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaControllerManagement.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaControllerManagement.java index 30cfc756b..83eea384f 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaControllerManagement.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaControllerManagement.java @@ -110,7 +110,7 @@ import com.google.common.collect.Sets; @Transactional(readOnly = true) @Validated public class JpaControllerManagement implements ControllerManagement { - private static final Logger LOG = LoggerFactory.getLogger(ControllerManagement.class); + private static final Logger LOG = LoggerFactory.getLogger(JpaControllerManagement.class); private final BlockingDeque queue; @@ -387,23 +387,15 @@ public class JpaControllerManagement implements ControllerManagement { } private Target createTarget(final String controllerId, final URI address) { - try { - final Target result = targetRepository.save((JpaTarget) entityFactory.target().create() - .controllerId(controllerId).description("Plug and Play target: " + controllerId).name(controllerId) - .status(TargetUpdateStatus.REGISTERED).lastTargetQuery(System.currentTimeMillis()) - .address(Optional.ofNullable(address).map(URI::toString).orElse(null)).build()); + final Target result = targetRepository.save((JpaTarget) entityFactory.target().create() + .controllerId(controllerId).description("Plug and Play target: " + controllerId).name(controllerId) + .status(TargetUpdateStatus.REGISTERED).lastTargetQuery(System.currentTimeMillis()) + .address(Optional.ofNullable(address).map(URI::toString).orElse(null)).build()); - afterCommit.afterCommit(() -> eventPublisherHolder.getEventPublisher() - .publishEvent(new TargetPollEvent(result, eventPublisherHolder.getApplicationId()))); + afterCommit.afterCommit(() -> eventPublisherHolder.getEventPublisher() + .publishEvent(new TargetPollEvent(result, eventPublisherHolder.getApplicationId()))); - return result; - } catch (final EntityAlreadyExistsException e) { - LOG.warn( - "Caught an EntityAlreadyExistsException while creating non existing target " - + "[controllerId:{}, address:{}, tenant: {}]", - controllerId, address, tenantAware.getCurrentTenant()); - throw e; - } + return result; } /**