Skip to content

Commit

Permalink
Switch from staticmethod to classmethod
Browse files Browse the repository at this point in the history
  • Loading branch information
jkupferer committed Jun 17, 2024
1 parent c45296c commit a27d809
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 114 deletions.
48 changes: 25 additions & 23 deletions operator/resourceclaim.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ class ResourceClaim:
instances = {}
lock = asyncio.Lock()

@staticmethod
def __register_definition(definition: Mapping) -> ResourceClaimT:
@classmethod
def __register_definition(cls, definition: Mapping) -> ResourceClaimT:
name = definition['metadata']['name']
namespace = definition['metadata']['namespace']
resource_claim = ResourceClaim.instances.get((namespace, name))
resource_claim = cls.instances.get((namespace, name))
if resource_claim:
resource_claim.refresh_from_definition(definition=definition)
else:
resource_claim = ResourceClaim(
resource_claim = cls(
annotations = definition['metadata'].get('annotations', {}),
labels = definition['metadata'].get('labels', {}),
meta = definition['metadata'],
Expand All @@ -42,22 +42,23 @@ def __register_definition(definition: Mapping) -> ResourceClaimT:
status = definition.get('status', {}),
uid = definition['metadata']['uid'],
)
ResourceClaim.instances[(namespace, name)] = resource_claim
cls.instances[(namespace, name)] = resource_claim
return resource_claim

@staticmethod
async def get(name: str, namespace: str) -> ResourceClaimT:
async with ResourceClaim.lock:
resource_claim = ResourceClaim.instances.get((namespace, name))
@classmethod
async def get(cls, name: str, namespace: str) -> ResourceClaimT:
async with cls.lock:
resource_claim = cls.instances.get((namespace, name))
if resource_claim:
return resource_claim
definition = await Poolboy.custom_objects_api.get_namespaced_custom_object(
Poolboy.operator_domain, Poolboy.operator_version, namespace, 'resourceclaims', name
)
return ResourceClaim.__register_definition(definition=definition)
return cls.__register_definition(definition=definition)

@staticmethod
@classmethod
async def register(
cls,
annotations: kopf.Annotations,
labels: kopf.Labels,
meta: kopf.Meta,
Expand All @@ -67,8 +68,8 @@ async def register(
status: kopf.Status,
uid: str,
) -> ResourceClaimT:
async with ResourceClaim.lock:
resource_claim = ResourceClaim.instances.get((namespace, name))
async with cls.lock:
resource_claim = cls.instances.get((namespace, name))
if resource_claim:
resource_claim.refresh(
annotations = annotations,
Expand All @@ -79,7 +80,7 @@ async def register(
uid = uid,
)
else:
resource_claim = ResourceClaim(
resource_claim = cls(
annotations = annotations,
labels = labels,
meta = meta,
Expand All @@ -89,20 +90,21 @@ async def register(
status = status,
uid = uid,
)
ResourceClaim.instances[(namespace, name)] = resource_claim
cls.instances[(namespace, name)] = resource_claim
return resource_claim

@staticmethod
@classmethod
async def register_definition(
cls,
definition: Mapping,
) -> ResourceClaimT:
async with ResourceClaim.lock:
return ResourceClaim.__register_definition(definition=definition)
async with cls.lock:
return cls.__register_definition(definition=definition)

@staticmethod
async def unregister(name: str, namespace: str) -> Optional[ResourceClaimT]:
async with ResourceClaim.lock:
return ResourceClaim.instances.pop((namespace, name), None)
@classmethod
async def unregister(cls, name: str, namespace: str) -> Optional[ResourceClaimT]:
async with cls.lock:
return cls.instances.pop((namespace, name), None)

def __init__(self,
annotations: Union[kopf.Annotations, Mapping],
Expand Down Expand Up @@ -927,7 +929,7 @@ async def refetch(self) -> Optional[ResourceClaimT]:
return self
except kubernetes_asyncio.client.exceptions.ApiException as e:
if e.status == 404:
ResourceClaim.unregister(name=self.name, namespace=self.namespace)
self.unregister(name=self.name, namespace=self.namespace)
return None
else:
raise
Expand Down
107 changes: 58 additions & 49 deletions operator/resourcehandle.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ class ResourceHandle:
unbound_instances = {}
lock = asyncio.Lock()

@staticmethod
def __register_definition(definition: Mapping) -> ResourceHandleT:
@classmethod
def __register_definition(cls, definition: Mapping) -> ResourceHandleT:
name = definition['metadata']['name']
resource_handle = ResourceHandle.all_instances.get(name)
resource_handle = cls.all_instances.get(name)
if resource_handle:
resource_handle.refresh_from_definition(definition=definition)
else:
resource_handle = ResourceHandle(
resource_handle = cls(
annotations = definition['metadata'].get('annotations', {}),
labels = definition['metadata'].get('labels', {}),
meta = definition['metadata'],
Expand All @@ -49,15 +49,16 @@ def __register_definition(definition: Mapping) -> ResourceHandleT:
resource_handle.__register()
return resource_handle

@staticmethod
@classmethod
async def bind_handle_to_claim(
cls,
logger: kopf.ObjectLogger,
resource_claim: ResourceClaimT,
resource_claim_resources: List[Mapping],
) -> Optional[ResourceHandleT]:
async with ResourceHandle.lock:
async with cls.lock:
# Check if there is already an assigned claim
resource_handle = ResourceHandle.bound_instances.get((resource_claim.namespace, resource_claim.name))
resource_handle = cls.bound_instances.get((resource_claim.namespace, resource_claim.name))
if resource_handle:
return resource_handle

Expand All @@ -66,7 +67,7 @@ async def bind_handle_to_claim(
claim_status_resources = resource_claim.status_resources

# Loop through unbound instances to find best match
for resource_handle in ResourceHandle.unbound_instances.values():
for resource_handle in cls.unbound_instances.values():
# Honor explicit pool requests
if resource_claim.resource_pool_name \
and resource_claim.resource_pool_name != resource_handle.resource_pool_name:
Expand Down Expand Up @@ -170,7 +171,7 @@ async def bind_handle_to_claim(
_content_type = 'application/json-patch+json',
body = patch,
)
matched_resource_handle = ResourceHandle.__register_definition(definition=definition)
matched_resource_handle = cls.__register_definition(definition=definition)
except kubernetes_asyncio.client.exceptions.ApiException as exception:
if exception.status == 404:
logger.warning(f"Attempt to bind deleted {matched_resource_handle} to {resource_claim}")
Expand All @@ -196,8 +197,9 @@ async def bind_handle_to_claim(
)
return matched_resource_handle

@staticmethod
@classmethod
async def create_for_claim(
cls,
logger: kopf.ObjectLogger,
resource_claim: ResourceClaimT,
resource_claim_resources: List[Mapping],
Expand Down Expand Up @@ -316,15 +318,16 @@ async def create_for_claim(
plural = 'resourcehandles',
version = Poolboy.operator_version,
)
resource_handle = await ResourceHandle.register_definition(definition=definition)
resource_handle = await cls.register_definition(definition=definition)
logger.info(
f"Created ResourceHandle {resource_handle.name} for "
f"ResourceClaim {resource_claim.name} in {resource_claim.namespace}"
)
return resource_handle

@staticmethod
@classmethod
async def create_for_pool(
cls,
logger: kopf.ObjectLogger,
resource_pool: ResourcePoolT,
):
Expand Down Expand Up @@ -364,18 +367,19 @@ async def create_for_pool(
plural = "resourcehandles",
version = Poolboy.operator_version,
)
resource_handle = await ResourceHandle.register_definition(definition=definition)
resource_handle = await cls.register_definition(definition=definition)
logger.info(f"Created ResourceHandle {resource_handle.name} for ResourcePool {resource_pool.name}")
return resource_handle

@staticmethod
@classmethod
async def delete_unbound_handles_for_pool(
cls,
logger: kopf.ObjectLogger,
resource_pool: ResourcePoolT,
) -> List[ResourceHandleT]:
async with ResourceHandle.lock:
async with cls.lock:
resource_handles = []
for resource_handle in list(ResourceHandle.unbound_instances.values()):
for resource_handle in list(cls.unbound_instances.values()):
if resource_handle.resource_pool_name == resource_pool.name \
and resource_handle.resource_pool_namespace == resource_pool.namespace:
logger.info(
Expand All @@ -386,25 +390,29 @@ async def delete_unbound_handles_for_pool(
await resource_handle.delete()
return resource_handles

@staticmethod
async def get(name: str) -> Optional[ResourceHandleT]:
async with ResourceHandle.lock:
resource_handle = ResourceHandle.all_instances.get(name)
@classmethod
async def get(cls, name: str) -> Optional[ResourceHandleT]:
async with cls.lock:
resource_handle = cls.all_instances.get(name)
if resource_handle:
return resource_handle
definition = await Poolboy.custom_objects_api.get_namespaced_custom_object(
Poolboy.operator_domain, Poolboy.operator_version, Poolboy.namespace, 'resourcehandles', name
)
if 'deletionTimestamp' in definition['metadata']:
return None
return ResourceHandle.__register_definition(definition=definition)
return cls.__register_definition(definition=definition)

@staticmethod
def get_from_cache(name: str) -> Optional[ResourceHandleT]:
return ResourceHandle.all_instances.get(name)
@classmethod
def get_from_cache(cls, name: str) -> Optional[ResourceHandleT]:
return cls.all_instances.get(name)

@staticmethod
async def get_unbound_handles_for_pool(resource_pool: ResourcePoolT, logger) -> List[ResourceHandleT]:
@classmethod
async def get_unbound_handles_for_pool(
cls,
resource_pool: ResourcePoolT,
logger: kopf.ObjectLogger,
) -> List[ResourceHandleT]:
async with ResourceHandle.lock:
resource_handles = []
for resource_handle in ResourceHandle.unbound_instances.values():
Expand All @@ -413,8 +421,8 @@ async def get_unbound_handles_for_pool(resource_pool: ResourcePoolT, logger) ->
resource_handles.append(resource_handle)
return resource_handles

@staticmethod
async def preload(logger: kopf.ObjectLogger) -> None:
@classmethod
async def preload(cls, logger: kopf.ObjectLogger) -> None:
async with ResourceHandle.lock:
_continue = None
while True:
Expand All @@ -424,13 +432,14 @@ async def preload(logger: kopf.ObjectLogger) -> None:
limit = 50,
)
for definition in resource_handle_list['items']:
ResourceHandle.__register_definition(definition=definition)
cls.__register_definition(definition=definition)
_continue = resource_handle_list['metadata'].get('continue')
if not _continue:
break

@staticmethod
@classmethod
async def register(
cls,
annotations: kopf.Annotations,
labels: kopf.Labels,
meta: kopf.Meta,
Expand All @@ -440,8 +449,8 @@ async def register(
status: kopf.Status,
uid: str,
) -> ResourceHandleT:
async with ResourceHandle.lock:
resource_handle = ResourceHandle.all_instances.get(name)
async with cls.lock:
resource_handle = cls.all_instances.get(name)
if resource_handle:
resource_handle.refresh(
annotations = annotations,
Expand All @@ -452,7 +461,7 @@ async def register(
uid = uid,
)
else:
resource_handle = ResourceHandle(
resource_handle = cls(
annotations = annotations,
labels = labels,
meta = meta,
Expand All @@ -465,15 +474,15 @@ async def register(
resource_handle.__register()
return resource_handle

@staticmethod
async def register_definition(definition: Mapping) -> ResourceHandleT:
async with ResourceHandle.lock:
return ResourceHandle.__register_definition(definition)
@classmethod
async def register_definition(cls, definition: Mapping) -> ResourceHandleT:
async with cls.lock:
return cls.__register_definition(definition)

@staticmethod
async def unregister(name: str) -> Optional[ResourceHandleT]:
async with ResourceHandle.lock:
resource_handle = ResourceHandle.all_instances.pop(name, None)
@classmethod
async def unregister(cls, name: str) -> Optional[ResourceHandleT]:
async with cls.lock:
resource_handle = cls.all_instances.pop(name, None)
if resource_handle:
resource_handle.__unregister()
return resource_handle
Expand Down Expand Up @@ -508,21 +517,21 @@ def __register(self) -> None:
Add ResourceHandle to register of bound or unbound instances.
This method must be called with the ResourceHandle.lock held.
"""
ResourceHandle.all_instances[self.name] = self
self.all_instances[self.name] = self
if self.is_bound:
ResourceHandle.bound_instances[(
self.bound_instances[(
self.resource_claim_namespace,
self.resource_claim_name
)] = self
ResourceHandle.unbound_instances.pop(self.name, None)
self.unbound_instances.pop(self.name, None)
elif not self.is_deleting:
ResourceHandle.unbound_instances[self.name] = self
self.unbound_instances[self.name] = self

def __unregister(self) -> None:
ResourceHandle.all_instances.pop(self.name, None)
ResourceHandle.unbound_instances.pop(self.name, None)
self.all_instances.pop(self.name, None)
self.unbound_instances.pop(self.name, None)
if self.is_bound:
ResourceHandle.bound_instances.pop(
self.bound_instances.pop(
(self.resource_claim_namespace, self.resource_claim_name),
None,
)
Expand Down Expand Up @@ -1047,7 +1056,7 @@ async def refetch(self) -> Optional[ResourceHandleT]:
return self
except kubernetes_asyncio.client.exceptions.ApiException as e:
if e.status == 404:
ResourceHandle.unregister(name=self.name)
self.unregister(name=self.name)
return None
else:
raise
Loading

0 comments on commit a27d809

Please sign in to comment.