Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread local metrics overhaul #26

Merged
merged 5 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pygribjump/src/pygribjump/gribjump_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ int gribjump_result_values(gribjump_extraction_result_t* result, double*** value
int gribjump_result_values_nocopy(gribjump_extraction_result_t* result, double*** values, unsigned long* nrange, unsigned long** nvalues);
int gribjump_result_mask(gribjump_extraction_result_t* result, unsigned long long*** masks, unsigned long* nrange, unsigned long** nmasks);
int gribjump_delete_result(gribjump_extraction_result_t* result);
int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, gribjump_handle_t* gj);
int gribjump_new_axes(gj_axes_t** axes, const char* reqstr, int* level, const char* ctx, gribjump_handle_t* gj);
int gribjump_axes_keys(gj_axes_t* axes, const char*** keys_out, unsigned long* size);
int gribjump_axes_values(gj_axes_t* axes, const char* key, const char*** values_out, unsigned long* size);
int gribjump_delete_axes(gj_axes_t* axes);
Expand Down
14 changes: 8 additions & 6 deletions pygribjump/src/pygribjump/pygribjump.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,7 @@ def extract(self, polyrequest, ctx=None, dump=True):
nfields = ffi.new('unsigned long**')
nrequests = len(requests)
c_requests = ffi.new('gribjump_extraction_request_t*[]', [r.ctype for r in requests])
if (ctx):
logctx=str(ctx)
else:
logctx=""
logctx=str(ctx) if ctx else "pygribjump_extract"

logctx_c = ffi.new('const char[]', logctx.encode('ascii'))
lib.extract(self.__gribjump, c_requests, nrequests, results_array, nfields, logctx_c)
Expand Down Expand Up @@ -219,12 +216,17 @@ def extract_single(self, request):
]
return res

def axes(self, req):

def axes(self, req, level=3, ctx=None):
# note old axes used a dict in. This is now a string.
logctx=str(ctx) if ctx else "pygribjump_axes"
ctx_c = ffi.new('const char[]', logctx.encode('ascii'))
requeststr = dic_to_request(req)
newaxes = ffi.new('gj_axes_t**')
reqstr = ffi.new('const char[]', requeststr.encode('ascii'))
lib.gribjump_new_axes(newaxes, reqstr, self.__gribjump)
level_c = ffi.new('int*', level)
lib.gribjump_new_axes(newaxes, reqstr, level_c, ctx_c, self.__gribjump)

# TODO want to return a dict like:
# {key: [value1, value2, ...], ...}
# each key and value is a string
Expand Down
13 changes: 13 additions & 0 deletions pygribjump/tests/test_pygribjump.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,16 @@ def test_extract_simple_sunshine_case(read_only_fdb_setup) -> None:

actual = grib_jump.extract(polyrequest)
assert numpy.array_equal(expected, actual[0][0][0][0], equal_nan=True)

def test_axes(read_only_fdb_setup) -> None:
    """Check how many axis keys ``GribJump.axes`` returns at each level.

    Example output recorded for this single-date request:
      level 1 -> class, date, domain, expver, stream, time
      level 2 -> level 1 keys plus levtype, type
      level 3 -> level 2 keys plus levelist, param, step
    """
    gj = pygj.GribJump()
    request = {"date": "20230508"}

    # Query every level up front so all three calls happen before any assertion.
    axes_by_level = {lvl: gj.axes(request, level=lvl) for lvl in (1, 2, 3)}

    expected_key_counts = {1: 6, 2: 8, 3: 11}
    for lvl, nkeys in expected_key_counts.items():
        assert len(axes_by_level[lvl].keys()) == nkeys
1 change: 1 addition & 0 deletions src/gribjump/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ list( APPEND gribjump_srcs
ExtractionData.cc
ExtractionData.h
Metrics.h
Metrics.cc
Types.h
)

Expand Down
17 changes: 10 additions & 7 deletions src/gribjump/Engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ filemap_t Engine::buildFileMap(const ExtractionRequests& requests, ExItemMap& ke
}

const metkit::mars::MarsRequest req = unionRequest(marsrequests);
MetricsManager::instance().set("union_request", req.asString());
timer.reset("Gribjump Engine: Flattened requests and constructed union request");

filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem);
Expand Down Expand Up @@ -248,14 +249,19 @@ void Engine::scheduleTasks(filemap_t& filemap){

ResultsMap Engine::extract(const ExtractionRequests& requests, bool flatten) {

eckit::Timer timer("Engine::extract");
ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, flatten); // Owns the ExtractionItems
filemap_t filemap = buildFileMap(requests, keyToExtractionItem);
eckit::Timer timer("Engine::extract");
MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed());
timer.reset("Gribjump Engine: Built file map");

