
pq: reduce read contention when caught up #17765


Open · wants to merge 1 commit into main

Conversation

@yaauie (Member) commented Jun 30, 2025

Release notes

  • Significantly improves throughput of the persisted queue

What does this PR do?

Eliminates per-event overhead from workers consuming from a persisted queue.

By keeping track of the lock-holding reader's demand (a quantity of events and a deadline), we can avoid waking up the reader for each event that is written to the queue, significantly reducing the time that the lock-holding writer is blocked by synchronous read operations. The lock-holding reader is awoken when its demand is met, has expired, or when the writer has rotated pages.
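To make the wake-up rule concrete, below is a minimal, self-contained sketch of the idea. It is an illustration, not the code in this PR: the `ReadDemand(deadlineMillis, elementsNeeded)` shape and the `readDemand` / `unreadCount` / `notEmpty` names mirror snippets quoted later in the review thread, while the class, method, and accessor names are hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only: the blocked reader registers its demand, and the
// writer (which already holds the lock while appending) signals it only when
// the demand is met, the deadline has passed, or a page rotation occurred.
final class ReadDemandSketch {
    // hypothetical holder matching `new ReadDemand(deadlineMillis, elementsNeeded)`
    record ReadDemand(long deadlineMillis, int elementsNeeded) {}

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private long unreadCount;
    private ReadDemand readDemand; // set by the waiting reader, cleared when it wakes

    // writer side: called while holding `lock`, after appending `count` events
    void afterWrite(long count, boolean pageRotated) {
        unreadCount += count;
        final ReadDemand demand = this.readDemand;
        if (demand == null) {
            return; // no reader is currently waiting; nothing to signal
        }
        final boolean demandMet = unreadCount >= demand.elementsNeeded();
        final boolean expired = System.currentTimeMillis() >= demand.deadlineMillis();
        if (demandMet || expired || pageRotated) {
            notEmpty.signal(); // wake the reader only when it can make progress
        }
    }
}
```

Compared with signalling the condition on every write, the writer's critical section stays short while the reader is already caught up, which is where the removed contention comes from.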

Why is it important/What is the impact to the user?

Significantly improves PQ throughput.

From my local tests (ARM / M3 Pro 12 CPU / SSD) with `input { java_generator {} } output { sink {} }`:

| batch size | unpatched | this patch |
| ---------- | --------- | ---------- |
|        125 |      260k |       498k |
|       2000 |      219k |       516k |

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding changes to the default configuration files (and/or docker env variables)
  • [ ] I have added tests that prove my fix is effective or that my feature works

Author's Checklist

  • [ ]

How to test this PR locally

  1. run a generator-to-sink pipeline:
    bin/logstash -e 'input { java_generator {} } output { sink {} }'
    
  2. observe throughput via API:
    curl --silent 'localhost:9600/_node/stats?pretty=true' | jq .flow.output_throughput
    

Related issues

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

mergify bot commented Jun 30, 2025

This pull request does not have a backport label. Could you fix it @yaauie? 🙏
To fixup this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8./d is the label to automatically backport to the 8./d branch. /d is the digit.
  • If no backport is necessary, please add the backport-skip label


Quality Gate passed

Issues
0 New issues
0 Fixed issues
0 Accepted issues

Measures
0 Security Hotspots
No data about Coverage
No data about Duplication

See analysis details on SonarQube

@elasticmachine (Collaborator)

💚 Build Succeeded

@kaisecheng self-requested a review July 1, 2025 13:11
@kaisecheng (Contributor) left a comment

This is a smart solution. I am looking forward to seeing the performance boost on the benchmark graph :)

I left two comments regarding the lock behaviour.

@@ -64,6 +66,9 @@ public final class Queue implements Closeable {

protected volatile long unreadCount;

// the readDemand is a record of the currently-waiting-reader's demand and expiry
private volatile ReadDemand readDemand;

The ReadDemand is only accessed inside lock.lock(), whether by the writer or by readers. The value is therefore guaranteed to be up-to-date, so I think volatile can be removed.
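(For context, a standalone sketch of the reasoning, not the actual Queue code: a ReentrantLock's unlock()/lock() pair already establishes the happens-before edge that volatile would otherwise provide, so a field accessed only under the lock is safely visible without it.)

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustration of the reviewer's point, not the actual Queue code: when every
// read and write of `demand` happens while holding the same ReentrantLock,
// unlock()/lock() already give the visibility guarantee that `volatile` would,
// so the modifier is redundant.
final class LockGuardedFieldSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private Object demand; // no volatile needed while access stays lock-guarded

    void set(Object value) {
        lock.lock();
        try {
            this.demand = value; // this write happens-before the later unlock()
        } finally {
            lock.unlock();
        }
    }

    Object get() {
        lock.lock(); // lock() happens-after the prior unlock(), so the write is visible
        try {
            return this.demand;
        } finally {
            lock.unlock();
        }
    }
}
```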

Comment on lines +935 to +938
this.readDemand = new ReadDemand(deadlineMillis, elementsNeeded);

boolean unElapsed = this.notEmpty.awaitUntil(new Date(deadlineMillis));
this.readDemand = null;
@kaisecheng (Contributor) commented Jul 2, 2025

I don't have a big concern but want to clarify the behaviour.

The reader releases the lock when notEmpty.awaitUntil() is called and waits. Potentially, other readers could acquire the lock and update this.readDemand with a later deadlineMillis. Updating this.readDemand is not thread-safe, and I believe it does not need to be; the worst case is that the deadline extends to (timeout 50ms × number of threads).

Is my understanding correct? If so, can we add a comment as a reminder that the action is not atomic?

@yaauie (Member, Author) commented Jul 2, 2025

Other readers cannot acquire the lock because Queue#readSerializedBatch and Queue#nonBlockReadBatch (the only places where a reader can acquire the queue's lock) are synchronized.

[EDIT: I believe that Queue#nonBlockReadBatch is only referenced from tests; I cannot find other references to this, or to non_block_read_batch that could be called from the ruby-side.]
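(For readers following along, a hypothetical skeleton; parameter lists and bodies are placeholders, not the real signatures. The point is that both read entry points synchronize on the same Queue instance, so at most one reader can be registering or awaiting a ReadDemand at any time.)

```java
// Hypothetical skeleton only; method names come from the comment above, but the
// parameters and bodies are placeholders. Because both entry points are
// `synchronized` on the same instance, only one reader thread can be inside
// either of them at a time, so at most one ReadDemand is registered at once.
final class QueueReadPathSketch {
    public synchronized Object readSerializedBatch(int limit, long timeoutMillis) {
        // acquire the internal lock, register readDemand, awaitUntil(deadline), read
        return null;
    }

    public synchronized Object nonBlockReadBatch(int limit) {
        // same instance monitor, so it cannot overlap with readSerializedBatch
        return null;
    }
}
```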


ah.. you are right
