Fix AMQP retries when attribute characters are invalid (#2327)
Signed-off-by: Avgustin Marinov <Avgustin.Marinov@bosch.com>
This commit is contained in:
@@ -137,8 +137,9 @@ class DdiConfigDataTest extends AbstractDDiApiIntegrationTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("We verify that the config data (i.e. device attributes like serial number, hardware revision etc.) " +
|
||||
"upload quota is enforced to protect the server from malicious attempts.")
|
||||
@Description("""
|
||||
We verify that the config data (i.e. device attributes like serial number, hardware revision etc.)
|
||||
upload quota is enforced to protect the server from malicious attempts.""")
|
||||
void putTooMuchConfigData() throws Exception {
|
||||
testdataFactory.createTarget(TARGET1_ID);
|
||||
|
||||
@@ -148,15 +149,16 @@ class DdiConfigDataTest extends AbstractDDiApiIntegrationTest {
|
||||
attributes.put("dsafsdf" + i, "sdsds" + i);
|
||||
}
|
||||
mvc.perform(put(TARGET1_CONFIG_DATA_PATH, tenantAware.getCurrentTenant())
|
||||
.content(JsonBuilder.configData(attributes).toString()).contentType(MediaType.APPLICATION_JSON))
|
||||
.content(JsonBuilder.configData(attributes).toString())
|
||||
.contentType(MediaType.APPLICATION_JSON))
|
||||
.andExpect(status().isOk());
|
||||
|
||||
mvc.perform(put(TARGET1_CONFIG_DATA_PATH, tenantAware.getCurrentTenant())
|
||||
.content(JsonBuilder.configData(Map.of("on too many", "sdsds")).toString()).contentType(MediaType.APPLICATION_JSON))
|
||||
.content(JsonBuilder.configData(Map.of("on too many", "sdsds")).toString())
|
||||
.contentType(MediaType.APPLICATION_JSON))
|
||||
.andExpect(status().isForbidden())
|
||||
.andExpect(jsonPath("$.exceptionClass", equalTo(AssignmentQuotaExceededException.class.getName())))
|
||||
.andExpect(jsonPath("$.errorCode", equalTo(SpServerError.SP_QUOTA_EXCEEDED.getKey())));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -1,45 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
|
||||
/**
|
||||
* An abstract error handler for errors resulting from AMQP.
|
||||
*/
|
||||
public abstract class AbstractAmqpErrorHandler<T> implements AmqpErrorHandler {
|
||||
|
||||
@Override
|
||||
public void doHandle(Throwable throwable, AmqpErrorHandlerChain chain) {
|
||||
// retrieving the cause of throwable as it contains the actual class of exception
|
||||
final Throwable cause = throwable.getCause();
|
||||
if (getExceptionClass().isAssignableFrom(cause.getClass())) {
|
||||
throw new AmqpRejectAndDontRequeueException(getErrorMessage(throwable));
|
||||
} else {
|
||||
chain.handle(throwable);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the class of the exception.
|
||||
*
|
||||
* @return the exception class
|
||||
*/
|
||||
public abstract Class<T> getExceptionClass();
|
||||
|
||||
/**
|
||||
* Returns the customized error message.
|
||||
*
|
||||
* @return the customized error message
|
||||
*/
|
||||
public String getErrorMessage(Throwable throwable) {
|
||||
return AmqpErrorMessageComposer.constructErrorMessage(throwable);
|
||||
}
|
||||
}
|
||||
@@ -9,11 +9,14 @@
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import lombok.ToString;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.hawkbit.artifact.repository.urlhandler.ArtifactUrlHandler;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings;
|
||||
@@ -36,11 +39,14 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
|
||||
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
|
||||
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
@@ -65,7 +71,8 @@ public class AmqpConfiguration {
|
||||
private final ConnectionFactory rabbitConnectionFactory;
|
||||
private ServiceMatcher serviceMatcher;
|
||||
|
||||
public AmqpConfiguration(final AmqpProperties amqpProperties, final AmqpDeadletterProperties amqpDeadletterProperties, final ConnectionFactory rabbitConnectionFactory) {
|
||||
public AmqpConfiguration(final AmqpProperties amqpProperties, final AmqpDeadletterProperties amqpDeadletterProperties,
|
||||
final ConnectionFactory rabbitConnectionFactory) {
|
||||
this.amqpProperties = amqpProperties;
|
||||
this.amqpDeadletterProperties = amqpDeadletterProperties;
|
||||
this.rabbitConnectionFactory = rabbitConnectionFactory;
|
||||
@@ -76,48 +83,24 @@ public class AmqpConfiguration {
|
||||
this.serviceMatcher = serviceMatcher;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public FatalExceptionStrategy sqlFatalSQLExceptionStrategy(final AmqpProperties amqpProperties) {
|
||||
return new SqlFatalExceptionStrategy(amqpProperties.getFatalSqlExceptionPolicy());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a custom error handler bean.
|
||||
*
|
||||
* @param handlers list of {@link AmqpErrorHandler} handlers
|
||||
* @param fatalExceptionStrategies list of {@link FatalExceptionStrategy} handlers. isFatal will be called for causes,
|
||||
* up to the first fatal, so the implementation don't need to iterate over the causes.
|
||||
* @return the delegating error handler bean
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public ErrorHandler errorHandler(final List<AmqpErrorHandler> handlers) {
|
||||
return new DelegatingConditionalErrorHandler(
|
||||
handlers,
|
||||
new ConditionalRejectingErrorHandler(new DelayedRequeueExceptionStrategy(amqpProperties.getRequeueDelay())));
|
||||
}
|
||||
|
||||
/**
|
||||
* Error handler bean for all target attributes related fatal errors
|
||||
*
|
||||
* @return the invalid target attribute exception handler bean
|
||||
*/
|
||||
@Bean
|
||||
public AmqpErrorHandler invalidTargetAttributeConditionalExceptionHandler() {
|
||||
return new InvalidTargetAttributeExceptionHandler();
|
||||
}
|
||||
|
||||
/**
|
||||
* Error handler bean for entity not found errors
|
||||
*
|
||||
* @return the entity not found exception handler bean
|
||||
*/
|
||||
@Bean
|
||||
public AmqpErrorHandler entityNotFoundExceptionHandler() {
|
||||
return new EntityNotFoundExceptionHandler();
|
||||
}
|
||||
|
||||
/**
|
||||
* Error handler bean for amqp message conversion errors
|
||||
*
|
||||
* @return the amqp message conversion exception handler bean
|
||||
*/
|
||||
@Bean
|
||||
public AmqpErrorHandler messageConversionExceptionHandler() {
|
||||
return new MessageConversionExceptionHandler();
|
||||
public ErrorHandler errorHandler(
|
||||
final List<FatalExceptionStrategy> fatalExceptionStrategies,
|
||||
@Value("${hawkbit.dmf.rabbitmq.fatalExceptionTypes:}") final List<String> fatalExceptionTypes) {
|
||||
return new ConditionalRejectingErrorHandler(new RequeueExceptionStrategy(fatalExceptionStrategies, fatalExceptionTypes));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -303,4 +286,39 @@ public class AmqpConfiguration {
|
||||
args.put("x-max-length", 1_000);
|
||||
return args;
|
||||
}
|
||||
|
||||
@ToString
|
||||
private static class SqlFatalExceptionStrategy implements FatalExceptionStrategy {
|
||||
|
||||
private final boolean fatalByDefault;
|
||||
private final List<Integer> unlessErrorCodeIn;
|
||||
private final List<String> unlessSqlStateIn;
|
||||
private final List<Pattern> unlessMessageMatches;
|
||||
|
||||
public SqlFatalExceptionStrategy(final AmqpProperties.FatalSqlExceptionPolicy fatalSqlExceptions) {
|
||||
this.fatalByDefault = fatalSqlExceptions.isByDefault();
|
||||
this.unlessErrorCodeIn = fatalSqlExceptions.getUnlessErrorCodeIn();
|
||||
this.unlessSqlStateIn = fatalSqlExceptions.getUnlessSqlStateIn();
|
||||
this.unlessMessageMatches = fatalSqlExceptions.getUnlessMessageMatches();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFatal(final Throwable t) {
|
||||
if (t instanceof SQLException sqlException) {
|
||||
if (unlessErrorCodeIn.contains(sqlException.getErrorCode())) {
|
||||
return !fatalByDefault;
|
||||
} else if (unlessSqlStateIn.contains(sqlException.getSQLState())) {
|
||||
return !fatalByDefault;
|
||||
} else {
|
||||
for (final Pattern pattern : unlessMessageMatches) {
|
||||
if (pattern.matcher(sqlException.getMessage()).matches()) {
|
||||
return !fatalByDefault;
|
||||
}
|
||||
}
|
||||
return fatalByDefault;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,25 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
/**
|
||||
* Interface declaration of {@link AmqpErrorHandler} that handles errors based on the types of exception.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface AmqpErrorHandler {
|
||||
|
||||
/**
|
||||
* Handles the error based on the type of exception
|
||||
*
|
||||
* @param throwable the throwable
|
||||
* @param chain an {@link AmqpErrorHandlerChain}
|
||||
*/
|
||||
void doHandle(final Throwable throwable, final AmqpErrorHandlerChain chain);
|
||||
}
|
||||
@@ -1,60 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
||||
/**
|
||||
* An error handler chain that delegates the error to the matching error handler based on the type of exception
|
||||
*/
|
||||
public final class AmqpErrorHandlerChain {
|
||||
|
||||
private final Iterator<AmqpErrorHandler> iterator;
|
||||
private final ErrorHandler defaultHandler;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param iterator the {@link AmqpErrorHandler} iterator
|
||||
* @param defaultHandler the default handler
|
||||
*/
|
||||
private AmqpErrorHandlerChain(Iterator<AmqpErrorHandler> iterator, ErrorHandler defaultHandler) {
|
||||
this.iterator = iterator;
|
||||
this.defaultHandler = defaultHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@link AmqpErrorHandlerChain}
|
||||
*
|
||||
* @param errorHandlers {@link List} of error handlers
|
||||
* @param defaultHandler the default error handler
|
||||
* @return an {@link AmqpErrorHandlerChain}
|
||||
*/
|
||||
public static AmqpErrorHandlerChain getHandlerChain(final List<AmqpErrorHandler> errorHandlers, final ErrorHandler defaultHandler) {
|
||||
return new AmqpErrorHandlerChain(errorHandlers.iterator(), defaultHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the error based on the type of exception
|
||||
*
|
||||
* @param error the throwable containing the cause of exception
|
||||
*/
|
||||
public void handle(final Throwable error) {
|
||||
if (iterator.hasNext()) {
|
||||
final AmqpErrorHandler handler = iterator.next();
|
||||
handler.doHandle(error, this);
|
||||
} else {
|
||||
defaultHandler.handleError(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -9,19 +9,21 @@
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* Bean which holds the necessary properties for configuring the AMQP
|
||||
* connection.
|
||||
* Bean which holds the necessary properties for configuring the AMQP connection.
|
||||
*/
|
||||
@Data
|
||||
@ConfigurationProperties("hawkbit.dmf.rabbitmq")
|
||||
public class AmqpProperties {
|
||||
|
||||
private static final int DEFAULT_QUEUE_DECLARATION_RETRIES = 50;
|
||||
private static final long DEFAULT_REQUEUE_DELAY = 0;
|
||||
|
||||
/**
|
||||
* Enable DMF API based on AMQP 0.9
|
||||
@@ -44,8 +46,7 @@ public class AmqpProperties {
|
||||
private String receiverQueue = "dmf_receiver";
|
||||
|
||||
/**
|
||||
* Authentication request called by 3rd party artifact storages for download
|
||||
* authorizations.
|
||||
* Authentication request called by 3rd party artifact storages for download authorizations.
|
||||
*/
|
||||
private String authenticationReceiverQueue = "authentication_receiver";
|
||||
|
||||
@@ -56,14 +57,37 @@ public class AmqpProperties {
|
||||
|
||||
/**
|
||||
* The number of retry attempts when passive queue declaration fails.
|
||||
* Passive queue declaration occurs when the consumer starts or, when
|
||||
* consuming from multiple queues, when not all queues were available during
|
||||
* initialization.
|
||||
* Passive queue declaration occurs when the consumer starts or, when consuming from multiple queues, when not all queues were
|
||||
* available during initialization.
|
||||
*/
|
||||
private int declarationRetries = DEFAULT_QUEUE_DECLARATION_RETRIES;
|
||||
|
||||
/**
|
||||
* Delay for messages that are requeued in milliseconds.
|
||||
* Represents which {@link }SQLExceptions} should be considered fatal. By default, (without any configuration) it's simply disabled.
|
||||
*/
|
||||
private long requeueDelay = DEFAULT_REQUEUE_DELAY;
|
||||
private final FatalSqlExceptionPolicy fatalSqlExceptionPolicy = new FatalSqlExceptionPolicy();
|
||||
|
||||
@Data
|
||||
public static class FatalSqlExceptionPolicy {
|
||||
|
||||
/**
|
||||
* The mode of the policy. If set to {@code true}, the every {@link java.sql.SQLException} would be assessed as fatal unless
|
||||
* matching the filters. Otherwise, every {@link java.sql.SQLException} will be assessed as non-fatal unless matching the filter.
|
||||
* The {@link java.sql.SQLException} that matches the filters are considered non-fatal if byDefault is {@code true} and fatal otherwise.
|
||||
*/
|
||||
private boolean byDefault = false;
|
||||
/**
|
||||
* Error codes of the {@link java.sql.SQLException} that will be excluded from the default fatal policy. DB depended.
|
||||
*/
|
||||
private final List<Integer> unlessErrorCodeIn = new ArrayList<>();
|
||||
/**
|
||||
* SQL states of the {@link java.sql.SQLException} that will be excluded from the default fatal policy. DB depended.
|
||||
*/
|
||||
private final List<String> unlessSqlStateIn = new ArrayList<>();
|
||||
/**
|
||||
* Java regex message matching patterns. The {@link java.sql.SQLException} with messages matching any of the patterns
|
||||
* will be excluded from the default fatal policy. DB depended.
|
||||
*/
|
||||
private final List<Pattern> unlessMessageMatches = new ArrayList<>();
|
||||
}
|
||||
}
|
||||
@@ -1,93 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2015 Bosch Software Innovations GmbH and others
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.hawkbit.repository.exception.AssignmentQuotaExceededException;
|
||||
import org.eclipse.hawkbit.repository.exception.CancelActionNotAllowedException;
|
||||
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
|
||||
import org.eclipse.hawkbit.repository.exception.InvalidTargetAddressException;
|
||||
import org.eclipse.hawkbit.repository.exception.InvalidTargetAttributeException;
|
||||
import org.eclipse.hawkbit.repository.exception.TenantNotExistException;
|
||||
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
|
||||
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
|
||||
import org.springframework.amqp.support.converter.MessageConversionException;
|
||||
import org.springframework.messaging.MessageHandlingException;
|
||||
|
||||
/**
|
||||
* Custom {@link FatalExceptionStrategy} that markes defined hawkBit internal
|
||||
* exceptions not to be requeued. In addition it throttles in case of a requeue
|
||||
* by means of blocking the processing thread for a certain amount of time. That
|
||||
* avoids a back and forth between broker and hawkBit at maximum speed.
|
||||
*/
|
||||
@Slf4j
|
||||
public class DelayedRequeueExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
|
||||
|
||||
private final long delay;
|
||||
|
||||
/**
|
||||
* @param delay in {@link TimeUnit#MILLISECONDS} before requeue.
|
||||
*/
|
||||
public DelayedRequeueExceptionStrategy(final long delay) {
|
||||
this.delay = delay;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isUserCauseFatal(final Throwable cause) {
|
||||
if (invalidMessage(cause)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
log.error("Found a message that has to be requeued. Processing with delay of {}ms: ", delay, cause);
|
||||
|
||||
try {
|
||||
TimeUnit.MILLISECONDS.sleep(delay);
|
||||
} catch (final InterruptedException e) {
|
||||
log.error("Delay interrupted!", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean invalidMessage(final Throwable cause) {
|
||||
return doesNotExist(cause) || quotaHit(cause) || invalidContent(cause) || invalidState(cause);
|
||||
}
|
||||
|
||||
private static boolean invalidState(final Throwable cause) {
|
||||
return cause instanceof CancelActionNotAllowedException;
|
||||
}
|
||||
|
||||
private static boolean quotaHit(final Throwable cause) {
|
||||
return cause instanceof AssignmentQuotaExceededException;
|
||||
}
|
||||
|
||||
private static boolean doesNotExist(final Throwable cause) {
|
||||
return cause instanceof TenantNotExistException || cause instanceof EntityNotFoundException;
|
||||
}
|
||||
|
||||
private static boolean invalidContent(final Throwable cause) {
|
||||
return isRepositoryException(cause) || isMessageException(cause);
|
||||
}
|
||||
|
||||
private static boolean isRepositoryException(final Throwable cause) {
|
||||
return cause instanceof ConstraintViolationException || cause instanceof InvalidTargetAttributeException;
|
||||
}
|
||||
|
||||
private static boolean isMessageException(final Throwable cause) {
|
||||
return cause instanceof InvalidTargetAddressException ||
|
||||
cause instanceof MessageConversionException ||
|
||||
cause instanceof MessageHandlingException;
|
||||
}
|
||||
}
|
||||
@@ -1,64 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
||||
/**
|
||||
* An error handler delegates error handling to the matching {@link AmqpErrorHandler} based on the type of exception
|
||||
*/
|
||||
@Slf4j
|
||||
public class DelegatingConditionalErrorHandler implements ErrorHandler {
|
||||
|
||||
private final List<AmqpErrorHandler> handlers;
|
||||
private final ErrorHandler defaultHandler;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param handlers {@link List} of error handlers
|
||||
* @param defaultHandler the default error handler
|
||||
*/
|
||||
public DelegatingConditionalErrorHandler(final List<AmqpErrorHandler> handlers, @NotNull final ErrorHandler defaultHandler) {
|
||||
this.handlers = handlers;
|
||||
this.defaultHandler = defaultHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleError(final Throwable t) {
|
||||
if (t.getCause() == null) {
|
||||
log.error("Cannot handle the error as the cause of the error is null!");
|
||||
return;
|
||||
}
|
||||
|
||||
if (includesAmqpRejectException(t.getCause())) {
|
||||
log.error("Received an AmqpRejectAndDontRequeueException due to {}", t.getCause().getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
AmqpErrorHandlerChain.getHandlerChain(handlers, defaultHandler).handle(t);
|
||||
}
|
||||
|
||||
private boolean includesAmqpRejectException(final Throwable t) {
|
||||
if (t instanceof AmqpRejectAndDontRequeueException) {
|
||||
return true;
|
||||
}
|
||||
if (t.getCause() != null) {
|
||||
return includesAmqpRejectException(t.getCause());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -1,23 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
|
||||
|
||||
/**
|
||||
* An error handler for entity not found exception resulting from AMQP.
|
||||
*/
|
||||
public class EntityNotFoundExceptionHandler extends AbstractAmqpErrorHandler<EntityNotFoundException> {
|
||||
|
||||
@Override
|
||||
public Class<EntityNotFoundException> getExceptionClass() {
|
||||
return EntityNotFoundException.class;
|
||||
}
|
||||
}
|
||||
@@ -1,23 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import org.eclipse.hawkbit.repository.exception.InvalidTargetAttributeException;
|
||||
|
||||
/**
|
||||
* An error handler for all invalid target attributes resulting from AMQP.
|
||||
*/
|
||||
public class InvalidTargetAttributeExceptionHandler extends AbstractAmqpErrorHandler<InvalidTargetAttributeException> {
|
||||
|
||||
@Override
|
||||
public Class<InvalidTargetAttributeException> getExceptionClass() {
|
||||
return InvalidTargetAttributeException.class;
|
||||
}
|
||||
}
|
||||
@@ -1,41 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import com.fasterxml.jackson.databind.exc.InvalidFormatException;
|
||||
import org.springframework.amqp.support.converter.MessageConversionException;
|
||||
|
||||
/**
|
||||
* An error handler for message conversion exception resulting from AMQP.
|
||||
*/
|
||||
public class MessageConversionExceptionHandler extends AbstractAmqpErrorHandler<MessageConversionException> {
|
||||
|
||||
@Override
|
||||
public Class<MessageConversionException> getExceptionClass() {
|
||||
return MessageConversionException.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getErrorMessage(Throwable throwable) {
|
||||
final String errorMessage = super.getErrorMessage(throwable);
|
||||
// since the detailed error message lies in the first parent of current throwable we retrieve it and append it to the errorMessage
|
||||
final Optional<String> detailedErrorMessage = getFirstAncestralErrorMessage(throwable.getCause());
|
||||
return detailedErrorMessage.isPresent() ? (detailedErrorMessage.get() + errorMessage) : errorMessage;
|
||||
}
|
||||
|
||||
private Optional<String> getFirstAncestralErrorMessage(final Throwable throwable) {
|
||||
if (throwable.getCause() instanceof InvalidFormatException) {
|
||||
return Optional.of(throwable.getCause().getMessage());
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,125 @@
|
||||
/**
|
||||
* Copyright (c) 2025 Contributors to the Eclipse Foundation
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import jakarta.validation.ConstraintViolationException;
|
||||
|
||||
import lombok.ToString;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.hawkbit.repository.exception.AssignmentQuotaExceededException;
|
||||
import org.eclipse.hawkbit.repository.exception.CancelActionNotAllowedException;
|
||||
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
|
||||
import org.eclipse.hawkbit.repository.exception.InvalidTargetAddressException;
|
||||
import org.eclipse.hawkbit.repository.exception.InvalidTargetAttributeException;
|
||||
import org.eclipse.hawkbit.repository.exception.TenantNotExistException;
|
||||
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
|
||||
import org.springframework.amqp.support.converter.MessageConversionException;
|
||||
import org.springframework.messaging.MessageHandlingException;
|
||||
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* Custom {@link FatalExceptionStrategy} that marks defined hawkBit internal exceptions not to be re-queued.
|
||||
*/
|
||||
@ToString
|
||||
@Slf4j
|
||||
class RequeueExceptionStrategy implements FatalExceptionStrategy {
|
||||
|
||||
private final List<FatalExceptionStrategy> fatalExceptionStrategies = new ArrayList<>();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
RequeueExceptionStrategy(final List<FatalExceptionStrategy> fatalExceptionStrategies, final List<String> fatalExceptionTypes) {
|
||||
this.fatalExceptionStrategies.add(new TypeBasedFatalExceptionStrategy(
|
||||
// default, see DefaultExceptionStrategy
|
||||
MessageConversionException.class,
|
||||
org.springframework.messaging.converter.MessageConversionException.class,
|
||||
MethodArgumentResolutionException.class, NoSuchMethodException.class, ClassCastException.class,
|
||||
// invalid state
|
||||
CancelActionNotAllowedException.class,
|
||||
// quota hit
|
||||
AssignmentQuotaExceededException.class,
|
||||
// does not exist
|
||||
TenantNotExistException.class, EntityNotFoundException.class,
|
||||
// is invalid content, repository exception
|
||||
ConstraintViolationException.class, InvalidTargetAttributeException.class,
|
||||
// is invalid content, message exception
|
||||
InvalidTargetAddressException.class, MessageHandlingException.class
|
||||
));
|
||||
if (!ObjectUtils.isEmpty(fatalExceptionTypes)) {
|
||||
// add explicitly configured fatal exception types
|
||||
fatalExceptionTypes.forEach(type -> {
|
||||
try {
|
||||
final Class<?> clazz = Class.forName(type);
|
||||
if (Throwable.class.isAssignableFrom(clazz)) {
|
||||
this.fatalExceptionStrategies.add(new TypeBasedFatalExceptionStrategy((Class<? extends Throwable>) clazz));
|
||||
} else {
|
||||
log.warn("Fatal exception type {} is not a Throwable", type);
|
||||
}
|
||||
} catch (final ClassNotFoundException e) {
|
||||
log.warn("Could not find class for fatal exception type {}", type);
|
||||
}
|
||||
});
|
||||
}
|
||||
this.fatalExceptionStrategies.addAll(fatalExceptionStrategies);
|
||||
log.info("RequeueExceptionStrategy created: {}", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFatal(final Throwable t) {
|
||||
for (Throwable cause = t; cause != null; cause = cause.getCause()) {
|
||||
// default exception from DefaultExceptionStrategy
|
||||
if (isCauseFatal(cause)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.warn("Found a message that has to be re-queued", t);
|
||||
} else {
|
||||
log.warn("Found a message that has to be re-queued: {}", t.getMessage());
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
protected boolean isCauseFatal(final Throwable cause) {
|
||||
for (final FatalExceptionStrategy handler : fatalExceptionStrategies) {
|
||||
if (handler.isFatal(cause)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@ToString
|
||||
public static class TypeBasedFatalExceptionStrategy implements FatalExceptionStrategy {
|
||||
|
||||
private final Class<? extends Throwable>[] types;
|
||||
|
||||
@SafeVarargs
|
||||
public TypeBasedFatalExceptionStrategy(final Class<? extends Throwable>... types) {
|
||||
this.types = types;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFatal(final Throwable cause) {
|
||||
for (final Class<? extends Throwable> type : types) {
|
||||
if (type.isAssignableFrom(cause.getClass())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,83 +0,0 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import io.qameta.allure.Description;
|
||||
import io.qameta.allure.Feature;
|
||||
import io.qameta.allure.Story;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
||||
@Feature("Unit Tests - Delegating Conditional Error Handler")
|
||||
@Story("Delegating Conditional Error Handler")
|
||||
class DelegatingAmqpErrorHandlerTest {
|
||||
|
||||
private final DelegatingConditionalErrorHandler delegatingConditionalErrorHandler =
|
||||
new DelegatingConditionalErrorHandler(
|
||||
List.of(new IllegalArgumentExceptionHandler(), new IndexOutOfBoundsExceptionHandler()),
|
||||
new DefaultErrorHandler());
|
||||
|
||||
@Test
|
||||
@Description("Verifies that with a list of conditional error handlers, the error is delegated to specific handler.")
|
||||
void verifyDelegationHandling() {
|
||||
final Throwable error = new Throwable(new IllegalArgumentException());
|
||||
assertThatExceptionOfType(IllegalArgumentException.class)
|
||||
.as("Expected handled exception to be of type IllegalArgumentException")
|
||||
.isThrownBy(() -> delegatingConditionalErrorHandler.handleError(error));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("Verifies that default handler is used if no handlers are defined for the specific exception.")
|
||||
void verifyDefaultDelegationHandling() {
|
||||
final Throwable error = new Throwable(new NullPointerException());
|
||||
assertThatExceptionOfType(RuntimeException.class)
|
||||
.as("Expected handled exception to be of type RuntimeException")
|
||||
.isThrownBy(() -> delegatingConditionalErrorHandler.handleError(error));
|
||||
}
|
||||
|
||||
// Test class
|
||||
static class IllegalArgumentExceptionHandler implements AmqpErrorHandler {
|
||||
|
||||
@Override
|
||||
public void doHandle(final Throwable t, final AmqpErrorHandlerChain chain) {
|
||||
if (t.getCause() instanceof IllegalArgumentException) {
|
||||
throw new IllegalArgumentException(t.getMessage());
|
||||
} else {
|
||||
chain.handle(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test class
|
||||
static class IndexOutOfBoundsExceptionHandler implements AmqpErrorHandler {
|
||||
|
||||
@Override
|
||||
public void doHandle(final Throwable t, final AmqpErrorHandlerChain chain) {
|
||||
if (t.getCause() instanceof IndexOutOfBoundsException) {
|
||||
throw new IndexOutOfBoundsException(t.getMessage());
|
||||
} else {
|
||||
chain.handle(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test class
|
||||
static class DefaultErrorHandler implements ErrorHandler {
|
||||
|
||||
@Override
|
||||
public void handleError(final Throwable t) {
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
/**
|
||||
* Copyright (c) 2025 Contributors to the Eclipse Foundation
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import io.qameta.allure.Description;
|
||||
import io.qameta.allure.Feature;
|
||||
import io.qameta.allure.Story;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
|
||||
import org.springframework.amqp.support.converter.MessageConversionException;
|
||||
|
||||
@Feature("Unit Tests - Requeue Exception Strategy")
|
||||
@Story("Requeue Exception Strategy")
|
||||
class RequestExceptionStrategyTest {
|
||||
|
||||
private final FatalExceptionStrategy requeueExceptionStrategy = new RequeueExceptionStrategy(
|
||||
List.of(new RequeueExceptionStrategy.TypeBasedFatalExceptionStrategy(
|
||||
IllegalArgumentException.class, IndexOutOfBoundsException.class)), null);
|
||||
|
||||
@Test
|
||||
@Description("Verifies that default handler is used if no handlers are defined for the specific exception.")
|
||||
void verifyDefaultFatal() {
|
||||
assertThat(requeueExceptionStrategy.isFatal(new MessageConversionException("t"))).as("Non Fatal error").isTrue();
|
||||
assertThat(requeueExceptionStrategy.isFatal(new Throwable(new MessageConversionException("t")))).as("Non Fatal error").isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("Verifies additional fatal exception types are fatal.")
|
||||
void verifyAdditionalFatal() {
|
||||
assertThat(requeueExceptionStrategy.isFatal(new IllegalArgumentException())).isTrue();
|
||||
assertThat(requeueExceptionStrategy.isFatal(new IndexOutOfBoundsException())).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("Verifies additional fatal exception types are fatal.")
|
||||
void verifyAdditionalWrappedFatal() {
|
||||
assertThat(requeueExceptionStrategy.isFatal(new Throwable(new IllegalArgumentException()))).isTrue();
|
||||
assertThat(requeueExceptionStrategy.isFatal(new Throwable(new IndexOutOfBoundsException()))).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("Verifies that default handler is used if no handlers are defined for the specific exception.")
|
||||
void verifyNonFatal() {
|
||||
assertThat(requeueExceptionStrategy.isFatal(new NullPointerException())).isFalse();
|
||||
assertThat(requeueExceptionStrategy.isFatal(new Throwable(new NullPointerException()))).isFalse();
|
||||
}
|
||||
}
|
||||
@@ -31,6 +31,7 @@ import java.util.concurrent.BlockingDeque;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -126,6 +127,8 @@ import org.springframework.validation.annotation.Validated;
|
||||
@Validated
|
||||
public class JpaControllerManagement extends JpaActionManagement implements ControllerManagement {
|
||||
|
||||
private static final Pattern PATTERN = Pattern.compile("[a-zA-Z0-9_\\-!@#$%^&*()+=\\[\\]{}|;:'\",.<>/\\\\?\\s]*");
|
||||
|
||||
private final BlockingDeque<TargetPoll> queue;
|
||||
|
||||
// TODO - make it final
|
||||
@@ -197,7 +200,7 @@ public class JpaControllerManagement extends JpaActionManagement implements Cont
|
||||
final long occurredAt = newActionStatus.getOccurredAt();
|
||||
switch (updatedActionStatus) {
|
||||
case ERROR: {
|
||||
final JpaTarget target = (JpaTarget) action.getTarget();
|
||||
final JpaTarget target = action.getTarget();
|
||||
target.setUpdateStatus(TargetUpdateStatus.ERROR);
|
||||
handleErrorOnAction(action, target);
|
||||
break;
|
||||
@@ -512,7 +515,7 @@ public class JpaControllerManagement extends JpaActionManagement implements Cont
|
||||
|
||||
if (action.isActive()) {
|
||||
log.debug("action ({}) was still active. Change to {}.", action, Status.CANCELING);
|
||||
final JpaAction jpaAction = (JpaAction)action;
|
||||
final JpaAction jpaAction = (JpaAction) action;
|
||||
|
||||
jpaAction.setStatus(Status.CANCELING);
|
||||
// document that the status has been retrieved
|
||||
@@ -533,7 +536,7 @@ public class JpaControllerManagement extends JpaActionManagement implements Cont
|
||||
targetRepository.getAccessController().ifPresent(
|
||||
accessController -> accessController.assertOperationAllowed(
|
||||
AccessController.Operation.UPDATE,
|
||||
(JpaTarget) actionRepository
|
||||
actionRepository
|
||||
.findById(actionId)
|
||||
.orElseThrow(() -> new EntityNotFoundException(Action.class, actionId))
|
||||
.getTarget()));
|
||||
@@ -615,11 +618,11 @@ public class JpaControllerManagement extends JpaActionManagement implements Cont
|
||||
}
|
||||
|
||||
private static boolean isAttributeKeyValid(final String key) {
|
||||
return key != null && key.length() <= CONTROLLER_ATTRIBUTE_KEY_SIZE;
|
||||
return key != null && key.length() <= CONTROLLER_ATTRIBUTE_KEY_SIZE && PATTERN.matcher(key).matches();
|
||||
}
|
||||
|
||||
private static boolean isAttributeValueValid(final String value) {
|
||||
return value == null || value.length() <= CONTROLLER_ATTRIBUTE_VALUE_SIZE;
|
||||
return value == null || (value.length() <= CONTROLLER_ATTRIBUTE_VALUE_SIZE && PATTERN.matcher(value).matches());
|
||||
}
|
||||
|
||||
private static void copy(final Map<String, String> src, final Map<String, String> trg) {
|
||||
@@ -820,7 +823,7 @@ public class JpaControllerManagement extends JpaActionManagement implements Cont
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
final JpaTarget target = (JpaTarget) action.getTarget();
|
||||
final JpaTarget target = action.getTarget();
|
||||
action.setActive(false);
|
||||
action.setStatus(DOWNLOADED);
|
||||
target.setUpdateStatus(TargetUpdateStatus.IN_SYNC);
|
||||
@@ -854,11 +857,11 @@ public class JpaControllerManagement extends JpaActionManagement implements Cont
|
||||
* @return a present controllerId in case the attributes needs to be requested.
|
||||
*/
|
||||
private JpaTarget handleFinishedAndStoreInTargetStatus(final long occurredAt, final JpaAction action) {
|
||||
final JpaTarget target = (JpaTarget) action.getTarget();
|
||||
final JpaTarget target = action.getTarget();
|
||||
action.setActive(false);
|
||||
action.setStatus(Status.FINISHED);
|
||||
if (target.getInstallationDate() == null || target.getInstallationDate() < occurredAt) {
|
||||
final JpaDistributionSet ds = (JpaDistributionSet) entityManager.merge(action.getDistributionSet());
|
||||
final JpaDistributionSet ds = entityManager.merge(action.getDistributionSet());
|
||||
|
||||
target.setInstalledDistributionSet(ds);
|
||||
target.setInstallationDate(occurredAt);
|
||||
@@ -866,7 +869,7 @@ public class JpaControllerManagement extends JpaActionManagement implements Cont
|
||||
// Target reported an installation of a DOWNLOAD_ONLY assignment, the assigned DS has to be adapted
|
||||
// because the currently assigned DS can be unequal to the currently installed DS (the downloadOnly DS)
|
||||
if (isDownloadOnly(action)) {
|
||||
target.setAssignedDistributionSet((JpaDistributionSet) action.getDistributionSet());
|
||||
target.setAssignedDistributionSet(action.getDistributionSet());
|
||||
}
|
||||
|
||||
// check if the assigned set is equal to the installed set (not
|
||||
|
||||
@@ -47,7 +47,6 @@ class RSQLTargetFieldTest extends AbstractJpaIntegrationTest {
|
||||
|
||||
@BeforeEach
|
||||
void setupBeforeTest() {
|
||||
|
||||
final DistributionSet ds = testdataFactory.createDistributionSet("AssignedDs");
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
|
||||
Reference in New Issue
Block a user