Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
93506cd
Cleanup[MQBCONFM]: Remove legacy bmqconf service definition
pniedzielski Jul 28, 2025
0fb2c66
Cleanup[MQBCONFM]: Remove legacy `Request` type
pniedzielski Jul 28, 2025
f66669e
Cleanup[MQBCONFM]: Remove legacy `DomainConfigRequest` type
pniedzielski Jul 28, 2025
9d9a3b3
Cleanup[MQBCONFM]: Remove legacy `BrokerIdentity` type
pniedzielski Jul 28, 2025
bf8786e
Cleanup[MQBCONFM]: Remove legacy `Response` type
pniedzielski Jul 28, 2025
b9fbf2b
Cleanup[MQBCONFM]: Remove legacy `Failure` type
pniedzielski Jul 28, 2025
ea7022a
Cleanup[MQBCONFM]: Remove legacy `DomainConfigRaw` type
pniedzielski Jul 28, 2025
b65b7d7
Refactor[MQBA]: Prepare failure path to not use Response type
pniedzielski Jul 28, 2025
b51943a
Refactor[MQBA]: Remove `onDomainConfigResponseCb`
pniedzielski Jul 28, 2025
3fecd5c
Refactor[MQBA]: Lift failure handling closer to where failure is dete…
pniedzielski Jul 28, 2025
b3c4d19
Refactor[MQBA]: Remove failure detection from cache
pniedzielski Jul 28, 2025
d02fbb5
Refactor[MQBA]: Use `e_SUCCESS`
pniedzielski Jul 28, 2025
e3d2ca5
Refactor[MQBA]: Refactor into new `cacheAdd` function
pniedzielski Jul 28, 2025
7b2a4a1
Refactor[MQBA]: Unnest failure cases
pniedzielski Jul 28, 2025
5d2dc0e
Refactor[MQBA]: Remove uses of `mqbconfm::Response`
pniedzielski Jul 28, 2025
4e016fd
Fix[MQBA]: Improve logging in failure case for domain config
pniedzielski Jul 28, 2025
7ef7aaa
Cleanup[MQBA]: Remove unused function `generateConfig`
pniedzielski Jul 28, 2025
38ebd60
Refactor[MQBA]: Remove extraneous locks and unlocks
pniedzielski Jul 28, 2025
2079f90
Refactor[MQBA]: Remove uses of `mqbconfm::DomainConfigRaw`
pniedzielski Jul 28, 2025
5e0f0e4
Cleanup[MQBA]: Tidy logs for `getDomainConfig`
pniedzielski Jul 28, 2025
9501491
Cleanup[MQBA]: clang-format changes
pniedzielski Jul 28, 2025
06ccea8
cleanup[MQBA]: Remove unused `executeScript` function
pniedzielski Jul 30, 2025
8737c39
Refactor[MQBA]: Replace `mqbconfm::DomainResolver` with two strings
pniedzielski Jul 30, 2025
e9ca82c
Cleanup[MQBCONFM]: Remove legacy `DomainResolver` type
pniedzielski Jul 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 61 additions & 71 deletions src/groups/mqb/mqba/mqba_configprovider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace mqba {
// class ConfigProvider
// --------------------

bool ConfigProvider::cacheLookup(mqbconfm::Response* response,
bool ConfigProvider::cacheLookup(bsl::string* config,
const bslstl::StringRef& key)
{
BSLMT_MUTEXASSERT_IS_LOCKED_SAFE(&d_mutex); // mutex LOCKED
Expand All @@ -71,39 +71,22 @@ bool ConfigProvider::cacheLookup(mqbconfm::Response* response,
}

// Cache entry is still 'alive'
*response = it->second.d_data; // Assign a copy of the object
*config = it->second.d_data; // Assign a copy of the object
return true;
}

void ConfigProvider::onDomainConfigResponseCb(
const mqbconfm::Response response,
const GetDomainConfigCb& callback)
void ConfigProvider::cacheAdd(const bslstl::StringRef& key,
const bsl::string& config)
{
if (response.isFailureValue()) {
callback(response.failure().code(), response.failure().message());
return; // RETURN
}

BSLS_ASSERT_OPT(response.isDomainConfigValue());

BALL_LOG_INFO << "Received domain config for domain '"
<< response.domainConfig().domainName() << "': '"
<< response.domainConfig().config() << "'";
{
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // mutex LOCKED

// Save to cache
CacheEntry cacheEntry;
cacheEntry.d_data = response;
cacheEntry.d_expireTime =
bmqsys::Time::nowMonotonicClock() +
bsls::TimeInterval(
mqbcfg::BrokerConfig::get().bmqconfConfig().cacheTTLSeconds());
d_cache[response.domainConfig().domainName()] = cacheEntry;
}
BSLMT_MUTEXASSERT_IS_LOCKED_SAFE(&d_mutex); // mutex LOCKED

// Call callback
callback(0, response.domainConfig().config());
CacheEntry cacheEntry;
cacheEntry.d_data = config;
cacheEntry.d_expireTime =
bmqsys::Time::nowMonotonicClock() +
bsls::TimeInterval(
mqbcfg::BrokerConfig::get().bmqconfConfig().cacheTTLSeconds());
d_cache[key] = cacheEntry;
}

ConfigProvider::ConfigProvider(bslma::Allocator* allocator)
Expand Down Expand Up @@ -134,21 +117,29 @@ void ConfigProvider::stop()
void ConfigProvider::getDomainConfig(const bslstl::StringRef& domainName,
const GetDomainConfigCb& callback)
{
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // mutex LOCKED
enum {
e_SUCCESS = 0,
e_FILENOTEXIST = -1,
e_FILENOTOPENED = -2,
};

bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK

// First, check in the cache
mqbconfm::Response response;
if (cacheLookup(&response, domainName) == true) {
guard.release()->unlock(); // mutex UNLOCK
BALL_LOG_INFO << "Config for domain '" << domainName << "' retrieved "
<< "from cache";
onDomainConfigResponseCb(response, callback);
bsl::string config;
if (cacheLookup(&config, domainName) == true) {
BALL_LOG_INFO << "Retrieved config for domain '" << domainName
<< "' from cache: '" << config << "'";

// Update the expiration time in the cache.
cacheAdd(domainName, config);

guard.release()->unlock(); // UNLOCK
callback(e_SUCCESS, config);
return; // RETURN
}

// We don't have the config in the small cache ..
int rc = 0;
bsl::string config;

bsl::string filePath = mqbcfg::BrokerConfig::get().etcDir() + "/domains/" +
domainName + ".json";
Expand All @@ -157,44 +148,43 @@ void ConfigProvider::getDomainConfig(const bslstl::StringRef& domainName,
bdlma::LocalSequentialAllocator<1024> localAllocator(d_allocator_p);
bmqu::MemOutStream os(&localAllocator);
os << "Domain file '" << filePath << "' doesn't exist";
config.assign(os.str().data(), os.str().length());
rc = -1;
}
else {
bsl::ifstream fileStream(filePath.c_str(), bsl::ios::in);
if (!fileStream) {
bdlma::LocalSequentialAllocator<1024> localAllocator(
d_allocator_p);
bmqu::MemOutStream os(&localAllocator);
os << "Unable to open domain file '" << filePath << "'";
config.assign(os.str().data(), os.str().length());
rc = -2;
}
else {
fileStream.seekg(0, bsl::ios::end);
config.resize(fileStream.tellg());
fileStream.seekg(0, bsl::ios::beg);
fileStream.read(config.data(), config.size());
fileStream.close();
}
}

if (rc != 0) {
response.makeFailure();
response.failure().code() = rc;
response.failure().message() = config;
guard.release()->unlock(); // UNLOCK

BALL_LOG_INFO << "Failed to retrieve config for domain '" << domainName
<< "' from file '" << filePath << "': " << os.str();
callback(e_FILENOTEXIST, os.str());
return; // RETURN
}
else {
response.makeDomainConfig();
response.domainConfig().config() = config;
response.domainConfig().domainName() = domainName;

bsl::ifstream fileStream(filePath.c_str(), bsl::ios::in);
if (!fileStream) {
bdlma::LocalSequentialAllocator<1024> localAllocator(d_allocator_p);
bmqu::MemOutStream os(&localAllocator);
os << "Unable to open domain file '" << filePath << "'";

guard.release()->unlock(); // UNLOCK

BALL_LOG_INFO << "Failed to retrieve config for domain '" << domainName
<< "' from file '" << filePath << "': " << os.str();
callback(e_FILENOTOPENED, os.str());
return; // RETURN
}
guard.release()->unlock(); // unlock

BALL_LOG_INFO << "Config for domain '" << domainName << "' retrieved "
<< "from file '" << filePath << "'";
fileStream.seekg(0, bsl::ios::end);
config.resize(fileStream.tellg());
fileStream.seekg(0, bsl::ios::beg);
fileStream.read(config.data(), config.size());
fileStream.close();

BALL_LOG_INFO << "Retrieved config for domain '" << domainName
<< "' from file '" << filePath << "': '" << config << "'";

// Insert the newly-found configuration into the cache.
cacheAdd(domainName, config);

onDomainConfigResponseCb(response, callback);
guard.release()->unlock(); // UNLOCK
callback(e_SUCCESS, config);
}

void ConfigProvider::clearCache(const bslstl::StringRef& domainName)
Expand Down
34 changes: 11 additions & 23 deletions src/groups/mqb/mqba/mqba_configprovider.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
/// @todo Add commandHandler.
/// @todo Add statistics.

// MQB
#include <mqbconfm_messages.h>

// BDE
#include <ball_log.h>
#include <bsl_functional.h>
Expand Down Expand Up @@ -94,7 +91,7 @@ class ConfigProvider {
// PUBLIC DATA

/// Data to cache.
mqbconfm::Response d_data;
bsl::string d_data;
/// Time after which this entry is no longer valid.
bsls::TimeInterval d_expireTime;
};
Expand All @@ -120,26 +117,17 @@ class ConfigProvider {
private:
// PRIVATE MANIPULATORS

/// Invoke the script to generate the configuration for the specified
/// `domainName` and store the result in the specified `output` on
/// success, returning 0; or return a non-zero result code and populate
/// `output` with the error on failure.
int generateConfig(bsl::string* output,
const bslstl::StringRef& domainName);

/// Lookup entry with the specified `key` in the cache and fill the data
/// in the specified `response` if found and expiry time has not yet
/// been reached; otherwise return false and leave `response`
/// untouched. Note that if the entry is found but has expired, this
/// will erase it from the cache.
bool cacheLookup(mqbconfm::Response* response,
const bslstl::StringRef& key);

/// Callback when the configuration for the domain has been retrieved,
/// in the specified `response` and which should be forwarded to the
/// specified `callback`.
void onDomainConfigResponseCb(const mqbconfm::Response response,
const GetDomainConfigCb& callback);
/// in the specified `config` if found and expiry time has not yet been
/// reached; otherwise return false and leave `config` untouched. Note
/// that if the entry is found but has expired, this will erase it from the
/// cache.
bool cacheLookup(bsl::string* config, const bslstl::StringRef& key);

/// Add entry with the specified `key` and with the specified `config` as
/// data to the cache, resetting its expiry time. Note that this will
/// overwrite any existing entry with the specified `key` in the cache.
void cacheAdd(const bslstl::StringRef& key, const bsl::string& config);

private:
// NOT IMPLEMENTED
Expand Down
69 changes: 43 additions & 26 deletions src/groups/mqb/mqba/mqba_domainresolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
// mqba_domainresolver.cpp -*-C++-*-
#include <mqba_domainresolver.h>

#include <mqbscm_version.h>
// MQB
#include <mqbcfg_brokerconfig.h>
#include <mqbcfg_messages.h>
#include <mqbcmd_messages.h>
#include <mqbconfm_messages.h>
#include <mqbscm_version.h>

#include <bmqsys_time.h>
#include <bmqu_memoutstream.h>
Expand Down Expand Up @@ -85,8 +86,9 @@ void DomainResolver::updateTimestamps()
d_timestampsValidUntil = now + k_DIR_CHECK_TTL;
}

bool DomainResolver::cacheLookup(mqbconfm::DomainResolver* out,
const bslstl::StringRef& domainName)
bool DomainResolver::cacheLookup(bsl::string* resolvedDomainName,
bsl::string* clusterName,
const bslstl::StringRef& domainName)
{
// executed by the thread that holds the 'd_mutex'

Expand All @@ -110,13 +112,16 @@ bool DomainResolver::cacheLookup(mqbconfm::DomainResolver* out,
return false; // RETURN
}

*out = it->second.d_data; // Assign a copy of the object
// Copy cached data.
*resolvedDomainName = it->second.d_name;
*clusterName = it->second.d_cluster;
return true;
}

int DomainResolver::getOrRead(bsl::ostream& errorDescription,
mqbconfm::DomainResolver* out,
const bslstl::StringRef& domainName)
int DomainResolver::getOrRead(bsl::ostream& errorDescription,
bsl::string* resolvedDomainName,
bsl::string* clusterName,
const bslstl::StringRef& domainName)
{
// executed by *ANY* thread

Expand All @@ -126,7 +131,7 @@ int DomainResolver::getOrRead(bsl::ostream& errorDescription,
updateTimestamps();

// First, check in the cache
if (cacheLookup(out, domainName)) {
if (cacheLookup(resolvedDomainName, clusterName, domainName)) {
BALL_LOG_INFO << "Domain '" << domainName << "' resolved from cache";
return 0; // RETURN
}
Expand All @@ -143,13 +148,14 @@ int DomainResolver::getOrRead(bsl::ostream& errorDescription,

int rc = rc_SUCCESS;
bsl::string content;
bsl::string resolvedDomainName = domainName;
int redirection = 0;
int redirection = 0;

bsl::string redirectedDomainName = domainName;

for (; redirection < 2; ++redirection) {
// 1. read config
bsl::string filePath = mqbcfg::BrokerConfig::get().etcDir() +
"/domains/" + resolvedDomainName + ".json";
"/domains/" + redirectedDomainName + ".json";

// This is copy-pasted from mqba_configprovider.cpp. Maybe we are
// going to merge the two? If not, consider factoring this bit.
Expand Down Expand Up @@ -232,25 +238,26 @@ int DomainResolver::getOrRead(bsl::ostream& errorDescription,
}

if (domainVariant.isRedirectValue()) {
BALL_LOG_INFO << "Redirecting " << resolvedDomainName << " to "
BALL_LOG_INFO << "Redirecting " << redirectedDomainName << " to "
<< domainVariant.redirect();
resolvedDomainName = domainVariant.redirect();
redirectedDomainName = domainVariant.redirect();
continue;
}

// Add to cache
CacheEntry cacheEntry;
cacheEntry.d_data.name() = resolvedDomainName;
cacheEntry.d_data.cluster() = domainVariant.definition().location();
*out = cacheEntry.d_data;
cacheEntry.d_name = redirectedDomainName;
cacheEntry.d_cluster = domainVariant.definition().location();
// REVIEW: suggestion: s/location/cluster/
cacheEntry.d_cfgDirTimestamp = d_lastCfgDirTimestamp;
// This is fine, because we updated
// them by calling
// 'updateTimestamps()'.

d_cache[domainName] = cacheEntry;

// Copy resolved data.
*resolvedDomainName = cacheEntry.d_name;
*clusterName = cacheEntry.d_cluster;
return rc_SUCCESS; // RETURN
}

Expand Down Expand Up @@ -294,14 +301,18 @@ void DomainResolver::stop()
}

bmqp_ctrlmsg::Status
DomainResolver::getOrReadDomain(mqbconfm::DomainResolver* out,
const bslstl::StringRef& domainName)
DomainResolver::getOrReadDomain(bsl::string* resolvedDomainName,
bsl::string* clusterName,
const bslstl::StringRef& domainName)
{
// executed by *ANY* thread

bmqu::MemOutStream errorDescription;

int rc = getOrRead(errorDescription, out, domainName);
int rc = getOrRead(errorDescription,
resolvedDomainName,
clusterName,
domainName);

bmqp_ctrlmsg::Status status;
status.category() = (rc == 0 ? bmqp_ctrlmsg::StatusCategory::E_SUCCESS
Expand All @@ -319,21 +330,27 @@ void DomainResolver::qualifyDomain(
{
// executed by *ANY* thread

mqbconfm::DomainResolver response;
bmqp_ctrlmsg::Status status = getOrReadDomain(&response, domainName);
bsl::string resolvedDomainName;
bsl::string clusterName;
bmqp_ctrlmsg::Status status = getOrReadDomain(&resolvedDomainName,
&clusterName,
domainName);

callback(status, response.name());
callback(status, resolvedDomainName);
}

void DomainResolver::locateDomain(const bslstl::StringRef& domainName,
const LocateDomainCb& callback)
{
// executed by *ANY* thread

mqbconfm::DomainResolver response;
bmqp_ctrlmsg::Status status = getOrReadDomain(&response, domainName);
bsl::string resolvedDomainName;
bsl::string clusterName;
bmqp_ctrlmsg::Status status = getOrReadDomain(&resolvedDomainName,
&clusterName,
domainName);

callback(status, response.cluster());
callback(status, clusterName);
}

void DomainResolver::clearCache(const bslstl::StringRef& domainName)
Expand Down
Loading
Loading