@@ -81,6 +81,15 @@ trait CheckpointProvider extends UninitializedCheckpointProvider {
8181 * This is only intended to be used for logging and metrics.
8282 */
8383 def checkpointPolicy : Option [CheckpointPolicy .Policy ]
84+
85+ /**
86+ * List of different file indexes and corresponding schemas which could help derive full
87+ * state-reconstruction for the checkpoint.
88+ * Different FileIndexes could have different schemas depending on `stats_parsed` / `stats`
89+ * columns in the underlying file(s).
90+ */
91+ def allActionsFileIndexesAndSchemas (
92+ spark : SparkSession , deltaLog : DeltaLog ): Seq [(DeltaLogFileIndex , StructType )]
8493}
8594
8695object CheckpointProvider extends DeltaLogging {
@@ -112,7 +121,11 @@ object CheckpointProvider extends DeltaLogging {
112121 s " has no CheckpointMetadata action " )
113122 }
114123 require(isV2CheckpointEnabled(snapshotDescriptor.protocol))
115- V2CheckpointProvider (uninitializedV2CheckpointProvider, checkpointMetadata, sidecarFiles)
124+ V2CheckpointProvider (
125+ uninitializedV2CheckpointProvider,
126+ checkpointMetadata,
127+ sidecarFiles,
128+ snapshotDescriptor.deltaLog)
116129 }
117130 }
118131 case provider : UninitializedV1OrV2ParquetCheckpointProvider
@@ -136,7 +149,7 @@ object CheckpointProvider extends DeltaLogging {
136149 checkpointMetadataOpt match {
137150 case Some (cm) =>
138151 require(isV2CheckpointEnabled(snapshotDescriptor))
139- V2CheckpointProvider (provider, cm, sidecarFiles)
152+ V2CheckpointProvider (provider, cm, sidecarFiles, snapshotDescriptor.deltaLog )
140153 case None =>
141154 PreloadedCheckpointProvider (provider.topLevelFiles, provider.lastCheckpointInfoOpt)
142155 }
@@ -166,6 +179,24 @@ object CheckpointProvider extends DeltaLogging {
166179 checksumOpt.flatMap(checksum => Option (checksum.protocol)).map(isV2CheckpointEnabled)
167180 }
168181
182+ private [delta] def getParquetSchema (
183+ spark : SparkSession ,
184+ deltaLog : DeltaLog ,
185+ parquetFile : FileStatus ,
186+ schemaFromLastCheckpoint : Option [StructType ]): StructType = {
187+ // Try to get the checkpoint schema from the last_checkpoint.
188+ // If it is not there then get it from filesystem by doing I/O.
189+ val fetchChkSchemaFromLastCheckpoint = spark.sessionState.conf.getConf(
190+ DeltaSQLConf .USE_CHECKPOINT_SCHEMA_FROM_CHECKPOINT_METADATA )
191+ schemaFromLastCheckpoint match {
192+ case Some (schema) if fetchChkSchemaFromLastCheckpoint => schema
193+ case _ =>
194+ recordDeltaOperation(deltaLog, " snapshot.checkpointSchema.fromFileSystem" ) {
195+ Snapshot .getParquetFileSchemaAndRowCount(spark, deltaLog, parquetFile)._1
196+ }
197+ }
198+ }
199+
169200 private def sendEventForV2CheckpointRead (
170201 startTimeMs : Long ,
171202 fileStatus : FileStatus ,
@@ -294,6 +325,20 @@ case class PreloadedCheckpointProvider(
294325 override lazy val topLevelFileIndex : Option [DeltaLogFileIndex ] = Some (fileIndex)
295326
296327 override def checkpointPolicy : Option [CheckpointPolicy .Policy ] = Some (CheckpointPolicy .Classic )
328+
329+ override def allActionsFileIndexesAndSchemas (
330+ spark : SparkSession , deltaLog : DeltaLog ): Seq [(DeltaLogFileIndex , StructType )] = {
331+ Seq ((fileIndex, checkpointSchema(spark, deltaLog)))
332+ }
333+
334+ private val checkpointSchemaWithCaching = new LazyCheckpointSchemaGetter {
335+ override def fileStatus : FileStatus = topLevelFiles.head
336+ override def schemaFromLastCheckpoint : Option [StructType ] =
337+ lastCheckpointInfoOpt.flatMap(_.checkpointSchema)
338+ }
339+ private def checkpointSchema (spark : SparkSession , deltaLog : DeltaLog ): StructType =
340+ checkpointSchemaWithCaching.get(spark, deltaLog)
341+
297342}
298343
299344/**
@@ -312,6 +357,8 @@ object EmptyCheckpointProvider extends CheckpointProvider {
312357 override def allActionsFileIndexes (): Seq [DeltaLogFileIndex ] = Nil
313358 override def topLevelFileIndex : Option [DeltaLogFileIndex ] = None
314359 override def checkpointPolicy : Option [CheckpointPolicy .Policy ] = None
360+ override def allActionsFileIndexesAndSchemas (
361+ spark : SparkSession , deltaLog : DeltaLog ): Seq [(DeltaLogFileIndex , StructType )] = Nil
315362}
316363
317364/** A trait representing a v2 [[UninitializedCheckpointProvider ]] */
@@ -426,6 +473,11 @@ abstract class LazyCompleteCheckpointProvider(
426473
427474 override def checkpointPolicy : Option [CheckpointPolicy .Policy ] =
428475 underlyingCheckpointProvider.checkpointPolicy
476+
477+ override def allActionsFileIndexesAndSchemas (
478+ spark : SparkSession , deltaLog : DeltaLog ): Seq [(DeltaLogFileIndex , StructType )] = {
479+ underlyingCheckpointProvider.allActionsFileIndexesAndSchemas(spark, deltaLog)
480+ }
429481}
430482
431483/**
@@ -438,6 +490,8 @@ abstract class LazyCompleteCheckpointProvider(
438490 * @param sidecarFiles seq of [[SidecarFile ]] for the v2 checkpoint
439491 * @param lastCheckpointInfoOpt optional last checkpoint info for the v2 checkpoint
440492 * @param logPath delta log path for the underlying delta table
493+ * @param sidecarSchemaFetcher function to fetch sidecar schema.
494+ * Returns None if there are no sidecar files.
441495 */
442496case class V2CheckpointProvider (
443497 override val version : Long ,
@@ -446,7 +500,8 @@ case class V2CheckpointProvider(
446500 checkpointMetadata : CheckpointMetadata ,
447501 sidecarFiles : Seq [SidecarFile ],
448502 lastCheckpointInfoOpt : Option [LastCheckpointInfo ],
449- logPath : Path
503+ logPath : Path ,
504+ sidecarSchemaFetcher : () => Option [StructType ]
450505 ) extends CheckpointProvider with DeltaLogging {
451506
452507 private [delta] def sidecarFileStatuses : Seq [FileStatus ] =
@@ -473,22 +528,85 @@ case class V2CheckpointProvider(
473528
474529 override def checkpointPolicy : Option [CheckpointPolicy .Policy ] = Some (CheckpointPolicy .V2 )
475530
531+ private val v2SchemaWithCaching = new LazyCheckpointSchemaGetter {
532+ override def fileStatus : FileStatus = v2CheckpointFile
533+ override def schemaFromLastCheckpoint : Option [StructType ] =
534+ lastCheckpointInfoOpt.flatMap(_.checkpointSchema)
535+ }
536+
537+ protected def schemaForV2Checkpoint (
538+ spark : SparkSession , deltaLog : DeltaLog ): StructType = {
539+ if (v2CheckpointFormat != V2Checkpoint .Format .PARQUET ) {
540+ return Action .logSchema
541+ }
542+ v2SchemaWithCaching.get(spark, deltaLog)
543+ }
544+
545+ protected def schemaForSidecarFile (spark : SparkSession , deltaLog : DeltaLog ): StructType = {
546+ sidecarSchemaFetcher()
547+ .getOrElse {
548+ throw DeltaErrors .assertionFailedError(" Sidecar schema asked without any sidecar files" )
549+ }
550+ }
551+
552+ override def allActionsFileIndexesAndSchemas (
553+ spark : SparkSession , deltaLog : DeltaLog ): Seq [(DeltaLogFileIndex , StructType )] = {
554+ (fileIndexForV2Checkpoint, schemaForV2Checkpoint(spark, deltaLog)) +:
555+ fileIndexesForSidecarFiles.map((_, schemaForSidecarFile(spark, deltaLog)))
556+ }
476557}
477558
478559object V2CheckpointProvider {
479-
480560 /** Alternate constructor which uses [[UninitializedV2LikeCheckpointProvider ]] */
481561 def apply (
482562 uninitializedV2LikeCheckpointProvider : UninitializedV2LikeCheckpointProvider ,
483563 checkpointMetadata : CheckpointMetadata ,
484- sidecarFiles : Seq [SidecarFile ]): V2CheckpointProvider = {
564+ sidecarFiles : Seq [SidecarFile ],
565+ deltaLog : DeltaLog ): V2CheckpointProvider = {
566+ def getSidecarSchemaFetcher : () => Option [StructType ] = {
567+ val nonFateSharingSidecarSchemaFuture : NonFateSharingFuture [Option [StructType ]] = {
568+ checkpointV2ThreadPool.submitNonFateSharing { spark : SparkSession =>
569+ sidecarFiles.headOption.map { sidecarFile =>
570+ val sidecarFileStatus =
571+ sidecarFile.toFileStatus(uninitializedV2LikeCheckpointProvider.logPath)
572+ CheckpointProvider .getParquetSchema(
573+ spark, deltaLog, sidecarFileStatus, schemaFromLastCheckpoint = None )
574+ }
575+ }
576+ }
577+ () => nonFateSharingSidecarSchemaFuture.get(Duration .Inf )
578+ }
485579 V2CheckpointProvider (
486580 uninitializedV2LikeCheckpointProvider.version,
487581 uninitializedV2LikeCheckpointProvider.fileStatus,
488582 uninitializedV2LikeCheckpointProvider.v2CheckpointFormat,
489583 checkpointMetadata,
490584 sidecarFiles,
491585 uninitializedV2LikeCheckpointProvider.lastCheckpointInfoOpt,
492- uninitializedV2LikeCheckpointProvider.logPath)
586+ uninitializedV2LikeCheckpointProvider.logPath,
587+ getSidecarSchemaFetcher
588+ )
493589 }
494590}
591+
592+ abstract class LazyCheckpointSchemaGetter {
593+ protected def fileStatus : FileStatus
594+ protected def schemaFromLastCheckpoint : Option [StructType ]
595+
596+ private var lazySchema = Option .empty[StructType ]
597+
598+ def get (spark : SparkSession , deltaLog : DeltaLog ): StructType = {
599+ lazySchema.getOrElse {
600+ this .synchronized {
601+ // re-check with lock held, in case of races with other initializers
602+ if (lazySchema.isEmpty) {
603+ lazySchema = Some (CheckpointProvider .getParquetSchema(
604+ spark, deltaLog, fileStatus, schemaFromLastCheckpoint))
605+ }
606+ lazySchema.get
607+ }
608+ }
609+ }
610+
611+ def getIfKnown : Option [StructType ] = lazySchema
612+ }
0 commit comments