Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/engine/Operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,19 @@ Result Operation::runComputation(const ad_utility::Timer& timer,
}
signalQueryUpdate(RuntimeInformation::SendPriority::IfDue);
},
[this](bool failed) {
// TODO<RobinTF> Distinguish between failed and cancelled.
runtimeInfo().status_ =
failed ? RuntimeInformation::failed
: RuntimeInformation::lazilyMaterializedCompleted;
[this](Result::GeneratorState state) {
runtimeInfo().status_ = [state]() {
using enum Result::GeneratorState;
switch (state) {
case FINISHED:
return RuntimeInformation::lazilyMaterializedCompleted;
case CANCELLED:
return RuntimeInformation::cancelled;
default:
AD_CORRECTNESS_CHECK(state == FAILED);
return RuntimeInformation::failed;
}
}();
signalQueryUpdate(RuntimeInformation::SendPriority::Always);
});
}
Expand Down
1 change: 1 addition & 0 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ class Operation {
FRIEND_TEST(Operation, updateRuntimeStatsWorksCorrectly);
FRIEND_TEST(Operation, verifyRuntimeInformationIsUpdatedForLazyOperations);
FRIEND_TEST(Operation, ensureFailedStatusIsSetWhenGeneratorThrowsException);
FRIEND_TEST(Operation, ensureFailedStatusIsSetWhenGeneratorIsCancelled);
FRIEND_TEST(Operation, testSubMillisecondsIncrementsAreStillTracked);
FRIEND_TEST(Operation, ensureSignalUpdateIsOnlyCalledEvery50msAndAtTheEnd);
FRIEND_TEST(Operation,
Expand Down
22 changes: 14 additions & 8 deletions src/engine/Result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <absl/cleanup/cleanup.h>

#include "backports/shift.h"
#include "util/CancellationHandle.h"
#include "util/Exception.h"
#include "util/Generators.h"
#include "util/InputRangeUtils.h"
Expand Down Expand Up @@ -270,7 +271,7 @@ void Result::checkDefinedness(const VariableToColumnMap& varColMap) {
void Result::runOnNewChunkComputed(
std::function<void(const IdTableVocabPair&, std::chrono::microseconds)>
onNewChunk,
std::function<void(bool)> onGeneratorFinished) {
std::function<void(GeneratorState)> onGeneratorFinished) {
AD_CONTRACT_CHECK(!isFullyMaterialized());
auto inputAsGet = ad_utility::CachingTransformInputRange(
idTables(), [](auto& input) { return std::move(input); });
Expand All @@ -281,24 +282,29 @@ void Result::runOnNewChunkComputed(
std::move(onGeneratorFinished));

// The main lambda that when being called processes the next chunk.
auto get =
[inputAsGet = std::move(inputAsGet), sharedFinish,
cleanup = absl::Cleanup{[&finish = *sharedFinish]() { finish(false); }},
onNewChunk =
std::move(onNewChunk)]() mutable -> std::optional<IdTableVocabPair> {
auto get = [inputAsGet = std::move(inputAsGet), sharedFinish,
cleanup = absl::Cleanup{[&finish = *sharedFinish]() {
finish(GeneratorState::FINISHED);
}},
onNewChunk = std::move(
onNewChunk)]() mutable -> std::optional<IdTableVocabPair> {
try {
Timer timer{Timer::Started};
auto input = inputAsGet.get();
if (!input.has_value()) {
std::move(cleanup).Cancel();
(*sharedFinish)(false);
(*sharedFinish)(GeneratorState::FINISHED);
return std::nullopt;
}
onNewChunk(input.value(), timer.value());
return input;
} catch (const ad_utility::CancellationException&) {
std::move(cleanup).Cancel();
(*sharedFinish)(GeneratorState::CANCELLED);
throw;
} catch (...) {
std::move(cleanup).Cancel();
(*sharedFinish)(true);
(*sharedFinish)(GeneratorState::FAILED);
throw;
}
};
Expand Down
13 changes: 9 additions & 4 deletions src/engine/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class Result {
: idTable_{std::move(idTable)}, localVocab_{std::move(localVocab)} {}
};

// Helper enum to indicate the state of a generator after consumption.
enum class GeneratorState { FINISHED, CANCELLED, FAILED };

// The lazy result type that is actually stored. It is type-erased and allows
// explicit conversion from the `Generator` above.
using LazyResult = ad_utility::InputRangeTypeErased<IdTableVocabPair>;
Expand Down Expand Up @@ -160,16 +163,18 @@ class Result {
// generator and passed this new `IdTableVocabPair` along with microsecond
// precision timing information on how long it took to compute this new chunk.
// `onGeneratorFinished` is guaranteed to be called eventually as long as the
// generator is consumed at least partially, with `true` if an exception
// occurred during consumption or with `false` when the generator is done
// processing or abandoned and destroyed.
// generator is consumed at least partially, with `GeneratorState::FAILED` if
// an exception occurred during consumption, with `GeneratorState::CANCELLED`
// if said exception is a cancellation exception or with
// `GeneratorState::FINISHED` when the generator is done processing or
// abandoned and destroyed.
//
// Throw an `ad_utility::Exception` if the underlying `data_` member holds the
// wrong variant.
void runOnNewChunkComputed(
std::function<void(const IdTableVocabPair&, std::chrono::microseconds)>
onNewChunk,
std::function<void(bool)> onGeneratorFinished);
std::function<void(GeneratorState)> onGeneratorFinished);

// Wrap the generator stored in `data_` within a new generator that aggregates
// the entries yielded by the generator into a cacheable `IdTable`. Once
Expand Down
33 changes: 33 additions & 0 deletions test/OperationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,39 @@ TEST(Operation, ensureFailedStatusIsSetWhenGeneratorThrowsException) {
EXPECT_TRUE(signaledUpdate);
}

// _____________________________________________________________________________
TEST(Operation, ensureFailedStatusIsSetWhenGeneratorIsCancelled) {
bool signaledUpdate = false;
const Index& index = ad_utility::testing::getQec()->getIndex();
QueryResultCache cache{};
NamedResultCache namedCache{};
MaterializedViewsManager materializedViewsManager;
QueryExecutionContext context{
index,
&cache,
makeAllocator(ad_utility::MemorySize::megabytes(100)),
SortPerformanceEstimator{},
&namedCache,
&materializedViewsManager,
[&](std::string) { signaledUpdate = true; }};
CustomGeneratorOperation operation{
&context, []() -> Result::Generator {
throw CancellationException{"Operation was cancelled"};
co_return;
}()};
ad_utility::Timer timer{ad_utility::Timer::InitialStatus::Started};
auto result =
operation.runComputation(timer, ComputationMode::LAZY_IF_SUPPORTED);

EXPECT_EQ(operation.runtimeInfo().status_,
Status::lazilyMaterializedInProgress);

EXPECT_THROW(result.idTables().begin(), ad_utility::CancellationException);

EXPECT_EQ(operation.runtimeInfo().status_, Status::cancelled);
EXPECT_TRUE(signaledUpdate);
}

// _____________________________________________________________________________
TEST(Operation, ensureSignalUpdateIsOnlyCalledEvery50msAndAtTheEnd) {
#ifdef _QLEVER_NO_TIMING_TESTS
Expand Down
43 changes: 36 additions & 7 deletions test/ResultTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ TEST(Result, verifyRunOnNewChunkComputedThrowsWithFullyMaterializedResult) {

EXPECT_THROW(result.runOnNewChunkComputed(
[](const IdTableVocabPair&, std::chrono::microseconds) {},
[](bool) {}),
[](Result::GeneratorState) {}),
ad_utility::Exception);
}

Expand Down Expand Up @@ -208,8 +208,8 @@ TEST(Result, verifyRunOnNewChunkComputedFiresCorrectly) {
EXPECT_GE(duration, 5ms);
}
},
[&](bool error) {
EXPECT_FALSE(error);
[&](Result::GeneratorState state) {
EXPECT_EQ(state, Result::GeneratorState::FINISHED);
finishedConsuming = true;
});

Expand All @@ -234,8 +234,8 @@ TEST(Result, verifyRunOnNewChunkCallsFinishOnError) {
[&](const IdTableVocabPair&, std::chrono::microseconds) {
++callCounterGenerator;
},
[&](bool error) {
EXPECT_TRUE(error);
[&](Result::GeneratorState state) {
EXPECT_EQ(state, Result::GeneratorState::FAILED);
++callCounterFinished;
});

Expand All @@ -247,6 +247,35 @@ TEST(Result, verifyRunOnNewChunkCallsFinishOnError) {
EXPECT_EQ(callCounterFinished, 1);
}

// _____________________________________________________________________________
TEST(Result, verifyRunOnNewChunkCallsFinishOnCancellation) {
Result result{[]() -> Result::Generator {
throw ad_utility::CancellationException{
"verifyRunOnNewChunkCallsFinishOnCancellation"};
co_return;
}(),
{}};
uint32_t callCounterGenerator = 0;
uint32_t callCounterFinished = 0;

result.runOnNewChunkComputed(
[&](const IdTableVocabPair&, std::chrono::microseconds) {
++callCounterGenerator;
},
[&](Result::GeneratorState state) {
EXPECT_EQ(state, Result::GeneratorState::CANCELLED);
++callCounterFinished;
});

AD_EXPECT_THROW_WITH_MESSAGE_AND_TYPE(
consumeGenerator(result.idTables()),
HasSubstr("verifyRunOnNewChunkCallsFinishOnCancellation"),
ad_utility::CancellationException);

EXPECT_EQ(callCounterGenerator, 0);
EXPECT_EQ(callCounterFinished, 1);
}

// _____________________________________________________________________________
TEST(Result, verifyRunOnNewChunkCallsFinishOnPartialConsumption) {
uint32_t callCounterGenerator = 0;
Expand All @@ -262,8 +291,8 @@ TEST(Result, verifyRunOnNewChunkCallsFinishOnPartialConsumption) {
[&](const IdTableVocabPair&, std::chrono::microseconds) {
++callCounterGenerator;
},
[&](bool error) {
EXPECT_FALSE(error);
[&](Result::GeneratorState state) {
EXPECT_EQ(state, Result::GeneratorState::FINISHED);
++callCounterFinished;
});

Expand Down
Loading