From 7e04f4e3c2249bb4a3e73877719c1f50cb387d2f Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Thu, 14 Apr 2016 10:19:29 +0200 Subject: [PATCH] Initial version --- .dir-locals.el | 1 + .gitignore | 13 + Consumer.c | 658 +++++++++++++++++ LICENSE | 202 ++++++ MANIFEST.in | 3 + Makefile | 14 + Producer.c | 516 ++++++++++++++ README.md | 59 ++ confluent_kafka.c | 1283 ++++++++++++++++++++++++++++++++++ confluent_kafka.h | 205 ++++++ docs/Makefile | 177 +++++ docs/conf.py | 262 +++++++ docs/index.rst | 44 ++ docs/make-doc.bat | 242 +++++++ examples/consumer.py | 75 ++ examples/producer.py | 70 ++ integration_test.py | 368 ++++++++++ setup.py | 19 + tests/README.md | 7 + tests/test_Consumer.py | 57 ++ tests/test_Producer.py | 34 + tests/test_TopicPartition.py | 38 + tests/test_docs.py | 28 + tests/test_enums.py | 9 + tests/test_misc.py | 16 + 25 files changed, 4400 insertions(+) create mode 100644 .dir-locals.el create mode 100644 .gitignore create mode 100644 Consumer.c create mode 100644 LICENSE create mode 100644 MANIFEST.in create mode 100644 Makefile create mode 100644 Producer.c create mode 100644 README.md create mode 100644 confluent_kafka.c create mode 100644 confluent_kafka.h create mode 100644 docs/Makefile create mode 100644 docs/conf.py create mode 100644 docs/index.rst create mode 100644 docs/make-doc.bat create mode 100755 examples/consumer.py create mode 100755 examples/producer.py create mode 100755 integration_test.py create mode 100644 setup.py create mode 100644 tests/README.md create mode 100644 tests/test_Consumer.py create mode 100644 tests/test_Producer.py create mode 100644 tests/test_TopicPartition.py create mode 100644 tests/test_docs.py create mode 100644 tests/test_enums.py create mode 100644 tests/test_misc.py diff --git a/.dir-locals.el b/.dir-locals.el new file mode 100644 index 000000000..49027dab5 --- /dev/null +++ b/.dir-locals.el @@ -0,0 +1 @@ +( (c-mode . ((c-file-style . "linux"))) ) diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..f91793b9e --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +build +_build +dist +*~ +\#* +MANIFEST +core +vgcore* +__pycache__ +*.so +confluent?kafka.egg-info +*.pyc +.cache diff --git a/Consumer.c b/Consumer.c new file mode 100644 index 000000000..4bd58a848 --- /dev/null +++ b/Consumer.c @@ -0,0 +1,658 @@ +/** + * Copyright 2016 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "confluent_kafka.h" + + +/**************************************************************************** + * + * + * Consumer + * + * + * + * + ****************************************************************************/ + + +static int Consumer_clear (Consumer *self) { + if (self->on_assign) { + Py_DECREF(self->on_assign); + self->on_assign = NULL; + } + if (self->on_revoke) { + Py_DECREF(self->on_revoke); + self->on_revoke = NULL; + } + return 0; +} + +static void Consumer_dealloc (Consumer *self) { + PyObject_GC_UnTrack(self); + + Consumer_clear(self); + + if (self->rk) + rd_kafka_destroy(self->rk); + + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static int Consumer_traverse (Consumer *self, + visitproc visit, void *arg) { + if (self->on_assign) + Py_VISIT(self->on_assign); + if (self->on_revoke) + Py_VISIT(self->on_revoke); + return 0; +} + + + + + + +static PyObject *Consumer_subscribe (Consumer *self, PyObject *args, + PyObject *kwargs) { + + rd_kafka_topic_partition_list_t *topics; + static char *kws[] = { "topics", "on_assign", "on_revoke", NULL }; + PyObject *tlist, *on_assign = NULL, *on_revoke = NULL; + Py_ssize_t pos = 0; + rd_kafka_resp_err_t err; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OO", kws, + &tlist, &on_assign, &on_revoke)) + return NULL; + + if (!PyList_Check(tlist)) { + PyErr_Format(PyExc_TypeError, + "expected list of topic unicode strings"); + return NULL; + } + + if (on_assign && !PyCallable_Check(on_assign)) { + PyErr_Format(PyExc_TypeError, + "on_assign expects a callable"); + return NULL; + } + + if (on_revoke && !PyCallable_Check(on_revoke)) { + PyErr_Format(PyExc_TypeError, + "on_revoke expects a callable"); + return NULL; + } + + topics = rd_kafka_topic_partition_list_new(PyList_Size(tlist)); + for (pos = 0 ; pos < PyList_Size(tlist) ; pos++) { + PyObject *o = PyList_GetItem(tlist, pos); + PyObject *uo; + if (!(uo = cfl_PyObject_Unistr(o))) { + PyErr_Format(PyExc_TypeError, + "expected list of unicode strings"); + rd_kafka_topic_partition_list_destroy(topics); + return NULL; + } + rd_kafka_topic_partition_list_add(topics, + cfl_PyUnistr_AsUTF8(uo), + RD_KAFKA_PARTITION_UA); + Py_DECREF(uo); + } + + err = rd_kafka_subscribe(self->rk, topics); + + rd_kafka_topic_partition_list_destroy(topics); + + if (err) { + cfl_PyErr_Format(err, + "Failed to set subscription: %s", + rd_kafka_err2str(err)); + return NULL; + } + + /* + * Update rebalance callbacks + */ + if (self->on_assign) { + Py_DECREF(self->on_assign); + self->on_assign = NULL; + } + if (on_assign) { + self->on_assign = on_assign; + Py_INCREF(self->on_assign); + } + + if (self->on_revoke) { + Py_DECREF(self->on_revoke); + self->on_revoke = NULL; + } + if (on_revoke) { + self->on_revoke = on_revoke; + Py_INCREF(self->on_revoke); + } + + Py_RETURN_NONE; +} + + +static PyObject *Consumer_unsubscribe (Consumer *self, + PyObject *ignore) { + + rd_kafka_resp_err_t err; + + err = rd_kafka_unsubscribe(self->rk); + if (err) { + cfl_PyErr_Format(err, + "Failed to remove subscription: %s", + rd_kafka_err2str(err)); + return NULL; + } + + Py_RETURN_NONE; +} + + +static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) { + + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_resp_err_t err; + + if (!(c_parts = py_to_c_parts(tlist))) + return NULL; + + self->rebalance_assigned++; + + err = rd_kafka_assign(self->rk, c_parts); + + rd_kafka_topic_partition_list_destroy(c_parts); + + if (err) { + cfl_PyErr_Format(err, + "Failed to set assignemnt: %s", + rd_kafka_err2str(err)); + 
return NULL; + } + + Py_RETURN_NONE; +} + + +static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) { + + rd_kafka_resp_err_t err; + + self->rebalance_assigned++; + + err = rd_kafka_assign(self->rk, NULL); + if (err) { + cfl_PyErr_Format(err, + "Failed to remove assignment: %s", + rd_kafka_err2str(err)); + return NULL; + } + + Py_RETURN_NONE; +} + + + +static PyObject *Consumer_commit (Consumer *self, PyObject *args, + PyObject *kwargs) { + + rd_kafka_resp_err_t err; + PyObject *msg = NULL, *offsets = NULL, *async_o = NULL; + rd_kafka_topic_partition_list_t *c_offsets; + int async = 1; + static char *kws[] = { "message", "offsets", "async",NULL }; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOO", kws, + &msg, &offsets, &async_o)) + return NULL; + + if (msg && offsets) { + PyErr_SetString(PyExc_ValueError, + "message and offsets are mutually exclusive"); + return NULL; + } + + if (async_o) + async = PyObject_IsTrue(async_o); + + + if (offsets) { + + if (!(c_offsets = py_to_c_parts(offsets))) + return NULL; + } else if (msg) { + Message *m; + + if (PyObject_Type((PyObject *)msg) != + (PyObject *)&MessageType) { + PyErr_Format(PyExc_TypeError, + "expected %s", MessageType.tp_name); + return NULL; + } + + m = (Message *)msg; + + c_offsets = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add( + c_offsets, cfl_PyUnistr_AsUTF8(m->topic), + m->partition)->offset =m->offset + 1; + + } else { + c_offsets = NULL; + err = rd_kafka_commit(self->rk, NULL, async); + } + + + err = rd_kafka_commit(self->rk, c_offsets, async); + + if (c_offsets) + rd_kafka_topic_partition_list_destroy(c_offsets); + + + + if (err) { + cfl_PyErr_Format(err, + "Commit failed: %s", rd_kafka_err2str(err)); + return NULL; + } + + Py_RETURN_NONE; +} + + + +static PyObject *Consumer_committed (Consumer *self, PyObject *args, + PyObject *kwargs) { + + PyObject *plist; + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_resp_err_t err; + double tmout = -1.0f; + static char *kws[] = { "partitions", "timeout", NULL }; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kws, + &plist, &tmout)) + return NULL; + + + if (!(c_parts = py_to_c_parts(plist))) + return NULL; + + err = rd_kafka_committed(self->rk, c_parts, + tmout >= 0 ? 
(int)(tmout * 1000.0f) : -1); + + if (err) { + rd_kafka_topic_partition_list_destroy(c_parts); + cfl_PyErr_Format(err, + "Failed to get committed offsets: %s", + rd_kafka_err2str(err)); + return NULL; + } + + + plist = c_parts_to_py(c_parts); + rd_kafka_topic_partition_list_destroy(c_parts); + + return plist; +} + + +static PyObject *Consumer_position (Consumer *self, PyObject *args, + PyObject *kwargs) { + + PyObject *plist; + rd_kafka_topic_partition_list_t *c_parts; + rd_kafka_resp_err_t err; + static char *kws[] = { "partitions", NULL }; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", kws, + &plist)) + return NULL; + + + if (!(c_parts = py_to_c_parts(plist))) + return NULL; + + err = rd_kafka_position(self->rk, c_parts); + + if (err) { + rd_kafka_topic_partition_list_destroy(c_parts); + cfl_PyErr_Format(err, + "Failed to get position: %s", + rd_kafka_err2str(err)); + return NULL; + } + + + plist = c_parts_to_py(c_parts); + rd_kafka_topic_partition_list_destroy(c_parts); + + return plist; +} + + + +static PyObject *Consumer_poll (Consumer *self, PyObject *args, + PyObject *kwargs) { + double tmout = -1.0f; + static char *kws[] = { "timeout", NULL }; + rd_kafka_message_t *rkm; + PyObject *msgobj; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout)) + return NULL; + + self->callback_crashed = 0; + self->thread_state = PyEval_SaveThread(); + + rkm = rd_kafka_consumer_poll(self->rk, tmout >= 0 ? + (int)(tmout * 1000.0f) : -1); + + PyEval_RestoreThread(self->thread_state); + self->thread_state = NULL; + + if (self->callback_crashed) + return NULL; + + if (!rkm) + Py_RETURN_NONE; + + msgobj = Message_new0(rkm); + rd_kafka_message_destroy(rkm); + + return msgobj; +} + + +static PyObject *Consumer_close (Consumer *self, PyObject *ignore) { + self->thread_state = PyEval_SaveThread(); + rd_kafka_consumer_close(self->rk); + PyEval_RestoreThread(self->thread_state); + Py_RETURN_NONE; +} + + +static PyMethodDef Consumer_methods[] = { + { "subscribe", (PyCFunction)Consumer_subscribe, + METH_VARARGS|METH_KEYWORDS, + ".. py:function:: subscribe(topics, [listener=None])\n" + "\n" + " Set subscription to supplied list of topics\n" + " This replaces a previous subscription.\n" + "\n" + " :param list(str) topics: List of topics (strings) to subscribe to.\n" + " :param callable on_assign: callback to provide handling of " + "customized offsets on completion of a successful partition " + "re-assignment.\n" + " :param callable on_revoke: callback to provide handling of " + "offset commits to a customized store on the start of a " + "rebalance operation.\n" + "\n" + " :raises KafkaException:\n" + "\n" + "\n" + ".. py:function:: on_assign(consumer, partitions)\n" + ".. py:function:: on_revoke(consumer, partitions)\n" + "\n" + " :param Consumer consumer: Consumer instance.\n" + " :param list(TopicPartition) partitions: Absolute list of partitions being assigned or revoked.\n" + "\n" + }, + { "unsubscribe", (PyCFunction)Consumer_unsubscribe, METH_NOARGS, + " Remove current subscription.\n" + "\n" + }, + { "poll", (PyCFunction)Consumer_poll, + METH_VARARGS|METH_KEYWORDS, + ".. py:function:: poll([timeout=None])\n" + "\n" + " Consume messages, calls callbacks and returns events.\n" + "\n" + " The application must check the returned :py:class:`Message` " + "object's :py:func:`Message.error()` method to distinguish " + "between proper messages (error() returns None), or an event or " + "error (see error().code() for specifics).\n" + "\n" + " .. 
note: Callbacks may be called from this method, " + "such as ``on_assign``, ``on_revoke``, et.al.\n" + "\n" + " :param float timeout: Maximum time to block waiting for message, event or callback.\n" + " :returns: A Message object or None on timeout\n" + " :rtype: :py:class:`Message` or None\n" + "\n" + }, + { "assign", (PyCFunction)Consumer_assign, METH_O, + ".. py:function:: assign(partitions)\n" + "\n" + " Set consumer partition assignment to the provided list of " + ":py:class:`TopicPartition` and starts consuming.\n" + "\n" + " :param list(TopicPartition) partitions: List of topic+partitions and optionally initial offsets to start consuming.\n" + "\n" + }, + { "unassign", (PyCFunction)Consumer_unassign, METH_NOARGS, + " Removes the current partition assignment and stops consuming.\n" + "\n" + }, + { "commit", (PyCFunction)Consumer_commit, METH_VARARGS|METH_KEYWORDS, + ".. py:function:: commit([message=None], [offsets=None], [async=True])\n" + "\n" + " Commit a message or a list of offsets.\n" + "\n" + " ``message`` and ``offsets`` are mutually exclusive, if neither is set " + "the current partition assignment's offsets are used instead.\n" + "\n" + " :param confluent_kafka.Message message: Commit message's offset+1.\n" + " :param list(TopicPartition) offsets: List of topic+partitions+offsets to commit.\n" + " :param bool async: Asynchronous commit, return immediately.\n" + " :rtype: None\n" + " :raises: KafkaException\n" + "\n" + }, + { "committed", (PyCFunction)Consumer_committed, + METH_VARARGS|METH_KEYWORDS, + ".. py:function:: committed(partitions, [timeout=None])\n" + "\n" + " Retrieve committed offsets for the list of partitions.\n" + "\n" + " :param list(TopicPartition) partitions: List of topic+partitions " + "to query for stored offsets.\n" + " :param float timeout: Request timeout\n" + " :returns: List of topic+partitions with offset and possibly error set.\n" + " :rtype: list(TopicPartition)\n" + " :raises: KafkaException\n" + "\n" + }, + { "position", (PyCFunction)Consumer_position, + METH_VARARGS|METH_KEYWORDS, + ".. py:function:: position(partitions, [timeout=None])\n" + "\n" + " Retrieve current positions (offsets) for the list of partitions.\n" + "\n" + " :param list(TopicPartition) partitions: List of topic+partitions " + "to return current offsets for. The current offset is the offset of the " + "last consumed message + 1.\n" + " :returns: List of topic+partitions with offset and possibly error set.\n" + " :rtype: list(TopicPartition)\n" + " :raises: KafkaException\n" + "\n" + }, + { "close", (PyCFunction)Consumer_close, METH_NOARGS, + "\n" + " Close down and terminate the Kafka Consumer.\n" + "\n" + " Actions performed:\n" + "\n" + " - Stops consuming\n" + " - Commits offsets\n" + " - Leave consumer group\n" + "\n" + " .. 
note: Registered callbacks may be called from this method, " + "see :py:func::`poll()` for more info.\n" + "\n" + " :rtype: None\n" + "\n" + }, + { NULL } +}; + + +static PyObject *Consumer_new (PyTypeObject *type, PyObject *args, + PyObject *kwargs); + +PyTypeObject ConsumerType = { + PyVarObject_HEAD_INIT(NULL, 0) + "confluent_kafka.Consumer", /*tp_name*/ + sizeof(Consumer), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)Consumer_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + "High-level Kafka Consumer\n" + "\n" + ".. py:function:: Consumer(**kwargs)\n" + "\n" + " Create new Consumer instance using provided configuration dict.\n" + "\n" + "\n", /*tp_doc*/ + (traverseproc)Consumer_traverse, /* tp_traverse */ + (inquiry)Consumer_clear, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + Consumer_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + Consumer_new /* tp_new */ +}; + + + +static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *c_parts, + void *opaque) { + Consumer *self = opaque; + + PyEval_RestoreThread(self->thread_state); + + self->rebalance_assigned = 0; + + if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && self->on_assign) || + (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS && self->on_revoke)) { + PyObject *parts; + PyObject *args, *result; + + /* Construct list of TopicPartition based on 'c_parts' */ + parts = c_parts_to_py(c_parts); + + args = Py_BuildValue("(OO)", self, parts); + + Py_DECREF(parts); + + if (!args) { + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL, + "Unable to build callback args"); + self->thread_state = PyEval_SaveThread(); + return; + } + + result = PyObject_CallObject( + err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ? + self->on_assign : self->on_revoke, args); + + Py_DECREF(args); + + if (result) + Py_DECREF(result); + else { + self->callback_crashed++; + rd_kafka_yield(rk); + } + } + + /* Fallback: librdkafka needs the rebalance_cb to call assign() + * to synchronize state, if the user did not do this from callback, + * or there was no callback, or the callback failed, then we perform + * that assign() call here instead. 
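The subscribe(), poll(), commit() and close() docstrings above together describe the intended consume loop: subscribe with optional on_assign/on_revoke rebalance callbacks, poll() for messages, check Message.error() on every returned object, commit as needed, and close() on shutdown. A minimal sketch of that flow, assuming a local broker at localhost:9092 and a topic named 'test' (both placeholders), might look like:

    from confluent_kafka import Consumer

    conf = {'bootstrap.servers': 'localhost:9092',   # assumed broker address
            'group.id': 'example-group'}
    c = Consumer(**conf)

    def on_assign(consumer, partitions):
        # Called from poll() on rebalance with the absolute list of
        # TopicPartition now assigned to this group member.
        print('assigned: %s' % partitions)

    def on_revoke(consumer, partitions):
        print('revoked: %s' % partitions)

    c.subscribe(['test'], on_assign=on_assign, on_revoke=on_revoke)

    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:        # poll() timed out
                continue
            if msg.error():        # event or error, not a proper message
                print('event/error: %s' % msg.error())
                continue
            print('%s [%d] @%d: %s' % (msg.topic(), msg.partition(),
                                       msg.offset(), msg.value()))
            c.commit(message=msg)  # commits message offset + 1, asynchronously
    finally:
        c.close()                  # commits final offsets, leaves the group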
*/ + if (!self->rebalance_assigned) { + if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) + rd_kafka_assign(rk, c_parts); + else + rd_kafka_assign(rk, NULL); + } + + self->thread_state = PyEval_SaveThread(); +} + + + +static PyObject *Consumer_new (PyTypeObject *type, PyObject *args, + PyObject *kwargs) { + Consumer *self; + char errstr[256]; + rd_kafka_conf_t *conf; + + self = (Consumer *)ConsumerType.tp_alloc(&ConsumerType, 0); + if (!self) + return NULL; + + if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self, + args, kwargs))) { + Py_DECREF(self); + return NULL; + } + + rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb); + + self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, + errstr, sizeof(errstr)); + if (!self->rk) { + cfl_PyErr_Format(rd_kafka_last_error(), + "Failed to create consumer: %s", errstr); + Py_DECREF(self); + return NULL; + } + + rd_kafka_poll_set_consumer(self->rk); + + return (PyObject *)self; +} + + diff --git a/LICENSE b/LICENSE new file mode 100644 index 000000000..e06d20818 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 000000000..e15da190c --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,3 @@ +include README.md +include *.c *.h + diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..877356983 --- /dev/null +++ b/Makefile @@ -0,0 +1,14 @@ +all: + @echo "Targets:" + @echo " clean" + @echo " docs" + + +clean: + python setup.py clean + make -C docs clean + +.PHONY: docs + +docs: + $(MAKE) -C docs html diff --git a/Producer.c b/Producer.c new file mode 100644 index 000000000..f25c07d8e --- /dev/null +++ b/Producer.c @@ -0,0 +1,516 @@ +/** + * Copyright 2016 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "confluent_kafka.h" + + +/** + * @brief KNOWN ISSUES + * + * - Partitioners will cause a dead-lock with librdkafka, because: + * GIL + topic lock in topic_new is different lock order than + * topic lock in msg_partitioner + GIL. + * This needs to be sorted out in librdkafka, preferably making the + * partitioner run without any locks taken. + * Until this is fixed the partitioner is ignored and librdkafka's + * default will be used. + * + */ + + + +/**************************************************************************** + * + * + * Producer + * + * + * + * + ****************************************************************************/ + +/** + * Per-message state. + */ +struct Producer_msgstate { + Producer *self; + PyObject *dr_cb; + PyObject *partitioner_cb; +}; + + +/** + * Create a new per-message state. + * Returns NULL if neither dr_cb or partitioner_cb is set. + */ +static __inline struct Producer_msgstate * +Producer_msgstate_new (Producer *self, + PyObject *dr_cb, PyObject *partitioner_cb) { + struct Producer_msgstate *msgstate; + + if (!dr_cb && !partitioner_cb) + return NULL; + + msgstate = calloc(1, sizeof(*msgstate)); + msgstate->self = self; + + if (dr_cb) { + msgstate->dr_cb = dr_cb; + Py_INCREF(dr_cb); + } + if (partitioner_cb) { + msgstate->partitioner_cb = partitioner_cb; + Py_INCREF(partitioner_cb); + } + return msgstate; +} + +static __inline void +Producer_msgstate_destroy (struct Producer_msgstate *msgstate) { + if (msgstate->dr_cb) + Py_DECREF(msgstate->dr_cb); + if (msgstate->partitioner_cb) + Py_DECREF(msgstate->partitioner_cb); + free(msgstate); +} + + +static int Producer_clear (Producer *self) { + if (self->default_dr_cb) { + Py_DECREF(self->default_dr_cb); + self->default_dr_cb = NULL; + } + if (self->partitioner_cb) { + Py_DECREF(self->partitioner_cb); + self->partitioner_cb = NULL; + } + return 0; +} + +static void Producer_dealloc (Producer *self) { + PyObject_GC_UnTrack(self); + + Producer_clear(self); + + if (self->rk) + rd_kafka_destroy(self->rk); + + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static int Producer_traverse (Producer *self, + visitproc visit, void *arg) { + if (self->default_dr_cb) + Py_VISIT(self->default_dr_cb); + if (self->partitioner_cb) + Py_VISIT(self->partitioner_cb); + return 0; +} + + +static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm, + void *opaque) { + struct Producer_msgstate *msgstate = rkm->_private; + Producer *self = opaque; + PyObject *args; + PyObject *result; + PyObject *msgobj; + + if (!msgstate) + return; + + PyEval_RestoreThread(self->thread_state); + + if (!msgstate->dr_cb) { + /* No callback defined */ + goto done; + } + + msgobj = Message_new0(rkm); + + args = Py_BuildValue("(OO)", + Message_error((Message *)msgobj, NULL), + msgobj); + + Py_DECREF(msgobj); + + if (!args) { + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL, + "Unable to build callback args"); + self->callback_crashed++; + goto done; + } + + result = PyObject_CallObject(msgstate->dr_cb, args); + Py_DECREF(args); + + if (result) + Py_DECREF(result); + else { + self->callback_crashed++; + 
rd_kafka_yield(rk); + } + + done: + Producer_msgstate_destroy(msgstate); + self->thread_state = PyEval_SaveThread(); +} + + +/** + * FIXME: The partitioner is currently broken due to threading/GIL issues. + */ +int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt, + const void *keydata, + size_t keylen, + int32_t partition_cnt, + void *rkt_opaque, void *msg_opaque) { + Producer *self = rkt_opaque; + struct Producer_msgstate *msgstate = msg_opaque; + PyGILState_STATE gstate; + PyObject *result; + PyObject *args; + int32_t r = RD_KAFKA_PARTITION_UA; + + if (!msgstate) { + /* Fall back on default C partitioner if neither a per-msg + * partitioner nor a default Python partitioner is available */ + return self->c_partitioner_cb(rkt, keydata, keylen, + partition_cnt, rkt_opaque, + msg_opaque); + } + + gstate = PyGILState_Ensure(); + + if (!msgstate->partitioner_cb) { + /* Fall back on default C partitioner if neither a per-msg + * partitioner nor a default Python partitioner is available */ + r = msgstate->self->c_partitioner_cb(rkt, keydata, keylen, + partition_cnt, rkt_opaque, + msg_opaque); + goto done; + } + + args = Py_BuildValue("(s#l)", + (const char *)keydata, (int)keylen, + (long)partition_cnt); + if (!args) { + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL, + "Unable to build callback args"); + printf("Failed to build args\n"); + goto done; + } + + + result = PyObject_CallObject(msgstate->partitioner_cb, args); + Py_DECREF(args); + + if (result) { + r = PyLong_AsLong(result); + if (PyErr_Occurred()) + printf("FIXME: partition_cb returned wrong type " + "(expected long), how to propagate?\n"); + Py_DECREF(result); + } else { + printf("FIXME: partitioner_cb crashed, how to propagate?\n"); + } + + done: + PyGILState_Release(gstate); + return r; +} + + + + + +static PyObject *Producer_produce (Producer *self, PyObject *args, + PyObject *kwargs) { + const char *topic, *value = NULL, *key = NULL; + int value_len = 0, key_len = 0; + int partition = RD_KAFKA_PARTITION_UA; + PyObject *dr_cb = NULL, *dr_cb2 = NULL, *partitioner_cb = NULL; + rd_kafka_topic_t *rkt; + struct Producer_msgstate *msgstate; + static char *kws[] = { "topic", + "value", + "key", + "partition", + "callback", + "delivery_callback", /* Alias */ + "partitioner", + NULL }; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, + "s|z#z#iOOO", kws, + &topic, &value, &value_len, + &key, &key_len, &partition, + &dr_cb, &dr_cb2, &partitioner_cb)) + return NULL; + + if (dr_cb2 && !dr_cb) /* Alias */ + dr_cb = dr_cb2; + + if (!(rkt = rd_kafka_topic_new(self->rk, topic, NULL))) { + cfl_PyErr_Format(rd_kafka_last_error(), + "Unable to create topic object: %s", + rd_kafka_err2str(rd_kafka_last_error())); + return NULL; + } + + if (!dr_cb) + dr_cb = self->default_dr_cb; + if (!partitioner_cb) + partitioner_cb = self->partitioner_cb; + + /* Create msgstate if necessary, may return NULL if no callbacks + * are wanted. 
*/ + msgstate = Producer_msgstate_new(self, dr_cb, partitioner_cb); + + /* Produce message */ + if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, + (void *)value, value_len, + (void *)key, key_len, msgstate) == -1) { + rd_kafka_resp_err_t err = rd_kafka_last_error(); + + if (msgstate) + Producer_msgstate_destroy(msgstate); + + if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) + PyErr_Format(PyExc_BufferError, + "%s", rd_kafka_err2str(err)); + else + cfl_PyErr_Format(err, + "Unable to produce message: %s", + rd_kafka_err2str(err)); + + return NULL; + } + + rd_kafka_topic_destroy(rkt); + + Py_RETURN_NONE; +} + + +/** + * @brief Call rd_kafka_poll() and keep track of crashing callbacks. + * @returns -1 if callback crashed (or poll() failed), else the number + * of events served. + */ +static int Producer_poll0 (Producer *self, int tmout) { + int r; + + self->callback_crashed = 0; + self->thread_state = PyEval_SaveThread(); + + r = rd_kafka_poll(self->rk, tmout); + + PyEval_RestoreThread(self->thread_state); + self->thread_state = NULL; + + if (PyErr_CheckSignals() == -1) + return -1; + + if (self->callback_crashed) + return -1; + + return r; +} + + +static PyObject *Producer_poll (Producer *self, PyObject *args, + PyObject *kwargs) { + double tmout; + int r; + static char *kws[] = { "timeout", NULL }; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d", kws, &tmout)) + return NULL; + + r = Producer_poll0(self, (int)(tmout * 1000)); + if (r == -1) + return NULL; + + return PyLong_FromLong(r); +} + + +static PyObject *Producer_flush (Producer *self, PyObject *ignore) { + while (rd_kafka_outq_len(self->rk) > 0) { + if (Producer_poll0(self, 500) == -1) + return NULL; + } + Py_RETURN_NONE; +} + + +static PyMethodDef Producer_methods[] = { + { "produce", (PyCFunction)Producer_produce, + METH_VARARGS|METH_KEYWORDS, + ".. py:function:: produce(topic, [value], [key], [partition], [callback])\n" + "\n" + " Produce message to topic.\n" + " This is an asynchronous operation, an application may use the " + "``ondelivery`` argument to pass a function (or lambda) that " + "will be called from :py:func:`poll()` when the message has been " + "succesfully delivered or permanently fails delivery.\n" + "\n" + " :param str topic: Topic to produce message to\n" + " :param str value: Message payload\n" + " :param str key: Message key\n" + " :param int partition: Partition to produce to, elses uses the " + "configured partitioner.\n" + " :param func ondelivery(err,msg): Delivery report callback to call " + "(from :py:func:`poll()` or :py:func:`flush()`) on succesful or " + "failed delivery\n" + "\n" + " :rtype: None\n" + " :raises BufferError: if the internal producer message queue is " + "full (``queue.buffering.max.messages`` exceeded)\n" + " :raises KafkaException: for other errors, see exception code\n" + "\n" + }, + + { "poll", (PyCFunction)Producer_poll, METH_VARARGS|METH_KEYWORDS, + ".. 
py:function:: poll([timeout])\n" + "\n" + " Polls the producer for events and calls the corresponding " + "callbacks (if registered).\n" + "\n" + " Callbacks:\n" + "\n" + " - ``ondelivery`` callbacks from :py:func:`produce()`\n" + " - ...\n" + "\n" + " :param float timeout: Maximum time to block waiting for events.\n" + " :returns: Number of events processed (callbacks served)\n" + " :rtype: int\n" + "\n" + }, + + { "flush", (PyCFunction)Producer_flush, METH_NOARGS, + " Wait for all messages in the Producer queue to be delivered.\n" + " This is a convenience method that calls :py:func:`poll()` until " + ":py:func:`len()` is zero.\n" + "\n" + ".. note:: See :py:func:`poll()` for a description on what " + "callbacks may be triggered.\n" + "\n" + }, + { NULL } +}; + + +static Py_ssize_t Producer__len__ (Producer *self) { + return rd_kafka_outq_len(self->rk); +} + + +static PySequenceMethods Producer_seq_methods = { + (lenfunc)Producer__len__ /* sq_length */ +}; + + +static PyObject *Producer_new (PyTypeObject *type, PyObject *args, + PyObject *kwargs); + +PyTypeObject ProducerType = { + PyVarObject_HEAD_INIT(NULL, 0) + "confluent_kafka.Producer", /*tp_name*/ + sizeof(Producer), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)Producer_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + &Producer_seq_methods, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + "Asynchronous Kafka Producer\n" + "\n" + ".. py:function:: Producer(**kwargs)\n" + "\n" + " Create new Producer instance using provided configuration dict.\n" + "\n" + "\n" + ".. 
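Taken together, produce(), poll() and flush() above describe the asynchronous produce pattern: enqueue messages (optionally with a per-message delivery callback), serve delivery callbacks via poll(), and drain the queue with flush() before shutting down. A rough sketch under the same placeholder broker/topic assumptions as earlier:

    from confluent_kafka import Producer

    p = Producer(**{'bootstrap.servers': 'localhost:9092'})   # assumed broker address

    def on_delivery(err, msg):
        # Invoked from poll()/flush() when the message is delivered or has
        # failed permanently; err is None on success, else a KafkaError.
        if err is not None:
            print('delivery failed: %s' % err)
        else:
            print('delivered to %s [%d] @%d' % (msg.topic(), msg.partition(),
                                                msg.offset()))

    for i in range(10):
        try:
            p.produce('test', value='message #%d' % i, key=str(i),
                      callback=on_delivery)
        except BufferError:
            # Internal queue full (queue.buffering.max.messages exceeded):
            # serve delivery callbacks to make room, then retry once.
            p.poll(1.0)
            p.produce('test', value='message #%d' % i, key=str(i),
                      callback=on_delivery)
        p.poll(0)    # serve any completed delivery callbacks

    p.flush()        # polls until len(p) drops to zero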
py:function:: len()\n" + "\n" + " :returns: Number of messages and Kafka protocol requests waiting to be delivered to broker.\n" + " :rtype: int\n" + "\n", /*tp_doc*/ + (traverseproc)Producer_traverse, /* tp_traverse */ + (inquiry)Producer_clear, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + Producer_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + Producer_new /* tp_new */ +}; + + + +static PyObject *Producer_new (PyTypeObject *type, PyObject *args, + PyObject *kwargs) { + Producer *self; + char errstr[256]; + rd_kafka_conf_t *conf; + + self = (Producer *)ProducerType.tp_alloc(&ProducerType, 0); + if (!self) + return NULL; + + if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self, + args, kwargs))) { + Py_DECREF(self); + return NULL; + } + + rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); + + self->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, + errstr, sizeof(errstr)); + if (!self->rk) { + cfl_PyErr_Format(rd_kafka_last_error(), + "Failed to create producer: %s", errstr); + Py_DECREF(self); + return NULL; + } + + return (PyObject *)self; +} + + + diff --git a/README.md b/README.md new file mode 100644 index 000000000..eac61b88d --- /dev/null +++ b/README.md @@ -0,0 +1,59 @@ +Confluent's Apache Kafka client for Python +========================================== + + +Prerequisites +=============== + + librdkafka >=0.9.1 (or master>=2016-04-13) + py.test (pip install pytest) + + +Build +===== + +For Python 2: + + python setup.by build + + +For Python 3: + + python3 setup.by build + + +Install +======= +Preferably in a virtualenv: + + pip install . + + +Run unit-tests +============== + + py.test + + +Run integration tests +===================== +WARNING:: These tests require an active Kafka cluster and will make use of + a topic named 'test'. + + ./integration_test.py + + + +Generate documentation +====================== + + make docs + +or: + + python setup.by build_sphinx + + +Documentation will be generated in docs/_build/ + + diff --git a/confluent_kafka.c b/confluent_kafka.c new file mode 100644 index 000000000..deffd83f3 --- /dev/null +++ b/confluent_kafka.c @@ -0,0 +1,1283 @@ +/** + * Copyright 2016 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "confluent_kafka.h" + +#include + + +/** + * @brief KNOWN ISSUES + * + * - Partitioners will cause a dead-lock with librdkafka, because: + * GIL + topic lock in topic_new is different lock order than + * topic lock in msg_partitioner + GIL. + * This needs to be sorted out in librdkafka, preferably making the + * partitioner run without any locks taken. + * Until this is fixed the partitioner is ignored and librdkafka's + * default will be used. + * - TopicPartition.offset should probably be None for the INVALID offset + * rather than exposing the special value -1001. 
+ * - KafkaError type .tp_doc allocation is lost on exit. + * + */ + + +PyObject *KafkaException; + + +/**************************************************************************** + * + * + * KafkaError + * + * + * FIXME: Pre-create simple instances for each error code, only instantiate + * a new object if a rich error string is provided. + * + ****************************************************************************/ +typedef struct { + PyObject_HEAD + rd_kafka_resp_err_t code; /* Error code */ + char *str; /* Human readable representation of error, if one + * was provided by librdkafka. + * Else falls back on err2str(). */ +} KafkaError; + + +static PyObject *KafkaError_code (KafkaError *self, PyObject *ignore) { + return PyLong_FromLong(self->code); +} + +static PyObject *KafkaError_str (KafkaError *self, PyObject *ignore) { + if (self->str) + return cfl_PyUnistr(_FromString(self->str)); + else + return cfl_PyUnistr(_FromString(rd_kafka_err2str(self->code))); +} + +static PyObject *KafkaError_name (KafkaError *self, PyObject *ignore) { + /* FIXME: Pre-create name objects */ + return cfl_PyUnistr(_FromString(rd_kafka_err2name(self->code))); +} + + +static PyMethodDef KafkaError_methods[] = { + { "code", (PyCFunction)KafkaError_code, METH_NOARGS, + " Returns the error/event code for comparison to" + "KafkaError..\n" + "\n" + " :returns: error/event code\n" + " :rtype: int\n" + "\n" + }, + { "str", (PyCFunction)KafkaError_str, METH_NOARGS, + " Returns the human-readable error/event string.\n" + "\n" + " :returns: error/event message string\n" + " :rtype: str\n" + "\n" + }, + { "name", (PyCFunction)KafkaError_name, METH_NOARGS, + " Returns the enum name for error/event.\n" + "\n" + " :returns: error/event enum name string\n" + " :rtype: str\n" + "\n" + }, + + { NULL } +}; + + +static void KafkaError_dealloc (KafkaError *self) { + if (self->str) + free(self->str); + Py_TYPE(self)->tp_free((PyObject *)self); +} + + +static PyObject *KafkaError_str0 (KafkaError *self) { + return cfl_PyUnistr(_FromFormat("KafkaError{code=%s,val=%d,str=\"%s\"}", + rd_kafka_err2name(self->code), + self->code, + self->str ? self->str : + rd_kafka_err2str(self->code))); +} + +static long KafkaError_hash (KafkaError *self) { + return self->code; +} + +static PyTypeObject KafkaErrorType; + +static PyObject* KafkaError_richcompare (KafkaError *self, PyObject *o2, + int op) { + int code2; + int r; + PyObject *result; + + if (Py_TYPE(o2) == &KafkaErrorType) + code2 = ((KafkaError *)o2)->code; + else + code2 = PyLong_AsLong(o2); + + switch (op) + { + case Py_LT: + r = self->code < code2; + break; + case Py_LE: + r = self->code <= code2; + break; + case Py_EQ: + r = self->code == code2; + break; + case Py_NE: + r = self->code != code2; + break; + case Py_GT: + r = self->code > code2; + break; + case Py_GE: + r = self->code >= code2; + break; + default: + r = 0; + break; + } + + result = r ? 
Py_True : Py_False; + Py_INCREF(result); + return result; +} + + +static PyTypeObject KafkaErrorType = { + PyVarObject_HEAD_INIT(NULL, 0) + "confluent_kafka.KafkaError", /*tp_name*/ + sizeof(KafkaError), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)KafkaError_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + (reprfunc)KafkaError_str0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + (hashfunc)KafkaError_hash, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /*tp_flags*/ + "Kafka error and event object\n" + "\n" + " The KafkaError class serves multiple purposes:\n" + "\n" + " - Propagation of errors\n" + " - Propagation of events\n" + " - Exceptions\n" + "\n" + " This class is not user-instantiable.\n" + "\n", /*tp_doc*/ + 0, /* tp_traverse */ + 0, /* tp_clear */ + (richcmpfunc)KafkaError_richcompare, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + KafkaError_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0 /* tp_alloc */ +}; + + +/** + * @brief Initialize a KafkaError object. + */ +static void KafkaError_init (KafkaError *self, + rd_kafka_resp_err_t code, const char *str) { + self->code = code; + if (str) + self->str = strdup(str); + else + self->str = NULL; +} + +/** + * @brief Internal factory to create KafkaError object. + */ +PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...) { + + KafkaError *self; + va_list ap; + char buf[512]; + + self = (KafkaError *)KafkaErrorType. + tp_alloc(&KafkaErrorType, 0); + if (!self) + return NULL; + + if (fmt) { + va_start(ap, fmt); + vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + } + + KafkaError_init(self, err, fmt ? buf : NULL); + + return (PyObject *)self; +} + +/** + * @brief Internal factory to create KafkaError object. + * @returns a new KafkaError object if \p err != 0, else a None object. 
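The accessors and rich comparison above mean a KafkaError can be matched directly against another KafkaError or against a plain integer error code. A small illustration of how an application might report and classify errors surfaced via Message.error() or a raised KafkaException (any specific error-code constants are assumed to be registered elsewhere in the module; they are not part of this excerpt):

    def report_kafka_error(err):
        # err: a KafkaError. code() is the raw librdkafka error code, name()
        # the enum name, str() the human-readable description (falling back
        # on rd_kafka_err2str() when no rich string was provided).
        print('KafkaError %s (%d): %s' % (err.name(), err.code(), err.str()))

    def has_code(err, code):
        # The rich comparison accepts a plain int, so comparing against a
        # numeric error-code constant works without unwrapping err first.
        return err == code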
+ */ +static PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, + const char *str) { + if (!err) + Py_RETURN_NONE; + return KafkaError_new0(err, str); +} + + + + +/**************************************************************************** + * + * + * Message + * + * + * + * + ****************************************************************************/ + + +PyObject *Message_error (Message *self, PyObject *ignore) { + if (self->error) { + Py_INCREF(self->error); + return self->error; + } else + Py_RETURN_NONE; +} + +static PyObject *Message_value (Message *self, PyObject *ignore) { + if (self->value) { + Py_INCREF(self->value); + return self->value; + } else + Py_RETURN_NONE; +} + + +static PyObject *Message_key (Message *self, PyObject *ignore) { + if (self->key) { + Py_INCREF(self->key); + return self->key; + } else + Py_RETURN_NONE; +} + +static PyObject *Message_topic (Message *self, PyObject *ignore) { + if (self->topic) { + Py_INCREF(self->topic); + return self->topic; + } else + Py_RETURN_NONE; +} + +static PyObject *Message_partition (Message *self, PyObject *ignore) { + if (self->partition != RD_KAFKA_PARTITION_UA) + return PyLong_FromLong(self->partition); + else + Py_RETURN_NONE; +} + + +static PyObject *Message_offset (Message *self, PyObject *ignore) { + if (self->offset >= 0) + return PyLong_FromLong(self->offset); + else + Py_RETURN_NONE; +} + + +static PyMethodDef Message_methods[] = { + { "error", (PyCFunction)Message_error, METH_NOARGS, + " The message object is also used to propagate errors and events, " + "an application must check error() to determine if the Message " + "is a proper message (error() returns None) or an error or event " + "(error() returns a KafkaError object)\n" + "\n" + " :rtype: None or :py:class:`KafkaError`\n" + "\n" + }, + + { "value", (PyCFunction)Message_value, METH_NOARGS, + " :returns: message value (payload) or None if not available.\n" + " :rtype: str or None\n" + "\n" + }, + { "key", (PyCFunction)Message_key, METH_NOARGS, + " :returns: message key or None if not available.\n" + " :rtype: str or None\n" + "\n" + }, + { "topic", (PyCFunction)Message_topic, METH_NOARGS, + " :returns: topic name or None if not available.\n" + " :rtype: str or None\n" + "\n" + }, + { "partition", (PyCFunction)Message_partition, METH_NOARGS, + " :returns: partition number or None if not available.\n" + " :rtype: int or None\n" + "\n" + }, + { "offset", (PyCFunction)Message_offset, METH_NOARGS, + " :returns: message offset or None if not available.\n" + " :rtype: int or None\n" + "\n" + }, + { NULL } +}; + +static int Message_clear (Message *self) { + if (self->topic) { + Py_DECREF(self->topic); + self->topic = NULL; + } + if (self->value) { + Py_DECREF(self->value); + self->value = NULL; + } + if (self->key) { + Py_DECREF(self->key); + self->key = NULL; + } + if (self->error) { + Py_DECREF(self->error); + self->error = NULL; + } + return 0; +} + + +static void Message_dealloc (Message *self) { + Message_clear(self); + PyObject_GC_UnTrack(self); + Py_TYPE(self)->tp_free((PyObject *)self); +} + + +static int Message_traverse (Message *self, + visitproc visit, void *arg) { + if (self->topic) + Py_VISIT(self->topic); + if (self->value) + Py_VISIT(self->value); + if (self->key) + Py_VISIT(self->key); + if (self->error) + Py_VISIT(self->error); + return 0; +} + +static Py_ssize_t Message__len__ (Message *self) { + return self->value ? 
PyObject_Length(self->value) : 0; +} + +static PySequenceMethods Message_seq_methods = { + (lenfunc)Message__len__ /* sq_length */ +}; + +PyTypeObject MessageType = { + PyVarObject_HEAD_INIT(NULL, 0) + "confluent_kafka.Message", /*tp_name*/ + sizeof(Message), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)Message_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + &Message_seq_methods, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + "The Message object represents either a single consumed or " + "produced message, or an event (:py:func:`error()` is not None).\n" + "\n" + "An application must check with :py:func:`error()` to see if the " + "object is a proper message (error() returns None) or an " + "error/event.\n" + "\n" + "This class is not user-instantiable.\n" + "\n" + ".. py:function:: len()\n" + "\n" + " :returns: Message value (payload) size in bytes\n" + " :rtype: int\n" + "\n", /*tp_doc*/ + (traverseproc)Message_traverse, /* tp_traverse */ + (inquiry)Message_clear, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + Message_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0 /* tp_alloc */ +}; + +/** + * @brief Internal factory to create Message object from message_t + */ +PyObject *Message_new0 (const rd_kafka_message_t *rkm) { + Message *self; + + self = (Message *)MessageType.tp_alloc(&MessageType, 0); + if (!self) + return NULL; + + self->error = KafkaError_new_or_None(rkm->err, + rkm->err ? 
+ rd_kafka_message_errstr(rkm) : + NULL); + + if (rkm->rkt) + self->topic = cfl_PyUnistr( + _FromString(rd_kafka_topic_name(rkm->rkt))); + if (rkm->payload) + self->value = cfl_PyUnistr(_FromStringAndSize(rkm->payload, + rkm->len)); + if (rkm->key) + self->key = cfl_PyUnistr( + _FromStringAndSize(rkm->key, rkm->key_len)); + + self->partition = rkm->partition; + self->offset = rkm->offset; + + return (PyObject *)self; +} + + + + +/**************************************************************************** + * + * + * TopicPartition + * + * + * + * + ****************************************************************************/ +typedef struct { + PyObject_HEAD + char *topic; + int partition; + int64_t offset; + PyObject *error; +} TopicPartition; + + +static int TopicPartition_clear (TopicPartition *self) { + if (self->topic) { + free(self->topic); + self->topic = NULL; + } + if (self->error) { + Py_DECREF(self->error); + self->error = NULL; + } + return 0; +} + +static void TopicPartition_dealloc (TopicPartition *self) { + PyObject_GC_UnTrack(self); + + TopicPartition_clear(self); + + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static PyObject *TopicPartition_new0 (const char *topic, int partition, + long long offset, + rd_kafka_resp_err_t err); + + +static PyObject *TopicPartition_new (PyTypeObject *type, PyObject *args, + PyObject *kwargs) { + const char *topic; + int partition = RD_KAFKA_PARTITION_UA; + long long offset = RD_KAFKA_OFFSET_INVALID; + static char *kws[] = { "topic", + "partition", + "offset", + NULL }; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|iL", kws, + &topic, &partition, &offset)) + return NULL; + + return TopicPartition_new0(topic, partition, offset, 0); +} + + + +static int TopicPartition_traverse (TopicPartition *self, + visitproc visit, void *arg) { + if (self->error) + Py_VISIT(self->error); + return 0; +} + + +static PyMemberDef TopicPartition_members[] = { + { "topic", T_STRING, offsetof(TopicPartition, topic), READONLY, + "Topic name" }, + { "partition", T_INT, offsetof(TopicPartition, partition), 0, + "Partition number" }, + { "offset", T_LONGLONG, offsetof(TopicPartition, offset), 0, + "Offset" }, /* FIXME: Possibly use None for INVALID offset (-1001) */ + { "error", T_OBJECT, offsetof(TopicPartition, error), READONLY, + "Indicates an error (with :py:class:`KafkaError`) unless None." }, + { NULL } +}; + + +static PyObject *TopicPartition_str0 (TopicPartition *self) { + PyObject *errstr = self->error == Py_None ? NULL : + cfl_PyObject_Unistr(self->error); + PyObject *ret; + ret = cfl_PyUnistr( + _FromFormat("TopicPartition{topic=%s,partition=%"PRId32 + ",offset=%"PRId64",error=%s}", + self->topic, self->partition, + self->offset, + errstr ? 
cfl_PyUnistr_AsUTF8(errstr) : "None")); + if (errstr) + Py_DECREF(errstr); + return ret; +} + + +static PyTypeObject TopicPartitionType; + +static PyObject * +TopicPartition_richcompare (TopicPartition *self, PyObject *o2, + int op) { + TopicPartition *a = self, *b; + int tr, pr; + int r; + PyObject *result; + + if (Py_TYPE(o2) != Py_TYPE(self)) { + PyErr_SetNone(PyExc_NotImplementedError); + return NULL; + } + + b = (TopicPartition *)o2; + + tr = strcmp(a->topic, b->topic); + pr = a->partition - b->partition; + switch (op) + { + case Py_LT: + r = tr < 0 || (tr == 0 && pr < 0); + break; + case Py_LE: + r = tr < 0 || (tr == 0 && pr <= 0); + break; + case Py_EQ: + r = (tr == 0 && pr == 0); + break; + case Py_NE: + r = (tr != 0 || pr != 0); + break; + case Py_GT: + r = tr > 0 || (tr == 0 && pr > 0); + break; + case Py_GE: + r = tr > 0 || (tr == 0 && pr >= 0); + break; + default: + r = 0; + break; + } + + result = r ? Py_True : Py_False; + Py_INCREF(result); + return result; +} + + +static long TopicPartition_hash (TopicPartition *self) { + PyObject *topic = cfl_PyUnistr(_FromString(self->topic)); + long r = PyObject_Hash(topic) ^ self->partition; + Py_DECREF(topic); + return r; +} + + +static PyTypeObject TopicPartitionType = { + PyVarObject_HEAD_INIT(NULL, 0) + "confluent_kafka.TopicPartition", /*tp_name*/ + sizeof(TopicPartition), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)TopicPartition_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + (reprfunc)TopicPartition_str0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + (hashfunc)TopicPartition_hash, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC, /*tp_flags*/ + "TopicPartition is a generic type to hold a single partition and " + "various information about it.\n" + "\n" + "It is typically used to provide a list of topics or partitions for " + "various operations, such as :py:func:`Consumer.assign()`.\n" + "\n" + ".. py:function:: TopicPartition(topic, [partition], [offset])\n" + "\n" + " Instantiate a TopicPartition object.\n" + "\n" + " :param string topic: Topic name\n" + " :param int partition: Partition id\n" + " :param int offset: Initial partition offset\n" + " :rtype: TopicPartition\n" + "\n" + "\n", /*tp_doc*/ + (traverseproc)TopicPartition_traverse, /* tp_traverse */ + (inquiry)TopicPartition_clear, /* tp_clear */ + (richcmpfunc)TopicPartition_richcompare, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + 0, /* tp_methods */ + TopicPartition_members,/* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + TopicPartition_new /* tp_new */ +}; + +/** + * @brief Internal factory to create a TopicPartition object. + */ +static PyObject *TopicPartition_new0 (const char *topic, int partition, + long long offset, + rd_kafka_resp_err_t err) { + TopicPartition *self; + + self = (TopicPartition *)TopicPartitionType.tp_alloc( + &TopicPartitionType, 0); + if (!self) + return NULL; + + self->topic = strdup(topic); + self->partition = partition; + self->offset = offset; + self->error = KafkaError_new_or_None(err, NULL); + + return (PyObject *)self; +} + + + + +/** + * @brief Convert C rd_kafka_topic_partition_list_t to Python list(TopicPartition). 
+ * + * @returns The new Python list object. + */ +PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) { + PyObject *parts; + size_t i; + + parts = PyList_New(c_parts->cnt); + + for (i = 0 ; i < c_parts->cnt ; i++) { + const rd_kafka_topic_partition_t *rktpar = &c_parts->elems[i]; + PyList_SET_ITEM(parts, i, + TopicPartition_new0( + rktpar->topic, rktpar->partition, + rktpar->offset, rktpar->err)); + } + + return parts; + +} + +/** + * @brief Convert Python list(TopicPartition) to C rd_kafka_topic_partition_list_t. + * + * @returns The new C list on success or NULL on error. + */ +rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) { + rd_kafka_topic_partition_list_t *c_parts; + size_t i; + + if (!PyList_Check(plist)) { + PyErr_SetString(PyExc_TypeError, + "requires list of confluent_kafka.TopicPartition"); + return NULL; + } + + c_parts = rd_kafka_topic_partition_list_new(PyList_Size(plist)); + + for (i = 0 ; i < PyList_Size(plist) ; i++) { + TopicPartition *tp = (TopicPartition *) + PyList_GetItem(plist, i); + + if (PyObject_Type((PyObject *)tp) != + (PyObject *)&TopicPartitionType) { + PyErr_Format(PyExc_TypeError, + "expected %s", + TopicPartitionType.tp_name); + rd_kafka_topic_partition_list_destroy(c_parts); + return NULL; + } + + rd_kafka_topic_partition_list_add(c_parts, + tp->topic, + tp->partition)->offset = + tp->offset; + } + + return c_parts; +} + + + + +/**************************************************************************** + * + * + * Common helpers + * + * + * + * + ****************************************************************************/ + + +/** + * Populate topic conf from provided dict. + * + * Will raise an exception on error and return -1, or returns 0 on success. + */ +static int populate_topic_conf (rd_kafka_topic_conf_t *tconf, const char *what, + PyObject *dict) { + Py_ssize_t pos = 0; + PyObject *ko, *vo; + + if (!PyDict_Check(dict)) { + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, + "%s: requires a dict", what); + return -1; + } + + while (PyDict_Next(dict, &pos, &ko, &vo)) { + PyObject *ks; + PyObject *vs; + const char *k; + const char *v; + char errstr[256]; + + if (!(ks = cfl_PyObject_Unistr(ko))) { + PyErr_SetString(PyExc_TypeError, + "expected configuration property " + "value as type unicode string"); + return -1; + } + + if (!(vs = cfl_PyObject_Unistr(vo))) { + PyErr_SetString(PyExc_TypeError, + "expected configuration property " + "value as type unicode string"); + Py_DECREF(ks); + return -1; + } + + k = cfl_PyUnistr_AsUTF8(ks); + v = cfl_PyUnistr_AsUTF8(vs); + + if (rd_kafka_topic_conf_set(tconf, k, v, + errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, + "%s: %s", what, errstr); + Py_DECREF(ks); + Py_DECREF(vs); + return -1; + } + + Py_DECREF(ks); + Py_DECREF(vs); + } + + return 0; +} + + + +/** + * @brief Set single special producer config value. + * + * @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised). 
+ */ +static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf, + rd_kafka_topic_conf_t *tconf, + const char *name, PyObject *valobj) { + PyObject *vs; + const char *val; + + if (!strcasecmp(name, "delivery_callback")) { + if (!PyCallable_Check(valobj)) { + cfl_PyErr_Format( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "%s requires a callable " + "object", name); + return -1; + } + + self->default_dr_cb = valobj; + Py_INCREF(self->default_dr_cb); + + return 1; + + } else if (!strcasecmp(name, "partitioner") || + !strcasecmp(name, "partitioner_callback")) { + + if ((vs = cfl_PyObject_Unistr(valobj))) { + /* Use built-in C partitioners, + * based on their name. */ + val = cfl_PyUnistr_AsUTF8(vs); + + if (!strcmp(val, "random")) + rd_kafka_topic_conf_set_partitioner_cb( + tconf, rd_kafka_msg_partitioner_random); + else if (!strcmp(val, "consistent")) + rd_kafka_topic_conf_set_partitioner_cb( + tconf, rd_kafka_msg_partitioner_consistent); + else if (!strcmp(val, "consistent_random")) + rd_kafka_topic_conf_set_partitioner_cb( + tconf, rd_kafka_msg_partitioner_consistent_random); + else { + cfl_PyErr_Format( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "unknown builtin partitioner: %s " + "(available: random, consistent, consistent_random)", + val); + Py_DECREF(vs); + return -1; + } + + Py_DECREF(vs); + + } else { + /* Custom partitioner (Python callback) */ + + if (!PyCallable_Check(valobj)) { + cfl_PyErr_Format( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "%s requires a callable " + "object", name); + return -1; + } + + /* FIXME: Error out until GIL+rdkafka lock-ordering is fixed. */ + if (1) { + cfl_PyErr_Format( + RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED, + "custom partitioner support not yet implemented"); + return -1; + } + + if (self->partitioner_cb) + Py_DECREF(self->partitioner_cb); + + self->partitioner_cb = valobj; + Py_INCREF(self->partitioner_cb); + + /* Use trampoline to call Python code. */ + rd_kafka_topic_conf_set_partitioner_cb(tconf, + Producer_partitioner_cb); + } + + return 1; + } + + return 0; /* Not handled */ +} + + +/** + * Common config setup for Kafka client handles. + * + * Returns a conf object on success or NULL on failure in which case + * an exception has been raised. + */ +rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, + void *self0, + PyObject *args, + PyObject *kwargs) { + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *tconf; + Py_ssize_t pos = 0; + PyObject *ko, *vo; + int32_t (*partitioner_cb) (const rd_kafka_topic_t *, + const void *, size_t, int32_t, + void *, void *) = partitioner_cb; + + if (!kwargs) { + /* If no kwargs, fall back on single dict arg, if any. */ + if (!args || !PyTuple_Check(args) || PyTuple_Size(args) < 1 || + !PyDict_Check((kwargs = PyTuple_GetItem(args, 0)))) { + PyErr_SetString(PyExc_TypeError, + "expected configuration dict"); + return NULL; + } + } + + conf = rd_kafka_conf_new(); + tconf = rd_kafka_topic_conf_new(); + + /* Convert kwargs dict to config key-value pairs. 
*/ + while (PyDict_Next(kwargs, &pos, &ko, &vo)) { + PyObject *ks; + PyObject *vs = NULL; + const char *k; + const char *v; + char errstr[256]; + + if (!(ks = cfl_PyObject_Unistr(ko))) { + PyErr_SetString(PyExc_TypeError, + "expected configuration property name " + "as type unicode string"); + rd_kafka_topic_conf_destroy(tconf); + rd_kafka_conf_destroy(conf); + return NULL; + } + + k = cfl_PyUnistr_AsUTF8(ks); + if (!strcmp(k, "default.topic.config")) { + if (populate_topic_conf(tconf, k, vo) == -1) { + Py_DECREF(ks); + rd_kafka_topic_conf_destroy(tconf); + rd_kafka_conf_destroy(conf); + return NULL; + } + + Py_DECREF(ks); + continue; + } + + /* Special handling for certain config keys. */ + if (ktype == RD_KAFKA_PRODUCER) { + int r; + + r = producer_conf_set_special((Producer *)self0, + conf, tconf, k, vo); + if (r == -1) { + /* Error */ + Py_DECREF(ks); + rd_kafka_topic_conf_destroy(tconf); + rd_kafka_conf_destroy(conf); + return NULL; + + } else if (r == 1) { + /* Handled */ + continue; + } + + /* FALLTHRU */ + } + + + /* + * Pass configuration property through to librdkafka. + */ + if (!(vs = cfl_PyObject_Unistr(vo))) { + PyErr_SetString(PyExc_TypeError, + "expected configuration property " + "value as type unicode string"); + rd_kafka_topic_conf_destroy(tconf); + rd_kafka_conf_destroy(conf); + Py_DECREF(ks); + return NULL; + } + v = cfl_PyUnistr_AsUTF8(vs); + + if (rd_kafka_conf_set(conf, k, v, errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG, + "%s", errstr); + rd_kafka_topic_conf_destroy(tconf); + rd_kafka_conf_destroy(conf); + Py_DECREF(vs); + Py_DECREF(ks); + return NULL; + } + + Py_DECREF(vs); + Py_DECREF(ks); + } + + rd_kafka_topic_conf_set_opaque(tconf, self0); + rd_kafka_conf_set_default_topic_conf(conf, tconf); + + rd_kafka_conf_set_opaque(conf, self0); + + return conf; +} + + + + + +/**************************************************************************** + * + * + * Base + * + * + * + * + ****************************************************************************/ + + +static PyObject *libversion (PyObject *self, PyObject *args) { + return Py_BuildValue("si", + rd_kafka_version_str(), + rd_kafka_version()); +} + +static PyObject *version (PyObject *self, PyObject *args) { + return Py_BuildValue("si", "0.9.1", 0x00090100); +} + +static PyMethodDef confluent_kafka_methods[] = { + {"libversion", libversion, METH_NOARGS, + " Retrieve librdkafka version string and integer\n" + "\n" + " :returns: (version_string, version_int) tuple\n" + " :rtype: tuple(str,int)\n" + "\n" + }, + {"version", version, METH_NOARGS, + " Retrieve module version string and integer\n" + "\n" + " :returns: (version_string, version_int) tuple\n" + " :rtype: tuple(str,int)\n" + "\n" + }, + { NULL } +}; + + +/** + * @brief Add librdkafka error enums to KafkaError's type dict. + * @returns an updated doc string containing all error constants. + */ +static char *KafkaError_add_errs (PyObject *dict, const char *origdoc) { + const struct rd_kafka_err_desc *descs; + size_t cnt; + size_t i; + char *doc; + size_t dof = 0, dsize; + /* RST grid table column widths */ +#define _COL1_W 50 +#define _COL2_W 100 /* Must be larger than COL1 */ + char dash[_COL2_W], eq[_COL2_W]; + + rd_kafka_get_err_descs(&descs, &cnt); + + memset(dash, '-', sizeof(dash)); + memset(eq, '=', sizeof(eq)); + + /* Setup output doc buffer. */ + dof = strlen(origdoc); + dsize = dof + 500 + (cnt * 200); + doc = malloc(dsize); + memcpy(doc, origdoc, dof+1); + +#define _PRINT(...) 
do { \ + char tmpdoc[512]; \ + size_t _len; \ + _len = snprintf(tmpdoc, sizeof(tmpdoc), __VA_ARGS__); \ + if (_len > sizeof(tmpdoc)) _len = sizeof(tmpdoc)-1; \ + if (dof + _len >= dsize) { \ + dsize += 2; \ + doc = realloc(doc, dsize); \ + } \ + memcpy(doc+dof, tmpdoc, _len+1); \ + dof += _len; \ + } while (0) + + /* Error constant table header (RST grid table) */ + _PRINT("Error and event constants:\n\n" + "+-%.*s-+-%.*s-+\n" + "| %-*.*s | %-*.*s |\n" + "+=%.*s=+=%.*s=+\n", + _COL1_W, dash, _COL2_W, dash, + _COL1_W, _COL1_W, "Constant", _COL2_W, _COL2_W, "Description", + _COL1_W, eq, _COL2_W, eq); + + for (i = 0 ; i < cnt ; i++) { + PyObject *code; + + if (!descs[i].desc) + continue; + + code = PyLong_FromLong(descs[i].code); + + PyDict_SetItemString(dict, descs[i].name, code); + + Py_DECREF(code); + + _PRINT("| %-*.*s | %-*.*s |\n" + "+-%.*s-+-%.*s-+\n", + _COL1_W, _COL1_W, descs[i].name, + _COL2_W, _COL2_W, descs[i].desc, + _COL1_W, dash, _COL2_W, dash); + } + + _PRINT("\n"); + + return doc; // FIXME: leak +} + + +#ifdef PY3 +static struct PyModuleDef confluent_kafka_moduledef = { + PyModuleDef_HEAD_INIT, + "confluent_kafka", /* m_name */ + "Confluent's Apache Kafka Python client", /* m_doc */ + -1, /* m_size */ + confluent_kafka_methods, /* m_methods */ +}; +#endif + + +static PyObject *_init_confluent_kafka (void) { + PyObject *m; + + if (PyType_Ready(&KafkaErrorType) < 0) + return NULL; + if (PyType_Ready(&MessageType) < 0) + return NULL; + if (PyType_Ready(&TopicPartitionType) < 0) + return NULL; + if (PyType_Ready(&ProducerType) < 0) + return NULL; + if (PyType_Ready(&ConsumerType) < 0) + return NULL; + +#ifdef PY3 + m = PyModule_Create(&confluent_kafka_moduledef); +#else + m = Py_InitModule3("confluent_kafka", confluent_kafka_methods, + "Confluent's Apache Kafka Python client"); +#endif + if (!m) + return NULL; + + Py_INCREF(&KafkaErrorType); + KafkaErrorType.tp_doc = + KafkaError_add_errs(KafkaErrorType.tp_dict, + KafkaErrorType.tp_doc); + PyModule_AddObject(m, "KafkaError", (PyObject *)&KafkaErrorType); + + Py_INCREF(&MessageType); + PyModule_AddObject(m, "Message", (PyObject *)&MessageType); + + Py_INCREF(&TopicPartitionType); + PyModule_AddObject(m, "TopicPartition", + (PyObject *)&TopicPartitionType); + + Py_INCREF(&ProducerType); + PyModule_AddObject(m, "Producer", (PyObject *)&ProducerType); + + Py_INCREF(&ConsumerType); + PyModule_AddObject(m, "Consumer", (PyObject *)&ConsumerType); + + KafkaException = PyErr_NewExceptionWithDoc( + "confluent_kafka.KafkaException", + "Kafka exception that wraps the :py:class:`KafkaError` " + "class.\n" + "\n" + "Use ``exception.args[0]`` to extract the " + ":py:class:`KafkaError` object\n" + "\n", + NULL, NULL); + Py_INCREF(KafkaException); + PyModule_AddObject(m, "KafkaException", KafkaException); + + return m; +} + + +#ifdef PY3 +PyMODINIT_FUNC PyInit_confluent_kafka (void) { + return _init_confluent_kafka(); +} +#else +PyMODINIT_FUNC initconfluent_kafka (void) { + _init_confluent_kafka(); +} +#endif diff --git a/confluent_kafka.h b/confluent_kafka.h new file mode 100644 index 000000000..07799e762 --- /dev/null +++ b/confluent_kafka.h @@ -0,0 +1,205 @@ +/** + * Copyright 2016 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +#if PY_MAJOR_VERSION >= 3 +#define PY3 +#include +#endif + + + +/**************************************************************************** + * + * + * Python 2 & 3 portability + * + * Binary data (we call it cfl_PyBin): + * Python 2: string + * Python 3: bytes + * + * Unicode Strings (we call it cfl_PyUnistr): + * Python 2: unicode + * Python 3: strings + * + ****************************************************************************/ + +#ifdef PY3 +/** + * @brief Binary type, use as cfl_PyBin(_X(A,B)) where _X() is the type-less + * suffix of a PyBinary/Str_X() function +*/ +#define cfl_PyBin(X) PyBinary ## X + +/** + * @brief Unicode type, same usage as PyBin() + */ +#define cfl_PyUnistr(X) PyUnicode ## X + +/** + * @returns Unicode Python object as char * in UTF-8 encoding + */ +#define cfl_PyUnistr_AsUTF8(X) PyUnicode_AsUTF8(X) + +/** + * @returns Unicode Python string object + */ +#define cfl_PyObject_Unistr(X) PyObject_Str(X) +#else + +/* See comments above */ +#define cfl_PyBin(X) PyString ## X +#define cfl_PyUnistr(X) PyUnicode ## X +#define cfl_PyUnistr_AsUTF8(X) PyBytes_AsString(PyUnicode_AsUTF8String(X)) +#define cfl_PyObject_Unistr(X) PyObject_Unicode(X) +#endif + + +/**************************************************************************** + * + * + * KafkaError + * + * + * + * + ****************************************************************************/ +extern PyObject *KafkaException; + +PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...); + + +/** + * @brief Raise an exception using KafkaError. + * \p err and and \p ... (string representation of error) is set on the returned + * KafkaError object. + */ +#define cfl_PyErr_Format(err,...) 
do { \ + PyObject *_eo = KafkaError_new0(err, __VA_ARGS__); \ + PyErr_SetObject(KafkaException, _eo); \ + } while (0) + +/**************************************************************************** + * + * + * Common + * + * + * + * + ****************************************************************************/ +rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, + void *self0, + PyObject *args, + PyObject *kwargs); +PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts); +rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist); + + +/**************************************************************************** + * + * + * Message + * + * + * + * + ****************************************************************************/ + +/** + * @brief confluent_kafka.Message object + */ +typedef struct { + PyObject_HEAD + PyObject *topic; + PyObject *value; + PyObject *key; + PyObject *error; + int32_t partition; + int64_t offset; +} Message; + +extern PyTypeObject MessageType; + +PyObject *Message_new0 (const rd_kafka_message_t *rkm); +PyObject *Message_error (Message *self, PyObject *ignore); + + +/**************************************************************************** + * + * + * Producer + * + * + * + * + ****************************************************************************/ + +/** + * @brief confluent_kafka.Producer object + */ +typedef struct { + PyObject_HEAD + rd_kafka_t *rk; + PyObject *default_dr_cb; + PyObject *partitioner_cb; /**< Registered Python partitioner */ + int32_t (*c_partitioner_cb) ( + const rd_kafka_topic_t *, + const void *, size_t, int32_t, + void *, void *); /**< Fallback C partitioner*/ + int callback_crashed; + PyThreadState *thread_state; +} Producer; + + +extern PyTypeObject ProducerType; + +int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt, + const void *keydata, + size_t keylen, + int32_t partition_cnt, + void *rkt_opaque, void *msg_opaque); + + +/**************************************************************************** + * + * + * Consumer + * + * + * + * + ****************************************************************************/ + +/** + * @brief confluent_kafka.Consumer object + */ +typedef struct { + PyObject_HEAD + rd_kafka_t *rk; + int rebalance_assigned; /* Rebalance: Callback performed assign() call.*/ + PyObject *on_assign; /* Rebalance: on_assign callback */ + PyObject *on_revoke; /* Rebalance: on_revoke callback */ + int callback_crashed; + PyThreadState *thread_state; +} Consumer; + +extern PyTypeObject ConsumerType ; + diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 000000000..fa385fb6f --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,177 @@ +# Makefile for Sphinx documentation +# + +# You can set these variables from the command line. +SPHINXOPTS = +SPHINXBUILD = sphinx-build +PAPER = +BUILDDIR = _build + +# User-friendly check for sphinx-build +ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1) +$(error The '$(SPHINXBUILD)' command was not found. Make sure you have Sphinx installed, then set the SPHINXBUILD environment variable to point to the full path of the '$(SPHINXBUILD)' executable. Alternatively you can add the directory with the executable to your PATH. If you don't have Sphinx installed, grab it from http://sphinx-doc.org/) +endif + +# Internal variables. +PAPEROPT_a4 = -D latex_paper_size=a4 +PAPEROPT_letter = -D latex_paper_size=letter +ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . 
+# the i18n builder cannot share the environment and doctrees with the others +I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . + +.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext + +help: + @echo "Please use \`make ' where is one of" + @echo " html to make standalone HTML files" + @echo " dirhtml to make HTML files named index.html in directories" + @echo " singlehtml to make a single large HTML file" + @echo " pickle to make pickle files" + @echo " json to make JSON files" + @echo " htmlhelp to make HTML files and a HTML help project" + @echo " qthelp to make HTML files and a qthelp project" + @echo " devhelp to make HTML files and a Devhelp project" + @echo " epub to make an epub" + @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter" + @echo " latexpdf to make LaTeX files and run them through pdflatex" + @echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx" + @echo " text to make text files" + @echo " man to make manual pages" + @echo " texinfo to make Texinfo files" + @echo " info to make Texinfo files and run them through makeinfo" + @echo " gettext to make PO message catalogs" + @echo " changes to make an overview of all changed/added/deprecated items" + @echo " xml to make Docutils-native XML files" + @echo " pseudoxml to make pseudoxml-XML files for display purposes" + @echo " linkcheck to check all external links for integrity" + @echo " doctest to run all doctests embedded in the documentation (if enabled)" + +clean: + rm -rf $(BUILDDIR)/* + +html: + $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/html." + +dirhtml: + $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml." + +singlehtml: + $(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml + @echo + @echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml." + +pickle: + $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle + @echo + @echo "Build finished; now you can process the pickle files." + +json: + $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json + @echo + @echo "Build finished; now you can process the JSON files." + +htmlhelp: + $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp + @echo + @echo "Build finished; now you can run HTML Help Workshop with the" \ + ".hhp project file in $(BUILDDIR)/htmlhelp." + +qthelp: + $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp + @echo + @echo "Build finished; now you can run "qcollectiongenerator" with the" \ + ".qhcp project file in $(BUILDDIR)/qthelp, like this:" + @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/confluent-kafka.qhcp" + @echo "To view the help file:" + @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/confluent-kafka.qhc" + +devhelp: + $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp + @echo + @echo "Build finished." + @echo "To view the help file:" + @echo "# mkdir -p $$HOME/.local/share/devhelp/confluent-kafka" + @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/confluent-kafka" + @echo "# devhelp" + +epub: + $(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub + @echo + @echo "Build finished. The epub file is in $(BUILDDIR)/epub." 
+ +latex: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo + @echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex." + @echo "Run \`make' in that directory to run these through (pdf)latex" \ + "(use \`make latexpdf' here to do that automatically)." + +latexpdf: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through pdflatex..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +latexpdfja: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through platex and dvipdfmx..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf-ja + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +text: + $(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text + @echo + @echo "Build finished. The text files are in $(BUILDDIR)/text." + +man: + $(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man + @echo + @echo "Build finished. The manual pages are in $(BUILDDIR)/man." + +texinfo: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo + @echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo." + @echo "Run \`make' in that directory to run these through makeinfo" \ + "(use \`make info' here to do that automatically)." + +info: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo "Running Texinfo files through makeinfo..." + make -C $(BUILDDIR)/texinfo info + @echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo." + +gettext: + $(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale + @echo + @echo "Build finished. The message catalogs are in $(BUILDDIR)/locale." + +changes: + $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes + @echo + @echo "The overview file is in $(BUILDDIR)/changes." + +linkcheck: + $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck + @echo + @echo "Link check complete; look for any errors in the above output " \ + "or in $(BUILDDIR)/linkcheck/output.txt." + +doctest: + $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest + @echo "Testing of doctests in the sources finished, look at the " \ + "results in $(BUILDDIR)/doctest/output.txt." + +xml: + $(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml + @echo + @echo "Build finished. The XML files are in $(BUILDDIR)/xml." + +pseudoxml: + $(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml + @echo + @echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml." diff --git a/docs/conf.py b/docs/conf.py new file mode 100644 index 000000000..3d78ca9df --- /dev/null +++ b/docs/conf.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- +# +# confluent-kafka documentation build configuration file, created by +# sphinx-quickstart on Mon Feb 1 15:12:01 2016. +# +# This file is execfile()d with the current directory set to its +# containing dir. +# +# Note that not all possible configuration values are present in this +# autogenerated file. +# +# All configuration values have a default; values that are commented out +# serve to show the default. + +import sys +import os +from glob import glob + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. 
+sys.path[:0] = [os.path.abspath(x) for x in glob('../build/lib.*')] + +# -- General configuration ------------------------------------------------ + +# If your documentation needs a minimal Sphinx version, state it here. +#needs_sphinx = '1.0' + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.coverage', +] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# The suffix of source filenames. +source_suffix = '.rst' + +# The encoding of source files. +#source_encoding = 'utf-8-sig' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = u'confluent-kafka' +copyright = u'2016, Confluent Inc.' + +# The version info for the project you're documenting, acts as replacement for +# |version| and |release|, also used in various other places throughout the +# built documents. +# +# The short X.Y version. +version = '0.9.1' +# The full version, including alpha/beta/rc tags. +release = '0.9.1' + +# The language for content autogenerated by Sphinx. Refer to documentation +# for a list of supported languages. +#language = None + +# There are two options for replacing |today|: either, you set today to some +# non-false value, then it is used: +#today = '' +# Else, today_fmt is used as the format for a strftime call. +#today_fmt = '%B %d, %Y' + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +exclude_patterns = ['_build'] + +# The reST default role (used for this markup: `text`) to use for all +# documents. +#default_role = None + +# If true, '()' will be appended to :func: etc. cross-reference text. +#add_function_parentheses = True + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +#add_module_names = True + +# If true, sectionauthor and moduleauthor directives will be shown in the +# output. They are ignored by default. +#show_authors = False + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# A list of ignored prefixes for module index sorting. +#modindex_common_prefix = [] + +# If true, keep warnings as "system message" paragraphs in the built documents. +#keep_warnings = False + + +# -- Options for HTML output ---------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +html_theme = 'default' + +# Theme options are theme-specific and customize the look and feel of a theme +# further. For a list of options available for each theme, see the +# documentation. +#html_theme_options = {} + +# Add any paths that contain custom themes here, relative to this directory. +#html_theme_path = [] + +# The name for this set of Sphinx documents. If None, it defaults to +# " v documentation". +#html_title = None + +# A shorter title for the navigation bar. Default is the same as html_title. +#html_short_title = None + +# The name of an image file (relative to this directory) to place at the top +# of the sidebar. +#html_logo = None + +# The name of an image file (within the static path) to use as favicon of the +# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 +# pixels large. 
+#html_favicon = None + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + +# Add any extra paths that contain custom files (such as robots.txt or +# .htaccess) here, relative to this directory. These files are copied +# directly to the root of the documentation. +#html_extra_path = [] + +# If not '', a 'Last updated on:' timestamp is inserted at every page bottom, +# using the given strftime format. +#html_last_updated_fmt = '%b %d, %Y' + +# If true, SmartyPants will be used to convert quotes and dashes to +# typographically correct entities. +#html_use_smartypants = True + +# Custom sidebar templates, maps document names to template names. +#html_sidebars = {} + +# Additional templates that should be rendered to pages, maps page names to +# template names. +#html_additional_pages = {} + +# If false, no module index is generated. +#html_domain_indices = True + +# If false, no index is generated. +#html_use_index = True + +# If true, the index is split into individual pages for each letter. +#html_split_index = False + +# If true, links to the reST sources are added to the pages. +#html_show_sourcelink = True + +# If true, "Created using Sphinx" is shown in the HTML footer. Default is True. +#html_show_sphinx = True + +# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True. +#html_show_copyright = True + +# If true, an OpenSearch description file will be output, and all pages will +# contain a tag referring to it. The value of this option must be the +# base URL from which the finished HTML is served. +#html_use_opensearch = '' + +# This is the file name suffix for HTML files (e.g. ".xhtml"). +#html_file_suffix = None + +# Output file base name for HTML help builder. +htmlhelp_basename = 'confluent-kafkadoc' + + +# -- Options for LaTeX output --------------------------------------------- + +latex_elements = { +# The paper size ('letterpaper' or 'a4paper'). +#'papersize': 'letterpaper', + +# The font size ('10pt', '11pt' or '12pt'). +#'pointsize': '10pt', + +# Additional stuff for the LaTeX preamble. +#'preamble': '', +} + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, +# author, documentclass [howto, manual, or own class]). +latex_documents = [ + ('index', 'confluent-kafka.tex', u'confluent-kafka Documentation', + u'Magnus Edenhill', 'manual'), +] + +# The name of an image file (relative to this directory) to place at the top of +# the title page. +#latex_logo = None + +# For "manual" documents, if this is true, then toplevel headings are parts, +# not chapters. +#latex_use_parts = False + +# If true, show page references after internal links. +#latex_show_pagerefs = False + +# If true, show URL addresses after external links. +#latex_show_urls = False + +# Documents to append as an appendix to all manuals. +#latex_appendices = [] + +# If false, no module index is generated. +#latex_domain_indices = True + + +# -- Options for manual page output --------------------------------------- + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). +man_pages = [ + ('index', 'confluent-kafka', u'confluent-kafka Documentation', + [u'Magnus Edenhill'], 1) +] + +# If true, show URL addresses after external links. 
+#man_show_urls = False + + +# -- Options for Texinfo output ------------------------------------------- + +# Grouping the document tree into Texinfo files. List of tuples +# (source start file, target name, title, author, +# dir menu entry, description, category) +texinfo_documents = [ + ('index', 'confluent-kafka', u'confluent-kafka Documentation', + u'Magnus Edenhill', 'confluent-kafka', 'One line description of project.', + 'Miscellaneous'), +] + +# Documents to append as an appendix to all manuals. +#texinfo_appendices = [] + +# If false, no module index is generated. +#texinfo_domain_indices = True + +# How to display URL addresses: 'footnote', 'no', or 'inline'. +#texinfo_show_urls = 'footnote' + +# If true, do not generate a @detailmenu in the "Top" node's menu. +#texinfo_no_detailmenu = False diff --git a/docs/index.rst b/docs/index.rst new file mode 100644 index 000000000..a9c0b681d --- /dev/null +++ b/docs/index.rst @@ -0,0 +1,44 @@ +Welcome to Confluent's Apache Kafka Python client documentation +=============================================================== + +Indices and tables +================== + +* :ref:`genindex` + +:mod:`confluent_kafka` --- Confluent's Apache Kafka Python client +***************************************************************** + +.. automodule:: confluent_kafka + :synopsis: Confluent's Apache Kafka Python client. + :members: + + +Configuration +============= +Configuration of producer and consumer instances is performed by +providing a dict of configuration properties to the instance constructor, e.g.:: + + conf = {'bootstrap.servers': 'mybroker.com', + 'group.id': 'mygroup', 'session.timeout.ms': 6000, + 'default.topic.config': {'auto.offset.reset': 'smallest'}} + consumer = confluent_kafka.Consumer(**conf) + +The supported configuration values are dictated by the underlying +librdkafka C library. For the full range of configuration properties +please consult librdkafka's documentation: +https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + +The Python bindings also provide some additional configuration properties: + +* ``default.topic.config``: value is a dict of topic-level configuration + properties that are applied to all used topics for the instance. + +* ``delivery_callback`` (**Producer**): value is a Python function reference + that is called once for each produced message to indicate the final + delivery result (success or failure). + This property may also be set per-message by passing ``callback=somefunc`` + to the confluent_kafka.Producer.produce() function. + + + diff --git a/docs/make-doc.bat b/docs/make-doc.bat new file mode 100644 index 000000000..882eb84a6 --- /dev/null +++ b/docs/make-doc.bat @@ -0,0 +1,242 @@ +@ECHO OFF + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set BUILDDIR=_build +set ALLSPHINXOPTS=-d %BUILDDIR%/doctrees %SPHINXOPTS% . +set I18NSPHINXOPTS=%SPHINXOPTS% . +if NOT "%PAPER%" == "" ( + set ALLSPHINXOPTS=-D latex_paper_size=%PAPER% %ALLSPHINXOPTS% + set I18NSPHINXOPTS=-D latex_paper_size=%PAPER% %I18NSPHINXOPTS% +) + +if "%1" == "" goto help + +if "%1" == "help" ( + :help + echo.Please use `make ^` where ^ is one of + echo. html to make standalone HTML files + echo. dirhtml to make HTML files named index.html in directories + echo. singlehtml to make a single large HTML file + echo. pickle to make pickle files + echo. json to make JSON files + echo. htmlhelp to make HTML files and a HTML help project + echo. 
qthelp to make HTML files and a qthelp project + echo. devhelp to make HTML files and a Devhelp project + echo. epub to make an epub + echo. latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter + echo. text to make text files + echo. man to make manual pages + echo. texinfo to make Texinfo files + echo. gettext to make PO message catalogs + echo. changes to make an overview over all changed/added/deprecated items + echo. xml to make Docutils-native XML files + echo. pseudoxml to make pseudoxml-XML files for display purposes + echo. linkcheck to check all external links for integrity + echo. doctest to run all doctests embedded in the documentation if enabled + goto end +) + +if "%1" == "clean" ( + for /d %%i in (%BUILDDIR%\*) do rmdir /q /s %%i + del /q /s %BUILDDIR%\* + goto end +) + + +%SPHINXBUILD% 2> nul +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.http://sphinx-doc.org/ + exit /b 1 +) + +if "%1" == "html" ( + %SPHINXBUILD% -b html %ALLSPHINXOPTS% %BUILDDIR%/html + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The HTML pages are in %BUILDDIR%/html. + goto end +) + +if "%1" == "dirhtml" ( + %SPHINXBUILD% -b dirhtml %ALLSPHINXOPTS% %BUILDDIR%/dirhtml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The HTML pages are in %BUILDDIR%/dirhtml. + goto end +) + +if "%1" == "singlehtml" ( + %SPHINXBUILD% -b singlehtml %ALLSPHINXOPTS% %BUILDDIR%/singlehtml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The HTML pages are in %BUILDDIR%/singlehtml. + goto end +) + +if "%1" == "pickle" ( + %SPHINXBUILD% -b pickle %ALLSPHINXOPTS% %BUILDDIR%/pickle + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can process the pickle files. + goto end +) + +if "%1" == "json" ( + %SPHINXBUILD% -b json %ALLSPHINXOPTS% %BUILDDIR%/json + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can process the JSON files. + goto end +) + +if "%1" == "htmlhelp" ( + %SPHINXBUILD% -b htmlhelp %ALLSPHINXOPTS% %BUILDDIR%/htmlhelp + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can run HTML Help Workshop with the ^ +.hhp project file in %BUILDDIR%/htmlhelp. + goto end +) + +if "%1" == "qthelp" ( + %SPHINXBUILD% -b qthelp %ALLSPHINXOPTS% %BUILDDIR%/qthelp + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; now you can run "qcollectiongenerator" with the ^ +.qhcp project file in %BUILDDIR%/qthelp, like this: + echo.^> qcollectiongenerator %BUILDDIR%\qthelp\confluent-kafka.qhcp + echo.To view the help file: + echo.^> assistant -collectionFile %BUILDDIR%\qthelp\confluent-kafka.ghc + goto end +) + +if "%1" == "devhelp" ( + %SPHINXBUILD% -b devhelp %ALLSPHINXOPTS% %BUILDDIR%/devhelp + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. + goto end +) + +if "%1" == "epub" ( + %SPHINXBUILD% -b epub %ALLSPHINXOPTS% %BUILDDIR%/epub + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The epub file is in %BUILDDIR%/epub. + goto end +) + +if "%1" == "latex" ( + %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex + if errorlevel 1 exit /b 1 + echo. + echo.Build finished; the LaTeX files are in %BUILDDIR%/latex. 
+ goto end +) + +if "%1" == "latexpdf" ( + %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex + cd %BUILDDIR%/latex + make all-pdf + cd %BUILDDIR%/.. + echo. + echo.Build finished; the PDF files are in %BUILDDIR%/latex. + goto end +) + +if "%1" == "latexpdfja" ( + %SPHINXBUILD% -b latex %ALLSPHINXOPTS% %BUILDDIR%/latex + cd %BUILDDIR%/latex + make all-pdf-ja + cd %BUILDDIR%/.. + echo. + echo.Build finished; the PDF files are in %BUILDDIR%/latex. + goto end +) + +if "%1" == "text" ( + %SPHINXBUILD% -b text %ALLSPHINXOPTS% %BUILDDIR%/text + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The text files are in %BUILDDIR%/text. + goto end +) + +if "%1" == "man" ( + %SPHINXBUILD% -b man %ALLSPHINXOPTS% %BUILDDIR%/man + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The manual pages are in %BUILDDIR%/man. + goto end +) + +if "%1" == "texinfo" ( + %SPHINXBUILD% -b texinfo %ALLSPHINXOPTS% %BUILDDIR%/texinfo + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The Texinfo files are in %BUILDDIR%/texinfo. + goto end +) + +if "%1" == "gettext" ( + %SPHINXBUILD% -b gettext %I18NSPHINXOPTS% %BUILDDIR%/locale + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The message catalogs are in %BUILDDIR%/locale. + goto end +) + +if "%1" == "changes" ( + %SPHINXBUILD% -b changes %ALLSPHINXOPTS% %BUILDDIR%/changes + if errorlevel 1 exit /b 1 + echo. + echo.The overview file is in %BUILDDIR%/changes. + goto end +) + +if "%1" == "linkcheck" ( + %SPHINXBUILD% -b linkcheck %ALLSPHINXOPTS% %BUILDDIR%/linkcheck + if errorlevel 1 exit /b 1 + echo. + echo.Link check complete; look for any errors in the above output ^ +or in %BUILDDIR%/linkcheck/output.txt. + goto end +) + +if "%1" == "doctest" ( + %SPHINXBUILD% -b doctest %ALLSPHINXOPTS% %BUILDDIR%/doctest + if errorlevel 1 exit /b 1 + echo. + echo.Testing of doctests in the sources finished, look at the ^ +results in %BUILDDIR%/doctest/output.txt. + goto end +) + +if "%1" == "xml" ( + %SPHINXBUILD% -b xml %ALLSPHINXOPTS% %BUILDDIR%/xml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The XML files are in %BUILDDIR%/xml. + goto end +) + +if "%1" == "pseudoxml" ( + %SPHINXBUILD% -b pseudoxml %ALLSPHINXOPTS% %BUILDDIR%/pseudoxml + if errorlevel 1 exit /b 1 + echo. + echo.Build finished. The pseudo-XML files are in %BUILDDIR%/pseudoxml. + goto end +) + +:end diff --git a/examples/consumer.py b/examples/consumer.py new file mode 100755 index 000000000..d9bda0b4e --- /dev/null +++ b/examples/consumer.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# +# Copyright 2016 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
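
Before the consumer example that follows, a minimal configuration sketch (the broker address and group id are placeholders, not values from this patch): as implemented in common_conf_setup() above, a client accepts its configuration either as keyword arguments or as a single dict passed positionally, with topic-level properties nested under ``default.topic.config``::

    import confluent_kafka

    conf = {'bootstrap.servers': 'localhost:9092',   # placeholder broker
            'group.id': 'example',                   # placeholder group
            'session.timeout.ms': 6000,
            'default.topic.config': {'auto.offset.reset': 'smallest'}}

    # Both forms reach the same common_conf_setup() code path:
    c = confluent_kafka.Consumer(**conf)   # config as keyword arguments
    c = confluent_kafka.Consumer(conf)     # config as a single dict argument
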
+# + +# +# Example high-level Kafka 0.9 balanced Consumer +# + +from confluent_kafka import Consumer, KafkaException, KafkaError +import sys + +if __name__ == '__main__': + if len(sys.argv) < 4: + sys.stderr.write('Usage: %s ..\n' % sys.argv[0]) + sys.exit(1) + + broker = sys.argv[1] + group = sys.argv[2] + topics = sys.argv[3:] + + # Consumer configuration + # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000, + 'default.topic.config': {'auto.offset.reset': 'smallest'}} + + + # Create Consumer instance + c = Consumer(**conf) + + def print_assignment (consumer, partitions): + print('Assignment:', partitions) + + # Subscribe to topics + c.subscribe(topics, on_assign=print_assignment) + + # Read messages from Kafka, print to stdout + try: + while True: + msg = c.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + # Error or event + if msg.error().code() == KafkaError._PARTITION_EOF: + # End of partition event + sys.stderr.write('%% %s [%d] reached end at offset %d\n' % + (msg.topic(), msg.partition(), msg.offset())) + elif msg.error(): + # Error + raise KafkaException(msg.error()) + else: + # Proper message + sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' % + (msg.topic(), msg.partition(), msg.offset(), + str(msg.key()))) + print(msg.value()) + + except KeyboardInterrupt: + sys.stderr.write('%% Aborted by user\n') + + # Close down consumer to commit final offsets. + c.close() diff --git a/examples/producer.py b/examples/producer.py new file mode 100755 index 000000000..903ecb4af --- /dev/null +++ b/examples/producer.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python +# +# Copyright 2016 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Example Kafka Producer. +# Reads lines from stdin and sends to Kafka. +# + +from confluent_kafka import Producer +import sys + +if __name__ == '__main__': + if len(sys.argv) != 3: + sys.stderr.write('Usage: %s \n' % sys.argv[0]) + sys.exit(1) + + broker = sys.argv[1] + topic = sys.argv[2] + + # Producer configuration + # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md + conf = {'bootstrap.servers': broker} + + # Create Producer instance + p = Producer(**conf) + + # Optional per-message delivery callback (triggered by poll() or flush()) + # when a message has been successfully delivered or permanently + # failed delivery (after retries). 
+ def delivery_callback (err, msg): + if err: + sys.stderr.write('%% Message failed delivery: %s\n' % err) + else: + sys.stderr.write('%% Message delivered to %s [%d]\n' % \ + (msg.topic(), msg.partition())) + + # Read lines from stdin, produce each line to Kafka + for line in sys.stdin: + try: + # Produce line (without newline) + p.produce(topic, line.rstrip(), callback=delivery_callback) + + except BufferError as e: + sys.stderr.write('%% Local producer queue is full ' \ + '(%d messages awaiting delivery): try again\n' % + len(p)) + + # Serve delivery callback queue. + # NOTE: Since produce() is an asynchronous API this poll() call + # will most likely not serve the delivery callback for the + # last produce()d message. + p.poll(0) + + # Wait until all messages have been delivered + sys.stderr.write('%% Waiting for %d deliveries\n' % len(p)) + p.flush() diff --git a/integration_test.py b/integration_test.py new file mode 100755 index 000000000..357e2c679 --- /dev/null +++ b/integration_test.py @@ -0,0 +1,368 @@ +#!/usr/bin/env python +# +# +# Copyright 2016 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +""" Test script for confluent_kafka module """ + +import confluent_kafka +import re +import time +import uuid +import sys + +try: + from progress.bar import Bar + with_progress = True +except ImportError as e: + with_progress = False + +# Kafka bootstrap server(s) +bootstrap_servers = 'localhost' + + + + + +class MyTestDr(object): + """ Producer: Delivery report callback """ + + def __init__(self, silent=False): + super(MyTestDr, self).__init__() + self.msgs_delivered = 0 + self.bytes_delivered = 0 + self.silent = silent + + @staticmethod + def _delivery(err, msg, silent=False): + if err: + print('Message delivery failed (%s [%s]): %s' % \ + (msg.topic(), str(msg.partition()), err)) + return 0 + else: + if not silent: + print('Message delivered to %s [%s] at offset [%s]: %s' % \ + (msg.topic(), msg.partition(), msg.offset(), msg.value())) + return 1 + + def delivery(self, err, msg): + if err: + print('Message delivery failed (%s [%s]): %s' % \ + (msg.topic(), str(msg.partition()), err)) + return + elif not self.silent: + print('Message delivered to %s [%s] at offset [%s]: %s' % \ + (msg.topic(), msg.partition(), msg.offset(), msg.value())) + self.msgs_delivered += 1 + self.bytes_delivered += len(msg) + + + +def verify_producer(): + """ Verify basic Producer functionality """ + + # Producer config + conf = {'bootstrap.servers': bootstrap_servers, + 'default.topic.config':{'produce.offset.report': True}} + + # Create producer + p = confluent_kafka.Producer(**conf) + print('producer at %s' % p) + + # Produce some messages + p.produce('test', 'Hello Python!') + p.produce('test', key='Just a key') + p.produce('test', partition=1, value='Strictly for partition 1', + key='mykey') + + # Produce more messages, now with delivery report callbacks in various forms. 
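
The BufferError handling in examples/producer.py above illustrates local-queue backpressure; a minimal sketch of the retry pattern (the function name is illustrative, not part of this patch), where poll() serves delivery callbacks and thereby frees queue space before produce() is retried::

    def produce_with_retry(p, topic, value, retries=3):
        """Retry produce() when the local queue is full (BufferError)."""
        for _ in range(retries):
            try:
                p.produce(topic, value)
                return True
            except BufferError:
                # Serve delivery callbacks; delivered messages free queue space.
                p.poll(1.0)
        return False
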
+ mydr = MyTestDr() + p.produce('test', value='This one has a dr callback', + callback=mydr.delivery) + p.produce('test', value='This one has a lambda', + callback=lambda err, msg: MyTestDr._delivery(err, msg)) + p.produce('test', value='This one has neither') + + # Produce even more messages + for i in range(0, 10): + p.produce('test', value='Message #%d' % i, key=str(i), + callback=mydr.delivery) + p.poll(0) + + print('Waiting for %d messages to be delivered' % len(p)) + + # Block until all messages are delivered/failed + p.flush() + + + +def verify_producer_performance(with_dr_cb=True): + """ Time how long it takes to produce and delivery X messages """ + conf = {'bootstrap.servers': bootstrap_servers} + + p = confluent_kafka.Producer(**conf) + + topic = 'test' + msgcnt = 1000000 + msgsize = 100 + msg_pattern = 'test.py performance' + msg_payload = (msg_pattern * int(msgsize / len(msg_pattern)))[0:msgsize] + + dr = MyTestDr(silent=True) + + t_produce_start = time.time() + msgs_produced = 0 + msgs_backpressure = 0 + print('# producing %d messages to topic %s' % (msgcnt, topic)) + + if with_progress: + bar = Bar('Producing', max=msgcnt) + else: + bar = None + + for i in range(0, msgcnt): + try: + if with_dr_cb: + p.produce('test', value=msg_payload, callback=dr.delivery) + else: + p.produce('test', value=msg_payload) + except BufferError as e: + # Local queue is full (slow broker connection?) + msgs_backpressure += 1 + if bar is not None and (msgs_backpressure % 1000) == 0: + bar.next(n=0) + p.poll(0) + continue + + if bar is not None and (msgs_produced % 5000) == 0: + bar.next(n=5000) + msgs_produced += 1 + p.poll(0) + + t_produce_spent = time.time() - t_produce_start + + bytecnt = msgs_produced * msgsize + + if bar is not None: + bar.finish() + + print('# producing %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' % \ + (msgs_produced, bytecnt / (1024*1024), t_produce_spent, + msgs_produced / t_produce_spent, + (bytecnt/t_produce_spent) / (1024*1024))) + print('# %d messages not produce()d due to backpressure (local queue full)' % msgs_backpressure) + + print('waiting for %d/%d deliveries' % (len(p), msgs_produced)) + # Wait for deliveries + p.flush() + t_delivery_spent = time.time() - t_produce_start + + + print('# producing %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' % \ + (msgs_produced, bytecnt / (1024*1024), t_produce_spent, + msgs_produced / t_produce_spent, + (bytecnt/t_produce_spent) / (1024*1024))) + + # Fake numbers if not using a dr_cb + if not with_dr_cb: + print('# not using dr_cb') + dr.msgs_delivered = msgs_produced + dr.bytes_delivered = bytecnt + + print('# delivering %d messages (%.2fMb) took %.3fs: %d msgs/s, %.2f Mb/s' % \ + (dr.msgs_delivered, dr.bytes_delivered / (1024*1024), t_delivery_spent, + dr.msgs_delivered / t_delivery_spent, + (dr.bytes_delivered/t_delivery_spent) / (1024*1024))) + print('# post-produce delivery wait took %.3fs' % \ + (t_delivery_spent - t_produce_spent)) + + +def verify_consumer(): + """ Verify basic Consumer functionality """ + + # Consumer config + conf = {'bootstrap.servers': bootstrap_servers, + 'group.id': 'test.py', + 'session.timeout.ms': 6000, + 'enable.auto.commit': False, + 'default.topic.config': { + 'auto.offset.reset': 'earliest' + }} + + # Create consumer + c = confluent_kafka.Consumer(**conf) + + # Subscribe to a list of topics + c.subscribe(["test"]) + + max_msgcnt = 100 + msgcnt = 0 + + while True: + # Consume until EOF or error + + # Consume message (error()==0) or event (error()!=0) + msg = c.poll() + if 
msg is None: + raise Exception('Got timeout from poll() without a timeout set: %s' % msg) + + if msg.error(): + if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF: + print('Reached end of %s [%d] at offset %d' % \ + (msg.topic(), msg.partition(), msg.offset())) + break + else: + print('Consumer error: %s: ignoring' % msg.error()) + break + + if False: + print('%s[%d]@%d: key=%s, value=%s' % \ + (msg.topic(), msg.partition(), msg.offset(), + msg.key(), msg.value())) + + if (msg.offset() % 5) == 0: + # Async commit + c.commit(msg, async=True) + elif (msg.offset() % 4) == 0: + c.commit(msg, async=False) + + msgcnt += 1 + if msgcnt >= max_msgcnt: + print('max_msgcnt %d reached' % msgcnt) + break + + + # Close consumer + c.close() + + + # Start a new client and get the committed offsets + c = confluent_kafka.Consumer(**conf) + offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition("test", p), range(0,3)))) + for tp in offsets: + print(tp) + + c.close() + + + + +def verify_consumer_performance(): + """ Verify Consumer performance """ + + conf = {'bootstrap.servers': bootstrap_servers, + 'group.id': uuid.uuid1(), + 'session.timeout.ms': 6000, + 'default.topic.config': { + 'auto.offset.reset': 'earliest' + }} + + c = confluent_kafka.Consumer(**conf) + + def my_on_assign (consumer, partitions): + print('on_assign:', len(partitions), 'partitions:') + for p in partitions: + print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset)) + consumer.assign(partitions) + + def my_on_revoke (consumer, partitions): + print('on_revoke:', len(partitions), 'partitions:') + for p in partitions: + print(' %s [%d] @ %d' % (p.topic, p.partition, p.offset)) + consumer.unassign() + + c.subscribe(["test"], on_assign=my_on_assign, on_revoke=my_on_revoke) + + max_msgcnt = 1000000 + bytecnt = 0 + msgcnt = 0 + + print('Will now consume %d messages' % max_msgcnt) + + if with_progress: + bar = Bar('Consuming', max=max_msgcnt, + suffix='%(index)d/%(max)d [%(eta_td)s]') + else: + bar = None + + while True: + # Consume until EOF or error + + msg = c.poll(timeout=20.0) + if msg is None: + raise Exception('Stalled at %d/%d message, no new messages for 20s' % + (msgcnt, max_msgcnt)) + + if msg.error(): + if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF: + # Reached EOF for a partition, ignore. 
+ continue
+ else:
+ raise confluent_kafka.KafkaException(msg.error())
+
+
+ bytecnt += len(msg)
+ msgcnt += 1
+
+ if bar is not None and (msgcnt % 10000) == 0:
+ bar.next(n=10000)
+
+ if msgcnt == 1:
+ t_first_msg = time.time()
+ if msgcnt >= max_msgcnt:
+ break
+
+ if bar is not None:
+ bar.finish()
+
+ if msgcnt > 0:
+ t_spent = time.time() - t_first_msg
+ print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' % \
+ (msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
+ (bytecnt / t_spent) / (1024*1024)))
+
+ print('closing consumer')
+ c.close()
+
+
+
+if __name__ == '__main__':
+
+ if len(sys.argv) > 1:
+ bootstrap_servers = sys.argv[1]
+
+ print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
+ print('Using librdkafka version %s (0x%x)' % confluent_kafka.libversion())
+
+ print('=' * 30, 'Verifying Producer', '=' * 30)
+ verify_producer()
+
+ print('=' * 30, 'Verifying Consumer', '=' * 30)
+ verify_consumer()
+
+ print('=' * 30, 'Verifying Producer performance (with dr_cb)', '=' * 30)
+ verify_producer_performance(with_dr_cb=True)
+
+ print('=' * 30, 'Verifying Producer performance (without dr_cb)', '=' * 30)
+ verify_producer_performance(with_dr_cb=False)
+
+ print('=' * 30, 'Verifying Consumer performance', '=' * 30)
+ verify_consumer_performance()
+
+ print('=' * 30, 'Done', '=' * 30)
+
+
diff --git a/setup.py b/setup.py
new file mode 100644
index 000000000..40dc69267
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+
+from setuptools import setup
+from distutils.core import Extension
+
+
+module = Extension('confluent_kafka',
+ include_dirs = ['/usr/local/include'],
+ libraries= ['rdkafka'],
+ sources=['confluent_kafka.c', 'Producer.c', 'Consumer.c'])
+
+setup (name='confluent-kafka',
+ version='0.9.1',
+ description='Confluent\'s Apache Kafka client for Python',
+ author='Confluent Inc',
+ author_email='support@confluent.io',
+ url='https://github.com/confluentinc/confluent-kafka-python',
+ ext_modules=[module])
+
diff --git a/tests/README.md b/tests/README.md
new file mode 100644
index 000000000..8e67d8a54
--- /dev/null
+++ b/tests/README.md
@@ -0,0 +1,7 @@
+Unit tests
+==========
+
+Run from top-level directory with:
+
+ py.test
+
diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py
new file mode 100644
index 000000000..0af99d074
--- /dev/null
+++ b/tests/test_Consumer.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+
+from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException
+
+
+def test_basic_api():
+ """ Basic API tests, these won't really do anything since there is no
+ broker configured. """
+
+ try:
+ kc = Consumer()
+ except TypeError as e:
+ assert str(e) == "expected configuration dict"
+
+ kc = Consumer({'group.id':'test', 'socket.timeout.ms':'100'})
+
+ kc.subscribe(["test"])
+ kc.unsubscribe()
+
+ def dummy_assign_revoke (consumer, partitions):
+ pass
+
+ kc.subscribe(["test"], on_assign=dummy_assign_revoke, on_revoke=dummy_assign_revoke)
+ kc.unsubscribe()
+
+ msg = kc.poll(timeout=0.001)
+ if msg is None:
+ print('OK: poll() timeout')
+ elif msg.error():
+ print('OK: consumer error: %s' % msg.error().str())
+ else:
+ print('OK: consumed message')
+
+ partitions = list(map(lambda p: TopicPartition("test", p), range(0,100,3)))
+ kc.assign(partitions)
+
+ kc.unassign()
+
+ kc.commit(async=True)
+
+ try:
+ kc.commit(async=False)
+ except KafkaException as e:
+ assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._WAIT_COORD)
+
+ # Get current position, should all be invalid.
+ kc.position(partitions)
+ assert len([p for p in partitions if p.offset == -1001]) == len(partitions)
+
+ try:
+ offsets = kc.committed(partitions, timeout=0.001)
+ except KafkaException as e:
+ assert e.args[0].code() == KafkaError._TIMED_OUT
+
+
+ kc.close()
+
diff --git a/tests/test_Producer.py b/tests/test_Producer.py
new file mode 100644
index 000000000..68ce03032
--- /dev/null
+++ b/tests/test_Producer.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+
+from confluent_kafka import Producer, KafkaError, KafkaException
+
+
+def test_basic_api():
+ """ Basic API tests, these won't really do anything since there is no
+ broker configured. """
+
+ try:
+ p = Producer()
+ except TypeError as e:
+ assert str(e) == "expected configuration dict"
+
+
+ p = Producer({'socket.timeout.ms':10,
+ 'default.topic.config': {'message.timeout.ms': 10}})
+
+ p.produce('mytopic')
+ p.produce('mytopic', value='somedata', key='a key')
+
+ def on_delivery(err, msg):
+ print('delivery', err, msg)
+ # Since there is no broker, produced messages should time out. 
+ assert err.code() == KafkaError._MSG_TIMED_OUT + + p.produce(topic='another_topic', value='testing', partition=9, + callback=on_delivery) + + p.poll(0.001) + + p.flush() + + diff --git a/tests/test_TopicPartition.py b/tests/test_TopicPartition.py new file mode 100644 index 000000000..b31335942 --- /dev/null +++ b/tests/test_TopicPartition.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +from confluent_kafka import TopicPartition + +def test_sort(): + """ TopicPartition sorting (rich comparator) """ + + # sorting uses the comparator + correct = [TopicPartition('topic1', 3), + TopicPartition('topic3', 0), + TopicPartition('topicA', 5), + TopicPartition('topicA', 5)] + + tps = sorted([TopicPartition('topicA', 5), + TopicPartition('topic3', 0), + TopicPartition('topicA', 5), + TopicPartition('topic1', 3)]) + + assert correct == tps + + +def test_cmp(): + """ TopicPartition comparator """ + + assert TopicPartition('aa', 19002) > TopicPartition('aa', 0) + assert TopicPartition('aa', 13) >= TopicPartition('aa', 12) + assert TopicPartition('BaB', 9) != TopicPartition('Card', 9) + assert TopicPartition('b3x', 4) == TopicPartition('b3x', 4) + assert TopicPartition('ulv', 2) < TopicPartition('xy', 0) + assert TopicPartition('ulv', 2) <= TopicPartition('ulv', 3) + + +def test_hash(): + + tp1 = TopicPartition('test', 99) + tp2 = TopicPartition('somethingelse', 12) + assert hash(tp1) != hash(tp2) + diff --git a/tests/test_docs.py b/tests/test_docs.py new file mode 100644 index 000000000..c44912d2a --- /dev/null +++ b/tests/test_docs.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python + +import confluent_kafka +import re + + +def test_verify_docs(): + """ Make sure all exported functions, classes, etc, have proper docstrings + """ + fails = 0 + + for n in dir(confluent_kafka): + if n[0:2] == '__': + # Skip internals + continue + + o = confluent_kafka.__dict__.get(n) + d = o.__doc__ + if not d: + print('Missing __doc__ for: %s (type %s)' % (n, type(o))) + fails += 1 + elif not re.search(r':', d): + print('Missing Doxygen tag for: %s (type %s)' % (n, type(o))) + fails += 1 + + assert fails == 0 + + diff --git a/tests/test_enums.py b/tests/test_enums.py new file mode 100644 index 000000000..64f46221e --- /dev/null +++ b/tests/test_enums.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python + +import confluent_kafka + +def test_enums(): + """ Make sure librdkafka error enums are reachable directly from the + KafkaError class without an instantiated object. """ + print(confluent_kafka.KafkaError._NO_OFFSET) + print(confluent_kafka.KafkaError.REBALANCE_IN_PROGRESS) diff --git a/tests/test_misc.py b/tests/test_misc.py new file mode 100644 index 000000000..f29eb4853 --- /dev/null +++ b/tests/test_misc.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python + +import confluent_kafka + + +def test_version(): + print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version()) + sver, iver = confluent_kafka.version() + assert len(sver) > 0 + assert iver > 0 + + print('Using librdkafka version %s (0x%x)' % confluent_kafka.libversion()) + sver, iver = confluent_kafka.libversion() + assert len(sver) > 0 + assert iver > 0 +
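
For illustration only, here is a minimal end-to-end sketch of the client API this patch adds. It is not part of the patch: it assumes a reachable broker at localhost:9092 and an existing topic named 'test' (both assumptions), and it only uses calls that appear above -- Producer.produce() with a delivery callback, poll(), flush(), and Consumer.subscribe()/poll()/close().

#!/usr/bin/env python
# Minimal usage sketch (not part of the patch). Assumes a broker at
# localhost:9092 and an existing topic named 'test'.

import confluent_kafka

def delivery(err, msg):
    # Delivery report callback: err is a KafkaError on failure, None on success.
    if err:
        print('delivery failed: %s' % err)
    else:
        print('delivered to %s [%d]' % (msg.topic(), msg.partition()))

p = confluent_kafka.Producer(**{'bootstrap.servers': 'localhost:9092'})
p.produce('test', value='hello', callback=delivery)
p.produce('test', value='world', key='k1', callback=delivery)
p.poll(0)    # serve delivery report callbacks
p.flush()    # block until outstanding messages are delivered or have failed

c = confluent_kafka.Consumer(**{'bootstrap.servers': 'localhost:9092',
                                'group.id': 'usage-sketch',
                                'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['test'])
msgcnt = 0
while msgcnt < 2:
    msg = c.poll(timeout=10.0)
    if msg is None:
        break    # no message within the timeout
    if msg.error():
        if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
            continue    # reached end of a partition, keep polling
        raise confluent_kafka.KafkaException(msg.error())
    print('%s [%d] @ %d: %s' % (msg.topic(), msg.partition(), msg.offset(), msg.value()))
    msgcnt += 1
c.close()

The same produce/poll/flush pattern is what verify_producer() and verify_producer_performance() in integration_test.py rely on: poll(0) serves pending delivery callbacks during production, and flush() waits for the remaining ones before exiting.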