Excluded EntityAlreadyExists exceptions from requeuing (#883)
* excluded EntityAlreadyExists exceptions from requeuing Signed-off-by: Ahmed Sayed <ahmed.sayed@bosch-si.com> * moved the catch clause closer to the thrown exception Signed-off-by: Ahmed Sayed <ahmed.sayed@bosch-si.com> * reverted AmqpMessageHandlerServiceIntegrationTest package change Signed-off-by: Ahmed Sayed <ahmed.sayed@bosch-si.com> * reduced scope of method variable Signed-off-by: Ahmed Sayed <ahmed.sayed@bosch-si.com> * removed instantiation of exceptions Signed-off-by: Ahmed Sayed <ahmed.sayed@bosch-si.com>
This commit is contained in:
committed by
Dominic Schabel
parent
c68c5a6f5b
commit
973f1952c7
@@ -35,6 +35,7 @@ import org.eclipse.hawkbit.repository.RepositoryConstants;
|
||||
import org.eclipse.hawkbit.repository.TenantConfigurationManagement;
|
||||
import org.eclipse.hawkbit.repository.UpdateMode;
|
||||
import org.eclipse.hawkbit.repository.builder.ActionStatusCreate;
|
||||
import org.eclipse.hawkbit.repository.exception.EntityAlreadyExistsException;
|
||||
import org.eclipse.hawkbit.repository.model.Action;
|
||||
import org.eclipse.hawkbit.repository.model.Action.Status;
|
||||
import org.eclipse.hawkbit.repository.model.ActionProperties;
|
||||
@@ -73,7 +74,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
|
||||
private final AmqpMessageDispatcherService amqpMessageDispatcherService;
|
||||
|
||||
private final ControllerManagement controllerManagement;
|
||||
private ControllerManagement controllerManagement;
|
||||
|
||||
private final EntityFactory entityFactory;
|
||||
|
||||
@@ -92,6 +93,10 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
* for target repo access
|
||||
* @param entityFactory
|
||||
* to create entities
|
||||
* @param systemSecurityContext
|
||||
* the system Security Context
|
||||
* @param tenantConfigurationManagement
|
||||
* the tenant configuration Management
|
||||
*/
|
||||
public AmqpMessageHandlerService(final RabbitTemplate rabbitTemplate,
|
||||
final AmqpMessageDispatcherService amqpMessageDispatcherService,
|
||||
@@ -202,11 +207,14 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
logAndThrowMessageError(message, "No ReplyTo was set for the createThing message.");
|
||||
}
|
||||
|
||||
final URI amqpUri = IpUtil.createAmqpUri(virtualHost, replyTo);
|
||||
final Target target = controllerManagement.findOrRegisterTargetIfItDoesNotExist(thingId, amqpUri);
|
||||
LOG.debug("Target {} reported online state.", thingId);
|
||||
|
||||
sendUpdateCommandToTarget(target);
|
||||
try {
|
||||
final URI amqpUri = IpUtil.createAmqpUri(virtualHost, replyTo);
|
||||
final Target target = controllerManagement.findOrRegisterTargetIfItDoesNotExist(thingId, amqpUri);
|
||||
LOG.debug("Target {} reported online state.", thingId);
|
||||
sendUpdateCommandToTarget(target);
|
||||
} catch (EntityAlreadyExistsException e) {
|
||||
throw new AmqpRejectAndDontRequeueException("Target already registered, message will be ignored!", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void sendUpdateCommandToTarget(final Target target) {
|
||||
@@ -333,7 +341,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
// Exception squid:MethodCyclomaticComplexity - false positive, is a simple
|
||||
// mapping
|
||||
@SuppressWarnings("squid:MethodCyclomaticComplexity")
|
||||
private Status mapStatus(final Message message, final DmfActionUpdateStatus actionUpdateStatus,
|
||||
private static Status mapStatus(final Message message, final DmfActionUpdateStatus actionUpdateStatus,
|
||||
final Action action) {
|
||||
Status status = null;
|
||||
switch (actionUpdateStatus.getActionStatus()) {
|
||||
@@ -371,7 +379,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
return status;
|
||||
}
|
||||
|
||||
private Status handleCancelRejectedState(final Message message, final Action action) {
|
||||
private static Status handleCancelRejectedState(final Message message, final Action action) {
|
||||
if (action.isCancelingOrCanceled()) {
|
||||
return Status.CANCEL_REJECTED;
|
||||
}
|
||||
@@ -418,4 +426,8 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
.runAsSystem(() -> tenantConfigurationManagement.getConfigurationValue(key, valueType).getValue());
|
||||
}
|
||||
|
||||
}
|
||||
// for testing
|
||||
public void setControllerManagement(final ControllerManagement controllerManagement) {
|
||||
this.controllerManagement = controllerManagement;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,9 +10,13 @@ package org.eclipse.hawkbit.integration;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.eclipse.hawkbit.repository.model.Action.ActionType.DOWNLOAD_ONLY;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -22,6 +26,7 @@ import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.eclipse.hawkbit.amqp.AmqpMessageHandlerService;
|
||||
import org.eclipse.hawkbit.amqp.AmqpProperties;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
|
||||
@@ -30,6 +35,7 @@ import org.eclipse.hawkbit.dmf.json.model.DmfActionStatus;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfActionUpdateStatus;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfAttributeUpdate;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfUpdateMode;
|
||||
import org.eclipse.hawkbit.repository.ControllerManagement;
|
||||
import org.eclipse.hawkbit.repository.RepositoryConstants;
|
||||
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
|
||||
import org.eclipse.hawkbit.repository.event.remote.TargetAttributesRequestedEvent;
|
||||
@@ -42,6 +48,7 @@ import org.eclipse.hawkbit.repository.event.remote.entity.SoftwareModuleCreatedE
|
||||
import org.eclipse.hawkbit.repository.event.remote.entity.SoftwareModuleUpdatedEvent;
|
||||
import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
|
||||
import org.eclipse.hawkbit.repository.event.remote.entity.TargetUpdatedEvent;
|
||||
import org.eclipse.hawkbit.repository.exception.EntityAlreadyExistsException;
|
||||
import org.eclipse.hawkbit.repository.jpa.model.JpaDistributionSet;
|
||||
import org.eclipse.hawkbit.repository.jpa.model.JpaTarget;
|
||||
import org.eclipse.hawkbit.repository.model.Action.Status;
|
||||
@@ -75,6 +82,9 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
|
||||
|
||||
@Autowired
|
||||
private AmqpProperties amqpProperties;
|
||||
|
||||
@Autowired
|
||||
private AmqpMessageHandlerService amqpMessageHandlerService;
|
||||
|
||||
@Test
|
||||
@Description("Tests DMF PING request and expected reponse.")
|
||||
@@ -882,6 +892,31 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
|
||||
verifyAssignedDsAndInstalledDs(controllerId, distributionSet.getId(), distributionSet.getId());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("Messages that result into certain exceptions being raised should not be requeued. This message should forwarded to the deadletter queue")
|
||||
@ExpectEvents({@Expect(type = TargetCreatedEvent.class, count = 0)})
|
||||
public void ignoredExceptionTypesShouldNotBeRequeued() {
|
||||
final ControllerManagement mockedControllerManagement = Mockito.mock(ControllerManagement.class);
|
||||
|
||||
final List<Class<? extends RuntimeException>> exceptionsThatShouldNotBeRequeued = Arrays
|
||||
.asList(IllegalArgumentException.class, EntityAlreadyExistsException.class);
|
||||
final String controllerId = "dummy_target";
|
||||
|
||||
try {
|
||||
for (Class<? extends RuntimeException> exceptionClass : exceptionsThatShouldNotBeRequeued) {
|
||||
doThrow(exceptionClass).when(mockedControllerManagement)
|
||||
.findOrRegisterTargetIfItDoesNotExist(eq(controllerId), any());
|
||||
|
||||
amqpMessageHandlerService.setControllerManagement(mockedControllerManagement);
|
||||
createAndSendThingCreated(controllerId, TENANT_EXIST);
|
||||
verifyOneDeadLetterMessage();
|
||||
assertThat(targetManagement.getByControllerID(controllerId)).isEmpty();
|
||||
}
|
||||
} finally {
|
||||
amqpMessageHandlerService.setControllerManagement(controllerManagement);
|
||||
}
|
||||
}
|
||||
|
||||
@Step
|
||||
private void verifyAssignedDsAndInstalledDs(final String controllerId, final Long assignedDsId,
|
||||
final Long installedDsId) {
|
||||
@@ -991,6 +1026,7 @@ public class AmqpMessageHandlerServiceIntegrationTest extends AbstractAmqpServic
|
||||
assertEmptyReceiverQueueCount();
|
||||
createConditionFactory().untilAsserted(() -> Mockito
|
||||
.verify(getDeadletterListener(), Mockito.times(numberOfInvocations)).handleMessage(Mockito.any()));
|
||||
Mockito.reset(getDeadletterListener());
|
||||
}
|
||||
|
||||
private static String getJsonFieldFromBody(final byte[] body, final String fieldName) throws IOException {
|
||||
|
||||
@@ -110,7 +110,7 @@ import com.google.common.collect.Sets;
|
||||
@Transactional(readOnly = true)
|
||||
@Validated
|
||||
public class JpaControllerManagement implements ControllerManagement {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ControllerManagement.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JpaControllerManagement.class);
|
||||
|
||||
private final BlockingDeque<TargetPoll> queue;
|
||||
|
||||
@@ -387,23 +387,15 @@ public class JpaControllerManagement implements ControllerManagement {
|
||||
}
|
||||
|
||||
private Target createTarget(final String controllerId, final URI address) {
|
||||
try {
|
||||
final Target result = targetRepository.save((JpaTarget) entityFactory.target().create()
|
||||
.controllerId(controllerId).description("Plug and Play target: " + controllerId).name(controllerId)
|
||||
.status(TargetUpdateStatus.REGISTERED).lastTargetQuery(System.currentTimeMillis())
|
||||
.address(Optional.ofNullable(address).map(URI::toString).orElse(null)).build());
|
||||
final Target result = targetRepository.save((JpaTarget) entityFactory.target().create()
|
||||
.controllerId(controllerId).description("Plug and Play target: " + controllerId).name(controllerId)
|
||||
.status(TargetUpdateStatus.REGISTERED).lastTargetQuery(System.currentTimeMillis())
|
||||
.address(Optional.ofNullable(address).map(URI::toString).orElse(null)).build());
|
||||
|
||||
afterCommit.afterCommit(() -> eventPublisherHolder.getEventPublisher()
|
||||
.publishEvent(new TargetPollEvent(result, eventPublisherHolder.getApplicationId())));
|
||||
afterCommit.afterCommit(() -> eventPublisherHolder.getEventPublisher()
|
||||
.publishEvent(new TargetPollEvent(result, eventPublisherHolder.getApplicationId())));
|
||||
|
||||
return result;
|
||||
} catch (final EntityAlreadyExistsException e) {
|
||||
LOG.warn(
|
||||
"Caught an EntityAlreadyExistsException while creating non existing target "
|
||||
+ "[controllerId:{}, address:{}, tenant: {}]",
|
||||
controllerId, address, tenantAware.getCurrentTenant());
|
||||
throw e;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user