Skip to content

Commit ded59bc

Browse files
ti-chi-bot, RidRisR, YuJuncen
authored and committed
br: pre-check TiKV disk space before download (tikv#17238) (tikv#17569)
close tikv#17224 Add a disk usage check when executing the `download` and `apply` RPCs from br. When the disk usage is not `Normal`, the request is rejected. Signed-off-by: ti-chi-bot <[email protected]> Signed-off-by: hillium <[email protected]> Co-authored-by: ris <[email protected]> Co-authored-by: hillium <[email protected]>
1 parent 5236c58 commit ded59bc

File tree

5 files changed

+172
-5
lines changed

5 files changed

+172
-5
lines changed

components/error_code/src/sst_importer.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,11 @@ define_error_codes!(
2222
TTL_LEN_NOT_EQUALS_TO_PAIRS => ("TtlLenNotEqualsToPairs", "", ""),
2323
INCOMPATIBLE_API_VERSION => ("IncompatibleApiVersion", "", ""),
2424
INVALID_KEY_MODE => ("InvalidKeyMode", "", ""),
25-
RESOURCE_NOT_ENOUTH => ("ResourceNotEnough", "", "")
25+
RESOURCE_NOT_ENOUTH => ("ResourceNotEnough", "", ""),
26+
SUSPENDED => ("Suspended",
27+
"this request has been suspended.",
28+
"Probably there are some export tools don't support exporting data inserted by `ingest`(say, snapshot backup). Check the user manual and stop them."),
29+
REQUEST_TOO_NEW => ("RequestTooNew", "", ""),
30+
REQUEST_TOO_OLD => ("RequestTooOld", "", ""),
31+
DISK_SPACE_NOT_ENOUGH => ("DiskSpaceNotEnough", "", "")
2632
);

components/sst_importer/src/errors.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ pub enum Error {
125125

126126
#[error("resource is not enough {0}")]
127127
ResourceNotEnough(String),
128+
129+
#[error("imports are suspended for {time_to_lease_expire:?}")]
130+
Suspended { time_to_lease_expire: Duration },
131+
132+
#[error("TiKV disk space is not enough.")]
133+
DiskSpaceNotEnough,
128134
}
129135

130136
impl Error {
@@ -197,6 +203,10 @@ impl ErrorCodeExt for Error {
197203
Error::IncompatibleApiVersion => error_code::sst_importer::INCOMPATIBLE_API_VERSION,
198204
Error::InvalidKeyMode { .. } => error_code::sst_importer::INVALID_KEY_MODE,
199205
Error::ResourceNotEnough(_) => error_code::sst_importer::RESOURCE_NOT_ENOUTH,
206+
Error::Suspended { .. } => error_code::sst_importer::SUSPENDED,
207+
Error::RequestTooNew(_) => error_code::sst_importer::REQUEST_TOO_NEW,
208+
Error::RequestTooOld(_) => error_code::sst_importer::REQUEST_TOO_OLD,
209+
Error::DiskSpaceNotEnough => error_code::sst_importer::DISK_SPACE_NOT_ENOUGH,
200210
}
201211
}
202212
}

src/import/sst_service.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,11 @@ use tikv_kv::{
3535
};
3636
use tikv_util::{
3737
config::ReadableSize,
38-
future::create_stream_with_buffer,
39-
sys::thread::ThreadBuildWrapper,
38+
future::{create_stream_with_buffer, paired_future_callback},
39+
sys::{
40+
disk::{get_disk_status, DiskUsage},
41+
thread::ThreadBuildWrapper,
42+
},
4043
time::{Instant, Limiter},
4144
HandyRwLock,
4245
};
@@ -883,6 +886,10 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
883886
.observe(start.saturating_elapsed().as_secs_f64());
884887

885888
let mut resp = ApplyResponse::default();
889+
if get_disk_status(0) != DiskUsage::Normal {
890+
resp.set_error(Error::DiskSpaceNotEnough.into());
891+
return crate::send_rpc_response!(Ok(resp), sink, label, start);
892+
}
886893

