Introduce optional throttled DMF requeuing (#373)

* Inroduce throttled requeuing
Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>
This commit is contained in:
Kai Zimmermann
2016-12-09 14:42:33 +01:00
committed by GitHub
parent f0fc8f0606
commit 0ddcedad74
6 changed files with 88 additions and 9 deletions

View File

@@ -37,6 +37,7 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
@@ -78,6 +79,18 @@ public class AmqpConfiguration {
@Autowired(required = false)
private ServiceMatcher serviceMatcher;
/**
* Register the bean for the custom error handler.
*
* @return custom error handler
*/
@Bean
@ConditionalOnMissingBean(ErrorHandler.class)
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(
new DelayedRequeueExceptionStrategy(amqpProperties.getRequeueDelay()));
}
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "hawkbit.dmf.rabbitmq", name = "enabled", matchIfMissing = true)

View File

@@ -28,8 +28,6 @@ import org.eclipse.hawkbit.repository.EntityFactory;
import org.eclipse.hawkbit.repository.RepositoryConstants;
import org.eclipse.hawkbit.repository.builder.ActionStatusCreate;
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
import org.eclipse.hawkbit.repository.exception.TenantNotExistException;
import org.eclipse.hawkbit.repository.exception.TooManyStatusEntriesException;
import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.Action.Status;
import org.eclipse.hawkbit.repository.model.Target;
@@ -138,8 +136,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
}
} catch (final IllegalArgumentException ex) {
throw new AmqpRejectAndDontRequeueException("Invalid message!", ex);
} catch (final TenantNotExistException | TooManyStatusEntriesException e) {
throw new AmqpRejectAndDontRequeueException(e);
} finally {
SecurityContextHolder.setContext(oldContext);
}

View File

@@ -30,6 +30,8 @@ public class AmqpProperties {
private static final int DEFAULT_MAX_CONSUMERS = 10;
private static final long DEFAULT_REQUEUE_DELAY = 0;
/**
* Enable DMF API based on AMQP 0.9
*/
@@ -91,6 +93,19 @@ public class AmqpProperties {
*/
private int declarationRetries = DEFAULT_QUEUE_DECLARATION_RETRIES;
/**
* Delay for messages that are requeued in milliseconds.
*/
private long requeueDelay = DEFAULT_REQUEUE_DELAY;
public long getRequeueDelay() {
return requeueDelay;
}
public void setRequeueDelay(final long requeueDelay) {
this.requeueDelay = requeueDelay;
}
public int getDeclarationRetries() {
return declarationRetries;
}

View File

@@ -0,0 +1,59 @@
/**
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.amqp;
import java.util.concurrent.TimeUnit;
import org.eclipse.hawkbit.repository.exception.InvalidTargetAddressException;
import org.eclipse.hawkbit.repository.exception.TenantNotExistException;
import org.eclipse.hawkbit.repository.exception.TooManyStatusEntriesException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
/**
* Custom {@link FatalExceptionStrategy} that markes defined hawkBit internal
* exceptions not to be requeued. In addition it throttles in case of a requeue
* by means of blocking the processing thread for a certain amount of time. That
* avoids a back and forth between broker and hawkBit at maximum speed.
*
*/
public class DelayedRequeueExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
private static final Logger LOG = LoggerFactory.getLogger(DelayedRequeueExceptionStrategy.class);
private final long delay;
/**
* @param delay
* in {@link TimeUnit#MILLISECONDS} before requeue.
*/
public DelayedRequeueExceptionStrategy(final long delay) {
this.delay = delay;
}
@Override
protected boolean isUserCauseFatal(final Throwable cause) {
if (cause instanceof TenantNotExistException || cause instanceof TooManyStatusEntriesException
|| cause instanceof InvalidTargetAddressException) {
return true;
}
LOG.error("Found a message that has to be requeued. Processing with delay of {}ms: ", delay, cause);
try {
TimeUnit.MILLISECONDS.sleep(delay);
} catch (final InterruptedException e) {
LOG.error("Delay interrupted!", e);
Thread.currentThread().interrupt();
}
return false;
}
}

View File

@@ -15,10 +15,6 @@ import org.eclipse.hawkbit.exception.SpServerError;
* Exception which is thrown when trying to set an invalid target address.
*/
public class InvalidTargetAddressException extends AbstractServerRtException {
/**
*
*/
private static final long serialVersionUID = 1L;
/**

View File

@@ -105,7 +105,7 @@
<!-- Spring boot version overrides (should be reviewed with every boot upgrade) - START -->
<!-- Newer versions needed than defined in Boot -->
<spring-amqp.version>1.6.3.RELEASE</spring-amqp.version>
<spring-amqp.version>1.6.5.RELEASE</spring-amqp.version>
<spring-security.version>4.1.2.RELEASE</spring-security.version>
<spring.version>4.3.3.RELEASE</spring.version>
<spring-integration.version>4.3.2.RELEASE</spring-integration.version>