Skip to content

Commit 1f10792

Browse files
authored
Fix resource watcher issue on early disconnect. (#107)
In some situations the resource watch can disconnect before all resources are seen. This fix implements a full list of all watched resources before the actual watch begins.
1 parent e3c8707 commit 1f10792

File tree

3 files changed

+127
-102
lines changed

3 files changed

+127
-102
lines changed

operator/resourceclaim.py

+16-11
Original file line numberDiff line numberDiff line change
@@ -1092,20 +1092,25 @@ async def remove_resource_from_status(self, index):
10921092
await self.json_patch_status(patch)
10931093

10941094
async def update_resource_in_status(self, index, state):
1095-
patch = [{
1096-
"op": "add",
1097-
"path": f"/status/resources/{index}/state",
1098-
"value": state,
1099-
}]
1095+
patch = []
1096+
if self.status_resources[index].get('state') != state:
1097+
patch.append({
1098+
"op": "add",
1099+
"path": f"/status/resources/{index}/state",
1100+
"value": state,
1101+
})
11001102

11011103
if self.has_resource_provider:
11021104
resource_provider = await self.get_resource_provider()
11031105
if resource_provider.status_summary_template:
11041106
self.status_resources[index]['state'] = state
1105-
patch.append({
1106-
"op": "add",
1107-
"path": "/status/summary",
1108-
"value": resource_provider.make_status_summary(self),
1109-
})
1107+
status_summary = resource_provider.make_status_summary(self)
1108+
if status_summary != self.status.get('summary'):
1109+
patch.append({
1110+
"op": "add",
1111+
"path": "/status/summary",
1112+
"value": status_summary,
1113+
})
11101114

1111-
await self.json_patch_status(patch)
1115+
if patch:
1116+
await self.json_patch_status(patch)

operator/resourcehandle.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,8 @@ async def get(name: str) -> Optional[ResourceHandleT]:
391391
definition = await Poolboy.custom_objects_api.get_namespaced_custom_object(
392392
Poolboy.operator_domain, Poolboy.operator_version, Poolboy.namespace, 'resourcehandles', name
393393
)
394-
394+
if 'deletionTimestamp' in definition['metadata']:
395+
return None
395396
return ResourceHandle.__register_definition(definition=definition)
396397

397398
@staticmethod

operator/resourcewatcher.py

+109-90
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ async def watch(self):
8787
if '/' in self.api_version:
8888
group, version = self.api_version.split('/')
8989
plural = await poolboy_k8s.kind_to_plural(group=group, version=version, kind=self.kind)
90-
kwargs = dict(group = group, plural = plural, version = version)
90+
kwargs = dict(group=group, plural=plural, version=version)
9191
if self.namespace:
9292
method = Poolboy.custom_objects_api.list_namespaced_custom_object
9393
kwargs['namespace'] = self.namespace
@@ -109,6 +109,7 @@ async def watch(self):
109109
try:
110110
await self.__watch(method, **kwargs)
111111
except asyncio.CancelledError:
112+
logger.info(f"{self} cancelled")
112113
return
113114
except ResourceWatchRestartError as e:
114115
logger.info(f"{self} restart: {e}")
@@ -133,6 +134,17 @@ async def watch(self):
133134
async def __watch(self, method, **kwargs):
134135
watch = None
135136
try:
137+
_continue = None
138+
while True:
139+
obj_list = await method(**kwargs, _continue=_continue, limit=50)
140+
for obj in obj_list.get('items', []):
141+
if not isinstance(obj, Mapping):
142+
obj = Poolboy.api_client.sanitize_for_serialization(event_obj)
143+
await self.__watch_event(event_type='PRELOAD', event_obj=obj)
144+
_continue = obj_list['metadata'].get('continue')
145+
if not _continue:
146+
break
147+
136148
watch = kubernetes_asyncio.watch.Watch()
137149
async for event in watch.stream(method, **kwargs):
138150
if not isinstance(event, Mapping):
@@ -151,95 +163,7 @@ async def __watch(self, method, **kwargs):
151163
else:
152164
raise ResourceWatchFailedError(f"UNKNOWN EVENT: {event}")
153165

154-
event_obj_annotations = event_obj['metadata'].get('annotations')
155-
if not event_obj_annotations:
156-
continue
157-
158-
resource_handle_name = event_obj_annotations.get(resource_handle_name_annotation)
159-
resource_handle_namespace = event_obj_annotations.get(resource_handle_namespace_annotation)
160-
resource_index = int(event_obj_annotations.get(resource_index_annotation, 0))
161-
resource_name = event_obj['metadata']['name']
162-
resource_namespace = event_obj['metadata'].get('namespace')
163-
resource_description = (
164-
f"{event_obj['apiVersion']} {event_obj['kind']} {resource_name} in {resource_namespace}"
165-
if resource_namespace else
166-
f"{event_obj['apiVersion']} {event_obj['kind']} {resource_name}"
167-
)
168-
169-
if not resource_handle_name or not resource_handle_namespace:
170-
continue
171-
172-
resource_handle = resourcehandle.ResourceHandle.get_from_cache(
173-
name = resource_handle_name
174-
)
175-
if resource_handle:
176-
await resource_handle.handle_resource_event(
177-
logger = logger,
178-
resource_index = resource_index,
179-
resource_state = event_obj,
180-
)
181-
else:
182-
logger.debug(
183-
f"Received event for ResourceHande {resource_handle_name} "
184-
f"which seems to have been deleted."
185-
)
186-
continue
187-
188-
resource_claim_name = event_obj_annotations.get(resource_claim_name_annotation)
189-
resource_claim_namespace = event_obj_annotations.get(resource_claim_namespace_annotation)
190-
191-
if not resource_claim_name or not resource_claim_namespace:
192-
continue
193-
194-
resource_claim_description = f"ResourceClaim {resource_claim_name} in {resource_claim_namespace}"
195-
try:
196-
resource_claim = await resourceclaim.ResourceClaim.get(
197-
name = resource_claim_name,
198-
namespace = resource_claim_namespace,
199-
)
200-
201-
# Do not manage status for detached ResourceClaim
202-
if resource_claim.is_detached:
203-
continue
204-
205-
prev_state = resource_claim.status_resources[resource_index].get('state')
206-
prev_description = (
207-
f"{prev_state['apiVersion']} {prev_state['kind']} {resource_name} in {resource_namespace}"
208-
if resource_namespace else
209-
f"{prev_state['apiVersion']} {prev_state['kind']} {resource_name}"
210-
) if prev_state else None
211-
if resource_claim.resource_handle_name != resource_handle_name:
212-
logger.info(
213-
f"Ignoring resource update for {resource_claim_description} due to ResourceHandle "
214-
f"name mismatch, {self.resource_handle_name} != {resource_handle_name}"
215-
)
216-
elif prev_state and prev_description != resource_description:
217-
logger.info(
218-
f"Ignoring resource update for {resource_claim_description} due to resource "
219-
f"mismatch, {resource_description} != {prev_description}"
220-
)
221-
elif event_type == 'DELETED':
222-
if prev_state:
223-
await resource_claim.remove_resource_from_status(resource_index)
224-
else:
225-
logger.info(
226-
f"Ignoring resource delete for {resource_claim_description} due to resource "
227-
f"state not present for {resource_description}"
228-
)
229-
else:
230-
await resource_claim.update_resource_in_status(resource_index, event_obj)
231-
except kubernetes_asyncio.client.exceptions.ApiException as e:
232-
if e.status != 404:
233-
logger.warning(
234-
f"Received {e.status} response when attempting to patch resource state for "
235-
f"{event_type.lower()} resource for {resource_claim_description}: "
236-
f"{e}"
237-
)
238-
except Exception as e:
239-
logger.exception(
240-
f"Exception when attempting to patch resource state for {event_type.lower()} resource "
241-
f"for {resource_claim_description}"
242-
)
166+
await self.__watch_event(event_type=event_type, event_obj=event_obj)
243167
except kubernetes_asyncio.client.exceptions.ApiException as e:
244168
if e.status == 410:
245169
raise ResourceWatchRestartError("Received 410 expired response.")
@@ -248,3 +172,98 @@ async def __watch(self, method, **kwargs):
248172
finally:
249173
if watch:
250174
await watch.close()
175+
176+
async def __watch_event(self, event_type, event_obj):
177+
event_obj_annotations = event_obj['metadata'].get('annotations')
178+
if not event_obj_annotations:
179+
return
180+
181+
resource_handle_name = event_obj_annotations.get(resource_handle_name_annotation)
182+
resource_handle_namespace = event_obj_annotations.get(resource_handle_namespace_annotation)
183+
resource_index = int(event_obj_annotations.get(resource_index_annotation, 0))
184+
resource_name = event_obj['metadata']['name']
185+
resource_namespace = event_obj['metadata'].get('namespace')
186+
resource_description = (
187+
f"{event_obj['apiVersion']} {event_obj['kind']} {resource_name} in {resource_namespace}"
188+
if resource_namespace else
189+
f"{event_obj['apiVersion']} {event_obj['kind']} {resource_name}"
190+
)
191+
192+
if not resource_handle_name or not resource_handle_namespace:
193+
return
194+
195+
resource_handle = resourcehandle.ResourceHandle.get_from_cache(name=resource_handle_name)
196+
197+
if resource_handle:
198+
await resource_handle.handle_resource_event(
199+
logger = logger,
200+
resource_index = resource_index,
201+
resource_state = event_obj,
202+
)
203+
else:
204+
logger.debug(
205+
f"Received event for ResourceHandle {resource_handle_name} "
206+
f"which seems to have been deleted."
207+
)
208+
return
209+
210+
resource_claim_name = event_obj_annotations.get(resource_claim_name_annotation)
211+
resource_claim_namespace = event_obj_annotations.get(resource_claim_namespace_annotation)
212+
213+
if not resource_claim_name or not resource_claim_namespace:
214+
return
215+
216+
resource_claim_description = f"ResourceClaim {resource_claim_name} in {resource_claim_namespace}"
217+
try:
218+
resource_claim = await resourceclaim.ResourceClaim.get(
219+
name = resource_claim_name,
220+
namespace = resource_claim_namespace,
221+
)
222+
223+
# Do not manage status for detached ResourceClaim
224+
if resource_claim.is_detached:
225+
logger.debug(
226+
f"Not handling event for {resource_description} for detached {resource_claim_description}",
227+
)
228+
return
229+
230+
prev_state = resource_claim.status_resources[resource_index].get('state')
231+
prev_description = (
232+
f"{prev_state['apiVersion']} {prev_state['kind']} {resource_name} in {resource_namespace}"
233+
if resource_namespace else
234+
f"{prev_state['apiVersion']} {prev_state['kind']} {resource_name}"
235+
) if prev_state else None
236+
if resource_claim.resource_handle_name != resource_handle_name:
237+
logger.info(
238+
f"Ignoring resource update on {resource_description} for "
239+
f"{resource_claim_description} due to ResourceHandle "
240+
f"name mismatch, {resource_claim.resource_handle_name} != {resource_handle_name}"
241+
)
242+
elif prev_state and prev_description != resource_description:
243+
logger.info(
244+
f"Ignoring resource update for {resource_claim_description} due to resource "
245+
f"mismatch, {resource_description} != {prev_description}"
246+
)
247+
elif event_type == 'DELETED':
248+
if prev_state:
249+
await resource_claim.remove_resource_from_status(resource_index)
250+
else:
251+
logger.info(
252+
f"Ignoring resource delete for {resource_claim_description} due to resource "
253+
f"state not present for {resource_description}"
254+
)
255+
else:
256+
logger.debug(f"Updating {resource_description} in {resource_claim_description}")
257+
await resource_claim.update_resource_in_status(resource_index, event_obj)
258+
except kubernetes_asyncio.client.exceptions.ApiException as e:
259+
if e.status != 404:
260+
logger.warning(
261+
f"Received {e.status} response when attempting to patch resource state for "
262+
f"{event_type.lower()} {resource_description} for {resource_claim_description}: "
263+
f"{e}"
264+
)
265+
except Exception as e:
266+
logger.exception(
267+
f"Exception when attempting to patch resource state for {event_type.lower()} resource "
268+
f"for {resource_claim_description}"
269+
)

0 commit comments

Comments
 (0)