Skip to content
Open
8 changes: 8 additions & 0 deletions src/engine/AddCombinedRowToTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,14 @@ class AddCombinedRowToIdTable {

// Return a mutable reference to the merged local vocab (`mergedVocab_`).
LocalVocab& localVocab() { return mergedVocab_; }

// Move both the result table and local vocab out as an IdTableVocabPair.
// This is a convenience method for the common pattern of moving both out.
auto toIdTableVocabPair() && {
flush();
return Result::IdTableVocabPair{std::move(resultTable_),
std::move(mergedVocab_)};
}

// Disable copying and moving, it is currently not needed and makes it harder
// to reason about
AddCombinedRowToIdTable(const AddCombinedRowToIdTable&) = delete;
Expand Down
40 changes: 40 additions & 0 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,46 @@ std::pair<Result::LazyResult, Result::LazyResult> IndexScan::prefilterTables(
createPrefilteredIndexScanSide(state)};
}

// _____________________________________________________________________________
// Build the left ("join") side for the OPTIONAL variant of the prefiltering: a
// lazy range that forwards the tables of `innerState->generator_` unchanged.
//
// The callable below initializes the shared iterator if it has not been set
// yet and then hands everything from the current iterator position up to the
// end of the generator to `LoopControl::breakWithYieldAll`.
//
// NOTE(review): `iterator_` lives in the state that `prefilterTablesForOptional`
// also passes to `createPrefilteredIndexScanSide`. If that side has already
// advanced the iterator, this range starts at the advanced position — confirm
// that no input table can be skipped that way.
Result::LazyResult IndexScan::createPrefilteredJoinSideForOptional(
std::shared_ptr<SharedGeneratorState> innerState) {
using LoopControl = ad_utility::LoopControl<Result::IdTableVocabPair>;

auto range = ad_utility::InputRangeFromLoopControlGet{
[state = std::move(innerState)]() mutable {
// For OPTIONAL, the input is never filtered: every input table is
// re-yielded. This is the key difference from the regular JOIN variant.
if (!state->iterator_.has_value()) {
state->iterator_ = state->generator_.begin();
}

// Pass through the remaining input stream in a single step.
return LoopControl::breakWithYieldAll(ql::ranges::subrange(
state->iterator_.value(), state->generator_.end()));
}};
return Result::LazyResult{std::move(range)};
}

// _____________________________________________________________________________
// OPTIONAL variant of `prefilterTables`. Returns a pair of lazy results:
//  - first:  the left side, which re-yields every table of `input`,
//  - second: the index scan side, restricted to the prefiltered blocks.
// `joinColumn` is stored in the shared generator state together with `input`
// and the scan's block metadata.
std::pair<Result::LazyResult, Result::LazyResult>
IndexScan::prefilterTablesForOptional(Result::LazyResult input,
                                      ColumnIndex joinColumn) {
  AD_CORRECTNESS_CHECK(numVariables_ <= 3 && numVariables_ > 0);
  auto metaBlocks = getMetadataForScan();

  if (!metaBlocks.has_value()) {
    // Without metadata there is nothing to scan on the right side. The left
    // side must nevertheless be preserved: OPTIONAL has to produce a row for
    // every left row, so the input is forwarded unchanged instead of being
    // dropped.
    return {std::move(input), Result::LazyResult{}};
  }

  auto state = std::make_shared<SharedGeneratorState>(SharedGeneratorState{
      std::move(input), joinColumn, std::move(metaBlocks.value())});
  // Use the OPTIONAL version for the left side (never filters) and the regular
  // version for the right side (still prefilters). Both share `state`.
  return {createPrefilteredJoinSideForOptional(state),
          createPrefilteredIndexScanSide(state)};
}

// _____________________________________________________________________________
std::unique_ptr<Operation> IndexScan::cloneImpl() const {
return std::make_unique<IndexScan>(
Expand Down
31 changes: 24 additions & 7 deletions src/engine/IndexScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ class IndexScan final : public Operation {
std::pair<Result::LazyResult, Result::LazyResult> prefilterTables(
Result::LazyResult input, ColumnIndex joinColumn);

// Similar to `prefilterTables`, but for OPTIONAL semantics: The first
// generator re-yields ALL input (never skips any), while the second generator
// still yields only the matching prefiltered blocks. This ensures that
// OPTIONAL joins produce output for all left rows, even when they don't
// match.
// `input` is the lazy result of the left operand; `joinColumn` is presumably
// the index of the join column inside `input` (TODO confirm against callers).
std::pair<Result::LazyResult, Result::LazyResult> prefilterTablesForOptional(
Result::LazyResult input, ColumnIndex joinColumn);

private:
// Implementation detail that allows to consume a lazy range from two other
// cooperating ranges. Needs to be forward declared as it is used by
Expand All @@ -147,6 +155,11 @@ class IndexScan final : public Operation {
Result::LazyResult createPrefilteredIndexScanSide(
std::shared_ptr<SharedGeneratorState> innerState);

// Helper function for OPTIONAL semantics: creates a lazy range that re-yields
// ALL input without filtering (even inputs that don't have matching blocks).
// The input is read from the generator stored in `innerState`, which is shared
// with the range created by `createPrefilteredIndexScanSide`.
static Result::LazyResult createPrefilteredJoinSideForOptional(
std::shared_ptr<SharedGeneratorState> innerState);

// TODO<joka921> Make the `getSizeEstimateBeforeLimit()` function `const` for
// ALL the `Operations`.
uint64_t getSizeEstimateBeforeLimit() override { return sizeEstimate_; }
Expand Down Expand Up @@ -208,6 +221,17 @@ class IndexScan final : public Operation {
// Retrieve the `Permutation` entity for this `IndexScan`.
const Permutation& permutation() const;

// Access the metadata and blocks for this scan. Used by join operations to
// perform block-level prefiltering. Returns `std::nullopt` if the scan
// doesn't have metadata, so callers must check `has_value()` before use.
// NOTE(review): the exact conditions under which there is no metadata (empty
// relation vs. very small relation) are not visible here — confirm.
std::optional<Permutation::MetadataAndBlocks> getMetadataForScan() const;

// Get a lazy scan that only reads the specified blocks. Used by join
// operations to scan only the prefiltered blocks.
// `blocks` defaults to `std::nullopt`; presumably that means "do not restrict
// the scan to a subset of blocks" (TODO confirm in the implementation).
CompressedRelationReader::IdTableGeneratorInputRange getLazyScan(
std::optional<std::vector<CompressedBlockMetadata>> blocks =
std::nullopt) const;

private:
std::unique_ptr<Operation> cloneImpl() const override;

Expand Down Expand Up @@ -253,13 +277,6 @@ class IndexScan final : public Operation {
// `Permutation` class.
ScanSpecAndBlocks getScanSpecAndBlocks() const;

// Helper functions for the public `getLazyScanFor...` methods and
// `chunkedIndexScan` (see above).
CompressedRelationReader::IdTableGeneratorInputRange getLazyScan(
std::optional<std::vector<CompressedBlockMetadata>> blocks =
std::nullopt) const;
std::optional<Permutation::MetadataAndBlocks> getMetadataForScan() const;

// If the `varsToKeep_` member is set, meaning that this `IndexScan` only
// returns a subset of this actual columns, return the subset of columns that
// has to be applied to the "full" result (without any columns stripped) to
Expand Down
36 changes: 36 additions & 0 deletions src/engine/JoinHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ static constexpr size_t CHUNK_SIZE = 100'000;

using namespace ad_utility;

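// Forward declaration of `Operation`, which `getRowAdderForJoin` takes by const
// reference. NOTE(review): that function also calls members of `Operation`, so
// the full class definition must be visible before it; a forward declaration
// alone is not sufficient.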
class Operation;

using OptionalPermutation = std::optional<std::vector<ColumnIndex>>;

// _____________________________________________________________________________
Expand Down Expand Up @@ -123,6 +126,39 @@ CPP_template_2(typename ActionT)(
});
}

// Turn a join `action` into a `Result`.
//  - `requestLaziness == false`: `action` is invoked once with
//    `ad_utility::noop` as its callback; the returned table is permuted via
//    `applyPermutation` and stored in a fully materialized `Result`.
//  - `requestLaziness == true`: `action` and `permutation` are handed to
//    `runLazyJoinAndConvertToGenerator` and the `Result` wraps what that
//    returns.
// In both cases `getSortedOn()` is called exactly once to obtain the sort
// columns of the result. `permutation` is optional and defaults to "none".
template <typename Action, typename GetSortedOn>
inline Result createResultFromAction(bool requestLaziness, Action&& action,
                                     GetSortedOn&& getSortedOn,
                                     OptionalPermutation permutation = {}) {
  if (!requestLaziness) {
    // Materialized case: run the whole action right now.
    auto [table, vocab] = action(ad_utility::noop);
    applyPermutation(table, permutation);
    return {std::move(table), getSortedOn(), std::move(vocab)};
  }

  // Lazy case: the action only runs when the result is consumed.
  auto lazyTables = runLazyJoinAndConvertToGenerator(
      std::forward<Action>(action), std::move(permutation));
  return {std::move(lazyTables), getSortedOn()};
}

// Helper function to create an AddCombinedRowToIdTable for join operations.
// This encapsulates the common pattern of constructing the row adder with
// parameters derived from the operation.
inline auto getRowAdderForJoin(
const Operation& op, size_t numJoinColumns, bool keepJoinColumns,
AddCombinedRowToIdTable::BlockwiseCallback yieldTable) {
return AddCombinedRowToIdTable{numJoinColumns,
IdTable{op.getResultWidth(), op.allocator()},
op.cancellationHandle_,
keepJoinColumns,
CHUNK_SIZE,
std::move(yieldTable)};
}

// Helper function to check if the join of two columns propagate the value
// returned by `Operation::columnOriginatesFromGraphOrUndef`.
inline bool doesJoinProduceGuaranteedGraphValuesOrUndef(
Expand Down
207 changes: 207 additions & 0 deletions src/engine/JoinWithIndexScanHelpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright 2025, University of Freiburg
// Chair of Algorithms and Data Structures
// Author: Johannes Kalmbach ([email protected])

#ifndef QLEVER_SRC_ENGINE_JOINWITHINDEXSCANHELPERS_H
#define QLEVER_SRC_ENGINE_JOINWITHINDEXSCANHELPERS_H

#include "engine/AddCombinedRowToTable.h"
#include "engine/IndexScan.h"
#include "engine/Result.h"
#include "index/CompressedRelation.h"
#include "util/Iterators.h"
#include "util/JoinAlgorithms/JoinAlgorithms.h"
#include "util/JoinAlgorithms/JoinColumnMapping.h"
#include "util/MemoryHelpers.h"

namespace qlever::joinWithIndexScanHelpers {

// Empty tag types to indicate the join semantics (inner join, OPTIONAL join,
// MINUS). They carry no data and are not used within this header itself.
struct InnerJoinTag {};
struct OptionalJoinTag {};
struct MinusTag {};

// Type-erased input range of `IdTableAndFirstCol<IdTable>`; this is the return
// type of `convertGenerator`.
using IteratorWithSingleCol =
ad_utility::InputRangeTypeErased<ad_utility::IdTableAndFirstCol<IdTable>>;

// Wrap the lazy scan `gen` of `scan` into an `IteratorWithSingleCol`.
// Every table produced by the generator is moved into an `IdTableAndFirstCol`
// with an empty `LocalVocab`. Each time a table is transformed,
// `scan.updateRuntimeInfoForLazyScan` is called with the generator's current
// details: the first call uses `SendPriority::Always`, all later calls use
// `SendPriority::IfDue`.
// `scan` is captured by reference, so it must outlive the returned range.
inline IteratorWithSingleCol convertGenerator(
CompressedRelationReader::IdTableGeneratorInputRange&& gen,
IndexScan& scan) {
// Keep the generator in shared storage: the transforming lambda below holds a
// copy of the pointer so that it can still read `details()` after `gen` has
// been moved from, and so that the generator stays alive with the range.
auto generatorStorage =
std::make_shared<CompressedRelationReader::IdTableGeneratorInputRange>(
std::move(gen));

using SendPriority = RuntimeInformation::SendPriority;

auto range = ad_utility::CachingTransformInputRange(
*generatorStorage,
[generatorStorage, &scan,
sendPriority = SendPriority::Always](auto& table) mutable {
scan.updateRuntimeInfoForLazyScan(generatorStorage->details(),
sendPriority);
// Only the very first update is sent unconditionally.
sendPriority = SendPriority::IfDue;
// IndexScans don't have a local vocabulary, so we can just use an empty
// one.
return ad_utility::IdTableAndFirstCol{std::move(table), LocalVocab{}};
});

return IteratorWithSingleCol{std::move(range)};
}

// Helper to get blocks for join based on number of join columns
inline std::array<CompressedRelationReader::IdTableGeneratorInputRange, 2>
getBlocksForJoinOfTwoScans(const IndexScan& s1, const IndexScan& s2,
size_t numJoinColumns) {
AD_CONTRACT_CHECK(s1.numVariables() <= 3 && s2.numVariables() <= 3);
AD_CONTRACT_CHECK(s1.numVariables() >= 1 && s2.numVariables() >= 1);

auto metaBlocks1 = s1.getMetadataForScan();
auto metaBlocks2 = s2.getMetadataForScan();

if (!metaBlocks1.has_value() || !metaBlocks2.has_value()) {
return {{}};
}

std::array<std::vector<CompressedBlockMetadata>, 2> blocks;
if (numJoinColumns == 1) {
blocks = CompressedRelationReader::getBlocksForJoin(metaBlocks1.value(),
metaBlocks2.value());
} else {
blocks = CompressedRelationReader::getBlocksForJoinMultiColumn(
metaBlocks1.value(), metaBlocks2.value(), numJoinColumns);
}

std::array<CompressedRelationReader::IdTableGeneratorInputRange, 2> result{
s1.getLazyScan(blocks[0]), s2.getLazyScan(blocks[1])};
result[0].details().numBlocksAll_ = metaBlocks1.value().sizeBlockMetadata_;
result[1].details().numBlocksAll_ = metaBlocks2.value().sizeBlockMetadata_;
return result;
}

// Check whether row 0 of `table` is undefined in at least one join column.
// `joinColumns` holds one pair of column indices per join column and
// `sideIndex` (0 or 1) selects which entry of each pair refers to `table`.
// An empty table never has an UNDEF first row, so the result is `false`.
inline bool firstRowHasUndef(
    const IdTable& table,
    const std::vector<std::array<ColumnIndex, 2>>& joinColumns,
    size_t sideIndex) {
  bool foundUndef = false;
  if (!table.empty()) {
    for (const auto& columnPair : joinColumns) {
      const auto column = columnPair[sideIndex];
      if (table.at(0, column).isUndefined()) {
        foundUndef = true;
        break;
      }
    }
  }
  return foundUndef;
}

// Helper to get blocks for join of a column with a scan (multi-column version).
// Computes the blocks of `scan` that match the join columns of `idTable` and
// returns a lazy scan over exactly those blocks, with `numBlocksAll_` set to
// the scan's `sizeBlockMetadata_`.
// `joinColumns` contains one pair per join column; entry `[0]` of each pair is
// the column index inside `idTable`. One to three join columns are supported,
// any other count hits `AD_FAIL()`.
// A default-constructed range is returned if the scan has no metadata or if
// the first row of `idTable` is UNDEF in any join column.
// NOTE(review): in the UNDEF case the returned scan is empty rather than
// unfiltered — presumably callers handle UNDEF input separately; confirm.
inline CompressedRelationReader::IdTableGeneratorInputRange
getBlocksForJoinOfColumnsWithScan(
const IdTable& idTable,
const std::vector<std::array<ColumnIndex, 2>>& joinColumns,
const IndexScan& scan, ColumnIndex scanJoinColIndex) {
// NOTE(review): `scanJoinColIndex` is used here as an index INTO
// `joinColumns` (not as a column of the scan) and is not used anywhere else
// in this function. This also runs before the size of `joinColumns` is
// validated — confirm the intended meaning and that it is always in range.
AD_EXPENSIVE_CHECK(ql::ranges::is_sorted(
idTable.getColumn(joinColumns[scanJoinColIndex][0])));
AD_CORRECTNESS_CHECK(scan.numVariables() <= 3 && scan.numVariables() > 0);

auto metaBlocks = scan.getMetadataForScan();
if (!metaBlocks.has_value()) {
return {};
}

// Cannot prefilter if first row has UNDEF in any join column
if (firstRowHasUndef(idTable, joinColumns, 0)) {
return {};
}

CompressedRelationReader::GetBlocksForJoinResult blocksResult;

// Dispatch on the number of join columns; each overload takes the respective
// columns of `idTable` plus the scan's metadata.
if (joinColumns.size() == 1) {
auto joinColumn = idTable.getColumn(joinColumns[0][0]);
blocksResult = CompressedRelationReader::getBlocksForJoin(
joinColumn, metaBlocks.value());
} else if (joinColumns.size() == 2) {
auto col1 = idTable.getColumn(joinColumns[0][0]);
auto col2 = idTable.getColumn(joinColumns[1][0]);
blocksResult = CompressedRelationReader::getBlocksForJoinMultiColumn(
col1, col2, metaBlocks.value());
} else if (joinColumns.size() == 3) {
auto col1 = idTable.getColumn(joinColumns[0][0]);
auto col2 = idTable.getColumn(joinColumns[1][0]);
auto col3 = idTable.getColumn(joinColumns[2][0]);
blocksResult = CompressedRelationReader::getBlocksForJoinMultiColumn(
col1, col2, col3, metaBlocks.value());
} else {
AD_FAIL();
}

// Only the matching blocks are scanned; the total block count is recorded in
// the scan details.
auto result = scan.getLazyScan(std::move(blocksResult.matchingBlocks_));
result.details().numBlocksAll_ = metaBlocks.value().sizeBlockMetadata_;
return result;
}

// Adapt the two lazy results obtained from `prefilterTablesForOptional` to the
// input format of `zipperJoinForBlocksWithPotentialUndef`: both are turned
// into ranges of `IdTableAndFirstCol` over a column-subset view of each table.
//  - left:  all `leftWidth` columns in their original order,
//  - right: the column subset `{rightJoinColumn}`.
// The contents of both shared pointers are moved from; the local vocab of each
// yielded pair is moved into the corresponding `IdTableAndFirstCol`.
// Returns `std::pair{leftRange, rightRange}`.
// NOTE(review): the right subset contains ONLY the join column, not "join
// column first, then the rest" — confirm that this is what the callers need.
// NOTE(review): `std::iota` requires `<numeric>`, which this header does not
// include directly.
inline auto convertPrefilteredGenerators(
    std::shared_ptr<Result::LazyResult> leftGenerator,
    std::shared_ptr<Result::LazyResult> rightGenerator, size_t leftWidth,
    ColumnIndex rightJoinColumn) {
  // Creates the per-table transformation for a fixed set of columns. The
  // columns are owned by the returned callable.
  auto makeSubsetTransform = [](std::vector<ColumnIndex> columns) {
    return [columns = std::move(columns)](auto& pair) {
      return ad_utility::IdTableAndFirstCol{
          pair.idTable_.asColumnSubsetView(columns),
          std::move(pair.localVocab_)};
    };
  };

  // Identity permutation for the left side: 0, 1, ..., leftWidth - 1.
  std::vector<ColumnIndex> allLeftColumns(leftWidth);
  std::iota(allLeftColumns.begin(), allLeftColumns.end(), 0);
  auto leftRange = ad_utility::CachingTransformInputRange(
      std::move(*leftGenerator),
      makeSubsetTransform(std::move(allLeftColumns)));

  std::vector<ColumnIndex> rightColumns = {rightJoinColumn};
  auto rightRange = ad_utility::CachingTransformInputRange(
      std::move(*rightGenerator),
      makeSubsetTransform(std::move(rightColumns)));

  return std::pair{std::move(leftRange), std::move(rightRange)};
}

// Helper to set scan status to lazily completed (variadic, accepts 1+ scans)
template <typename... Scans>
inline void setScanStatusToLazilyCompleted(Scans&... scans) {
(void(scans.runtimeInfo().status_ =
RuntimeInformation::Status::lazilyMaterializedCompleted),
...);
}

// Helper to get unfiltered blocks for the left scan and filtered blocks for
// the right scan. Returns a pair of shared_ptrs (left, right) ready for use in
// action lambdas. Used by OptionalJoin and Minus where the left side must be
// complete and only the right side can be prefiltered.
//  - left:  `leftScan.getLazyScan(std::nullopt)`, i.e. no block restriction,
//  - right: element 1 of `getBlocksForJoinOfTwoScans(leftScan, rightScan,
//           numJoinColumns)`.
inline auto getUnfilteredLeftAndFilteredRightSideFromIndexScans(
    IndexScan& leftScan, IndexScan& rightScan, size_t numJoinColumns) {
  auto leftMetaBlocks = leftScan.getMetadataForScan();

  auto leftBlocks = leftScan.getLazyScan(std::nullopt);
  // `getMetadataForScan` may return `std::nullopt`. Only record the total
  // block count when metadata exists; calling `.value()` unconditionally would
  // throw `std::bad_optional_access` for a scan without metadata.
  if (leftMetaBlocks.has_value()) {
    leftBlocks.details().numBlocksAll_ =
        leftMetaBlocks.value().sizeBlockMetadata_;
  }

  auto rightBlocks =
      getBlocksForJoinOfTwoScans(leftScan, rightScan, numJoinColumns);

  return std::pair{ad_utility::toSharedPtr(std::move(leftBlocks)),
                   ad_utility::toSharedPtr(std::move(rightBlocks[1]))};
}

} // namespace qlever::joinWithIndexScanHelpers

#endif // QLEVER_SRC_ENGINE_JOINWITHINDEXSCANHELPERS_H
Loading
Loading