Feature/handle amqp fatal errors (#1111)
* Adding support to handle lengthy error msgs more precisely Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com> * Added check at conditionalHandler level and changes assertions in test class Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com> * Fixed sonar lint issues Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com> * Reverted the change on making class final Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com> * To trigger the circle-ci build and check Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com> * Addressed last set of PR comments Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com> * Fixe sonar issue for nullpointer dereference Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com> * Handling null case explicitly Signed-off-by: Shruthi Manavalli Ramanna <shruthimanavalli.ramanna@bosch-si.com>
This commit is contained in:
committed by
GitHub
parent
a8f7f50cf9
commit
c37c615ea6
@@ -0,0 +1,48 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
|
||||
/**
|
||||
* An abstract error handler for errors resulting from AMQP.
|
||||
*/
|
||||
public abstract class AbstractAmqpErrorHandler<T> implements AmqpErrorHandler{
|
||||
|
||||
@Override
|
||||
public void doHandle(Throwable throwable, AmqpErrorHandlerChain chain) {
|
||||
// retrieving the cause of throwable as it contains the actual class of
|
||||
// exception
|
||||
final Throwable cause = throwable.getCause();
|
||||
if (getExceptionClass().isAssignableFrom(cause.getClass())) {
|
||||
throw new AmqpRejectAndDontRequeueException(getErrorMessage(throwable));
|
||||
} else {
|
||||
chain.handle(throwable);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the class of the exception.
|
||||
*
|
||||
* @return
|
||||
* the exception class
|
||||
*/
|
||||
public abstract Class<T> getExceptionClass();
|
||||
|
||||
/**
|
||||
* Returns the customized error message.
|
||||
*
|
||||
* @return
|
||||
* the customized error message
|
||||
*/
|
||||
public String getErrorMessage(Throwable throwable){
|
||||
return AmqpErrorMessageComposer.constructErrorMessage(throwable);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -8,9 +8,7 @@
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.eclipse.hawkbit.api.ArtifactUrlHandler;
|
||||
import org.eclipse.hawkbit.api.HostnameResolver;
|
||||
import org.eclipse.hawkbit.cache.DownloadIdCache;
|
||||
@@ -53,7 +51,9 @@ import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Spring configuration for AMQP based DMF communication for indirect device
|
||||
@@ -80,15 +80,48 @@ public class AmqpConfiguration {
|
||||
private ServiceMatcher serviceMatcher;
|
||||
|
||||
/**
|
||||
* Register the bean for the custom error handler.
|
||||
* Creates a custom error handler bean.
|
||||
*
|
||||
* @return custom error handler
|
||||
* @param handlers
|
||||
* list of {@link AmqpErrorHandler} handlers
|
||||
|
||||
* @return the delegating error handler bean
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(ErrorHandler.class)
|
||||
public ErrorHandler errorHandler() {
|
||||
return new ConditionalRejectingErrorHandler(
|
||||
new DelayedRequeueExceptionStrategy(amqpProperties.getRequeueDelay()));
|
||||
@ConditionalOnMissingBean
|
||||
public ErrorHandler errorHandler(final List<AmqpErrorHandler> handlers) {
|
||||
return new DelegatingConditionalErrorHandler(handlers, new ConditionalRejectingErrorHandler(
|
||||
new DelayedRequeueExceptionStrategy(amqpProperties.getRequeueDelay())));
|
||||
}
|
||||
|
||||
/**
|
||||
* Error handler bean for all target attributes related fatal errors
|
||||
*
|
||||
* @return the invalid target attribute exception handler bean
|
||||
*/
|
||||
@Bean
|
||||
public AmqpErrorHandler invalidTargetAttributeConditionalExceptionHandler() {
|
||||
return new InvalidTargetAttributeExceptionHandler();
|
||||
}
|
||||
|
||||
/**
|
||||
* Error handler bean for entity not found errors
|
||||
*
|
||||
* @return the entity not found exception handler bean
|
||||
*/
|
||||
@Bean
|
||||
public AmqpErrorHandler entityNotFoundExceptionHandler() {
|
||||
return new EntityNotFoundExceptionHandler();
|
||||
}
|
||||
|
||||
/**
|
||||
* Error handler bean for amqp message conversion errors
|
||||
*
|
||||
* @return the amqp message conversion exception handler bean
|
||||
*/
|
||||
@Bean
|
||||
public AmqpErrorHandler messageConversionExceptionHandler() {
|
||||
return new MessageConversionExceptionHandler();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
/**
|
||||
* Interface declaration of {@link AmqpErrorHandler} that handles errors based on the
|
||||
* types of exception.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface AmqpErrorHandler {
|
||||
|
||||
/**
|
||||
* Handles the error based on the type of exception
|
||||
*
|
||||
* @param throwable
|
||||
* the throwable
|
||||
* @param chain
|
||||
* an {@link AmqpErrorHandlerChain}
|
||||
*/
|
||||
void doHandle(final Throwable throwable, final AmqpErrorHandlerChain chain);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* An error handler chain that delegates the error to the matching error handler based on the type of exception
|
||||
*/
|
||||
public final class AmqpErrorHandlerChain {
|
||||
private final Iterator<AmqpErrorHandler> iterator;
|
||||
private final ErrorHandler defaultHandler;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param iterator
|
||||
* the {@link AmqpErrorHandler} iterator
|
||||
* @param defaultHandler
|
||||
* the default handler
|
||||
*/
|
||||
private AmqpErrorHandlerChain(Iterator<AmqpErrorHandler> iterator, ErrorHandler defaultHandler) {
|
||||
this.iterator = iterator;
|
||||
this.defaultHandler = defaultHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@link AmqpErrorHandlerChain}
|
||||
*
|
||||
* @param errorHandlers
|
||||
* {@link List} of error handlers
|
||||
* @param defaultHandler
|
||||
* the default error handler
|
||||
* @return an {@link AmqpErrorHandlerChain}
|
||||
*/
|
||||
public static AmqpErrorHandlerChain getHandlerChain(final List<AmqpErrorHandler> errorHandlers, final ErrorHandler defaultHandler) {
|
||||
return new AmqpErrorHandlerChain(errorHandlers.iterator(), defaultHandler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the error based on the type of exception
|
||||
*
|
||||
* @param error
|
||||
* the throwable containing the cause of exception
|
||||
*/
|
||||
public void handle(final Throwable error) {
|
||||
if (iterator.hasNext()) {
|
||||
final AmqpErrorHandler handler = iterator.next();
|
||||
handler.doHandle(error, this);
|
||||
} else {
|
||||
defaultHandler.handleError(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
|
||||
|
||||
/**
|
||||
* Class that composes a meaningful error message and enhances it with properties from failed message
|
||||
*/
|
||||
public final class AmqpErrorMessageComposer {
|
||||
|
||||
private AmqpErrorMessageComposer() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs an error message based on failed message content
|
||||
*
|
||||
* @param throwable
|
||||
* the throwable containing failed message content
|
||||
* @return
|
||||
* meaningful error message
|
||||
*/
|
||||
public static String constructErrorMessage(final Throwable throwable) {
|
||||
StringBuilder completeErrorMessage = new StringBuilder();
|
||||
final String mainErrorMsg = throwable.getCause().getMessage();
|
||||
|
||||
if (throwable instanceof ListenerExecutionFailedException) {
|
||||
Collection<Message> failedMessages = ((ListenerExecutionFailedException) throwable).getFailedMessages();
|
||||
// since the intended message content is always on top of the collection, we only extract the first one
|
||||
final Message failedMessage = failedMessages.iterator().next();
|
||||
final byte[] amqpFailedMsgBody = failedMessage.getBody();
|
||||
final Map<String, Object> amqpFailedMsgHeaders = failedMessage.getMessageProperties().getHeaders();
|
||||
|
||||
String amqpFailedMsgConcatenatedHeaders = amqpFailedMsgHeaders.keySet().stream()
|
||||
.map(key -> key + "=" + amqpFailedMsgHeaders.get(key)).collect(Collectors.joining(", ", "{", "}"));
|
||||
completeErrorMessage.append(mainErrorMsg).append(new String(amqpFailedMsgBody))
|
||||
.append(amqpFailedMsgConcatenatedHeaders);
|
||||
return completeErrorMessage.toString();
|
||||
}
|
||||
return mainErrorMsg;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.List;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
||||
/**
|
||||
* An error handler delegates error handling to the matching {@link AmqpErrorHandler} based on the type of exception
|
||||
*/
|
||||
public class DelegatingConditionalErrorHandler implements ErrorHandler {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DelegatingConditionalErrorHandler.class);
|
||||
private final List<AmqpErrorHandler> handlers;
|
||||
private final ErrorHandler defaultHandler;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param handlers
|
||||
* {@link List} of error handlers
|
||||
* @param defaultHandler
|
||||
* the default error handler
|
||||
*/
|
||||
public DelegatingConditionalErrorHandler(final List<AmqpErrorHandler> handlers, @NotNull final ErrorHandler defaultHandler) {
|
||||
this.handlers = handlers;
|
||||
this.defaultHandler = defaultHandler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleError(final Throwable t) {
|
||||
if (t.getCause() == null) {
|
||||
LOG.error("Cannot handle the error as the cause of the error is null!");
|
||||
return;
|
||||
}
|
||||
|
||||
if (includesAmqpRejectException(t.getCause())) {
|
||||
LOG.error("Received an AmqpRejectAndDontRequeueException due to {}", t.getCause().getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
AmqpErrorHandlerChain.getHandlerChain(handlers, defaultHandler).handle(t);
|
||||
}
|
||||
|
||||
private boolean includesAmqpRejectException(final Throwable t) {
|
||||
if (t instanceof AmqpRejectAndDontRequeueException){
|
||||
return true;
|
||||
}
|
||||
if (t.getCause() != null) {
|
||||
return includesAmqpRejectException(t.getCause());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
|
||||
|
||||
/**
|
||||
* An error handler for entity not found exception resulting from AMQP.
|
||||
*/
|
||||
public class EntityNotFoundExceptionHandler extends AbstractAmqpErrorHandler<EntityNotFoundException> {
|
||||
|
||||
@Override
|
||||
public Class<EntityNotFoundException> getExceptionClass() {
|
||||
return EntityNotFoundException.class;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import org.eclipse.hawkbit.repository.exception.InvalidTargetAttributeException;
|
||||
|
||||
/**
|
||||
* An error handler for all invalid target attributes resulting from AMQP.
|
||||
*/
|
||||
public class InvalidTargetAttributeExceptionHandler extends AbstractAmqpErrorHandler<InvalidTargetAttributeException> {
|
||||
|
||||
@Override
|
||||
public Class<InvalidTargetAttributeException> getExceptionClass() {
|
||||
return InvalidTargetAttributeException.class;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import com.fasterxml.jackson.databind.exc.InvalidFormatException;
|
||||
import org.springframework.amqp.support.converter.MessageConversionException;
|
||||
|
||||
/**
|
||||
* An error handler for message conversion exception resulting from AMQP.
|
||||
*/
|
||||
public class MessageConversionExceptionHandler extends AbstractAmqpErrorHandler<MessageConversionException> {
|
||||
|
||||
@Override
|
||||
public Class<MessageConversionException> getExceptionClass() {
|
||||
return MessageConversionException.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getErrorMessage(Throwable throwable) {
|
||||
final String errorMessage = super.getErrorMessage(throwable);
|
||||
//since the detailed error message lies in the first parent of current throwable we retrieve it
|
||||
// and append it to the errorMessage
|
||||
final Optional<String> detailedErrorMessage = getFirstAncestralErrorMessage(throwable.getCause());
|
||||
return detailedErrorMessage.isPresent()? (detailedErrorMessage.get() + errorMessage) : errorMessage;
|
||||
}
|
||||
|
||||
private Optional<String> getFirstAncestralErrorMessage(final Throwable throwable) {
|
||||
if(throwable.getCause() instanceof InvalidFormatException) {
|
||||
return Optional.of(throwable.getCause().getMessage());
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,86 @@
|
||||
/**
|
||||
* Copyright (c) 2021 Bosch.IO GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
|
||||
|
||||
import io.qameta.allure.Description;
|
||||
import io.qameta.allure.Feature;
|
||||
import io.qameta.allure.Story;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.util.ErrorHandler;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Feature("Unit Tests - Delegating Conditional Error Handler")
|
||||
@Story("Delegating Conditional Error Handler")
|
||||
public class DelegatingAmqpErrorHandlerTest {
|
||||
|
||||
@Test
|
||||
@Description("Verifies that with a list of conditional error handlers, the error is delegated to specific handler.")
|
||||
public void verifyDelegationHandling(){
|
||||
List<AmqpErrorHandler> handlers = new ArrayList<>();
|
||||
handlers.add(new IllegalArgumentExceptionHandler());
|
||||
handlers.add(new IndexOutOfBoundsExceptionHandler());
|
||||
assertThatExceptionOfType(IllegalArgumentException.class)
|
||||
.as("Expected handled exception to be of type IllegalArgumentException")
|
||||
.isThrownBy(() -> new DelegatingConditionalErrorHandler(handlers, new DefaultErrorHandler())
|
||||
.handleError(new Throwable(new IllegalArgumentException())));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("Verifies that default handler is used if no handlers are defined for the specific exception.")
|
||||
public void verifyDefaultDelegationHandling(){
|
||||
List<AmqpErrorHandler> handlers = new ArrayList<>();
|
||||
handlers.add(new IllegalArgumentExceptionHandler());
|
||||
handlers.add(new IndexOutOfBoundsExceptionHandler());
|
||||
assertThatExceptionOfType(RuntimeException.class)
|
||||
.as("Expected handled exception to be of type RuntimeException")
|
||||
.isThrownBy(() -> new DelegatingConditionalErrorHandler(handlers, new DefaultErrorHandler())
|
||||
.handleError(new Throwable(new NullPointerException())));
|
||||
}
|
||||
|
||||
// Test class
|
||||
public class IllegalArgumentExceptionHandler implements AmqpErrorHandler {
|
||||
|
||||
@Override
|
||||
public void doHandle(final Throwable t, final AmqpErrorHandlerChain chain) {
|
||||
if (t.getCause() instanceof IllegalArgumentException) {
|
||||
throw new IllegalArgumentException(t.getMessage());
|
||||
} else {
|
||||
chain.handle(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test class
|
||||
public class IndexOutOfBoundsExceptionHandler implements AmqpErrorHandler {
|
||||
|
||||
@Override
|
||||
public void doHandle(final Throwable t, final AmqpErrorHandlerChain chain) {
|
||||
if (t.getCause() instanceof IndexOutOfBoundsException) {
|
||||
throw new IndexOutOfBoundsException(t.getMessage());
|
||||
} else {
|
||||
chain.handle(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Test class
|
||||
public class DefaultErrorHandler implements ErrorHandler {
|
||||
|
||||
@Override
|
||||
public void
|
||||
handleError(Throwable t) {
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user