Skip to content

Commit bc3bfef

Browse files
committed
feat: add elastic search ingester module
Signed-off-by: Martin Chodur <[email protected]>
1 parent 38ce5db commit bc3bfef

File tree

14 files changed

+855
-6
lines changed

14 files changed

+855
-6
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

77
## Unreleased
8+
### Added
9+
- [#82](https://github.com/seznam/slo-exporter/pull/82) New module `elasticSerachIngester`, for more info see [the docs](./docs/modules/elasticsearch_ingester.md)
810

911
## [v6.11.0] 2022-01-19
1012
### Added

cmd/slo_exporter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"github.com/seznam/slo-exporter/pkg/elasticsearch_ingester"
67
"runtime"
78

89
"github.com/gorilla/mux"
@@ -71,6 +72,8 @@ func moduleFactory(moduleName string, logger logrus.FieldLogger, conf *viper.Vip
7172
return prometheus_ingester.NewFromViper(conf, logger)
7273
case "kafkaIngester":
7374
return kafka_ingester.NewFromViper(conf, logger)
75+
case "elasticSearchIngester":
76+
return elasticsearch_ingester.NewFromViper(conf, logger)
7477
case "envoyAccessLogServer":
7578
return envoy_access_log_server.NewFromViper(conf, logger)
7679
case "eventMetadataRenamer":

docs/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ Only produces new events from the specified data source.
5858
- [`prometheusIngester`](modules/prometheus_ingester.md)
5959
- [`envoyAccessLogServer`](modules/envoy_access_log_server.md)
6060
- [`kafkaIngester`](modules/kafka_ingester.md)
61+
- [`elasticSearchIngester`](modules/elasticsearch_ingester.md)
6162
6263
##### Processors:
6364
Reads input events, does some processing based in the module type and produces modified event.
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# Elasticsearch ingester
2+
3+
| | |
4+
|----------------|-------------------------|
5+
| `moduleName` | `elasticSearchIngester` |
6+
| Module type | `producer` |
7+
| Output event | `raw` |
8+
9+
This module allows you to real time follow all new documents using Elasticsearch query and compute SLO based on those.
10+
11+
Most common use case would be, if running in Kubernetes for example and already collecting logs using the ELK stack. You
12+
can simply hook to those data and compute SLO based on those application logs.
13+
14+
### Elastic search versions and support
15+
16+
Currently, only v7 is supported.
17+
18+
### How does it work
19+
20+
The module periodically(interval is configurable) queries(you can pass in custom Lucene query)
21+
the Elasticsearch index(needs to be specified) and for every hit creates a new event from the document. All the
22+
documents needs to have a field with a timestamp(field name and format configurable), so the module can sort them and
23+
store the last queried document timestamp. In next iteration it will use this timestamp as lower limit for the range
24+
query, so it does not miss any entries. Each query is limited by maximum batch size(configurable) co the requests are
25+
not huge.
26+
27+
In case you do not use structured logging and your logs are not indexed, you can specify name of the field with the raw
28+
log entry and regular expression with named groups which, if matched, will be propagated to the event metadata.
29+
30+
### moduleConfig
31+
32+
```yaml
33+
# OPTIONAL Debug logging
34+
debug: false
35+
# REQUIRED Version of the Elasticsearch API to be used, possible values: 7
36+
apiVersion: "v7"
37+
# REQUIRED List of addresses pointing to the Elasticsearch API endpoint to query
38+
addresses:
39+
- "https://foo.bar:4433"
40+
# OPTIONAL Skip verification of the server certificate
41+
insecureSkipVerify: false
42+
# OPTIONAL Timeout for the Elasticsearch API call
43+
timeout: "5s"
44+
# Enable/disable sniffing, autodiscovery of other nodes in Elasticsearch cluster
45+
sniffing: true
46+
# Enable/disable healtchecking of the Elasticsearch nodes
47+
healthchecks: true
48+
49+
# OPTIONAL username to use for authentication
50+
username: "foo"
51+
# OPTIONAL password to use for authentication
52+
password: "bar"
53+
# OPTIONAL Client certificate to be used for authentication
54+
clientCertFile: "./client.pem"
55+
# OPTIONAL Client certificate key to be used for authentication
56+
clientKeyFile: "./client-key.pem"
57+
# OPTIONAL Custom CA certificate to verify the server certificate
58+
clientCaCertFile: "./ca.cert"
59+
60+
# OPTIONAL Interval how often to check for new documents from the Elasticsearch API.
61+
# If the module was falling behind fo the amount of documents in the Elaseticsearch, it will
62+
# query it more often.
63+
interval: 5s
64+
# REQUIRED Name of the index to be queried
65+
index: "my-index"
66+
# OPTIONAL Additional Lucene query to filter the results
67+
query: "app_name: nginx AND namespace: test"
68+
# OPTIONAL Maximum number of documents to be read in one batch
69+
maxBatchSize: 100
70+
71+
# REQUIRED Document filed name containing a timestamp of the event
72+
timestampField: "@timestamp"
73+
# REQUIRED Golang time parse format to parse the timestamp from the timestampField
74+
timestampFormat: "2006-01-02T15:04:05Z07:00" # See # https://www.geeksforgeeks.org/time-formatting-in-golang/ for common examples
75+
# OPTIONAL Name of the field in document containing the raw log message you want to parse
76+
rawLogField: "log"
77+
# OPTIONAL Regular expression to be used to parse the raw log message,
78+
# each matched named group will be propagated to the new event metadata
79+
rawLogParseRegexp: '^(?P<ip>[A-Fa-f0-9.:]{4,50}) \S+ \S+ \[(?P<time>.*?)\] "(?P<httpMethod>[^\s]+)?\s+(?P<httpPath>[^\?\s]+).*'
80+
# OPTIONAL If content of the named group match this regexp, it will be considered as an empty value.
81+
rawLogEmptyGroupRegexp: '^-$'
82+
```

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/hpcloud/tail v1.0.1-0.20180514194441-a1dbeea552b7
1616
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
1717
github.com/klauspost/compress v1.14.1 // indirect
18+
github.com/olivere/elastic/v7 v7.0.31
1819
github.com/prometheus/client_golang v1.11.0
1920
github.com/prometheus/client_model v0.2.0
2021
github.com/prometheus/common v0.31.1

go.sum

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2z
263263
github.com/aws/aws-sdk-go v1.40.11/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
264264
github.com/aws/aws-sdk-go v1.40.37/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
265265
github.com/aws/aws-sdk-go v1.40.45/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
266+
github.com/aws/aws-sdk-go v1.42.23/go.mod h1:gyRszuZ/icHmHAVE4gc/r+cfCmhA1AD+vqfWbgI+eHs=
266267
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
267268
github.com/aws/aws-sdk-go-v2 v1.9.1/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
268269
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.8.1/go.mod h1:CM+19rL1+4dFWnOQKwDc7H1KwXTz+h61oUSHyhV0b3o=
@@ -608,6 +609,7 @@ github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
608609
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ=
609610
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
610611
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
612+
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
611613
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
612614
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15/go.mod h1:tPg4cp4nseejPd+UKxtCVQ2hUxNTZ7qQZJa7CLriIeo=
613615
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
@@ -1179,6 +1181,7 @@ github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqx
11791181
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
11801182
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
11811183
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible/go.mod h1:hDZb8oMj3Kp8MxtbNLg9vrtAUDHjgI1yZvqivT4O8Iw=
1184+
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
11821185
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
11831186
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
11841187
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
@@ -1285,6 +1288,8 @@ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN
12851288
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
12861289
github.com/mailru/easyjson v0.7.1/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
12871290
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
1291+
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
1292+
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
12881293
github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
12891294
github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
12901295
github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
@@ -1433,6 +1438,8 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
14331438
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
14341439
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
14351440
github.com/olekukonko/tablewriter v0.0.2/go.mod h1:rSAaSIOAGT9odnlyGlUfAJaoc5w2fSBUmeGDbRWPxyQ=
1441+
github.com/olivere/elastic/v7 v7.0.31 h1:VJu9/zIsbeiulwlRCfGQf6Tzsr++uo+FeUgj5oj+xKk=
1442+
github.com/olivere/elastic/v7 v7.0.31/go.mod h1:idEQxe7Es+Wr4XAuNnJdKeMZufkA9vQprOIFck061vg=
14361443
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
14371444
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
14381445
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -1715,11 +1722,14 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
17151722
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
17161723
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
17171724
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
1718-
github.com/smartystreets/assertions v1.0.1 h1:voD4ITNjPL5jjBfgR/r8fPIIBrliWrWHeiJApdr3r4w=
17191725
github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
1726+
github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfItdAd1r3cck=
1727+
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
1728+
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
17201729
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
17211730
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
17221731
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
1732+
github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak=
17231733
github.com/snowflakedb/gosnowflake v1.3.4/go.mod h1:NsRq2QeiMUuoNUJhp5Q6xGC4uBrsS9g6LwZVEkTWgsE=
17241734
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
17251735
github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
@@ -2144,6 +2154,7 @@ golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qx
21442154
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
21452155
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
21462156
golang.org/x/net v0.0.0-20211101193420-4a448f8816b3/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
2157+
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
21472158
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d h1:1n1fc535VhN8SYtD4cDUyNlfpAF2ROMM9+11equK3hs=
21482159
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
21492160
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package elasticsearch_client
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/prometheus/client_golang/prometheus"
8+
"github.com/sirupsen/logrus"
9+
"time"
10+
)
11+
12+
var (
13+
ElasticApiCall = prometheus.NewHistogramVec(prometheus.HistogramOpts{
14+
Name: "elasticsearch_request_seconds",
15+
Help: "Duration histogram of elasticsearch api calls",
16+
Buckets: prometheus.ExponentialBuckets(0.1, 2, 5),
17+
}, []string{"api_version", "endpoint", "error"})
18+
)
19+
20+
type Config struct {
21+
Addresses []string
22+
Username string
23+
Password string
24+
Timeout time.Duration
25+
Healtchecks bool
26+
Sniffing bool
27+
InsecureSkipVerify bool
28+
ClientCertFile string
29+
ClientKeyFile string
30+
CaCertFile string
31+
Debug bool
32+
}
33+
34+
type Client interface {
35+
RangeSearch(ctx context.Context, index, timestampField string, since time.Time, size int, query string, timeout time.Duration) ([]json.RawMessage, int, error)
36+
}
37+
38+
var clientFactory = map[string]func(config Config, logger logrus.FieldLogger) (Client, error){
39+
"v7": NewV7Client,
40+
}
41+
42+
func NewClient(version string, config Config, logger logrus.FieldLogger) (Client, error) {
43+
factoryFn, ok := clientFactory[version]
44+
if !ok {
45+
var supportedValues []string
46+
for k, _ := range clientFactory {
47+
supportedValues = append(supportedValues, k)
48+
}
49+
return nil, fmt.Errorf("unsupported Elasticsearch API version %s, only supported values are: %s", version, supportedValues)
50+
}
51+
return factoryFn(config, logger)
52+
}

pkg/elasticsearch_client/mock.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package elasticsearch_client
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"time"
7+
)
8+
9+
func NewClientMock(data []json.RawMessage, documentsLeft int, err error) Client {
10+
return &clientMock{
11+
data: data,
12+
documentsLeft: documentsLeft,
13+
error: err,
14+
}
15+
}
16+
17+
type clientMock struct {
18+
data []json.RawMessage
19+
documentsLeft int
20+
error error
21+
}
22+
23+
func (c *clientMock) RangeSearch(ctx context.Context, index, timestampField string, since time.Time, size int, query string, timeout time.Duration) ([]json.RawMessage, int, error) {
24+
return c.data, c.documentsLeft, c.error
25+
}

