Skip to content

Commit b977bb1

Browse files
committed
feat: add elastic search ingester module
Signed-off-by: Martin Chodur <[email protected]>
1 parent 38ce5db commit b977bb1

File tree

11 files changed

+539
-6
lines changed

11 files changed

+539
-6
lines changed

cmd/slo_exporter.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"context"
55
"fmt"
6+
"github.com/seznam/slo-exporter/pkg/elasticsearch_ingester"
67
"runtime"
78

89
"github.com/gorilla/mux"
@@ -71,6 +72,8 @@ func moduleFactory(moduleName string, logger logrus.FieldLogger, conf *viper.Vip
7172
return prometheus_ingester.NewFromViper(conf, logger)
7273
case "kafkaIngester":
7374
return kafka_ingester.NewFromViper(conf, logger)
75+
case "elasticSearchIngester":
76+
return elasticsearch_ingester.NewFromViper(conf, logger)
7477
case "envoyAccessLogServer":
7578
return envoy_access_log_server.NewFromViper(conf, logger)
7679
case "eventMetadataRenamer":

docs/configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ Only produces new events from the specified data source.
5858
- [`prometheusIngester`](modules/prometheus_ingester.md)
5959
- [`envoyAccessLogServer`](modules/envoy_access_log_server.md)
6060
- [`kafkaIngester`](modules/kafka_ingester.md)
61+
- [`elasticSearchIngester`](modules/elasticsearch_ingester.md)
6162
6263
##### Processors:
6364
Reads input events, does some processing based on the module type and produces a modified event.
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Elasticsearch ingester
2+
3+
| | |
4+
|----------------|-------------------------|
5+
| `moduleName` | `elasticSearchIngester` |
6+
| Module type | `producer` |
7+
| Output event | `raw` |
8+
9+
This module allows you to read events as documents from Elasticsearch (assuming the ELK stack).
10+
11+
### Elastic search versions and support
12+
Currently, only v7 is supported.
13+
14+
### moduleConfig
15+
```yaml
16+
addresses:
17+
- "https://foo.bar:4433"
18+
index: "*:sklik-production-search"
19+
clientCertFile: "./client.pem"
20+
clientKeyFile: "./client-key.pem"
21+
clientCaCertFile: "./ca.cert"
22+
debug: true
23+
insecureSkipVerify: false
24+
maxBatchSize: 100
25+
interval: 5s
26+
timeout: 5s
27+
timestampField: "@timestamp"
28+
timestampFormat: "2006-01-02T15:04:05Z07:00" # See https://www.geeksforgeeks.org/time-formatting-in-golang/ for common examples
29+
query: "app_name: nginx AND namespace: test"
30+
rawLogField: "log"
31+
rawLogParseRegexp: '^(?P<ip>[A-Fa-f0-9.:]{4,50}) \S+ \S+ \[(?P<time>.*?)\] "(?P<httpMethod>[^\s]+)?\s+(?P<httpPath>[^\?\s]+)(?P<httpQuery>[^\s]+)?\s+(?P<protocolVersion>[^\s]+)\s*" (?P<statusCode>\d+) \d+ "(?P<referer>.*?)" uag="(?P<userAgent>[^"]+)" "[^"]+" ua="[^"]+" rt="(?P<requestDuration>\d+(\.\d+)??)".*? cc="(?P<contentClass>[^"]*)".*? ignore-slo="(?P<ignoreSloHeader>[^"]*)" slo-domain="(?P<sloDomain>[^"]*)" slo-app="(?P<sloApp>[^"]*)" slo-class="(?P<sloClass>[^"]*)" slo-endpoint="(?P<sloEndpoint>[^"]*)" slo-result="(?P<sloResult>[^"]*)"'
32+
rawLogEmptyGroupRegexp: '^-$'
33+
```

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/seznam/slo-exporter
33
go 1.16
44

55
require (
6+
github.com/elastic/go-elasticsearch/v6 v6.8.10
67
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021
78
github.com/go-kit/kit v0.12.0
89
github.com/go-test/deep v1.0.6
@@ -15,6 +16,9 @@ require (
1516
github.com/hpcloud/tail v1.0.1-0.20180514194441-a1dbeea552b7
1617
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334
1718
github.com/klauspost/compress v1.14.1 // indirect
19+
github.com/olivere/elastic v6.2.37+incompatible // indirect
20+
github.com/olivere/elastic/v6 v6.2.1
21+
github.com/olivere/elastic/v7 v7.0.31
1822
github.com/prometheus/client_golang v1.11.0
1923
github.com/prometheus/client_model v0.2.0
2024
github.com/prometheus/common v0.31.1

go.sum

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2z
263263
github.com/aws/aws-sdk-go v1.40.11/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
264264
github.com/aws/aws-sdk-go v1.40.37/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
265265
github.com/aws/aws-sdk-go v1.40.45/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
266+
github.com/aws/aws-sdk-go v1.42.23/go.mod h1:gyRszuZ/icHmHAVE4gc/r+cfCmhA1AD+vqfWbgI+eHs=
266267
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
267268
github.com/aws/aws-sdk-go-v2 v1.9.1/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4=
268269
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.8.1/go.mod h1:CM+19rL1+4dFWnOQKwDc7H1KwXTz+h61oUSHyhV0b3o=
@@ -568,6 +569,8 @@ github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7j
568569
github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
569570
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
570571
github.com/efficientgo/tools/extkingpin v0.0.0-20210609125236-d73259166f20/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q=
572+
github.com/elastic/go-elasticsearch/v6 v6.8.10 h1:2lN0gJ93gMBXvkhwih5xquldszpm8FlUwqG5sPzr6a8=
573+
github.com/elastic/go-elasticsearch/v6 v6.8.10/go.mod h1:UwaDJsD3rWLM5rKNFzv9hgox93HoX8utj1kxD9aFUcI=
571574
github.com/elastic/go-sysinfo v1.0.1/go.mod h1:O/D5m1VpYLwGjCYzEt63g3Z1uO3jXfwyzzjiW90t8cY=
572575
github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0=
573576
github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
@@ -608,6 +611,8 @@ github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
608611
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c/go.mod h1:WQX+afhrekY9rGK+WT4xvKSlzmia9gDoLYu4GGYGASQ=
609612
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
610613
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
614+
github.com/fortytw2/leaktest v1.2.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
615+
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
611616
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
612617
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15/go.mod h1:tPg4cp4nseejPd+UKxtCVQ2hUxNTZ7qQZJa7CLriIeo=
613618
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
@@ -1179,6 +1184,7 @@ github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqx
11791184
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
11801185
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
11811186
github.com/joncrlsn/dque v2.2.1-0.20200515025108-956d14155fa2+incompatible/go.mod h1:hDZb8oMj3Kp8MxtbNLg9vrtAUDHjgI1yZvqivT4O8Iw=
1187+
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
11821188
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
11831189
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0=
11841190
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
@@ -1285,6 +1291,8 @@ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN
12851291
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
12861292
github.com/mailru/easyjson v0.7.1/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
12871293
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
1294+
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
1295+
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
12881296
github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE=
12891297
github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0=
12901298
github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
@@ -1433,6 +1441,12 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
14331441
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
14341442
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
14351443
github.com/olekukonko/tablewriter v0.0.2/go.mod h1:rSAaSIOAGT9odnlyGlUfAJaoc5w2fSBUmeGDbRWPxyQ=
1444+
github.com/olivere/elastic v6.2.37+incompatible h1:UfSGJem5czY+x/LqxgeCBgjDn6St+z8OnsCuxwD3L0U=
1445+
github.com/olivere/elastic v6.2.37+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
1446+
github.com/olivere/elastic/v6 v6.2.1 h1:tZ2NZWoFCdFnuQg1q9JCyjN6YTczNF03tLj954ptqNc=
1447+
github.com/olivere/elastic/v6 v6.2.1/go.mod h1:OeCPPyGCIn9j7/1Dk+tGE7gsezYo9lsJIiHhZjT/qQ4=
1448+
github.com/olivere/elastic/v7 v7.0.31 h1:VJu9/zIsbeiulwlRCfGQf6Tzsr++uo+FeUgj5oj+xKk=
1449+
github.com/olivere/elastic/v7 v7.0.31/go.mod h1:idEQxe7Es+Wr4XAuNnJdKeMZufkA9vQprOIFck061vg=
14361450
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
14371451
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
14381452
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -1715,11 +1729,14 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
17151729
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
17161730
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
17171731
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
1718-
github.com/smartystreets/assertions v1.0.1 h1:voD4ITNjPL5jjBfgR/r8fPIIBrliWrWHeiJApdr3r4w=
17191732
github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
1733+
github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfItdAd1r3cck=
1734+
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
1735+
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
17201736
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
17211737
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
17221738
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
1739+
github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak=
17231740
github.com/snowflakedb/gosnowflake v1.3.4/go.mod h1:NsRq2QeiMUuoNUJhp5Q6xGC4uBrsS9g6LwZVEkTWgsE=
17241741
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
17251742
github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
@@ -2144,6 +2161,7 @@ golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qx
21442161
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
21452162
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
21462163
golang.org/x/net v0.0.0-20211101193420-4a448f8816b3/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
2164+
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
21472165
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d h1:1n1fc535VhN8SYtD4cDUyNlfpAF2ROMM9+11equK3hs=
21482166
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
21492167
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package elasticsearch_ingester
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"github.com/prometheus/client_golang/prometheus"
7+
"time"
8+
)
9+
10+
var (
11+
// elasticApiCall is a histogram of Elasticsearch API call durations in
// seconds, partitioned by the "api_version", "endpoint" and "error" labels.
// The five exponential buckets start at 0.1s and double each step
// (0.1, 0.2, 0.4, 0.8, 1.6).
// NOTE(review): registration and observation happen outside this block —
// confirm the collector is registered by the module.
elasticApiCall = prometheus.NewHistogramVec(prometheus.HistogramOpts{
12+
Name: "elasticsearch_request_seconds",
13+
Help: "Duration histogram of elasticsearch api calls",
14+
Buckets: prometheus.ExponentialBuckets(0.1, 2, 5),
15+
}, []string{"api_version", "endpoint", "error"})
16+
)
17+
18+
// document is a single search hit flattened into string key/value pairs
// together with the time attributed to it.
type document struct {
19+
// timestamp is the time associated with the document.
timestamp time.Time
20+
// fields holds the document's values keyed by field name.
fields map[string]string
21+
}
22+
23+
// elasticClient abstracts the Elasticsearch client used to fetch documents,
// so that the concrete client implementation can be swapped.
type elasticClient interface {
24+
// RangeSearch queries index and returns the matching documents as raw JSON.
// NOTE(review): presumably it returns at most size documents whose
// timestampField is at or after since, filtered by query and bounded by
// timeout — confirm against the concrete implementation.
RangeSearch(ctx context.Context, index, timestampField string, since time.Time, size int, query string, timeout time.Duration) ([]json.RawMessage, error)
25+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package elasticsearch_ingester
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"github.com/prometheus/client_golang/prometheus"
8+
"github.com/sirupsen/logrus"
9+
"go.uber.org/atomic"
10+
"regexp"
11+
"sync"
12+
"time"
13+
14+
tailer_module "github.com/seznam/slo-exporter/pkg/tailer"
15+
)
16+
17+
// Prometheus collectors describing the tailer's progress and the data-quality
// problems it encounters while converting search hits into documents.
var (
18+
// searchedDocuments counts documents retrieved from Elasticsearch.
searchedDocuments = prometheus.NewCounter(prometheus.CounterOpts{
19+
Name: "searched_documents_total",
20+
Help: "How many documents were retrieved from the elastic search",
21+
})
22+
// lastSearchTimestamp exposes the timestamp of the last processed document
// as Unix seconds.
lastSearchTimestamp = prometheus.NewGauge(prometheus.GaugeOpts{
23+
Name: "last_document_timestamp_seconds",
24+
Help: "Timestamp of the last processed document, next fetch will read since this timestamp",
25+
})
26+
// missingRawLogField counts documents lacking the configured raw log field.
// NOTE(review): the metric name spells "field" as "filed"; renaming it changes
// the exported metric, so confirm with consumers before fixing.
missingRawLogField = prometheus.NewCounter(prometheus.CounterOpts{
27+
Name: "missing_raw_log_filed_total",
28+
Help: "How many times defined raw log wasn't found in the document",
29+
})
30+
// invalidrawLogFormat counts raw log values that could not be parsed.
invalidrawLogFormat = prometheus.NewCounter(prometheus.CounterOpts{
31+
Name: "raw_log_invalid_format_total",
32+
Help: "How many times the raw log had invalid format",
33+
})
34+
// missingTimestampField counts documents lacking the configured timestamp field.
missingTimestampField = prometheus.NewCounter(prometheus.CounterOpts{
35+
Name: "missing_timestamp_field_total",
36+
Help: "How many times the timestamp field was missing",
37+
})
38+
// invalidTimestampFormat counts timestamp values that failed to parse.
invalidTimestampFormat = prometheus.NewCounter(prometheus.CounterOpts{
39+
Name: "invalid_timestamp_format_total",
40+
Help: "How many times the timestamp field had invalid format",
41+
})
42+
)
43+
44+
func newTailer(logger logrus.FieldLogger, client elasticClient, index, timestampField, timestampFormat, rawLogField string, rawLogFormatRegexp, rawLogEmptyGroupRegexp *regexp.Regexp, query string, timeout time.Duration, maxBatchSize int) tailer {
45+
return tailer{
46+
client: client,
47+
index: index,
48+
timestampField: timestampField,
49+
timestampFormat: timestampFormat,
50+
rawLogField: rawLogField,
51+
rawLogFormatRegexp: rawLogFormatRegexp,
52+
rawLogEmptyGroupRegexp: rawLogEmptyGroupRegexp,
53+
lastTimestamp: time.Now(),
54+
lastTimestampMtx: sync.RWMutex{},
55+
maxBatchSize: maxBatchSize,
56+
timeout: timeout,
57+
query: query,
58+
logger: logger,
59+
}
60+
}
61+
62+
// tailer periodically searches an Elasticsearch index and turns the returned
// JSON hits into documents.
type tailer struct {
63+
// client performs the actual searches.
client elasticClient
64+
// index is the index name passed to every search.
index string
65+
// timestampField names the document field holding the event time.
timestampField string
66+
// timestampFormat is the time.Parse layout used for timestampField values.
timestampFormat string
67+
// rawLogField, when non-empty, names a field whose value is parsed with
// rawLogFormatRegexp and merged into the document fields.
rawLogField string
68+
rawLogFormatRegexp *regexp.Regexp
69+
rawLogEmptyGroupRegexp *regexp.Regexp
70+
// query is passed to every search.
query string
71+
// lastTimestamp is the "since" bound of the next search; it is updated
// whenever a document timestamp is parsed successfully.
lastTimestamp time.Time
72+
// NOTE(review): lastTimestampMtx is not used by the code visible here —
// confirm whether it is meant to guard lastTimestamp.
lastTimestampMtx sync.RWMutex
73+
// maxBatchSize is the size passed to each search and the capacity of the
// output channel.
maxBatchSize int
74+
// timeout is passed to every search.
timeout time.Duration
75+
logger logrus.FieldLogger
76+
77+
// processing is set while a scheduled search is being handled.
processing atomic.Bool
78+
}
79+
80+
func (t *tailer) newDocumentFromJson(data json.RawMessage) (document, error) {
81+
newDoc := document{
82+
timestamp: time.Now(),
83+
fields: map[string]string{},
84+
}
85+
86+
var fields map[string]interface{}
87+
err := json.Unmarshal(data, &fields)
88+
if err != nil {
89+
return newDoc, fmt.Errorf("unable to unmarshall document body: %w", err)
90+
}
91+
for k, v := range fields {
92+
newDoc.fields[k] = fmt.Sprint(v)
93+
}
94+
95+
if t.rawLogField != "" {
96+
rawLog, ok := newDoc.fields[t.rawLogField]
97+
if !ok {
98+
missingRawLogField.Inc()
99+
t.logger.WithField("document", newDoc.fields).Warnf("document missing the raw log field %s", t.rawLogField)
100+
} else {
101+
rawLogFields, err := tailer_module.ParseLine(t.rawLogFormatRegexp, t.rawLogEmptyGroupRegexp, rawLog)
102+
if err != nil {
103+
invalidrawLogFormat.Inc()
104+
t.logger.WithField("document", newDoc.fields).Warnf("document has invalid format of the raw log field %s", t.rawLogField)
105+
}
106+
for k, v := range rawLogFields {
107+
newDoc.fields[k] = v
108+
}
109+
}
110+
}
111+
112+
timeFiled, ok := newDoc.fields[t.timestampField]
113+
if !ok {
114+
missingTimestampField.Inc()
115+
t.logger.WithField("document", newDoc.fields).Warnf("document missing the timestamp field %s, using now instead", t.timestampField)
116+
return newDoc, nil
117+
} else {
118+
ts, err := time.Parse(t.timestampFormat, timeFiled)
119+
if err != nil {
120+
invalidTimestampFormat.Inc()
121+
t.logger.WithField("document", newDoc.fields).WithField("timestamp", timeFiled).Warnf("document has invalid timestamp field %s, using now instead", t.timestampField)
122+
return newDoc, nil
123+
}
124+
newDoc.timestamp = ts
125+
t.lastTimestamp = ts
126+
lastSearchTimestamp.Set(float64(t.lastTimestamp.Unix()))
127+
}
128+
return newDoc, nil
129+
}
130+
131+
func (t *tailer) run(ctx context.Context, interval time.Duration) chan document {
132+
ticker := time.NewTicker(interval)
133+
outChan := make(chan document, t.maxBatchSize)
134+
go func() {
135+
defer ticker.Stop()
136+
defer close(outChan)
137+
for {
138+
select {
139+
case <-ctx.Done():
140+
return
141+
case <-ticker.C:
142+
if t.processing.Load() {
143+
t.logger.Warnf("skipping scheduled query")
144+
continue
145+
}
146+
t.processing.Store(true)
147+
148+
jsonDocs, err := t.client.RangeSearch(ctx, t.index, t.timestampField, t.lastTimestamp, t.maxBatchSize, t.query, t.timeout)
149+
if err != nil {
150+
t.logger.WithFields(logrus.Fields{"error": err, "since": t.lastTimestamp}).Error("failed to search data from elastic search")
151+
continue
152+
}
153+
for _, jd := range jsonDocs {
154+
select {
155+
case <-ctx.Done():
156+
return
157+
default:
158+
newDoc, err := t.newDocumentFromJson(jd)
159+
if err != nil {
160+
t.logger.WithFields(logrus.Fields{"error": err, "document": jd}).Errorf("failed to read document")
161+
}
162+
searchedDocuments.Inc()
163+
outChan <- newDoc
164+
}
165+
}
166+
}
167+
t.processing.Store(false)
168+
}
169+
}()
170+
return outChan
171+
}

0 commit comments

Comments
 (0)