Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new notify process #2464

Open
wants to merge 6 commits into
base: notify-rule-feat
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
taskTplsCache := memsto.NewTaskTplCache(ctx)
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
notifyRuleCache := memsto.NewNotifyRuleCache(ctx, syncStats)
notifyChannelCache := memsto.NewNotifyChannelCache(ctx, syncStats)
messageTemplateCache := memsto.NewMessageTemplateCache(ctx, syncStats)

promClients := prom.NewPromClient(ctx)
dispatch.InitRegisterQueryFunc(promClients)
Expand All @@ -72,7 +75,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {

macros.RegisterMacro(macros.MacroInVain)
dscache.Init(ctx, false)
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, userCache, userGroupCache)
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, userCache, userGroupCache, notifyRuleCache, notifyChannelCache, messageTemplateCache)

r := httpx.GinEngine(config.Global.RunMode, config.HTTP,
configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
Expand All @@ -95,7 +98,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {

func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, taskTplsCache *memsto.TaskTplCache, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context,
promClients *prom.PromClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType) {
promClients *prom.PromClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType, notifyRuleCache *memsto.NotifyRuleCacheType, notifyChannelCache *memsto.NotifyChannelCacheType, messageTemplateCache *memsto.MessageTemplateCacheType) {
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
recordingRuleCache := memsto.NewRecordingRuleCache(ctx, syncStats)
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats)
Expand All @@ -110,7 +113,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, targetsOfAlertRulesCache,
busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)

dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, alertc.Alerting, ctx, alertStats)
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, notifyRuleCache, notifyChannelCache, messageTemplateCache, alertc.Alerting, ctx, alertStats)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients)

