Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supplement noti records for agg events #2296

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions alert/sender/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,34 +129,39 @@ func (c *DefaultCallBacker) CallBack(ctx CallBackContext) {
return
}

doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, event, "callback", ctx.Stats, event)
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, event, "callback", ctx.Stats, ctx.Events)
}

func doSendAndRecord(ctx *ctx.Context, url, token string, body interface{}, channel string,
stats *astats.Stats, event *models.AlertCurEvent) {
stats *astats.Stats, events []*models.AlertCurEvent) {
res, err := doSend(url, body, channel, stats)
NotifyRecord(ctx, event, channel, token, res, err)
NotifyRecord(ctx, events, channel, token, res, err)
}

func NotifyRecord(ctx *ctx.Context, evt *models.AlertCurEvent, channel, target, res string, err error) {
noti := models.NewNotificationRecord(evt, channel, target)
if err != nil {
noti.SetStatus(models.NotiStatusFailure)
noti.SetDetails(err.Error())
} else if res != "" {
noti.SetDetails(string(res))
func NotifyRecord(ctx *ctx.Context, evts []*models.AlertCurEvent, channel, target, res string, err error) {
// 一个通知可能对应多个 event,都需要记录
notis := make([]*models.NotificaitonRecord, 0, len(evts))
for _, evt := range evts {
noti := models.NewNotificationRecord(evt, channel, target)
if err != nil {
noti.SetStatus(models.NotiStatusFailure)
noti.SetDetails(err.Error())
} else if res != "" {
noti.SetDetails(string(res))
}
notis = append(notis, noti)
}

if !ctx.IsCenter {
_, err := poster.PostByUrlsWithResp[int64](ctx, "/v1/n9e/notify-record", noti)
_, err := poster.PostByUrlsWithResp[int64](ctx, "/v1/n9e/notify-record", notis)
if err != nil {
logger.Errorf("add noti:%v failed, err: %v", noti, err)
logger.Errorf("add notis:%v failed, err: %v", notis, err)
}
return
}

if err := noti.Add(ctx); err != nil {
logger.Errorf("add noti:%v failed, err: %v", noti, err)
if err := models.DB(ctx).CreateInBatches(notis, 100).Error; err != nil {
logger.Errorf("add notis:%v failed, err: %v", notis, err)
}
}

Expand Down
5 changes: 2 additions & 3 deletions alert/sender/dingtalk.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (ds *DingtalkSender) Send(ctx MessageContext) {
}
}

doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Dingtalk, ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Dingtalk, ctx.Stats, ctx.Events)
}
}

Expand Down Expand Up @@ -97,8 +97,7 @@ func (ds *DingtalkSender) CallBack(ctx CallBackContext) {
body.Markdown.Text = message
}

doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body,
"callback", ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback", ctx.Stats, ctx.Events)
}

// extract urls and ats from Users
Expand Down
13 changes: 6 additions & 7 deletions alert/sender/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ type EmailSender struct {
}

type EmailContext struct {
event *models.AlertCurEvent
mail *gomail.Message
events []*models.AlertCurEvent
mail *gomail.Message
}

func (es *EmailSender) Send(ctx MessageContext) {
Expand All @@ -42,7 +42,7 @@ func (es *EmailSender) Send(ctx MessageContext) {
subject = ctx.Events[0].RuleName
}
content := BuildTplMessage(models.Email, es.contentTpl, ctx.Events)
es.WriteEmail(subject, content, tos, ctx.Events[0])
es.WriteEmail(subject, content, tos, ctx.Events)

ctx.Stats.AlertNotifyTotal.WithLabelValues(models.Email).Add(float64(len(tos)))
}
Expand Down Expand Up @@ -79,16 +79,15 @@ func SendEmail(subject, content string, tos []string, stmp aconf.SMTPConfig) err
return nil
}

func (es *EmailSender) WriteEmail(subject, content string, tos []string,
event *models.AlertCurEvent) {
func (es *EmailSender) WriteEmail(subject, content string, tos []string, events []*models.AlertCurEvent) {
m := gomail.NewMessage()

m.SetHeader("From", es.smtp.From)
m.SetHeader("To", tos...)
m.SetHeader("Subject", subject)
m.SetBody("text/html", content)

mailch <- &EmailContext{event, m}
mailch <- &EmailContext{events, m}
}

func dialSmtp(d *gomail.Dialer) gomail.SendCloser {
Expand Down Expand Up @@ -206,7 +205,7 @@ func startEmailSender(ctx *ctx.Context, smtp aconf.SMTPConfig) {
if err == nil {
msg = "ok"
}
NotifyRecord(ctx, m.event, models.Email, to, msg, err)
NotifyRecord(ctx, m.events, models.Email, to, msg, err)
}

size++
Expand Down
5 changes: 2 additions & 3 deletions alert/sender/feishu.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ func (fs *FeishuSender) CallBack(ctx CallBackContext) {
},
}

doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback", ctx.Stats, ctx.Events)
}

