From fc98bab43d5e4946033979820606780cd77c7b1f Mon Sep 17 00:00:00 2001
From: Federico Valeri <fedevaleri@gmail.com>
Date: Tue, 17 Dec 2024 10:03:10 +0100
Subject: [PATCH 1/3] Add support for Strimzi Metrics Reporter

This patch adds support for metrics types configuration, integrating the Strimzi Metrics Reporter, and custom Prometheus JMX Exporter configuration.
The custom configuration feature is needed for Cluster Operator integration, but it can also be leveraged by standalone users.
See Proposal 43 for more details.

Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
---
 CHANGELOG.md                                  |   9 ++
 config/application.properties                 |  11 ++
 .../assembly-kafka-bridge-config.adoc         |   6 +-
 ...-configuring-kafka-bridge-jmx-metrics.adoc |  33 ++++
 ...proc-configuring-kafka-bridge-metrics.adoc |  30 ----
 ...-configuring-kafka-bridge-smr-metrics.adoc |  36 +++++
 pom.xml                                       |  27 ++++
 .../io/strimzi/kafka/bridge/Application.java  | 142 ++++++++++--------
 .../strimzi/kafka/bridge/MetricsReporter.java |  72 ---------
 .../kafka/bridge/config/BridgeConfig.java     |  61 +++++++-
 .../strimzi/kafka/bridge/http/HttpBridge.java |  19 ++-
 .../{ => metrics}/JmxCollectorRegistry.java   |  38 ++---
 .../kafka/bridge/metrics/MetricsReporter.java |  94 ++++++++++++
 .../kafka/bridge/metrics/MetricsType.java     |  30 ++++
 .../metrics/StrimziCollectorRegistry.java     |  55 +++++++
 .../kafka/bridge/config/ConfigTest.java       |  79 +++++++++-
 .../bridge/http/ConsumerGeneratedNameIT.java  |   8 +-
 .../http/DisablingConsumerProducerIT.java     |   5 +-
 .../strimzi/kafka/bridge/http/HttpCorsIT.java |   8 +-
 .../kafka/bridge/http/InvalidProducerIT.java  |   4 +-
 .../http/base/HttpBridgeITAbstract.java       |  11 +-
 .../metrics/JmxCollectorRegistryTest.java     |  73 +++++++++
 .../bridge/metrics/MetricsReporterTest.java   | 113 ++++++++++++++
 .../metrics/StrimziCollectorRegistryTest.java |  63 ++++++++
 24 files changed, 809 insertions(+), 218 deletions(-)
 create mode 100644 documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc
 delete mode 100644 documentation/modules/proc-configuring-kafka-bridge-metrics.adoc
 create mode 100644 documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc
 delete mode 100644 src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java
 rename src/main/java/io/strimzi/kafka/bridge/{ => metrics}/JmxCollectorRegistry.java (54%)
 create mode 100644 src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java
 create mode 100644 src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java
 create mode 100644 src/main/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistry.java
 create mode 100644 src/test/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistryTest.java
 create mode 100644 src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java
 create mode 100644 src/test/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistryTest.java

diff --git a/CHANGELOG.md b/CHANGELOG.md
index dde54216e..4e154dec0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,15 @@
   * Both the `/openapi` and `/openapi/v3` endpoints return the OpenAPI v3 definition of the bridge REST API.
 * Removed script to build bridge configuration within the container. 
   It is going to be set up by the Strimzi operator within a ConfigMap and mounted as volume on the bridge pod.
