Skip to content

Commit 82ad5dc

Browse files
committed
Support writes during ingestion
Signed-off-by: hhwyt <[email protected]>
1 parent e255444 commit 82ad5dc

File tree

3 files changed

+149
-55
lines changed

3 files changed

+149
-55
lines changed

db/db_impl/db_impl.cc

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5735,6 +5735,16 @@ Status DBImpl::IngestExternalFiles(
57355735
if (args.empty()) {
57365736
return Status::InvalidArgument("ingestion arg list is empty");
57375737
}
5738+
// Supported only when all args have the consistent `allow_write` behavior, as
5739+
// `allow_write` determines whether stopping writes to the DB affects all
5740+
// args.
5741+
bool allow_write = args[0].options.allow_write;
5742+
for (const auto& arg : args) {
5743+
if (arg.options.allow_write != allow_write) {
5744+
return Status::InvalidArgument(
5745+
"Inconsistent allow_writes values across ingestion arguments");
5746+
}
5747+
}
57385748
{
57395749
std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
57405750
for (const auto& arg : args) {
@@ -5858,6 +5868,17 @@ Status DBImpl::IngestExternalFiles(
58585868
// So wait here to ensure there is no pending write to memtable.
58595869
WaitForPendingWrites();
58605870

5871+
if (allow_write) {
5872+
// If allow_write is true, writes to the DB are resumed here,
5873+
// allowing users to write normally during the subsequent ingest process.
5874+
if (two_write_queues_) {
5875+
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
5876+
}
5877+
write_thread_.ExitUnbatched(&w);
5878+
}
5879+
TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:AfterPendingWrites",
5880+
nullptr);
5881+
58615882
num_running_ingest_file_ += static_cast<int>(num_cfs);
58625883
TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
58635884

@@ -5890,9 +5911,10 @@ Status DBImpl::IngestExternalFiles(
58905911
flush_opts.check_if_compaction_disabled = true;
58915912
if (immutable_db_options_.atomic_flush) {
58925913
mutex_.Unlock();
5893-
status = AtomicFlushMemTables(
5894-
flush_opts, FlushReason::kExternalFileIngestion,
5895-
{} /* provided_candidate_cfds */, true /* entered_write_thread */);
5914+
status = AtomicFlushMemTables(flush_opts,
5915+
FlushReason::kExternalFileIngestion,
5916+
{} /* provided_candidate_cfds */,
5917+
!allow_write /* entered_write_thread */);
58965918
mutex_.Lock();
58975919
} else {
58985920
for (size_t i = 0; i != num_cfs; ++i) {
@@ -5903,7 +5925,7 @@ Status DBImpl::IngestExternalFiles(
59035925
->cfd();
59045926
status = FlushMemTable(cfd, flush_opts,
59055927
FlushReason::kExternalFileIngestion,
5906-
true /* entered_write_thread */);
5928+
!allow_write /* entered_write_thread */);
59075929
mutex_.Lock();
59085930
if (!status.ok()) {
59095931
break;
@@ -6008,11 +6030,12 @@ Status DBImpl::IngestExternalFiles(
60086030
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
60096031
}
60106032

6011-
// Resume writes to the DB
6012-
if (two_write_queues_) {
6013-
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
6033+
if (!allow_write) {
6034+
if (two_write_queues_) {
6035+
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
6036+
}
6037+
write_thread_.ExitUnbatched(&w);
60146038
}
6015-
write_thread_.ExitUnbatched(&w);
60166039

60176040
if (status.ok()) {
60186041
for (auto& job : ingestion_jobs) {

0 commit comments

Comments
 (0)