func (fs *FeishuSender) Send(ctx MessageContext) {
Expand All @@ -77,7 +76,7 @@ func (fs *FeishuSender) Send(ctx MessageContext) {
IsAtAll: false,
}
}
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Feishu, ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Feishu, ctx.Stats, ctx.Events)
}
}

Expand Down
6 changes: 2 additions & 4 deletions alert/sender/feishucard.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ func (fs *FeishuCardSender) CallBack(ctx CallBackContext) {
}
parsedURL.RawQuery = ""

doSendAndRecord(ctx.Ctx, parsedURL.String(), parsedURL.String(), body, "callback",
ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, parsedURL.String(), parsedURL.String(), body, "callback", ctx.Stats, ctx.Events)
}

func (fs *FeishuCardSender) Send(ctx MessageContext) {
Expand All @@ -160,8 +159,7 @@ func (fs *FeishuCardSender) Send(ctx MessageContext) {
body.Card.Elements[0].Text.Content = message
body.Card.Elements[2].Elements[0].Content = SendTitle
for i, url := range urls {
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.FeishuCard,
ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.FeishuCard, ctx.Stats, ctx.Events)
}
}

Expand Down
5 changes: 2 additions & 3 deletions alert/sender/lark.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ func (lk *LarkSender) CallBack(ctx CallBackContext) {
},
}

doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback", ctx.Stats, ctx.Events)
}

func (lk *LarkSender) Send(ctx MessageContext) {
Expand All @@ -44,7 +43,7 @@ func (lk *LarkSender) Send(ctx MessageContext) {
Text: message,
},
}
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Lark, ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Lark, ctx.Stats, ctx.Events)
}
}

Expand Down
5 changes: 2 additions & 3 deletions alert/sender/larkcard.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ func (fs *LarkCardSender) CallBack(ctx CallBackContext) {
}
parsedURL.RawQuery = ""

doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback", ctx.Stats, ctx.Events)
}

func (fs *LarkCardSender) Send(ctx MessageContext) {
Expand All @@ -81,7 +80,7 @@ func (fs *LarkCardSender) Send(ctx MessageContext) {
body.Card.Elements[0].Text.Content = message
body.Card.Elements[2].Elements[0].Content = SendTitle
for i, url := range urls {
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.LarkCard, ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.LarkCard, ctx.Stats, ctx.Events)
}
}

Expand Down
10 changes: 5 additions & 5 deletions alert/sender/mm.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (ms *MmSender) Send(ctx MessageContext) {
Text: message,
Tokens: urls,
Stats: ctx.Stats,
}, ctx.Events[0], models.Mm)
}, ctx.Events, models.Mm)
}

func (ms *MmSender) CallBack(ctx CallBackContext) {
Expand All @@ -56,7 +56,7 @@ func (ms *MmSender) CallBack(ctx CallBackContext) {
Text: message,
Tokens: []string{ctx.CallBackURL},
Stats: ctx.Stats,
}, ctx.Events[0], "callback")
}, ctx.Events, "callback")
}

func (ms *MmSender) extract(users []*models.User) []string {
Expand All @@ -69,12 +69,12 @@ func (ms *MmSender) extract(users []*models.User) []string {
return tokens
}

func SendMM(ctx *ctx.Context, message MatterMostMessage, event *models.AlertCurEvent, channel string) {
func SendMM(ctx *ctx.Context, message MatterMostMessage, events []*models.AlertCurEvent, channel string) {
for i := 0; i < len(message.Tokens); i++ {
u, err := url.Parse(message.Tokens[i])
if err != nil {
logger.Errorf("mm_sender: failed to parse error=%v", err)
NotifyRecord(ctx, event, channel, message.Tokens[i], "", err)
NotifyRecord(ctx, events, channel, message.Tokens[i], "", err)
continue
}

Expand Down Expand Up @@ -103,7 +103,7 @@ func SendMM(ctx *ctx.Context, message MatterMostMessage, event *models.AlertCurE
Username: username,
Text: txt + message.Text,
}
doSendAndRecord(ctx, ur, message.Tokens[i], body, channel, message.Stats, event)
doSendAndRecord(ctx, ur, message.Tokens[i], body, channel, message.Stats, events)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion alert/sender/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func alertingCallScript(ctx *ctx.Context, stdinBytes []byte, notifyScript models
}

err, isTimeout := sys.WrapTimeout(cmd, time.Duration(config.Timeout)*time.Second)
NotifyRecord(ctx, event, channel, cmd.String(), "", buildErr(err, isTimeout))
NotifyRecord(ctx, []*models.AlertCurEvent{event}, channel, cmd.String(), "", buildErr(err, isTimeout))

if isTimeout {
if err == nil {
Expand Down
10 changes: 5 additions & 5 deletions alert/sender/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (ts *TelegramSender) CallBack(ctx CallBackContext) {
Text: message,
Tokens: []string{ctx.CallBackURL},
Stats: ctx.Stats,
}, ctx.Events[0], "callback")
}, ctx.Events, "callback")
}

func (ts *TelegramSender) Send(ctx MessageContext) {
Expand All @@ -55,7 +55,7 @@ func (ts *TelegramSender) Send(ctx MessageContext) {
Text: message,
Tokens: tokens,
Stats: ctx.Stats,
}, ctx.Events[0], models.Telegram)
}, ctx.Events, models.Telegram)
}

