Fix Protostuff message converter (#2583)
Co-authored-by: vasilchev <vasil.ilchev@bosch.com>
This commit is contained in:
@@ -10,15 +10,30 @@
|
||||
package org.eclipse.hawkbit.event;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
public class EventJacksonMessageConverter extends MappingJackson2MessageConverter {
|
||||
|
||||
public static final MimeType APPLICATION_REMOTE_EVENT_JSON = new MimeType("application", "remote-event-json");
|
||||
|
||||
|
||||
public EventJacksonMessageConverter() {
|
||||
super(new MimeType("application", "remote-event-json"));
|
||||
super(APPLICATION_REMOTE_EVENT_JSON);
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
EventType.getNamedTypes().forEach(objectMapper::registerSubtypes);
|
||||
setObjectMapper(objectMapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertToInternal(final Object payload, final MessageHeaders headers, final Object conversionHint) {
|
||||
return super.convertToInternal(payload, headers, conversionHint);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertFromInternal(final Message<?> message, final Class<?> targetClass, final Object conversionHint) {
|
||||
return super.convertFromInternal(message, targetClass, conversionHint);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,8 @@
|
||||
*/
|
||||
package org.eclipse.hawkbit.event;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import io.protostuff.LinkedBuffer;
|
||||
import io.protostuff.ProtobufIOUtil;
|
||||
import io.protostuff.Schema;
|
||||
@@ -22,20 +24,28 @@ import org.springframework.messaging.converter.MessageConversionException;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
* A customize message converter for the spring cloud events. The converter is registered for the application/binary+protostuff type.
|
||||
* <p/>
|
||||
* The clazz-type-information is encoded into the message payload infront with a length of {@link #EVENT_TYPE_LENGTH}. This is necessary
|
||||
* due in case of rabbitMQ batching the message headers will be merged together and custom message header information will get lost.
|
||||
* So in this implementation the information about the event-type is encoded in the payload of the message directly using the encoded
|
||||
* values of {@link EventType}.
|
||||
* A custom message converter for Spring Cloud Stream using Protostuff serialization.
|
||||
* The converter is registered for the {@code application/binary+protostuff} content type.
|
||||
* It embeds the {@link EventType} metadata inside the message payload to preserve the type
|
||||
* during deserialization — even when message headers may be lost (e.g. with RabbitMQ batching).
|
||||
*
|
||||
* <p>
|
||||
* Message Structure:
|
||||
* <ul>
|
||||
* <li>The first {@link #HEADER_LENGTH_PREFIX_SIZE} bytes are an integer indicating the length N (in bytes) of the serialized {@link EventType}.</li>
|
||||
* <li>Next N bytes contain the serialized {@link EventType} data itself.</li>
|
||||
* <li>The remaining bytes represent the serialized {@link AbstractRemoteEvent} payload.</li>
|
||||
* </ul>
|
||||
*
|
||||
* This format allows decoding messages without relying on external headers, ensuring robustness
|
||||
* in systems where header information may be merged or dropped.
|
||||
*/
|
||||
@Slf4j
|
||||
public class EventProtoStuffMessageConverter extends AbstractMessageConverter {
|
||||
|
||||
public static final MimeType APPLICATION_BINARY_PROTOSTUFF = new MimeType("application", "binary+protostuff");
|
||||
private static final int HEADER_LENGTH_PREFIX_SIZE = 4;
|
||||
|
||||
/** The length of the class type length of the payload. */
|
||||
private static final byte EVENT_TYPE_LENGTH = 2;
|
||||
|
||||
public EventProtoStuffMessageConverter() {
|
||||
super(APPLICATION_BINARY_PROTOSTUFF);
|
||||
@@ -47,11 +57,11 @@ public class EventProtoStuffMessageConverter extends AbstractMessageConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object convertFromInternal(final Message<?> message, final Class<?> targetClass, final Object conversionHint) {
|
||||
protected Object convertFromInternal(final Message<?> message, final Class<?> targetClass, final Object conversionHint) {
|
||||
final Object objectPayload = message.getPayload();
|
||||
if (objectPayload instanceof byte[] payload) {
|
||||
final byte[] clazzHeader = extractClazzHeader(payload);
|
||||
final byte[] content = extraxtContent(payload);
|
||||
final byte[] content = extractContent(payload);
|
||||
|
||||
final EventType eventType = readClassHeader(clazzHeader);
|
||||
return readContent(eventType, content);
|
||||
@@ -86,17 +96,23 @@ public class EventProtoStuffMessageConverter extends AbstractMessageConverter {
|
||||
}
|
||||
|
||||
private static byte[] extractClazzHeader(final byte[] payload) {
|
||||
final byte[] clazzHeader = new byte[EVENT_TYPE_LENGTH];
|
||||
System.arraycopy(payload, 0, clazzHeader, 0, EVENT_TYPE_LENGTH);
|
||||
ByteBuffer wrapper = ByteBuffer.wrap(payload);
|
||||
int headerLength = wrapper.getInt();
|
||||
byte[] clazzHeader = new byte[headerLength];
|
||||
wrapper.get(clazzHeader);
|
||||
return clazzHeader;
|
||||
}
|
||||
|
||||
private static byte[] extraxtContent(final byte[] payload) {
|
||||
final byte[] content = new byte[payload.length - EVENT_TYPE_LENGTH];
|
||||
System.arraycopy(payload, EVENT_TYPE_LENGTH, content, 0, content.length);
|
||||
private static byte[] extractContent(final byte[] payload) {
|
||||
ByteBuffer wrapper = ByteBuffer.wrap(payload);
|
||||
int headerLength = wrapper.getInt();
|
||||
byte[] content = new byte[payload.length - HEADER_LENGTH_PREFIX_SIZE - headerLength];
|
||||
wrapper.position(HEADER_LENGTH_PREFIX_SIZE + headerLength);
|
||||
wrapper.get(content);
|
||||
return content;
|
||||
}
|
||||
|
||||
|
||||
private static EventType readClassHeader(final byte[] typeInformation) {
|
||||
final Schema<EventType> schema = RuntimeSchema.getSchema(EventType.class);
|
||||
final EventType deserializedType = schema.newMessage();
|
||||
@@ -117,9 +133,16 @@ public class EventProtoStuffMessageConverter extends AbstractMessageConverter {
|
||||
log.error("There is no mapping to EventType for the given class {}", clazz);
|
||||
throw new MessageConversionException("Missing EventType for given class : " + clazz);
|
||||
}
|
||||
@SuppressWarnings("unchecked") final Schema<Object> schema = (Schema<Object>) RuntimeSchema
|
||||
.getSchema((Class<?>) EventType.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final Schema<Object> schema = (Schema<Object>) RuntimeSchema.getSchema((Class<?>) EventType.class);
|
||||
final LinkedBuffer buffer = LinkedBuffer.allocate();
|
||||
return ProtobufIOUtil.toByteArray(clazzEventType, schema, buffer);
|
||||
byte[] typeBytes = ProtobufIOUtil.toByteArray(clazzEventType, schema, buffer);
|
||||
|
||||
ByteBuffer result = ByteBuffer.allocate(HEADER_LENGTH_PREFIX_SIZE + typeBytes.length);
|
||||
result.putInt(typeBytes.length);
|
||||
result.put(typeBytes);
|
||||
return result.array();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
/**
|
||||
* Copyright (c) 2025 Contributors to the Eclipse Foundation
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.event;
|
||||
|
||||
import org.eclipse.hawkbit.repository.event.remote.AbstractRemoteEvent;
|
||||
import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
|
||||
import org.eclipse.hawkbit.repository.event.remote.service.TargetCreatedServiceEvent;
|
||||
import org.eclipse.hawkbit.repository.model.Target;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mock;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
abstract class AbstractEventMessageConverterTest {
|
||||
|
||||
protected final MessageConverter messageConverter;
|
||||
|
||||
@Mock
|
||||
protected Message<Object> messageMock;
|
||||
|
||||
@Mock
|
||||
protected Target targetMock;
|
||||
|
||||
@BeforeEach
|
||||
void before() {
|
||||
when(targetMock.getId()).thenReturn(1L);
|
||||
}
|
||||
|
||||
AbstractEventMessageConverterTest(MessageConverter messageConverter) {
|
||||
this.messageConverter = messageConverter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the TargetCreatedEvent can be successfully serialized and deserialized
|
||||
*/
|
||||
@Test
|
||||
void successfullySerializeAndDeserializeEvent() {
|
||||
final TargetCreatedEvent targetCreatedEvent = new TargetCreatedEvent(targetMock);
|
||||
// serialize
|
||||
assertSerializeAndDeserialize(targetCreatedEvent, TargetCreatedEvent.class);
|
||||
assertSerializeAndDeserialize(targetCreatedEvent, TargetCreatedEvent.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
void successfullySerializeAndDeserializeServiceEvent() {
|
||||
final TargetCreatedEvent targetCreatedEvent = new TargetCreatedEvent(targetMock);
|
||||
final TargetCreatedServiceEvent targetCreatedServiceEvent = new TargetCreatedServiceEvent(targetCreatedEvent);
|
||||
assertSerializeAndDeserialize(targetCreatedServiceEvent, TargetCreatedServiceEvent.class);
|
||||
}
|
||||
|
||||
<T extends AbstractRemoteEvent> void assertSerializeAndDeserialize(T event, Class<? extends AbstractRemoteEvent> expectedClass) {
|
||||
// serialize
|
||||
Object serializedEvent = null;
|
||||
if (messageConverter instanceof EventProtoStuffMessageConverter protoStuff) {
|
||||
serializedEvent = protoStuff.convertToInternal(event, new MessageHeaders(new HashMap<>()), null);
|
||||
} else if (messageConverter instanceof EventJacksonMessageConverter jackson) {
|
||||
serializedEvent = jackson.convertToInternal(event, new MessageHeaders(new HashMap<>()), null);
|
||||
}
|
||||
assertThat(serializedEvent).isInstanceOf(byte[].class);
|
||||
|
||||
// deserialize
|
||||
when(messageMock.getPayload()).thenReturn(serializedEvent);
|
||||
Object deserializedEvent = null;
|
||||
if (messageConverter instanceof EventProtoStuffMessageConverter protoStuff) {
|
||||
deserializedEvent = protoStuff.convertFromInternal(messageMock, AbstractRemoteEvent.class, null);
|
||||
} else if (messageConverter instanceof EventJacksonMessageConverter jackson) {
|
||||
deserializedEvent = jackson.convertFromInternal(messageMock, AbstractRemoteEvent.class, null);
|
||||
}
|
||||
assertThat(deserializedEvent)
|
||||
.isInstanceOf(expectedClass)
|
||||
.isEqualTo(event);
|
||||
}
|
||||
}
|
||||
@@ -9,69 +9,13 @@
|
||||
*/
|
||||
package org.eclipse.hawkbit.event;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.eclipse.hawkbit.repository.event.remote.AbstractRemoteEvent;
|
||||
import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
|
||||
import org.eclipse.hawkbit.repository.model.Target;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class EventJacksonMessageConverterTest {
|
||||
class EventJacksonMessageConverterTest extends AbstractEventMessageConverterTest {
|
||||
|
||||
private final TestEventJacksonMessageConverter underTest = new TestEventJacksonMessageConverter();
|
||||
|
||||
@Mock
|
||||
private Target targetMock;
|
||||
|
||||
@Mock
|
||||
private Message<Object> messageMock;
|
||||
|
||||
@BeforeEach
|
||||
void before() {
|
||||
when(targetMock.getId()).thenReturn(1L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the TargetCreatedEvent can be successfully serialized and deserialized
|
||||
*/
|
||||
@Test
|
||||
void successfullySerializeAndDeserializeEvent() {
|
||||
final TargetCreatedEvent targetCreatedEvent = new TargetCreatedEvent(targetMock);
|
||||
// serialize
|
||||
final Object serializedEvent = underTest.convertToInternal(targetCreatedEvent,
|
||||
new MessageHeaders(new HashMap<>()), null);
|
||||
assertThat(serializedEvent).isInstanceOf(byte[].class);
|
||||
|
||||
// deserialize
|
||||
when(messageMock.getPayload()).thenReturn(serializedEvent);
|
||||
final Object deserializedEvent = underTest.convertFromInternal(messageMock, AbstractRemoteEvent.class, null);
|
||||
assertThat(deserializedEvent)
|
||||
.isInstanceOf(TargetCreatedEvent.class)
|
||||
.isEqualTo(targetCreatedEvent);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test subclass to expose protected methods for testing.
|
||||
*/
|
||||
private static class TestEventJacksonMessageConverter extends EventJacksonMessageConverter {
|
||||
@Override
|
||||
public Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
|
||||
return super.convertToInternal(payload, headers, conversionHint);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
|
||||
return super.convertFromInternal(message, targetClass, conversionHint);
|
||||
}
|
||||
EventJacksonMessageConverterTest() {
|
||||
super(new EventJacksonMessageConverter());
|
||||
}
|
||||
}
|
||||
@@ -9,59 +9,24 @@
|
||||
*/
|
||||
package org.eclipse.hawkbit.event;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.Serial;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.eclipse.hawkbit.repository.event.remote.AbstractRemoteEvent;
|
||||
import org.eclipse.hawkbit.repository.event.remote.entity.RemoteEntityEvent;
|
||||
import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
|
||||
import org.eclipse.hawkbit.repository.model.Target;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.MessageConversionException;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class EventProtoStuffMessageConverterTest {
|
||||
class EventProtoStuffMessageConverterTest extends AbstractEventMessageConverterTest {
|
||||
|
||||
private final EventProtoStuffMessageConverter underTest = new EventProtoStuffMessageConverter();
|
||||
|
||||
@Mock
|
||||
private Target targetMock;
|
||||
|
||||
@Mock
|
||||
private Message<Object> messageMock;
|
||||
|
||||
@BeforeEach
|
||||
void before() {
|
||||
when(targetMock.getId()).thenReturn(1L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that the TargetCreatedEvent can be successfully serialized and deserialized
|
||||
*/
|
||||
@Test
|
||||
void successfullySerializeAndDeserializeEvent() {
|
||||
final TargetCreatedEvent targetCreatedEvent = new TargetCreatedEvent(targetMock);
|
||||
// serialize
|
||||
final Object serializedEvent = underTest.convertToInternal(targetCreatedEvent,
|
||||
new MessageHeaders(new HashMap<>()), null);
|
||||
assertThat(serializedEvent).isInstanceOf(byte[].class);
|
||||
|
||||
// deserialize
|
||||
when(messageMock.getPayload()).thenReturn(serializedEvent);
|
||||
final Object deserializedEvent = underTest.convertFromInternal(messageMock, AbstractRemoteEvent.class, null);
|
||||
assertThat(deserializedEvent)
|
||||
.isInstanceOf(TargetCreatedEvent.class)
|
||||
.isEqualTo(targetCreatedEvent);
|
||||
EventProtoStuffMessageConverterTest() {
|
||||
super(new EventProtoStuffMessageConverter());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -74,7 +39,7 @@ class EventProtoStuffMessageConverterTest {
|
||||
|
||||
assertThatExceptionOfType(MessageConversionException.class)
|
||||
.as("Missing MessageConversationException for un-defined event-type")
|
||||
.isThrownBy(() -> underTest.convertToInternal(dummyEvent, messageHeaders, null));
|
||||
.isThrownBy(() -> ((EventProtoStuffMessageConverter)messageConverter).convertToInternal(dummyEvent, messageHeaders, null));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -8,50 +8,29 @@
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.repository.event.remote;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.eclipse.hawkbit.event.EventJacksonMessageConverter;
|
||||
import org.eclipse.hawkbit.event.EventProtoStuffMessageConverter;
|
||||
import org.eclipse.hawkbit.repository.event.TenantAwareEvent;
|
||||
import org.eclipse.hawkbit.repository.jpa.AbstractJpaIntegrationTest;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.TestConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.integration.support.MutableMessageHeaders;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.AbstractMessageConverter;
|
||||
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
|
||||
/**
|
||||
* Test the remote entity events.
|
||||
*/
|
||||
@SuppressWarnings("java:S6813") // constructor injects are not possible for test classes
|
||||
@Import(AbstractRemoteEventTest.EventProtoStuffTestConfig.class)
|
||||
public abstract class AbstractRemoteEventTest extends AbstractJpaIntegrationTest {
|
||||
|
||||
@Autowired
|
||||
private EventProtoStuffMessageConverter eventProtoStuffMessageConverter;
|
||||
private EventProtoStuffMessageConverter eventProtoStuffMessageConverter = new EventProtoStuffMessageConverter();
|
||||
|
||||
private AbstractMessageConverter jacksonMessageConverter;
|
||||
private EventJacksonMessageConverter jacksonMessageConverter = new EventJacksonMessageConverter();
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
this.jacksonMessageConverter = new MappingJackson2MessageConverter();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T extends TenantAwareEvent> T createJacksonEvent(final T event) {
|
||||
final Message<String> message = createJsonMessage(event);
|
||||
final Message<?> message = createJsonMessage(event);
|
||||
return (T) jacksonMessageConverter.fromMessage(message, event.getClass());
|
||||
}
|
||||
|
||||
@@ -68,23 +47,9 @@ public abstract class AbstractRemoteEventTest extends AbstractJpaIntegrationTest
|
||||
);
|
||||
}
|
||||
|
||||
private Message<String> createJsonMessage(final Object event) {
|
||||
try {
|
||||
String json = new ObjectMapper().writeValueAsString(event);
|
||||
return MessageBuilder.withPayload(json)
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
|
||||
.build();
|
||||
} catch (JsonProcessingException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
return null;
|
||||
private Message<?> createJsonMessage(final Object event) {
|
||||
return jacksonMessageConverter.toMessage(event, new MutableMessageHeaders(Map.of(MessageHeaders.CONTENT_TYPE,
|
||||
EventJacksonMessageConverter.APPLICATION_REMOTE_EVENT_JSON)));
|
||||
}
|
||||
|
||||
@TestConfiguration
|
||||
static class EventProtoStuffTestConfig {
|
||||
@Bean
|
||||
public MessageConverter eventProtoBufConverter() {
|
||||
return new EventProtoStuffMessageConverter();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user