Do not retry AMQP messages which violates a quota (#1392)
This commit is contained in:
@@ -39,6 +39,7 @@ import org.eclipse.hawkbit.repository.RepositoryConstants;
|
||||
import org.eclipse.hawkbit.repository.TenantConfigurationManagement;
|
||||
import org.eclipse.hawkbit.repository.UpdateMode;
|
||||
import org.eclipse.hawkbit.repository.builder.ActionStatusCreate;
|
||||
import org.eclipse.hawkbit.repository.exception.AssignmentQuotaExceededException;
|
||||
import org.eclipse.hawkbit.repository.exception.EntityAlreadyExistsException;
|
||||
import org.eclipse.hawkbit.repository.model.Action;
|
||||
import org.eclipse.hawkbit.repository.model.Action.Status;
|
||||
@@ -183,6 +184,8 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
default:
|
||||
logAndThrowMessageError(message, "No handle method was found for the given message type.");
|
||||
}
|
||||
} catch(AssignmentQuotaExceededException ex) {
|
||||
throw new AmqpRejectAndDontRequeueException("Could not handle message due to quota violation!", ex);
|
||||
} catch (final IllegalArgumentException ex) {
|
||||
throw new AmqpRejectAndDontRequeueException("Invalid message!", ex);
|
||||
} finally {
|
||||
|
||||
@@ -48,6 +48,7 @@ import org.eclipse.hawkbit.repository.TenantConfigurationManagement;
|
||||
import org.eclipse.hawkbit.repository.UpdateMode;
|
||||
import org.eclipse.hawkbit.repository.builder.ActionStatusBuilder;
|
||||
import org.eclipse.hawkbit.repository.builder.ActionStatusCreate;
|
||||
import org.eclipse.hawkbit.repository.exception.AssignmentQuotaExceededException;
|
||||
import org.eclipse.hawkbit.repository.jpa.model.JpaAction;
|
||||
import org.eclipse.hawkbit.repository.jpa.builder.JpaActionStatusBuilder;
|
||||
import org.eclipse.hawkbit.repository.jpa.model.JpaActionStatus;
|
||||
@@ -494,6 +495,31 @@ public class AmqpMessageHandlerServiceTest {
|
||||
VIRTUAL_HOST));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("Tests that messages which cause quota violations are not re-added to message queue so they would block other communication.")
|
||||
public void quotaExceeded() {
|
||||
final MessageProperties messageProperties = createMessageProperties(MessageType.EVENT);
|
||||
messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ACTION_STATUS.name());
|
||||
|
||||
final DmfActionUpdateStatus actionUpdateStatus = createActionUpdateStatus(DmfActionStatus.WARNING);
|
||||
final Message message = createMessage(actionUpdateStatus, messageProperties);
|
||||
final Action action = mock(Action.class);
|
||||
when(action.getId()).thenReturn(2L);
|
||||
final ActionStatusBuilder builder = mock(ActionStatusBuilder.class);
|
||||
final ActionStatusCreate create = mock(ActionStatusCreate.class);
|
||||
when(builder.create(2L)).thenReturn(create);
|
||||
when(create.status(any())).thenReturn(create);
|
||||
when(create.messages(any())).thenReturn(create);
|
||||
when(entityFactoryMock.actionStatus()).thenReturn(builder);
|
||||
|
||||
when(controllerManagementMock.findActionWithDetails(anyLong())).thenReturn(Optional.of(action));
|
||||
when(controllerManagementMock.addUpdateActionStatus(any())).thenThrow(new AssignmentQuotaExceededException());
|
||||
|
||||
assertThatExceptionOfType(AmqpRejectAndDontRequeueException.class)
|
||||
.isThrownBy(() -> amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT,
|
||||
VIRTUAL_HOST));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("Tests that an download request is denied for an artifact which does not exists")
|
||||
public void authenticationRequestDeniedForArtifactWhichDoesNotExists() {
|
||||
|
||||
Reference in New Issue
Block a user