1818import dasbus .client .proxy
1919import dasbus .client .observer
2020from gi .repository import GLib
21- from staslib import defs , conf , gutil
21+ from staslib import defs , conf , gutil , iputil
2222
2323
2424def _txt2dict (txt : list ):
@@ -54,6 +54,141 @@ def _proto2trans(protocol):
5454 return None
5555
5656
def mk_service_key(interface, protocol, name, stype, domain):
    '''Build the hashable tuple that uniquely identifies a service.

    The five fields are kept in the order (interface, protocol, name,
    stype, domain) so that the result can be used as a dictionary key.
    '''
    service_key = interface, protocol, name, stype, domain
    return service_key
60+
61+
def fmt_service_str(interface, protocol, name, stype, domain, flags):  # pylint: disable=too-many-arguments
    '''Return service identifier as a string

    The string is meant for log messages. The interface index and the flags
    are printed both as raw numbers and in their symbolic form.
    '''
    try:
        iface_name = socket.if_indextoname(interface)
    except (OSError, OverflowError):
        # The index may not map to an interface anymore (e.g. a service is
        # being removed because its interface went away) or may be negative.
        # Formatting a log string must not abort the caller in that case.
        iface_name = 'unknown'

    return (
        f'interface={interface}:{(iface_name + ","):<9} '
        f'protocol={Avahi.protocol_as_string(protocol)}, '
        f'stype={stype}, '
        f'domain={domain}, '
        f'flags={flags}:{(Avahi.result_flags_as_string(flags) + ","):<12} '
        f'name={name}'
    )
72+
73+
74+ # ******************************************************************************
class Service:  # pylint: disable=too-many-instance-attributes
    '''Object used to keep track of the services discovered from the avahi-daemon'''

    # Read-only views on the private attributes set in __init__()/set_identity()
    interface_name = property(lambda self: self._interface_name)
    interface = property(lambda self: self._interface_id)
    ip_family = property(lambda self: self._ip_family)
    reachable = property(lambda self: self._reachable)
    protocol = property(lambda self: self._protocol_id)
    key_str = property(lambda self: self._key_str)
    domain = property(lambda self: self._domain)
    stype = property(lambda self: self._stype)
    data = property(lambda self: self._data)
    name = property(lambda self: self._name)
    key = property(lambda self: self._key)
    ip = property(lambda self: self._ip)

    def __init__(self, args, identified_cback):
        '''@brief Record the identifiers of a newly discovered service.

        @param args: Sequence (interface, protocol, name, stype, domain, flags)
        @param identified_cback: Callable taking no argument. It is invoked
                                 once set_identity() has completed the
                                 identification of this service.
        '''
        self._identified_cback = identified_cback
        self._interface_id = args[0]
        self._protocol_id = args[1]
        self._name = args[2]
        self._stype = args[3]
        self._domain = args[4]
        self._flags = args[5]
        self._ip_family = 4 if self._protocol_id == Avahi.PROTO_INET else 6

        self._interface_name = socket.if_indextoname(self._interface_id).strip()
        self._protocol_name = Avahi.protocol_as_string(self._protocol_id)
        self._flags_str = '(' + Avahi.result_flags_as_string(self._flags) + '),'

        self._key = mk_service_key(self._interface_id, self._protocol_id, self._name, self._stype, self._domain)
        self._key_str = f'({self._interface_name}, {self._protocol_name}, {self._name}.{self._domain}, {self._stype})'

        self._id = fmt_service_str(
            self._interface_id, self._protocol_id, self._name, self._stype, self._domain, self._flags
        )

        self._ip = None
        self._resolver = None
        self._data = {}
        self._reachable = False
        self._connect_checker = None

    def info(self):
        '''Return debug info'''
        # Work on a copy: self._data is also what the "data" property hands
        # out, and it must not gain a 'reachable' entry as a side effect.
        info = dict(self._data)
        info['reachable'] = str(self._reachable)
        return info

    def __str__(self):
        return self._id

    def set_identity(self, transport, address, port, txt):  # pylint: disable=too-many-arguments
        '''Complete identification and check connectivity (if needed)

        For transports other than 'tcp', the service is marked reachable and
        the identified callback is invoked before returning. For 'tcp', a
        connectivity check is started and the identified callback is invoked
        from the callback handed to the checker. Nothing is returned.
        '''
        # This may run again while an earlier connectivity check is still
        # pending. Close that checker so that it is not leaked and cannot
        # report a result for the previous identity.
        self._close_connect_checker()

        traddr = address.strip()
        trsvcid = str(port).strip()
        # host-iface permitted for tcp alone and not rdma
        host_iface = self._interface_name if transport == 'tcp' else ''
        self._data = {
            'transport': transport,
            'traddr': traddr,
            'trsvcid': trsvcid,
            # host-iface permitted for tcp alone and not rdma
            'host-iface': host_iface,
            'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip()
            if conf.NvmeOptions().discovery_supp
            else defs.WELL_KNOWN_DISC_NQN,
        }

        self._ip = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True)

        if transport != 'tcp':
            self._reachable = True
            self._identified_cback()
            return

        self._reachable = False
        connect_checker = gutil.TcpChecker(traddr, trsvcid, host_iface, self._tcp_connect_check_cback)

        try:
            connect_checker.connect()
        except RuntimeError as err:
            logging.error('Unable to verify connectivity: %s', err)
            connect_checker.close()
            connect_checker = None

        self._connect_checker = connect_checker

    def _close_connect_checker(self):
        '''Close and forget the pending connectivity checker, if there is one'''
        if self._connect_checker is not None:
            self._connect_checker.close()
            self._connect_checker = None

    def _tcp_connect_check_cback(self, connected):
        '''Record the outcome of the connectivity check and notify the owner'''
        self._close_connect_checker()
        self._reachable = connected
        self._identified_cback()

    def set_resolver(self, resolver):
        '''Set the resolver object'''
        self._resolver = resolver

    def close(self):
        '''Close this object and release all resources'''
        self._close_connect_checker()

        if self._resolver is not None:
            try:
                self._resolver.Free()
                dasbus.client.proxy.disconnect_proxy(self._resolver)
            except (AttributeError, dasbus.error.DBusError) as ex:
                logging.debug('Service.close() - Failed to Free() resolver. %s', ex)
            self._resolver = None
