Skip to content

Commit 0e816a9

Browse files
author
Arnaud Degroote
committed
[socket] Implement external time synchronisation for socket
1 parent b82fbd3 commit 0e816a9

File tree

1 file changed

+53
-1
lines changed

1 file changed

+53
-1
lines changed

src/morse/middleware/socket_datastream.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from morse.helpers.transformation import Transformation3d
88
from morse.middleware import AbstractDatastream
99
from morse.core import services
10-
from morse.core.exceptions import MorseRPCInvokationError
10+
from morse.core.exceptions import MorseRPCInvokationError, MorseMiddlewareError
1111

1212
try:
1313
import mathutils
@@ -166,6 +166,16 @@ def __init__(self, args, kwargs):
166166
# Call the constructor of the parent class
167167
DatastreamManager.__init__(self, args, kwargs)
168168

169+
self.time_sync = kwargs.get('time_sync', False)
170+
self.sync_port = kwargs.get('sync_port', -1)
171+
172+
if self.time_sync:
173+
if self.sync_port == -1:
174+
logger.error("time_sync is required, but sync_port is not configured")
175+
raise MorseMiddlewareError("sync_port is not configured")
176+
else:
177+
self._init_trigger()
178+
169179
# port -> MorseSocketServ
170180
self._server_dict = {}
171181

@@ -184,6 +194,44 @@ def __init__(self, args, kwargs):
184194
services.do_service_registration(self.get_stream_port, 'simulation')
185195
services.do_service_registration(self.get_all_stream_ports, 'simulation')
186196

197+
def __del__(self):
198+
if self.time_sync:
199+
self._end_trigger()
200+
201+
def _init_trigger(self):
202+
self._sync_client = None
203+
self._sync_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
204+
self._sync_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
205+
self._sync_server.bind(('', self.sync_port))
206+
self._sync_server.listen(1)
207+
logger.info("Creating clock synchronisation on port %d" % self.sync_port)
208+
209+
def _wait_trigger(self):
210+
# If there is some client, just wait on it
211+
if self._sync_client:
212+
logger.debug("Waiting trigger")
213+
msg = self._sync_client.recv(2048)
214+
if not msg: #deconnection of client
215+
self._sync_client = None
216+
else:
217+
# Otherwise, we just check if there is some client waiting
218+
# If there is no client, we do not block for the moment to avoid
219+
# weird interaction at the startup
220+
logger.debug("Checking for some client on synchronisation port")
221+
try:
222+
inputready, _, _ = select.select([self._sync_server], [], [], 0)
223+
except select.error:
224+
pass
225+
except socket.error:
226+
pass
227+
228+
if self._sync_server in inputready:
229+
self._sync_client, _ = self._sync_server.accept()
230+
231+
def _end_trigger(self):
232+
self._sync_client.close()
233+
self._sync_server.shutdown(socket.SHUT_RDWR)
234+
187235
def list_streams(self):
188236
""" List all publish streams.
189237
"""
@@ -236,3 +284,7 @@ def register_component(self, component_name, component_instance, mw_data):
236284
self._component_nameservice[component_name] = mw_data[2]['port']
237285
if must_inc_base_port:
238286
self._base_port += 1
287+
288+
def action(self):
289+
if self.time_sync:
290+
self._wait_trigger()

0 commit comments

Comments
 (0)