Skip to content

Commit 4b2fd68

Browse files
authored
feat: Add GCS backend (#182)
feat: Add GCS backend
1 parent 20fe175 commit 4b2fd68

File tree

9 files changed

+384
-2
lines changed

9 files changed

+384
-2
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
Finally-tagless implementation of a client for miscellaneous storages represented by backends. Supports backend fallbacks.
44

55
Currently supported backends:
6-
1. [HCP](hcp/README.md)
6+
1. [HCP (Hitachi Content Platform)](hcp/README.md)
7+
2. [GCS (Google Cloud Storage)](gcs/README.md)
78

89
## Dependency
910

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ plugins {
1313
ext {
1414
metricsVersion = "2.10.4"
1515
http4sVersion = "0.22.12"
16+
gcsVersion = "2.22.2"
1617
monixVersion = "3.4.1" // Used only in tests.
1718
}
1819

gcs/README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# GCS (Google Cloud Storage) backend
2+
3+
## Dependency
4+
5+
```groovy
6+
compile "com.avast.clients.storage:storage-client-gcs_2.13:x.x.x"
7+
```
8+
9+
## Usage
10+
11+
Configuration:
12+
13+
```hocon
14+
projectId = "my-project-id"
15+
bucketName = "bucket-name"
16+
```
17+
18+
Client init, example for `monix.eval.Task`:
19+
20+
```scala
21+
import com.avast.clients.storage.gcs.GcsStorageBackend
22+
import com.typesafe.config.Config
23+
import monix.eval.Task
24+
import monix.execution.Scheduler
25+
import cats.effect.Blocker
26+
27+
implicit val scheduler: Scheduler = ???
28+
val blocker: Blocker = ???
29+
val config: Config = ???
30+
31+
GcsStorageBackend.fromConfig[Task](config, blocker).map{ resource =>
32+
resource.use { client =>
33+
client.get(sha256, destinationFile)
34+
}
35+
}
36+
```

gcs/build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
archivesBaseName = "storage-client-gcs_2.13"
2+
3+
dependencies {
4+
api project(":core")
5+
6+
implementation "com.google.cloud:google-cloud-storage:$gcsVersion"
7+
8+
testImplementation "io.monix:monix_2.13:$monixVersion"
9+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
gcsBackendDefaults {
2+
//projectId = "" // REQUIRED
3+
//bucketName = "" // REQUIRED
4+
//jsonKeyPath = "" // REQUIRED if using service account authentication (see https://github.com/googleapis/google-cloud-java#using-a-service-account-recommended)
5+
}
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
package com.avast.clients.storage.gcs
2+
3+
import better.files.File
4+
import cats.data.EitherT
5+
import cats.effect.implicits.catsEffectSyntaxBracket
6+
import cats.effect.{Blocker, ContextShift, Resource, Sync}
7+
import cats.syntax.all._
8+
import com.avast.clients.storage.gcs.GcsStorageBackend.composeBlobPath
9+
import com.avast.clients.storage.{ConfigurationException, GetResult, HeadResult, StorageBackend, StorageException}
10+
import com.avast.scala.hashes.Sha256
11+
import com.google.auth.oauth2.ServiceAccountCredentials
12+
import com.google.cloud.ServiceOptions
13+
import com.google.cloud.storage.{Blob, Bucket, Storage, StorageOptions, StorageException => GcStorageException}
14+
import com.typesafe.config.{Config, ConfigFactory}
15+
import com.typesafe.scalalogging.StrictLogging
16+
import pureconfig.error.ConfigReaderException
17+
import pureconfig.generic.ProductHint
18+
import pureconfig.generic.auto._
19+
import pureconfig.{CamelCase, ConfigFieldMapping}
20+
21+
import java.io.FileInputStream
22+
import java.nio.file.StandardOpenOption
23+
import java.security.{DigestOutputStream, MessageDigest}
24+
25+
/**
  * [[StorageBackend]] implementation that reads objects from a single GCS bucket.
  *
  * Objects are looked up under the path produced by `GcsStorageBackend.composeBlobPath`.
  * Calls into the GCS client and file-stream operations are executed through the supplied `blocker`.
  *
  * @param bucket  bucket the objects are read from
  * @param blocker blocker used for the blocking GCS and file operations
  */
class GcsStorageBackend[F[_]: Sync: ContextShift](bucket: Bucket)(blocker: Blocker) extends StorageBackend[F] with StrictLogging {
  // The destination file is created when missing and overwritten when present.
  private val FileStreamOpenOptions = Seq(StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)

  /**
    * Checks whether an object for the given hash exists in the bucket.
    *
    * @return `Right(Exists(size))` or `Right(NotFound)`; a GCS client exception is logged and
    *         returned as `Left(InvalidResponseException)`. Other failures stay in `F`.
    */
  override def head(sha256: Sha256): F[Either[StorageException, HeadResult]] = {
    val lookup: F[Either[StorageException, HeadResult]] =
      Sync[F].delay(logger.debug(s"Checking presence of file $sha256 in GCS")) >>
        getBlob(sha256).map { maybeBlob =>
          val outcome = maybeBlob.fold[HeadResult](HeadResult.NotFound)(found => HeadResult.Exists(found.getSize))
          Either.right[StorageException, HeadResult](outcome)
        }

    lookup.recover {
      case gcsError: GcStorageException =>
        logger.error(s"Error while checking presence of file $sha256 in GCS", gcsError)
        Either.left[StorageException, HeadResult](toInvalidResponse(gcsError))
    }
  }

  /**
    * Downloads the object for the given hash into `dest`.
    *
    * @return `Right(Downloaded(dest, size))`, `Right(NotFound)` when the bucket has no such object,
    *         `Left(InvalidDataException)` when the downloaded content does not hash to `sha256`,
    *         or `Left(InvalidResponseException)` for a (logged) GCS client exception.
    */
  override def get(sha256: Sha256, dest: File): F[Either[StorageException, GetResult]] = {
    val download: F[Either[StorageException, GetResult]] =
      Sync[F].delay(logger.debug(s"Downloading file $sha256 from GCS")) >>
        getBlob(sha256).flatMap {
          case Some(found) =>
            receiveStreamedFile(found, dest, sha256)
          case None =>
            Sync[F].pure[Either[StorageException, GetResult]](Right(GetResult.NotFound))
        }

    download.recover {
      case gcsError: GcStorageException =>
        logger.error(s"Error while downloading file $sha256 from GCS", gcsError)
        Either.left[StorageException, GetResult](toInvalidResponse(gcsError))
    }
  }

  /** Translates a GCS client exception into this library's invalid-response error. */
  private def toInvalidResponse(gcsError: GcStorageException): StorageException = {
    StorageException.InvalidResponseException(gcsError.getCode, gcsError.getMessage, gcsError.getReason)
  }

  /** Fetches the blob stored for the hash; a `null` answer from the bucket becomes `None`. */
  private def getBlob(sha256: Sha256): F[Option[Blob]] = {
    Sync[F].delay(composeBlobPath(sha256)).flatMap { blobPath =>
      blocker.delay(Option(bucket.get(blobPath)))
    }
  }

  /**
    * Streams the blob into `destination` while computing the SHA-256 of the written bytes,
    * then compares the digest with `expectedHash`.
    */
  private def receiveStreamedFile(blob: Blob, destination: File, expectedHash: Sha256): F[Either[StorageException, GetResult]] = {
    // Wraps the file stream in a digesting stream, downloads into it and yields (blob size, digest).
    def writeAndDigest(fileStream: java.io.OutputStream): F[(java.lang.Long, Sha256)] = {
      Sync[F]
        .delay(new DigestOutputStream(fileStream, MessageDigest.getInstance("SHA-256")))
        .bracket { digestStream =>
          blocker.delay(blob.downloadTo(digestStream)) >>
            Sync[F].delay((blob.getSize, Sha256(digestStream.getMessageDigest.digest)))
        }(digestStream => blocker.delay(digestStream.close()))
    }

    val streamed: F[(java.lang.Long, Sha256)] =
      blocker
        .delay(destination.newOutputStream(FileStreamOpenOptions))
        .bracket(fileStream => writeAndDigest(fileStream))(fileStream => blocker.delay(fileStream.close()))

    Sync[F].delay(logger.debug(s"Downloading streamed data to $destination")) >>
      streamed.map[Either[StorageException, GetResult]] {
        case (size, actualHash) =>
          if (expectedHash == actualHash) {
            Right(GetResult.Downloaded(destination, size))
          } else {
            Left(StorageException.InvalidDataException(200, "-stream-", s"Expected SHA256 $expectedHash but got $actualHash"))
          }
      }
  }

  /** No-op: this class does nothing on close. */
  override def close(): Unit = ()
}
114+
115+
/** Factory methods and helpers for [[GcsStorageBackend]]. */
object GcsStorageBackend {
  private val DefaultConfig = ConfigFactory.defaultReference().getConfig("gcsBackendDefaults")

  /**
    * Builds a backend from configuration.
    *
    * Loads [[GcsBackendConfiguration]] from `config` (falling back to the `gcsBackendDefaults`
    * reference section), creates the GCS client and resolves the configured bucket.
    *
    * @param config  configuration holding `projectId`, `bucketName` and optionally `jsonKeyPath`
    * @param blocker blocker used for the blocking GCS calls
    * @return `Left(ConfigurationException)` when the config cannot be loaded, the client cannot be
    *         created or the bucket cannot be obtained; otherwise a `Resource` wrapping the backend,
    *         whose release closes the GCS client
    */
  def fromConfig[F[_]: Sync: ContextShift](config: Config,
                                           blocker: Blocker): EitherT[F, ConfigurationException, Resource[F, GcsStorageBackend[F]]] = {

    def composeConfig: EitherT[F, ConfigurationException, GcsBackendConfiguration] = EitherT {
      Sync[F].delay {
        pureconfig.ConfigSource
          .fromConfig(config.withFallback(DefaultConfig))
          .load[GcsBackendConfiguration]
          .leftMap { failures =>
            ConfigurationException("Could not load config", new ConfigReaderException[GcsBackendConfiguration](failures))
          }
      }
    }

    {
      for {
        conf <- composeConfig
        storageClient <- prepareStorageClient(conf, blocker)
        bucket <- getBucket(conf, storageClient, blocker)
      } yield (storageClient, bucket)
    }.map {
      case (storage, bucket) =>
        Resource
          .fromAutoCloseable {
            Sync[F].pure(storage)
          }
          .map { _ =>
            new GcsStorageBackend[F](bucket)(blocker)
          }
    }
  }

  /**
    * Composes the object path for a hash: the first three byte pairs of the hex digest as
    * directories followed by the full hex digest, e.g. `d0/5a/f9/d05af9...`.
    */
  private[gcs] def composeBlobPath(sha256: Sha256): String = {
    val sha256Hex = sha256.toHexString
    String.join("/", sha256Hex.substring(0, 2), sha256Hex.substring(2, 4), sha256Hex.substring(4, 6), sha256Hex)
  }

  /**
    * Creates the GCS client for the configured project with retries disabled.
    *
    * When `jsonKeyPath` is set, service-account credentials are read from that file; otherwise the
    * builder of the default `StorageOptions` instance is used.
    *
    * @return the client, or `Left(ConfigurationException)` wrapping any non-fatal failure
    */
  def prepareStorageClient[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration,
                                                     blocker: Blocker): EitherT[F, ConfigurationException, Storage] = {
    EitherT {
      blocker.delay {
        Either
          .catchNonFatal {
            val builder = conf.jsonKeyPath match {
              case Some(jsonKeyPath) =>
                val keyStream = new FileInputStream(jsonKeyPath)
                // The key file is only needed while the credentials are parsed; close it afterwards
                // (also on failure) so the file descriptor is not leaked.
                val credentials =
                  try ServiceAccountCredentials.fromStream(keyStream)
                  finally keyStream.close()

                StorageOptions.newBuilder.setCredentials(credentials)
              case None =>
                StorageOptions.getDefaultInstance.toBuilder
            }

            builder
              .setProjectId(conf.projectId)
              .setRetrySettings(ServiceOptions.getNoRetrySettings)

            builder.build.getService
          }
          .leftMap { e =>
            ConfigurationException("Could not create GCS client", e)
          }
      }
    }
  }

  /**
    * Looks up the configured bucket, passing the configured project as the `userProject` option.
    *
    * @return the bucket, or `Left(ConfigurationException)` when the lookup throws a non-fatal
    *         exception or the client answers with `null` (bucket does not exist)
    */
  def getBucket[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration,
                                          storageClient: Storage,
                                          blocker: Blocker): EitherT[F, ConfigurationException, Bucket] = {
    EitherT {
      blocker
        .delay {
          Either
            .catchNonFatal {
              Option(storageClient.get(conf.bucketName, Storage.BucketGetOption.userProject(conf.projectId)))
            }
        }
        .map {
          _.leftMap { e =>
            ConfigurationException(s"Attempt to get bucket ${conf.bucketName} failed", e)
          }.flatMap {
            case Some(bucket) =>
              Right(bucket)
            case None =>
              Left {
                ConfigurationException(s"Bucket ${conf.bucketName} does not exist")
              }
          }
        }
    }
  }
}
208+
209+
/**
  * Settings of the GCS backend.
  *
  * @param projectId   ID of the Google Cloud project
  * @param bucketName  name of the bucket the backend reads from
  * @param jsonKeyPath optional path to a service-account JSON key file; defaults to `None`
  */
case class GcsBackendConfiguration(
    projectId: String,
    bucketName: String,
    jsonKeyPath: Option[String] = None
)
210+
211+
object GcsBackendConfiguration {

  /**
    * pureconfig hint for [[GcsBackendConfiguration]]: camelCase field names are read from
    * camelCase config keys (e.g. `projectId`, `bucketName`).
    */
  implicit val productHint: ProductHint[GcsBackendConfiguration] = {
    val camelCaseKeys = ConfigFieldMapping(CamelCase, CamelCase)
    ProductHint[GcsBackendConfiguration](fieldMapping = camelCaseKeys)
  }
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package com.avast.clients.storage.gcs
2+
3+
import better.files.File
4+
import cats.effect.Blocker
5+
import com.avast.clients.storage.gcs.TestImplicits.{randomString, StringOps}
6+
import com.avast.clients.storage.{GetResult, HeadResult}
7+
import com.avast.scala.hashes.Sha256
8+
import com.google.cloud.storage.{Blob, Bucket}
9+
import monix.eval.Task
10+
import monix.execution.Scheduler.Implicits.global
11+
import org.junit.runner.RunWith
12+
import org.mockito.ArgumentMatchers.any
13+
import org.mockito.Mockito.when
14+
import org.scalatest.FunSuite
15+
import org.scalatest.concurrent.ScalaFutures
16+
import org.scalatestplus.junit.JUnitRunner
17+
import org.scalatestplus.mockito.MockitoSugar
18+
19+
import java.io.OutputStream
20+
import scala.concurrent.duration._
21+
22+
/** Unit tests of [[GcsStorageBackend]] running against mocked GCS `Bucket` and `Blob` objects. */
@RunWith(classOf[JUnitRunner])
class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar {

  test("head") {
    val fileSize = 1001100
    val content = randomString(fileSize)
    val sha = content.sha256
    val shaStr = sha.toString()

    val blob = mock[Blob]
    when(blob.getSize).thenReturn(fileSize.toLong)

    val backend = composeTestBackend(bucketReturning(blob, shaStr))
    val result = backend.head(sha).runSyncUnsafe(10.seconds)

    assertResult(Right(HeadResult.Exists(fileSize)))(result)
  }

  test("get") {
    val fileSize = 1001200
    val content = randomString(fileSize)
    val sha = content.sha256
    val shaStr = sha.toString()

    val blob = mock[Blob]
    when(blob.getSize).thenReturn(fileSize.toLong)
    // The mocked download writes the generated content into whatever stream the backend passes in.
    when(blob.downloadTo(any[OutputStream]())).thenAnswer { invocation =>
      val target = invocation.getArgument[OutputStream](0)
      target.write(content.getBytes())
    }

    val bucket = bucketReturning(blob, shaStr)

    File.usingTemporaryFile() { file =>
      val result = composeTestBackend(bucket).get(sha, file).runSyncUnsafe(10.seconds)
      assertResult(Right(GetResult.Downloaded(file, fileSize)))(result)
      assertResult(sha.toString.toLowerCase)(file.sha256.toLowerCase)
      assertResult(fileSize)(file.size)
    }
  }

  test("composeObjectPath") {
    val sha = Sha256("d05af9a8494696906e8eec79843ca1e4bf408c280616a121ed92f9e92e2de831")
    assertResult("d0/5a/f9/d05af9a8494696906e8eec79843ca1e4bf408c280616a121ed92f9e92e2de831")(GcsStorageBackend.composeBlobPath(sha))
  }

  /**
    * Creates a mocked bucket that answers every `get(path)` with `blob`, after asserting that the
    * requested path consists of the three two-character prefixes of `shaStr` followed by `shaStr`.
    */
  private def bucketReturning(blob: Blob, shaStr: String): Bucket = {
    val expectedSegments = List(
      shaStr.substring(0, 2),
      shaStr.substring(2, 4),
      shaStr.substring(4, 6),
      shaStr,
    )

    val bucket = mock[Bucket]
    when(bucket.get(any[String]())).thenAnswer { invocation =>
      val requestedPath = invocation.getArgument[String](0)
      assertResult(expectedSegments)(requestedPath.split("/").toList)
      blob
    }
    bucket
  }

  /** Builds the backend under test on top of the given bucket, with a blocker backed by monix's IO scheduler. */
  private def composeTestBackend(bucket: Bucket): GcsStorageBackend[Task] = {
    val ioBlocker = Blocker.liftExecutionContext(monix.execution.Scheduler.io())
    new GcsStorageBackend[Task](bucket)(ioBlocker)
  }
}

0 commit comments

Comments
 (0)