Skip to content

Commit

Permalink
[PRUNING] Cleanup of the "all" event mechanism sending out RESTFULL n…
Browse files Browse the repository at this point in the history
…otifications and JSONRPC events. (#1667)
  • Loading branch information
pwielders authored Jun 24, 2024
1 parent d36af08 commit 087dcae
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 152 deletions.
14 changes: 1 addition & 13 deletions Source/Thunder/Controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -738,9 +738,7 @@ namespace Plugin {

void Controller::SubSystems()
{
#if THUNDER_RESTFULL_API || defined(__DEBUG__)
PluginHost::Metadata response;
#endif
Core::JSON::ArrayType<JsonData::Subsystems::SubsystemInfo> responseJsonRpc;
PluginHost::ISubSystem* subSystem = _service->SubSystems();

Expand All @@ -763,10 +761,7 @@ namespace Plugin {
status.Subsystem = current;
status.Active = ((reportMask & bit) != 0);
responseJsonRpc.Add(status);

#if THUNDER_RESTFULL_API || defined(__DEBUG__)
response.SubSystems.Add(current, ((reportMask & bit) != 0));
#endif

sendReport = true;
}
Expand All @@ -781,15 +776,11 @@ namespace Plugin {

if (sendReport == true) {

#if THUNDER_RESTFULL_API || defined(__DEBUG__)
string message;
response.ToString(message);
TRACE_L1("Sending out a SubSystem change notification. %s", message.c_str());
#endif

#if THUNDER_RESTFULL_API
_pluginServer->_controller->Notification(message);
#endif
_service->Notify(EMPTY_STRING, message);

Exchange::Controller::JSubsystems::Event::SubsystemChange(*this, responseJsonRpc);
}
Expand Down Expand Up @@ -1110,10 +1101,7 @@ namespace Plugin {
service.Configuration = meta.Configuration;
service.Precondition = meta.Precondition;
service.Termination = meta.Termination;

#if THUNDER_RESTFULL_API
service.Observers = meta.Observers;
#endif

#if THUNDER_RUNTIME_STATISTICS
service.ProcessedRequests = meta.ProcessedRequests;
Expand Down
17 changes: 0 additions & 17 deletions Source/Thunder/Controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,23 +210,6 @@ namespace Plugin {
}
}

void Notification(const string& callsign, const string& message)
{
ASSERT(callsign.empty() == false);
ASSERT(message.empty() == false);

Exchange::Controller::JEvents::Event::ForwardMessage(*this, callsign, message);
}

void Notification(const string& callsign, const string& event, const string& params)
{
ASSERT(callsign.empty() == false);
ASSERT(event.empty() == false);

Exchange::Controller::IEvents::INotification::Event data{event, params};
Exchange::Controller::JEvents::Event::ForwardEvent(*this, callsign, data);
}

inline void SetServer(PluginHost::Server* pluginServer, const std::vector<PluginHost::ISubSystem::subsystem>& externalSubsystems)
{
ASSERT((_pluginServer == nullptr) && (pluginServer != nullptr));
Expand Down
3 changes: 0 additions & 3 deletions Source/Thunder/PluginHost.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -774,10 +774,7 @@ POP_WARNING()
printf("Locator: %s\n", index.Current().Locator.Value().c_str());
printf("Classname: %s\n", index.Current().ClassName.Value().c_str());
printf("StartMode: %s\n", index.Current().StartMode.Data());
#if THUNDER_RESTFULL_API

printf("Observers: %d\n", index.Current().Observers.Value());
#endif

#if THUNDER_RUNTIME_STATISTICS
printf("Requests: %d\n", index.Current().ProcessedRequests.Value());
Expand Down
96 changes: 34 additions & 62 deletions Source/Thunder/PluginServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,14 @@ namespace PluginHost {
AddRef();
result = static_cast<PluginHost::IShell::IConnectionServer*>(this);
}
else if (id == PluginHost::IDispatcher::ID) {
_pluginHandling.Lock();
if (_jsonrpc != nullptr) {
_jsonrpc->AddRef();
result = _jsonrpc;
}
_pluginHandling.Unlock();
}
else {
_pluginHandling.Lock();

Expand Down Expand Up @@ -456,19 +464,15 @@ namespace PluginHost {
State(ACTIVATED);
_administrator.Activated(callSign, this);

#if THUNDER_RESTFULL_API
_administrator.Notification(_T("{\"callsign\":\"") + callSign + _T("\",\"state\":\"deactivated\",\"reason\":\"") + textReason.Data() + _T("\"}"));
#endif

_administrator.Notification(callSign, string(_T("{\"state\":\"activated\",\"reason\":\"")) + textReason.Data() + _T("\"}"));

IStateControl* stateControl = nullptr;
if ((Resumed() == true) && ((stateControl = _handler->QueryInterface<PluginHost::IStateControl>()) != nullptr)) {

stateControl->Request(PluginHost::IStateControl::RESUME);
stateControl->Release();
}

Notify(EMPTY_STRING, string(_T("{\"state\":\"activated\",\"reason\":\"")) + textReason.Data() + _T("\"}"));

Unlock();
}
}
Expand Down Expand Up @@ -600,12 +604,7 @@ namespace PluginHost {
if (currentState != IShell::state::ACTIVATION) {
SYSLOG(Logging::Shutdown, (_T("Deactivated plugin [%s]:[%s]"), className.c_str(), callSign.c_str()));

#if THUNDER_RESTFULL_API
_administrator.Notification(_T("{\"callsign\":\"") + callSign + _T("\",\"state\":\"deactivated\",\"reason\":\"") + textReason.Data() + _T("\"}"));
#endif

_administrator.Notification(callSign, string(_T("{\"state\":\"deactivated\",\"reason\":\"")) + textReason.Data() + _T("\"}"));

Notify(EMPTY_STRING, string(_T("{\"state\":\"deactivated\",\"reason\":\"")) + textReason.Data() + _T("\"}"));
}
}

Expand Down Expand Up @@ -691,11 +690,7 @@ namespace PluginHost {
State(UNAVAILABLE);
_administrator.Unavailable(callSign, this);

#if THUNDER_RESTFULL_API
_administrator.Notification(_T("{\"callsign\":\"") + callSign + _T("\",\"state\":\"unavailable\",\"reason\":\"") + textReason.Data() + _T("\"}"));
#endif

_administrator.Notification(callSign, string(_T("{\"state\":\"unavailable\",\"reason\":\"")) + textReason.Data() + _T("\"}"));
Notify(EMPTY_STRING, string(_T("{\"state\":\"unavailable\",\"reason\":\"")) + textReason.Data() + _T("\"}"));
}

Unlock();
Expand Down Expand Up @@ -899,19 +894,15 @@ namespace PluginHost {
return (_administrator.SubSystemsInterface(this));
}

void Server::Service::Notify(const string& message) /* override */
void Server::Service::Notify(const string& jsonrpcEvent, const string& message) /* override */
{
#if THUNDER_RESTFULL_API
// Notify the base class and the subscribers
PluginHost::Service::Notification(message);
#endif

_administrator.Notification(PluginHost::Service::Callsign(), message);
}
// JSONRPC has been send by now, lets send it to the "notification"
// observers..
if (jsonrpcEvent.empty() == true) {
BaseClass::Notification(message);
}

void Server::Service::Notify(const string& event, const string& parameters) /* override */
{
_administrator.Notification(PluginHost::Service::Callsign(), event, parameters);
_administrator.Notification(PluginHost::Service::Callsign(), jsonrpcEvent, message);
}

//
Expand Down Expand Up @@ -1193,46 +1184,27 @@ namespace PluginHost {
IFactories::Assign(nullptr);
}

void Server::Notification(const string& callsign, const string& data)
void Server::Notification(const string& callsign, const string& jsonrpc_event, const string& parameters)
{
Plugin::Controller* controller;
if ((_controller.IsValid() == true) && ((controller = (_controller->ClassType<Plugin::Controller>())) != nullptr)) {

controller->Notification(callsign, data);

#if THUNDER_RESTFULL_API
JsonData::Events::ForwardMessageParamsData message;
message.Callsign = callsign;
message.Data = data;
string messageString;
message.ToString(messageString);
_controller->Notification(messageString);
#endif
}
}
ASSERT((_controller.IsValid() == true) && (_controller->ClassType<Plugin::Controller>() != nullptr));

void Server::Notification(const string& callsign, const string& event, const string& parameters)
{
if (_controller.IsValid() == true) {
Plugin::Controller* controller = _controller->ClassType<Plugin::Controller>();
Plugin::Controller* controller = _controller->ClassType<Plugin::Controller>();

if (controller != nullptr) {
static const TCHAR allEvent[] = _T("all");
// Break a recursive loop, if it tries to arise ;-)
if ( (controller != nullptr) && (callsign != controller->Callsign()) ) {

if ((callsign != controller->Callsign()) || (event != allEvent)) {
ASSERT(callsign.empty() == false);

controller->Notification(callsign, event, parameters);
if (jsonrpc_event.empty() == false) {
JsonData::Events::ForwardEventParamsData message;
message.Data = Exchange::Controller::IEvents::INotification::Event({ jsonrpc_event, parameters });
message.Callsign = callsign;
Exchange::Controller::JEvents::Event::ForwardEvent(*controller, message);
}
else {
string messageString = string(_T("{\"callsign\":\"")) + callsign + _T("\", {\"data\":\"") + parameters + _T("\"}}");

#if THUNDER_RESTFULL_API
JsonData::Events::ForwardEventParamsData message;
message.Callsign = callsign;
message.Data.Event = event;
message.Data.Params = parameters;
string messageString;
message.ToString(messageString);
_controller->Notification(messageString);
#endif
}
_controller->Notify(EMPTY_STRING, messageString);
}
}
}
Expand Down
31 changes: 5 additions & 26 deletions Source/Thunder/PluginServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ namespace PluginHost {
};

private:
using BaseClass = PluginHost::Service;
class Composit : public PluginHost::ICompositPlugin::ICallback {
public:
Composit() = delete;
Expand Down Expand Up @@ -811,27 +812,17 @@ namespace PluginHost {
}
inline bool Subscribe(Channel& channel)
{
#if THUNDER_RESTFULL_API
bool result = PluginHost::Service::Subscribe(channel);

if ((result == true) && (_extended != nullptr)) {
_extended->Attach(channel);
}

return (result);
#else
if (_extended != nullptr) {
_extended->Attach(channel);
}

return (_extended != nullptr);
#endif
}
inline void Unsubscribe(Channel& channel)
{
#if THUNDER_RESTFULL_API
PluginHost::Service::Unsubscribe(channel);
#endif

if (_extended != nullptr) {
_extended->Detach(channel);
Expand Down Expand Up @@ -1156,8 +1147,7 @@ namespace PluginHost {

uint32_t Submit(const uint32_t id, const Core::ProxyType<Core::JSON::IElement>& response) override;
ISubSystem* SubSystems() override;
void Notify(const string& message) override;
void Notify(const string& event, const string& parameters) override;
void Notify(const string& event, const string& message) override;
void* QueryInterface(const uint32_t id) override;
void* QueryInterfaceByCallsign(const uint32_t id, const string& name) override;
template <typename REQUESTEDINTERFACE>
Expand Down Expand Up @@ -2981,20 +2971,10 @@ namespace PluginHost {

return (Iterator(std::move(workingList)));
}
inline void Notification(const string& callsign, const string& message)
inline void Notification(const string& callsign, const string& jsonrpc_event, const string& message)
{
_server.Notification(callsign, message);
_server.Notification(callsign, jsonrpc_event, message);
}
inline void Notification(const string& callsign, const string& event, const string& message)
{
_server.Notification(callsign, event, message);
}
#if THUNDER_RESTFULL_API
inline void Notification(const string& message)
{
_server.Controller()->Notification(message);
}
#endif
void GetMetadata(Core::JSON::ArrayType<Metadata::Service>& metaData) const
{
std::vector<Core::ProxyType<Service>> workingList;
Expand Down Expand Up @@ -4451,8 +4431,7 @@ namespace PluginHost {
return (_config);
}

void Notification(const string& callsign, const string& message);
void Notification(const string& callsign, const string& event, const string& message);
void Notification(const string& callsign, const string& jsonrpc_event, const string& message);
void Open();
void Close();

Expand Down
10 changes: 6 additions & 4 deletions Source/plugins/IShell.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,12 @@ namespace PluginHost {
virtual ISubSystem* SubSystems() = 0;

// Notify all subscribers of this service with the given string.
// It is expected to be JSON formatted strings as it is assumed that this is for reaching websockets clients living in
// the web world that have build in functionality to parse JSON structs.
virtual void Notify(const string& message) = 0;
virtual void Notify(const string& event, const string& parameters) = 0;
// It is expected to be JSON formatted strings (message) as it is assumed that this is for reaching websockets clients
// living in the web world that have build in functionality to parse JSON structs.
void Notify(const string& message) {
Notify(EMPTY_STRING, message);
}
virtual void Notify(const string& event, const string& message) = 0;

// Allow access to the Shells, configured for the different Plugins found in the configuration.
// Calling the QueryInterfaceByCallsign with an empty callsign will query for interfaces located
Expand Down
6 changes: 0 additions & 6 deletions Source/plugins/Metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,7 @@ namespace PluginHost
, ProcessedRequests(0)
, ProcessedObjects(0)
#endif
#if THUNDER_RESTFULL_API
, Observers(0)
#endif
, ServiceVersion()
, Module()
, InterfaceVersion()
Expand All @@ -124,9 +122,7 @@ namespace PluginHost
Add(_T("processedrequests"), &ProcessedRequests);
Add(_T("processedobjects"), &ProcessedObjects);
#endif
#if THUNDER_RESTFULL_API
Add(_T("observers"), &Observers);
#endif
Add(_T("module"), &Module);
Add(_T("version"), &ServiceVersion);
Add(_T("interface"), &InterfaceVersion);
Expand All @@ -138,9 +134,7 @@ namespace PluginHost
, ProcessedRequests(std::move(move.ProcessedRequests))
, ProcessedObjects(std::move(move.ProcessedObjects))
#endif
#if THUNDER_RESTFULL_API
, Observers(std::move(move.Observers))
#endif
, ServiceVersion(std::move(move.ServiceVersion))
, Module(std::move(move.Module))
, InterfaceVersion(std::move(move.InterfaceVersion))
Expand Down
9 changes: 2 additions & 7 deletions Source/plugins/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,19 @@ namespace PluginHost {
return (Core::ProxyType<Core::IDispatch>(Core::ProxyType<IShell::Job>::Create(shell, toState, why)));
}

#if THUNDER_RESTFULL_API
void Service::Notification(const string& message)
{
_notifierLock.Lock();

ASSERT(message.empty() != true);
{
std::list<Channel*>::iterator index(_notifiers.begin());

while (index != _notifiers.end()) {
(*index)->Submit(message);
index++;
for (auto entry : _notifiers) {
entry->Submit(message);
}
}

_notifierLock.Unlock();
}
#endif

void Service::FileToServe(const string& webServiceRequest, Web::Response& response, bool allowUnsafePath)
{
Expand Down
Loading

0 comments on commit 087dcae

Please sign in to comment.