Skip to content

Commit 24fa35c

Browse files
Fix all referenced tables (#348)
* Refactor allReferencedTables so it no longer uses allReferencedAsPartitions. It wipes partition type from tables if there is no outerRef. * Refactor allReferencedAsPartitions to also expand views as TableRefs * Don't expand and exclude views in ZetaSql
1 parent 5d50245 commit 24fa35c

File tree

3 files changed

+42
-33
lines changed

3 files changed

+42
-33
lines changed

core/src/main/scala/no/nrk/bigquery/BQSqlFrag.scala

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
package no.nrk.bigquery
88

99
import cats.syntax.all.*
10-
import no.nrk.bigquery.syntax.*
1110
import no.nrk.bigquery.BQSqlFrag.asSubQuery
11+
import no.nrk.bigquery.syntax.*
1212

1313
import scala.annotation.tailrec
1414

@@ -127,43 +127,49 @@ sealed trait BQSqlFrag {
127127
final def allReferencedAsPartitions: Seq[BQPartitionId[Any]] =
128128
allReferencedAsPartitions(expandAndExcludeViews = true)
129129
final def allReferencedAsPartitions(expandAndExcludeViews: Boolean): Seq[BQPartitionId[Any]] = {
130-
def pf(outerRef: Option[BQPartitionId[Any]]): PartialFunction[BQSqlFrag, List[BQPartitionId[Any]]] = {
131-
case BQSqlFrag.PartitionRef(partitionRef) =>
132-
partitionRef.wholeTable match {
133-
case tableDef: BQTableDef.View[?] if expandAndExcludeViews =>
134-
tableDef.query.collect(pf(Some(partitionRef))).flatten
135-
case tvf: BQAppliedTableValuedFunction[?] if expandAndExcludeViews =>
136-
tvf.query.collect(pf(Some(partitionRef))).flatten
137-
case _ => List(partitionRef)
138-
}
139-
140-
case BQSqlFrag.TableRef(table) =>
141-
(table.partitionType, outerRef) match {
142-
case (partitionType: BQPartitionType.DatePartitioned, Some(partitionRef: BQPartitionId.DatePartitioned)) =>
143-
List(table.withTableType(partitionType).assertPartition(partitionRef.partition))
144-
case (partitionType: BQPartitionType.HourPartitioned, Some(partitionRef: BQPartitionId.HourPartitioned)) =>
145-
List(table.withTableType(partitionType).assertPartition(partitionRef.partition))
146-
case (_, _) => List(table.unpartitioned.assertPartition)
147-
}
130+
def expandAndApplyOuterRef(
131+
table: BQTableLike[Any],
132+
outerRef: Option[BQPartitionId[Any]]): List[BQPartitionId[Any]] =
133+
table match {
134+
case tableDef: BQTableDef.View[?] if expandAndExcludeViews => tableDef.query.collect(pf(outerRef)).flatten
135+
case tvf: BQAppliedTableValuedFunction[?] if expandAndExcludeViews => tvf.query.collect(pf(outerRef)).flatten
136+
case _ =>
137+
(table.partitionType, outerRef) match {
138+
case (partitionType: BQPartitionType.DatePartitioned, Some(partitionRef: BQPartitionId.DatePartitioned)) =>
139+
List(table.withTableType(partitionType).assertPartition(partitionRef.partition))
140+
case (partitionType: BQPartitionType.HourPartitioned, Some(partitionRef: BQPartitionId.HourPartitioned)) =>
141+
List(table.withTableType(partitionType).assertPartition(partitionRef.partition))
142+
case (_, _) => List(table.unpartitioned.assertPartition)
143+
}
144+
}
148145

146+
def pf(outerRef: Option[BQPartitionId[Any]]): PartialFunction[BQSqlFrag, List[BQPartitionId[Any]]] = {
147+
case BQSqlFrag.PartitionRef(partitionRef) => expandAndApplyOuterRef(partitionRef.wholeTable, Some(partitionRef))
148+
case BQSqlFrag.TableRef(table) => expandAndApplyOuterRef(table, outerRef)
149149
case BQSqlFrag.FillRef(fill) => List(fill.destination)
150-
case BQSqlFrag.FilledTableRef(fill) =>
151-
(fill.tableDef.partitionType, outerRef) match {
152-
case (partitionType: BQPartitionType.DatePartitioned, Some(partitionRef: BQPartitionId.DatePartitioned)) =>
153-
List(fill.tableDef.withTableType(partitionType).assertPartition(partitionRef.partition))
154-
case (partitionType: BQPartitionType.HourPartitioned, Some(partitionRef: BQPartitionId.HourPartitioned)) =>
155-
List(fill.tableDef.withTableType(partitionType).assertPartition(partitionRef.partition))
156-
case (_, _) => List(fill.tableDef.unpartitioned.assertPartition)
157-
}
150+
case BQSqlFrag.FilledTableRef(fill) => expandAndApplyOuterRef(fill.tableDef, outerRef)
158151
}
159152

160153
this.collect(pf(None)).flatten.distinct
161154
}
162155

163-
final def allReferencedTables: Seq[BQTableLike[Any]] =
164-
allReferencedAsPartitions
165-
.map(_.wholeTable)
166-
.filterNot(tableLike => tableLike.isInstanceOf[BQTableDef.View[?]])
156+
final def allReferencedTables(expandAndExcludeViews: Boolean): Seq[BQTableLike[Any]] = {
157+
def expand(table: BQTableLike[Any]): List[BQTableLike[Any]] =
158+
table match {
159+
case tableDef: BQTableDef.View[?] if expandAndExcludeViews => tableDef.query.collect(pf).flatten
160+
case tvf: BQAppliedTableValuedFunction[?] if expandAndExcludeViews => tvf.query.collect(pf).flatten
161+
case _ => List(table)
162+
}
163+
164+
def pf: PartialFunction[BQSqlFrag, List[BQTableLike[Any]]] = {
165+
case BQSqlFrag.PartitionRef(partitionRef) => expand(partitionRef.wholeTable)
166+
case BQSqlFrag.TableRef(table) => expand(table)
167+
case BQSqlFrag.FillRef(fill) => List(fill.destination.wholeTable)
168+
case BQSqlFrag.FilledTableRef(fill) => List(fill.tableDef)
169+
}
170+
this.collect(pf).flatten.distinct
171+
}
172+
final def allReferencedTables: Seq[BQTableLike[Any]] = allReferencedTables(expandAndExcludeViews = true)
167173

168174
final def allReferencedTablesAsPartitions: Seq[BQPartitionId[Any]] =
169175
allReferencedAsPartitions(expandAndExcludeViews = true)

zetasql/src/main/scala/no/nrk/bigquery/ZetaSql.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ class ZetaSql[F[_]](implicit F: Sync[F]) {
170170

171171
def analyzeFirst(frag: BQSqlFrag): F[Either[AnalysisException, AnalyzedStatement]] =
172172
F.interruptible {
173-
val tables = frag.allReferencedTables
173+
val tables = frag.allReferencedTables(expandAndExcludeViews = false)
174174
val catalog = toCatalog(tables*)
175175
val rendered = frag.asString
176176

zetasql/src/test/scala/no/nrk/bigquery/ZetaTest.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,10 @@ class ZetaTest extends munit.CatsEffectSuite {
144144

145145
zetaSql
146146
.parseAndBuildAnalysableFragment(query, List(table), List(tvf))
147-
.flatMap(fragment => zetaSql.queryFields(fragment).tupleRight(fragment.allReferencedTables.map(_.tableId)))
147+
.flatMap(fragment =>
148+
zetaSql
149+
.queryFields(fragment)
150+
.tupleRight(fragment.allReferencedTables(expandAndExcludeViews = false).map(_.tableId)))
148151
.assertEquals(expected -> List(BQTableId(tvf.name.dataset, tvf.name.name.value)))
149152
}
150153

0 commit comments

Comments
 (0)