Commit 4db1ad2

read from parsedStats in loadActions
1 parent 416b2a1 commit 4db1ad2

7 files changed: +738 −12 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/CheckpointProvider.scala

Lines changed: 124 additions & 6 deletions

@@ -81,6 +81,15 @@ trait CheckpointProvider extends UninitializedCheckpointProvider {
    * This is only intended to be used for logging and metrics.
    */
   def checkpointPolicy: Option[CheckpointPolicy.Policy]
+
+  /**
+   * List of different file indexes and corresponding schemas which could help derive full
+   * state-reconstruction for the checkpoint.
+   * Different FileIndexes could have different schemas depending on `stats_parsed` / `stats`
+   * columns in the underlying file(s).
+   */
+  def allActionsFileIndexesAndSchemas(
+      spark: SparkSession, deltaLog: DeltaLog): Seq[(DeltaLogFileIndex, StructType)]
 }
 
 object CheckpointProvider extends DeltaLogging {
@@ -112,7 +121,11 @@ object CheckpointProvider extends DeltaLogging {
             s"has no CheckpointMetadata action")
         }
         require(isV2CheckpointEnabled(snapshotDescriptor.protocol))
-        V2CheckpointProvider(uninitializedV2CheckpointProvider, checkpointMetadata, sidecarFiles)
+        V2CheckpointProvider(
+          uninitializedV2CheckpointProvider,
+          checkpointMetadata,
+          sidecarFiles,
+          snapshotDescriptor.deltaLog)
       }
     }
     case provider: UninitializedV1OrV2ParquetCheckpointProvider
@@ -136,7 +149,7 @@ object CheckpointProvider extends DeltaLogging {
       checkpointMetadataOpt match {
         case Some(cm) =>
           require(isV2CheckpointEnabled(snapshotDescriptor))
-          V2CheckpointProvider(provider, cm, sidecarFiles)
+          V2CheckpointProvider(provider, cm, sidecarFiles, snapshotDescriptor.deltaLog)
         case None =>
           PreloadedCheckpointProvider(provider.topLevelFiles, provider.lastCheckpointInfoOpt)
       }
@@ -166,6 +179,24 @@ object CheckpointProvider extends DeltaLogging {
     checksumOpt.flatMap(checksum => Option(checksum.protocol)).map(isV2CheckpointEnabled)
   }
 
+  private[delta] def getParquetSchema(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      parquetFile: FileStatus,
+      schemaFromLastCheckpoint: Option[StructType]): StructType = {
+    // Try to get the checkpoint schema from the last_checkpoint.
+    // If it is not there then get it from filesystem by doing I/O.
+    val fetchChkSchemaFromLastCheckpoint = spark.sessionState.conf.getConf(
+      DeltaSQLConf.USE_CHECKPOINT_SCHEMA_FROM_CHECKPOINT_METADATA)
+    schemaFromLastCheckpoint match {
+      case Some(schema) if fetchChkSchemaFromLastCheckpoint => schema
+      case _ =>
+        recordDeltaOperation(deltaLog, "snapshot.checkpointSchema.fromFileSystem") {
+          Snapshot.getParquetFileSchemaAndRowCount(spark, deltaLog, parquetFile)._1
+        }
+    }
+  }
+
   private def sendEventForV2CheckpointRead(
       startTimeMs: Long,
       fileStatus: FileStatus,
@@ -294,6 +325,20 @@ case class PreloadedCheckpointProvider(
   override lazy val topLevelFileIndex: Option[DeltaLogFileIndex] = Some(fileIndex)
 
   override def checkpointPolicy: Option[CheckpointPolicy.Policy] = Some(CheckpointPolicy.Classic)
+
+  override def allActionsFileIndexesAndSchemas(
+      spark: SparkSession, deltaLog: DeltaLog): Seq[(DeltaLogFileIndex, StructType)] = {
+    Seq((fileIndex, checkpointSchema(spark, deltaLog)))
+  }
+
+  private val checkpointSchemaWithCaching = new LazyCheckpointSchemaGetter {
+    override def fileStatus: FileStatus = topLevelFiles.head
+    override def schemaFromLastCheckpoint: Option[StructType] =
+      lastCheckpointInfoOpt.flatMap(_.checkpointSchema)
+  }
+  private def checkpointSchema(spark: SparkSession, deltaLog: DeltaLog): StructType =
+    checkpointSchemaWithCaching.get(spark, deltaLog)
+
 }
 
 /**
@@ -312,6 +357,8 @@ object EmptyCheckpointProvider extends CheckpointProvider {
   override def allActionsFileIndexes(): Seq[DeltaLogFileIndex] = Nil
   override def topLevelFileIndex: Option[DeltaLogFileIndex] = None
   override def checkpointPolicy: Option[CheckpointPolicy.Policy] = None
+  override def allActionsFileIndexesAndSchemas(
+      spark: SparkSession, deltaLog: DeltaLog): Seq[(DeltaLogFileIndex, StructType)] = Nil
 }
 
 /** A trait representing a v2 [[UninitializedCheckpointProvider]] */
@@ -426,6 +473,11 @@ abstract class LazyCompleteCheckpointProvider(
 
   override def checkpointPolicy: Option[CheckpointPolicy.Policy] =
     underlyingCheckpointProvider.checkpointPolicy
+
+  override def allActionsFileIndexesAndSchemas(
+      spark: SparkSession, deltaLog: DeltaLog): Seq[(DeltaLogFileIndex, StructType)] = {
+    underlyingCheckpointProvider.allActionsFileIndexesAndSchemas(spark, deltaLog)
+  }
 }
 
 /**
@@ -438,6 +490,8 @@ abstract class LazyCompleteCheckpointProvider(
  * @param sidecarFiles seq of [[SidecarFile]] for the v2 checkpoint
  * @param lastCheckpointInfoOpt optional last checkpoint info for the v2 checkpoint
  * @param logPath delta log path for the underlying delta table
+ * @param sidecarSchemaFetcher function to fetch sidecar schema.
+ *                             Returns None if there are no sidecar files.
  */
 case class V2CheckpointProvider(
     override val version: Long,
@@ -446,7 +500,8 @@ case class V2CheckpointProvider(
     checkpointMetadata: CheckpointMetadata,
     sidecarFiles: Seq[SidecarFile],
     lastCheckpointInfoOpt: Option[LastCheckpointInfo],
-    logPath: Path
+    logPath: Path,
+    sidecarSchemaFetcher: () => Option[StructType]
   ) extends CheckpointProvider with DeltaLogging {
 
   private[delta] def sidecarFileStatuses: Seq[FileStatus] =
@@ -473,22 +528,85 @@ case class V2CheckpointProvider(
 
   override def checkpointPolicy: Option[CheckpointPolicy.Policy] = Some(CheckpointPolicy.V2)
 
+  private val v2SchemaWithCaching = new LazyCheckpointSchemaGetter {
+    override def fileStatus: FileStatus = v2CheckpointFile
+    override def schemaFromLastCheckpoint: Option[StructType] =
+      lastCheckpointInfoOpt.flatMap(_.checkpointSchema)
+  }
+
+  protected def schemaForV2Checkpoint(
+      spark: SparkSession, deltaLog: DeltaLog): StructType = {
+    if (v2CheckpointFormat != V2Checkpoint.Format.PARQUET) {
+      return Action.logSchema
+    }
+    v2SchemaWithCaching.get(spark, deltaLog)
+  }
+
+  protected def schemaForSidecarFile(spark: SparkSession, deltaLog: DeltaLog): StructType = {
+    sidecarSchemaFetcher()
+      .getOrElse {
+        throw DeltaErrors.assertionFailedError("Sidecar schema asked without any sidecar files")
+      }
+  }
+
+  override def allActionsFileIndexesAndSchemas(
+      spark: SparkSession, deltaLog: DeltaLog): Seq[(DeltaLogFileIndex, StructType)] = {
+    (fileIndexForV2Checkpoint, schemaForV2Checkpoint(spark, deltaLog)) +:
+      fileIndexesForSidecarFiles.map((_, schemaForSidecarFile(spark, deltaLog)))
+  }
 }
 
 object V2CheckpointProvider {
-
   /** Alternate constructor which uses [[UninitializedV2LikeCheckpointProvider]] */
   def apply(
       uninitializedV2LikeCheckpointProvider: UninitializedV2LikeCheckpointProvider,
       checkpointMetadata: CheckpointMetadata,
-      sidecarFiles: Seq[SidecarFile]): V2CheckpointProvider = {
+      sidecarFiles: Seq[SidecarFile],
+      deltaLog: DeltaLog): V2CheckpointProvider = {
+    def getSidecarSchemaFetcher: () => Option[StructType] = {
+      val nonFateSharingSidecarSchemaFuture: NonFateSharingFuture[Option[StructType]] = {
+        checkpointV2ThreadPool.submitNonFateSharing { spark: SparkSession =>
+          sidecarFiles.headOption.map { sidecarFile =>
+            val sidecarFileStatus =
+              sidecarFile.toFileStatus(uninitializedV2LikeCheckpointProvider.logPath)
+            CheckpointProvider.getParquetSchema(
+              spark, deltaLog, sidecarFileStatus, schemaFromLastCheckpoint = None)
+          }
+        }
+      }
+      () => nonFateSharingSidecarSchemaFuture.get(Duration.Inf)
+    }
     V2CheckpointProvider(
       uninitializedV2LikeCheckpointProvider.version,
       uninitializedV2LikeCheckpointProvider.fileStatus,
       uninitializedV2LikeCheckpointProvider.v2CheckpointFormat,
       checkpointMetadata,
       sidecarFiles,
       uninitializedV2LikeCheckpointProvider.lastCheckpointInfoOpt,
-      uninitializedV2LikeCheckpointProvider.logPath)
+      uninitializedV2LikeCheckpointProvider.logPath,
+      getSidecarSchemaFetcher
+    )
   }
 }
+
+abstract class LazyCheckpointSchemaGetter {
+  protected def fileStatus: FileStatus
+  protected def schemaFromLastCheckpoint: Option[StructType]
+
+  private var lazySchema = Option.empty[StructType]
+
+  def get(spark: SparkSession, deltaLog: DeltaLog): StructType = {
+    lazySchema.getOrElse {
+      this.synchronized {
+        // re-check with lock held, in case of races with other initializers
+        if (lazySchema.isEmpty) {
+          lazySchema = Some(CheckpointProvider.getParquetSchema(
+            spark, deltaLog, fileStatus, schemaFromLastCheckpoint))
+        }
+        lazySchema.get
+      }
+    }
+  }
+
+  def getIfKnown: Option[StructType] = lazySchema
+}
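Note: for orientation, the snippet below is a minimal, self-contained sketch of the caching pattern that LazyCheckpointSchemaGetter implements, i.e. one guarded, thread-safe computation of the schema followed by reuse. The CachedSchemaGetter and loadSchema names are illustrative only; in the real class the expensive lookup is CheckpointProvider.getParquetSchema.

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Standalone model of the double-checked caching above: the first caller pays the
// (possibly I/O-heavy) schema lookup, later callers reuse the cached result, and
// getIfKnown never triggers the lookup.
abstract class CachedSchemaGetter {
  // The expensive lookup; hypothetical stand-in for CheckpointProvider.getParquetSchema.
  protected def loadSchema(): StructType

  private var cached = Option.empty[StructType]

  def get(): StructType = cached.getOrElse {
    this.synchronized {
      // Re-check under the lock so concurrent callers compute the schema only once.
      if (cached.isEmpty) cached = Some(loadSchema())
      cached.get
    }
  }

  // Returns the schema only if some caller has already computed it.
  def getIfKnown: Option[StructType] = cached
}

object CachedSchemaGetterExample extends App {
  val getter = new CachedSchemaGetter {
    override protected def loadSchema(): StructType =
      StructType(Seq(StructField("path", StringType), StructField("size", LongType)))
  }
  assert(getter.getIfKnown.isEmpty)   // nothing computed yet
  val schema = getter.get()           // triggers the lookup once
  assert(getter.getIfKnown.contains(schema))
}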

spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala

Lines changed: 15 additions & 1 deletion

@@ -35,6 +35,7 @@ import org.apache.spark.sql.delta.util.{DeltaFileOperations, DeltaLogGroupingIte
 import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 import org.apache.spark.sql.delta.util.FileNames._
 import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.spark.sql.util.ScalaExtensions._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
@@ -1196,6 +1197,7 @@ object Checkpoints
       val partitionValues = Checkpoints.extractPartitionValues(
         snapshot.metadata.partitionSchema, "add.partitionValues")
       additionalCols ++= partitionValues
+      additionalCols ++= Checkpoints.extractStats(snapshot.statsSchema, "add.stats")
     }
     state.withColumn("add",
       when(col("add").isNotNull, struct(Seq(
@@ -1215,7 +1217,8 @@ object Checkpoints
   }
 
   def shouldWriteStatsAsStruct(conf: SQLConf, snapshot: Snapshot): Boolean = {
-    DeltaConfigs.CHECKPOINT_WRITE_STATS_AS_STRUCT.fromMetaData(snapshot.metadata)
+    DeltaConfigs.CHECKPOINT_WRITE_STATS_AS_STRUCT.fromMetaData(snapshot.metadata) &&
+      !conf.getConf(DeltaSQLConf.STATS_AS_STRUCT_IN_CHECKPOINT_FORCE_DISABLED).getOrElse(false)
   }
 
   def shouldWriteStatsAsJson(snapshot: Snapshot): Boolean = {
@@ -1247,6 +1250,17 @@ object Checkpoints
       None
     } else Some(struct(partitionValues: _*).as(STRUCT_PARTITIONS_COL_NAME))
   }
+  // This method can be overridden in tests to create a checkpoint with parsed stats.
+  def includeStatsParsedInCheckpoint(): Boolean = true
+
+  /** Parse the stats from JSON and keep as a struct field when available. */
+  def extractStats(statsSchema: StructType, statsColName: String): Option[Column] = {
+    import org.apache.spark.sql.functions.from_json
+    Option.when(includeStatsParsedInCheckpoint() && statsSchema.nonEmpty) {
+      from_json(col(statsColName), statsSchema, DeltaFileProviderUtils.jsonStatsParseOption)
+        .as(Checkpoints.STRUCT_STATS_COL_NAME)
+    }
+  }
 }
 
 object V2Checkpoint {
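Note: to make the new extractStats helper concrete, the snippet below is a hedged, standalone Spark sketch of the same idea: parse the JSON stats string carried by AddFile actions into a typed struct so the checkpoint writer can persist it as stats_parsed. The stats schema and column names here are illustrative; the production code derives the schema from snapshot.statsSchema and passes DeltaFileProviderUtils.jsonStatsParseOption.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

object StatsParsingExample extends App {
  val spark = SparkSession.builder().master("local[1]").appName("stats-parsing").getOrCreate()
  import spark.implicits._

  // Toy stand-in for snapshot.statsSchema.
  val statsSchema = StructType(Seq(
    StructField("numRecords", LongType),
    StructField("minValues", StructType(Seq(StructField("id", LongType)))),
    StructField("maxValues", StructType(Seq(StructField("id", LongType))))))

  val df = Seq(
    """{"numRecords":3,"minValues":{"id":1},"maxValues":{"id":7}}"""
  ).toDF("stats")

  // Rough equivalent of from_json(col("add.stats"), statsSchema).as(STRUCT_STATS_COL_NAME)
  val withParsed = df.withColumn("stats_parsed", from_json(col("stats"), statsSchema))
  withParsed.select("stats_parsed.numRecords", "stats_parsed.maxValues.id").show()

  spark.stop()
}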

spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala

Lines changed: 72 additions & 3 deletions

@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
 // scalastyle:off import.ordering.noEmptyLine
 import java.util.{Locale, TimeZone}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql.delta.actions._
@@ -38,10 +39,14 @@ import org.apache.spark.sql.delta.util.StateCache
 import org.apache.spark.sql.util.ScalaExtensions._
 import io.delta.storage.commit.CommitCoordinatorClient
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
+import org.apache.parquet.hadoop.Footer
+import org.apache.parquet.hadoop.ParquetFileReader
 
 import org.apache.spark.internal.{MDC, MessageWithContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetToSparkSchemaConverter}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -528,11 +533,51 @@ class Snapshot(
   * when sorted in ascending order, will order older actions before newer ones, as required by
   * [[InMemoryLogReplay]]); and [[ADD_STATS_TO_USE_COL_NAME]] (to handle certain combinations of
   * config settings for delta.checkpoint.writeStatsAsJson and delta.checkpoint.writeStatsAsStruct).
+  * When we see a V2 checkpoint without the old stats column, but the stats_parsed column, we
+  * json encode the stats_parsed column back as "stats" again. This is a temporary correctness
+  * hack.
   */
  protected def loadActions: DataFrame = {
-    fileIndices.map(deltaLog.loadIndex(_))
-      .reduceOption(_.union(_)).getOrElse(emptyDF)
-      .withColumn(ADD_STATS_TO_USE_COL_NAME, col("add.stats"))
+    if (fileIndices.isEmpty) return emptyDF
+
+    // Augment the schema with a NullType add.stats_parsed column, as a place-holder for
+    // compatibility with the checkpoint parquet. Both deltas and checkpoints generally use this
+    // schema. HOWEVER, IF (and only if) a checkpoint actually exists, AND it provides an
+    // add.stats_parsed column AND it lacks an add.stats column, THEN (and only then) the checkpoint
+    // DF includes the actual add.stats_parsed column -- not a NullType placeholder -- from which we
+    // generate the add_stats_to_use column (add.stats is unused in that case). Meanwhile, JSON
+    // deltas always map add.stats to add_stats_to_use, and always use the placeholder.
+    val logSchemaToUse = Action.logSchema
+    val jsonStatsCol = col("add.stats")
+    val deltas = deltaFileIndexOpt.map(deltaLog.loadIndex(_, logSchemaToUse))
+      .map(_.withColumn(ADD_STATS_TO_USE_COL_NAME, jsonStatsCol))
+
+    val checkpointDataframes = checkpointProvider
+      .allActionsFileIndexesAndSchemas(spark, deltaLog)
+      .map { case (index, schema) =>
+        val addSchema = schema("add").dataType.asInstanceOf[StructType]
+        val (checkpointSchemaToUse, checkpointStatsColToUse) =
+          if (addSchema.exists(_.name == "stats_parsed") && !addSchema.exists(_.name == "stats")) {
+            val checkpointSchemaToUse =
+              Action.logSchemaWithAddStatsParsed(addSchema("stats_parsed"))
+            (
+              checkpointSchemaToUse,
+              to_json(
+                col("add.stats_parsed")
+              )
+            )
+          } else {
+            // Normal (JSON-like) schema suffices
+            (logSchemaToUse, jsonStatsCol)
+          }
+
+        // For schema compat, make sure to discard add.stats_parsed (if present)
+        deltaLog.loadIndex(index, checkpointSchemaToUse)
+          .withColumn(COMMIT_VERSION_COLUMN, lit(checkpointProvider.version))
+          .withColumn(ADD_STATS_TO_USE_COL_NAME, checkpointStatsColToUse)
+          .withColumn("add", col("add").dropFields("stats_parsed"))
+      }
+    (checkpointDataframes ++ deltas).reduce(_.union(_))
   }
 
  /**
@@ -817,6 +862,30 @@ object Snapshot extends DeltaLogging {
       base
     }
   }
+
+  /**
+   * Gets the schema of a single parquet file by reading its footer. Code here is copied from
+   * ParquetFileFormat.
+   */
+  private[delta] def getParquetFileSchemaAndRowCount(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      file: FileStatus): (StructType, Long) = {
+    // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
+    val converter = new ParquetToSparkSchemaConverter(
+      assumeBinaryIsString = spark.sessionState.conf.isParquetBinaryAsString,
+      assumeInt96IsTimestamp = spark.sessionState.conf.isParquetINT96AsTimestamp)
+
+    val conf = deltaLog.newDeltaHadoopConf()
+
+    val parquetMetadata = {
+      ParquetFileReader.readFooter(deltaLog.newDeltaHadoopConf(), file.getPath)
+    }
+    val rowCount = parquetMetadata.getBlocks.asScala.map(_.getRowCount).sum
+
+    val footer = new Footer(file.getPath(), parquetMetadata)
+    (ParquetFileFormat.readSchemaFromFooter(footer, converter), rowCount)
+  }
 }
 
 /**
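Note: the behavior this commit adds to loadActions is the stats_parsed-to-stats round trip, i.e. when a checkpoint row only carries the parsed struct and no JSON stats string, the struct is re-encoded with to_json so downstream consumers of the "stats to use" column still see a JSON string. The snippet below is a hedged toy illustration of that mechanic; the DataFrame and the add_stats_to_use column name are stand-ins for the real checkpoint columns.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_json}

object StatsRoundTripExample extends App {
  val spark = SparkSession.builder().master("local[1]").appName("stats-roundtrip").getOrCreate()
  import spark.implicits._

  // Build a toy "checkpoint-like" row that has only a parsed stats struct.
  val checkpointLike = Seq((5L, 1L, 9L)).toDF("numRecords", "minId", "maxId")
    .selectExpr(
      "named_struct('numRecords', numRecords, 'minValues', named_struct('id', minId), " +
        "'maxValues', named_struct('id', maxId)) AS stats_parsed")

  // Rough equivalent of .withColumn(ADD_STATS_TO_USE_COL_NAME, to_json(col("add.stats_parsed")))
  val withJsonStats = checkpointLike.withColumn("add_stats_to_use", to_json(col("stats_parsed")))
  withJsonStats.show(truncate = false)

  spark.stop()
}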

spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala

Lines changed: 12 additions & 0 deletions

@@ -115,6 +115,18 @@ object Action {
 
   lazy val logSchema = ExpressionEncoder[SingleAction]().schema
   lazy val addFileSchema = logSchema("add").dataType.asInstanceOf[StructType]
+
+  /**
+   * Same as [[logSchema]], but with a user-specified add.stats_parsed column. This is useful for
+   * reading parquet checkpoint files that provide add.stats_parsed instead of add.stats.
+   */
+  def logSchemaWithAddStatsParsed(statsParsed: StructField): StructType = {
+    val logAddSchema = logSchema("add").dataType.asInstanceOf[StructType]
+    val fields = logSchema.map { f =>
+      if (f.name == "add") f.copy(dataType = logAddSchema.add(statsParsed)) else f
+    }
+    StructType(fields)
+  }
 }
 
 /**
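Note: for readers unfamiliar with rebuilding nested StructType fields, the snippet below is a hedged, self-contained sketch of the same transformation the new helper performs, with a toy schema standing in for Action.logSchema.

import org.apache.spark.sql.types._

object AddStatsParsedSchemaExample extends App {
  // Toy stand-in for Action.logSchema.
  val baseSchema = StructType(Seq(
    StructField("add", StructType(Seq(
      StructField("path", StringType),
      StructField("stats", StringType)))),
    StructField("remove", StructType(Seq(StructField("path", StringType))))))

  val statsParsed = StructField("stats_parsed",
    StructType(Seq(StructField("numRecords", LongType))))

  // Same shape as logSchemaWithAddStatsParsed: copy every top-level field,
  // but widen the nested "add" struct with the extra stats_parsed field.
  val augmented = StructType(baseSchema.map { f =>
    if (f.name == "add") {
      f.copy(dataType = f.dataType.asInstanceOf[StructType].add(statsParsed))
    } else f
  })

  println(augmented.treeString)
}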