190+
191+
57192# ******************************************************************************
58193class Avahi : # pylint: disable=too-many-instance-attributes
59194 '''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi
@@ -182,16 +317,10 @@ def kill(self):
182317
183318 def info (self ) -> dict :
184319 '''@brief return debug info about this object'''
185- services = dict ()
186- for service , obj in self ._services .items ():
187- interface , protocol , name , stype , domain = service
188- key = f'({ socket .if_indextoname (interface )} , { Avahi .protos .get (protocol , "unknown" )} , { name } .{ domain } , { stype } )'
189- services [key ] = obj .get ('data' , {})
190-
191320 info = {
192321 'avahi wake up timer' : str (self ._kick_avahi_tmr ),
193322 'service types' : list (self ._stypes ),
194- 'services' : services ,
323+ 'services' : { service . key_str : service . info () for service in self . _services . values ()} ,
195324 }
196325
197326 return info
@@ -217,7 +346,7 @@ def get_controllers(self) -> list:
217346 [...]
218347 ]
219348 '''
220- return [service [ ' data' ] for service in self ._services .values () if len ( service [ 'data' ]) ]
349+ return [service . data for service in self ._services .values () if service . reachable ]
221350
222351 def config_stypes (self , stypes : list ):
223352 '''@brief Configure the service types that we want to discover.
@@ -234,18 +363,17 @@ def kick_start(self):
234363 '''
235364 self ._kick_avahi_tmr .clear ()
236365
def _remove_service(self, service_to_rm: typing.Tuple[int, int, str, str, str]):
    '''Forget a cached service and release its resources.

    @param service_to_rm: Service key (interface, protocol, name, stype, domain).
                          A key that is not in the cache is silently ignored.
    '''
    # pop() needs an explicit default: without it an unknown key raises
    # KeyError and the None check below could never be False.
    service = self._services.pop(service_to_rm, None)
    if service is not None:
        service.close()
