Implement BufferedTokenizer to return an iterable that can verify size limit for every token emitted #17229


Merged

merged 30 commits into elastic:main from fix/bufftok_to_return_itereator on Jul 2, 2025

Conversation

Contributor

@andsel andsel commented Mar 5, 2025

Release notes

Reimplements BufferedTokenizer to leverage pure Java classes instead of JRuby runtime classes.

What does this PR do?

Reimplements the tokenizer in pure Java as a new BufferedTokenizer class that uses an iterable, while BufferedTokenizerExt becomes a thin shell around this new class.

The principal method extract, which splits the data by separator, now returns an Iterable that applies the size limit check instead of returning a RubyArray.
The returned iterable is backed by a DataSplitter iterator, which accumulates data in a StringBuilder and splits it by separator. This iterator moves the size limit check to the read side (previously it was on the write side); however, it implements logic to avoid accumulating further segments once the size limit has already been exceeded on the write side.

To stay compliant with usage patterns that expect an empty? method on the object returned from extract, like this, the extract method of BufferedTokenizerExt returns a custom Iterable adapter class exposing such a method.
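
For illustration, here is a minimal sketch of that read-side pattern, assuming a simplified stand-alone class (the name SimpleDataSplitter and the exact field layout are this sketch's assumptions, not the PR's code): fragments are appended to a StringBuilder, tokens are emitted lazily through an Iterator, and the size limit is enforced only when a token is read, mirroring the IllegalStateException seen in the review discussion below.

import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustrative sketch only; not the PR's actual DataSplitter implementation.
class SimpleDataSplitter implements Iterator<String> {
    private final StringBuilder accumulator = new StringBuilder();
    private final String separator;
    private final int sizeLimit; // 0 or negative means "no limit"
    private int currentIdx = 0;

    SimpleDataSplitter(String separator, int sizeLimit) {
        this.separator = separator;
        this.sizeLimit = sizeLimit;
    }

    // Write side: just accumulate the fragment.
    void append(String data) {
        accumulator.append(data);
    }

    // A token is available only once a separator appears in the unread portion.
    @Override
    public boolean hasNext() {
        return accumulator.indexOf(separator, currentIdx) >= 0;
    }

    // Read side: cut the next token and enforce the size limit while reading,
    // so tokens before and after an oversized one can still be consumed.
    @Override
    public String next() {
        int sepIdx = accumulator.indexOf(separator, currentIdx);
        if (sepIdx < 0) {
            throw new NoSuchElementException();
        }
        String token = accumulator.substring(currentIdx, sepIdx);
        currentIdx = sepIdx + separator.length();
        if (sizeLimit > 0 && token.length() > sizeLimit) {
            throw new IllegalStateException("input buffer full");
        }
        return token;
    }
}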

On the test side, the code that tested BufferedTokenizerExt was moved to test the new BufferedTokenizer, so some test classes were renamed:

  • BufferedTokenizerExtTest mostly becomes BufferedTokenizerTest, but a small BufferedTokenizerExtTest remains to cover charset conversion use cases.
  • BufferedTokenizerExtWithDelimiterTest -> BufferedTokenizerWithDelimiterTest
  • BufferedTokenizerExtWithSizeLimitTest -> BufferedTokenizerWithSizeLimitTest
  • a test used to verify the overflow condition, givenTooLongInputExtractDoesntOverflow (code ref), was removed because it is no longer applicable.

On the benchmarking side:

  • BufferedTokenizerExtBenchmark -> BufferedTokenizerBenchmark with the adaptation to the new tokenizer class.

As can be seen from the benchmark reports in the Logs section, this PR provides roughly a 6x improvement over the previous implementation.

Why is it important/What is the impact to the user?

As a developer, I want the BufferedTokenizer implementation to be simpler than the existing one.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

Author's Checklist

How to test this PR locally

Run the same tests as in:

bin/logstash -e "input { tcp { port => 1234 codec => line { charset => 'ISO8859-1' } } } output { stdout { codec => rubydebug } }"
  • using the following script to send the £ sign in Latin-1:
require 'socket'

hostname = 'localhost'
port = 1234

socket = TCPSocket.open(hostname, port)

text = "\xA3" # the £ symbol in ISO-8859-1 aka Latin-1
text.force_encoding("ISO-8859-1")
socket.puts(text)

socket.close

Related issues

Logs

Benchmarks

The benchmarks were updated to run for 3 seconds (instead of 100 ms) and to report in milliseconds (instead of nanoseconds).

baseline

Ran with:

./gradlew jmh -Pinclude="org.logstash.benchmark.BufferedTokenizerExtBenchmark.*"
Benchmark                                                               Mode  Cnt     Score   Error   Units
BufferedTokenizerExtBenchmark.multipleTokenPerFragment                 thrpt   10   553.913 ± 6.223  ops/ms
BufferedTokenizerExtBenchmark.multipleTokensCrossingMultipleFragments  thrpt   10   222.815 ± 4.411  ops/ms
BufferedTokenizerExtBenchmark.onlyOneTokenPerFragment                  thrpt   10  1549.777 ± 9.237  ops/ms

this PR

Ran with:

./gradlew jmh -Pinclude="org.logstash.benchmark.BufferedTokenizerBenchmark.*"
Benchmark                                                            Mode  Cnt     Score     Error   Units
BufferedTokenizerBenchmark.multipleTokenPerFragment                 thrpt   10  3308.716 ± 167.549  ops/ms
BufferedTokenizerBenchmark.multipleTokensCrossingMultipleFragments  thrpt   10  1245.505 ±  52.843  ops/ms
BufferedTokenizerBenchmark.onlyOneTokenPerFragment                  thrpt   10  9468.777 ± 182.184  ops/ms

Contributor

mergify bot commented Mar 5, 2025

This pull request does not have a backport label. Could you fix it @andsel? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit.
  • backport-8.x is the label to automatically backport to the 8.x branch.

Contributor

mergify bot commented Mar 5, 2025

backport-8.x has been added to help with the transition to the new branch 8.x.
If you don't need it please use backport-skip label.

@mergify mergify bot added the backport-8.x Automated backport to the 8.x branch with mergify label Mar 5, 2025
@andsel andsel removed the backport-8.x Automated backport to the 8.x branch with mergify label Mar 5, 2025
Contributor

mergify bot commented Mar 5, 2025

This pull request does not have a backport label. Could you fix it @andsel? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit.
  • backport-8.x is the label to automatically backport to the 8.x branch.

Contributor

mergify bot commented Mar 5, 2025

backport-8.x has been added to help with the transition to the new branch 8.x.
If you don't need it please use backport-skip label.

@mergify mergify bot added the backport-8.x Automated backport to the 8.x branch with mergify label Mar 5, 2025
@andsel andsel self-assigned this Mar 5, 2025
@andsel andsel changed the title from Fix/bufftok to return itereator to Implement BufferedTokenizer to return an iterable that can verify size limit for every token emitted Mar 6, 2025
@andsel andsel marked this pull request as ready for review March 12, 2025 14:29
@andsel andsel force-pushed the fix/bufftok_to_return_itereator branch 2 times, most recently from 1fcb6a8 to 3fe0d5a Compare March 31, 2025 10:53
@yaauie yaauie self-requested a review April 9, 2025 20:10
Member

@yaauie yaauie left a comment


I haven't fully-validated yet, but wanted to pass on some bits from my first pass:

  • BufferedTokenizerExt§IterableAdapterWithEmptyCheck#isEmpty is inverted
  • specs can be improved (yaauie@b524a67) with a custom matcher that validates both empty? (which maps to isEmpty) and entries (which is provided by the jruby shim extending java-Iterator with RubyEnumerable)

Member

@yaauie yaauie left a comment


I think this is on the right track, and appreciate the clean-Java implementation.

While the previous implementations have not been thread-safe and had undefined behaviour when contending threads invoked BufferedTokenizer#extract and/or BufferedTokenizer#flush, making the BufferedTokenizer#extract return a lazy iterator introduces some risk, as interacting with that iterator mutates the underlying buffer.

Looking at all of the current uses of FileWatch::BufferedTokenizer in core and plugins, I don't see this as a significant risk, but if we wanted to mitigate it we would need to synchronize all of the methods on BufferedTokenizer§DataSplitter that deal with mutable state.
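
As a rough illustration of that mitigation, below is a minimal sketch assuming the mutable state is just the accumulator and the read offset; the class shown is hypothetical, not the PR's DataSplitter:

// Illustrative sketch of synchronizing the mutable-state methods; names are
// assumptions made for this example, not the PR's actual code.
class SynchronizedSplitter {
    private final StringBuilder accumulator = new StringBuilder();
    private final String separator;
    private int currentIdx = 0;

    SynchronizedSplitter(String separator) {
        this.separator = separator;
    }

    // Every method that touches accumulator or currentIdx is synchronized, so
    // contending extract/flush callers cannot observe or produce a torn state.
    synchronized void append(String data) {
        accumulator.append(data);
    }

    synchronized boolean hasNext() {
        return accumulator.indexOf(separator, currentIdx) >= 0;
    }

    synchronized String flush() {
        String flushed = accumulator.substring(currentIdx);
        accumulator.setLength(0); // clear the accumulator on flush
        currentIdx = 0;
        return flushed;
    }
}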


I've added some notes about reducing overhead, correctly reporting when the buffer is non-empty with unprocessed bytes, and clearing the accumulator during a flush operation.

@andsel andsel force-pushed the fix/bufftok_to_return_itereator branch from 3fe0d5a to 9741517 Compare April 10, 2025 15:33
@andsel andsel requested a review from yaauie April 14, 2025 08:45
Member

@yaauie yaauie left a comment


I'm not convinced that emitting a lazy Iterable<String> is beneficial, especially since we do not enforce the size limit of the accumulator until read-time. Making the returned object be lazy causes us to break the encapsulation in surprising ways.

    }

    public void append(String data) {
        accumulator.append(data);
Member


🤔 it appears as though we have moved our size limit to be entirely a read-time concern, and it will only fire when the separator is observed. If we never observe the separator, we will not cut off the input until we hit OOM.

Consider the case where a caller is passing in chunks, and the sequence of chunks between separators far exceeds the size limit:

    @Test
    public void givenBuffersWithoutTerminatorsBeyondSizeLimit() {
        final BufferedTokenizer sut2 = new BufferedTokenizer("\n", 1000);
        for (int i = 0; i < 100000; i++) {
            Iterable<String> extract = sut2.extract("this is a long string that does not include the separator!");
            assertFalse(extract.iterator().hasNext());
        }
        // by this point we have hundreds of times the limit in the accumulator
        Iterator<String> iterator = sut2.extract("\n").iterator();
        assertTrue(iterator.hasNext());
        assertThrows(IllegalStateException.class, () -> iterator.next());
    }

Contributor Author

@andsel andsel May 21, 2025


There is #17293, which addresses the OOM aspect on write and should apply on top of this. In particular, commit 71daf70 moves the size check to the write side; we can merge that PR into this one and have a single PR.

Member


I don't think this PR can be merged unless it addresses the OOM case from a write perspective, and it appears to me that #17293 and the referenced 71daf70 still rely on the iterator being read in order to trigger subsequent writes throwing bytes away.

I'm still really curious about what specifically we are trying to solve by breaking the encapsulation to return a lazily-executed iterator from extract. Are we trying to make the buffer resumable, so that it won't emit any one token larger than the limit but can resume once it has been given the separator? Is the problem the individual tokens being too big, or is the problem that the buffer can grow unbounded across multiple writes?

Contributor Author


and it appears to me that #17293 and the referenced 71daf70 still rely on the iterator being read in order to trigger subsequent writes throwing bytes away.

PR #17293 proposes the following addition to the DataSplitter.append method (https://github.com/elastic/logstash/pull/17293/files#diff-fc6ecd367c34cf9413b399eb6965f79bd88f49b305d3d2fb16a77aaae83cf315R109-R111):

if (!data.contains(separator) && dropNextPartialFragments) {
    return;
}

It uses a flag that forces the append method to drop subsequent segments until a segment containing a separator is presented.
PR #17293 extends the size limit check from the read side to the write side as well.

I'm still really curious about what specifically we are trying to solve by breaking the encapsulation to return a lazily-executed iterator from extract

The encapsulation break is an undesired side effect; the original objectives of this PR were:

  • simplify the chunking code
  • avoid using Ruby classes
  • be able to recognise size limit violations when the offending token is not the first in the analysed fragment

Are we trying to make the buffer resumable, so that it won't emit any one token larger than the limit but can resume once it has been given the separator?

Using an iterator to decode an unbounded and undefined sequence of tokens appeared more natural than returning a string array. Moreover, if the offending token is in the middle of the fragment, the desire is to process the preceding and following tokens, being notified only when the problematic token is encountered.

Is the problem the individual tokens being too big, or is the problem that the buffer can grow unbounded across multiple writes?

With the accumulator and the iterator these are essentially the same case.

    }

    public String flush() {
        final String flushed = accumulator.substring(currentIdx);
Member


I'm surprised that we can accumulate many times the size limit, and a flush yields us a string that is many times the size limit.

    @Test
    public void givenBuffersWithoutTerminatorsBeyondSizeLimitFlush() {
        final BufferedTokenizer sut2 = new BufferedTokenizer("\n", 1000);
        for (int i = 0; i < 100000; i++) {
            Iterable<String> extract = sut2.extract("this is a long string that does not include the separator!");
            assertFalse(extract.iterator().hasNext());
        }
        // by this point we have hundreds of times the limit in the accumulator
        assertThrows(IllegalStateException.class, () -> sut2.flush());
    }

@andsel andsel requested a review from yaauie May 21, 2025 14:39
@andsel andsel force-pushed the fix/bufftok_to_return_itereator branch from c25cbc4 to c0342f6 Compare June 18, 2025 08:15
Contributor Author

andsel commented Jun 19, 2025

Hi @yaauie, with the latest changes to this PR the append method now actively checks whether the accumulator has grown past the sizeLimit (if a sizeLimit is set). However, the sizeLimit check and the consequent exception throwing still happen on the read side, in the iterator returned to consume the tokens.
If you can, please 🙏 review it.
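
A minimal sketch of how such a write-side check could look while still deferring the exception to the read side; the class and the sizeLimitReached flag are illustrative assumptions, not the PR's actual code:

// Illustrative sketch: detect the size-limit violation at write time, let the
// reading iterator raise the exception when the offending token is consumed.
class WriteCheckedAccumulator {
    private final StringBuilder accumulator = new StringBuilder();
    private final int sizeLimit;
    private int currentIdx = 0;
    private boolean sizeLimitReached = false;

    WriteCheckedAccumulator(int sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    // Write side: record that the unread portion has grown past sizeLimit,
    // but keep the exception for the reading side to throw.
    void append(String data) {
        if (sizeLimit > 0 && (accumulator.length() - currentIdx) + data.length() > sizeLimit) {
            sizeLimitReached = true;
        }
        accumulator.append(data);
    }

    boolean isSizeLimitReached() {
        return sizeLimitReached;
    }
}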

Member

@yaauie yaauie left a comment


I apologize for letting this review drag on.

You've addressed all of my concerns.

LGTM

andsel added 24 commits July 2, 2025 14:29
…, but an OOM error is thrown from JDK libraries if an int overflow happens.
- specs improved (yaauie/logstash@b524a67) with a custom matcher that validates both `empty?` (which maps to isEmpty) and `entries` (which is provided by the jruby shim extending java-Iterator with RubyEnumerable)
… DataSplitter with synchronized so that it can be used in multithreaded contexts
…en also unterminated token parts are present in the buffer
…ite side also when there isn't yet done any read
…int) then the indexOf clamp it at least at 0. This test is useful only when the first delimiter is not yet found
@andsel andsel force-pushed the fix/bufftok_to_return_itereator branch from 4f40934 to 1162882 Compare July 2, 2025 12:44

@elasticmachine
Collaborator

💚 Build Succeeded


cc @andsel

@andsel andsel merged commit ed40220 into elastic:main Jul 2, 2025
10 checks passed
@andsel andsel removed the backport-8.x Automated backport to the 8.x branch with mergify label Jul 2, 2025
Contributor

mergify bot commented Jul 2, 2025

This pull request does not have a backport label. Could you fix it @andsel? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit.
  • If no backport is necessary, please add the backport-skip label

Development

Successfully merging this pull request may close these issues.

  • Protect new implementation of BufferedTokenizer against OOM
  • BufferedTokenizerExt applies sizeLimit check only of first token of input fragment
3 participants