diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 0630375c9..6674a3cbc 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -37,6 +37,7 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; @@ -78,6 +79,18 @@ public class AmqpConfiguration { @Autowired(required = false) private ServiceMatcher serviceMatcher; + /** + * Register the bean for the custom error handler. + * + * @return custom error handler + */ + @Bean + @ConditionalOnMissingBean(ErrorHandler.class) + public ErrorHandler errorHandler() { + return new ConditionalRejectingErrorHandler( + new DelayedRequeueExceptionStrategy(amqpProperties.getRequeueDelay())); + } + @Configuration @ConditionalOnMissingBean(ConnectionFactory.class) @ConditionalOnProperty(prefix = "hawkbit.dmf.rabbitmq", name = "enabled", matchIfMissing = true) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index afe86fb5b..35551d49f 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -28,8 +28,6 @@ import org.eclipse.hawkbit.repository.EntityFactory; import org.eclipse.hawkbit.repository.RepositoryConstants; import org.eclipse.hawkbit.repository.builder.ActionStatusCreate; import org.eclipse.hawkbit.repository.exception.EntityNotFoundException; -import org.eclipse.hawkbit.repository.exception.TenantNotExistException; -import org.eclipse.hawkbit.repository.exception.TooManyStatusEntriesException; import org.eclipse.hawkbit.repository.model.Action; import org.eclipse.hawkbit.repository.model.Action.Status; import org.eclipse.hawkbit.repository.model.Target; @@ -138,8 +136,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService { } } catch (final IllegalArgumentException ex) { throw new AmqpRejectAndDontRequeueException("Invalid message!", ex); - } catch (final TenantNotExistException | TooManyStatusEntriesException e) { - throw new AmqpRejectAndDontRequeueException(e); } finally { SecurityContextHolder.setContext(oldContext); } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java index dd80262a5..1e33b8106 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java @@ -30,6 +30,8 @@ public class AmqpProperties { private static final int DEFAULT_MAX_CONSUMERS = 10; + private static final long DEFAULT_REQUEUE_DELAY = 0; + /** * Enable DMF API based on AMQP 0.9 */ @@ -91,6 +93,19 @@ public class AmqpProperties { */ private int declarationRetries = DEFAULT_QUEUE_DECLARATION_RETRIES; + /** + * Delay for messages that are requeued in milliseconds. + */ + private long requeueDelay = DEFAULT_REQUEUE_DELAY; + + public long getRequeueDelay() { + return requeueDelay; + } + + public void setRequeueDelay(final long requeueDelay) { + this.requeueDelay = requeueDelay; + } + public int getDeclarationRetries() { return declarationRetries; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DelayedRequeueExceptionStrategy.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DelayedRequeueExceptionStrategy.java new file mode 100644 index 000000000..70ebdcf98 --- /dev/null +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DelayedRequeueExceptionStrategy.java @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2015 Bosch Software Innovations GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + */ +package org.eclipse.hawkbit.amqp; + +import java.util.concurrent.TimeUnit; + +import org.eclipse.hawkbit.repository.exception.InvalidTargetAddressException; +import org.eclipse.hawkbit.repository.exception.TenantNotExistException; +import org.eclipse.hawkbit.repository.exception.TooManyStatusEntriesException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; +import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy; + +/** + * Custom {@link FatalExceptionStrategy} that markes defined hawkBit internal + * exceptions not to be requeued. In addition it throttles in case of a requeue + * by means of blocking the processing thread for a certain amount of time. That + * avoids a back and forth between broker and hawkBit at maximum speed. + * + */ +public class DelayedRequeueExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { + private static final Logger LOG = LoggerFactory.getLogger(DelayedRequeueExceptionStrategy.class); + + private final long delay; + + /** + * @param delay + * in {@link TimeUnit#MILLISECONDS} before requeue. + */ + public DelayedRequeueExceptionStrategy(final long delay) { + this.delay = delay; + } + + @Override + protected boolean isUserCauseFatal(final Throwable cause) { + if (cause instanceof TenantNotExistException || cause instanceof TooManyStatusEntriesException + || cause instanceof InvalidTargetAddressException) { + return true; + } + + LOG.error("Found a message that has to be requeued. Processing with delay of {}ms: ", delay, cause); + + try { + TimeUnit.MILLISECONDS.sleep(delay); + } catch (final InterruptedException e) { + LOG.error("Delay interrupted!", e); + Thread.currentThread().interrupt(); + } + + return false; + } +} diff --git a/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/exception/InvalidTargetAddressException.java b/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/exception/InvalidTargetAddressException.java index ada5cb9b6..6b5a5333d 100644 --- a/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/exception/InvalidTargetAddressException.java +++ b/hawkbit-repository/hawkbit-repository-api/src/main/java/org/eclipse/hawkbit/repository/exception/InvalidTargetAddressException.java @@ -15,10 +15,6 @@ import org.eclipse.hawkbit.exception.SpServerError; * Exception which is thrown when trying to set an invalid target address. */ public class InvalidTargetAddressException extends AbstractServerRtException { - - /** - * - */ private static final long serialVersionUID = 1L; /** diff --git a/pom.xml b/pom.xml index b48eb334f..aed04b4b8 100644 --- a/pom.xml +++ b/pom.xml @@ -105,7 +105,7 @@ - 1.6.3.RELEASE + 1.6.5.RELEASE 4.1.2.RELEASE 4.3.3.RELEASE 4.3.2.RELEASE