
MINIFICPP-2542 ConsumeKafka late offset commit #1946


Open · wants to merge 3 commits into main
Changes from 2 commits
4 changes: 2 additions & 2 deletions cmake/Spdlog.cmake
@@ -20,8 +20,8 @@ include(FetchContent)

 set(SPDLOG_FMT_EXTERNAL ON CACHE STRING "" FORCE)
 FetchContent_Declare(Spdlog
-    URL      https://github.com/gabime/spdlog/archive/refs/tags/v1.14.1.tar.gz
-    URL_HASH SHA256=1586508029a7d0670dfcb2d97575dcdc242d3868a259742b69f100801ab4e16b
+    URL      https://github.com/gabime/spdlog/archive/refs/tags/v1.15.1.tar.gz
+    URL_HASH SHA256=25c843860f039a1600f232c6eb9e01e6627f7d030a2ae5e232bdd3c9205d26cc
     OVERRIDE_FIND_PACKAGE
 )
 FetchContent_MakeAvailable(Spdlog)
5 changes: 5 additions & 0 deletions cmake/fmt.cmake
@@ -17,9 +17,14 @@
 # under the License.
 #
 include(FetchContent)
+set(PATCH_FILE "${CMAKE_SOURCE_DIR}/thirdparty/fmt/add_error_message_to_std__error_code_formatter.patch")
+set(PC ${Bash_EXECUTABLE} -c "set -x &&\
+    (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE}\\\")")

 FetchContent_Declare(Fmt
     URL      https://github.com/fmtlib/fmt/archive/refs/tags/11.0.2.tar.gz
     URL_HASH SHA256=6cb1e6d37bdcb756dbbe59be438790db409cdb4868c66e888d5df9f13f7c027f
+    PATCH_COMMAND "${PC}"
     OVERRIDE_FIND_PACKAGE
 )
 FetchContent_MakeAvailable(Fmt)
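The PATCH_COMMAND added above uses the usual idempotent-patch idiom: a reverse dry-run succeeds only when the patch is already applied, so re-running the configure step skips the patch instead of failing. A minimal Python sketch of the same logic, with placeholder paths rather than the build's real variables:

```python
import subprocess

def apply_patch_once(patch_exe: str, patch_file: str, src_dir: str) -> None:
    # Reverse dry-run: exits 0 only if the patch has already been applied.
    already_applied = subprocess.run(
        [patch_exe, "-p1", "-R", "-s", "-f", "--dry-run", "-i", patch_file],
        cwd=src_dir,
    ).returncode == 0
    if not already_applied:
        # Apply forward, skipping hunks that are already present (-N).
        subprocess.run([patch_exe, "-p1", "-N", "-i", patch_file], cwd=src_dir, check=True)
```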
3 changes: 1 addition & 2 deletions docker/requirements.txt
@@ -1,8 +1,7 @@
 pyjks==20.0.0
 shortuuid==1.0.13
-behavex==4.0.10
+behavex==4.1.0
 docker==7.1.0
 confluent-kafka==2.8.0
 PyYAML==6.0.2
 m2crypto==0.41.0
 watchdog==6.0.0
10 changes: 0 additions & 10 deletions docker/test/integration/cluster/ContainerStore.py
@@ -18,7 +18,6 @@
 from .containers.MinifiContainer import MinifiContainer
 from .containers.NifiContainer import NifiContainer
 from .containers.NifiContainer import NiFiOptions
-from .containers.ZookeeperContainer import ZookeeperContainer
 from .containers.KafkaBrokerContainer import KafkaBrokerContainer
 from .containers.S3ServerContainer import S3ServerContainer
 from .containers.AzureStorageServerContainer import AzureStorageServerContainer
@@ -121,15 +120,6 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c
                                                  image_store=self.image_store,
                                                  command=command))
         elif engine == 'kafka-broker':
-            zookeeper_name = self.get_container_name_with_postfix('zookeeper')
-            if zookeeper_name not in self.containers:
-                self.containers.setdefault(zookeeper_name,
-                                           ZookeeperContainer(feature_context=feature_context,
-                                                              name=zookeeper_name,
-                                                              vols=self.vols,
-                                                              network=self.network,
-                                                              image_store=self.image_store,
-                                                              command=command))
             return self.containers.setdefault(container_name,
                                               KafkaBrokerContainer(feature_context=feature_context,
                                                                    name=container_name,
10 changes: 8 additions & 2 deletions docker/test/integration/cluster/DockerTestCluster.py
@@ -30,6 +30,7 @@
 from .checkers.AzureChecker import AzureChecker
 from .checkers.ElasticSearchChecker import ElasticSearchChecker
 from .checkers.GcsChecker import GcsChecker
+from .checkers.KafkaHelper import KafkaHelper
 from .checkers.PostgresChecker import PostgresChecker
 from .checkers.PrometheusChecker import PrometheusChecker
 from .checkers.SplunkChecker import SplunkChecker
@@ -56,6 +57,7 @@ def __init__(self, context, feature_id):
         self.minifi_controller_executor = MinifiControllerExecutor(self.container_communicator)
         self.modbus_checker = ModbusChecker(self.container_communicator)
         self.couchbase_checker = CouchbaseChecker()
+        self.kafka_checker = KafkaHelper(self.container_communicator, feature_id)

     def cleanup(self):
         self.container_store.cleanup()
@@ -276,9 +278,13 @@ def check_query_results(self, postgresql_container_name, query, number_of_rows,
     def segfault_happened(self):
         return self.segfault

-    def wait_for_kafka_consumer_to_be_registered(self, kafka_container_name):
+    def wait_for_kafka_consumer_to_be_registered(self, kafka_container_name: str, count: int):
         kafka_container_name = self.container_store.get_container_name_with_postfix(kafka_container_name)
-        return self.wait_for_app_logs_regex(kafka_container_name, "Assignment received from leader.*for group docker_test_group", 60)
+        return self.wait_for_app_logs_regex(kafka_container_name, "Assignment received from leader.*for group docker_test_group", 60, count)
+
+    def wait_for_kafka_consumer_to_be_reregistered(self, kafka_container_name):
+        kafka_container_name = self.container_store.get_container_name_with_postfix(kafka_container_name)
+        return self.wait_for_app_logs_regex(kafka_container_name, "Assignment received from leader.*for group docker_test_group.*Assignment received from leader.*for group docker_test_group.", 60)

     def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds):
         return self.prometheus_checker.wait_for_metric_class_on_prometheus(metric_class, timeout_seconds)
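The new `count` parameter suggests `wait_for_app_logs_regex` now waits for a number of matches rather than just one, so a test can require, for example, two "Assignment received from leader" entries after a consumer rejoins its group. A hedged sketch of that kind of count-aware polling; the helper below is illustrative, not the project's actual implementation:

```python
import re
import time

def wait_for_log_matches(get_logs, pattern: str, timeout_seconds: int, count: int = 1) -> bool:
    """Poll logs until `pattern` has matched at least `count` times."""
    regex = re.compile(pattern, re.DOTALL)
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if len(regex.findall(get_logs())) >= count:
            return True
        time.sleep(1)
    return False
```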
58 changes: 58 additions & 0 deletions docker/test/integration/cluster/checkers/KafkaHelper.py
@@ -0,0 +1,58 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import io
import logging
import docker


class KafkaHelper:
    def __init__(self, container_communicator, feature_id):
        self.container_communicator = container_communicator
        self.feature_id = feature_id

    def create_topic(self, container_name: str, topic_name: str):
        logging.info(f"Creating topic {topic_name} in {container_name}")
        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/bitnami/kafka/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {container_name}:9092"])
        logging.info(output)
        return code == 0

    def produce_message(self, container_name: str, topic_name: str, message: str):
        logging.info(f"Sending {message} to {container_name}:{topic_name}")
        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/bitnami/kafka/bin/kafka-console-producer.sh --topic {topic_name} --bootstrap-server {container_name}:9092 <<< '{message}'"])
        logging.info(output)
        return code == 0

    def produce_message_with_key(self, container_name: str, topic_name: str, message: str, message_key: str):
        logging.info(f"Sending {message} to {container_name}:{topic_name}")
        (code, output) = self.container_communicator.execute_command(container_name, ["/bin/bash", "-c", f"/opt/bitnami/kafka/bin/kafka-console-producer.sh --property 'key.separator=:' --property 'parse.key=true' --topic {topic_name} --bootstrap-server {container_name}:9092 <<< '{message_key}:{message}'"])
        logging.info(output)
        return code == 0

    def run_python_in_kafka_helper_docker(self, command: str):
        try:
            self.container_communicator.client.images.get("kafka-helper")
        except docker.errors.ImageNotFound:
            dockerfile_content = """
            FROM python:3.13-slim-bookworm
            RUN pip install confluent-kafka
            """
            dockerfile_stream = io.BytesIO(dockerfile_content.encode("utf-8"))
            image, _ = self.container_communicator.client.images.build(fileobj=dockerfile_stream, tag="kafka-helper")

        output = self.container_communicator.client.containers.run("kafka-helper", ["python", "-c", command], remove=True, stdout=True, stderr=True, network=f"minifi_integration_test_network-{self.feature_id}")
        logging.info(output)
        return True
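`run_python_in_kafka_helper_docker` lets a test run arbitrary confluent-kafka client code on the test network without installing the library on the host. A hypothetical call site follows; the broker host, topic, and variable names are illustrative, and manual commit is shown because this PR concerns late offset commits:

```python
# Hypothetical usage from a test step (kafka_helper is a KafkaHelper instance).
consume_and_commit = """
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'kafka-broker-<feature-id>:9092',  # illustrative host name
    'group.id': 'docker_test_group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
})
consumer.subscribe(['test_topic'])
msg = consumer.poll(30)
assert msg is not None and not msg.error()
consumer.commit(msg, asynchronous=False)  # commit the offset explicitly
consumer.close()
"""
kafka_helper.run_python_in_kafka_helper_docker(consume_and_commit)
```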
113 changes: 81 additions & 32 deletions docker/test/integration/cluster/containers/KafkaBrokerContainer.py
@@ -16,7 +16,6 @@

 import logging
 import tempfile
-import os
 import docker.types
 import jks
 from OpenSSL import crypto
@@ -34,62 +33,112 @@ def __init__(self, feature_context, name, vols, network, image_store, command=None):
         pke = jks.PrivateKeyEntry.new('kafka-broker-cert', [crypto.dump_certificate(crypto.FILETYPE_ASN1, kafka_cert)], crypto.dump_privatekey(crypto.FILETYPE_ASN1, kafka_key), 'rsa_raw')
         server_keystore = jks.KeyStore.new('jks', [pke])

-        self.server_keystore_file = tempfile.NamedTemporaryFile(delete=False)
-        server_keystore.save(self.server_keystore_file.name, 'abcdefgh')
-        self.server_keystore_file.close()
+        with tempfile.NamedTemporaryFile(delete=False, suffix=".jks") as server_keystore_file:
+            server_keystore.save(server_keystore_file.name, 'abcdefgh')
+            self.server_keystore_file_path = server_keystore_file.name

-        self.server_truststore_file = tempfile.NamedTemporaryFile(delete=False)
-        self.server_truststore_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, feature_context.root_ca_cert))
-        self.server_truststore_file.close()
+        trusted_cert = jks.TrustedCertEntry.new(
+            'root-ca',  # Alias for the certificate
+            crypto.dump_certificate(crypto.FILETYPE_ASN1, feature_context.root_ca_cert)
+        )

-        self.server_properties_file = tempfile.NamedTemporaryFile(delete=False)
-        self.feature_id = feature_context.id
-        with open(os.environ['TEST_DIRECTORY'] + "/resources/kafka_broker/conf/server.properties") as server_properties_file:
-            server_properties_content = server_properties_file.read()
-        patched_server_properties_content = server_properties_content.replace("kafka-broker", f"kafka-broker-{feature_context.id}")
-        self.server_properties_file.write(patched_server_properties_content.encode())
-        self.server_properties_file.close()
-        os.chmod(self.server_properties_file.name, 0o644)
+        # Create a JKS keystore that will serve as a truststore with the trusted cert entry.
+        truststore = jks.KeyStore.new('jks', [trusted_cert])
+
+        with tempfile.NamedTemporaryFile(delete=False, suffix=".jks") as server_truststore_file:
+            truststore.save(server_truststore_file.name, 'abcdefgh')
+            self.server_truststore_file_path = server_truststore_file.name
+
+        with tempfile.NamedTemporaryFile(delete=False, mode="w", encoding="utf-8") as jaas_config_file:
+            jaas_config_file.write("""
+KafkaServer {
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="admin"
+    password="admin-secret"
+    user_admin="admin-secret"
+    user_alice="alice-secret";
+};
+
+Client {
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="admin"
+    password="admin-secret";
+};
+""")
+            self.jaas_config_file_path = jaas_config_file.name

     def get_startup_finished_log_entry(self):
-        return "Recorded new controller, from now on will use broker kafka-broker"
+        return "Kafka Server started"

     def deploy(self):
         if not self.set_deployed():
             return

         logging.info('Creating and running kafka broker docker container...')

         self.client.containers.run(
-            image="ubuntu/kafka:3.1-22.04_beta",
+            image="bitnami/kafka:3.9.0",
             detach=True,
             name=self.name,
             network=self.network.name,
             ports={'9092/tcp': 9092, '29092/tcp': 29092, '9093/tcp': 9093, '29093/tcp': 29093, '9094/tcp': 9094, '29094/tcp': 29094, '9095/tcp': 9095, '29095/tcp': 29095},
             environment=[
-                "ZOOKEEPER_HOST=zookeeper-" + self.feature_id,
-                "ZOOKEEPER_PORT=2181"
+                "KAFKA_CFG_NODE_ID=1",
+                "KAFKA_CFG_PROCESS_ROLES=controller,broker",
+                "KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT",
+                "KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER",
+
+                f"KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka-broker-{self.feature_context.id}:9096",
+
+                f"KAFKA_CFG_LISTENERS=PLAINTEXT://kafka-broker-{self.feature_context.id}:9092,"
+                f"SASL_PLAINTEXT://kafka-broker-{self.feature_context.id}:9094,"
+                f"SSL://kafka-broker-{self.feature_context.id}:9093,"
+                f"SASL_SSL://kafka-broker-{self.feature_context.id}:9095,"
+                f"CONTROLLER://kafka-broker-{self.feature_context.id}:9096",
+
+                f"KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker-{self.feature_context.id}:9092,"
+                f"SASL_PLAINTEXT://kafka-broker-{self.feature_context.id}:9094,"
+                f"SSL://kafka-broker-{self.feature_context.id}:9093,"
+                f"SASL_SSL://kafka-broker-{self.feature_context.id}:9095,"
+                f"CONTROLLER://kafka-broker-{self.feature_context.id}:9096",
+
+                "KAFKA_CFG_LOG4J_ROOT_LOGLEVEL=DEBUG",
+                "KAFKA_CFG_LOG4J_LOGGERS=kafka.controller=DEBUG,kafka.server.KafkaApis=DEBUG",
+
+                "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,"
+                "SASL_PLAINTEXT:SASL_PLAINTEXT,"
+                "SSL:SSL,"
+                "SASL_SSL:SASL_SSL,"
+                "CONTROLLER:PLAINTEXT",
+
+                # If using SASL_PLAINTEXT, provide a JAAS config
+                'KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN',
+                'KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN',
+                'KAFKA_OPTS=-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf',
+
+                "KAFKA_CFG_SSL_PROTOCOL=TLS",
+                "KAFKA_CFG_SSL_ENABLED_PROTOCOLS=TLSv1.2",
+                "KAFKA_CFG_SSL_KEYSTORE_TYPE=JKS",
+                "KAFKA_CFG_SSL_KEYSTORE_LOCATION=/bitnami/kafka/config/certs/kafka.keystore.jks",
+                "KAFKA_CFG_SSL_KEYSTORE_PASSWORD=abcdefgh",
+                "KAFKA_CFG_SSL_KEY_PASSWORD=abcdefgh",
+                "KAFKA_CFG_SSL_TRUSTSTORE_TYPE=JKS",
+                "KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/bitnami/kafka/config/certs/kafka.truststore.jks",
+                "KAFKA_CFG_SSL_CLIENT_AUTH=none"
             ],
             mounts=[
                 docker.types.Mount(
                     type='bind',
-                    source=self.server_properties_file.name,
-                    target='/opt/kafka/config/kraft/server.properties'
-                ),
-                docker.types.Mount(
-                    type='bind',
-                    source=self.server_properties_file.name,
-                    target='/opt/kafka/config/server.properties'
+                    source=self.server_keystore_file_path,
+                    target='/bitnami/kafka/config/certs/kafka.keystore.jks'
                 ),
                 docker.types.Mount(
                     type='bind',
-                    source=self.server_keystore_file.name,
-                    target='/usr/local/etc/kafka/certs/server_keystore.jks'
+                    source=self.server_truststore_file_path,
+                    target='/bitnami/kafka/config/certs/kafka.truststore.jks'
                 ),
                 docker.types.Mount(
                     type='bind',
-                    source=self.server_truststore_file.name,
-                    target='/usr/local/etc/kafka/certs/server_truststore.pem'
+                    source=self.jaas_config_file_path,
+                    target='/opt/bitnami/kafka/config/kafka_jaas.conf'
                 )
             ],
             entrypoint=self.command)
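For reference, client settings that would match the SASL_SSL listener configured above (port 9095, PLAIN credentials from the JAAS file). This is a sketch under assumptions: the host name follows the container naming scheme, the topic is illustrative, and the CA path stands in for wherever the test's root CA PEM is written:

```python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'kafka-broker-<feature-id>:9095',  # SASL_SSL listener
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'alice',               # user_alice from the JAAS config
    'sasl.password': 'alice-secret',
    'ssl.ca.location': '/tmp/root_ca.pem',  # assumed path to the test root CA PEM
})
producer.produce('test_topic', b'hello')
producer.flush()
```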
docker/test/integration/cluster/containers/ZookeeperContainer.py
@@ -35,6 +35,5 @@ def deploy(self):
             detach=True,
             name=self.name,
             network=self.network.name,
-            ports={'2181/tcp': 2181},
             entrypoint=self.command)
         logging.info('Added container \'%s\'', self.name)