Skip to content

Commit 3a88830

Browse files
Merge branch 'master' into tyler-cleanup-libstuff
2 parents 0a808de + cf1d717 commit 3a88830

19 files changed

+257
-82
lines changed

.github/PULL_REQUEST_TEMPLATE.md

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
### Details
2+
3+
### Fixed Issues
4+
Fixes GH_LINK
5+
6+
### Tests
7+
8+
_________
9+
**Internal Testing Reminder:** when changing bedrock, please compile auth against your new changes

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ clean:
6060
mbedtls/library/libmbedcrypto.a mbedtls/library/libmbedtls.a mbedtls/library/libmbedx509.a:
6161
git submodule init
6262
git submodule update
63-
cd mbedtls && git checkout -q 04a049bda1ceca48060b57bc4bcf5203ce591421
63+
cd mbedtls && git checkout -q v2.26.0
6464
cd mbedtls && $(MAKE) no_test
6565

6666
# We select all of the cpp files (and manually add sqlite3.c) that will be in libstuff.

libstuff/SHTTPSManager.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ SStandaloneHTTPSManager::Transaction* SStandaloneHTTPSManager::_httpsSend(const
225225
Socket* s = openSocket(host, x509);
226226
if (!s) {
227227
delete transaction;
228+
delete x509;
228229
return _createErrorTransaction();
229230
}
230231

libstuff/SRandom.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
#include "SRandom.h"
22

3+
#ifdef VALGRIND
4+
// random_device breaks valgrind.
5+
mt19937_64 SRandom::_generator = mt19937_64();
6+
#else
37
mt19937_64 SRandom::_generator = mt19937_64(random_device()());
8+
#endif
9+
410
uniform_int_distribution<uint64_t> SRandom::_distribution64 = uniform_int_distribution<uint64_t>();
511

612
uint64_t SRandom::limitedRand64(uint64_t minNum, uint64_t maxNum) {

libstuff/SSignal.cpp

+31-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ void _SSignal_StackTrace(int signum, siginfo_t *info, void *ucontext);
2121
// A boolean indicating whether or not we've initialized our signal thread.
2222
atomic_flag _SSignal_threadInitialized = ATOMIC_FLAG_INIT;
2323

24+
// Set to true to stop the signal thread.
25+
atomic<bool> _SSignal_threadStopFlag(false);
26+
2427
// The signals we've received since the last time this was cleared.
2528
atomic<uint64_t> _SSignal_pendingSignalBitMask(0);
2629

@@ -85,7 +88,7 @@ void SInitializeSignals() {
8588
sigprocmask(SIG_BLOCK, &signals, 0);
8689

8790
// This is the signal action structure we'll use to specify what to listen for.
88-
struct sigaction newAction;
91+
struct sigaction newAction = {0};
8992

9093
// The old style handler is explicitly null
9194
newAction.sa_handler = nullptr;
@@ -109,7 +112,6 @@ void SInitializeSignals() {
109112
bool threadAlreadyStarted = _SSignal_threadInitialized.test_and_set();
110113
if (!threadAlreadyStarted) {
111114
_SSignal_signalThread = thread(_SSignal_signalHandlerThreadFunc);
112-
_SSignal_signalThread.detach();
113115
}
114116
}
115117

@@ -124,10 +126,24 @@ void _SSignal_signalHandlerThreadFunc() {
124126

125127
// Now we wait for any signal to occur.
126128
while (true) {
129+
127130
// Wait for a signal to appear.
128-
int signum = 0;
129-
int result = sigwait(&signals, &signum);
130-
if (!result) {
131+
siginfo_t siginfo = {0};
132+
struct timespec timeout;
133+
timeout.tv_sec = 1;
134+
timeout.tv_nsec = 0;
135+
int result = -1;
136+
while (result == -1) {
137+
result = sigtimedwait(&signals, &siginfo, &timeout);
138+
if (_SSignal_threadStopFlag) {
139+
// Done.
140+
SINFO("Stopping signal handler thread.");
141+
return;
142+
}
143+
}
144+
int signum = siginfo.si_signo;
145+
146+
if (result > 0) {
131147
// Do the same handling for these functions here as any other thread.
132148
if (signum == SIGSEGV || signum == SIGABRT || signum == SIGFPE || signum == SIGILL || signum == SIGBUS) {
133149
_SSignal_StackTrace(signum, nullptr, nullptr);
@@ -140,6 +156,16 @@ void _SSignal_signalHandlerThreadFunc() {
140156
}
141157
}
142158

159+
void SStopSignalThread() {
160+
_SSignal_threadStopFlag = true;
161+
if (_SSignal_threadInitialized.test_and_set()) {
162+
// Send ourselves a singnal to interrupt our thread.
163+
SINFO("Joining signal thread.");
164+
_SSignal_signalThread.join();
165+
_SSignal_threadInitialized.clear();
166+
}
167+
}
168+
143169
void _SSignal_StackTrace(int signum, siginfo_t *info, void *ucontext) {
144170
if (signum == SIGSEGV || signum == SIGABRT || signum == SIGFPE || signum == SIGILL || signum == SIGBUS) {
145171
// If we haven't already saved a signal number, we'll do it now. Any signal we catch here will generate a

libstuff/STCPNode.cpp

+1-6
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,16 @@ const string& STCPNode::stateName(STCPNode::State state) {
4949
}
5050

5151
STCPNode::State STCPNode::stateFromName(const string& name) {
52-
string normalizedName = SToUpper(name);
53-
54-
// Accept both old and new state names, but map them all to the new states.
55-
// We can delete the old names once the entire cluster is using the new names.
52+
const string normalizedName = SToUpper(name);
5653
static map<string, State> lookup = {
5754
{"SEARCHING", SEARCHING},
5855
{"SYNCHRONIZING", SYNCHRONIZING},
5956
{"WAITING", WAITING},
6057
{"STANDINGUP", STANDINGUP},
6158
{"LEADING", LEADING},
62-
{"MASTERING", LEADING},
6359
{"STANDINGDOWN", STANDINGDOWN},
6460
{"SUBSCRIBING", SUBSCRIBING},
6561
{"FOLLOWING", FOLLOWING},
66-
{"SLAVING", FOLLOWING},
6762
};
6863
auto it = lookup.find(normalizedName);
6964
if (it == lookup.end()) {

libstuff/libstuff.h

+2
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ string SGetSignalDescription();
206206
// Clear all outstanding signals.
207207
void SClearSignals();
208208

209+
void SStopSignalThread();
210+
209211
// And also exception stuff.
210212
string SGetCurrentExceptionName();
211213
void STerminateHandler(void);

main.cpp

+9-5
Original file line numberDiff line numberDiff line change
@@ -197,11 +197,6 @@ int main(int argc, char* argv[]) {
197197
}
198198
}
199199

200-
// Start libstuff. Generally, we want to initialize libstuff immediately on any new thread, but we wait until after
201-
// the `fork` above has completed, as we can get strange behaviors from signal handlers across forked processes.
202-
SInitialize("main", (args.isSet("-overrideProcessName") ? args["-overrideProcessName"].c_str() : 0));
203-
SLogLevel(LOG_INFO);
204-
205200
if (args.isSet("-version")) {
206201
// Just output the version
207202
cout << VERSION << endl;
@@ -277,6 +272,12 @@ int main(int argc, char* argv[]) {
277272
cout << endl;
278273
return 1;
279274
}
275+
276+
// Start libstuff. Generally, we want to initialize libstuff immediately on any new thread, but we wait until after
277+
// the `fork` above has completed, as we can get strange behaviors from signal handlers across forked processes.
278+
SInitialize("main", (args.isSet("-overrideProcessName") ? args["-overrideProcessName"].c_str() : 0));
279+
SLogLevel(LOG_INFO);
280+
280281
if (args.isSet("-v")) {
281282
// Verbose logging
282283
SINFO("Enabling verbose logging");
@@ -395,6 +396,9 @@ int main(int argc, char* argv[]) {
395396
delete _server;
396397
SINFO("BedrockServer deleted");
397398

399+
// Finished with our signal handler.
400+
SStopSignalThread();
401+
398402
// All done
399403
SINFO("Graceful process shutdown complete");
400404
return 0;

mbedtls

plugins/Cache.h

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#pragma once
12
#include <libstuff/libstuff.h>
23
#include "../BedrockPlugin.h"
34

plugins/Jobs.h

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#pragma once
12
#include <libstuff/libstuff.h>
23
#include "../BedrockPlugin.h"
34

sqlitecluster/SQLite.cpp

+24-6
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,21 @@ string SQLite::initializeFilename(const string& filename) {
3535
}
3636

3737
SQLite::SharedData& SQLite::initializeSharedData(sqlite3* db, const string& filename, const vector<string>& journalNames) {
38-
static map<string, SharedData*> sharedDataLookupMap;
38+
static struct SharedDataLookupMapType {
39+
map<string, SharedData*> m;
40+
~SharedDataLookupMapType() {
41+
for (auto& p : m) {
42+
delete(p.second);
43+
}
44+
m.clear();
45+
}
46+
} sharedDataLookupMap;
47+
3948
static mutex instantiationMutex;
4049
lock_guard<mutex> lock(instantiationMutex);
41-
auto sharedDataIterator = sharedDataLookupMap.find(filename);
42-
if (sharedDataIterator == sharedDataLookupMap.end()) {
43-
SharedData* sharedData = new SharedData();
50+
auto sharedDataIterator = sharedDataLookupMap.m.find(filename);
51+
if (sharedDataIterator == sharedDataLookupMap.m.end()) {
52+
SharedData* sharedData = new SharedData(); // This is never deleted.
4453

4554
// Read the highest commit count from the database, and store it in commitCount.
4655
string query = "SELECT MAX(maxIDs) FROM (" + _getJournalQuery(journalNames, {"SELECT MAX(id) as maxIDs FROM"}, true) + ")";
@@ -60,7 +69,7 @@ SQLite::SharedData& SQLite::initializeSharedData(sqlite3* db, const string& file
6069
}
6170

6271
// Insert our SharedData object into the global map.
63-
sharedDataLookupMap.emplace(filename, sharedData);
72+
sharedDataLookupMap.m.emplace(filename, sharedData);
6473
return *sharedData;
6574
} else {
6675
// Otherwise, use the existing one.
@@ -263,7 +272,16 @@ int SQLite::_sqliteWALCallback(void* data, sqlite3* db, const char* dbName, int
263272
SINFO("[checkpoint] Ready for complete checkpoint but skipping because less than 100 commits since last complete checkpoint.");
264273
return SQLITE_OK;
265274
}
266-
// If we get here, then full checkpoints are enabled, and we have enough pages in the WAL file to perform one.
275+
276+
int dbInUse = 0;
277+
int useCheckResult = sqlite3_file_control(db, "main", SQLITE_FCNTL_EXTERNAL_READER, (void*)&dbInUse);
278+
if (useCheckResult == SQLITE_OK && dbInUse) {
279+
SINFO("Skipping complete checkpoint because external transaction in progress.");
280+
return SQLITE_OK;
281+
}
282+
283+
// If we get here, then full checkpoints are enabled, we have enough pages in the WAL file to perform one, and
284+
// nothing else is stopping us from running one.
267285
SINFO("[checkpoint] " << pageCount << " pages behind, beginning complete checkpoint.");
268286

269287
// This thread will run independently. We capture the variables we need here and pass them by value.

sqlitecluster/SQLiteNode.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ const string SQLiteNode::consistencyLevelNames[] = {"ASYNC",
6868
atomic<int64_t> SQLiteNode::_currentCommandThreadID(0);
6969

7070
const vector<STCPNode::Peer*> SQLiteNode::initPeers(const string& peerListString) {
71+
_state = UNKNOWN;
7172
vector<Peer*> peerList;
7273
list<string> parsedPeerList = SParseList(peerListString);
7374
for (const string& peerString : parsedPeerList) {
@@ -98,6 +99,7 @@ SQLiteNode::SQLiteNode(SQLiteServer& server, SQLitePool& dbPool, const string& n
9899
: STCPNode(name, host, initPeers(peerList), max(SQL_NODE_DEFAULT_RECV_TIMEOUT, SQL_NODE_SYNCHRONIZING_RECV_TIMEOUT)),
99100
_dbPool(dbPool),
100101
_db(_dbPool.getBase()),
102+
_state(UNKNOWN),
101103
_commitState(CommitState::UNINITIALIZED),
102104
_server(server),
103105
_stateChangeCount(0),

test/clustertest/testplugin/TestPlugin.h

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#pragma once
12
#include <libstuff/libstuff.h>
23
#include <BedrockPlugin.h>
34
#include <BedrockServer.h>

test/clustertest/tests/LeadingTest.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ struct LeadingTest : tpunit::TestFixture {
6565
SData cmd("Status");
6666
string response = newMaster.executeWaitVerifyContent(cmd);
6767
STable json = SParseJSONObject(response);
68-
if (json["state"] == "LEADING" || json["state"] == "MASTERING") {
68+
if (json["state"] == "LEADING") {
6969
success = true;
7070
break;
7171
}
@@ -118,9 +118,9 @@ struct LeadingTest : tpunit::TestFixture {
118118
STable json1 = SParseJSONObject(responses[1]);
119119
STable json2 = SParseJSONObject(responses[2]);
120120

121-
if ((json0["state"] == "LEADING" || json0["state"] == "MASTERING") &&
122-
(json1["state"] == "FOLLOWING" || json1["state"] == "SLAVING") &&
123-
(json2["state"] == "FOLLOWING" || json2["state"] == "SLAVING")) {
121+
if (json0["state"] == "LEADING" &&
122+
json1["state"] == "FOLLOWING" &&
123+
json2["state"] == "FOLLOWING") {
124124

125125
break;
126126
}
@@ -171,7 +171,7 @@ struct LeadingTest : tpunit::TestFixture {
171171
continue;
172172
}
173173
}
174-
if(json["state"] == "FOLLOWING" || json["state"] == "SLAVING") {
174+
if(json["state"] == "FOLLOWING") {
175175
// Make sure it was following before it was synchronizing.
176176
ASSERT_TRUE(wasSynchronizing);
177177
wasFollowing = true;

test/lib/BedrockTester.cpp

+30-10
Original file line numberDiff line numberDiff line change
@@ -146,16 +146,7 @@ string BedrockTester::startServer(bool wait) {
146146
}
147147
}
148148

149-
// Convert our c++ strings to old-school C strings for exec.
150-
char* cargs[args.size() + 1];
151-
int count = 0;
152-
for(string arg : args) {
153-
char* newstr = (char*)malloc(arg.size() + 1);
154-
strcpy(newstr, arg.c_str());
155-
cargs[count] = newstr;
156-
count++;
157-
}
158-
cargs[count] = 0;
149+
159150

160151
// Make sure the ports we need are free.
161152
int portsFree = 0;
@@ -168,6 +159,35 @@ string BedrockTester::startServer(bool wait) {
168159
<< _controlPort << ") to start server, things will probably fail." << endl;
169160
}
170161

162+
#ifdef VALGRIND
163+
#define xstr(a) str(a)
164+
#define str(a) #a
165+
166+
list<string> valgrind = SParseList(xstr(VALGRIND), ' ');
167+
if (valgrind.size()) {
168+
serverName = valgrind.front();
169+
cout << "Starting bedrock server in '" << serverName << "' with args: " << endl;
170+
auto it = valgrind.rbegin();
171+
while (it != valgrind.rend()) {
172+
args.push_front(*it);
173+
it++;
174+
}
175+
cout << SComposeList(args, " ") << endl;
176+
cout << "==========================" << endl;
177+
}
178+
#endif
179+
180+
// Convert our c++ strings to old-school C strings for exec.
181+
char* cargs[args.size() + 1];
182+
int count = 0;
183+
for(string arg : args) {
184+
char* newstr = (char*)malloc(arg.size() + 1);
185+
strcpy(newstr, arg.c_str());
186+
cargs[count] = newstr;
187+
count++;
188+
}
189+
cargs[count] = 0;
190+
171191
// And then start the new server!
172192
execvp(serverName.c_str(), cargs);
173193

0 commit comments

Comments
 (0)