
Commit cdfdade

GEOMESA-3539 Kafka - Option to truncate a topic on schema removal (#3450)
1 parent eea148e commit cdfdade

6 files changed: +239 -3 lines changed

docs/user/kafka/usage.rst

Lines changed: 1 addition & 0 deletions
@@ -39,6 +39,7 @@ Parameter Type Description
                                                  which may require significant memory overhead.
 ``kafka.topic.partitions``           Integer     Number of partitions to use in new kafka topics
 ``kafka.topic.replication``          Integer     Replication factor to use in new kafka topics
+``kafka.topic.truncate-on-delete``   Boolean     Instead of deleting the kafka topic on schema removal, truncate it (delete all messages on the topic)
 ``kafka.serialization.type``         String      Internal serialization format to use for kafka messages. Must be one of ``kryo``, ``avro``
                                                  or ``avro-native``
 ``kafka.cache.expiry``               String      Expire features from in-memory cache after this delay, e.g. ``10 minutes``. See :ref:`kafka_expiry`
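
For context, a minimal sketch of how the new option can be supplied when obtaining the data store through the standard GeoTools API. The broker and zookeeper endpoints and the feature type name below are placeholders, not values from this commit.

// Sketch: connecting with truncate-on-delete enabled (placeholder endpoints).
import org.geotools.api.data.DataStoreFinder
import scala.collection.JavaConverters._

val params = Map(
  "kafka.brokers"                  -> "localhost:9092",
  "kafka.zookeepers"               -> "localhost:2181",
  "kafka.topic.truncate-on-delete" -> "true"
)
val ds = DataStoreFinder.getDataStore(params.asJava)

// with the flag set, removing a schema truncates the backing topic instead of deleting it
ds.removeSchema("my-feature-type")
ds.dispose()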

geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStore.scala

Lines changed: 8 additions & 1 deletion
@@ -276,7 +276,13 @@ class KafkaDataStore(
 
     WithClose(AdminClient.create(props)) { admin =>
       if (admin.listTopics().names().get.contains(topic)) {
-        admin.deleteTopics(Collections.singletonList(topic)).all().get
+        if (config.onSchemaDeleteTruncate) {
+          logger.info(s"Truncating topic $topic")
+          KafkaTruncateTopic(admin).truncate(topic)
+        } else {
+          logger.info(s"Deleting topic $topic")
+          admin.deleteTopics(Collections.singletonList(topic)).all().get
+        }
       } else {
         logger.warn(s"Topic [$topic] does not exist, can't delete it")
       }
@@ -510,6 +516,7 @@ object KafkaDataStore extends LazyLogging {
       consumers: ConsumerConfig,
       producers: ProducerConfig,
       clearOnStart: Boolean,
+      onSchemaDeleteTruncate: Boolean,
       topics: TopicConfig,
       indices: IndexConfig,
       looseBBox: Boolean,

geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreFactory.scala

Lines changed: 2 additions & 1 deletion
@@ -161,6 +161,7 @@ object KafkaDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
       KafkaDataStore.ProducerConfig(props)
     }
     val clearOnStart = ClearOnStart.lookup(params)
+    val truncateOnDelete = TruncateOnDelete.lookup(params)
 
     val indices = {
       val cqEngine = {
@@ -243,7 +244,7 @@ object KafkaDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
       }
     }
 
-    KafkaDataStoreConfig(catalog, brokers, zookeepers, consumers, producers, clearOnStart, topics,
+    KafkaDataStoreConfig(catalog, brokers, zookeepers, consumers, producers, clearOnStart, truncateOnDelete, topics,
       indices, looseBBox, layerViews, authProvider, audit, metrics, ns)
   }

geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreParams.scala

Lines changed: 8 additions & 0 deletions
@@ -94,6 +94,14 @@ object KafkaDataStoreParams extends NamespaceParams {
       readWrite = ReadWriteFlag.WriteOnly
     )
 
+  val TruncateOnDelete =
+    new GeoMesaParam[java.lang.Boolean](
+      "kafka.topic.truncate-on-delete",
+      "Instead of deleting a topic when removing a schema, truncate the topic by deleting records up to the latest offset",
+      default = Boolean.box(false),
+      readWrite = ReadWriteFlag.WriteOnly
+    )
+
   val ConsumerReadBack =
     new GeoMesaParam[Duration](
       "kafka.consumer.read-back",
geomesa-kafka/geomesa-kafka-datastore/src/main/scala/org/locationtech/geomesa/kafka/data/KafkaTruncateTopic.scala

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
/***********************************************************************
 * Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Apache License, Version 2.0
 * which accompanies this distribution and is available at
 * https://www.apache.org/licenses/LICENSE-2.0
 ***********************************************************************/

package org.locationtech.geomesa.kafka.data

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}

import java.util
import java.util.Collections
import java.util.concurrent.ExecutionException
import scala.collection.JavaConverters._

class KafkaTruncateTopic(private val admin: Admin) extends LazyLogging {

  /**
   * Truncate a Kafka topic.
   *
   * @param topic the topic name to truncate
   */
  def truncate(topic: String): Unit = {

    val topicPartitions = getTopicPartitions(topic)
    val latestOffsets = getLatestOffsets(topicPartitions)

    val deleteCleanupPolicy = hasDeleteCleanupPolicy(topic)

    try {
      if (!deleteCleanupPolicy) {
        logger.debug(s"adding 'delete' cleanup policy to topic=$topic so it can be truncated.")
        addDeleteCleanupPolicy(topic)
      }
      logger.debug(s"truncate: topic: $topic starting")
      deleteRecords(latestOffsets)
      logger.debug(s"truncate: topic: $topic completed")
    } finally {
      if (!deleteCleanupPolicy) {
        logger.debug(s"removing 'delete' cleanup policy from topic=$topic.")
        removeDeleteCleanupPolicy(topic)
      }
    }

    logger.info(s"$topic truncated.")
  }

  private def deleteRecords(latestOffsets: Map[TopicPartition, ListOffsetsResultInfo]): Unit = {
    val recordsToDelete = generateRecordsToDelete(latestOffsets)
    admin.deleteRecords(recordsToDelete.asJava).all().get()
  }

  /**
   * For the given offsets, generate the RecordsToDelete objects; this is used with the latest offsets so the topic
   * will be truncated.
   */
  private def generateRecordsToDelete(latestOffsets: Map[TopicPartition, ListOffsetsResultInfo]): Map[TopicPartition, RecordsToDelete] =
    latestOffsets.map {
      case (tp, info) => tp -> RecordsToDelete.beforeOffset(info.offset())
    }

  /**
   * For the list of partitions, return the latest offsets.
   */
  private def getLatestOffsets(partitions: List[TopicPartition]): Map[TopicPartition, ListOffsetsResultInfo] = {
    val input = partitions.map(tp => tp -> OffsetSpec.latest()).toMap
    getOffsets(input)
  }

  /**
   * Used by getLatestOffsets() for obtaining the offsets.
   */
  private def getOffsets(offsetSpecs: Map[TopicPartition, OffsetSpec]): Map[TopicPartition, ListOffsetsResultInfo] = {
    admin.listOffsets(offsetSpecs.asJava).all().get().asScala.toMap
  }

  /**
   * Get all TopicPartitions for the given topic; truncation is performed at the partition level.
   */
  private def getTopicPartitions(topic: String): List[TopicPartition] = {
    val topicInfo = admin.describeTopics(Collections.singleton(topic)).allTopicNames().get().get(topic)
    topicInfo.partitions().asScala.map(info => new TopicPartition(topic, info.partition())).toList
  }

  /**
   * Check if the topic has the 'delete' cleanup policy.
   */
  private def hasDeleteCleanupPolicy(topicName: String): Boolean = {
    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName)
    val configsResult = admin.describeConfigs(Collections.singleton(configResource))
    val config = configsResult.all().get().get(configResource)

    config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value().contains(TopicConfig.CLEANUP_POLICY_DELETE)
  }

  /**
   * Add the 'delete' cleanup policy to the topic's 'cleanup.policy' config.
   */
  private def addDeleteCleanupPolicy(topic: String): Unit = {
    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
    val alterConfigOp = new AlterConfigOp(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), AlterConfigOp.OpType.APPEND)
    val configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]] = Map(configResource -> alterConfigOpColl(alterConfigOp)).asJava
    admin.incrementalAlterConfigs(configs).all().get()
  }

  /**
   * Remove the 'delete' cleanup policy from the topic's 'cleanup.policy' config.
   */
  private def removeDeleteCleanupPolicy(topic: String): Unit = {
    val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
    val alterConfigOp = new AlterConfigOp(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), AlterConfigOp.OpType.SUBTRACT)
    val configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]] = Map(configResource -> alterConfigOpColl(alterConfigOp)).asJava
    admin.incrementalAlterConfigs(configs).all().get()
  }

  /**
   * Singleton collection wrapper for an AlterConfigOp.
   */
  private def alterConfigOpColl(alterConfigOp: AlterConfigOp): util.Collection[AlterConfigOp] =
    Collections.singleton(alterConfigOp)

  /**
   * Convert ExecutionException to RuntimeException, preserving the underlying cause.
   */
  private def convertExecutionException(e: ExecutionException): RuntimeException = {
    val cause = Option(e.getCause).getOrElse(e)
    new RuntimeException(cause.getMessage, cause)
  }
}

object KafkaTruncateTopic {
  def apply(admin: Admin): KafkaTruncateTopic = new KafkaTruncateTopic(admin)
}
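
As a usage note, a small sketch of driving the new helper directly against a broker; the bootstrap server and topic name are placeholders, and only admin-client calls already used in this commit appear here.

// Sketch: truncating a topic directly with the helper above (placeholder broker/topic).
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.locationtech.geomesa.kafka.data.KafkaTruncateTopic

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)
try {
  // deletes all records up to the latest offset on every partition, leaving the topic in place
  KafkaTruncateTopic(admin).truncate("geomesa-example-topic")
} finally {
  admin.close()
}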

geomesa-kafka/geomesa-kafka-datastore/src/test/scala/org/locationtech/geomesa/kafka/data/KafkaDataStoreTest.scala

Lines changed: 81 additions & 1 deletion
@@ -11,8 +11,12 @@ package org.locationtech.geomesa.kafka.data
 import org.apache.commons.lang3.StringUtils
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, ListOffsetsResult, NewTopic, OffsetSpec, TopicDescription}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.serialization.StringSerializer
 import org.geotools.api.data._
 import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
 import org.geotools.api.filter.Filter
@@ -188,6 +192,82 @@ class KafkaDataStoreTest extends KafkaContainerTest with Mockito {
       }
     }
 
+    "allow schemas to be created and truncated" >> {
+
+      foreach(Seq(true, false)) { zk =>
+        TableBasedMetadata.Expiry.threadLocalValue.set("10ms")
+        val (producer, consumer, _) = try {
+          createStorePair(if (zk) {
+            Map(
+              "kafka.zookeepers" -> zookeepers,
+              "kafka.topic.truncate-on-delete" -> "true"
+            )
+          } else {
+            Map(
+              "kafka.topic.truncate-on-delete" -> "true"
+            )
+          })
+        } finally {
+          TableBasedMetadata.Expiry.threadLocalValue.remove()
+        }
+        consumer must not(beNull)
+        producer must not(beNull)
+
+        try {
+          val sft = SimpleFeatureTypes.createImmutableType("kafka", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326;geomesa.foo='bar'")
+          producer.createSchema(sft)
+          consumer.metadata.resetCache()
+          foreach(Seq(producer, consumer)) { ds =>
+            ds.getTypeNames.toSeq mustEqual Seq(sft.getTypeName)
+            val schema = ds.getSchema(sft.getTypeName)
+            schema must not(beNull)
+            schema mustEqual sft
+          }
+          val topic = producer.getSchema(sft.getTypeName).getUserData.get(KafkaDataStore.TopicKey).asInstanceOf[String]
+
+          val props = Collections.singletonMap[String, AnyRef](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+          WithClose(AdminClient.create(props)) { admin =>
+            admin.listTopics().names().get.asScala must contain(topic)
+          }
+
+          val pprops = Map[String, Object](
+            CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokers,
+            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
+            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
+          ).asJava
+          WithClose(new KafkaProducer[String, String](pprops)) { p =>
+            p.send(new ProducerRecord(topic, null, "dummy1"))
+            p.send(new ProducerRecord(topic, null, "dummy2"))
+            p.flush()
+          }
+
+          consumer.removeSchema(sft.getTypeName)
+          foreach(Seq(consumer, producer)) { ds =>
+            eventually(40, 100.millis)(ds.getTypeNames.toSeq must beEmpty)
+            ds.getSchema(sft.getTypeName) must beNull
+          }
+          WithClose(AdminClient.create(props)) { admin =>
+            // the topic is truncated rather than deleted, so it should still exist
+            admin.listTopics().names().get.asScala must contain(topic)
+
+            val topicInfo: TopicDescription = admin.describeTopics(Collections.singleton(topic)).allTopicNames().get().get(topic)
+            val pl: Seq[TopicPartition] = topicInfo.partitions().asScala.map(info => new TopicPartition(topic, info.partition())).toList
+            val e: Map[TopicPartition, ListOffsetsResult.ListOffsetsResultInfo] = admin.listOffsets(pl.map(tp => tp -> OffsetSpec.earliest()).toMap.asJava).all().get().asScala.toMap
+            val l: Map[TopicPartition, ListOffsetsResult.ListOffsetsResultInfo] = admin.listOffsets(pl.map(tp => tp -> OffsetSpec.latest()).toMap.asJava).all().get().asScala.toMap
+
+            // after truncation the earliest offset advances to the latest offset, past the records written above
+            e(new TopicPartition(topic, 0)).offset() must beGreaterThan(1L)
+            l(new TopicPartition(topic, 0)).offset() must beGreaterThan(1L)
+          }
+        } finally {
+          consumer.dispose()
+          producer.dispose()
+        }
+      }
+    }
+
     "support multiple stores creating schemas on the same catalog topic" >> {
       val (producer, consumer, sft) = createStorePair()
       val sft2 = SimpleFeatureTypes.renameSft(sft, "consumer")
