diff --git a/alert/dispatch/dispatch.go b/alert/dispatch/dispatch.go index 19f44576d..5f5a5145c 100644 --- a/alert/dispatch/dispatch.go +++ b/alert/dispatch/dispatch.go @@ -4,7 +4,9 @@ import ( "bytes" "encoding/json" "html/template" + "net/url" "strconv" + "strings" "sync" "time" @@ -31,6 +33,7 @@ type Dispatch struct { alerting aconf.Alerting Senders map[string]sender.Sender + CallBacks map[string]sender.CallBacker tpls map[string]*template.Template ExtraSenders map[string]sender.Sender BeforeSenderHook func(*models.AlertCurEvent) bool @@ -99,6 +102,17 @@ func (e *Dispatch) relaodTpls() error { models.FeishuCard: sender.NewSender(models.FeishuCard, tmpTpls), } + // domain -> Callback() + callbacks := map[string]sender.CallBacker{ + models.DingtalkDomain: sender.NewCallBacker(models.DingtalkDomain, e.targetCache, e.userCache, e.taskTplsCache, tmpTpls), + models.WecomDomain: sender.NewCallBacker(models.WecomDomain, e.targetCache, e.userCache, e.taskTplsCache, tmpTpls), + models.FeishuDomain: sender.NewCallBacker(models.FeishuDomain, e.targetCache, e.userCache, e.taskTplsCache, tmpTpls), + models.TelegramDomain: sender.NewCallBacker(models.TelegramDomain, e.targetCache, e.userCache, e.taskTplsCache, tmpTpls), + models.FeishuCardDomain: sender.NewCallBacker(models.FeishuCardDomain, e.targetCache, e.userCache, e.taskTplsCache, tmpTpls), + models.IbexDomain: sender.NewCallBacker(models.IbexDomain, e.targetCache, e.userCache, e.taskTplsCache, tmpTpls), + models.DefaultDomain: sender.NewCallBacker(models.DefaultDomain, e.targetCache, e.userCache, e.taskTplsCache, tmpTpls), + } + e.RwLock.RLock() for channelName, extraSender := range e.ExtraSenders { senders[channelName] = extraSender @@ -108,6 +122,7 @@ func (e *Dispatch) relaodTpls() error { e.RwLock.Lock() e.tpls = tmpTpls e.Senders = senders + e.CallBacks = callbacks e.RwLock.Unlock() return nil } @@ -243,7 +258,7 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not } // handle event callbacks - sender.SendCallbacks(e.ctx, notifyTarget.ToCallbackList(), event, e.targetCache, e.userCache, e.taskTplsCache, e.Astats) + e.SendCallbacks(rule, notifyTarget, event) // handle global webhooks sender.SendWebhooks(notifyTarget.ToWebhookList(), event, e.Astats) @@ -252,6 +267,47 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not go sender.MayPluginNotify(e.genNoticeBytes(event), e.notifyConfigCache.GetNotifyScript(), e.Astats) } +func (e *Dispatch) SendCallbacks(rule *models.AlertRule, notifyTarget *NotifyTarget, event *models.AlertCurEvent) { + + uids := notifyTarget.ToUidList() + urls := notifyTarget.ToCallbackList() + for _, urlStr := range urls { + if len(urlStr) == 0 { + continue + } + + cbCtx := sender.BuildCallBackContext(e.ctx, urlStr, rule, []*models.AlertCurEvent{event}, uids, e.userCache, e.Astats) + + if strings.HasPrefix(urlStr, "${ibex}") { + e.CallBacks[models.IbexDomain].CallBack(cbCtx) + continue + } + + if !(strings.HasPrefix(urlStr, "http://") || strings.HasPrefix(urlStr, "https://")) { + cbCtx.CallBackURL = "http://" + urlStr + } + + parsedURL, err := url.Parse(urlStr) + if err != nil { + logger.Errorf("SendCallbacks: failed to url.Parse(urlStr=%s): %v", urlStr, err) + continue + } + + // process feishu card + if parsedURL.Host == models.FeishuDomain && parsedURL.Query().Get("card") == "1" { + e.CallBacks[models.FeishuCardDomain].CallBack(cbCtx) + continue + } + + callBacker, ok := e.CallBacks[parsedURL.Host] + if ok { + callBacker.CallBack(cbCtx) + } else { + e.CallBacks[models.DefaultDomain].CallBack(cbCtx) + } + } +} + type Notice struct { Event *models.AlertCurEvent `json:"event"` Tpls map[string]string `json:"tpls"` diff --git a/alert/dispatch/notify_target.go b/alert/dispatch/notify_target.go index 83ba8d162..84dc7fbbf 100644 --- a/alert/dispatch/notify_target.go +++ b/alert/dispatch/notify_target.go @@ -84,6 +84,14 @@ func (s *NotifyTarget) ToWebhookList() []*models.Webhook { return webhooks } +func (s *NotifyTarget) ToUidList() []int64 { + uids := make([]int64, len(s.userMap)) + for uid, _ := range s.userMap { + uids = append(uids, uid) + } + return uids +} + // Dispatch 抽象由告警事件到信息接收者的路由策略 // rule: 告警规则 // event: 告警事件 diff --git a/alert/sender/callback.go b/alert/sender/callback.go index c61cfc50e..77bd3e635 100644 --- a/alert/sender/callback.go +++ b/alert/sender/callback.go @@ -1,9 +1,8 @@ package sender import ( - "encoding/json" - "fmt" - "strconv" + "html/template" + "net/url" "strings" "time" @@ -13,264 +12,126 @@ import ( "github.com/ccfos/nightingale/v6/pkg/ctx" "github.com/ccfos/nightingale/v6/pkg/poster" - imodels "github.com/flashcatcloud/ibex/src/models" - "github.com/flashcatcloud/ibex/src/storage" "github.com/toolkits/pkg/logger" ) -func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, - taskTplCache *memsto.TaskTplCache, stats *astats.Stats) { - for _, url := range urls { - if url == "" { - continue - } - - if strings.HasPrefix(url, "${ibex}") { - if !event.IsRecovered { - handleIbex(ctx, url, event, targetCache, userCache, taskTplCache) - } - continue - } - - if !(strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://")) { - url = "http://" + url - } - - stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc() - resp, code, err := poster.PostJSON(url, 5*time.Second, event, 3) - if err != nil { - logger.Errorf("event_callback_fail(rule_id=%d url=%s), resp: %s, err: %v, code: %d", event.RuleId, url, string(resp), err, code) - stats.AlertNotifyErrorTotal.WithLabelValues("rule_callback").Inc() - } else { - logger.Infof("event_callback_succ(rule_id=%d url=%s), resp: %s, code: %d", event.RuleId, url, string(resp), code) - } - } -} - -type TaskCreateReply struct { - Err string `json:"err"` - Dat int64 `json:"dat"` // task.id -} - -func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, - taskTplCache *memsto.TaskTplCache) { - if imodels.DB() == nil && ctx.IsCenter { - logger.Warning("event_callback_ibex: db is nil") - return - } - - arr := strings.Split(url, "/") - - var idstr string - var host string - - if len(arr) > 1 { - idstr = arr[1] - } - - if len(arr) > 2 { - host = arr[2] +type ( + // CallBacker 进行回调的接口 + CallBacker interface { + CallBack(ctx CallBackContext) } - id, err := strconv.ParseInt(idstr, 10, 64) - if err != nil { - logger.Errorf("event_callback_ibex: failed to parse url: %s", url) - return + // CallBackContext 回调时所需的上下文 + CallBackContext struct { + Ctx *ctx.Context + CallBackURL string + Users []*models.User + Rule *models.AlertRule + Events []*models.AlertCurEvent + Stats *astats.Stats } - if host == "" { - // 用户在callback url中没有传入host,就从event中解析 - host = event.TargetIdent - } + DefaultCallBacker struct{} +) - if host == "" { - logger.Error("event_callback_ibex: failed to get host") - return - } +func BuildCallBackContext(ctx *ctx.Context, callBackURL string, rule *models.AlertRule, events []*models.AlertCurEvent, + uids []int64, userCache *memsto.UserCacheType, stats *astats.Stats) CallBackContext { + users := userCache.GetByUserIds(uids) - tpl := taskTplCache.Get(id) - if tpl == nil { - logger.Errorf("event_callback_ibex: no such tpl(%d)", id) - return + return CallBackContext{ + Ctx: ctx, + CallBackURL: callBackURL, + Rule: rule, + Events: events, + Users: users, + Stats: stats, } +} - // check perm - // tpl.GroupId - host - account 三元组校验权限 - can, err := canDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache) +func ExtractAtsParams(rawURL string) []string { + ans := make([]string, 0, 1) + parsedURL, err := url.Parse(rawURL) if err != nil { - logger.Errorf("event_callback_ibex: check perm fail: %v", err) - return + logger.Errorf("ExtractAtsParams(url=%s), err: %v", rawURL, err) + return ans } - if !can { - logger.Errorf("event_callback_ibex: user(%s) no permission", tpl.UpdateBy) - return + queryParams := parsedURL.Query() + atParam := queryParams.Get("ats") + if atParam == "" { + return ans } - tagsMap := make(map[string]string) - for i := 0; i < len(event.TagsJSON); i++ { - pair := strings.TrimSpace(event.TagsJSON[i]) - if pair == "" { - continue - } + // Split the atParam by comma and return the result as a slice + return strings.Split(atParam, ",") +} - arr := strings.Split(pair, "=") - if len(arr) != 2 { - continue +func NewCallBacker( + key string, + targetCache *memsto.TargetCacheType, + userCache *memsto.UserCacheType, + taskTplCache *memsto.TaskTplCache, + tpls map[string]*template.Template, +) CallBacker { + + switch key { + case models.IbexDomain: // Distribute to Ibex + return &IbexCallBacker{ + targetCache: targetCache, + userCache: userCache, + taskTplCache: taskTplCache, } + case models.DefaultDomain: // default callback + return &DefaultCallBacker{} + case models.DingtalkDomain: + return &DingtalkSender{tpl: tpls[models.Dingtalk]} + case models.WecomDomain: + return &WecomSender{tpl: tpls[models.Wecom]} + case models.FeishuDomain: + return &FeishuSender{tpl: tpls[models.Feishu]} + case models.FeishuCardDomain: + return &FeishuCardSender{tpl: tpls[models.FeishuCard]} + //case models.Mm: + // return &MmSender{tpl: tpls[models.Mm]} + case models.TelegramDomain: + return &TelegramSender{tpl: tpls[models.Telegram]} + } + + return nil +} - tagsMap[arr[0]] = arr[1] - } - // 附加告警级别 告警触发值标签 - tagsMap["alert_severity"] = strconv.Itoa(event.Severity) - tagsMap["alert_trigger_value"] = event.TriggerValue - - tags, err := json.Marshal(tagsMap) - if err != nil { - logger.Errorf("event_callback_ibex: failed to marshal tags to json: %v", tagsMap) +func (c *DefaultCallBacker) CallBack(ctx CallBackContext) { + if len(ctx.CallBackURL) == 0 || len(ctx.Events) == 0 { return } - // call ibex - in := models.TaskForm{ - Title: tpl.Title + " FH: " + host, - Account: tpl.Account, - Batch: tpl.Batch, - Tolerance: tpl.Tolerance, - Timeout: tpl.Timeout, - Pause: tpl.Pause, - Script: tpl.Script, - Args: tpl.Args, - Stdin: string(tags), - Action: "start", - Creator: tpl.UpdateBy, - Hosts: []string{host}, - AlertTriggered: true, - } + event := ctx.Events[0] - id, err = TaskAdd(in, tpl.UpdateBy, ctx.IsCenter) + ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc() + resp, code, err := poster.PostJSON(ctx.CallBackURL, 5*time.Second, event, 3) if err != nil { - logger.Errorf("event_callback_ibex: call ibex fail: %v", err) - return - } - - // write db - record := models.TaskRecord{ - Id: id, - EventId: event.Id, - GroupId: tpl.GroupId, - Title: in.Title, - Account: in.Account, - Batch: in.Batch, - Tolerance: in.Tolerance, - Timeout: in.Timeout, - Pause: in.Pause, - Script: in.Script, - Args: in.Args, - CreateAt: time.Now().Unix(), - CreateBy: in.Creator, - } - - if err = record.Add(ctx); err != nil { - logger.Errorf("event_callback_ibex: persist task_record fail: %v", err) - } -} - -func canDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) { - user := userCache.GetByUsername(username) - if user != nil && user.IsAdmin() { - return true, nil - } - - target, has := targetCache.Get(host) - if !has { - return false, nil + logger.Errorf("event_callback_fail(rule_id=%d url=%s), resp: %s, err: %v, code: %d", + event.RuleId, ctx.CallBackURL, string(resp), err, code) + ctx.Stats.AlertNotifyErrorTotal.WithLabelValues("rule_callback").Inc() + } else { + logger.Infof("event_callback_succ(rule_id=%d url=%s), resp: %s, code: %d", + event.RuleId, ctx.CallBackURL, string(resp), code) } - - return target.GroupId == tpl.GroupId, nil } -func TaskAdd(f models.TaskForm, authUser string, isCenter bool) (int64, error) { - hosts := cleanHosts(f.Hosts) - if len(hosts) == 0 { - return 0, fmt.Errorf("arg(hosts) empty") - } +func doSend(url string, body interface{}, channel string, stats *astats.Stats) { + stats.AlertNotifyTotal.WithLabelValues(channel).Inc() - taskMeta := &imodels.TaskMeta{ - Title: f.Title, - Account: f.Account, - Batch: f.Batch, - Tolerance: f.Tolerance, - Timeout: f.Timeout, - Pause: f.Pause, - Script: f.Script, - Args: f.Args, - Stdin: f.Stdin, - Creator: f.Creator, - } - - err := taskMeta.CleanFields() + res, code, err := poster.PostJSON(url, time.Second*5, body, 3) if err != nil { - return 0, err - } - - taskMeta.HandleFH(hosts[0]) - - // 任务类型分为"告警规则触发"和"n9e center用户下发"两种; - // 边缘机房"告警规则触发"的任务不需要规划,并且它可能是失联的,无法使用db资源,所以放入redis缓存中,直接下发给agentd执行 - if !isCenter && f.AlertTriggered { - if err := taskMeta.Create(); err != nil { - // 当网络不连通时,生成唯一的id,防止边缘机房中不同任务的id相同; - // 方法是,redis自增id去防止同一个机房的不同n9e edge生成的id相同; - // 但没法防止不同边缘机房生成同样的id,所以,生成id的数据不会上报存入数据库,只用于闭环执行。 - taskMeta.Id, err = storage.IdGet() - if err != nil { - return 0, err - } - } - - taskHost := imodels.TaskHost{ - Id: taskMeta.Id, - Host: hosts[0], - Status: "running", - } - if err = taskHost.Create(); err != nil { - logger.Warningf("task_add_fail: authUser=%s title=%s err=%s", authUser, taskMeta.Title, err.Error()) - } - - // 缓存任务元信息和待下发的任务 - err = taskMeta.Cache(hosts[0]) - if err != nil { - return 0, err - } - + logger.Errorf("%s_sender: result=fail url=%s code=%d error=%v req:%v response=%s", channel, url, code, err, body, string(res)) + stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc() } else { - // 如果是中心机房,还是保持之前的逻辑 - err = taskMeta.Save(hosts, f.Action) - if err != nil { - return 0, err - } + logger.Infof("%s_sender: result=succ url=%s code=%d req:%v response=%s", channel, url, code, body, string(res)) } - - logger.Infof("task_add_succ: authUser=%s title=%s", authUser, taskMeta.Title) - return taskMeta.Id, nil } -func cleanHosts(formHosts []string) []string { - cnt := len(formHosts) - arr := make([]string, 0, cnt) - for i := 0; i < cnt; i++ { - item := strings.TrimSpace(formHosts[i]) - if item == "" { - continue - } - - if strings.HasPrefix(item, "#") { - continue - } - - arr = append(arr, item) - } - - return arr +type TaskCreateReply struct { + Err string `json:"err"` + Dat int64 `json:"dat"` // task.id } diff --git a/alert/sender/dingtalk.go b/alert/sender/dingtalk.go index 10aaeddca..cde80d24e 100644 --- a/alert/sender/dingtalk.go +++ b/alert/sender/dingtalk.go @@ -1,15 +1,9 @@ package sender import ( + "github.com/ccfos/nightingale/v6/models" "html/template" "strings" - "time" - - "github.com/ccfos/nightingale/v6/alert/astats" - "github.com/ccfos/nightingale/v6/models" - "github.com/ccfos/nightingale/v6/pkg/poster" - - "github.com/toolkits/pkg/logger" ) type dingtalkMarkdown struct { @@ -28,6 +22,10 @@ type dingtalk struct { At dingtalkAt `json:"at"` } +var ( + _ CallBacker = (*DingtalkSender)(nil) +) + type DingtalkSender struct { tpl *template.Template } @@ -72,6 +70,37 @@ func (ds *DingtalkSender) Send(ctx MessageContext) { } } +func (ds *DingtalkSender) CallBack(ctx CallBackContext) { + if len(ctx.Events) == 0 || len(ctx.CallBackURL) == 0 { + return + } + + body := dingtalk{ + Msgtype: "markdown", + Markdown: dingtalkMarkdown{ + Title: ctx.Events[0].RuleName, + }, + } + + ats := ExtractAtsParams(ctx.CallBackURL) + message := BuildTplMessage(models.Dingtalk, ds.tpl, ctx.Events) + + if len(ats) > 0 { + body.Markdown.Text = message + "\n@" + strings.Join(ats, "@") + body.At = dingtalkAt{ + AtMobiles: ats, + IsAtAll: false, + } + } else { + // NoAt in url + body.Markdown.Text = message + } + + doSend(ctx.CallBackURL, body, models.Dingtalk, ctx.Stats) + + ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc() +} + // extract urls and ats from Users func (ds *DingtalkSender) extract(users []*models.User) ([]string, []string) { urls := make([]string, 0, len(users)) @@ -91,15 +120,3 @@ func (ds *DingtalkSender) extract(users []*models.User) ([]string, []string) { } return urls, ats } - -func doSend(url string, body interface{}, channel string, stats *astats.Stats) { - stats.AlertNotifyTotal.WithLabelValues(channel).Inc() - - res, code, err := poster.PostJSON(url, time.Second*5, body, 3) - if err != nil { - logger.Errorf("%s_sender: result=fail url=%s code=%d error=%v req:%v response=%s", channel, url, code, err, body, string(res)) - stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc() - } else { - logger.Infof("%s_sender: result=succ url=%s code=%d req:%v response=%s", channel, url, code, body, string(res)) - } -} diff --git a/alert/sender/feishu.go b/alert/sender/feishu.go index 0fef968ae..13a9f5040 100644 --- a/alert/sender/feishu.go +++ b/alert/sender/feishu.go @@ -1,6 +1,7 @@ package sender import ( + "fmt" "html/template" "strings" @@ -22,10 +23,41 @@ type feishu struct { At feishuAt `json:"at"` } +var ( + _ CallBacker = (*FeishuSender)(nil) +) + type FeishuSender struct { tpl *template.Template } +func (fs *FeishuSender) CallBack(ctx CallBackContext) { + if len(ctx.Events) == 0 || len(ctx.CallBackURL) == 0 { + return + } + + ats := ExtractAtsParams(ctx.CallBackURL) + message := BuildTplMessage(models.Feishu, fs.tpl, ctx.Events) + + if len(ats) > 0 { + atTags := "" + for _, at := range ats { + atTags += fmt.Sprintf(" ", at) + } + message = atTags + message + } + + body := feishu{ + Msgtype: "text", + Content: feishuContent{ + Text: message, + }, + } + + doSend(ctx.CallBackURL, body, models.Feishu, ctx.Stats) + ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc() +} + func (fs *FeishuSender) Send(ctx MessageContext) { if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return diff --git a/alert/sender/feishucard.go b/alert/sender/feishucard.go index 6f104e150..bff199250 100644 --- a/alert/sender/feishucard.go +++ b/alert/sender/feishucard.go @@ -3,6 +3,7 @@ package sender import ( "fmt" "html/template" + "net/url" "strings" "github.com/ccfos/nightingale/v6/models" @@ -91,6 +92,37 @@ var ( } ) +func (fs *FeishuCardSender) CallBack(ctx CallBackContext) { + if len(ctx.Events) == 0 || len(ctx.CallBackURL) == 0 { + return + } + + message := BuildTplMessage(models.FeishuCard, fs.tpl, ctx.Events) + color := "red" + lowerUnicode := strings.ToLower(message) + if strings.Count(lowerUnicode, Recovered) > 0 && strings.Count(lowerUnicode, Triggered) > 0 { + color = "orange" + } else if strings.Count(lowerUnicode, Recovered) > 0 { + color = "green" + } + + SendTitle := fmt.Sprintf("🔔 %s", ctx.Events[0].RuleName) + body.Card.Header.Title.Content = SendTitle + body.Card.Header.Template = color + body.Card.Elements[0].Text.Content = message + body.Card.Elements[2].Elements[0].Content = SendTitle + + // This is to be compatible with the feishucard interface, if with query string parameters, the request will fail + // Remove query parameters from the URL, + parsedURL, err := url.Parse(ctx.CallBackURL) + if err != nil { + return + } + parsedURL.RawQuery = "" + + doSend(parsedURL.String(), body, models.FeishuCard, ctx.Stats) +} + func (fs *FeishuCardSender) Send(ctx MessageContext) { if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return diff --git a/alert/sender/ibex.go b/alert/sender/ibex.go new file mode 100644 index 000000000..9b2dca26c --- /dev/null +++ b/alert/sender/ibex.go @@ -0,0 +1,265 @@ +// @Author: Ciusyan 6/5/24 + +package sender + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "time" + + "github.com/ccfos/nightingale/v6/memsto" + "github.com/ccfos/nightingale/v6/models" + "github.com/ccfos/nightingale/v6/pkg/ctx" + imodels "github.com/flashcatcloud/ibex/src/models" + "github.com/flashcatcloud/ibex/src/storage" + + "github.com/toolkits/pkg/logger" +) + +var ( + _ CallBacker = (*IbexCallBacker)(nil) +) + +type IbexCallBacker struct { + targetCache *memsto.TargetCacheType + userCache *memsto.UserCacheType + taskTplCache *memsto.TaskTplCache +} + +func (c *IbexCallBacker) CallBack(ctx CallBackContext) { + if len(ctx.CallBackURL) == 0 || len(ctx.Events) == 0 { + return + } + + event := ctx.Events[0] + + if event.IsRecovered { + return + } + + c.handleIbex(ctx.Ctx, ctx.CallBackURL, event) +} + +func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent) { + if imodels.DB() == nil && ctx.IsCenter { + logger.Warning("event_callback_ibex: db is nil") + return + } + + arr := strings.Split(url, "/") + + var idstr string + var host string + + if len(arr) > 1 { + idstr = arr[1] + } + + if len(arr) > 2 { + host = arr[2] + } + + id, err := strconv.ParseInt(idstr, 10, 64) + if err != nil { + logger.Errorf("event_callback_ibex: failed to parse url: %s", url) + return + } + + if host == "" { + // 用户在callback url中没有传入host,就从event中解析 + host = event.TargetIdent + } + + if host == "" { + logger.Error("event_callback_ibex: failed to get host") + return + } + + tpl := c.taskTplCache.Get(id) + if tpl == nil { + logger.Errorf("event_callback_ibex: no such tpl(%d)", id) + return + } + + // check perm + // tpl.GroupId - host - account 三元组校验权限 + can, err := canDoIbex(tpl.UpdateBy, tpl, host, c.targetCache, c.userCache) + if err != nil { + logger.Errorf("event_callback_ibex: check perm fail: %v", err) + return + } + + if !can { + logger.Errorf("event_callback_ibex: user(%s) no permission", tpl.UpdateBy) + return + } + + tagsMap := make(map[string]string) + for i := 0; i < len(event.TagsJSON); i++ { + pair := strings.TrimSpace(event.TagsJSON[i]) + if pair == "" { + continue + } + + arr := strings.Split(pair, "=") + if len(arr) != 2 { + continue + } + + tagsMap[arr[0]] = arr[1] + } + // 附加告警级别 告警触发值标签 + tagsMap["alert_severity"] = strconv.Itoa(event.Severity) + tagsMap["alert_trigger_value"] = event.TriggerValue + + tags, err := json.Marshal(tagsMap) + if err != nil { + logger.Errorf("event_callback_ibex: failed to marshal tags to json: %v", tagsMap) + return + } + + // call ibex + in := models.TaskForm{ + Title: tpl.Title + " FH: " + host, + Account: tpl.Account, + Batch: tpl.Batch, + Tolerance: tpl.Tolerance, + Timeout: tpl.Timeout, + Pause: tpl.Pause, + Script: tpl.Script, + Args: tpl.Args, + Stdin: string(tags), + Action: "start", + Creator: tpl.UpdateBy, + Hosts: []string{host}, + AlertTriggered: true, + } + + id, err = TaskAdd(in, tpl.UpdateBy, ctx.IsCenter) + if err != nil { + logger.Errorf("event_callback_ibex: call ibex fail: %v", err) + return + } + + // write db + record := models.TaskRecord{ + Id: id, + EventId: event.Id, + GroupId: tpl.GroupId, + Title: in.Title, + Account: in.Account, + Batch: in.Batch, + Tolerance: in.Tolerance, + Timeout: in.Timeout, + Pause: in.Pause, + Script: in.Script, + Args: in.Args, + CreateAt: time.Now().Unix(), + CreateBy: in.Creator, + } + + if err = record.Add(ctx); err != nil { + logger.Errorf("event_callback_ibex: persist task_record fail: %v", err) + } +} + +func canDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) { + user := userCache.GetByUsername(username) + if user != nil && user.IsAdmin() { + return true, nil + } + + target, has := targetCache.Get(host) + if !has { + return false, nil + } + + return target.GroupId == tpl.GroupId, nil +} + +func TaskAdd(f models.TaskForm, authUser string, isCenter bool) (int64, error) { + hosts := cleanHosts(f.Hosts) + if len(hosts) == 0 { + return 0, fmt.Errorf("arg(hosts) empty") + } + + taskMeta := &imodels.TaskMeta{ + Title: f.Title, + Account: f.Account, + Batch: f.Batch, + Tolerance: f.Tolerance, + Timeout: f.Timeout, + Pause: f.Pause, + Script: f.Script, + Args: f.Args, + Stdin: f.Stdin, + Creator: f.Creator, + } + + err := taskMeta.CleanFields() + if err != nil { + return 0, err + } + + taskMeta.HandleFH(hosts[0]) + + // 任务类型分为"告警规则触发"和"n9e center用户下发"两种; + // 边缘机房"告警规则触发"的任务不需要规划,并且它可能是失联的,无法使用db资源,所以放入redis缓存中,直接下发给agentd执行 + if !isCenter && f.AlertTriggered { + if err := taskMeta.Create(); err != nil { + // 当网络不连通时,生成唯一的id,防止边缘机房中不同任务的id相同; + // 方法是,redis自增id去防止同一个机房的不同n9e edge生成的id相同; + // 但没法防止不同边缘机房生成同样的id,所以,生成id的数据不会上报存入数据库,只用于闭环执行。 + taskMeta.Id, err = storage.IdGet() + if err != nil { + return 0, err + } + } + + taskHost := imodels.TaskHost{ + Id: taskMeta.Id, + Host: hosts[0], + Status: "running", + } + if err = taskHost.Create(); err != nil { + logger.Warningf("task_add_fail: authUser=%s title=%s err=%s", authUser, taskMeta.Title, err.Error()) + } + + // 缓存任务元信息和待下发的任务 + err = taskMeta.Cache(hosts[0]) + if err != nil { + return 0, err + } + + } else { + // 如果是中心机房,还是保持之前的逻辑 + err = taskMeta.Save(hosts, f.Action) + if err != nil { + return 0, err + } + } + + logger.Infof("task_add_succ: authUser=%s title=%s", authUser, taskMeta.Title) + return taskMeta.Id, nil +} + +func cleanHosts(formHosts []string) []string { + cnt := len(formHosts) + arr := make([]string, 0, cnt) + for i := 0; i < cnt; i++ { + item := strings.TrimSpace(formHosts[i]) + if item == "" { + continue + } + + if strings.HasPrefix(item, "#") { + continue + } + + arr = append(arr, item) + } + + return arr +} diff --git a/alert/sender/mm.go b/alert/sender/mm.go index 747357892..93ff90778 100644 --- a/alert/sender/mm.go +++ b/alert/sender/mm.go @@ -45,6 +45,21 @@ func (ms *MmSender) Send(ctx MessageContext) { }) } +func (ms *MmSender) CallBack(ctx CallBackContext) { + if len(ctx.Events) == 0 || len(ctx.CallBackURL) == 0 { + return + } + message := BuildTplMessage(models.Mm, ms.tpl, ctx.Events) + + SendMM(MatterMostMessage{ + Text: message, + Tokens: []string{ctx.CallBackURL}, + Stats: ctx.Stats, + }) + + ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc() +} + func (ms *MmSender) extract(users []*models.User) []string { tokens := make([]string, 0, len(users)) for _, user := range users { diff --git a/alert/sender/telegram.go b/alert/sender/telegram.go index abb416f79..d1730282c 100644 --- a/alert/sender/telegram.go +++ b/alert/sender/telegram.go @@ -21,10 +21,29 @@ type telegram struct { Text string `json:"text"` } +var ( + _ CallBacker = (*TelegramSender)(nil) +) + type TelegramSender struct { tpl *template.Template } +func (ts *TelegramSender) CallBack(ctx CallBackContext) { + if len(ctx.Events) == 0 || len(ctx.CallBackURL) == 0 { + return + } + + message := BuildTplMessage(models.Telegram, ts.tpl, ctx.Events) + SendTelegram(TelegramMessage{ + Text: message, + Tokens: []string{ctx.CallBackURL}, + Stats: ctx.Stats, + }) + + ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc() +} + func (ts *TelegramSender) Send(ctx MessageContext) { if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return diff --git a/alert/sender/wecom.go b/alert/sender/wecom.go index 4c91ab26b..d36d02dca 100644 --- a/alert/sender/wecom.go +++ b/alert/sender/wecom.go @@ -16,10 +16,31 @@ type wecom struct { Markdown wecomMarkdown `json:"markdown"` } +var ( + _ CallBacker = (*WecomSender)(nil) +) + type WecomSender struct { tpl *template.Template } +func (ws *WecomSender) CallBack(ctx CallBackContext) { + if len(ctx.Events) == 0 || len(ctx.CallBackURL) == 0 { + return + } + + message := BuildTplMessage(models.Wecom, ws.tpl, ctx.Events) + body := wecom{ + Msgtype: "markdown", + Markdown: wecomMarkdown{ + Content: message, + }, + } + + doSend(ctx.CallBackURL, body, models.Wecom, ctx.Stats) + ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc() +} + func (ws *WecomSender) Send(ctx MessageContext) { if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return diff --git a/models/alert_subscribe.go b/models/alert_subscribe.go index 24c754a91..63f2f1cbc 100644 --- a/models/alert_subscribe.go +++ b/models/alert_subscribe.go @@ -394,6 +394,10 @@ func (s *AlertSubscribe) ModifyEvent(event *AlertCurEvent) { if s.RedefineWebhooks == 1 { event.Callbacks = s.Webhooks event.CallbacksJSON = s.WebhooksJson + } else { + // 将 callback 重置为空,防止事件被订阅之后,再次将事件发送给回调地址 + event.Callbacks = "" + event.CallbacksJSON = []string{} } event.NotifyGroups = s.UserGroupIds diff --git a/models/user.go b/models/user.go index e0b9d8110..00fcf89b7 100644 --- a/models/user.go +++ b/models/user.go @@ -33,6 +33,16 @@ const ( FeishuKey = "feishu_robot_token" MmKey = "mm_webhook_url" TelegramKey = "telegram_robot_token" + + DingtalkDomain = "oapi.dingtalk.com" + WecomDomain = "qyapi.weixin.qq.com" + FeishuDomain = "open.feishu.cn" + + // FeishuCardDomain The domain name of the feishu card is the same as the feishu,distinguished by the parameter + FeishuCardDomain = "open.feishu.cn?card=1" + TelegramDomain = "api.telegram.org" + IbexDomain = "ibex" + DefaultDomain = "default" ) var (