diff --git a/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventJacksonMessageConverter.java b/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventJacksonMessageConverter.java
index 039dfb16c..cfd2b98d2 100644
--- a/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventJacksonMessageConverter.java
+++ b/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventJacksonMessageConverter.java
@@ -10,15 +10,30 @@
package org.eclipse.hawkbit.event;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.util.MimeType;
public class EventJacksonMessageConverter extends MappingJackson2MessageConverter {
+ public static final MimeType APPLICATION_REMOTE_EVENT_JSON = new MimeType("application", "remote-event-json");
+
+
public EventJacksonMessageConverter() {
- super(new MimeType("application", "remote-event-json"));
+ super(APPLICATION_REMOTE_EVENT_JSON);
ObjectMapper objectMapper = new ObjectMapper();
EventType.getNamedTypes().forEach(objectMapper::registerSubtypes);
setObjectMapper(objectMapper);
}
+
+ @Override
+ protected Object convertToInternal(final Object payload, final MessageHeaders headers, final Object conversionHint) {
+ return super.convertToInternal(payload, headers, conversionHint);
+ }
+
+ @Override
+ protected Object convertFromInternal(final Message> message, final Class> targetClass, final Object conversionHint) {
+ return super.convertFromInternal(message, targetClass, conversionHint);
+ }
}
diff --git a/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventProtoStuffMessageConverter.java b/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventProtoStuffMessageConverter.java
index 0e3aa222e..2b8d2504c 100644
--- a/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventProtoStuffMessageConverter.java
+++ b/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventProtoStuffMessageConverter.java
@@ -9,6 +9,8 @@
*/
package org.eclipse.hawkbit.event;
+import java.nio.ByteBuffer;
+
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtobufIOUtil;
import io.protostuff.Schema;
@@ -22,20 +24,28 @@ import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.MimeType;
/**
- * A customize message converter for the spring cloud events. The converter is registered for the application/binary+protostuff type.
- *
- * The clazz-type-information is encoded into the message payload infront with a length of {@link #EVENT_TYPE_LENGTH}. This is necessary
- * due in case of rabbitMQ batching the message headers will be merged together and custom message header information will get lost.
- * So in this implementation the information about the event-type is encoded in the payload of the message directly using the encoded
- * values of {@link EventType}.
+ * A custom message converter for Spring Cloud Stream using Protostuff serialization.
+ * The converter is registered for the {@code application/binary+protostuff} content type.
+ * It embeds the {@link EventType} metadata inside the message payload to preserve the type
+ * during deserialization — even when message headers may be lost (e.g. with RabbitMQ batching).
+ *
+ *
+ * Message Structure:
+ *
+ *
The first {@link #HEADER_LENGTH_PREFIX_SIZE} bytes are an integer indicating the length N (in bytes) of the serialized {@link EventType}.
+ *
Next N bytes contain the serialized {@link EventType} data itself.
+ *
The remaining bytes represent the serialized {@link AbstractRemoteEvent} payload.
+ *
+ *
+ * This format allows decoding messages without relying on external headers, ensuring robustness
+ * in systems where header information may be merged or dropped.
*/
@Slf4j
public class EventProtoStuffMessageConverter extends AbstractMessageConverter {
public static final MimeType APPLICATION_BINARY_PROTOSTUFF = new MimeType("application", "binary+protostuff");
+ private static final int HEADER_LENGTH_PREFIX_SIZE = 4;
- /** The length of the class type length of the payload. */
- private static final byte EVENT_TYPE_LENGTH = 2;
public EventProtoStuffMessageConverter() {
super(APPLICATION_BINARY_PROTOSTUFF);
@@ -47,11 +57,11 @@ public class EventProtoStuffMessageConverter extends AbstractMessageConverter {
}
@Override
- public Object convertFromInternal(final Message> message, final Class> targetClass, final Object conversionHint) {
+ protected Object convertFromInternal(final Message> message, final Class> targetClass, final Object conversionHint) {
final Object objectPayload = message.getPayload();
if (objectPayload instanceof byte[] payload) {
final byte[] clazzHeader = extractClazzHeader(payload);
- final byte[] content = extraxtContent(payload);
+ final byte[] content = extractContent(payload);
final EventType eventType = readClassHeader(clazzHeader);
return readContent(eventType, content);
@@ -86,17 +96,23 @@ public class EventProtoStuffMessageConverter extends AbstractMessageConverter {
}
private static byte[] extractClazzHeader(final byte[] payload) {
- final byte[] clazzHeader = new byte[EVENT_TYPE_LENGTH];
- System.arraycopy(payload, 0, clazzHeader, 0, EVENT_TYPE_LENGTH);
+ ByteBuffer wrapper = ByteBuffer.wrap(payload);
+ int headerLength = wrapper.getInt();
+ byte[] clazzHeader = new byte[headerLength];
+ wrapper.get(clazzHeader);
return clazzHeader;
}
- private static byte[] extraxtContent(final byte[] payload) {
- final byte[] content = new byte[payload.length - EVENT_TYPE_LENGTH];
- System.arraycopy(payload, EVENT_TYPE_LENGTH, content, 0, content.length);
+ private static byte[] extractContent(final byte[] payload) {
+ ByteBuffer wrapper = ByteBuffer.wrap(payload);
+ int headerLength = wrapper.getInt();
+ byte[] content = new byte[payload.length - HEADER_LENGTH_PREFIX_SIZE - headerLength];
+ wrapper.position(HEADER_LENGTH_PREFIX_SIZE + headerLength);
+ wrapper.get(content);
return content;
}
+
private static EventType readClassHeader(final byte[] typeInformation) {
final Schema schema = RuntimeSchema.getSchema(EventType.class);
final EventType deserializedType = schema.newMessage();
@@ -117,9 +133,16 @@ public class EventProtoStuffMessageConverter extends AbstractMessageConverter {
log.error("There is no mapping to EventType for the given class {}", clazz);
throw new MessageConversionException("Missing EventType for given class : " + clazz);
}
- @SuppressWarnings("unchecked") final Schema