From cfe16bf249fb5f92607343a1ad40ea0eac8a4015 Mon Sep 17 00:00:00 2001
From: chenshangmin
Date: Fri, 14 Jan 2022 13:16:38 +0800
Subject: [PATCH] =?UTF-8?q?[PDR-16012][feat]logkit=E5=8F=91=E9=80=81?=
 =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E5=BB=B6=E8=BF=9F=EF=BC=8C=E5=86=85=E9=83=A8?=
 =?UTF-8?q?queue=E9=95=BF=E5=BA=A6=E6=8C=87=E6=A0=87=E5=BC=80=E5=8F=91?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Review notes (ignored by git am):
 * Line breaks and indentation were reconstructed from a whitespace-collapsed
   copy; hunk sizes match the @@ headers, exact indentation is best effort.
 * Added lines differ from the original commit: writeLag defers Sync/Close
   only after OpenFile succeeded, readLag trims the file content and returns 0
   on a parse error, operator spacing follows gofmt, comments were reworded.
 * Still open: Close() discards the error returned by writeLag; trySendBytes
   never reads isFromQueue; asyncSendLogFromQueue delays start by a fixed 10s;
   the stray blank line before "}" in trySendBytes is kept to preserve counts.

 mgr/metric_runner.go     |  15 ++++--
 mgr/runner.go            |  16 ++++--
 reader/meta.go           |   2 +-
 reader/meta_test.go      |   2 +-
 sender/fault_tolerant.go | 105 ++++++++++++++++++++++++++++++++++++---
 utils/models/models.go   |   2 +
 6 files changed, 123 insertions(+), 19 deletions(-)

