From 4f524fb1b2bea288685847bf14fd56ad6bf3e57b Mon Sep 17 00:00:00 2001 From: Henning Jacobs Date: Sun, 15 Jan 2017 00:42:00 +0100 Subject: [PATCH 1/2] #73 implement --cluster-registry-url option --- README.rst | 2 + docs/multiple-clusters.rst | 26 ++++++++++++- kube_ops_view/cluster_discovery.py | 60 ++++++++++++++++++++++++++++-- kube_ops_view/kubernetes.py | 21 +++-------- kube_ops_view/main.py | 14 +++++-- kube_ops_view/stores.py | 4 +- 6 files changed, 102 insertions(+), 25 deletions(-) diff --git a/README.rst b/README.rst index db3b67d..7211ee5 100644 --- a/README.rst +++ b/README.rst @@ -126,6 +126,8 @@ The following environment variables are supported: Optional token endpoint URL for the OAuth 2 Authorization Code Grant flow. ``CLUSTERS`` Comma separated list of Kubernetes API server URLs. It defaults to ``http://localhost:8001/`` (default endpoint of ``kubectl proxy``). +``CLUSTER_REGISTRY_URL`` + URL to cluster registry returning list of Kubernetes clusters. ``CREDENTIALS_DIR`` Directory to read (OAuth) credentials from --- these credentials are only used for non-localhost cluster URLs. ``DEBUG`` diff --git a/docs/multiple-clusters.rst b/docs/multiple-clusters.rst index d3f63f8..404fdb5 100644 --- a/docs/multiple-clusters.rst +++ b/docs/multiple-clusters.rst @@ -4,4 +4,28 @@ Multiple Clusters Set the ``CLUSTERS`` environment variable to a comma separated list of Kubernetes API server URLs. -TODO: how to configure authentication, registry, .. +Cluster Registry +================ + +Clusters can be dynamically discovered by providing one HTTP endpoint as the cluster registry. +Set either the ``CLUSTER_REGISTRY_URL`` environment variable or the ``--cluster-registry-url`` option to an URL conforming to: + +.. code-block:: bash + + $ curl -H 'Authorization: Bearer mytoken' $CLUSTER_REGISTRY_URL/kubernetes-clusters + { + "items": [ + { + "id": "my-cluster-id", + "api_server_url": "https://my-cluster.example.org" + } + ] + } + +The cluster registry will be queryied with an OAuth Bearer token, the token can be statically set via the ``OAUTH2_ACCESS_TOKENS`` environment variable. +Example: + +.. code-block:: bash + + $ token=mysecrettoken + $ docker run -it -p 8080:8080 -e OAUTH2_ACCESS_TOKENS=read-only=$token hjacobs/kube-ops-view --cluster-registry-url=https://cluster-registry.example.org diff --git a/kube_ops_view/cluster_discovery.py b/kube_ops_view/cluster_discovery.py index c507cb6..e9d77b6 100644 --- a/kube_ops_view/cluster_discovery.py +++ b/kube_ops_view/cluster_discovery.py @@ -1,13 +1,30 @@ +import time +from urllib.parse import urljoin + import kubernetes.client import kubernetes.config +import logging +import re +import requests import tokens from requests.auth import AuthBase DEFAULT_CLUSTERS = 'http://localhost:8001/' +CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]') + +logger = logging.getLogger(__name__) tokens.configure(from_file_only=True) +def generate_cluster_id(url: str): + '''Generate some "cluster ID" from given API server URL''' + for prefix in ('https://', 'http://'): + if url.startswith(prefix): + url = url[len(prefix):] + return CLUSTER_ID_INVALID_CHARS.sub('-', url.lower()).strip('-') + + class StaticTokenAuth(AuthBase): def __init__(self, token): self.token = token @@ -29,7 +46,8 @@ def __call__(self, request): class Cluster: - def __init__(self, api_server_url, ssl_ca_cert=None, auth=None): + def __init__(self, id, api_server_url, ssl_ca_cert=None, auth=None): + self.id = id self.api_server_url = api_server_url self.ssl_ca_cert = ssl_ca_cert self.auth = auth @@ -37,17 +55,20 @@ def __init__(self, api_server_url, ssl_ca_cert=None, auth=None): class StaticClusterDiscoverer: - def __init__(self, api_server_urls): + def __init__(self, api_server_urls: list): self._clusters = [] if not api_server_urls: try: kubernetes.config.load_incluster_config() except kubernetes.config.ConfigException: - cluster = Cluster('http://localhost:8001') + # we are not running inside a cluster + # => assume default kubectl proxy URL + cluster = Cluster(generate_cluster_id(DEFAULT_CLUSTERS), DEFAULT_CLUSTERS) else: config = kubernetes.client.configuration cluster = Cluster( + generate_cluster_id(config.host), config.host, ssl_ca_cert=config.ssl_ca_cert, auth=StaticTokenAuth(config.api_key['authorization'].split(' ', 1)[-1])) @@ -60,7 +81,38 @@ def __init__(self, api_server_urls): auth = OAuthTokenAuth('read-only') else: auth = None - self._clusters.append(Cluster(api_server_url, auth=auth)) + self._clusters.append(Cluster(generate_cluster_id(api_server_url), api_server_url, auth=auth)) + + def get_clusters(self): + return self._clusters + + +class ClusterRegistryDiscoverer: + + def __init__(self, cluster_registry_url: str, cache_lifetime=60): + self._url = cluster_registry_url + self._cache_lifetime = cache_lifetime + self._last_cache_refresh = 0 + self._clusters = [] + self._session = requests.Session() + self._session.auth = OAuthTokenAuth('read-only') + + def refresh(self): + try: + response = self._session.get(urljoin(self._url, '/kubernetes-clusters'), timeout=10) + response.raise_for_status() + clusters = [] + for row in response.json()['items']: + # only consider "ready" clusters + if row.get('lifecycle_status', 'ready') == 'ready': + clusters.append(Cluster(row['id'], row['api_server_url'])) + self._clusters = clusters + self._last_cache_refresh = time.time() + except: + logger.exception('Failed to refresh from cluster registry {}'.format(self._url)) def get_clusters(self): + now = time.time() + if now - self._last_cache_refresh > self._cache_lifetime: + self.refresh() return self._clusters diff --git a/kube_ops_view/kubernetes.py b/kube_ops_view/kubernetes.py index c2c56b5..de2b4b7 100644 --- a/kube_ops_view/kubernetes.py +++ b/kube_ops_view/kubernetes.py @@ -1,23 +1,14 @@ import datetime import logging -import re from urllib.parse import urljoin import requests -CLUSTER_ID_INVALID_CHARS = re.compile('[^a-z0-9:-]') +logger = logging.getLogger(__name__) session = requests.Session() -def generate_cluster_id(url: str): - '''Generate some "cluster ID" from given API server URL''' - for prefix in ('https://', 'http://'): - if url.startswith(prefix): - url = url[len(prefix):] - return CLUSTER_ID_INVALID_CHARS.sub('-', url.lower()).strip('-') - - def map_node_status(status: dict): return { 'addresses': status.get('addresses'), @@ -62,8 +53,8 @@ def request(cluster, path, **kwargs): def get_kubernetes_clusters(cluster_discoverer): for cluster in cluster_discoverer.get_clusters(): + cluster_id = cluster.id api_server_url = cluster.api_server_url - cluster_id = generate_cluster_id(api_server_url) response = request(cluster, '/api/v1/nodes') response.raise_for_status() nodes = {} @@ -94,18 +85,18 @@ def get_kubernetes_clusters(cluster_discoverer): response.raise_for_status() data = response.json() if not data.get('items'): - logging.info('Heapster node metrics not available (yet)') + logger.info('Heapster node metrics not available (yet)') else: for metrics in data['items']: nodes[metrics['metadata']['name']]['usage'] = metrics['usage'] except: - logging.exception('Failed to get node metrics') + logger.exception('Failed to get node metrics') try: response = request(cluster, '/api/v1/namespaces/kube-system/services/heapster/proxy/apis/metrics/v1alpha1/pods') response.raise_for_status() data = response.json() if not data.get('items'): - logging.info('Heapster pod metrics not available (yet)') + logger.info('Heapster pod metrics not available (yet)') else: for metrics in data['items']: pod = pods_by_namespace_name.get((metrics['metadata']['namespace'], metrics['metadata']['name'])) @@ -115,5 +106,5 @@ def get_kubernetes_clusters(cluster_discoverer): if container['name'] == container_metrics['name']: container['resources']['usage'] = container_metrics['usage'] except: - logging.exception('Failed to get pod metrics') + logger.exception('Failed to get pod metrics') yield {'id': cluster_id, 'api_server_url': api_server_url, 'nodes': nodes, 'unassigned_pods': unassigned_pods} diff --git a/kube_ops_view/main.py b/kube_ops_view/main.py index bde7270..6a21333 100644 --- a/kube_ops_view/main.py +++ b/kube_ops_view/main.py @@ -26,7 +26,7 @@ from .mock import get_mock_clusters from .kubernetes import get_kubernetes_clusters from .stores import MemoryStore, RedisStore -from .cluster_discovery import DEFAULT_CLUSTERS, StaticClusterDiscoverer +from .cluster_discovery import DEFAULT_CLUSTERS, StaticClusterDiscoverer, ClusterRegistryDiscoverer logger = logging.getLogger(__name__) @@ -216,7 +216,8 @@ def print_version(ctx, param, value): @click.option('--redis-url', help='Redis URL to use for pub/sub and job locking', envvar='REDIS_URL') @click.option('--clusters', help='Comma separated list of Kubernetes API server URLs (default: {})'.format(DEFAULT_CLUSTERS), envvar='CLUSTERS') -def main(port, debug, mock, secret_key, redis_url, clusters): +@click.option('--cluster-registry-url', help='URL to cluster registry', envvar='CLUSTER_REGISTRY_URL') +def main(port, debug, mock, secret_key, redis_url, clusters, cluster_registry_url): logging.basicConfig(level=logging.DEBUG if debug else logging.INFO) store = RedisStore(redis_url) if redis_url else MemoryStore() @@ -225,8 +226,13 @@ def main(port, debug, mock, secret_key, redis_url, clusters): app.secret_key = secret_key app.store = store - api_server_urls = clusters.split(',') if clusters else [] - gevent.spawn(update, cluster_discoverer=StaticClusterDiscoverer(api_server_urls), store=store, mock=mock) + if cluster_registry_url: + discoverer = ClusterRegistryDiscoverer(cluster_registry_url) + else: + api_server_urls = clusters.split(',') if clusters else [] + discoverer = StaticClusterDiscoverer(api_server_urls) + + gevent.spawn(update, cluster_discoverer=discoverer, store=store, mock=mock) signal.signal(signal.SIGTERM, exit_gracefully) http_server = gevent.wsgi.WSGIServer(('0.0.0.0', port), app) diff --git a/kube_ops_view/stores.py b/kube_ops_view/stores.py index ebaaaaf..e4cd850 100644 --- a/kube_ops_view/stores.py +++ b/kube_ops_view/stores.py @@ -8,6 +8,8 @@ from redlock import Redlock from queue import Queue +logger = logging.getLogger(__name__) + ONE_YEAR = 3600 * 24 * 365 @@ -87,7 +89,7 @@ class RedisStore: '''Redis-based backend for deployments with replicas > 1''' def __init__(self, url: str): - logging.info('Connecting to Redis on {}..'.format(url)) + logger.info('Connecting to Redis on {}..'.format(url)) self._redis = redis.StrictRedis.from_url(url) self._redlock = Redlock([url]) From c647f152f959d3212f845002d0385741ec9e980f Mon Sep 17 00:00:00 2001 From: Henning Jacobs Date: Sun, 15 Jan 2017 00:53:47 +0100 Subject: [PATCH 2/2] #73 we need to authenticate.. --- kube_ops_view/cluster_discovery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kube_ops_view/cluster_discovery.py b/kube_ops_view/cluster_discovery.py index e9d77b6..c70f676 100644 --- a/kube_ops_view/cluster_discovery.py +++ b/kube_ops_view/cluster_discovery.py @@ -105,7 +105,7 @@ def refresh(self): for row in response.json()['items']: # only consider "ready" clusters if row.get('lifecycle_status', 'ready') == 'ready': - clusters.append(Cluster(row['id'], row['api_server_url'])) + clusters.append(Cluster(row['id'], row['api_server_url'], auth=OAuthTokenAuth('read-only'))) self._clusters = clusters self._last_cache_refresh = time.time() except: