Skip to content

Commit cfd83f4

Browse files
author
Michel Davit
committed
Reduce code breaking change
1 parent 09f972e commit cfd83f4

File tree

9 files changed

+40
-37
lines changed

9 files changed

+40
-37
lines changed

Diff for: integration/src/test/scala/com/spotify/scio/bigquery/BigQueryClientIT.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class BigQueryClientIT extends AnyFlatSpec with Matchers {
154154

155155
"TableService.getRows" should "work" in {
156156
val rows =
157-
bq.tables.rows(Table("bigquery-public-data:samples.shakespeare")).take(10).toList
157+
bq.tables.rows(Table.Spec("bigquery-public-data:samples.shakespeare")).take(10).toList
158158
val columns = Set("word", "word_count", "corpus", "corpus_date")
159159
all(rows.map(_.keySet().asScala)) shouldBe columns
160160
}

Diff for: integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ object TypedBigQueryIT {
9898
val now = Instant.now().toString(TIME_FORMATTER)
9999
val spec =
100100
s"data-integration-test:bigquery_avro_it.$name${now}_${Random.nextInt(Int.MaxValue)}"
101-
Table(spec)
101+
Table.Spec(spec)
102102
}
103103
private val typedTable = table("records")
104104
private val tableRowTable = table("records_tablerow")

Diff for: integration/src/test/scala/com/spotify/scio/bigquery/types/BigQueryTypeIT.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,10 @@ class BigQueryTypeIT extends AnyFlatSpec with Matchers {
187187
tableReference.setProjectId("data-integration-test")
188188
tableReference.setDatasetId("partition_a")
189189
tableReference.setTableId("table_$LATEST")
190-
Table(tableReference).latest().ref.getTableId shouldBe "table_20170302"
190+
Table.Ref(tableReference).latest().ref.getTableId shouldBe "table_20170302"
191191

192-
Table("data-integration-test:partition_a.table_$LATEST")
192+
Table
193+
.Spec("data-integration-test:partition_a.table_$LATEST")
193194
.latest()
194195
.ref
195196
.getTableId shouldBe "table_20170302"

Diff for: scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryIO.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,13 @@ object BigQueryIO {
143143
val selectedFields = bqt.selectedFields
144144
val rowRestriction = bqt.rowRestriction
145145
if (selectedFields.isEmpty && rowRestriction.isEmpty) {
146-
Table(bqt.table.get)
146+
Table.Spec(bqt.table.get)
147147
} else {
148148
val filter = Table.Filter(selectedFields.getOrElse(Nil), rowRestriction)
149-
Table(bqt.table.get, filter)
149+
Table.Spec(bqt.table.get, filter)
150150
}
151151
} else {
152-
Table(bqt.table.get)
152+
Table.Spec(bqt.table.get)
153153
}
154154
BigQueryIO(source)
155155
}

Diff for: scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/BigQueryTypes.scala

+24-22
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,16 @@ import org.joda.time.format.{DateTimeFormat, DateTimeFormatterBuilder}
3232

3333
import java.math.MathContext
3434
import java.nio.ByteBuffer
35+
import scala.annotation.unused
3536
import scala.jdk.CollectionConverters._
3637

3738
sealed trait Source {
38-
protected type Impl <: Source
39-
def latest(bq: BigQuery): Impl
40-
def latest(): Impl = latest(BigQuery.defaultInstance())
39+
def latest(bq: BigQuery): Source
40+
def latest(): Source = latest(BigQuery.defaultInstance())
4141
}
4242

4343
/** A wrapper type [[Query]] which wraps a SQL String. */
4444
final case class Query(underlying: String) extends Source {
45-
override protected type Impl = Query
4645

4746
/**
4847
* A helper method to replace the "$LATEST" placeholder in query to the latest common partition.
@@ -73,21 +72,22 @@ final case class Query(underlying: String) extends Source {
7372
}
7473

7574
/**
76-
* Bigquery [[Table]]. Tables can be referenced by a table spec `String` or by a table reference
77-
* [[GTableReference]]. An additional [[Table.Filter]] can be given to specify selected fields and
78-
* row restrictions when used with the BQ storage read API.
75+
* Bigquery [[Table]] abstracts the multiple ways of referencing Bigquery tables. Tables can be
76+
* referenced by a table spec `String` or by a table reference [[GTableReference]]. An additional
77+
* [[Table.Filter]] can be given to specify selected fields and row restrictions when used with the
78+
* BQ storage read API.
7979
*
8080
* Example: Create a [[Table]] from a [[GTableReference]]:
8181
* {{{
8282
* val tableReference = new TableReference
8383
* tableReference.setProjectId("bigquery-public-data")
8484
* tableReference.setDatasetId("samples")
8585
* tableReference.setTableId("shakespeare")
86-
* val table = Table(tableReference)
86+
* val table = Table.Ref(tableReference)
8787
* }}}
8888
* or with a spec string with filtering:
8989
* {{{
90-
* val table = Table(
90+
* val table = Table.Spec(
9191
* "bigquery-public-data:samples.shakespeare",
9292
* List("word", "word_count"),
9393
* "word_count > 10"
@@ -101,9 +101,8 @@ final case class Query(underlying: String) extends Source {
101101
* }}}
102102
*/
103103
case class Table private (ref: GTableReference, filter: Option[Table.Filter]) extends Source {
104-
override protected type Impl = Table
105104
lazy val spec: String = BigQueryHelpers.toTableSpec(ref)
106-
def latest(bq: BigQuery): Table = {
105+
override def latest(bq: BigQuery): Source = {
107106
val latestSpec = BigQueryPartitionUtil.latestTable(bq, spec)
108107
val latestRef = BigQueryHelpers.parseTableSpec(latestSpec)
109108
copy(latestRef)
@@ -151,40 +150,43 @@ object Table {
151150
rowRestriction: Option[String]
152151
)
153152

154-
def apply(ref: GTableReference): Table =
153+
@unused private def apply(ref: GTableReference, filter: Option[Table.Filter]): Table =
154+
new Table(ref, filter)
155+
156+
def Ref(ref: GTableReference): Table =
155157
new Table(ref, None)
156158

157-
def apply(ref: GTableReference, selectedFields: List[String]): Table =
159+
def Ref(ref: GTableReference, selectedFields: List[String]): Table =
158160
new Table(ref, Some(Filter(selectedFields, None)))
159161

160-
def apply(ref: GTableReference, rowRestriction: String): Table =
162+
def Ref(ref: GTableReference, rowRestriction: String): Table =
161163
new Table(ref, Some(Filter(List.empty, Some(rowRestriction))))
162164

163-
def apply(ref: GTableReference, selectedFields: List[String], rowRestriction: String): Table =
165+
def Ref(ref: GTableReference, selectedFields: List[String], rowRestriction: String): Table =
164166
new Table(ref, Some(Filter(selectedFields, Some(rowRestriction))))
165167

166-
def apply(ref: GTableReference, filter: Table.Filter): Table =
168+
def Ref(ref: GTableReference, filter: Table.Filter): Table =
167169
new Table(ref, Some(filter))
168170

169-
def apply(spec: String): Table =
171+
def Spec(spec: String): Table =
170172
new Table(BigQueryHelpers.parseTableSpec(spec), None)
171173

172-
def apply(spec: String, selectedFields: List[String]): Table =
174+
def Spec(spec: String, selectedFields: List[String]): Table =
173175
new Table(BigQueryHelpers.parseTableSpec(spec), Some(Filter(selectedFields, None)))
174176

175-
def apply(spec: String, rowRestriction: String): Table =
177+
def Spec(spec: String, rowRestriction: String): Table =
176178
new Table(BigQueryHelpers.parseTableSpec(spec), Some(Filter(List.empty, Some(rowRestriction))))
177179

178-
def apply(spec: String, selectedFields: List[String], rowRestriction: String): Table =
180+
def Spec(spec: String, selectedFields: List[String], rowRestriction: String): Table =
179181
new Table(
180182
BigQueryHelpers.parseTableSpec(spec),
181183
Some(Filter(selectedFields, Some(rowRestriction)))
182184
)
183185

184-
def apply(spec: String, filter: Table.Filter): Table =
186+
def Spec(spec: String, filter: Table.Filter): Table =
185187
new Table(BigQueryHelpers.parseTableSpec(spec), Some(filter))
186188

187-
def apply(spec: String, filter: Option[Table.Filter]): Table =
189+
def Spec(spec: String, filter: Option[Table.Filter]): Table =
188190
new Table(BigQueryHelpers.parseTableSpec(spec), filter)
189191
}
190192

Diff for: scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/MockBigQuery.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ class MockTable(
181181
*/
182182
def withSample(numRows: Int): Unit = {
183183
ensureUnique()
184-
val rows = bq.tables.rows(Table(original)).take(numRows).toList
184+
val rows = bq.tables.rows(Table.Ref(original)).take(numRows).toList
185185
require(rows.length == numRows, s"Sample size ${rows.length} != requested $numRows")
186186
writeRows(rows)
187187
()
@@ -193,7 +193,7 @@ class MockTable(
193193
*/
194194
def withSample(minNumRows: Int, maxNumRows: Int): Unit = {
195195
ensureUnique()
196-
val rows = bq.tables.rows(Table(original)).take(maxNumRows).toList
196+
val rows = bq.tables.rows(Table.Ref(original)).take(maxNumRows).toList
197197
require(
198198
rows.length >= minNumRows && rows.length <= maxNumRows,
199199
s"Sample size ${rows.length} < requested minimal $minNumRows"

Diff for: scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/BigQuery.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ final class BigQuery private (val client: Client) {
8989
val rows = if (newSource == null) {
9090
// newSource is missing, T's companion object must have either table or query
9191
if (bqt.isTable) {
92-
tables.rows(STable(bqt.table.get))
92+
tables.rows(STable.Spec(bqt.table.get))
9393
} else if (bqt.isQuery) {
9494
query.rows(bqt.queryRaw.get)
9595
} else {
@@ -98,7 +98,7 @@ final class BigQuery private (val client: Client) {
9898
} else {
9999
// newSource can be either table or query
100100
Try(BigQueryHelpers.parseTableSpec(newSource)).toOption
101-
.map(STable.apply)
101+
.map(STable.Ref)
102102
.map(tables.rows)
103103
.getOrElse(query.rows(newSource))
104104
}

Diff for: scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/client/QueryOps.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ final private[client] class QueryOps(client: Client, tableService: TableOps, job
111111

112112
newQueryJob(config).map { job =>
113113
jobService.waitForJobs(job)
114-
tableService.rows(STable(job.table))
114+
tableService.rows(STable.Ref(job.table))
115115
}.get
116116
}
117117

Diff for: scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/taps.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ final case class BigQueryTaps(self: Taps) {
7979
s"BigQuery Table: $table",
8080
() => bqc.tables.exists(table),
8181
() =>
82-
BigQueryIO[TableRow](Table(table))
82+
BigQueryIO[TableRow](Table.Ref(table))
8383
.tap(BigQueryIO.TableReadParam(BigQueryIO.Format.Default(), Method.DEFAULT))
8484
)
8585

@@ -131,7 +131,7 @@ final case class BigQueryTaps(self: Taps) {
131131
val selectedFields = readOptions.getSelectedFieldsList.asScala.toList
132132
val rowRestriction = Option(readOptions.getRowRestriction)
133133
val filter = Table.Filter(selectedFields, rowRestriction)
134-
val source = Table(table, filter)
134+
val source = Table.Ref(table, filter)
135135
BigQueryIO[TableRow](source).tap(BigQueryIO.TableReadParam(format, Method.DIRECT_READ))
136136
}
137137
)
@@ -148,7 +148,7 @@ final case class BigQueryTaps(self: Taps) {
148148
val selectedFields = readOptions.getSelectedFieldsList.asScala.toList
149149
val rowRestriction = Option(readOptions.getRowRestriction)
150150
val filter = Table.Filter(selectedFields, rowRestriction)
151-
val source = Table(table, filter)
151+
val source = Table.Ref(table, filter)
152152
BigQueryIO[T](source).tap(BigQueryIO.TableReadParam(format, Method.DIRECT_READ))
153153
}
154154
)

0 commit comments

Comments
 (0)