-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding support for influx v3 in the influx scaler #5513
Changes from 5 commits
7ca36d3
4a833cb
30363b3
966f8fb
3df905e
b5d8507
25a5774
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,8 +3,11 @@ package scalers | |
import ( | ||
"context" | ||
"fmt" | ||
"slices" | ||
"strconv" | ||
"strings" | ||
|
||
"github.com/InfluxCommunity/influxdb3-go/influxdb3" | ||
"github.com/go-logr/logr" | ||
influxdb2 "github.com/influxdata/influxdb-client-go/v2" | ||
api "github.com/influxdata/influxdb-client-go/v2/api" | ||
|
@@ -22,10 +25,21 @@ type influxDBScaler struct { | |
logger logr.Logger | ||
} | ||
|
||
type influxDBScalerV3 struct { | ||
client influxdb3.Client | ||
metricType v2.MetricTargetType | ||
metadata *influxDBMetadata | ||
logger logr.Logger | ||
} | ||
|
||
type influxDBMetadata struct { | ||
authToken string | ||
organizationName string | ||
query string | ||
influxVersion string | ||
queryType string | ||
database string | ||
metricKey string | ||
serverURL string | ||
unsafeSsl bool | ||
thresholdValue float64 | ||
|
@@ -47,7 +61,29 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { | |
return nil, fmt.Errorf("error parsing influxdb metadata: %w", err) | ||
} | ||
|
||
logger.Info("starting up influxdb client") | ||
if meta.influxVersion == "3" { | ||
logger.Info("starting up influxdb v3 client") | ||
|
||
clientv3, err := influxdb3.New(influxdb3.ClientConfig{ | ||
Host: meta.serverURL, | ||
Token: meta.authToken, | ||
Database: meta.database, | ||
}) | ||
|
||
if err != nil { | ||
panic(err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should not panic here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated, please let me know if it looks ok |
||
} | ||
|
||
return &influxDBScalerV3{ | ||
client: *clientv3, | ||
metricType: metricType, | ||
metadata: meta, | ||
logger: logger, | ||
}, nil | ||
} | ||
|
||
logger.Info("starting up influxdb v2 client") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this would be to verbose, let add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
|
||
client := influxdb2.NewClientWithOptions( | ||
meta.serverURL, | ||
meta.authToken, | ||
|
@@ -66,6 +102,10 @@ func parseInfluxDBMetadata(config *scalersconfig.ScalerConfig) (*influxDBMetadat | |
var authToken string | ||
var organizationName string | ||
var query string | ||
var influxVersion string | ||
var queryType string | ||
var metricKey string | ||
var database string | ||
var serverURL string | ||
var unsafeSsl bool | ||
var thresholdValue float64 | ||
|
@@ -91,6 +131,8 @@ func parseInfluxDBMetadata(config *scalersconfig.ScalerConfig) (*influxDBMetadat | |
switch { | ||
case ok && val != "": | ||
organizationName = val | ||
case config.TriggerMetadata["influxVersion"] == "3": | ||
organizationName = val | ||
Comment on lines
+134
to
+135
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this correct? I mean, in this case There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. organizations are not used in influx v3 (only databases) so its currently ignored if passed when using that version. Would it be better to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. happy to take any path here, just let me know |
||
case config.TriggerMetadata["organizationNameFromEnv"] != "": | ||
if val, ok := config.ResolvedEnv[config.TriggerMetadata["organizationNameFromEnv"]]; ok { | ||
organizationName = val | ||
|
@@ -146,11 +188,43 @@ func parseInfluxDBMetadata(config *scalersconfig.ScalerConfig) (*influxDBMetadat | |
} | ||
unsafeSsl = parsedVal | ||
} | ||
if val, ok := config.TriggerMetadata["influxVersion"]; ok { | ||
versions := []string{"", "2", "3"} | ||
if !slices.Contains(versions, val) { | ||
return nil, fmt.Errorf("unsupported influxVersion") | ||
} | ||
influxVersion = val | ||
if val == "3" { | ||
if val, ok := config.TriggerMetadata["database"]; ok { | ||
database = val | ||
} else { | ||
return nil, fmt.Errorf("database is required for influxdb v3") | ||
} | ||
if val, ok := config.TriggerMetadata["metricKey"]; ok { | ||
metricKey = val | ||
} else { | ||
return nil, fmt.Errorf("metricKey is required for influxdb v3") | ||
} | ||
queryTypes := []string{"", "influxql", "flightsql"} | ||
if val, ok := config.TriggerMetadata["queryType"]; ok { | ||
if !slices.Contains(queryTypes, strings.ToLower(val)) { | ||
return nil, fmt.Errorf("unsupported queryType") | ||
} | ||
queryType = val | ||
} else { | ||
queryType = "" | ||
} | ||
} | ||
} | ||
|
||
return &influxDBMetadata{ | ||
authToken: authToken, | ||
organizationName: organizationName, | ||
query: query, | ||
influxVersion: influxVersion, | ||
queryType: queryType, | ||
database: database, | ||
metricKey: metricKey, | ||
serverURL: serverURL, | ||
thresholdValue: thresholdValue, | ||
activationThresholdValue: activationThresholdValue, | ||
|
@@ -165,7 +239,21 @@ func (s *influxDBScaler) Close(context.Context) error { | |
return nil | ||
} | ||
|
||
// queryInfluxDB runs the query against the associated influxdb database | ||
// Close closes the connection of the client to the server | ||
func (s *influxDBScalerV3) Close(context.Context) error { | ||
s.client.Close() | ||
return nil | ||
} | ||
|
||
// queryOptionsInfluxDBV3 returns influxdb QueryOptions based on the database and queryType (InfluxQL or FlightSQL) | ||
func queryOptionsInfluxDBV3(d string, q string) *influxdb3.QueryOptions { | ||
if strings.ToLower(q) == "influxql" { | ||
return &influxdb3.QueryOptions{Database: d, QueryType: influxdb3.QueryType(1)} | ||
} | ||
return &influxdb3.QueryOptions{Database: d, QueryType: influxdb3.QueryType(0)} | ||
} | ||
|
||
// queryInfluxDB runs the query against the associated influxdb v2 database | ||
// there is an implicit assumption here that the first value returned from the iterator | ||
// will be the value of interest | ||
func queryInfluxDB(ctx context.Context, queryAPI api.QueryAPI, query string) (float64, error) { | ||
|
@@ -189,6 +277,35 @@ func queryInfluxDB(ctx context.Context, queryAPI api.QueryAPI, query string) (fl | |
} | ||
} | ||
|
||
// queryInfluxDBV3 runs the query against the associated influxdb v3 database | ||
// there is an implicit assumption here that the first value returned from the iterator | ||
// will be the value of interest | ||
func queryInfluxDBV3(ctx context.Context, client influxdb3.Client, metadata influxDBMetadata) (float64, error) { | ||
queryOptions := queryOptionsInfluxDBV3(metadata.database, metadata.queryType) | ||
|
||
result, err := client.QueryWithOptions(ctx, queryOptions, metadata.query) | ||
|
||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
var parsedVal float64 | ||
|
||
for result.Next() { | ||
value := result.Value() | ||
|
||
switch valRaw := value[metadata.metricKey].(type) { | ||
case float64: | ||
parsedVal = valRaw | ||
case int64: | ||
parsedVal = float64(valRaw) | ||
default: | ||
return 0, fmt.Errorf("value of type %T could not be converted into a float", valRaw) | ||
} | ||
} | ||
return parsedVal, nil | ||
} | ||
|
||
// GetMetricsAndActivity connects to influxdb via the client and returns a value based on the query | ||
func (s *influxDBScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { | ||
// Grab QueryAPI to make queries to influxdb instance | ||
|
@@ -204,6 +321,19 @@ func (s *influxDBScaler) GetMetricsAndActivity(ctx context.Context, metricName s | |
return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationThresholdValue, nil | ||
} | ||
|
||
// GetMetricsAndActivity connects to influxdb via the client and returns a value based on the query | ||
func (s *influxDBScalerV3) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { | ||
value, err := queryInfluxDBV3(ctx, s.client, *s.metadata) | ||
|
||
if err != nil { | ||
return []external_metrics.ExternalMetricValue{}, false, err | ||
} | ||
|
||
metric := GenerateMetricInMili(metricName, value) | ||
|
||
return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationThresholdValue, nil | ||
} | ||
|
||
// GetMetricSpecForScaling returns the metric spec for the Horizontal Pod Autoscaler | ||
func (s *influxDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { | ||
externalMetric := &v2.ExternalMetricSource{ | ||
|
@@ -217,3 +347,17 @@ func (s *influxDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpe | |
} | ||
return []v2.MetricSpec{metricSpec} | ||
} | ||
|
||
// GetMetricSpecForScaling returns the metric spec for the Horizontal Pod Autoscaler | ||
func (s *influxDBScalerV3) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { | ||
externalMetric := &v2.ExternalMetricSource{ | ||
Metric: v2.MetricIdentifier{ | ||
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.database))), | ||
}, | ||
Target: GetMetricTargetMili(s.metricType, s.metadata.thresholdValue), | ||
} | ||
metricSpec := v2.MetricSpec{ | ||
External: externalMetric, Type: externalMetricType, | ||
} | ||
return []v2.MetricSpec{metricSpec} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this would be to verbose, let add
V(1)
hereThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated