Vector version: 0.45.0

I am working on combining log messages into a single event with Vector, using the `reduce` transform. The issue I have is that the logs don't have an easy "end" state: being syslog, they can arrive out of order, etc. What I do have is how many messages there should be, and the position of each specific message within the event. That metadata is pulled out before the logs reach Vector, so each event looks like:

```json
{
  "_extra": {"order": 1, "size": 9},
  "message": "..."
}
```
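Once the reduce has combined the fragments, the merged event I want to test against should look roughly like this (values are illustrative, and I am assuming the default merge behaviour keeps the first event's `_extra`):

```json
{
  "_extra": {"order": 1, "size": 9},
  "message": ["fragment 1", "fragment 2", "..."]
}
```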
I was attempting an `ends_when` condition that checks whether all of the messages have been received, plus a timeout:

```yaml
group_by_key:
  type: reduce
  inputs:
    - log_input
  group_by:
    - message_key
  merge_strategies:
    message: array
  end_every_period_ms: 60000
  ends_when:
    type: vrl
    source: (is_array(.message) && to_int!(._extra.size) == length!(.message)) || (is_string(.message) && to_int!(._extra.size) == 1)
```

But after looking at the code yesterday, I realized that the `ends_when` condition only receives the new incoming event, not the combined data, so `.message` is still a single string when the predicate runs and the array branch can never match. Any other ideas on how I might achieve this?

I attempted a shorter expire time plus a `route` transform to put events back into the queue for further grouping unless they were complete or expired, but Vector does not allow cyclical references. I have been able to get around this with an `http` sink that puts events back into the queue if the time limit has not been reached or the group does not yet have all of its messages:
```yaml
sources:
  wireless-kafka-file:
    type: file
    include:
      - "/data/sample.json"
    data_dir: "/data/data_dir"
    ignore_checkpoints: true
  http-server-requeue:
    type: http_server
    address: 127.0.0.1:8000
    path: "/requeue"
    decoding:
      codec: json

transforms:
  http-server-remap:
    type: remap
    inputs:
      - http-server-requeue
    source: |-
      del(.path)
  wireless:
    type: remap
    inputs:
      - wireless-kafka-file
    source: |-
      . = parse_json!(.message)
      data = parse_json!(.value)
      del(.value)
      . = merge!(., data)
      .last_timestamp = to_unix_timestamp(now(), "seconds")
  group_by_key:
    type: reduce
    inputs:
      - wireless
      - http-server-remap # requeued events re-enter the reduce here
    group_by:
      - message_key
    merge_strategies:
      message: array
      last_timestamp: max
    end_every_period_ms: 10000
  check_all_messages_or_expired:
    type: remap
    inputs:
      - group_by_key
    source: |-
      # requeued events already carry an array, so repeated passes
      # produce nested arrays; flatten back to one level
      if is_array(.message) {
        .message = flatten(array!(.message))
      } else {
        .message = [.message]
      }
      .has_all_or_expired = false
      current_timestamp = to_unix_timestamp(now(), "seconds")
      .last_timestamp = int!(.last_timestamp)
      message_count = to_int!(._extra.size)
      # if we have all of the messages, mark the group complete
      if message_count == length(.message) {
        log("HAS ALL MESSAGES!", level: "error", rate_limit_secs: 0)
        .has_all_or_expired = true
      }
      since_last_timestamp = current_timestamp - .last_timestamp
      # if it has been 120 seconds since the last message, mark the group expired
      if since_last_timestamp > 120 {
        .has_all_or_expired = true
      }
  # if expired or complete, continue; otherwise route back into
  # group_by_key via the http sink and source
  route_expired_or_complete:
    type: route
    inputs:
      - check_all_messages_or_expired
    reroute_unmatched: true
    route:
      expired-or-complete:
        type: vrl
        source: "bool(.has_all_or_expired) ?? false"
  parse_messages:
    type: remap
    inputs:
      - route_expired_or_complete.expired-or-complete
    source: "...."

sinks:
  stdout:
    type: console
    inputs:
      - parse_messages
    encoding:
      codec: json
  http-client-requeue:
    type: http
    inputs:
      - route_expired_or_complete._unmatched
    uri: http://localhost:8000/requeue
    encoding:
      codec: json
    batch:
      max_events: 1000
```

This works, however it breaks end-to-end acknowledgements: the requeued copy is a new event owned by the `http_server` source, so the original event from the file source is acknowledged as soon as the `http` sink delivers it, regardless of whether the group ever completes. It would be nice to be able to use some sort of metadata, or to do some merging before the `ends_when` condition is evaluated: https://github.com/vectordotdev/vector/blob/master/src/transforms/reduce/transform.rs#L264-L266
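For illustration (this is my reading of the ack model, not verified against the source): even with acknowledgements explicitly enabled on the requeue sink, the chain ends at the local loopback rather than at the final console sink:

```yaml
sinks:
  http-client-requeue:
    type: http
    inputs:
      - route_expired_or_complete._unmatched
    uri: http://localhost:8000/requeue
    encoding:
      codec: json
    # Delivery to the local http_server source acks the original
    # file-source event here; the requeued copy starts a fresh ack
    # chain, so the file checkpoint can advance before the group is
    # ever emitted by the console sink.
    acknowledgements:
      enabled: true
```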
-
This is correct. It would be an interesting enhancement to apply predicates to the aggregate event, and IMO it would be trivial to implement. Arguably, this component already has a big and complex configuration surface, so it may be worth taking a step back and thinking about refactoring the UX to support this new end condition (vs. just adding a new field).
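For concreteness, a purely hypothetical sketch of such an end condition; `ends_when_merged` is an invented name and does not exist in Vector today:

```yaml
group_by_key:
  type: reduce
  inputs:
    - log_input
  group_by:
    - message_key
  merge_strategies:
    message: array
  end_every_period_ms: 60000
  # Invented field: a predicate evaluated against the merged
  # (aggregate) event after each incoming event is folded in,
  # instead of against the incoming event alone.
  ends_when_merged:
    type: vrl
    source: to_int!(._extra.size) == length(array!(.message))
```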