Skip to content

Commit

Permalink
Changes from a review.
Browse files Browse the repository at this point in the history
  • Loading branch information
joka921 committed Dec 12, 2023
1 parent 417ac19 commit d0b6a4f
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 15 deletions.
13 changes: 6 additions & 7 deletions src/parser/TurtleParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -822,13 +822,12 @@ bool TurtleStreamParser<T>::getLine(TurtleTriple* triple) {

// We will use the following trick: For a batch that is forwarded to the
// parallel parser, we will first increment `numBatchesTotal_` and then call
// the following lambda from within the thread that parses that batch after
// it has been completely parsed and pushed to the `tripleCollector`. With
// this invariant we get that `batchIdx_ == numBatchesTotal_` iff all
// batches that have been inserted to the `parallelParser_` have been fully
// processed. After the last batch we will push another call to this lambda
// to the `parallelParser_` which will then finish the `tripleCollector_` as
// soon as all batches have been computed.
// the following lambda after the batch has completely parsed and the result
// pushed to the `tripleCollector_`. We thus get the invariant that `batchIdx_
// == numBatchesTotal_` iff all batches that have been inserted to the
// `parallelParser_` have been fully processed. After the last batch we will
// push another call to this lambda to the `parallelParser_` which will then
// finish the `tripleCollector_` as soon as all batches have been computed.
template <typename Tokenizer_T>
void TurtleParallelParser<Tokenizer_T>::finishTripleCollectorIfLastBatch() {
if (batchIdx_.fetch_add(1) == numBatchesTotal_) {
Expand Down
5 changes: 2 additions & 3 deletions src/parser/TurtleParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,8 @@ class TurtleParallelParser : public TurtleParser<Tokenizer_T> {
TurtleParallelParser() = default;

// If the `sleepTimeForTesting` is set, then after the initialization the
// parser will sleep for the specified time for each batch s.t. certain corner
// cases can be tested.
// parser will sleep for the specified time before parsing each batch s.t.
// certain corner cases can be tested.
explicit TurtleParallelParser(const string& filename,
std::chrono::milliseconds sleepTimeForTesting =
std::chrono::milliseconds{0})
Expand Down Expand Up @@ -636,7 +636,6 @@ class TurtleParallelParser : public TurtleParser<Tokenizer_T> {
// the parser threads can be destroyed. The following two members are needed
// for keeping track of this condition.
std::atomic<size_t> batchIdx_ = 0;
// `max()` means `not yet set`.
std::atomic<size_t> numBatchesTotal_ = 0;

std::chrono::milliseconds sleepTimeForTesting_;
Expand Down
1 change: 0 additions & 1 deletion src/util/TaskQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class TaskQueue {
// _________________________________________________________________________
void function_for_thread() {
while (auto task = queuedTasks_.pop()) {
// perform the task without actually holding the lock.
task.value()();
}
}
Expand Down
4 changes: 4 additions & 0 deletions test/TaskQueueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ TEST(TaskQueue, SimpleSumWithWait) {
q.finish();
ASSERT_EQ(result, 500500);
}

TEST(TaskQueue, ThrowOnMaxQueueSizeZero) {
EXPECT_ANY_THROW((ad_utility::TaskQueue{0, 5}));
}
11 changes: 7 additions & 4 deletions test/TurtleParserTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,6 @@ TEST(TurtleParserTest, exceptionPropagation) {
// propagated.
TEST(TurtleParserTest, exceptionPropagationFileBufferReading) {
std::string filename{"turtleParserEmptyInput.dat"};
FILE_BUFFER_SIZE() = 40;
auto testWithParser = [&]<typename Parser>(bool useBatchInterface,
std::string_view input) {
{
Expand All @@ -799,13 +798,17 @@ TEST(TurtleParserTest, exceptionPropagationFileBufferReading) {
::testing::ContainsRegex("Please increase the FILE_BUFFER_SIZE"));
ad_utility::deleteFile(filename);
};
// Deliberately chosen s.t. the first triple fits in a block, but the second
// one doesn't.
FILE_BUFFER_SIZE() = 40;
forAllParallelParsers(testWithParser,
"<subject> <predicate> <object> . \n <veryLongSubject> "
"<veryLongPredicate> <veryLongObject> .");
}

// Test that the parallel parsers destructor can be run quickly, even when there
// are still lots of blocks in the pipeline that are currently being parsed.
// Test that the parallel parser's destructor can be run quickly and without
// blocking, even when there are still lots of blocks in the pipeline that are
// currently being parsed.
TEST(TurtleParserTest, stopParsingOnOutsideFailure) {
std::string filename{"turtleParserStopParsingOnOutsideFailure.dat"};
auto testWithParser = [&]<typename Parser>(
Expand All @@ -822,7 +825,7 @@ TEST(TurtleParserTest, stopParsingOnOutsideFailure) {
}
EXPECT_LE(t.msecs(), 20ms);
};
auto input = []() {
const std::string input = []() {
std::string singleBlock = "<subject> <predicate> <object> . \n ";
std::string longBlock;
longBlock.reserve(200 * singleBlock.size());
Expand Down

0 comments on commit d0b6a4f

Please sign in to comment.