ConsumerStrategy
ConsumerStrategy is a contract to create Kafka consumers in a Spark Streaming application. It allows the consumers to be custom-configured after they have been created.
Note: Kafka consumers read records from topic partitions in a Kafka cluster.
ConsumerStrategy[K, V] is an abstract class with two methods: executorKafkaParams and onStart.
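Method | DirectKafkaInputDStream Usage |
---|---|
executorKafkaParams | Used when a DirectKafkaInputDStream is created (to initialize its internal state) |
onStart | Used to create a Kafka consumer (in DirectKafkaInputDStream) |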
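To make the shape of the contract concrete, here is a minimal, dependency-free Scala sketch. All names in it (ToyConsumer, ConsumerStrategySketch, FixedPartitions) are hypothetical; the real executorKafkaParams returns a java.util.Map[String, Object], and the real onStart takes a java.util.Map[TopicPartition, java.lang.Long] and returns a Kafka Consumer[K, V]. Partitions are modeled here as (topic, partition) pairs.

```scala
// Hypothetical stand-in for a Kafka consumer; it only remembers how it was configured.
final case class ToyConsumer(
    params: Map[String, String],
    assigned: Set[(String, Int)] = Set.empty,
    positions: Map[(String, Int), Long] = Map.empty)

// Hypothetical, simplified mirror of the two-method ConsumerStrategy contract.
abstract class ConsumerStrategySketch {
  // Kafka parameters the executors use to create their own consumers.
  def executorKafkaParams: Map[String, String]

  // Creates (and post-configures) the driver's consumer, given the offsets to resume from.
  def onStart(currentOffsets: Map[(String, Int), Long]): ToyConsumer
}

// Example implementation: a fixed set of partitions, resuming from currentOffsets.
final class FixedPartitions(partitions: Set[(String, Int)], params: Map[String, String])
    extends ConsumerStrategySketch {
  def executorKafkaParams: Map[String, String] = params

  def onStart(currentOffsets: Map[(String, Int), Long]): ToyConsumer =
    ToyConsumer(params, assigned = partitions, positions = currentOffsets)
}
```

The split mirrors the table: executorKafkaParams only hands over configuration, while onStart is where the consumer is actually created and configured.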
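The following table lists the Kafka consumer strategies available in Spark 2.0.
Consumer Strategy | Description |
---|---|
Assign | Assigns a fixed collection of topic partitions |
Subscribe | Subscribes to a fixed collection of topics |
SubscribePattern | Subscribes to the topics whose names match a regular expression |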
You can access the predefined ConsumerStrategy implementations using the ConsumerStrategies factory object.
```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies

val topics = List("topic1")
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-notes",
  "auto.offset.reset" -> "earliest"
)
// Start partition 0 of topic1 at offset 2. The partition must belong to a
// subscribed topic: seeking on a partition the consumer does not own fails.
val offsets = Map(new TopicPartition("topic1", 0) -> 2L)
val subscribeStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
```
```scala
class Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
) extends ConsumerStrategy[K, V]
```
Assign returns the input kafkaParams directly from its executorKafkaParams method.
For onStart, Assign creates a KafkaConsumer (with kafkaParams) and explicitly assigns the partitions in topicPartitions to it (using Kafka’s KafkaConsumer.assign method). It then sets the fetch offsets that the consumer will use on the next poll (using Kafka’s KafkaConsumer.seek method) to onStart's input currentOffsets if that map is non-empty, or to offsets otherwise.
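The order of calls Assign makes in onStart can be sketched without a Kafka dependency. AssignRecorder and AssignSketch are hypothetical names: the recorder stands in for a KafkaConsumer and simply logs the assign and seek calls it receives, with partitions modeled as (topic, partition) pairs.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical fake consumer that records the calls made on it, as strings.
final class AssignRecorder {
  val calls = ArrayBuffer.empty[String]
  def assign(partitions: Seq[(String, Int)]): Unit =
    calls += s"assign(${partitions.mkString(",")})"
  def seek(partition: (String, Int), offset: Long): Unit =
    calls += s"seek($partition,$offset)"
}

object AssignSketch {
  // Assign the partitions, then seek to currentOffsets when non-empty,
  // otherwise to the user-supplied offsets.
  def onStart(
      topicPartitions: Seq[(String, Int)],
      offsets: Map[(String, Int), Long],
      currentOffsets: Map[(String, Int), Long]): AssignRecorder = {
    val consumer = new AssignRecorder
    consumer.assign(topicPartitions)
    val toSeek = if (currentOffsets.isEmpty) offsets else currentOffsets
    // No poll is needed before seeking: with assign, the partitions are known immediately.
    toSeek.foreach { case (partition, offset) => consumer.seek(partition, offset) }
    consumer
  }
}
```

On a fresh start (empty currentOffsets) the user-supplied offsets win; on a restart, currentOffsets takes precedence.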
```scala
class Subscribe[K, V](
  topics: java.util.Collection[String],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
) extends ConsumerStrategy[K, V]
```
Subscribe returns the input kafkaParams directly from its executorKafkaParams method.
For onStart, Subscribe creates a KafkaConsumer (with kafkaParams) and subscribes to topics (using Kafka’s KafkaConsumer.subscribe method). If currentOffsets or offsets is non-empty (checked in that order), onStart polls for data (using Kafka’s KafkaConsumer.poll method). It then sets the fetch offsets that the consumer will use on the next poll (using Kafka’s KafkaConsumer.seek method) to currentOffsets if that map is non-empty, or to offsets otherwise.
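The extra poll is the main difference from Assign. With subscribe, Kafka assigns partitions lazily during poll, and seeking on a partition that is not yet assigned fails, so the poll has to come first. A dependency-free sketch of that ordering follows; SubscribeRecorder and SubscribeSketch are hypothetical names, and the recorder only logs calls instead of talking to a cluster.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical fake consumer that records the calls made on it, as strings.
final class SubscribeRecorder {
  val calls = ArrayBuffer.empty[String]
  def subscribe(topics: Seq[String]): Unit = calls += s"subscribe(${topics.mkString(",")})"
  def poll(): Unit = calls += "poll"
  def seek(partition: (String, Int), offset: Long): Unit = calls += s"seek($partition,$offset)"
}

object SubscribeSketch {
  def onStart(
      topics: Seq[String],
      offsets: Map[(String, Int), Long],
      currentOffsets: Map[(String, Int), Long]): SubscribeRecorder = {
    val consumer = new SubscribeRecorder
    consumer.subscribe(topics)
    val toSeek = if (currentOffsets.isEmpty) offsets else currentOffsets
    if (toSeek.nonEmpty) {
      // Poll first so that partitions get assigned, then override the fetch offsets.
      consumer.poll()
      toSeek.foreach { case (partition, offset) => consumer.seek(partition, offset) }
    }
    consumer
  }
}
```

When there is nothing to seek to, onStart only subscribes and leaves the initial position to auto.offset.reset.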
Tip: You can suppress Kafka’s NoOffsetForPartitionException (which the poll in onStart can throw) by setting Kafka’s auto.offset.reset to none in kafkaParams.
When Kafka throws NoOffsetForPartitionException and suppression is enabled, you should see the following WARN message in the logs:
WARN Catching NoOffsetForPartitionException since auto.offset.reset is none. See KAFKA-3370
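The suppression rule can be sketched as a guarded catch around the initial poll. This is a minimal model under stated assumptions: NoOffsetForPartition is a hypothetical stand-in for Kafka’s exception, the poll is passed in as a function, and the check is assumed to compare auto.offset.reset with none case-insensitively. The returned string mirrors the WARN message shown above.

```scala
// Hypothetical stand-in for Kafka's NoOffsetForPartitionException.
final class NoOffsetForPartition(msg: String) extends RuntimeException(msg)

object SuppressionSketch {
  // Suppression applies only when auto.offset.reset is "none".
  def shouldSuppress(kafkaParams: Map[String, String]): Boolean =
    kafkaParams.get("auto.offset.reset").exists(_.equalsIgnoreCase("none"))

  // Runs the initial poll. The "no offset" failure is swallowed only when suppression
  // applies, in which case the WARN line that would be logged is returned.
  def initialPoll(kafkaParams: Map[String, String])(poll: () => Unit): Option[String] =
    try { poll(); None }
    catch {
      case _: NoOffsetForPartition if shouldSuppress(kafkaParams) =>
        Some("Catching NoOffsetForPartitionException since auto.offset.reset is none. See KAFKA-3370")
    }
}
```

With any other auto.offset.reset value the guard fails and the exception propagates to the caller unchanged.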
Tip: Read through KAFKA-3370: Add options to auto.offset.reset to reset offsets upon initialization only.
```scala
class SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
) extends ConsumerStrategy[K, V]
```
SubscribePattern returns the input kafkaParams directly from its executorKafkaParams method.
For onStart, SubscribePattern creates a KafkaConsumer (with kafkaParams) and subscribes to the topics whose names match pattern, with Kafka’s internal NoOpConsumerRebalanceListener (using Kafka’s KafkaConsumer.subscribe method).
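With a pattern subscription, the set of topics is not fixed up front: Kafka checks the topic names it knows about against the regular expression. The sketch below shows which topic names a pattern would pick up. It assumes whole-name matching via Matcher.matches(), and PatternSketch and the topic list are hypothetical.

```scala
import java.util.regex.Pattern

object PatternSketch {
  // Topics from a (hypothetical) cluster listing that a pattern subscription would
  // cover, assuming the whole topic name has to match the pattern.
  def matchingTopics(pattern: Pattern, clusterTopics: Seq[String]): Seq[String] =
    clusterTopics.filter(topic => pattern.matcher(topic).matches())
}
```

For example, Pattern.compile("topic[0-9]+") selects topic1 and topic22 from Seq("topic1", "topic22", "other", "topicX"). With the real factory object, the strategy itself would be created with ConsumerStrategies.SubscribePattern[String, String](pattern, kafkaParams).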
Note: The only difference between the SubscribePattern and Subscribe consumer strategies is the use of Kafka’s KafkaConsumer.subscribe(Pattern, ConsumerRebalanceListener) and KafkaConsumer.subscribe(Collection) methods, respectively.