1 change: 1 addition & 0 deletions docs/user/kafka/usage.rst
@@ -39,6 +39,7 @@ Parameter Type Description
which may require significant memory overhead.
``kafka.topic.partitions`` Integer Number of partitions to use in new kafka topics
``kafka.topic.replication`` Integer Replication factor to use in new kafka topics
``kafka.topic.truncate-on-delete`` Boolean Instead of deleting the kafka topic when a schema is deleted, truncate it (delete all messages in the topic)
``kafka.serialization.type`` String Internal serialization format to use for kafka messages. Must be one of ``kryo``, ``avro``
or ``avro-native``
``kafka.cache.expiry`` String Expire features from in-memory cache after this delay, e.g. ``10 minutes``. See :ref:`kafka_expiry`
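The flag is supplied like any other Kafka data store parameter. Below is a minimal usage sketch, assuming a broker at localhost:9092 and a feature type named "my-type" (both hypothetical); with the flag enabled, removing the schema truncates the backing topic instead of deleting it.

import org.geotools.api.data.DataStoreFinder
import scala.collection.JavaConverters._

val params = Map[String, java.io.Serializable](
  "kafka.brokers"                  -> "localhost:9092", // hypothetical broker
  "kafka.topic.truncate-on-delete" -> "true"
)
val ds = DataStoreFinder.getDataStore(params.asJava)
try {
  // the schema metadata is still removed, but the backing topic is truncated rather than deleted
  ds.removeSchema("my-type")
} finally {
  ds.dispose()
}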
@@ -276,7 +276,13 @@ class KafkaDataStore(

WithClose(AdminClient.create(props)) { admin =>
if (admin.listTopics().names().get.contains(topic)) {
admin.deleteTopics(Collections.singletonList(topic)).all().get
if (config.onSchemaDeleteTruncate) {
logger.info(s"Truncating topic $topic")
KafkaTruncateTopic(admin).truncate(topic)
} else {
logger.info(s"Deleting topic $topic")
admin.deleteTopics(Collections.singletonList(topic)).all().get
}
} else {
logger.warn(s"Topic [$topic] does not exist, can't delete it")
}
@@ -510,6 +516,7 @@ object KafkaDataStore extends LazyLogging {
consumers: ConsumerConfig,
producers: ProducerConfig,
clearOnStart: Boolean,
onSchemaDeleteTruncate: Boolean,
topics: TopicConfig,
indices: IndexConfig,
looseBBox: Boolean,
@@ -161,6 +161,7 @@ object KafkaDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
KafkaDataStore.ProducerConfig(props)
}
val clearOnStart = ClearOnStart.lookup(params)
val truncateOnDelete = TruncateOnDelete.lookup(params)

val indices = {
val cqEngine = {
@@ -244,7 +245,7 @@
}
}

KafkaDataStoreConfig(catalog, brokers, zookeepers, consumers, producers, clearOnStart, topics,
KafkaDataStoreConfig(catalog, brokers, zookeepers, consumers, producers, clearOnStart, truncateOnDelete, topics,
indices, looseBBox, layerViews, authProvider, audit, metrics, ns)
}

@@ -105,6 +105,14 @@ object KafkaDataStoreParams extends NamespaceParams {
readWrite = ReadWriteFlag.WriteOnly
)

val TruncateOnDelete =
new GeoMesaParam[java.lang.Boolean](
"kafka.topic.truncate-on-delete",
"Instead of deleting a topic when when removing a schema, truncate the topic by deleting to the latest offset",
default = Boolean.box(false),
readWrite = ReadWriteFlag.WriteOnly
)

val ConsumerReadBack =
new GeoMesaParam[Duration](
"kafka.consumer.read-back",
@@ -0,0 +1,175 @@
/***********************************************************************
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* https://www.apache.org/licenses/LICENSE-2.0
***********************************************************************/

package org.locationtech.geomesa.kafka.data

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}

import java.util
import java.util.Collections
import java.util.concurrent.ExecutionException
import scala.collection.JavaConverters._

class KafkaTruncateTopic(private val admin: Admin) extends LazyLogging {

/**
* Truncate a Kafka topic by deleting all records up to the latest offset in each partition.
*
* @param topic the topic name to truncate
*/
def truncate(topic: String): Unit = {

val topicPartitions = getTopicPartitions(topic)
val latestOffsets = getLatestOffsets(topicPartitions)

val deleteCleanupPolicy = hasDeleteCleanupPolicy(topic)

try {
if (!deleteCleanupPolicy) {
logger.debug(s"adding 'delete' cleanup policy to topic=$topic so it can be truncated.")
addDeleteCleanupPolicy(topic)
}
logger.debug(s"truncate: topic: $topic starting")
deleteRecords(latestOffsets)
logger.debug(s"truncate: topic: $topic completed")
} finally {
if (!deleteCleanupPolicy) {
logger.debug(s"removing 'delete' cleanup policy to topic=$topic.")
removeDeleteCleanupPolicy(topic)
}
}

logger.info(s"$topic truncated.")

}

private def deleteRecords(latestOffsets: Map[TopicPartition, ListOffsetsResultInfo]): Unit =
try {
val recordsToDelete = generateRecordsToDelete(latestOffsets)
admin.deleteRecords(recordsToDelete.asJava).all().get()
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("the deleteRecords operation was interrupted, aborting; it may have still completed.")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* For the given offsets, generate the RecordsToDelete objects; this is used with the latest offsets so the topic
* will be truncated.
*/
private def generateRecordsToDelete(latestOffsets: Map[TopicPartition, ListOffsetsResultInfo]): Map[TopicPartition, RecordsToDelete] =
latestOffsets.map {
case (tp, info) => tp -> RecordsToDelete.beforeOffset(info.offset())
}

/**
* For the given list of partitions, return the latest offsets.
*/
private def getLatestOffsets(partitions: List[TopicPartition]): Map[TopicPartition, ListOffsetsResultInfo] = {
val input = partitions.map(tp => tp -> OffsetSpec.latest()).toMap
getOffsets(input)
}

/**
* Used by getLatestOffsets() for obtaining the offsets for each partition.
*/
private def getOffsets(offsetSpecs: Map[TopicPartition, OffsetSpec]): Map[TopicPartition, ListOffsetsResultInfo] =
try {
admin.listOffsets(offsetSpecs.asJava).all().get().asScala.toMap
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("listOffsets operation interrupted, deleteRecords will not executed.")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* Get all TopicPartitions for the given topic; truncation is performed at the partition level.
*/
private def getTopicPartitions(topic: String): List[TopicPartition] =
try {
val topicInfo = admin.describeTopics(Collections.singleton(topic)).allTopicNames().get().get(topic)
topicInfo.partitions().asScala.map(info => new TopicPartition(topic, info.partition())).toList
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("describeTopics operation interrupted, deleteRecords will not executed.")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* Check if the topic has the 'delete' cleanup policy.
*/
private def hasDeleteCleanupPolicy(topicName: String): Boolean =
try {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName)
val configsResult = admin.describeConfigs(Collections.singleton(configResource))
val config = configsResult.all().get().get(configResource)

config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value().contains(TopicConfig.CLEANUP_POLICY_DELETE)
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("describeTopics operation interrupted, deleteRecords will not executed.")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* Add the 'delete' cleanup policy to the topic's 'cleanup.policy' config.
*/
private def addDeleteCleanupPolicy(topic: String): Unit =
try {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
val alterConfigOp = new AlterConfigOp(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), AlterConfigOp.OpType.APPEND)
val configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]] = Map(configResource -> alterConfigOpColl(alterConfigOp)).asJava
admin.incrementalAlterConfigs(configs).all().get()
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("incrementalAlterConfigs operation interrupted, ...")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* Remove the 'delete' cleanup policy from the topic's 'cleanup.policy' config.
*/
private def removeDeleteCleanupPolicy(topic: String): Unit = {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
val alterConfigOp = new AlterConfigOp(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), AlterConfigOp.OpType.SUBTRACT)
val configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]] = Map(configResource -> alterConfigOpColl(alterConfigOp)).asJava
admin.incrementalAlterConfigs(configs).all().get()
}

/**
* Singleton wrapper for AlterConfigOp.
*/
private def alterConfigOpColl(alterConfigOp: AlterConfigOp): util.Collection[AlterConfigOp] =
Collections.singleton(alterConfigOp)

/**
* Convert ExecutionException to RuntimeException, preserving the underlying cause.
*/
private def convertExecutionException(e: ExecutionException): RuntimeException = {
val cause = Option(e.getCause).getOrElse(e)
new RuntimeException(cause.getMessage, cause)
}
}

object KafkaTruncateTopic {
def apply(admin: Admin): KafkaTruncateTopic = new KafkaTruncateTopic(admin)
}
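A minimal usage sketch of the class above, assuming a reachable broker at localhost:9092 and an existing topic named "example-topic" (both hypothetical):

import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}

import java.util.Properties

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
val admin = Admin.create(props)
try {
  // deletes every record currently in the topic but leaves the topic itself in place
  KafkaTruncateTopic(admin).truncate("example-topic")
} finally {
  admin.close()
}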
@@ -11,8 +11,12 @@ package org.locationtech.geomesa.kafka.data
import org.apache.commons.lang3.StringUtils
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, ListOffsetsResult, NewTopic, OffsetSpec, TopicDescription}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.serialization.StringSerializer
import org.geotools.api.data._
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
@@ -188,6 +192,82 @@ class KafkaDataStoreTest extends KafkaContainerTest with Mockito {
}
}

"allow schemas to be created and truncated" >> {

foreach(Seq(true, false)) { zk =>
TableBasedMetadata.Expiry.threadLocalValue.set("10ms")
val (producer, consumer, _) = try {
createStorePair(if (zk) {
Map(
"kafka.zookeepers" -> zookeepers,
"kafka.topic.truncate-on-delete" -> "true"
)
} else {
Map(
"kafka.topic.truncate-on-delete" -> "true"
)
})
} finally {
TableBasedMetadata.Expiry.threadLocalValue.remove()
}
consumer must not(beNull)
producer must not(beNull)

try {
val sft = SimpleFeatureTypes.createImmutableType("kafka", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326;geomesa.foo='bar'")
producer.createSchema(sft)
consumer.metadata.resetCache()
foreach(Seq(producer, consumer)) { ds =>
ds.getTypeNames.toSeq mustEqual Seq(sft.getTypeName)
val schema = ds.getSchema(sft.getTypeName)
schema must not(beNull)
schema mustEqual sft
}
val topic = producer.getSchema(sft.getTypeName).getUserData.get(KafkaDataStore.TopicKey).asInstanceOf[String]

val props = Collections.singletonMap[String, AnyRef](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
WithClose(AdminClient.create(props)) { admin =>
admin.listTopics().names().get.asScala must contain(topic)
}


val pprops = Map[String, Object](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
).asJava
WithClose(new KafkaProducer[String, String](pprops)) { p =>
p.send(new ProducerRecord(topic, null, "dummy1"))
p.send(new ProducerRecord(topic, null, "dummy2"))
p.flush()
}

consumer.removeSchema(sft.getTypeName)
foreach(Seq(consumer, producer)) { ds =>
eventually(40, 100.millis)(ds.getTypeNames.toSeq must beEmpty)
ds.getSchema(sft.getTypeName) must beNull
}
WithClose(AdminClient.create(props)) { admin =>
// topic is not deleted, so it should remain.
admin.listTopics().names().get.asScala must (contain(topic))

val topicInfo: TopicDescription = admin.describeTopics(Collections.singleton(topic)).allTopicNames().get().get(topic)
val pl: Seq[TopicPartition] = topicInfo.partitions().asScala.map(info => new TopicPartition(topic, info.partition())).toList
val e: Map[TopicPartition, ListOffsetsResult.ListOffsetsResultInfo] = admin.listOffsets(pl.map(tp => tp -> OffsetSpec.earliest()).toMap.asJava).all().get().asScala.toMap
val l: Map[TopicPartition, ListOffsetsResult.ListOffsetsResultInfo] = admin.listOffsets(pl.map(tp => tp -> OffsetSpec.latest()).toMap.asJava).all().get().asScala.toMap

// after truncation the earliest offset advances to the latest offset (past the two dummy records), so both are greater than 1
e(new TopicPartition(topic, 0)).offset() must beGreaterThan(1L)
l(new TopicPartition(topic, 0)).offset() must beGreaterThan(1L)

}
} finally {
consumer.dispose()
producer.dispose()
}
}
}

"support multiple stores creating schemas on the same catalog topic" >> {
val (producer, consumer, sft) = createStorePair()
val sft2 = SimpleFeatureTypes.renameSft(sft, "consumer")