Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/user/kafka/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Parameter Type Description
until all existing messages are processed. However, feature listeners will still be invoked as
normal. See :ref:`kafka_initial_load`
``kafka.consumer.count`` Integer Number of kafka consumers used per feature type. Set to 0 to disable consuming (i.e. producer only)
``kafka.consumer.offset-commit-interval-ms`` Integer Number of milliseconds to pass before committing offsets for the consumer group.
``kafka.consumer.offset-commit-interval`` String How often to commit offsets for the consumer group, by default ``10 seconds``
``kafka.consumer.group-prefix`` String Prefix to use for kafka group ID, to more easily identify particular data stores
``kafka.consumer.start-on-demand`` Boolean The default behavior is to start consuming a topic only when that feature type is first requested.
This can reduce load if some layers are never queried. Note that care should be taken when setting
Expand All @@ -41,7 +41,7 @@ Parameter Type Description
``kafka.topic.replication`` Integer Replication factor to use in new kafka topics
``kafka.serialization.type`` String Internal serialization format to use for kafka messages. Must be one of ``kryo``, ``avro``
or ``avro-native``
``kafka.cache.expiry`` String Expire features from in-memory cache after this delay, e.g. "10 minutes". See :ref:`kafka_expiry`
``kafka.cache.expiry`` String Expire features from in-memory cache after this delay, e.g. ``10 minutes``. See :ref:`kafka_expiry`
``kafka.cache.expiry.dynamic`` String Expire features dynamically based on CQL predicates. See :ref:`kafka_expiry`
``kafka.cache.event-time`` String Instead of message time, determine expiry based on feature data. See :ref:`kafka_event_time`
``kafka.cache.event-time.ordering`` Boolean Instead of message time, determine feature ordering based on the feature event time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
package org.locationtech.geomesa.kafka.data

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerRecords}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener, ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.kafka.consumer.ThreadedConsumer
Expand All @@ -25,9 +26,11 @@ import org.locationtech.geomesa.utils.io.CloseWithLogging
import java.io.Closeable
import java.time.Duration
import java.util.Collections
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Future}
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

