Skip to content

Commit

Permalink
Create new instance of collectors for every request (#88)
Browse files Browse the repository at this point in the history
This restores previous behaviour where every request would create
collectors and register them
  • Loading branch information
george-angel authored Nov 19, 2024
1 parent 1cfda93 commit ba1a4da
Showing 1 changed file with 19 additions and 20 deletions.
39 changes: 19 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"
)

var metricsNamespace = "kube_summary"
var (
flagKubeConfigPath = flag.String("kubeconfig", "", "Path of a kubeconfig file, if not provided the app will try $KUBECONFIG, $HOME/.kube/config or in cluster config")
flagListenAddress = flag.String("listen-address", ":9779", "Listen address")
metricsNamespace = "kube_summary"
)

type Collectors struct {
containerLogsInodesFree *prometheus.GaugeVec
Expand Down Expand Up @@ -298,18 +302,19 @@ func collectSummaryMetrics(summary *stats.Summary, collectors *Collectors) {
}

// nodeHandler returns metrics for the /stats/summary API of the given node
func nodeHandler(w http.ResponseWriter, r *http.Request, kubeClient *kubernetes.Clientset, collectors *Collectors) {
func nodeHandler(w http.ResponseWriter, r *http.Request, kubeClient *kubernetes.Clientset) {
node := mux.Vars(r)["node"]

ctx, cancel := getTimeoutContext(r)
ctx, cancel := timeoutContext(r)
defer cancel()

summary, err := getNodeSummary(ctx, kubeClient, node)
summary, err := nodeSummary(ctx, kubeClient, node)
if err != nil {
http.Error(w, fmt.Sprintf("Error querying /stats/summary for %s: %v", node, err), http.StatusInternalServerError)
return
}

collectors := newCollectors()
registry := prometheus.NewRegistry()
collectors.register(registry)
collectSummaryMetrics(summary, collectors)
Expand All @@ -319,8 +324,8 @@ func nodeHandler(w http.ResponseWriter, r *http.Request, kubeClient *kubernetes.
}

// allNodesHandler returns metrics for all nodes in the cluster
func allNodesHandler(w http.ResponseWriter, r *http.Request, kubeClient *kubernetes.Clientset, collectors *Collectors) {
ctx, cancel := getTimeoutContext(r)
func allNodesHandler(w http.ResponseWriter, r *http.Request, kubeClient *kubernetes.Clientset) {
ctx, cancel := timeoutContext(r)
defer cancel()

nodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
Expand All @@ -329,11 +334,12 @@ func allNodesHandler(w http.ResponseWriter, r *http.Request, kubeClient *kuberne
return
}

collectors := newCollectors()
registry := prometheus.NewRegistry()
collectors.register(registry)

for _, node := range nodes.Items {
summary, err := getNodeSummary(ctx, kubeClient, node.Name)
summary, err := nodeSummary(ctx, kubeClient, node.Name)
if err != nil {
http.Error(w, fmt.Sprintf("Error querying /stats/summary for %s: %v", node.Name, err), http.StatusInternalServerError)
return
Expand All @@ -345,8 +351,8 @@ func allNodesHandler(w http.ResponseWriter, r *http.Request, kubeClient *kuberne
h.ServeHTTP(w, r)
}

// getNodeSummary retrieves the summary for a single node
func getNodeSummary(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*stats.Summary, error) {
// nodeSummary retrieves the summary for a single node
func nodeSummary(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*stats.Summary, error) {
req := kubeClient.CoreV1().RESTClient().Get().Resource("nodes").Name(nodeName).SubResource("proxy").Suffix("stats/summary")
resp, err := req.DoRaw(ctx)
if err != nil {
Expand All @@ -361,8 +367,8 @@ func getNodeSummary(ctx context.Context, kubeClient *kubernetes.Clientset, nodeN
return summary, nil
}

// getTimeoutContext returns a context with timeout based on the X-Prometheus-Scrape-Timeout-Seconds header
func getTimeoutContext(r *http.Request) (context.Context, context.CancelFunc) {
// timeoutContext returns a context with timeout based on the X-Prometheus-Scrape-Timeout-Seconds header
func timeoutContext(r *http.Request) (context.Context, context.CancelFunc) {
if v := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds"); v != "" {
timeoutSeconds, err := strconv.ParseFloat(v, 64)
if err == nil {
Expand Down Expand Up @@ -399,11 +405,6 @@ func newKubeClient(path string) (*kubernetes.Clientset, error) {
return kubernetes.NewForConfig(config)
}

var (
flagListenAddress = flag.String("listen-address", ":9779", "Listen address")
flagKubeConfigPath = flag.String("kubeconfig", "", "Path of a kubeconfig file, if not provided the app will try $KUBECONFIG, $HOME/.kube/config or in cluster config")
)

func main() {
flag.Parse()

Expand All @@ -413,14 +414,12 @@ func main() {
os.Exit(1)
}

collectors := newCollectors()

r := mux.NewRouter()
r.HandleFunc("/nodes", func(w http.ResponseWriter, r *http.Request) {
allNodesHandler(w, r, kubeClient, collectors)
allNodesHandler(w, r, kubeClient)
})
r.HandleFunc("/node/{node}", func(w http.ResponseWriter, r *http.Request) {
nodeHandler(w, r, kubeClient, collectors)
nodeHandler(w, r, kubeClient)
})
r.Handle("/metrics", promhttp.Handler())
r.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand Down

0 comments on commit ba1a4da

Please sign in to comment.