@@ -239,9 +239,22 @@ def __getitem__(self, key):
239239
240240 return super ().__getitem__ (key )
241241
242+ @functools .cached_property
243+ def _has_streams_namespace (self ):
244+ return ("streams" in self ) and ("BlueskyEventStream" not in {s .name for s in self ["streams" ].specs })
245+
246+ @functools .cached_property
247+ def _streams_node (self ):
248+ # Access to the "streams" namespace (possibly a separate container)
249+ if self ._has_streams_namespace :
250+ return self ["streams" ]
251+ else :
252+ # No intermediate "streams" node, use the top-level node
253+ return self
254+
242255 @functools .cached_property
243256 def _stream_names (self ):
244- return sorted (self .get ( "streams" , ()) )
257+ return sorted (k for k in self ._streams_node )
245258
246259 def documents (self , fill = False ):
247260 with io .BytesIO () as buffer :
@@ -254,22 +267,30 @@ def documents(self, fill=False):
254267
255268class BlueskyRunV2SQL (BlueskyRunV2 , _BlueskyRunSQL ):
256269 def _keys_slice (self , start , stop , direction , page_size : Optional [int ] = None , ** kwargs ):
257- keys = reversed (self ._stream_names ) if direction < 0 else self ._stream_names
258- return (yield from keys [start :stop ])
270+ if self ._has_streams_namespace :
271+ keys = reversed (self ._stream_names ) if direction < 0 else self ._stream_names
272+ return (yield from keys [start :stop ])
273+ else :
274+ return (yield from super ()._keys_slice (start , stop , direction , page_size = page_size , ** kwargs ))
259275
260276 def _items_slice (self , start , stop , direction , page_size : Optional [int ] = None , ** kwargs ):
261- _streams_node = super ().get ("streams" , {})
262- for key in reversed (self ._stream_names ) if direction < 0 else self ._stream_names :
263- yield key , _streams_node .get (key )
264- return
277+ if self ._has_streams_namespace :
278+ _streams_node = super ().get ("streams" , {})
279+ for key in reversed (self ._stream_names ) if direction < 0 else self ._stream_names :
280+ yield key , _streams_node .get (key )
281+ return
282+ else :
283+ return (yield from super ()._items_slice (start , stop , direction , page_size = page_size , ** kwargs ))
265284
266285 def __getitem__ (self , key ):
267286 # For v3, we need to handle the streams and configs keys
268287 if key in RESERVED_V3_KEYS :
269288 return super ().__getitem__ (key )
270289
271290 if key in self ._stream_names :
272- stream_container = super ().get ("streams" , {}).get (key )
291+ stream_container = (
292+ super ().get ("streams" , {}).get (key ) if self ._has_streams_namespace else super ().get (key )
293+ )
273294 return BlueskyEventStreamV2SQL .from_stream_client (stream_container )
274295
275296 return super ().__getitem__ (key )
@@ -293,7 +314,7 @@ def __new__(cls, context, *, item, structure_clients, **kwargs):
293314 def __getattr__ (self , key ):
294315 if key in self ._stream_names :
295316 # A shortcut to the stream data
296- return self ["streams" ][key ]
317+ return self ["streams" ][key ] if self . _has_streams_namespace else self [ key ]
297318
298319 return super ().__getattr__ (key )
299320
0 commit comments