@@ -276,7 +276,13 @@ class KafkaDataStore(

WithClose(AdminClient.create(props)) { admin =>
if (admin.listTopics().names().get.contains(topic)) {
admin.deleteTopics(Collections.singletonList(topic)).all().get
if (config.onSchemaDeleteTruncate) {
logger.info(s"truncating $topic")
KafkaTruncateTopic(admin).truncate(topic)
} else {
logger.info(s"deleting $topic")
admin.deleteTopics(Collections.singletonList(topic)).all().get
}
} else {
logger.warn(s"Topic [$topic] does not exist, can't delete it")
}
@@ -510,6 +516,7 @@ object KafkaDataStore extends LazyLogging {
consumers: ConsumerConfig,
producers: ProducerConfig,
clearOnStart: Boolean,
onSchemaDeleteTruncate: Boolean,
topics: TopicConfig,
indices: IndexConfig,
looseBBox: Boolean,
@@ -161,6 +161,7 @@ object KafkaDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogging {
KafkaDataStore.ProducerConfig(props)
}
val clearOnStart = ClearOnStart.lookup(params)
val onSchemaDeleteTruncate = OnSchemaDeleteTruncate.lookup(params)

val indices = {
val cqEngine = {
@@ -244,7 +245,7 @@
}
}

KafkaDataStoreConfig(catalog, brokers, zookeepers, consumers, producers, clearOnStart, topics,
KafkaDataStoreConfig(catalog, brokers, zookeepers, consumers, producers, clearOnStart, onSchemaDeleteTruncate, topics,
indices, looseBBox, layerViews, authProvider, audit, metrics, ns)
}

@@ -105,6 +105,14 @@ object KafkaDataStoreParams extends NamespaceParams {
readWrite = ReadWriteFlag.WriteOnly
)

val OnSchemaDeleteTruncate =
new GeoMesaParam[java.lang.Boolean](
"kafka.catalog.on-schema-delete-truncate",
"Do note delete and recreate a topic on a schema change, just truncate the topic by issuing delete to (latest) offset command",
default = Boolean.box(false),
readWrite = ReadWriteFlag.WriteOnly
)

val ConsumerReadBack =
new GeoMesaParam[Duration](
"kafka.consumer.read-back",
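For reference, a minimal sketch of enabling the new parameter when creating a store through GeoTools' DataStoreFinder; the broker and zookeeper addresses are illustrative placeholders, and only the truncate flag is introduced by this change:

import org.geotools.api.data.DataStoreFinder
import scala.collection.JavaConverters._

// hypothetical connection parameters for a local Kafka cluster
val params = Map(
  "kafka.brokers"                           -> "localhost:9092",
  "kafka.zookeepers"                        -> "localhost:2181",
  "kafka.catalog.on-schema-delete-truncate" -> "true"
).asJava

val ds = DataStoreFinder.getDataStore(params)
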
@@ -0,0 +1,216 @@
/***********************************************************************
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* https://www.apache.org/licenses/LICENSE-2.0
***********************************************************************/

package org.locationtech.geomesa.kafka.data

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.locationtech.geomesa.kafka.data.KafkaTruncateTopic.{CLEANUP_POLICY, CLEANUP_POLICY_DELETE}

import java.util
import java.util.Collections
import java.util.concurrent.ExecutionException
import scala.collection.JavaConverters._

class KafkaTruncateTopic(private val admin: Admin) extends LazyLogging {

/**
* Truncate a Kafka topic by deleting all records up to the latest offset on each partition.
*
* @param topic the topic name to truncate
*/
def truncate(topic: String): Unit = {

val topicPartitions = getTopicPartitions(topic)
val earliestOffsets = getEarliestOffsets(topicPartitions)
val latestOffsets = getLatestOffsets(topicPartitions)

val count = numberOfMessages(topic, earliestOffsets, latestOffsets)

if (count == 0) {
logger.debug(s"truncate: topic: $topic has no messages to delete")
return
}

logger.debug(s"truncate: topic: $topic has $count messages to be deleted over ${topicPartitions.size} partitions")

val deleteCleanupPolicy = hasDeleteCleanupPolicy(topic)

try {
if (!deleteCleanupPolicy) {
logger.debug(s"adding 'delete' cleanup policy to topic=$topic so it can be truncated.")
addDeleteCleanupPolicy(topic)
}
logger.debug(s"truncate: topic: $topic starting")
deleteRecords(latestOffsets)
logger.debug(s"truncate: topic: $topic completed")
} finally {
if (!deleteCleanupPolicy) {
logger.debug(s"removing 'delete' cleanup policy to topic=$topic.")
removeDeleteCleanupPolicy(topic)
}
}

logger.info(s"$topic truncated.")

}

/**
* Return the number of messages across all partitions for the given topic (e.g. a partition with earliest offset 3 and latest offset 10 contributes 7 messages).
*/
private def numberOfMessages(topic: String,
earliestOffsets: Map[TopicPartition, ListOffsetsResultInfo],
latestOffsets: Map[TopicPartition, ListOffsetsResultInfo]): Long =
getTopicPartitions(topic).map { tp =>
val earliestOffset = earliestOffsets(tp).offset()
val latestOffset = latestOffsets(tp).offset()
latestOffset - earliestOffset
}.sum

private def deleteRecords(latestOffsets: Map[TopicPartition, ListOffsetsResultInfo]): Unit =
try {
val recordsToDelete = generateRecordsToDelete(latestOffsets)
admin.deleteRecords(recordsToDelete.asJava).all().get()
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("the deleteRecords operation was interrupted, aborting; it may have still completed.")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* For the given offsets, generate the RecordToDelete objects; this is used with the latest offsets so the topic
* will be truncated.
*/
private def generateRecordsToDelete(latestOffsets: Map[TopicPartition, ListOffsetsResultInfo]): Map[TopicPartition, RecordsToDelete] =
latestOffsets.map {
case (tp, info) => tp -> RecordsToDelete.beforeOffset(info.offset())
}

/**
* For the given list of partitions, return the latest offsets.
*/
private def getLatestOffsets(partitions: List[TopicPartition]): Map[TopicPartition, ListOffsetsResultInfo] = {
val input = partitions.map(tp => tp -> OffsetSpec.latest()).toMap
getOffsets(input)
}

/**
* For the given list of partitions, return the earliest offsets.
*/
private def getEarliestOffsets(partitions: List[TopicPartition]): Map[TopicPartition, ListOffsetsResultInfo] = {
val input = partitions.map(tp => tp -> OffsetSpec.earliest()).toMap
getOffsets(input)
}

/**
* Used by getLatestOffsets() and getEarliestOffsets() to obtain the offsets.
*/
private def getOffsets(offsetSpecs: Map[TopicPartition, OffsetSpec]): Map[TopicPartition, ListOffsetsResultInfo] =
try {
admin.listOffsets(offsetSpecs.asJava).all().get().asScala.toMap
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("listOffsets operation interrupted, deleteRecords will not executed.")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* Get all TopicPartitions for the given topic; truncation is performed at the partition level.
*/
private def getTopicPartitions(topic: String): List[TopicPartition] =
try {
val topicInfo = admin.describeTopics(Collections.singleton(topic)).allTopicNames().get().get(topic)
topicInfo.partitions().asScala.map(info => new TopicPartition(topic, info.partition())).toList
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("describeTopics operation interrupted, deleteRecords will not executed.")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* Check if the topic has the 'delete' cleanup policy.
*/
private def hasDeleteCleanupPolicy(topicName: String): Boolean =
try {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName)
val configsResult = admin.describeConfigs(Collections.singleton(configResource))
val config = configsResult.all().get().get(configResource)

config.get(TopicConfig.CLEANUP_POLICY_CONFIG).value().contains(CLEANUP_POLICY_DELETE)
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("describeTopics operation interrupted, deleteRecords will not executed.")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* Add the 'delete' cleanup policy to the topic's 'cleanup.policy' config.
*/
private def addDeleteCleanupPolicy(topic: String): Unit =
try {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
val alterConfigOp = new AlterConfigOp(new ConfigEntry(CLEANUP_POLICY, CLEANUP_POLICY_DELETE), AlterConfigOp.OpType.APPEND)
val configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]] = Map(configResource -> alterConfigOpColl(alterConfigOp)).asJava
admin.incrementalAlterConfigs(configs).all().get()
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("incrementalAlterConfigs operation interrupted, ...")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* Remove the 'delete' cleanup policy from the topic's 'cleanup.policy' config.
*/
private def removeDeleteCleanupPolicy(topic: String): Unit =
try {
val configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic)
val alterConfigOp = new AlterConfigOp(new ConfigEntry(CLEANUP_POLICY, CLEANUP_POLICY_DELETE), AlterConfigOp.OpType.SUBTRACT)
val configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]] = Map(configResource -> alterConfigOpColl(alterConfigOp)).asJava
admin.incrementalAlterConfigs(configs).all().get()
} catch {
case _: InterruptedException =>
Thread.currentThread().interrupt()
throw new RuntimeException("incrementalAlterConfigs operation interrupted, was not able to remove the 'delete' cleanup.policy.")
case e: ExecutionException =>
throw convertExecutionException(e)
}

/**
* Singleton collection wrapper for AlterConfigOp.
*/
private def alterConfigOpColl(alterConfigOp: AlterConfigOp): util.Collection[AlterConfigOp] =
Collections.singleton(alterConfigOp)

/**
* Convert ExecutionException to RuntimeException, preserving the underlying cause.
*/
private def convertExecutionException(e: ExecutionException): RuntimeException = {
val cause = Option(e.getCause).getOrElse(e)
new RuntimeException(cause.getMessage, cause)
}
}

object KafkaTruncateTopic {
val CLEANUP_POLICY = "cleanup.policy"
val CLEANUP_POLICY_DELETE = "delete"

def apply(admin: Admin): KafkaTruncateTopic = new KafkaTruncateTopic(admin)
}
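
As a usage note, the helper can also be driven directly with an admin client, mirroring how KafkaDataStore invokes it in the first hunk above; the broker address and topic name below are placeholders:

import java.util.Collections
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.locationtech.geomesa.utils.io.WithClose

val props = Collections.singletonMap[String, AnyRef](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
WithClose(AdminClient.create(props)) { admin =>
  // deletes all records up to the latest offset on each partition of the topic
  KafkaTruncateTopic(admin).truncate("my-topic")
}
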
@@ -11,8 +11,12 @@ package org.locationtech.geomesa.kafka.data
import org.apache.commons.lang3.StringUtils
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, ListOffsetsResult, NewTopic, OffsetSpec, TopicDescription}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.serialization.StringSerializer
import org.geotools.api.data._
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
@@ -188,6 +192,86 @@ class KafkaDataStoreTest extends KafkaContainerTest with Mockito {
}
}

"allow schemas to be created and truncated" >> {

foreach(Seq(true, false)) { zk =>
TableBasedMetadata.Expiry.threadLocalValue.set("10ms")
val (producer, consumer, _) = try {
createStorePair(if (zk) {
Map(
"kafka.zookeepers" -> zookeepers,
"kafka.catalog.on-schema-delete-truncate" -> "true"
)
} else {
Map(
"kafka.catalog.on-schema-delete-truncate" -> "true"
)
})
} finally {
TableBasedMetadata.Expiry.threadLocalValue.remove()
}
consumer must not(beNull)
producer must not(beNull)

try {
val sft = SimpleFeatureTypes.createImmutableType("kafka", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326;geomesa.foo='bar'")
val topic = s"${producer.config.catalog}-${sft.getTypeName}".replaceAll("/", "-")

producer.createSchema(sft)

consumer.metadata.resetCache()
foreach(Seq(producer, consumer)) { ds =>
ds.getTypeNames.toSeq mustEqual Seq(sft.getTypeName)
val schema = ds.getSchema(sft.getTypeName)
schema must not(beNull)
schema mustEqual sft
schema.getUserData.get("geomesa.foo") mustEqual "bar"
schema.getUserData.get(KafkaDataStore.TopicKey) mustEqual topic
}

val props = Collections.singletonMap[String, AnyRef](AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
WithClose(AdminClient.create(props)) { admin =>
admin.listTopics().names().get.asScala must contain(topic)
}
val pprops = Map[String, Object](
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
).asJava
WithClose(new KafkaProducer[String, String](pprops)) { p =>
p.send(new ProducerRecord(topic, null, "dummy1"))
p.send(new ProducerRecord(topic, null, "dummy2"))
p.flush()
}

consumer.removeSchema(sft.getTypeName)
foreach(Seq(consumer, producer)) { ds =>
eventually(40, 100.millis)(ds.getTypeNames.toSeq must beEmpty)
ds.getSchema(sft.getTypeName) must beNull
}
WithClose(AdminClient.create(props)) { admin =>
// the topic is truncated rather than deleted, so it should remain
admin.listTopics().names().get.asScala must contain(topic)

val topicInfo: TopicDescription = admin.describeTopics(Collections.singleton(topic)).allTopicNames().get().get(topic)
val pl: Seq[TopicPartition] = topicInfo.partitions().asScala.map(info => new TopicPartition(topic, info.partition())).toList
val e: Map[TopicPartition, ListOffsetsResult.ListOffsetsResultInfo] = admin.listOffsets(pl.map(tp => tp -> OffsetSpec.earliest()).toMap.asJava).all().get().asScala.toMap
val l: Map[TopicPartition, ListOffsetsResult.ListOffsetsResultInfo] = admin.listOffsets(pl.map(tp => tp -> OffsetSpec.latest()).toMap.asJava).all().get().asScala.toMap

// after truncation the earliest offset equals the latest offset, since both records were deleted up to the latest offset
e(new TopicPartition(topic, 0)).offset() mustEqual 2
l(new TopicPartition(topic, 0)).offset() mustEqual 2

}
} finally {
consumer.dispose()
producer.dispose()
}
}
}

"support multiple stores creating schemas on the same catalog topic" >> {
val (producer, consumer, sft) = createStorePair()
val sft2 = SimpleFeatureTypes.renameSft(sft, "consumer")