
Start partition producer in get metadata function #636


Merged: 3 commits merged into kafka4beam:master on Jul 26, 2025

Conversation

@wiserfz (Contributor) commented Jun 25, 2025

Closes #635

@wiserfz force-pushed the master branch 4 times, most recently from f42c4e3 to 8d26440 on June 26, 2025 06:36
@wiserfz (Contributor, Author) commented Jul 20, 2025

@zmstone Hi, could you take a look at this PR?

@zmstone (Contributor) commented Jul 21, 2025

@wiserfz

Thank you for the PR. I just got back from vacation last week and am slowly catching up on the missed activity. I'll take a closer look later this week.

@wiserfz closed this Jul 22, 2025
@wiserfz reopened this Jul 22, 2025
@wiserfz force-pushed the master branch 2 times, most recently from 0e1f3a6 to 8287fef on July 22, 2025 12:03
@@ -1,5 +1,8 @@
# Changelog

- 4.4.4
- Start producer supervisor processes for newly increased partitions in the `do_get_metadata` function.
zmstone (Contributor) commented:
If the goal is to automatically start producers for newly discovered partitions, maybe brod_client should start a timer to periodically refresh the partition count for each topic that has a producers_sup started.
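
For illustration, a minimal sketch of that timer-based approach (the interval, topics_with_producers/1 and refresh_topic_producers/2 are hypothetical names, not brod's actual code):

-define(REFRESH_INTERVAL_MS, 30000).  %% assumed refresh interval

schedule_partition_refresh() ->
  erlang:send_after(?REFRESH_INTERVAL_MS, self(), refresh_partition_counts).

%% sketch of a brod_client handle_info/2 clause
handle_info(refresh_partition_counts, State) ->
  %% for each topic with a running producers_sup, re-fetch metadata and
  %% start workers for any partitions added since the last refresh
  lists:foreach(fun(Topic) -> refresh_topic_producers(Topic, State) end,
                topics_with_producers(State)),
  schedule_partition_refresh(),
  {noreply, State}.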

@wiserfz (Contributor, Author) commented Jul 23, 2025:

At the beginning I did consider this: perhaps a process should automatically fetch the Kafka metadata and then start producers for the new partitions, and I noticed that PR #623 already implements this feature.

But two reasons led me to open this PR:

  1. The bug in brod_client:get_metadata/2: the function only updates the topic's partition count in the ETS table, but does not start the partition producers (see the sketch after this comment).
  2. In my situation, I fetch the Kafka metadata and calculate the partition index I need to produce the message to; with the Support Create Partitions #623 feature, there may be a lag during which the partition producer has not yet started when I need to produce.

Of course, the #623 feature is good, and I don't think it conflicts with this PR.

What do you think?
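
A rough sketch of the fix from point 1 (the ETS table layout and start_partition_producer/3 are assumed for illustration, not the actual patch): after updating the partition count in ETS, also start a worker for each newly added partition.

update_topic_partition_count(Topic, NewCount, EtsTab, ProducerSup) ->
  OldCount = case ets:lookup(EtsTab, Topic) of
               [{Topic, N}] -> N;
               [] -> 0
             end,
  true = ets:insert(EtsTab, {Topic, NewCount}),
  %% previously only the ETS update happened; the missing step is to
  %% start a producer worker for each partition added since last time
  lists:foreach(fun(P) -> ok = start_partition_producer(ProducerSup, Topic, P) end,
                lists:seq(OldCount, NewCount - 1)),
  ok.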

zmstone (Contributor) commented:

Yes, I think I can agree to this. But should we also stop producers if some partitions are deleted?

@wiserfz (Contributor, Author) commented:

> Yes, I think I can agree to this. But should we also stop producers if some partitions are deleted?

Kafka topic partitions can only be increased, never reduced, so producers don't need to be stopped.

zmstone (Contributor) commented:

Yes, I'm referring to the case when a topic is deleted and then recreated.

@zmstone (Contributor) left a review:

Please kindly add some tests.
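
For instance, a case along these lines (a sketch only: the topic name and the increase_partitions/2 admin helper are assumed; brod:start_producer/3, brod_client:get_metadata/2 and brod:get_producer/3 are the real entry points, and ?assertMatch comes from eunit's header):

t_producer_started_for_new_partition(Config) ->
  Client = ?config(client, Config),
  Topic = <<"brod-increase-partition-test">>,
  ok = brod:start_producer(Client, Topic, []),
  %% increase_partitions/2 stands in for whatever admin call the suite
  %% uses to grow the topic from 1 to 2 partitions
  ok = increase_partitions(Topic, 2),
  %% after the fix, fetching metadata should also start the producer
  %% for the newly added partition 1
  {ok, _Meta} = brod_client:get_metadata(Client, Topic),
  ?assertMatch({ok, P} when is_pid(P), brod:get_producer(Client, Topic, 1)).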

@@ -414,7 +415,15 @@ handle_call({get_transactional_coordinator, TransactionId}, _From, State) ->
{Result, NewState} = do_get_transactional_coordinator(State, TransactionId),
{reply, Result, NewState};
handle_call({start_producer, TopicName, ProducerConfig}, _From, State) ->
{Reply, NewState} = do_start_producer(TopicName, ProducerConfig, State),
%% NOTE: Store ProducerConfig into the config in state
zmstone (Contributor) commented:

Producer config is per topic, and it is already stored in the supervisor child spec. We can port get_childspec to brod_supervisor3:

https://github.com/erlang/otp/blob/ec6323dd5065203737b0a48d074f116f0c1a2719/lib/stdlib/src/supervisor.erl#L752-L759
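
For instance, the per-topic config could then be read back out of the child spec instead of being cached in the client state. A sketch, assuming the ported brod_supervisor3:get_childspec/2 mirrors OTP's supervisor:get_childspec/2 (returning the spec as a map) and that the producer config travels in the child's start args; the exact argument shape below is assumed:

producer_config(ProducerSup, Topic) ->
  case brod_supervisor3:get_childspec(ProducerSup, Topic) of
    {ok, ChildSpec} ->
      %% the config is recovered from the child's original start MFA
      {_M, _F, [_Client, Topic, Config]} = maps:get(start, ChildSpec),
      {ok, Config};
    {error, Reason} ->
      {error, Reason}
  end.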

@wiserfz (Contributor, Author) commented:

Done.


-spec started_producers(pid()) -> #{topic() => non_neg_integer()}.
started_producers(ProducerSup) ->
  Producers = brod_supervisor3:which_children(ProducerSup),
zmstone (Contributor) commented:

brod_supervisor3:which_children(ProducerSup) will give you the per-topic supervisors, which in turn supervise the per-partition workers.

I think you'll need to do it like this:

started_producers(ProducerSup) ->
  TopicSups = which_producers(ProducerSup),
  %% count the live per-partition workers under each per-topic supervisor
  lists:foldl(
    fun({Topic, Pid, _Type, _Mods}, Acc) when is_pid(Pid) ->
          PartitionWorkers = which_producers(Pid),
          case length(PartitionWorkers) of
            0 -> Acc;
            L -> Acc#{Topic => L}
          end;
       (_, Acc) ->
          Acc
    end, #{}, TopicSups).

and this helper:

which_producers(Sup) ->
  try
    brod_supervisor3:which_children(Sup)
  catch
    _:_ ->
     []
  end.

@wiserfz (Contributor, Author) commented:

Done.

@wiserfz force-pushed the master branch 2 times, most recently from e9cbc7a to 65981a2 on July 25, 2025 09:52
@wiserfz (Contributor, Author) commented Jul 25, 2025

> Please kindly add some tests.

Done.

@zmstone (Contributor) left a review:

This is good to merge; however, it would be nice to have a follow-up PR to address a decreased partition count.
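
For reference, a hedged sketch of what that follow-up could do (helper names assumed; terminate_child/2 is assumed to be available in brod_supervisor3 the way it is in OTP's supervisor): when fresh metadata reports fewer partitions than there are running workers, stop the extras.

maybe_stop_removed_producers(TopicSup, StartedCount, NewCount) when StartedCount > NewCount ->
  %% stop the workers for partitions that no longer exist, e.g. after
  %% a topic was deleted and recreated with fewer partitions
  lists:foreach(fun(Partition) ->
                    _ = brod_supervisor3:terminate_child(TopicSup, Partition)
                end, lists:seq(NewCount, StartedCount - 1));
maybe_stop_removed_producers(_TopicSup, _StartedCount, _NewCount) ->
  ok.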

@zmstone merged commit 7a2bc1c into kafka4beam:master on Jul 26, 2025
9 checks passed
@wiserfz (Contributor, Author) commented Jul 28, 2025

Thank you for merging this PR.

Would you cut a release tag for this feature?

@zmstone (Contributor) commented Jul 30, 2025

Released 4.4.5.

Linked issue: Produce message to increased partition failed (#635)