Skip to content

Commit 6389f59

Browse files
authored
Fix BT integration tests (#5606)
1 parent 8a68e11 commit 6389f59

File tree

3 files changed

+14
-19
lines changed

3 files changed

+14
-19
lines changed

integration/src/test/scala/com/spotify/scio/bigtable/BigtableIT.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class BigtableIT extends PipelineSpec {
110110
.build()
111111
runWithRealContext() { sc =>
112112
sc
113-
.bigtable(projectId, instanceId, tableId, rowFilter = rowFilter)
113+
.bigtable(BTOptions(projectId, instanceId), tableId, rowFilter = rowFilter)
114114
.map(fromRow) should containInAnyOrder(data)
115115
}.waitUntilDone()
116116
} catch {
@@ -151,7 +151,7 @@ class BigtableIT extends PipelineSpec {
151151
.build()
152152
runWithRealContext() { sc =>
153153
sc
154-
.bigtable(projectId, instanceId, tableId, rowFilter = rowFilter)
154+
.bigtable(BTOptions(projectId, instanceId), tableId, rowFilter = rowFilter)
155155
.map(fromRow) should containInAnyOrder(data)
156156
}.waitUntilDone()
157157
} catch {

scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/SCollectionSyntax.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import com.google.protobuf.ByteString
2323
import com.spotify.scio.io.ClosedTap
2424
import com.spotify.scio.values.SCollection
2525
import org.joda.time.Duration
26-
import com.spotify.scio.bigtable.{BigtableTypedIO, BigtableWrite}
26+
import com.spotify.scio.bigtable.{BTOptions, BigtableTypedIO, BigtableWrite}
2727
import com.spotify.scio.coders.Coder
2828
import magnolify.bigtable.BigtableType
2929

@@ -58,8 +58,6 @@ final class SCollectionMutationOps[T <: Mutation](
5858
final class BigtableTypedOps[K: Coder, T: BigtableType: Coder](
5959
private val self: SCollection[(K, T)]
6060
) {
61-
private def btOpts(projectId: String, instanceId: String): BigtableOptions =
62-
BigtableOptions.builder().setProjectId(projectId).setInstanceId(instanceId).build
6361

6462
def saveAsBigtable(
6563
projectId: String,
@@ -69,7 +67,7 @@ final class BigtableTypedOps[K: Coder, T: BigtableType: Coder](
6967
keyFn: K => ByteString
7068
): ClosedTap[Nothing] = {
7169
val params = BigtableTypedIO.WriteParam[K](columnFamily, keyFn)
72-
self.write(BigtableTypedIO[K, T](btOpts(projectId, instanceId), tableId))(params)
70+
self.write(BigtableTypedIO[K, T](BTOptions(projectId, instanceId), tableId))(params)
7371
}
7472

7573
def saveAsBigtable(
@@ -81,7 +79,7 @@ final class BigtableTypedOps[K: Coder, T: BigtableType: Coder](
8179
timestamp: Long
8280
): ClosedTap[Nothing] = {
8381
val params = BigtableTypedIO.WriteParam[K](columnFamily, keyFn, timestamp)
84-
self.write(BigtableTypedIO[K, T](btOpts(projectId, instanceId), tableId))(params)
82+
self.write(BigtableTypedIO[K, T](BTOptions(projectId, instanceId), tableId))(params)
8583
}
8684

8785
def saveAsBigtable(

scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigtable/syntax/ScioContextSyntax.scala

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import com.google.bigtable.v2._
2222
import com.google.cloud.bigtable.config.BigtableOptions
2323
import com.google.protobuf.ByteString
2424
import com.spotify.scio.ScioContext
25-
import com.spotify.scio.bigtable.{BigtableRead, BigtableTypedIO, BigtableUtil, TableAdmin}
25+
import com.spotify.scio.bigtable.{BTOptions, BigtableRead, BigtableTypedIO, BigtableUtil, TableAdmin}
2626
import com.spotify.scio.coders.Coder
2727
import com.spotify.scio.values.SCollection
2828
import magnolify.bigtable.BigtableType
@@ -40,17 +40,14 @@ object ScioContextOps {
4040
final class ScioContextOps(private val self: ScioContext) extends AnyVal {
4141
import ScioContextOps._
4242

43-
private def btOpts(projectId: String, instanceId: String): BigtableOptions =
44-
BigtableOptions.builder().setProjectId(projectId).setInstanceId(instanceId).build
45-
4643
def typedBigtable[K: Coder, T: BigtableType: Coder](
4744
projectId: String,
4845
instanceId: String,
4946
tableId: String,
5047
columnFamily: String,
5148
keyFn: ByteString => K
5249
): SCollection[(K, T)] =
53-
typedBigtable(btOpts(projectId, instanceId), tableId, columnFamily, keyFn)
50+
typedBigtable(BTOptions(projectId, instanceId), tableId, columnFamily, keyFn)
5451

5552
def typedBigtable[K: Coder, T: BigtableType: Coder](
5653
projectId: String,
@@ -60,7 +57,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
6057
keyFn: ByteString => K,
6158
keyRanges: Seq[ByteKeyRange]
6259
): SCollection[(K, T)] =
63-
typedBigtable(btOpts(projectId, instanceId), tableId, columnFamily, keyFn, keyRanges)
60+
typedBigtable(BTOptions(projectId, instanceId), tableId, columnFamily, keyFn, keyRanges)
6461

6562
def typedBigtable[K: Coder, T: BigtableType: Coder](
6663
projectId: String,
@@ -71,7 +68,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
7168
keyRanges: Seq[ByteKeyRange],
7269
rowFilter: RowFilter
7370
): SCollection[(K, T)] =
74-
typedBigtable(btOpts(projectId, instanceId), tableId, columnFamily, keyFn, keyRanges, rowFilter)
71+
typedBigtable(BTOptions(projectId, instanceId), tableId, columnFamily, keyFn, keyRanges, rowFilter)
7572

7673
def typedBigtable[K: Coder, T: BigtableType: Coder](
7774
projectId: String,
@@ -84,7 +81,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
8481
maxBufferElementCount: Option[Int]
8582
): SCollection[(K, T)] =
8683
typedBigtable(
87-
btOpts(projectId, instanceId),
84+
BTOptions(projectId, instanceId),
8885
tableId,
8986
columnFamily,
9087
keyFn,
@@ -115,7 +112,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
115112
keyRange: ByteKeyRange,
116113
rowFilter: RowFilter
117114
): SCollection[Row] =
118-
bigtable(btOpts(projectId, instanceId), tableId, Seq(keyRange), rowFilter)
115+
bigtable(BTOptions(projectId, instanceId), tableId, Seq(keyRange), rowFilter)
119116

120117
/** Get an SCollection for a Bigtable table. */
121118
def bigtable(
@@ -127,7 +124,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
127124
maxBufferElementCount: Option[Int]
128125
): SCollection[Row] =
129126
bigtable(
130-
btOpts(projectId, instanceId),
127+
BTOptions(projectId, instanceId),
131128
tableId,
132129
Seq(keyRange),
133130
rowFilter,
@@ -140,7 +137,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
140137
instanceId: String,
141138
tableId: String
142139
): SCollection[Row] =
143-
bigtable(btOpts(projectId, instanceId), tableId)
140+
bigtable(BTOptions(projectId, instanceId), tableId)
144141

145142
/** Get an SCollection for a Bigtable table. */
146143
def bigtable(
@@ -151,7 +148,7 @@ final class ScioContextOps(private val self: ScioContext) extends AnyVal {
151148
rowFilter: RowFilter,
152149
maxBufferElementCount: Option[Int]
153150
): SCollection[Row] =
154-
bigtable(btOpts(projectId, instanceId), tableId, keyRanges, rowFilter, maxBufferElementCount)
151+
bigtable(BTOptions(projectId, instanceId), tableId, keyRanges, rowFilter, maxBufferElementCount)
155152

156153
/** Get an SCollection for a Bigtable table. */
157154
def bigtable(

0 commit comments

Comments
 (0)