Skip to content

Commit

Permalink
[METADATA] Expose all COMRPC metadata like Channels and Proxies (also… (
Browse files Browse the repository at this point in the history
#1775)

* [METADATA] Expose all COMRPC metadata like Channels and Proxies (also the private channels)

* Update IController.h

* Update IUnknown.cpp

* Update test_rpc.cpp

* Update IUnknown.cpp

Thank you Coverity! I like tools that add value :-)

* Update IUnknown.cpp

* [FIXES] After testing on windows :-)

* Update IShell.h

* Update PluginHost.cpp

* Update test_rpc.cpp

---------

Co-authored-by: MFransen69 <[email protected]>
Co-authored-by: Volkan Aslan <[email protected]>
  • Loading branch information
3 people authored Nov 11, 2024
1 parent 5712a36 commit 3a82e15
Show file tree
Hide file tree
Showing 18 changed files with 210 additions and 102 deletions.
11 changes: 9 additions & 2 deletions Source/Thunder/Controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1216,22 +1216,27 @@ namespace Plugin {
return (Core::ERROR_NONE);
}

Core::hresult Controller::Proxies(const uint32_t linkId, IMetadata::Data::IProxiesIterator*& outProxies) const
Core::hresult Controller::Proxies(const Core::OptionalType<string>& linkId, IMetadata::Data::IProxiesIterator*& outProxies) const
{
Core::hresult result = Core::ERROR_UNKNOWN_KEY;

std::vector<IMetadata::Data::Proxy> collection;
bool proxySearch = RPC::Administrator::Instance().Allocations(linkId, [&collection](const std::vector<ProxyStub::UnknownProxy*>& proxies) {
bool proxySearch = RPC::Administrator::Instance().Allocations(linkId.IsSet() ? linkId.Value() : EMPTY_STRING, [&collection, &linkId](const string& origin, const std::vector<ProxyStub::UnknownProxy*>& proxies) {
for (const auto& proxy : proxies) {
IMetadata::Data::Proxy data;
data.Count = proxy->ReferenceCount();
data.Instance = proxy->Implementation();
data.Interface = proxy->InterfaceId();
data.Name = proxy->Name();
if (linkId.IsSet() == false) {
data.Origin = Core::NumberType<uint32_t>(proxy->ChannelId()).Text() + '@' + origin;
}
collection.emplace_back(std::move(data));
}
});

TRACE(Trace::Information, (_T("Found %d proxies to be listed and the search = [%s]"), collection.size(), proxySearch ? _T("true") : _T("false")));

if (proxySearch == true) {
using Iterator = IMetadata::Data::IProxiesIterator;

Expand Down Expand Up @@ -1440,6 +1445,8 @@ namespace Plugin {
buildInfo.ThreadPoolCount = THREADPOOL_COUNT;
#endif

buildInfo.COMRPCTimeOut = RPC::CommunicationTimeOut;

return (Core::ERROR_NONE);
}
}
Expand Down
2 changes: 1 addition & 1 deletion Source/Thunder/Controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ namespace Plugin {

// IMetadata overrides
Core::hresult Links(IMetadata::Data::ILinksIterator*& links) const override;
Core::hresult Proxies(const uint32_t linkId, IMetadata::Data::IProxiesIterator*& proxies) const override;
Core::hresult Proxies(const Core::OptionalType<string>& linkId, IMetadata::Data::IProxiesIterator*& proxies) const override;
Core::hresult Services(const Core::OptionalType<string>& callsign, IMetadata::Data::IServicesIterator*& services) const override;
Core::hresult CallStack(const uint8_t threadId, IMetadata::Data::ICallStackIterator*& callstack) const override;
Core::hresult Threads(IMetadata::Data::IThreadsIterator*& threads) const override;
Expand Down
30 changes: 12 additions & 18 deletions Source/Thunder/ExampleConfigWindows.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
"binding": "127.0.0.1",
"idletime": 60,
"ipv6": false,
"persistentpath": "C:/ThunderWin/artifacts/Persistent",
"volatilepath": "C:/ThunderWin/artifacts/temp",
"datapath": "C:/ThunderWin/artifacts/Debug/Plugins",
"systempath": "C:/ThunderWin/artifacts/Debug",
"proxystubpath": "C:/ThunderWin/artifacts/ProxyStubs/Debug",
"persistentpath": "D:/domotica/artifacts/Persistent",
"volatilepath": "D:/domotica//artifacts/temp",
"datapath": "D:/domotica/artifacts/Debug/Plugins",
"systempath": "D:/domotica/artifacts/Debug",
"proxystubpath": "D:/domotica/artifacts/ProxyStubs/Debug",
"communicator": "127.0.0.1:62000",
"redirect": "Service/Controller/UI",
"observe": {
"proxystubpath": "C:/ThunderWin/artifacts/dynamic/proxystubs",
"configpath": "C:/ThunderWin/artifacts/dynamic/config"
"proxystubpath": "D:/domotica/artifacts/dynamic/proxystubs",
"configpath": "D:/domotica/artifacts/dynamic/config"
},
"messaging": {
"port": 63000,
Expand Down Expand Up @@ -137,7 +137,7 @@
"callsign": "Butler",
"locator": "libButler.so",
"classname": "Butler",
"startmode": "Activated"
"startmode": "Deactivated"
},
{
"callsign": "ZigbeeControl",
Expand All @@ -154,10 +154,10 @@
"callsign": "ZWaveControl",
"locator": "libZWaveControl.so",
"classname": "ZWaveControl",
"startmode": "Activated",
"startmode": "Deactivated",
"configuration": {
"port": "\\\\.\\COM4",
"key": "ba:09:87:65:43:21:de:ad:be:ef:12:34:56:78:90:ab"
"key": "ba:09:87:65:43:21:de:ad:be:ef:12:34:56:78:90:ab"
}
},
{
Expand Down Expand Up @@ -401,7 +401,7 @@
"sleep": "5",
"single": false,
"crash": true,
"leak": true,
"leak": true,
"root": {
"mode": "Local"
}
Expand Down Expand Up @@ -458,7 +458,7 @@
"callsign": "WebServer",
"locator": "libWebServer.so",
"classname": "WebServer",
"startmode": "Deactivated",
"startmode": "Activated",
"communicator": "127.0.0.1:2349",
"configuration": {
"port": 8080,
Expand Down Expand Up @@ -571,12 +571,6 @@
"classname": "MessageControl",
"startmode": "Activated"
},
{
"callsign": "TraceControl",
"locator": "libtracecontrol.so",
"classname": "TraceControl",
"startmode": "Deactivated"
},
{
"callsign": "RemoteControl",
"locator": "libremotecontrol.so",
Expand Down
4 changes: 2 additions & 2 deletions Source/Thunder/PluginHost.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,10 +712,10 @@ POP_WARNING()
printf("Link: %s\n", index.Current().Remote.Value().c_str());
printf("------------------------------------------------------------\n");

RPC::Administrator::Instance().Allocations(index.Current().ID.Value(), [](const std::vector<ProxyStub::UnknownProxy*>& proxies) {
RPC::Administrator::Instance().Allocations(index.Current().Name.Value(), [](const string& origin, const std::vector<ProxyStub::UnknownProxy*>& proxies) {
for (const auto& proxy: proxies) {
Core::instance_id instanceId = proxy->Implementation();
printf("[%s] InstanceId: 0x%" PRIx64 ", RefCount: %d, InterfaceId %d [0x%X]\n", proxy->Name().c_str(), static_cast<uint64_t>(instanceId), proxy->ReferenceCount(), proxy->InterfaceId(), proxy->InterfaceId());
printf("[%s] InstanceId: 0x%" PRIx64 ", RefCount: %d, InterfaceId %d [0x%X], Origin: %s\n", proxy->Name().c_str(), static_cast<uint64_t>(instanceId), proxy->ReferenceCount(), proxy->InterfaceId(), proxy->InterfaceId(), origin.c_str());
}
printf("\n");
});
Expand Down
32 changes: 25 additions & 7 deletions Source/Thunder/PluginServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,11 @@ namespace PluginHost {
ExternalAccess& operator=(const ExternalAccess&) = delete;

ExternalAccess(
const Core::NodeId& source,
const Core::NodeId& sourceNode,
const string& proxyStubPath,
const Core::ProxyType<RPC::InvokeServer>& handler)
: RPC::Communicator(source, proxyStubPath, Core::ProxyType<Core::IIPCServer>(handler))
const Core::ProxyType<RPC::InvokeServer>& handler,
const string& sourceName)
: RPC::Communicator(sourceNode, proxyStubPath, Core::ProxyType<Core::IIPCServer>(handler), sourceName.c_str())
, _plugin(nullptr) {
}
~ExternalAccess() override = default;
Expand Down Expand Up @@ -822,7 +823,7 @@ namespace PluginHost {
, _lastId(0)
, _metadata(plugin.MaxRequests.Value())
, _library()
, _external(PluginNodeId(server, plugin), server.ProxyStubPath(), handler)
, _external(PluginNodeId(server, plugin), server.ProxyStubPath(), handler, '/' + Callsign())
, _administrator(administrator)
, _composit(*this)
, _jobs(administrator)
Expand Down Expand Up @@ -855,6 +856,9 @@ namespace PluginHost {
}

public:
inline const RPC::Communicator& COMServer() const {
return (_external);
}
inline void Submit(Core::ProxyType<Core::IDispatch>&& job) {
_jobs.Push(std::move(job));
}
Expand Down Expand Up @@ -2205,7 +2209,7 @@ namespace PluginHost {
const uint8_t hardKillCheckWaitTime,
const bool delegatedReleases,
const Core::ProxyType<RPC::InvokeServer>& handler)
: RPC::Communicator(node, ProxyStubPathCreator(proxyStubPath, observableProxyStubPath), Core::ProxyType<Core::IIPCServer>(handler))
: RPC::Communicator(node, ProxyStubPathCreator(proxyStubPath, observableProxyStubPath), Core::ProxyType<Core::IIPCServer>(handler), _T("/"))
, _parent(parent)
, _persistentPath(persistentPath)
, _systemPath(systemPath)
Expand Down Expand Up @@ -3203,10 +3207,24 @@ namespace PluginHost {

entry.Activity = element.Source().IsOpen();
entry.State = Metadata::Channel::state::COMRPC;
entry.Name = string("/" EXPAND_AND_QUOTE(APPLICATION_NAME) "/Communicator");
entry.Name = element.Extension().Origin();
entry.Remote = element.Source().RemoteId();
});
_adminLock.Unlock();

for (const auto& entry : _services) {
entry.second->COMServer().Visit([&](const RPC::Communicator::Client& element)
{
Metadata::Channel& entry = metaData.Add();
entry.ID = element.Extension().Id();

entry.Activity = element.Source().IsOpen();
entry.State = Metadata::Channel::state::COMRPC;
entry.Name = element.Extension().Origin();
entry.Remote = element.Source().RemoteId();
});
}

_adminLock.Unlock();
}
uint32_t FromIdentifier(const string& callSign, Core::ProxyType<IShell>& service)
{
Expand Down
44 changes: 29 additions & 15 deletions Source/com/Administrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

#include "Administrator.h"
#include "IUnknown.h"
#include "Communicator.h"

namespace Thunder {
namespace RPC {

/* static */ const string Administrator::DanglingId("/Dangling");

Administrator::Administrator()
: _adminLock()
, _stubs()
Expand Down Expand Up @@ -122,17 +125,17 @@ namespace RPC {
ChannelMap::iterator index(_channelProxyMap.find(proxy.Id()));

if (index != _channelProxyMap.end()) {
Proxies::iterator entry(index->second.begin());
while ((entry != index->second.end()) && ((*entry) != &proxy)) {
Proxies::iterator entry(index->second.second.begin());
while ((entry != index->second.second.end()) && ((*entry) != &proxy)) {
entry++;
}

ASSERT(entry != index->second.end());
ASSERT(entry != index->second.second.end());

if (entry != index->second.end()) {
index->second.erase(entry);
if (entry != index->second.second.end()) {
index->second.second.erase(entry);
removed = true;
if (index->second.size() == 0) {
if (index->second.second.size() == 0) {
_channelProxyMap.erase(index);
}
}
Expand Down Expand Up @@ -228,11 +231,11 @@ namespace RPC {
ChannelMap::iterator index(_channelProxyMap.find(channel->Id()));

if (index != _channelProxyMap.end()) {
Proxies::iterator entry(index->second.begin());
while ((entry != index->second.end()) && (((*entry)->InterfaceId() != id) || ((*entry)->Implementation() != impl))) {
Proxies::iterator entry(index->second.second.begin());
while ((entry != index->second.second.end()) && (((*entry)->InterfaceId() != id) || ((*entry)->Implementation() != impl))) {
entry++;
}
if (entry != index->second.end()) {
if (entry != index->second.second.end()) {
interface = (*entry)->QueryInterface(id);
if (interface != nullptr) {
result = (*entry);
Expand Down Expand Up @@ -261,11 +264,11 @@ namespace RPC {
ChannelMap::iterator index(_channelProxyMap.find(channelId));

if (index != _channelProxyMap.end()) {
Proxies::iterator entry(index->second.begin());
while ((entry != index->second.end()) && (((*entry)->InterfaceId() != id) || ((*entry)->Implementation() != impl))) {
Proxies::iterator entry(index->second.second.begin());
while ((entry != index->second.second.end()) && (((*entry)->InterfaceId() != id) || ((*entry)->Implementation() != impl))) {
entry++;
}
if (entry != index->second.end()) {
if (entry != index->second.second.end()) {
interface = (*entry)->Acquire(outbound, id);

// The implementation could be found, but the current implemented proxy is not
Expand All @@ -287,7 +290,18 @@ namespace RPC {
ASSERT(result != nullptr);

// Register it as it is remotely registered :-)
_channelProxyMap[channelId].push_back(result);
ChannelMap::iterator channelIndex(_channelProxyMap.find(channelId));

if (channelIndex != _channelProxyMap.end()) {
channelIndex->second.second.push_back(result);
}
else {
Proxies baseList;
baseList.emplace_back(result);
_channelProxyMap.emplace(std::piecewise_construct,
std::forward_as_tuple(channelId),
std::forward_as_tuple(std::pair<string, Proxies>(channel->Origin(), baseList)));
}

// This will increment the reference count to 2 (one in the ChannelProxyMap and one in the QueryInterface ).
interface = result->QueryInterface(id);
Expand Down Expand Up @@ -431,7 +445,7 @@ namespace RPC {
ChannelMap::iterator index(_channelProxyMap.find(channelId));

if (index != _channelProxyMap.end()) {
for (auto entry : index->second) {
for (auto entry : index->second.second) {
entry->Invalidate();
_danglingProxies.emplace_back(entry);

Expand All @@ -444,7 +458,7 @@ namespace RPC {
// the pendingProxies. The receiver of pendingProxies has to take
// care of releasing the last reference we, as administration layer
// hold upon this..
pendingProxies = std::move(index->second);
pendingProxies = std::move(index->second.second);
_channelProxyMap.erase(index);
}

Expand Down
22 changes: 15 additions & 7 deletions Source/com/Administrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ namespace RPC {
};

public:
static const string DanglingId;

using Proxies = std::vector<ProxyStub::UnknownProxy*>;
using ChannelMap = std::unordered_map<uint32_t, Proxies>;
using ChannelMap = std::unordered_map<uint32_t, std::pair<string, Proxies > >;
using ReferenceMap = std::unordered_map<uint32_t, std::list< RecoverySet > >;
using Stubs = std::unordered_map<uint32_t, ProxyStub::UnknownStub*>;
using Factories = std::unordered_map<uint32_t, IMetadata*>;
Expand All @@ -149,25 +151,31 @@ namespace RPC {
}

template<typename ACTION>
bool Allocations(const uint32_t id, ACTION&& action) const {
bool Allocations(const string& linkId, ACTION&& action) const {
bool found = false;
_adminLock.Lock();
if (id == 0) {
if (linkId.empty() == true) {
for (const auto& proxy : _channelProxyMap) {
action(proxy.second);
action(proxy.second.first, proxy.second.second);
}
action(_danglingProxies);
action(DanglingId, _danglingProxies);
found = true;
}
else if (linkId == DanglingId) {
action(DanglingId, _danglingProxies);
found = true;
}
else {
ChannelMap::const_iterator index(_channelProxyMap.begin());
while ((found == false) && (index != _channelProxyMap.end())) {
if (index->first != id) {
ASSERT(index->second.second.size() != 0);

if (index->second.first != linkId) {
index++;
}
else {
found = true;
action(index->second);
action(index->second.first, index->second.second);
}
}
}
Expand Down
Loading

0 comments on commit 3a82e15

Please sign in to comment.