887894
match Self::apply_imp(req, importer, applier, limiter, max_raft_size).await {
888895
Ok(Some(r)) => resp.set_range(r),
@@ -924,6 +931,11 @@ impl<E: Engine> ImportSst for ImportSstService<E> {
924931
sst_importer::metrics::IMPORTER_DOWNLOAD_DURATION
925932
.with_label_values(&["queue"])
926933
.observe(start.saturating_elapsed().as_secs_f64());
934+
if get_disk_status(0) != DiskUsage::Normal {
935+
let mut resp = DownloadResponse::default();
936+
resp.set_error(Error::DiskSpaceNotEnough.into());
937+
return crate::send_rpc_response!(Ok(resp), sink, label, timer);
938+
}
927939

928940
// FIXME: download() should be an async fn, to allow BR to cancel
929941
// a download task.

tests/failpoints/cases/test_import_service.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ use std::{
88
use file_system::calc_crc32;
99
use futures::{executor::block_on, stream, SinkExt};
1010
use grpcio::{Result, WriteFlags};
11-
use kvproto::import_sstpb::*;
11+
use grpcio::{ChannelBuilder, Environment, Result, WriteFlags};
12+
use kvproto::{disk_usage::DiskUsage, import_sstpb::*, tikvpb_grpc::TikvClient};
1213
use tempfile::{Builder, TempDir};
1314
use test_raftstore::Simulator;
1415
use test_sst_importer::*;
1516
use tikv::config::TikvConfig;
16-
use tikv_util::{config::ReadableSize, HandyRwLock};
17+
use tikv_util::{config::ReadableSize, sys::disk, HandyRwLock};
1718

1819
#[allow(dead_code)]
1920
#[path = "../../integrations/import/util.rs"]
@@ -90,6 +91,43 @@ fn upload_sst(import: &ImportSstClient, meta: &SstMeta, data: &[u8]) -> Result<U
9091
})
9192
}
9293

