Skip to content

Commit 7a4a514

Browse files
committed
fixup! label filter predicate pushdown
1 parent fd42110 commit 7a4a514

File tree

3 files changed

+152
-143
lines changed

3 files changed

+152
-143
lines changed

pkg/dataobj/logs_reader.go

+17-10
Original file line numberDiff line numberDiff line change
@@ -138,29 +138,33 @@ func (r *LogsReader) Read(ctx context.Context, s []Record) (int, error) {
138138
return n, nil
139139
}
140140

141+
// Columns returns the column descriptions for the logs section.
141142
func (r *LogsReader) Columns(ctx context.Context) ([]*logsmd.ColumnDesc, error) {
142143
if r.columnDesc != nil {
143144
return r.columnDesc, nil
144145
}
145146

146-
dec := r.obj.dec.LogsDecoder()
147147
si, err := r.findSection(ctx)
148148
if err != nil {
149149
return nil, fmt.Errorf("finding section: %w", err)
150150
}
151-
r.sectionInfo = si
152151

153-
columnDescs, err := dec.Columns(ctx, r.sectionInfo)
152+
columnDesc, err := r.obj.dec.LogsDecoder().Columns(ctx, si)
154153
if err != nil {
155154
return nil, fmt.Errorf("reading columns: %w", err)
156155
}
157156

158-
r.columnDesc = columnDescs
157+
r.sectionInfo = si
158+
r.columnDesc = columnDesc
159+
159160
return r.columnDesc, nil
160161
}
161162

162163
func (r *LogsReader) initReader(ctx context.Context) error {
163164
dec := r.obj.dec.LogsDecoder()
165+
166+
// sectionInfo and columnDesc could be populated by a previous call to Columns method.
167+
// avoid re-reading them.
164168
if r.sectionInfo == nil {
165169
si, err := r.findSection(ctx)
166170
if err != nil {
@@ -170,9 +174,13 @@ func (r *LogsReader) initReader(ctx context.Context) error {
170174
r.sectionInfo = si
171175
}
172176

173-
columnDescs, err := dec.Columns(ctx, r.sectionInfo)
174-
if err != nil {
175-
return fmt.Errorf("reading columns: %w", err)
177+
if r.columnDesc == nil {
178+
columnDescs, err := dec.Columns(ctx, r.sectionInfo)
179+
if err != nil {
180+
return fmt.Errorf("reading columns: %w", err)
181+
}
182+
183+
r.columnDesc = columnDescs
176184
}
177185

178186
dset := encoding.LogsDataset(dec, r.sectionInfo)
@@ -183,11 +191,11 @@ func (r *LogsReader) initReader(ctx context.Context) error {
183191

184192
// r.predicate doesn't contain mappings of stream IDs; we need to build
185193
// that as a separate predicate and AND them together.
186-
predicate := streamIDPredicate(maps.Keys(r.matchIDs), columns, columnDescs)
194+
predicate := streamIDPredicate(maps.Keys(r.matchIDs), columns, r.columnDesc)
187195
if r.predicate != nil {
188196
predicate = dataset.AndPredicate{
189197
Left: predicate,
190-
Right: translateLogsPredicate(r.predicate, columns, columnDescs),
198+
Right: translateLogsPredicate(r.predicate, columns, r.columnDesc),
191199
}
192200
}
193201

@@ -205,7 +213,6 @@ func (r *LogsReader) initReader(ctx context.Context) error {
205213
r.reader.Reset(readerOpts)
206214
}
207215

208-
r.columnDesc = columnDescs
209216
r.columns = columns
210217
r.ready = true
211218
return nil

0 commit comments

Comments
 (0)