1 change: 1 addition & 0 deletions docs/user/kafka/usage.rst
@@ -39,6 +39,7 @@ Parameter Type Description
which may require significant memory overhead.
``kafka.topic.partitions`` Integer Number of partitions to use in new kafka topics
``kafka.topic.replication`` Integer Replication factor to use in new kafka topics
``kafka.topic.truncate-on-delete`` Boolean Instead of deleting the kafka topic when a schema is removed, truncate it (delete all messages in the topic).
``kafka.serialization.type`` String Internal serialization format to use for kafka messages. Must be one of ``kryo``, ``avro``
or ``avro-native``
``kafka.cache.expiry`` String Expire features from in-memory cache after this delay, e.g. ``10 minutes``. See :ref:`kafka_expiry`
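
As a hedged illustration (not part of this change), a minimal sketch of enabling the new flag through the GeoTools ``DataStoreFinder`` API; the broker and zookeeper addresses are placeholders, and ``kafka.brokers``/``kafka.zookeepers`` are assumed to be the store's usual connection parameters:

```scala
import org.geotools.api.data.DataStoreFinder
import scala.collection.JavaConverters._

// assumed connection parameters; broker/zookeeper addresses are placeholders
val params = Map[String, String](
  "kafka.brokers"                  -> "localhost:9092",
  "kafka.zookeepers"               -> "localhost:2181",
  // truncate topics on schema removal instead of deleting them
  "kafka.topic.truncate-on-delete" -> "true"
).asJava

val ds = DataStoreFinder.getDataStore(params)
// removing a schema should now truncate its topic rather than delete it
```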

@@ -276,7 +276,13 @@ class KafkaDataStore(

WithClose(AdminClient.create(props)) { admin =>
if (admin.listTopics().names().get.contains(topic)) {
admin.deleteTopics(Collections.singletonList(topic)).all().get
if (config.onSchemaDeleteTruncate) {
logger.info(s"Truncating topic $topic")
KafkaTruncateTopic(admin).truncate(topic)
} else {
logger.info(s"Deleting topic $topic")
admin.deleteTopics(Collections.singletonList(topic)).all().get
}
} else {
logger.warn(s"Topic [$topic] does not exist, can't delete it")
}

@@ -510,6 +516,7 @@ object KafkaDataStore extends LazyLogging {
consumers: ConsumerConfig,
producers: ProducerConfig,
clearOnStart: Boolean,
onSchemaDeleteTruncate: Boolean,
topics: TopicConfig,
indices: IndexConfig,
looseBBox: Boolean,

@@ -161,6 +161,7 @@ object KafkaDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
KafkaDataStore.ProducerConfig(props)
}
val clearOnStart = ClearOnStart.lookup(params)
val truncateOnDelete = TruncateOnDelete.lookup(params)

val indices = {
val cqEngine = {

@@ -244,7 +245,7 @@ }
}
}

KafkaDataStoreConfig(catalog, brokers, zookeepers, consumers, producers, clearOnStart, topics,
KafkaDataStoreConfig(catalog, brokers, zookeepers, consumers, producers, clearOnStart, truncateOnDelete, topics,
indices, looseBBox, layerViews, authProvider, audit, metrics, ns)
}


@@ -105,6 +105,14 @@ object KafkaDataStoreParams extends NamespaceParams {
readWrite = ReadWriteFlag.WriteOnly
)

val TruncateOnDelete =
new GeoMesaParam[java.lang.Boolean](
"kafka.topic.truncate-on-delete",
"Instead of deleting a topic when when removing a schema, truncate the topic by deleting to the latest offset",
default = Boolean.box(false),
readWrite = ReadWriteFlag.WriteOnly
)

val ConsumerReadBack =
new GeoMesaParam[Duration](
"kafka.consumer.read-back",

@@ -0,0 +1,139 @@
/***********************************************************************
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* https://www.apache.org/licenses/LICENSE-2.0
***********************************************************************/

package org.locationtech.geomesa.kafka.data

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}

import java.util
import java.util.Collections
import java.util.concurrent.ExecutionException
import scala.collection.JavaConverters._

class KafkaTruncateTopic(private val admin: Admin) extends LazyLogging {

/**
* Truncate a Kafka topic by deleting all records in each partition up to its latest offset.
*
* @param topic the topic name to truncate
*/
def truncate(topic: String): Unit = {

val topicPartitions = getTopicPartitions(topic)
val latestOffsets = getLatestOffsets(topicPartitions)

val deleteCleanupPolicy = hasDeleteCleanupPolicy(topic)

try {
if (!deleteCleanupPolicy) {
logger.debug(s"adding 'delete' cleanup policy to topic=$topic so it can be truncated.")
addDeleteCleanupPolicy(topic)
}
logger.debug(s"truncate: topic: $topic starting")
deleteRecords(latestOffsets)
logger.debug(s"truncate: topic: $topic completed")
} finally {
if (!deleteCleanupPolicy) {
logger.debug(s"removing 'delete' cleanup policy to topic=$topic.")
removeDeleteCleanupPolicy(topic)
}
}

logger.info(s"$topic truncated.")
}

private def deleteRecords(latestOffsets: Map[TopicPartition, ListOffsetsResultInfo]): Unit = {
val recordsToDelete = generateRecordsToDelete(latestOffsets)
admin.deleteRecords(recordsToDelete.asJava).all().get()
}

/**
* For the given offsets, generate the RecordsToDelete objects; these are used with the latest offsets so that the
* topic is truncated.
*/
private def generateRecordsToDelete(latestOffsets: Map[TopicPartition, ListOffsetsResultInfo]): Map[TopicPartition, RecordsToDelete] =
latestOffsets.map {
case (tp, info) => tp -> RecordsToDelete.beforeOffset(info.offset())
}

/**
* For the given partitions, return the latest offsets.
*/
private def getLatestOffsets(partitions: List[TopicPartition]): Map[TopicPartition, ListOffsetsResultInfo] = {
val input = partitions.map(tp => tp -> OffsetSpec.latest()).toMap
getOffsets(input)
}

/**
* Obtain the offsets for the given partitions and offset specs.
*/
private def getOffsets(offsetSpecs: Map[TopicPartition, OffsetSpec]): Map[TopicPartition, ListOffsetsResultInfo] = {
admin.listOffsets(offsetSpecs.asJava).all().get().asScala.toMap
}

/**
* Get all TopicPartitions for the given topic; truncation is performed at the partition level.
*/
private def getTopicPartitions(topic: String): List[TopicPartition] = {
val topicInfo = admin.describeTopics(Collections.singleton(topic)).allTopicNames().get().get(topic)
topicInfo.partitions().asScala.map(info => new TopicPartition(topic, info.partition())).toList
}

/**
* Check if the topic has the 'delete' cleanup policy.
*/
private def hasDeleteCleanupPolicy(topicName: String): Boolean = {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName)
val configsResult = admin.describeConfigs(Collections.singleton(configResource))
val config = configsResult.all().get().get(configResource)

config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value().contains(TopicConfig.CLEANUP_POLICY_DELETE)
}

/**
* Add the 'delete' cleanup policy to the topic's 'cleanup.policy' config.
*/
private def addDeleteCleanupPolicy(topic: String): Unit = {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
val alterConfigOp = new AlterConfigOp(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), AlterConfigOp.OpType.APPEND)
val configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]] = Map(configResource -> alterConfigOpColl(alterConfigOp)).asJava
admin.incrementalAlterConfigs(configs).all().get()
}

/**
* Remove the 'delete' cleanup policy from the topic's 'cleanup.policy' config.
*/
private def removeDeleteCleanupPolicy(topic: String): Unit = {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
val alterConfigOp = new AlterConfigOp(new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE), AlterConfigOp.OpType.SUBTRACT)
val configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]] = Map(configResource -> alterConfigOpColl(alterConfigOp)).asJava
admin.incrementalAlterConfigs(configs).all().get()
}

/**
* Singleton collection wrapper for an AlterConfigOp.
*/
private def alterConfigOpColl(alterConfigOp: AlterConfigOp): util.Collection[AlterConfigOp] =
Collections.singleton(alterConfigOp)

/**
* Convert ExecutionException to RuntimeException, preserving the underlying cause.
*/
private def convertExecutionException(e: ExecutionException): RuntimeException = {
val cause = Option(e.getCause).getOrElse(e)
new RuntimeException(cause.getMessage, cause)
}
}

object KafkaTruncateTopic {
def apply(admin: Admin): KafkaTruncateTopic = new KafkaTruncateTopic(admin)
}
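
For reference, a minimal usage sketch of the helper above, assuming a reachable broker at a placeholder address (inside the data store the Admin client is created from the store configuration):

```scala
import java.util.Collections
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

// placeholder bootstrap servers and topic name
val props = Collections.singletonMap[String, AnyRef](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)
try {
  // deletes all records in every partition of the topic but leaves the topic itself in place
  KafkaTruncateTopic(admin).truncate("example-topic")
} finally {
  admin.close()
}
```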

@@ -11,8 +11,12 @@ package org.locationtech.geomesa.kafka.data
import org.apache.commons.lang3.StringUtils
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, ListOffsetsResult, NewTopic, OffsetSpec, TopicDescription}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.serialization.StringSerializer
import org.geotools.api.data._
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter

@@ -188,6 +192,82 @@ class KafkaDataStoreTest extends KafkaContainerTest with Mockito {
}
}

"allow schemas to be created and truncated" >> {

foreach(Seq(true, false)) { zk =>
TableBasedMetadata.Expiry.threadLocalValue.set("10ms")
val (producer, consumer, _) = try {
createStorePair(if (zk) {
Map(
"kafka.zookeepers" -> zookeepers,
"kafka.topic.truncate-on-delete" -> "true"
)
} else {
Map(
"kafka.topic.truncate-on-delete" -> "true"
)
})
} finally {
TableBasedMetadata.Expiry.threadLocalValue.remove()
}
consumer must not(beNull)
producer must not(beNull)

try {
val sft = SimpleFeatureTypes.createImmutableType("kafka", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326;geomesa.foo='bar'")
producer.createSchema(sft)
consumer.metadata.resetCache()
foreach(Seq(producer, consumer)) { ds =>
ds.getTypeNames.toSeq mustEqual Seq(sft.getTypeName)
val schema = ds.getSchema(sft.getTypeName)
schema must not(beNull)
schema mustEqual sft
}
val topic = producer.getSchema(sft.getTypeName).getUserData.get(KafkaDataStore.TopicKey).asInstanceOf[String]

val props = Collections.singletonMap[String, AnyRef](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
WithClose(AdminClient.create(props)) { admin =>
admin.listTopics().names().get.asScala must contain(topic)
}


val pprops = Map[String, Object](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
).asJava
WithClose(new KafkaProducer[String, String](pprops)) { p =>
p.send(new ProducerRecord(topic, null, "dummy1"))
p.send(new ProducerRecord(topic, null, "dummy2"))
p.flush()
}

consumer.removeSchema(sft.getTypeName)
foreach(Seq(consumer, producer)) { ds =>
eventually(40, 100.millis)(ds.getTypeNames.toSeq must beEmpty)
ds.getSchema(sft.getTypeName) must beNull
}
WithClose(AdminClient.create(props)) { admin =>
// topic is not deleted, so it should remain.
admin.listTopics().names().get.asScala must (contain(topic))

val topicInfo: TopicDescription = admin.describeTopics(Collections.singleton(topic)).allTopicNames().get().get(topic)
val pl: Seq[TopicPartition] = topicInfo.partitions().asScala.map(info => new TopicPartition(topic, info.partition())).toList
val e: Map[TopicPartition, ListOffsetsResult.ListOffsetsResultInfo] = admin.listOffsets(pl.map(tp => tp -> OffsetSpec.earliest()).toMap.asJava).all().get().asScala.toMap
val l: Map[TopicPartition, ListOffsetsResult.ListOffsetsResultInfo] = admin.listOffsets(pl.map(tp => tp -> OffsetSpec.latest()).toMap.asJava).all().get().asScala.toMap

// after truncation, the earliest offset should have advanced to the latest offset, since all records were deleted
e(new TopicPartition(topic, 0)).offset() must beGreaterThan(1L)
l(new TopicPartition(topic, 0)).offset() must beGreaterThan(1L)

}
} finally {
consumer.dispose()
producer.dispose()
}
}
}

"support multiple stores creating schemas on the same catalog topic" >> {
val (producer, consumer, sft) = createStorePair()
val sft2 = SimpleFeatureTypes.renameSft(sft, "consumer")