Skip to content

Commit 82a8e1e

Browse files
authored
extend mempool (#112)
* extend mempool * fix write perf issue * add autoincrease arg * minor change
1 parent 493cbdb commit 82a8e1e

File tree

7 files changed

+93
-15
lines changed

7 files changed

+93
-15
lines changed

infinistore/lib.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class ServerConfig:
8686
prealloc_size (int): The preallocation size. Defaults to 16.
8787
minimal_allocate_size (int): The minimal allocation size. Defaults to 64.
8888
num_stream (int): The number of streams. Defaults to 1.
89+
auto_increase (bool): indicate if infinistore will be automatically increased. 10GB each time. Default False.
8990
"""
9091

9192
def __init__(self, **kwargs):
@@ -99,6 +100,7 @@ def __init__(self, **kwargs):
99100
self.prealloc_size = kwargs.get("prealloc_size", 16)
100101
self.minimal_allocate_size = kwargs.get("minimal_allocate_size", 64)
101102
self.num_stream = kwargs.get("num_stream", 1)
103+
self.auto_increase = kwargs.get("auto_increase", False)
102104

103105
def __repr__(self):
104106
return (

infinistore/server.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ def check_p2p_access():
4444

4545
def parse_args():
4646
parser = argparse.ArgumentParser()
47+
parser.add_argument(
48+
"--auto-increase",
49+
required=False,
50+
action="store_true",
51+
help="increase allocated memory automatically, 10GB each time, default False",
52+
)
4753
parser.add_argument(
4854
"--host",
4955
required=False,
@@ -144,6 +150,7 @@ def main():
144150
link_type=args.link_type,
145151
minimal_allocate_size=args.minimal_allocate_size,
146152
num_stream=args.num_stream,
153+
auto_increase=args.auto_increase,
147154
)
148155
config.verify()
149156
# check_p2p_access()

src/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ typedef struct ServerConfig {
1919
std::string link_type;
2020
int minimal_allocate_size; // unit: KB
2121
int num_stream; // can only be 1,2,4, number of stream for each client
22+
bool auto_increase;
2223
} server_config_t;
2324

2425
typedef struct ClientConfig {

src/infinistore.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ uint8_t ib_port = -1;
4343
// local active_mtu attr, after exchanging with remote, we will use the min of the two for path.mtu
4444
ibv_mtu active_mtu;
4545

46+
// indicate if the MM extend is in flight
47+
bool extend_in_flight = false;
48+
// indicate the number of cudaIpcOpenMemHandle
49+
std::atomic<unsigned int> opened_ipc{0};
50+
4651
// PTR is shared by kv_map and inflight_rdma_kv_map
4752
class PTR : public IntrusivePtrTarget {
4853
public:
@@ -587,6 +592,7 @@ int Client::read_cache(const LocalMetaRequest *meta_req) {
587592

588593
cudaIpcMemHandle_t ipc_handle = *(cudaIpcMemHandle_t *)meta_req->ipc_handle()->data();
589594
CHECK_CUDA(cudaIpcOpenMemHandle(&d_ptr, ipc_handle, cudaIpcMemLazyEnablePeerAccess));
595+
opened_ipc++;
590596

591597
size_t block_size = meta_req->block_size();
592598
int idx = 0;
@@ -674,6 +680,7 @@ void Client::wait_for_ipc_close(std::shared_ptr<CudaTaskQueue> cuda_task_queue)
674680
CHECK_CUDA(cudaEventDestroy(task->event));
675681
CHECK_CUDA(cudaStreamDestroy(task->stream));
676682
CHECK_CUDA(cudaIpcCloseMemHandle(task->d_ptr));
683+
opened_ipc--;
677684
DEBUG("CUDA_TASK done");
678685

679686
if (task->type == CUDA_WRITE) {
@@ -690,6 +697,19 @@ void Client::wait_for_ipc_close(std::shared_ptr<CudaTaskQueue> cuda_task_queue)
690697
INFO("quit the waiting_for_ipc_close thread");
691698
}
692699

700+
void add_mempool(uv_work_t *req) {
701+
while (opened_ipc > 0) {
702+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
703+
}
704+
mm->add_mempool(pd);
705+
}
706+
707+
void add_mempool_completion(uv_work_t *req, int status) {
708+
extend_in_flight = false;
709+
mm->need_extend = false;
710+
delete req;
711+
}
712+
693713
int Client::write_cache(const LocalMetaRequest *meta_req) {
694714
INFO("do write_cache..., num of blocks: {}, stream num {}", meta_req->blocks()->size(),
695715
global_config.num_stream);
@@ -702,6 +722,7 @@ int Client::write_cache(const LocalMetaRequest *meta_req) {
702722
CHECK_CUDA(cudaSetDevice(meta_req->device()));
703723

704724
CHECK_CUDA(cudaIpcOpenMemHandle(&d_ptr, ipc_handle, cudaIpcMemLazyEnablePeerAccess));
725+
opened_ipc++;
705726

706727
int key_idx = 0;
707728
size_t block_size = meta_req->block_size();
@@ -743,6 +764,12 @@ int Client::write_cache(const LocalMetaRequest *meta_req) {
743764
task->ptrs.push_back(ptr);
744765
key_idx++;
745766
});
767+
if (global_config.auto_increase && mm->need_extend && !extend_in_flight) {
768+
INFO("Extend another mempool");
769+
uv_work_t *req = new uv_work_t();
770+
uv_queue_work(loop, req, add_mempool, add_mempool_completion);
771+
extend_in_flight = true;
772+
}
746773

747774
CHECK_CUDA(cudaEventRecord(event, cuda_stream));
748775

src/mempool.cpp

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ MemoryPool::MemoryPool(size_t pool_size, size_t block_size, struct ibv_pd* pd)
1616
block_size_(block_size),
1717
pd_(pd),
1818
mr_(nullptr),
19-
last_search_position_(0) {
19+
last_search_position_(0),
20+
allocated_blocks_(0) {
2021
// 计算总的内存块数量
2122
total_blocks_ = pool_size_ / block_size_;
2223
assert(pool_size % block_size == 0);
@@ -53,28 +54,32 @@ MemoryPool::~MemoryPool() {
5354
}
5455
}
5556

56-
bool MemoryPool::allocate(size_t size, size_t n, SimpleAllocationCallback callback) {
57+
int MemoryPool::allocate(size_t size, size_t n, SimpleAllocationCallback callback) {
5758
size_t required_blocks = (size + block_size_ - 1) / block_size_; // round up
59+
int num_allocated = 0;
5860

5961
if (required_blocks > total_blocks_) {
60-
return false;
62+
return 0;
6163
}
6264

63-
int num_allocated = 0;
6465
size_t bit_per_word = 64;
6566
size_t shift = 6;
6667

6768
for (size_t word_index = last_search_position_; word_index < bitmap_.size(); ++word_index) {
69+
if (num_allocated == n) {
70+
break;
71+
}
72+
6873
uint64_t word = bitmap_[word_index];
6974
if (word == 0xFFFFFFFFFFFFFFFFULL) {
7075
continue;
7176
}
72-
7377
for (size_t bit_index = __builtin_ctzll(~word); bit_index < bit_per_word; ++bit_index) {
7478
size_t start_block = (word_index << shift) + bit_index;
7579

7680
if (start_block + required_blocks > total_blocks_) {
77-
return false;
81+
allocated_blocks_ += num_allocated * required_blocks;
82+
return num_allocated;
7883
}
7984

8085
bool found = true;
@@ -98,13 +103,14 @@ bool MemoryPool::allocate(size_t size, size_t n, SimpleAllocationCallback callba
98103
callback(addr, mr_->lkey, mr_->rkey);
99104
last_search_position_ = word_index;
100105
if (++num_allocated == n) {
101-
return true;
106+
break;
102107
}
103108
}
104109
}
105110
}
106111

107-
return num_allocated == n;
112+
allocated_blocks_ += num_allocated * required_blocks;
113+
return num_allocated;
108114
}
109115

110116
void MemoryPool::deallocate(void* ptr, size_t size) {
@@ -142,18 +148,42 @@ void MemoryPool::deallocate(void* ptr, size_t size) {
142148
}
143149
}
144150

151+
void MM::add_mempool(struct ibv_pd* pd) {
152+
mempools_.push_back(new MemoryPool((size_t)EXTEND_POOL_SIZE, (size_t)EXTEND_BLOCK_SIZE, pd));
153+
}
154+
155+
void MM::add_mempool(size_t pool_size, size_t block_size, struct ibv_pd* pd) {
156+
mempools_.push_back(new MemoryPool(pool_size, block_size, pd));
157+
}
158+
145159
bool MM::allocate(size_t size, size_t n, AllocationCallback callback) {
146-
for (int i = 0; i < mempools_.size(); ++i) {
160+
bool allocated = false;
161+
int mempool_cnt = mempools_.size();
162+
for (int i = 0; i < mempool_cnt; ++i) {
147163
// create a new callback from the original callback
148164
auto simple_callback = [callback, i](void* ptr, uint32_t lkey, uint32_t rkey) {
149165
callback(ptr, lkey, rkey, i);
150166
};
151167

152-
if (mempools_[i]->allocate(size, n, simple_callback)) {
153-
return true;
168+
int num_allocated = mempools_[i]->allocate(size, n, simple_callback);
169+
n -= num_allocated;
170+
171+
auto total_blocks = mempools_[i]->get_total_blocks();
172+
auto allocated_blocks = mempools_[i]->get_allocated_blocks();
173+
DEBUG(
174+
"Mempool Count: {}, Pool idx: {}, Total blocks: {}, allocated blocks: {}, block usage: "
175+
"{}%",
176+
mempool_cnt, i, total_blocks, allocated_blocks, 100 * allocated_blocks / total_blocks);
177+
if (i == mempools_.size() - 1 &&
178+
(float)allocated_blocks / total_blocks > BLOCK_USAGE_RATIO) {
179+
need_extend = true;
180+
}
181+
if (n == 0) {
182+
allocated = true;
183+
break;
154184
}
155185
}
156-
return false;
186+
return allocated;
157187
}
158188

159189
void MM::deallocate(void* ptr, size_t size, int pool_idx) {

src/mempool.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
#include <functional>
1111
#include <vector>
1212

13+
#define BLOCK_USAGE_RATIO 0.5
14+
#define EXTEND_POOL_SIZE 10 << 30
15+
#define EXTEND_BLOCK_SIZE 64 << 10
16+
1317
using AllocationCallback =
1418
std::function<void(void* ptr, uint32_t lkey, uint32_t rkey, int pool_idx)>;
1519
using SimpleAllocationCallback = std::function<void(void* ptr, uint32_t lkey, uint32_t rkey)>;
@@ -23,21 +27,24 @@ class MemoryPool {
2327
/*
2428
@brief size should be aligned to block size
2529
*/
26-
bool allocate(size_t size, size_t n, SimpleAllocationCallback callback);
30+
int allocate(size_t size, size_t n, SimpleAllocationCallback callback);
2731
/*
2832
@brief size should be aligned to block size
2933
*/
3034
void deallocate(void* ptr, size_t size);
3135

3236
uint32_t get_lkey() const { return mr_->lkey; }
3337
uint32_t get_rkey() const { return mr_->rkey; }
38+
uint32_t get_total_blocks() const { return total_blocks_; }
39+
uint32_t get_allocated_blocks() const { return allocated_blocks_; }
3440

3541
private:
3642
void* pool_;
3743
size_t pool_size_;
3844
size_t block_size_;
3945
size_t total_blocks_;
4046
size_t last_search_position_;
47+
size_t allocated_blocks_;
4148

4249
// TODO: use judy libray to speed up the bitmap?
4350
std::vector<uint64_t> bitmap_;
@@ -52,9 +59,12 @@ class MM {
5259

5360
public:
5461
MM(size_t pool_size, size_t block_size, struct ibv_pd* pd) {
55-
mempools_.push_back(new MemoryPool(pool_size, block_size, pd));
62+
add_mempool(pool_size, block_size, pd);
5663
}
5764
MM(const MM& mm) = delete;
65+
bool need_extend = false;
66+
void add_mempool(struct ibv_pd* pd);
67+
void add_mempool(size_t pool_size, size_t block_size, struct ibv_pd* pd);
5868
bool allocate(size_t size, size_t n, AllocationCallback callback);
5969
void deallocate(void* ptr, size_t size, int pool_idx);
6070
uint32_t get_lkey(int pool_idx) const {

src/pybind.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,8 @@ PYBIND11_MODULE(_infinistore, m) {
120120
.def_readwrite("link_type", &ServerConfig::link_type)
121121
.def_readwrite("prealloc_size", &ServerConfig::prealloc_size)
122122
.def_readwrite("minimal_allocate_size", &ServerConfig::minimal_allocate_size)
123-
.def_readwrite("num_stream", &ServerConfig::num_stream);
123+
.def_readwrite("num_stream", &ServerConfig::num_stream)
124+
.def_readwrite("auto_increase", &ServerConfig::auto_increase);
124125
m.def("get_kvmap_len", &get_kvmap_len, "get kv map size");
125126
m.def("register_server", &register_server, "register the server");
126127

0 commit comments

Comments
 (0)