Skip to content

Commit

Permalink
[KIP-460] ElectLeaders implementation public API changes and document…
Browse files Browse the repository at this point in the history
…ation improvement (#1833)

- ElectionType confluent_kafka as in Java
- dictionary value is a KafkaError as it's not thrown
- ElectionType should be added to documentation .rst
- docstring changes
- tests without brokers with successful validation that leads to a timeout
- example elects leaders for all partitions in case no partition is specified
  • Loading branch information
emasab authored Oct 10, 2024
1 parent 7085708 commit 979343a
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 74 deletions.
10 changes: 10 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Supporting classes
- :ref:`ConsumerGroupTopicPartitions <pythonclient_consumer_group_topic_partition>`
- :ref:`ConsumerGroupState <pythonclient_consumer_group_state>`
- :ref:`Uuid <pythonclient_uuid>`
- :ref:`ElectionType <pythonclient_election_type>`

- Errors:
- :ref:`KafkaError <pythonclient_kafkaerror>`
Expand Down Expand Up @@ -701,6 +702,15 @@ Uuid
.. autoclass:: confluent_kafka.Uuid
:members:

.. _pythonclient_election_type:

************
ElectionType
************

.. autoclass:: confluent_kafka.ElectionType
:members:

.. _serde_field:

************
Expand Down
17 changes: 12 additions & 5 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@

from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
TopicPartition, ConsumerGroupState, TopicCollection,
IsolationLevel)
IsolationLevel, ElectionType)
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
ConfigEntry, ConfigSource, AclBinding,
AclBindingFilter, ResourceType, ResourcePatternType,
AclOperation, AclPermissionType, AlterConfigOpType,
ScramMechanism, ScramCredentialInfo,
UserScramCredentialUpsertion, UserScramCredentialDeletion,
OffsetSpec, ElectionType)
OffsetSpec)
import sys
import threading
import logging
Expand Down Expand Up @@ -893,17 +893,24 @@ def example_elect_leaders(a, args):
for topic, partition in zip(args[1::2], args[2::2]):
partitions.append(TopicPartition(topic, int(partition)))

if len(partitions) == 0:
# When passing None as partitions, election is triggered for
# all partitions in the cluster
partitions = None

f = a.elect_leaders(election_type, partitions)
try:
results = f.result()
for partition, exception in results.items():
if exception is None:
print(f"Elect leaders call returned {len(results)} result(s):")
for partition, error in results.items():
if error is None:
print(f"Leader Election Successful for topic: '{partition.topic}'" +
f" partition: '{partition.partition}'")
else:
print(
"Leader Election Failed for topic: " +
f"'{partition.topic}' partition: '{partition.partition}': {exception}")
f"'{partition.topic}' partition: '{partition.partition}' " +
f"error code: {error.code()} error message: {error.str()}")
except KafkaException as e:
print(f"Error electing leaders: {e}")

Expand Down
3 changes: 2 additions & 1 deletion src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
ConsumerGroupState,
TopicCollection,
TopicPartitionInfo,
IsolationLevel)
IsolationLevel,
ElectionType)

