Skip to content

Commit

Permalink
Axes level and thread local Context
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisspyB committed Nov 4, 2024
1 parent f819698 commit 89e89eb
Show file tree
Hide file tree
Showing 21 changed files with 118 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pygribjump/src/pygribjump/gribjump_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ int gribjump_result_values(gribjump_extraction_result_t* result, double*** value
int gribjump_result_values_nocopy(gribjump_extraction_result_t* result, double*** values, unsigned long* nrange, unsigned long** nvalues);
int gribjump_result_mask(gribjump_extraction_result_t* result, unsigned long long*** masks, unsigned long* nrange, unsigned long** nmasks);
int gribjump_delete_result(gribjump_extraction_result_t* result);
int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, gribjump_handle_t* gj);
int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, int depth, const char* ctx, gribjump_handle_t* gj);
int gribjump_axes_keys(gj_axes_t* axes, const char*** keys_out, unsigned long* size);
int gribjump_axes_values(gj_axes_t* axes, const char* key, const char*** values_out, unsigned long* size);
int gribjump_delete_axes(gj_axes_t* axes);
Expand Down
14 changes: 8 additions & 6 deletions pygribjump/src/pygribjump/pygribjump.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,7 @@ def extract(self, polyrequest, ctx=None, dump=True):
nfields = ffi.new('unsigned long**')
nrequests = len(requests)
c_requests = ffi.new('gribjump_extraction_request_t*[]', [r.ctype for r in requests])
if (ctx):
logctx=str(ctx)
else:
logctx=""
logctx=str(ctx) if ctx else "pygribjump_extract"

logctx_c = ffi.new('const char[]', logctx.encode('ascii'))
lib.extract(self.__gribjump, c_requests, nrequests, results_array, nfields, logctx_c)
Expand Down Expand Up @@ -219,12 +216,17 @@ def extract_single(self, request):
]
return res

def axes(self, req):

def axes(self, req, level=3, ctx=None):
# note old axes used a dict in. This is now a string.
logctx=str(ctx) if ctx else "pygribjump_axes"
ctx_c = ffi.new('const char[]', logctx.encode('ascii'))
requeststr = dic_to_request(req)
newaxes = ffi.new('gj_axes_t**')
reqstr = ffi.new('const char[]', requeststr.encode('ascii'))
lib.gribjump_new_axes(newaxes, reqstr, self.__gribjump)
level_c = ffi.new('int', level)
lib.gribjump_new_axes(newaxes, reqstr, level_c, ctx_c, self.__gribjump)

# TODO want to return a dict like:
# {key: [value1, value2, ...], ...}
# each key and value is a string
Expand Down
4 changes: 2 additions & 2 deletions src/gribjump/Engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ size_t Engine::scan(std::vector<eckit::PathName> files) {
return files.size();
}

