Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix resource watcher issue on early disconnect. #107

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions operator/resourceclaim.py
Original file line number Diff line number Diff line change
Expand Up @@ -1092,20 +1092,25 @@ async def remove_resource_from_status(self, index):
await self.json_patch_status(patch)

async def update_resource_in_status(self, index, state):
    """Record the latest state of one managed resource in this object's status.

    Only differences are sent: a JSON Patch entry is built for the resource
    state when it differs from what ``self.status_resources[index]`` already
    holds, and another for ``/status/summary`` when the resource provider
    defines a summary template and the freshly rendered summary differs from
    ``self.status['summary']``. If nothing changed, no API call is made.

    Args:
        index: Position of the resource within ``self.status_resources``.
        state: The resource definition to store under that entry's ``state``.
    """
    patch = []

    # Compare against the stored state *before* the local copy is touched
    # below, otherwise the change would never be detected.
    if self.status_resources[index].get('state') != state:
        patch.append({
            "op": "add",
            "path": f"/status/resources/{index}/state",
            "value": state,
        })

    if self.has_resource_provider:
        resource_provider = await self.get_resource_provider()
        if resource_provider.status_summary_template:
            # make_status_summary() is handed this object, so the local state
            # is updated first -- presumably so the rendered summary reflects
            # the incoming state rather than the previous one.
            self.status_resources[index]['state'] = state
            status_summary = resource_provider.make_status_summary(self)
            if status_summary != self.status.get('summary'):
                patch.append({
                    "op": "add",
                    "path": "/status/summary",
                    "value": status_summary,
                })

    # Skip the API round-trip entirely when there is nothing to change.
    if patch:
        await self.json_patch_status(patch)
3 changes: 2 additions & 1 deletion operator/resourcehandle.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ async def get(name: str) -> Optional[ResourceHandleT]:
definition = await Poolboy.custom_objects_api.get_namespaced_custom_object(
Poolboy.operator_domain, Poolboy.operator_version, Poolboy.namespace, 'resourcehandles', name
)

if 'deletionTimestamp' in definition['metadata']:
return None
return ResourceHandle.__register_definition(definition=definition)

@staticmethod
Expand Down
199 changes: 109 additions & 90 deletions operator/resourcewatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async def watch(self):
if '/' in self.api_version:
group, version = self.api_version.split('/')
plural = await poolboy_k8s.kind_to_plural(group=group, version=version, kind=self.kind)
kwargs = dict(group = group, plural = plural, version = version)
kwargs = dict(group=group, plural=plural, version=version)
if self.namespace:
method = Poolboy.custom_objects_api.list_namespaced_custom_object
kwargs['namespace'] = self.namespace
Expand All @@ -109,6 +109,7 @@ async def watch(self):
try:
await self.__watch(method, **kwargs)
except asyncio.CancelledError:
logger.info(f"{self} cancelled")
return
except ResourceWatchRestartError as e:
logger.info(f"{self} restart: {e}")
Expand All @@ -133,6 +134,17 @@ async def watch(self):
async def __watch(self, method, **kwargs):
watch = None
try:
_continue = None
while True:
obj_list = await method(**kwargs, _continue=_continue, limit=50)
for obj in obj_list.get('items', []):
if not isinstance(obj, Mapping):
obj = Poolboy.api_client.sanitize_for_serialization(event_obj)
await self.__watch_event(event_type='PRELOAD', event_obj=obj)
_continue = obj_list['metadata'].get('continue')
if not _continue:
break

watch = kubernetes_asyncio.watch.Watch()
async for event in watch.stream(method, **kwargs):
if not isinstance(event, Mapping):
Expand All @@ -151,95 +163,7 @@ async def __watch(self, method, **kwargs):
else:
raise ResourceWatchFailedError(f"UNKNOWN EVENT: {event}")

event_obj_annotations = event_obj['metadata'].get('annotations')
if not event_obj_annotations:
continue

