Skip to content

Commit b697b79

Browse files
authored
Do snapshot fsync on UV threadpool (#7035)
1 parent d950184 commit b697b79

File tree

2 files changed

+85
-29
lines changed

2 files changed

+85
-29
lines changed

src/host/test/ledger.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "ds/files.h"
99
#include "ds/serialized.h"
1010
#include "kv/serialised_entry_format.h"
11+
#define TEST_MODE_EXECUTE_SYNC_INLINE
1112
#include "snapshots/snapshot_manager.h"
1213

1314
#define DOCTEST_CONFIG_IMPLEMENT

src/snapshots/snapshot_manager.h

Lines changed: 84 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,62 @@ namespace snapshots
8181
return snapshot;
8282
}
8383

84+
// Evaluates the call expression `x` exactly once and throws
// std::runtime_error if it returned -1. The message contains strerror(errno),
// the snapshot `name`, and the source text of `x`.
//
// The stringified call is passed to fmt::format as an argument instead of
// being spliced into the format string, so any '{' or '}' appearing in `x`
// cannot be misread as a replacement field. errno is captured before any
// other argument is evaluated so that nothing can overwrite it first.
#define THROW_ON_ERROR(x, name) \
  do \
  { \
    const auto rc = (x); \
    if (rc == -1) \
    { \
      const int saved_errno = errno; \
      throw std::runtime_error(fmt::format( \
        "Error ({}) writing snapshot {} in {}", \
        strerror(saved_errno), \
        name, \
        #x)); \
    } \
  } while (0)
94+
95+
// Work item passed to the snapshot sync-and-rename callbacks through
// uv_work_t::data.
struct AsyncSnapshotSyncAndRename
{
  // ---- Inputs: fixed when the work item is created ----

  // Directory holding the snapshot file
  const std::filesystem::path dir;

  // Name of the temporary (not yet committed) snapshot file inside dir
  const std::string tmp_file_name;

  // Descriptor of that file; the worker callback fsyncs and closes it
  const int snapshot_fd;

  // ---- Outputs: filled in by the worker callback ----

  // Name the snapshot is renamed to: tmp_file_name plus the committed suffix
  std::string committed_file_name{};
};
105+
106+
static void on_snapshot_sync_and_rename(uv_work_t* req)
107+
{
108+
auto data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
109+
110+
{
111+
asynchost::TimeBoundLogger log_if_slow(
112+
fmt::format("Committing snapshot - fsync({})", data->tmp_file_name));
113+
fsync(data->snapshot_fd);
114+
}
115+
116+
close(data->snapshot_fd);
117+
118+
// e.g. snapshot_100_105.committed
119+
data->committed_file_name =
120+
fmt::format("{}{}", data->tmp_file_name, snapshot_committed_suffix);
121+
const auto full_committed_path = data->dir / data->committed_file_name;
122+
123+
const auto full_tmp_path = data->dir / data->tmp_file_name;
124+
files::rename(full_tmp_path, full_committed_path);
125+
}
126+
127+
static void on_snapshot_sync_and_rename_complete(uv_work_t* req, int status)
128+
{
129+
auto data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
130+
131+
LOG_INFO_FMT(
132+
"Renamed temporary snapshot {} to {}",
133+
data->tmp_file_name,
134+
data->committed_file_name);
135+
136+
delete data;
137+
delete req;
138+
}
139+
84140
void commit_snapshot(
85141
::consensus::Index snapshot_idx,
86142
const uint8_t* receipt_data,
@@ -130,42 +186,40 @@ namespace snapshots
130186
{
131187
const auto& snapshot = it->second.snapshot;
132188

133-
#define THROW_ON_ERROR(x) \
134-
do \
135-
{ \
136-
auto rc = x; \
137-
if (rc == -1) \
138-
{ \
139-
throw std::runtime_error(fmt::format( \
140-
"Error ({}) writing snapshot {} in " #x, errno, file_name)); \
141-
} \
142-
} while (0)
143-
144189
THROW_ON_ERROR(
145-
write(snapshot_fd, snapshot->data(), snapshot->size()));
146-
THROW_ON_ERROR(write(snapshot_fd, receipt_data, receipt_size));
147-
148-
THROW_ON_ERROR(fsync(snapshot_fd));
149-
THROW_ON_ERROR(close(snapshot_fd));
150-
151-
#undef THROW_ON_ERROR
190+
write(snapshot_fd, snapshot->data(), snapshot->size()),
191+
file_name);
192+
THROW_ON_ERROR(
193+
write(snapshot_fd, receipt_data, receipt_size), file_name);
152194

153195
LOG_INFO_FMT(
154-
"New snapshot file written to {} [{} bytes]",
196+
"New snapshot file written to {} [{} bytes] (unsynced)",
155197
file_name,
156198
snapshot->size() + receipt_size);
157199

158-
// e.g. snapshot_100_105.committed
159-
const auto committed_file_name =
160-
fmt::format("{}{}", file_name, snapshot_committed_suffix);
161-
const auto full_committed_path =
162-
snapshot_dir / committed_file_name;
200+
// Call fsync and rename on a worker-thread via uv async, as they
201+
// may be slow
202+
uv_work_t* work_handle = new uv_work_t;
163203

164-
files::rename(full_snapshot_path, full_committed_path);
165-
LOG_INFO_FMT(
166-
"Renamed temporary snapshot {} to committed {}",
167-
file_name,
168-
committed_file_name);
204+
{
205+
auto* data = new AsyncSnapshotSyncAndRename{
206+
.dir = snapshot_dir,
207+
.tmp_file_name = file_name,
208+
.snapshot_fd = snapshot_fd};
209+
210+
work_handle->data = data;
211+
}
212+
213+
#ifdef TEST_MODE_EXECUTE_SYNC_INLINE
214+
on_snapshot_sync_and_rename(work_handle);
215+
on_snapshot_sync_and_rename_complete(work_handle, 0);
216+
#else
217+
uv_queue_work(
218+
uv_default_loop(),
219+
work_handle,
220+
&on_snapshot_sync_and_rename,
221+
&on_snapshot_sync_and_rename_complete);
222+
#endif
169223
}
170224

171225
pending_snapshots.erase(it);
@@ -184,6 +238,7 @@ namespace snapshots
184238
e.what());
185239
}
186240
}
241+
#undef THROW_ON_ERROR
187242

188243
std::optional<std::pair<fs::path, fs::path>>
189244
find_latest_committed_snapshot()

0 commit comments

Comments
 (0)