std::map<std::string, std::unordered_set<std::string> > Engine::axes(const std::string& request) {
std::map<std::string, std::unordered_set<std::string> > Engine::axes(const std::string& request, int level) {
MetricsManager::instance().set("request", request);
return FDBLister::instance().axes(request);
return FDBLister::instance().axes(request, level);
}

void Engine::reportErrors(eckit::Stream& client) {
Expand Down
2 changes: 1 addition & 1 deletion src/gribjump/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Engine {
size_t scan(const MarsRequests& requests, bool byfiles = false);
size_t scan(std::vector<eckit::PathName> files);

std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request);
std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level=3);

void scheduleTasks(filemap_t& filemap);

Expand Down
19 changes: 12 additions & 7 deletions src/gribjump/GribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ GribJump::GribJump() {
GribJump::~GribJump() {
}

size_t GribJump::scan(const std::vector<eckit::PathName>& paths) {
size_t GribJump::scan(const std::vector<eckit::PathName>& paths, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (paths.empty()) {
throw eckit::UserError("Paths must not be empty", Here());
Expand All @@ -37,7 +38,8 @@ size_t GribJump::scan(const std::vector<eckit::PathName>& paths) {
return ret;
}

size_t GribJump::scan(const std::vector<metkit::mars::MarsRequest> requests, bool byfiles) {
size_t GribJump::scan(const std::vector<metkit::mars::MarsRequest> requests, bool byfiles, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (requests.empty()) {
throw eckit::UserError("Requests must not be empty", Here());
Expand All @@ -48,17 +50,19 @@ size_t GribJump::scan(const std::vector<metkit::mars::MarsRequest> requests, boo
}


std::vector<std::vector<std::unique_ptr<ExtractionResult>>> GribJump::extract(const std::vector<ExtractionRequest>& requests, LogContext ctx) {
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> GribJump::extract(const std::vector<ExtractionRequest>& requests, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (requests.empty()) {
throw eckit::UserError("Requests must not be empty", Here());
}

std::vector<std::vector<std::unique_ptr<ExtractionResult>>> out = impl_->extract(requests, ctx); // why are we not using extraction items?
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> out = impl_->extract(requests); // why are we not using extraction items?
return out;
}

std::vector<std::unique_ptr<ExtractionItem>> GribJump::extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) {
std::vector<std::unique_ptr<ExtractionItem>> GribJump::extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (path.asString().empty()) {
throw eckit::UserError("Path must not be empty", Here());
Expand All @@ -74,13 +78,14 @@ std::vector<std::unique_ptr<ExtractionItem>> GribJump::extract(const eckit::Path
return out;
}

std::map<std::string, std::unordered_set<std::string>> GribJump::axes(const std::string& request) {
std::map<std::string, std::unordered_set<std::string>> GribJump::axes(const std::string& request, int level, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (request.empty()) {
throw eckit::UserError("Request string must not be empty", Here());
}

auto out = impl_->axes(request);
auto out = impl_->axes(request, level);
return out;
}

Expand Down
10 changes: 5 additions & 5 deletions src/gribjump/GribJump.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ class GribJump {

~GribJump();

size_t scan(const std::vector<eckit::PathName>& paths);
size_t scan(std::vector<metkit::mars::MarsRequest> requests, bool byfiles = false);
size_t scan(const std::vector<eckit::PathName>& paths, const LogContext& ctx=LogContext("none"));
size_t scan(std::vector<metkit::mars::MarsRequest> requests, bool byfiles = false, const LogContext& ctx=LogContext("none"));

std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(const std::vector<ExtractionRequest>& requests, LogContext ctx=LogContext("none"));
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges);
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(const std::vector<ExtractionRequest>& requests, const LogContext& ctx=LogContext("none"));
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges, const LogContext& ctx=LogContext("none"));

std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request);
std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request, int level=3, const LogContext& ctx=LogContext("none"));

void stats();

Expand Down
4 changes: 2 additions & 2 deletions src/gribjump/GribJumpBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ class GribJumpBase : public eckit::NonCopyable {

virtual size_t scan(const std::vector<metkit::mars::MarsRequest> requests, bool byfiles) = 0;

virtual std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>, LogContext ctx=LogContext("none")) = 0;
virtual std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>) = 0;
virtual std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) = 0;

virtual std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request) = 0;
virtual std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request, int level) = 0;

virtual void stats();

Expand Down
8 changes: 4 additions & 4 deletions src/gribjump/Lister.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,20 +172,20 @@ std::map< eckit::PathName, eckit::OffsetList > FDBLister::filesOffsets(std::vect
return files;
}

std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const std::string& request) {
std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const std::string& request, int level) {
std::vector<fdb5::FDBToolRequest> requests = fdb5::FDBToolRequest::requestsFromString(request, std::vector<std::string>(), true);
ASSERT(requests.size() == 1); // i.e. assume string is a single request.

return axes(requests.front());
return axes(requests.front(), level);
}

std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const fdb5::FDBToolRequest& request) {
std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const fdb5::FDBToolRequest& request, int level) {
eckit::AutoLock<FDBLister> lock(this);
std::map<std::string, std::unordered_set<std::string>> values;

LOG_DEBUG_LIB(LibGribJump) << "Using FDB's (new) axes impl" << std::endl;

fdb5::IndexAxis ax = fdb_.axes(request);
fdb5::IndexAxis ax = fdb_.axes(request, level);
ax.sort();
std::map<std::string, eckit::DenseSet<std::string>> fdbValues = ax.map();

Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/Lister.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Lister {
public:

virtual std::vector<eckit::URI> list(const std::vector<metkit::mars::MarsRequest> requests) = 0; // <-- May not want to use mars request?
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request) = 0 ;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level) = 0 ;

void lock() { mutex_.lock(); locked_ = true; }
void unlock() { mutex_.unlock(); locked_ = false; }
Expand All @@ -55,8 +55,8 @@ class FDBLister : public Lister {
static FDBLister& instance();

virtual std::vector<eckit::URI> list(const std::vector<metkit::mars::MarsRequest> requests) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const fdb5::FDBToolRequest& request);
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const fdb5::FDBToolRequest& request, int level);

filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToXRR); // Used during extraction

Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/LocalGribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ std::vector<std::unique_ptr<ExtractionItem>> LocalGribJump::extract(const eckit:
}

/// @todo, change API, remove extraction request
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> LocalGribJump::extract(ExtractionRequests requests, LogContext ctx) {
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> LocalGribJump::extract(ExtractionRequests requests) {

bool flatten = true;
Engine engine;
Expand Down Expand Up @@ -120,13 +120,13 @@ ResultsMap LocalGribJump::extract(const std::vector<MarsRequest>& requests, cons
return results;
}

std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const std::string& request) {
std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const std::string& request, int level) {

// Note: This is likely to be removed from GribJump, and moved to FDB.
// Here for now to support polytope.

Engine engine;
return engine.axes(request);
return engine.axes(request, level);
}

static GribJumpBuilder<LocalGribJump> builder("local");
Expand Down
4 changes: 2 additions & 2 deletions src/gribjump/LocalGribJump.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class LocalGribJump : public GribJumpBase {

// old API
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) override;
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>, LogContext ctx=LogContext("none")) override;
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>) override;

