diff --git a/gcs/src/main/resources/reference.conf b/gcs/src/main/resources/reference.conf index 7b52f38..0161f8e 100644 --- a/gcs/src/main/resources/reference.conf +++ b/gcs/src/main/resources/reference.conf @@ -1,5 +1,5 @@ gcsBackendDefaults { //projectId = "" // REQUIRED //bucketName = "" // REQUIRED - //jsonKeyPath = "" // REQUIRED if using service account authentication (see https://github.com/googleapis/google-cloud-java#using-a-service-account-recommended) + //credentialsFile = "" // REQUIRED if using service account authentication (see https://github.com/googleapis/google-cloud-java#using-a-service-account-recommended) } \ No newline at end of file diff --git a/gcs/src/main/scala/com/avast/clients/storage/gcs/GcsStorageBackend.scala b/gcs/src/main/scala/com/avast/clients/storage/gcs/GcsStorageBackend.scala index 1455695..6c5db95 100644 --- a/gcs/src/main/scala/com/avast/clients/storage/gcs/GcsStorageBackend.scala +++ b/gcs/src/main/scala/com/avast/clients/storage/gcs/GcsStorageBackend.scala @@ -1,7 +1,6 @@ package com.avast.clients.storage.gcs import better.files.File -import cats.data.EitherT import cats.effect.implicits.catsEffectSyntaxBracket import cats.effect.{Blocker, ContextShift, Resource, Sync} import cats.syntax.all._ @@ -10,7 +9,7 @@ import com.avast.clients.storage.{ConfigurationException, GetResult, HeadResult, import com.avast.scala.hashes.Sha256 import com.google.auth.oauth2.ServiceAccountCredentials import com.google.cloud.ServiceOptions -import com.google.cloud.storage.{Blob, Bucket, Storage, StorageOptions, StorageException => GcStorageException} +import com.google.cloud.storage.{Blob, BlobId, Storage, StorageOptions, StorageException => GcStorageException} import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.scalalogging.StrictLogging import pureconfig.error.ConfigReaderException @@ -23,7 +22,9 @@ import java.nio.charset.StandardCharsets import java.nio.file.StandardOpenOption import java.security.{DigestOutputStream, MessageDigest} -class GcsStorageBackend[F[_]: Sync: ContextShift](bucket: Bucket)(blocker: Blocker) extends StorageBackend[F] with StrictLogging { +class GcsStorageBackend[F[_]: Sync: ContextShift](storageClient: Storage, bucketName: String)(blocker: Blocker) + extends StorageBackend[F] + with StrictLogging { private val FileStreamOpenOptions = Seq(StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING) override def head(sha256: Sha256): F[Either[StorageException, HeadResult]] = { @@ -74,7 +75,7 @@ class GcsStorageBackend[F[_]: Sync: ContextShift](bucket: Bucket)(blocker: Block for { objectPath <- Sync[F].delay(composeBlobPath(sha256)) result <- blocker.delay { - Option(bucket.get(objectPath)) + Option(storageClient.get(BlobId.of(bucketName, objectPath))) } } yield result } @@ -117,34 +118,28 @@ object GcsStorageBackend { private val DefaultConfig = ConfigFactory.defaultReference().getConfig("gcsBackendDefaults") def fromConfig[F[_]: Sync: ContextShift](config: Config, - blocker: Blocker): EitherT[F, ConfigurationException, Resource[F, GcsStorageBackend[F]]] = { - - def composeConfig: EitherT[F, ConfigurationException, GcsBackendConfiguration] = EitherT { - Sync[F].delay { - pureconfig.ConfigSource - .fromConfig(config.withFallback(DefaultConfig)) - .load[GcsBackendConfiguration] - .leftMap { failures => - ConfigurationException("Could not load config", new ConfigReaderException[GcsBackendConfiguration](failures)) - } - } + blocker: Blocker): Either[ConfigurationException, Resource[F, GcsStorageBackend[F]]] = { + + def composeConfig: Either[ConfigurationException, GcsBackendConfiguration] = { + pureconfig.ConfigSource + .fromConfig(config.withFallback(DefaultConfig)) + .load[GcsBackendConfiguration] + .leftMap { failures => + ConfigurationException("Could not load config", new ConfigReaderException[GcsBackendConfiguration](failures)) + } } - { - for { - conf <- composeConfig - storageClient <- prepareStorageClient(conf, blocker) - bucket <- getBucket(conf, storageClient, blocker) - } yield (storageClient, bucket) - }.map { - case (storage, bucket) => - Resource - .fromAutoCloseable { - Sync[F].pure(storage) - } - .map { _ => - new GcsStorageBackend[F](bucket)(blocker) - } + for { + conf <- composeConfig + storageClient <- prepareStorageClient(conf, blocker) + } yield { + Resource + .fromAutoCloseable { + Sync[F].pure(storageClient) + } + .map { storageClient => + new GcsStorageBackend[F](storageClient, conf.bucketName)(blocker) + } } } @@ -154,66 +149,36 @@ object GcsStorageBackend { } def prepareStorageClient[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration, - blocker: Blocker): EitherT[F, ConfigurationException, Storage] = { - EitherT { - blocker.delay { - Either - .catchNonFatal { - val credentialsFileContent = conf.credentialsFile - .map { credentialsFilePath => - new FileInputStream(credentialsFilePath) - } - .orElse { - sys.env.get("GOOGLE_APPLICATION_CREDENTIALS_RAW").map { credentialFileRaw => - new ByteArrayInputStream(credentialFileRaw.getBytes(StandardCharsets.UTF_8)) - } - } - - val builder = credentialsFileContent match { - case Some(inputStream) => - StorageOptions.newBuilder - .setCredentials(ServiceAccountCredentials.fromStream(inputStream)) - case None => - StorageOptions.getDefaultInstance.toBuilder - } - - builder - .setProjectId(conf.projectId) - .setRetrySettings(ServiceOptions.getNoRetrySettings) - - builder.build.getService + blocker: Blocker): Either[ConfigurationException, Storage] = { + Either + .catchNonFatal { + val credentialsFileContent = conf.credentialsFile + .map { credentialsFilePath => + new FileInputStream(credentialsFilePath) } - .leftMap { e => - ConfigurationException("Could not create GCS client", e) - } - } - } - } - - def getBucket[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration, - storageClient: Storage, - blocker: Blocker): EitherT[F, ConfigurationException, Bucket] = { - EitherT { - blocker - .delay { - Either - .catchNonFatal { - Option(storageClient.get(conf.bucketName, Storage.BucketGetOption.userProject(conf.projectId))) + .orElse { + sys.env.get("GOOGLE_APPLICATION_CREDENTIALS_RAW").map { credentialFileRaw => + new ByteArrayInputStream(credentialFileRaw.getBytes(StandardCharsets.UTF_8)) } - } - .map { - _.leftMap { e => - ConfigurationException(s"Attempt to get bucket ${conf.bucketName} failed", e) - }.flatMap { - case Some(bucket) => - Right(bucket) - case None => - Left { - ConfigurationException(s"Bucket ${conf.bucketName} does not exist") - } } + + val builder = credentialsFileContent match { + case Some(inputStream) => + StorageOptions.newBuilder + .setCredentials(ServiceAccountCredentials.fromStream(inputStream)) + case None => + StorageOptions.getDefaultInstance.toBuilder } - } + + builder + .setProjectId(conf.projectId) + .setRetrySettings(ServiceOptions.getNoRetrySettings) + + builder.build.getService + } + .leftMap { e => + ConfigurationException("Could not create GCS client", e) + } } } diff --git a/gcs/src/test/scala/com/avast/clients/storage/gcs/GcsStorageBackendTest.scala b/gcs/src/test/scala/com/avast/clients/storage/gcs/GcsStorageBackendTest.scala index 70cc8d2..75ff7dc 100644 --- a/gcs/src/test/scala/com/avast/clients/storage/gcs/GcsStorageBackendTest.scala +++ b/gcs/src/test/scala/com/avast/clients/storage/gcs/GcsStorageBackendTest.scala @@ -2,10 +2,10 @@ package com.avast.clients.storage.gcs import better.files.File import cats.effect.Blocker -import com.avast.clients.storage.gcs.TestImplicits.{randomString, StringOps} +import com.avast.clients.storage.gcs.TestImplicits.{StringOps, randomString} import com.avast.clients.storage.{GetResult, HeadResult} import com.avast.scala.hashes.Sha256 -import com.google.cloud.storage.{Blob, Bucket} +import com.google.cloud.storage.{Blob, BlobId, Storage} import monix.eval.Task import monix.execution.Scheduler.Implicits.global import org.junit.runner.RunWith @@ -26,13 +26,16 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar val content = randomString(fileSize) val sha = content.sha256 val shaStr = sha.toString() + val bucketName = "bucket-tst" val blob = mock[Blob] when(blob.getSize).thenReturn(fileSize.toLong) - val bucket = mock[Bucket] - when(bucket.get(any[String]())).thenAnswer { call => - val blobPath = call.getArgument[String](0) + val storageClient = mock[Storage] + when(storageClient.get(any[BlobId]())).thenAnswer { call => + val blobId = call.getArgument[BlobId](0) + val blobPath = blobId.getName + assertResult(bucketName)(blobId.getBucket) assertResult { List( shaStr.substring(0, 2), @@ -44,7 +47,7 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar blob } - val result = composeTestBackend(bucket).head(sha).runSyncUnsafe(10.seconds) + val result = composeTestBackend(storageClient, bucketName).head(sha).runSyncUnsafe(10.seconds) assertResult(Right(HeadResult.Exists(fileSize)))(result) } @@ -54,6 +57,7 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar val content = randomString(fileSize) val sha = content.sha256 val shaStr = sha.toString() + val bucketName = "bucket-tst" val blob = mock[Blob] when(blob.getSize).thenReturn(fileSize.toLong) @@ -62,9 +66,11 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar outputStream.write(content.getBytes()) } - val bucket = mock[Bucket] - when(bucket.get(any[String]())).thenAnswer { call => - val blobPath = call.getArgument[String](0) + val storageClient = mock[Storage] + when(storageClient.get(any[BlobId]())).thenAnswer { call => + val blobId = call.getArgument[BlobId](0) + val blobPath = blobId.getName + assertResult(bucketName)(blobId.getBucket) assertResult { List( shaStr.substring(0, 2), @@ -77,7 +83,7 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar } File.usingTemporaryFile() { file => - val result = composeTestBackend(bucket).get(sha, file).runSyncUnsafe(10.seconds) + val result = composeTestBackend(storageClient, bucketName).get(sha, file).runSyncUnsafe(10.seconds) assertResult(Right(GetResult.Downloaded(file, fileSize)))(result) assertResult(sha.toString.toLowerCase)(file.sha256.toLowerCase) assertResult(fileSize)(file.size) @@ -89,8 +95,8 @@ class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar assertResult("d0/5a/f9/d05af9a8494696906e8eec79843ca1e4bf408c280616a121ed92f9e92e2de831")(GcsStorageBackend.composeBlobPath(sha)) } - private def composeTestBackend(bucket: Bucket): GcsStorageBackend[Task] = { + private def composeTestBackend(storageClient: Storage, bucketName: String): GcsStorageBackend[Task] = { val blocker = Blocker.liftExecutionContext(monix.execution.Scheduler.io()) - new GcsStorageBackend[Task](bucket)(blocker) + new GcsStorageBackend[Task](storageClient, bucketName)(blocker) } }