diff --git a/MYCONTRIBUTION.md b/MYCONTRIBUTION.md new file mode 100644 index 0000000000..2486dc0a15 --- /dev/null +++ b/MYCONTRIBUTION.md @@ -0,0 +1,7 @@ +```Clean up PR descriptions by removing comments #2017``` + +```Anubhab/1788 #2018 +``` + + + diff --git a/fury-core/README.md b/fury-core/README.md new file mode 100644 index 0000000000..03a54b7b5d --- /dev/null +++ b/fury-core/README.md @@ -0,0 +1,26 @@ +# fury-core Module + +This module provides core functionalities for the FURY project. + +## Zstd Compression Integration + +This module now includes support for compressing and decompressing type metadata using the Zstd compression library. + +**Features:** + +- **Improved Compression:** Zstd generally offers better compression ratios than the previous Deflater-based implementation. +- **ZstdMetaCompressor:** A new `ZstdMetaCompressor` class has been implemented to handle Zstd compression and decompression. +- **Unit Tests:** Unit tests have been added to verify the functionality and correctness of the `ZstdMetaCompressor`. + +**Usage:** + +- To use the `ZstdMetaCompressor`, inject it into your service classes: + + ```java + @Autowired + private MetaCompressor metaCompressor; + + public void processMetadata(byte[] metadata) { + byte[] compressedMetadata = metaCompressor.compress(metadata); + // ... use compressedMetadata ... + } \ No newline at end of file diff --git a/fury-core/pom.xml b/fury-core/pom.xml new file mode 100644 index 0000000000..8690a49aea --- /dev/null +++ b/fury-core/pom.xml @@ -0,0 +1,5 @@ + + com.github.luben + zstd-jni + 1.5.2 + \ No newline at end of file diff --git a/fury-core/src/main/java/com/example/fury/core/compression/ZstdMetaCompressor.java b/fury-core/src/main/java/com/example/fury/core/compression/ZstdMetaCompressor.java new file mode 100644 index 0000000000..2763e29112 --- /dev/null +++ b/fury-core/src/main/java/com/example/fury/core/compression/ZstdMetaCompressor.java @@ -0,0 +1,24 @@ +package com.example.fury.core.compression; + +import com.github.luben.zstd.Zstd; + +public class ZstdMetaCompressor implements MetaCompressor { + + @Override + public byte[] compress(byte[] metadata) { + try { + return Zstd.compress(metadata); + } catch (Exception e) { + throw new RuntimeException("Failed to compress metadata: " + e.getMessage(), e); + } + } + + @Override + public byte[] decompress(byte[] compressedMetadata) { + try { + return Zstd.decompress(compressedMetadata); + } catch (Exception e) { + throw new RuntimeException("Failed to decompress metadata: " + e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/fury-core/src/main/java/com/example/fury/core/compression/ZstdMetaCompressorTest.java b/fury-core/src/main/java/com/example/fury/core/compression/ZstdMetaCompressorTest.java new file mode 100644 index 0000000000..aa4c60a1e5 --- /dev/null +++ b/fury-core/src/main/java/com/example/fury/core/compression/ZstdMetaCompressorTest.java @@ -0,0 +1,21 @@ +package com.example.fury.core.compression; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +public class ZstdMetaCompressorTest { + + private final MetaCompressor compressor = new ZstdMetaCompressor(); + + @Test + public void testCompressDecompress() { + byte[] originalData = "This is some sample metadata.".getBytes(); + byte[] compressedData = compressor.compress(originalData); + byte[] decompressedData = compressor.decompress(compressedData); + + assertArrayEquals(originalData, decompressedData); + } + + // Add more test cases for different input sizes and edge cases +} \ No newline at end of file diff --git a/fury-core/src/main/java/com/example/fury/service/MyService.java b/fury-core/src/main/java/com/example/fury/service/MyService.java new file mode 100644 index 0000000000..427ca7f1ac --- /dev/null +++ b/fury-core/src/main/java/com/example/fury/service/MyService.java @@ -0,0 +1,17 @@ +package com.example.fury.service; + +import com.example.fury.core.compression.MetaCompressor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class MyService { + + @Autowired + private MetaCompressor metaCompressor; + + public void processMetadata(byte[] metadata) { + byte[] compressedMetadata = metaCompressor.compress(metadata); + // ... use compressedMetadata ... + } +} \ No newline at end of file diff --git a/scala/src/main/scala/org/apache/fury/serializer/scala/CollectionSerializer.scala b/scala/src/main/scala/org/apache/fury/serializer/scala/CollectionSerializer.scala index 0c116e864f..b43c51a6c8 100644 --- a/scala/src/main/scala/org/apache/fury/serializer/scala/CollectionSerializer.scala +++ b/scala/src/main/scala/org/apache/fury/serializer/scala/CollectionSerializer.scala @@ -185,3 +185,56 @@ class ScalaSeqSerializer[A, T <: scala.collection.Seq[A]](fury: Fury, cls: Class new ListAdapter[Any](value) } } +abstract class AbstractScalaCollectionSerializer[A, T <: Iterable[A]](fury: Fury, cls: Class[T]) + extends AbstractCollectionSerializer[T](fury, cls) { + + // Existing methods + + // Add copy method + def copy(value: T): T = { + val builder = value.iterableFactory.newBuilder[A] + builder ++= value + builder.result() + } +} + +class ScalaCollectionSerializer[A, T <: Iterable[A]] (fury: Fury, cls: Class[T]) + extends AbstractScalaCollectionSerializer[A, T](fury, cls) { + override def onCollectionWrite(buffer: MemoryBuffer, value: T): util.Collection[_] = { + val factory: Factory[A, Any] = value.iterableFactory.iterableFactory + val adapter = new CollectionAdapter[A, T](value) + buffer.writeVarUint32Small7(adapter.size) + fury.writeRef(buffer, factory) + adapter + } + + // Implement copy method + override def copy(value: T): T = super.copy(value) +} + +class ScalaSortedSetSerializer[A, T <: scala.collection.SortedSet[A]](fury: Fury, cls: Class[T]) + extends AbstractScalaCollectionSerializer[A, T](fury, cls) { + override def onCollectionWrite(buffer: MemoryBuffer, value: T): util.Collection[_] = { + buffer.writeVarUint32Small7(value.size) + val factory = value.sortedIterableFactory.evidenceIterableFactory[Any]( + value.ordering.asInstanceOf[Ordering[Any]]) + fury.writeRef(buffer, factory) + new CollectionAdapter[A, T](value) + } + + // Implement copy method + override def copy(value: T): T = super.copy(value) +} + +class ScalaSeqSerializer[A, T <: scala.collection.Seq[A]](fury: Fury, cls: Class[T]) + extends AbstractScalaCollectionSerializer[A, T](fury, cls) { + override def onCollectionWrite(buffer: MemoryBuffer, value: T): util.Collection[_] = { + buffer.writeVarUint32Small7(value.size) + val factory: Factory[A, Any] = value.iterableFactory.iterableFactory + fury.writeRef(buffer, factory) + new ListAdapter[Any](value) + } + + // Implement copy method + override def copy(value: T): T = super.copy(value) +} diff --git a/scala/src/main/scala/org/apache/fury/serializer/scala/MapSerializer.scala b/scala/src/main/scala/org/apache/fury/serializer/scala/MapSerializer.scala index 4f68119381..3d373a6c64 100644 --- a/scala/src/main/scala/org/apache/fury/serializer/scala/MapSerializer.scala +++ b/scala/src/main/scala/org/apache/fury/serializer/scala/MapSerializer.scala @@ -142,3 +142,44 @@ class ScalaSortedMapSerializer[K, V, T <: scala.collection.SortedMap[K, V]](fury new MapAdapter[K, V](value) } } + +abstract class AbstractScalaMapSerializer[K, V, T](fury: Fury, cls: Class[T]) + extends AbstractMapSerializer[T](fury, cls) { + + // Existing methods + + // Add copy method + def copy(value: T): T = { + val builder = value.mapFactory.newBuilder[K, V] + builder ++= value + builder.result() + } +} + +class ScalaMapSerializer[K, V, T <: scala.collection.Map[K, V]](fury: Fury, cls: Class[T]) + extends AbstractScalaMapSerializer[K, V, T](fury, cls) { + + override def onMapWrite(buffer: MemoryBuffer, value: T): util.Map[_, _] = { + buffer.writeVarUint32Small7(value.size) + val factory = value.mapFactory.mapFactory[Any, Any].asInstanceOf[Factory[Any, Any]] + fury.writeRef(buffer, factory) + new MapAdapter[K, V](value) + } + + // Implement copy method + override def copy(value: T): T = super.copy(value) +} + +class ScalaSortedMapSerializer[K, V, T <: scala.collection.SortedMap[K, V]](fury: Fury, cls: Class[T]) + extends AbstractScalaMapSerializer[K, V, T](fury, cls) { + override def onMapWrite(buffer: MemoryBuffer, value: T): util.Map[_, _] = { + buffer.writeVarUint32Small7(value.size) + val factory = value.sortedMapFactory.sortedMapFactory[Any, Any]( + value.ordering.asInstanceOf[Ordering[Any]]).asInstanceOf[Factory[Any, Any]] + fury.writeRef(buffer, factory) + new MapAdapter[K, V](value) + } + + // Implement copy method + override def copy(value: T): T = super.copy(value) +} diff --git a/scala/src/test/scala/org/apache/fury/serializer/scala/SerializerTest.scala b/scala/src/test/scala/org/apache/fury/serializer/scala/SerializerTest.scala new file mode 100644 index 0000000000..565dcae1aa --- /dev/null +++ b/scala/src/test/scala/org/apache/fury/serializer/scala/SerializerTest.scala @@ -0,0 +1,24 @@ +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class SerializerTest extends AnyFlatSpec with Matchers { + "ScalaCollectionSerializer" should "copy collections correctly" in { + val fury = new Fury() + val original = Seq(1, 2, 3) + val serializer = new ScalaCollectionSerializer[Int, Seq[Int]](fury, classOf[Seq[Int]]) + + val copy = serializer.copy(original) + copy shouldEqual original + copy should not be theSameInstanceAs (original) + } + + "ScalaMapSerializer" should "copy maps correctly" in { + val fury = new Fury() + val original = Map("a" -> 1, "b" -> 2) + val serializer = new ScalaMapSerializer[String, Int, Map[String, Int]](fury, classOf[Map[String, Int]]) + + val copy = serializer.copy(original) + copy shouldEqual original + copy should not be theSameInstanceAs (original) + } +}