func (ts *TelegramSender) extract(users []*models.User) []string {
Expand All @@ -68,11 +68,11 @@ func (ts *TelegramSender) extract(users []*models.User) []string {
return tokens
}

func SendTelegram(ctx *ctx.Context, message TelegramMessage, event *models.AlertCurEvent, channel string) {
func SendTelegram(ctx *ctx.Context, message TelegramMessage, events []*models.AlertCurEvent, channel string) {
for i := 0; i < len(message.Tokens); i++ {
if !strings.Contains(message.Tokens[i], "/") && !strings.HasPrefix(message.Tokens[i], "https://") {
logger.Errorf("telegram_sender: result=fail invalid token=%s", message.Tokens[i])
NotifyRecord(ctx, event, channel, message.Tokens[i], "", errors.New("invalid token"))
NotifyRecord(ctx, events, channel, message.Tokens[i], "", errors.New("invalid token"))
continue
}
var url string
Expand All @@ -93,6 +93,6 @@ func SendTelegram(ctx *ctx.Context, message TelegramMessage, event *models.Alert
Text: message.Text,
}

doSendAndRecord(ctx, url, message.Tokens[i], body, channel, message.Stats, event)
doSendAndRecord(ctx, url, message.Tokens[i], body, channel, message.Stats, events)
}
}
11 changes: 2 additions & 9 deletions alert/sender/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func SingleSendWebhooks(ctx *ctx.Context, webhooks []*models.Webhook, event *mod
retryCount := 0
for retryCount < 3 {
needRetry, res, err := sendWebhook(conf, event, stats)
NotifyRecord(ctx, event, "webhook", conf.Url, res, err)
NotifyRecord(ctx, []*models.AlertCurEvent{event}, "webhook", conf.Url, res, err)
if !needRetry {
break
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func StartConsumer(ctx *ctx.Context, queue *WebhookQueue, popSize int, webhook *
retryCount := 0
for retryCount < webhook.RetryCount {
needRetry, res, err := sendWebhook(webhook, events, stats)
go RecordEvents(ctx, webhook, events, stats, res, err)
go NotifyRecord(ctx, events, "webhook", webhook.Url, res, err)
if !needRetry {
break
}
Expand All @@ -176,10 +176,3 @@ func StartConsumer(ctx *ctx.Context, queue *WebhookQueue, popSize int, webhook *
}
}
}

func RecordEvents(ctx *ctx.Context, webhook *models.Webhook, events []*models.AlertCurEvent, stats *astats.Stats, res string, err error) {
for _, event := range events {
time.Sleep(time.Millisecond * 10)
NotifyRecord(ctx, event, "webhook", webhook.Url, res, err)
}
}
5 changes: 2 additions & 3 deletions alert/sender/wecom.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ func (ws *WecomSender) CallBack(ctx CallBackContext) {
},
}

doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback", ctx.Stats, ctx.Events)
}

func (ws *WecomSender) Send(ctx MessageContext) {
Expand All @@ -54,7 +53,7 @@ func (ws *WecomSender) Send(ctx MessageContext) {
Content: message,
},
}
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Wecom, ctx.Stats, ctx.Events[0])
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Wecom, ctx.Stats, ctx.Events)
}
}

Expand Down
13 changes: 10 additions & 3 deletions center/router/router_notification_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,18 @@ type Record struct {

// notificationRecordAdd
func (rt *Router) notificationRecordAdd(c *gin.Context) {
var req models.NotificaitonRecord
var req []*models.NotificaitonRecord
ginx.BindJSON(c, &req)
err := req.Add(rt.Ctx)
err := models.DB(rt.Ctx).CreateInBatches(req, 100).Error
var ids []int64
if err == nil {
ids = make([]int64, len(req))
for i, noti := range req {
ids[i] = noti.Id
}
}

ginx.NewRender(c).Data(req.Id, err)
ginx.NewRender(c).Data(ids, err)
}

func (rt *Router) notificationRecordList(c *gin.Context) {
Expand Down