66using System . Collections . Generic ;
77using System . IO ;
88using System . Threading ;
9+ using System . Threading . Tasks ;
910using System . Reflection ;
1011
1112namespace DBus
@@ -38,22 +39,23 @@ public class Connection
3839 Dictionary < uint , PendingCall > pendingCalls = new Dictionary < uint , PendingCall > ( ) ;
3940 Queue < Message > inbound = new Queue < Message > ( ) ;
4041 Dictionary < ObjectPath , BusObject > registeredObjects = new Dictionary < ObjectPath , BusObject > ( ) ;
42+ private readonly ReadMessageTask readMessageTask ;
4143
4244 public delegate void MonitorEventHandler ( Message msg ) ;
4345 public MonitorEventHandler Monitors ; // subscribe yourself to this list of observers if you want to get notified about each incoming message
4446
4547 protected Connection ( )
4648 {
47-
49+ readMessageTask = new ReadMessageTask ( this ) ;
4850 }
4951
50- internal Connection ( Transport transport )
52+ internal Connection ( Transport transport ) : this ( )
5153 {
5254 this . transport = transport ;
5355 transport . Connection = this ;
5456 }
5557
56- internal Connection ( string address )
58+ internal Connection ( string address ) : this ( )
5759 {
5860 OpenPrivate ( address ) ;
5961 Authenticate ( ) ;
@@ -183,11 +185,12 @@ internal uint GenerateSerial ()
183185
184186 internal Message SendWithReplyAndBlock ( Message msg , bool keepFDs )
185187 {
186- PendingCall pending = SendWithReply ( msg , keepFDs ) ;
187- return pending . Reply ;
188+ using ( PendingCall pending = SendWithPendingReply ( msg , keepFDs ) ) {
189+ return pending . Reply ;
190+ }
188191 }
189192
190- internal PendingCall SendWithReply ( Message msg , bool keepFDs )
193+ internal PendingCall SendWithPendingReply ( Message msg , bool keepFDs )
191194 {
192195 msg . ReplyExpected = true ;
193196
@@ -215,27 +218,23 @@ internal virtual uint Send (Message msg)
215218 return msg . Header . Serial ;
216219 }
217220
218- //temporary hack
219- internal void DispatchSignals ( )
221+ public void Iterate ( )
220222 {
221- lock ( inbound ) {
222- while ( inbound . Count != 0 ) {
223- Message msg = inbound . Dequeue ( ) ;
224- try {
225- HandleSignal ( msg ) ;
226- } finally {
227- msg . Dispose ( ) ;
228- }
229- }
230- }
223+ Iterate ( new CancellationToken ( false ) ) ;
231224 }
232225
233- public void Iterate ( )
226+ public void Iterate ( CancellationToken stopWaitToken )
234227 {
235- Message msg = transport . ReadMessage ( ) ;
236-
237- HandleMessage ( msg ) ;
238- DispatchSignals ( ) ;
228+ if ( TryGetStoredSignalMessage ( out Message inboundMsg ) ) {
229+ try {
230+ HandleSignal ( inboundMsg ) ;
231+ } finally {
232+ inboundMsg . Dispose ( ) ;
233+ }
234+ } else {
235+ var msg = readMessageTask . MakeSureTaskRunAndWait ( stopWaitToken ) ;
236+ HandleMessage ( msg ) ;
237+ }
239238 }
240239
241240 internal virtual void HandleMessage ( Message msg )
@@ -251,21 +250,19 @@ internal virtual void HandleMessage (Message msg)
251250 try {
252251
253252 //TODO: Restrict messages to Local ObjectPath?
254-
255253 {
256- object field_value = msg . Header [ FieldCode . ReplySerial ] ;
254+ object field_value = msg . Header [ FieldCode . ReplySerial ] ;
257255 if ( field_value != null ) {
258256 uint reply_serial = ( uint ) field_value ;
259- PendingCall pending ;
260257
261258 lock ( pendingCalls ) {
259+ PendingCall pending ;
262260 if ( pendingCalls . TryGetValue ( reply_serial , out pending ) ) {
263- if ( pendingCalls . Remove ( reply_serial ) ) {
264- pending . Reply = msg ;
265- if ( pending . KeepFDs )
266- cleanupFDs = false ; // caller is responsible for closing FDs
267- }
268-
261+ if ( ! pendingCalls . Remove ( reply_serial ) )
262+ return ;
263+ pending . Reply = msg ;
264+ if ( pending . KeepFDs )
265+ cleanupFDs = false ; // caller is responsible for closing FDs
269266 return ;
270267 }
271268 }
@@ -285,8 +282,7 @@ internal virtual void HandleMessage (Message msg)
285282 break ;
286283 case MessageType . Signal :
287284 //HandleSignal (msg);
288- lock ( inbound )
289- inbound . Enqueue ( msg ) ;
285+ StoreInboundSignalMessage ( msg ) ; //temporary hack
290286 cleanupFDs = false ; // FDs are closed after signal is handled
291287 break ;
292288 case MessageType . Error :
@@ -391,17 +387,19 @@ internal void HandleMethodCall (MessageContainer method_call)
391387 //this is messy and inefficient
392388 List < string > linkNodes = new List < string > ( ) ;
393389 int depth = method_call . Path . Decomposed . Length ;
394- foreach ( ObjectPath pth in registeredObjects . Keys ) {
395- if ( pth . Value == ( method_call . Path . Value ) ) {
396- ExportObject exo = ( ExportObject ) registeredObjects [ pth ] ;
397- exo . WriteIntrospect ( intro ) ;
398- } else {
399- for ( ObjectPath cur = pth ; cur != null ; cur = cur . Parent ) {
400- if ( cur . Value == method_call . Path . Value ) {
401- string linkNode = pth . Decomposed [ depth ] ;
402- if ( ! linkNodes . Contains ( linkNode ) ) {
403- intro . WriteNode ( linkNode ) ;
404- linkNodes . Add ( linkNode ) ;
390+ lock ( registeredObjects ) {
391+ foreach ( ObjectPath pth in registeredObjects . Keys ) {
392+ if ( pth . Value == ( method_call . Path . Value ) ) {
393+ ExportObject exo = ( ExportObject ) registeredObjects [ pth ] ;
394+ exo . WriteIntrospect ( intro ) ;
395+ } else {
396+ for ( ObjectPath cur = pth ; cur != null ; cur = cur . Parent ) {
397+ if ( cur . Value == method_call . Path . Value ) {
398+ string linkNode = pth . Decomposed [ depth ] ;
399+ if ( ! linkNodes . Contains ( linkNode ) ) {
400+ intro . WriteNode ( linkNode ) ;
401+ linkNodes . Add ( linkNode ) ;
402+ }
405403 }
406404 }
407405 }
@@ -415,12 +413,14 @@ internal void HandleMethodCall (MessageContainer method_call)
415413 return ;
416414 }
417415
418- BusObject bo ;
419- if ( registeredObjects . TryGetValue ( method_call . Path , out bo ) ) {
420- ExportObject eo = ( ExportObject ) bo ;
421- eo . HandleMethodCall ( method_call ) ;
422- } else {
423- MaybeSendUnknownMethodError ( method_call ) ;
416+ lock ( registeredObjects ) {
417+ BusObject bo ;
418+ if ( registeredObjects . TryGetValue ( method_call . Path , out bo ) ) {
419+ ExportObject eo = ( ExportObject ) bo ;
420+ eo . HandleMethodCall ( method_call ) ;
421+ } else {
422+ MaybeSendUnknownMethodError ( method_call ) ;
423+ }
424424 }
425425 }
426426
@@ -459,17 +459,19 @@ public void Register (ObjectPath path, object obj)
459459 eo . Registered = true ;
460460
461461 //TODO: implement some kind of tree data structure or internal object hierarchy. right now we are ignoring the name and putting all object paths in one namespace, which is bad
462- registeredObjects [ path ] = eo ;
462+ lock ( registeredObjects )
463+ registeredObjects [ path ] = eo ;
463464 }
464465
465466 public object Unregister ( ObjectPath path )
466467 {
467468 BusObject bo ;
468469
469- if ( ! registeredObjects . TryGetValue ( path , out bo ) )
470- throw new Exception ( "Cannot unregister " + path + " as it isn't registered" ) ;
471-
472- registeredObjects . Remove ( path ) ;
470+ lock ( registeredObjects ) {
471+ if ( ! registeredObjects . TryGetValue ( path , out bo ) )
472+ throw new Exception ( "Cannot unregister " + path + " as it isn't registered" ) ;
473+ registeredObjects . Remove ( path ) ;
474+ }
473475
474476 ExportObject eo = ( ExportObject ) bo ;
475477 eo . Registered = false ;
@@ -486,6 +488,25 @@ internal protected virtual void RemoveMatch (string rule)
486488 {
487489 }
488490
491+ private void StoreInboundSignalMessage ( Message msg )
492+ {
493+ lock ( inbound ) {
494+ inbound . Enqueue ( msg ) ;
495+ }
496+ }
497+
498+ private bool TryGetStoredSignalMessage ( out Message msg )
499+ {
500+ msg = null ;
501+ lock ( inbound ) {
502+ if ( inbound . Count != 0 ) {
503+ msg = inbound . Dequeue ( ) ;
504+ return true ;
505+ }
506+ }
507+ return false ;
508+ }
509+
489510 static UUID ReadMachineId ( string fname )
490511 {
491512 byte [ ] data = File . ReadAllBytes ( fname ) ;
@@ -494,5 +515,31 @@ static UUID ReadMachineId (string fname)
494515
495516 return UUID . Parse ( System . Text . Encoding . ASCII . GetString ( data , 0 , 32 ) ) ;
496517 }
518+
519+ private class ReadMessageTask
520+ {
521+ private readonly Connection ownerConnection ;
522+ private Task < Message > task = null ;
523+ private object taskLock = new object ( ) ;
524+
525+ public ReadMessageTask ( Connection connection )
526+ {
527+ ownerConnection = connection ;
528+ }
529+
530+ public Message MakeSureTaskRunAndWait ( CancellationToken stopWaitToken )
531+ {
532+ Task < Message > catchedTask = null ;
533+
534+ lock ( taskLock ) {
535+ if ( task == null || task . IsCompleted ) {
536+ task = Task < Message > . Run ( ( ) => ownerConnection . transport . ReadMessage ( ) ) ;
537+ }
538+ catchedTask = task ;
539+ }
540+ catchedTask . Wait ( stopWaitToken ) ;
541+ return catchedTask . Result ;
542+ }
543+ }
497544 }
498545}
0 commit comments