from .cimpl import (Producer,
Consumer,
Expand Down
19 changes: 19 additions & 0 deletions src/confluent_kafka/_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,22 @@ def __lt__(self, other):
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value


class ElectionType(Enum):
"""
Enumerates the different types of leader elections.
Values:
-------
"""

#: Preferred election
PREFERRED = cimpl.ELECTION_TYPE_PREFERRED
#: Unclean election
UNCLEAN = cimpl.ELECTION_TYPE_UNCLEAN

def __lt__(self, other):
if self.__class__ != other.__class__:
return NotImplemented
return self.value < other.value
9 changes: 4 additions & 5 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,10 @@
from ._listoffsets import (OffsetSpec, # noqa: F401
ListOffsetsResultInfo)

from ._election import (ElectionType) # noqa: F401

from ._records import DeletedRecords # noqa: F401

from .._model import TopicCollection as _TopicCollection
from .._model import (TopicCollection as _TopicCollection,
ElectionType as _ElectionType)

from ..cimpl import (KafkaException, # noqa: F401
KafkaError,
Expand Down Expand Up @@ -552,7 +551,7 @@ def _check_delete_records(request):

@staticmethod
def _check_elect_leaders(election_type, partitions):
if not isinstance(election_type, ElectionType):
if not isinstance(election_type, _ElectionType):
raise TypeError("Expected 'election_type' to be of type 'ElectionType'")
if partitions is not None:
if not isinstance(partitions, list):
Expand Down Expand Up @@ -1280,7 +1279,7 @@ def delete_records(self, topic_partition_offsets, **kwargs):
def elect_leaders(self, election_type, partitions=None, **kwargs):
"""
Perform Preferred or Unclean leader election for
all the specified topic partitions.
all the specified partitions or all partitions in the cluster.
:param ElectionType election_type: The type of election to perform.
:param List[TopicPartition]|None partitions: The topic partitions to perform
Expand Down
29 changes: 0 additions & 29 deletions src/confluent_kafka/admin/_election.py

This file was deleted.

2 changes: 1 addition & 1 deletion src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -3152,7 +3152,7 @@ const char Admin_elect_leaders_doc[] = PyDoc_STR(
"future, [request_timeout, operation_timeout])\n"
"\n"
" Perform Preferred or Unclean election for the specified "
"Topic Partitions.\n"
"partion or all partition in the cluster.\n"
"\n"
" This method should not be used directly, use "
"confluent_kafka.AdminClient.elect_leaders()\n");
Expand Down
31 changes: 2 additions & 29 deletions src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -408,33 +408,6 @@ static void cfl_PyErr_Fatal (rd_kafka_resp_err_t err, const char *reason) {
PyErr_SetObject(KafkaException, eo);
}

/**
* @brief Creates a KafkaException from error code and error string.
*/
PyObject *KafkaException_new_or_none (rd_kafka_resp_err_t err, const char *str) {
if (err) {
PyObject *excargs , *exc;
PyObject *error = KafkaError_new0(err, str);

excargs = PyTuple_New(1);
PyTuple_SetItem(excargs, 0, error);

exc = ((PyTypeObject *)KafkaException)->tp_new(
(PyTypeObject *)KafkaException, NULL, NULL);
exc->ob_type->tp_init(exc, excargs, NULL);

Py_DECREF(excargs);
Py_DECREF(error);

return exc;
}
else
Py_RETURN_NONE;
}





/****************************************************************************
*
Expand Down Expand Up @@ -1403,8 +1376,8 @@ PyObject *c_topic_partition_result_to_py_dict(
rd_kafka_topic_partition_result_partition(partition_results[i]);
c_error = rd_kafka_topic_partition_result_error(partition_results[i]);

value = KafkaException_new_or_none(rd_kafka_error_code(c_error),
rd_kafka_error_string(c_error));
value = KafkaError_new_or_None(rd_kafka_error_code(c_error),
rd_kafka_error_string(c_error));
key = c_part_to_py(c_topic_partition);

PyDict_SetItem(result, key, value);
Expand Down
1 change: 0 additions & 1 deletion src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ extern PyObject *KafkaException;
PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...);
PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str);
PyObject *KafkaError_new_from_error_destroy (rd_kafka_error_t *error);
PyObject *KafkaException_new_or_none (rd_kafka_resp_err_t err, const char *str);

/**
* @brief Raise an exception using KafkaError.
Expand Down
9 changes: 6 additions & 3 deletions tests/test_Admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
ResourcePatternType, AclOperation, AclPermissionType, AlterConfigOpType, \
ScramCredentialInfo, ScramMechanism, \
UserScramCredentialAlteration, UserScramCredentialDeletion, \
UserScramCredentialUpsertion, OffsetSpec, \
ElectionType
UserScramCredentialUpsertion, OffsetSpec
from confluent_kafka import KafkaException, KafkaError, libversion, \
TopicPartition, ConsumerGroupTopicPartitions, ConsumerGroupState, \
IsolationLevel, TopicCollection
IsolationLevel, TopicCollection, ElectionType
import concurrent.futures


Expand Down Expand Up @@ -1230,3 +1229,7 @@ def test_elect_leaders():

with pytest.raises(ValueError):
a.elect_leaders(correct_election_type, [incorrect_partitions])

with pytest.raises(KafkaException):
a.elect_leaders(correct_election_type, [correct_partitions])\
.result(timeout=1)

0 comments on commit 979343a

Please sign in to comment.