@@ -39,45 +39,34 @@ const (
 	consumer = "consumer"
 )
 
-type watcherMetrics struct {
-	recordsRead           *prometheus.CounterVec
-	recordDecodeFails     *prometheus.CounterVec
-	samplesSentPreTailing *prometheus.CounterVec
-	currentSegment        *prometheus.GaugeVec
-}
-
-var (
-	lrMetrics = NewLiveReaderMetrics(prometheus.DefaultRegisterer)
-)
-
+// fromTime returns a new millisecond timestamp from a time.
 // This function is copied from prometheus/prometheus/pkg/timestamp to avoid adding vendor to TSDB repo.
-
-// FromTime returns a new millisecond timestamp from a time.
-func FromTime(t time.Time) int64 {
+func fromTime(t time.Time) int64 {
 	return t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond)
 }
 
-// func init() {
-// prometheus.MustRegister(watcherRecordsRead)
-// prometheus.MustRegister(watcherRecordDecodeFails)
-// prometheus.MustRegister(watcherSamplesSentPreTailing)
-// prometheus.MustRegister(watcherCurrentSegment)
-// }
-
-type writeTo interface {
+type WriteTo interface {
 	Append([]record.RefSample) bool
 	StoreSeries([]record.RefSeries, int)
 	SeriesReset(int)
 }
 
+type watcherMetrics struct {
+	recordsRead           *prometheus.CounterVec
+	recordDecodeFails     *prometheus.CounterVec
+	samplesSentPreTailing *prometheus.CounterVec
+	currentSegment        *prometheus.GaugeVec
+}
+
 // Watcher watches the TSDB WAL for a given WriteTo.
 type Watcher struct {
 	name           string
-	writer         writeTo
+	writer         WriteTo
 	logger         log.Logger
 	walDir         string
 	lastCheckpoint string
 	metrics        *watcherMetrics
+	readerMetrics  *liveReaderMetrics
 
 	startTime int64
 
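Exporting `WriteTo` makes the interface implementable outside this package. Below is a minimal illustrative sketch of a consumer that satisfies it; the type name and the `record` import path are assumptions for the example, not part of this change.

```go
package main

import (
	"fmt"

	// Assumed import path for the record package referenced in this file.
	"github.com/prometheus/prometheus/tsdb/record"
)

// countingWriter is a hypothetical WriteTo implementation that just counts
// samples instead of forwarding them to a remote-write queue.
type countingWriter struct {
	samples int
}

// Append receives samples decoded from the WAL; returning true tells the
// watcher the batch was accepted.
func (c *countingWriter) Append(s []record.RefSample) bool {
	c.samples += len(s)
	return true
}

// StoreSeries receives series records together with the segment they came from.
func (c *countingWriter) StoreSeries(series []record.RefSeries, segment int) {}

// SeriesReset signals that series from segments before the given one can be dropped.
func (c *countingWriter) SeriesReset(segment int) {}

func main() {
	var w countingWriter
	w.Append([]record.RefSample{{Ref: 1, T: 1000, V: 42}})
	fmt.Println("samples seen:", w.samples)
}
```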
@@ -144,22 +133,24 @@ func NewWatcherMetrics(reg prometheus.Registerer) *watcherMetrics {
 }
 
 // NewWatcher creates a new WAL watcher for a given WriteTo.
-func NewWatcher(logger log.Logger, metrics *watcherMetrics, name string, writer writeTo, walDir string) *Watcher {
+func NewWatcher(reg prometheus.Registerer, logger log.Logger, name string, writer WriteTo, walDir string) *Watcher {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
 
-	return &Watcher{
-		logger:  logger,
-		metrics: metrics,
-		writer:  writer,
-		walDir:  path.Join(walDir, "wal"),
-		name:    name,
-		quit:    make(chan struct{}),
-		done:    make(chan struct{}),
+	w := Watcher{
+		logger:        logger,
+		writer:        writer,
+		metrics:       NewWatcherMetrics(reg),
+		readerMetrics: NewLiveReaderMetrics(reg),
+		walDir:        path.Join(walDir, "wal"),
+		name:          name,
+		quit:          make(chan struct{}),
+		done:          make(chan struct{}),
 
 		maxSegment: -1,
 	}
+	return &w
 }
 
 func (w *Watcher) setMetrics() {
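With the new signature the caller hands over a `prometheus.Registerer` and the watcher builds its own watcher and live-reader metrics internally, so callers no longer construct a `watcherMetrics` themselves. A rough usage sketch, reusing the hypothetical `countingWriter` from the sketch above; the package placement, logger import path, and the call to the existing `Stop` method are assumptions made for illustration.

```go
package wal // illustrative only: placed in the Watcher's package to keep the sketch short

import (
	"github.com/go-kit/kit/log" // assumed logger package used by this repo
	"github.com/prometheus/client_golang/prometheus"
)

func ExampleNewWatcher() {
	reg := prometheus.NewRegistry() // metrics for this watcher land on this registry
	logger := log.NewNopLogger()    // nil would also work; NewWatcher substitutes a nop logger
	writer := &countingWriter{}     // any WriteTo implementation

	// The watcher registers its own counters and gauges against reg via
	// NewWatcherMetrics(reg) and NewLiveReaderMetrics(reg).
	w := NewWatcher(reg, logger, "example-consumer", writer, "/path/to/data")
	w.Start()
	// ... later, shut it down (the Watcher's existing Stop method is assumed here).
	w.Stop()
}
```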
@@ -175,7 +166,7 @@ func (w *Watcher) setMetrics() {
 // Start the Watcher.
 func (w *Watcher) Start() {
 	w.setMetrics()
-	level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
+	level.Info(w.logger).Log("msg", "starting WAL watcher", "consumer", w.name)
 
 	go w.loop()
 }
@@ -200,7 +191,7 @@ func (w *Watcher) loop() {
 
 	// We may encounter failures processing the WAL; we should wait and retry.
 	for !isClosed(w.quit) {
-		w.startTime = FromTime(time.Now())
+		w.startTime = fromTime(time.Now())
 		if err := w.run(); err != nil {
 			level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
 		}
@@ -263,7 +254,7 @@ func (w *Watcher) run() error {
 func (w *Watcher) findSegmentForIndex(index int) (int, error) {
 	refs, err := w.segments(w.walDir)
 	if err != nil {
-		return -1, nil
+		return -1, err
 	}
 
 	for _, r := range refs {
@@ -278,7 +269,7 @@ func (w *Watcher) findSegmentForIndex(index int) (int, error) {
 func (w *Watcher) firstAndLast() (int, int, error) {
 	refs, err := w.segments(w.walDir)
 	if err != nil {
-		return -1, -1, nil
+		return -1, -1, err
 	}
 
 	if len(refs) == 0 {
@@ -323,7 +314,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
 	}
 	defer segment.Close()
 
-	reader := NewLiveReader(w.logger, lrMetrics, segment)
+	reader := NewLiveReader(w.logger, w.readerMetrics, segment)
 
 	readTicker := time.NewTicker(readPeriod)
 	defer readTicker.Stop()
@@ -448,11 +439,12 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 		dec     record.Decoder
 		series  []record.RefSeries
 		samples []record.RefSample
+		send    []record.RefSample
 	)
 
 	for r.Next() && !isClosed(w.quit) {
 		rec := r.Record()
-		w.recordsReadMetric.WithLabelValues(Type(dec.Type(rec))).Inc()
+		w.recordsReadMetric.WithLabelValues(recordType(dec.Type(rec))).Inc()
 
 		switch dec.Type(rec) {
 		case record.Series:
@@ -474,7 +466,6 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 				w.recordDecodeFailsMetric.Inc()
 				return err
 			}
-			var send []record.RefSample
 			for _, s := range samples {
 				if s.T > w.startTime {
 					send = append(send, s)
@@ -483,6 +474,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 			if len(send) > 0 {
 				// Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks).
 				w.writer.Append(send)
+				send = send[:0]
 			}
 
 		case record.Tombstones:
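Hoisting `send` out of the per-record loop and truncating it with `send = send[:0]` after each `Append` lets the backing array be reused across records instead of being reallocated for every samples record; this relies on the assumption that the writer does not retain the slice after `Append` returns. A standalone illustration of the reslice-to-zero pattern:

```go
package main

import "fmt"

func main() {
	// buf keeps its backing array when resliced to [:0], so appends in later
	// iterations reuse the same capacity rather than allocating again.
	buf := make([]int, 0, 4)
	for i := 0; i < 3; i++ {
		buf = append(buf, i, i+1)
		fmt.Println(len(buf), cap(buf)) // prints "2 4" on every iteration
		buf = buf[:0]                   // drop the elements, keep the capacity
	}
}
```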
@@ -498,7 +490,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
 	return r.Err()
 }
 
-func Type(rt record.Type) string {
+func recordType(rt record.Type) string {
 	switch rt {
 	case record.Invalid:
 		return "invalid"
@@ -538,7 +530,7 @@ func (w *Watcher) readCheckpoint(checkpointDir string) error {
 	}
 	defer sr.Close()
 
-	r := NewLiveReader(w.logger, lrMetrics, sr)
+	r := NewLiveReader(w.logger, w.readerMetrics, sr)
 	if err := w.readSegment(r, index, false); err != io.EOF && err != nil {
 		return errors.Wrap(err, "readSegment")
 	}
@@ -554,7 +546,8 @@ func (w *Watcher) readCheckpoint(checkpointDir string) error {
 
 func checkpointNum(dir string) (int, error) {
 	// Checkpoint dir names are in the format checkpoint.000001
-	chunks := strings.Split(dir, ".")
+	// dir may contain a hidden directory, so only check the base directory
+	chunks := strings.Split(path.Base(dir), ".")
 	if len(chunks) != 2 {
 		return 0, errors.Errorf("invalid checkpoint dir string: %s" , dir)
 	}
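The old code split the entire path on ".", so any dot in a parent directory (a hidden directory, for example) produced extra chunks and a spurious "invalid checkpoint dir" error for a valid checkpoint; `path.Base` restricts the split to the checkpoint directory's own name. A small stdlib-only illustration with a hypothetical path:

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

func main() {
	// Hypothetical checkpoint path whose parents contain a hidden (dotted) directory.
	dir := "/data/.prometheus/wal/checkpoint.000001"

	// Old behaviour: splitting the full path yields three chunks, so the
	// len(chunks) != 2 check rejects a perfectly valid checkpoint directory.
	fmt.Println(strings.Split(dir, ".")) // [/data/ prometheus/wal/checkpoint 000001]

	// New behaviour: only the base name is split.
	fmt.Println(strings.Split(path.Base(dir), ".")) // [checkpoint 000001]
}
```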