+* Added support for [Strimzi Metrics Reporter](https://github.com/strimzi/metrics-reporter).
+  This is a Kafka plugin that directly exposes metrics in Prometheus format without using JMX, and can be enabled by setting `bridge.metrics=strimziMetricsReporter`.
+* Added support for custom Prometheus JMX Exporter configuration.
+  Set `bridge.metrics.exporter.config.path=/path/to/my-exporter-config.yml` to use your custom config.
+
+### Changes, deprecations and removals
+
+* `KAFKA_BRIDGE_METRICS_ENABLED` configuration has been deprecated.
+  Set `bridge.metrics=jmxPrometheusExporter` to keep using Prometheus JMX Exporter.
 
 ## 0.31.1
 
diff --git a/config/application.properties b/config/application.properties
index 423b17886..c384d1227 100644
--- a/config/application.properties
+++ b/config/application.properties
@@ -1,5 +1,16 @@
 #Bridge related settings
 bridge.id=my-bridge
+
+# uncomment the following line to enable JMX Prometheus Exporter, check the documentation for more details
+#bridge.metrics=jmxPrometheusExporter
+# optionally, you can set the file path of your custom configuration
+#bridge.metrics.exporter.config.path=/path/to/my-exporter-config.yaml
+
+# uncomment the following line to enable Strimzi Metrics Reporter, check the documentation for more details
+#bridge.metrics=strimziMetricsReporter
+# optionally, you can filter the exposed metrics using a comma separated list of regexes
+#kafka.prometheus.metrics.reporter.allowlist=.*
+
 # uncomment the following line (bridge.tracing) to enable OpenTelemetry tracing, check the documentation for more details
 #bridge.tracing=opentelemetry
 
diff --git a/documentation/assemblies/assembly-kafka-bridge-config.adoc b/documentation/assemblies/assembly-kafka-bridge-config.adoc
index 9cb1238f0..8fb731e84 100644
--- a/documentation/assemblies/assembly-kafka-bridge-config.adoc
+++ b/documentation/assemblies/assembly-kafka-bridge-config.adoc
@@ -8,11 +8,13 @@
 [role="_abstract"]
 Configure a deployment of the Kafka Bridge using configuration properties.
 Configure Kafka and specify the HTTP connection details needed to be able to interact with Kafka.
+It is possible to enable metrics in Prometheus format with JMX Prometheus Exporter or Strimzi Metrics Reporter.
 You can also use configuration properties to enable and use distributed tracing with the Kafka Bridge.
 Distributed tracing allows you to track the progress of transactions between applications in a distributed system.
 
 NOTE: Use the `KafkaBridge` resource to configure properties when you are xref:overview-components-running-kafka-bridge-cluster-{context}[running the Kafka Bridge on Kubernetes].
 
 include::modules/proc-configuring-kafka-bridge.adoc[leveloffset=+1]
-include::modules/proc-configuring-kafka-bridge-metrics.adoc[leveloffset=+1]
-include::modules/proc-configuring-kafka-bridge-tracing.adoc[leveloffset=+1]
\ No newline at end of file
+include::modules/proc-configuring-kafka-bridge-jmx-metrics.adoc[leveloffset=+1]
+include::modules/proc-configuring-kafka-bridge-smr-metrics.adoc[leveloffset=+1]
+include::modules/proc-configuring-kafka-bridge-tracing.adoc[leveloffset=+1]
diff --git a/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc b/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc
new file mode 100644
index 000000000..9d5e9a030
--- /dev/null
+++ b/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc
@@ -0,0 +1,33 @@
+[id='proc-configuring-kafka-bridge-jmx-metrics-{context}']
+= Configuring JMX Exporter metrics
+
+[role="_abstract"]
+Enable metrics for the Kafka Bridge by setting the `bridge.metrics` configuration.
+
+.Prerequisites
+
+* xref:proc-downloading-kafka-bridge-{context}[The Kafka Bridge installation archive is downloaded].
+
+.Procedure
+
+. Set the `bridge.metrics` configuration to `jmxPrometheusExporter`.
++
+.Configuration for enabling metrics
+
+[source,properties]
+----
+bridge.metrics=jmxPrometheusExporter
+----
++
+Optionally, you can set a custom JMX Exporter configuration using the `bridge.metrics.exporter.config.path` property.
+If not set, a default embedded configuration file will be used.
+
+. Run the Kafka Bridge script to enable metrics.
++
+.Running the Kafka Bridge to enable metrics
+[source,shell]
+----
+./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
+----
++
+With metrics enabled, you can use `GET /metrics` with the `/metrics` endpoint to retrieve Kafka Bridge metrics in Prometheus format.
diff --git a/documentation/modules/proc-configuring-kafka-bridge-metrics.adoc b/documentation/modules/proc-configuring-kafka-bridge-metrics.adoc
deleted file mode 100644
index 94a9c37dd..000000000
--- a/documentation/modules/proc-configuring-kafka-bridge-metrics.adoc
+++ /dev/null
@@ -1,30 +0,0 @@
-[id='proc-configuring-kafka-bridge-metrics-{context}']
-= Configuring metrics
-
-[role="_abstract"]
-Enable metrics for the Kafka Bridge by setting the `KAFKA_BRIDGE_METRICS_ENABLED` environment variable.
-
-.Prerequisites
-
-* xref:proc-downloading-kafka-bridge-{context}[The Kafka Bridge installation archive is downloaded].
-
-.Procedure
-
-. Set the environment variable for enabling metrics to `true`.
-+
-.Environment variable for enabling metrics
-
-[source,properties]
-----
-KAFKA_BRIDGE_METRICS_ENABLED=true
-----
-
-. Run the Kafka Bridge script to enable metrics.
-+
-.Running the Kafka Bridge to enable metrics
-[source,shell]
-----
-./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
-----
-+
-With metrics enabled, you can use `GET /metrics` with the `/metrics` endpoint to retrieve Kafka Bridge metrics in Prometheus format.
\ No newline at end of file
diff --git a/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc b/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc
new file mode 100644
index 000000000..2acb8c26d
--- /dev/null
+++ b/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc
@@ -0,0 +1,36 @@
+[id='proc-configuring-kafka-bridge-smr-metrics-{context}']
+= Configuring Strimzi Metrics Reporter metrics
+
+[role="_abstract"]
+Enable metrics for the Kafka Bridge by setting the `bridge.metrics` configuration.
+
+.Prerequisites
+
+* xref:proc-downloading-kafka-bridge-{context}[The Kafka Bridge installation archive is downloaded].
+
+.Procedure
+
+. Set the `bridge.metrics` configuration to `strimziMetricsReporter` and related configuration.
++
+.Configuration for enabling metrics
+
+[source,properties]
+----
+bridge.metrics=strimziMetricsReporter
+----
++
+Optionally, you can set a comma-separated list of regexes used to filter exposed metrics using the `kafka.prometheus.metrics.reporter.allowlist` property.
+If not set, all available metrics will be exposed.
++
+You can add any plugin configuration to the Bridge properties file using the `kafka.` prefix.
+See the https://github.com/strimzi/metrics-reporter[Strimzi Metrics Reporter] documentation for more details.
+
+. Run the Kafka Bridge script to enable metrics.
++
+.Running the Kafka Bridge to enable metrics
+[source,shell]
+----
+./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
+----
++
+With metrics enabled, you can use `GET /metrics` with the `/metrics` endpoint to retrieve Kafka Bridge metrics in Prometheus format.
diff --git a/pom.xml b/pom.xml
index 0031a798c..756ef2bd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,6 +115,7 @@
 		<checkstyle.version>10.12.2</checkstyle.version>
 		<hamcrest.version>2.2</hamcrest.version>
 		<junit.version>5.8.2</junit.version>
+		<mockito.version>4.11.0</mockito.version>
 		<maven-jar-plugin.version>3.3.0</maven-jar-plugin.version>
 		<maven-surefire-plugin.version>3.0.0-M7</maven-surefire-plugin.version>
 		<maven-failsafe-plugin.version>3.0.0-M7</maven-failsafe-plugin.version>
@@ -131,6 +132,7 @@
 		<spotbugs.version>4.7.3</spotbugs.version>
 		<maven.spotbugs.version>4.7.3.0</maven.spotbugs.version>
 		<strimzi-oauth.version>0.15.0</strimzi-oauth.version>
+		<strimzi-metrics-reporter.version>0.1.0</strimzi-metrics-reporter.version>
 		<opentelemetry.version>1.34.1</opentelemetry.version>
 		<opentelemetry-alpha.version>1.34.1-alpha</opentelemetry-alpha.version>
 		<opentelemetry.instrumentation.version>1.32.0-alpha</opentelemetry.instrumentation.version>
@@ -295,6 +297,11 @@
 			<artifactId>micrometer-registry-prometheus</artifactId>
 			<version>${micrometer.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>io.strimzi</groupId>
+			<artifactId>metrics-reporter</artifactId>
+			<version>${strimzi-metrics-reporter.version}</version>
+		</dependency>
 		<dependency>
 			<groupId>io.prometheus.jmx</groupId>
 			<artifactId>collector</artifactId>
@@ -305,6 +312,16 @@
 			<artifactId>prometheus-metrics-model</artifactId>
 			<version>${prometheus-client.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>io.prometheus</groupId>
+			<artifactId>prometheus-metrics-instrumentation-jvm</artifactId>
+			<version>${prometheus-client.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>io.prometheus</groupId>
+			<artifactId>prometheus-metrics-exporter-httpserver</artifactId>
+			<version>${prometheus-client.version}</version>
+		</dependency>
 		<dependency>
 			<groupId>io.prometheus</groupId>
 			<artifactId>prometheus-metrics-exposition-textformats</artifactId>
@@ -421,6 +438,12 @@
 			<version>${hamcrest.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-core</artifactId>
+			<version>${mockito.version}</version>
+			<scope>test</scope>
+		</dependency>
 		<dependency>
 			<groupId>io.strimzi</groupId>
 			<artifactId>strimzi-test-container</artifactId>
@@ -584,6 +607,10 @@
 									<!-- OpenTelemetry - used via classpath configuration opentelemetry-exporter-sender-grpc is required at runtime to replace OKHTTP -->
 									<ignoredUnusedDeclaredDependency>io.grpc:grpc-netty-shaded:jar</ignoredUnusedDeclaredDependency>
 									<ignoredUnusedDeclaredDependency>com.google.guava:guava</ignoredUnusedDeclaredDependency>
+									<!-- Strimzi Metrics Reporter - used via classpath configuration -->
+									<ignoredUnusedDeclaredDependency>io.strimzi:metrics-reporter</ignoredUnusedDeclaredDependency>
+									<ignoredUnusedDeclaredDependency>io.prometheus:prometheus-metrics-instrumentation-jvm</ignoredUnusedDeclaredDependency>
+									<ignoredUnusedDeclaredDependency>io.prometheus:prometheus-metrics-exporter-httpserver</ignoredUnusedDeclaredDependency>
 								</ignoredUnusedDeclaredDependencies>
 						</configuration>
 					</execution>
diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java
index 04a3f6e6d..2b441222e 100644
--- a/src/main/java/io/strimzi/kafka/bridge/Application.java
+++ b/src/main/java/io/strimzi/kafka/bridge/Application.java
@@ -5,10 +5,13 @@
 
 package io.strimzi.kafka.bridge;
 
-import io.micrometer.core.instrument.MeterRegistry;
 import io.strimzi.kafka.bridge.config.BridgeConfig;
 import io.strimzi.kafka.bridge.config.ConfigRetriever;
 import io.strimzi.kafka.bridge.http.HttpBridge;
+import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry;
+import io.strimzi.kafka.bridge.metrics.MetricsReporter;
+import io.strimzi.kafka.bridge.metrics.MetricsType;
+import io.strimzi.kafka.bridge.metrics.StrimziCollectorRegistry;
 import io.strimzi.kafka.bridge.tracing.TracingUtil;
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
@@ -18,7 +21,6 @@
 import io.vertx.micrometer.MetricsDomain;
 import io.vertx.micrometer.MicrometerMetricsOptions;
 import io.vertx.micrometer.VertxPrometheusOptions;
-import io.vertx.micrometer.backends.BackendRegistries;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.Option;
@@ -34,6 +36,7 @@
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Map;
@@ -46,8 +49,6 @@
 public class Application {
     private static final Logger LOGGER = LogManager.getLogger(Application.class);
 
-    private static final String KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED";
-
     /**
      * Bridge entrypoint
      *
@@ -56,27 +57,12 @@ public class Application {
     public static void main(String[] args) {
         LOGGER.info("Strimzi Kafka Bridge {} is starting", Application.class.getPackage().getImplementationVersion());
         try {
-            VertxOptions vertxOptions = new VertxOptions();
-            JmxCollectorRegistry jmxCollectorRegistry = null;
-            if (Boolean.parseBoolean(System.getenv(KAFKA_BRIDGE_METRICS_ENABLED))) {
-                LOGGER.info("Metrics enabled and exposed on the /metrics endpoint");
-                // setup Micrometer metrics options
-                vertxOptions.setMetricsOptions(metricsOptions());
-                jmxCollectorRegistry = getJmxCollectorRegistry();
-            }
-            Vertx vertx = Vertx.vertx(vertxOptions);
-            // MeterRegistry default instance is just null if metrics are not enabled in the VertxOptions instance
-            MeterRegistry meterRegistry = BackendRegistries.getDefaultNow();
-            MetricsReporter metricsReporter = new MetricsReporter(jmxCollectorRegistry, meterRegistry);
-
-
             CommandLine commandLine = new DefaultParser().parse(generateOptions(), args);
-
             Map<String, Object> config = ConfigRetriever.getConfig(absoluteFilePath(commandLine.getOptionValue("config-file")));
             BridgeConfig bridgeConfig = BridgeConfig.fromMap(config);
             LOGGER.info("Bridge configuration {}", bridgeConfig);
 
-            deployHttpBridge(vertx, bridgeConfig, metricsReporter).onComplete(done -> {
+            deployHttpBridge(bridgeConfig).onComplete(done -> {
                 if (done.succeeded()) {
                     // register tracing - if set, etc
                     TracingUtil.initialize(bridgeConfig);
@@ -88,70 +74,103 @@ public static void main(String[] args) {
         }
     }
 
-    /**
-     * Set up the Vert.x metrics options
-     *
-     * @return instance of the MicrometerMetricsOptions on Vert.x
-     */
-    private static MicrometerMetricsOptions metricsOptions() {
-        Set<String> set = new HashSet<>();
-        set.add(MetricsDomain.NAMED_POOLS.name());
-        set.add(MetricsDomain.VERTICLES.name());
-        return new MicrometerMetricsOptions()
-                .setPrometheusOptions(new VertxPrometheusOptions().setEnabled(true))
-                // define the labels on the HTTP server related metrics
-                .setLabels(EnumSet.of(Label.HTTP_PATH, Label.HTTP_METHOD, Label.HTTP_CODE))
-                // disable metrics about pool and verticles
-                .setDisabledMetricsCategories(set)
-                .setJvmMetricsEnabled(true)
-                .setEnabled(true);
-    }
-
     /**
      * Deploys the HTTP bridge into a new verticle
      *
-     * @param vertx                 Vertx instance
      * @param bridgeConfig          Bridge configuration
-     * @param metricsReporter       MetricsReporter instance for scraping metrics from different registries
      * @return                      Future for the bridge startup
      */
-    private static Future<HttpBridge> deployHttpBridge(Vertx vertx, BridgeConfig bridgeConfig, MetricsReporter metricsReporter)  {
+    private static Future<HttpBridge> deployHttpBridge(BridgeConfig bridgeConfig) 
+            throws MalformedObjectNameException, IOException {
         Promise<HttpBridge> httpPromise = Promise.promise();
 
+        Vertx vertx = createVertxInstance(bridgeConfig);
+        MetricsReporter metricsReporter = getMetricsReporter(bridgeConfig);
         HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter);
         vertx.deployVerticle(httpBridge)
-                .onComplete(done -> {
-                    if (done.succeeded()) {
-                        LOGGER.info("HTTP verticle instance deployed [{}]", done.result());
-                        httpPromise.complete(httpBridge);
-                    } else {
-                        LOGGER.error("Failed to deploy HTTP verticle instance", done.cause());
-                        httpPromise.fail(done.cause());
+            .onComplete(done -> {
+                if (done.succeeded()) {
+                    LOGGER.info("HTTP verticle instance deployed [{}]", done.result());
+                    if (metricsReporter != null) {
+                        LOGGER.info("Metrics of type '{}' enabled and exposed on /metrics endpoint", bridgeConfig.getMetrics());
                     }
-                });
+                    httpPromise.complete(httpBridge);
+                } else {
+                    LOGGER.error("Failed to deploy HTTP verticle instance", done.cause());
+                    httpPromise.fail(done.cause());
+                }
+            });
 
         return httpPromise.future();
     }
 
+    private static Vertx createVertxInstance(BridgeConfig bridgeConfig) {
+        VertxOptions vertxOptions = new VertxOptions();
+        if (bridgeConfig.getMetrics() != null) {
+            vertxOptions.setMetricsOptions(metricsOptions()); // enable Vertx metrics
+        }
+        Vertx vertx = Vertx.vertx(vertxOptions);
+        return vertx;
+    }
+
     /**
-     * Return a JmxCollectorRegistry instance with the YAML configuration filters
+     * Set up the Vert.x metrics options
      *
-     * @return JmxCollectorRegistry instance
-     * @throws MalformedObjectNameException
-     * @throws IOException
+     * @return instance of the MicrometerMetricsOptions on Vert.x
      */
-    private static JmxCollectorRegistry getJmxCollectorRegistry()
+    private static MicrometerMetricsOptions metricsOptions() {
+        Set<String> set = new HashSet<>();
+        set.add(MetricsDomain.NAMED_POOLS.name());
+        set.add(MetricsDomain.VERTICLES.name());
+        return new MicrometerMetricsOptions()
+            .setPrometheusOptions(new VertxPrometheusOptions().setEnabled(true))
+            // define the labels on the HTTP server related metrics
+            .setLabels(EnumSet.of(Label.HTTP_PATH, Label.HTTP_METHOD, Label.HTTP_CODE))
+            // disable metrics about pool and verticles
+            .setDisabledMetricsCategories(set)
+            .setJvmMetricsEnabled(true)
+            .setEnabled(true);
+    }
+
+    private static MetricsReporter getMetricsReporter(BridgeConfig bridgeConfig) 
             throws MalformedObjectNameException, IOException {
-        InputStream is = Application.class.getClassLoader().getResourceAsStream("jmx_metrics_config.yaml");
-        if (is == null) {
-            return null;
+        if (bridgeConfig.getMetrics() != null) {
+            if (bridgeConfig.getMetrics().equals(MetricsType.JMX_EXPORTER.toString())) {
+                return new MetricsReporter(getJmxCollectorRegistry(bridgeConfig));
+            } else if (bridgeConfig.getMetrics().equals(MetricsType.STRIMZI_REPORTER.toString())) {
+                return new MetricsReporter(new StrimziCollectorRegistry());
+            }
         }
+        return null;
+    }
 
-        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
-            String yaml = reader
+    /**
+     * Return a JmxCollectorRegistry instance with the YAML configuration filters.
+     * This is loaded from a custom config file if present, or from the default configuration file.
+     *
+     * @return JmxCollectorRegistry instance
+     * @throws MalformedObjectNameException
+     * @throws IOException
+     */
+    private static JmxCollectorRegistry getJmxCollectorRegistry(BridgeConfig bridgeConfig) throws MalformedObjectNameException, IOException {
+        if (bridgeConfig.getJmxExporterConfigPath() != null && Files.exists(bridgeConfig.getJmxExporterConfigPath())) {
+            // read custom configuration file
+            LOGGER.info("Loading custom JMX Exporter configuration from {}", bridgeConfig.getJmxExporterConfigPath());
+            String yaml = Files.readString(bridgeConfig.getJmxExporterConfigPath(), StandardCharsets.UTF_8);
+            return new JmxCollectorRegistry(yaml);
+        } else {
+            // fallback to default configuration
+            LOGGER.info("Loading default JMX Exporter configuration");
+            InputStream is = Application.class.getClassLoader().getResourceAsStream("jmx_metrics_config.yaml");
+            if (is == null) {
+                return null;
+            }
+            try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
+                String yaml = reader
                     .lines()
                     .collect(Collectors.joining("\n"));
-            return new JmxCollectorRegistry(yaml);
+                return new JmxCollectorRegistry(yaml);
+            }
         }
     }
 
@@ -161,7 +180,6 @@ private static JmxCollectorRegistry getJmxCollectorRegistry()
      * @return command line options
      */
     private static Options generateOptions() {
-
         Option configFileOption = Option.builder()
                 .required(true)
                 .hasArg(true)
diff --git a/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java b/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java
deleted file mode 100644
index ab8e2f315..000000000
--- a/src/main/java/io/strimzi/kafka/bridge/MetricsReporter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright Strimzi authors.
- * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
- */
-
-package io.strimzi.kafka.bridge;
-
-import io.micrometer.core.instrument.Meter;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.prometheus.PrometheusMeterRegistry;
-import io.micrometer.prometheus.PrometheusNamingConvention;
-
-/**
- * Used for scraping and reporting metrics in Prometheus format
- */
-public class MetricsReporter {
-
-    private final JmxCollectorRegistry jmxCollectorRegistry;
-    private final MeterRegistry meterRegistry;
-
-    /**
-     * Constructor
-     *
-     * @param jmxCollectorRegistry JmxCollectorRegistry instance for scraping metrics from JMX endpoints
-     * @param meterRegistry MeterRegistry instance for scraping metrics exposed through Vert.x
-     */
-    public MetricsReporter(JmxCollectorRegistry jmxCollectorRegistry, MeterRegistry meterRegistry) {
-        this.jmxCollectorRegistry = jmxCollectorRegistry;
-        this.meterRegistry = meterRegistry;
-        if (this.meterRegistry instanceof PrometheusMeterRegistry) {
-            this.meterRegistry.config().namingConvention(new MetricsNamingConvention());
-        }
-    }
-
-    private static class MetricsNamingConvention extends PrometheusNamingConvention {
-        @Override
-        public String name(String name, Meter.Type type, String baseUnit) {
-            String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
-            return super.name(metricName, type, baseUnit);
-        }
-    }
-
-    /**
-     * @return JmxCollectorRegistry instance for scraping metrics from JMX endpoints
-     */
-    public JmxCollectorRegistry getJmxCollectorRegistry() {
-        return jmxCollectorRegistry;
-    }
-
-    /**
-     * @return MeterRegistry instance for scraping metrics exposed through Vert.x
-     */
-    public MeterRegistry getMeterRegistry() {
-        return meterRegistry;
-    }
-
-    /**
-     * Scrape metrics on the provided registries returning them in the Prometheus format
-     *
-     * @return metrics in Prometheus format as String
-     */
-    public String scrape() {
-        StringBuilder sb = new StringBuilder();
-        if (jmxCollectorRegistry != null) {
-            sb.append(jmxCollectorRegistry.scrape());
-        }
-        if (meterRegistry instanceof PrometheusMeterRegistry) {
-            sb.append(((PrometheusMeterRegistry) meterRegistry).scrape());
-        }
-        return sb.toString();
-    }
-}
diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java
index 919c7f924..d07ddca68 100644
--- a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java
+++ b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java
@@ -6,14 +6,20 @@
 package io.strimzi.kafka.bridge.config;
 
 import io.strimzi.kafka.bridge.http.HttpConfig;
+import io.strimzi.kafka.bridge.metrics.MetricsType;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.nio.file.Path;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
  * Bridge configuration properties
  */
 public class BridgeConfig extends AbstractConfig {
+    private static final Logger LOGGER = LogManager.getLogger(BridgeConfig.class);
 
     /** Prefix for all the specific bridge configuration parameters */
     public static final String BRIDGE_CONFIG_PREFIX = "bridge.";
@@ -21,6 +27,12 @@ public class BridgeConfig extends AbstractConfig {
     /** Bridge identification number */
     public static final String BRIDGE_ID = BRIDGE_CONFIG_PREFIX + "id";
 
+    /** Metrics system to be used in the bridge */
+    public static final String METRICS_TYPE = BRIDGE_CONFIG_PREFIX + "metrics";
+
+    /** JMX Exporter configuration file path */
+    public static final String JMX_EXPORTER_CONFIG_PATH = METRICS_TYPE + ".exporter.config.path";
+    
     /** Tracing system to be used in the bridge */
     public static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing";
 
@@ -55,21 +67,42 @@ public HttpConfig getHttpConfig() {
     }
 
     /**
-     * Loads the entire bridge configuration parameters from a related map
+     * Loads the entire bridge configuration parameters from a related map.
+     * Some validation and default values are also applied.
      *
      * @param map map from which loading configuration parameters
      * @return overall bridge configuration
      */
     public static BridgeConfig fromMap(Map<String, Object> map) {
+        validateAndApplyDefaults(map);
         KafkaConfig kafkaConfig = KafkaConfig.fromMap(map);
         HttpConfig httpConfig = HttpConfig.fromMap(map);
-
         return new BridgeConfig(map.entrySet().stream()
                 .filter(e -> e.getKey().startsWith(BridgeConfig.BRIDGE_CONFIG_PREFIX))
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
                 kafkaConfig, httpConfig);
     }
 
+    private static void validateAndApplyDefaults(Map<String, Object> map) {
+        String metricsTypeValue = (String) map.get(METRICS_TYPE);
+        if (metricsTypeValue != null) {
+            // validate metrics type
+            if (!metricsTypeValue.equals(MetricsType.JMX_EXPORTER.toString())
+                    && !metricsTypeValue.equals(MetricsType.STRIMZI_REPORTER.toString())) {
+                throw new IllegalArgumentException(
+                    String.format("Invalid %s configuration, choose one of %s and %s", 
+                        METRICS_TYPE, MetricsType.JMX_EXPORTER, MetricsType.STRIMZI_REPORTER)
+                );
+            }
+            // apply default Strimzi Metrics Reporter configurations if not present
+            if (metricsTypeValue.equals(MetricsType.STRIMZI_REPORTER.toString())) {
+                map.putIfAbsent("kafka.metric.reporters", "io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter");
+                map.putIfAbsent("kafka.prometheus.metrics.reporter.listener.enable", "false");
+                map.putIfAbsent("kafka.prometheus.metrics.reporter.allowlist", ".*");
+            }
+        }
+    }
+
     @Override
     public String toString() {
         return "BridgeConfig(" +
@@ -90,6 +123,30 @@ public String getBridgeID() {
         }
     }
 
+    /**
+     * @return the metric system to be used in the bridge
+     */
+    public String getMetrics() {
+        final String envVarValue = System.getenv("KAFKA_BRIDGE_METRICS_ENABLED");
+        if (envVarValue != null) {
+            LOGGER.warn("KAFKA_BRIDGE_METRICS_ENABLED is deprecated, use bridge.metrics configuration");
+        }
+        return (String) Optional.ofNullable(config.get(BridgeConfig.METRICS_TYPE))
+            .orElse(Boolean.parseBoolean(envVarValue)
+                ? MetricsType.JMX_EXPORTER.toString() : null);
+    }
+
+    /**
+     * @return the JMX Exporter configuration file path
+     */
+    public Path getJmxExporterConfigPath() {
+        if (config.get(BridgeConfig.JMX_EXPORTER_CONFIG_PATH) == null) {
+            return null;
+        } else {
+            return Path.of((String) config.get(BridgeConfig.JMX_EXPORTER_CONFIG_PATH));
+        }
+    }
+
     /**
      * @return the tracing system to be used in the bridge
      */
diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
index c05d43960..656734b54 100644
--- a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
+++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
@@ -15,10 +15,10 @@
 import io.strimzi.kafka.bridge.ConsumerInstanceId;
 import io.strimzi.kafka.bridge.EmbeddedFormat;
 import io.strimzi.kafka.bridge.IllegalEmbeddedFormatException;
-import io.strimzi.kafka.bridge.MetricsReporter;
 import io.strimzi.kafka.bridge.config.BridgeConfig;
 import io.strimzi.kafka.bridge.http.converter.JsonUtils;
 import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
+import io.strimzi.kafka.bridge.metrics.MetricsReporter;
 import io.vertx.core.AbstractVerticle;
 import io.vertx.core.Promise;
 import io.vertx.core.file.FileSystem;
@@ -140,7 +140,6 @@ private void startInactiveConsumerDeletionTimer(Long timeout) {
 
     @Override
     public void start(Promise<Void> startPromise) {
-
         RouterBuilder.create(vertx, "openapi.json")
                 .onComplete(ar -> {
                     if (ar.succeeded()) {
@@ -169,7 +168,9 @@ public void start(Promise<Void> startPromise) {
                         routerBuilder.operation(this.OPENAPI.getOperationId().toString()).handler(this.OPENAPI);
                         routerBuilder.operation(this.OPENAPIV2.getOperationId().toString()).handler(this.OPENAPIV2);
                         routerBuilder.operation(this.OPENAPIV3.getOperationId().toString()).handler(this.OPENAPIV3);
-                        routerBuilder.operation(this.METRICS.getOperationId().toString()).handler(this.METRICS);
+                        if (metricsReporter != null) {
+                            routerBuilder.operation(this.METRICS.getOperationId().toString()).handler(this.METRICS);
+                        }
                         routerBuilder.operation(this.INFO.getOperationId().toString()).handler(this.INFO);
                         if (this.bridgeConfig.getHttpConfig().isCorsEnabled()) {
                             routerBuilder.rootHandler(getCorsHandler());
@@ -179,14 +180,14 @@ public void start(Promise<Void> startPromise) {
                         }
 
                         this.router = routerBuilder.createRouter();
-
+                        
                         // handling validation errors and not existing endpoints
                         this.router.errorHandler(HttpResponseStatus.BAD_REQUEST.code(), this::errorHandler);
                         this.router.errorHandler(HttpResponseStatus.NOT_FOUND.code(), this::errorHandler);
-
-                        if (this.metricsReporter.getMeterRegistry() != null) {
+        
+                        if (this.metricsReporter != null && this.metricsReporter.getVertxRegistry() != null) {
                             // exclude to report the HTTP server metrics for the /metrics endpoint itself
-                            this.metricsReporter.getMeterRegistry().config().meterFilter(
+                            this.metricsReporter.getVertxRegistry().config().meterFilter(
                                     MeterFilter.deny(meter -> "/metrics".equals(meter.getTag(Label.HTTP_PATH.toString())))
                             );
                         }
@@ -607,6 +608,8 @@ private void errorHandler(RoutingContext routingContext) {
             }
         } else if (routingContext.statusCode() == HttpResponseStatus.NOT_FOUND.code()) {
             message = HttpResponseStatus.NOT_FOUND.reasonPhrase();
+        } else if (routingContext.statusCode() == HttpResponseStatus.NOT_IMPLEMENTED.code()) {
+            message = HttpResponseStatus.NOT_IMPLEMENTED.reasonPhrase();
         }
 
         HttpBridgeError error = new HttpBridgeError(routingContext.statusCode(), message);
@@ -823,7 +826,7 @@ public void process(RoutingContext routingContext) {
             openapi(routingContext);
         }
     };
-
+    
     final static HttpOpenApiOperation OPENAPIV2 = new HttpOpenApiOperation(HttpOpenApiOperations.OPENAPIV2) {
 
         @Override
diff --git a/src/main/java/io/strimzi/kafka/bridge/JmxCollectorRegistry.java b/src/main/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistry.java
similarity index 54%
rename from src/main/java/io/strimzi/kafka/bridge/JmxCollectorRegistry.java
rename to src/main/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistry.java
index 27107d3d6..72bbc35ed 100644
--- a/src/main/java/io/strimzi/kafka/bridge/JmxCollectorRegistry.java
+++ b/src/main/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistry.java
@@ -3,7 +3,7 @@
  * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
  */
 
-package io.strimzi.kafka.bridge;
+package io.strimzi.kafka.bridge.metrics;
 
 import io.prometheus.jmx.JmxCollector;
 import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
@@ -11,48 +11,50 @@
 
 import javax.management.MalformedObjectNameException;
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
 /**
- * Allow to collect JMX metrics exposing them in the Prometheus format
+ * Allow to collect JMX metrics exposing them in the Prometheus format.
  */
 public class JmxCollectorRegistry {
-    private final PrometheusRegistry collectorRegistry;
-    private final PrometheusTextFormatWriter textFormatter = new PrometheusTextFormatWriter(true);
+    private final PrometheusRegistry registry;
+    private final PrometheusTextFormatWriter textFormatter;
 
     /**
-     * Constructor
+     * Constructor.
      *
      * @param yamlConfig YAML configuration string with metrics filtering rules
      * @throws MalformedObjectNameException Throws MalformedObjectNameException
      */
     public JmxCollectorRegistry(String yamlConfig) throws MalformedObjectNameException {
-        new JmxCollector(yamlConfig).register();
-        collectorRegistry = PrometheusRegistry.defaultRegistry;
+        // note that Prometheus default registry is a singleton, so it is shared with JmxCollector
+        this(new JmxCollector(yamlConfig), PrometheusRegistry.defaultRegistry, new PrometheusTextFormatWriter(true));
     }
 
     /**
-     * Constructor
+     * Constructor.
      *
-     * @param yamlFileConfig file containing the YAML configuration with metrics filtering rules
-     * @throws MalformedObjectNameException Throws MalformedObjectNameException
-     * @throws IOException Throws IOException
+     * @param jmxCollector JMX collector registry
+     * @param registry Prometheus collector registry
+     * @param textFormatter Prometheus text formatter
      */
-    public JmxCollectorRegistry(File yamlFileConfig) throws MalformedObjectNameException, IOException {
-        new JmxCollector(yamlFileConfig).register();
-        collectorRegistry = PrometheusRegistry.defaultRegistry;
+    /* test */ JmxCollectorRegistry(JmxCollector jmxCollector,
+                                    PrometheusRegistry registry,
+                                    PrometheusTextFormatWriter textFormatter) {
+        jmxCollector.register();
+        this.registry = registry;
+        this.textFormatter = textFormatter;
     }
 
     /**
-     * @return Content that should be included in the response body for an endpoint designated for
-     * Prometheus to scrape from.
+     * @return Content that should be included in the response body for 
+     * an endpoint designated for Prometheus to scrape from.
      */
     public String scrape() {
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         try {
-            textFormatter.write(stream, collectorRegistry.scrape());
+            textFormatter.write(stream, registry.scrape());
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java
new file mode 100644
index 000000000..f81139a67
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+
+package io.strimzi.kafka.bridge.metrics;
+
+import io.micrometer.core.instrument.Meter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import io.micrometer.prometheus.PrometheusNamingConvention;
+import io.vertx.micrometer.backends.BackendRegistries;
+
+/**
+ * Used for scraping and reporting metrics in Prometheus format.
+ */
+public class MetricsReporter {
+    private final JmxCollectorRegistry jmxRegistry;
+    private final StrimziCollectorRegistry strimziRegistry;
+    private final PrometheusMeterRegistry vertxRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param jmxRegistry Registry instance for scraping metrics from JMX endpoints
+     */
+    public MetricsReporter(JmxCollectorRegistry jmxRegistry) {
+        // default registry is null if metrics are not enabled in the VertxOptions instance
+        this(jmxRegistry, null, (PrometheusMeterRegistry) BackendRegistries.getDefaultNow());
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param strimziRegistry Registry instance for scraping metrics from Strimzi Metrics Reporter
+     */
+    public MetricsReporter(StrimziCollectorRegistry strimziRegistry) {
+        // default registry is null if metrics are not enabled in the VertxOptions instance
+        this(null, strimziRegistry, (PrometheusMeterRegistry) BackendRegistries.getDefaultNow());
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param jmxRegistry Registry instance for scraping metrics from JMX endpoints
+     * @param strimziRegistry Registry instance for scraping metrics from Strimzi Metrics Reporter
+     * @param vertxRegistry Registry instance for scraping metrics exposed through Vert.x
+     */
+    public MetricsReporter(JmxCollectorRegistry jmxRegistry,
+                           StrimziCollectorRegistry strimziRegistry,
+                           PrometheusMeterRegistry vertxRegistry) {
+        this.jmxRegistry = jmxRegistry;
+        this.strimziRegistry = strimziRegistry;
+        this.vertxRegistry = vertxRegistry;
+        if (vertxRegistry != null) {
+            this.vertxRegistry.config().namingConvention(new MetricsNamingConvention());
+        }
+    }
+
+    private static class MetricsNamingConvention extends PrometheusNamingConvention {
+        @Override
+        public String name(String name, Meter.Type type, String baseUnit) {
+            String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
+            return super.name(metricName, type, baseUnit);
+        }
+    }
+
+    /**
+     * @return Registry instance for scraping metrics exposed through Vert.x.
+     *         This is null if metrics are not enabled in the VertxOptions instance.
+     */
+    public MeterRegistry getVertxRegistry() {
+        return vertxRegistry;
+    }
+
+    /**
+     * Scrape metrics on the provided registries returning them in the Prometheus format
+     *
+     * @return metrics in Prometheus format as String
+     */
+    public String scrape() {
+        StringBuilder sb = new StringBuilder();
+        if (jmxRegistry != null) {
+            sb.append(jmxRegistry.scrape());
+        }
+        if (strimziRegistry != null) {
+            sb.append(strimziRegistry.scrape());
+        }
+        if (vertxRegistry != null) {
+            sb.append(vertxRegistry.scrape());
+        }
+        return sb.toString();
+    }
+}
diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java
new file mode 100644
index 000000000..97d13d465
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.bridge.metrics;
+
+/**
+ * Metrics type.
+ */
+public enum MetricsType {
+    /** JMX Prometheus Exporter.  */
+    JMX_EXPORTER("jmxPrometheusExporter"),
+    
+    /** Strimzi Metrics Reporter. */
+    STRIMZI_REPORTER("strimziMetricsReporter");
+
+    private final String text;
+
+    /**
+     * @param text
+     */
+    MetricsType(final String text) {
+        this.text = text;
+    }
+    
+    @Override
+    public String toString() {
+        return text;
+    }
+}
diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistry.java b/src/main/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistry.java
new file mode 100644
index 000000000..e21a13216
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistry.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+
+package io.strimzi.kafka.bridge.metrics;
+
+import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
+import io.prometheus.metrics.model.registry.PrometheusRegistry;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Allow to collect Strimzi Reporter metrics exposing them in the Prometheus format.
+ */
+public class StrimziCollectorRegistry {
+    private final PrometheusRegistry registry;
+    private final PrometheusTextFormatWriter textFormatter;
+
+    /**
+     * Constructor.
+     */
+    public StrimziCollectorRegistry() {
+        // note that Prometheus default registry is a singleton, so it is shared with Strimzi Metrics Reporter
+        this(PrometheusRegistry.defaultRegistry, new PrometheusTextFormatWriter(true));
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param registry Prometheus collector registry
+     * @param textFormatter Prometheus text formatter
+     */
+    /* test */ StrimziCollectorRegistry(PrometheusRegistry registry,
+                                        PrometheusTextFormatWriter textFormatter) {
+        this.registry = registry;
+        this.textFormatter = textFormatter;
+    }
+
+    /**
+     * @return Content that should be included in the response body for 
+     * an endpoint designated for Prometheus to scrape from.
+     */
+    public String scrape() {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            textFormatter.write(stream, registry.scrape());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return stream.toString(StandardCharsets.UTF_8);
+    }
+}
diff --git a/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java b/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java
index a237e2fc6..af42d737a 100644
--- a/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java
+++ b/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java
@@ -2,25 +2,28 @@
  * Copyright Strimzi authors.
  * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
  */
-
 package io.strimzi.kafka.bridge.config;
 
+import io.strimzi.kafka.bridge.metrics.MetricsType;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.jupiter.api.Test;
 
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
  * Some config related classes unit tests
  */
 public class ConfigTest {
-
     @Test
     public void testConfig() {
         Map<String, Object> map = new HashMap<>();
@@ -64,8 +67,8 @@ public void testHidingPassword() {
         BridgeConfig bridgeConfig = BridgeConfig.fromMap(map);
         assertThat(bridgeConfig.getKafkaConfig().getConfig().size(), is(6));
 
-        assertThat(bridgeConfig.getKafkaConfig().toString().contains("ssl.truststore.password=" + storePassword), is(false));
-        assertThat(bridgeConfig.getKafkaConfig().toString().contains("ssl.truststore.password=[hidden]"), is(true));
+        assertThat(bridgeConfig.getKafkaConfig().toString(), not(containsString("ssl.truststore.password=" + storePassword)));
+        assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("ssl.truststore.password=[hidden]"));
     }
 
     @Test
@@ -81,4 +84,72 @@ public void testHttpDefaults() {
         assertThat(bridgeConfig.getHttpConfig().isConsumerEnabled(), is(true));
         assertThat(bridgeConfig.getHttpConfig().isProducerEnabled(), is(true));
     }
+
+    @Test
+    public void testJmxExporterMetricsType() {
+        String configFilePah = "/tmp/my-exporter-config.yaml";
+        
+        Map<String, Object> map = Map.of(
+            "bridge.id", "my-bridge",
+            "kafka.bootstrap.servers", "localhost:9092",
+            "bridge.metrics", "jmxPrometheusExporter",
+            "bridge.metrics.exporter.config.path", configFilePah,
+            "http.host", "0.0.0.0",
+            "http.port", "8080"
+        );
+
+        BridgeConfig bridgeConfig = BridgeConfig.fromMap(map);
+        assertThat(bridgeConfig.getMetrics(), is(MetricsType.JMX_EXPORTER.toString()));
+        assertThat(bridgeConfig.getJmxExporterConfigPath(), is(Path.of(configFilePah)));
+    }
+
+    @Test
+    public void testStrimziReporterMetricsType() {
+        Map<String, Object> map = new HashMap<>(Map.of(
+            "bridge.id", "my-bridge",
+            "kafka.bootstrap.servers", "localhost:9092",
+            "bridge.metrics", "strimziMetricsReporter",
+            "http.host", "0.0.0.0",
+            "http.port", "8080"
+        ));
+
+        BridgeConfig bridgeConfig = BridgeConfig.fromMap(map);
+        assertThat(bridgeConfig.getMetrics(), is(MetricsType.STRIMZI_REPORTER.toString()));
+        assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter"));
+        assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.listener.enable=false"));
+        assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.allowlist=.*"));
+    }
+
+    @Test
+    public void testStrimziReporterWithCustomConfig() {
+        Map<String, Object> map = new HashMap<>(Map.of(
+            "bridge.id", "my-bridge",
+            "kafka.bootstrap.servers", "localhost:9092",
+            "bridge.metrics", "strimziMetricsReporter",
+            "kafka.metric.reporters", "my.domain.CustomMetricReporter,io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter",
+            "kafka.prometheus.metrics.reporter.allowlist", "kafka_log.*,kafka_network.*",
+            "http.host", "0.0.0.0",
+            "http.port", "8080"
+        ));
+
+        BridgeConfig bridgeConfig = BridgeConfig.fromMap(map);
+        assertThat(bridgeConfig.getMetrics(), is(MetricsType.STRIMZI_REPORTER.toString()));
+        assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("metric.reporters=my.domain.CustomMetricReporter,io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter"));
+        assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.listener.enable=false"));
+        assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.allowlist=kafka_log.*,kafka_network.*"));
+    }
+
+    @Test
+    public void testInvalidMetricsType() {
+        Map<String, Object> map = Map.of(
+            "bridge.id", "my-bridge",
+            "kafka.bootstrap.servers", "localhost:9092",
+            "bridge.metrics", "invalidReporterType",
+            "http.host", "0.0.0.0",
+            "http.port", "8080"
+        );
+
+        Exception e = assertThrows(IllegalArgumentException.class, () -> BridgeConfig.fromMap(map));
+        assertThat(e.getMessage(), is("Invalid bridge.metrics configuration, choose one of jmxPrometheusExporter and strimziMetricsReporter"));
+    }
 }
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java
index 2cdd376d1..4d5a0c3d1 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java
@@ -4,15 +4,14 @@
  */
 package io.strimzi.kafka.bridge.http;
 
-import io.micrometer.core.instrument.MeterRegistry;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import io.strimzi.kafka.bridge.JmxCollectorRegistry;
-import io.strimzi.kafka.bridge.MetricsReporter;
 import io.strimzi.kafka.bridge.config.BridgeConfig;
 import io.strimzi.kafka.bridge.config.KafkaConfig;
 import io.strimzi.kafka.bridge.config.KafkaConsumerConfig;
 import io.strimzi.kafka.bridge.config.KafkaProducerConfig;
 import io.strimzi.kafka.bridge.http.services.ConsumerService;
+import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry;
+import io.strimzi.kafka.bridge.metrics.MetricsReporter;
 import io.strimzi.kafka.bridge.utils.Urls;
 import io.strimzi.test.container.StrimziKafkaContainer;
 import io.vertx.core.Vertx;
@@ -79,7 +78,6 @@ public class ConsumerGeneratedNameIT {
     private static HttpBridge httpBridge;
     private static WebClient client;
     private static BridgeConfig bridgeConfig;
-    private static MeterRegistry meterRegistry = null;
     private static JmxCollectorRegistry jmxCollectorRegistry = null;
 
     ConsumerService consumerService() {
@@ -98,7 +96,7 @@ static void beforeAll(VertxTestContext context) {
         if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) {
 
             bridgeConfig = BridgeConfig.fromMap(config);
-            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry, meterRegistry));
+            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry));
 
             LOGGER.info("Deploying in-memory bridge");
             vertx.deployVerticle(httpBridge).onComplete(context.succeeding(id -> context.completeNow()));
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java
index 2cc30792b..c96490f4b 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java
@@ -7,7 +7,6 @@
 
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.strimzi.kafka.bridge.BridgeContentType;
-import io.strimzi.kafka.bridge.MetricsReporter;
 import io.strimzi.kafka.bridge.config.BridgeConfig;
 import io.strimzi.kafka.bridge.config.KafkaConfig;
 import io.strimzi.kafka.bridge.config.KafkaConsumerConfig;
@@ -15,6 +14,8 @@
 import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
 import io.strimzi.kafka.bridge.http.services.ConsumerService;
 import io.strimzi.kafka.bridge.http.services.ProducerService;
+import io.strimzi.kafka.bridge.metrics.MetricsReporter;
+import io.strimzi.kafka.bridge.metrics.StrimziCollectorRegistry;
 import io.strimzi.kafka.bridge.utils.Urls;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonArray;
@@ -166,7 +167,7 @@ private void startBridge(VertxTestContext context, Map<String, Object> config) t
         CompletableFuture<Boolean> startBridge = new CompletableFuture<>();
         if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) {
             BridgeConfig bridgeConfig = BridgeConfig.fromMap(config);
-            HttpBridge httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(null, null));
+            HttpBridge httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(new StrimziCollectorRegistry()));
 
             LOGGER.info("Deploying in-memory bridge");
             vertx.deployVerticle(httpBridge).onComplete(context.succeeding(id -> startBridge.complete(true)));
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java b/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java
index 36f0c831c..43955c40d 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java
@@ -5,14 +5,13 @@
 
 package io.strimzi.kafka.bridge.http;
 
-import io.micrometer.core.instrument.MeterRegistry;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.strimzi.kafka.bridge.BridgeContentType;
-import io.strimzi.kafka.bridge.JmxCollectorRegistry;
-import io.strimzi.kafka.bridge.MetricsReporter;
 import io.strimzi.kafka.bridge.config.BridgeConfig;
 import io.strimzi.kafka.bridge.config.KafkaConfig;
 import io.strimzi.kafka.bridge.facades.AdminClientFacade;
+import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry;
+import io.strimzi.kafka.bridge.metrics.MetricsReporter;
 import io.strimzi.kafka.bridge.utils.Urls;
 import io.strimzi.test.container.StrimziKafkaContainer;
 import io.vertx.core.Vertx;
@@ -58,7 +57,6 @@ public class HttpCorsIT {
 
     static BridgeConfig bridgeConfig;
     static StrimziKafkaContainer kafkaContainer;
-    static MeterRegistry meterRegistry = null;
     static JmxCollectorRegistry jmxCollectorRegistry = null;
     static AdminClientFacade adminClientFacade;
 
@@ -298,7 +296,7 @@ private void configureBridge(boolean corsEnabled, String methodsAllowed) {
             config.put(HttpConfig.HTTP_CORS_ALLOWED_METHODS, methodsAllowed != null ? methodsAllowed : "GET,POST,PUT,DELETE,OPTIONS,PATCH");
 
             bridgeConfig = BridgeConfig.fromMap(config);
-            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry, meterRegistry));
+            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry));
         }
     }
 }
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java
index 6f8239204..fe5f117d7 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java
@@ -5,12 +5,12 @@
 package io.strimzi.kafka.bridge.http;
 
 import io.strimzi.kafka.bridge.BridgeContentType;
-import io.strimzi.kafka.bridge.MetricsReporter;
 import io.strimzi.kafka.bridge.clients.BasicKafkaClient;
 import io.strimzi.kafka.bridge.config.BridgeConfig;
 import io.strimzi.kafka.bridge.config.KafkaProducerConfig;
 import io.strimzi.kafka.bridge.facades.AdminClientFacade;
 import io.strimzi.kafka.bridge.http.base.HttpBridgeITAbstract;
+import io.strimzi.kafka.bridge.metrics.MetricsReporter;
 import io.strimzi.kafka.bridge.utils.Urls;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonArray;
@@ -76,7 +76,7 @@ static void beforeAll(VertxTestContext context) {
 
         if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) {
             bridgeConfig = BridgeConfig.fromMap(cfg);
-            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry, meterRegistry));
+            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry));
 
             LOGGER.info("Deploying in-memory bridge");
             vertx.deployVerticle(httpBridge).onComplete(context.succeeding(id -> context.completeNow()));
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java b/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java
index cac91599c..2417b141d 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java
@@ -5,9 +5,6 @@
 
 package io.strimzi.kafka.bridge.http.base;
 
-import io.micrometer.core.instrument.MeterRegistry;
-import io.strimzi.kafka.bridge.JmxCollectorRegistry;
-import io.strimzi.kafka.bridge.MetricsReporter;
 import io.strimzi.kafka.bridge.clients.BasicKafkaClient;
 import io.strimzi.kafka.bridge.config.BridgeConfig;
 import io.strimzi.kafka.bridge.config.KafkaConfig;
@@ -20,6 +17,9 @@
 import io.strimzi.kafka.bridge.http.services.ConsumerService;
 import io.strimzi.kafka.bridge.http.services.ProducerService;
 import io.strimzi.kafka.bridge.http.services.SeekService;
+import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry;
+import io.strimzi.kafka.bridge.metrics.MetricsReporter;
+import io.strimzi.kafka.bridge.metrics.StrimziCollectorRegistry;
 import io.strimzi.kafka.bridge.utils.Urls;
 import io.strimzi.test.container.StrimziKafkaContainer;
 import io.vertx.core.Vertx;
@@ -97,8 +97,6 @@ public abstract class HttpBridgeITAbstract {
     protected static AdminClientFacade adminClientFacade;
     protected static HttpBridge httpBridge;
     protected static BridgeConfig bridgeConfig;
-
-    protected static MeterRegistry meterRegistry = null;
     protected static JmxCollectorRegistry jmxCollectorRegistry = null;
 
     protected BaseService baseService() {
@@ -128,7 +126,8 @@ static void beforeAll(VertxTestContext context) {
 
         if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) {
             bridgeConfig = BridgeConfig.fromMap(config);
-            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry, meterRegistry));
+            
+            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(new StrimziCollectorRegistry()));
 
             LOGGER.info("Deploying in-memory bridge");
             vertx.deployVerticle(httpBridge).onComplete(context.succeeding(id -> context.completeNow()));
diff --git a/src/test/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistryTest.java b/src/test/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistryTest.java
new file mode 100644
index 000000000..007cefa71
--- /dev/null
+++ b/src/test/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistryTest.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.bridge.metrics;
+
+import io.prometheus.jmx.JmxCollector;
+import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
+import io.prometheus.metrics.model.registry.PrometheusRegistry;
+import io.prometheus.metrics.model.snapshots.MetricSnapshots;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class JmxCollectorRegistryTest {
+    @Test
+    void shouldReturnFormattedMetrics() throws IOException {
+        JmxCollector mockJmxCollector = mock(JmxCollector.class);
+            
+        PrometheusRegistry mockPromRegistry = mock(PrometheusRegistry.class);
+        MetricSnapshots mockSnapshots = mock(MetricSnapshots.class);
+        when(mockPromRegistry.scrape()).thenReturn(mockSnapshots);
+        
+        PrometheusTextFormatWriter mockPromFormatter = mock(PrometheusTextFormatWriter.class);
+        doAnswer(invocation -> {
+            ByteArrayOutputStream stream = invocation.getArgument(0);
+            stream.write("test_metric\n".getBytes(StandardCharsets.UTF_8));
+            return null;
+        }).when(mockPromFormatter).write(any(), any());
+
+        JmxCollectorRegistry collectorRegistry = new JmxCollectorRegistry(mockJmxCollector, mockPromRegistry, mockPromFormatter);
+        
+        String result = collectorRegistry.scrape();
+        assertThat(result, containsString("test_metric"));
+        assertThat(result.getBytes(StandardCharsets.UTF_8).length, is(result.length()));
+    }
+
+    @Test
+    void shouldHandleIoException() throws IOException {
+        JmxCollector mockJmxCollector = mock(JmxCollector.class);
+        
+        PrometheusRegistry mockPromRegistry = mock(PrometheusRegistry.class);
+        MetricSnapshots mockSnapshots = mock(MetricSnapshots.class);
+        when(mockPromRegistry.scrape()).thenReturn(mockSnapshots);
+
+        PrometheusTextFormatWriter mockPromFormatter = mock(PrometheusTextFormatWriter.class);
+        doThrow(new IOException("Test exception"))
+            .when(mockPromFormatter).write(any(ByteArrayOutputStream.class), Mockito.eq(mockSnapshots));
+
+        JmxCollectorRegistry collectorRegistry = new JmxCollectorRegistry(mockJmxCollector, mockPromRegistry, mockPromFormatter);
+        
+        RuntimeException exception = assertThrows(RuntimeException.class, () -> collectorRegistry.scrape());
+        assertThat(exception.getMessage(), containsString("Test exception"));
+    }
+
+    @Test
+    void shouldThrowWithInvalidYaml() {
+        assertThrows(ClassCastException.class, () -> new JmxCollectorRegistry("invalid"));
+    }
+}
diff --git a/src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java b/src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java
new file mode 100644
index 000000000..c68ca2309
--- /dev/null
+++ b/src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.bridge.metrics;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class MetricsReporterTest {
+    @Test
+    void shouldReturnMetrics() {
+        JmxCollectorRegistry mockJmxRegistry = mock(JmxCollectorRegistry.class);
+        when(mockJmxRegistry.scrape()).thenReturn("jmx_metrics\n");
+
+        StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class);
+        when(mockStrimziRegistry.scrape()).thenReturn("strimzi_metrics\n");
+
+        PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class);
+        when(mockVertxRegistry.scrape()).thenReturn("vertx_metrics\n");
+        MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class);
+        when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig);
+
+        MetricsReporter metricsReporter = new MetricsReporter(
+            mockJmxRegistry, mockStrimziRegistry, mockVertxRegistry);
+
+        String result = metricsReporter.scrape();
+
+        assertThat(result, containsString("jmx_metrics"));
+        assertThat(result, containsString("strimzi_metrics"));
+        assertThat(result, containsString("vertx_metrics"));
+
+        verify(mockJmxRegistry).scrape();
+        verify(mockStrimziRegistry).scrape();
+        verify(mockVertxRegistry).scrape();
+    }
+
+    @Test
+    void shouldReturnMetricsWithoutStrimziRegistry() {
+        JmxCollectorRegistry mockJmxRegistry = mock(JmxCollectorRegistry.class);
+        when(mockJmxRegistry.scrape()).thenReturn("jmx_metrics\n");
+
+        PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class);
+        when(mockVertxRegistry.scrape()).thenReturn("vertx_metrics\n");
+        MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class);
+        when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig);
+
+        MetricsReporter metricsReporter = new MetricsReporter(mockJmxRegistry, null, mockVertxRegistry);
+
+        String result = metricsReporter.scrape();
+
+        assertThat(result, containsString("jmx_metrics"));
+        assertThat(result, containsString("vertx_metrics"));
+
+        verify(mockJmxRegistry).scrape();
+        verify(mockVertxRegistry).scrape();
+    }
+
+    @Test
+    void shouldReturnMetricsWithoutJmxRegistry() {
+        StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class);
+        when(mockStrimziRegistry.scrape()).thenReturn("strimzi_metrics\n");
+
+        PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class);
+        when(mockVertxRegistry.scrape()).thenReturn("vertx_metrics\n");
+        MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class);
+        when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig);
+
+        MetricsReporter metricsReporter = new MetricsReporter(null, mockStrimziRegistry, mockVertxRegistry);
+
+        String result = metricsReporter.scrape();
+
+        assertThat(result, containsString("strimzi_metrics"));
+        assertThat(result, containsString("vertx_metrics"));
+
+        verify(mockStrimziRegistry).scrape();
+        verify(mockVertxRegistry).scrape();
+    }
+
+    @Test
+    void testGetVertxRegistry() {
+        StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class);
+        PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class);
+        MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class);
+        when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig);
+
+        MetricsReporter metricsReporter = new MetricsReporter(null, mockStrimziRegistry, mockVertxRegistry);
+
+        MeterRegistry result = metricsReporter.getVertxRegistry();
+
+        assertThat(result, is(mockVertxRegistry));
+    }
+
+    @Test
+    void testNamingConventionAppliedForPrometheusRegistry() {
+        StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class);
+        PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class);
+        MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class);
+        when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig);
+        
+        new MetricsReporter(null, mockStrimziRegistry, mockVertxRegistry);
+
+        verify(mockVertxRegistry).config();
+    }
+}
diff --git a/src/test/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistryTest.java b/src/test/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistryTest.java
new file mode 100644
index 000000000..bab4d7eb3
--- /dev/null
+++ b/src/test/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistryTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.bridge.metrics;
+
+import io.prometheus.metrics.expositionformats.PrometheusTextFormatWriter;
+import io.prometheus.metrics.model.registry.PrometheusRegistry;
+import io.prometheus.metrics.model.snapshots.MetricSnapshots;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class StrimziCollectorRegistryTest {
+    @Test
+    void shouldReturnFormattedMetrics() throws IOException {
+        PrometheusRegistry mockPromRegistry = mock(PrometheusRegistry.class);
+        MetricSnapshots mockSnapshots = mock(MetricSnapshots.class);
+        when(mockPromRegistry.scrape()).thenReturn(mockSnapshots);
+        
+        PrometheusTextFormatWriter mockPromFormatter = mock(PrometheusTextFormatWriter.class);
+        doAnswer(invocation -> {
+            ByteArrayOutputStream stream = invocation.getArgument(0);
+            stream.write("test_metric\n".getBytes(StandardCharsets.UTF_8));
+            return null;
+        }).when(mockPromFormatter).write(any(), any());
+
+        StrimziCollectorRegistry collectorRegistry = new StrimziCollectorRegistry(mockPromRegistry, mockPromFormatter);
+        
+        String result = collectorRegistry.scrape();
+        assertThat(result, containsString("test_metric"));
+        assertThat(result.getBytes(StandardCharsets.UTF_8).length, is(result.length()));
+    }
+
+    @Test
+    void shouldHandleIoException() throws IOException {
+        PrometheusRegistry mockPromRegistry = mock(PrometheusRegistry.class);
+        MetricSnapshots mockSnapshots = mock(MetricSnapshots.class);
+        when(mockPromRegistry.scrape()).thenReturn(mockSnapshots);
+
+        PrometheusTextFormatWriter mockPromFormatter = mock(PrometheusTextFormatWriter.class);
+        doThrow(new IOException("Test exception"))
+            .when(mockPromFormatter).write(any(ByteArrayOutputStream.class), Mockito.eq(mockSnapshots));
+
+        StrimziCollectorRegistry collectorRegistry = new StrimziCollectorRegistry(mockPromRegistry, mockPromFormatter);
+        
+        RuntimeException exception = assertThrows(RuntimeException.class, () -> collectorRegistry.scrape());
+        assertThat(exception.getMessage(), containsString("Test exception"));
+    }
+}

