Commit 21a8c72

W1thOut authored and committed
delete segment expect remaining number
Revise modified code
1 parent d8f7df9 commit 21a8c72

File tree

4 files changed: +200 −1 lines changed

integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala

+4
```diff
@@ -180,6 +180,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val REGISTER = carbonKeyWord("REGISTER")
   protected val PROPERTIES = carbonKeyWord("PROPERTIES")
   protected val REFRESH = carbonKeyWord("REFRESH")
+  protected val EXPECT = carbonKeyWord("EXPECT")
+  protected val REMAIN_NUMBER = carbonKeyWord("REMAIN_NUMBER")
 
   // For materialized view
   // Keywords used in this parser
@@ -353,4 +355,6 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       p.getClass.getSimpleName.equals("FloatLit") ||
       p.getClass.getSimpleName.equals("DecimalLit")
     }) ^^ (_.chars)
+
+  protected lazy val number: Parser[Int] = numericLit ^^ (_.toInt)
 }
```
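These keywords exist only to feed the new `deleteSegmentByRemainNumber` rule in CarbonSpark2SqlParser (the third file in this commit); together with the new `number` literal parser they admit statements of the shape exercised by the test cases, e.g. (a usage sketch; `spark` and the table name are placeholders):

```scala
// Keep the two newest segments of `myTable` and delete every older one.
// Assumes `spark` is a SparkSession with CarbonData's extensions enabled.
spark.sql("DELETE FROM TABLE myTable EXPECT SEGMENT.REMAIN_NUMBER = 2")
```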
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByRemainNumberCommand.scala

+80 (new file)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command.management

import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}

import org.apache.carbondata.api.CarbonStore
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, withEvents}

/**
 * A command that deletes segments by an expected remaining number:
 * the newest `remaining` segments are kept, all older ones are deleted.
 *
 * @param remaining expected number of segments remaining after deletion
 */
case class CarbonDeleteLoadByRemainNumberCommand(
    remaining: Int,
    databaseNameOp: Option[String],
    tableName: String)
  extends DataCommand {

  override def processData(sparkSession: SparkSession): Seq[Row] = {
    Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
    setAuditTable(carbonTable)
    setAuditInfo(Map("remaining number" -> remaining.toString))
    if (!carbonTable.getTableInfo.isTransactionalTable) {
      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
    }

    // if an insert overwrite is in progress, do not allow segment deletion
    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
    }

    val segments = CarbonStore.readSegments(carbonTable.getTablePath, showHistory = false, None)

    // from the remaining number, derive the segment ids to delete
    val deleteSegmentIds = segments
      .filter(segment => segment.getSegmentStatus != SegmentStatus.MARKED_FOR_DELETE)
      .sortBy(_.getLoadStartTime)
      .map(_.getLoadName)
      .reverse
      .drop(remaining)

    if (deleteSegmentIds.length == 0) {
      return Seq.empty
    }

    withEvents(DeleteSegmentByIdPreEvent(carbonTable, deleteSegmentIds, sparkSession),
      DeleteSegmentByIdPostEvent(carbonTable, deleteSegmentIds, sparkSession)) {
      CarbonStore.deleteLoadById(
        deleteSegmentIds,
        CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
        tableName,
        carbonTable
      )
    }
    Seq.empty
  }

  override protected def opName: String = "DELETE SEGMENT BY REMAIN_NUMBER"
}
```
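The id-selection pipeline is easiest to see on plain data. A minimal, self-contained sketch using `(loadName, loadStartTime, alreadyMarkedForDelete)` tuples as stand-ins for the real LoadMetadataDetails objects:

```scala
// Hypothetical segment metadata: ids "0".."3", ascending load start times,
// segment "2" already marked for delete.
val segments = Seq(("0", 100L, false), ("1", 200L, false), ("2", 300L, true), ("3", 400L, false))
val remaining = 1

val deleteIds = segments
  .filter(s => !s._3)   // skip segments already marked for delete
  .sortBy(_._2)         // oldest first by load start time
  .map(_._1)            // keep only the load names (segment ids)
  .reverse              // newest first
  .drop(remaining)      // spare the `remaining` newest ids

assert(deleteIds == Seq("1", "0")) // segment 3 survives; 2 was already deleted
```

This also explains the "delete segment after delete newest segment by segmentId" test below: a segment already marked for delete is filtered out before counting, so it never counts toward the remaining quota.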

integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala

+9 −1
```diff
@@ -79,7 +79,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   protected lazy val segmentManagement: Parser[LogicalPlan] =
     deleteSegmentByID | deleteSegmentByLoadDate | deleteStage | cleanFiles | addSegment |
-    showSegments
+    showSegments | deleteSegmentByRemainNumber
 
   protected lazy val restructure: Parser[LogicalPlan] = alterTableDropColumn
 
@@ -508,6 +508,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     CarbonDeleteLoadByIdCommand(loadIds, dbName, tableName.toLowerCase())
   }
 
+  protected lazy val deleteSegmentByRemainNumber: Parser[LogicalPlan] =
+    DELETE ~> FROM ~ TABLE ~> (ident <~ ".").? ~ ident ~
+      (EXPECT ~> (SEGMENT ~ "." ~ REMAIN_NUMBER) ~> "=" ~> number) <~
+      opt(";") ^^ {
+      case dbName ~ tableName ~ remaining =>
+        CarbonDeleteLoadByRemainNumberCommand(remaining, dbName, tableName.toLowerCase())
+    }
+
   protected lazy val deleteSegmentByLoadDate: Parser[LogicalPlan] =
     DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
       (WHERE ~> (SEGMENT ~ "." ~ STARTTIME ~> BEFORE) ~ stringLit) <~
```
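Because `~>` and `<~` discard the side they point away from, the rule binds exactly three values: the optional database name, the table identifier, and the integer after `=`. A sketch of the mapping for one concrete statement (hypothetical db/table names, not captured parser output):

```scala
// "DELETE FROM TABLE db.t EXPECT SEGMENT.REMAIN_NUMBER = 2;" should bind
// dbName = Some("db"), tableName = "t", remaining = 2, and so yield:
CarbonDeleteLoadByRemainNumberCommand(remaining = 2, databaseNameOp = Some("db"), tableName = "t")
```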
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/deletesegment/DeleteSegmentByRemainNumberTestCase.scala

+107 (new file)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.spark.testsuite.deletesegment

import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

/**
 * Test class for deleting segments while keeping an expected remaining number.
 */
class DeleteSegmentByRemainNumberTestCase extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
  val DELETED_STATUS = "Marked for Delete"

  val SUCCESSFUL_STATUS = "Success"

  override def beforeAll {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
  }

  override def beforeEach(): Unit = {
    initTestTable
  }

  override def afterAll(): Unit = {
    sql("drop table if exists deleteSegmentTable")
  }

  test("delete segment, remain_number = 1") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    val rows = sql("show segments on deleteSegmentTable").collect()
    assertResult(SUCCESSFUL_STATUS)(rows(0).get(1))
    assertResult(DELETED_STATUS)(rows(1).get(1))
    assertResult(DELETED_STATUS)(rows(2).get(1))
  }

  test("delete segment, remain nothing") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 0")
    val rows = sql("show segments on deleteSegmentTable").collect()
    rows.foreach(row => assertResult(DELETED_STATUS)(row.get(1)))
  }

  test("delete segment, remain all") {
    sql("delete from table deleteSegmentTable expect segment.remain_number = 3")
    val rows = sql("show segments on deleteSegmentTable").collect()
    rows.foreach(row => assertResult(SUCCESSFUL_STATUS)(row.get(1)))
  }

  test("delete segment, remain_number = -1") {
    val ex = intercept[Exception] {
      sql("delete from table deleteSegmentTable expect segment.remain_number = -1")
    }
    assert(ex.getMessage.contains("not found"))
  }

  test("delete segment after update") {
    sql("update deleteSegmentTable d set (d.country) = ('fr') where d.country = 'aus'")
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    val rows = sql("select * from deleteSegmentTable").collect()
    rows.foreach(row => assertResult("fr")(row(2)))
  }

  test("delete segment after delete newest segment by segmentId") {
    sql("delete from table deleteSegmentTable where segment.id in (2)")
    sql("delete from table deleteSegmentTable expect segment.remain_number = 1")
    val rows = sql("show segments on deleteSegmentTable").collect()
    assertResult(DELETED_STATUS)(rows(0).get(1))
    assertResult(SUCCESSFUL_STATUS)(rows(1).get(1))
    assertResult(DELETED_STATUS)(rows(2).get(1))
  }

  private def initTestTable = {
    sql("drop table if exists deleteSegmentTable")
    sql(
      "CREATE table deleteSegmentTable (ID int, date String, country String, name " +
      "String, phonetype String, serialname String, salary String) STORED AS carbondata " +
      "PARTITIONED by(age int)"
    )
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention1.csv'
         | INTO TABLE deleteSegmentTable PARTITION (age='20') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention2.csv'
         | INTO TABLE deleteSegmentTable PARTITION (age='30') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
    sql(
      s"""LOAD DATA local inpath '$resourcesPath/dataretention3.csv'
         | INTO TABLE deleteSegmentTable PARTITION (age='40') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
  }
}
```
