-
Notifications
You must be signed in to change notification settings - Fork 5k
Add byte-based ingestion limits to the queue and output #39776
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
faec
wants to merge
121
commits into
elastic:main
Choose a base branch
from
faec:queue-byte-limits
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
121 commits
Select commit
Hold shift + click to select a range
a4019ec
cleanup
faec a3d3757
cleanups
faec 597e0a5
break input sources up into separate helper functions
faec 0df748a
finish helper function split
faec 4b70900
rewrite the sqsReader main loop
faec 90d9e24
simplify sqsReader loop
faec 5f94e9b
adjust variable names
faec b797261
remove unused parameter
faec 88f3980
createS3Lister -> createS3Poller
faec 9f32df6
remove unused error checks
faec 48ec82a
cleanup
faec 58e084a
make a wrapper for v2.Canceler that doesn't use an extra goroutine
faec 1974f8f
remove unused parameter
faec 646374c
cleanup
faec a43cae6
remove redundant helper
faec f46ef06
adjust variable names
faec d9be04b
remove extra index indirection in state lookup
faec 5e1fbcc
remove redundant sync.Map
faec c16a22f
merge redundant state maps
faec f07915a
remove redundant state map
faec 0f483a3
simplify s3Poller worker handling
faec 8916d91
Merge branch 'main' of github.com:elastic/beats into awss3-cleanup
faec a8cb6bd
simplify waitgroup handling / unused errors
faec 78a7db4
clean up context handling
faec edc1bd3
adjust delay timer
faec 1497be4
remove unused struct fields
faec a3e0dc8
cleanup
faec 219e857
Refactor cloudwatch worker task allocation
faec 977a0d3
add unit tests for cloudwatchPoller.receive
faec 71134f9
Merge branch 'main' of github.com:elastic/beats into cloudwatch-fix
faec 6cf5506
update changelog
faec ff24571
make check
faec 5ec8a86
Merge branch 'cloudwatch-fix' into awss3-cleanup
faec 2a6abb8
Remove unused custom semaphore helper
faec 12a2a3c
cleanups in input.go
faec dd29fa0
revert unintentional return value change
faec 4956db9
Concurrency / error handling fixes in awss3
faec fc641e1
give the registry accessor its own mutex
faec 4a9cb60
update tests
faec 959d557
Merge branch 'main' of github.com:elastic/beats into s3-concurrency-fix
faec 3d93d22
make check
faec 7d6369f
lint
faec b4b5b28
lint
faec 45619e3
Merge branch 'main' of github.com:elastic/beats into s3-concurrency-fix
faec e88be00
Merge branch 's3-concurrency-fix' of github.com:faec/beats into awss3…
faec 1308a2d
Merge branch 'main' into s3-concurrency-fix
faec 0abf663
Merge branch 's3-concurrency-fix' into awss3-cleanup
faec 942ae03
cleaning up context use
faec e84471b
Merge branch 'main' into s3input-cleanup
faec 0289604
Merge branch 's3input-cleanup' into awss3-cleanup
faec 2c084bb
splitting S3 and SQS into distinct inputs internally
faec dbe4691
splitting awss3 into two input objects
faec 73d1465
Merge branch 'main' of github.com:elastic/beats into s3input-cleanup
faec ad7d342
Merge branch 's3input-cleanup' into awss3-cleanup
faec e05c45d
reorganize {s3,sqs}.go by adding {s3,sqs}_input.go for the code speci…
faec 54f0a87
clean up sqs helpers
faec 122ee8c
Merge branch 'main' into awss3-cleanup
faec d396457
fix merge
faec be54ac7
update tests
faec 568d2b0
merge sqsReaderInput and sqsReader
faec 383c111
get tests building again
faec 4b2ea11
remove redundant fields
faec de36816
more reorganization
faec 3117334
organizing
faec 6b43ac5
reordering code
faec d38af54
Merge branch 'main' of github.com:elastic/beats into awss3-cleanup
faec 3712af4
clean up states initialization
faec 894ba4c
remove unused helper
faec 31f3b95
working on test updates
faec 7d12f0a
fix benchmark tests
faec f1b7761
updating unit tests
faec fa22239
fix remaining tests
faec e253681
remove unused debug parameter
faec 5b922df
Merge branch 'main' of github.com:elastic/beats into awss3-cleanup
faec 1fef199
remove commented code
faec 800fc73
move helper function
faec 1694f0d
clean up aws client config modifiers
faec 63be523
reorder helper functions
faec 1bae757
reorder helper functions
faec 939c38f
update comments
faec b032106
move log creation earlier
faec 0995921
update comments
faec 5d9f731
make check
faec a8323a4
Working on queue byte limits
faec 8ab68ed
Fill out more of the byte limits API, remove EntryID
faec 60e1c2c
convert circular buffer indices to a helper type storing the absolute…
faec 812f87d
Finish most byte bounds logic, add bulk_max_bytes to ES output
faec 3fcb875
remove drop on cancel option
faec 47c07a6
producer.Cancel -> producer.Close
faec f9d4c39
remove OnDrop callbacks
faec d398b46
remove internal cancellation helpers
faec bad2498
remove the queue's shipper metrics hook
faec eca723b
remove unused fields and producer cancel tests
faec c79cc3e
Merge branch 'remove-producer-cancel' into queue-metrics
faec bffd70d
fix merge
faec 7409be1
moving metric ownership around
faec 75ac0f4
plumbing for queue metrics
faec ca949b8
flesh out queue observer internals
faec a279862
update queue filled percent
faec d0f1939
Merge branch 'main' into queue-metrics
faec ee2fa2b
Merge branch 'main' into memqueue-byte-limits
faec f512643
Merge branch 'main' into remove-producer-cancel
faec 6d585bf
Merge branch 'remove-producer-cancel' into memqueue-byte-limits
faec 517ffe1
clean up shipper metric hooks
faec 4f6d02c
use the metrics observer from the memqueue
faec fd18f4e
configure gauges
faec 2f6ba9b
report queue metrics from the disk queue
faec ce2c287
fix disk queue initialization
faec e70c13b
outputObserver -> retryObserver
faec e6dbb2d
move queue draining logic into the queue
faec afa3793
shadow acked var the simple way
faec f8157ac
Merge branch 'queue-metrics' into memqueue-byte-limits
faec 5a158ec
memqueue uses event or byte limits, not both
faec 24c5564
fix byte vs event logic
faec 3da6d32
clean up FIFO handling
faec ac94a2b
replace batchList implementation with FIFO helper
faec d1ba6a1
remove unrelated test change
faec 91e4534
remove unused error
faec 5aa42eb
Merge branch 'main' of github.com:elastic/beats into queue-byte-limits
faec b585727
fix merge + tests
faec 3643a8f
Add docs / parameter checks
faec File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| // Licensed to Elasticsearch B.V. under one or more contributor | ||
| // license agreements. See the NOTICE file distributed with | ||
| // this work for additional information regarding copyright | ||
| // ownership. Elasticsearch B.V. licenses this file to you under | ||
| // the Apache License, Version 2.0 (the "License"); you may | ||
| // not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| package fifo | ||
|
|
||
| type FIFO[T any] struct { | ||
| first *node[T] | ||
| last *node[T] | ||
| } | ||
|
|
||
| type node[T any] struct { | ||
| next *node[T] | ||
| value T | ||
| } | ||
|
|
||
| func (f *FIFO[T]) Add(value T) { | ||
| newNode := &node[T]{value: value} | ||
| if f.first == nil { | ||
| f.first = newNode | ||
| } else { | ||
| f.last.next = newNode | ||
| } | ||
| f.last = newNode | ||
| } | ||
|
|
||
| func (f *FIFO[T]) Empty() bool { | ||
| return f.first == nil | ||
| } | ||
|
|
||
| // Return the first value (if present) without removing it from the queue. | ||
| // Returns a default value if the queue is empty. To recognize this case, | ||
| // check (*FIFO).Empty(). | ||
| func (f *FIFO[T]) First() T { | ||
| if f.first == nil { | ||
| var none T | ||
| return none | ||
| } | ||
| return f.first.value | ||
| } | ||
|
|
||
| // Remove the first entry in this FIFO and return it. | ||
| func (f *FIFO[T]) ConsumeFirst() T { | ||
| result := f.First() | ||
| f.Remove() | ||
| return result | ||
| } | ||
|
|
||
| // Append another FIFO queue to an existing one. Takes ownership of | ||
| // the given FIFO's contents. | ||
| func (f *FIFO[T]) Concat(list FIFO[T]) { | ||
| if list.Empty() { | ||
| return | ||
| } | ||
| if f.Empty() { | ||
| *f = list | ||
| return | ||
| } | ||
| f.last.next = list.first | ||
| f.last = list.last | ||
| } | ||
|
|
||
| // Remove the first entry in the queue. Does nothing if the FIFO is empty. | ||
| func (f *FIFO[T]) Remove() { | ||
| if f.first != nil { | ||
| f.first = f.first.next | ||
| if f.first == nil { | ||
| f.last = nil | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,7 +35,7 @@ func Fail(err error) (Group, error) { return Group{}, err } | |
| // instances. The first argument is expected to contain a queue | ||
| // config.Namespace. The queue config is passed to assign the queue | ||
| // factory when elastic-agent reloads the output. | ||
| func Success(cfg config.Namespace, batchSize, retry int, encoderFactory queue.EncoderFactory, clients ...Client) (Group, error) { | ||
| func Success(cfg config.Namespace, batchEvents, batchBytes, retry int, encoderFactory queue.EncoderFactory, clients ...Client) (Group, error) { | ||
| var q queue.QueueFactory | ||
| if cfg.IsSet() && cfg.Config().Enabled() { | ||
| switch cfg.Name() { | ||
|
|
@@ -60,7 +60,8 @@ func Success(cfg config.Namespace, batchSize, retry int, encoderFactory queue.En | |
| } | ||
| return Group{ | ||
| Clients: clients, | ||
| BatchSize: batchSize, | ||
| BatchEvents: batchEvents, | ||
| BatchBytes: batchBytes, | ||
| Retry: retry, | ||
| QueueFactory: q, | ||
| EncoderFactory: encoderFactory, | ||
|
|
@@ -80,12 +81,12 @@ func NetworkClients(netclients []NetworkClient) []Client { | |
| // The first argument is expected to contain a queue config.Namespace. | ||
| // The queue config is passed to assign the queue factory when | ||
| // elastic-agent reloads the output. | ||
| func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, encoderFactory queue.EncoderFactory, netclients []NetworkClient) (Group, error) { | ||
| func SuccessNet(cfg config.Namespace, loadbalance bool, batchEvents, batchBytes, retry int, encoderFactory queue.EncoderFactory, netclients []NetworkClient) (Group, error) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it worth defining a wrapper struct for |
||
|
|
||
| if !loadbalance { | ||
| return Success(cfg, batchSize, retry, encoderFactory, NewFailoverClient(netclients)) | ||
| return Success(cfg, batchEvents, batchBytes, retry, encoderFactory, NewFailoverClient(netclients)) | ||
| } | ||
|
|
||
| clients := NetworkClients(netclients) | ||
| return Success(cfg, batchSize, retry, encoderFactory, clients...) | ||
| return Success(cfg, batchEvents, batchBytes, retry, encoderFactory, clients...) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.