scheduleTasks(filemap);
MetricsManager::instance().set("elapsed_tasks", timer.elapsed());
timer.reset("Gribjump Engine: All tasks finished");

ResultsMap results = collectResults(keyToExtractionItem);
MetricsManager::instance().set("elapsed_collect_results", timer.elapsed());

timer.reset("Gribjump Engine: Repackaged results");

return results;
Expand Down Expand Up @@ -309,8 +315,9 @@ size_t Engine::scan(std::vector<eckit::PathName> files) {
return files.size();
}

std::map<std::string, std::unordered_set<std::string> > Engine::axes(const std::string& request) {
return FDBLister::instance().axes(request);
std::map<std::string, std::unordered_set<std::string> > Engine::axes(const std::string& request, int level) {
MetricsManager::instance().set("request", request);
return FDBLister::instance().axes(request, level);
}

void Engine::reportErrors(eckit::Stream& client) {
Expand All @@ -320,10 +327,6 @@ void Engine::reportErrors(eckit::Stream& client) {
// Forwards to the task group's raiseErrors().
void Engine::raiseErrors()
{
    taskGroup_.raiseErrors();
}
void Engine::updateMetrics(Metrics& metrics) {
metrics.nTasks = taskGroup_.nTasks();
metrics.nFailedTasks = taskGroup_.nErrors();
}
//----------------------------------------------------------------------------------------------------------------------

} // namespace gribjump
3 changes: 1 addition & 2 deletions src/gribjump/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ class Engine {
size_t scan(const MarsRequests& requests, bool byfiles = false);
size_t scan(std::vector<eckit::PathName> files);

std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request);
std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level=3);

void scheduleTasks(filemap_t& filemap);
void updateMetrics(Metrics& metrics);

void reportErrors(eckit::Stream& client_);
void raiseErrors();
Expand Down
19 changes: 12 additions & 7 deletions src/gribjump/GribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ GribJump::GribJump() {
// Nothing to release by hand; defined out of line in this translation unit.
GribJump::~GribJump() = default;

size_t GribJump::scan(const std::vector<eckit::PathName>& paths) {
size_t GribJump::scan(const std::vector<eckit::PathName>& paths, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (paths.empty()) {
throw eckit::UserError("Paths must not be empty", Here());
Expand All @@ -37,7 +38,8 @@ size_t GribJump::scan(const std::vector<eckit::PathName>& paths) {
return ret;
}

size_t GribJump::scan(const std::vector<metkit::mars::MarsRequest> requests, bool byfiles) {
size_t GribJump::scan(const std::vector<metkit::mars::MarsRequest> requests, bool byfiles, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (requests.empty()) {
throw eckit::UserError("Requests must not be empty", Here());
Expand All @@ -48,17 +50,19 @@ size_t GribJump::scan(const std::vector<metkit::mars::MarsRequest> requests, boo
}


std::vector<std::vector<std::unique_ptr<ExtractionResult>>> GribJump::extract(const std::vector<ExtractionRequest>& requests, LogContext ctx) {
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> GribJump::extract(const std::vector<ExtractionRequest>& requests, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (requests.empty()) {
throw eckit::UserError("Requests must not be empty", Here());
}

std::vector<std::vector<std::unique_ptr<ExtractionResult>>> out = impl_->extract(requests, ctx); // ... why is this still using raw pointers? // why are we not using extraction items?
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> out = impl_->extract(requests); // why are we not using extraction items?
return out;
}

std::vector<std::unique_ptr<ExtractionItem>> GribJump::extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) {
std::vector<std::unique_ptr<ExtractionItem>> GribJump::extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (path.asString().empty()) {
throw eckit::UserError("Path must not be empty", Here());
Expand All @@ -74,13 +78,14 @@ std::vector<std::unique_ptr<ExtractionItem>> GribJump::extract(const eckit::Path
return out;
}

std::map<std::string, std::unordered_set<std::string>> GribJump::axes(const std::string& request) {
std::map<std::string, std::unordered_set<std::string>> GribJump::axes(const std::string& request, int level, const LogContext& ctx) {
ContextManager::instance().set(ctx);

if (request.empty()) {
throw eckit::UserError("Request string must not be empty", Here());
}

auto out = impl_->axes(request);
auto out = impl_->axes(request, level);
return out;
}

Expand Down
10 changes: 5 additions & 5 deletions src/gribjump/GribJump.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ class GribJump {

~GribJump();

size_t scan(const std::vector<eckit::PathName>& paths);
size_t scan(std::vector<metkit::mars::MarsRequest> requests, bool byfiles = false);
size_t scan(const std::vector<eckit::PathName>& paths, const LogContext& ctx=LogContext("none"));
size_t scan(std::vector<metkit::mars::MarsRequest> requests, bool byfiles = false, const LogContext& ctx=LogContext("none"));

std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(const std::vector<ExtractionRequest>& requests, LogContext ctx=LogContext("none"));
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges);
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(const std::vector<ExtractionRequest>& requests, const LogContext& ctx=LogContext("none"));
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges, const LogContext& ctx=LogContext("none"));

std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request);
std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request, int level=3, const LogContext& ctx=LogContext("none"));

void stats();

Expand Down
5 changes: 2 additions & 3 deletions src/gribjump/GribJumpBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,16 @@ class GribJumpBase : public eckit::NonCopyable {

virtual size_t scan(const std::vector<metkit::mars::MarsRequest> requests, bool byfiles) = 0;

virtual std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>, LogContext ctx=LogContext("none")) = 0;
virtual std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>) = 0;
virtual std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) = 0;

virtual std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request) = 0;
virtual std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request, int level) = 0;

virtual void stats();

protected: // members

Stats stats_;
LogContext ctx_;
};

} // namespace gribjump
8 changes: 4 additions & 4 deletions src/gribjump/Lister.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,20 +172,20 @@ std::map< eckit::PathName, eckit::OffsetList > FDBLister::filesOffsets(std::vect
return files;
}

std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const std::string& request) {
std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const std::string& request, int level) {
std::vector<fdb5::FDBToolRequest> requests = fdb5::FDBToolRequest::requestsFromString(request, std::vector<std::string>(), true);
ASSERT(requests.size() == 1); // i.e. assume string is a single request.

return axes(requests.front());
return axes(requests.front(), level);
}

std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const fdb5::FDBToolRequest& request) {
std::map<std::string, std::unordered_set<std::string> > FDBLister::axes(const fdb5::FDBToolRequest& request, int level) {
eckit::AutoLock<FDBLister> lock(this);
std::map<std::string, std::unordered_set<std::string>> values;

LOG_DEBUG_LIB(LibGribJump) << "Using FDB's (new) axes impl" << std::endl;

fdb5::IndexAxis ax = fdb_.axes(request);
fdb5::IndexAxis ax = fdb_.axes(request, level);
ax.sort();
std::map<std::string, eckit::DenseSet<std::string>> fdbValues = ax.map();

Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/Lister.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Lister {
public:

virtual std::vector<eckit::URI> list(const std::vector<metkit::mars::MarsRequest> requests) = 0; // <-- May not want to use mars request?
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request) = 0 ;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level) = 0 ;

void lock() { mutex_.lock(); locked_ = true; }
void unlock() { mutex_.unlock(); locked_ = false; }
Expand All @@ -55,8 +55,8 @@ class FDBLister : public Lister {
static FDBLister& instance();

virtual std::vector<eckit::URI> list(const std::vector<metkit::mars::MarsRequest> requests) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const fdb5::FDBToolRequest& request);
virtual std::map<std::string, std::unordered_set<std::string> > axes(const std::string& request, int level) override;
virtual std::map<std::string, std::unordered_set<std::string> > axes(const fdb5::FDBToolRequest& request, int level);

filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToXRR); // Used during extraction

Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/LocalGribJump.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ std::vector<std::unique_ptr<ExtractionItem>> LocalGribJump::extract(const eckit:
}

