6
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
7
// Use of this source code is governed by a BSD-style license that can be
8
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
- #include < iostream>
10
9
11
10
#include " db/dbformat.h"
12
11
#include " db_stress_tool/db_stress_listener.h"
@@ -154,6 +153,46 @@ class NonBatchedOpsStressTest : public StressTest {
154
153
from_db.data (), from_db.size ());
155
154
}
156
155
}
156
+
157
+ if (secondary_db_) {
158
+ assert (secondary_cfhs_.size () == column_families_.size ());
159
+ // We are going to read in the expected values before catching the
160
+ // secondary up to the primary. This sets the lower bound of the
161
+ // acceptable values that can be returned from the secondary. After
162
+ // each Get() to the secondary, we are going to read in the expected
163
+ // value again to determine the upper bound. As long as the returned
164
+ // value from Get() is within these bounds, we consider that okay. The
165
+ // lower bound will always be moving forwards anyways as
166
+ // TryCatchUpWithPrimary() gets called.
167
+ std::vector<ExpectedValue> pre_read_expected_values;
168
+ for (int64_t i = start; i < end; ++i) {
169
+ pre_read_expected_values.push_back (
170
+ shared->Get (static_cast <int >(cf), i));
171
+ }
172
+
173
+ Status s = secondary_db_->TryCatchUpWithPrimary ();
174
+ if (!s.ok ()) {
175
+ VerificationAbort (shared,
176
+ " Secondary failed to catch up to the primary" );
177
+ }
178
+
179
+ for (int64_t i = start; i < end; ++i) {
180
+ if (thread->shared ->HasVerificationFailedYet ()) {
181
+ break ;
182
+ }
183
+
184
+ const std::string key = Key (i);
185
+ std::string from_db;
186
+
187
+ s = secondary_db_->Get (options, column_families_[cf], key,
188
+ &from_db);
189
+
190
+ VerifyValueRange (static_cast <int >(cf), i, options, shared, from_db,
191
+ /* msg_prefix */ " Secondary get verification" , s,
192
+ pre_read_expected_values[i - start]);
193
+ }
194
+ }
195
+
157
196
} else if (method == VerificationMethod::kGetEntity ) {
158
197
for (int64_t i = start; i < end; ++i) {
159
198
if (thread->shared ->HasVerificationFailedYet ()) {
@@ -339,82 +378,12 @@ class NonBatchedOpsStressTest : public StressTest {
339
378
}
340
379
assert (secondary_db_);
341
380
assert (!secondary_cfhs_.empty ());
342
-
343
- auto * shared = thread->shared ;
344
- assert (shared);
345
-
346
- // Each thread is going to verify a different portion of the key space
347
- const int64_t max_key = shared->GetMaxKey ();
348
- const int64_t keys_per_thread = max_key / shared->GetNumThreads ();
349
- int64_t start = keys_per_thread * thread->tid ;
350
- int64_t end = thread->tid == shared->GetNumThreads () - 1
351
- ? max_key
352
- : start + keys_per_thread;
353
-
354
- // We are going to read in the expected values before catching the secondary
355
- // up to the primary. This sets the lower bound of the acceptable values
356
- // that can be returned from the secondary. After each Get() to the
357
- // secondary, we are going to read in the expected value again to determine
358
- // the upper bound. As long as the returned value from Get() is within these
359
- // bounds, we consider that okay. The lower bound will always be moving
360
- // forwards anyways as TryCatchUpWithPrimary() gets called.
361
- std::vector<ExpectedValue> pre_read_expected_values;
362
- for (int64_t i = start; i < end; ++i) {
363
- pre_read_expected_values.push_back (shared->Get (0 , i));
364
- }
365
-
366
381
Status s = secondary_db_->TryCatchUpWithPrimary ();
367
382
if (!s.ok ()) {
368
- VerificationAbort (shared, " Secondary failed to catch up to the primary" );
369
383
assert (false );
370
384
exit (1 );
371
385
}
372
386
373
- ReadOptions read_opts (FLAGS_verify_checksum, true );
374
- // Check that there are no keys in the secondary that should not exist, and
375
- // that the values fall into the acceptable range for those that should
376
- // exist
377
- for (int64_t i = start; i < end; ++i) {
378
- if (shared->HasVerificationFailedYet ()) {
379
- break ;
380
- }
381
- const ExpectedValue pre_read_expected_value =
382
- pre_read_expected_values[i - start];
383
- std::string from_db;
384
- std::string key_str = Key (i);
385
- Slice key = key_str;
386
- s = db_->Get (read_opts, column_families_[0 ], key_str, &from_db);
387
- const ExpectedValue post_read_expected_value = shared->Get (0 , i);
388
- if (s.ok ()) {
389
- const Slice slice (from_db);
390
- const uint32_t value_base_from_db = GetValueBase (slice);
391
- if (!ExpectedValueHelper::InExpectedValueBaseRange (
392
- value_base_from_db, pre_read_expected_value,
393
- post_read_expected_value)) {
394
- std::cout << " Secondary verification failed for i=" << i
395
- << " , key=" << key.ToString (true )
396
- << " . Get() returned a value base " << value_base_from_db
397
- << " which was outside of the value base range of"
398
- << pre_read_expected_value.GetValueBase () << " to"
399
- << post_read_expected_value.GetFinalValueBase ()
400
- << std::endl;
401
- shared->SetVerificationFailure ();
402
- break ;
403
- }
404
- } else if (s.IsNotFound ()) {
405
- if (ExpectedValueHelper::MustHaveExisted (pre_read_expected_value,
406
- post_read_expected_value)) {
407
- std::cout
408
- << " Secondary verification failed for i=" << i
409
- << " , key=" << key.ToString (true )
410
- << " . Get() returned NotFound when the key should have existed."
411
- << std::endl;
412
- shared->SetVerificationFailure ();
413
- break ;
414
- }
415
- }
416
- }
417
-
418
387
const auto checksum_column_family = [](Iterator* iter,
419
388
uint32_t * checksum) -> Status {
420
389
assert (nullptr != checksum);
@@ -427,6 +396,10 @@ class NonBatchedOpsStressTest : public StressTest {
427
396
return iter->status ();
428
397
};
429
398
399
+ auto * shared = thread->shared ;
400
+ assert (shared);
401
+ const int64_t max_key = shared->GetMaxKey ();
402
+ ReadOptions read_opts (FLAGS_verify_checksum, true );
430
403
std::string ts_str;
431
404
Slice ts;
432
405
if (FLAGS_user_timestamp_size > 0 ) {
@@ -436,8 +409,19 @@ class NonBatchedOpsStressTest : public StressTest {
436
409
}
437
410
438
411
static Random64 rand64 (shared->GetSeed ());
439
- for (size_t cf = 0 ; cf < secondary_cfhs_.size (); cf++) {
440
- ColumnFamilyHandle* handle = secondary_cfhs_[cf];
412
+
413
+ {
414
+ uint32_t crc = 0 ;
415
+ std::unique_ptr<Iterator> it (secondary_db_->NewIterator (read_opts));
416
+ s = checksum_column_family (it.get (), &crc);
417
+ if (!s.ok ()) {
418
+ fprintf (stderr, " Computing checksum of default cf: %s\n " ,
419
+ s.ToString ().c_str ());
420
+ assert (false );
421
+ }
422
+ }
423
+
424
+ for (auto * handle : secondary_cfhs_) {
441
425
if (thread->rand .OneInOpt (3 )) {
442
426
// Use Get()
443
427
uint64_t key = rand64.Uniform (static_cast <uint64_t >(max_key));
@@ -448,11 +432,7 @@ class NonBatchedOpsStressTest : public StressTest {
448
432
read_opts, handle, key_str, &value,
449
433
FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr );
450
434
s.PermitUncheckedError ();
451
- } else if (!FLAGS_inplace_update_support) {
452
- // I think this portion of the verification failed because the
453
- // combination of inplace_update_support=true and backward iteration is
454
- // not allowed.
455
-
435
+ } else {
456
436
// Use range scan
457
437
std::unique_ptr<Iterator> iter (
458
438
secondary_db_->NewIterator (read_opts, handle));
@@ -476,7 +456,6 @@ class NonBatchedOpsStressTest : public StressTest {
476
456
iter->Seek (key_str);
477
457
for (int i = 0 ; i < 5 && iter->Valid (); ++i, iter->Next ()) {
478
458
}
479
-
480
459
} else {
481
460
// SeekForPrev() + Prev()*5
482
461
uint64_t key = rand64.Uniform (static_cast <uint64_t >(max_key));
@@ -485,18 +464,6 @@ class NonBatchedOpsStressTest : public StressTest {
485
464
for (int i = 0 ; i < 5 && iter->Valid (); ++i, iter->Prev ()) {
486
465
}
487
466
}
488
- } else {
489
- uint32_t crc = 0 ;
490
- std::unique_ptr<Iterator> it (
491
- secondary_db_->NewIterator (read_opts, handle));
492
- s = checksum_column_family (it.get (), &crc);
493
- if (!s.ok ()) {
494
- std::string checksum_err_msg =
495
- " Failed to compute checksum for secondary cf " +
496
- std::to_string (cf) + " . Status: " + s.ToString ();
497
- VerificationAbort (shared, checksum_err_msg);
498
- assert (false );
499
- }
500
467
}
501
468
}
502
469
}
@@ -2885,6 +2852,84 @@ class NonBatchedOpsStressTest : public StressTest {
2885
2852
return true ;
2886
2853
}
2887
2854
2855
+ // Compared to VerifyOrSyncValue, VerifyValueRange takes in a
2856
+ // pre_read_expected_value to determine the lower bound of acceptable values.
2857
+ // Anything from the pre_read_expected_value to the post_read_expected_value
2858
+ // is considered acceptable. VerifyValueRange does not perform the initial
2859
+ // "sync" step and does not compare the exact data/lengths for the values.
2860
+ // This verification is suitable for verifying secondary or follower databases
2861
+ bool VerifyValueRange (int cf, int64_t key, const ReadOptions& opts,
2862
+ SharedState* shared, const std::string& value_from_db,
2863
+ std::string msg_prefix, const Status& s,
2864
+ const ExpectedValue& pre_read_expected_value) const {
2865
+ if (shared->HasVerificationFailedYet ()) {
2866
+ return false ;
2867
+ }
2868
+ const ExpectedValue post_read_expected_value = shared->Get (cf, key);
2869
+ char expected_value_data[kValueMaxLen ];
2870
+ size_t expected_value_data_size =
2871
+ GenerateValue (post_read_expected_value.GetValueBase (),
2872
+ expected_value_data, sizeof (expected_value_data));
2873
+
2874
+ std::ostringstream read_u64ts;
2875
+ if (opts.timestamp ) {
2876
+ read_u64ts << " while read with timestamp: " ;
2877
+ uint64_t read_ts;
2878
+ if (DecodeU64Ts (*opts.timestamp , &read_ts).ok ()) {
2879
+ read_u64ts << std::to_string (read_ts) << " , " ;
2880
+ } else {
2881
+ read_u64ts << s.ToString ()
2882
+ << " Encoded read timestamp: " << opts.timestamp ->ToString ()
2883
+ << " , " ;
2884
+ }
2885
+ }
2886
+
2887
+ // compare value_from_db with the range of possible values from
2888
+ // pre_read_expected_value to post_read_expected_value
2889
+ if (s.ok ()) {
2890
+ const Slice slice (value_from_db);
2891
+ const uint32_t value_base_from_db = GetValueBase (slice);
2892
+ if (ExpectedValueHelper::MustHaveNotExisted (pre_read_expected_value,
2893
+ post_read_expected_value)) {
2894
+ VerificationAbort (shared,
2895
+ msg_prefix +
2896
+ " : Unexpected value found that should not exist" +
2897
+ read_u64ts.str (),
2898
+ cf, key, value_from_db, " " );
2899
+ return false ;
2900
+ }
2901
+ if (!ExpectedValueHelper::InExpectedValueBaseRange (
2902
+ value_base_from_db, pre_read_expected_value,
2903
+ post_read_expected_value)) {
2904
+ VerificationAbort (
2905
+ shared,
2906
+ msg_prefix +
2907
+ " : Unexpected value found outside of the value base range" +
2908
+ read_u64ts.str (),
2909
+ cf, key, value_from_db,
2910
+ Slice (expected_value_data, expected_value_data_size));
2911
+ return false ;
2912
+ }
2913
+ } else if (s.IsNotFound ()) {
2914
+ if (ExpectedValueHelper::MustHaveExisted (pre_read_expected_value,
2915
+ post_read_expected_value)) {
2916
+ VerificationAbort (shared,
2917
+ msg_prefix + " : Value not found which should exist" +
2918
+ read_u64ts.str () + s.ToString (),
2919
+ cf, key, " " ,
2920
+ Slice (expected_value_data, expected_value_data_size));
2921
+ return false ;
2922
+ }
2923
+ } else {
2924
+ VerificationAbort (
2925
+ shared,
2926
+ msg_prefix + " Non-OK status" + read_u64ts.str () + s.ToString (), cf,
2927
+ key, " " , Slice (expected_value_data, expected_value_data_size));
2928
+ return false ;
2929
+ }
2930
+ return true ;
2931
+ }
2932
+
2888
2933
void PrepareTxnDbOptions (SharedState* shared,
2889
2934
TransactionDBOptions& txn_db_opts) override {
2890
2935
txn_db_opts.rollback_deletion_type_callback =
0 commit comments