diff --git a/hawkbit-repository/hawkbit-repository-core/pom.xml b/hawkbit-repository/hawkbit-repository-core/pom.xml
index 3710c88616..2f18c74fab 100644
--- a/hawkbit-repository/hawkbit-repository-core/pom.xml
+++ b/hawkbit-repository/hawkbit-repository-core/pom.xml
@@ -9,16 +9,16 @@
-->
- 4.0.0
-
- org.eclipse.hawkbit
- hawkbit-repository
- 0.2.0-SNAPSHOT
-
- hawkbit-repository-core
- hawkBit :: Repository Core Implementation Support
-
-
+ 4.0.0
+
+ org.eclipse.hawkbit
+ hawkbit-repository
+ 0.2.0-SNAPSHOT
+
+ hawkbit-repository-core
+ hawkBit :: Repository Core Implementation Support
+
+
org.eclipse.hawkbit
hawkbit-repository-api
@@ -34,6 +34,33 @@
protostuff-runtime
true
-
-
+
+
+
+ junit
+ junit
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.easytesting
+ fest-assert-core
+ test
+
+
+ org.easytesting
+ fest-assert
+ test
+
+
+ ru.yandex.qatools.allure
+ allure-junit-adaptor
+ test
+
+
+
\ No newline at end of file
diff --git a/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/BusProtoStuffMessageConverter.java b/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/BusProtoStuffMessageConverter.java
index a72b73f757..8a6adbd5df 100644
--- a/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/BusProtoStuffMessageConverter.java
+++ b/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/BusProtoStuffMessageConverter.java
@@ -8,16 +8,13 @@
*/
package org.eclipse.hawkbit.event;
-import org.apache.commons.lang3.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
-import org.springframework.integration.support.MutableMessageHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.MessageConversionException;
-import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.MimeType;
import io.protostuff.LinkedBuffer;
@@ -29,13 +26,23 @@
/**
* A customize message converter for the spring cloud events. The converter is
* registered for the application/binary+protostuff type.
+ *
+ * The clazz-type-information is encoded into the message payload infront with a
+ * length of {@link #EVENT_TYPE_LENGTH}. This is necessary due in case of
+ * rabbitMQ batching the message headers will be merged together and custom
+ * message header information will get lost. So in this implementation the
+ * information about the event-type is encoded in the payload of the message
+ * directly using the encoded values of {@link EventType}.
*
*/
public class BusProtoStuffMessageConverter extends AbstractMessageConverter {
public static final MimeType APPLICATION_BINARY_PROTOSTUFF = new MimeType("application", "binary+protostuff");
private static final Logger LOG = LoggerFactory.getLogger(BusProtoStuffMessageConverter.class);
- private static final String DEFAULT_CLASS_FIELD_NAME = "__Class__";
+ /**
+ * The length of the class type length of the payload.
+ */
+ private static final byte EVENT_TYPE_LENGTH = 2;
/**
* Constructor.
@@ -52,34 +59,91 @@ protected boolean supports(final Class> aClass) {
@Override
public Object convertFromInternal(final Message> message, final Class> targetClass,
final Object conversionHint) {
- final Object payload = message.getPayload();
+ final Object objectPayload = message.getPayload();
+ if (objectPayload instanceof byte[]) {
- try {
- final Class> deserializeClass = ClassUtils
- .getClass(message.getHeaders().get(DEFAULT_CLASS_FIELD_NAME).toString());
- if (payload instanceof byte[]) {
- @SuppressWarnings("unchecked")
- final Schema schema = (Schema) RuntimeSchema.getSchema(deserializeClass);
- final Object deserializeEvent = schema.newMessage();
- ProtobufIOUtil.mergeFrom((byte[]) message.getPayload(), deserializeEvent, schema);
- return deserializeEvent;
- }
- } catch (final ClassNotFoundException e) {
- LOG.error("Protostuff cannot find derserialize class", e);
- throw new MessageConversionException(message, "Failed to read payload", e);
- }
+ final byte[] payload = (byte[]) objectPayload;
+ final byte[] clazzHeader = extractClazzHeader(payload);
+ final byte[] content = extraxtContent(payload);
+ final EventType eventType = readClassHeader(clazzHeader);
+ return readContent(eventType, content);
+ }
return null;
}
@Override
protected Object convertToInternal(final Object payload, final MessageHeaders headers,
final Object conversionHint) {
- checkIfHeaderMutable(headers);
+
+ final byte[] clazzHeader = writeClassHeader(payload.getClass());
+
+ final byte[] writeContent = writeContent(payload);
+
+ return mergeClassHeaderAndContent(clazzHeader, writeContent);
+ }
+
+ private static Object readContent(final EventType eventType, final byte[] content) {
+ final Class> targetClass = eventType.getTargetClass();
+ if (targetClass == null) {
+ LOG.error("Cannot read clazz header for given EventType value {}, missing mapping", eventType.getValue());
+ throw new MessageConversionException("Missing mapping of EventType for value " + eventType.getValue());
+ }
+ @SuppressWarnings("unchecked")
+ final Schema schema = (Schema) RuntimeSchema.getSchema(targetClass);
+ final Object deserializeEvent = schema.newMessage();
+ ProtobufIOUtil.mergeFrom(content, deserializeEvent, schema);
+ return deserializeEvent;
+ }
+
+ private static byte[] mergeClassHeaderAndContent(final byte[] clazzHeader, final byte[] writeContent) {
+ final byte[] body = new byte[clazzHeader.length + writeContent.length];
+ System.arraycopy(clazzHeader, 0, body, 0, clazzHeader.length);
+ System.arraycopy(writeContent, 0, body, clazzHeader.length, writeContent.length);
+ return body;
+ }
+
+ private static byte[] extractClazzHeader(final byte[] payload) {
+ final byte[] clazzHeader = new byte[EVENT_TYPE_LENGTH];
+ System.arraycopy(payload, 0, clazzHeader, 0, EVENT_TYPE_LENGTH);
+ return clazzHeader;
+ }
+
+ private static byte[] extraxtContent(final byte[] payload) {
+ final byte[] content = new byte[payload.length - EVENT_TYPE_LENGTH];
+ System.arraycopy(payload, EVENT_TYPE_LENGTH, content, 0, content.length);
+ return content;
+ }
+
+ private static EventType readClassHeader(final byte[] typeInformation) {
+ final Schema schema = RuntimeSchema.getSchema(EventType.class);
+ final EventType deserializedType = schema.newMessage();
+ ProtobufIOUtil.mergeFrom(typeInformation, deserializedType, schema);
+ return deserializedType;
+ }
+
+ private static byte[] writeContent(final Object payload) {
final Class extends Object> serializeClass = payload.getClass();
@SuppressWarnings("unchecked")
final Schema schema = (Schema) RuntimeSchema.getSchema(serializeClass);
final LinkedBuffer buffer = LinkedBuffer.allocate();
+ return writeProtoBuf(payload, schema, buffer);
+ }
+
+ private static byte[] writeClassHeader(final Class> clazz) {
+ final EventType clazzEventType = EventType.from(clazz);
+ if (clazzEventType == null) {
+ LOG.error("There is no mapping to EventType for the given clazz {}", clazzEventType);
+ throw new MessageConversionException("Missing EventType for given class : " + clazz);
+ }
+ @SuppressWarnings("unchecked")
+ final Schema schema = (Schema) RuntimeSchema
+ .getSchema((Class extends Object>) EventType.class);
+ final LinkedBuffer buffer = LinkedBuffer.allocate();
+ return writeProtoBuf(clazzEventType, schema, buffer);
+ }
+
+ private static byte[] writeProtoBuf(final Object payload, final Schema schema, final LinkedBuffer buffer) {
final byte[] serializeByte;
try {
serializeByte = ProtostuffIOUtil.toByteArray(payload, schema, buffer);
@@ -87,22 +151,6 @@ protected Object convertToInternal(final Object payload, final MessageHeaders he
buffer.clear();
}
- headers.put(DEFAULT_CLASS_FIELD_NAME, serializeClass.getName());
return serializeByte;
}
-
- private static void checkIfHeaderMutable(final MessageHeaders headers) {
- if (isAccessorMutable(headers) || headers instanceof MutableMessageHeaders) {
- return;
- }
- LOG.error("Protostuff cannot set serializae class because message header is not mutable");
- throw new MessageConversionException(
- "Cannot set the serialize class to message header. Need Mutable message header");
- }
-
- private static boolean isAccessorMutable(final MessageHeaders headers) {
- final MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(headers, MessageHeaderAccessor.class);
- return accessor != null && accessor.isMutable();
- }
-
-}
\ No newline at end of file
+}
diff --git a/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventType.java b/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventType.java
new file mode 100644
index 0000000000..c7d030ccaa
--- /dev/null
+++ b/hawkbit-repository/hawkbit-repository-core/src/main/java/org/eclipse/hawkbit/event/EventType.java
@@ -0,0 +1,136 @@
+/**
+ * Copyright (c) 2015 Bosch Software Innovations GmbH and others.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.eclipse.hawkbit.event;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.eclipse.hawkbit.repository.event.remote.DistributionSetDeletedEvent;
+import org.eclipse.hawkbit.repository.event.remote.DistributionSetTagDeletedEvent;
+import org.eclipse.hawkbit.repository.event.remote.DownloadProgressEvent;
+import org.eclipse.hawkbit.repository.event.remote.TargetAssignDistributionSetEvent;
+import org.eclipse.hawkbit.repository.event.remote.TargetDeletedEvent;
+import org.eclipse.hawkbit.repository.event.remote.TargetTagDeletedEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.ActionCreatedEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.ActionUpdatedEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.CancelTargetAssignmentEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetCreatedEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetTagCreatedEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetTagUpdateEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.DistributionSetUpdateEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.RolloutGroupCreatedEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.RolloutGroupUpdatedEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.RolloutUpdatedEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.TargetTagCreatedEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.TargetTagUpdateEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.TargetUpdatedEvent;
+
+/**
+ * The {@link EventType} class declares the event-type and it's corresponding
+ * encoding value in the payload of an remote header. The event-type is encoded
+ * into the payload of the message which is distributed.
+ *
+ * To encode and decode the event class type we need some conversation mapping
+ * between the actual class and the corresponding integer value which is the
+ * encoded value in the byte-payload.
+ */
+public class EventType {
+
+ private static final Map> TYPES = new HashMap<>();
+
+ /**
+ * The associated event-type-value must remain the same as initially
+ * declared. Otherwise messages cannot correctly de-serialized.
+ */
+ static {
+
+ // target
+ TYPES.put(1, TargetCreatedEvent.class);
+ TYPES.put(2, TargetUpdatedEvent.class);
+ TYPES.put(3, TargetDeletedEvent.class);
+ TYPES.put(4, CancelTargetAssignmentEvent.class);
+ TYPES.put(5, TargetAssignDistributionSetEvent.class);
+
+ // target tag
+ TYPES.put(6, TargetTagCreatedEvent.class);
+ TYPES.put(7, TargetTagUpdateEvent.class);
+ TYPES.put(8, TargetTagDeletedEvent.class);
+
+ // action
+ TYPES.put(9, ActionCreatedEvent.class);
+ TYPES.put(10, ActionUpdatedEvent.class);
+
+ // distribution set
+ TYPES.put(11, DistributionSetCreatedEvent.class);
+ TYPES.put(12, DistributionSetUpdateEvent.class);
+ TYPES.put(13, DistributionSetDeletedEvent.class);
+
+ // distribution set tag
+ TYPES.put(14, DistributionSetTagCreatedEvent.class);
+ TYPES.put(15, DistributionSetTagUpdateEvent.class);
+ TYPES.put(16, DistributionSetTagDeletedEvent.class);
+
+ // rollout
+ TYPES.put(17, RolloutUpdatedEvent.class);
+
+ // rollout group
+ TYPES.put(18, RolloutGroupCreatedEvent.class);
+ TYPES.put(19, RolloutGroupUpdatedEvent.class);
+
+ // download
+ TYPES.put(20, DownloadProgressEvent.class);
+ }
+
+ private int value;
+
+ /**
+ * Constructor.
+ */
+ public EventType() {
+ // for marshalling and unmarshalling.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param value
+ * the value to initialize
+ */
+ public EventType(final int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ public Class> getTargetClass() {
+ return TYPES.get(value);
+ }
+
+ /**
+ * Returns a {@link EventType} based on the given class type.
+ *
+ * @param clazz
+ * the clazz type to retrieve the corresponding {@link EventType}
+ * .
+ * @return the corresponding {@link EventType} or {@code null} if the clazz
+ * does not have a {@link EventType}.
+ */
+ public static EventType from(final Class> clazz) {
+ final Optional foundEventType = TYPES.entrySet().stream()
+ .filter(entry -> entry.getValue().equals(clazz)).map(entry -> entry.getKey()).findFirst();
+ if (!foundEventType.isPresent()) {
+ return null;
+ }
+ return new EventType(foundEventType.get());
+ }
+}
diff --git a/hawkbit-repository/hawkbit-repository-core/src/test/java/org/eclipse/hawkbit/event/BusProtoStuffMessageConverterTest.java b/hawkbit-repository/hawkbit-repository-core/src/test/java/org/eclipse/hawkbit/event/BusProtoStuffMessageConverterTest.java
new file mode 100644
index 0000000000..d8c9aeab42
--- /dev/null
+++ b/hawkbit-repository/hawkbit-repository-core/src/test/java/org/eclipse/hawkbit/event/BusProtoStuffMessageConverterTest.java
@@ -0,0 +1,88 @@
+/**
+ * Copyright (c) 2015 Bosch Software Innovations GmbH and others.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.eclipse.hawkbit.event;
+
+import static org.fest.assertions.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+
+import org.eclipse.hawkbit.repository.event.remote.entity.RemoteEntityEvent;
+import org.eclipse.hawkbit.repository.event.remote.entity.TargetCreatedEvent;
+import org.eclipse.hawkbit.repository.model.Target;
+import org.fest.assertions.api.Assertions;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.springframework.cloud.bus.event.RemoteApplicationEvent;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.MessageConversionException;
+
+import ru.yandex.qatools.allure.annotations.Description;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BusProtoStuffMessageConverterTest {
+
+ private final BusProtoStuffMessageConverter underTest = new BusProtoStuffMessageConverter();
+
+ @Mock
+ private Target targetMock;
+
+ @Mock
+ private Message messageMock;
+
+ @Before
+ public void before() {
+ when(targetMock.getId()).thenReturn(1L);
+ }
+
+ @Test
+ @Description("Verifies that the TargetCreatedEvent can be successfully serialized and deserialized")
+ public void successfullySerializeAndDeserializeEvent() {
+ final TargetCreatedEvent targetCreatedEvent = new TargetCreatedEvent(targetMock, "1");
+ // serialize
+ final Object serializedEvent = underTest.convertToInternal(targetCreatedEvent,
+ new MessageHeaders(new HashMap<>()), null);
+ assertThat(serializedEvent).isInstanceOf(byte[].class);
+
+ // deserialize
+ when(messageMock.getPayload()).thenReturn(serializedEvent);
+ final Object deserializedEvent = underTest.convertFromInternal(messageMock, RemoteApplicationEvent.class, null);
+ assertThat(deserializedEvent).isInstanceOf(TargetCreatedEvent.class);
+ assertThat(deserializedEvent).isEqualTo(targetCreatedEvent);
+ }
+
+ @Test
+ @Description("Verifies that a MessageConversationException is thrown on missing event-type information encoding")
+ public void missingEventTypeMappingThrowsMessageConversationException() {
+ final DummyRemoteEntityEvent dummyEvent = new DummyRemoteEntityEvent(targetMock, "applicationId");
+ try {
+ underTest.convertToInternal(dummyEvent, new MessageHeaders(new HashMap<>()), null);
+ Assertions.fail("Missing MessageConversationException for un-defined event-type");
+ } catch (final MessageConversionException e) {
+ // expected exception
+ }
+ }
+
+ /**
+ * Test event with which non-existing mapping to serialize.
+ */
+ private final class DummyRemoteEntityEvent extends RemoteEntityEvent {
+
+ private static final long serialVersionUID = 1L;
+
+ private DummyRemoteEntityEvent(final Target target, final String applicationId) {
+ super(target, applicationId);
+ }
+
+ }
+}
diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/event/remote/RemoteTenantAwareEventTest.java b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/event/remote/RemoteTenantAwareEventTest.java
index 337b0c6c7c..e5ba5daecd 100644
--- a/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/event/remote/RemoteTenantAwareEventTest.java
+++ b/hawkbit-repository/hawkbit-repository-jpa/src/test/java/org/eclipse/hawkbit/repository/event/remote/RemoteTenantAwareEventTest.java
@@ -9,7 +9,6 @@
package org.eclipse.hawkbit.repository.event.remote;
import static org.fest.assertions.api.Assertions.assertThat;
-import static org.junit.Assert.fail;
import org.eclipse.hawkbit.repository.jpa.model.JpaAction;
import org.eclipse.hawkbit.repository.model.Action;
@@ -17,7 +16,6 @@
import org.eclipse.hawkbit.repository.model.DistributionSet;
import org.eclipse.hawkbit.repository.model.Target;
import org.junit.Test;
-import org.springframework.messaging.converter.MessageConversionException;
import ru.yandex.qatools.allure.annotations.Description;
import ru.yandex.qatools.allure.annotations.Features;
@@ -27,19 +25,6 @@
@Stories("RemoteTenantAwareEvent Tests")
public class RemoteTenantAwareEventTest extends AbstractRemoteEventTest {
- @Test
- @Description("Verifies that a immutable header is not work")
- public void testMessageWithImmutableHeader() {
- final DownloadProgressEvent downloadProgressEvent = new DownloadProgressEvent("DEFAULT", 3L, "Node");
-
- try {
- createMessageWithImmutableHeader(downloadProgressEvent);
- fail("MessageConversionException should happen");
- } catch (final MessageConversionException e) {
- // ok
- }
- }
-
@Test
@Description("Verifies that the download progress reloading by remote events works")
public void reloadDownloadProgessByRemoteEvent() {