94+
#[test]
95+
fn test_download_to_full_disk() {
96+
let (_cluster, ctx, _tikv, import) = new_cluster_and_tikv_import_client();
97+
let temp_dir = Builder::new()
98+
.prefix("test_download_sst_blocking_sst_writer")
99+
.tempdir()
100+
.unwrap();
101+
102+
let sst_path = temp_dir.path().join("test.sst");
103+
let sst_range = (0, 100);
104+
let (mut meta, _) = gen_sst_file(sst_path, sst_range);
105+
meta.set_region_id(ctx.get_region_id());
106+
meta.set_region_epoch(ctx.get_region_epoch().clone());
107+
108+
// Now perform a proper download.
109+
let mut download = DownloadRequest::default();
110+
download.set_sst(meta.clone());
111+
download.set_storage_backend(external_storage_export::make_local_backend(temp_dir.path()));
112+
download.set_name("test.sst".to_owned());
113+
download.mut_sst().mut_range().set_start(vec![sst_range.1]);
114+
download
115+
.mut_sst()
116+
.mut_range()
117+
.set_end(vec![sst_range.1 + 1]);
118+
download.mut_sst().mut_range().set_start(Vec::new());
119+
download.mut_sst().mut_range().set_end(Vec::new());
120+
disk::set_disk_status(DiskUsage::AlmostFull);
121+
let result = import.download(&download).unwrap();
122+
assert!(!result.get_is_empty());
123+
assert!(result.has_error());
124+
assert_eq!(
125+
result.get_error().get_message(),
126+
"TiKV disk space is not enough."
127+
);
128+
disk::set_disk_status(DiskUsage::Normal);
129+
}
130+
93131
#[test]
94132
fn test_ingest_reentrant() {
95133
let (cluster, ctx, _tikv, import) = new_cluster_and_tikv_import_client();
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use engine_traits::CF_DEFAULT;
2+
use external_storage_export::LocalStorage;
3+
use kvproto::import_sstpb::ApplyRequest;
4+
use tempfile::TempDir;
5+
use tikv_util::sys::disk::{self, DiskUsage};
6+
7+
use crate::import::util;
8+
9+
#[test]
10+
fn test_basic_apply() {
11+
let (_cluster, ctx, tikv, import) = util::new_cluster_and_tikv_import_client();
12+
let tmp = TempDir::new().unwrap();
13+
let storage = LocalStorage::new(tmp.path()).unwrap();
14+
let default = [
15+
(b"k1", b"v1", 1),
16+
(b"k2", b"v2", 2),
17+
(b"k3", b"v3", 3),
18+
(b"k4", b"v4", 4),
19+
];
20+
let default_rewritten = [(b"r1", b"v1", 1), (b"r2", b"v2", 2), (b"r3", b"v3", 3)];
21+
let mut sst_meta = util::make_plain_file(&storage, "file1.log", default.into_iter());
22+
util::register_range_for(&mut sst_meta, b"k1", b"k3a");
23+
let mut req = ApplyRequest::new();
24+
req.set_context(ctx.clone());
25+
req.set_rewrite_rules(vec![util::rewrite_for(&mut sst_meta, b"k", b"r")].into());
26+
req.set_metas(vec![sst_meta].into());
27+
req.set_storage_backend(util::local_storage(&tmp));
28+
import.apply(&req).unwrap();
29+
util::check_applied_kvs_cf(&tikv, &ctx, CF_DEFAULT, default_rewritten.into_iter());
30+
}
31+
32+
#[test]
33+
fn test_apply_full_disk() {
34+
let (_cluster, ctx, _tikv, import) = util::new_cluster_and_tikv_import_client();
35+
let tmp = TempDir::new().unwrap();
36+
let storage = LocalStorage::new(tmp.path()).unwrap();
37+
let default = [
38+
(b"k1", b"v1", 1),
39+
(b"k2", b"v2", 2),
40+
(b"k3", b"v3", 3),
41+
(b"k4", b"v4", 4),
42+
];
43+
let mut sst_meta = util::make_plain_file(&storage, "file1.log", default.into_iter());
44+
util::register_range_for(&mut sst_meta, b"k1", b"k3a");
45+
let mut req = ApplyRequest::new();
46+
req.set_context(ctx);
47+
req.set_rewrite_rules(vec![util::rewrite_for(&mut sst_meta, b"k", b"r")].into());
48+
req.set_metas(vec![sst_meta].into());
49+
req.set_storage_backend(util::local_storage(&tmp));
50+
disk::set_disk_status(DiskUsage::AlmostFull);
51+
let result = import.apply(&req).unwrap();
52+
assert!(result.has_error());
53+
assert_eq!(
54+
result.get_error().get_message(),
55+
"TiKV disk space is not enough."
56+
);
57+
disk::set_disk_status(DiskUsage::Normal);
58+
}
59+
60+
#[test]
61+
fn test_apply_twice() {
62+
let (_cluster, ctx, tikv, import) = util::new_cluster_and_tikv_import_client();
63+
let tmp = TempDir::new().unwrap();
64+
let storage = LocalStorage::new(tmp.path()).unwrap();
65+
let default = [(
66+
b"k1",
67+
b"In this case, we are going to test write twice, but with different rewrite rule.",
68+
1,
69+
)];
70+
let default_fst = [(
71+
b"r1",
72+
b"In this case, we are going to test write twice, but with different rewrite rule.",
73+
1,
74+
)];
75+
let default_snd = [(
76+
b"z1",
77+
b"In this case, we are going to test write twice, but with different rewrite rule.",
78+
1,
79+
)];
80+
81+
let mut sst_meta = util::make_plain_file(&storage, "file2.log", default.into_iter());
82+
util::register_range_for(&mut sst_meta, b"k1", b"k1a");
83+
let mut req = ApplyRequest::new();
84+
req.set_context(ctx.clone());
85+
req.set_rewrite_rules(vec![util::rewrite_for(&mut sst_meta, b"k", b"r")].into());
86+
req.set_metas(vec![sst_meta.clone()].into());
87+
req.set_storage_backend(util::local_storage(&tmp));
88+
import.apply(&req).unwrap();
89+
util::check_applied_kvs_cf(&tikv, &ctx, CF_DEFAULT, default_fst.into_iter());
90+
91+
util::register_range_for(&mut sst_meta, b"k1", b"k1a");
92+
req.set_rewrite_rules(vec![util::rewrite_for(&mut sst_meta, b"k", b"z")].into());
93+
req.set_metas(vec![sst_meta].into());
94+
import.apply(&req).unwrap();
95+
util::check_applied_kvs_cf(
96+
&tikv,
97+
&ctx,
98+
CF_DEFAULT,
99+
default_fst.into_iter().chain(default_snd.into_iter()),
100+
);
101+
}

0 commit comments

Comments
 (0)