Merged
4 changes: 2 additions & 2 deletions NOTICE.txt
@@ -12413,11 +12413,11 @@ SOFTWARE

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-libs
Version: v0.23.0
Version: v0.23.2-0.20250829071144-ae7d1fb67c70
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected].0/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/[email protected].2-0.20250829071144-ae7d1fb67c70/LICENSE:

Apache License
Version 2.0, January 2004
1 change: 1 addition & 0 deletions go.mod
@@ -520,6 +520,7 @@ replace (
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumption => github.com/elastic/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumption v1.1.0-elastic
github.com/apoydence/eachers => github.com/poy/eachers v0.0.0-20181020210610-23942921fe77 //indirect, see https://github.com/elastic/beats/pull/29780 for details.
github.com/dop251/goja => github.com/elastic/goja v0.0.0-20190128172624-dd2ac4456e20
github.com/elastic/elastic-agent-libs => github.com/elastic/elastic-agent-libs v0.23.2-0.20250829071144-ae7d1fb67c70
github.com/fsnotify/fsevents => github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270
github.com/fsnotify/fsnotify => github.com/elastic/fsnotify v1.6.1-0.20240920222514-49f82bdbc9e3
github.com/google/gopacket => github.com/elastic/gopacket v1.1.20-0.20241002174017-e8c5fda595e6
4 changes: 2 additions & 2 deletions go.sum
@@ -370,8 +370,8 @@ github.com/elastic/elastic-agent-autodiscover v0.10.0 h1:WJ4zl9uSfk1kHmn2B/0byQB
github.com/elastic/elastic-agent-autodiscover v0.10.0/go.mod h1:Nf3zh9FcJ9nTTswTwDTUAqXmvQllOrNliM6xmORSxwE=
github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE=
github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI=
github.com/elastic/elastic-agent-libs v0.23.0 h1:xpMKkrw59QUYWUx/q2TMpJU+6vwB3Mw1VPhRPcGhBMo=
github.com/elastic/elastic-agent-libs v0.23.0/go.mod h1:xSeIP3NtOIT4N2pPS4EyURmS1Q8mK0lWZ8Wd1Du6q3w=
github.com/elastic/elastic-agent-libs v0.23.2-0.20250829071144-ae7d1fb67c70 h1:WK/tKSCtoZq9DZCEqrecDXalCEwJjbaP0EfNA0Y85KQ=
github.com/elastic/elastic-agent-libs v0.23.2-0.20250829071144-ae7d1fb67c70/go.mod h1:xSeIP3NtOIT4N2pPS4EyURmS1Q8mK0lWZ8Wd1Du6q3w=
github.com/elastic/elastic-agent-system-metrics v0.13.2 h1:R4ogKHESuWhWTtopnw/aHnBxxSZbxd7KHV4GefdwT2M=
github.com/elastic/elastic-agent-system-metrics v0.13.2/go.mod h1:ezM1kzDUT+vWXFh5oK8QXB/AEB0UoLWqWA8rkRicFFo=
github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+PeIOod2rY3GVMGoVE=
3 changes: 2 additions & 1 deletion heartbeat/monitors/active/dialchain/tls.go
@@ -25,6 +25,7 @@ import (

"github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)
@@ -40,7 +41,7 @@ func TLSLayer(cfg *tlscommon.TLSConfig, to time.Duration) Layer {
// This gets us the timestamp for when the TLS layer will start the handshake.
next = startTimerAfterDial(&timer, next)

dialer := transport.TLSDialer(next, cfg, to)
dialer := transport.TLSDialer(next, cfg, to, logp.NewLogger(""))
return afterDial(dialer, func(conn net.Conn) (net.Conn, error) {
tlsConn, ok := conn.(*cryptoTLS.Conn)
if !ok {
10 changes: 7 additions & 3 deletions libbeat/autodiscover/providers/kubernetes/config.go
@@ -113,9 +113,13 @@ func (c *Config) Validate() error {
if c.Scope != "node" && c.Scope != "cluster" {
return fmt.Errorf("invalid `scope` configured. supported values are `node` and `cluster`")
}
if c.Unique && c.Scope != "cluster" {
logp.L().Warnf("can only set `unique` when scope is `cluster`")
}

return nil
}

// checkUnsupportedParams checks if unsupported/deprecated/discouraged parameters are set and logs a warning
func (c Config) checkUnsupportedParams(logger *logp.Logger) {
if c.Unique && c.Scope != "cluster" {
logger.Warn("can only set `unique` when scope is `cluster`")
}
}
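The hunk above splits config handling into a pure Validate() and a separate checkUnsupportedParams() that warns through an injected *logp.Logger instead of the global logp.L(). A minimal, self-contained sketch of that pattern follows; the Config type is a stand-in carrying only the two fields this hunk touches, and the main() wiring is illustrative rather than taken from Beats.

package main

import (
	"fmt"

	"github.com/elastic/elastic-agent-libs/logp"
)

// Config is a stand-in with only the two fields the hunk above touches.
type Config struct {
	Scope  string
	Unique bool
}

// Validate stays a pure check: it returns an error and never logs.
func (c *Config) Validate() error {
	if c.Scope != "node" && c.Scope != "cluster" {
		return fmt.Errorf("invalid `scope` configured. supported values are `node` and `cluster`")
	}
	return nil
}

// checkUnsupportedParams warns through an injected logger instead of the
// global logp.L(), matching the change in this file.
func (c Config) checkUnsupportedParams(logger *logp.Logger) {
	if c.Unique && c.Scope != "cluster" {
		logger.Warn("can only set `unique` when scope is `cluster`")
	}
}

func main() {
	cfg := Config{Scope: "node", Unique: true}
	if err := cfg.Validate(); err != nil {
		panic(err)
	}
	// The caller chooses where warnings go; a no-op logger keeps this example quiet.
	cfg.checkUnsupportedParams(logp.NewNopLogger())
}

AutodiscoverBuilder and the node, pod, and service eventers in the following files call config.checkUnsupportedParams with the logger they already receive, so the warning no longer goes through the global logger.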
3 changes: 3 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
@@ -110,6 +110,9 @@ func AutodiscoverBuilder(
return nil, errWrap(err)
}

// log warning about any unsupported params
config.checkUnsupportedParams(logger)

client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
return nil, errWrap(err)
3 changes: 3 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/node.go
@@ -63,6 +63,9 @@
return nil, err
}

// log warning about any unsupported params
config.checkUnsupportedParams(logger)

// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
if config.Scope == "node" {
@@ -158,7 +161,7 @@
// The builder base config can configure any of the field values of kubernetes if need be.
e["kubernetes"] = kubeMeta
if rawAnn, ok := kubeMeta["annotations"]; ok {
annotations = rawAnn.(mapstr.M)

[lint annotation] errcheck: Error return value is not checked (line 164, libbeat/autodiscover/providers/kubernetes/node.go)
}
}
}
3 changes: 3 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/pod.go
@@ -75,6 +75,9 @@
return nil, err
}

// log warning about any unsupported params
config.checkUnsupportedParams(logger)

// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
if config.Scope == "node" {
@@ -212,7 +215,7 @@
defer p.crossUpdate.RUnlock()

p.logger.Debugf("Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")

[lint annotation] errcheck: Error return value is not checked (line 218, libbeat/autodiscover/providers/kubernetes/pod.go)
}

// OnUpdate handles events for pods that have been updated.
@@ -225,7 +228,7 @@

func (p *pod) unlockedUpdate(obj interface{}) {
p.logger.Debugf("Watcher Pod update: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "stop")

[lint annotation] errcheck: Error return value is not checked (line 231, libbeat/autodiscover/providers/kubernetes/pod.go)
p.emit(obj.(*kubernetes.Pod), "start")
}

3 changes: 3 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/service.go
@@ -63,6 +63,9 @@ func NewServiceEventer(
return nil, err
}

// log warning about any unsupported params
config.checkUnsupportedParams(logger)

watcher, err := kubernetes.NewNamedWatcher("service", client, &kubernetes.Service{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
4 changes: 2 additions & 2 deletions libbeat/common/transport/transptest/testing.go
@@ -171,7 +171,7 @@
Proxy: proxy,
Timeout: timeout,
}
return transport.NewClient(cfg, "tcp", addr, 0)
return transport.NewClient(cfg, "tcp", addr, 0, logp.NewNopLogger())
}
}

@@ -189,7 +189,7 @@
TLS: tlsConfig,
Timeout: timeout,
}
return transport.NewClient(cfg, "tcp", addr, 0)
return transport.NewClient(cfg, "tcp", addr, 0, logp.NewNopLogger())
}
}

@@ -268,7 +268,7 @@
if err != nil {
t.Fatalf("failed to open key file for writing: %v", err)
}
pem.Encode(keyOut, pemBlock)

[lint annotation] errcheck: Error return value of `pem.Encode` is not checked (line 271, libbeat/common/transport/transptest/testing.go)
keyOut.Close()

//Decrypt pem block to add it later to the certificate
@@ -303,7 +303,7 @@
if err != nil {
t.Fatalf("failed to open cert.pem for writing: %s", err)
}
pem.Encode(certOut, &pem.Block{Type: "CERTIFICATE", Bytes: caBytes})

[lint annotation] errcheck: Error return value of `pem.Encode` is not checked (line 306, libbeat/common/transport/transptest/testing.go)
certOut.Close()

return nil
2 changes: 1 addition & 1 deletion libbeat/esleg/eslegclient/connection.go
@@ -354,7 +354,7 @@ func (conn *Connection) Test(d testing.Driver) {
}

netDialer := transport.NetDialer(conn.Transport.Timeout)
tlsDialer := transport.TestTLSDialer(d, netDialer, tls, conn.Transport.Timeout)
tlsDialer := transport.TestTLSDialer(d, netDialer, tls, conn.Transport.Timeout, conn.log)
_, err = tlsDialer.Dial("tcp", address)
d.Fatal("dial up", err)
})
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash.go
@@ -79,7 +79,7 @@ func MakeLogstashClients(
for i, host := range hosts {
var client outputs.NetworkClient

conn, err := transport.NewClient(transp, "tcp", host, defaultPort)
conn, err := transport.NewClient(transp, "tcp", host, defaultPort, logger)
if err != nil {
return outputs.Fail(err)
}
2 changes: 1 addition & 1 deletion libbeat/outputs/redis/redis.go
@@ -145,7 +145,7 @@ func makeRedis(
}
}

conn, err := transport.NewClient(transp, "tcp", hostUrl.Host, defaultPort)
conn, err := transport.NewClient(transp, "tcp", hostUrl.Host, defaultPort, beat.Logger)
if err != nil {
return outputs.Fail(err)
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/s3_objects.go
@@ -165,7 +165,7 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
}

// try to create a dec from the using the codec config
dec, err := decoder.NewDecoder(p.readerConfig.Decoding, streamReader)
dec, err := decoder.NewDecoder(p.readerConfig.Decoding, streamReader, log)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/azureblobstorage/job.go
@@ -189,7 +189,7 @@ func (j *job) decode(ctx context.Context, r io.Reader, id string) error {
if err != nil {
return fmt.Errorf("failed to add gzip decoder to blob: %s, with error: %w", *j.blob.Name, err)
}
dec, err := decoder.NewDecoder(j.src.ReaderConfig.Decoding, r)
dec, err := decoder.NewDecoder(j.src.ReaderConfig.Decoding, r, j.log)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/cel/config.go
@@ -107,7 +107,7 @@ func (c config) Validate() error {
patterns = map[string]*regexp.Regexp{".": nil}
}
wantDump := c.FailureDump.enabled() && c.FailureDump.Filename != ""
_, _, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, lib.HTTPOptions{}, patterns, c.XSDs, logp.L().Named("input.cel"), nil, wantDump, false)
_, _, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, lib.HTTPOptions{}, patterns, c.XSDs, logp.NewNopLogger(), nil, wantDump, false)
if err != nil {
return fmt.Errorf("failed to check program: %w", err)
}
1 change: 0 additions & 1 deletion x-pack/filebeat/input/cel/input_test.go
@@ -615,9 +615,9 @@
</item>
</order>
`
io.ReadAll(r.Body)

[lint annotation] errcheck: Error return value of `io.ReadAll` is not checked (line 618, x-pack/filebeat/input/cel/input_test.go)
r.Body.Close()
w.Write([]byte(text))

[lint annotation] errcheck: Error return value of `w.Write` is not checked (line 620, x-pack/filebeat/input/cel/input_test.go)
})
server := httptest.NewServer(r)
config["resource.url"] = server.URL
@@ -749,7 +749,7 @@
msg = fmt.Sprintf(`{"error":"expected method was %#q"}`, http.MethodGet)
}

w.Write([]byte(msg))

[lint annotation] errcheck: Error return value of `w.Write` is not checked (line 752, x-pack/filebeat/input/cel/input_test.go)
},
want: []map[string]interface{}{
{
@@ -797,7 +797,7 @@
msg = fmt.Sprintf(`{"error":"expected method was %#q"}`, http.MethodGet)
}

w.Write([]byte(msg))

[lint annotation] errcheck: Error return value of `w.Write` is not checked (line 800, x-pack/filebeat/input/cel/input_test.go)
},
want: []map[string]interface{}{
{
@@ -2173,7 +2173,6 @@
t.Fatalf("failed to remove failure_dumps directory: %v", err)
}

logp.TestingSetup()
for _, test := range inputTests {
t.Run(test.name, func(t *testing.T) {
if reason, skip := skipOnWindows[test.name]; runtime.GOOS == "windows" && skip {
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/gcs/job.go
@@ -187,7 +187,7 @@ func (j *job) decode(ctx context.Context, r io.Reader, id string) error {
if err != nil {
return fmt.Errorf("failed to add gzip decoder to object: %s, with error: %w", j.object.Name, err)
}
dec, err := decoder.NewDecoder(j.src.ReaderConfig.Decoding, r)
dec, err := decoder.NewDecoder(j.src.ReaderConfig.Decoding, r, j.log)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/streaming/config.go
@@ -164,7 +164,7 @@ func (c config) Validate() error {
patterns = map[string]*regexp.Regexp{".": nil}
}
if c.Program != "" {
_, _, err = newProgram(context.Background(), c.Program, root, patterns, logp.L().Named("input.websocket"))
_, _, err = newProgram(context.Background(), c.Program, root, patterns, logp.NewNopLogger())
if err != nil {
return fmt.Errorf("failed to check program: %w", err)
}
6 changes: 4 additions & 2 deletions x-pack/libbeat/reader/decoder/decoder.go
@@ -7,6 +7,8 @@ package decoder
import (
"fmt"
"io"

"github.com/elastic/elastic-agent-libs/logp"
)

// decoder is an interface for decoding data from an io.Reader.
@@ -35,7 +37,7 @@ type ValueDecoder interface {
// newDecoder creates a new decoder based on the codec type.
// It returns a decoder type and an error if the codec type is not supported.
// If the reader config codec option is not set, it returns a nil decoder and nil error.
func NewDecoder(cfg Config, r io.Reader) (Decoder, error) {
func NewDecoder(cfg Config, r io.Reader, logger *logp.Logger) (Decoder, error) {
codec := cfg.Codec

if cfg.Codec == nil {
@@ -51,7 +53,7 @@ func NewDecoder(cfg Config, r io.Reader) (Decoder, error) {
result, _ = NewCSVDecoder(*csv, r)
case cfg.Codec.Parquet != nil:
pqt := codec.Parquet
result, _ = NewParquetDecoder(*pqt, r)
result, _ = NewParquetDecoder(*pqt, r, logger)
default:
return nil, fmt.Errorf("unsupported config value: %v", cfg)
}
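NewDecoder now takes the caller's *logp.Logger and forwards it to the parquet decoder. Below is a hedged sketch of a call site; only the NewDecoder signature comes from this diff, while the helper readWithDecoder and its error wrapping are illustrative.

package example

import (
	"fmt"
	"io"

	"github.com/elastic/beats/v7/x-pack/libbeat/reader/decoder"
	"github.com/elastic/elastic-agent-libs/logp"
)

// readWithDecoder is a hypothetical call site: the input's own logger is
// passed through so the decoder no longer depends on the global logger.
func readWithDecoder(cfg decoder.Config, r io.Reader, logger *logp.Logger) (decoder.Decoder, error) {
	dec, err := decoder.NewDecoder(cfg, r, logger)
	if err != nil {
		return nil, fmt.Errorf("failed to create decoder: %w", err)
	}
	return dec, nil
}

The awss3, azureblobstorage, and gcs inputs updated earlier in this diff follow the same shape, passing their per-input logger into decoder.NewDecoder.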
5 changes: 3 additions & 2 deletions x-pack/libbeat/reader/decoder/parquet.go
@@ -9,6 +9,7 @@ import (
"io"

"github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet"
"github.com/elastic/elastic-agent-libs/logp"
)

// parquetDecoder is a decoder for parquet data.
@@ -18,11 +19,11 @@ type parquetDecoder struct {

// newParquetDecoder creates a new parquet decoder. It uses the libbeat parquet reader under the hood.
// It returns an error if the parquet reader cannot be created.
func NewParquetDecoder(config ParquetCodecConfig, r io.Reader) (Decoder, error) {
func NewParquetDecoder(config ParquetCodecConfig, r io.Reader, logger *logp.Logger) (Decoder, error) {
reader, err := parquet.NewBufferedReader(r, &parquet.Config{
ProcessParallel: config.ProcessParallel,
BatchSize: config.BatchSize,
})
}, logger)
if err != nil {
return nil, fmt.Errorf("failed to create parquet decoder: %w", err)
}
10 changes: 5 additions & 5 deletions x-pack/libbeat/reader/parquet/parquet.go
@@ -30,8 +30,8 @@ type BufferedReader struct {
// It will return an error if the parquet data stream cannot be read.
// Note: As io.ReadAll is used, the entire data stream would be read into memory, so very large data streams
// may cause memory bottleneck issues.
func NewBufferedReader(r io.Reader, cfg *Config) (*BufferedReader, error) {
log := logp.L().Named("reader.parquet")
func NewBufferedReader(r io.Reader, cfg *Config, logger *logp.Logger) (*BufferedReader, error) {
log := logger.Named("reader.parquet")

if cfg.BatchSize == 0 {
cfg.BatchSize = 1
@@ -52,7 +52,7 @@ func NewBufferedReader(r io.Reader, cfg *Config) (*BufferedReader, error) {
if err != nil {
return nil, fmt.Errorf("failed to create parquet reader: %w", err)
}
log.Debugw("created parquet reader")
log.Debug("created parquet reader")

// constructs a reader for converting to Arrow objects from an existing parquet file reader object
reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{
@@ -62,14 +62,14 @@
if err != nil {
return nil, fmt.Errorf("failed to create pqarrow parquet reader: %w", err)
}
log.Debugw("created pqarrow parquet reader")
log.Debug("created pqarrow parquet reader")

// constructs a record reader that is capable of reading entire sets of arrow records
rr, err := reader.GetRecordReader(context.Background(), nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to create parquet record reader: %w", err)
}
log.Debugw("initialization process completed")
log.Debug("initialization process completed")

return &BufferedReader{
cfg: cfg,
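NewBufferedReader no longer reaches for the global logp.L(); the logger is a constructor argument. The sketch below shows only that construction path, assuming the caller already holds a logger; the helper name and shape are illustrative, while the signature and the Config field names (ProcessParallel, BatchSize) come from the hunks above.

package example

import (
	"io"

	"github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet"
	"github.com/elastic/elastic-agent-libs/logp"
)

// newParquetReader is a hypothetical helper that wires an injected logger
// into the parquet reader; record iteration uses the reader's existing API,
// which this change does not touch.
func newParquetReader(r io.Reader, logger *logp.Logger) (*parquet.BufferedReader, error) {
	cfg := &parquet.Config{
		ProcessParallel: true, // read columns in parallel
		BatchSize:       1,    // rows per batch
	}
	return parquet.NewBufferedReader(r, cfg, logger)
}

The benchmark and test updates in the next two files pass logp.NewNopLogger() or logptest.NewTestingLogger(t, "") in the same position.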
8 changes: 5 additions & 3 deletions x-pack/libbeat/reader/parquet/parquet_benchmark_test.go
@@ -9,6 +9,8 @@ import (
"path/filepath"
"sync"
"testing"

"github.com/elastic/elastic-agent-libs/logp"
)

// parquetFile is a struct that contains the name of the parquet
@@ -241,7 +243,7 @@ func BenchmarkReadParquet(b *testing.B) {

// readParquetFile reads entire parquet file
func readParquetFile(b *testing.B, cfg *Config, file *os.File) {
sReader, err := NewBufferedReader(file, cfg)
sReader, err := NewBufferedReader(file, cfg, logp.NewNopLogger())
if err != nil {
b.Fatalf("failed to init stream reader: %v", err)
}
@@ -260,7 +262,7 @@ func readParquetFile(b *testing.B, cfg *Config, file *os.File) {

// readParquetSingleRow reads only the first row of parquet files
func readParquetSingleRow(b *testing.B, cfg *Config, file *os.File) {
sReader, err := NewBufferedReader(file, cfg)
sReader, err := NewBufferedReader(file, cfg, logp.NewNopLogger())
if err != nil {
b.Fatalf("failed to init stream reader: %v", err)
}
@@ -279,7 +281,7 @@ func readParquetSingleRow(b *testing.B, cfg *Config, file *os.File) {

// constructBufferedReader constructs a stream reader for reading parquet files
func constructBufferedReader(b *testing.B, cfg *Config, file *os.File) {
sReader, err := NewBufferedReader(file, cfg)
sReader, err := NewBufferedReader(file, cfg, logp.NewNopLogger())
if err != nil {
b.Fatalf("failed to init stream reader: %v", err)
}
5 changes: 3 additions & 2 deletions x-pack/libbeat/reader/parquet/parquet_test.go
@@ -9,7 +9,7 @@
"encoding/json"
"fmt"
"io"
"math/rand"

[lint annotation] depguard: import 'math/rand' is not allowed from list 'main': superseded by math/rand/v2 (line 12, x-pack/libbeat/reader/parquet/parquet_test.go)
"os"
"path/filepath"
"testing"
@@ -21,6 +21,7 @@
"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"
)

// all test files are read from/stored within the "testdata" directory
@@ -85,7 +86,7 @@

// readAndValidateParquetFile reads the parquet file and validates the data
func readAndValidateParquetFile(t *testing.T, cfg *Config, file *os.File, data map[string]bool) int {
sReader, err := NewBufferedReader(file, cfg)
sReader, err := NewBufferedReader(file, cfg, logptest.NewTestingLogger(t, ""))
if err != nil {
t.Fatalf("failed to init stream reader: %v", err)
}
@@ -235,7 +236,7 @@

// readAndCompareParquetFile reads the parquet file and compares the data with the input data
func readAndCompareParquetFile(t *testing.T, cfg *Config, file *os.File, data map[int]string, rows int, maxRowsToCompare int) {
sReader, err := NewBufferedReader(file, cfg)
sReader, err := NewBufferedReader(file, cfg, logp.NewNopLogger())
if err != nil {
t.Fatalf("failed to init stream reader: %v", err)
}