Skip to content

Commit a05980b

Browse files
committed
Do snapshot fsync on UV threadpool (microsoft#7035)
(cherry picked from commit b697b79) # Conflicts: # src/host/test/ledger.cpp
1 parent 18b6d97 commit a05980b

File tree

2 files changed

+85
-29
lines changed

2 files changed

+85
-29
lines changed

src/host/snapshots.h

Lines changed: 84 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,62 @@ namespace asynchost
239239
return snapshot;
240240
}
241241

242+
#define THROW_ON_ERROR(x, name) \
243+
do \
244+
{ \
245+
auto rc = x; \
246+
if (rc == -1) \
247+
{ \
248+
throw std::runtime_error(fmt::format( \
249+
"Error ({}) writing snapshot {} in " #x, strerror(errno), name)); \
250+
} \
251+
} while (0)
252+
253+
struct AsyncSnapshotSyncAndRename
254+
{
255+
// Inputs, populated at construction
256+
const std::filesystem::path dir;
257+
const std::string tmp_file_name;
258+
const int snapshot_fd;
259+
260+
// Outputs, populated by callback
261+
std::string committed_file_name = {};
262+
};
263+
264+
static void on_snapshot_sync_and_rename(uv_work_t* req)
265+
{
266+
auto data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
267+
268+
{
269+
asynchost::TimeBoundLogger log_if_slow(
270+
fmt::format("Committing snapshot - fsync({})", data->tmp_file_name));
271+
fsync(data->snapshot_fd);
272+
}
273+
274+
close(data->snapshot_fd);
275+
276+
// e.g. snapshot_100_105.committed
277+
data->committed_file_name =
278+
fmt::format("{}{}", data->tmp_file_name, snapshot_committed_suffix);
279+
const auto full_committed_path = data->dir / data->committed_file_name;
280+
281+
const auto full_tmp_path = data->dir / data->tmp_file_name;
282+
files::rename(full_tmp_path, full_committed_path);
283+
}
284+
285+
static void on_snapshot_sync_and_rename_complete(uv_work_t* req, int status)
286+
{
287+
auto data = static_cast<AsyncSnapshotSyncAndRename*>(req->data);
288+
289+
LOG_INFO_FMT(
290+
"Renamed temporary snapshot {} to {}",
291+
data->tmp_file_name,
292+
data->committed_file_name);
293+
294+
delete data;
295+
delete req;
296+
}
297+
242298
void commit_snapshot(
243299
::consensus::Index snapshot_idx,
244300
const uint8_t* receipt_data,
@@ -288,42 +344,40 @@ namespace asynchost
288344
{
289345
const auto& snapshot = it->second.snapshot;
290346

291-
#define THROW_ON_ERROR(x) \
292-
do \
293-
{ \
294-
auto rc = x; \
295-
if (rc == -1) \
296-
{ \
297-
throw std::runtime_error(fmt::format( \
298-
"Error ({}) writing snapshot {} in " #x, errno, file_name)); \
299-
} \
300-
} while (0)
301-
302347
THROW_ON_ERROR(
303-
write(snapshot_fd, snapshot->data(), snapshot->size()));
304-
THROW_ON_ERROR(write(snapshot_fd, receipt_data, receipt_size));
305-
306-
THROW_ON_ERROR(fsync(snapshot_fd));
307-
THROW_ON_ERROR(close(snapshot_fd));
308-
309-
#undef THROW_ON_ERROR
348+
write(snapshot_fd, snapshot->data(), snapshot->size()),
349+
file_name);
350+
THROW_ON_ERROR(
351+
write(snapshot_fd, receipt_data, receipt_size), file_name);
310352

311353
LOG_INFO_FMT(
312-
"New snapshot file written to {} [{} bytes]",
354+
"New snapshot file written to {} [{} bytes] (unsynced)",
313355
file_name,
314356
snapshot->size() + receipt_size);
315357

316-
// e.g. snapshot_100_105.committed
317-
const auto committed_file_name =
318-
fmt::format("{}{}", file_name, snapshot_committed_suffix);
319-
const auto full_committed_path =
320-
snapshot_dir / committed_file_name;
358+
// Call fsync and rename on a worker-thread via uv async, as they
359+
// may be slow
360+
uv_work_t* work_handle = new uv_work_t;
321361

322-
files::rename(full_snapshot_path, full_committed_path);
323-
LOG_INFO_FMT(
324-
"Renamed temporary snapshot {} to committed {}",
325-
file_name,
326-
committed_file_name);
362+
{
363+
auto* data = new AsyncSnapshotSyncAndRename{
364+
.dir = snapshot_dir,
365+
.tmp_file_name = file_name,
366+
.snapshot_fd = snapshot_fd};
367+
368+
work_handle->data = data;
369+
}
370+
371+
#ifdef TEST_MODE_EXECUTE_SYNC_INLINE
372+
on_snapshot_sync_and_rename(work_handle);
373+
on_snapshot_sync_and_rename_complete(work_handle, 0);
374+
#else
375+
uv_queue_work(
376+
uv_default_loop(),
377+
work_handle,
378+
&on_snapshot_sync_and_rename,
379+
&on_snapshot_sync_and_rename_complete);
380+
#endif
327381
}
328382

329383
pending_snapshots.erase(it);
@@ -342,6 +396,7 @@ namespace asynchost
342396
e.what());
343397
}
344398
}
399+
#undef THROW_ON_ERROR
345400

346401
std::optional<std::pair<fs::path, fs::path>>
347402
find_latest_committed_snapshot()

src/host/test/ledger.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "crypto/openssl/hash.h"
88
#include "ds/files.h"
99
#include "ds/serialized.h"
10+
#define TEST_MODE_EXECUTE_SYNC_INLINE
1011
#include "host/snapshots.h"
1112
#include "kv/serialised_entry_format.h"
1213

0 commit comments

Comments
 (0)