Skip to content

Commit

Permalink
Merge pull request #10 from ecmwf/feature/lru-cache
Browse files Browse the repository at this point in the history
Feature/lru cache
  • Loading branch information
ChrisspyB authored Sep 10, 2024
2 parents 151354d + be75c5d commit 7c5a3fe
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 42 deletions.
1 change: 1 addition & 0 deletions src/gribjump/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ if( HAVE_GRIBJUMP_LOCAL_EXTRACT )
info/InfoAggregator.cc
info/InfoCache.cc
info/InfoCache.h
info/LRUCache.h
info/UnsupportedInfo.h
info/UnsupportedInfo.cc

Expand Down
71 changes: 34 additions & 37 deletions src/gribjump/info/InfoCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ InfoCache& InfoCache::instance() {
InfoCache::~InfoCache() {
}

InfoCache::InfoCache(): cacheDir_(eckit::PathName()) {
InfoCache::InfoCache():
cacheDir_(eckit::PathName()),
cache_(eckit::Resource<int>("gribjumpCacheSize", LibGribJump::instance().config().getInt("cache.size", 64))) {

const Config& config = LibGribJump::instance().config();

Expand Down Expand Up @@ -78,18 +80,21 @@ eckit::PathName InfoCache::cacheFilePath(const eckit::PathName& path) const {
return cacheDir_ / path.baseName() + file_ext;
}

FileCache& InfoCache::getFileCache(const eckit::PathName& path, bool load) {
std::shared_ptr<FileCache> InfoCache::getFileCache(const eckit::PathName& path, bool load) {
std::lock_guard<std::mutex> lock(mutex_);

const filename_t& f = path.baseName();
auto it = cache_.find(f);
if(it != cache_.end()) return *(it->second);


if (cache_.exists(f)) {
return cache_.get(f);
}

eckit::PathName cachePath = cacheFilePath(path);
LOG_DEBUG_LIB(LibGribJump) << "New InfoCache entry for file " << f << " at " << cachePath << std::endl;

std::unique_ptr<FileCache> filecache(new FileCache(cachePath, load));
cache_.insert(std::make_pair(f, std::move(filecache)));
return *cache_[f];
auto filecache = std::make_shared<FileCache>(cachePath, load);
cache_.put(f, filecache);
return filecache;
}

std::shared_ptr<JumpInfo> InfoCache::get(const eckit::URI& uri) {
Expand All @@ -102,12 +107,12 @@ std::shared_ptr<JumpInfo> InfoCache::get(const eckit::URI& uri) {

std::shared_ptr<JumpInfo> InfoCache::get(const eckit::PathName& path, const eckit::Offset offset) {

FileCache& filecache = getFileCache(path);
filecache.load();
std::shared_ptr<FileCache> filecache = getFileCache(path);
filecache->load();

// return it if in memory cache
{
std::shared_ptr<JumpInfo> info = filecache.find(offset);
std::shared_ptr<JumpInfo> info = filecache->find(offset);
if (info) return info;

LOG_DEBUG_LIB(LibGribJump) << "InfoCache file " << path << " does not contain JumpInfo for field at offset " << offset << std::endl;
Expand All @@ -119,20 +124,20 @@ std::shared_ptr<JumpInfo> InfoCache::get(const eckit::PathName& path, const ecki
InfoExtractor extractor;
std::shared_ptr<JumpInfo> info = extractor.extract(path, offset);

filecache.insert(offset, info);
filecache->insert(offset, info);

return info;
}

std::vector<std::shared_ptr<JumpInfo>> InfoCache::get(const eckit::PathName& path, const eckit::OffsetList& offsets) {

FileCache& filecache = getFileCache(path);
filecache.load();
std::shared_ptr<FileCache> filecache = getFileCache(path);
filecache->load();

std::vector<eckit::Offset> missingOffsets;

for (const auto& offset : offsets) {
if (!filecache.find(offset)) {
if (!filecache->find(offset)) {
missingOffsets.push_back(offset);
}
}
Expand All @@ -144,14 +149,14 @@ std::vector<std::shared_ptr<JumpInfo>> InfoCache::get(const eckit::PathName& pat

std::vector<std::unique_ptr<JumpInfo>> infos = extractor.extract(path, missingOffsets);
for (size_t i = 0; i < infos.size(); i++) {
filecache.insert(missingOffsets[i], std::move(infos[i]));
filecache->insert(missingOffsets[i], std::move(infos[i]));
}
}

std::vector<std::shared_ptr<JumpInfo>> result;

for (const auto& offset : offsets) {
std::shared_ptr<JumpInfo> info = filecache.find(offset);
std::shared_ptr<JumpInfo> info = filecache->find(offset);
ASSERT(info);
result.push_back(info);
}
Expand All @@ -167,17 +172,10 @@ std::vector<std::shared_ptr<JumpInfo>> InfoCache::get(const eckit::PathName& pat

void InfoCache::insert(const eckit::PathName& path, const eckit::Offset offset, std::shared_ptr<JumpInfo> info) {
LOG_DEBUG_LIB(LibGribJump) << "GribJumpCache inserting " << path << ":" << offset << std::endl;
FileCache& filecache = getFileCache(path, false);
filecache.insert(offset, info);
std::shared_ptr<FileCache> filecache = getFileCache(path, false);
filecache->insert(offset, info);
}


// void InfoCache::insert(const eckit::PathName& path, std::vector<std::shared_ptr<JumpInfo>> infos) {
// LOG_DEBUG_LIB(LibGribJump) << "GribJumpCache inserting " << path << "" << infos.size() << " fields" << std::endl;
// FileCache& filecache = getFileCache(path, false);
// filecache.insert(infos);
// }

void InfoCache::flush(bool append) {
std::lock_guard<std::mutex> lock(mutex_);
for (auto& [filename, filecache] : cache_) {
Expand All @@ -199,14 +197,14 @@ void InfoCache::scan(const eckit::PathName& fdbpath, const std::vector<eckit::Of


// if cache exists load so we can merge with memory cache
FileCache& filecache = getFileCache(fdbpath);
filecache.load();
std::shared_ptr<FileCache> filecache = getFileCache(fdbpath);
filecache->load();

// Find which offsets are not already in file cache
std::vector<eckit::Offset> newOffsets;

for (const auto& offset : offsets) {
if(!filecache.find(offset)) {
if(!filecache->find(offset)) {
newOffsets.push_back(offset);
}
}
Expand All @@ -223,34 +221,33 @@ void InfoCache::scan(const eckit::PathName& fdbpath, const std::vector<eckit::Of
// std::vector<std::shared_ptr<JumpInfo>> infos;
// infos.reserve(uinfos.size());
// std::move(uinfos.begin(), uinfos.end(), std::back_inserter(infos));
// filecache.insert(infos);
// filecache->insert(infos);
for (size_t i = 0; i < uinfos.size(); i++) {
filecache.insert(newOffsets[i], std::move(uinfos[i]));
filecache->insert(newOffsets[i], std::move(uinfos[i]));
}

if (persistentCache_) {
filecache.write();
filecache->write();
}

}

void InfoCache::scan(const eckit::PathName& fdbpath) {


LOG_DEBUG_LIB(LibGribJump) << "Scanning whole file " << fdbpath << std::endl;

// if cache exists load so we can merge with memory cache
FileCache& filecache = getFileCache(fdbpath);
filecache.load();
std::shared_ptr<FileCache> filecache = getFileCache(fdbpath);
filecache->load();

InfoExtractor extractor;
std::vector<std::pair<eckit::Offset, std::unique_ptr<JumpInfo>>> uinfos = extractor.extract(fdbpath); /* This needs to give use the offsets too*/
for (size_t i = 0; i < uinfos.size(); i++) {
filecache.insert(uinfos[i].first, std::move(uinfos[i].second));
filecache->insert(uinfos[i].first, std::move(uinfos[i].second));
}

if (persistentCache_) {
filecache.write();
filecache->write();
}

}
Expand Down
9 changes: 4 additions & 5 deletions src/gribjump/info/InfoCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include "eckit/filesystem/URI.h"
#include "eckit/serialisation/FileStream.h"


#include "gribjump/info/JumpInfo.h"
#include "gribjump/info/LRUCache.h"
#include "gribjump/LibGribJump.h"

namespace gribjump {
Expand All @@ -30,8 +30,8 @@ class InfoCache {

private: // types

using filename_t = std::string; //< key is fieldlocation's path basename
using cache_t = std::map<filename_t, std::unique_ptr<FileCache>>; //< map fieldlocation's to gribinfo
using filename_t = std::string; //< key is fieldlocation's path basename
using cache_t = LRUCache<filename_t, std::shared_ptr<FileCache>>; //< map fieldlocation's to gribinfo

public:

Expand Down Expand Up @@ -61,14 +61,13 @@ class InfoCache {

void print(std::ostream& s) const;

FileCache& getFileCache(const eckit::PathName& f, bool load=true);

private: // methods

InfoCache();

~InfoCache();

std::shared_ptr<FileCache> getFileCache(const eckit::PathName& f, bool load=true);

eckit::PathName cacheFilePath(const eckit::PathName& path) const;

Expand Down
75 changes: 75 additions & 0 deletions src/gribjump/info/LRUCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* (C) Copyright 2024- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation nor
* does it submit to any jurisdiction.
*/

/// @author Christopher Bradley

#pragma once

#include <algorithm>
#include <cstddef>
#include <list>
#include <unordered_map>

#include "eckit/exception/Exceptions.h"

namespace gribjump {

// Note: not a thread safe container, use an external lock if needed
template <typename K, typename V>
class LRUCache {
public:
LRUCache(size_t capacity) : capacity_(capacity) {}

void put(const K& key, const V& value) {
if (map_.find(key) == map_.end()) {
if (list_.size() == capacity_) {
auto last = list_.back();
list_.pop_back();
map_.erase(last);
}
list_.push_front(key);
} else {
list_.remove(key);
list_.push_front(key);
}
map_[key] = value;
}


V& get(const K& key) {
if (map_.find(key) == map_.end()) {
throw eckit::BadValue("Key does not exist");
}
list_.remove(key);
list_.push_front(key);
return map_[key];
}

bool exists(const K& key) {
return map_.find(key) != map_.end();
}

typename std::unordered_map<K, V>::const_iterator begin() const {
return map_.begin();
}

typename std::unordered_map<K, V>::const_iterator end() const {
return map_.end();
}

void clear() {
list_.clear();
map_.clear();
}

private:
size_t capacity_;
std::list<K> list_;
std::unordered_map<K, V> map_;
};

} // namespace gribjump
9 changes: 9 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ ecbuild_add_test(
LIBS gribjump
)

ecbuild_add_test(
TARGET "gribjump_test_misc_units"
SOURCES "test_misc_units.cc"
INCLUDES "${ECKIT_INCLUDE_DIRS}"
ENVIRONMENT "${gribjump_env}"
NO_AS_NEEDED
LIBS gribjump
)

if (ENABLE_FDB_BUILD_TOOLS)
add_subdirectory(tools)
endif()
68 changes: 68 additions & 0 deletions tests/test_misc_units.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* (C) Copyright 2024- ECMWF.
*
* This software is licensed under the terms of the Apache Licence Version 2.0
* which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
* In applying this licence, ECMWF does not waive the privileges and immunities
* granted to it by virtue of its status as an intergovernmental organisation
* nor does it submit to any jurisdiction.
*/

#include <cmath>
#include <fstream>

#include "eckit/testing/Test.h"
#include "gribjump/info/LRUCache.h"


#include "metkit/mars/MarsParser.h"
#include "metkit/mars/MarsExpension.h"
using namespace eckit::testing;

namespace gribjump {
namespace test {

// Miscellaneous unit tests
//-----------------------------------------------------------------------------
// Exercises LRUCache: insertion, lookup, eviction of the least recently used
// entry once capacity is exceeded, and recency being refreshed by get().
CASE( "test_lru" ){

// Capacity of 3: inserting a fourth distinct key must evict one entry.
LRUCache<std::string, int> cache(3);

// Check basic functionality
cache.put("a", 1);
cache.put("b", 2);
cache.put("c", 3);

// Each get() also refreshes recency; after these three calls the order is
// c, b, a (most to least recently used).
EXPECT(cache.get("a") == 1);
EXPECT(cache.get("b") == 2);
EXPECT(cache.get("c") == 3);

// Cache is full, so adding "d" evicts the least recently used key, "a".
cache.put("d", 4);

EXPECT_THROWS_AS(cache.get("a"), eckit::BadValue);
EXPECT(cache.get("d") == 4);

// Check recency is updated with get
// These three insertions push out the remaining older entries (b, c, d).
cache.put("x", 1);
cache.put("y", 2);
cache.put("z", 3);

// Access in reverse insertion order, leaving "x" most recent and "z" least.
EXPECT(cache.get("z") == 3);
EXPECT(cache.get("y") == 2);
EXPECT(cache.get("x") == 1);

// z should now be the least recently used
cache.put("w", 1);

EXPECT_THROWS_AS(cache.get("z"), eckit::BadValue);
}
//-----------------------------------------------------------------------------

} // namespace test
} // namespace gribjump


/// Test executable entry point: forwards the command-line arguments to
/// run_tests() and uses its result as the process exit status.
int main(int argc, char** argv) {
    const int status = run_tests(argc, argv);
    return status;
}

0 comments on commit 7c5a3fe

Please sign in to comment.