resource_handle_name = event_obj_annotations.get(resource_handle_name_annotation)
resource_handle_namespace = event_obj_annotations.get(resource_handle_namespace_annotation)
resource_index = int(event_obj_annotations.get(resource_index_annotation, 0))
resource_name = event_obj['metadata']['name']
resource_namespace = event_obj['metadata'].get('namespace')
resource_description = (
f"{event_obj['apiVersion']} {event_obj['kind']} {resource_name} in {resource_namespace}"
if resource_namespace else
f"{event_obj['apiVersion']} {event_obj['kind']} {resource_name}"
)

if not resource_handle_name or not resource_handle_namespace:
continue

resource_handle = resourcehandle.ResourceHandle.get_from_cache(
name = resource_handle_name
)
if resource_handle:
await resource_handle.handle_resource_event(
logger = logger,
resource_index = resource_index,
resource_state = event_obj,
)
else:
logger.debug(
f"Received event for ResourceHande {resource_handle_name} "
f"which seems to have been deleted."
)
continue

resource_claim_name = event_obj_annotations.get(resource_claim_name_annotation)
resource_claim_namespace = event_obj_annotations.get(resource_claim_namespace_annotation)

if not resource_claim_name or not resource_claim_namespace:
continue

resource_claim_description = f"ResourceClaim {resource_claim_name} in {resource_claim_namespace}"
try:
resource_claim = await resourceclaim.ResourceClaim.get(
name = resource_claim_name,
namespace = resource_claim_namespace,
)

# Do not manage status for detached ResourceClaim
if resource_claim.is_detached:
continue

prev_state = resource_claim.status_resources[resource_index].get('state')
prev_description = (
f"{prev_state['apiVersion']} {prev_state['kind']} {resource_name} in {resource_namespace}"
if resource_namespace else
f"{prev_state['apiVersion']} {prev_state['kind']} {resource_name}"
) if prev_state else None
if resource_claim.resource_handle_name != resource_handle_name:
logger.info(
f"Ignoring resource update for {resource_claim_description} due to ResourceHandle "
f"name mismatch, {self.resource_handle_name} != {resource_handle_name}"
)
elif prev_state and prev_description != resource_description:
logger.info(
f"Ignoring resource update for {resource_claim_description} due to resource "
f"mismatch, {resource_description} != {prev_description}"
)
elif event_type == 'DELETED':
if prev_state:
await resource_claim.remove_resource_from_status(resource_index)
else:
logger.info(
f"Ignoring resource delete for {resource_claim_description} due to resource "
f"state not present for {resource_description}"
)
else:
await resource_claim.update_resource_in_status(resource_index, event_obj)
except kubernetes_asyncio.client.exceptions.ApiException as e:
if e.status != 404:
logger.warning(
f"Received {e.status} response when attempting to patch resource state for "
f"{event_type.lower()} resource for {resource_claim_description}: "
f"{e}"
)
except Exception as e:
logger.exception(
f"Exception when attempting to patch resource state for {event_type.lower()} resource "
f"for {resource_claim_description}"
)
await self.__watch_event(event_type=event_type, event_obj=event_obj)
except kubernetes_asyncio.client.exceptions.ApiException as e:
if e.status == 410:
raise ResourceWatchRestartError("Received 410 expired response.")
Expand All @@ -248,3 +172,98 @@ async def __watch(self, method, **kwargs):
finally:
if watch:
await watch.close()