pkg/elasticsearch_client/v7.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package elasticsearch_client
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"crypto/x509"
7+
"encoding/json"
8+
"fmt"
9+
elasticV7 "github.com/olivere/elastic/v7"
10+
"github.com/sirupsen/logrus"
11+
"io/ioutil"
12+
"net"
13+
"net/http"
14+
"time"
15+
)
16+
17+
func NewV7Client(config Config, logger logrus.FieldLogger) (Client, error) {
18+
var clientCertFn func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
19+
if config.ClientKeyFile != "" && config.ClientCertFile != "" {
20+
clientCertFn = func(_ *tls.CertificateRequestInfo) (*tls.Certificate, error) {
21+
cert, err := tls.LoadX509KeyPair(config.ClientCertFile, config.ClientKeyFile)
22+
if err != nil {
23+
return nil, fmt.Errorf("failed to read client certs %s, %s: %w", config.ClientCertFile, config.ClientKeyFile, err)
24+
}
25+
return &cert, nil
26+
}
27+
}
28+
29+
var clientCaCertPool *x509.CertPool
30+
if config.CaCertFile != "" {
31+
cert, err := ioutil.ReadFile(config.CaCertFile)
32+
if err != nil {
33+
return nil, fmt.Errorf("failed to read clientCaCertFile %s: %w", config.CaCertFile, err)
34+
}
35+
clientCaCertPool = x509.NewCertPool()
36+
clientCaCertPool.AppendCertsFromPEM(cert)
37+
}
38+
httpClient := http.Client{
39+
Transport: &http.Transport{
40+
ResponseHeaderTimeout: config.Timeout,
41+
DialContext: (&net.Dialer{Timeout: config.Timeout}).DialContext,
42+
TLSClientConfig: &tls.Config{
43+
InsecureSkipVerify: config.InsecureSkipVerify,
44+
GetClientCertificate: clientCertFn,
45+
ClientCAs: clientCaCertPool,
46+
},
47+
},
48+
Timeout: config.Timeout,
49+
}
50+
opts := []elasticV7.ClientOptionFunc{
51+
elasticV7.SetHttpClient(&httpClient),
52+
elasticV7.SetErrorLog(logger),
53+
elasticV7.SetURL(config.Addresses...),
54+
elasticV7.SetScheme("https"),
55+
elasticV7.SetSniff(config.Sniffing),
56+
elasticV7.SetHealthcheck(config.Healtchecks),
57+
}
58+
if config.Debug {
59+
opts = append(opts, elasticV7.SetTraceLog(logger), elasticV7.SetInfoLog(logger))
60+
}
61+
if config.Username != "" || config.Password != "" {
62+
opts = append(opts, elasticV7.SetBasicAuth(config.Username, config.Password))
63+
}
64+
cli, err := elasticV7.NewClient(opts...)
65+
if err != nil {
66+
return nil, err
67+
}
68+
return &v7Client{client: cli, logger: logger}, nil
69+
}
70+
71+
type v7Client struct {
72+
logger logrus.FieldLogger
73+
client *elasticV7.Client
74+
}
75+
76+
func (v *v7Client) RangeSearch(ctx context.Context, index, timestampField string, since time.Time, size int, query string, timeout time.Duration) ([]json.RawMessage, int, error) {
77+
filters := []elasticV7.Query{
78+
elasticV7.NewRangeQuery(timestampField).From(since),
79+
}
80+
if query != "" {
81+
filters = append(filters, elasticV7.NewQueryStringQuery(query))
82+
}
83+
q := elasticV7.NewBoolQuery().Filter(filters...)
84+
start := time.Now()
85+
result, err := v.client.Search().Index(index).TimeoutInMillis(int(timeout.Milliseconds())).Size(size).Sort(timestampField, true).Query(q).Do(ctx)
86+
if err != nil {
87+
ElasticApiCall.WithLabelValues("v7", "rangeSearch", err.Error()).Observe(time.Since(start).Seconds())
88+
return nil, 0, err
89+
}
90+
ElasticApiCall.WithLabelValues("v7", "rangeSearch", "").Observe(time.Since(start).Seconds())
91+
v.logger.WithFields(logrus.Fields{"index": index, "hits": len(result.Hits.Hits), "duration_ms": result.TookInMillis, "query": query, "since": since}).Debug("elastic search range search call")
92+
msgs := make([]json.RawMessage, len(result.Hits.Hits))
93+
for i, h := range result.Hits.Hits {
94+
msgs[i] = h.Source
95+
}
96+
return msgs, int(result.TotalHits()) - len(result.Hits.Hits), err
97+
}

0 commit comments

Comments
 (0)