370+
237371 def _disconnect (self ):
238372 logging .debug ('Avahi._disconnect()' )
239373 for service in self ._services .values ():
240- resolver = service .pop ('resolver' , None )
241- if resolver is not None :
242- try :
243- resolver .Free ()
244- dasbus .client .proxy .disconnect_proxy (resolver )
245- except (AttributeError , dasbus .error .DBusError ) as ex :
246- logging .debug ('Avahi._disconnect() - Failed to Free() resolver. %s' , ex )
374+ service .close ()
247375
248- self ._services = dict ()
376+ self ._services . clear ()
249377
250378 for browser in self ._service_browsers .values ():
251379 try :
@@ -296,15 +424,9 @@ def _configure_browsers(self):
296424 logging .debug ('Avahi._configure_browsers() - Failed to Free() browser. %s' , ex )
297425
298426 # Find the cached services corresponding to stype_to_rm and remove them
299- services_to_rm = [service for service in self ._services if service [3 ] == stype_to_rm ]
300- for service in services_to_rm :
301- resolver = self ._services .pop (service , {}).pop ('resolver' , None )
302- if resolver is not None :
303- try :
304- resolver .Free ()
305- dasbus .client .proxy .disconnect_proxy (resolver )
306- except (AttributeError , dasbus .error .DBusError ) as ex :
307- logging .debug ('Avahi._configure_browsers() - Failed to Free() resolver. %s' , ex )
427+ services_to_rm = [service .key for service in self ._services .values () if service .stype == stype_to_rm ]
428+ for service_to_rm in services_to_rm :
429+ self ._remove_service (service_to_rm )
308430
309431 for stype in stypes_to_add :
310432 try :
@@ -329,31 +451,25 @@ def _service_discovered(
329451 args : typing .Tuple [int , int , str , str , str , int ],
330452 * _user_data ,
331453 ):
332- (interface , protocol , name , stype , domain , flags ) = args
333- logging .debug (
334- 'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s' ,
335- interface ,
336- socket .if_indextoname (interface ),
337- Avahi .protocol_as_string (protocol ),
338- stype ,
339- domain ,
340- flags ,
341- '(' + Avahi .result_flags_as_string (flags ) + '),' ,
342- name ,
343- )
454+ service = Service (args , self ._change_cb )
455+ logging .debug ('Avahi._service_discovered() - %s' , service )
344456
345- service = (interface , protocol , name , stype , domain )
346- if service not in self ._services :
457+ if service .key not in self ._services :
347458 try :
348459 obj_path = self ._avahi .ServiceResolverNew (
349- interface , protocol , name , stype , domain , Avahi .PROTO_UNSPEC , Avahi .LOOKUP_USE_MULTICAST
460+ service .interface ,
461+ service .protocol ,
462+ service .name ,
463+ service .stype ,
464+ service .domain ,
465+ Avahi .PROTO_UNSPEC ,
466+ Avahi .LOOKUP_USE_MULTICAST ,
350467 )
351- self ._services [service ] = {
352- 'resolver' : self ._sysbus .get_proxy (Avahi .DBUS_NAME , obj_path ),
353- 'data' : {},
354- }
468+ service .set_resolver (self ._sysbus .get_proxy (Avahi .DBUS_NAME , obj_path ))
355469 except dasbus .error .DBusError as ex :
356- logging .warning ('Failed to create resolver: "%s", "%s", "%s". %s' , interface , name , stype , ex )
470+ logging .warning ('Failed to create resolver - %s: %s' , service , ex )
471+
472+ self ._services [service .key ] = service
357473
358474 def _service_removed (
359475 self ,
@@ -367,27 +483,14 @@ def _service_removed(
367483 ):
368484 (interface , protocol , name , stype , domain , flags ) = args
369485 logging .debug (
370- 'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s' ,
371- interface ,
372- socket .if_indextoname (interface ),
373- Avahi .protocol_as_string (protocol ),
374- stype ,
375- domain ,
376- flags ,
377- '(' + Avahi .result_flags_as_string (flags ) + '),' ,
378- name ,
486+ 'Avahi._service_removed() - %s' ,
487+ fmt_service_str (interface , protocol , name , stype , domain , flags ),
379488 )
380489
381- service = (interface , protocol , name , stype , domain )
382- resolver = self ._services .pop (service , {}).pop ('resolver' , None )
383- if resolver is not None :
384- try :
385- resolver .Free ()
386- dasbus .client .proxy .disconnect_proxy (resolver )
387- except (AttributeError , dasbus .error .DBusError ) as ex :
388- logging .debug ('Avahi._service_removed() - Failed to Free() resolver. %s' , ex )
389-
390- self ._change_cb ()
490+ service_key = mk_service_key (interface , protocol , name , stype , domain )
491+ self ._remove_service (service_key )
492+ if self ._change_cb is not None :
493+ self ._change_cb ()
391494
392495 def _service_identified ( # pylint: disable=too-many-locals
393496 self ,
@@ -402,38 +505,21 @@ def _service_identified( # pylint: disable=too-many-locals
402505 (interface , protocol , name , stype , domain , host , aprotocol , address , port , txt , flags ) = args
403506 txt = _txt2dict (txt )
404507 logging .debug (
405- 'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s' ,
406- interface ,
407- socket .if_indextoname (interface ),
408- Avahi .protocol_as_string (protocol ),
409- stype ,
410- domain ,
411- flags ,
412- '(' + Avahi .result_flags_as_string (flags ) + '),' ,
413- name ,
508+ 'Avahi._service_identified() - %s, host=%s, aprotocol=%s, port=%s, address=%s, txt=%s' ,
509+ fmt_service_str (interface , protocol , name , stype , domain , flags ),
414510 host ,
415511 Avahi .protocol_as_string (aprotocol ),
416- address ,
417512 port ,
513+ address ,
418514 txt ,
419515 )
420516
421- service = (interface , protocol , name , stype , domain )
422- if service in self ._services :
517+ service_key = mk_service_key (interface , protocol , name , stype , domain )
518+ service = self ._services .get (service_key , None )
519+ if service is not None :
423520 transport = _proto2trans (txt .get ('p' ))
424521 if transport is not None :
425- self ._services [service ]['data' ] = {
426- 'transport' : transport ,
427- 'traddr' : address .strip (),
428- 'trsvcid' : str (port ).strip (),
429- # host-iface permitted for tcp alone and not rdma
430- 'host-iface' : socket .if_indextoname (interface ).strip () if transport == 'tcp' else '' ,
431- 'subsysnqn' : txt .get ('nqn' , defs .WELL_KNOWN_DISC_NQN ).strip ()
432- if conf .NvmeOptions ().discovery_supp
433- else defs .WELL_KNOWN_DISC_NQN ,
434- }
435-
436- self ._change_cb ()
522+ service .set_identity (transport , address , port , txt )
437523 else :
438524 logging .error (
439525 'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s' ,
@@ -442,6 +528,8 @@ def _service_identified( # pylint: disable=too-many-locals
442528 txt ,
443529 )
444530
531+ self ._check_for_duplicate_ips ()
532+
445533 def _failure_handler ( # pylint: disable=no-self-use
446534 self ,
447535 _connection ,
@@ -456,3 +544,15 @@ def _failure_handler( # pylint: disable=no-self-use
456544 if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error :
457545 # ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal.
458546 logging .error ('Avahi._failure_handler() - name=%s, error=%s' , interface_name , error )
547+
def _check_for_duplicate_ips(self):
    '''This is to identify misconfigured networks where the
    same IP addresses are discovered on two or more interfaces.

    An error is logged for each such IP address. Services that have
    no IP address recorded are ignored.
    '''
    ifaces_by_ip = {}
    for service in self._services.values():
        if service.ip is None:
            continue

        ifaces = ifaces_by_ip.setdefault(service.ip.compressed, [])
        # Several cached services can carry the same address on the same
        # interface. Count each interface once so that this case is not
        # reported as a conflict.
        if service.interface_name not in ifaces:
            ifaces.append(service.interface_name)

    for ip, ifaces in ifaces_by_ip.items():
        if len(ifaces) > 1:
            logging.error('IP address %s was found on multiple interfaces: %s', ip, ','.join(ifaces))
0 commit comments