diff --git a/mgr/metric_runner.go b/mgr/metric_runner.go
index 50c46bc58..b8b7b6b3d 100644
--- a/mgr/metric_runner.go
+++ b/mgr/metric_runner.go
@@ -295,7 +295,7 @@ func (r *MetricRunner) Run() {
 		dataCnt := 0
 		datas := make([]Data, 0)
 		metricTime := time.Now()
-		tags[metric.Timestamp] = metricTime.Format(time.RFC3339Nano)
+		tags[metric.Timestamp] = metricTime.UnixNano() / 1e6
 		for _, c := range r.collectors {
 			metricName := c.Name()
 			tmpdatas, err := c.Collect()
@@ -610,10 +610,14 @@ func (mr *MetricRunner) StatusRestore() {
 		}
 		sStatus, ok := s.(sender.StatsSender)
 		if ok {
-			sStatus.Restore(&StatsInfo{
+			statsInfo := &StatsInfo{
 				Success: info[0],
 				Errors:  info[1],
-			})
+			}
+			if len(info) > 2 {
+				statsInfo.FtSendLag = info[2]
+			}
+			sStatus.Restore(statsInfo)
 		}
 		status, ext := mr.rs.SenderStats[name]
 		if !ext {
@@ -635,7 +639,7 @@ func (mr *MetricRunner) StatusBackup() {
 			status.ParserStats.Success,
 			status.ParserStats.Errors,
 		},
-		SenderCnt: map[string][2]int64{},
+		SenderCnt: map[string][]int64{},
 	}
 	for _, s := range mr.senders {
 		name := s.Name()
@@ -646,9 +650,10 @@ func (mr *MetricRunner) StatusBackup() {
 			status.SenderStats[name] = senderStats
 		}
 		if sta, exist := status.SenderStats[name]; exist {
-			bStart.SenderCnt[name] = [2]int64{
+			bStart.SenderCnt[name] = []int64{
 				sta.Success,
 				sta.Errors,
+				sta.FtSendLag,
 			}
 		}
 	}
diff --git a/mgr/runner.go b/mgr/runner.go
index e850234ce..90b3d649d 100644
--- a/mgr/runner.go
+++ b/mgr/runner.go
@@ -1476,10 +1476,14 @@ func (r *LogExportRunner) StatusRestore() {
 		}
 		sStatus, ok := s.(sender.StatsSender)
 		if ok {
-			sStatus.Restore(&StatsInfo{
+			statsInfo := &StatsInfo{
 				Success: info[0],
 				Errors:  info[1],
-			})
+			}
+			if len(info) > 2 {
+				statsInfo.FtSendLag = info[2]
+			}
+			sStatus.Restore(statsInfo)
 		}
 		status, ext := r.rs.SenderStats[name]
 		if !ext {
@@ -1519,7 +1523,7 @@ func (r *LogExportRunner) StatusBackup() {
 			status.ParserStats.Errors,
 		},
 		TransCnt:  map[string][2]int64{},
-		SenderCnt: map[string][2]int64{},
+		SenderCnt: map[string][]int64{},
 	}
 	r.historyMutex.Lock()
 	defer r.historyMutex.Unlock()
@@ -1535,9 +1539,10 @@ func (r *LogExportRunner) StatusBackup() {
 	for idx, t := range r.transformers {
 		name := formatTransformName(t.Type(), idx)
 		sta := t.Stats()
-		bStart.SenderCnt[name] = [2]int64{
+		bStart.SenderCnt[name] = []int64{
 			sta.Success,
 			sta.Errors,
+			sta.FtSendLag,
 		}
 	}
 
@@ -1563,9 +1568,10 @@ func (r *LogExportRunner) StatusBackup() {
 			status.SenderStats[name] = senderStats
 		}
 		if sta, exist := status.SenderStats[name]; exist {
-			bStart.SenderCnt[name] = [2]int64{
+			bStart.SenderCnt[name] = []int64{
 				sta.Success,
 				sta.Errors,
+				sta.FtSendLag,
 			}
 		}
 	}
diff --git a/reader/meta.go b/reader/meta.go
index c03c02c5f..6196e86bd 100644
--- a/reader/meta.go
+++ b/reader/meta.go
@@ -48,7 +48,7 @@ const (
 type Statistic struct {
 	ReaderCnt   int64               `json:"reader_count"`    // 读取总条数
 	ParserCnt   [2]int64            `json:"parser_connt"`    // [解析成功, 解析失败]
-	SenderCnt   map[string][2]int64 `json:"sender_count"`    // [发送成功, 发送失败]
+	SenderCnt   map[string][]int64  `json:"sender_count"`    // [send success, send errors, ft send lag (absent in files written before this field existed)]
 	TransCnt    map[string][2]int64 `json:"transform_count"` // [解析成功, 解析失败]
 	ReadErrors  ErrorStatistic      `json:"read_errors"`
 	ParseErrors ErrorStatistic      `json:"parse_errors"`
diff --git a/reader/meta_test.go b/reader/meta_test.go
index 195557364..0a0e91e09 100644
--- a/reader/meta_test.go
+++ b/reader/meta_test.go
@@ -109,7 +109,7 @@ func TestMeta(t *testing.T) {
 	stat := &Statistic{
 		ReaderCnt: 6,
 		ParserCnt: [2]int64{6, 8},
-		SenderCnt: map[string][2]int64{
+		SenderCnt: map[string][]int64{
 			"aaa": {1, 2},
 			"bbb": {5, 6},
 		},
diff --git a/sender/fault_tolerant.go b/sender/fault_tolerant.go
index 0322d1397..f43a8228b 100644
--- a/sender/fault_tolerant.go
+++ b/sender/fault_tolerant.go
@@ -4,8 +4,10 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io/ioutil"
 	"math"
 	"os"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"sync"
@@ -36,6 +38,8 @@ const (
 	KeyUnMarshalError = "Data unmarshal failed"
 	// NumUnMarshalError
 	NumUnMarshalError = 10
+	// LagFilename is the file under the ft save path that persists FtSendLag across restarts
+	LagFilename = "meta.lag"
 )
 
 var _ SkipDeepCopySender = &FtSender{}
@@ -206,6 +210,9 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende
 		isBlock:     opt.isBlock,
 		backoff:     utils.NewBackoff(2, 1, 1*time.Second, 5*time.Minute),
 	}
+	ftSender.statsMutex.Lock()
+	ftSender.stats.FtSendLag = ftSender.readLag()
+	ftSender.statsMutex.Unlock()
 
 	if opt.innerSenderType == TypePandora {
 		ftSender.pandoraKeyCache = make(map[string]KeyInfo)
@@ -273,9 +280,17 @@ func (ft *FtSender) RawSend(datas []string) error {
 		} else {
 			// se 中的 lasterror 和 senderror 都为空,需要使用 se.FtQueueLag
 			se.AddSuccessNum(len(datas))
+			ft.statsMutex.Lock()
+			ft.stats.FtSendLag = ft.stats.FtSendLag + int64(len(datas))
+			ft.statsMutex.Unlock()
 			ft.backoff.Reset()
 		}
 		se.FtQueueLag = ft.BackupQueue.Depth() + ft.logQueue.Depth()
+		if se.FtQueueLag == 0 {
+			ft.statsMutex.Lock()
+			ft.stats.FtSendLag = 0
+			ft.statsMutex.Unlock()
+		}
 	}
 	return se
 }
@@ -314,7 +329,7 @@ func (ft *FtSender) Send(datas []Data) error {
 	}
 
 	if ft.isBlock {
-		log.Error("Runner[%v] Sender[%v] try Send Datas err: %v", ft.runnerName, ft.innerSender.Name(), err)
+		log.Errorf("Runner[%v] Sender[%v] try Send Datas err: %v", ft.runnerName, ft.innerSender.Name(), err)
 		return se
 	}
 
@@ -354,9 +369,17 @@ func (ft *FtSender) Send(datas []Data) error {
 	} else {
 		// se 中的 lasterror 和 senderror 都为空,需要使用 se.FtQueueLag
 		se.AddSuccessNum(len(datas))
+		ft.statsMutex.Lock()
+		ft.stats.FtSendLag = ft.stats.FtSendLag + int64(len(datas))
+		ft.statsMutex.Unlock()
 		ft.backoff.Reset()
 	}
 	se.FtQueueLag = ft.BackupQueue.Depth() + ft.logQueue.Depth()
+	if se.FtQueueLag == 0 {
+		ft.statsMutex.Lock()
+		ft.stats.FtSendLag = 0
+		ft.statsMutex.Unlock()
+	}
 	return se
 }
 
@@ -395,6 +418,9 @@ func (ft *FtSender) Close() error {
 	// persist queue's meta data
 	ft.logQueue.Close()
 	ft.BackupQueue.Close()
+	ft.statsMutex.Lock()
+	ft.writeLag(ft.stats.FtSendLag)
+	ft.statsMutex.Unlock()
 
 	return ft.innerSender.Close()
 }
@@ -481,6 +507,9 @@ func (ft *FtSender) saveToFile(datas []Data) error {
 }
 
 func (ft *FtSender) asyncSendLogFromQueue() {
+	// if not sleep, queue lag may be cleared
+	time.Sleep(time.Second * 10)
+
 	for i := 0; i < ft.procs; i++ {
 		if ft.opt.sendRaw {
 			readLinesChan := make(<-chan []string)
@@ -506,18 +535,32 @@ func (ft *FtSender) asyncSendLogFromQueue() {
 }
 
 // trySend 从bytes反序列化数据后尝试发送数据
-func (ft *FtSender) trySendBytes(dat []byte, failSleep int, isRetry bool) (backDataContext []*datasContext, err error) {
+func (ft *FtSender) trySendBytes(dat []byte, failSleep int, isRetry bool, isFromQueue bool) (backDataContext []*datasContext, err error) {
 	if ft.opt.sendRaw {
 		datas, err := ft.unmarshalRaws(dat)
 		if err != nil {
 			return nil, errors.New(KeyUnMarshalError + ":" + err.Error())
 		}
+		ft.statsMutex.Lock()
+		ft.stats.FtSendLag = ft.stats.FtSendLag - int64(len(datas))
+		if ft.stats.FtSendLag < 0 {
+			ft.stats.FtSendLag = 0
+		}
+		ft.statsMutex.Unlock()
+
 		return ft.backOffSendRawFromQueue(datas, failSleep, isRetry)
 	}
 	datas, err := ft.unmarshalData(dat)
 	if err != nil {
 		return nil, errors.New(KeyUnMarshalError + ":" + err.Error())
+
+	}
+	ft.statsMutex.Lock()
+	ft.stats.FtSendLag = ft.stats.FtSendLag - int64(len(datas))
+	if ft.stats.FtSendLag < 0 {
+		ft.stats.FtSendLag = 0
 	}
+	ft.statsMutex.Unlock()
 	return ft.backOffSendFromQueue(datas, failSleep, isRetry)
 }
 
@@ -566,6 +609,9 @@ func (ft *FtSender) trySendRaws(datas []string, failSleep int, isRetry bool) (ba
 				log.Errorf("Runner[%v] Sender[%v] cannot write points back to queue %v: %v, discard datas %d", ft.runnerName, ft.innerSender.Name(), ft.BackupQueue.Name(), err, len(datas))
 				return nil, nil
 			}
+			ft.statsMutex.Lock()
+			ft.stats.FtSendLag += int64(len(v.Lines))
+			ft.statsMutex.Unlock()
 		}
 
 		time.Sleep(time.Second * time.Duration(math.Pow(2, float64(failSleep))))
@@ -620,6 +666,9 @@ func (ft *FtSender) trySendDatas(datas []Data, failSleep int, isRetry bool) (bac
 				log.Errorf("Runner[%v] Sender[%v] cannot write points back to queue %v: %v, discard datas %d", ft.runnerName, ft.innerSender.Name(), ft.BackupQueue.Name(), err, len(datas))
 				return nil, nil
 			}
+			ft.statsMutex.Lock()
+			ft.stats.FtSendLag += int64(len(v.Datas))
+			ft.statsMutex.Unlock()
 		}
 
 		time.Sleep(time.Second * time.Duration(math.Pow(2, float64(failSleep))))
@@ -896,8 +945,14 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
 		} else {
 			select {
 			case bytes := <-readChan:
-				backDataContext, err = ft.trySendBytes(bytes, numWaits, isRetry)
+				backDataContext, err = ft.trySendBytes(bytes, numWaits, isRetry, true)
 			case datas := <-readDatasChan:
+				ft.statsMutex.Lock()
+				ft.stats.FtSendLag = ft.stats.FtSendLag - int64(len(datas))
+				if ft.stats.FtSendLag < 0 {
+					ft.stats.FtSendLag = 0
+				}
+				ft.statsMutex.Unlock()
 				backDataContext, err = ft.backOffSendRawFromQueue(datas, numWaits, isRetry)
 			case <-timer.C:
 				continue
@@ -917,7 +972,7 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
 				unmarshalDataError++
 				if unmarshalDataError > NumUnMarshalError {
 					time.Sleep(time.Second)
-					log.Errorf("Runner[%s] Sender[%s] sleep 1s due to unmarshal err", ft.runnerName, ft.innerSender.Name(), queueName, err)
+					log.Errorf("Runner[%s] Sender[%s] queue[%s] sleep 1s due to unmarshal err %v", ft.runnerName, ft.innerSender.Name(), queueName, err)
 				}
 			} else {
 				unmarshalDataError = 0
@@ -939,7 +994,6 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
 	defer timer.Stop()
 	numWaits := 1
 	unmarshalDataError := 0
-
 	var curDataContext, otherDataContext []*datasContext
 	var curIdx int
 	var backDataContext []*datasContext
@@ -955,8 +1009,14 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
 		} else {
 			select {
 			case bytes := <-readChan:
-				backDataContext, err = ft.trySendBytes(bytes, numWaits, isRetry)
+				backDataContext, err = ft.trySendBytes(bytes, numWaits, isRetry, true)
 			case datas := <-readDatasChan:
+				ft.statsMutex.Lock()
+				ft.stats.FtSendLag = ft.stats.FtSendLag - int64(len(datas))
+				if ft.stats.FtSendLag < 0 {
+					ft.stats.FtSendLag = 0
+				}
+				ft.statsMutex.Unlock()
 				backDataContext, err = ft.backOffSendFromQueue(datas, numWaits, isRetry)
 			case <-timer.C:
 				continue
@@ -976,7 +1036,7 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
 				unmarshalDataError++
 				if unmarshalDataError > NumUnMarshalError {
 					time.Sleep(time.Second)
-					log.Errorf("Runner[%s] Sender[%s] sleep 1s due to unmarshal err", ft.runnerName, ft.innerSender.Name(), queueName, err)
+					log.Errorf("Runner[%s] Sender[%s] queue[%s] sleep 1s due to unmarshal err %v", ft.runnerName, ft.innerSender.Name(), queueName, err)
 				}
 			} else {
 				unmarshalDataError = 0
@@ -1225,3 +1285,34 @@ func (ft *FtSender) backOffReTrySendRaw(lines []string, isRetry bool) (res []*da
 		time.Sleep(backoff.Duration())
 	}
 }
+
+// readLag loads the lag persisted by writeLag; it returns 0 when the file cannot be read or parsed
+func (ft *FtSender) readLag() int64 {
+	f, err := ioutil.ReadFile(filepath.Join(ft.opt.saveLogPath, LagFilename))
+	if err != nil {
+		log.Errorf("Runner[%v] Sender[%v] read file error : %v", ft.runnerName, ft.innerSender.Name(), err)
+		return 0
+	}
+	lag, err := strconv.ParseInt(strings.TrimSpace(string(f)), 10, 64)
+	if err != nil {
+		log.Errorf("Runner[%v] Sender[%v] parse lag error : %v", ft.runnerName, ft.innerSender.Name(), err)
+		return 0
+	}
+	return lag
+}
+
+// writeLag stores lag as decimal text in LagFilename under opt.saveLogPath, truncating any previous content
+func (ft *FtSender) writeLag(lag int64) error {
+	path := filepath.Join(ft.opt.saveLogPath, LagFilename)
+	file, err := os.OpenFile(path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0666)
+	if err != nil {
+		return err
+	}
+	// register cleanup only after the open succeeded, so file is never nil here
+	defer func() {
+		file.Sync()
+		file.Close()
+	}()
+	_, err = file.WriteString(strconv.FormatInt(lag, 10))
+	return err
+}
diff --git a/utils/models/models.go b/utils/models/models.go
index e839760bd..843ddf0f7 100644
--- a/utils/models/models.go
+++ b/utils/models/models.go
@@ -186,6 +186,7 @@ type LagInfo struct {
 	Size     int64  `json:"size"`
 	SizeUnit string `json:"sizeunit"`
 	Ftlags   int64  `json:"ftlags"`
+	FtSendLags int64 `json:"ft_send_lags"`
 	Total    int64  `json:"total"`
 }
 
@@ -205,6 +206,7 @@ type StatsInfo struct {
 	Trend      string  `json:"trend"`
 	LastError  string  `json:"last_error"`
 	FtQueueLag int64   `json:"-"`
+	FtSendLag  int64   `json:"ft_send_lag"` // records buffered by the ft sender and not yet handed to the inner sender
 }
 
 type ErrorStatistic struct {