Skip to content

Commit

Permalink
Create new instance of collectors for every request
Browse files Browse the repository at this point in the history
This restores previous behaviour where every request would create
collectors and register them
  • Loading branch information
george-angel committed Nov 18, 2024
1 parent 1cfda93 commit 3c12230
Showing 1 changed file with 19 additions and 20 deletions.
39 changes: 19 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"
)

var metricsNamespace = "kube_summary"
var (
flagKubeConfigPath = flag.String("kubeconfig", "", "Path of a kubeconfig file, if not provided the app will try $KUBECONFIG, $HOME/.kube/config or in cluster config")
flagListenAddress = flag.String("listen-address", ":9779", "Listen address")
metricsNamespace = "kube_summary"
)

type Collectors struct {
containerLogsInodesFree *prometheus.GaugeVec
Expand Down Expand Up @@ -298,18 +302,19 @@ func collectSummaryMetrics(summary *stats.Summary, collectors *Collectors) {
}

// nodeHandler returns metrics for the /stats/summary API of the given node
func nodeHandler(w http.ResponseWriter, r *http.Request, kubeClient *kubernetes.Clientset, collectors *Collectors) {
func nodeHandler(w http.ResponseWriter, r *http.Request, kubeClient *kubernetes.Clientset) {
node := mux.Vars(r)["node"]

ctx, cancel := getTimeoutContext(r)
ctx, cancel := timeoutContext(r)
defer cancel()

summary, err := getNodeSummary(ctx, kubeClient, node)
summary, err := nodeSummary(ctx, kubeClient, node)
if err != nil {
http.Error(w, fmt.Sprintf("Error querying /stats/summary for %s: %v", node, err), http.StatusInternalServerError)
return
}

collectors := newCollectors()
registry := prometheus.NewRegistry()
collectors.register(registry)
collectSummaryMetrics(summary, collectors)
Expand All @@ -319,8 +324,8 @@ func nodeHandler(w http.ResponseWriter, r *http.Request, kubeClient *kubernetes.
}

// allNodesHandler returns metrics for all nodes in the cluster
func allNodesHandler(w http.ResponseWriter, r *http.Request, kubeClient *kubernetes.Clientset, collectors *Collectors) {
ctx, cancel := getTimeoutContext(r)
func allNodesHandler(w http.ResponseWriter, r *http.Request, kubeClient *kubernetes.Clientset) {
ctx, cancel := timeoutContext(r)
defer cancel()

nodes, err := kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
Expand All @@ -329,11 +334,12 @@ func allNodesHandler(w http.ResponseWriter, r *http.Request, kubeClient *kuberne
return
}

collectors := newCollectors()
registry := prometheus.NewRegistry()
collectors.register(registry)

for _, node := range nodes.Items {
summary, err := getNodeSummary(ctx, kubeClient, node.Name)
summary, err := nodeSummary(ctx, kubeClient, node.Name)
if err != nil {
http.Error(w, fmt.Sprintf("Error querying /stats/summary for %s: %v", node.Name, err), http.StatusInternalServerError)
return
Expand All @@ -345,8 +351,8 @@ func allNodesHandler(w http.ResponseWriter, r *http.Request, kubeClient *kuberne
h.ServeHTTP(w, r)
}

// getNodeSummary retrieves the summary for a single node
func getNodeSummary(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*stats.Summary, error) {
// nodeSummary retrieves the summary for a single node
func nodeSummary(ctx context.Context, kubeClient *kubernetes.Clientset, nodeName string) (*stats.Summary, error) {
req := kubeClient.CoreV1().RESTClient().Get().Resource("nodes").Name(nodeName).SubResource("proxy").Suffix("stats/summary")
resp, err := req.DoRaw(ctx)
if err != nil {
Expand All @@ -361,8 +367,8 @@ func getNodeSummary(ctx context.Context, kubeClient *kubernetes.Clientset, nodeN
return summary, nil
}

// getTimeoutContext returns a context with timeout based on the X-Prometheus-Scrape-Timeout-Seconds header
func getTimeoutContext(r *http.Request) (context.Context, context.CancelFunc) {
// timeoutContext returns a context with timeout based on the X-Prometheus-Scrape-Timeout-Seconds header
func timeoutContext(r *http.Request) (context.Context, context.CancelFunc) {
if v := r.Header.Get("X-Prometheus-Scrape-Timeout-Seconds"); v != "" {
timeoutSeconds, err := strconv.ParseFloat(v, 64)
if err == nil {
Expand Down Expand Up @@ -399,11 +405,6 @@ func newKubeClient(path string) (*kubernetes.Clientset, error) {
return kubernetes.NewForConfig(config)
}

var (
flagListenAddress = flag.String("listen-address", ":9779", "Listen address")
flagKubeConfigPath = flag.String("kubeconfig", "", "Path of a kubeconfig file, if not provided the app will try $KUBECONFIG, $HOME/.kube/config or in cluster config")
)

func main() {
flag.Parse()

Expand All @@ -413,14 +414,12 @@ func main() {
os.Exit(1)
}

collectors := newCollectors()

r := mux.NewRouter()
r.HandleFunc("/nodes", func(w http.ResponseWriter, r *http.Request) {
allNodesHandler(w, r, kubeClient, collectors)
allNodesHandler(w, r, kubeClient)
})
r.HandleFunc("/node/{node}", func(w http.ResponseWriter, r *http.Request) {
nodeHandler(w, r, kubeClient, collectors)
nodeHandler(w, r, kubeClient)
})
r.Handle("/metrics", promhttp.Handler())
r.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand Down

0 comments on commit 3c12230

Please sign in to comment.