Skip to content

Commit 51ed242

Browse files
authored
[backend] implement pthread backend (#46)
* [backend] init pthread pool * [pthread] make it compilable * [pthread] pass callback f ptr * [pthread] sync api with other backends * [pthread] fetch environ * [pthread] implement sync * [pthread] fix * [pthread] debug hang * [pthread] use c++ 3rd lib thread pool * [pthread] pass test * [chore] remove unintended commit * [chore] refine * [chore] license
1 parent f165590 commit 51ed242

File tree

8 files changed

+1351
-6
lines changed

8 files changed

+1351
-6
lines changed

csrc/backend.cpp

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
#ifndef DISABLE_AIO
1212
#include "aio.h"
1313
#endif
14+
#ifndef DISABLE_PTHREAD
15+
#include "pthread_backend.h"
16+
#endif
1417

1518
std::unordered_set<std::string> get_backends()
1619
{
@@ -20,6 +23,9 @@ std::unordered_set<std::string> get_backends()
2023
#endif
2124
#ifndef DISABLE_AIO
2225
backends.insert("aio");
26+
#endif
27+
#ifndef DISABLE_PTHREAD
28+
backends.insert("pthread");
2329
#endif
2430
return backends;
2531
}
@@ -35,18 +41,27 @@ void probe_asyncio(const std::string &backend)
3541
try
3642
{
3743
std::unique_ptr<AsyncIO> aio;
38-
if (backend == "uring")
44+
if (backend == "uring") {
3945
#ifndef DISABLE_URING
4046
aio.reset(new UringAsyncIO(2));
4147
#else
42-
throw std::runtime_error("backend is not installed\n");
48+
throw std::runtime_error("backend uring is not installed\n");
4349
#endif
44-
else
50+
} else if (backend == "aio") {
4551
#ifndef DISABLE_AIO
4652
aio.reset(new AIOAsyncIO(2));
4753
#else
48-
throw std::runtime_error("backend is not installed\n");
54+
throw std::runtime_error("backend aio is not installed\n");
55+
#endif
56+
} else if (backend == "pthread") {
57+
#ifndef DISABLE_PTHREAD
58+
aio.reset(new PthreadAsyncIO(2));
59+
#else
60+
throw std::runtime_error("backend pthread is not installed\n");
4961
#endif
62+
} else {
63+
throw std::runtime_error("unknown backend");
64+
}
5065

5166
int fd = fileno(fp);
5267
const int n_loop = 5, n_len = 18;
@@ -120,6 +135,10 @@ AsyncIO *create_asyncio(unsigned int n_entries, const std::string &backend)
120135
#ifndef DISABLE_AIO
121136
if (backend == "aio")
122137
return new AIOAsyncIO(n_entries);
138+
#endif
139+
#ifndef DISABLE_PTHREAD
140+
if (backend == "pthread")
141+
return new PthreadAsyncIO(n_entries);
123142
#endif
124143
throw std::runtime_error("Unsupported backend: " + backend);
125144
}

csrc/pthread_backend.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#include "pthread_backend.h"
2+
3+
void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) {
4+
auto fut = this->pool.submit_task(
5+
[fd, buffer, n_bytes, offset] {
6+
return pwrite(fd, buffer, n_bytes, offset);
7+
}
8+
);
9+
this->write_fut.push_back(std::make_tuple(std::move(fut), callback));
10+
}
11+
12+
void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) {
13+
auto fut = this->pool.submit_task(
14+
[fd, iov, iovcnt, offset] {
15+
return pwritev(fd, iov, iovcnt, offset);
16+
}
17+
);
18+
this->write_fut.push_back(std::make_tuple(std::move(fut), callback));
19+
}
20+
21+
void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) {
22+
auto fut = this->pool.submit_task(
23+
[fd, buffer, n_bytes, offset] {
24+
return pread(fd, buffer, n_bytes, offset);
25+
}
26+
);
27+
this->read_fut.push_back(std::make_tuple(std::move(fut), callback));
28+
}
29+
30+
void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) {
31+
auto fut = this->pool.submit_task(
32+
[fd, iov, iovcnt, offset] {
33+
return preadv(fd, iov, iovcnt, offset);
34+
}
35+
);
36+
this->read_fut.push_back(std::make_tuple(std::move(fut), callback));
37+
}
38+
39+
void PthreadAsyncIO::get_event(WaitType wt) {
40+
if (wt == NOWAIT) return;
41+
this->sync_write_events();
42+
this->sync_read_events();
43+
}
44+
45+
void PthreadAsyncIO::sync_write_events() {
46+
while (this->write_fut.size() > 0) {
47+
auto front = std::move(this->write_fut.front());
48+
this->write_fut.pop_front();
49+
50+
auto fut(std::move(std::get<0>(front)));
51+
fut.wait();
52+
53+
auto callback = std::get<1>(front);
54+
if (callback != nullptr) {
55+
callback();
56+
}
57+
}
58+
}
59+
60+
void PthreadAsyncIO::sync_read_events() {
61+
while (this->read_fut.size() > 0) {
62+
auto front = std::move(this->read_fut.front());
63+
this->read_fut.pop_front();
64+
65+
auto fut(std::move(std::get<0>(front)));
66+
fut.wait();
67+
68+
auto callback = std::get<1>(front);
69+
if (callback != nullptr) {
70+
callback();
71+
}
72+
}
73+
}
74+
75+
void PthreadAsyncIO::synchronize() {
76+
this->get_event(WAIT);
77+
}
78+
79+
void PthreadAsyncIO::register_file(int fd) {}

csrc/py_api.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include <pybind11/functional.h>
22
#include <pybind11/pybind11.h>
3+
#include <pybind11/stl.h>
4+
#include <torch/extension.h>
35
#include "offload.h"
46
#include "async_file_io.h"
57
#include "backend.h"

include/offload.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#pragma once
22

3+
#include "asyncio.h"
34
#include <ATen/ATen.h>
5+
46
#include "space_mgr.h"
57
#ifndef DISABLE_URING
68
#include "uring.h"

include/pthread_backend.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#pragma once
2+
3+
#include <stdexcept>
4+
#include <sys/io.h>
5+
#include <sys/uio.h>
6+
#include <unistd.h>
7+
#include <cstdlib>
8+
#include <future>
9+
#include <queue>
10+
#include <tuple>
11+
#include <functional>
12+
13+
#include "asyncio.h"
14+
#include "threadpool.hpp"
15+
16+
17+
class PthreadAsyncIO : public AsyncIO
18+
{
19+
private:
20+
BS::thread_pool pool;
21+
std::deque<std::tuple<std::future<ssize_t>, callback_t>> write_fut;
22+
std::deque<std::tuple<std::future<ssize_t>, callback_t>> read_fut;
23+
24+
public:
25+
PthreadAsyncIO(unsigned int n_entries)
26+
: pool(n_entries) {}
27+
28+
~PthreadAsyncIO() {}
29+
30+
void write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback);
31+
void read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback);
32+
void writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback);
33+
void readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback);
34+
35+
void get_event(WaitType wt);
36+
void sync_write_events();
37+
void sync_read_events();
38+
void synchronize();
39+
40+
void register_file(int fd);
41+
};

0 commit comments

Comments
 (0)