diff --git a/packages/api/src/models/alert.ts b/packages/api/src/models/alert.ts index 5ec369156..ba5e765be 100644 --- a/packages/api/src/models/alert.ts +++ b/packages/api/src/models/alert.ts @@ -1,6 +1,7 @@ import mongoose, { Schema } from 'mongoose'; import type { ObjectId } from '.'; +import { Chart } from './dashboard'; export type AlertType = 'presence' | 'absence'; @@ -27,8 +28,29 @@ export type AlertChannel = { webhookId: string; }; -export type AlertSource = 'LOG' | 'CHART'; +export type AlertSource = 'LOG' | 'CHART' | 'CUSTOM'; +export type AlertCustomConfig = Pick<Chart, 'series'>; + +export enum CheckerType { + Anomaly = 'anomaly', + Threshold = 'threshold', +} + +interface AnomalyConfig { + models?: AnomalyModel[]; + mode?: 'any' | 'combined'; +} + +export interface AnomalyModel { + name: string; + enabled: boolean; + params: { + [key: string]: unknown; + }; +} + +export type CheckerConfig = AnomalyConfig; export interface IAlert { _id: ObjectId; channel: AlertChannel; @@ -59,10 +81,35 @@ export interface IAlert { at: Date; until: Date; }; + + customConfig?: AlertCustomConfig; + historyWindow?: number; // in minutes + + checker?: { + type: CheckerType; + config?: CheckerConfig; + }; } export type AlertDocument = mongoose.HydratedDocument<IAlert>; +interface IChecker { + type: CheckerType; + config?: CheckerConfig; +} + +const checkerSchema = new Schema<IChecker>({ + type: { + type: String, + enum: Object.values(CheckerType), + required: true, + }, + config: { + type: Schema.Types.Mixed, + required: false, + }, +}); + const AlertSchema = new Schema( { type: { @@ -151,10 +198,30 @@ const AlertSchema = new Schema( required: false, }, }, + customConfig: { + type: Schema.Types.Mixed, + required: false, + }, + historyWindow: { + type: Number, + required: false, + }, + checker: { + type: checkerSchema, + required: false, + default: { + type: CheckerType.Threshold, + }, + }, }, { timestamps: true, }, ); +AlertSchema.index({ + state: 1, + checker: 1, +}); + export default mongoose.model('Alert', AlertSchema); diff --git a/packages/api/src/models/dashboard.ts b/packages/api/src/models/dashboard.ts index 349d1021f..2e1b10d42 100644 --- a/packages/api/src/models/dashboard.ts +++ b/packages/api/src/models/dashboard.ts @@ -18,7 +18,7 @@ type NumberFormat = { unit?: string; }; -type Chart = { +export type Chart = { id: string; name: string; x: number; diff --git a/packages/api/src/tasks/__tests__/checkAlerts.test.ts b/packages/api/src/tasks/__tests__/checkAlerts.test.ts index a3504c2a6..a0b89405b 100644 --- a/packages/api/src/tasks/__tests__/checkAlerts.test.ts +++ b/packages/api/src/tasks/__tests__/checkAlerts.test.ts @@ -7,6 +7,7 @@ import { mockLogsPropertyTypeMappingsModel, mockSpyMetricPropertyTypeMappingsModel, } from '@/fixtures'; +import * as alertsUtils from '@/tasks/alerts/utils'; import { LogType } from '@/utils/logParser'; import * as clickhouse from '../../clickhouse'; @@ -17,20 +18,7 @@ import Dashboard from '../../models/dashboard'; import LogView from '../../models/logView'; import Webhook from '../../models/webhook'; import * as slack from '../../utils/slack'; -import * as checkAlert from '../checkAlerts'; -import { - buildAlertMessageTemplateHdxLink, - buildAlertMessageTemplateTitle, - buildLogSearchLink, - doesExceedThreshold, - escapeJsonString, - expandToNestedObject, - getDefaultExternalAction, - processAlert, - renderAlertTemplate, - roundDownToXMinutes, - translateExternalActionsToInternal, -} from '../checkAlerts'; +import { processAlert } from '../alerts/checkUserAlerts'; describe('checkAlerts', ()
=> { afterAll(async () => { @@ -39,7 +27,7 @@ describe('checkAlerts', () => { it('roundDownToXMinutes', () => { // 1 min - const roundDownTo1Minute = roundDownToXMinutes(1); + const roundDownTo1Minute = alertsUtils.roundDownToXMinutes(1); expect( roundDownTo1Minute(new Date('2023-03-17T22:13:03.103Z')).toISOString(), ).toBe('2023-03-17T22:13:00.000Z'); @@ -48,7 +36,7 @@ describe('checkAlerts', () => { ).toBe('2023-03-17T22:13:00.000Z'); // 5 mins - const roundDownTo5Minutes = roundDownToXMinutes(5); + const roundDownTo5Minutes = alertsUtils.roundDownToXMinutes(5); expect( roundDownTo5Minutes(new Date('2023-03-17T22:13:03.103Z')).toISOString(), ).toBe('2023-03-17T22:10:00.000Z'); @@ -62,7 +50,7 @@ describe('checkAlerts', () => { it('buildLogSearchLink', () => { expect( - buildLogSearchLink({ + alertsUtils.buildLogSearchLink({ startTime: new Date('2023-03-17T22:13:03.103Z'), endTime: new Date('2023-03-17T22:13:59.103Z'), logViewId: '123', @@ -71,7 +59,7 @@ describe('checkAlerts', () => { 'http://localhost:9090/search/123?from=1679091183103&to=1679091239103', ); expect( - buildLogSearchLink({ + alertsUtils.buildLogSearchLink({ startTime: new Date('2023-03-17T22:13:03.103Z'), endTime: new Date('2023-03-17T22:13:59.103Z'), logViewId: '123', @@ -83,25 +71,27 @@ describe('checkAlerts', () => { }); it('doesExceedThreshold', () => { - expect(doesExceedThreshold(true, 10, 11)).toBe(true); - expect(doesExceedThreshold(true, 10, 10)).toBe(true); - expect(doesExceedThreshold(false, 10, 9)).toBe(true); - expect(doesExceedThreshold(false, 10, 10)).toBe(false); + expect(alertsUtils.doesExceedThreshold(true, 10, 11)).toBe(true); + expect(alertsUtils.doesExceedThreshold(true, 10, 10)).toBe(true); + expect(alertsUtils.doesExceedThreshold(false, 10, 9)).toBe(true); + expect(alertsUtils.doesExceedThreshold(false, 10, 10)).toBe(false); }); it('expandToNestedObject', () => { - expect(expandToNestedObject({}).__proto__).toBeUndefined(); - expect(expandToNestedObject({})).toEqual({}); - expect(expandToNestedObject({ foo: 'bar' })).toEqual({ foo: 'bar' }); - expect(expandToNestedObject({ 'foo.bar': 'baz' })).toEqual({ + expect(alertsUtils.expandToNestedObject({}).__proto__).toBeUndefined(); + expect(alertsUtils.expandToNestedObject({})).toEqual({}); + expect(alertsUtils.expandToNestedObject({ foo: 'bar' })).toEqual({ + foo: 'bar', + }); + expect(alertsUtils.expandToNestedObject({ 'foo.bar': 'baz' })).toEqual({ foo: { bar: 'baz' }, }); - expect(expandToNestedObject({ 'foo.bar.baz': 'qux' })).toEqual({ + expect(alertsUtils.expandToNestedObject({ 'foo.bar.baz': 'qux' })).toEqual({ foo: { bar: { baz: 'qux' } }, }); // mix expect( - expandToNestedObject({ + alertsUtils.expandToNestedObject({ 'foo.bar.baz': 'qux', 'foo.bar.quux': 'quuz', 'foo1.bar1.baz1': 'qux1', @@ -112,13 +102,16 @@ describe('checkAlerts', () => { }); // overwriting expect( - expandToNestedObject({ 'foo.bar.baz': 'qux', 'foo.bar': 'quuz' }), + alertsUtils.expandToNestedObject({ + 'foo.bar.baz': 'qux', + 'foo.bar': 'quuz', + }), ).toEqual({ foo: { bar: 'quuz' }, }); // max depth expect( - expandToNestedObject( + alertsUtils.expandToNestedObject( { 'foo.bar.baz.qux.quuz.quux': 'qux', }, @@ -131,15 +124,15 @@ describe('checkAlerts', () => { }); it('escapeJsonString', () => { - expect(escapeJsonString('foo')).toBe('foo'); - expect(escapeJsonString("foo'")).toBe("foo'"); - expect(escapeJsonString('foo"')).toBe('foo\\"'); - expect(escapeJsonString('foo\\')).toBe('foo\\\\'); - expect(escapeJsonString('foo\n')).toBe('foo\\n'); - 
expect(escapeJsonString('foo\r')).toBe('foo\\r'); - expect(escapeJsonString('foo\t')).toBe('foo\\t'); - expect(escapeJsonString('foo\b')).toBe('foo\\b'); - expect(escapeJsonString('foo\f')).toBe('foo\\f'); + expect(alertsUtils.escapeJsonString('foo')).toBe('foo'); + expect(alertsUtils.escapeJsonString("foo'")).toBe("foo'"); + expect(alertsUtils.escapeJsonString('foo"')).toBe('foo\\"'); + expect(alertsUtils.escapeJsonString('foo\\')).toBe('foo\\\\'); + expect(alertsUtils.escapeJsonString('foo\n')).toBe('foo\\n'); + expect(alertsUtils.escapeJsonString('foo\r')).toBe('foo\\r'); + expect(alertsUtils.escapeJsonString('foo\t')).toBe('foo\\t'); + expect(alertsUtils.escapeJsonString('foo\b')).toBe('foo\\b'); + expect(alertsUtils.escapeJsonString('foo\f')).toBe('foo\\f'); }); describe('Alert Templates', () => { @@ -207,22 +200,26 @@ describe('checkAlerts', () => { }); it('buildAlertMessageTemplateHdxLink', () => { - expect(buildAlertMessageTemplateHdxLink(defaultSearchView)).toBe( + expect( + alertsUtils.buildAlertMessageTemplateHdxLink(defaultSearchView), + ).toBe( 'http://localhost:9090/search/id-123?from=1679091183103&to=1679091239103&q=level%3Aerror+span_name%3A%22http%22', ); - expect(buildAlertMessageTemplateHdxLink(defaultChartView)).toBe( + expect( + alertsUtils.buildAlertMessageTemplateHdxLink(defaultChartView), + ).toBe( 'http://localhost:9090/dashboards/id-123?from=1679089083103&granularity=5+minute&to=1679093339103', ); }); it('buildAlertMessageTemplateTitle', () => { expect( - buildAlertMessageTemplateTitle({ + alertsUtils.buildAlertMessageTemplateTitle({ view: defaultSearchView, }), ).toBe('Alert for "My Search" - 10 lines found'); expect( - buildAlertMessageTemplateTitle({ + alertsUtils.buildAlertMessageTemplateTitle({ view: defaultChartView, }), ).toBe('Alert for "My Chart" in "My Dashboard" - 5 falls below 10'); @@ -230,7 +227,7 @@ describe('checkAlerts', () => { it('getDefaultExternalAction', () => { expect( - getDefaultExternalAction({ + alertsUtils.getDefaultExternalAction({ channel: { type: 'slack_webhook', webhookId: '123', @@ -238,7 +235,7 @@ describe('checkAlerts', () => { } as any), ).toBe('@slack_webhook-123'); expect( - getDefaultExternalAction({ + alertsUtils.getDefaultExternalAction({ channel: { type: 'foo', }, @@ -249,14 +246,14 @@ describe('checkAlerts', () => { it('translateExternalActionsToInternal', () => { // normal expect( - translateExternalActionsToInternal('@slack_webhook-123'), + alertsUtils.translateExternalActionsToInternal('@slack_webhook-123'), ).toMatchInlineSnapshot( `"{{__hdx_notify_channel__ channel=\\"slack_webhook\\" id=\\"123\\"}}"`, ); // with multiple breaks expect( - translateExternalActionsToInternal(` + alertsUtils.translateExternalActionsToInternal(` @slack_webhook-123 `), @@ -268,35 +265,41 @@ describe('checkAlerts', () => { // with body string expect( - translateExternalActionsToInternal('blabla @action-id'), + alertsUtils.translateExternalActionsToInternal('blabla @action-id'), ).toMatchInlineSnapshot( `"blabla {{__hdx_notify_channel__ channel=\\"action\\" id=\\"id\\"}}"`, ); // multiple actions expect( - translateExternalActionsToInternal('blabla @action-id @action2-id2'), + alertsUtils.translateExternalActionsToInternal( + 'blabla @action-id @action2-id2', + ), ).toMatchInlineSnapshot( `"blabla {{__hdx_notify_channel__ channel=\\"action\\" id=\\"id\\"}} {{__hdx_notify_channel__ channel=\\"action2\\" id=\\"id2\\"}}"`, ); // id with special characters expect( - translateExternalActionsToInternal('send @email-mike@hyperdx.io'), + 
alertsUtils.translateExternalActionsToInternal( + 'send @email-mike@hyperdx.io', + ), ).toMatchInlineSnapshot( `"send {{__hdx_notify_channel__ channel=\\"email\\" id=\\"mike@hyperdx.io\\"}}"`, ); // id with multiple dashes expect( - translateExternalActionsToInternal('@action-id-with-multiple-dashes'), + alertsUtils.translateExternalActionsToInternal( + '@action-id-with-multiple-dashes', + ), ).toMatchInlineSnapshot( `"{{__hdx_notify_channel__ channel=\\"action\\" id=\\"id-with-multiple-dashes\\"}}"`, ); // custom template id expect( - translateExternalActionsToInternal('@action-{{action_id}}'), + alertsUtils.translateExternalActionsToInternal('@action-{{action_id}}'), ).toMatchInlineSnapshot( `"{{__hdx_notify_channel__ channel=\\"action\\" id=\\"{{action_id}}\\"}}"`, ); @@ -327,7 +330,7 @@ describe('checkAlerts', () => { name: 'My_Webhook', }).save(); - await renderAlertTemplate({ + await alertsUtils.renderAlertTemplate({ template: 'Custom body @slack_webhook-My_Web', // partial name should work view: { ...defaultSearchView, @@ -377,7 +380,7 @@ describe('checkAlerts', () => { name: 'My_Webhook', }).save(); - await renderAlertTemplate({ + await alertsUtils.renderAlertTemplate({ template: 'Custom body @slack_webhook-My_Web', // partial name should work view: { ...defaultSearchView, @@ -449,7 +452,7 @@ describe('checkAlerts', () => { name: 'My_Webhook', }).save(); - await renderAlertTemplate({ + await alertsUtils.renderAlertTemplate({ template: 'Custom body @slack_webhook-{{attributes.webhookName}}', // partial name should work view: { ...defaultSearchView, @@ -528,7 +531,7 @@ describe('checkAlerts', () => { name: 'Another_Webhook', }).save(); - await renderAlertTemplate({ + await alertsUtils.renderAlertTemplate({ template: ` {{#is_match "attributes.k8s.pod.name" "otel-collector-123"}} Runbook URL: {{attributes.runbook.url}} @@ -565,7 +568,7 @@ describe('checkAlerts', () => { }); // @slack_webhook should not be called - await renderAlertTemplate({ + await alertsUtils.renderAlertTemplate({ template: '{{#is_match "attributes.host" "web"}} @slack_webhook-My_Web {{/is_match}}', // partial name should work view: { @@ -1194,7 +1197,7 @@ describe('checkAlerts', () => { }); it('LOG alert - generic webhook', async () => { - jest.spyOn(checkAlert, 'handleSendGenericWebhook'); + jest.spyOn(alertsUtils, 'handleSendGenericWebhook'); jest .spyOn(clickhouse, 'checkAlert') .mockResolvedValueOnce({ @@ -1318,7 +1321,7 @@ describe('checkAlerts', () => { }); it('CHART alert (logs table series) - generic webhook', async () => { - jest.spyOn(checkAlert, 'handleSendGenericWebhook'); + jest.spyOn(alertsUtils, 'handleSendGenericWebhook'); mockLogsPropertyTypeMappingsModel({ runId: 'string', }); @@ -1480,7 +1483,7 @@ describe('checkAlerts', () => { const runId = Math.random().toString(); // dedup watch mode runs const teamId = team._id.toString(); - jest.spyOn(checkAlert, 'handleSendGenericWebhook'); + jest.spyOn(alertsUtils, 'handleSendGenericWebhook'); const fetchMock = jest.fn().mockResolvedValue({}); global.fetch = fetchMock; diff --git a/packages/api/src/tasks/alerts/checkAnomalyAlerts.ts b/packages/api/src/tasks/alerts/checkAnomalyAlerts.ts new file mode 100644 index 000000000..428cc06b7 --- /dev/null +++ b/packages/api/src/tasks/alerts/checkAnomalyAlerts.ts @@ -0,0 +1,291 @@ +// -------------------------------------------------------- +// -------------- EXECUTE EVERY MINUTE -------------------- +// -------------------------------------------------------- +import { recordException } from 
'@hyperdx/node-opentelemetry'; +import * as fns from 'date-fns'; +import ms from 'ms'; + +import * as clickhouse from '@/clickhouse'; +import Alert, { AlertDocument, AlertState, CheckerType } from '@/models/alert'; +import AlertHistory, { IAlertHistory } from '@/models/alertHistory'; +import type { Chart } from '@/models/dashboard'; +import Dashboard from '@/models/dashboard'; +import Team from '@/models/team'; +import { fireChannelEvent, roundDownToXMinutes } from '@/tasks/alerts/utils'; +import logger from '@/utils/logger'; +import { detectAnomaly } from '@/utils/miner'; + +const AVAILABILITY_THRESHOLD = 0.5; +const COUNT_THRESHOLD = 10; + +export const processAlert = async (now: Date, alert: AlertDocument) => { + try { + if (alert.source !== 'CUSTOM' && alert.source !== 'CHART') { + throw new Error(`Unsupported alert source: ${alert.source}`); + } + + if (alert.checker?.type !== CheckerType.Anomaly) { + throw new Error(`Unsupported checker type: ${alert.checker?.type}`); + } + + if (!alert.customConfig && alert.source === 'CUSTOM') { + throw new Error( + 'Custom query config is required for custom alerts and is missing', + ); + } + + if (!alert.historyWindow) { + throw new Error( + 'History window is required for anomaly alerts and is missing', + ); + } + + // remove these checks once we support multiple series + if ( + alert.customConfig && + alert.source === 'CUSTOM' && + alert.customConfig.series.length !== 1 + ) { + throw new Error( + 'Custom alerts currently only support single series custom queries', + ); + } + + const previous: IAlertHistory | undefined = ( + await AlertHistory.find({ alert: alert._id }) + .sort({ createdAt: -1 }) + .limit(1) + )[0]; + + const windowSizeInMins = ms(alert.interval) / 60000; + + const nowInMinsRoundDown = roundDownToXMinutes(windowSizeInMins)(now); + if ( + previous && + fns.getTime(previous.createdAt) === fns.getTime(nowInMinsRoundDown) + ) { + logger.info({ + message: `Skipped alert check since the time diff is still less than 1 window size`, + windowSizeInMins, + nowInMinsRoundDown, + previous, + now, + alert, + }); + return; + } + + let series: Chart['series'] = []; + let dashboard; + + if (alert.source === 'CUSTOM' && alert.customConfig) { + series = alert.customConfig.series; + } else if (alert.source === 'CHART' && alert.dashboardId && alert.chartId) { + dashboard = await Dashboard.findOne( + { + _id: alert.dashboardId, + 'charts.id': alert.chartId, + }, + { + name: 1, + charts: { + $elemMatch: { + id: alert.chartId, + }, + }, + }, + ); + + if ( + dashboard && + Array.isArray(dashboard.charts) && + dashboard.charts.length === 1 + ) { + const chart = dashboard.charts[0]; + series = chart.series; + } else { + throw new Error( + `Chart with id ${alert.chartId} not found in dashboard with id ${alert.dashboardId}`, + ); + } + } else { + throw new Error( + `Invalid alert source type for anomaly alert: ${alert.source}`, + ); + } + + // remove this check once we support multiple series + // this should cover both missing and multiple series for both CUSTOM and CHART sources + if (series.length !== 1) { + throw new Error( + `Anomaly Alerts currently only support single series for both CUSTOM and CHART sources but got ${series.length} series`, + ); + } + + const startTime = fns.subMinutes(nowInMinsRoundDown, alert.historyWindow); + const endTime = nowInMinsRoundDown; + + const team = await Team.findById(alert.team); + + if (team == null) { + throw new Error('Team not found'); + } + + const startTimeMs = fns.getTime(startTime); + const endTimeMs =
fns.getTime(endTime); + + const alertData = ( + await clickhouse.getMultiSeriesChart({ + series: series, + teamId: alert.team.toString(), + startTime: startTimeMs, + endTime: endTimeMs, + tableVersion: team?.logStreamTableVersion, + granularity: `${windowSizeInMins} minute`, + maxNumGroups: 123456789, // arbitrarily large number + seriesReturnType: clickhouse.SeriesReturnType.Column, + }) + ).data; + + if (!alertData || alertData.length === 0) { + logger.info({ + message: 'No data found for alert', + alert, + startTime, + endTime, + }); + return; + } + + const currentWindowData = alertData[alertData.length - 1]; + const totalCount = currentWindowData['series_0.data']; + const bucketStart = new Date(currentWindowData['ts_bucket'] * 1000); + + let alertState = AlertState.OK; + const history = await new AlertHistory({ + alert: alert._id, + createdAt: nowInMinsRoundDown, + counts: totalCount, + state: alertState, + }).save(); + + const formattedAlertData = alertData.map(row => { + return { + ts_bucket: row.ts_bucket, + count: row['series_0.data'], // only single series support for anom currently + }; + }); + + const dataAvailability = + alertData.filter(row => row['series_0.data'] > 0).length / + alertData.length; + + const isLowDataAvailability = dataAvailability < AVAILABILITY_THRESHOLD; + const isLowCurrentWindowCount = totalCount < COUNT_THRESHOLD; + + if (isLowDataAvailability) { + logger.info({ + message: 'Skipping alert due to low data availability', + dataAvailability, + alert, + alertData, + currentWindowData, + }); + } + + if (isLowCurrentWindowCount) { + logger.info({ + message: 'Skipping alert due to low current window count', + totalCount, + alert, + alertData, + currentWindowData, + }); + } + + if (!isLowDataAvailability && !isLowCurrentWindowCount) { + const detectAnomalyResult = await detectAnomaly( + formattedAlertData, + formattedAlertData[formattedAlertData.length - 1], + undefined, + alert?.checker?.config, + ); + + logger.info({ + message: 'Anomaly detection results', + alert, + detectAnomalyResult, + }); + + if (detectAnomalyResult.is_anomalous) { + alertState = AlertState.ALERT; + logger.info({ + message: `Triggering ${alert.channel.type} alarm!`, + alert, + totalCount, + detectAnomalyResult, + }); + + try { + await fireChannelEvent({ + alert, + attributes: {}, + endTime: fns.addMinutes(bucketStart, windowSizeInMins), + startTime: bucketStart, + totalCount, + windowSizeInMins, + team, + logView: null, + dashboard: dashboard, + group: undefined, // TODO: add group support for anomaly alerts + }); + } catch (e) { + void recordException(e, { + mechanism: { + handled: false, + }, + attributes: { + 'hyperdx.alert.id': alert.id, + }, + }); + } + } + } + + history.lastValues.push({ + count: totalCount, + startTime: bucketStart, + }); + + history.state = alertState; + await history.save(); + + alert.state = alertState; + await alert.save(); + } catch (e) { + // Uncomment this for better error messages locally + // console.error(e); + void recordException(e, { + mechanism: { + handled: false, + }, + attributes: { + 'hyperdx.alert.id': alert.id, + }, + }); + } +}; + +export default async () => { + logger.info('Checking anomaly alerts'); + + const now = new Date(); + + const alerts = await Alert.find({ + state: { $ne: AlertState.DISABLED }, + 'checker.type': CheckerType.Anomaly, + }); + + logger.info(`Going to process ${alerts.length} anomaly alerts`); + await Promise.all(alerts.map(alert => processAlert(now, alert))); +}; diff --git 
a/packages/api/src/tasks/alerts/checkUserAlerts.ts b/packages/api/src/tasks/alerts/checkUserAlerts.ts new file mode 100644 index 000000000..dd97326cc --- /dev/null +++ b/packages/api/src/tasks/alerts/checkUserAlerts.ts @@ -0,0 +1,277 @@ +// -------------------------------------------------------- +// -------------- EXECUTE EVERY MINUTE -------------------- +// -------------------------------------------------------- +import { recordException } from '@hyperdx/node-opentelemetry'; +import * as fns from 'date-fns'; +import { isString } from 'lodash'; +import ms from 'ms'; +import { z } from 'zod'; + +import * as clickhouse from '@/clickhouse'; +import Alert, { AlertDocument, AlertState, CheckerType } from '@/models/alert'; +import AlertHistory, { IAlertHistory } from '@/models/alertHistory'; +import Dashboard from '@/models/dashboard'; +import Team, { ITeam } from '@/models/team'; +import { + doesExceedThreshold, + EnhancedDashboard, + fireChannelEvent, + getLogViewEnhanced, + roundDownToXMinutes, +} from '@/tasks/alerts/utils'; +import logger from '@/utils/logger'; + +export const processAlert = async (now: Date, alert: AlertDocument) => { + try { + const previous: IAlertHistory | undefined = ( + await AlertHistory.find({ alert: alert._id }) + .sort({ createdAt: -1 }) + .limit(1) + )[0]; + + const windowSizeInMins = ms(alert.interval) / 60000; + const nowInMinsRoundDown = roundDownToXMinutes(windowSizeInMins)(now); + if ( + previous && + fns.getTime(previous.createdAt) === fns.getTime(nowInMinsRoundDown) + ) { + logger.info({ + message: `Skipped alert check since the time diff is still less than 1 window size`, + windowSizeInMins, + nowInMinsRoundDown, + previous, + now, + alert, + }); + return; + } + const checkStartTime = previous + ? previous.createdAt + : fns.subMinutes(nowInMinsRoundDown, windowSizeInMins); + const checkEndTime = nowInMinsRoundDown; + + // Logs Source + let checksData: + | Awaited<ReturnType<typeof clickhouse.checkAlert>> + | Awaited<ReturnType<typeof clickhouse.getMultiSeriesChartLegacyFormat>> + | null = null; + let logView: Awaited<ReturnType<typeof getLogViewEnhanced>> | null = null; + let targetDashboard: EnhancedDashboard | null = null; + const team = await Team.findById(alert.team); + + if (team == null) { + throw new Error('Team not found'); + } + + // Logs Source + if (alert.source === 'LOG' && alert.logView) { + logView = await getLogViewEnhanced(alert.logView); + // TODO: use getLogsChart instead so we can deprecate checkAlert + checksData = await clickhouse.checkAlert({ + endTime: checkEndTime, + groupBy: alert.groupBy, + q: logView.query, + startTime: checkStartTime, + tableVersion: logView.team.logStreamTableVersion, + teamId: logView.team._id.toString(), + windowSizeInMins, + }); + logger.info({ + message: 'Received alert metric [LOG source]', + alert, + logView, + checksData, + checkStartTime, + checkEndTime, + }); + } + // Chart Source + else if (alert.source === 'CHART' && alert.dashboardId && alert.chartId) { + const dashboard = await Dashboard.findOne( + { + _id: alert.dashboardId, + 'charts.id': alert.chartId, + }, + { + name: 1, + charts: { + $elemMatch: { + id: alert.chartId, + }, + }, + }, + ).populate<{ + team: ITeam; + }>('team'); + + if ( + dashboard && + Array.isArray(dashboard.charts) && + dashboard.charts.length === 1 + ) { + const chart = dashboard.charts[0]; + // Doesn't work for metric alerts yet + const MAX_NUM_GROUPS = 20; + // TODO: assuming that the chart has only 1 series for now + const firstSeries = chart.series[0]; + if (firstSeries.type === 'time' && firstSeries.table === 'logs') { + targetDashboard = dashboard; + const startTimeMs = fns.getTime(checkStartTime); +
const endTimeMs = fns.getTime(checkEndTime); + + checksData = await clickhouse.getMultiSeriesChartLegacyFormat({ + series: chart.series, + endTime: endTimeMs, + granularity: `${windowSizeInMins} minute`, + maxNumGroups: MAX_NUM_GROUPS, + startTime: startTimeMs, + tableVersion: dashboard.team.logStreamTableVersion, + teamId: dashboard.team._id.toString(), + seriesReturnType: chart.seriesReturnType, + }); + } else if ( + firstSeries.type === 'time' && + firstSeries.table === 'metrics' && + firstSeries.field + ) { + targetDashboard = dashboard; + const startTimeMs = fns.getTime(checkStartTime); + const endTimeMs = fns.getTime(checkEndTime); + checksData = await clickhouse.getMultiSeriesChartLegacyFormat({ + series: chart.series.map(series => { + if ('field' in series && series.field != null) { + const [metricName, rawMetricDataType] = + series.field.split(' - '); + const metricDataType = z + .nativeEnum(clickhouse.MetricsDataType) + .parse(rawMetricDataType); + return { + ...series, + metricDataType, + field: metricName, + }; + } + return series; + }), + endTime: endTimeMs, + granularity: `${windowSizeInMins} minute`, + maxNumGroups: MAX_NUM_GROUPS, + startTime: startTimeMs, + tableVersion: dashboard.team.logStreamTableVersion, + teamId: dashboard.team._id.toString(), + seriesReturnType: chart.seriesReturnType, + }); + } + } + + logger.info({ + message: 'Received alert metric [CHART source]', + alert, + checksData, + checkStartTime, + checkEndTime, + }); + } else { + logger.error({ + message: `Unsupported alert source: ${alert.source}`, + alert, + }); + return; + } + + // TODO: support INSUFFICIENT_DATA state + let alertState = AlertState.OK; + const history = await new AlertHistory({ + alert: alert._id, + createdAt: nowInMinsRoundDown, + state: alertState, + }).save(); + + if (checksData?.rows && checksData?.rows > 0) { + for (const checkData of checksData.data) { + // TODO: we might want to fix the null value from the upstream + // this happens when the ratio is 0/0 + if (checkData.data == null) { + continue; + } + + const totalCount = isString(checkData.data) + ? parseInt(checkData.data) + : checkData.data; + const bucketStart = new Date(checkData.ts_bucket * 1000); + if ( + doesExceedThreshold( + alert.type === 'presence', + alert.threshold, + totalCount, + ) + ) { + alertState = AlertState.ALERT; + logger.info({ + message: `Triggering ${alert.channel.type} alarm!`, + alert, + totalCount, + checkData, + }); + + try { + await fireChannelEvent({ + alert, + attributes: checkData.attributes, + dashboard: targetDashboard, + endTime: fns.addMinutes(bucketStart, windowSizeInMins), + group: Array.isArray(checkData.group) + ? 
checkData.group.join(', ') + : checkData.group, + logView, + startTime: bucketStart, + totalCount, + windowSizeInMins, + team, + }); + } catch (e) { + void recordException(e, { + mechanism: { + handled: false, + }, + attributes: { + 'hyperdx.alert.id': alert.id, + }, + }); + } + + history.counts += 1; + } + history.lastValues.push({ count: totalCount, startTime: bucketStart }); + } + + history.state = alertState; + await history.save(); + } + + alert.state = alertState; + await alert.save(); + } catch (e) { + void recordException(e, { + mechanism: { + handled: false, + }, + attributes: { + 'hyperdx.alert.id': alert.id, + }, + }); + } +}; + +export default async () => { + const now = new Date(); + const alerts = await Alert.find({ + state: { $ne: AlertState.DISABLED }, + $or: [ + { checker: { $exists: false } }, + { 'checker.type': { $ne: CheckerType.Anomaly } }, + ], + }); + logger.info(`Going to process ${alerts.length} alerts`); + await Promise.all(alerts.map(alert => processAlert(now, alert))); +}; diff --git a/packages/api/src/tasks/checkAlerts.ts b/packages/api/src/tasks/alerts/utils.ts similarity index 66% rename from packages/api/src/tasks/checkAlerts.ts rename to packages/api/src/tasks/alerts/utils.ts index 18cb84c2a..614f6bb64 100644 --- a/packages/api/src/tasks/checkAlerts.ts +++ b/packages/api/src/tasks/alerts/utils.ts @@ -1,24 +1,18 @@ -// -------------------------------------------------------- -// -------------- EXECUTE EVERY MINUTE -------------------- -// -------------------------------------------------------- -import * as fns from 'date-fns'; import * as fnsTz from 'date-fns-tz'; import Handlebars, { HelperOptions } from 'handlebars'; import _ from 'lodash'; -import { escapeRegExp, isString } from 'lodash'; +import { escapeRegExp } from 'lodash'; import mongoose from 'mongoose'; import ms from 'ms'; import PromisedHandlebars from 'promised-handlebars'; import { serializeError } from 'serialize-error'; import { URLSearchParams } from 'url'; -import { z } from 'zod'; import * as clickhouse from '@/clickhouse'; import * as config from '@/config'; import { ObjectId } from '@/models'; -import Alert, { AlertDocument, AlertState } from '@/models/alert'; -import AlertHistory, { IAlertHistory } from '@/models/alertHistory'; -import Dashboard, { IDashboard } from '@/models/dashboard'; +import { AlertDocument } from '@/models/alert'; +import { IDashboard } from '@/models/dashboard'; import LogView from '@/models/logView'; import { ITeam } from '@/models/team'; import Webhook, { IWebhook } from '@/models/webhook'; @@ -26,18 +20,15 @@ import { convertMsToGranularityString, truncateString } from '@/utils/common'; import { translateDashboardDocumentToExternalDashboard } from '@/utils/externalApi'; import logger from '@/utils/logger'; import * as slack from '@/utils/slack'; -import { - externalAlertSchema, - translateAlertDocumentToExternalAlert, -} from '@/utils/zod'; +import { translateAlertDocumentToExternalAlert } from '@/utils/zod'; -type EnhancedDashboard = Omit<IDashboard, 'team'> & { team: ITeam }; +export type EnhancedDashboard = Omit<IDashboard, 'team'> & { team: ITeam }; +const IS_MATCH_FN_NAME = 'is_match'; const MAX_MESSAGE_LENGTH = 500; const NOTIFY_FN_NAME = '__hdx_notify_channel__'; -const IS_MATCH_FN_NAME = 'is_match'; -const getLogViewEnhanced = async (logViewId: ObjectId) => { +export const getLogViewEnhanced = async (logViewId: ObjectId) => { const logView = await LogView.findById(logViewId).populate<{ team: ITeam; }>('team'); @@ -47,6 +38,54 @@ const getLogViewEnhanced = async (logViewId: ObjectId) => {
return logView; }; +export const buildCustomLink = ({ + alert, + endTime, + granularity, + startTime, +}: { + alert: any; + endTime: Date; + granularity: string; + startTime: Date; +}) => { + const url = new URL(`${config.FRONTEND_URL}/dashboards`); + const dashboardConfig = { + id: '', + name: `${alert.name} Dashboard`, + charts: [ + { + id: '4rro4', + name: `${alert.name} Chart`, + x: 0, + y: 0, + w: 12, + h: 5, + series: + alert?.customConfig?.series.map(s => { + return { + ...s, + type: 'time', // display needs to be time series, while table is for raw data + }; + }) ?? [], + seriesReturnType: 'column', + }, + ], + }; + + // extend both start and end time by 7x granularity + const from = (startTime.getTime() - ms(granularity) * 7).toString(); + const to = (endTime.getTime() + ms(granularity) * 7).toString(); + const queryParams = new URLSearchParams({ + from, + granularity: convertMsToGranularityString(ms(granularity)), + to, + config: JSON.stringify(dashboardConfig), + }); + url.search = queryParams.toString(); + return url.toString(); +}; + export const buildLogSearchLink = ({ endTime, logViewId, @@ -142,9 +181,11 @@ export const expandToNestedObject = ( // ----------------- Alert Message Template ------------------- // ------------------------------------------------------------ // should match the external alert schema -type AlertMessageTemplateDefaultView = { +export type AlertMessageTemplateDefaultView = { // FIXME: do we want to include groupBy in the external alert schema? - alert: z.infer<typeof externalAlertSchema> & { groupBy?: string }; + alert: ReturnType<typeof translateAlertDocumentToExternalAlert> & { + groupBy?: string; + }; attributes: ReturnType<typeof expandToNestedObject>; dashboard: ReturnType< typeof translateDashboardDocumentToExternalDashboard >; @@ -161,6 +202,7 @@ startTime: Date; value: number; }; + export const notifyChannel = async ({ channel, id, @@ -349,7 +391,7 @@ export const buildAlertMessageTemplateTitle = ({ template?: string | null; view: AlertMessageTemplateDefaultView; }) => { - const { alert, dashboard, savedSearch, value } = view; + const { alert, dashboard, savedSearch, value, granularity } = view; const handlebars = Handlebars.create(); if (alert.source === 'search') { if (savedSearch == null) { @@ -463,6 +505,7 @@ export const renderAlertTemplate = async ({ if (channel !== 'slack_webhook') { throw new Error(`Unsupported channel type: ${channel}`); } + // render id template const renderedId = _hb.compile(id)(view); // render body template @@ -559,7 +602,11 @@ ${targetTemplate}`; }; // ------------------------------------------------------------ -const fireChannelEvent = async ({ +export const roundDownTo = (roundTo: number) => (x: Date) => + new Date(Math.floor(x.getTime() / roundTo) * roundTo); +export const roundDownToXMinutes = (x: number) => roundDownTo(1000 * 60 * x); + +export const fireChannelEvent = async ({ alert, attributes, dashboard, @@ -569,6 +616,7 @@ const fireChannelEvent = async ({ startTime, totalCount, windowSizeInMins, + team, }: { alert: AlertDocument; attributes: Record<string, string>; // TODO: support other types than string @@ -579,12 +627,8 @@ const fireChannelEvent = async ({ startTime: Date; totalCount: number; windowSizeInMins: number; + team: ITeam; }) => { - const team = logView?.team ?? dashboard?.team; - if (team == null) { - throw new Error('Team not found'); - } -
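+ // NOTE: team is now passed in explicitly by the caller (it was previously derived from logView/dashboard above), so anomaly alerts that have neither a log view nor a dashboard can still fire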
if ((alert.silenced?.until?.getTime() ?? 0) > Date.now()) { logger.info({ message: 'Skipped firing alert due to silence', alert, }); return; } + const externalAlert = { + ...translateAlertDocumentToExternalAlert(alert), + groupBy: alert.groupBy, + }; const attributesNested = expandToNestedObject(attributes); const templateView: AlertMessageTemplateDefaultView = { - alert: { - ...translateAlertDocumentToExternalAlert(alert), - groupBy: alert.groupBy, - }, + alert: externalAlert, attributes: attributesNested, dashboard: dashboard ? translateDashboardDocumentToExternalDashboard({ @@ -638,244 +683,3 @@ const fireChannelEvent = async ({ }, }); }; - -export const roundDownTo = (roundTo: number) => (x: Date) => - new Date(Math.floor(x.getTime() / roundTo) * roundTo); -export const roundDownToXMinutes = (x: number) => roundDownTo(1000 * 60 * x); - -export const processAlert = async (now: Date, alert: AlertDocument) => { - try { - const previous: IAlertHistory | undefined = ( - await AlertHistory.find({ alert: alert._id }) - .sort({ createdAt: -1 }) - .limit(1) - )[0]; - - const windowSizeInMins = ms(alert.interval) / 60000; - const nowInMinsRoundDown = roundDownToXMinutes(windowSizeInMins)(now); - if ( - previous && - fns.getTime(previous.createdAt) === fns.getTime(nowInMinsRoundDown) - ) { - logger.info({ - message: `Skipped to check alert since the time diff is still less than 1 window size`, - windowSizeInMins, - nowInMinsRoundDown, - previous, - now, - alert, - }); - return; - } - const checkStartTime = previous - ? previous.createdAt - : fns.subMinutes(nowInMinsRoundDown, windowSizeInMins); - const checkEndTime = nowInMinsRoundDown; - - // Logs Source - let checksData: - | Awaited<ReturnType<typeof clickhouse.checkAlert>> - | Awaited<ReturnType<typeof clickhouse.getMultiSeriesChartLegacyFormat>> - | null = null; - let logView: Awaited<ReturnType<typeof getLogViewEnhanced>> | null = null; - let targetDashboard: EnhancedDashboard | null = null; - if (alert.source === 'LOG' && alert.logView) { - logView = await getLogViewEnhanced(alert.logView); - // TODO: use getLogsChart instead so we can deprecate checkAlert - checksData = await clickhouse.checkAlert({ - endTime: checkEndTime, - groupBy: alert.groupBy, - q: logView.query, - startTime: checkStartTime, - tableVersion: logView.team.logStreamTableVersion, - teamId: logView.team._id.toString(), - windowSizeInMins, - }); - logger.info({ - message: 'Received alert metric [LOG source]', - alert, - logView, - checksData, - checkStartTime, - checkEndTime, - }); - } - // Chart Source - else if (alert.source === 'CHART' && alert.dashboardId && alert.chartId) { - const dashboard = await Dashboard.findOne( - { - _id: alert.dashboardId, - 'charts.id': alert.chartId, - }, - { - name: 1, - charts: { - $elemMatch: { - id: alert.chartId, - }, - }, - }, - ).populate<{ - team: ITeam; - }>('team'); - - if ( - dashboard && - Array.isArray(dashboard.charts) && - dashboard.charts.length === 1 - ) { - const chart = dashboard.charts[0]; - // Doesn't work for metric alerts yet - const MAX_NUM_GROUPS = 20; - // TODO: assuming that the chart has only 1 series for now - const firstSeries = chart.series[0]; - if (firstSeries.type === 'time' && firstSeries.table === 'logs') { - targetDashboard = dashboard; - const startTimeMs = fns.getTime(checkStartTime); - const endTimeMs = fns.getTime(checkEndTime); - - checksData = await clickhouse.getMultiSeriesChartLegacyFormat({ - series: chart.series, - endTime: endTimeMs, - granularity: `${windowSizeInMins} minute`, - maxNumGroups: MAX_NUM_GROUPS, - startTime: startTimeMs, - tableVersion: dashboard.team.logStreamTableVersion, - teamId: dashboard.team._id.toString(),
- seriesReturnType: chart.seriesReturnType, - }); - } else if ( - firstSeries.type === 'time' && - firstSeries.table === 'metrics' && - firstSeries.field - ) { - targetDashboard = dashboard; - const startTimeMs = fns.getTime(checkStartTime); - const endTimeMs = fns.getTime(checkEndTime); - checksData = await clickhouse.getMultiSeriesChartLegacyFormat({ - series: chart.series.map(series => { - if ('field' in series && series.field != null) { - const [metricName, rawMetricDataType] = - series.field.split(' - '); - const metricDataType = z - .nativeEnum(clickhouse.MetricsDataType) - .parse(rawMetricDataType); - return { - ...series, - metricDataType, - field: metricName, - }; - } - return series; - }), - endTime: endTimeMs, - granularity: `${windowSizeInMins} minute`, - maxNumGroups: MAX_NUM_GROUPS, - startTime: startTimeMs, - tableVersion: dashboard.team.logStreamTableVersion, - teamId: dashboard.team._id.toString(), - seriesReturnType: chart.seriesReturnType, - }); - } - } - - logger.info({ - message: 'Received alert metric [CHART source]', - alert, - checksData, - checkStartTime, - checkEndTime, - }); - } else { - logger.error({ - message: `Unsupported alert source: ${alert.source}`, - alert, - }); - return; - } - - // TODO: support INSUFFICIENT_DATA state - let alertState = AlertState.OK; - const history = await new AlertHistory({ - alert: alert._id, - createdAt: nowInMinsRoundDown, - state: alertState, - }).save(); - - if (checksData?.rows && checksData?.rows > 0) { - for (const checkData of checksData.data) { - // TODO: we might want to fix the null value from the upstream - // this happens when the ratio is 0/0 - if (checkData.data == null) { - continue; - } - - const totalCount = isString(checkData.data) - ? parseInt(checkData.data) - : checkData.data; - const bucketStart = new Date(checkData.ts_bucket * 1000); - if ( - doesExceedThreshold( - alert.type === 'presence', - alert.threshold, - totalCount, - ) - ) { - alertState = AlertState.ALERT; - logger.info({ - message: `Triggering ${alert.channel.type} alarm!`, - alert, - totalCount, - checkData, - }); - - try { - await fireChannelEvent({ - alert, - attributes: checkData.attributes, - dashboard: targetDashboard, - endTime: fns.addMinutes(bucketStart, windowSizeInMins), - group: Array.isArray(checkData.group) - ? 
checkData.group.join(', ') - : checkData.group, - logView, - startTime: bucketStart, - totalCount, - windowSizeInMins, - }); - } catch (e) { - logger.error({ - message: 'Failed to fire channel event', - alert, - error: serializeError(e), - }); - } - - history.counts += 1; - } - history.lastValues.push({ count: totalCount, startTime: bucketStart }); - } - - history.state = alertState; - await history.save(); - } - - alert.state = alertState; - await alert.save(); - } catch (e) { - // Uncomment this for better error messages locally - // console.error(e); - logger.error({ - message: 'Failed to process alert', - alert, - error: serializeError(e), - }); - } -}; - -export default async () => { - const now = new Date(); - const alerts = await Alert.find({}); - logger.info(`Going to process ${alerts.length} alerts`); - await Promise.all(alerts.map(alert => processAlert(now, alert))); -}; diff --git a/packages/api/src/tasks/index.ts b/packages/api/src/tasks/index.ts index ab414e7e6..20a2a577f 100644 --- a/packages/api/src/tasks/index.ts +++ b/packages/api/src/tasks/index.ts @@ -5,12 +5,12 @@ import { serializeError } from 'serialize-error'; import { IS_DEV } from '@/config'; import { connectDB, mongooseConnection } from '@/models'; +import checkAnomalyAlerts from '@/tasks/alerts/checkAnomalyAlerts'; +import checkUserAlerts from '@/tasks/alerts/checkUserAlerts'; +import refreshPropertyTypeMappings from '@/tasks/refreshPropertyTypeMappings'; import logger from '@/utils/logger'; import redisClient from '@/utils/redis'; -import checkAlerts from './checkAlerts'; -import refreshPropertyTypeMappings from './refreshPropertyTypeMappings'; - const main = async () => { const argv = minimist(process.argv.slice(2)); const taskName = argv._[0]; @@ -21,8 +21,12 @@ const main = async () => { const t0 = performance.now(); logger.info(`Task [${taskName}] started at ${new Date()}`); switch (taskName) { + // TODO: rename to check-users-alerts case 'check-alerts': - await checkAlerts(); + await checkUserAlerts(); + break; + case 'check-anomaly-alerts': + await checkAnomalyAlerts(); break; case 'refresh-property-type-mappings': await refreshPropertyTypeMappings(); diff --git a/packages/api/src/utils/miner.ts b/packages/api/src/utils/miner.ts index 8ad3026ae..7740fb752 100644 --- a/packages/api/src/utils/miner.ts +++ b/packages/api/src/utils/miner.ts @@ -30,3 +30,55 @@ export const getLogsPatterns = async ( timeout: ms('2 minute'), }).then(response => response.data); }; + +export const detectAnomalies = async ( + history: { count: number; ts_bucket?: number }[][], + strength?: number, + anomalyConfig?: Record<string, unknown>, +): Promise<any> => { + try { + const response = await axios({ + method: 'POST', + url: `${config.MINER_API_URL}/detect_anomalies`, + data: { + history: history, + strength: strength, + config: anomalyConfig, + }, + maxContentLength: Infinity, + maxBodyLength: Infinity, + timeout: ms('2 minute'), + }); + return response.data; + } catch (error) { + logger.error('Error detecting anomalies:', error); + throw error; + } +}; + +export const detectAnomaly = async ( + history: { count: number; ts_bucket?: number }[], + current: { count: number; ts_bucket?: number }, + strength?: number, + anomalyConfig?: Record<string, unknown>, +): Promise<any> => { + try { + const response = await axios({ + method: 'POST', + url: `${config.MINER_API_URL}/detect_anomaly`, + data: { + history: history, + current: current, + strength: strength, + config: anomalyConfig, + }, + maxContentLength: Infinity, + maxBodyLength: Infinity, + timeout: ms('2 minute'), + }); +
return response.data; + } catch (error) { + logger.error('Error detecting anomalies:', error); + throw error; + } +}; diff --git a/packages/api/src/utils/zod.ts b/packages/api/src/utils/zod.ts index feb88f575..2937d9cfd 100644 --- a/packages/api/src/utils/zod.ts +++ b/packages/api/src/utils/zod.ts @@ -2,7 +2,7 @@ import { Types } from 'mongoose'; import { z } from 'zod'; import { AggFn, MetricsDataType } from '@/clickhouse'; -import { AlertDocument } from '@/models/alert'; +import { AlertDocument, CheckerType } from '@/models/alert'; export const objectIdSchema = z.string().refine(val => { return Types.ObjectId.isValid(val); @@ -207,17 +207,57 @@ export const zChartAlert = z.object({ dashboardId: z.string().min(1), }); +export const anomalyConfigSchema = z.object({ + models: z + .array( + z.object({ + name: z.string(), + enabled: z.boolean(), + params: z.record(z.any()), + }), + ) + .optional(), + mode: z.union([z.literal('any'), z.literal('combined')]).optional(), +}); + +export const alertCheckerSchema = z.object({ + type: z.nativeEnum(CheckerType), + config: anomalyConfigSchema.optional(), // union and add more config types here when needed +}); + +export const zAlertInterval = z.enum([ + '1m', + '5m', + '15m', + '30m', + '1h', + '6h', + '12h', + '1d', +]); + +export const zCustomAlert = z.object({ + source: z.literal('CUSTOM'), + customConfig: z + .object({ + series: z.array(chartSeriesSchema), + }) + .optional(), + historyWindow: z.number().min(5).max(10080).optional(), + checker: alertCheckerSchema.optional(), +}); + export const alertSchema = z .object({ channel: zChannel, - interval: z.enum(['1m', '5m', '15m', '30m', '1h', '6h', '12h', '1d']), + interval: zAlertInterval, threshold: z.number().min(0), type: z.enum(['presence', 'absence']), - source: z.enum(['LOG', 'CHART']).default('LOG'), + source: z.enum(['LOG', 'CHART', 'CUSTOM']).default('LOG'), name: z.string().min(1).max(512).nullish(), message: z.string().min(1).max(4096).nullish(), }) - .and(zLogAlert.or(zChartAlert)); + .and(zLogAlert.or(zChartAlert).or(zCustomAlert)); // ============================== // External API Alerts @@ -240,17 +280,37 @@ export const externalChartAlertSchema = z.object({ dashboardId: objectIdSchema, }); +export const externalCustomAlertSchema = z.object({ + source: z.literal('custom'), + customConfig: z + .object({ + series: z.array(chartSeriesSchema), + }) + .optional(), + historyWindow: z.number().min(5).max(10080).optional(), + checker: z + .object({ + type: z.nativeEnum(CheckerType), + config: anomalyConfigSchema.optional(), // union and add more config types here when needed + }) + .optional(), +}); + export const externalAlertSchema = z .object({ channel: externalSlackWebhookAlertChannel, interval: z.enum(['1m', '5m', '15m', '30m', '1h', '6h', '12h', '1d']), threshold: z.number().min(0), threshold_type: z.enum(['above', 'below']), - source: z.enum(['search', 'chart']).default('search'), + source: z.enum(['search', 'chart', 'custom']).default('search'), name: z.string().min(1).max(512).nullish(), message: z.string().min(1).max(4096).nullish(), }) - .and(externalSearchAlertSchema.or(externalChartAlertSchema)); + .and( + externalSearchAlertSchema + .or(externalChartAlertSchema) + .or(externalCustomAlertSchema), + ); export const externalAlertSchemaWithId = externalAlertSchema.and( z.object({ @@ -280,6 +340,12 @@ export const translateExternalAlertToInternalAlert = ( dashboardId: alertInput.dashboardId, chartId: alertInput.chartId, } + : alertInput.source === 'custom' + ? 
{ + source: 'CUSTOM', + customConfig: alertInput.customConfig, + checker: alertInput.checker, + } : ({} as never)), }; }; @@ -307,6 +373,12 @@ export const translateAlertDocumentToExternalAlert = ( dashboardId: alertDoc.dashboardId.toString(), chartId: alertDoc.chartId as string, } + : alertDoc.source === 'CUSTOM' + ? { + source: 'custom', + customConfig: alertDoc.customConfig, + checker: alertDoc.checker, + } : ({} as never)), }; }; diff --git a/packages/miner/src/anomaly_detection.py b/packages/miner/src/anomaly_detection.py new file mode 100644 index 000000000..973c3c11d --- /dev/null +++ b/packages/miner/src/anomaly_detection.py @@ -0,0 +1,237 @@ +from typing import List, Optional, Dict, Any +import numpy as np +import ruptures as rpt +from sklearn.ensemble import IsolationForest +from pydantic import BaseModel +from scipy.stats import zscore +import copy + +# strength is a measure of the sensitivity of the anomaly detection +# 0 would make the anomaly detection least sensitive (most likely to miss anoms) +# 1 would make it most sensitive (most likely to have false positives) +DEFAULT_STRENGTH = 0.5 # unused for now + +DEFAULT_CONFIG = { + "models": [ + { + "name": "zscore", + "enabled": True, + "params": { + "threshold": 3.0 + } + }, + { + "name": "change_point", + "enabled": False, + "params": { + "penalty": 10 + } + }, + { + "name": "isolation_forest", + "enabled": False, + "params": { + "contamination": 0.05 + } + } + ], + "mode": "any" # "combined" | "any" + + # TODO: still thinking through the above - not important for MVP + # Combined + # Pros + # - multiple methods being considered makes it more robust + # - less likely to have false positives since they have to somewhat agree + # - so less noisy + # Cons + # - easier to miss anoms if individual methods are off or thresholds are tweaked poorly + # - so tuning is more important + + # Any + # Pros + # - much simpler to implement, understand, tweak etc + # - will catch pretty much all potential anoms - might be pro for display purposes but not alerting? + # Cons + # - way more likely to have false positives - any method would trigger vs a combo of all + # - so more noisy - which might be a dealbreaker for alerting + +} + +class DataPoint(BaseModel): + count: int + ts_bucket: Optional[int] = None + +class AnomalyResponse(BaseModel): + is_anomalous: bool + count: int + ts_bucket: Optional[int] = None + details: Dict[str, Dict[str, Any]] + +class ZScoreResponse(BaseModel): + is_anomalous: bool + zscore: float + mean: float + threshold: float + stdv: float + +class ChangePointResponse(BaseModel): + is_anomalous: bool + penalty: float + change_points: List[int] + +class IsolationForestResponse(BaseModel): + is_anomalous: bool + isolation_score: float + contamination: float + +def calculate_zscore(history: List[DataPoint], params: Dict[str, Any], exclude_last: bool = False) -> List[ZScoreResponse]: + # TODO: determine if this is valid for the detect_anomalies case. + # technically each point's is_anomalous calc should ignore itself (as if it was checked at the time it occurred) + # but this adds some serious perf cost and isn't viable for large sets as the mean/stdv would be recalculated for each point + # this might be fine when ensembled and gets better the more data points you have but still worth considering....
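+ # with exclude_last=True the final (current) point is excluded from the mean/stdv baseline below, so the caller's current window is scored purely against history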
+ + threshold = params.get('threshold', 3.0) + counts = np.array([item.count for item in history[:-1]]) if exclude_last else np.array([item.count for item in history]) + zscores = zscore(counts) + mean = np.mean(counts) + stdv = np.std(counts) + + zscore_response = [ZScoreResponse(is_anomalous=False, zscore=0, mean=mean, threshold=threshold, stdv=stdv) for _ in range(len(history))] + + for i in range(len(history) - 1 if exclude_last else len(history)): + if stdv != 0: + zscore_response[i].zscore = abs(zscores[i]) + else: + zscore_response[i].zscore = 0 + zscore_response[i].is_anomalous = bool(zscore_response[i].zscore > threshold) + if exclude_last: + if stdv != 0: + zscore_response[-1].zscore = abs((history[-1].count - mean) / stdv) + else: + zscore_response[-1].zscore = 0 + zscore_response[-1].is_anomalous = bool(zscore_response[-1].zscore > threshold) + + return zscore_response + +def detect_change_points(history: List[DataPoint], params: Dict[str, Any]) -> List[ChangePointResponse]: + penalty = params.get('penalty', 10) + counts = np.array([item.count for item in history]).reshape(-1, 1) + model = rpt.Pelt(model="rbf").fit(counts) + change_points = model.predict(pen=penalty) + + change_point_response = [ChangePointResponse(is_anomalous=False, penalty=penalty, change_points=change_points) for _ in range(len(history))] + + for i in range(len(history)): + change_point_response[i].is_anomalous = i in change_points + + return change_point_response + +def apply_isolation_forest(history: List[DataPoint], params: Dict[str, Any]) -> List[IsolationForestResponse]: + contamination = params.get('contamination', 0.05) + counts = np.array([item.count for item in history]).reshape(-1, 1) + model = IsolationForest(contamination=contamination) + preds = model.fit_predict(counts) + scores = model.decision_function(counts) + + isolation_forest_response = [IsolationForestResponse(is_anomalous=False, isolation_score=0, contamination=contamination) for _ in range(len(history))] + + for i, (item, pred, score) in enumerate(zip(history, preds, scores)): + isolation_forest_response[i].isolation_score = -score + isolation_forest_response[i].is_anomalous = bool(pred == -1) + + return isolation_forest_response + +def calculate_adjusted_params(strength: float) -> Dict[str, float]: + return { + "zscore_threshold": 5.0 - 4.0 * strength, + "change_point_penalty": 10 * (1.0 - strength), + "isolation_forest_contamination": max(0.1 * strength, 0.01) + } + +def merge_with_default_config(user_config: Dict[str, Any]) -> Dict[str, Any]: + merged_config = copy.deepcopy(DEFAULT_CONFIG) + user_models = {model['name']: model for model in user_config.get('models', [])} + + for model in merged_config['models']: + if model['name'] in user_models: + model.update(user_models[model['name']]) + + if 'mode' in user_config: + merged_config['mode'] = user_config['mode'] + + return merged_config + +def apply_ensemble_methods(history: List[DataPoint], strength: float, config: Dict[str, Any], exclude_last: bool = False) -> List[DataPoint]: + config = merge_with_default_config(config) + models = config['models'] + + # params = calculate_adjusted_params(strength) # strength unused for the time being while we are using a very simple model + + enabled_methods = 0 + + zscore_results = None + change_point_results = None + isolation_forest_results = None + + for model in models: + if model.get('enabled', False): + if model['name'] == 'zscore': + zscore_results = calculate_zscore(history, model['params'], exclude_last) + elif model['name'] == 
'change_point': + change_point_results = detect_change_points(history, model['params']) + elif model['name'] == 'isolation_forest': + isolation_forest_results = apply_isolation_forest(history, model['params']) + enabled_methods += 1 + + anomaly_response = [AnomalyResponse(is_anomalous=False, count=item.count, ts_bucket=item.ts_bucket, details={}) for item in history] + + if config.get('mode') == 'combined' and enabled_methods > 0: + zscore_max = max([item.zscore for item in zscore_results], default=0) if zscore_results else 0 + isolation_max = max([item.isolation_score for item in isolation_forest_results], default=0) if isolation_forest_results else 0 + + for i, item in enumerate(history): + zscore_normalized = zscore_results[i].zscore / zscore_max if zscore_max else 0 if zscore_results else 0 + isolation_normalized = isolation_forest_results[i].isolation_score / isolation_max if isolation_max else 0 if isolation_forest_results else 0 + changepoint_anomalous = 1 if change_point_results and change_point_results[i].is_anomalous else 0 + combined_score_value = (zscore_normalized + isolation_normalized + changepoint_anomalous) / enabled_methods if enabled_methods else 0 + + details = {} + if zscore_results: + details["zscore"] = zscore_results[i].dict() + if change_point_results: + details["changepoint"] = change_point_results[i].dict() + if isolation_forest_results: + details["isolation"] = isolation_forest_results[i].dict() + + anomaly_response[i].details.update(details) + anomaly_response[i].is_anomalous = bool(combined_score_value > 0.5) + + elif config.get('mode') == 'any': + for i, item in enumerate(history): + is_anomalous = False + details = {} + if zscore_results: + details["zscore"] = zscore_results[i].dict() + if change_point_results: + details["changepoint"] = change_point_results[i].dict() + if isolation_forest_results: + details["isolation"] = isolation_forest_results[i].dict() + + if zscore_results and zscore_results[i].is_anomalous or change_point_results and change_point_results[i].is_anomalous or isolation_forest_results and isolation_forest_results[i].is_anomalous: + is_anomalous = True + + anomaly_response[i].is_anomalous = is_anomalous + anomaly_response[i].details.update(details) + + return anomaly_response + +def detect_anomalies(history: List[List[DataPoint]], strength: float, config: Dict[str, Any]) -> List[List[AnomalyResponse]]: + results = [] + for series in history: + results.append(apply_ensemble_methods(series, strength, config)) + return results + +def detect_anomaly(history: List[DataPoint], current: DataPoint, strength: float, config: Dict[str, Any]) -> AnomalyResponse: + history = history + [current] + results = apply_ensemble_methods(history, strength, config, exclude_last=True) + return results[-1] \ No newline at end of file diff --git a/packages/miner/src/logger.py b/packages/miner/src/logger.py new file mode 100644 index 000000000..e4fef0caf --- /dev/null +++ b/packages/miner/src/logger.py @@ -0,0 +1,12 @@ +import logging +import os + +def get_logging_level(): + try: + return getattr(logging, os.environ.get("HYPERDX_LOG_LEVEL", "DEBUG").upper()) + except Exception: + return logging.DEBUG + + +logger = logging.getLogger(__name__) +logger.setLevel(get_logging_level()) \ No newline at end of file diff --git a/packages/miner/src/main.py b/packages/miner/src/main.py index 6d9a4e1ae..e33453e28 100644 --- a/packages/miner/src/main.py +++ b/packages/miner/src/main.py @@ -1,8 +1,6 @@ -from typing import List +from typing import Any, Dict, List, Optional import 
hashlib import json -import logging -import os import time from drain3 import TemplateMiner @@ -12,23 +10,13 @@ from fastapi import FastAPI, Request from pydantic import BaseModel +from .logger import logger +from .anomaly_detection import DEFAULT_CONFIG, DEFAULT_STRENGTH, DataPoint, detect_anomalies, detect_anomaly API_VERSION = "0.0.1" app = FastAPI() - -def get_logging_level(): - try: - return getattr(logging, os.environ.get("HYPERDX_LOG_LEVEL", "DEBUG").upper()) - except Exception: - return logging.DEBUG - - -logger = logging.getLogger(__name__) -logger.setLevel(get_logging_level()) - - def get_template_miner(): persistence = FilePersistence("hdx_state.bin") config = TemplateMinerConfig() @@ -116,3 +104,52 @@ def health_check(): logger.info("🐱 Health check !!!!") return {"status": "ok", "version": API_VERSION} + +class DetectAnomaliesRequest(BaseModel): + history: List[List[DataPoint]] + strength: Optional[float] = DEFAULT_STRENGTH + config: Optional[Dict[str, Any]] = DEFAULT_CONFIG + +class DetectAnomalyRequest(BaseModel): + history: List[DataPoint] + current: DataPoint + strength: Optional[float] = DEFAULT_STRENGTH + config: Optional[Dict[str, Any]] = DEFAULT_CONFIG + +@app.post("/detect_anomalies") +def detect_anomalies_endpoint(request: DetectAnomaliesRequest): + history = request.history + strength = request.strength + config = request.config + results = detect_anomalies(history, strength, config) + + logger.info( + json.dumps( + { + "message": "Anomaly Detection Results: detect_anomalies", + "results": [[item.dict() for item in service] for service in results], + } + ) + ) + + return results + +# TODO: make sure we have traces/spans in place to debug and benchmark this endpoint performance +@app.post("/detect_anomaly") +def detect_anomaly_endpoint(request: DetectAnomalyRequest): + history = request.history + current = request.current + strength = request.strength + config = request.config + result = detect_anomaly(history, current, strength, config) + + logger.info( + json.dumps( + { + "message": "Anomaly Detection Result: detect_anomaly", + "result": result.dict(), + } + ) + ) + + return result \ No newline at end of file
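For reviewers: a minimal sketch (not part of the patch) of an alert object that the new `zCustomAlert` branch of `alertSchema` is meant to accept. The webhook id and series body are hypothetical placeholders, and `chartSeriesSchema` is defined outside this diff, so treat the shape as illustrative only:

```ts
// Hypothetical CUSTOM anomaly alert, shaped to satisfy alertSchema + zCustomAlert.
const customAnomalyAlert = {
  channel: { type: 'slack_webhook', webhookId: '65a1b2c3d4e5f6a7b8c9d0e1' }, // placeholder id
  interval: '5m', // one of zAlertInterval: '1m' | '5m' | '15m' | '30m' | '1h' | '6h' | '12h' | '1d'
  threshold: 0, // required by the base schema, unused by the anomaly checker
  type: 'presence',
  source: 'CUSTOM',
  customConfig: {
    // exactly one series for now, per the single-series checks in checkAnomalyAlerts.ts;
    // the series body below is illustrative, not taken from this diff
    series: [{ type: 'table', table: 'logs', aggFn: 'count', where: 'level:error', groupBy: [] }],
  },
  historyWindow: 1440, // minutes of history fed to the miner (schema bounds: 5..10080)
  checker: {
    type: 'anomaly', // CheckerType.Anomaly
    config: {
      models: [{ name: 'zscore', enabled: true, params: { threshold: 3.0 } }],
      mode: 'any', // 'any' = flag if any enabled model fires; 'combined' = ensemble score
    },
  },
};
```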