Skip to content

MINIFICPP-2542 ConsumeKafka late offset commit #1946

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

martinzink
Copy link
Member

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically main)?

  • Is your initial contribution a single, squashed commit?

For code changes:

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file?
  • If applicable, have you updated the NOTICE file?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.

@martinzink martinzink force-pushed the kafka_minifi_property branch 2 times, most recently from 01717ed to 6843d8c Compare March 24, 2025 12:34
@martinzink martinzink force-pushed the minifi-api-property branch from 4e6644f to 09156be Compare March 28, 2025 10:21
@martinzink martinzink force-pushed the kafka_minifi_property branch from ce25bb1 to 35010b9 Compare March 28, 2025 14:43
@martinzink martinzink force-pushed the minifi-api-property branch from 732ac0a to 02fbd23 Compare April 2, 2025 08:17
@martinzink martinzink force-pushed the kafka_minifi_property branch from 35010b9 to d85e66f Compare April 2, 2025 12:42
@martinzink martinzink force-pushed the minifi-api-property branch from 02fbd23 to 06ae832 Compare April 2, 2025 12:45
@martinzink martinzink marked this pull request as ready for review April 2, 2025 14:11
@martinzink martinzink force-pushed the kafka_minifi_property branch from d85e66f to f72322b Compare April 2, 2025 14:18
@adamdebreceni adamdebreceni requested a review from Copilot April 7, 2025 08:58
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 30 out of 37 changed files in this pull request and generated no comments.

Files not reviewed (7)
  • cmake/Spdlog.cmake: Language not supported
  • cmake/fmt.cmake: Language not supported
  • docker/requirements.txt: Language not supported
  • docker/test/integration/features/consume_kafka.feature: Language not supported
  • docker/test/integration/features/publish_kafka.feature: Language not supported
  • docker/test/integration/resources/kafka_broker/Dockerfile: Language not supported
  • docker/test/integration/resources/kafka_broker/conf/server.properties: Language not supported
Comments suppressed due to low confidence (1)

extensions/kafka/ConsumeKafka.h:263

  • The relationship name 'Commited' appears to be misspelled; consider renaming it to 'Committed' to improve clarity and consistency.
EXTENSIONAPI static constexpr auto Commited = core::RelationshipDefinition{"commited", "Only when using \"Commit from incoming flowfiles\" policy. Flowfiles that were used for commiting offsets are routed here."};

@martinzink martinzink force-pushed the kafka_minifi_property branch from f72322b to 11ba8f2 Compare April 7, 2025 14:01
@martinzink martinzink requested a review from Copilot April 8, 2025 12:29
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot reviewed 30 out of 37 changed files in this pull request and generated no comments.

Files not reviewed (7)
  • cmake/Spdlog.cmake: Language not supported
  • cmake/fmt.cmake: Language not supported
  • docker/requirements.txt: Language not supported
  • docker/test/integration/features/consume_kafka.feature: Language not supported
  • docker/test/integration/features/publish_kafka.feature: Language not supported
  • docker/test/integration/resources/kafka_broker/Dockerfile: Language not supported
  • docker/test/integration/resources/kafka_broker/conf/server.properties: Language not supported
Comments suppressed due to low confidence (2)

extensions/kafka/ConsumeKafka.h:273

  • Changing the InputRequirement from INPUT_FORBIDDEN to INPUT_ALLOWED may impact the processor's data flow behavior. Please verify that this change is intentional and that downstream logic can handle incoming flowfiles.
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED;

extensions/kafka/ConsumeKafka.h:274

  • Altering IsSingleThreaded from false to true is a significant concurrency change. Please ensure that the processor logic has been reviewed for thread-safety under single-threaded execution.
EXTENSIONAPI static constexpr bool IsSingleThreaded = true;

@martinzink martinzink force-pushed the kafka_minifi_property branch from 11ba8f2 to 459554d Compare April 9, 2025 11:37
@martinzink martinzink changed the base branch from minifi-api-property to main April 9, 2025 11:37
@martinzink martinzink added the priority Review these first label Apr 11, 2025
@martinzink martinzink force-pushed the kafka_minifi_property branch from 459554d to 92b35b5 Compare April 11, 2025 11:19
Copy link
Member

@szaszm szaszm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sending a batch early because I think the renaming of error categories is a mistake: it shows up in error messages, making them unnecessarily longer and probably even more confusing to users. (The error is not a "category", yet "category" is part of the error message.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're patching it anyway, we might as well put the message first and the integer error code in parens, similar to how it was before in SplitTextTests.cpp:195.

@@ -192,7 +192,7 @@ void runSplitTextTest(const std::string& input, const std::vector<ExpectedSplitT

TEST_CASE("Line Split Count property is required") {
SingleProcessorTestController controller{std::make_unique<processors::SplitText>("SplitText")};
REQUIRE_THROWS_WITH(controller.trigger("", {}), "Expected parsable uint64_t from SplitText::Line Split Count: property error: PropertyNotSet (2)");
REQUIRE_THROWS_WITH(controller.trigger("", {}), "Expected parsable uint64_t from \"SplitText::Line Split Count\", but got MiNiFi Property Error Category:2 (PropertyNotSet)");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The category words are unnecessary, and I'm not sure about including MiNiFi either: it's probably obvious from the context, and shorter errors are better when they convey the same meaning.

Comment on lines +264 to +267
EXTENSIONAPI static constexpr auto Committed = core::RelationshipDefinition{"committed",
"Only when using \"Commit from incoming flowfiles\" policy. Flowfiles that were used for commiting offsets are routed here."};
EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure",
"Only when using \"Commit from incoming flowfiles\" policy. Flowfiles that were malformed for commiting offsets are routed here."};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced that his is the best design for solving the problem of commit after flow processing. If we need two sets of output relationships based on a configuration enum, which makes the processor function fundamentally differently, then it would probably be better to separate the two.

After all maybe it would be better to separate consume and commit to two separate processors, and move the kafka client to a controller service. Consume step could add the offset to an output property, commit step could use that to commit the consumer up to that offset. Let's discuss this sometime, maybe I'm missing something else.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, wasnt sure about it either, we could of course split this into 2 processors, but then we would probably want to write the Consumer from scratch and not reuse this one because the structure and properties would be wastly different then ofcourse we should deprecate this one because we dont want to maintain two almost identitcal versions of it...


void ConsumeKafka::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}

void ConsumeKafka::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
void ConsumeKafka::onSchedule(core::ProcessContext &context, core::ProcessSessionFactory &) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree with moving the & and * modifiers from the type name to the identifier. They're part of the type.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
priority Review these first
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants