Merged
Commits
36 commits
19d7e2e
Instantiated metric 'pipelines.<pipeline id>.batch.count' to count nu…
andsel Aug 18, 2025
4ec8189
Exposed just the lifetime value for a flow metric computed using tota…
andsel Aug 18, 2025
dcb8543
Instantiated metric 'pipelines.<pipeline id>.batch.total_bytes' to su…
andsel Aug 19, 2025
e633d3a
[Code clean] Removed unused code
andsel Aug 19, 2025
ad8a829
Added RubyNil in the list of class types that estimateMemory can work…
andsel Aug 19, 2025
42e266a
Refact: used the BYTES constant for each Java scalar type
andsel Aug 19, 2025
d9c841a
Added RubySymbol and RubyFloat to the list of class types that estima…
andsel Aug 19, 2025
8fc8398
[Test] added integration test to verify monitoring API return expecte…
andsel Aug 20, 2025
5929104
Introduced 'pipeline.batch.metrics' to manage the way queue reader cl…
andsel Aug 20, 2025
fdb7648
Implemented logic to use in batch metrics creation and exposure
andsel Aug 21, 2025
91196f8
[Test] Extracted common part of test code in test initializer method
andsel Aug 21, 2025
a7e3a7f
[Log] added the batch metric sampling value on pipeline start
andsel Aug 21, 2025
fa873d3
Changed event_count metric to use a specific event counter of batch f…
andsel Aug 21, 2025
5ae12f6
[Refact] Extracted batch metric initialisation in separate method
andsel Aug 21, 2025
8fd2bcc
[Minor] Fixed import and removed dev comments
andsel Aug 21, 2025
e2f440c
[Fix] Used Byte.BYTES constant for Bytes size in bytes
andsel Sep 10, 2025
e57f7be
[Refactoring] Separated batch's metrics code in its own class
andsel Sep 10, 2025
1bf7650
Renamed setting to manage the collection type from pipeline.batch.met…
andsel Sep 10, 2025
e6938f5
Rename setting to pipeline.batch.metrics.sampling_mode and the value …
andsel Sep 17, 2025
7608486
Updated estimate memory of a batch to log an error if one of any even…
andsel Sep 17, 2025
7a0a2c2
Fixes after rebase to main
andsel Sep 18, 2025
1d0d3b4
[Fix] Avoid to increment batch counts for empty batches
andsel Sep 18, 2025
d2e481f
[Test] Covered QueueReadClientBatchMetrics with tests
andsel Sep 18, 2025
d82ed1f
[Cleanup] Inverted condition to improve readability
andsel Sep 18, 2025
e5ec911
[Fix] Avoid to update batch and total event counter if we encounter an…
andsel Sep 18, 2025
2deae4c
Round bytes_size and event_count per batch to integers because float …
andsel Sep 18, 2025
5179c56
Fixes after rebase to main to pull Settings builder
andsel Sep 19, 2025
ea69549
Fixed failing API tests after switching to settings builder
andsel Sep 19, 2025
1e68d32
[Refactoring] extracted common code in method to decode string settin…
andsel Sep 19, 2025
666e95b
Fixed constructor of JRubyWrappedAckedQueueExt to retrieve batch metr…
andsel Sep 19, 2025
324922a
[Test] Covered monitoring API response with test
andsel Sep 22, 2025
166f3fa
[Refactoring] Removed batchMetrics mode from PQ settings and passed a…
andsel Sep 22, 2025
a71ff25
[Refactoring] Introduce create method in JrubyWrappedSynchronousQueue…
andsel Sep 22, 2025
5b971b3
Minor, removed commented code
andsel Sep 22, 2025
f2e85ee
Fix, avoid to invoke a round on possible nil content. In case the bat…
andsel Sep 23, 2025
affee72
Switched default batch mode to minimal
andsel Sep 23, 2025
6 changes: 6 additions & 0 deletions config/logstash.yml
@@ -49,6 +49,12 @@
#
# pipeline.batch.delay: 50
#
# Set the pipeline's batch metrics reporting mode. Use "disabled" to turn the
# metrics off, "minimal" to sample roughly 1% of batches, or "full" to collect
# metrics for every batch. Default is "disabled".
#
# pipeline.batch.metrics.sampling_mode: "disabled"
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
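For illustration, the "minimal" mode's roughly-1%-of-batches behaviour could be implemented with a simple counter. The sketch below is a hypothetical illustration of the sampling decision, not the PR's actual implementation:

```ruby
# Hypothetical sketch of 1%-of-batches sampling; not Logstash's shipped code.
class BatchMetricSampler
  SAMPLE_EVERY = 100 # "minimal" => roughly 1% of batches

  def initialize(mode)
    @mode = mode
    @counter = 0
  end

  # Returns true when metrics should be collected for this batch.
  def sample?
    case @mode
    when "disabled" then false
    when "full"     then true
    when "minimal"
      @counter = (@counter + 1) % SAMPLE_EVERY
      @counter == 1
    end
  end
end
```

With this counter scheme, a "minimal" sampler collects metrics for exactly one batch out of every hundred, so per-batch overhead stays negligible while lifetime averages remain representative.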
18 changes: 18 additions & 0 deletions logstash-core/lib/logstash/api/commands/stats.rb
@@ -172,6 +172,23 @@ def plugin_stats(stats, plugin_type)
end
end

def refine_batch_metrics(stats)
{
:event_count => {
:average => {
# :average holds a FlowMetric; we need to invoke #value to obtain the map with metric details.
:lifetime => stats[:batch][:event_count][:average].value["lifetime"].round
}
},
:byte_size => {
:average => {
:lifetime => stats[:batch][:byte_size][:average].value["lifetime"].round
}
}
}
end
private :refine_batch_metrics

def report(stats, extended_stats = nil, opts = {})
ret = {
:events => stats[:events],
Expand All @@ -190,6 +207,7 @@ def report(stats, extended_stats = nil, opts = {})
:batch_delay => stats.dig(:config, :batch_delay),
}
}
ret[:batch] = refine_batch_metrics(stats) if stats.include?(:batch)
ret[:dead_letter_queue] = stats[:dlq] if stats.include?(:dlq)

# if extended_stats were provided, enrich the return value
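The refine_batch_metrics helper above unwraps FlowMetric values and rounds them to integers. The same reshaping can be sketched self-contained, with a stub standing in for the real FlowMetric (the stub is an assumption for illustration only):

```ruby
# Stub standing in for the FlowMetric objects found under stats[:batch];
# the real object exposes #value, returning a map keyed by window name.
FlowMetricStub = Struct.new(:lifetime) do
  def value
    { "lifetime" => lifetime }
  end
end

def refine_batch_metrics(stats)
  {
    :event_count => {
      :average => { :lifetime => stats[:batch][:event_count][:average].value["lifetime"].round }
    },
    :byte_size => {
      :average => { :lifetime => stats[:batch][:byte_size][:average].value["lifetime"].round }
    }
  }
end

stats = { :batch => {
  :event_count => { :average => FlowMetricStub.new(124.6) },
  :byte_size   => { :average => FlowMetricStub.new(20_480.2) }
} }
refined = refine_batch_metrics(stats)
```

Rounding here matters because the raw lifetime averages are floats, while the monitoring API is expected to report integer event counts and byte sizes.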
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
@@ -86,6 +86,7 @@ def self.as_java_range(r)
Setting::ExistingFilePath.new("api.ssl.keystore.path", nil, false).nullable,
Setting::PasswordSetting.new("api.ssl.keystore.password", nil, false).nullable,
Setting::StringArray.new("api.ssl.supported_protocols", nil, true, %w[TLSv1 TLSv1.1 TLSv1.2 TLSv1.3]),
Setting::StringSetting.new("pipeline.batch.metrics.sampling_mode", "disabled", true, ["disabled", "minimal", "full"]),
Setting::StringSetting.new("queue.type", "memory", true, ["persisted", "memory"]),
Setting::BooleanSetting.new("queue.drain", false),
Setting::Bytes.new("queue.page_capacity", "64mb"),
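The StringSetting registered above pairs a default with a list of allowed values. A hypothetical, self-contained sketch of that validation pattern (the class and method names are illustrative, not Logstash's actual Setting API):

```ruby
# Hypothetical sketch of a string setting constrained to an allowed list,
# mirroring the (name, default, strict, allowed_values) registration above.
class ConstrainedStringSetting
  attr_reader :name, :value

  def initialize(name, default, allowed_values)
    @name = name
    @allowed_values = allowed_values
    set(default) # validate the default eagerly, like a strict setting would
  end

  def set(value)
    unless @allowed_values.include?(value)
      raise ArgumentError,
            "Invalid value \"#{value}\" for #{@name}; expected one of #{@allowed_values.inspect}"
    end
    @value = value
  end
end
```

Validating eagerly at registration time means a typo such as `sampling_mode: "minmal"` in logstash.yml fails fast at startup instead of silently disabling batch metrics.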
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/java_pipeline.rb
@@ -267,6 +267,7 @@ def start_workers
@preserve_event_order = preserve_event_order?(pipeline_workers)
batch_size = settings.get("pipeline.batch.size")
batch_delay = settings.get("pipeline.batch.delay")
batch_metric_sampling = settings.get("pipeline.batch.metrics.sampling_mode")

max_inflight = batch_size * pipeline_workers

@@ -287,6 +288,7 @@
"pipeline.batch.size" => batch_size,
"pipeline.batch.delay" => batch_delay,
"pipeline.max_inflight" => max_inflight,
"batch_metric_sampling" => batch_metric_sampling,
"pipeline.sources" => pipeline_source_details)
@logger.info("Starting pipeline", pipeline_log_params)

1 change: 1 addition & 0 deletions logstash-core/lib/logstash/settings.rb
@@ -58,6 +58,7 @@ def self.included(base)
"path.dead_letter_queue",
"path.queue",
"pipeline.batch.delay",
"pipeline.batch.metrics.sampling_mode",
"pipeline.batch.size",
"pipeline.id",
"pipeline.reloadable",
@@ -16,6 +16,7 @@
# under the License.

require "logstash/instrument/namespaced_metric"
# require "logstash/instrument/collector"

describe LogStash::WrappedAckedQueue, :stress_test => true do
let(:path) { Stud::Temporary.directory }
@@ -29,11 +30,13 @@
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] }

let(:queue_settings) do
java_import org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode
LogStash::AckedQueue.file_settings_builder(path)
.capacity(page_capacity)
.checkpointMaxAcks(queue_checkpoint_acks)
.checkpointMaxWrites(queue_checkpoint_writes)
.queueMaxBytes(queue_capacity)
.batchMetricMode(BatchMetricMode::DISABLED)
.build
end

21 changes: 19 additions & 2 deletions logstash-core/spec/logstash/api/modules/node_stats_spec.rb
@@ -21,8 +21,13 @@
require "logstash/api/modules/node_stats"

describe LogStash::Api::Modules::NodeStats do
# enable PQ to ensure PQ-related metrics are present
include_context "api setup", {"queue.type" => "persisted"}

include_context "api setup", {
# enable PQ to ensure PQ-related metrics are present
"queue.type" => "persisted",
# enable batch metrics
"pipeline.batch.metrics.sampling_mode" => "full"
}
include_examples "not found"

extend ResourceDSLMethods
@@ -142,6 +147,18 @@
"path" => String,
"free_space_in_bytes" => Numeric
}
},
"batch" => {
"event_count" => {
"average" => {
"lifetime" => Numeric
}
},
"byte_size" => {
"average" => {
"lifetime" => Numeric
}
}
}
}
},
@@ -113,7 +113,7 @@ def threaded_read_client
end

context "WrappedSynchronousQueue" do
let(:queue) { LogStash::WrappedSynchronousQueue.new(1024) }
let(:queue) { LogStash::WrappedSynchronousQueue.new(1024, "disabled") }

before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
@@ -127,12 +127,14 @@ def threaded_read_client
let(:path) { Stud::Temporary.directory }

let(:queue_settings) do
java_import org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode
LogStash::AckedQueue.file_settings_builder(path)
.capacity(1024)
.maxUnread(10)
.checkpointMaxAcks(1024)
.checkpointMaxWrites(1024)
.queueMaxBytes(4096)
.batchMetricMode(BatchMetricMode::DISABLED)
.build
end

1 change: 1 addition & 0 deletions logstash-core/spec/logstash/queue_factory_spec.rb
@@ -31,6 +31,7 @@
LogStash::Setting::NumericSetting.new("queue.checkpoint.writes", 1024),
LogStash::Setting::BooleanSetting.new("queue.checkpoint.retry", false),
LogStash::Setting::StringSetting.new("pipeline.id", pipeline_id),
LogStash::Setting::StringSetting.new("pipeline.batch.metrics.sampling_mode", "disabled", true, ["disabled", "minimal", "full"]),
LogStash::Setting::PositiveIntegerSetting.new("pipeline.batch.size", 125),
LogStash::Setting::PositiveIntegerSetting.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
]
2 changes: 2 additions & 0 deletions logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb
@@ -55,12 +55,14 @@
let(:path) { Stud::Temporary.directory }

let(:queue_settings) do
java_import org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode
LogStash::AckedQueue.file_settings_builder(path)
.capacity(page_capacity)
.maxUnread(max_events)
.checkpointMaxAcks(checkpoint_acks)
.checkpointMaxWrites(checkpoint_writes)
.queueMaxBytes(max_bytes)
.batchMetricMode(BatchMetricMode::DISABLED)
.build
end

@@ -19,7 +19,7 @@
require "logstash/instrument/collector"

describe LogStash::WrappedSynchronousQueue do
subject {LogStash::WrappedSynchronousQueue.new(5)}
subject {LogStash::WrappedSynchronousQueue.new(5, "disabled")}

describe "queue clients" do
context "when requesting a write client" do
122 changes: 121 additions & 1 deletion logstash-core/src/main/java/org/logstash/ConvertedMap.java
@@ -21,14 +21,26 @@
package org.logstash;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.jruby.RubyBignum;
import org.jruby.RubyBoolean;
import org.jruby.RubyFixnum;
import org.jruby.RubyFloat;
import org.jruby.RubyHash;
import org.jruby.RubyNil;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.ext.bigdecimal.RubyBigDecimal;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.ext.JrubyTimestampExtLibrary;

/**
* <p>This class is an internal API and behaves very different from a standard {@link Map}.</p>
@@ -41,7 +53,7 @@
* intern pool to ensure identity match of equivalent strings.
* For performance, we keep a global cache of strings that have been interned for use with {@link ConvertedMap},
* and encourage interning through {@link ConvertedMap#internStringForUseAsKey(String)} to avoid
* the performance pentalty of the global string intern pool.
* the performance penalty of the global string intern pool.
*/
public final class ConvertedMap extends IdentityHashMap<String, Object> {

@@ -157,4 +169,112 @@ public Object unconvert() {
private static String convertKey(final RubyString key) {
return internStringForUseAsKey(key.asJavaString());
}

public long estimateMemory() {
return values().stream()
.map(this::estimateMemory)
.mapToLong(Long::longValue)
.sum();
}

@SuppressWarnings({"rawtypes", "unchecked"})
private long estimateMemory(Object o) {
if (o instanceof Boolean) {
return Byte.BYTES;
}
if (o instanceof Byte) {
return Byte.BYTES;
}
if (o instanceof Short) {
return Short.BYTES;
}
if (o instanceof Integer) {
return Integer.BYTES;
}
if (o instanceof Long) {
return Long.BYTES;
}
if (o instanceof Float) {
return Float.BYTES;
}
if (o instanceof Double) {
return Double.BYTES;
}
if (o instanceof Character) {
return Character.BYTES;
}
if (o instanceof String) {
return ((String) o).getBytes().length;
}
if (o instanceof RubyString) {
return ((RubyString) o).getBytes().length;
}

if (o instanceof Collection) {
Collection c = (Collection) o;
long memory = 0L;
for (Object v : c) {
memory += estimateMemory(v);
}
return memory;
}

if (o instanceof ConvertedMap) {
ConvertedMap c = (ConvertedMap) o;
return c.estimateMemory();
}

if (o instanceof Map) {
// this case shouldn't happen because all Maps are converted to ConvertedMap
Map<String, Object> m = (Map<String, Object>) o;
long memory = 0L;
for (Map.Entry e : m.entrySet()) {
memory += estimateMemory(e.getKey());
memory += estimateMemory(e.getValue());
}
return memory;
}
if (o instanceof JrubyTimestampExtLibrary.RubyTimestamp) {
// wraps an java.time.Instant which is made of long and int
return Long.BYTES + Integer.BYTES;
}
if (o instanceof BigInteger) {
return ((BigInteger) o).toByteArray().length;
}
if (o instanceof BigDecimal) {
// BigDecimal has 4 fields: one reference, 2 ints (scale and precision), and a long.
return 8 + 2 * Integer.BYTES + Long.BYTES;
}
if (o instanceof RubyBignum) {
RubyBignum rbn = (RubyBignum) o;
return ((RubyFixnum) rbn.size()).getLongValue();
}
if (o instanceof RubyBigDecimal) {
RubyBigDecimal rbd = (RubyBigDecimal) o;
// wraps a Java BigDecimal so we can return the size of that:
return estimateMemory(rbd.getValue());
}
if (o instanceof RubyFixnum) {
// like an int value
return Integer.BYTES;
}
if (o instanceof RubyBoolean) {
return Byte.BYTES;
}
if (o instanceof RubyNil) {
return 8 + Integer.BYTES; // object reference, one int
}
if (o instanceof RubySymbol) {
return estimateMemory(((RubySymbol) o).asJavaString());
}
if (o instanceof RubyFloat) {
return Double.BYTES;
}

throw new IllegalArgumentException(
"Unsupported type encountered in estimateMemory: " + o.getClass().getName() +
". Please ensure all objects passed to estimateMemory are of supported types. " +
"Refer to the ConvertedMap.estimateMemory method for the list of supported types."
);
}
}
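The estimateMemory recursion above sums scalar sizes and descends into collections and maps. A compact Ruby sketch of the same idea (the sizes are illustrative approximations of the Java constants; this is not the shipped code):

```ruby
# Ruby sketch of the payload-size recursion implemented in ConvertedMap#estimateMemory.
# Sizes are illustrative approximations (e.g. 8 bytes per numeric scalar).
def estimate_memory(o)
  case o
  when Integer     then 8                # approximated like a Java long
  when Float       then 8                # Double.BYTES
  when true, false then 1                # Byte.BYTES
  when String      then o.bytesize       # payload bytes of the string
  when Symbol      then o.to_s.bytesize
  when Array       then o.sum { |v| estimate_memory(v) }
  when Hash        then o.sum { |k, v| estimate_memory(k) + estimate_memory(v) }
  when nil         then 12               # reference + one int, as in the Java code
  else raise ArgumentError, "Unsupported type in estimate_memory: #{o.class}"
  end
end
```

As in the Java version, unsupported types raise rather than being silently counted as zero, so a missing case surfaces as an error instead of an under-reported byte size.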
13 changes: 13 additions & 0 deletions logstash-core/src/main/java/org/logstash/Event.java
@@ -529,6 +529,7 @@ private void initFailTag(final Object tag) {
* and needs to be converted to a list before appending to it.
* @param existing Existing Tag
* @param tag Tag to add
*
*/
private void scalarTagFallback(final String existing, final String tag) {
final List<String> tags = new ArrayList<>(2);
@@ -567,4 +568,16 @@ private static String getCanonicalFieldReference(final FieldReference field) {
return path.stream().collect(Collectors.joining("][", "[", "]"));
}
}

/**
* @return a byte size estimation of the event, based on the payloads carried by nested data structures,
* without considering the space needed by the JVM to represent the object itself.
*
*/
public long estimateMemory() {
long total = 0;
total += data.estimateMemory();
total += metadata.estimateMemory();
return total;
}
}