Move DMF message converter in amqp-api (#3143)
- move in amqp to be in single place - public DmfMessageConverter that could be used everywhere directly (instead of factory methods) - defualt amqpMessageConverter bean renamed to dmfMessageConverter - trusted packages configured (for dmfMessageConverter) with hawkbit.dmf.trusted-packages - default hwakbit dmf model package Signed-off-by: Avgustin Marinov <Avgustin.Marinov@bosch.com>
This commit is contained in:
@@ -9,14 +9,18 @@
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import static org.eclipse.hawkbit.dmf.DmfMessageConverter.DMF_JSON_MODEL_PACKAGE;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import lombok.ToString;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.hawkbit.artifact.urlresolver.ArtifactUrlResolver;
|
||||
import org.eclipse.hawkbit.dmf.DmfMessageConverter;
|
||||
import org.eclipse.hawkbit.repository.ConfirmationManagement;
|
||||
import org.eclipse.hawkbit.repository.ControllerManagement;
|
||||
import org.eclipse.hawkbit.repository.DeploymentManagement;
|
||||
@@ -27,9 +31,6 @@ import org.eclipse.hawkbit.repository.TargetManagement;
|
||||
import org.eclipse.hawkbit.repository.model.DistributionSet;
|
||||
import org.eclipse.hawkbit.repository.model.SoftwareModule;
|
||||
import org.eclipse.hawkbit.repository.model.Target;
|
||||
import org.jspecify.annotations.NonNull;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.listener.ConditionalRejectingErrorHandler;
|
||||
import org.springframework.amqp.listener.FatalExceptionStrategy;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
@@ -95,18 +96,20 @@ public class DmfApiConfiguration {
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(name = "amqpMessageConverter") // override it if needed to add / edit trusted packages or need other customization
|
||||
public MessageConverter amqpMessageConverter(final JsonMapper jsonMapper) {
|
||||
return DmfApiConfiguration.messageConverter(jsonMapper);
|
||||
@ConditionalOnMissingBean(name = "dmfMessageConverter") // override it if needed to add / edit trusted packages or need other customization
|
||||
public MessageConverter dmfMessageConverter(
|
||||
final JsonMapper jsonMapper,
|
||||
@Value("${hawkbit.dmf.trusted-packages:" + DMF_JSON_MODEL_PACKAGE + "}") final String trustedPackages) {
|
||||
return new DmfMessageConverter(jsonMapper, Arrays.stream(trustedPackages.split(",")).map(String::trim).toArray(String[]::new));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@link RabbitTemplate} with automatic retry, published confirms and {@link JacksonJsonMessageConverter}.
|
||||
*/
|
||||
@Bean
|
||||
public RabbitTemplate rabbitTemplate(@Qualifier("amqpMessageConverter") final MessageConverter amqpMessageConverter) {
|
||||
public RabbitTemplate rabbitTemplate(@Qualifier("dmfMessageConverter") final MessageConverter dmfMessageConverter) {
|
||||
final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
|
||||
rabbitTemplate.setMessageConverter(amqpMessageConverter);
|
||||
rabbitTemplate.setMessageConverter(dmfMessageConverter);
|
||||
|
||||
// the same policy the previously used default ExponentialBackOffPolicy applied
|
||||
rabbitTemplate.setRetryTemplate(new RetryTemplate(RetryPolicy.builder()
|
||||
@@ -186,31 +189,6 @@ public class DmfApiConfiguration {
|
||||
deploymentManagement);
|
||||
}
|
||||
|
||||
// since spring-amqp 4.0.4 not all packages are assumed trusted for type converter (only java.land and java.util)
|
||||
// so e need to add hawkbit (and eventual extension packages as trusted
|
||||
// also (again since spring-amqp 4.0.4) the conversion from empty payload fail (which probably is fine since it is JSON)
|
||||
// however, (for backward compatibility, e.g. THING_REMOVED doesn't define payload and could be empty byte[]) we assume that
|
||||
// empty payload is empty byte[] and not try to convert it to Object (which fail since it is not JSON)
|
||||
static @NonNull JacksonJsonMessageConverter messageConverter(final JsonMapper jsonMapper) {
|
||||
return messageConverter(jsonMapper, "org.eclipse.hawkbit.dmf.json.model");
|
||||
}
|
||||
|
||||
public static @NonNull JacksonJsonMessageConverter messageConverter(final JsonMapper jsonMapper, final String... trustedPackages) {
|
||||
return new JacksonJsonMessageConverter(jsonMapper, trustedPackages) {
|
||||
|
||||
@Override
|
||||
public @NonNull Object fromMessage(@NonNull final Message message, final @Nullable Object conversionHint) {
|
||||
// default converter tries to convert empty body payload to Object (since rabbit 4.0.4)
|
||||
// which probably is correct since it has to be JSON - however, in this case we assume - empty byte[]
|
||||
if (message.getBody().length == 0) {
|
||||
return message.getBody();
|
||||
} else {
|
||||
return super.fromMessage(message, conversionHint);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@ToString
|
||||
private static class SqlFatalExceptionStrategy implements FatalExceptionStrategy {
|
||||
|
||||
|
||||
@@ -26,13 +26,13 @@ import org.eclipse.hawkbit.artifact.model.ArtifactHashes;
|
||||
import org.eclipse.hawkbit.artifact.model.StoredArtifactInfo;
|
||||
import org.eclipse.hawkbit.artifact.urlresolver.ArtifactUrl;
|
||||
import org.eclipse.hawkbit.artifact.urlresolver.ArtifactUrlResolver;
|
||||
import org.eclipse.hawkbit.dmf.DmfMessageConverter;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfActionRequest;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfDownloadAndUpdateRequest;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfSoftwareModule;
|
||||
import org.eclipse.hawkbit.rabbitmq.test.AmqpTestConfiguration;
|
||||
import org.eclipse.hawkbit.repository.SystemManagement;
|
||||
import org.eclipse.hawkbit.repository.TargetManagement.Create;
|
||||
import org.eclipse.hawkbit.repository.event.remote.CancelTargetAssignmentEvent;
|
||||
@@ -64,7 +64,6 @@ import org.springframework.amqp.support.converter.DefaultJacksonJavaTypeMapper;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.context.ActiveProfiles;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import tools.jackson.databind.json.JsonMapper;
|
||||
|
||||
/**
|
||||
* Feature: Component Tests - Device Management Federation API<br/>
|
||||
@@ -95,7 +94,7 @@ class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTest {
|
||||
Create.builder().controllerId(CONTROLLER_ID).securityToken(TEST_TOKEN).address(AMQP_URI.toString()).build());
|
||||
|
||||
this.rabbitTemplate = Mockito.mock(RabbitTemplate.class);
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(AmqpTestConfiguration.messageConverter(new JsonMapper()));
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(new DmfMessageConverter());
|
||||
|
||||
senderService = Mockito.mock(DefaultAmqpMessageSenderService.class);
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@ import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.eclipse.hawkbit.dmf.DmfMessageConverter;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
|
||||
@@ -56,7 +57,6 @@ import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.MessageConversionException;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import tools.jackson.databind.json.JsonMapper;
|
||||
|
||||
/**
|
||||
* Feature: Component Tests - Device Management Federation API<br/>
|
||||
@@ -108,7 +108,7 @@ class AmqpMessageHandlerServiceTest {
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
void before() {
|
||||
TenantConfigHelper.setTenantConfigurationManagement(tenantConfigurationManagement);
|
||||
messageConverter = DmfApiConfiguration.messageConverter(new JsonMapper());
|
||||
messageConverter = new DmfMessageConverter();
|
||||
lenient().when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter);
|
||||
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService(
|
||||
|
||||
@@ -15,6 +15,7 @@ import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.eclipse.hawkbit.dmf.DmfMessageConverter;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfActionStatus;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfActionUpdateStatus;
|
||||
import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
|
||||
@@ -29,7 +30,6 @@ import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.MessageConversionException;
|
||||
import tools.jackson.databind.json.JsonMapper;
|
||||
|
||||
/**
|
||||
* Feature: Component Tests - Device Management Federation API<br/>
|
||||
@@ -54,7 +54,7 @@ class BaseAmqpServiceTest {
|
||||
@Test
|
||||
void convertMessageTest() {
|
||||
final DmfActionUpdateStatus actionUpdateStatus = createActionStatus();
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(DmfApiConfiguration.messageConverter(new JsonMapper()));
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(new DmfMessageConverter());
|
||||
|
||||
final Message message = rabbitTemplate.getMessageConverter().toMessage(actionUpdateStatus, createJsonProperties());
|
||||
final DmfActionUpdateStatus convertedActionUpdateStatus = baseAmqpService.convertMessage(message, DmfActionUpdateStatus.class);
|
||||
@@ -91,7 +91,7 @@ class BaseAmqpServiceTest {
|
||||
@ExpectEvents({ @Expect(type = TargetCreatedEvent.class, count = 0) })
|
||||
void updateActionStatusWithInvalidJsonContent() {
|
||||
final Message message = createMessage("Invalid Json".getBytes());
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(DmfApiConfiguration.messageConverter(new JsonMapper()));
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(new DmfMessageConverter());
|
||||
|
||||
assertThatExceptionOfType(MessageConversionException.class)
|
||||
.as("Expected MessageConversionException for invalid JSON")
|
||||
@@ -111,5 +111,4 @@ class BaseAmqpServiceTest {
|
||||
private DmfActionUpdateStatus createActionStatus() {
|
||||
return new DmfActionUpdateStatus(1L, DmfActionStatus.RUNNING, null, 2L, List.of("Message 1", "Message 2"), 2);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -30,5 +30,22 @@
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- DmfMessageConverter dependencies -->
|
||||
<dependency>
|
||||
<groupId>tools.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-beans</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.amqp</groupId>
|
||||
<artifactId>spring-amqp</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
/**
|
||||
* Copyright (c) 2026 Contributors to the Eclipse Foundation
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.dmf;
|
||||
|
||||
import org.jspecify.annotations.NonNull;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.support.converter.JacksonJsonMessageConverter;
|
||||
import tools.jackson.databind.json.JsonMapper;
|
||||
|
||||
// - since spring-amqp 4.0.4 not all packages are assumed trusted for type converter (only java.land and java.util)
|
||||
// so e need to add hawkbit DMF model package (and eventual extension packages) as trusted
|
||||
// - also (again since spring-amqp 4.0.4) the conversion from empty payload fail (which probably is fine since it is JSON)
|
||||
// however, (for backward compatibility, e.g. THING_REMOVED doesn't define payload and could be empty byte[]) we assume that
|
||||
// empty payload is empty byte[] and not try to convert it to Object (which fail since it is not JSON)
|
||||
public class DmfMessageConverter extends JacksonJsonMessageConverter {
|
||||
|
||||
public static final String DMF_JSON_MODEL_PACKAGE = "org.eclipse.hawkbit.dmf.json.model";
|
||||
|
||||
public DmfMessageConverter(final String... trustedPackages) {
|
||||
this(new JsonMapper(), trustedPackages);
|
||||
}
|
||||
|
||||
public DmfMessageConverter(final JsonMapper jsonMapper, final String... trustedPackages) {
|
||||
super(jsonMapper, trustedPackages.length == 0 ? new String[] { DMF_JSON_MODEL_PACKAGE } : trustedPackages);
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NonNull Object fromMessage(@NonNull final Message message, final @Nullable Object conversionHint) {
|
||||
// default converter tries to convert empty body payload to Object (since rabbit 4.0.4)
|
||||
// which probably is correct since it has to be JSON - however, in this case we assume - empty byte[]
|
||||
if (message.getBody().length == 0) {
|
||||
return message.getBody();
|
||||
} else {
|
||||
return super.fromMessage(message, conversionHint);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.awaitility.core.ConditionFactory;
|
||||
import org.eclipse.hawkbit.dmf.DmfMessageConverter;
|
||||
import org.eclipse.hawkbit.repository.jpa.JpaRepositoryConfiguration;
|
||||
import org.eclipse.hawkbit.repository.test.TestConfiguration;
|
||||
import org.eclipse.hawkbit.repository.test.util.AbstractIntegrationTest;
|
||||
@@ -30,7 +31,6 @@ import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.annotation.DirtiesContext.ClassMode;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import tools.jackson.databind.json.JsonMapper;
|
||||
|
||||
@Slf4j
|
||||
@RabbitAvailable
|
||||
@@ -88,7 +88,7 @@ public abstract class AbstractAmqpIntegrationTest extends AbstractIntegrationTes
|
||||
|
||||
private RabbitTemplate createDmfClient() {
|
||||
final RabbitTemplate template = new RabbitTemplate(connectionFactory);
|
||||
template.setMessageConverter(AmqpTestConfiguration.messageConverter(new JsonMapper()));
|
||||
template.setMessageConverter(new DmfMessageConverter());
|
||||
template.setReceiveTimeout(TimeUnit.SECONDS.toMillis(3));
|
||||
template.setReplyTimeout(TimeUnit.SECONDS.toMillis(3));
|
||||
template.setExchange(getExchange());
|
||||
|
||||
@@ -14,12 +14,9 @@ import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.jspecify.annotations.NonNull;
|
||||
import org.jspecify.annotations.Nullable;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.eclipse.hawkbit.dmf.DmfMessageConverter;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.JacksonJsonMessageConverter;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
@@ -27,7 +24,6 @@ import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
import org.springframework.security.concurrent.DelegatingSecurityContextExecutorService;
|
||||
import tools.jackson.databind.json.JsonMapper;
|
||||
|
||||
@Configuration
|
||||
public class AmqpTestConfiguration {
|
||||
@@ -36,7 +32,7 @@ public class AmqpTestConfiguration {
|
||||
@Primary
|
||||
public RabbitTemplate rabbitTemplateForTest(final ConnectionFactory connectionFactory) {
|
||||
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
|
||||
rabbitTemplate.setMessageConverter(messageConverter(new JsonMapper()));
|
||||
rabbitTemplate.setMessageConverter(new DmfMessageConverter());
|
||||
rabbitTemplate.setReplyTimeout(TimeUnit.SECONDS.toMillis(3));
|
||||
rabbitTemplate.setReceiveTimeout(TimeUnit.SECONDS.toMillis(3));
|
||||
return rabbitTemplate;
|
||||
@@ -71,19 +67,4 @@ public class AmqpTestConfiguration {
|
||||
RabbitMqSetupService rabbitMqSetupService() {
|
||||
return new RabbitMqSetupService();
|
||||
}
|
||||
|
||||
// note - it MUST be the same as DmfApiConfiguration#messageConverter for the test to work properly (to test the real AMQP)
|
||||
public static @NonNull JacksonJsonMessageConverter messageConverter(final JsonMapper jsonMapper) {
|
||||
return new JacksonJsonMessageConverter(jsonMapper, "org.eclipse.hawkbit.dmf.json.model") {
|
||||
|
||||
@Override
|
||||
public @NonNull Object fromMessage(@NonNull final Message message, final @Nullable Object conversionHint) {
|
||||
if (message.getBody().length == 0) {
|
||||
return message.getBody();
|
||||
} else {
|
||||
return super.fromMessage(message, conversionHint);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.hawkbit.dmf.DmfMessageConverter;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.EventTopic;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.MessageHeaderKey;
|
||||
import org.eclipse.hawkbit.dmf.amqp.api.MessageType;
|
||||
@@ -40,7 +41,6 @@ import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.amqp.support.converter.DefaultJacksonJavaTypeMapper;
|
||||
import org.springframework.amqp.support.converter.JacksonJsonMessageConverter;
|
||||
|
||||
/**
|
||||
* Abstract class for sender and receiver service.
|
||||
@@ -60,7 +60,7 @@ public final class VHost extends DmfSender implements MessageListener {
|
||||
// It is necessary to define rabbitTemplate as a Bean and set JacksonJsonMessageConverter explicitly here in order to convert only
|
||||
// OUTCOMING messages to JSON. In case of INCOMING messages, JacksonJsonMessageConverter can not handle messages with NULL
|
||||
// payload (e.g. REQUEST_ATTRIBUTES_UPDATE), so the SimpleMessageConverter is used instead per default.
|
||||
rabbitTemplate.setMessageConverter(new JacksonJsonMessageConverter());
|
||||
rabbitTemplate.setMessageConverter(new DmfMessageConverter());
|
||||
|
||||
if (initVHost) {
|
||||
final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
|
||||
|
||||
1
hawkbit-ui/.gitignore
vendored
1
hawkbit-ui/.gitignore
vendored
@@ -8,3 +8,4 @@ tsconfig.json
|
||||
types.d.ts
|
||||
vite.config.ts
|
||||
vite.generated.ts
|
||||
*.lock
|
||||
Reference in New Issue
Block a user