go dp.ReloadTpls()
Expand Down
134 changes: 126 additions & 8 deletions alert/dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dispatch
import (
"bytes"
"encoding/json"
"github.com/ccfos/nightingale/v6/pkg/tplx"
"html/template"
"net/url"
"strconv"
Expand Down Expand Up @@ -30,6 +31,10 @@ type Dispatch struct {
notifyConfigCache *memsto.NotifyConfigCacheType
taskTplsCache *memsto.TaskTplCache

notifyRuleCache *memsto.NotifyRuleCacheType
notifyChannelCache *memsto.NotifyChannelCacheType
messageTemplateCache *memsto.MessageTemplateCacheType

alerting aconf.Alerting

Senders map[string]sender.Sender
Expand All @@ -47,15 +52,19 @@ type Dispatch struct {
// NewDispatch creates a Dispatch instance.
func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType,
alertSubscribeCache *memsto.AlertSubscribeCacheType, targetCache *memsto.TargetCacheType, notifyConfigCache *memsto.NotifyConfigCacheType,
taskTplsCache *memsto.TaskTplCache, alerting aconf.Alerting, ctx *ctx.Context, astats *astats.Stats) *Dispatch {
taskTplsCache *memsto.TaskTplCache, notifyRuleCache *memsto.NotifyRuleCacheType, notifyChannelCache *memsto.NotifyChannelCacheType,
messageTemplateCache *memsto.MessageTemplateCacheType, alerting aconf.Alerting, ctx *ctx.Context, astats *astats.Stats) *Dispatch {
notify := &Dispatch{
alertRuleCache: alertRuleCache,
userCache: userCache,
userGroupCache: userGroupCache,
alertSubscribeCache: alertSubscribeCache,
targetCache: targetCache,
notifyConfigCache: notifyConfigCache,
taskTplsCache: taskTplsCache,
alertRuleCache: alertRuleCache,
userCache: userCache,
userGroupCache: userGroupCache,
alertSubscribeCache: alertSubscribeCache,
targetCache: targetCache,
notifyConfigCache: notifyConfigCache,
taskTplsCache: taskTplsCache,
notifyRuleCache: notifyRuleCache,
notifyChannelCache: notifyChannelCache,
messageTemplateCache: messageTemplateCache,

alerting: alerting,

Expand Down Expand Up @@ -131,6 +140,115 @@ func (e *Dispatch) relaodTpls() error {
return nil
}

// HandleEventNotifyV2 sends event through every notify rule referenced by
// event.NotifyRuleIDs. For each rule found in the notify-rule cache, every
// entry of its NotifyConfigs is resolved to a channel and a message template;
// entries whose channel or template is missing from the caches are skipped.
// isSubscribe is accepted but not read by this function.
func (e *Dispatch) HandleEventNotifyV2(event *models.AlertCurEvent, isSubscribe bool) {
	// Ranging over an empty ID list is a no-op, so no explicit length guard is needed.
	for _, ruleID := range event.NotifyRuleIDs {
		rule := e.notifyRuleCache.Get(ruleID)
		if rule == nil {
			continue
		}

		for idx := range rule.NotifyConfigs {
			// Take the address of the slice element itself so sendV2 receives
			// the cached config rather than a copy.
			cfg := &rule.NotifyConfigs[idx]

			channel := e.notifyChannelCache.Get(cfg.ChannelID)
			tpl := e.messageTemplateCache.Get(cfg.TemplateID)
			if channel != nil && tpl != nil {
				// TODO: send in a goroutine.
				// TODO: aggregate events before sending.
				e.sendV2([]*models.AlertCurEvent{event}, cfg, channel, tpl)
			}
		}
	}
}

// sendV2 renders every entry of messageTemplate.Content against events, builds
// the list of parameter maps from notifyConfig according to
// notifyChannel.ParamConfig.ParamType, and hands both to the sender selected by
// notifyChannel.RequestType ("http", "email" or "script"). Templates that fail
// to parse or execute are skipped without any log output.
func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyConfig *models.NotifyConfig, notifyChannel *models.NotifyChannelConfig, messageTemplate *models.MessageTemplate) {
	// Render the event content into each message template.
	content := make(map[string]string)
	for key, msgTpl := range messageTemplate.Content {
		// Prelude prepended to every template so that $labels and $value are defined.
		var defs = []string{
			"{{$labels := .TagsMap}}",
			"{{$value := .TriggerValue}}",
		}
		text := strings.Join(append(defs, msgTpl), "")
		tpl, err := template.New(key).Funcs(tplx.TemplateFuncMap).Parse(text)
		if err != nil {
			continue
		}

		var body bytes.Buffer
		// NOTE(review): the data passed to Execute is the events slice, while the
		// prelude above reads .TagsMap / .TriggerValue from the root value —
		// confirm these resolve on a slice; if not, Execute fails and this key is
		// silently dropped from content.
		if err = tpl.Execute(&body, events); err != nil {
			continue
		}
		content[key] = body.String()
	}

	// Collect the parameters configured in notifyConfig into params so they can
	// be substituted at send time.
	params := make([]map[string]string, 0)
	switch notifyChannel.ParamConfig.ParamType {
	case "user_info":
		// Params is only used when it holds a models.UserInfoParams value;
		// otherwise params stays empty.
		if p, ok := notifyConfig.Params.(models.UserInfoParams); ok {
			users := e.userCache.GetByUserIds(p.UserIDs)
			userGroups := e.userGroupCache.GetByUserGroupIds(p.UserGroupIDs)
			var origin []string
			var val []string
			// Only the "phone" and "email" contact keys are handled; any other
			// key leaves origin empty.
			if notifyChannel.ParamConfig.UserInfo.ContactKey == "phone" {
Copy link
Member

@710leo 710leo Feb 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里缺少了对用户的其他联系方式的处理,比如公司内部 im,个人相关的 token

				for _, user := range users {
					origin = append(origin, user.Phone)
				}
				for _, userGroup := range userGroups {
					for _, user := range userGroup.Users {
						origin = append(origin, user.Phone)
					}
				}

			} else if notifyChannel.ParamConfig.UserInfo.ContactKey == "email" {
				for _, user := range users {
					origin = append(origin, user.Email)
				}
				for _, userGroup := range userGroups {
					for _, user := range userGroup.Users {
						origin = append(origin, user.Email)
					}
				}
			}

			// Batch mode: all contacts are joined with "," into one value.
			// Otherwise each contact becomes its own value.
			if notifyChannel.ParamConfig.BatchSend {
				val = append(val, strings.Join(origin, ","))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里直接使用逗号把联系方式拼接起来,后面灵活度不太够,可能有接口参数要求是以 ; 分割,或者是字符串数组 phone=["123...","334..."]

			} else {
				val = origin
			}

			// One parameter map per value, keyed by the configured contact key.
			for i := range val {
				param := make(map[string]string)
				param[notifyChannel.ParamConfig.UserInfo.ContactKey] = val[i]
				params = append(params, param)
			}
		}
	case "flashduty":
		// No parameters are built for this type; the case body is empty.

	case "custom":
		// Copy the custom key/value pairs into a single parameter map.
		if p, ok := notifyConfig.Params.(models.CustomParams); ok {
			param := make(map[string]string)
			for k, v := range p {
				param[k] = v
			}
			params = append(params, param)
		}
	}

	// Dispatch by request type. Anything the Send* calls return is not examined
	// here, and unknown request types are ignored.
	switch notifyChannel.RequestType {
	case "http":
		// Unlike the email and script senders, events is not passed to SendHTTP.
		notifyChannel.SendHTTP(content, params, e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里需要把返回的 err 信息打印出来,方便排查问题

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里需要把 events 也传进去,方便在 http 参数和 body 中使用,更灵活一些

	case "email":
		notifyChannel.SendEmail(events, content, params, e.notifyChannelCache.GetSmtpClient(notifyChannel.ID))
	case "script":
		notifyChannel.SendScript(events, content, params)
	default:
	}
}

// HandleEventNotify is the main logic for processing an event.
// event: the alert or recovery event
// isSubscribe: whether the alert event was produced by a subscribe configuration
Expand Down
5 changes: 4 additions & 1 deletion center/center.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
taskTplCache := memsto.NewTaskTplCache(ctx)
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
notifyRuleCache := memsto.NewNotifyRuleCache(ctx, syncStats)
notifyChannelCache := memsto.NewNotifyChannelCache(ctx, syncStats)
messageTemplateCache := memsto.NewMessageTemplateCache(ctx, syncStats)

sso := sso.Init(config.Center, ctx, configCache)
promClients := prom.NewPromClient(ctx)
Expand All @@ -112,7 +115,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {

macros.RegisterMacro(macros.MacroInVain)
dscache.Init(ctx, false)
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplCache, dsCache, ctx, promClients, userCache, userGroupCache)
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplCache, dsCache, ctx, promClients, userCache, userGroupCache, notifyRuleCache, notifyChannelCache, messageTemplateCache)

writers := writer.NewWriters(config.Pushgw)

Expand Down
5 changes: 4 additions & 1 deletion cmd/edge/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
userCache := memsto.NewUserCache(ctx, syncStats)
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
taskTplsCache := memsto.NewTaskTplCache(ctx)
notifyRuleCache := memsto.NewNotifyRuleCache(ctx, syncStats)
notifyChannelCache := memsto.NewNotifyChannelCache(ctx, syncStats)
messageTemplateCache := memsto.NewMessageTemplateCache(ctx, syncStats)

promClients := prom.NewPromClient(ctx)

Expand All @@ -83,7 +86,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
externalProcessors := process.NewExternalProcessors()

alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache,
alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, userCache, userGroupCache)
alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, userCache, userGroupCache, notifyRuleCache, notifyChannelCache, messageTemplateCache)

alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/prometheus/prometheus v0.47.1
github.com/rakyll/statik v0.1.7
github.com/redis/go-redis/v9 v9.0.2
github.com/satori/go.uuid v1.2.0
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.9.0
github.com/tidwall/gjson v1.14.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
Expand Down
139 changes: 139 additions & 0 deletions memsto/message_template_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package memsto

import (
"fmt"
"sync"
"time"

"github.com/ccfos/nightingale/v6/dumper"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"

"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
)

// MessageTemplateCacheType is an in-memory cache of message templates, keyed
// by template id.
type MessageTemplateCacheType struct {
	// statTotal and statLastUpdated hold the statistics stored by the most
	// recent Set call; StatChanged compares fresh statistics against them.
	// Both are -1 after construction and after Reset.
	statTotal       int64
	statLastUpdated int64
	ctx             *ctx.Context
	stats           *Stats

	// The embedded lock is taken by the accessors that read or replace templates.
	sync.RWMutex
	templates map[int64]*models.MessageTemplate // key: template id
}

// NewMessageTemplateCache builds an empty message-template cache bound to the
// given context and stats, then calls SyncMessageTemplates on it before
// returning. The stat fields start at -1 so the first sync sees a change.
func NewMessageTemplateCache(ctx *ctx.Context, stats *Stats) *MessageTemplateCacheType {
	cache := new(MessageTemplateCacheType)
	cache.ctx = ctx
	cache.stats = stats
	cache.statTotal, cache.statLastUpdated = -1, -1
	cache.templates = make(map[int64]*models.MessageTemplate)

	cache.SyncMessageTemplates()
	return cache
}

// Reset drops all cached templates and sets both stat fields back to -1, all
// under the write lock.
func (mtc *MessageTemplateCacheType) Reset() {
	fresh := make(map[int64]*models.MessageTemplate)

	mtc.Lock()
	defer mtc.Unlock()

	mtc.templates = fresh
	mtc.statLastUpdated = -1
	mtc.statTotal = -1
}

// StatChanged reports whether total or lastUpdated differs from the values
// stored in the cache. It reads the stat fields without taking the lock.
func (mtc *MessageTemplateCacheType) StatChanged(total, lastUpdated int64) bool {
	return mtc.statTotal != total || mtc.statLastUpdated != lastUpdated
}

// Set replaces the cached template map with m and records the statistics that
// m was loaded under.
func (mtc *MessageTemplateCacheType) Set(m map[int64]*models.MessageTemplate, total, lastUpdated int64) {
	// Hold the write lock only for the map swap.
	func() {
		mtc.Lock()
		defer mtc.Unlock()
		mtc.templates = m
	}()

	// The stat fields are written outside the lock, on the stated assumption
	// that only one goroutine ever updates them.
	mtc.statTotal, mtc.statLastUpdated = total, lastUpdated
}

// Get returns the cached template stored under templateId, or nil when the id
// is not in the cache. The lookup runs under the read lock.
func (mtc *MessageTemplateCacheType) Get(templateId int64) *models.MessageTemplate {
	mtc.RLock()
	tpl := mtc.templates[templateId]
	mtc.RUnlock()

	return tpl
}

// GetTemplateIds returns the ids of all cached templates as a new slice. The
// order follows map iteration and is therefore unspecified.
func (mtc *MessageTemplateCacheType) GetTemplateIds() []int64 {
	mtc.RLock()
	defer mtc.RUnlock()

	ids := make([]int64, 0, len(mtc.templates))
	for id := range mtc.templates {
		ids = append(ids, id)
	}
	return ids
}

// SyncMessageTemplates runs one synchronous sync and then starts the
// background refresh loop in its own goroutine. If the initial sync fails, the
// error is printed to stdout and exit(1) is called.
func (mtc *MessageTemplateCacheType) SyncMessageTemplates() {
	if err := mtc.syncMessageTemplates(); err != nil {
		fmt.Println("failed to sync message templates:", err)
		exit(1)
	}

	go mtc.loopSyncMessageTemplates()
}

// loopSyncMessageTemplates refreshes the cache forever, sleeping 9 seconds
// before each attempt. A failed sync is logged as a warning and the loop
// carries on; the function never returns.
func (mtc *MessageTemplateCacheType) loopSyncMessageTemplates() {
	const interval = 9000 * time.Millisecond

	for {
		time.Sleep(interval)

		err := mtc.syncMessageTemplates()
		if err == nil {
			continue
		}
		logger.Warning("failed to sync message templates:", err)
	}
}

// syncMessageTemplates performs one refresh of the cache.
//
// It first queries the message-template statistics. When they match the values
// already stored, the sync gauges are set to 0, a "not changed" record is
// written, and nil is returned without reloading. Otherwise all templates are
// loaded, indexed by ID, and installed through Set together with the new
// statistics. Duration and count go to the gauges, the log and the sync record.
//
// A failure in either query is written to the sync record and returned wrapped
// with the name of the failing call.
func (mtc *MessageTemplateCacheType) syncMessageTemplates() error {
	const (
		recordName = "message_templates"
		metricName = "sync_message_templates"
	)

	begin := time.Now()

	stat, err := models.MessageTemplateStatistics(mtc.ctx)
	if err != nil {
		dumper.PutSyncRecord(recordName, begin.Unix(), -1, -1, "failed to query statistics: "+err.Error())
		return errors.WithMessage(err, "failed to exec MessageTemplateStatistics")
	}

	// Nothing changed since the last sync: report zeros and skip the reload.
	if unchanged := !mtc.StatChanged(stat.Total, stat.LastUpdated); unchanged {
		mtc.stats.GaugeCronDuration.WithLabelValues(metricName).Set(0)
		mtc.stats.GaugeSyncNumber.WithLabelValues(metricName).Set(0)
		dumper.PutSyncRecord(recordName, begin.Unix(), -1, -1, "not changed")
		return nil
	}

	all, err := models.MessageTemplateGetsAll(mtc.ctx)
	if err != nil {
		dumper.PutSyncRecord(recordName, begin.Unix(), -1, -1, "failed to query records: "+err.Error())
		return errors.WithMessage(err, "failed to exec MessageTemplateGetsAll")
	}

	byID := make(map[int64]*models.MessageTemplate)
	for _, tpl := range all {
		byID[tpl.ID] = tpl
	}

	mtc.Set(byID, stat.Total, stat.LastUpdated)

	// Report the number of distinct IDs actually cached, not the raw row count.
	synced := len(byID)
	elapsed := time.Since(begin).Milliseconds()
	mtc.stats.GaugeCronDuration.WithLabelValues(metricName).Set(float64(elapsed))
	mtc.stats.GaugeSyncNumber.WithLabelValues(metricName).Set(float64(synced))
	logger.Infof("timer: sync message templates done, cost: %dms, number: %d", elapsed, synced)
	dumper.PutSyncRecord(recordName, begin.Unix(), elapsed, synced, "success")

	return nil
}
Loading