File tree Expand file tree Collapse file tree 1 file changed +20
-0
lines changed
quixstreams/state/rocksdb/windowed Expand file tree Collapse file tree 1 file changed +20
-0
lines changed Original file line number Diff line number Diff line change @@ -403,6 +403,26 @@ def expire_all_windows(
403403 )
404404
405405 def _deserialize_prefix (self , prefix : bytes ) -> Any :
406+ """
407+ Attempt to deserialize a window prefix.
408+
409+ Window prefixes can be provided either as raw bytes or as other types
410+ (e.g., dict). The `as_state()` method conditionally serializes these
411+ prefixes to bytes only if they are not already bytes before storing.
412+
413+ When retrieving a prefix during partition-level windows expiration, we
414+ don't know its original type due to this conditional serialization.
415+ Therefore, we must first *try* to deserialize it using the configured
416+ `loads` function.
417+
418+ If deserialization succeeds, it means the prefix was originally a
419+ non-bytes type, and we return the deserialized object.
420+ If deserialization fails with a `StateSerializationError`, it indicates
421+ that the prefix was likely provided as raw bytes initially, so we
422+ return the original `prefix` bytes.
423+
424+ :param prefix: The prefix bytes retrieved from storage.
425+ """
406426 try :
407427 return deserialize (prefix , loads = self ._loads )
408428 except StateSerializationError :
You can’t perform that action at this time.
0 commit comments