From d931a3ba80d7dc08bd5a409fc0e3526f661b6d73 Mon Sep 17 00:00:00 2001
From: Federico Valeri <fedevaleri@gmail.com>
Date: Fri, 7 Feb 2025 15:00:33 +0100
Subject: [PATCH 2/3] Address comments

Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
---
 CHANGELOG.md                                  |   2 +-
 config/application.properties                 |   6 +-
 .../assembly-kafka-bridge-config.adoc         |   2 +-
 .../openapi.json-generate-apidoc.sha256       |   2 +-
 documentation/book/api/index.adoc             |   5 +
 ...-configuring-kafka-bridge-jmx-metrics.adoc |  14 +--
 ...-configuring-kafka-bridge-smr-metrics.adoc |  24 ++--
 pom.xml                                       |   3 +
 .../io/strimzi/kafka/bridge/Application.java  |  87 +++-----------
 .../kafka/bridge/config/BridgeConfig.java     |  41 ++++---
 .../strimzi/kafka/bridge/http/HttpBridge.java |  91 ++++++++++----
 ...Registry.java => JmxMetricsCollector.java} |  22 ++--
 .../bridge/metrics/MetricsCollector.java      |  61 ++++++++++
 .../kafka/bridge/metrics/MetricsReporter.java |  94 ---------------
 .../kafka/bridge/metrics/MetricsType.java     |  20 +++-
 ...stry.java => StrimziMetricsCollector.java} |  20 ++--
 src/main/resources/openapi.json               |   3 +
 .../kafka/bridge/config/ConfigTest.java       |  10 +-
 .../bridge/http/ConsumerGeneratedNameIT.java  |   5 +-
 .../http/DisablingConsumerProducerIT.java     |   4 +-
 .../strimzi/kafka/bridge/http/HttpCorsIT.java |   5 +-
 .../kafka/bridge/http/InvalidProducerIT.java  |   3 +-
 .../http/base/HttpBridgeITAbstract.java       |   8 +-
 ...Test.java => JmxMetricsCollectorTest.java} |  14 +--
 .../bridge/metrics/MetricsReporterTest.java   | 113 ------------------
 ....java => StrimziMetricsCollectorTest.java} |  12 +-
 26 files changed, 264 insertions(+), 407 deletions(-)
 rename src/main/java/io/strimzi/kafka/bridge/metrics/{JmxCollectorRegistry.java => JmxMetricsCollector.java} (69%)
 create mode 100644 src/main/java/io/strimzi/kafka/bridge/metrics/MetricsCollector.java
 delete mode 100644 src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java
 rename src/main/java/io/strimzi/kafka/bridge/metrics/{StrimziCollectorRegistry.java => StrimziMetricsCollector.java} (66%)
 rename src/test/java/io/strimzi/kafka/bridge/metrics/{JmxCollectorRegistryTest.java => JmxMetricsCollectorTest.java} (83%)
 delete mode 100644 src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java
 rename src/test/java/io/strimzi/kafka/bridge/metrics/{StrimziCollectorRegistryTest.java => StrimziMetricsCollectorTest.java} (84%)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4e154dec0..01878d03a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,7 +10,7 @@
 * Removed script to build bridge configuration within the container. 
   It is going to be set up by the Strimzi operator within a ConfigMap and mounted as volume on the bridge pod.
 * Added support for [Strimzi Metrics Reporter](https://github.com/strimzi/metrics-reporter).
-  This is a Kafka plugin that directly exposes metrics in Prometheus format without using JMX, and can be enabled by setting `bridge.metrics=strimziMetricsReporter`.
+  Set `bridge.metrics=strimziMetricsReporter` to enable it.
 * Added support for custom Prometheus JMX Exporter configuration.
   Set `bridge.metrics.exporter.config.path=/path/to/my-exporter-config.yml` to use your custom config.
 
diff --git a/config/application.properties b/config/application.properties
index c384d1227..ca0657f5e 100644
--- a/config/application.properties
+++ b/config/application.properties
@@ -1,14 +1,14 @@
 #Bridge related settings
 bridge.id=my-bridge
 
-# uncomment the following line to enable JMX Prometheus Exporter, check the documentation for more details
+# uncomment the following line to enable Prometheus JMX Exporter, check the documentation for more details
 #bridge.metrics=jmxPrometheusExporter
-# optionally, you can set the file path of your custom configuration
+# optionally, set the file path of your custom configuration
 #bridge.metrics.exporter.config.path=/path/to/my-exporter-config.yaml
 
 # uncomment the following line to enable Strimzi Metrics Reporter, check the documentation for more details
 #bridge.metrics=strimziMetricsReporter
-# optionally, you can filter the exposed metrics using a comma separated list of regexes
+# optionally, filter the exposed metrics of all internal Kafka clients using a comma separated list of regexes
 #kafka.prometheus.metrics.reporter.allowlist=.*
 
 # uncomment the following line (bridge.tracing) to enable OpenTelemetry tracing, check the documentation for more details
diff --git a/documentation/assemblies/assembly-kafka-bridge-config.adoc b/documentation/assemblies/assembly-kafka-bridge-config.adoc
index 8fb731e84..3a2c9cc0e 100644
--- a/documentation/assemblies/assembly-kafka-bridge-config.adoc
+++ b/documentation/assemblies/assembly-kafka-bridge-config.adoc
@@ -8,7 +8,7 @@
 [role="_abstract"]
 Configure a deployment of the Kafka Bridge using configuration properties.
 Configure Kafka and specify the HTTP connection details needed to be able to interact with Kafka.
-It is possible to enable metrics in Prometheus format with JMX Prometheus Exporter or Strimzi Metrics Reporter.
+It is possible to enable metrics in Prometheus format with Prometheus JMX Exporter or Strimzi Metrics Reporter.
 You can also use configuration properties to enable and use distributed tracing with the Kafka Bridge.
 Distributed tracing allows you to track the progress of transactions between applications in a distributed system.
 
diff --git a/documentation/book/api/.openapi-generator/openapi.json-generate-apidoc.sha256 b/documentation/book/api/.openapi-generator/openapi.json-generate-apidoc.sha256
index 20d035074..393174d0d 100644
--- a/documentation/book/api/.openapi-generator/openapi.json-generate-apidoc.sha256
+++ b/documentation/book/api/.openapi-generator/openapi.json-generate-apidoc.sha256
@@ -1 +1 @@
-638e4ea2961241ead6552ccef364f2617c3b602e4bf9dff7786f5c463b07d063
\ No newline at end of file
+4502368c51039e785930e38d1b134385ac0f20259b840e32f3f96ffee9bbe26c
\ No newline at end of file
diff --git a/documentation/book/api/index.adoc b/documentation/book/api/index.adoc
index 7c54138bf..e7bb27529 100644
--- a/documentation/book/api/index.adoc
+++ b/documentation/book/api/index.adoc
@@ -1399,6 +1399,11 @@ Retrieves the bridge metrics in Prometheus format.
 | Metrics in Prometheus format retrieved successfully.
 |  <<String>>
 
+
+| 404
+| The metrics endpoint is not enabled.
+|  <<>>
+
 |===
 
 ===== Samples
diff --git a/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc b/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc
index 9d5e9a030..35833377e 100644
--- a/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc
+++ b/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc
@@ -1,8 +1,8 @@
 [id='proc-configuring-kafka-bridge-jmx-metrics-{context}']
-= Configuring JMX Exporter metrics
+= Configuring Prometheus JMX Exporter metrics
 
 [role="_abstract"]
-Enable metrics for the Kafka Bridge by setting the `bridge.metrics` configuration.
+Enable metrics with Prometheus JMX Exporter by setting the `bridge.metrics` option to `jmxPrometheusExporter`.
 
 .Prerequisites
 
@@ -19,15 +19,15 @@ Enable metrics for the Kafka Bridge by setting the `bridge.metrics` configuratio
 bridge.metrics=jmxPrometheusExporter
 ----
 +
-Optionally, you can set a custom JMX Exporter configuration using the `bridge.metrics.exporter.config.path` property.
-If not set, a default embedded configuration file will be used.
+Optionally, you can configure a custom Prometheus JMX Exporter configuration using the `bridge.metrics.exporter.config.path` property.
+If not configured, a default embedded configuration file will be used.
 
-. Run the Kafka Bridge script to enable metrics.
+. Run the Kafka Bridge run script.
 +
-.Running the Kafka Bridge to enable metrics
+.Running the Kafka Bridge
 [source,shell]
 ----
 ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
 ----
 +
-With metrics enabled, you can use `GET /metrics` with the `/metrics` endpoint to retrieve Kafka Bridge metrics in Prometheus format.
+With metrics enabled, you can scrape metrics in the Prometheus format from the `/metrics` endpoint of the Kafka Bridge.
diff --git a/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc b/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc
index 2acb8c26d..892d8e4a4 100644
--- a/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc
+++ b/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc
@@ -2,7 +2,7 @@
 = Configuring Strimzi Metrics Reporter metrics
 
 [role="_abstract"]
-Enable metrics for the Kafka Bridge by setting the `bridge.metrics` configuration.
+Enable metrics with Strimzi Metrics Reporter by setting the `bridge.metrics` option to `strimziMetricsReporter`.
 
 .Prerequisites
 
@@ -10,7 +10,7 @@ Enable metrics for the Kafka Bridge by setting the `bridge.metrics` configuratio
 
 .Procedure
 
-. Set the `bridge.metrics` configuration to `strimziMetricsReporter` and related configuration.
+. Set the `bridge.metrics` configuration to `strimziMetricsReporter`.
 +
 .Configuration for enabling metrics
 
@@ -19,18 +19,24 @@ Enable metrics for the Kafka Bridge by setting the `bridge.metrics` configuratio
 bridge.metrics=strimziMetricsReporter
 ----
 +
-Optionally, you can set a comma-separated list of regexes used to filter exposed metrics using the `kafka.prometheus.metrics.reporter.allowlist` property.
-If not set, all available metrics will be exposed.
-+
-You can add any plugin configuration to the Bridge properties file using the `kafka.` prefix.
+Optionally, you can configure a comma-separated list of regexes to filter exposed metrics using the `kafka.prometheus.metrics.reporter.allowlist` property.
+If not configured, a default set of metrics will be exposed.
+
+When needed, it is possible to configure the allowlist per client type.
+For example, set `kafka.admin.prometheus.metrics.reporter.allowlist=` to exclude all admin client metrics.
+
+In case the same property is set with multiple prefixes, the most specific prefix wins.
+For example, `kafka.producer.prometheus.metrics.reporter.allowlist` wins over `kafka.prometheus.metrics.reporter.allowlist`. 
+
+You can add any plugin configuration to the Bridge properties file using `kafka.`, `kafka.admin.`, `kafka.producer.`, and `kafka.consumer.` prefixes.
 See the https://github.com/strimzi/metrics-reporter[Strimzi Metrics Reporter] documentation for more details.
 
-. Run the Kafka Bridge script to enable metrics.
+. Run the Kafka Bridge run script.
 +
-.Running the Kafka Bridge to enable metrics
+.Running the Kafka Bridge
 [source,shell]
 ----
 ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
 ----
 +
-With metrics enabled, you can use `GET /metrics` with the `/metrics` endpoint to retrieve Kafka Bridge metrics in Prometheus format.
+With metrics enabled, you can scrape metrics in the Prometheus format from the `/metrics` endpoint of the Kafka Bridge.
diff --git a/pom.xml b/pom.xml
index 756ef2bd0..2d1002912 100644
--- a/pom.xml
+++ b/pom.xml
@@ -312,6 +312,9 @@
 			<artifactId>prometheus-metrics-model</artifactId>
 			<version>${prometheus-client.version}</version>
 		</dependency>
+		<!-- We include explicitly prometheus-metrics-instrumentation-jvm 
+			and prometheus-metrics-exporter-httpserver to force the use of 
+			Bridge version over metrics-reporter version. -->
 		<dependency>
 			<groupId>io.prometheus</groupId>
 			<artifactId>prometheus-metrics-instrumentation-jvm</artifactId>
diff --git a/src/main/java/io/strimzi/kafka/bridge/Application.java b/src/main/java/io/strimzi/kafka/bridge/Application.java
index 2b441222e..edded0d35 100644
--- a/src/main/java/io/strimzi/kafka/bridge/Application.java
+++ b/src/main/java/io/strimzi/kafka/bridge/Application.java
@@ -2,16 +2,11 @@
  * Copyright Strimzi authors.
  * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
  */
-
 package io.strimzi.kafka.bridge;
 
 import io.strimzi.kafka.bridge.config.BridgeConfig;
 import io.strimzi.kafka.bridge.config.ConfigRetriever;
 import io.strimzi.kafka.bridge.http.HttpBridge;
-import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry;
-import io.strimzi.kafka.bridge.metrics.MetricsReporter;
-import io.strimzi.kafka.bridge.metrics.MetricsType;
-import io.strimzi.kafka.bridge.metrics.StrimziCollectorRegistry;
 import io.strimzi.kafka.bridge.tracing.TracingUtil;
 import io.vertx.core.Future;
 import io.vertx.core.Promise;
@@ -30,18 +25,11 @@
 import org.apache.logging.log4j.Logger;
 
 import javax.management.MalformedObjectNameException;
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * Apache Kafka bridge main application class
@@ -85,28 +73,25 @@ private static Future<HttpBridge> deployHttpBridge(BridgeConfig bridgeConfig)
         Promise<HttpBridge> httpPromise = Promise.promise();
 
         Vertx vertx = createVertxInstance(bridgeConfig);
-        MetricsReporter metricsReporter = getMetricsReporter(bridgeConfig);
-        HttpBridge httpBridge = new HttpBridge(bridgeConfig, metricsReporter);
+        HttpBridge httpBridge = new HttpBridge(bridgeConfig);
+
         vertx.deployVerticle(httpBridge)
-            .onComplete(done -> {
-                if (done.succeeded()) {
-                    LOGGER.info("HTTP verticle instance deployed [{}]", done.result());
-                    if (metricsReporter != null) {
-                        LOGGER.info("Metrics of type '{}' enabled and exposed on /metrics endpoint", bridgeConfig.getMetrics());
+                .onComplete(done -> {
+                    if (done.succeeded()) {
+                        LOGGER.info("HTTP verticle instance deployed [{}]", done.result());
+                        httpPromise.complete(httpBridge);
+                    } else {
+                        LOGGER.error("Failed to deploy HTTP verticle instance", done.cause());
+                        httpPromise.fail(done.cause());
                     }
-                    httpPromise.complete(httpBridge);
-                } else {
-                    LOGGER.error("Failed to deploy HTTP verticle instance", done.cause());
-                    httpPromise.fail(done.cause());
-                }
-            });
+                });
 
         return httpPromise.future();
     }
 
     private static Vertx createVertxInstance(BridgeConfig bridgeConfig) {
         VertxOptions vertxOptions = new VertxOptions();
-        if (bridgeConfig.getMetrics() != null) {
+        if (bridgeConfig.getMetricsType() != null) {
             vertxOptions.setMetricsOptions(metricsOptions()); // enable Vertx metrics
         }
         Vertx vertx = Vertx.vertx(vertxOptions);
@@ -119,61 +104,17 @@ private static Vertx createVertxInstance(BridgeConfig bridgeConfig) {
      * @return instance of the MicrometerMetricsOptions on Vert.x
      */
     private static MicrometerMetricsOptions metricsOptions() {
-        Set<String> set = new HashSet<>();
-        set.add(MetricsDomain.NAMED_POOLS.name());
-        set.add(MetricsDomain.VERTICLES.name());
         return new MicrometerMetricsOptions()
             .setPrometheusOptions(new VertxPrometheusOptions().setEnabled(true))
             // define the labels on the HTTP server related metrics
             .setLabels(EnumSet.of(Label.HTTP_PATH, Label.HTTP_METHOD, Label.HTTP_CODE))
             // disable metrics about pool and verticles
-            .setDisabledMetricsCategories(set)
-            .setJvmMetricsEnabled(true)
+            .setDisabledMetricsCategories(
+                Set.of(MetricsDomain.NAMED_POOLS.name(), MetricsDomain.VERTICLES.name())
+            ).setJvmMetricsEnabled(true)
             .setEnabled(true);
     }
 
-    private static MetricsReporter getMetricsReporter(BridgeConfig bridgeConfig) 
-            throws MalformedObjectNameException, IOException {
-        if (bridgeConfig.getMetrics() != null) {
-            if (bridgeConfig.getMetrics().equals(MetricsType.JMX_EXPORTER.toString())) {
-                return new MetricsReporter(getJmxCollectorRegistry(bridgeConfig));
-            } else if (bridgeConfig.getMetrics().equals(MetricsType.STRIMZI_REPORTER.toString())) {
-                return new MetricsReporter(new StrimziCollectorRegistry());
-            }
-        }
-        return null;
-    }
-
-    /**
-     * Return a JmxCollectorRegistry instance with the YAML configuration filters.
-     * This is loaded from a custom config file if present, or from the default configuration file.
-     *
-     * @return JmxCollectorRegistry instance
-     * @throws MalformedObjectNameException
-     * @throws IOException
-     */
-    private static JmxCollectorRegistry getJmxCollectorRegistry(BridgeConfig bridgeConfig) throws MalformedObjectNameException, IOException {
-        if (bridgeConfig.getJmxExporterConfigPath() != null && Files.exists(bridgeConfig.getJmxExporterConfigPath())) {
-            // read custom configuration file
-            LOGGER.info("Loading custom JMX Exporter configuration from {}", bridgeConfig.getJmxExporterConfigPath());
-            String yaml = Files.readString(bridgeConfig.getJmxExporterConfigPath(), StandardCharsets.UTF_8);
-            return new JmxCollectorRegistry(yaml);
-        } else {
-            // fallback to default configuration
-            LOGGER.info("Loading default JMX Exporter configuration");
-            InputStream is = Application.class.getClassLoader().getResourceAsStream("jmx_metrics_config.yaml");
-            if (is == null) {
-                return null;
-            }
-            try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
-                String yaml = reader
-                    .lines()
-                    .collect(Collectors.joining("\n"));
-                return new JmxCollectorRegistry(yaml);
-            }
-        }
-    }
-
     /**
      * Generate the command line options
      *
diff --git a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java
index d07ddca68..0ff662206 100644
--- a/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java
+++ b/src/main/java/io/strimzi/kafka/bridge/config/BridgeConfig.java
@@ -31,10 +31,14 @@ public class BridgeConfig extends AbstractConfig {
     public static final String METRICS_TYPE = BRIDGE_CONFIG_PREFIX + "metrics";
 
     /** JMX Exporter configuration file path */
-    public static final String JMX_EXPORTER_CONFIG_PATH = METRICS_TYPE + ".exporter.config.path";
+    private static final String JMX_EXPORTER_CONFIG_PATH = METRICS_TYPE + ".exporter.config.path";
     
     /** Tracing system to be used in the bridge */
-    public static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing";
+    private static final String TRACING_TYPE = BRIDGE_CONFIG_PREFIX + "tracing";
+    
+    /** Default Strimzi Metrics Reporter allow list. */
+    /* test */ static final String DEFAULT_STRIMZI_METRICS_REPORTER_ALLOW_LIST = "kafka_consumer_consumer_metrics.*, " +
+        "kafka_producer_kafka_metrics_count_count, kafka_producer_producer_metrics.*";
 
     private final KafkaConfig kafkaConfig;
     private final HttpConfig httpConfig;
@@ -84,21 +88,15 @@ public static BridgeConfig fromMap(Map<String, Object> map) {
     }
 
     private static void validateAndApplyDefaults(Map<String, Object> map) {
-        String metricsTypeValue = (String) map.get(METRICS_TYPE);
-        if (metricsTypeValue != null) {
-            // validate metrics type
-            if (!metricsTypeValue.equals(MetricsType.JMX_EXPORTER.toString())
-                    && !metricsTypeValue.equals(MetricsType.STRIMZI_REPORTER.toString())) {
-                throw new IllegalArgumentException(
-                    String.format("Invalid %s configuration, choose one of %s and %s", 
-                        METRICS_TYPE, MetricsType.JMX_EXPORTER, MetricsType.STRIMZI_REPORTER)
-                );
-            }
-            // apply default Strimzi Metrics Reporter configurations if not present
-            if (metricsTypeValue.equals(MetricsType.STRIMZI_REPORTER.toString())) {
+        if (map.get(METRICS_TYPE) != null) {
+            // get and validate metrics type
+            MetricsType metricsType = MetricsType.fromString((String) map.get(METRICS_TYPE));
+            // apply default Strimzi Metrics Reporter configuration if not present
+            if (metricsType == MetricsType.STRIMZI_REPORTER) {
+                LOGGER.info("Using default metrics configuration");
                 map.putIfAbsent("kafka.metric.reporters", "io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter");
                 map.putIfAbsent("kafka.prometheus.metrics.reporter.listener.enable", "false");
-                map.putIfAbsent("kafka.prometheus.metrics.reporter.allowlist", ".*");
+                map.putIfAbsent("kafka.prometheus.metrics.reporter.allowlist", DEFAULT_STRIMZI_METRICS_REPORTER_ALLOW_LIST);
             }
         }
     }
@@ -124,20 +122,21 @@ public String getBridgeID() {
     }
 
     /**
-     * @return the metric system to be used in the bridge
+     * @return the metric type to be used in the bridge
      */
-    public String getMetrics() {
+    public MetricsType getMetricsType() {
         final String envVarValue = System.getenv("KAFKA_BRIDGE_METRICS_ENABLED");
         if (envVarValue != null) {
             LOGGER.warn("KAFKA_BRIDGE_METRICS_ENABLED is deprecated, use bridge.metrics configuration");
         }
-        return (String) Optional.ofNullable(config.get(BridgeConfig.METRICS_TYPE))
-            .orElse(Boolean.parseBoolean(envVarValue)
-                ? MetricsType.JMX_EXPORTER.toString() : null);
+        
+        return Optional.ofNullable((String) config.get(BridgeConfig.METRICS_TYPE))
+            .map(MetricsType::fromString)
+            .orElseGet(() -> Boolean.parseBoolean(envVarValue) ? MetricsType.JMX_EXPORTER : null);
     }
 
     /**
-     * @return the JMX Exporter configuration file path
+     * @return the Prometheus JMX Exporter configuration file path
      */
     public Path getJmxExporterConfigPath() {
         if (config.get(BridgeConfig.JMX_EXPORTER_CONFIG_PATH) == null) {
diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
index 656734b54..af5e6ca57 100644
--- a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
+++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
@@ -2,7 +2,6 @@
  * Copyright Strimzi authors.
  * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
  */
-
 package io.strimzi.kafka.bridge.http;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -18,7 +17,10 @@
 import io.strimzi.kafka.bridge.config.BridgeConfig;
 import io.strimzi.kafka.bridge.http.converter.JsonUtils;
 import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
-import io.strimzi.kafka.bridge.metrics.MetricsReporter;
+import io.strimzi.kafka.bridge.metrics.JmxMetricsCollector;
+import io.strimzi.kafka.bridge.metrics.MetricsCollector;
+import io.strimzi.kafka.bridge.metrics.MetricsType;
+import io.strimzi.kafka.bridge.metrics.StrimziMetricsCollector;
 import io.vertx.core.AbstractVerticle;
 import io.vertx.core.Promise;
 import io.vertx.core.file.FileSystem;
@@ -43,11 +45,19 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import javax.management.MalformedObjectNameException;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT;
 import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS;
@@ -59,7 +69,7 @@
 /**
  * Main bridge class listening for connections and handling HTTP requests.
  */
-@SuppressWarnings({"checkstyle:MemberName"})
+@SuppressWarnings({"checkstyle:MemberName", "checkstyle:ClassDataAbstractionCoupling", "checkstyle:ClassFanOutComplexity"})
 public class HttpBridge extends AbstractVerticle {
     private static final Logger LOGGER = LogManager.getLogger(HttpBridge.class);
 
@@ -76,17 +86,59 @@ public class HttpBridge extends AbstractVerticle {
 
     private final Map<ConsumerInstanceId, Long> timestampMap = new HashMap<>();
 
-    private final MetricsReporter metricsReporter;
+    private MetricsCollector metricsCollector = null;
 
     /**
      * Constructor
      *
      * @param bridgeConfig bridge configuration
-     * @param metricsReporter MetricsReporter instance for scraping metrics from different registries
      */
-    public HttpBridge(BridgeConfig bridgeConfig, MetricsReporter metricsReporter) {
+    public HttpBridge(BridgeConfig bridgeConfig) {
         this.bridgeConfig = bridgeConfig;
-        this.metricsReporter = metricsReporter;
+        if (bridgeConfig.getMetricsType() != null) {
+            if (bridgeConfig.getMetricsType() == MetricsType.JMX_EXPORTER) {
+                this.metricsCollector = createJmxMetricsCollector(bridgeConfig);
+            } else if (bridgeConfig.getMetricsType() == MetricsType.STRIMZI_REPORTER) {
+                this.metricsCollector = new StrimziMetricsCollector();
+            }
+        }
+        if (metricsCollector != null) {
+            LOGGER.info("Metrics of type '{}' enabled and exposed on /metrics endpoint", bridgeConfig.getMetricsType());
+        }
+    }
+
+    /**
+     * Create a JmxMetricsCollector instance with the YAML configuration filters.
+     * This is loaded from a custom config file if present or from the default configuration file.
+     *
+     * @return JmxCollectorRegistry instance
+     */
+    private static JmxMetricsCollector createJmxMetricsCollector(BridgeConfig bridgeConfig) {
+        try {
+            if (bridgeConfig.getJmxExporterConfigPath() == null) {
+                // load default configuration
+                LOGGER.info("Using default JMX metrics configuration");
+                InputStream is = Application.class.getClassLoader().getResourceAsStream("jmx_metrics_config.yaml");
+                if (is == null) {
+                    return null;
+                }
+                try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
+                    String yaml = reader
+                        .lines()
+                        .collect(Collectors.joining("\n"));
+                    return new JmxMetricsCollector(yaml);
+                }
+            } else if (Files.exists(bridgeConfig.getJmxExporterConfigPath())) {
+                // load custom configuration file
+                LOGGER.info("Loading custom JMX metrics configuration file from {}", bridgeConfig.getJmxExporterConfigPath());
+                String yaml = Files.readString(bridgeConfig.getJmxExporterConfigPath(), StandardCharsets.UTF_8);
+                return new JmxMetricsCollector(yaml);
+            } else {
+                throw new RuntimeException("Custom JMX metrics configuration file not found");
+            }
+        } catch (IOException | MalformedObjectNameException e) {
+            throw new RuntimeException("Failed to initialize JMX metrics collector", e);
+        }
     }
 
     private void bindHttpServer(Promise<Void> startPromise) {
@@ -168,9 +220,7 @@ public void start(Promise<Void> startPromise) {
                         routerBuilder.operation(this.OPENAPI.getOperationId().toString()).handler(this.OPENAPI);
                         routerBuilder.operation(this.OPENAPIV2.getOperationId().toString()).handler(this.OPENAPIV2);
                         routerBuilder.operation(this.OPENAPIV3.getOperationId().toString()).handler(this.OPENAPIV3);
-                        if (metricsReporter != null) {
-                            routerBuilder.operation(this.METRICS.getOperationId().toString()).handler(this.METRICS);
-                        }
+                        routerBuilder.operation(this.METRICS.getOperationId().toString()).handler(this.METRICS);
                         routerBuilder.operation(this.INFO.getOperationId().toString()).handler(this.INFO);
                         if (this.bridgeConfig.getHttpConfig().isCorsEnabled()) {
                             routerBuilder.rootHandler(getCorsHandler());
@@ -180,14 +230,14 @@ public void start(Promise<Void> startPromise) {
                         }
 
                         this.router = routerBuilder.createRouter();
-                        
+
                         // handling validation errors and not existing endpoints
                         this.router.errorHandler(HttpResponseStatus.BAD_REQUEST.code(), this::errorHandler);
                         this.router.errorHandler(HttpResponseStatus.NOT_FOUND.code(), this::errorHandler);
-        
-                        if (this.metricsReporter != null && this.metricsReporter.getVertxRegistry() != null) {
+
+                        if (this.metricsCollector != null && this.metricsCollector.getVertxRegistry() != null) {
                             // exclude to report the HTTP server metrics for the /metrics endpoint itself
-                            this.metricsReporter.getVertxRegistry().config().meterFilter(
+                            this.metricsCollector.getVertxRegistry().config().meterFilter(
                                     MeterFilter.deny(meter -> "/metrics".equals(meter.getTag(Label.HTTP_PATH.toString())))
                             );
                         }
@@ -237,7 +287,6 @@ private CorsHandler getCorsHandler() {
 
     @Override
     public void stop(Promise<Void> stopPromise) {
-
         LOGGER.info("Stopping HTTP-Kafka bridge verticle ...");
 
         this.isReady = false;
@@ -559,10 +608,14 @@ private void openapi(RoutingContext routingContext) {
     }
 
     private void metrics(RoutingContext routingContext) {
-        routingContext.response()
+        if (metricsCollector != null) {
+            routingContext.response()
                 .putHeader("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
                 .setStatusCode(HttpResponseStatus.OK.code())
-                .end(metricsReporter.scrape());
+                .end(metricsCollector.scrape());
+        } else {
+            HttpUtils.sendResponse(routingContext, HttpResponseStatus.NOT_FOUND.code(), null, null);
+        }
     }
 
     private void information(RoutingContext routingContext) {
@@ -608,8 +661,6 @@ private void errorHandler(RoutingContext routingContext) {
             }
         } else if (routingContext.statusCode() == HttpResponseStatus.NOT_FOUND.code()) {
             message = HttpResponseStatus.NOT_FOUND.reasonPhrase();
-        } else if (routingContext.statusCode() == HttpResponseStatus.NOT_IMPLEMENTED.code()) {
-            message = HttpResponseStatus.NOT_IMPLEMENTED.reasonPhrase();
         }
 
         HttpBridgeError error = new HttpBridgeError(routingContext.statusCode(), message);
@@ -826,7 +877,7 @@ public void process(RoutingContext routingContext) {
             openapi(routingContext);
         }
     };
-    
+
     final static HttpOpenApiOperation OPENAPIV2 = new HttpOpenApiOperation(HttpOpenApiOperations.OPENAPIV2) {
 
         @Override
diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistry.java b/src/main/java/io/strimzi/kafka/bridge/metrics/JmxMetricsCollector.java
similarity index 69%
rename from src/main/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistry.java
rename to src/main/java/io/strimzi/kafka/bridge/metrics/JmxMetricsCollector.java
index 72bbc35ed..30eff78f0 100644
--- a/src/main/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistry.java
+++ b/src/main/java/io/strimzi/kafka/bridge/metrics/JmxMetricsCollector.java
@@ -15,9 +15,9 @@
 import java.nio.charset.StandardCharsets;
 
 /**
- * Allow to collect JMX metrics exposing them in the Prometheus format.
+ * Collect and scrape JMX metrics in Prometheus format.
  */
-public class JmxCollectorRegistry {
+public class JmxMetricsCollector extends MetricsCollector {
     private final PrometheusRegistry registry;
     private final PrometheusTextFormatWriter textFormatter;
 
@@ -27,8 +27,8 @@ public class JmxCollectorRegistry {
      * @param yamlConfig YAML configuration string with metrics filtering rules
      * @throws MalformedObjectNameException Throws MalformedObjectNameException
      */
-    public JmxCollectorRegistry(String yamlConfig) throws MalformedObjectNameException {
-        // note that Prometheus default registry is a singleton, so it is shared with JmxCollector
+    public JmxMetricsCollector(String yamlConfig) throws MalformedObjectNameException {
+        // Prometheus default registry is a singleton, so it is shared with JmxCollector
         this(new JmxCollector(yamlConfig), PrometheusRegistry.defaultRegistry, new PrometheusTextFormatWriter(true));
     }
 
@@ -39,19 +39,17 @@ public JmxCollectorRegistry(String yamlConfig) throws MalformedObjectNameExcepti
      * @param registry Prometheus collector registry
      * @param textFormatter Prometheus text formatter
      */
-    /* test */ JmxCollectorRegistry(JmxCollector jmxCollector,
-                                    PrometheusRegistry registry,
-                                    PrometheusTextFormatWriter textFormatter) {
+    /* test */ JmxMetricsCollector(JmxCollector jmxCollector,
+                                   PrometheusRegistry registry,
+                                   PrometheusTextFormatWriter textFormatter) {
+        super();
         jmxCollector.register();
         this.registry = registry;
         this.textFormatter = textFormatter;
     }
 
-    /**
-     * @return Content that should be included in the response body for 
-     * an endpoint designated for Prometheus to scrape from.
-     */
-    public String scrape() {
+    @Override
+    public String doScrape() {
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         try {
             textFormatter.write(stream, registry.scrape());
diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsCollector.java b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsCollector.java
new file mode 100644
index 000000000..1ab5c40d7
--- /dev/null
+++ b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsCollector.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright Strimzi authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.strimzi.kafka.bridge.metrics;
+
+import io.micrometer.core.instrument.Meter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import io.micrometer.prometheus.PrometheusNamingConvention;
+import io.vertx.micrometer.backends.BackendRegistries;
+
+/**
+ * Abstract class for collecting and exposing metrics.
+ */
+public abstract class MetricsCollector {
+    private final PrometheusMeterRegistry vertxRegistry;
+    
+    MetricsCollector() {
+        this.vertxRegistry = (PrometheusMeterRegistry) BackendRegistries.getDefaultNow();
+        if (vertxRegistry != null) {
+            // replace the default Prometheus naming convention
+            this.vertxRegistry.config().namingConvention(new MetricsNamingConvention());
+        }
+    }
+
+    private static class MetricsNamingConvention extends PrometheusNamingConvention {
+        @Override
+        public String name(String name, Meter.Type type, String baseUnit) {
+            String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
+            return super.name(metricName, type, baseUnit);
+        }
+    }
+
+    /**
+     * @return Registry instance for scraping Vertx metrics.
+     *         This is null if metrics are not enabled in the VertxOptions instance.
+     */
+    public MeterRegistry getVertxRegistry() {
+        return vertxRegistry;
+    }
+
+    /**
+     * Scrape all, including Vertx metrics.
+     * 
+     * @return Raw metrics in Prometheus format.
+     */
+    public String scrape() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(doScrape());
+        if (vertxRegistry != null) {
+            sb.append(vertxRegistry.scrape());
+        }
+        return sb.toString();
+    }
+
+    /**
+     * @return Raw metrics in Prometheus format.
+     */
+    abstract String doScrape();
+}
diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java
deleted file mode 100644
index f81139a67..000000000
--- a/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsReporter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright Strimzi authors.
- * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
- */
-
-package io.strimzi.kafka.bridge.metrics;
-
-import io.micrometer.core.instrument.Meter;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.prometheus.PrometheusMeterRegistry;
-import io.micrometer.prometheus.PrometheusNamingConvention;
-import io.vertx.micrometer.backends.BackendRegistries;
-
-/**
- * Used for scraping and reporting metrics in Prometheus format.
- */
-public class MetricsReporter {
-    private final JmxCollectorRegistry jmxRegistry;
-    private final StrimziCollectorRegistry strimziRegistry;
-    private final PrometheusMeterRegistry vertxRegistry;
-
-    /**
-     * Constructor.
-     *
-     * @param jmxRegistry Registry instance for scraping metrics from JMX endpoints
-     */
-    public MetricsReporter(JmxCollectorRegistry jmxRegistry) {
-        // default registry is null if metrics are not enabled in the VertxOptions instance
-        this(jmxRegistry, null, (PrometheusMeterRegistry) BackendRegistries.getDefaultNow());
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param strimziRegistry Registry instance for scraping metrics from Strimzi Metrics Reporter
-     */
-    public MetricsReporter(StrimziCollectorRegistry strimziRegistry) {
-        // default registry is null if metrics are not enabled in the VertxOptions instance
-        this(null, strimziRegistry, (PrometheusMeterRegistry) BackendRegistries.getDefaultNow());
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param jmxRegistry Registry instance for scraping metrics from JMX endpoints
-     * @param strimziRegistry Registry instance for scraping metrics from Strimzi Metrics Reporter
-     * @param vertxRegistry Registry instance for scraping metrics exposed through Vert.x
-     */
-    public MetricsReporter(JmxCollectorRegistry jmxRegistry,
-                           StrimziCollectorRegistry strimziRegistry,
-                           PrometheusMeterRegistry vertxRegistry) {
-        this.jmxRegistry = jmxRegistry;
-        this.strimziRegistry = strimziRegistry;
-        this.vertxRegistry = vertxRegistry;
-        if (vertxRegistry != null) {
-            this.vertxRegistry.config().namingConvention(new MetricsNamingConvention());
-        }
-    }
-
-    private static class MetricsNamingConvention extends PrometheusNamingConvention {
-        @Override
-        public String name(String name, Meter.Type type, String baseUnit) {
-            String metricName = name.startsWith("vertx.") ? name.replace("vertx.", "strimzi.bridge.") : name;
-            return super.name(metricName, type, baseUnit);
-        }
-    }
-
-    /**
-     * @return Registry instance for scraping metrics exposed through Vert.x.
-     *         This is null if metrics are not enabled in the VertxOptions instance.
-     */
-    public MeterRegistry getVertxRegistry() {
-        return vertxRegistry;
-    }
-
-    /**
-     * Scrape metrics on the provided registries returning them in the Prometheus format
-     *
-     * @return metrics in Prometheus format as String
-     */
-    public String scrape() {
-        StringBuilder sb = new StringBuilder();
-        if (jmxRegistry != null) {
-            sb.append(jmxRegistry.scrape());
-        }
-        if (strimziRegistry != null) {
-            sb.append(strimziRegistry.scrape());
-        }
-        if (vertxRegistry != null) {
-            sb.append(vertxRegistry.scrape());
-        }
-        return sb.toString();
-    }
-}
diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java
index 97d13d465..53d821980 100644
--- a/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java
+++ b/src/main/java/io/strimzi/kafka/bridge/metrics/MetricsType.java
@@ -8,17 +8,14 @@
  * Metrics type.
  */
 public enum MetricsType {
-    /** JMX Prometheus Exporter.  */
+    /** Prometheus JMX Exporter. */
     JMX_EXPORTER("jmxPrometheusExporter"),
     
     /** Strimzi Metrics Reporter. */
     STRIMZI_REPORTER("strimziMetricsReporter");
 
     private final String text;
-
-    /**
-     * @param text
-     */
+    
     MetricsType(final String text) {
         this.text = text;
     }
@@ -27,4 +24,17 @@ public enum MetricsType {
     public String toString() {
         return text;
     }
+
+    /**
+     * @param text Text.
+     * @return Get type from text.
+     */
+    public static MetricsType fromString(String text) {
+        for (MetricsType t : MetricsType.values()) {
+            if (t.text.equalsIgnoreCase(text)) {
+                return t;
+            }
+        }
+        throw new IllegalArgumentException("Metrics type not found: " + text);
+    }
 }
diff --git a/src/main/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistry.java b/src/main/java/io/strimzi/kafka/bridge/metrics/StrimziMetricsCollector.java
similarity index 66%
rename from src/main/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistry.java
rename to src/main/java/io/strimzi/kafka/bridge/metrics/StrimziMetricsCollector.java
index e21a13216..97d9fb95f 100644
--- a/src/main/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistry.java
+++ b/src/main/java/io/strimzi/kafka/bridge/metrics/StrimziMetricsCollector.java
@@ -13,17 +13,17 @@
 import java.nio.charset.StandardCharsets;
 
 /**
- * Allow to collect Strimzi Reporter metrics exposing them in the Prometheus format.
+ * Collect and scrape Strimzi Reporter metrics in Prometheus format.
  */
-public class StrimziCollectorRegistry {
+public class StrimziMetricsCollector extends MetricsCollector {
     private final PrometheusRegistry registry;
     private final PrometheusTextFormatWriter textFormatter;
 
     /**
      * Constructor.
      */
-    public StrimziCollectorRegistry() {
-        // note that Prometheus default registry is a singleton, so it is shared with Strimzi Metrics Reporter
+    public StrimziMetricsCollector() {
+        // Prometheus default registry is a singleton, so it is shared with Strimzi Metrics Reporter
         this(PrometheusRegistry.defaultRegistry, new PrometheusTextFormatWriter(true));
     }
 
@@ -33,17 +33,15 @@ public StrimziCollectorRegistry() {
      * @param registry Prometheus collector registry
      * @param textFormatter Prometheus text formatter
      */
-    /* test */ StrimziCollectorRegistry(PrometheusRegistry registry,
-                                        PrometheusTextFormatWriter textFormatter) {
+    /* test */ StrimziMetricsCollector(PrometheusRegistry registry,
+                                       PrometheusTextFormatWriter textFormatter) {
+        super();
         this.registry = registry;
         this.textFormatter = textFormatter;
     }
 
-    /**
-     * @return Content that should be included in the response body for 
-     * an endpoint designated for Prometheus to scrape from.
-     */
-    public String scrape() {
+    @Override
+    public String doScrape() {
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         try {
             textFormatter.write(stream, registry.scrape());
diff --git a/src/main/resources/openapi.json b/src/main/resources/openapi.json
index dc51164f4..c342353a1 100644
--- a/src/main/resources/openapi.json
+++ b/src/main/resources/openapi.json
@@ -1478,6 +1478,9 @@
                             }
                         },
                         "description": "Metrics in Prometheus format retrieved successfully."
+                    },
+                    "404": {
+                        "description": "The metrics endpoint is not enabled."
                     }
                 },
                 "operationId": "metrics",
diff --git a/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java b/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java
index af42d737a..fe7e4076a 100644
--- a/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java
+++ b/src/test/java/io/strimzi/kafka/bridge/config/ConfigTest.java
@@ -99,7 +99,7 @@ public void testJmxExporterMetricsType() {
         );
 
         BridgeConfig bridgeConfig = BridgeConfig.fromMap(map);
-        assertThat(bridgeConfig.getMetrics(), is(MetricsType.JMX_EXPORTER.toString()));
+        assertThat(bridgeConfig.getMetricsType(), is(MetricsType.JMX_EXPORTER));
         assertThat(bridgeConfig.getJmxExporterConfigPath(), is(Path.of(configFilePah)));
     }
 
@@ -114,10 +114,10 @@ public void testStrimziReporterMetricsType() {
         ));
 
         BridgeConfig bridgeConfig = BridgeConfig.fromMap(map);
-        assertThat(bridgeConfig.getMetrics(), is(MetricsType.STRIMZI_REPORTER.toString()));
+        assertThat(bridgeConfig.getMetricsType(), is(MetricsType.STRIMZI_REPORTER));
         assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter"));
         assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.listener.enable=false"));
-        assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.allowlist=.*"));
+        assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.allowlist=" + BridgeConfig.DEFAULT_STRIMZI_METRICS_REPORTER_ALLOW_LIST));
     }
 
     @Test
@@ -133,7 +133,7 @@ public void testStrimziReporterWithCustomConfig() {
         ));
 
         BridgeConfig bridgeConfig = BridgeConfig.fromMap(map);
-        assertThat(bridgeConfig.getMetrics(), is(MetricsType.STRIMZI_REPORTER.toString()));
+        assertThat(bridgeConfig.getMetricsType(), is(MetricsType.STRIMZI_REPORTER));
         assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("metric.reporters=my.domain.CustomMetricReporter,io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter"));
         assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.listener.enable=false"));
         assertThat(bridgeConfig.getKafkaConfig().toString(), containsString("prometheus.metrics.reporter.allowlist=kafka_log.*,kafka_network.*"));
@@ -150,6 +150,6 @@ public void testInvalidMetricsType() {
         );
 
         Exception e = assertThrows(IllegalArgumentException.class, () -> BridgeConfig.fromMap(map));
-        assertThat(e.getMessage(), is("Invalid bridge.metrics configuration, choose one of jmxPrometheusExporter and strimziMetricsReporter"));
+        assertThat(e.getMessage(), is("Metrics type not found: invalidReporterType"));
     }
 }
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java
index 4d5a0c3d1..377f7c3cf 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/ConsumerGeneratedNameIT.java
@@ -10,8 +10,6 @@
 import io.strimzi.kafka.bridge.config.KafkaConsumerConfig;
 import io.strimzi.kafka.bridge.config.KafkaProducerConfig;
 import io.strimzi.kafka.bridge.http.services.ConsumerService;
-import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry;
-import io.strimzi.kafka.bridge.metrics.MetricsReporter;
 import io.strimzi.kafka.bridge.utils.Urls;
 import io.strimzi.test.container.StrimziKafkaContainer;
 import io.vertx.core.Vertx;
@@ -78,7 +76,6 @@ public class ConsumerGeneratedNameIT {
     private static HttpBridge httpBridge;
     private static WebClient client;
     private static BridgeConfig bridgeConfig;
-    private static JmxCollectorRegistry jmxCollectorRegistry = null;
 
     ConsumerService consumerService() {
         return ConsumerService.getInstance(client);
@@ -96,7 +93,7 @@ static void beforeAll(VertxTestContext context) {
         if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) {
 
             bridgeConfig = BridgeConfig.fromMap(config);
-            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry));
+            httpBridge = new HttpBridge(bridgeConfig);
 
             LOGGER.info("Deploying in-memory bridge");
             vertx.deployVerticle(httpBridge).onComplete(context.succeeding(id -> context.completeNow()));
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java
index c96490f4b..e6f2815bf 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/DisablingConsumerProducerIT.java
@@ -14,8 +14,6 @@
 import io.strimzi.kafka.bridge.http.model.HttpBridgeError;
 import io.strimzi.kafka.bridge.http.services.ConsumerService;
 import io.strimzi.kafka.bridge.http.services.ProducerService;
-import io.strimzi.kafka.bridge.metrics.MetricsReporter;
-import io.strimzi.kafka.bridge.metrics.StrimziCollectorRegistry;
 import io.strimzi.kafka.bridge.utils.Urls;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonArray;
@@ -167,7 +165,7 @@ private void startBridge(VertxTestContext context, Map<String, Object> config) t
         CompletableFuture<Boolean> startBridge = new CompletableFuture<>();
         if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) {
             BridgeConfig bridgeConfig = BridgeConfig.fromMap(config);
-            HttpBridge httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(new StrimziCollectorRegistry()));
+            HttpBridge httpBridge = new HttpBridge(bridgeConfig);
 
             LOGGER.info("Deploying in-memory bridge");
             vertx.deployVerticle(httpBridge).onComplete(context.succeeding(id -> startBridge.complete(true)));
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java b/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java
index 43955c40d..b19058028 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/HttpCorsIT.java
@@ -10,8 +10,6 @@
 import io.strimzi.kafka.bridge.config.BridgeConfig;
 import io.strimzi.kafka.bridge.config.KafkaConfig;
 import io.strimzi.kafka.bridge.facades.AdminClientFacade;
-import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry;
-import io.strimzi.kafka.bridge.metrics.MetricsReporter;
 import io.strimzi.kafka.bridge.utils.Urls;
 import io.strimzi.test.container.StrimziKafkaContainer;
 import io.vertx.core.Vertx;
@@ -57,7 +55,6 @@ public class HttpCorsIT {
 
     static BridgeConfig bridgeConfig;
     static StrimziKafkaContainer kafkaContainer;
-    static JmxCollectorRegistry jmxCollectorRegistry = null;
     static AdminClientFacade adminClientFacade;
 
     static {
@@ -296,7 +293,7 @@ private void configureBridge(boolean corsEnabled, String methodsAllowed) {
             config.put(HttpConfig.HTTP_CORS_ALLOWED_METHODS, methodsAllowed != null ? methodsAllowed : "GET,POST,PUT,DELETE,OPTIONS,PATCH");
 
             bridgeConfig = BridgeConfig.fromMap(config);
-            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry));
+            httpBridge = new HttpBridge(bridgeConfig);
         }
     }
 }
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java
index fe5f117d7..25b48e551 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/InvalidProducerIT.java
@@ -10,7 +10,6 @@
 import io.strimzi.kafka.bridge.config.KafkaProducerConfig;
 import io.strimzi.kafka.bridge.facades.AdminClientFacade;
 import io.strimzi.kafka.bridge.http.base.HttpBridgeITAbstract;
-import io.strimzi.kafka.bridge.metrics.MetricsReporter;
 import io.strimzi.kafka.bridge.utils.Urls;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonArray;
@@ -76,7 +75,7 @@ static void beforeAll(VertxTestContext context) {
 
         if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) {
             bridgeConfig = BridgeConfig.fromMap(cfg);
-            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(jmxCollectorRegistry));
+            httpBridge = new HttpBridge(bridgeConfig);
 
             LOGGER.info("Deploying in-memory bridge");
             vertx.deployVerticle(httpBridge).onComplete(context.succeeding(id -> context.completeNow()));
diff --git a/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java b/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java
index 2417b141d..5aa42b7b4 100644
--- a/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java
+++ b/src/test/java/io/strimzi/kafka/bridge/http/base/HttpBridgeITAbstract.java
@@ -17,9 +17,7 @@
 import io.strimzi.kafka.bridge.http.services.ConsumerService;
 import io.strimzi.kafka.bridge.http.services.ProducerService;
 import io.strimzi.kafka.bridge.http.services.SeekService;
-import io.strimzi.kafka.bridge.metrics.JmxCollectorRegistry;
-import io.strimzi.kafka.bridge.metrics.MetricsReporter;
-import io.strimzi.kafka.bridge.metrics.StrimziCollectorRegistry;
+import io.strimzi.kafka.bridge.metrics.MetricsType;
 import io.strimzi.kafka.bridge.utils.Urls;
 import io.strimzi.test.container.StrimziKafkaContainer;
 import io.vertx.core.Vertx;
@@ -88,6 +86,7 @@ public abstract class HttpBridgeITAbstract {
         config.put(KafkaConsumerConfig.KAFKA_CONSUMER_CONFIG_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         config.put(KafkaProducerConfig.KAFKA_PRODUCER_CONFIG_PREFIX + ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");
         config.put(HttpConfig.HTTP_CONSUMER_TIMEOUT, timeout);
+        config.put(BridgeConfig.METRICS_TYPE, MetricsType.STRIMZI_REPORTER.toString());
         config.put(BridgeConfig.BRIDGE_ID, "my-bridge");
     }
 
@@ -97,7 +96,6 @@ public abstract class HttpBridgeITAbstract {
     protected static AdminClientFacade adminClientFacade;
     protected static HttpBridge httpBridge;
     protected static BridgeConfig bridgeConfig;
-    protected static JmxCollectorRegistry jmxCollectorRegistry = null;
 
     protected BaseService baseService() {
         return BaseService.getInstance(client);
@@ -127,7 +125,7 @@ static void beforeAll(VertxTestContext context) {
         if ("FALSE".equals(BRIDGE_EXTERNAL_ENV)) {
             bridgeConfig = BridgeConfig.fromMap(config);
             
-            httpBridge = new HttpBridge(bridgeConfig, new MetricsReporter(new StrimziCollectorRegistry()));
+            httpBridge = new HttpBridge(bridgeConfig);
 
             LOGGER.info("Deploying in-memory bridge");
             vertx.deployVerticle(httpBridge).onComplete(context.succeeding(id -> context.completeNow()));
diff --git a/src/test/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistryTest.java b/src/test/java/io/strimzi/kafka/bridge/metrics/JmxMetricsCollectorTest.java
similarity index 83%
rename from src/test/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistryTest.java
rename to src/test/java/io/strimzi/kafka/bridge/metrics/JmxMetricsCollectorTest.java
index 007cefa71..4a7f35752 100644
--- a/src/test/java/io/strimzi/kafka/bridge/metrics/JmxCollectorRegistryTest.java
+++ b/src/test/java/io/strimzi/kafka/bridge/metrics/JmxMetricsCollectorTest.java
@@ -25,9 +25,9 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-class JmxCollectorRegistryTest {
+class JmxMetricsCollectorTest {
     @Test
-    void shouldReturnFormattedMetrics() throws IOException {
+    void shouldReturnMetrics() throws IOException {
         JmxCollector mockJmxCollector = mock(JmxCollector.class);
             
         PrometheusRegistry mockPromRegistry = mock(PrometheusRegistry.class);
@@ -41,9 +41,9 @@ void shouldReturnFormattedMetrics() throws IOException {
             return null;
         }).when(mockPromFormatter).write(any(), any());
 
-        JmxCollectorRegistry collectorRegistry = new JmxCollectorRegistry(mockJmxCollector, mockPromRegistry, mockPromFormatter);
+        MetricsCollector metricsCollector = new JmxMetricsCollector(mockJmxCollector, mockPromRegistry, mockPromFormatter);
         
-        String result = collectorRegistry.scrape();
+        String result = metricsCollector.scrape();
         assertThat(result, containsString("test_metric"));
         assertThat(result.getBytes(StandardCharsets.UTF_8).length, is(result.length()));
     }
@@ -60,14 +60,14 @@ void shouldHandleIoException() throws IOException {
         doThrow(new IOException("Test exception"))
             .when(mockPromFormatter).write(any(ByteArrayOutputStream.class), Mockito.eq(mockSnapshots));
 
-        JmxCollectorRegistry collectorRegistry = new JmxCollectorRegistry(mockJmxCollector, mockPromRegistry, mockPromFormatter);
+        MetricsCollector metricsCollector = new JmxMetricsCollector(mockJmxCollector, mockPromRegistry, mockPromFormatter);
         
-        RuntimeException exception = assertThrows(RuntimeException.class, () -> collectorRegistry.scrape());
+        RuntimeException exception = assertThrows(RuntimeException.class, () -> metricsCollector.doScrape());
         assertThat(exception.getMessage(), containsString("Test exception"));
     }
 
     @Test
     void shouldThrowWithInvalidYaml() {
-        assertThrows(ClassCastException.class, () -> new JmxCollectorRegistry("invalid"));
+        assertThrows(ClassCastException.class, () -> new JmxMetricsCollector("invalid"));
     }
 }
diff --git a/src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java b/src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java
deleted file mode 100644
index c68ca2309..000000000
--- a/src/test/java/io/strimzi/kafka/bridge/metrics/MetricsReporterTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright Strimzi authors.
- * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
- */
-package io.strimzi.kafka.bridge.metrics;
-
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.prometheus.PrometheusMeterRegistry;
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.is;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-class MetricsReporterTest {
-    @Test
-    void shouldReturnMetrics() {
-        JmxCollectorRegistry mockJmxRegistry = mock(JmxCollectorRegistry.class);
-        when(mockJmxRegistry.scrape()).thenReturn("jmx_metrics\n");
-
-        StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class);
-        when(mockStrimziRegistry.scrape()).thenReturn("strimzi_metrics\n");
-
-        PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class);
-        when(mockVertxRegistry.scrape()).thenReturn("vertx_metrics\n");
-        MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class);
-        when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig);
-
-        MetricsReporter metricsReporter = new MetricsReporter(
-            mockJmxRegistry, mockStrimziRegistry, mockVertxRegistry);
-
-        String result = metricsReporter.scrape();
-
-        assertThat(result, containsString("jmx_metrics"));
-        assertThat(result, containsString("strimzi_metrics"));
-        assertThat(result, containsString("vertx_metrics"));
-
-        verify(mockJmxRegistry).scrape();
-        verify(mockStrimziRegistry).scrape();
-        verify(mockVertxRegistry).scrape();
-    }
-
-    @Test
-    void shouldReturnMetricsWithoutStrimziRegistry() {
-        JmxCollectorRegistry mockJmxRegistry = mock(JmxCollectorRegistry.class);
-        when(mockJmxRegistry.scrape()).thenReturn("jmx_metrics\n");
-
-        PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class);
-        when(mockVertxRegistry.scrape()).thenReturn("vertx_metrics\n");
-        MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class);
-        when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig);
-
-        MetricsReporter metricsReporter = new MetricsReporter(mockJmxRegistry, null, mockVertxRegistry);
-
-        String result = metricsReporter.scrape();
-
-        assertThat(result, containsString("jmx_metrics"));
-        assertThat(result, containsString("vertx_metrics"));
-
-        verify(mockJmxRegistry).scrape();
-        verify(mockVertxRegistry).scrape();
-    }
-
-    @Test
-    void shouldReturnMetricsWithoutJmxRegistry() {
-        StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class);
-        when(mockStrimziRegistry.scrape()).thenReturn("strimzi_metrics\n");
-
-        PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class);
-        when(mockVertxRegistry.scrape()).thenReturn("vertx_metrics\n");
-        MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class);
-        when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig);
-
-        MetricsReporter metricsReporter = new MetricsReporter(null, mockStrimziRegistry, mockVertxRegistry);
-
-        String result = metricsReporter.scrape();
-
-        assertThat(result, containsString("strimzi_metrics"));
-        assertThat(result, containsString("vertx_metrics"));
-
-        verify(mockStrimziRegistry).scrape();
-        verify(mockVertxRegistry).scrape();
-    }
-
-    @Test
-    void testGetVertxRegistry() {
-        StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class);
-        PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class);
-        MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class);
-        when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig);
-
-        MetricsReporter metricsReporter = new MetricsReporter(null, mockStrimziRegistry, mockVertxRegistry);
-
-        MeterRegistry result = metricsReporter.getVertxRegistry();
-
-        assertThat(result, is(mockVertxRegistry));
-    }
-
-    @Test
-    void testNamingConventionAppliedForPrometheusRegistry() {
-        StrimziCollectorRegistry mockStrimziRegistry = mock(StrimziCollectorRegistry.class);
-        PrometheusMeterRegistry mockVertxRegistry = mock(PrometheusMeterRegistry.class);
-        MeterRegistry.Config meterRegistryConfig = mock(MeterRegistry.Config.class);
-        when(mockVertxRegistry.config()).thenReturn(meterRegistryConfig);
-        
-        new MetricsReporter(null, mockStrimziRegistry, mockVertxRegistry);
-
-        verify(mockVertxRegistry).config();
-    }
-}
diff --git a/src/test/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistryTest.java b/src/test/java/io/strimzi/kafka/bridge/metrics/StrimziMetricsCollectorTest.java
similarity index 84%
rename from src/test/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistryTest.java
rename to src/test/java/io/strimzi/kafka/bridge/metrics/StrimziMetricsCollectorTest.java
index bab4d7eb3..5899dd105 100644
--- a/src/test/java/io/strimzi/kafka/bridge/metrics/StrimziCollectorRegistryTest.java
+++ b/src/test/java/io/strimzi/kafka/bridge/metrics/StrimziMetricsCollectorTest.java
@@ -24,9 +24,9 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-class StrimziCollectorRegistryTest {
+class StrimziMetricsCollectorTest {
     @Test
-    void shouldReturnFormattedMetrics() throws IOException {
+    void shouldReturnMetrics() throws IOException {
         PrometheusRegistry mockPromRegistry = mock(PrometheusRegistry.class);
         MetricSnapshots mockSnapshots = mock(MetricSnapshots.class);
         when(mockPromRegistry.scrape()).thenReturn(mockSnapshots);
@@ -38,9 +38,9 @@ void shouldReturnFormattedMetrics() throws IOException {
             return null;
         }).when(mockPromFormatter).write(any(), any());
 
-        StrimziCollectorRegistry collectorRegistry = new StrimziCollectorRegistry(mockPromRegistry, mockPromFormatter);
+        MetricsCollector metricsCollector = new StrimziMetricsCollector(mockPromRegistry, mockPromFormatter);
         
-        String result = collectorRegistry.scrape();
+        String result = metricsCollector.scrape();
         assertThat(result, containsString("test_metric"));
         assertThat(result.getBytes(StandardCharsets.UTF_8).length, is(result.length()));
     }
@@ -55,9 +55,9 @@ void shouldHandleIoException() throws IOException {
         doThrow(new IOException("Test exception"))
             .when(mockPromFormatter).write(any(ByteArrayOutputStream.class), Mockito.eq(mockSnapshots));
 
-        StrimziCollectorRegistry collectorRegistry = new StrimziCollectorRegistry(mockPromRegistry, mockPromFormatter);
+        MetricsCollector metricsCollector = new StrimziMetricsCollector(mockPromRegistry, mockPromFormatter);
         
-        RuntimeException exception = assertThrows(RuntimeException.class, () -> collectorRegistry.scrape());
+        RuntimeException exception = assertThrows(RuntimeException.class, () -> metricsCollector.doScrape());
         assertThat(exception.getMessage(), containsString("Test exception"));
     }
 }

From 40b3c2856a93844e2c96102f9c4757f5332eda02 Mon Sep 17 00:00:00 2001
From: Federico Valeri <fedevaleri@gmail.com>
Date: Thu, 20 Feb 2025 11:19:40 +0100
Subject: [PATCH 3/3] Improve docs

Signed-off-by: Federico Valeri <fedevaleri@gmail.com>
---
 config/application.properties                 |  4 ++--
 .../assembly-kafka-bridge-config.adoc         |  5 ++---
 ...-configuring-kafka-bridge-jmx-metrics.adoc |  8 +++----
 ...-configuring-kafka-bridge-smr-metrics.adoc | 22 +++++++++----------
 4 files changed, 19 insertions(+), 20 deletions(-)

diff --git a/config/application.properties b/config/application.properties
index ca0657f5e..3b5878388 100644
--- a/config/application.properties
+++ b/config/application.properties
@@ -1,12 +1,12 @@
 #Bridge related settings
 bridge.id=my-bridge
 
-# uncomment the following line to enable Prometheus JMX Exporter, check the documentation for more details
+# uncomment the following line to enable Prometheus JMX Exporter, check the Kafka Bridge documentation for more details
 #bridge.metrics=jmxPrometheusExporter
 # optionally, set the file path of your custom configuration
 #bridge.metrics.exporter.config.path=/path/to/my-exporter-config.yaml
 
-# uncomment the following line to enable Strimzi Metrics Reporter, check the documentation for more details
+# uncomment the following line to enable Strimzi Metrics Reporter, check the Kafka Bridge documentation for more details
 #bridge.metrics=strimziMetricsReporter
 # optionally, filter the exposed metrics of all internal Kafka clients using a comma separated list of regexes
 #kafka.prometheus.metrics.reporter.allowlist=.*
diff --git a/documentation/assemblies/assembly-kafka-bridge-config.adoc b/documentation/assemblies/assembly-kafka-bridge-config.adoc
index 3a2c9cc0e..0302a6bc2 100644
--- a/documentation/assemblies/assembly-kafka-bridge-config.adoc
+++ b/documentation/assemblies/assembly-kafka-bridge-config.adoc
@@ -6,9 +6,8 @@
 = Kafka Bridge configuration
 
 [role="_abstract"]
-Configure a deployment of the Kafka Bridge using configuration properties.
-Configure Kafka and specify the HTTP connection details needed to be able to interact with Kafka.
-It is possible to enable metrics in Prometheus format with Prometheus JMX Exporter or Strimzi Metrics Reporter.
+Configure a deployment of the Kafka Bridge with Kafka-related properties and specify the HTTP connection details needed to be able to interact with Kafka.
+Additionally, enable metrics in Prometheus format using either the https://github.com/prometheus/jmx_exporter[Prometheus JMX Exporter] or the https://github.com/strimzi/metrics-reporter[Strimzi Metrics Reporter].
 You can also use configuration properties to enable and use distributed tracing with the Kafka Bridge.
 Distributed tracing allows you to track the progress of transactions between applications in a distributed system.
 
diff --git a/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc b/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc
index 35833377e..b954891c1 100644
--- a/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc
+++ b/documentation/modules/proc-configuring-kafka-bridge-jmx-metrics.adoc
@@ -2,7 +2,7 @@
 = Configuring Prometheus JMX Exporter metrics
 
 [role="_abstract"]
-Enable metrics with Prometheus JMX Exporter by setting the `bridge.metrics` option to `jmxPrometheusExporter`.
+Enable the Prometheus JMX Exporter to collect Kafka Bridge metrics by setting the `bridge.metrics` option to `jmxPrometheusExporter`.
 
 .Prerequisites
 
@@ -19,8 +19,8 @@ Enable metrics with Prometheus JMX Exporter by setting the `bridge.metrics` opti
 bridge.metrics=jmxPrometheusExporter
 ----
 +
-Optionally, you can configure a custom Prometheus JMX Exporter configuration using the `bridge.metrics.exporter.config.path` property.
-If not configured, a default embedded configuration file will be used.
+Optionally, you can add a custom Prometheus JMX Exporter configuration using the `bridge.metrics.exporter.config.path` property.
+If not configured, a default embedded configuration file is used.
 
 . Run the Kafka Bridge run script.
 +
@@ -30,4 +30,4 @@ If not configured, a default embedded configuration file will be used.
 ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
 ----
 +
-With metrics enabled, you can scrape metrics in the Prometheus format from the `/metrics` endpoint of the Kafka Bridge.
+With metrics enabled, you can scrape metrics in Prometheus format from the `/metrics` endpoint of the Kafka Bridge.
diff --git a/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc b/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc
index 892d8e4a4..ce49d3243 100644
--- a/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc
+++ b/documentation/modules/proc-configuring-kafka-bridge-smr-metrics.adoc
@@ -2,7 +2,7 @@
 = Configuring Strimzi Metrics Reporter metrics
 
 [role="_abstract"]
-Enable metrics with Strimzi Metrics Reporter by setting the `bridge.metrics` option to `strimziMetricsReporter`.
+Enable the Strimzi Metrics Reporter to collect Kafka Bridge metrics by setting the `bridge.metrics` option to `strimziMetricsReporter`.
 
 .Prerequisites
 
@@ -19,17 +19,17 @@ Enable metrics with Strimzi Metrics Reporter by setting the `bridge.metrics` opt
 bridge.metrics=strimziMetricsReporter
 ----
 +
-Optionally, you can configure a comma-separated list of regexes to filter exposed metrics using the `kafka.prometheus.metrics.reporter.allowlist` property.
-If not configured, a default set of metrics will be exposed.
+Optionally, you can configure a comma-separated list of regular expressions to filter exposed metrics using the `kafka.prometheus.metrics.reporter.allowlist` property.
+If not configured, a default set of metrics is exposed.
 
-When needed, it is possible to configure the allowlist per client type.
-For example, set `kafka.admin.prometheus.metrics.reporter.allowlist=` to exclude all admin client metrics.
-
-In case the same property is set with multiple prefixes, the most specific prefix wins.
-For example, `kafka.producer.prometheus.metrics.reporter.allowlist` wins over `kafka.prometheus.metrics.reporter.allowlist`. 
++
+When needed, it is possible to configure the `allowlist` per client type.
+For example, by using the `kafka.admin` prefix and setting `kafka.admin.prometheus.metrics.reporter.allowlist=`, all admin client metrics are excluded.
 
-You can add any plugin configuration to the Bridge properties file using `kafka.`, `kafka.admin.`, `kafka.producer.`, and `kafka.consumer.` prefixes.
-See the https://github.com/strimzi/metrics-reporter[Strimzi Metrics Reporter] documentation for more details.
++
+You can add any plugin configuration to the Kafka Bridge properties file using `kafka.`, `kafka.admin.`, `kafka.producer.`, and `kafka.consumer.` prefixes.
+In the event that the same property is configured with multiple prefixes, the most specific prefix takes precedence.
+For example, `kafka.producer.prometheus.metrics.reporter.allowlist` takes precedence over `kafka.prometheus.metrics.reporter.allowlist`.
 
 . Run the Kafka Bridge run script.
 +
@@ -39,4 +39,4 @@ See the https://github.com/strimzi/metrics-reporter[Strimzi Metrics Reporter] do
 ./bin/kafka_bridge_run.sh --config-file=<path>/application.properties
 ----
 +
-With metrics enabled, you can scrape metrics in the Prometheus format from the `/metrics` endpoint of the Kafka Bridge.
+With metrics enabled, you can scrape metrics in Prometheus format from the `/metrics` endpoint of the Kafka Bridge.