Refactoring/Improving source: dmf (#1611)

Signed-off-by: Marinov Avgustin <Avgustin.Marinov@bosch.com>
This commit is contained in:
Avgustin Marinov
2024-02-04 11:05:38 +02:00
committed by GitHub
parent e218d2c64d
commit da3a6470ec
20 changed files with 73 additions and 106 deletions

View File

@@ -14,7 +14,7 @@ import org.springframework.amqp.AmqpRejectAndDontRequeueException;
/**
* An abstract error handler for errors resulting from AMQP.
*/
public abstract class AbstractAmqpErrorHandler<T> implements AmqpErrorHandler{
public abstract class AbstractAmqpErrorHandler<T> implements AmqpErrorHandler {
@Override
public void doHandle(Throwable throwable, AmqpErrorHandlerChain chain) {

View File

@@ -13,6 +13,7 @@ import java.net.URISyntaxException;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.api.HostnameResolver;
import org.eclipse.hawkbit.cache.DownloadArtifactCache;
import org.eclipse.hawkbit.cache.DownloadIdCache;
@@ -27,8 +28,6 @@ import org.eclipse.hawkbit.repository.model.Artifact;
import org.eclipse.hawkbit.security.DmfTenantSecurityToken;
import org.eclipse.hawkbit.security.DmfTenantSecurityToken.FileResource;
import org.eclipse.hawkbit.tenancy.TenantAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -48,10 +47,9 @@ import org.springframework.web.util.UriComponentsBuilder;
* is permitted to download certain artifact. This is handled by the queue that
* is configured for the property
* hawkbit.dmf.rabbitmq.authenticationReceiverQueue.
*
*/
@Slf4j
public class AmqpAuthenticationMessageHandler extends BaseAmqpService {
private static final Logger LOG = LoggerFactory.getLogger(AmqpAuthenticationMessageHandler.class);
private final AmqpControllerAuthentication authenticationManager;
@@ -133,30 +131,30 @@ public class AmqpAuthenticationMessageHandler extends BaseAmqpService {
} else if (securityToken.getTargetId() != null) {
checkByTargetId(sha1Hash, securityToken.getTargetId());
} else {
LOG.info("anonymous download no authentication check for artifact {}", sha1Hash);
log.info("anonymous download no authentication check for artifact {}", sha1Hash);
}
}
private void checkByTargetId(final String sha1Hash, final Long targetId) {
LOG.debug("no anonymous download request, doing authentication check for target {} and artifact {}", targetId,
log.debug("no anonymous download request, doing authentication check for target {} and artifact {}", targetId,
sha1Hash);
if (!controllerManagement.hasTargetArtifactAssigned(targetId, sha1Hash)) {
LOG.info("target {} tried to download artifact {} which is not assigned to the target", targetId, sha1Hash);
log.info("target {} tried to download artifact {} which is not assigned to the target", targetId, sha1Hash);
throw new EntityNotFoundException();
}
LOG.info("download security check for target {} and artifact {} granted", targetId, sha1Hash);
log.info("download security check for target {} and artifact {} granted", targetId, sha1Hash);
}
private void checkByControllerId(final String sha1Hash, final String controllerId) {
LOG.debug("no anonymous download request, doing authentication check for target {} and artifact {}",
log.debug("no anonymous download request, doing authentication check for target {} and artifact {}",
controllerId, sha1Hash);
if (!controllerManagement.hasTargetArtifactAssigned(controllerId, sha1Hash)) {
LOG.info("target {} tried to download artifact {} which is not assigned to the target", controllerId,
log.info("target {} tried to download artifact {} which is not assigned to the target", controllerId,
sha1Hash);
throw new EntityNotFoundException();
}
LOG.info("download security check for target {} and artifact {} granted", controllerId, sha1Hash);
log.info("download security check for target {} and artifact {} granted", controllerId, sha1Hash);
}
private Optional<Artifact> findArtifactByFileResource(final FileResource fileResource) {
@@ -222,16 +220,16 @@ public class AmqpAuthenticationMessageHandler extends BaseAmqpService {
.path(tenantAware.getCurrentTenant()).path("/").path(downloadId).build().toUriString());
authenticationResponse.setResponseCode(HttpStatus.OK.value());
} catch (final BadCredentialsException | AuthenticationServiceException | CredentialsExpiredException e) {
LOG.error("Login failed", e);
log.error("Login failed", e);
authenticationResponse.setResponseCode(HttpStatus.FORBIDDEN.value());
authenticationResponse.setMessage("Login failed");
} catch (final URISyntaxException e) {
LOG.error("URI build exception", e);
log.error("URI build exception", e);
authenticationResponse.setResponseCode(HttpStatus.INTERNAL_SERVER_ERROR.value());
authenticationResponse.setMessage("Building download URI failed");
} catch (final EntityNotFoundException e) {
final String errorMessage = "Artifact for resource " + fileResource + " not found ";
LOG.info(errorMessage);
log.info(errorMessage);
authenticationResponse.setResponseCode(HttpStatus.NOT_FOUND.value());
authenticationResponse.setMessage(errorMessage);
}

View File

@@ -9,6 +9,7 @@
*/
package org.eclipse.hawkbit.amqp;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.api.ArtifactUrlHandler;
import org.eclipse.hawkbit.api.HostnameResolver;
import org.eclipse.hawkbit.cache.DownloadIdCache;
@@ -26,8 +27,6 @@ import org.eclipse.hawkbit.repository.TenantConfigurationManagement;
import org.eclipse.hawkbit.security.DdiSecurityProperties;
import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.eclipse.hawkbit.tenancy.TenantAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
@@ -60,15 +59,13 @@ import java.util.Map;
/**
* Spring configuration for AMQP based DMF communication for indirect device
* integration.
*
*/
@Slf4j
@EnableConfigurationProperties({ AmqpProperties.class, AmqpDeadletterProperties.class })
@ConditionalOnProperty(prefix = "hawkbit.dmf.rabbitmq", name = "enabled", matchIfMissing = true)
@PropertySource("classpath:/hawkbit-dmf-defaults.properties")
public class AmqpConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfiguration.class);
@Autowired
private AmqpProperties amqpProperties;
@@ -154,9 +151,9 @@ public class AmqpConfiguration {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.debug("Message with {} confirmed by broker.", correlationData);
log.debug("Message with {} confirmed by broker.", correlationData);
} else {
LOGGER.error("Broker is unable to handle message with {} : {}", correlationData, cause);
log.error("Broker is unable to handle message with {} : {}", correlationData, cause);
}
});

View File

@@ -14,6 +14,7 @@ import java.util.List;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.im.authentication.TenantAwareAuthenticationDetails;
import org.eclipse.hawkbit.repository.ControllerManagement;
import org.eclipse.hawkbit.repository.SystemManagement;
@@ -29,19 +30,15 @@ import org.eclipse.hawkbit.security.PreAuthTokenSourceTrustAuthenticationProvide
import org.eclipse.hawkbit.security.PreAuthenticationFilter;
import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.eclipse.hawkbit.tenancy.TenantAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken;
/**
*
* A controller which handles the DMF AMQP authentication.
*/
@Slf4j
public class AmqpControllerAuthentication {
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpControllerAuthentication.class);
private final PreAuthTokenSourceTrustAuthenticationProvider preAuthenticatedAuthenticationProvider = new PreAuthTokenSourceTrustAuthenticationProvider();
private List<PreAuthenticationFilter> filterChain;
@@ -155,11 +152,11 @@ public class AmqpControllerAuthentication {
final Object credentials = filter.getPreAuthenticatedCredentials(securityToken);
if (principal == null) {
LOGGER.debug("No pre-authenticated principal found in message");
log.debug("No pre-authenticated principal found in message");
return null;
}
LOGGER.debug("preAuthenticatedPrincipal = {} trying to authenticate", principal);
log.debug("preAuthenticatedPrincipal = {} trying to authenticate", principal);
return new PreAuthenticatedAuthenticationToken(principal, credentials,
filter.getSuccessfulAuthenticationAuthorities());

View File

@@ -13,6 +13,7 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import lombok.Data;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -20,8 +21,10 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
* Bean which holds the necessary properties for configuring the AMQP deadletter
* queue.
*/
@Data
@ConfigurationProperties("hawkbit.dmf.rabbitmq.dead-letter")
public class AmqpDeadletterProperties {
private static final int THREE_WEEKS = 21;
/**
@@ -46,8 +49,7 @@ public class AmqpDeadletterProperties {
/**
* Create a deadletter queue with ttl for messages
*
* @param queueName
* the deadlette queue name
* @param queueName the deadletter queue name
* @return the deadletter queue
*/
public Queue createDeadletterQueue(final String queueName) {
@@ -59,13 +61,4 @@ public class AmqpDeadletterProperties {
args.put("x-message-ttl", getTtl());
return args;
}
public long getTtl() {
return ttl;
}
public void setTtl(final long ttl) {
this.ttl = ttl;
}
}

View File

@@ -24,6 +24,5 @@ public interface AmqpErrorHandler {
* @param chain
* an {@link AmqpErrorHandlerChain}
*/
void doHandle(final Throwable throwable, final AmqpErrorHandlerChain chain);
}
void doHandle(final Throwable throwable, final AmqpErrorHandlerChain chain);
}

View File

@@ -61,5 +61,4 @@ public final class AmqpErrorHandlerChain {
defaultHandler.handleError(error);
}
}
}
}

View File

@@ -13,17 +13,17 @@ import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
/**
* Class that composes a meaningful error message and enhances it with properties from failed message
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class AmqpErrorMessageComposer {
private AmqpErrorMessageComposer() {
}
/**
* Constructs an error message based on failed message content
*

View File

@@ -25,6 +25,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.IterableUtils;
import org.apache.commons.collections4.ListUtils;
import org.eclipse.hawkbit.api.ApiType;
@@ -67,8 +68,6 @@ import org.eclipse.hawkbit.repository.model.Target;
import org.eclipse.hawkbit.repository.model.TenantMetaData;
import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.eclipse.hawkbit.util.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
@@ -87,12 +86,10 @@ import org.springframework.util.CollectionUtils;
*
* Additionally the dispatcher listener/subscribe for some target events e.g.
* assignment.
*
*/
@Slf4j
public class AmqpMessageDispatcherService extends BaseAmqpService {
private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageDispatcherService.class);
private static final int MAX_PROCESSING_SIZE = 1000;
private final ArtifactUrlHandler artifactUrlHandler;
@@ -167,7 +164,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
assignedEvent.getActions().keySet());
if (!filteredTargetList.isEmpty()) {
LOG.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
log.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
sendUpdateMessageToTargets(assignedEvent.getDistributionSetId(), assignedEvent.getActions(),
filteredTargetList);
}
@@ -184,7 +181,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
if (!shouldBeProcessed(multiActionEvent)) {
return;
}
LOG.debug("MultiActionEvent received for {}", multiActionEvent.getControllerIds());
log.debug("MultiActionEvent received for {}", multiActionEvent.getControllerIds());
sendMultiActionRequestMessages(multiActionEvent.getTenant(), multiActionEvent.getControllerIds());
}
@@ -192,7 +189,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
return partitionedParallelExecution(controllerIds, partition -> {
return targetManagement.getByControllerID(partition).stream().filter(target -> {
if (hasPendingCancellations(target.getId())) {
LOG.debug("Target {} has pending cancellations. Will not send update message to it.",
log.debug("Target {} has pending cancellations. Will not send update message to it.",
target.getControllerId());
return false;
}

View File

@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
@@ -51,8 +52,6 @@ import org.eclipse.hawkbit.repository.model.SoftwareModuleMetadata;
import org.eclipse.hawkbit.repository.model.Target;
import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.eclipse.hawkbit.util.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -67,20 +66,17 @@ import org.springframework.security.core.context.SecurityContextImpl;
import org.springframework.util.StringUtils;
/**
*
* {@link AmqpMessageHandlerService} handles all incoming target interaction
* AMQP messages (e.g. create target, check for updates etc.) for the queue
* which is configured for the property hawkbit.dmf.rabbitmq.receiverQueue.
*
*/
@Slf4j
public class AmqpMessageHandlerService extends BaseAmqpService {
private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageHandlerService.class);
private final AmqpMessageDispatcherService amqpMessageDispatcherService;
private ControllerManagement controllerManagement;
private ConfirmationManagement confirmationManagement;
private final ConfirmationManagement confirmationManagement;
private final EntityFactory entityFactory;
@@ -231,14 +227,14 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
final URI amqpUri = IpUtil.createAmqpUri(virtualHost, replyTo);
final Target target;
if (isOptionalMessageBodyEmpty(message)) {
LOG.debug("Received \"THING_CREATED\" AMQP message for thing \"{}\" without body.", thingId);
log.debug("Received \"THING_CREATED\" AMQP message for thing \"{}\" without body.", thingId);
target = controllerManagement.findOrRegisterTargetIfItDoesNotExist(thingId, amqpUri);
} else {
checkContentTypeJson(message);
final DmfCreateThing thingCreateBody = convertMessage(message, DmfCreateThing.class);
final DmfAttributeUpdate thingAttributeUpdateBody = thingCreateBody.getAttributeUpdate();
LOG.debug("Received \"THING_CREATED\" AMQP message for thing \"{}\" with target name \"{}\" and type " +
log.debug("Received \"THING_CREATED\" AMQP message for thing \"{}\" with target name \"{}\" and type " +
"\"{}\".", thingId, thingCreateBody.getName(), thingCreateBody.getType());
target = controllerManagement.findOrRegisterTargetIfItDoesNotExist(thingId, amqpUri,
@@ -249,7 +245,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
getUpdateMode(thingAttributeUpdateBody));
}
}
LOG.debug("Target {} reported online state.", thingId);
log.debug("Target {} reported online state.", thingId);
sendUpdateCommandToTarget(target);
} catch (final EntityAlreadyExistsException e) {
throw new AmqpRejectAndDontRequeueException(
@@ -356,14 +352,14 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
final DmfAutoConfirmation autoConfirmation = convertMessage(message, DmfAutoConfirmation.class);
final String thingId = getStringHeaderKey(message, MessageHeaderKey.THING_ID, THING_ID_NULL);
if (autoConfirmation.isEnabled()) {
LOG.debug("Activate auto-confirmation for device {} using DMF. Initiator: {}. Remark: {}", thingId,
log.debug("Activate auto-confirmation for device {} using DMF. Initiator: {}. Remark: {}", thingId,
autoConfirmation.getInitiator(), autoConfirmation.getRemark());
final String remark = autoConfirmation.getRemark() == null
? "Activated using Device Management Federation API."
: autoConfirmation.getRemark();
controllerManagement.activateAutoConfirmation(thingId, autoConfirmation.getInitiator(), remark);
} else {
LOG.debug("Deactivate auto-confirmation for device {} using DMF.", thingId);
log.debug("Deactivate auto-confirmation for device {} using DMF.", thingId);
controllerManagement.deactivateAutoConfirmation(thingId);
}
}
@@ -480,7 +476,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
private Action checkActionExist(final Message message, final DmfActionUpdateStatus actionUpdateStatus) {
final Long actionId = actionUpdateStatus.getActionId();
LOG.debug("Target notifies intermediate about action {} with status {}.", actionId,
log.debug("Target notifies intermediate about action {} with status {}.", actionId,
actionUpdateStatus.getActionStatus());
final Optional<Action> findActionWithDetails = controllerManagement.findActionWithDetails(actionId);

View File

@@ -31,5 +31,4 @@ public interface AmqpMessageSenderService {
* the reply to uri
*/
void sendMessage(@NotNull final Message message, @NotNull final URI replyTo);
}
}

View File

@@ -13,8 +13,7 @@ import java.util.Map;
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
@@ -26,9 +25,9 @@ import org.springframework.amqp.support.converter.MessageConverter;
/**
* A base class which provide basis amqp staff.
*/
@Slf4j
public class BaseAmqpService {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseAmqpService.class);
private final RabbitTemplate rabbitTemplate;
/**
@@ -91,7 +90,7 @@ public class BaseAmqpService {
}
protected static final void logAndThrowMessageError(final Message message, final String error) {
LOGGER.debug("Warning! \"{}\" reported by message: {}", error, message);
log.debug("Warning! \"{}\" reported by message: {}", error, message);
throw new AmqpRejectAndDontRequeueException(error);
}

View File

@@ -21,6 +21,7 @@ import org.springframework.util.ErrorHandler;
*
*/
public class ConfigurableRabbitListenerContainerFactory extends SimpleRabbitListenerContainerFactory {
private final int declarationRetries;
/**
@@ -51,4 +52,4 @@ public class ConfigurableRabbitListenerContainerFactory extends SimpleRabbitList
super.initializeContainer(instance, endpoint);
instance.setDeclarationRetries(declarationRetries);
}
}
}

View File

@@ -12,9 +12,8 @@ package org.eclipse.hawkbit.amqp;
import java.net.URI;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.util.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -25,14 +24,13 @@ import org.springframework.util.StringUtils;
* message to the configured spring rabbitmq connections. The exchange is
* extracted from the uri.
*/
@Slf4j
public class DefaultAmqpMessageSenderService extends BaseAmqpService implements AmqpMessageSenderService {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAmqpMessageSenderService.class);
/**
* Constructor.
*
* @param rabbitTemplate
* the AMQP template
* @param rabbitTemplate the AMQP template
*/
public DefaultAmqpMessageSenderService(final RabbitTemplate rabbitTemplate) {
super(rabbitTemplate);
@@ -51,10 +49,10 @@ public class DefaultAmqpMessageSenderService extends BaseAmqpService implements
message.getMessageProperties().setCorrelationId(correlationId);
}
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Sending message {} to exchange {} with correlationId {}", message, exchange, correlationId);
if (log.isTraceEnabled()) {
log.trace("Sending message {} to exchange {} with correlationId {}", message, exchange, correlationId);
} else {
LOGGER.debug("Sending message to exchange {} with correlationId {}", exchange, correlationId);
log.debug("Sending message to exchange {} with correlationId {}", exchange, correlationId);
}
getRabbitTemplate().send(exchange, "", message, new CorrelationData(correlationId));

View File

@@ -13,14 +13,13 @@ import java.util.concurrent.TimeUnit;
import jakarta.validation.ConstraintViolationException;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.repository.exception.CancelActionNotAllowedException;
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
import org.eclipse.hawkbit.repository.exception.InvalidTargetAddressException;
import org.eclipse.hawkbit.repository.exception.InvalidTargetAttributeException;
import org.eclipse.hawkbit.repository.exception.AssignmentQuotaExceededException;
import org.eclipse.hawkbit.repository.exception.TenantNotExistException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.FatalExceptionStrategy;
import org.springframework.amqp.support.converter.MessageConversionException;
@@ -31,16 +30,14 @@ import org.springframework.messaging.MessageHandlingException;
* exceptions not to be requeued. In addition it throttles in case of a requeue
* by means of blocking the processing thread for a certain amount of time. That
* avoids a back and forth between broker and hawkBit at maximum speed.
*
*/
@Slf4j
public class DelayedRequeueExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
private static final Logger LOG = LoggerFactory.getLogger(DelayedRequeueExceptionStrategy.class);
private final long delay;
/**
* @param delay
* in {@link TimeUnit#MILLISECONDS} before requeue.
* @param delay in {@link TimeUnit#MILLISECONDS} before requeue.
*/
public DelayedRequeueExceptionStrategy(final long delay) {
this.delay = delay;
@@ -52,12 +49,12 @@ public class DelayedRequeueExceptionStrategy extends ConditionalRejectingErrorHa
return true;
}
LOG.error("Found a message that has to be requeued. Processing with delay of {}ms: ", delay, cause);
log.error("Found a message that has to be requeued. Processing with delay of {}ms: ", delay, cause);
try {
TimeUnit.MILLISECONDS.sleep(delay);
} catch (final InterruptedException e) {
LOG.error("Delay interrupted!", e);
log.error("Delay interrupted!", e);
Thread.currentThread().interrupt();
}

View File

@@ -12,16 +12,15 @@ package org.eclipse.hawkbit.amqp;
import java.util.List;
import jakarta.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.util.ErrorHandler;
/**
* An error handler delegates error handling to the matching {@link AmqpErrorHandler} based on the type of exception
*/
@Slf4j
public class DelegatingConditionalErrorHandler implements ErrorHandler {
private static final Logger LOG = LoggerFactory.getLogger(DelegatingConditionalErrorHandler.class);
private final List<AmqpErrorHandler> handlers;
private final ErrorHandler defaultHandler;
@@ -41,12 +40,12 @@ public class DelegatingConditionalErrorHandler implements ErrorHandler {
@Override
public void handleError(final Throwable t) {
if (t.getCause() == null) {
LOG.error("Cannot handle the error as the cause of the error is null!");
log.error("Cannot handle the error as the cause of the error is null!");
return;
}
if (includesAmqpRejectException(t.getCause())) {
LOG.error("Received an AmqpRejectAndDontRequeueException due to {}", t.getCause().getMessage());
log.error("Received an AmqpRejectAndDontRequeueException due to {}", t.getCause().getMessage());
return;
}

View File

@@ -21,4 +21,4 @@ import org.springframework.context.annotation.Import;
@Import(AmqpConfiguration.class)
public class DmfApiConfiguration {
}
}

View File

@@ -20,4 +20,4 @@ public class EntityNotFoundExceptionHandler extends AbstractAmqpErrorHandler<Ent
public Class<EntityNotFoundException> getExceptionClass() {
return EntityNotFoundException.class;
}
}
}

View File

@@ -20,4 +20,4 @@ public class InvalidTargetAttributeExceptionHandler extends AbstractAmqpErrorHan
public Class<InvalidTargetAttributeException> getExceptionClass() {
return InvalidTargetAttributeException.class;
}
}
}

View File

@@ -13,14 +13,13 @@ import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.eclipse.hawkbit.repository.jpa.RepositoryApplicationConfiguration;
import org.eclipse.hawkbit.repository.test.TestConfiguration;
import org.eclipse.hawkbit.repository.test.util.AbstractIntegrationTest;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -35,6 +34,7 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.annotation.DirtiesContext.ClassMode;
import org.springframework.test.context.ContextConfiguration;
@Slf4j
@RabbitAvailable
@ContextConfiguration(classes = { RepositoryApplicationConfiguration.class, AmqpTestConfiguration.class,
TestConfiguration.class})
@@ -43,9 +43,7 @@ import org.springframework.test.context.ContextConfiguration;
// beans after every test class.
@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
public abstract class AbstractAmqpIntegrationTest extends AbstractIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(AbstractAmqpIntegrationTest.class);
private static final Duration TIMEOUT = Duration.ofSeconds(5);
@Autowired
@@ -85,7 +83,7 @@ public abstract class AbstractAmqpIntegrationTest extends AbstractIntegrationTes
return Integer.parseInt(queueProps.get(RabbitAdmin.QUEUE_MESSAGE_COUNT).toString());
}
final int fallbackCount = 0;
LOG.warn(
log.warn(
"Cannot determine the queue message count for queue '{}' (queue properties {}). Returning queue message count {}.",
queueName, queueProps, fallbackCount);
return fallbackCount;