Skip to content

Commit c86c283

Browse files
committed
use prometheus /metrics endpoint instead of assuming that prometheus self-scrapes
1 parent f5a507e commit c86c283

File tree

6 files changed

+106
-49
lines changed

6 files changed

+106
-49
lines changed

pkg/locator/locator.go

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
package locator
22

33
import (
4+
"bufio"
45
"context"
56
"fmt"
67
"io/ioutil"
8+
"math/big"
9+
"net/http"
710
"regexp"
811
"strings"
912
"time"
1013

1114
log "github.com/Sirupsen/logrus"
1215

1316
"github.com/prometheus/client_golang/api/prometheus"
14-
"github.com/prometheus/common/model"
1517
)
1618

1719
var (
@@ -64,31 +66,93 @@ func ToPrometheusClients(endpointURLs []string) ([]*PrometheusEndpoint, error) {
6466
for _, endpoint := range endpointURLs {
6567
addr := strings.Trim(endpoint, " ")
6668
if len(addr) > 0 {
69+
var uptime time.Duration
70+
var queryAPI prometheus.QueryAPI
6771
client, err := prometheus.New(prometheus.Config{
6872
Address: addr,
6973
})
7074
if err == nil {
71-
queryAPI := prometheus.NewQueryAPI(client)
72-
result, err := queryAPI.Query(context.TODO(), "(time() - max(process_start_time_seconds{job=\"prometheus\"}))", time.Now())
75+
// Scrape the /metrics endpoint of the individual prometheus instance, since
76+
// self-scraping of prometheus' own metrics might not be configured
7377
if log.GetLevel() >= log.DebugLevel {
74-
log.Debugf("Endpoint %v returned uptime result: %v", addr, result)
78+
log.Debugf("Testing %s/metrics", addr)
7579
}
76-
if err == nil {
77-
if vector, ok := result.(model.Vector); ok && len(vector) > 0 {
78-
uptime := time.Duration(float64(result.(model.Vector)[0].Value)) * time.Second
79-
endpoints = append(endpoints, &PrometheusEndpoint{QueryAPI: prometheus.NewQueryAPI(client), Address: addr, Uptime: uptime})
80-
continue
81-
} else {
82-
log.Errorf("Endpoint %v returned unexpected uptime result: %v", addr, result)
83-
err = fmt.Errorf("Unexpected uptime result: '%v'", result)
80+
scraped, err := ScrapeMetric(addr, "process_start_time_seconds")
81+
if err == nil && scraped != nil {
82+
processStartTimeSeconds := scraped.Value
83+
uptime = time.Duration(time.Now().UTC().Unix()-int64(processStartTimeSeconds)) * time.Second
84+
if log.GetLevel() >= log.DebugLevel {
85+
log.Debugf("Parsed current uptime for %s: %s", addr, uptime)
86+
}
87+
queryAPI = prometheus.NewQueryAPI(client)
88+
_, err = queryAPI.Query(context.TODO(), "up", time.Now())
89+
if err != nil && log.GetLevel() >= log.DebugLevel {
90+
log.Debugf("Query 'up' returned error: %v", err)
8491
}
8592
}
8693
}
87-
endpoints = append(endpoints, &PrometheusEndpoint{Address: addr, Uptime: time.Duration(0), Error: err})
94+
95+
if err == nil {
96+
endpoints = append(endpoints, &PrometheusEndpoint{QueryAPI: queryAPI, Address: addr, Uptime: uptime})
97+
} else {
98+
log.Errorf("Failed to resolve build_info and uptime for %v: %v", addr, err)
99+
endpoints = append(endpoints, &PrometheusEndpoint{Address: addr, Uptime: time.Duration(0), Error: err})
100+
}
88101
}
89102
}
90103
if len(endpoints) == 0 {
91104
return nil, fmt.Errorf("Unable to locate any potential endpoints")
92105
}
93106
return endpoints, nil
94107
}
108+
109+
// LabeledValue represents a parsed metric instance: a single sample line
// taken from a text-format /metrics response.
type LabeledValue struct {
	// Name is the metric name without its label set.
	Name string
	// Labels is the raw label set including the surrounding braces
	// (for example `{job="prometheus"}`); it is empty for unlabeled samples.
	Labels string
	// Value is the sample value.
	Value float64
}

// String renders the sample as "<name><labels> <value>".
func (lv *LabeledValue) String() string {
	return fmt.Sprintf("%s%s %f", lv.Name, lv.Labels, lv.Value)
}

// scrapeTimeout bounds a single /metrics request so that an unresponsive
// endpoint cannot block the caller indefinitely.
const scrapeTimeout = 10 * time.Second

// scrapeClient is shared by all scrapes; the default http client has no timeout.
var scrapeClient = &http.Client{Timeout: scrapeTimeout}

// ScrapeMetric parses metrics in a simple fashion, returning the first
// instance of the metric with the given name found at <addr>/metrics; results
// may be unexpected for metrics with multiple instances.
//
// It returns (nil, nil) when the response contains no sample with that name.
// An error is returned when the request fails, the endpoint answers with a
// status other than 200, the response cannot be read completely, or the first
// matching sample has a missing or unparseable value.
func ScrapeMetric(addr string, name string) (*LabeledValue, error) {

	target := strings.TrimRight(addr, "/") + "/metrics"
	resp, err := scrapeClient.Get(target)
	if err != nil {
		return nil, err
	}
	// Close on every path, including the non-200 return below.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("%s returned %d", target, resp.StatusCode)
	}

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		sampleName, labels, rawValue := splitSample(line)
		if sampleName != name {
			continue
		}
		// An absent value leaves rawValue empty, which Sscan rejects, so a
		// malformed line yields an error instead of an index panic.
		f := new(big.Float)
		if _, err := fmt.Sscan(rawValue, f); err != nil {
			return nil, fmt.Errorf("Failed to parse value for metric %s: %v", line, err)
		}
		v := &LabeledValue{Name: sampleName, Labels: labels}
		v.Value, _ = f.Float64()
		return v, nil
	}
	// Distinguish "metric not present" from "response could not be read".
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return nil, nil
}

// splitSample breaks one sample line into its metric name, raw label set
// (braces included, empty when absent) and raw value text (empty when absent).
// Any trailing timestamp field is ignored.
func splitSample(line string) (name, labels, value string) {
	end := strings.IndexAny(line, "{ \t")
	if end < 0 {
		return line, "", ""
	}
	name = line[:end]
	rest := line[end:]
	if rest[0] == '{' {
		closing := labelSetEnd(rest)
		if closing < 0 {
			// Unterminated label set: there is no reliable value to extract.
			return name, rest, ""
		}
		labels = rest[:closing+1]
		rest = rest[closing+1:]
	}
	if fields := strings.Fields(rest); len(fields) > 0 {
		value = fields[0]
	}
	return name, labels, value
}

// labelSetEnd returns the index of the '}' closing the label set that starts
// at s[0], skipping braces and escaped characters inside quoted label values.
// It returns -1 when the label set is not terminated.
func labelSetEnd(s string) int {
	inQuotes := false
	for i := 0; i < len(s); i++ {
		switch c := s[i]; {
		case c == '\\' && inQuotes:
			i++ // skip the escaped character
		case c == '"':
			inQuotes = !inQuotes
		case c == '}' && !inQuotes:
			return i
		}
	}
	return -1
}

pkg/router/router.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ func NewRouter(interval time.Duration, affinityOptions []AffinityOption,
6363
interval: interval,
6464
rewriter: noOpRewriter,
6565
metrics: newMetrics(version.Name),
66+
selection: &selector.Result{},
6667
theConch: make(chan struct{}, 1),
6768
}
6869

@@ -104,7 +105,7 @@ func (r *Router) doSelection() {
104105

105106
result, err := r.selector.Select()
106107

107-
if result.Selection == nil || len(result.Selection) == 0 {
108+
if len(result.Selection) == 0 {
108109
if err != nil {
109110
log.Errorf("Selector returned no valid selection, and error: %v", err)
110111
if r.selection == nil {

pkg/selector/selector.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,38 +43,38 @@ func NewSelector(locators []locator.Locator, strategyArgs ...string) (*Selector,
4343
// Select performs selection of a/all viable prometheus endpoint target(s)
4444
func (s *Selector) Select() (result *Result, err error) {
4545

46-
endpoints := make([]*locator.PrometheusEndpoint, 0, 3)
46+
result = &Result{
47+
Candidates: make([]*locator.PrometheusEndpoint, 0, 3),
48+
}
49+
4750
for _, loc := range s.locators {
48-
clients, err := loc.Endpoints()
51+
endpoints, err := loc.Endpoints()
4952
if err != nil {
50-
if clients != nil && len(clients) > 0 {
51-
endpoints = append(endpoints, clients...)
52-
log.Warnf("Locator %v resolved the following endpoints: %v, with errors: %v", loc, clients, err)
53+
if endpoints != nil && len(endpoints) > 0 {
54+
result.Candidates = append(result.Candidates, endpoints...)
55+
log.Warnf("Locator %v resolved the following endpoints: %v, with errors: %v", loc, endpoints, err)
5356
} else {
5457
log.Errorf("Locator %v failed to resolve endpoints: %v", loc, err)
5558
}
5659
} else {
57-
endpoints = append(endpoints, clients...)
60+
result.Candidates = append(result.Candidates, endpoints...)
5861
if log.GetLevel() >= log.DebugLevel {
59-
log.Debugf("Locator %v resolved the following endpoints: %v", loc, clients)
62+
log.Debugf("Locator %v resolved the following endpoints: %v", loc, endpoints)
6063
}
6164
}
6265
}
63-
if len(endpoints) == 0 {
64-
return nil, fmt.Errorf("No endpoints returned by any locators")
65-
}
6666

67-
result = &Result{
68-
Candidates: endpoints,
67+
if len(result.Candidates) == 0 {
68+
return result, fmt.Errorf("No endpoints returned by any locators")
6969
}
7070

71-
err = s.Strategy.Select(endpoints)
71+
err = s.Strategy.Select(result.Candidates)
7272
if err != nil {
7373
return result, err
7474
}
7575

76-
result.Selection = make([]*url.URL, 0, len(endpoints))
77-
for _, endpoint := range endpoints {
76+
result.Selection = make([]*url.URL, 0, len(result.Candidates))
77+
for _, endpoint := range result.Candidates {
7878
if endpoint.Selected {
7979
target, err := url.ParseRequestURI(endpoint.Address)
8080
if err != nil {

pkg/selector/strategy/minimumhistory/minimumhistory.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (s *Selector) Description() string {
4747

4848
// ComparisonMetricName gets the name of the comparison metric/calculation used to make a selection
4949
func (s *Selector) ComparisonMetricName() string {
50-
return "prometheus_build_info"
50+
return "up"
5151
}
5252

5353
// NextIndex returns the index of the target that should be used to field the next request
@@ -65,7 +65,7 @@ func (s *Selector) Select(endpoints []*locator.PrometheusEndpoint) (err error) {
6565
for _, endpoint := range endpoints {
6666
endpoint.Selected = false
6767
if endpoint.QueryAPI != nil {
68-
value, err := endpoint.QueryAPI.Query(context.TODO(), "prometheus_build_info", time.Now().Add(-1*s.minimumHistory))
68+
value, err := endpoint.QueryAPI.Query(context.TODO(), "max(up)", time.Now().Add(-1*s.minimumHistory))
6969
if err != nil {
7070
log.Errorf("Endpoint %v returned error: %v", endpoint, err)
7171
} else {

pkg/selector/strategy/singlemostdata/singlemostdata.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,12 @@
11
package singlemostdata
22

33
import (
4-
"context"
54
"fmt"
65
"net/url"
7-
"time"
86

97
log "github.com/Sirupsen/logrus"
108
"github.com/matt-deboer/mpp/pkg/locator"
119
"github.com/matt-deboer/mpp/pkg/selector"
12-
"github.com/prometheus/common/model"
1310
)
1411

1512
const (
@@ -60,24 +57,19 @@ func (s *Selector) Select(endpoints []*locator.PrometheusEndpoint) (err error) {
6057
for i, endpoint := range endpoints {
6158
endpoint.Selected = false
6259
if endpoint.QueryAPI != nil {
63-
value, err := endpoint.QueryAPI.Query(context.TODO(), comparisonMetricName, time.Now())
60+
scraped, err := locator.ScrapeMetric(endpoint.Address, "prometheus_local_storage_ingested_samples_total")
6461
if err != nil {
6562
log.Errorf("Endpoint %v returned error: %v", endpoint, err)
6663
endpoint.Error = err
6764
} else {
6865
if log.GetLevel() >= log.DebugLevel {
69-
log.Debugf("Endpoint %v returned value: %v", endpoint, value)
66+
log.Debugf("Endpoint %v returned value: %v", endpoint, scraped)
7067
}
71-
if value.Type() == model.ValVector {
72-
sampleValue := int64(value.(model.Vector)[0].Value)
73-
endpoint.ComparisonMetricValue = sampleValue
74-
if sampleValue > mostData {
75-
mostData = sampleValue
76-
mostDataIndex = i
77-
}
78-
} else {
79-
endpoint.Error = fmt.Errorf("Endpoint %v returned unexpected type: %v", endpoint, value.Type())
80-
log.Error(endpoint.Error)
68+
sampleValue := int64(scraped.Value)
69+
endpoint.ComparisonMetricValue = sampleValue
70+
if sampleValue > mostData {
71+
mostData = sampleValue
72+
mostDataIndex = i
8173
}
8274
}
8375
}

pkg/server/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,9 @@ func parseLocators(c *cli.Context) []locator.Locator {
188188
locators = append(locators, locator.NewEndpointsFileLocator(endpointsFile))
189189
}
190190

191-
if len(kubeNamespace) > 0 {
192-
if len(kubeServiceName) == 0 && len(kubePodLabelSelector) == 0 {
193-
argError(c, "Kubernetes locator requires one of either 'kube-service-name' or 'kube-pod-label-selector'")
191+
if len(kubeServiceName) > 0 || len(kubePodLabelSelector) > 0 {
192+
if len(kubeNamespace) == 0 {
193+
argError(c, `--kube-namespace is required when using the kubernetes locator`)
194194
}
195195
kubePort := c.String("kube-port")
196196
locator, err := kuberneteslocator.NewKubernetesLocator(kubeconfig,

0 commit comments

Comments
 (0)