Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions src/engine/CartesianProductJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,51 +369,55 @@ Result::LazyResult CartesianProductJoin::createLazyConsumer(
std::vector<std::shared_ptr<const Result>> subresults,
std::shared_ptr<const Result> lazyResult) const {
AD_CONTRACT_CHECK(lazyResult);
std::vector<std::reference_wrapper<const IdTable>> idTables;
std::vector<std::shared_ptr<const IdTable>> idTables;
idTables.reserve(subresults.size() + 1);
for (const auto& result : subresults) {
idTables.emplace_back(result->idTable());
for (auto& result : subresults) {
auto* idTable = &result->idTable();
idTables.emplace_back(std::move(result), idTable);
}
// Placeholder, will be replaced with the current `IdTable` from the lazy
// result.
idTables.emplace_back(std::make_shared<IdTable>(IdTable{0, allocator()}));

auto generatedTables = lazyResult->idTables();

auto get = [self = this, staticMergedVocab = std::move(staticMergedVocab),
limit = getLimitOffset().limitOrDefault(),
offset = getLimitOffset()._offset, idTables = std::move(idTables),
subresults = std::move(subresults), lastTableOffset = size_t{0},
producedTableSize = size_t{0},
idTableOpt = std::optional<Result::IdTableVocabPair>{}](
auto& idTableVocabPair) mutable {
lastTableOffset = size_t{0}, producedTableSize = size_t{0},
lazyResult =
std::move(lazyResult)](auto& idTableVocabPair) mutable {
// These things have to be done after handling a single input, so we do them
// at the beginning of each but the last iteration.
if (idTableOpt.has_value()) {
idTables.pop_back();
lastTableOffset += idTableOpt->idTable_.size();
limit -= producedTableSize;
offset += producedTableSize;
producedTableSize = 0;
}

idTableOpt = std::move(idTableVocabPair);
auto& [idTable, localVocab] = idTableOpt.value();
if (idTable.empty()) {
lastTableOffset += idTables.back()->size();
limit -= producedTableSize;
offset += producedTableSize;
producedTableSize = 0;

auto& [idTable, localVocab] = idTableVocabPair;
// We know that the last id table is mutable (we inserted it ourselves), so
// we can replace it without paying the cost of creating and destroying a
// shared pointer.
const_cast<IdTable&>(*idTables.back()) = std::move(idTable);
if (idTables.back()->empty()) {
return Result::IdTableLoopControl::makeContinue();
}

idTables.emplace_back(idTable);
localVocab.mergeWith(staticMergedVocab);

return Result::IdTableLoopControl::yieldAll(
ad_utility::InputRangeTypeErased{
ad_utility::OwningView{self->produceTablesLazily(
std::move(localVocab),
ql::views::transform(idTables,
ad_utility::staticCast<const IdTable&>),
offset, limit, lastTableOffset)} |
ql::views::transform(idTables, ad_utility::dereference), offset,
limit, lastTableOffset)} |
ql::views::transform([&producedTableSize](auto& tableAndVocab) {
producedTableSize += tableAndVocab.idTable_.size();
return std::move(tableAndVocab);
})});
};
return Result::LazyResult(ad_utility::CachingContinuableTransformInputRange(
lazyResult->idTables(), std::move(get)));
std::move(generatedTables), std::move(get)));
}

// _____________________________________________________________________________
Expand Down
Loading