Skip to content

Commit

Permalink
Merge pull request #29 from ecmwf/feature/optimise
Browse files Browse the repository at this point in the history
Feature/optimise
  • Loading branch information
ChrisspyB authored Nov 13, 2024
2 parents 98beec3 + 14e3372 commit da1ba0a
Show file tree
Hide file tree
Showing 30 changed files with 261 additions and 275 deletions.
2 changes: 1 addition & 1 deletion pygribjump/src/pygribjump/pygribjump.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ class ExtractionRequest:
The ranges to extract.
"""
def __init__(self, req, ranges, gridHash=None):
reqstr = "retrieve,"+dic_to_request(req)
reqstr = dic_to_request(req)
rangestr = list_to_rangestr(ranges)
request = ffi.new('gribjump_extraction_request_t**')
c_reqstr = ffi.new("char[]", reqstr.encode())
Expand Down
197 changes: 73 additions & 124 deletions src/gribjump/Engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
/// @author Christopher Bradley

#include "eckit/log/Plural.h"
#include "eckit/utils/StringTools.h"

#include "metkit/mars/MarsExpension.h"
#include "metkit/mars/MarsParser.h"

#include "gribjump/Engine.h"
#include "gribjump/ExtractionItem.h"
Expand All @@ -26,95 +28,8 @@ namespace gribjump {
//----------------------------------------------------------------------------------------------------------------------

// Stringify requests and keys alphabetically
namespace
{
/// Serialise a MARS request as a comma-separated list of "key=value" entries,
/// with the keys emitted in alphabetical order so the result does not depend on
/// the order in which the request stores its parameters.
std::string requestToStr(const metkit::mars::MarsRequest& request) {
    std::vector<std::string> sortedKeys = request.params();
    std::sort(sortedKeys.begin(), sortedKeys.end());

    std::stringstream out;
    bool first = true;
    for (const auto& name : sortedKeys) {
        // A comma goes between entries, never before the first one.
        if (!first) {
            out << ",";
        }
        out << name << "=" << request[name];
        first = false;
    }
    return out.str();
}

//----------------------------------------------------------------------------------------------------------------------


/// Flatten callback that appends every request it is invoked with to a
/// caller-owned vector.
///
/// The vector is held by reference, so it must outlive this callback object.
class CollectFlattenedRequests : public metkit::mars::FlattenCallback {
public:
    /// @param flattenedRequests  Destination vector; each request passed to operator() is appended to it.
    explicit CollectFlattenedRequests(std::vector<metkit::mars::MarsRequest>& flattenedRequests) :
        flattenedRequests_(flattenedRequests) {}

    /// Append @p req to the destination vector.
    void operator()(const metkit::mars::MarsRequest& req) override {
        flattenedRequests_.push_back(req);
    }

    std::vector<metkit::mars::MarsRequest>& flattenedRequests_;
};

/// Flatten @p request via metkit's MarsExpension and return the requests
/// delivered to the flatten callback, in the order they were delivered.
/// The base request and each flattened request are written to the library's
/// debug log.
std::vector<metkit::mars::MarsRequest> flattenRequest(const metkit::mars::MarsRequest& request) {

    metkit::mars::MarsExpension expander(false);
    metkit::mars::DummyContext context;

    // The collector appends into `result` as flatten() invokes it.
    std::vector<metkit::mars::MarsRequest> result;
    CollectFlattenedRequests collector(result);
    expander.flatten(context, request, collector);

    LOG_DEBUG_LIB(LibGribJump) << "Base request: " << request << std::endl;

    for (auto it = result.cbegin(); it != result.cend(); ++it) {
        LOG_DEBUG_LIB(LibGribJump) << " Flattened request: " << *it << std::endl;
    }

    return result;
}

// Stringify requests, and flatten if necessary

typedef std::map<metkit::mars::MarsRequest, std::vector<std::string>> flattenedKeys_t;

flattenedKeys_t buildFlatKeys(const ExtractionRequests& requests, bool flatten) {

flattenedKeys_t keymap;

for (const auto& req : requests) {
const metkit::mars::MarsRequest& baseRequest = req.request();
keymap[baseRequest] = std::vector<std::string>();

// Assume baseRequest has cardinality >= 1 and may need to be flattened
if (flatten) {
std::vector<metkit::mars::MarsRequest> flat = flattenRequest(baseRequest);
for (const auto& r : flat) {
keymap[baseRequest].push_back(requestToStr(r));
}
}

// Assume baseRequest has cardinality 1
else {
keymap[baseRequest].push_back(requestToStr(baseRequest));
}

eckit::Log::debug<LibGribJump>() << "Flattened keys for request " << baseRequest << ": " << keymap[baseRequest] << std::endl;
}

return keymap;
}

/// Merge all of @p requests into a single request: start from a copy of the
/// first one and merge() each subsequent request into it. The result is
/// written to the info log before being returned.
///
/// @param requests  Requests to merge; must contain at least one element.
/// @return          The merged request.
metkit::mars::MarsRequest unionRequest(const MarsRequests& requests) {

    // front() on an empty container is undefined behaviour; fail loudly instead.
    ASSERT(requests.size() > 0);

    /// @todo: we should do some check not to merge on keys like class and stream
    metkit::mars::MarsRequest merged = requests.front();
    for (size_t i = 1; i < requests.size(); ++i) {
        merged.merge(requests[i]);
    }

    eckit::Log::info() << "Gribjump: Union request is " << merged << std::endl;

    return merged;
}
namespace {
// ----------------------------------------------------------------------------------------------------------------------

bool isRemote(eckit::URI uri) {
return uri.scheme() == "fdb";
Expand All @@ -127,43 +42,73 @@ Engine::Engine() {}

// Destructor with an empty body; no explicit cleanup is performed here.
Engine::~Engine() {}

ExItemMap Engine::buildKeyToExtractionItem(const ExtractionRequests& requests, bool flatten){
ExItemMap keyToExtractionItem;

flattenedKeys_t flatKeys = buildFlatKeys(requests, flatten); // Map from base request to {flattened keys}

LOG_DEBUG_LIB(LibGribJump) << "Built flat keys" << std::endl;
metkit::mars::MarsRequest Engine::buildRequestMap(ExtractionRequests& requests, ExItemMap& keyToExtractionItem ){
// Split strings into one unified map
// We also canonicalise the requests such that their keys are in alphabetical order
/// @todo: Note that it is not in general possible to merge arbitrary requests into a single request. In future, we should look into
/// merging into the minimum number of requests.

std::map<std::string, std::set<std::string>> keyValues;
for (auto& r : requests) {
const std::string& s = r.requestString();
std::vector<std::string> kvs = eckit::StringTools::split(",", s); /// @todo might be faster to use tokenizer directly.
for (auto& kv : kvs) {
std::vector<std::string> kv_s = eckit::StringTools::split("=", kv);
if (kv_s.size() != 2) continue; // ignore verb
keyValues[kv_s[0]].insert(kv_s[1]);
}

// Create the 1-to-1 map
for (size_t i = 0; i < requests.size(); i++) {
const metkit::mars::MarsRequest& basereq = requests[i].request();
const std::vector<std::string> keys = flatKeys[basereq];
for (const auto& key : keys) {
ASSERT(keyToExtractionItem.find(key) == keyToExtractionItem.end()); /// @todo support duplicated requests?
auto extractionItem = std::make_unique<ExtractionItem>(basereq, requests[i].ranges());
extractionItem->gridHash(requests[i].gridHash());
keyToExtractionItem.emplace(key, std::move(extractionItem)); // 1-to-1-map
// Canonicalise string by sorting keys
std::sort(kvs.begin(), kvs.end());
std::string canonicalised = "";
for (auto& kv : kvs) {
canonicalised += kv;
if (kv != kvs.back()) {
canonicalised += ",";
}
}
ASSERT(keyToExtractionItem.find(canonicalised) == keyToExtractionItem.end()); // no repeats
r.requestString(canonicalised);
auto extractionItem = std::make_unique<ExtractionItem>(canonicalised, r.ranges());
extractionItem->gridHash(r.gridHash());
keyToExtractionItem.emplace(canonicalised, std::move(extractionItem)); // 1-to-1-map
}

return keyToExtractionItem;
}

filemap_t Engine::buildFileMap(const ExtractionRequests& requests, ExItemMap& keyToExtractionItem) {
// Map files to ExtractionItem
eckit::Timer timer("Gribjump Engine: Building file map");

std::vector<metkit::mars::MarsRequest> marsrequests;
for (const auto& req : requests) {
marsrequests.push_back(req.request());
// Construct the union request

std::string result = "retrieve,";
size_t i = 0;
for (auto& [key, values] : keyValues) {
result += key + "=";
if (values.size() == 1) {
result += *values.begin();
} else {
size_t j = 0;
for (auto& value : values) {
result += value;
if (j != values.size() - 1) {
result += "/";
}
j++;
}
}
if (i != keyValues.size() - 1) {
result += ",";
}
i++;
}

const metkit::mars::MarsRequest req = unionRequest(marsrequests);
MetricsManager::instance().set("union_request", req.asString());
timer.reset("Gribjump Engine: Flattened requests and constructed union request");
std::istringstream istream(result);
metkit::mars::MarsParser parser(istream);
std::vector<metkit::mars::MarsParsedRequest> unionRequests = parser.parse();
ASSERT(unionRequests.size() == 1);

filemap_t filemap = FDBLister::instance().fileMap(req, keyToExtractionItem);
return unionRequests[0];
}

/// Map files to ExtractionItems by delegating to the FDBLister singleton.
///
/// @param unionrequest         Request passed through to FDBLister::fileMap.
/// @param keyToExtractionItem  Key-to-ExtractionItem map passed through to FDBLister::fileMap.
/// @return                     The file map produced by the lister.
filemap_t Engine::buildFileMap(const metkit::mars::MarsRequest& unionrequest, ExItemMap& keyToExtractionItem) {
    return FDBLister::instance().fileMap(unionrequest, keyToExtractionItem);
}

Expand All @@ -174,10 +119,11 @@ void Engine::forwardRemoteExtraction(filemap_t& filemap) {
const std::map<std::string, std::string>& servermap_str = LibGribJump::instance().config().serverMap();
ASSERT(!servermap_str.empty());

for (auto& [fdb, gj] : servermap_str) {
LOG_DEBUG_LIB(LibGribJump) << "Servermap: " << fdb << " -> " << gj << std::endl;
if (LibGribJump::instance().debug()) {
for (auto& [fdb, gj] : servermap_str) {
LOG_DEBUG_LIB(LibGribJump) << "Servermap: " << fdb << " -> " << gj << std::endl;
}
}

std::unordered_map<eckit::net::Endpoint, eckit::net::Endpoint> servermap;
for (auto& [fdb, gj] : servermap_str) {
eckit::net::Endpoint fdbEndpoint(fdb);
Expand Down Expand Up @@ -247,11 +193,14 @@ void Engine::scheduleTasks(filemap_t& filemap){
taskGroup_.waitForTasks();
}

ResultsMap Engine::extract(const ExtractionRequests& requests, bool flatten) {
ResultsMap Engine::extract(ExtractionRequests& requests) {

eckit::Timer timer("Engine::extract");
ExItemMap keyToExtractionItem = buildKeyToExtractionItem(requests, flatten); // Owns the ExtractionItems
filemap_t filemap = buildFileMap(requests, keyToExtractionItem);

ExItemMap keyToExtractionItem;
metkit::mars::MarsRequest unionreq = buildRequestMap(requests, keyToExtractionItem);

filemap_t filemap = buildFileMap(unionreq, keyToExtractionItem);
MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed());
timer.reset("Gribjump Engine: Built file map");

Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Engine {
Engine();
~Engine();

ResultsMap extract(const ExtractionRequests& requests, bool flattenRequests = false);
ResultsMap extract(ExtractionRequests& requests);

// byfiles: scan entire file, not just fields matching request
size_t scan(const MarsRequests& requests, bool byfiles = false);
Expand All @@ -43,10 +43,10 @@ class Engine {

private:

filemap_t buildFileMap(const ExtractionRequests& requests, ExItemMap& keyToExtractionItem);
ExItemMap buildKeyToExtractionItem(const ExtractionRequests& requests, bool flatten);
filemap_t buildFileMap(const metkit::mars::MarsRequest& unionrequest, ExItemMap& keyToExtractionItem);
ResultsMap collectResults(ExItemMap& keyToExtractionItem);
void forwardRemoteExtraction(filemap_t& filemap);
metkit::mars::MarsRequest buildRequestMap(ExtractionRequests& requests, ExItemMap& keyToExtractionItem );

private:

Expand Down
31 changes: 3 additions & 28 deletions src/gribjump/ExtractionData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ std::vector<double> decodeVector(eckit::Stream& s) {
return std::vector<double>(data, data + size);
}

// todo: encodeVectorVector ?

} // namespace

// Default constructor; the body is empty.
ExtractionResult::ExtractionResult() {}
Expand Down Expand Up @@ -120,15 +118,16 @@ eckit::Stream& operator<<(eckit::Stream& s, const ExtractionResult& o) {

//---------------------------------------------------------------------------------------------------------------------

ExtractionRequest::ExtractionRequest(const metkit::mars::MarsRequest& request, const std::vector<Range>& ranges, std::string gridHash):
ExtractionRequest::ExtractionRequest(const std::string& request, const std::vector<Range>& ranges, std::string gridHash):
ranges_(ranges),
request_(request),
gridHash_(gridHash)
{}

// Default constructor; the body is empty, so all members are default-initialised.
ExtractionRequest::ExtractionRequest() {}

ExtractionRequest::ExtractionRequest(eckit::Stream& s) {
request_ = metkit::mars::MarsRequest(s);
s >> request_;
s >> gridHash_;
size_t numRanges;
s >> numRanges;
Expand All @@ -139,30 +138,6 @@ ExtractionRequest::ExtractionRequest(eckit::Stream& s) {
}
}

std::vector<ExtractionRequest> ExtractionRequest::split(const std::string& key) const {

std::vector<metkit::mars::MarsRequest> reqs = request_.split(key);

std::vector<ExtractionRequest> requests;
requests.reserve(reqs.size());
for (auto& r : reqs) {
requests.push_back(ExtractionRequest(r, ranges_));
}
return requests;
}

std::vector<ExtractionRequest> ExtractionRequest::split(const std::vector<std::string>& keys) const {

std::vector<metkit::mars::MarsRequest> reqs = request_.split(keys);

std::vector<ExtractionRequest> requests;
requests.reserve(reqs.size());
for (auto& r : reqs) {
requests.push_back(ExtractionRequest(r, ranges_));
}
return requests;
}

eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) {
o.encode(s);
return s;
Expand Down
12 changes: 4 additions & 8 deletions src/gribjump/ExtractionData.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ namespace gribjump {

//----------------------------------------------------------------------------------------------------------------------


/// @todo This class is now redundant thanks to ExtractionItem.

class ExtractionResult {
public: // methods

Expand Down Expand Up @@ -77,13 +74,12 @@ class ExtractionRequest {
public: // methods

ExtractionRequest();
ExtractionRequest(const metkit::mars::MarsRequest&, const std::vector<Range>&, std::string gridHash="");
ExtractionRequest(const std::string&, const std::vector<Range>&, std::string gridHash="");
explicit ExtractionRequest(eckit::Stream& s);

std::vector<ExtractionRequest> split(const std::vector<std::string>& keys) const;
std::vector<ExtractionRequest> split(const std::string& key) const;
const std::vector<Range>& ranges() const {return ranges_;}
const metkit::mars::MarsRequest& request() const {return request_;}
const std::string& requestString() const {return request_;}
void requestString(const std::string& s) {request_ = s;}
const std::string& gridHash() const {return gridHash_;}

private: // methods
Expand All @@ -94,7 +90,7 @@ class ExtractionRequest {

private: // members
std::vector<Range> ranges_;
metkit::mars::MarsRequest request_;
std::string request_;
std::string gridHash_;
};

Expand Down
Loading

0 comments on commit da1ba0a

Please sign in to comment.