Skip to content

Commit

Permalink
Merge pull request #186 from avast/fix/gcs_remove_bucket_dependency
Browse files Browse the repository at this point in the history
GCS - remove bucket dependency
  • Loading branch information
mi-char authored Jun 9, 2023
2 parents ff194cb + 9e39456 commit 013825c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 99 deletions.
2 changes: 1 addition & 1 deletion gcs/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
gcsBackendDefaults {
//projectId = "" // REQUIRED
//bucketName = "" // REQUIRED
//jsonKeyPath = "" // REQUIRED if using service account authentication (see https://github.com/googleapis/google-cloud-java#using-a-service-account-recommended)
//credentialsFile = "" // REQUIRED if using service account authentication (see https://github.com/googleapis/google-cloud-java#using-a-service-account-recommended)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.avast.clients.storage.gcs

import better.files.File
import cats.data.EitherT
import cats.effect.implicits.catsEffectSyntaxBracket
import cats.effect.{Blocker, ContextShift, Resource, Sync}
import cats.syntax.all._
Expand All @@ -10,7 +9,7 @@ import com.avast.clients.storage.{ConfigurationException, GetResult, HeadResult,
import com.avast.scala.hashes.Sha256
import com.google.auth.oauth2.ServiceAccountCredentials
import com.google.cloud.ServiceOptions
import com.google.cloud.storage.{Blob, Bucket, Storage, StorageOptions, StorageException => GcStorageException}
import com.google.cloud.storage.{Blob, BlobId, Storage, StorageOptions, StorageException => GcStorageException}
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.StrictLogging
import pureconfig.error.ConfigReaderException
Expand All @@ -23,7 +22,9 @@ import java.nio.charset.StandardCharsets
import java.nio.file.StandardOpenOption
import java.security.{DigestOutputStream, MessageDigest}

class GcsStorageBackend[F[_]: Sync: ContextShift](bucket: Bucket)(blocker: Blocker) extends StorageBackend[F] with StrictLogging {
class GcsStorageBackend[F[_]: Sync: ContextShift](storageClient: Storage, bucketName: String)(blocker: Blocker)
extends StorageBackend[F]
with StrictLogging {
private val FileStreamOpenOptions = Seq(StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)

override def head(sha256: Sha256): F[Either[StorageException, HeadResult]] = {
Expand Down Expand Up @@ -74,7 +75,7 @@ class GcsStorageBackend[F[_]: Sync: ContextShift](bucket: Bucket)(blocker: Block
for {
objectPath <- Sync[F].delay(composeBlobPath(sha256))
result <- blocker.delay {
Option(bucket.get(objectPath))
Option(storageClient.get(BlobId.of(bucketName, objectPath)))
}
} yield result
}
Expand Down Expand Up @@ -117,34 +118,28 @@ object GcsStorageBackend {
private val DefaultConfig = ConfigFactory.defaultReference().getConfig("gcsBackendDefaults")

def fromConfig[F[_]: Sync: ContextShift](config: Config,
blocker: Blocker): EitherT[F, ConfigurationException, Resource[F, GcsStorageBackend[F]]] = {

def composeConfig: EitherT[F, ConfigurationException, GcsBackendConfiguration] = EitherT {
Sync[F].delay {
pureconfig.ConfigSource
.fromConfig(config.withFallback(DefaultConfig))
.load[GcsBackendConfiguration]
.leftMap { failures =>
ConfigurationException("Could not load config", new ConfigReaderException[GcsBackendConfiguration](failures))
}
}
blocker: Blocker): Either[ConfigurationException, Resource[F, GcsStorageBackend[F]]] = {

def composeConfig: Either[ConfigurationException, GcsBackendConfiguration] = {
pureconfig.ConfigSource
.fromConfig(config.withFallback(DefaultConfig))
.load[GcsBackendConfiguration]
.leftMap { failures =>
ConfigurationException("Could not load config", new ConfigReaderException[GcsBackendConfiguration](failures))
}
}

{
for {
conf <- composeConfig
storageClient <- prepareStorageClient(conf, blocker)
bucket <- getBucket(conf, storageClient, blocker)
} yield (storageClient, bucket)
}.map {
case (storage, bucket) =>
Resource
.fromAutoCloseable {
Sync[F].pure(storage)
}
.map { _ =>
new GcsStorageBackend[F](bucket)(blocker)
}
for {
conf <- composeConfig
storageClient <- prepareStorageClient(conf, blocker)
} yield {
Resource
.fromAutoCloseable {
Sync[F].pure(storageClient)
}
.map { storageClient =>
new GcsStorageBackend[F](storageClient, conf.bucketName)(blocker)
}
}
}

Expand All @@ -154,66 +149,36 @@ object GcsStorageBackend {
}

def prepareStorageClient[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration,
blocker: Blocker): EitherT[F, ConfigurationException, Storage] = {
EitherT {
blocker.delay {
Either
.catchNonFatal {
val credentialsFileContent = conf.credentialsFile
.map { credentialsFilePath =>
new FileInputStream(credentialsFilePath)
}
.orElse {
sys.env.get("GOOGLE_APPLICATION_CREDENTIALS_RAW").map { credentialFileRaw =>
new ByteArrayInputStream(credentialFileRaw.getBytes(StandardCharsets.UTF_8))
}
}

val builder = credentialsFileContent match {
case Some(inputStream) =>
StorageOptions.newBuilder
.setCredentials(ServiceAccountCredentials.fromStream(inputStream))
case None =>
StorageOptions.getDefaultInstance.toBuilder
}

builder
.setProjectId(conf.projectId)
.setRetrySettings(ServiceOptions.getNoRetrySettings)

builder.build.getService
blocker: Blocker): Either[ConfigurationException, Storage] = {
Either
.catchNonFatal {
val credentialsFileContent = conf.credentialsFile
.map { credentialsFilePath =>
new FileInputStream(credentialsFilePath)
}
.leftMap { e =>
ConfigurationException("Could not create GCS client", e)
}
}
}
}

def getBucket[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration,
storageClient: Storage,
blocker: Blocker): EitherT[F, ConfigurationException, Bucket] = {
EitherT {
blocker
.delay {
Either
.catchNonFatal {
Option(storageClient.get(conf.bucketName, Storage.BucketGetOption.userProject(conf.projectId)))
.orElse {
sys.env.get("GOOGLE_APPLICATION_CREDENTIALS_RAW").map { credentialFileRaw =>
new ByteArrayInputStream(credentialFileRaw.getBytes(StandardCharsets.UTF_8))
}
}
.map {
_.leftMap { e =>
ConfigurationException(s"Attempt to get bucket ${conf.bucketName} failed", e)
}.flatMap {
case Some(bucket) =>
Right(bucket)
case None =>
Left {
ConfigurationException(s"Bucket ${conf.bucketName} does not exist")
}
}

val builder = credentialsFileContent match {
case Some(inputStream) =>
StorageOptions.newBuilder
.setCredentials(ServiceAccountCredentials.fromStream(inputStream))
case None =>
StorageOptions.getDefaultInstance.toBuilder
}
}

builder
.setProjectId(conf.projectId)
.setRetrySettings(ServiceOptions.getNoRetrySettings)

builder.build.getService
}
.leftMap { e =>
ConfigurationException("Could not create GCS client", e)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package com.avast.clients.storage.gcs

import better.files.File
import cats.effect.Blocker
import com.avast.clients.storage.gcs.TestImplicits.{randomString, StringOps}
import com.avast.clients.storage.gcs.TestImplicits.{StringOps, randomString}
import com.avast.clients.storage.{GetResult, HeadResult}
import com.avast.scala.hashes.Sha256
import com.google.cloud.storage.{Blob, Bucket}
import com.google.cloud.storage.{Blob, BlobId, Storage}
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import org.junit.runner.RunWith
Expand All @@ -26,13 +26,16 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar
val content = randomString(fileSize)
val sha = content.sha256
val shaStr = sha.toString()
val bucketName = "bucket-tst"

val blob = mock[Blob]
when(blob.getSize).thenReturn(fileSize.toLong)

val bucket = mock[Bucket]
when(bucket.get(any[String]())).thenAnswer { call =>
val blobPath = call.getArgument[String](0)
val storageClient = mock[Storage]
when(storageClient.get(any[BlobId]())).thenAnswer { call =>
val blobId = call.getArgument[BlobId](0)
val blobPath = blobId.getName
assertResult(bucketName)(blobId.getBucket)
assertResult {
List(
shaStr.substring(0, 2),
Expand All @@ -44,7 +47,7 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar
blob
}

val result = composeTestBackend(bucket).head(sha).runSyncUnsafe(10.seconds)
val result = composeTestBackend(storageClient, bucketName).head(sha).runSyncUnsafe(10.seconds)

assertResult(Right(HeadResult.Exists(fileSize)))(result)
}
Expand All @@ -54,6 +57,7 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar
val content = randomString(fileSize)
val sha = content.sha256
val shaStr = sha.toString()
val bucketName = "bucket-tst"

val blob = mock[Blob]
when(blob.getSize).thenReturn(fileSize.toLong)
Expand All @@ -62,9 +66,11 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar
outputStream.write(content.getBytes())
}

val bucket = mock[Bucket]
when(bucket.get(any[String]())).thenAnswer { call =>
val blobPath = call.getArgument[String](0)
val storageClient = mock[Storage]
when(storageClient.get(any[BlobId]())).thenAnswer { call =>
val blobId = call.getArgument[BlobId](0)
val blobPath = blobId.getName
assertResult(bucketName)(blobId.getBucket)
assertResult {
List(
shaStr.substring(0, 2),
Expand All @@ -77,7 +83,7 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar
}

File.usingTemporaryFile() { file =>
val result = composeTestBackend(bucket).get(sha, file).runSyncUnsafe(10.seconds)
val result = composeTestBackend(storageClient, bucketName).get(sha, file).runSyncUnsafe(10.seconds)
assertResult(Right(GetResult.Downloaded(file, fileSize)))(result)
assertResult(sha.toString.toLowerCase)(file.sha256.toLowerCase)
assertResult(fileSize)(file.size)
Expand All @@ -89,8 +95,8 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar
assertResult("d0/5a/f9/d05af9a8494696906e8eec79843ca1e4bf408c280616a121ed92f9e92e2de831")(GcsStorageBackend.composeBlobPath(sha))
}

private def composeTestBackend(bucket: Bucket): GcsStorageBackend[Task] = {
private def composeTestBackend(storageClient: Storage, bucketName: String): GcsStorageBackend[Task] = {
val blocker = Blocker.liftExecutionContext(monix.execution.Scheduler.io())
new GcsStorageBackend[Task](bucket)(blocker)
new GcsStorageBackend[Task](storageClient, bucketName)(blocker)
}
}

0 comments on commit 013825c

Please sign in to comment.