Commit b000303

feat: GCS backend - Add support for the ZSTD decompression (#222)

1 parent 518e33a commit b000303

File tree

9 files changed: +295 -13 lines changed

build.gradle

Lines changed: 1 addition & 0 deletions

@@ -14,6 +14,7 @@ ext {
     metricsVersion = "3.0.2"
     http4sVersion = "0.22.15"
     gcsVersion = "2.30.1"
+    zstdVersion = "1.5.5-11"
     monixVersion = "3.4.1" // Used only in tests.
 }

core/build.gradle

Lines changed: 2 additions & 0 deletions

@@ -13,5 +13,7 @@ dependencies {
 
     api "com.avast.metrics:metrics-scala_2.13:$metricsVersion"
 
+    implementation "com.github.luben:zstd-jni:$zstdVersion"
+
     testImplementation "io.monix:monix_2.13:$monixVersion"
 }
core/src/main/scala/com/avast/clients/storage/compression/ZstdDecompressOutputStream.scala

Lines changed: 54 additions & 0 deletions

@@ -0,0 +1,54 @@
+package com.avast.clients.storage.compression
+
+import com.github.luben.zstd.{ZstdDecompressCtx, ZstdInputStream}
+
+import java.io.OutputStream
+import java.nio.ByteBuffer
+import java.nio.channels.Channels
+
+class ZstdDecompressOutputStream(outputStream: OutputStream) extends OutputStream {
+  private val decompressCtx = new ZstdDecompressCtx()
+  private val outputChannel = Channels.newChannel(outputStream)
+  private val outputBuffer = ByteBuffer.allocateDirect(ZstdInputStream.recommendedDOutSize().toInt)
+
+  private var closed = false
+
+  override def write(chunk: Array[Byte]): Unit = {
+    if (closed) {
+      throw new IllegalStateException("Stream is closed")
+    }
+
+    val inputBuffer = ByteBuffer.allocateDirect(chunk.length) // ByteBuffer.wrap(chunk) does not work, we need a direct buffer
+    inputBuffer.put(chunk)
+    inputBuffer.rewind()
+
+    while (inputBuffer.hasRemaining) {
+      outputBuffer.clear()
+
+      decompressCtx.decompressDirectByteBufferStream(outputBuffer, inputBuffer)
+
+      outputBuffer.flip()
+
+      while (outputBuffer.hasRemaining) {
+        outputChannel.write(outputBuffer)
+      }
+    }
+  }
+
+  override def write(chunk: Array[Byte], offset: Int, length: Int): Unit = {
+    write(chunk.slice(offset, offset + length))
+  }
+
+  override def write(b: Int): Unit = {
+    write(Array(b.toByte))
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      decompressCtx.close()
+      outputBuffer.clear()
+      outputChannel.close()
+    }
+  }
+}
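For orientation, here is a hypothetical caller-side sketch (not part of the commit) of how the new stream is driven: compressed bytes are written in arbitrarily sized chunks and the decompressed output flows into the wrapped target. The ZstdStreamUsage object, the decompressTo helper, and the 4 KiB buffer are invented for illustration; the unit test below exercises the same path against the real class.

    import com.avast.clients.storage.compression.ZstdDecompressOutputStream

    import java.io.{InputStream, OutputStream}

    object ZstdStreamUsage {
      // Copy a zstd-compressed source into `target`, decompressing on the fly.
      // Chunk boundaries do not have to align with zstd frame boundaries.
      def decompressTo(source: InputStream, target: OutputStream): Unit = {
        val out = new ZstdDecompressOutputStream(target)
        try {
          val buf = new Array[Byte](4 * 1024)
          var read = source.read(buf)
          while (read != -1) {
            out.write(buf, 0, read)
            read = source.read(buf)
          }
        } finally out.close() // releases the zstd context and closes the underlying channel/target
      }
    }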
core/src/test/scala/com/avast/clients/storage/compression/ZstdDecompressOutputStreamTest.scala

Lines changed: 54 additions & 0 deletions

@@ -0,0 +1,54 @@
+package com.avast.clients.storage.compression
+
+import com.avast.scala.hashes.Sha256
+import com.github.luben.zstd.Zstd
+import org.junit.runner.RunWith
+import org.scalatest.FunSuite
+import org.scalatestplus.junit.JUnitRunner
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.security.MessageDigest
+
+@RunWith(classOf[JUnitRunner])
+class ZstdDecompressOutputStreamTest extends FunSuite {
+  private def computeSha256(data: Array[Byte]): Sha256 = {
+    Sha256(MessageDigest.getInstance("SHA-256").digest(data))
+  }
+
+  private def generateData(size: Int): Array[Byte] = {
+    1.to(size).map(i => (i % 256).toByte).toArray // generates compressible data, random data wouldn't compress well
+  }
+
+  test("decompress zstd stream") {
+    val chunkSize = 4 * 1024
+    val testCases = Seq(0, 1, chunkSize, 10 * 1024 * 1024)
+
+    for (testCase <- testCases) {
+      val original_data = generateData(testCase)
+      val original_sha256 = computeSha256(original_data)
+
+      println(s"Original data size: ${original_data.length}")
+      val compressed_data = Zstd.compress(original_data, 9)
+      println(s"Compressed data size: ${compressed_data.length}")
+
+      val sourceStream = ByteBuffer.wrap(compressed_data)
+      val targetStream = new ByteArrayOutputStream()
+
+      val decompressStream = new ZstdDecompressOutputStream(targetStream)
+
+      while (sourceStream.hasRemaining) {
+        val chunkSize = math.min(sourceStream.remaining(), 4 * 1024)
+        val chunk = new Array[Byte](chunkSize)
+        sourceStream.get(chunk)
+        decompressStream.write(chunk)
+      }
+
+      decompressStream.close()
+
+      val result = targetStream.toByteArray
+
+      assert(original_sha256 == computeSha256(result))
+    }
+  }
+}

gcs/README.md

Lines changed: 6 additions & 0 deletions

@@ -25,6 +25,12 @@ GCS backends supports multiple ways of authentication:
 * Reading credential file from default paths (see https://cloud.google.com/docs/authentication/application-default-credentials#personal)
 * For all options see https://cloud.google.com/docs/authentication/provide-credentials-adc#how-to
 
+### Object decompression
+
+The GCS backend supports decompression of objects. The decision whether to decompress an object is based on the `comp-type` header in the object's metadata.
+If the header is present and contains the value `zstd`, the object is decompressed on the fly; otherwise the object is downloaded as-is.
+
+The only compression algorithm currently supported is `zstd`.
 
 ### Client initialization

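For completeness, a hypothetical producer-side sketch (not part of this repository) of storing an object so that the backend decompresses it on download. The bucket name and blob path are invented; real blob paths are derived from the SHA-256 hex (see composeBlobPath in the backend), while the metadata keys match the `comp-type` and `original-size` headers the backend reads.

    import com.github.luben.zstd.Zstd
    import com.google.cloud.storage.{BlobInfo, StorageOptions}

    import java.nio.charset.StandardCharsets
    import scala.jdk.CollectionConverters._

    object UploadCompressedExample {
      def main(args: Array[String]): Unit = {
        val storage = StorageOptions.getDefaultInstance.getService

        val data = "example payload".getBytes(StandardCharsets.UTF_8)
        val compressed = Zstd.compress(data, 9) // zstd-jni, the same library the backend uses

        val blobInfo = BlobInfo
          .newBuilder("my-bucket", "aa/bb/cc/aabbcc-sha256-hex") // invented names, for illustration only
          .setMetadata(Map(
            "comp-type" -> "zstd",                   // triggers on-the-fly decompression on download
            "original-size" -> data.length.toString  // lets head() report the uncompressed size
          ).asJava)
          .build()

        storage.create(blobInfo, compressed)
      }
    }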
gcs/build.gradle

Lines changed: 1 addition & 0 deletions

@@ -6,4 +6,5 @@ dependencies {
     implementation "com.google.cloud:google-cloud-storage:$gcsVersion"
 
     testImplementation "io.monix:monix_2.13:$monixVersion"
+    testImplementation "com.github.luben:zstd-jni:$zstdVersion"
 }

gcs/src/main/scala/com/avast/clients/storage/gcs/GcsStorageBackend.scala

Lines changed: 88 additions & 11 deletions

@@ -4,6 +4,7 @@ import better.files.File
 import cats.effect.implicits.catsEffectSyntaxBracket
 import cats.effect.{Blocker, ContextShift, Resource, Sync}
 import cats.syntax.all._
+import com.avast.clients.storage.compression.ZstdDecompressOutputStream
 import com.avast.clients.storage.gcs.GcsStorageBackend.composeBlobPath
 import com.avast.clients.storage.{ConfigurationException, GetResult, HeadResult, StorageBackend, StorageException}
 import com.avast.scala.hashes.Sha256
@@ -17,7 +18,7 @@ import pureconfig.generic.ProductHint
 import pureconfig.generic.auto._
 import pureconfig.{CamelCase, ConfigFieldMapping}
 
-import java.io.{ByteArrayInputStream, FileInputStream}
+import java.io.{ByteArrayInputStream, FileInputStream, OutputStream}
 import java.nio.charset.StandardCharsets
 import java.nio.file.StandardOpenOption
 import java.security.{DigestOutputStream, MessageDigest}
@@ -34,7 +35,12 @@ class GcsStorageBackend[F[_]: Sync: ContextShift](storageClient: Storage, bucket
       blob <- getBlob(sha256)
       result = blob match {
         case Some(blob) =>
-          HeadResult.Exists(blob.getSize)
+          blob.getMetadata.get(GcsStorageBackend.OriginalSizeHeader) match {
+            case null =>
+              HeadResult.Exists(blob.getSize)
+            case originalSize =>
+              HeadResult.Exists(originalSize.toLong)
+          }
         case None =>
           HeadResult.NotFound
       }
@@ -85,15 +91,7 @@
     blocker
       .delay(destination.newOutputStream(FileStreamOpenOptions))
      .bracket { fileStream =>
-        Sync[F]
-          .delay(new DigestOutputStream(fileStream, MessageDigest.getInstance("SHA-256")))
-          .bracket { stream =>
-            blocker.delay(blob.downloadTo(stream)).flatMap { _ =>
-              Sync[F].delay {
-                (blob.getSize, Sha256(stream.getMessageDigest.digest))
-              }
-            }
-          }(stream => blocker.delay(stream.close()))
+        downloadBlobToFile(blob, fileStream)
       }(fileStream => blocker.delay(fileStream.close()))
       .map[Either[StorageException, GetResult]] {
         case (size, hash) =>
@@ -109,6 +107,53 @@
     }
   }
 
+  private def downloadBlobToFile(blob: Blob, fileStream: OutputStream): F[(Long, Sha256)] = {
+    def getCompressionType: Option[String] = {
+      Option(blob.getMetadata.get(GcsStorageBackend.CompressionTypeHeader)).map(_.toLowerCase)
+    }
+
+    Sync[F]
+      .delay {
+        val countingStream = new GcsStorageBackend.CountingOutputStream(fileStream)
+        val hashingStream = new DigestOutputStream(countingStream, MessageDigest.getInstance("SHA-256"))
+        (countingStream, hashingStream)
+      }
+      .bracket {
+        case (countingStream, hashingStream) => {
+          getCompressionType match {
+            case None =>
+              downloadBlobToStream(blob, hashingStream)
+            case Some("zstd") =>
+              decodeZstdBlobToStream(blob, hashingStream)
+            case Some(unknown) =>
+              throw new IllegalArgumentException(s"Unknown compression type $unknown")
+          }
+        }.flatMap { _ =>
+          Sync[F].delay {
+            (countingStream.length, Sha256(hashingStream.getMessageDigest.digest))
+          }
+        }
+      } {
+        case (countingStream, hashingStream) =>
+          Sync[F].delay {
+            hashingStream.close()
+            countingStream.close()
+          }
+      }
+  }
+
+  private def decodeZstdBlobToStream(blob: Blob, targetStream: DigestOutputStream): F[Unit] = {
+    Sync[F]
+      .delay(new ZstdDecompressOutputStream(targetStream))
+      .bracket { decompressionStream =>
+        downloadBlobToStream(blob, decompressionStream)
+      }(decompressionStream => Sync[F].delay(decompressionStream.close()))
+  }
+
+  private def downloadBlobToStream(blob: Blob, targetStream: OutputStream): F[Unit] = {
+    blocker.delay(blob.downloadTo(targetStream))
+  }
+
   override def close(): Unit = {
     ()
   }
@@ -117,6 +162,9 @@
 object GcsStorageBackend {
   private val DefaultConfig = ConfigFactory.defaultReference().getConfig("gcsBackendDefaults")
 
+  private[gcs] val CompressionTypeHeader = "comp-type"
+  private[gcs] val OriginalSizeHeader = "original-size"
+
   def fromConfig[F[_]: Sync: ContextShift](config: Config,
                                            blocker: Blocker): Either[ConfigurationException, Resource[F, GcsStorageBackend[F]]] = {
 
@@ -148,6 +196,35 @@
     String.join("/", sha256Hex.substring(0, 2), sha256Hex.substring(2, 4), sha256Hex.substring(4, 6), sha256Hex)
   }
 
+  private[gcs] class CountingOutputStream(target: OutputStream) extends OutputStream {
+    private var count: Long = 0
+
+    def length: Long = count
+
+    override def write(b: Int): Unit = {
+      target.write(b)
+      count += 1
+    }
+
+    override def write(b: Array[Byte]): Unit = {
+      target.write(b)
+      count += b.length
+    }
+
+    override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+      target.write(b, off, len)
+      count += len
+    }
+
+    override def flush(): Unit = {
+      target.flush()
+    }
+
+    override def close(): Unit = {
+      target.close()
+    }
+  }
+
   def prepareStorageClient[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration,
                                                      blocker: Blocker): Either[ConfigurationException, Storage] = {
     Either
