encode class type information in message payload instead of header (#383)

* encode class type information in message payload instead of header
Signed-off-by: Michael Hirsch <michael.hirsch@bosch-si.com>
This commit is contained in:
Michael Hirsch
2016-12-09 14:44:13 +01:00
committed by Kai Zimmermann
parent 7a7e52de47
commit 63adbd0298
5 changed files with 348 additions and 64 deletions

View File

@@ -8,16 +8,13 @@
*/
package org.eclipse.hawkbit.event;
import org.apache.commons.lang3.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.integration.support.MutableMessageHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.MimeType;
import io.protostuff.LinkedBuffer;
@@ -29,13 +26,23 @@ import io.protostuff.runtime.RuntimeSchema;
/**
* A customize message converter for the spring cloud events. The converter is
* registered for the application/binary+protostuff type.
*
* The clazz-type-information is encoded into the message payload infront with a
* length of {@link #EVENT_TYPE_LENGTH}. This is necessary due in case of
* rabbitMQ batching the message headers will be merged together and custom
* message header information will get lost. So in this implementation the
* information about the event-type is encoded in the payload of the message
* directly using the encoded values of {@link EventType}.
*
*/
public class BusProtoStuffMessageConverter extends AbstractMessageConverter {
public static final MimeType APPLICATION_BINARY_PROTOSTUFF = new MimeType("application", "binary+protostuff");
private static final Logger LOG = LoggerFactory.getLogger(BusProtoStuffMessageConverter.class);
private static final String DEFAULT_CLASS_FIELD_NAME = "__Class__";
/**
* The length of the class type length of the payload.
*/
private static final byte EVENT_TYPE_LENGTH = 2;
/**
* Constructor.
@@ -52,34 +59,91 @@ public class BusProtoStuffMessageConverter extends AbstractMessageConverter {
@Override
public Object convertFromInternal(final Message<?> message, final Class<?> targetClass,
final Object conversionHint) {
final Object payload = message.getPayload();
final Object objectPayload = message.getPayload();
if (objectPayload instanceof byte[]) {
try {
final Class<?> deserializeClass = ClassUtils
.getClass(message.getHeaders().get(DEFAULT_CLASS_FIELD_NAME).toString());
if (payload instanceof byte[]) {
@SuppressWarnings("unchecked")
final Schema<Object> schema = (Schema<Object>) RuntimeSchema.getSchema(deserializeClass);
final Object deserializeEvent = schema.newMessage();
ProtobufIOUtil.mergeFrom((byte[]) message.getPayload(), deserializeEvent, schema);
return deserializeEvent;
}
} catch (final ClassNotFoundException e) {
LOG.error("Protostuff cannot find derserialize class", e);
throw new MessageConversionException(message, "Failed to read payload", e);
final byte[] payload = (byte[]) objectPayload;
final byte[] clazzHeader = extractClazzHeader(payload);
final byte[] content = extraxtContent(payload);
final EventType eventType = readClassHeader(clazzHeader);
return readContent(eventType, content);
}
return null;
}
@Override
protected Object convertToInternal(final Object payload, final MessageHeaders headers,
final Object conversionHint) {
checkIfHeaderMutable(headers);
final byte[] clazzHeader = writeClassHeader(payload.getClass());
final byte[] writeContent = writeContent(payload);
return mergeClassHeaderAndContent(clazzHeader, writeContent);
}
private static Object readContent(final EventType eventType, final byte[] content) {
final Class<?> targetClass = eventType.getTargetClass();
if (targetClass == null) {
LOG.error("Cannot read clazz header for given EventType value {}, missing mapping", eventType.getValue());
throw new MessageConversionException("Missing mapping of EventType for value " + eventType.getValue());
}
@SuppressWarnings("unchecked")
final Schema<Object> schema = (Schema<Object>) RuntimeSchema.getSchema(targetClass);
final Object deserializeEvent = schema.newMessage();
ProtobufIOUtil.mergeFrom(content, deserializeEvent, schema);
return deserializeEvent;
}
private static byte[] mergeClassHeaderAndContent(final byte[] clazzHeader, final byte[] writeContent) {
final byte[] body = new byte[clazzHeader.length + writeContent.length];
System.arraycopy(clazzHeader, 0, body, 0, clazzHeader.length);
System.arraycopy(writeContent, 0, body, clazzHeader.length, writeContent.length);
return body;
}
private static byte[] extractClazzHeader(final byte[] payload) {
final byte[] clazzHeader = new byte[EVENT_TYPE_LENGTH];
System.arraycopy(payload, 0, clazzHeader, 0, EVENT_TYPE_LENGTH);
return clazzHeader;
}
private static byte[] extraxtContent(final byte[] payload) {
final byte[] content = new byte[payload.length - EVENT_TYPE_LENGTH];
System.arraycopy(payload, EVENT_TYPE_LENGTH, content, 0, content.length);
return content;
}
private static EventType readClassHeader(final byte[] typeInformation) {
final Schema<EventType> schema = RuntimeSchema.getSchema(EventType.class);
final EventType deserializedType = schema.newMessage();
ProtobufIOUtil.mergeFrom(typeInformation, deserializedType, schema);
return deserializedType;
}
private static byte[] writeContent(final Object payload) {
final Class<? extends Object> serializeClass = payload.getClass();
@SuppressWarnings("unchecked")
final Schema<Object> schema = (Schema<Object>) RuntimeSchema.getSchema(serializeClass);
final LinkedBuffer buffer = LinkedBuffer.allocate();
return writeProtoBuf(payload, schema, buffer);
}
private static byte[] writeClassHeader(final Class<?> clazz) {
final EventType clazzEventType = EventType.from(clazz);
if (clazzEventType == null) {
LOG.error("There is no mapping to EventType for the given clazz {}", clazzEventType);
throw new MessageConversionException("Missing EventType for given class : " + clazz);
}
@SuppressWarnings("unchecked")
final Schema<Object> schema = (Schema<Object>) RuntimeSchema
.getSchema((Class<? extends Object>) EventType.class);
final LinkedBuffer buffer = LinkedBuffer.allocate();
return writeProtoBuf(clazzEventType, schema, buffer);
}
private static byte[] writeProtoBuf(final Object payload, final Schema<Object> schema, final LinkedBuffer buffer) {
final byte[] serializeByte;
try {
serializeByte = ProtostuffIOUtil.toByteArray(payload, schema, buffer);
@@ -87,22 +151,6 @@ public class BusProtoStuffMessageConverter extends AbstractMessageConverter {
buffer.clear();
}
headers.put(DEFAULT_CLASS_FIELD_NAME, serializeClass.getName());
return serializeByte;
}
private static void checkIfHeaderMutable(final MessageHeaders headers) {
if (isAccessorMutable(headers) || headers instanceof MutableMessageHeaders) {
return;
}
LOG.error("Protostuff cannot set serializae class because message header is not mutable");
throw new MessageConversionException(
"Cannot set the serialize class to message header. Need Mutable message header");
}
private static boolean isAccessorMutable(final MessageHeaders headers) {
final MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(headers, MessageHeaderAccessor.class);
return accessor != null && accessor.isMutable();
}
}
}

View File

@@ -0,0 +1,136 @@
/**
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.event;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.eclipse.hawkbit.repository.event.remote.DistributionSetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.DistributionSetTagDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.DownloadProgressEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetTagDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionUpdatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetTagCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetTagUpdateEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetUpdateEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutGroupCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutGroupUpdatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutUpdatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetTagCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetTagUpdateEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetUpdatedEvent;
/**
* The {@link EventType} class declares the event-type and it's corresponding
* encoding value in the payload of an remote header. The event-type is encoded
* into the payload of the message which is distributed.
*
* To encode and decode the event class type we need some conversation mapping
* between the actual class and the corresponding integer value which is the
* encoded value in the byte-payload.
*/
public class EventType {
private static final Map<Integer, Class<?>> TYPES = new HashMap<>();
/**
* The associated event-type-value must remain the same as initially
* declared. Otherwise messages cannot correctly de-serialized.
*/
static {
// target
TYPES.put(1, TargetCreatedEvent.class);
TYPES.put(2, TargetUpdatedEvent.class);
TYPES.put(3, TargetDeletedEvent.class);
TYPES.put(4, CancelTargetAssignmentEvent.class);
TYPES.put(5, TargetAssignDistributionSetEvent.class);
// target tag
TYPES.put(6, TargetTagCreatedEvent.class);
TYPES.put(7, TargetTagUpdateEvent.class);
TYPES.put(8, TargetTagDeletedEvent.class);
// action
TYPES.put(9, ActionCreatedEvent.class);
TYPES.put(10, ActionUpdatedEvent.class);
// distribution set
TYPES.put(11, DistributionSetCreatedEvent.class);
TYPES.put(12, DistributionSetUpdateEvent.class);
TYPES.put(13, DistributionSetDeletedEvent.class);
// distribution set tag
TYPES.put(14, DistributionSetTagCreatedEvent.class);
TYPES.put(15, DistributionSetTagUpdateEvent.class);
TYPES.put(16, DistributionSetTagDeletedEvent.class);
// rollout
TYPES.put(17, RolloutUpdatedEvent.class);
// rollout group
TYPES.put(18, RolloutGroupCreatedEvent.class);
TYPES.put(19, RolloutGroupUpdatedEvent.class);
// download
TYPES.put(20, DownloadProgressEvent.class);
}
private int value;
/**
* Constructor.
*/
public EventType() {
// for marshalling and unmarshalling.
}
/**
* Constructor.
*
* @param value
* the value to initialize
*/
public EventType(final int value) {
this.value = value;
}
public int getValue() {
return value;
}
public Class<?> getTargetClass() {
return TYPES.get(value);
}
/**
* Returns a {@link EventType} based on the given class type.
*
* @param clazz
* the clazz type to retrieve the corresponding {@link EventType}
* .
* @return the corresponding {@link EventType} or {@code null} if the clazz
* does not have a {@link EventType}.
*/
public static EventType from(final Class<?> clazz) {
final Optional<Integer> foundEventType = TYPES.entrySet().stream()
.filter(entry -> entry.getValue().equals(clazz)).map(entry -> entry.getKey()).findFirst();
if (!foundEventType.isPresent()) {
return null;
}
return new EventType(foundEventType.get());
}
}