/**
* Reads from Kafka and populates a `KafkaFeatureCache`.
Expand Down Expand Up @@ -74,28 +77,30 @@ object KafkaCacheLoader extends LazyLogging {
override val cache: KafkaFeatureCache,
consumers: Seq[Consumer[Array[Byte], Array[Byte]]],
topic: String,
frequency: Long,
frequency: Duration,
offsetCommitInterval: FiniteDuration,
serializer: GeoMessageSerializer,
doInitialLoad: Boolean,
initialLoad: Option[scala.concurrent.duration.Duration],
initialLoadConfig: ExpiryTimeConfig,
offsetCommitIntervalMs: Long
) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), offsetCommitIntervalMs) with KafkaCacheLoader {
) extends ThreadedConsumer(consumers, frequency, offsetCommitInterval) with KafkaCacheLoader {

try { classOf[ConsumerRecord[Any, Any]].getMethod("timestamp") } catch {
case _: NoSuchMethodException => logger.warn("This version of Kafka doesn't support timestamps, using system time")
}

private val initialLoader =
if (doInitialLoad) {
// for the initial load, don't bother spatially indexing until we have the final state
Some(new InitialLoader(sft, consumers, topic, frequency, offsetCommitIntervalMs, serializer, initialLoadConfig, this))
} else {
None
}
private val initialLoader = initialLoad.map { readBack =>
// for the initial load, don't bother spatially indexing until we have the final state
new InitialLoader(sft, consumers, topic, frequency, offsetCommitInterval, serializer, readBack, initialLoadConfig, this)
}

def start(): Unit = {
initialLoader match {
case None => startConsumers()
case Some(loader) => loader.start()
case None =>
consumers.foreach(KafkaConsumerVersions.subscribe(_, topic))
startConsumers()
case Some(loader) =>
consumers.foreach(KafkaConsumerVersions.subscribe(_, topic, loader))
loader.start()
}
}

Expand All @@ -121,37 +126,91 @@ object KafkaCacheLoader extends LazyLogging {
}

/**
* Handles initial load 'from-beginning' without indexing features in the spatial index. Will still
* trigger message events.
*
* @param consumers consumers, won't be closed even on call to 'close()'
* @param topic kafka topic
* @param frequency polling frequency in milliseconds
* @param serializer message serializer
* @param toLoad main cache loader, used for callback when bulk loading is done
*/
* Handles initial load 'from-beginning' without indexing features in the spatial index. Will still
* trigger message events.
*
* @param sft simple feature type
* @param consumers consumers, won't be closed even on call to 'close()'
* @param topic kafka topic
* @param frequency polling frequency
* @param offsetCommitInterval how often to commit offsets
* @param serializer message serializer
* @param readBack initial load read back
* @param ordering feature ordering
* @param toLoad main cache loader, used for callback when bulk loading is done
*/
private class InitialLoader(
sft: SimpleFeatureType,
consumers: Seq[Consumer[Array[Byte], Array[Byte]]],
topic: String,
frequency: Long,
offsetCommitIntervalMs: Long,
frequency: Duration,
offsetCommitInterval: FiniteDuration,
serializer: GeoMessageSerializer,
readBack: scala.concurrent.duration.Duration,
ordering: ExpiryTimeConfig,
toLoad: KafkaCacheLoaderImpl
) extends ThreadedConsumer(consumers, Duration.ofMillis(frequency), offsetCommitIntervalMs, false) with Runnable {
) extends ThreadedConsumer(consumers, frequency, offsetCommitInterval, false) with Runnable with ConsumerRebalanceListener {

import scala.collection.JavaConverters._

private val cache = KafkaFeatureCache.nonIndexing(sft, ordering)

// track the offsets that we want to read to
private val offsets = new ConcurrentHashMap[Int, Long]()
private val done = new AtomicBoolean(false)
private val assignment = Collections.newSetFromMap(new ConcurrentHashMap[Int, java.lang.Boolean]())
@volatile
private var done: Boolean = false
private var latch: CountDownLatch = _
@volatile
private var submission: Future[_] = _

override def onPartitionsRevoked(topicPartitions: java.util.Collection[TopicPartition]): Unit = {}

override def onPartitionsAssigned(topicPartitions: java.util.Collection[TopicPartition]): Unit = {
logger.debug(s"Partitions assigned: ${topicPartitions.asScala.mkString(", ")}")
topicPartitions.asScala.foreach { tp =>
if (assignment.add(tp.partition())) {
val consumer = consumers.find(_.assignment().contains(tp)).orNull
if (consumer == null) {
logger.warn("Partition assigned but no consumer contains the assignment")
} else {
KafkaConsumerVersions.pause(consumer, tp)
try {
logger.debug(s"Checking offsets for [${tp.topic()}:${tp.partition()}]")
// the only reliable way we've found to check max offset is to seek to the end and check the position there
consumer.seekToEnd(Collections.singleton(tp))
val end = consumer.position(tp)
logger.debug(s"Setting max offset to [${tp.topic}:${tp.partition}:${end - 1}]")
offsets.put(tp.partition(), end - 1)
if (!readBack.isFinite) {
KafkaConsumerVersions.seekToBeginning(consumer, tp)
} else {
val offset = Try {
val time = System.currentTimeMillis() - readBack.toMillis
KafkaConsumerVersions.offsetsForTimes(consumer, tp.topic, Seq(tp.partition), time).get(tp.partition)
}
offset match {
case Success(Some(o)) =>
logger.debug(s"Seeking to offset $o for read-back $readBack on [${tp.topic}:${tp.partition}]")
consumer.seek(tp, o)

case Success(None) =>
logger.debug(s"No prior offset found for read-back $readBack on [${tp.topic}:${tp.partition}], " +
"reading from head of queue")

case Failure(e) =>
logger.warn(s"Error finding initial offset: [${tp.topic}:${tp.partition}], seeking to beginning", e)
KafkaConsumerVersions.seekToBeginning(consumer, tp)
}
}
} finally {
KafkaConsumerVersions.resume(consumer, tp)
}
}
}
}
}

override protected def createConsumerRunnable(
id: String,
consumer: Consumer[Array[Byte], Array[Byte]],
Expand All @@ -160,7 +219,7 @@ object KafkaCacheLoader extends LazyLogging {
}

override protected def consume(record: ConsumerRecord[Array[Byte], Array[Byte]]): Unit = {
if (done.get) { toLoad.consume(record) } else {
if (done) { toLoad.consume(record) } else {
val headers = RecordVersions.getHeaders(record)
val timestamp = RecordVersions.getTimestamp(record)
val message = serializer.deserialize(record.key, record.value, headers, timestamp)
Expand All @@ -181,27 +240,10 @@ object KafkaCacheLoader extends LazyLogging {
LoaderStatus.startLoad(this)
try {
val partitions = consumers.head.partitionsFor(topic).asScala.map(_.partition)
val doInitialLoad =
try {
// note: these methods are not available in kafka 0.9, which will cause it to fall back to normal loading
val beginningOffsets = KafkaConsumerVersions.beginningOffsets(consumers.head, topic, partitions.toSeq)
val endOffsets = KafkaConsumerVersions.endOffsets(consumers.head, topic, partitions.toSeq)
partitions.exists(p => beginningOffsets.getOrElse(p, 0L) < endOffsets.getOrElse(p, 0L))
} catch {
case e: NoSuchMethodException =>
logger.warn(s"Can't support initial bulk loading for current Kafka version: $e")
false
}
if (doInitialLoad) {
logger.info(s"Starting initial load for [$topic] with ${partitions.size} partitions")
latch = new CountDownLatch(partitions.size)
startConsumers() // kick off the asynchronous consumer threads
submission = CachedThreadPool.submit(this)
} else {
// don't bother spinning up the consumer threads if we don't need to actually bulk load anything
startNormalLoad()
LoaderStatus.completedLoad(this)
}
logger.info(s"Starting initial load for [$topic] with ${partitions.size} partitions")
latch = new CountDownLatch(partitions.size)
startConsumers() // kick off the asynchronous consumer threads
submission = CachedThreadPool.submit(this)
} catch {
case NonFatal(e) =>
LoaderStatus.completedLoad(this)
Expand All @@ -218,11 +260,11 @@ object KafkaCacheLoader extends LazyLogging {
}
// set a flag just in case the consumer threads haven't finished spinning down, so that we will
// pass any additional messages back to the main loader
done.set(true)
done = true
logger.info(s"Finished initial load, transferring to indexed cache for [$topic]")
cache.query(Filter.INCLUDE).foreach(toLoad.cache.put)
logger.info(s"Finished transfer for [$topic]")
startNormalLoad()
logger.info(s"Finished transfer for [$topic], starting normal load")
toLoad.startConsumers()
} finally {
LoaderStatus.completedLoad(this)
}
Expand All @@ -235,16 +277,9 @@ object KafkaCacheLoader extends LazyLogging {
}
}
}
// start the normal loading
private def startNormalLoad(): Unit = {
logger.info(s"Starting normal load for [$topic]")
toLoad.startConsumers()
}

/**
* Consumer runnable for the initial load. It's not possible to check assignments until after poll() has been called,
* and checking endOffsets is not reliable as a high water mark, due to transaction markers or other issues that
* may advance the endOffset past where the consumer can actually reach (unless new messages are written).
* Consumer runnable that tracks when we have completed the initial load
*
* @param id id
* @param consumer consumer
Expand All @@ -253,22 +288,7 @@ object KafkaCacheLoader extends LazyLogging {
private class InitialLoaderConsumerRunnable(id: String, consumer: Consumer[Array[Byte], Array[Byte]], handler: ConsumerErrorHandler)
extends ConsumerRunnable(id, consumer, handler) {

private var checkOffsets = true

override protected def processPoll(result: ConsumerRecords[Array[Byte], Array[Byte]]): Unit = {
if (checkOffsets) {
consumer.assignment().asScala.foreach { tp =>
// the only reliable way we've found to check max offset is to seek to the end and check the position there
val position = consumer.position(tp)
consumer.seekToEnd(Collections.singleton(tp))
val end = consumer.position(tp)
// return to the original position
consumer.seek(tp, position)
logger.debug(s"Setting max offset to [${tp.topic}:${tp.partition}:${end - 1}]")
offsets.put(tp.partition(), end - 1)
}
checkOffsets = false
}
try {
super.processPoll(result)
} finally {
Expand Down
Loading
Loading