std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request) override;
std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request, int level) override;

private:
};
Expand Down
31 changes: 26 additions & 5 deletions src/gribjump/Metrics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ std::string iso(time_t t) {

namespace gribjump {

// --------------------------------------------------------------------------------------------------------------------------------
thread_local LogContext ContextManager::context_;

// --------------------------------------------------------------------------------------------------------------------------------

Metrics::Metrics() : created_(std::time(nullptr)) {}

Metrics::~Metrics() {}
Expand All @@ -47,7 +52,7 @@ void Metrics::report() {
for (const auto& [name, value] : values_) {
j << name << value;
}
j << "context" << context_;
j << "context" << ContextManager::instance().context();
j.endObject();

eckit::Log::metrics() << oss.str() << std::endl;
Expand All @@ -69,10 +74,6 @@ void MetricsManager::set(const std::string& name, const eckit::Value& value) {
metrics().add(name, value);
}

void MetricsManager::setContext(const LogContext& context) {
metrics().addContext(context);
}

Metrics& MetricsManager::metrics() {
static thread_local Metrics metrics;
return metrics;
Expand All @@ -82,4 +83,24 @@ void MetricsManager::report() {
metrics().report();
}


// --------------------------------------------------------------------------------------------------------------------------------
ContextManager::ContextManager() {
}

ContextManager& ContextManager::instance() {
static ContextManager instance;
return instance;
}

ContextManager::~ContextManager() {}

void ContextManager::set(const LogContext& context) {
context_ = context;
}

LogContext& ContextManager::context() {
return context_;
}

} // namespace gribjump
25 changes: 21 additions & 4 deletions src/gribjump/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ class Metrics {

void add(const std::string& name, const eckit::Value& value);

//
void addTime(const std::string& name);

void addContext(const LogContext& context) { context_ = context; }

void report();
Expand All @@ -94,7 +91,7 @@ class MetricsManager {
static MetricsManager& instance();

void set(const std::string& name, const eckit::Value& value);
void setContext(const LogContext& context);
// void setContext(const LogContext& context);
void report();

private:
Expand All @@ -105,4 +102,24 @@ class MetricsManager {

};

// --------------------------------------------------------------------------------------------------------------------------------

// Context also needs to be thread-local
class ContextManager {
public:
static ContextManager& instance();

LogContext& context();

void set(const LogContext& context);

private:

ContextManager();
~ContextManager();

private:
static thread_local LogContext context_;

};
} // namespace gribjump
10 changes: 7 additions & 3 deletions src/gribjump/gribjump_c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests,
if (ctx) {
logctx = LogContext(ctx);
}

std::vector<std::vector<std::unique_ptr<ExtractionResult>>> results;
results = handle->extract(reqs, logctx);

Expand All @@ -269,12 +269,16 @@ int extract(gribjump_handle_t* handle, gribjump_extraction_request_t** requests,
}


int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, gribjump_handle_t* gj) {
int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, int depth, const char* ctx, gribjump_handle_t* gj) {
return wrapApiFunction([=] {
ASSERT(gj);
LogContext logctx;
if (ctx) {
logctx = LogContext(ctx);
}
std::string reqstr_str(reqstr);
std::map<std::string, std::unordered_set<std::string>> values;
values = gj->axes(reqstr_str);
values = gj->axes(reqstr_str, depth, logctx);
*axes = new gj_axes_t(values);
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/gribjump/gribjump_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ int gribjump_result_values_nocopy(gribjump_extraction_result_t* result, double**
int gribjump_result_mask(gribjump_extraction_result_t* result, unsigned long long*** masks, unsigned long* nrange, unsigned long** nmasks);
int gribjump_delete_result(gribjump_extraction_result_t* result);

int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, gribjump_handle_t* gj);
int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, int depth, const char* ctx, gribjump_handle_t* gj);
int gribjump_axes_keys(gj_axes_t* axes, const char*** keys_out, unsigned long* size);
int gribjump_axes_values(gj_axes_t* axes, const char* key, const char*** values_out, unsigned long* size);
int gribjump_delete_axes(gj_axes_t* axes);
Expand Down
2 changes: 1 addition & 1 deletion src/gribjump/remote/GribJumpUser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void GribJumpUser::handle_client(eckit::Stream& s, eckit::Timer& timer) {
}

LogContext ctx(s);
MetricsManager::instance().setContext(ctx);
ContextManager::instance().set(ctx);

s >> i_requestType;
RequestType requestType = static_cast<RequestType>(i_requestType);
Expand Down
Loading

0 comments on commit 89e89eb

Please sign in to comment.