Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dataobj): predicate pushdown metadata label filters #16846

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 47 additions & 13 deletions pkg/dataobj/logs_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ type LogsReader struct {

buf []dataset.Row

reader *dataset.Reader
columns []dataset.Column
columnDesc []*logsmd.ColumnDesc
reader *dataset.Reader
columns []dataset.Column

sectionInfo *filemd.SectionInfo
columnDesc []*logsmd.ColumnDesc
}

// NewLogsReader creates a new LogsReader that reads from the logs section of
Expand Down Expand Up @@ -136,31 +138,64 @@ func (r *LogsReader) Read(ctx context.Context, s []Record) (int, error) {
return n, nil
}

func (r *LogsReader) initReader(ctx context.Context) error {
dec := r.obj.dec.LogsDecoder()
sec, err := r.findSection(ctx)
// Columns returns the column descriptions for the logs section.
//
// The descriptions are cached on the reader: when r.columnDesc is already
// non-nil it is returned as-is. Otherwise the logs section is located, its
// column descriptions are read through the logs decoder, and both the section
// info and the descriptions are stored on the reader so that initReader can
// reuse them instead of reading them again.
//
// An error is returned if the section cannot be found or its columns cannot
// be read; nothing is cached in that case.
func (r *LogsReader) Columns(ctx context.Context) ([]*logsmd.ColumnDesc, error) {
if r.columnDesc != nil {
return r.columnDesc, nil
}

si, err := r.findSection(ctx)
if err != nil {
return fmt.Errorf("finding section: %w", err)
return nil, fmt.Errorf("finding section: %w", err)
}

columnDescs, err := dec.Columns(ctx, sec)
columnDesc, err := r.obj.dec.LogsDecoder().Columns(ctx, si)
if err != nil {
return fmt.Errorf("reading columns: %w", err)
return nil, fmt.Errorf("reading columns: %w", err)
}

r.sectionInfo = si
r.columnDesc = columnDesc

return r.columnDesc, nil
}

func (r *LogsReader) initReader(ctx context.Context) error {
dec := r.obj.dec.LogsDecoder()

// sectionInfo and columnDesc could be populated by a previous call to Columns method.
// avoid re-reading them.
if r.sectionInfo == nil {
si, err := r.findSection(ctx)
if err != nil {
return fmt.Errorf("finding section: %w", err)
}

r.sectionInfo = si
}

if r.columnDesc == nil {
columnDescs, err := dec.Columns(ctx, r.sectionInfo)
if err != nil {
return fmt.Errorf("reading columns: %w", err)
}

r.columnDesc = columnDescs
}

dset := encoding.LogsDataset(dec, sec)
dset := encoding.LogsDataset(dec, r.sectionInfo)
columns, err := result.Collect(dset.ListColumns(ctx))
if err != nil {
return fmt.Errorf("reading columns: %w", err)
}

// r.predicate doesn't contain mappings of stream IDs; we need to build
// that as a separate predicate and AND them together.
predicate := streamIDPredicate(maps.Keys(r.matchIDs), columns, columnDescs)
predicate := streamIDPredicate(maps.Keys(r.matchIDs), columns, r.columnDesc)
if r.predicate != nil {
predicate = dataset.AndPredicate{
Left: predicate,
Right: translateLogsPredicate(r.predicate, columns, columnDescs),
Right: translateLogsPredicate(r.predicate, columns, r.columnDesc),
}
}

Expand All @@ -178,7 +213,6 @@ func (r *LogsReader) initReader(ctx context.Context) error {
r.reader.Reset(readerOpts)
}

r.columnDesc = columnDescs
r.columns = columns
r.ready = true
return nil
Expand Down
110 changes: 110 additions & 0 deletions pkg/dataobj/predicate.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package dataobj

import (
"fmt"
"strings"
"time"
)

type (
// Predicate is an expression used to filter entries in a data object.
Predicate interface {
isPredicate()
String() string
}

// StreamsPredicate is a [Predicate] that can be used to filter streams in a
Expand Down Expand Up @@ -62,13 +65,15 @@ type (
// predicates.
LabelFilterPredicate struct {
Name string
Desc string // Description of the filter for debugging.
Keep func(name, value string) bool
}

// A LogMessageFilterPredicate is a [LogsPredicate] that requires the log message
// of the entry to pass a Keep function.
LogMessageFilterPredicate struct {
Keep func(line []byte) bool
Desc string // Description of the filter for debugging.
}

// A MetadataMatcherPredicate is a [LogsPredicate] that requires a metadata
Expand All @@ -86,6 +91,7 @@ type (
// predicates.
MetadataFilterPredicate struct {
Key string
Desc string // Description of the filter for debugging.
Keep func(key, value string) bool
}
)
Expand All @@ -109,3 +115,107 @@ func (LabelFilterPredicate) predicateKind(StreamsPredicate) {}
func (MetadataMatcherPredicate) predicateKind(LogsPredicate) {}
func (MetadataFilterPredicate) predicateKind(LogsPredicate) {}
func (LogMessageFilterPredicate) predicateKind(LogsPredicate) {}

// String renders the conjunction for debugging as "(<left> AND <right>)",
// where each operand is formatted with its own String method.
func (p AndPredicate[P]) String() string {
	operands := []string{p.Left.String(), p.Right.String()}
	return "(" + strings.Join(operands, " AND ") + ")"
}

// String renders the disjunction for debugging as "(<left> OR <right>)",
// where each operand is formatted with its own String method.
func (p OrPredicate[P]) String() string {
	operands := []string{p.Left.String(), p.Right.String()}
	return "(" + strings.Join(operands, " OR ") + ")"
}

// String renders the negation for debugging as "NOT(<inner>)", where the
// inner predicate is formatted with its own String method.
func (p NotPredicate[P]) String() string {
	pieces := []string{"NOT(", p.Inner.String(), ")"}
	return strings.Join(pieces, "")
}

// String renders the time range for debugging using mathematical interval
// notation: a square bracket marks an inclusive bound and a parenthesis an
// exclusive one, e.g. "TimeRange[<start>, <end>)". Both bounds are formatted
// with time.RFC3339.
func (p TimeRangePredicate[P]) String() string {
	// Pick the delimiters first; exclusive bounds are the default.
	lbracket, rbracket := "(", ")"
	if p.IncludeStart {
		lbracket = "["
	}
	if p.IncludeEnd {
		rbracket = "]"
	}

	var out strings.Builder
	out.WriteString("TimeRange")
	out.WriteString(lbracket)
	out.WriteString(p.StartTime.Format(time.RFC3339))
	out.WriteString(", ")
	out.WriteString(p.EndTime.Format(time.RFC3339))
	out.WriteString(rbracket)
	return out.String()
}

// String renders the matcher for debugging as "Label(<name>=<value>)". Name
// and value are written verbatim, without quoting.
func (p LabelMatcherPredicate) String() string {
	pair := strings.Join([]string{p.Name, p.Value}, "=")
	return "Label(" + pair + ")"
}

// String renders the filter for debugging as "LabelFilter(<name>)". When Desc
// is set it is appended as a quoted description:
// `LabelFilter(<name>, description="<desc>")`. The Keep function is not
// represented in the output.
func (p LabelFilterPredicate) String() string {
	var sb strings.Builder
	sb.WriteString("LabelFilter(")
	sb.WriteString(p.Name)
	if p.Desc != "" {
		// Format directly into the builder rather than allocating an
		// intermediate string with fmt.Sprintf. %q escapes embedded quotes.
		fmt.Fprintf(&sb, ", description=%q", p.Desc)
	}
	sb.WriteString(")")
	return sb.String()
}

// String renders the filter for debugging. With a description it yields
// `LogMessageFilter(description="<desc>")`; without one it yields
// "LogMessageFilter()". The Keep function is not represented in the output.
func (p LogMessageFilterPredicate) String() string {
	var sb strings.Builder
	sb.WriteString("LogMessageFilter(")
	if p.Desc != "" {
		// Format directly into the builder rather than allocating an
		// intermediate string with fmt.Sprintf. %q escapes embedded quotes.
		fmt.Fprintf(&sb, "description=%q", p.Desc)
	}
	sb.WriteString(")")
	return sb.String()
}

// String renders the matcher for debugging as "Metadata(<key>=<value>)". Key
// and value are written verbatim, without quoting.
func (p MetadataMatcherPredicate) String() string {
	pair := strings.Join([]string{p.Key, p.Value}, "=")
	return "Metadata(" + pair + ")"
}

// String renders the filter for debugging as "MetadataFilter(<key>)". When
// Desc is set it is appended as a quoted description:
// `MetadataFilter(<key>, description="<desc>")`. The Keep function is not
// represented in the output.
func (p MetadataFilterPredicate) String() string {
	var sb strings.Builder
	sb.WriteString("MetadataFilter(")
	sb.WriteString(p.Key)
	if p.Desc != "" {
		// Format directly into the builder rather than allocating an
		// intermediate string with fmt.Sprintf. %q escapes embedded quotes.
		fmt.Fprintf(&sb, ", description=%q", p.Desc)
	}
	sb.WriteString(")")
	return sb.String()
}
108 changes: 108 additions & 0 deletions pkg/dataobj/predicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,3 +489,111 @@ func evaluateRecordPredicate(p dataset.Predicate, ts time.Time) bool {
panic("unexpected predicate")
}
}

// TestPredicateString checks the debug String output of every predicate type,
// both in isolation and when composed. It covers the filter predicates with
// and without a description, and all four combinations of inclusive and
// exclusive time range bounds.
func TestPredicateString(t *testing.T) {
	// Fixed bounds shared by every time range case.
	startTime := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	endTime := time.Date(2023, 1, 2, 0, 0, 0, 0, time.UTC)

	testCases := []struct {
		name     string
		pred     Predicate
		expected string
	}{
		{
			name:     "LabelMatcherPredicate",
			pred:     LabelMatcherPredicate{Name: "app", Value: "frontend"},
			expected: "Label(app=frontend)",
		},
		{
			name:     "LabelFilterPredicate",
			pred:     LabelFilterPredicate{Name: "env", Desc: `env=~"(prod|dev)"`},
			expected: `LabelFilter(env, description="env=~\"(prod|dev)\"")`,
		},
		{
			// An empty Desc must omit the description part entirely.
			name:     "LabelFilterPredicate - no description",
			pred:     LabelFilterPredicate{Name: "env"},
			expected: "LabelFilter(env)",
		},
		{
			name:     "MetadataMatcherPredicate",
			pred:     MetadataMatcherPredicate{Key: "trace_id", Value: "abc123"},
			expected: "Metadata(trace_id=abc123)",
		},
		{
			name:     "MetadataFilterPredicate",
			pred:     MetadataFilterPredicate{Key: "span_id", Desc: `span_id=~"123"`},
			expected: `MetadataFilter(span_id, description="span_id=~\"123\"")`,
		},
		{
			name:     "MetadataFilterPredicate - no description",
			pred:     MetadataFilterPredicate{Key: "span_id"},
			expected: "MetadataFilter(span_id)",
		},
		{
			name:     "LogMessageFilterPredicate",
			pred:     LogMessageFilterPredicate{Desc: `line=~"error"`},
			expected: `LogMessageFilter(description="line=~\"error\"")`,
		},
		{
			name:     "LogMessageFilterPredicate - no description",
			pred:     LogMessageFilterPredicate{},
			expected: "LogMessageFilter()",
		},
		{
			name: "TimeRangePredicate - inclusive start, exclusive end",
			pred: TimeRangePredicate[StreamsPredicate]{
				StartTime:    startTime,
				EndTime:      endTime,
				IncludeStart: true,
				IncludeEnd:   false,
			},
			expected: "TimeRange[2023-01-01T00:00:00Z, 2023-01-02T00:00:00Z)",
		},
		{
			name: "TimeRangePredicate - exclusive start, inclusive end",
			pred: TimeRangePredicate[StreamsPredicate]{
				StartTime:    startTime,
				EndTime:      endTime,
				IncludeStart: false,
				IncludeEnd:   true,
			},
			expected: "TimeRange(2023-01-01T00:00:00Z, 2023-01-02T00:00:00Z]",
		},
		{
			name: "TimeRangePredicate - inclusive start, inclusive end",
			pred: TimeRangePredicate[StreamsPredicate]{
				StartTime:    startTime,
				EndTime:      endTime,
				IncludeStart: true,
				IncludeEnd:   true,
			},
			expected: "TimeRange[2023-01-01T00:00:00Z, 2023-01-02T00:00:00Z]",
		},
		{
			name: "TimeRangePredicate - exclusive start, exclusive end",
			pred: TimeRangePredicate[StreamsPredicate]{
				StartTime:    startTime,
				EndTime:      endTime,
				IncludeStart: false,
				IncludeEnd:   false,
			},
			expected: "TimeRange(2023-01-01T00:00:00Z, 2023-01-02T00:00:00Z)",
		},
		{
			name: "NotPredicate",
			pred: NotPredicate[StreamsPredicate]{
				Inner: LabelMatcherPredicate{Name: "app", Value: "frontend"},
			},
			expected: "NOT(Label(app=frontend))",
		},
		{
			name: "AndPredicate",
			pred: AndPredicate[StreamsPredicate]{
				Left:  LabelMatcherPredicate{Name: "app", Value: "frontend"},
				Right: LabelMatcherPredicate{Name: "env", Value: "prod"},
			},
			expected: "(Label(app=frontend) AND Label(env=prod))",
		},
		{
			name: "OrPredicate",
			pred: OrPredicate[StreamsPredicate]{
				Left:  LabelMatcherPredicate{Name: "app", Value: "frontend"},
				Right: LabelMatcherPredicate{Name: "app", Value: "backend"},
			},
			expected: "(Label(app=frontend) OR Label(app=backend))",
		},
		{
			name: "Complex nested predicate",
			pred: OrPredicate[StreamsPredicate]{
				Left: AndPredicate[StreamsPredicate]{
					Left:  LabelMatcherPredicate{Name: "app", Value: "service"},
					Right: LabelMatcherPredicate{Name: "env", Value: "prod"},
				},
				Right: AndPredicate[StreamsPredicate]{
					Left: LabelFilterPredicate{Name: "region", Desc: `region=~"us-west"`},
					Right: TimeRangePredicate[StreamsPredicate]{
						StartTime:    startTime,
						EndTime:      endTime,
						IncludeStart: true,
						IncludeEnd:   false,
					},
				},
			},
			expected: `((Label(app=service) AND Label(env=prod)) OR (LabelFilter(region, description="region=~\"us-west\"") AND TimeRange[2023-01-01T00:00:00Z, 2023-01-02T00:00:00Z)))`,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			got := tc.pred.String()
			if got != tc.expected {
				t.Errorf("Expected: %s, Got: %s", tc.expected, got)
			}
		})
	}
}
Loading