async def __watch_event(self, event_type, event_obj):
    """Handle a single resource event (preloaded or streamed from the watch).

    The owning ResourceHandle and ResourceClaim are looked up from the
    annotations on ``event_obj``. The cached ResourceHandle, if any, is
    notified first; then the ResourceClaim's status is updated (or the
    resource entry removed for ``DELETED`` events).

    Args:
        event_type: Event type string; ``'DELETED'`` triggers removal from the
            claim status, any other value is treated as an update.
        event_obj: Mapping describing the resource; must contain
            ``metadata``, ``apiVersion`` and ``kind``.

    API errors and other exceptions raised while updating the ResourceClaim
    are logged and swallowed (404 responses are ignored silently). Exceptions
    from the ResourceHandle notification are not caught here.
    """
    event_obj_annotations = event_obj['metadata'].get('annotations')
    if not event_obj_annotations:
        return

    resource_handle_name = event_obj_annotations.get(resource_handle_name_annotation)
    resource_handle_namespace = event_obj_annotations.get(resource_handle_namespace_annotation)
    resource_index = int(event_obj_annotations.get(resource_index_annotation, 0))
    resource_name = event_obj['metadata']['name']
    resource_namespace = event_obj['metadata'].get('namespace')
    resource_description = (
        f"{event_obj['apiVersion']} {event_obj['kind']} {resource_name} in {resource_namespace}"
        if resource_namespace else
        f"{event_obj['apiVersion']} {event_obj['kind']} {resource_name}"
    )

    # Resources without both handle annotations are not tracked here.
    if not resource_handle_name or not resource_handle_namespace:
        return

    resource_handle = resourcehandle.ResourceHandle.get_from_cache(name=resource_handle_name)

    if resource_handle:
        await resource_handle.handle_resource_event(
            logger=logger,
            resource_index=resource_index,
            resource_state=event_obj,
        )
    else:
        logger.debug(
            f"Received event for ResourceHandle {resource_handle_name} "
            f"which seems to have been deleted."
        )
        return

    resource_claim_name = event_obj_annotations.get(resource_claim_name_annotation)
    resource_claim_namespace = event_obj_annotations.get(resource_claim_namespace_annotation)

    # No claim annotations: nothing further to update for this event.
    if not resource_claim_name or not resource_claim_namespace:
        return

    resource_claim_description = f"ResourceClaim {resource_claim_name} in {resource_claim_namespace}"
    try:
        resource_claim = await resourceclaim.ResourceClaim.get(
            name=resource_claim_name,
            namespace=resource_claim_namespace,
        )

        # Do not manage status for detached ResourceClaim
        if resource_claim.is_detached:
            logger.debug(
                f"Not handling event for {resource_description} for detached {resource_claim_description}",
            )
            return

        # Describe the previously recorded resource the same way as the
        # incoming one so the two can be compared as strings.
        prev_state = resource_claim.status_resources[resource_index].get('state')
        prev_description = (
            f"{prev_state['apiVersion']} {prev_state['kind']} {resource_name} in {resource_namespace}"
            if resource_namespace else
            f"{prev_state['apiVersion']} {prev_state['kind']} {resource_name}"
        ) if prev_state else None

        if resource_claim.resource_handle_name != resource_handle_name:
            logger.info(
                f"Ignoring resource update on {resource_description} for "
                f"{resource_claim_description} due to ResourceHandle "
                f"name mismatch, {resource_claim.resource_handle_name} != {resource_handle_name}"
            )
        elif prev_state and prev_description != resource_description:
            logger.info(
                f"Ignoring resource update for {resource_claim_description} due to resource "
                f"mismatch, {resource_description} != {prev_description}"
            )
        elif event_type == 'DELETED':
            if prev_state:
                await resource_claim.remove_resource_from_status(resource_index)
            else:
                logger.info(
                    f"Ignoring resource delete for {resource_claim_description} due to resource "
                    f"state not present for {resource_description}"
                )
        else:
            logger.debug(f"Updating {resource_description} in {resource_claim_description}")
            await resource_claim.update_resource_in_status(resource_index, event_obj)
    except kubernetes_asyncio.client.exceptions.ApiException as e:
        # A 404 is ignored without logging; every other status is reported.
        if e.status != 404:
            logger.warning(
                f"Received {e.status} response when attempting to patch resource state for "
                f"{event_type.lower()} {resource_description} for {resource_claim_description}: "
                f"{e}"
            )
    except Exception:
        # Best effort: log with traceback and carry on so that one bad event
        # does not propagate out of this handler.
        logger.exception(
            f"Exception when attempting to patch resource state for {event_type.lower()} resource "
            f"for {resource_claim_description}"
        )
Loading