Skip to content

Commit 76200b3

Browse files
committed
Merge remote-tracking branch 'baidu_tera/master' into rb.m.exc
2 parents 5005046 + 241dade commit 76200b3

22 files changed

+268
-157
lines changed

Diff for: README.md

+4
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,7 @@ sh ./build.sh
4949

5050
#反馈与技术支持
5151
52+
53+
#欢迎加入
54+
如果你热爱开源,热爱分布式技术,请将简历发送至:
55+

Diff for: src/io/default_compact_strategy.cc

+6-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ namespace io {
1212

1313
DefaultCompactStrategy::DefaultCompactStrategy(const TableSchema& schema)
1414
: m_schema(schema),
15-
m_raw_key_operator(GetRawKeyOperatorFromSchema(m_schema)) {
15+
m_raw_key_operator(GetRawKeyOperatorFromSchema(m_schema)),
16+
m_last_ts(-1), m_del_row_ts(-1), m_del_col_ts(-1), m_del_qual_ts(-1), m_cur_ts(-1),
17+
m_del_row_seq(0), m_del_col_seq(0), m_del_qual_seq(0), m_version_num(0),
18+
m_snapshot(leveldb::kMaxSequenceNumber) {
1619
// build index
1720
for (int32_t i = 0; i < m_schema.column_families_size(); ++i) {
1821
const std::string name = m_schema.column_families(i).name();
@@ -140,7 +143,8 @@ bool DefaultCompactStrategy::Drop(const Slice& tera_key, uint64_t n,
140143
}
141144
}
142145

143-
if (IsAtomicOP(type) && m_has_put) { // drop ADDs which is later than Put
146+
if (IsAtomicOP(type) && m_has_put) {
147+
// drop ADDs which is later than Put
144148
return true;
145149
}
146150
return false;

Diff for: src/io/ttlkv_compact_strategy.cc

+5-4
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ namespace io {
1111

1212
using namespace leveldb;
1313

14-
KvCompactStrategy::KvCompactStrategy(const TableSchema& schema) :
15-
schema_(schema), raw_key_operator_(GetRawKeyOperatorFromSchema(schema_)) {
14+
KvCompactStrategy::KvCompactStrategy(const TableSchema& schema)
15+
: schema_(schema),
16+
raw_key_operator_(GetRawKeyOperatorFromSchema(schema_)) {
1617
VLOG(11) << "KvCompactStrategy construct";
1718
}
1819

@@ -32,11 +33,11 @@ bool KvCompactStrategy::Drop(const leveldb::Slice& tera_key, uint64_t n,
3233
leveldb::Slice row_key;
3334
int64_t expire_timestamp;
3435
raw_key_operator_->ExtractTeraKey(tera_key, &row_key, NULL, NULL,
35-
&expire_timestamp, NULL);
36+
&expire_timestamp, NULL);
3637

3738
int64_t now = get_micros() / 1000000;
3839
if (expire_timestamp <= 0 /*上溢,永不过期*/
39-
|| expire_timestamp > now) {
40+
|| expire_timestamp > now) {
4041
VLOG(11) << "[KvCompactStrategy-Not-Drop] row_key:[" << row_key.ToString()
4142
<< "] expire_timestamp:[" << expire_timestamp
4243
<< "] now:[" << now << "]";

Diff for: src/leveldb/db/version_set.cc

+13-15
Original file line numberDiff line numberDiff line change
@@ -999,26 +999,24 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
999999
force_switch_manifest_ = false;
10001000
last_switch_manifest_ = env_->NowMicros();
10011001
}
1002-
// Initialize new descriptor log file if necessary by creating
1003-
// a temporary file that contains a snapshot of the current version.
1002+
10041003
std::string new_manifest_file;
10051004
Status s;
1006-
if (descriptor_log_ == NULL) {
1007-
// No reason to unlock *mu here since we only hit this path in the
1008-
// first call to LogAndApply (when opening the database).
1009-
assert(descriptor_file_ == NULL);
1010-
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
1011-
edit->SetNextFile(next_file_number_);
1012-
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
1013-
if (s.ok()) {
1014-
descriptor_log_ = new log::Writer(descriptor_file_);
1015-
s = WriteSnapshot(descriptor_log_);
1016-
}
1017-
}
1018-
10191005
// Unlock during expensive MANIFEST log write
10201006
{
10211007
mu->Unlock();
1008+
if (descriptor_log_ == NULL) {
1009+
// Initialize new descriptor log file if necessary by creating
1010+
// a temporary file that contains a snapshot of the current version.
1011+
assert(descriptor_file_ == NULL);
1012+
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
1013+
edit->SetNextFile(manifest_file_number_ + 1);
1014+
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
1015+
if (s.ok()) {
1016+
descriptor_log_ = new log::Writer(descriptor_file_);
1017+
s = WriteSnapshot(descriptor_log_);
1018+
}
1019+
}
10221020

10231021
// Write new record to MANIFEST log
10241022
if (s.ok()) {

Diff for: src/master/master_impl.cc

+47-83
Original file line numberDiff line numberDiff line change
@@ -797,27 +797,27 @@ void MasterImpl::DeleteTable(const DeleteTableRequest* request,
797797
done->Run();
798798
return;
799799
}
800-
response->set_status(kMasterOk);
801-
done->Run();
802800

803-
WriteClosure* closure =
804-
NewClosure(this, &MasterImpl::DeleteTableRecordCallback, table,
805-
FLAGS_tera_master_impl_retry_times);
806-
BatchWriteMetaTableAsync(boost::bind(&Table::ToMetaTableKeyValue, table, _1, _2),
807-
true, closure);
801+
std::vector<TabletPtr> tablets;
802+
table->GetTablet(&tablets);
808803

809-
std::vector<TabletPtr> tablet_meta_list;
810-
table->GetTablet(&tablet_meta_list);
811-
for (uint32_t i = 0; i < tablet_meta_list.size(); ++i) {
812-
TabletPtr tablet = tablet_meta_list[i];
813-
if (tablet->SetStatusIf(kTabletDeleting, kTabletDisable)) {
814-
WriteClosure* closure =
815-
NewClosure(this, &MasterImpl::DeleteTabletRecordCallback, tablet,
816-
FLAGS_tera_master_impl_retry_times);
817-
BatchWriteMetaTableAsync(boost::bind(&Tablet::ToMetaTableKeyValue, tablet, _1, _2),
818-
true, closure);
804+
// check if all tablet disable
805+
for (uint32_t i = 0; i < tablets.size(); ++i) {
806+
TabletPtr tablet = tablets[i];
807+
if (tablet->GetStatus() != kTabletDisable) {
808+
CHECK(table->SetStatus(old_status));
809+
LOG(ERROR) << "fail to delete table: " << request->table_name()
810+
<< ", tablet status: " << StatusCodeToString(tablet->GetStatus());
811+
response->set_status(kTabletReady);
812+
done->Run();
813+
return;
819814
}
820815
}
816+
817+
WriteClosure* closure =
818+
NewClosure(this, &MasterImpl::DeleteTableCallback, table, tablets,
819+
FLAGS_tera_master_impl_retry_times, response, done);
820+
BatchWriteMetaTableAsync(table, tablets, true, closure);
821821
}
822822

823823
void MasterImpl::DisableTable(const DisableTableRequest* request,
@@ -1237,7 +1237,7 @@ void MasterImpl::OperateUser(const OperateUserRequest* request,
12371237
}
12381238
/*
12391239
* for (change password), (add user to group), (delete user from group),
1240-
* we get the original UserInfo(including token & group),
1240+
* we get the original UserInfo(including token & group),
12411241
* do some modification according to the RPC request on the original UserInfo,
12421242
* and rewrite it to meta table.
12431243
*/
@@ -1270,7 +1270,7 @@ void MasterImpl::OperateUser(const OperateUserRequest* request,
12701270
}
12711271
} else if (op_type == kAddToGroup) {
12721272
if (!operated_user.has_user_name() || operated_user.group_name_size() != 1
1273-
|| !m_user_manager->IsValidForAddToGroup(token, user_name,
1273+
|| !m_user_manager->IsValidForAddToGroup(token, user_name,
12741274
operated_user.group_name(0))) {
12751275
is_invalid = true;
12761276
} else {
@@ -2817,6 +2817,7 @@ void MasterImpl::GetSnapshot(const GetSnapshotRequest* request,
28172817
task->finish_num = 0;
28182818
task->aborted = false;
28192819
MutexLock lock(&task->mutex);
2820+
int64_t snapshot_id = get_micros();
28202821
for (uint32_t i = 0; i < task->tablets.size(); ++i) {
28212822
TabletPtr tablet = task->tablets[i];
28222823
if (!tablet->SetStatusIf(kTabletOnSnapshot, kTableReady)) {
@@ -2829,7 +2830,7 @@ void MasterImpl::GetSnapshot(const GetSnapshotRequest* request,
28292830
++task->task_num;
28302831
SnapshotClosure* closure =
28312832
NewClosure(this, &MasterImpl::GetSnapshotCallback, static_cast<int32_t>(i), task);
2832-
GetSnapshotAsync(tablet, 3000, closure);
2833+
GetSnapshotAsync(tablet, snapshot_id, 3000, closure);
28332834
}
28342835
if (task->task_num == 0) {
28352836
LOG(WARNING) << "fail to create snapshot: " << request->table_name()
@@ -2840,7 +2841,7 @@ void MasterImpl::GetSnapshot(const GetSnapshotRequest* request,
28402841
}
28412842
}
28422843

2843-
void MasterImpl::GetSnapshotAsync(TabletPtr tablet, int32_t timeout,
2844+
void MasterImpl::GetSnapshotAsync(TabletPtr tablet, int64_t snapshot_id, int32_t timeout,
28442845
SnapshotClosure* done) {
28452846

28462847
std::string addr = tablet->GetServerAddr();
@@ -2850,7 +2851,7 @@ void MasterImpl::GetSnapshotAsync(TabletPtr tablet, int32_t timeout,
28502851
SnapshotResponse* response = new SnapshotResponse;
28512852
request->set_sequence_id(m_this_sequence_id.Inc());
28522853
request->set_table_name(tablet->GetTableName());
2853-
request->set_snapshot_id(get_micros());
2854+
request->set_snapshot_id(snapshot_id);
28542855
request->mutable_key_range()->set_key_start(tablet->GetKeyStart());
28552856
request->mutable_key_range()->set_key_end(tablet->GetKeyEnd());
28562857

@@ -4057,8 +4058,6 @@ void MasterImpl::MergeTabletWriteMetaCallback(TabletMeta new_meta,
40574058
}
40584059

40594060
TabletPtr tablet_c;
4060-
tablet_p1->SetStatus(kTableDeleted);
4061-
tablet_p2->SetStatus(kTableDeleted);
40624061
if (tablet_p1->GetKeyStart() == new_meta.key_range().key_start()) {
40634062
m_tablet_manager->DeleteTablet(tablet_p1->GetTableName(), tablet_p1->GetKeyStart());
40644063
m_tablet_manager->AddTablet(new_meta, TableSchema(), &tablet_c);
@@ -4180,7 +4179,6 @@ void MasterImpl::AddMetaCallback(TablePtr table,
41804179
}
41814180
if (retry_times <= 0) {
41824181
for(size_t i = 0; i < tablets.size(); i++) {
4183-
tablets[i]->SetStatus(kTableDeleted);
41844182
m_tablet_manager->DeleteTablet(tablets[i]->GetTableName(),
41854183
tablets[i]->GetKeyStart());
41864184
}
@@ -4478,10 +4476,14 @@ void MasterImpl::UpdateMetaForLoadCallback(TabletPtr tablet, int32_t retry_times
44784476
LoadTabletAsync(tablet, done);
44794477
}
44804478

4481-
void MasterImpl::DeleteTableRecordCallback(TablePtr table, int32_t retry_times,
4482-
WriteTabletRequest* request,
4483-
WriteTabletResponse* response,
4484-
bool failed, int error_code) {
4479+
void MasterImpl::DeleteTableCallback(TablePtr table,
4480+
std::vector<TabletPtr> tablets,
4481+
int32_t retry_times,
4482+
DeleteTableResponse* rpc_response,
4483+
google::protobuf::Closure* rpc_done,
4484+
WriteTabletRequest* request,
4485+
WriteTabletResponse* response,
4486+
bool failed, int error_code) {
44854487
StatusCode status = response->status();
44864488
if (!failed && status == kTabletNodeOk) {
44874489
// all the row status should be the same
@@ -4492,20 +4494,21 @@ void MasterImpl::DeleteTableRecordCallback(TablePtr table, int32_t retry_times,
44924494
delete response;
44934495
if (failed || status != kTabletNodeOk) {
44944496
if (failed) {
4495-
LOG(ERROR) << "fail to delete meta table record: "
4497+
LOG(ERROR) << "fail to delete table meta: "
44964498
<< sofa::pbrpc::RpcErrorCodeToString(error_code) << ", " << table;
44974499
} else {
4498-
LOG(ERROR) << "fail to delete meta table record: "
4500+
LOG(ERROR) << "fail to delete table meta: "
44994501
<< StatusCodeToString(status) << ", " << table;
45004502
}
45014503
if (retry_times <= 0) {
45024504
LOG(ERROR) << kSms << "abort delete meta table record, " << table;
4505+
rpc_response->set_status(kMetaTabletError);
4506+
rpc_done->Run();
45034507
} else {
45044508
WriteClosure* done =
4505-
NewClosure(this, &MasterImpl::DeleteTableRecordCallback,
4506-
table, retry_times - 1);
4507-
SuspendMetaOperation(boost::bind(&Table::ToMetaTableKeyValue, table, _1, _2),
4508-
true, done);
4509+
NewClosure(this, &MasterImpl::DeleteTableCallback, table, tablets,
4510+
retry_times - 1, rpc_response, rpc_done);
4511+
SuspendMetaOperation(table, tablets, true, done);
45094512
}
45104513
return;
45114514
}
@@ -4514,43 +4517,14 @@ void MasterImpl::DeleteTableRecordCallback(TablePtr table, int32_t retry_times,
45144517
MutexLock locker(&m_alias_mutex);
45154518
m_alias.erase(table_alias);
45164519
}
4517-
LOG(INFO) << "delete meta table record success, " << table;
4518-
}
4519-
4520-
void MasterImpl::DeleteTabletRecordCallback(TabletPtr tablet, int32_t retry_times,
4521-
WriteTabletRequest* request,
4522-
WriteTabletResponse* response,
4523-
bool failed, int error_code) {
4524-
StatusCode status = response->status();
4525-
if (!failed && status == kTabletNodeOk) {
4526-
// all the row status should be the same
4527-
CHECK_GT(response->row_status_list_size(), 0);
4528-
status = response->row_status_list(0);
4529-
}
4530-
delete request;
4531-
delete response;
4532-
if (failed || status != kTabletNodeOk) {
4533-
if (failed) {
4534-
LOG(ERROR) << "fail to delete meta tablet record: "
4535-
<< sofa::pbrpc::RpcErrorCodeToString(error_code) << ", " << tablet;
4536-
} else {
4537-
LOG(ERROR) << "fail to delete meta tablet record: "
4538-
<< StatusCodeToString(status) << ", " << tablet;
4539-
}
4540-
if (retry_times <= 0) {
4541-
LOG(ERROR) << kSms << "abort delete meta tablet record, " << tablet;
4542-
} else {
4543-
WriteClosure* done =
4544-
NewClosure(this, &MasterImpl::DeleteTabletRecordCallback,
4545-
tablet, retry_times - 1);
4546-
SuspendMetaOperation(boost::bind(&Tablet::ToMetaTableKeyValue, tablet, _1, _2),
4547-
true, done);
4548-
}
4549-
return;
4520+
// clean tablet manager
4521+
for (uint32_t i = 0; i < tablets.size(); ++i) {
4522+
TabletPtr tablet = tablets[i];
4523+
m_tablet_manager->DeleteTablet(tablet->GetTableName(), tablet->GetKeyStart());
45504524
}
4551-
LOG(INFO) << "delete meta tablet record success, " << tablet;
4552-
tablet->SetStatus(kTableDeleted);
4553-
m_tablet_manager->DeleteTablet(tablet->GetTableName(), tablet->GetKeyStart());
4525+
LOG(INFO) << "delete meta table record success, " << table;
4526+
rpc_response->set_status(kMasterOk);
4527+
rpc_done->Run();
45544528
}
45554529

45564530
void MasterImpl::ScanMetaTableAsync(const std::string& table_name,
@@ -4671,7 +4645,6 @@ void MasterImpl::ScanMetaCallbackForSplit(TabletPtr tablet,
46714645
m_tablet_manager->AddTablet(second_meta, TableSchema(), &second_tablet);
46724646

46734647
// delete old tablet
4674-
tablet->SetStatus(kTableDeleted);
46754648
m_tablet_manager->DeleteTablet(tablet->GetTableName(), tablet->GetKeyStart());
46764649

46774650
// update first child tablet meta
@@ -4894,20 +4867,11 @@ void MasterImpl::ProcessOffLineTablet(TabletPtr tablet) {
48944867
if (!tablet->IsBound()) {
48954868
return;
48964869
}
4897-
tablet->SetStatusIf(kTabletDisable, kTableOffLine, kTableDisable)
4898-
|| tablet->SetStatusIf(kTabletDisable, kTableOffLine, kTableDeleting);
4899-
if (tablet->SetStatusIf(kTabletDeleting, kTabletDisable, kTableDeleting)) {
4900-
WriteClosure* done =
4901-
NewClosure(this, &MasterImpl::DeleteTabletRecordCallback, tablet,
4902-
FLAGS_tera_master_meta_retry_times);
4903-
BatchWriteMetaTableAsync(boost::bind(&Tablet::ToMetaTableKeyValue, tablet, _1, _2),
4904-
true, done);
4905-
}
4870+
tablet->SetStatusIf(kTabletDisable, kTableOffLine, kTableDisable);
49064871
}
49074872

49084873
void MasterImpl::ProcessReadyTablet(TabletPtr tablet) {
4909-
if (tablet->SetStatusIf(kTableUnLoading, kTableReady, kTableDisable)
4910-
|| tablet->SetStatusIf(kTableUnLoading, kTableReady, kTableDeleting)) {
4874+
if (tablet->SetStatusIf(kTableUnLoading, kTableReady, kTableDisable)) {
49114875
UnloadClosure* done =
49124876
NewClosure(this, &MasterImpl::UnloadTabletCallback, tablet,
49134877
FLAGS_tera_master_impl_retry_times);

Diff for: src/master/master_impl.h

+12-12
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class MasterImpl {
122122
void RenameTable(const RenameTableRequest* request,
123123
RenameTableResponse* response,
124124
google::protobuf::Closure* done);
125-
125+
126126
void CmdCtrl(const CmdCtrlRequest* request,
127127
CmdCtrlResponse* response);
128128
void OperateUser(const OperateUserRequest* request,
@@ -273,7 +273,7 @@ class MasterImpl {
273273
const std::vector<TabletPtr>& tablet_list,
274274
const std::string& table_name = "");
275275

276-
void GetSnapshotAsync(TabletPtr tablet, int32_t timeout,
276+
void GetSnapshotAsync(TabletPtr tablet, int64_t snapshot_id, int32_t timeout,
277277
SnapshotClosure* done);
278278
void GetSnapshotCallback(int32_t tablet_id, SnapshotTask* task,
279279
SnapshotRequest* master_request,
@@ -421,14 +421,14 @@ class MasterImpl {
421421
WriteTabletRequest* request,
422422
WriteTabletResponse* response,
423423
bool failed, int error_code);
424-
void DeleteTableRecordCallback(TablePtr table, int32_t retry_times,
425-
WriteTabletRequest* request,
426-
WriteTabletResponse* response,
427-
bool failed, int error_code);
428-
void DeleteTabletRecordCallback(TabletPtr tablet, int32_t retry_times,
429-
WriteTabletRequest* request,
430-
WriteTabletResponse* response,
431-
bool failed, int error_code);
424+
void DeleteTableCallback(TablePtr table,
425+
std::vector<TabletPtr> tablets,
426+
int32_t retry_times,
427+
DeleteTableResponse* rpc_response,
428+
google::protobuf::Closure* rpc_done,
429+
WriteTabletRequest* request,
430+
WriteTabletResponse* response,
431+
bool failed, int error_code);
432432

433433
void ScanMetaTableAsync(const std::string& table_name,
434434
const std::string& tablet_key_start,
@@ -491,7 +491,7 @@ class MasterImpl {
491491
bool is_delete, WriteClosure* done);
492492
void SuspendMetaOperation(std::vector<ToMetaFunc> meta_entries,
493493
bool is_delete, WriteClosure* done);
494-
494+
495495
void SuspendMetaOperation(const std::string& table_name,
496496
const std::string& tablet_key_start,
497497
const std::string& tablet_key_end,
@@ -521,7 +521,7 @@ class MasterImpl {
521521
bool IsRootUser(const std::string& token);
522522

523523
template <typename Request, typename Response, typename Callback>
524-
bool HasTablePermission(const Request* request, Response* response,
524+
bool HasTablePermission(const Request* request, Response* response,
525525
Callback* done, TablePtr table, const char* operate);
526526
void FillAlias(const std::string& key, const std::string& value);
527527
private:

0 commit comments

Comments
 (0)