
Commit

first
rajeshparangi committed Nov 6, 2024
1 parent c60437b commit f973819
Showing 6 changed files with 215 additions and 39 deletions.
29 changes: 26 additions & 3 deletions spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -73,8 +73,7 @@ singleStatement
// If you add keywords here that should not be reserved, add them to 'nonReserved' list.
statement
: VACUUM (path=STRING | table=qualifiedName)
(USING INVENTORY (inventoryTable=qualifiedName | LEFT_PAREN inventoryQuery=subQuery RIGHT_PAREN))?
(RETAIN number HOURS)? (DRY RUN)? #vacuumTable
vacuumModifiers #vacuumTable
| (DESC | DESCRIBE) DETAIL (path=STRING | table=qualifiedName) #describeDeltaDetail
| GENERATE modeName=identifier FOR TABLE table=qualifiedName #generate
| (DESC | DESCRIBE) HISTORY (path=STRING | table=qualifiedName)
@@ -196,6 +195,29 @@ dataType
: identifier ('(' INTEGER_VALUE (',' INTEGER_VALUE)* ')')? #primitiveDataType
;

vacuumModifiers
: (vacuumType
| inventory
| retain
| dryRun)*
;

vacuumType
: LITE|FULL
;

inventory
: USING INVENTORY (inventoryTable=qualifiedName | LEFT_PAREN inventoryQuery=subQuery RIGHT_PAREN)
;

retain
: RETAIN number HOURS
;

dryRun
: DRY RUN
;

number
: MINUS? DECIMAL_VALUE #decimalLiteral
| MINUS? INTEGER_VALUE #integerLiteral
@@ -234,7 +256,7 @@ exprToken
// Add keywords here so that people's queries don't break if they have a column name as one of
// these tokens
nonReserved
: VACUUM | USING | INVENTORY | RETAIN | HOURS | DRY | RUN
: VACUUM | FULL | LITE | USING | INVENTORY | RETAIN | HOURS | DRY | RUN
| CONVERT | TO | DELTA | PARTITIONED | BY
| DESC | DESCRIBE | LIMIT | DETAIL
| GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE | FULL
@@ -285,6 +307,7 @@ IF: 'IF';
INVENTORY: 'INVENTORY';
LEFT_PAREN: '(';
LIMIT: 'LIMIT';
LITE: 'LITE';
LOCATION: 'LOCATION';
MINUS: '-';
NO: 'NO';
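Taken together, the grammar change funnels every VACUUM option through the repeatable vacuumModifiers rule, so LITE/FULL, USING INVENTORY, RETAIN, and DRY RUN may follow the target in any order. A minimal sketch of statements the extended syntax is meant to accept, assuming a SparkSession configured with the Delta SQL extension (table names, paths, and retention values below are illustrative, not taken from this commit):

    import org.apache.spark.sql.SparkSession

    // Sketch only: assumes the Delta Lake jars are on the classpath.
    val spark = SparkSession.builder()
      .appName("vacuum-syntax-sketch")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // The new LITE/FULL modifier combines with the existing options in any order.
    spark.sql("VACUUM delta.`/tmp/events` LITE RETAIN 168 HOURS DRY RUN")
    spark.sql("VACUUM events FULL")
    // INVENTORY is still supported, but the parser rejects it together with LITE.
    spark.sql("VACUUM events USING INVENTORY inventory_table RETAIN 240 HOURS")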
34 changes: 29 additions & 5 deletions spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -319,17 +319,41 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
/**
* Create a [[VacuumTableCommand]] logical plan. Example SQL:
* {{{
* VACUUM ('/path/to/dir' | delta.`/path/to/dir`) [RETAIN number HOURS] [DRY RUN];
* VACUUM ('/path/to/dir' | delta.`/path/to/dir`)
* [LITE|FULL]
* [RETAIN number HOURS] [DRY RUN];
* }}}
*/
override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) {
val vacuumModifiersCtx = ctx.vacuumModifiers()
withOrigin(vacuumModifiersCtx) {
checkDuplicateClauses(vacuumModifiersCtx.vacuumType(), "LITE/FULL", vacuumModifiersCtx)
checkDuplicateClauses(vacuumModifiersCtx.inventory(), "INVENTORY", vacuumModifiersCtx)
checkDuplicateClauses(vacuumModifiersCtx.retain(), "RETAIN", vacuumModifiersCtx)
checkDuplicateClauses(vacuumModifiersCtx.dryRun(), "DRY RUN", vacuumModifiersCtx)
if (!vacuumModifiersCtx.inventory().isEmpty &&
!vacuumModifiersCtx.vacuumType().isEmpty &&
vacuumModifiersCtx.vacuumType().asScala.head.LITE != null) {
operationNotAllowed("Inventory option is not compatible with LITE", vacuumModifiersCtx)
}
}
VacuumTableCommand(
path = Option(ctx.path).map(string),
table = Option(ctx.table).map(visitTableIdentifier),
inventoryTable = Option(ctx.inventoryTable).map(visitTableIdentifier),
inventoryQuery = Option(ctx.inventoryQuery).map(extractRawText),
horizonHours = Option(ctx.number).map(_.getText.toDouble),
dryRun = ctx.RUN != null)
inventoryTable = ctx.vacuumModifiers().inventory().asScala.headOption.collect {
case i if i.inventoryTable != null => visitTableIdentifier(i.inventoryTable)
},
inventoryQuery = ctx.vacuumModifiers().inventory().asScala.headOption.collect {
case i if i.inventoryQuery != null => extractRawText(i.inventoryQuery)
},
horizonHours =
ctx.vacuumModifiers().retain().asScala.headOption.map(_.number.getText.toDouble),
dryRun =
ctx.vacuumModifiers().dryRun().asScala.headOption.map(_.RUN != null).getOrElse(false),
vacuumType = ctx.vacuumModifiers().vacuumType().asScala.headOption.map {
t => if (t.LITE != null) "LITE" else "FULL"
}
)
}

/** Provides a list of unresolved attributes for multi dimensional clustering. */
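Beyond threading the new flag into VacuumTableCommand, the builder now validates the modifier combination at parse time. A hedged sketch of that behaviour, in the style of the test suite further down (the exact error classes and messages are an assumption, not taken from this commit):

    import io.delta.sql.parser.DeltaSqlParser
    import org.apache.spark.sql.catalyst.parser.ParseException

    // Setting `delegate` to null mirrors the existing parser tests.
    val parser = new DeltaSqlParser(null)

    // LITE surfaces on the logical plan as vacuumType = Some("LITE").
    val plan = parser.parsePlan("VACUUM delta.`/tmp/table` LITE")

    // Repeated modifiers are rejected by checkDuplicateClauses ...
    try parser.parsePlan("VACUUM tbl DRY RUN DRY RUN")
    catch { case e: ParseException => println(s"duplicate clause: ${e.getMessage}") }

    // ... and USING INVENTORY combined with LITE is rejected by operationNotAllowed.
    try parser.parsePlan("VACUUM tbl LITE USING INVENTORY inv")
    catch { case e: ParseException => println(s"not allowed: ${e.getMessage}") }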
@@ -43,7 +43,8 @@ case class VacuumTableCommand(
horizonHours: Option[Double],
inventoryTable: Option[LogicalPlan],
inventoryQuery: Option[String],
dryRun: Boolean) extends RunnableCommand with UnaryNode with DeltaCommand {
dryRun: Boolean,
vacuumType: Option[String]) extends RunnableCommand with UnaryNode with DeltaCommand {

override val output: Seq[Attribute] =
Seq(AttributeReference("path", StringType, nullable = true)())
@@ -64,7 +65,7 @@
.map(p => Some(getDeltaTable(p, "VACUUM").toDf(sparkSession)))
.getOrElse(inventoryQuery.map(sparkSession.sql))
VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours,
inventory).collect()
inventory, vacuumType).collect()
}
}

@@ -75,9 +76,11 @@ object VacuumTableCommand {
inventoryTable: Option[TableIdentifier],
inventoryQuery: Option[String],
horizonHours: Option[Double],
dryRun: Boolean): VacuumTableCommand = {
dryRun: Boolean,
vacuumType: Option[String]): VacuumTableCommand = {
val child = UnresolvedDeltaPathOrIdentifier(path, table, "VACUUM")
val unresolvedInventoryTable = inventoryTable.map(rt => UnresolvedTable(rt.nameParts, "VACUUM"))
VacuumTableCommand(child, horizonHours, unresolvedInventoryTable, inventoryQuery, dryRun)
VacuumTableCommand(child, horizonHours, unresolvedInventoryTable, inventoryQuery, dryRun,
vacuumType)
}
}
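For completeness, a sketch of calling the updated companion apply directly; the named arguments mirror the parameters shown above and every value is illustrative:

    // Roughly what the parser builds for
    // VACUUM '/tmp/events' LITE RETAIN 168 HOURS DRY RUN
    val command = VacuumTableCommand(
      path = Some("/tmp/events"),
      table = None,
      inventoryTable = None,
      inventoryQuery = None,
      horizonHours = Some(168.0),
      dryRun = true,
      vacuumType = Some("LITE"))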
@@ -33,7 +33,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{DeltaFileOperations, FileNames}
import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, JsonUtils}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -74,6 +74,12 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
StructField("modificationTime", LongType)
))

object VacuumType extends Enumeration {
type VacuumType = Value
val LITE = Value("LITE")
val FULL = Value("FULL")
}

/**
* Additional check on retention duration to prevent people from shooting themselves in the foot.
*/
@@ -211,14 +217,17 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
* @return A Dataset containing the paths of the files/folders to delete in dryRun mode. Otherwise
* returns the base path of the table.
*/
// scalastyle:off argcount
def gc(
spark: SparkSession,
deltaLog: DeltaLog,
dryRun: Boolean = true,
retentionHours: Option[Double] = None,
inventory: Option[DataFrame] = None,
vacuumTypeOpt: Option[String] = None,
commandMetrics: Map[String, SQLMetric] = Map.empty,
clock: Clock = new SystemClock): DataFrame = {
// scalastyle:on argcount
recordDeltaOperation(deltaLog, "delta.gc") {

val vacuumStartTime = System.currentTimeMillis()
@@ -265,9 +274,12 @@
val partitionColumns = snapshot.metadata.partitionSchema.fieldNames
val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism
val shouldIcebergMetadataDirBeHidden = UniversalFormat.icebergEnabled(snapshot.metadata)
// Default to a FULL vacuum unless the LITE vacuum conf is set; an explicit
// LITE/FULL on the command overrides this default.
val isLiteVacuumEnabled = spark.sessionState.conf.getConf(DeltaSQLConf.LITE_VACUUM_ENABLED)
val defaultType = if (isLiteVacuumEnabled) VacuumType.LITE else VacuumType.FULL
val vacuumType = vacuumTypeOpt.map(VacuumType.withName).getOrElse(defaultType)
val latestCommitVersionOutsideOfRetentionWindowOpt: Option[Long] =
if (isLiteVacuumEnabled) {
if (vacuumType == VacuumType.LITE) {
try {
val timestamp = new Timestamp(deleteBeforeTimestamp)
val commit = new DeltaHistoryManager(deltaLog).getActiveCommitAtTime(
@@ -290,7 +302,7 @@
val files = getFilesFromInventory(
basePath, partitionColumns, inventoryDF, shouldIcebergMetadataDirBeHidden)
(files, None, None)
case _ if isLiteVacuumEnabled =>
case _ if vacuumType == VacuumType.LITE =>
getFilesFromDeltaLog(spark, snapshot, basePath, hadoopConf,
latestCommitVersionOutsideOfRetentionWindowOpt)
case _ =>
@@ -408,7 +420,7 @@
latestCommitVersion = snapshot.version,
eligibleStartCommitVersion = eligibleStartCommitVersionOpt,
eligibleEndCommitVersion = eligibleEndCommitVersionOpt,
typeOfVacuum = if (isLiteVacuumEnabled) "Lite" else "Full"
typeOfVacuum = vacuumType.toString
)

recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
@@ -455,7 +467,7 @@
latestCommitVersion = snapshot.version,
eligibleStartCommitVersion = eligibleStartCommitVersionOpt,
eligibleEndCommitVersion = eligibleEndCommitVersionOpt,
typeOfVacuum = if (isLiteVacuumEnabled) "Lite" else "Full")
typeOfVacuum = vacuumType.toString)
recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
logVacuumEnd(
deltaLog,
@@ -552,16 +564,36 @@
eligibleStartCommitVersion: Long,
eligibleEndCommitVersion: Long): Dataset[SerializableFileStatus] = {
import org.apache.spark.sql.delta.implicits._
// When coordinated commits are enabled, commit files can live both in the _delta_log
// directory and in the commit directory, so we collect the delta log files outside of
// the retention window from both locations.
val prefix = listingPrefix(deltaLog.logPath, eligibleStartCommitVersion)
val eligibleDeltaLogFilesOutsideTheRetentionWindow =
val eligibleDeltaLogFilesFromDeltaLogDirectory =
deltaLog.store.listFrom(prefix, deltaLog.newDeltaHadoopConf)
.collect { case DeltaFile(f, deltaFileVersion) => (f, deltaFileVersion) }
.takeWhile(_._2 <= eligibleEndCommitVersion)
.toSeq

val deltaLogFileIndex = DeltaLogFileIndex(
DeltaLogFileIndex.COMMIT_FILE_FORMAT,
eligibleDeltaLogFilesOutsideTheRetentionWindow.map(_._1)).get
val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf())
val commitDirPath = FileNames.commitDirPath(deltaLog.logPath)
val updatedStartCommitVersion =
eligibleDeltaLogFilesFromDeltaLogDirectory.lastOption.map(_._2)
.getOrElse(eligibleStartCommitVersion)
val eligibleDeltaLogFilesFromCommitDirectory = if (fs.exists(commitDirPath)) {
deltaLog.store
.listFrom(listingPrefix(commitDirPath, updatedStartCommitVersion),
deltaLog.newDeltaHadoopConf)
.collect { case UnbackfilledDeltaFile(f, deltaFileVersion, _) => (f, deltaFileVersion) }
.takeWhile(_._2 <= eligibleEndCommitVersion)
.toSeq
} else {
Seq.empty
}

val allDeltaLogFilesOutsideTheRetentionWindow = eligibleDeltaLogFilesFromDeltaLogDirectory ++
eligibleDeltaLogFilesFromCommitDirectory
val deltaLogFileIndex = DeltaLogFileIndex(DeltaLogFileIndex.COMMIT_FILE_FORMAT,
allDeltaLogFilesOutsideTheRetentionWindow.map(_._1)).get

val allActions = deltaLog.loadIndex(deltaLogFileIndex).as[SingleAction]
val nonCDFFiles = allActions
@@ -733,8 +765,11 @@ trait VacuumCommandImpl extends DeltaCommand {
// This is done to make sure that the commit timestamp reflects the one provided by the clock
// object.
if (Utils.isTesting) {
val f = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
f.setLastModified(deltaLog.clock.getTimeMillis())
val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf())
val filePath = DeltaCommitFileProvider(deltaLog.update()).deltaFile(version)
if (fs.exists(filePath)) {
fs.setTimes(filePath, deltaLog.clock.getTimeMillis(), deltaLog.clock.getTimeMillis())
}
}
}

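To make the precedence concrete: a type given on the command wins, and only when none is supplied does the LITE_VACUUM_ENABLED conf pick the default. An illustrative helper restating the resolution done inside gc (the import path for the nested VacuumType object is an assumption):

    import org.apache.spark.sql.delta.commands.VacuumCommand.VacuumType

    // Mirrors the defaultType / vacuumType lines in gc(): command-level type first,
    // then the session conf, then FULL.
    def resolveVacuumType(
        vacuumTypeOpt: Option[String],
        isLiteVacuumEnabled: Boolean): VacuumType.Value = {
      val defaultType = if (isLiteVacuumEnabled) VacuumType.LITE else VacuumType.FULL
      vacuumTypeOpt.map(VacuumType.withName).getOrElse(defaultType)
    }

    assert(resolveVacuumType(Some("FULL"), isLiteVacuumEnabled = true) == VacuumType.FULL)
    assert(resolveVacuumType(None, isLiteVacuumEnabled = true) == VacuumType.LITE)
    assert(resolveVacuumType(None, isLiteVacuumEnabled = false) == VacuumType.FULL)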
19 changes: 11 additions & 8 deletions spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -41,26 +41,29 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
// Setting `delegate` to `null` is fine. The following tests don't need to touch `delegate`.
val parser = new DeltaSqlParser(null)
assert(parser.parsePlan("vacuum 123_") ===
VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM"), None, None, None, false))
VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM"), None, None, None, None, false))
assert(parser.parsePlan("vacuum 1a.123_") ===
VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM"), None, None, None, false))
VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM"), None, None, None, None,
false))
assert(parser.parsePlan("vacuum a.123A") ===
VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM"), None, None, None, false))
VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM"), None, None, None, None,
false))
assert(parser.parsePlan("vacuum a.123E3_column") ===
VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM"),
None, None, None, false))
None, None, None, None, false))
assert(parser.parsePlan("vacuum a.123D_column") ===
VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM"),
None, None, None, false))
None, None, None, None, false))
assert(parser.parsePlan("vacuum a.123BD_column") ===
VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM"),
None, None, None, false))
None, None, None, None, false))
assert(parser.parsePlan("vacuum delta.`/tmp/table`") ===
VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM"),
None, None, None, false))
None, None, None, None, false))
assert(parser.parsePlan("vacuum \"/tmp/table\"") ===
VacuumTableCommand(
UnresolvedPathBasedDeltaTable("/tmp/table", Map.empty, "VACUUM"), None, None, None, false))
UnresolvedPathBasedDeltaTable("/tmp/table", Map.empty, "VACUUM"), None, None, None, None,
false))
}

test("Restore command is parsed as expected") {
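A hedged sketch of an additional assertion the suite could gain for the new modifier; it checks the named vacuumType field rather than a constructor position, so it does not depend on argument order:

    test("vacuum LITE is parsed into vacuumType") {
      val parser = new DeltaSqlParser(null)
      parser.parsePlan("vacuum delta.`/tmp/table` LITE") match {
        case v: VacuumTableCommand => assert(v.vacuumType === Some("LITE"))
        case other => fail(s"unexpected plan: $other")
      }
    }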