Skip to content

Commit

Permalink
Fix SensorSystem issues (#267)
Browse files Browse the repository at this point in the history
* Fix update of subsystem config

* Make sure database starts handling new systems and subsystems

* Avoid registering subsystem twice
  • Loading branch information
earocorn authored Sep 13, 2024
1 parent bddb337 commit 1895e40
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,14 @@ protected void afterInit() throws SensorHubException
protected void beforeStart() throws SensorHubException
{
super.beforeStart();


if(getParentSystem() != null && !getParentSystem().isEnabled())
throw new ProcessingException("Parent system must be started");

// register sensor with registry if attached to a hub and we have no parent
try
{
if (hasParentHub() && getParentHub().getSystemDriverRegistry() != null)
if (hasParentHub() && getParentHub().getSystemDriverRegistry() != null && (getParentSystem() == null || getParentSystem().isEnabled()))
getParentHub().getSystemDriverRegistry().register(this).get(); // for now, block here until init is also async
}
catch (InterruptedException e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,13 @@ protected void beforeStart() throws SensorHubException
{
super.beforeStart();

if(getParentSystem() != null && !getParentSystem().isEnabled())
throw new ProcessingException("Parent system must be started");

// register sensor with registry if attached to a hub and we have no parent
try
{
if (hasParentHub() && getParentHub().getSystemDriverRegistry() != null)
if (hasParentHub() && getParentHub().getSystemDriverRegistry() != null && (getParentSystem() == null || getParentSystem().isEnabled()))
getParentHub().getSystemDriverRegistry().register(this).get(); // for now, block here until init is also async
}
catch (InterruptedException e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,14 @@ protected void afterInit() throws SensorHubException
protected void beforeStart() throws SensorHubException
{
super.beforeStart();


if(getParentSystem() != null && !getParentSystem().isEnabled())
throw new SensorException("Parent system must be started");

// register sensor with registry if attached to a hub and we have no parent
try
{
if (hasParentHub() && getParentHub().getSystemDriverRegistry() != null)
if (hasParentHub() && getParentHub().getSystemDriverRegistry() != null && (getParentSystem() == null || getParentSystem().isEnabled()))
getParentHub().getSystemDriverRegistry().register(this).get(); // for now, block here until init is also async
}
catch (InterruptedException e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public class SensorSystem extends AbstractSensorModule<SensorSystemConfig> imple
private static final String URN_PREFIX = "urn:";

Collection<IDataProducerModule<?>> subsystems = new ArrayList<>();


@Override
protected void doInit() throws SensorHubException
{
Expand All @@ -80,16 +80,13 @@ protected void doInit() throws SensorHubException
}
}

// load and init all subsystem modules
subsystems.clear();
for (SystemMember member: config.subsystems)
// Init all subsystem modules
for (var module: subsystems)
{
var module = (IDataProducerModule<?>)loadModule(member.config);
if (module != null)
{
try
{
subsystems.add(module);
module.init();
}
catch (Exception e)
Expand Down Expand Up @@ -172,10 +169,23 @@ private IModule<?> loadModule(ModuleConfig config)
protected void handleEvent(Event e)
{
if (e instanceof ModuleEvent)
{
eventHandler.publish(e);
if(((ModuleEvent) e).getType() == ModuleEvent.Type.CONFIG_CHANGED)
{
var moduleConfig = ((ModuleEvent)e).getModule().getConfiguration();
for(SystemMember member : config.subsystems)
{
if(moduleConfig.id.equals(member.config.id))
{
member.config = moduleConfig;
break;
}
}
}
}
}


@Override
protected void updateSensorDescription()
{
Expand All @@ -189,21 +199,25 @@ protected void updateSensorDescription()


@Override
protected void doStart() throws SensorHubException
{
for (var member: subsystems)
{
try
protected void setState(ModuleState newState) {
super.setState(newState);

// Ensure that autoStart starts modules after Sensor System is enabled
if (newState == ModuleState.STARTED) {
for (var member: subsystems)
{
if (member.getConfiguration().autoStart)
try
{
member.waitForState(ModuleState.INITIALIZED, 10000);
member.start();
if (member.getConfiguration().autoStart)
{
member.waitForState(ModuleState.INITIALIZED, 10000);
member.start();
}
}
catch (Exception e)
{
reportError("Cannot start subsystem " + MsgUtils.moduleString(member), e);
}
}
catch (Exception e)
{
reportError("Cannot start subsystem " + MsgUtils.moduleString(member), e);
}
}
}
Expand Down Expand Up @@ -242,6 +256,19 @@ public boolean isConnected()
return true;
}

@Override
public void setConfiguration(SensorSystemConfig config) {
super.setConfiguration(config);

// Load all subsystem modules from config
subsystems.clear();
for (SystemMember member : config.subsystems) {
var module = (IDataProducerModule<?>) loadModule(member.config);
if (module != null) {
subsystems.add(module);
}
}
}

@Override
public synchronized void loadState(IModuleStateManager loader) throws SensorHubException
Expand Down Expand Up @@ -286,14 +313,14 @@ protected void generateXmlIDFromUUID(String uuid)
public Map<String, ? extends IDataProducerModule<?>> getMembers()
{
return subsystems != null ?
subsystems.stream().collect(ImmutableMap.toImmutableMap(this::getMemberName, e -> e)) :
subsystems.stream().collect(ImmutableMap.toImmutableMap(this::getMemberId, e -> e)) :
Collections.emptyMap();
}


protected String getMemberName(IModule<?> member)
protected String getMemberId(IModule<?> member)
{
return member.getName().toLowerCase().replaceAll("\\s+", "_");
return member.getLocalID();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ public synchronized void registerDatabase(String uid, IObsSystemDatabase db)
{
var procFilter = new SystemFilter.Builder()
.withUniqueIDs(uid)
.includeMembers(true)
.build();

var dsFilter = new DataStreamFilter.Builder()
Expand All @@ -279,17 +280,18 @@ public synchronized void registerDatabase(String uid, IObsSystemDatabase db)
.withSystems(procFilter)
.build();

// Replace driver's transaction handler so that new IObsSystemDatabase handles driver
var systemFilter = new SystemFilter.Builder()
var topLevelSystemsFilter = new SystemFilter.Builder()
.withUniqueIDs(uid)
.includeMembers(true)
.withNoParent()
.build();
systemStateDb.getSystemDescStore().selectEntries(systemFilter).forEach(desc ->

// Replace driver's transaction handler so that new IObsSystemDatabase handles driver
systemStateDb.getSystemDescStore().selectEntries(topLevelSystemsFilter).forEach(desc ->
register(getDriverHandler(desc.getValue().getUniqueIdentifier()).driver));

systemStateDb.getDataStreamStore().removeEntries(dsFilter);
systemStateDb.getCommandStreamStore().removeEntries(csFilter);
var count = systemStateDb.getSystemDescStore().removeEntries(procFilter);
var count = systemStateDb.getSystemDescStore().removeEntries(topLevelSystemsFilter);

if (count > 0)
log.info("Database #{} now handles system {}. Removing all records from state DB", db.getDatabaseNum(), uid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ protected void doFinishRegister(ISystemDriver driver) throws DataStoreException
{
for (var member: ((ISystemGroupDriver<?>)driver).getMembers().values())
{
if (!(member instanceof IModule<?>)) // don't register submodules as they'll register themselves
doRegisterMember(member, driver.getCurrentDescription().getValidTime());
doRegisterMember(member, driver.getCurrentDescription().getValidTime());
}
}

Expand Down Expand Up @@ -188,7 +187,6 @@ protected synchronized boolean doRegisterMember(ISystemDriver driver, TimeExtent
{
Asserts.checkNotNull(driver, ISystemDriver.class);
var uid = OshAsserts.checkValidUID(driver.getUniqueIdentifier());
boolean isNew = false;

var procWrapper = new SystemWrapper(driver.getCurrentDescription())
.hideOutputs()
Expand All @@ -199,17 +197,23 @@ protected synchronized boolean doRegisterMember(ISystemDriver driver, TimeExtent
procWrapper.defaultToValidTime(validTime);
else
procWrapper.defaultToValidFromNow();

// add or update existing system entry
var newMemberHandler = (SystemDriverTransactionHandler)addOrUpdateMember(procWrapper);

// replace and cleanup old handler
var oldMemberHandler = memberHandlers.get(uid);
if (oldMemberHandler != null)
boolean isNew = oldMemberHandler == null;
if (!isNew)
{
driver.unregisterListener(oldMemberHandler);
isNew = false;
// only allow same member driver to re-register with same UID
ISystemDriver registeredDriver = oldMemberHandler.driver;
if (registeredDriver != null && registeredDriver != driver)
throw new IllegalArgumentException("A subsystem with UID " + uid + " is already registered");

driver.unregisterListener(memberHandlers.get(uid));
}

memberHandlers.put(uid, newMemberHandler);

// register/update driver sub-components
Expand Down Expand Up @@ -463,7 +467,7 @@ public void handleEvent(Event e)
{
var driverUid = driver.getUniqueIdentifier();
var eventUid = ((SystemEvent) e).getSystemUID();

// assign internal ID before event is dispatched
((SystemEvent)e).assignSystemID(sysKey.getInternalID());

Expand Down

0 comments on commit 1895e40

Please sign in to comment.