/// @todo, change API, remove extraction request
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> LocalGribJump::extract(ExtractionRequests requests, LogContext ctx) {
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> LocalGribJump::extract(ExtractionRequests requests) {

bool flatten = true;
Engine engine;
Expand Down Expand Up @@ -120,13 +120,13 @@ ResultsMap LocalGribJump::extract(const std::vector<MarsRequest>& requests, cons
return results;
}

std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const std::string& request) {
std::map<std::string, std::unordered_set<std::string>> LocalGribJump::axes(const std::string& request, int level) {

// Note: This is likely to be removed from GribJump, and moved to FDB.
// Here for now to support polytope.

Engine engine;
return engine.axes(request);
return engine.axes(request, level);
}

static GribJumpBuilder<LocalGribJump> builder("local");
Expand Down
4 changes: 2 additions & 2 deletions src/gribjump/LocalGribJump.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ class LocalGribJump : public GribJumpBase {

// old API
std::vector<std::unique_ptr<ExtractionItem>> extract(const eckit::PathName& path, const std::vector<eckit::Offset>& offsets, const std::vector<std::vector<Range>>& ranges) override;
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>, LogContext ctx=LogContext("none")) override;
std::vector<std::vector<std::unique_ptr<ExtractionResult>>> extract(std::vector<ExtractionRequest>) override;

std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request) override;
std::map<std::string, std::unordered_set<std::string>> axes(const std::string& request, int level) override;

private:
};
Expand Down
Loading
Loading