Skip to content

Commit

Permalink
encode class type information in message payload instead of header (e…
Browse files Browse the repository at this point in the history
…clipse-hawkbit#383)

* encode class type information in message payload instead of header
Signed-off-by: Michael Hirsch <[email protected]>
  • Loading branch information
michahirsch authored and kaizimmerm committed Dec 9, 2016
1 parent 7a7e52d commit 63adbd0
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 64 deletions.
51 changes: 39 additions & 12 deletions hawkbit-repository/hawkbit-repository-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.eclipse.hawkbit</groupId>
<artifactId>hawkbit-repository</artifactId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<artifactId>hawkbit-repository-core</artifactId>
<name>hawkBit :: Repository Core Implementation Support</name>
<dependencies>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.eclipse.hawkbit</groupId>
<artifactId>hawkbit-repository</artifactId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<artifactId>hawkbit-repository-core</artifactId>
<name>hawkBit :: Repository Core Implementation Support</name>

<dependencies>
<dependency>
<groupId>org.eclipse.hawkbit</groupId>
<artifactId>hawkbit-repository-api</artifactId>
Expand All @@ -34,6 +34,33 @@
<artifactId>protostuff-runtime</artifactId>
<optional>true</optional>
</dependency>
</dependencies>


<!-- TEST -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easytesting</groupId>
<artifactId>fest-assert</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ru.yandex.qatools.allure</groupId>
<artifactId>allure-junit-adaptor</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@
*/
package org.eclipse.hawkbit.event;

import org.apache.commons.lang3.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.integration.support.MutableMessageHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.MimeType;

import io.protostuff.LinkedBuffer;
Expand All @@ -29,13 +26,23 @@
/**
* A customize message converter for the spring cloud events. The converter is
* registered for the application/binary+protostuff type.
*
* The clazz-type-information is encoded into the message payload infront with a
* length of {@link #EVENT_TYPE_LENGTH}. This is necessary due in case of
* rabbitMQ batching the message headers will be merged together and custom
* message header information will get lost. So in this implementation the
* information about the event-type is encoded in the payload of the message
* directly using the encoded values of {@link EventType}.
*
*/
public class BusProtoStuffMessageConverter extends AbstractMessageConverter {

public static final MimeType APPLICATION_BINARY_PROTOSTUFF = new MimeType("application", "binary+protostuff");
private static final Logger LOG = LoggerFactory.getLogger(BusProtoStuffMessageConverter.class);
private static final String DEFAULT_CLASS_FIELD_NAME = "__Class__";
/**
* The length of the class type length of the payload.
*/
private static final byte EVENT_TYPE_LENGTH = 2;

/**
* Constructor.
Expand All @@ -52,57 +59,98 @@ protected boolean supports(final Class<?> aClass) {
@Override
public Object convertFromInternal(final Message<?> message, final Class<?> targetClass,
final Object conversionHint) {
final Object payload = message.getPayload();
final Object objectPayload = message.getPayload();
if (objectPayload instanceof byte[]) {

try {
final Class<?> deserializeClass = ClassUtils
.getClass(message.getHeaders().get(DEFAULT_CLASS_FIELD_NAME).toString());
if (payload instanceof byte[]) {
@SuppressWarnings("unchecked")
final Schema<Object> schema = (Schema<Object>) RuntimeSchema.getSchema(deserializeClass);
final Object deserializeEvent = schema.newMessage();
ProtobufIOUtil.mergeFrom((byte[]) message.getPayload(), deserializeEvent, schema);
return deserializeEvent;
}
} catch (final ClassNotFoundException e) {
LOG.error("Protostuff cannot find derserialize class", e);
throw new MessageConversionException(message, "Failed to read payload", e);
}
final byte[] payload = (byte[]) objectPayload;
final byte[] clazzHeader = extractClazzHeader(payload);
final byte[] content = extraxtContent(payload);

final EventType eventType = readClassHeader(clazzHeader);
return readContent(eventType, content);
}
return null;
}

@Override
protected Object convertToInternal(final Object payload, final MessageHeaders headers,
final Object conversionHint) {
checkIfHeaderMutable(headers);

final byte[] clazzHeader = writeClassHeader(payload.getClass());

final byte[] writeContent = writeContent(payload);

return mergeClassHeaderAndContent(clazzHeader, writeContent);
}

private static Object readContent(final EventType eventType, final byte[] content) {
final Class<?> targetClass = eventType.getTargetClass();
if (targetClass == null) {
LOG.error("Cannot read clazz header for given EventType value {}, missing mapping", eventType.getValue());
throw new MessageConversionException("Missing mapping of EventType for value " + eventType.getValue());
}
@SuppressWarnings("unchecked")
final Schema<Object> schema = (Schema<Object>) RuntimeSchema.getSchema(targetClass);
final Object deserializeEvent = schema.newMessage();
ProtobufIOUtil.mergeFrom(content, deserializeEvent, schema);
return deserializeEvent;
}

private static byte[] mergeClassHeaderAndContent(final byte[] clazzHeader, final byte[] writeContent) {
final byte[] body = new byte[clazzHeader.length + writeContent.length];
System.arraycopy(clazzHeader, 0, body, 0, clazzHeader.length);
System.arraycopy(writeContent, 0, body, clazzHeader.length, writeContent.length);
return body;
}

private static byte[] extractClazzHeader(final byte[] payload) {
final byte[] clazzHeader = new byte[EVENT_TYPE_LENGTH];
System.arraycopy(payload, 0, clazzHeader, 0, EVENT_TYPE_LENGTH);
return clazzHeader;
}

private static byte[] extraxtContent(final byte[] payload) {
final byte[] content = new byte[payload.length - EVENT_TYPE_LENGTH];
System.arraycopy(payload, EVENT_TYPE_LENGTH, content, 0, content.length);
return content;
}

private static EventType readClassHeader(final byte[] typeInformation) {
final Schema<EventType> schema = RuntimeSchema.getSchema(EventType.class);
final EventType deserializedType = schema.newMessage();
ProtobufIOUtil.mergeFrom(typeInformation, deserializedType, schema);
return deserializedType;
}

private static byte[] writeContent(final Object payload) {
final Class<? extends Object> serializeClass = payload.getClass();
@SuppressWarnings("unchecked")
final Schema<Object> schema = (Schema<Object>) RuntimeSchema.getSchema(serializeClass);
final LinkedBuffer buffer = LinkedBuffer.allocate();
return writeProtoBuf(payload, schema, buffer);
}

private static byte[] writeClassHeader(final Class<?> clazz) {
final EventType clazzEventType = EventType.from(clazz);
if (clazzEventType == null) {
LOG.error("There is no mapping to EventType for the given clazz {}", clazzEventType);
throw new MessageConversionException("Missing EventType for given class : " + clazz);
}
@SuppressWarnings("unchecked")
final Schema<Object> schema = (Schema<Object>) RuntimeSchema
.getSchema((Class<? extends Object>) EventType.class);
final LinkedBuffer buffer = LinkedBuffer.allocate();
return writeProtoBuf(clazzEventType, schema, buffer);
}

private static byte[] writeProtoBuf(final Object payload, final Schema<Object> schema, final LinkedBuffer buffer) {
final byte[] serializeByte;
try {
serializeByte = ProtostuffIOUtil.toByteArray(payload, schema, buffer);
} finally {
buffer.clear();
}

headers.put(DEFAULT_CLASS_FIELD_NAME, serializeClass.getName());
return serializeByte;
}

private static void checkIfHeaderMutable(final MessageHeaders headers) {
if (isAccessorMutable(headers) || headers instanceof MutableMessageHeaders) {
return;
}
LOG.error("Protostuff cannot set serializae class because message header is not mutable");
throw new MessageConversionException(
"Cannot set the serialize class to message header. Need Mutable message header");
}

private static boolean isAccessorMutable(final MessageHeaders headers) {
final MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(headers, MessageHeaderAccessor.class);
return accessor != null && accessor.isMutable();
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/**
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.event;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.eclipse.hawkbit.repository.event.remote.DistributionSetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.DistributionSetTagDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.DownloadProgressEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.TargetTagDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.ActionUpdatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetTagCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetTagUpdateEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetUpdateEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutGroupCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutGroupUpdatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutUpdatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetTagCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetTagUpdateEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.TargetUpdatedEvent;

/**
* The {@link EventType} class declares the event-type and it's corresponding
* encoding value in the payload of an remote header. The event-type is encoded
* into the payload of the message which is distributed.
*
* To encode and decode the event class type we need some conversation mapping
* between the actual class and the corresponding integer value which is the
* encoded value in the byte-payload.
*/
public class EventType {

private static final Map<Integer, Class<?>> TYPES = new HashMap<>();

/**
* The associated event-type-value must remain the same as initially
* declared. Otherwise messages cannot correctly de-serialized.
*/
static {

// target
TYPES.put(1, TargetCreatedEvent.class);
TYPES.put(2, TargetUpdatedEvent.class);
TYPES.put(3, TargetDeletedEvent.class);
TYPES.put(4, CancelTargetAssignmentEvent.class);
TYPES.put(5, TargetAssignDistributionSetEvent.class);

// target tag
TYPES.put(6, TargetTagCreatedEvent.class);
TYPES.put(7, TargetTagUpdateEvent.class);
TYPES.put(8, TargetTagDeletedEvent.class);

// action
TYPES.put(9, ActionCreatedEvent.class);
TYPES.put(10, ActionUpdatedEvent.class);

// distribution set
TYPES.put(11, DistributionSetCreatedEvent.class);
TYPES.put(12, DistributionSetUpdateEvent.class);
TYPES.put(13, DistributionSetDeletedEvent.class);

// distribution set tag
TYPES.put(14, DistributionSetTagCreatedEvent.class);
TYPES.put(15, DistributionSetTagUpdateEvent.class);
TYPES.put(16, DistributionSetTagDeletedEvent.class);

// rollout
TYPES.put(17, RolloutUpdatedEvent.class);

// rollout group
TYPES.put(18, RolloutGroupCreatedEvent.class);
TYPES.put(19, RolloutGroupUpdatedEvent.class);

// download
TYPES.put(20, DownloadProgressEvent.class);
}

private int value;

/**
* Constructor.
*/
public EventType() {
// for marshalling and unmarshalling.
}

/**
* Constructor.
*
* @param value
* the value to initialize
*/
public EventType(final int value) {
this.value = value;
}

public int getValue() {
return value;
}

public Class<?> getTargetClass() {
return TYPES.get(value);
}

/**
* Returns a {@link EventType} based on the given class type.
*
* @param clazz
* the clazz type to retrieve the corresponding {@link EventType}
* .
* @return the corresponding {@link EventType} or {@code null} if the clazz
* does not have a {@link EventType}.
*/
public static EventType from(final Class<?> clazz) {
final Optional<Integer> foundEventType = TYPES.entrySet().stream()
.filter(entry -> entry.getValue().equals(clazz)).map(entry -> entry.getKey()).findFirst();
if (!foundEventType.isPresent()) {
return null;
}
return new EventType(foundEventType.get());
}
}
Loading

0 comments on commit 63adbd0

Please sign in to comment.