Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/engine/AddCombinedRowToTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "backports/concepts.h"
#include "engine/LocalVocab.h"
#include "engine/Result.h"
#include "engine/idTable/IdTable.h"
#include "engine/idTable/IdTableConcepts.h"
#include "global/Id.h"
Expand Down Expand Up @@ -259,6 +260,14 @@ class AddCombinedRowToIdTable {

LocalVocab& localVocab() { return mergedVocab_; }

// Move both the result table and local vocab out as an IdTableVocabPair.
// This is a convenience method for the common pattern of moving both out.
auto toIdTableVocabPair() && {
flush();
return Result::IdTableVocabPair{std::move(resultTable_),
std::move(mergedVocab_)};
}

// Disable copying and moving, it is currently not needed and makes it harder
// to reason about
AddCombinedRowToIdTable(const AddCombinedRowToIdTable&) = delete;
Expand Down
89 changes: 80 additions & 9 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ struct IndexScan::SharedGeneratorState {
bool hasUndef_ = false;
// Indicates if the generator has been fully consumed.
bool doneFetching_ = false;
// If true, filter the left side (skip non-matching inputs). If false, pass
// through all inputs even if they don't match any blocks.
bool filterLeftSide_ = true;

// Advance the `iterator` to the next non-empty table. Set `hasUndef_` to true
// if the first table is undefined. Also set `doneFetching_` if the generator
Expand Down Expand Up @@ -643,13 +646,59 @@ struct IndexScan::SharedGeneratorState {
// We have seen entries in the join column that are larger than the
// largest block in the index scan, which means that there will be no
// more matches.
if (!filterLeftSide_) {
// Case B: Push current table before marking as done.
prefetchedValues_.push_back(std::move(*iterator_.value()));
}
doneFetching_ = true;
return;
}
// The current `joinColumn` has no matching block in the index, we can
// safely skip appending it to `prefetchedValues_`, but future values
// might require later blocks from the index.
continue;
// Case A: The current `joinColumn` has no matching block in the index.
if (filterLeftSide_) {
// We can safely skip appending it to `prefetchedValues_`, but future
// values might require later blocks from the index.
continue;
} else {
// When not filtering, push the table to prefetchedValues.
prefetchedValues_.push_back(std::move(*iterator_.value()));
// If buffer grows too large, find a dummy block to add.
if (prefetchedValues_.size() > 5) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 5 should be a named constant

// Find the last value in the join column of the last prefetched
// table.
const auto& lastPrefetched = prefetchedValues_.back();
auto lastJoinColumn =
lastPrefetched.idTable_.getColumn(joinColumn_);
AD_CORRECTNESS_CHECK(!lastJoinColumn.empty());
Id lastValue = lastJoinColumn.back();
// Find the smallest block whose first entry is larger than
// lastValue.
// TODO<joka921> This should always be the first block that is still
// available. also remove code duplication with the above code.
bool foundBlock = false;
size_t numBlocksHandled = 0;
for (const auto& block : metaBlocks_.getBlockMetadataView()) {
++numBlocksHandled;
if (CompressedRelationReader::getRelevantIdFromTriple(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if statement can be asserted, and the meta blocks range is never empty at this point.

block.firstTriple_, metaBlocks_) > lastValue) {
// Found a suitable block, add it to pendingBlocks.
pendingBlocks_.push_back(block);
lastEntryInBlocks_ =
CompressedRelationReader::getRelevantIdFromTriple(
block.lastTriple_, metaBlocks_);
AD_CORRECTNESS_CHECK(numBlocksHandled == 1);
metaBlocks_.removePrefix(numBlocksHandled);
foundBlock = true;
break;
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be extracted to a helper function

if (!foundBlock) {
// No more blocks available, mark as done.
doneFetching_ = true;
return;
}
}
continue;
}
}
prefetchedValues_.push_back(std::move(*iterator_.value()));
ql::ranges::move(newBlocks, std::back_inserter(pendingBlocks_));
Expand Down Expand Up @@ -690,7 +739,19 @@ Result::LazyResult IndexScan::createPrefilteredJoinSide(

if (prefetched.empty()) {
AD_CORRECTNESS_CHECK(state->doneFetching_);
return LoopControl::makeBreak();
// If not filtering left side, yield all remaining elements.
AD_CORRECTNESS_CHECK(state->iterator_.has_value());
auto it = state->iterator_.value();
if (!state->filterLeftSide_ && it != state->generator_.end()) {
// Advance the iterator past the last value we already yielded.
++it;
Comment on lines +746 to +747
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if you don't yield this value up until this point? I think this would simplify the logic in fetch() noticeably

return LoopControl::breakWithYieldAll(
ql::ranges::subrange(it, state->generator_.end()) |
ql::views::filter(
[](const auto& block) { return !block.idTable_.empty(); }));
} else {
return LoopControl::makeBreak();
}
}

// Make a defensive copy of the values to avoid modification during
Expand Down Expand Up @@ -769,17 +830,27 @@ Result::LazyResult IndexScan::createPrefilteredIndexScanSide(

// _____________________________________________________________________________
std::pair<Result::LazyResult, Result::LazyResult> IndexScan::prefilterTables(
Result::LazyResult input, ColumnIndex joinColumn) {
Result::LazyResult input, ColumnIndex joinColumn, bool filterLeftSide) {
AD_CORRECTNESS_CHECK(numVariables_ <= 3 && numVariables_ > 0);
auto metaBlocks = getMetadataForScan();

if (!metaBlocks.has_value()) {
// Return empty results
return {Result::LazyResult{}, Result::LazyResult{}};
return {filterLeftSide ? Result::LazyResult{} : std::move(input),
Result::LazyResult{}};
}

auto state = std::make_shared<SharedGeneratorState>(SharedGeneratorState{
std::move(input), joinColumn, std::move(metaBlocks.value())});
auto state = std::make_shared<SharedGeneratorState>(
SharedGeneratorState{std::move(input),
joinColumn,
std::move(metaBlocks.value()),
std::nullopt,
{},
{},
std::nullopt,
false,
false,
filterLeftSide});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving this param up a bit in the struct, then you wouldn't have to re-define all of these default arguments again.

return {createPrefilteredJoinSide(state),
createPrefilteredIndexScanSide(state)};
}
Expand Down
3 changes: 2 additions & 1 deletion src/engine/IndexScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ class IndexScan final : public Operation {
// there are undef values, the second generator represents the full index
// scan.
std::pair<Result::LazyResult, Result::LazyResult> prefilterTables(
Result::LazyResult input, ColumnIndex joinColumn);
Result::LazyResult input, ColumnIndex joinColumn,
bool filterLeftSide = true);

private:
// Implementation detail that allows to consume a lazy range from two other
Expand Down
71 changes: 17 additions & 54 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <sstream>
#include <vector>

#include "JoinWithIndexScanHelpers.h"
#include "backports/functional.h"
#include "backports/type_traits.h"
#include "engine/AddCombinedRowToTable.h"
Expand All @@ -29,7 +30,7 @@
#include "util/JoinAlgorithms/JoinAlgorithms.h"

using namespace qlever::joinHelpers;

using namespace qlever::joinWithIndexScanHelpers;
using std::endl;
using std::string;

Expand Down Expand Up @@ -566,40 +567,6 @@ void Join::addCombinedRowToIdTable(const ROW_A& rowA, const ROW_B& rowB,
}
}

// _____________________________________________________________________________
namespace {
// Type alias for the general InputRangeTypeErased with specific types.
using IteratorWithSingleCol = InputRangeTypeErased<IdTableAndFirstCol<IdTable>>;

// Convert a `CompressedRelationReader::IdTableGeneratorInputRange` to a
// `InputRangeTypeErased<IdTableAndFirstCol<IdTable>>` for more efficient access
// in the join columns below. This also makes sure the runtime information of
// the passed `IndexScan` is updated properly as the range is consumed.
IteratorWithSingleCol convertGenerator(
CompressedRelationReader::IdTableGeneratorInputRange gen, IndexScan& scan) {
// Store the generator in a wrapper so we can access its details after moving
auto generatorStorage =
std::make_shared<CompressedRelationReader::IdTableGeneratorInputRange>(
std::move(gen));

using SendPriority = RuntimeInformation::SendPriority;

auto range = CachingTransformInputRange(
*generatorStorage,
[generatorStorage, &scan,
sendPriority = SendPriority::Always](auto& table) mutable {
scan.updateRuntimeInfoForLazyScan(generatorStorage->details(),
sendPriority);
sendPriority = SendPriority::IfDue;
// IndexScans don't have a local vocabulary, so we can just use an empty
// one.
return IdTableAndFirstCol{std::move(table), LocalVocab{}};
});

return IteratorWithSingleCol{std::move(range)};
}
} // namespace

// ______________________________________________________________________________________________________
Result Join::computeResultForTwoIndexScans(bool requestLaziness) const {
return createResult(
Expand All @@ -626,17 +593,13 @@ Result Join::computeResultForTwoIndexScans(bool requestLaziness) const {
// of the child. If we serialize it whenever the join operation yields a
// table that's frequent enough and reduces the overhead.
auto leftBlocks =
convertGenerator(std::move(leftBlocksInternal), *leftScan);
auto rightBlocks =
convertGenerator(std::move(rightBlocksInternal), *rightScan);
convertGeneratorFromScan(std::move(leftBlocksInternal), *leftScan);
auto rightBlocks = convertGeneratorFromScan(
std::move(rightBlocksInternal), *rightScan);

ad_utility::zipperJoinForBlocksWithoutUndef(leftBlocks, rightBlocks,
std::less{}, rowAdder);
leftScan->runtimeInfo().status_ =
RuntimeInformation::Status::lazilyMaterializedCompleted;
rightScan->runtimeInfo().status_ =
RuntimeInformation::Status::lazilyMaterializedCompleted;

setScanStatusToLazilyCompleted(*leftScan, *rightScan);
auto localVocab = std::move(rowAdder.localVocab());
return Result::IdTableVocabPair{std::move(rowAdder).resultTable(),
std::move(localVocab)};
Expand All @@ -661,11 +624,12 @@ Result Join::computeResultForIndexScanAndIdTable(
const IdTable& idTable = resultWithIdTable->idTable();
auto rowAdder = makeRowAdder(std::move(yieldTable));

auto permutationIdTable = ad_utility::IdTableAndFirstCol{
idTable.asColumnSubsetView(idTableIsRightInput
? joinColMap.permutationRight()
: joinColMap.permutationLeft()),
resultWithIdTable->getCopyOfLocalVocab()};
auto permutationIdTable =
ad_utility::IdTableAndFirstCols<1, IdTableView<0>>{
idTable.asColumnSubsetView(idTableIsRightInput
? joinColMap.permutationRight()
: joinColMap.permutationLeft()),
resultWithIdTable->getCopyOfLocalVocab()};

ad_utility::Timer timer{
ad_utility::timer::Timer::InitialStatus::Started};
Expand All @@ -676,7 +640,7 @@ Result Join::computeResultForIndexScanAndIdTable(
std::optional<std::shared_ptr<const Result>> indexScanResult =
std::nullopt;
auto rightBlocks = [&scan, idTableHasUndef, &permutationIdTable,
&indexScanResult]() -> LazyInputView {
&indexScanResult]() -> LazyInputView<1> {
if (idTableHasUndef) {
indexScanResult =
scan->getResult(false, ComputationMode::LAZY_IF_SUPPORTED);
Expand All @@ -686,7 +650,8 @@ Result Join::computeResultForIndexScanAndIdTable(
} else {
auto rightBlocksInternal =
scan->lazyScanForJoinOfColumnWithScan(permutationIdTable.col());
return convertGenerator(std::move(rightBlocksInternal), *scan);
return convertGeneratorFromScan(std::move(rightBlocksInternal),
*scan);
}
}();

Expand All @@ -704,8 +669,7 @@ Result Join::computeResultForIndexScanAndIdTable(
} else {
doJoin(blockForIdTable, rightBlocks);
}
scan->runtimeInfo().status_ =
RuntimeInformation::Status::lazilyMaterializedCompleted;
setScanStatusToLazilyCompleted(*scan);

auto localVocab = std::move(rowAdder.localVocab());
return Result::IdTableVocabPair{std::move(rowAdder).resultTable(),
Expand Down Expand Up @@ -741,8 +705,7 @@ Result Join::computeResultForIndexScanAndLazyOperation(
convertGenerator(std::move(indexScanSide),
joinColMap.permutationRight()),
std::less{}, rowAdder);
scan->runtimeInfo().status_ =
RuntimeInformation::Status::lazilyMaterializedCompleted;
setScanStatusToLazilyCompleted(*scan);

auto localVocab = std::move(rowAdder.localVocab());
return Result::IdTableVocabPair{std::move(rowAdder).resultTable(),
Expand Down
Loading