Using KafkaActor Producer to push Messages #131

Open
joesan opened this issue Jan 19, 2018 · 7 comments
Comments

@joesan

joesan commented Jan 19, 2018

I can see some documentation on how to use the KafkaActor Consumer, but very little on how to use the KafkaActor Producer. I want to push a message, represented as a case class, into a topic. To see how it might work, I copied one of the tests and tried to run it:

"KafkaProducerActor" should "write a given batch to Kafka 1" in {
  case class MyDummy(s: String, i: Int)
  val topic = randomString
  val probe = TestProbe()
  val producer = system.actorOf(KafkaProducerActor.props(producerConf))
  val batch: Seq[ProducerRecord[String, MyDummy]] = Seq(
    KafkaProducerRecord(topic, MyDummy("foo", 1)),
    KafkaProducerRecord(topic, "key", MyDummy("value", 2)),
    KafkaProducerRecord(topic, MyDummy("bar", 3)))
  val message = ProducerRecords(batch, Some('response))

  probe.send(producer, message)

  probe.expectMsg('response)

  val results = consumeFromTopic(topic, 3, 10000)

  results.head shouldEqual ((None, "foo"))
  results(1) shouldEqual ((Some("key"), "value"))
  results(2) shouldEqual ((None, "bar"))
}

As you can see, I want to push MyDummy values into the Kafka topic, but I get the following compile errors:

Error:(60, 34) No TypeTag available for MyDummy
    val message = ProducerRecords(batch, Some('response))
Error:(60, 34) not enough arguments for method apply: (implicit evidence$13: reflect.runtime.universe.TypeTag[String], implicit evidence$14: reflect.runtime.universe.TypeTag[MyDummy])cakesolutions.kafka.akka.ProducerRecords[String,MyDummy] in object ProducerRecords.
Unspecified value parameter evidence$14.
    val message = ProducerRecords(batch, Some('response))

I'm just not sure how to get past the compiler. Any help?

@simonsouter
Contributor

Try moving the definition of the MyDummy case class above the test case definition, to avoid this problem.

@joesan
Author

joesan commented Jan 20, 2018

Cool! That got rid of the problem. But is there an explanation for why this solves it? Curious to know.

@simonsouter
Contributor

Not exactly sure. The TypeTag feature relies on reflection and implicits, which are prone to this kind of weirdness...
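
A likely explanation, not confirmed in this thread: in Scala 2 the compiler generally cannot materialize a `TypeTag` for a local class, meaning one declared inside a method or block such as a test body, while it can for a class declared at the top level or as a member of a class or object. The sketch below assumes `scala-reflect` is on the classpath; `describe` and `TypeTagDemo` are hypothetical names that only mimic the `[Key: TypeTag, Value: TypeTag]` context bounds on `ProducerRecords`.

```scala
import scala.reflect.runtime.universe._

// Declared at the top level, so the compiler can materialize TypeTag[MyDummy].
case class MyDummy(s: String, i: Int)

object TypeTagDemo {
  // Hypothetical helper with the same kind of context bounds as ProducerRecords.
  def describe[K: TypeTag, V: TypeTag]: String =
    s"${typeOf[K]} -> ${typeOf[V]}"

  def main(args: Array[String]): Unit = {
    // Compiles: both type tags are available.
    println(describe[String, MyDummy])

    // A class declared inside this method is a local class. Uncommenting the
    // two lines below is expected to fail with "No TypeTag available for Local".
    // case class Local(s: String)
    // println(describe[String, Local])
  }
}
```

This matches the fix suggested above: once `MyDummy` is declared outside the test body, it is no longer a local class.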

@joesan
Author

joesan commented Jan 22, 2018

How could I understand the TypeTaggedTrait? The code looks super simple and awesome but I can't reason out what this trait is actually doing! Could you please give me a short explanation?

@jpallari
Contributor

The TypeTaggedTrait adds the type tag derived from the extending class as a value. This preserves type information at runtime that would otherwise be lost due to type erasure. This is especially useful with Akka actors, where each message loses its type information. The problem with TypeTaggedTrait is that it doesn't deal well with inheritance: the cast operation matches on the exact type rather than being aware of the type hierarchy.

Here's some additional reading on the subject, if you're interested: https://www.cakesolutions.net/teamblogs/ways-to-pattern-match-generic-types-in-scala
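
To make the idea concrete, here is a minimal sketch of a trait that stores its own type tag and uses it for an exact-type cast. It is not the library's code: `Tagged`, `castTo`, `selfTypeTag` and `Batch` are hypothetical stand-ins, and the sketch assumes `scala-reflect` is available.

```scala
import scala.reflect.runtime.universe._

// Hypothetical, simplified stand-in for a type-tagged trait.
trait Tagged {
  // The full type of the concrete instance, kept as a runtime value.
  def selfTypeTag: TypeTag[_]

  // Succeeds only when the stored type is exactly T (checked with =:=),
  // so subtypes and supertypes of T do not match.
  def castTo[T: TypeTag]: Option[T] =
    if (selfTypeTag.tpe =:= typeOf[T]) Some(this.asInstanceOf[T]) else None
}

// Hypothetical message type; the context bounds let it capture K and V.
final case class Batch[K: TypeTag, V: TypeTag](items: Seq[(K, V)]) extends Tagged {
  val selfTypeTag: TypeTag[Batch[K, V]] = typeTag[Batch[K, V]]
}

object TaggedDemo {
  def main(args: Array[String]): Unit = {
    // Typed as Any, the way an actor sees an incoming message.
    val msg: Any = Batch[String, Int](Seq("a" -> 1))

    msg match {
      case t: Tagged =>
        println(t.castTo[Batch[String, Int]].isDefined)    // same type parameters
        println(t.castTo[Batch[String, String]].isDefined) // different value type
      case _ =>
        println("not a tagged message")
    }
  }
}
```

In this sketch the first cast should succeed and the second should not, even though both target types erase to the same runtime class. The `=:=` check is also what makes the cast exact rather than hierarchy-aware.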

@joesan
Author

joesan commented Jan 23, 2018

I do understand TypeTag in Scala and why it is needed. I wanted to understand what role the TypeTaggedTrait plays in the application. For example, ProducerRecords is defined like this:

final case class ProducerRecords[Key: TypeTag, Value: TypeTag](
  records: Iterable[ProducerRecord[Key, Value]],
  successResponse: Option[Any] = None,
  failureResponse: Option[Any] = None
) extends TypeTagged[ProducerRecords[Key, Value]]

How does each of the methods implemented in the TypeTaggedTrait relate to the ProducerRecords?

@jpallari
Contributor

It's there so that the actors that receive those records (in this case the producer actor) can pattern match only on ProducerRecords[MyKey, MyString] and not on any other ProducerRecords with different type parameters.

The pattern match for the producer actor is effectively a function Any => Option[ProducerRecords[MyKey, MyString]], where MyKey and MyString stand for any custom type parameters. If the input is of type TypeTaggedTrait, then we know that it has a cast method. We can use that cast method to attempt to cast the value to ProducerRecords[MyKey, MyString], which will work if and only if the value is of type ProducerRecords and has the MyKey and MyString type parameters. Otherwise it will return None, i.e. it's not a match. See TypeTaggedExtractor for the implementation details.
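
The `Any => Option[...]` function described above can be sketched as a Scala extractor. This is only an illustration under assumptions: `Castable`, `Records`, `Extractor` and `ExtractorDemo` are hypothetical names, not the library's `TypeTaggedTrait`, `ProducerRecords` or `TypeTaggedExtractor`, and `scala-reflect` is assumed to be on the classpath.

```scala
import scala.reflect.runtime.universe._

// Hypothetical stand-in for a trait that carries its own type tag.
trait Castable {
  def tag: TypeTag[_]
  def cast[T: TypeTag]: Option[T] =
    if (tag.tpe =:= typeOf[T]) Some(this.asInstanceOf[T]) else None
}

// Hypothetical stand-in for a record batch with key and value type parameters.
final case class Records[K: TypeTag, V: TypeTag](items: Seq[(K, V)]) extends Castable {
  val tag: TypeTag[Records[K, V]] = typeTag[Records[K, V]]
}

// Extractor implementing Any => Option[T]: a match only if the cast succeeds.
class Extractor[T: TypeTag] {
  def unapply(a: Any): Option[T] = a match {
    case c: Castable => c.cast[T]
    case _           => None
  }
}

object ExtractorDemo {
  // Capitalized so it can be used as a stable identifier in a pattern.
  private val StringIntRecords = new Extractor[Records[String, Int]]

  // Shaped like an actor's receive: the incoming message is just Any.
  def handle(msg: Any): String = msg match {
    case StringIntRecords(r) => s"accepted ${r.items.size} record(s)"
    case _                   => "ignored"
  }

  def main(args: Array[String]): Unit = {
    println(handle(Records[String, Int](Seq("a" -> 1))))
    println(handle(Records[String, String](Seq("a" -> "b"))))
    println(handle("not a batch"))
  }
}
```

Only the first message has the exact type parameters the extractor was built for, so only it should reach the first case; the other two fall through to the default.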

3 participants