From a27d809abff491b6174ecbc399cd48fdffa95002 Mon Sep 17 00:00:00 2001 From: Johnathan Kupferer Date: Mon, 17 Jun 2024 13:29:39 -0400 Subject: [PATCH] Switch from staticmethod to classmethod --- operator/resourceclaim.py | 48 ++++++++-------- operator/resourcehandle.py | 107 +++++++++++++++++++---------------- operator/resourcepool.py | 29 +++++----- operator/resourceprovider.py | 56 +++++++++--------- 4 files changed, 126 insertions(+), 114 deletions(-) diff --git a/operator/resourceclaim.py b/operator/resourceclaim.py index caa373a..57f587f 100644 --- a/operator/resourceclaim.py +++ b/operator/resourceclaim.py @@ -24,15 +24,15 @@ class ResourceClaim: instances = {} lock = asyncio.Lock() - @staticmethod - def __register_definition(definition: Mapping) -> ResourceClaimT: + @classmethod + def __register_definition(cls, definition: Mapping) -> ResourceClaimT: name = definition['metadata']['name'] namespace = definition['metadata']['namespace'] - resource_claim = ResourceClaim.instances.get((namespace, name)) + resource_claim = cls.instances.get((namespace, name)) if resource_claim: resource_claim.refresh_from_definition(definition=definition) else: - resource_claim = ResourceClaim( + resource_claim = cls( annotations = definition['metadata'].get('annotations', {}), labels = definition['metadata'].get('labels', {}), meta = definition['metadata'], @@ -42,22 +42,23 @@ def __register_definition(definition: Mapping) -> ResourceClaimT: status = definition.get('status', {}), uid = definition['metadata']['uid'], ) - ResourceClaim.instances[(namespace, name)] = resource_claim + cls.instances[(namespace, name)] = resource_claim return resource_claim - @staticmethod - async def get(name: str, namespace: str) -> ResourceClaimT: - async with ResourceClaim.lock: - resource_claim = ResourceClaim.instances.get((namespace, name)) + @classmethod + async def get(cls, name: str, namespace: str) -> ResourceClaimT: + async with cls.lock: + resource_claim = cls.instances.get((namespace, name)) if resource_claim: return resource_claim definition = await Poolboy.custom_objects_api.get_namespaced_custom_object( Poolboy.operator_domain, Poolboy.operator_version, namespace, 'resourceclaims', name ) - return ResourceClaim.__register_definition(definition=definition) + return cls.__register_definition(definition=definition) - @staticmethod + @classmethod async def register( + cls, annotations: kopf.Annotations, labels: kopf.Labels, meta: kopf.Meta, @@ -67,8 +68,8 @@ async def register( status: kopf.Status, uid: str, ) -> ResourceClaimT: - async with ResourceClaim.lock: - resource_claim = ResourceClaim.instances.get((namespace, name)) + async with cls.lock: + resource_claim = cls.instances.get((namespace, name)) if resource_claim: resource_claim.refresh( annotations = annotations, @@ -79,7 +80,7 @@ async def register( uid = uid, ) else: - resource_claim = ResourceClaim( + resource_claim = cls( annotations = annotations, labels = labels, meta = meta, @@ -89,20 +90,21 @@ async def register( status = status, uid = uid, ) - ResourceClaim.instances[(namespace, name)] = resource_claim + cls.instances[(namespace, name)] = resource_claim return resource_claim - @staticmethod + @classmethod async def register_definition( + cls, definition: Mapping, ) -> ResourceClaimT: - async with ResourceClaim.lock: - return ResourceClaim.__register_definition(definition=definition) + async with cls.lock: + return cls.__register_definition(definition=definition) - @staticmethod - async def unregister(name: str, namespace: str) -> Optional[ResourceClaimT]: - async with ResourceClaim.lock: - return ResourceClaim.instances.pop((namespace, name), None) + @classmethod + async def unregister(cls, name: str, namespace: str) -> Optional[ResourceClaimT]: + async with cls.lock: + return cls.instances.pop((namespace, name), None) def __init__(self, annotations: Union[kopf.Annotations, Mapping], @@ -927,7 +929,7 @@ async def refetch(self) -> Optional[ResourceClaimT]: return self except kubernetes_asyncio.client.exceptions.ApiException as e: if e.status == 404: - ResourceClaim.unregister(name=self.name, namespace=self.namespace) + self.unregister(name=self.name, namespace=self.namespace) return None else: raise diff --git a/operator/resourcehandle.py b/operator/resourcehandle.py index 86e0d08..b40ba20 100644 --- a/operator/resourcehandle.py +++ b/operator/resourcehandle.py @@ -29,14 +29,14 @@ class ResourceHandle: unbound_instances = {} lock = asyncio.Lock() - @staticmethod - def __register_definition(definition: Mapping) -> ResourceHandleT: + @classmethod + def __register_definition(cls, definition: Mapping) -> ResourceHandleT: name = definition['metadata']['name'] - resource_handle = ResourceHandle.all_instances.get(name) + resource_handle = cls.all_instances.get(name) if resource_handle: resource_handle.refresh_from_definition(definition=definition) else: - resource_handle = ResourceHandle( + resource_handle = cls( annotations = definition['metadata'].get('annotations', {}), labels = definition['metadata'].get('labels', {}), meta = definition['metadata'], @@ -49,15 +49,16 @@ def __register_definition(definition: Mapping) -> ResourceHandleT: resource_handle.__register() return resource_handle - @staticmethod + @classmethod async def bind_handle_to_claim( + cls, logger: kopf.ObjectLogger, resource_claim: ResourceClaimT, resource_claim_resources: List[Mapping], ) -> Optional[ResourceHandleT]: - async with ResourceHandle.lock: + async with cls.lock: # Check if there is already an assigned claim - resource_handle = ResourceHandle.bound_instances.get((resource_claim.namespace, resource_claim.name)) + resource_handle = cls.bound_instances.get((resource_claim.namespace, resource_claim.name)) if resource_handle: return resource_handle @@ -66,7 +67,7 @@ async def bind_handle_to_claim( claim_status_resources = resource_claim.status_resources # Loop through unbound instances to find best match - for resource_handle in ResourceHandle.unbound_instances.values(): + for resource_handle in cls.unbound_instances.values(): # Honor explicit pool requests if resource_claim.resource_pool_name \ and resource_claim.resource_pool_name != resource_handle.resource_pool_name: @@ -170,7 +171,7 @@ async def bind_handle_to_claim( _content_type = 'application/json-patch+json', body = patch, ) - matched_resource_handle = ResourceHandle.__register_definition(definition=definition) + matched_resource_handle = cls.__register_definition(definition=definition) except kubernetes_asyncio.client.exceptions.ApiException as exception: if exception.status == 404: logger.warning(f"Attempt to bind deleted {matched_resource_handle} to {resource_claim}") @@ -196,8 +197,9 @@ async def bind_handle_to_claim( ) return matched_resource_handle - @staticmethod + @classmethod async def create_for_claim( + cls, logger: kopf.ObjectLogger, resource_claim: ResourceClaimT, resource_claim_resources: List[Mapping], @@ -316,15 +318,16 @@ async def create_for_claim( plural = 'resourcehandles', version = Poolboy.operator_version, ) - resource_handle = await ResourceHandle.register_definition(definition=definition) + resource_handle = await cls.register_definition(definition=definition) logger.info( f"Created ResourceHandle {resource_handle.name} for " f"ResourceClaim {resource_claim.name} in {resource_claim.namespace}" ) return resource_handle - @staticmethod + @classmethod async def create_for_pool( + cls, logger: kopf.ObjectLogger, resource_pool: ResourcePoolT, ): @@ -364,18 +367,19 @@ async def create_for_pool( plural = "resourcehandles", version = Poolboy.operator_version, ) - resource_handle = await ResourceHandle.register_definition(definition=definition) + resource_handle = await cls.register_definition(definition=definition) logger.info(f"Created ResourceHandle {resource_handle.name} for ResourcePool {resource_pool.name}") return resource_handle - @staticmethod + @classmethod async def delete_unbound_handles_for_pool( + cls, logger: kopf.ObjectLogger, resource_pool: ResourcePoolT, ) -> List[ResourceHandleT]: - async with ResourceHandle.lock: + async with cls.lock: resource_handles = [] - for resource_handle in list(ResourceHandle.unbound_instances.values()): + for resource_handle in list(cls.unbound_instances.values()): if resource_handle.resource_pool_name == resource_pool.name \ and resource_handle.resource_pool_namespace == resource_pool.namespace: logger.info( @@ -386,10 +390,10 @@ async def delete_unbound_handles_for_pool( await resource_handle.delete() return resource_handles - @staticmethod - async def get(name: str) -> Optional[ResourceHandleT]: - async with ResourceHandle.lock: - resource_handle = ResourceHandle.all_instances.get(name) + @classmethod + async def get(cls, name: str) -> Optional[ResourceHandleT]: + async with cls.lock: + resource_handle = cls.all_instances.get(name) if resource_handle: return resource_handle definition = await Poolboy.custom_objects_api.get_namespaced_custom_object( @@ -397,14 +401,18 @@ async def get(name: str) -> Optional[ResourceHandleT]: ) if 'deletionTimestamp' in definition['metadata']: return None - return ResourceHandle.__register_definition(definition=definition) + return cls.__register_definition(definition=definition) - @staticmethod - def get_from_cache(name: str) -> Optional[ResourceHandleT]: - return ResourceHandle.all_instances.get(name) + @classmethod + def get_from_cache(cls, name: str) -> Optional[ResourceHandleT]: + return cls.all_instances.get(name) - @staticmethod - async def get_unbound_handles_for_pool(resource_pool: ResourcePoolT, logger) -> List[ResourceHandleT]: + @classmethod + async def get_unbound_handles_for_pool( + cls, + resource_pool: ResourcePoolT, + logger: kopf.ObjectLogger, + ) -> List[ResourceHandleT]: async with ResourceHandle.lock: resource_handles = [] for resource_handle in ResourceHandle.unbound_instances.values(): @@ -413,8 +421,8 @@ async def get_unbound_handles_for_pool(resource_pool: ResourcePoolT, logger) -> resource_handles.append(resource_handle) return resource_handles - @staticmethod - async def preload(logger: kopf.ObjectLogger) -> None: + @classmethod + async def preload(cls, logger: kopf.ObjectLogger) -> None: async with ResourceHandle.lock: _continue = None while True: @@ -424,13 +432,14 @@ async def preload(logger: kopf.ObjectLogger) -> None: limit = 50, ) for definition in resource_handle_list['items']: - ResourceHandle.__register_definition(definition=definition) + cls.__register_definition(definition=definition) _continue = resource_handle_list['metadata'].get('continue') if not _continue: break - @staticmethod + @classmethod async def register( + cls, annotations: kopf.Annotations, labels: kopf.Labels, meta: kopf.Meta, @@ -440,8 +449,8 @@ async def register( status: kopf.Status, uid: str, ) -> ResourceHandleT: - async with ResourceHandle.lock: - resource_handle = ResourceHandle.all_instances.get(name) + async with cls.lock: + resource_handle = cls.all_instances.get(name) if resource_handle: resource_handle.refresh( annotations = annotations, @@ -452,7 +461,7 @@ async def register( uid = uid, ) else: - resource_handle = ResourceHandle( + resource_handle = cls( annotations = annotations, labels = labels, meta = meta, @@ -465,15 +474,15 @@ async def register( resource_handle.__register() return resource_handle - @staticmethod - async def register_definition(definition: Mapping) -> ResourceHandleT: - async with ResourceHandle.lock: - return ResourceHandle.__register_definition(definition) + @classmethod + async def register_definition(cls, definition: Mapping) -> ResourceHandleT: + async with cls.lock: + return cls.__register_definition(definition) - @staticmethod - async def unregister(name: str) -> Optional[ResourceHandleT]: - async with ResourceHandle.lock: - resource_handle = ResourceHandle.all_instances.pop(name, None) + @classmethod + async def unregister(cls, name: str) -> Optional[ResourceHandleT]: + async with cls.lock: + resource_handle = cls.all_instances.pop(name, None) if resource_handle: resource_handle.__unregister() return resource_handle @@ -508,21 +517,21 @@ def __register(self) -> None: Add ResourceHandle to register of bound or unbound instances. This method must be called with the ResourceHandle.lock held. """ - ResourceHandle.all_instances[self.name] = self + self.all_instances[self.name] = self if self.is_bound: - ResourceHandle.bound_instances[( + self.bound_instances[( self.resource_claim_namespace, self.resource_claim_name )] = self - ResourceHandle.unbound_instances.pop(self.name, None) + self.unbound_instances.pop(self.name, None) elif not self.is_deleting: - ResourceHandle.unbound_instances[self.name] = self + self.unbound_instances[self.name] = self def __unregister(self) -> None: - ResourceHandle.all_instances.pop(self.name, None) - ResourceHandle.unbound_instances.pop(self.name, None) + self.all_instances.pop(self.name, None) + self.unbound_instances.pop(self.name, None) if self.is_bound: - ResourceHandle.bound_instances.pop( + self.bound_instances.pop( (self.resource_claim_namespace, self.resource_claim_name), None, ) @@ -1047,7 +1056,7 @@ async def refetch(self) -> Optional[ResourceHandleT]: return self except kubernetes_asyncio.client.exceptions.ApiException as e: if e.status == 404: - ResourceHandle.unregister(name=self.name) + self.unregister(name=self.name) return None else: raise diff --git a/operator/resourcepool.py b/operator/resourcepool.py index e577680..ab86b21 100644 --- a/operator/resourcepool.py +++ b/operator/resourcepool.py @@ -17,13 +17,14 @@ class ResourcePool: instances = {} lock = asyncio.Lock() - @staticmethod - async def get(name: str) -> ResourcePoolT: - async with ResourcePool.lock: - return ResourcePool.instances.get(name) + @classmethod + async def get(cls, name: str) -> ResourcePoolT: + async with cls.lock: + return cls.instances.get(name) - @staticmethod + @classmethod async def register( + cls, annotations: kopf.Annotations, labels: kopf.Labels, meta: kopf.Meta, @@ -33,8 +34,8 @@ async def register( status: kopf.Status, uid: str, ) -> ResourcePoolT: - async with ResourcePool.lock: - resource_pool = ResourcePool.instances.get(name) + async with cls.lock: + resource_pool = cls.instances.get(name) if resource_pool: resource_pool.refresh( annotations = annotations, @@ -45,7 +46,7 @@ async def register( uid = uid, ) else: - resource_pool = ResourcePool( + resource_pool = cls( annotations = annotations, labels = labels, meta = meta, @@ -58,10 +59,10 @@ async def register( resource_pool.__register() return resource_pool - @staticmethod - async def unregister(name: str) -> Optional[ResourcePoolT]: - async with ResourcePool.lock: - return ResourcePool.instances.pop(name, None) + @classmethod + async def unregister(cls, name: str) -> Optional[ResourcePoolT]: + async with cls.lock: + return cls.instances.pop(name, None) def __init__(self, annotations: kopf.Annotations, @@ -141,10 +142,10 @@ def vars(self) -> Mapping: return self.spec.get('vars', {}) def __register(self) -> None: - ResourcePool.instances[self.name] = self + self.instances[self.name] = self def __unregister(self) -> None: - ResourcePool.instances.pop(self.name, None) + self.instances.pop(self.name, None) def refresh(self, annotations: kopf.Annotations, diff --git a/operator/resourceprovider.py b/operator/resourceprovider.py index 319fd40..bf30314 100644 --- a/operator/resourceprovider.py +++ b/operator/resourceprovider.py @@ -118,22 +118,22 @@ class ResourceProvider: instances = {} lock = asyncio.Lock() - @staticmethod - def __register_definition(definition: Mapping) -> ResourceProviderT: + @classmethod + def __register_definition(cls, definition: Mapping) -> ResourceProviderT: name = definition['metadata']['name'] - resource_provider = ResourceProvider.instances.get(name) + resource_provider = cls.instances.get(name) if resource_provider: resource_provider.definition = definition self.__init_resource_template_validator() else: - resource_provider = ResourceProvider(definition=definition) - ResourceProvider.instances[name] = resource_provider + resource_provider = cls(definition=definition) + cls.instances[name] = resource_provider return resource_provider - @staticmethod - def find_provider_by_template_match(template: Mapping) -> ResourceProviderT: + @classmethod + def find_provider_by_template_match(cls, template: Mapping) -> ResourceProviderT: provider_matches = [] - for provider in ResourceProvider.instances.values(): + for provider in cls.instances.values(): if provider.is_match_for_template(template): provider_matches.append(provider) if len(provider_matches) == 0: @@ -143,20 +143,20 @@ def find_provider_by_template_match(template: Mapping) -> ResourceProviderT: else: raise kopf.TemporaryError(f"Resource template matches multiple ResourceProviders", delay=600) - @staticmethod - async def get(name: str) -> ResourceProviderT: - async with ResourceProvider.lock: - resource_provider = ResourceProvider.instances.get(name) + @classmethod + async def get(cls, name: str) -> ResourceProviderT: + async with cls.lock: + resource_provider = cls.instances.get(name) if resource_provider: return resource_provider definition = await Poolboy.custom_objects_api.get_cluster_custom_object( Poolboy.operator_domain, Poolboy.operator_version, 'resourceproviders', name ) - return ResourceProvider.__register_definition(definition=definition) + return cls.__register_definition(definition=definition) - @staticmethod - async def preload(logger: kopf.ObjectLogger) -> None: - async with ResourceProvider.lock: + @classmethod + async def preload(cls, logger: kopf.ObjectLogger) -> None: + async with cls.lock: _continue = None while True: resource_provider_list = await Poolboy.custom_objects_api.list_namespaced_custom_object( @@ -165,30 +165,30 @@ async def preload(logger: kopf.ObjectLogger) -> None: limit = 50, ) for definition in resource_provider_list['items']: - ResourceProvider.__register_definition(definition=definition) + cls.__register_definition(definition=definition) _continue = resource_provider_list['metadata'].get('continue') if not _continue: break - @staticmethod - async def register(definition: Mapping, logger: kopf.ObjectLogger) -> ResourceProviderT: - async with ResourceProvider.lock: + @classmethod + async def register(cls, definition: Mapping, logger: kopf.ObjectLogger) -> ResourceProviderT: + async with cls.lock: name = definition['metadata']['name'] - resource_provider = ResourceProvider.instances.get(name) + resource_provider = cls.instances.get(name) if resource_provider: resource_provider.__init__(definition=definition) logger.info(f"Refreshed definition of ResourceProvider {name}") else: - resource_provider = ResourceProvider.__register_definition(definition=definition) + resource_provider = cls.__register_definition(definition=definition) logger.info(f"Registered ResourceProvider {name}") return resource_provider - @staticmethod - async def unregister(name: str, logger: kopf.ObjectLogger) -> Optional[ResourceProviderT]: - async with ResourceProvider.lock: - if name in ResourceProvider.instances: + @classmethod + async def unregister(cls, name: str, logger: kopf.ObjectLogger) -> Optional[ResourceProviderT]: + async with cls.lock: + if name in cls.instances: logger.info(f"Unregistered ResourceProvider {name}") - return ResourceProvider.instances.pop(name) + return cls.instances.pop(name) def __init__(self, definition: Mapping) -> None: self.meta = definition['metadata'] @@ -435,7 +435,7 @@ async def get_claim_resources(self, resources = [] for linked_resource_provider in self.linked_resource_providers: - resource_provider = await ResourceProvider.get(linked_resource_provider.name) + resource_provider = await self.get(linked_resource_provider.name) resources.extend( await resource_provider.get_claim_resources( resource_claim = resource_claim,