|
13 | 13 | #include "engine/QueryExecutionTree.h" |
14 | 14 | #include "index/IndexImpl.h" |
15 | 15 | #include "parser/ParsedQuery.h" |
| 16 | +#include "util/Generator.h" |
| 17 | +#include "util/GeneratorConverter.h" |
| 18 | +#include "util/InputRangeUtils.h" |
| 19 | +#include "util/Iterators.h" |
16 | 20 |
|
17 | 21 | using std::string; |
18 | 22 | using LazyScanMetadata = CompressedRelationReader::LazyScanMetadata; |
@@ -235,10 +239,11 @@ IndexScan::makeCopyWithPrefilteredScanSpecAndBlocks( |
235 | 239 | } |
236 | 240 |
|
237 | 241 | // _____________________________________________________________________________ |
238 | | -Result::Generator IndexScan::chunkedIndexScan() const { |
239 | | - for (IdTable& idTable : getLazyScan()) { |
240 | | - co_yield {std::move(idTable), LocalVocab{}}; |
241 | | - } |
| 242 | +Result::LazyResult IndexScan::chunkedIndexScan() const { |
| 243 | + return Result::LazyResult{ |
| 244 | + ad_utility::CachingTransformInputRange(getLazyScan(), [](auto& table) { |
| 245 | + return Result::IdTableVocabPair{std::move(table), LocalVocab{}}; |
| 246 | + })}; |
242 | 247 | } |
243 | 248 |
|
244 | 249 | // _____________________________________________________________________________ |
@@ -372,13 +377,13 @@ Permutation::IdTableGenerator IndexScan::getLazyScan( |
372 | 377 | auto lazyScanAllCols = getScanPermutation().lazyScan( |
373 | 378 | scanSpecAndBlocks_, filteredBlocks, additionalColumns(), |
374 | 379 | cancellationHandle_, locatedTriplesSnapshot(), getLimitOffset()); |
375 | | - auto& detailsRef = co_await cppcoro::getDetails; |
376 | | - lazyScanAllCols.setDetailsPointer(&detailsRef); |
377 | | - auto applySubset = makeApplyColumnSubset(); |
378 | 380 |
|
379 | | - for (auto& table : lazyScanAllCols) { |
380 | | - co_yield applySubset(std::move(table)); |
381 | | - } |
| 381 | + return cppcoro::fromInputRange( |
| 382 | + ad_utility::InputRangeTypeErased<IdTable, LazyScanMetadata>( |
| 383 | + ad_utility::CachingTransformInputRange< |
| 384 | + ad_utility::OwningView<Permutation::IdTableGenerator>, |
| 385 | + decltype(makeApplyColumnSubset()), LazyScanMetadata>{ |
| 386 | + std::move(lazyScanAllCols), makeApplyColumnSubset()})); |
382 | 387 | }; |
383 | 388 |
|
384 | 389 | // _____________________________________________________________________________ |
@@ -514,10 +519,7 @@ struct IndexScan::SharedGeneratorState { |
514 | 519 | iterator_ = generator_.begin(); |
515 | 520 | } |
516 | 521 | auto& iterator = iterator_.value(); |
517 | | - while (iterator != generator_.end()) { |
518 | | - if (!iterator->idTable_.empty()) { |
519 | | - break; |
520 | | - } |
| 522 | + while (iterator != generator_.end() && iterator->idTable_.empty()) { |
521 | 523 | ++iterator; |
522 | 524 | } |
523 | 525 | doneFetching_ = iterator_ == generator_.end(); |
@@ -586,75 +588,104 @@ struct IndexScan::SharedGeneratorState { |
586 | 588 | }; |
587 | 589 |
|
588 | 590 | // _____________________________________________________________________________ |
589 | | -Result::Generator IndexScan::createPrefilteredJoinSide( |
| 591 | +Result::LazyResult IndexScan::createPrefilteredJoinSide( |
590 | 592 | std::shared_ptr<SharedGeneratorState> innerState) { |
591 | | - if (innerState->hasUndef()) { |
592 | | - AD_CORRECTNESS_CHECK(innerState->prefetchedValues_.empty()); |
593 | | - for (auto& value : ql::ranges::subrange{innerState->iterator_.value(), |
594 | | - innerState->generator_.end()}) { |
595 | | - co_yield value; |
596 | | - } |
597 | | - co_return; |
598 | | - } |
599 | | - auto& prefetchedValues = innerState->prefetchedValues_; |
600 | | - while (true) { |
601 | | - if (prefetchedValues.empty()) { |
602 | | - if (innerState->doneFetching_) { |
603 | | - co_return; |
604 | | - } |
605 | | - innerState->fetch(); |
606 | | - AD_CORRECTNESS_CHECK(!prefetchedValues.empty() || |
607 | | - innerState->doneFetching_); |
608 | | - } |
609 | | - // Make a defensive copy of the values to avoid modification during |
610 | | - // iteration when yielding. |
611 | | - auto copy = std::move(prefetchedValues); |
612 | | - // Moving out does not necessarily clear the values, so we do it explicitly. |
613 | | - prefetchedValues.clear(); |
614 | | - for (auto& value : copy) { |
615 | | - co_yield value; |
616 | | - } |
617 | | - } |
| 593 | + using LoopControl = ad_utility::LoopControl<Result::IdTableVocabPair>; |
| 594 | + |
| 595 | + auto range = ad_utility::InputRangeFromLoopControlGet{ |
| 596 | + [state = std::move(innerState)]() mutable { |
| 597 | + // Handle UNDEF case: pass through remaining input |
| 598 | + if (state->hasUndef()) { |
| 599 | + if (!state->iterator_.has_value()) { |
| 600 | + state->iterator_ = state->generator_.begin(); |
| 601 | + } |
| 602 | + return LoopControl::breakWithYieldAll(ql::ranges::subrange( |
| 603 | + state->iterator_.value(), state->generator_.end())); |
| 604 | + } |
| 605 | + |
| 606 | + auto& prefetched = state->prefetchedValues_; |
| 607 | + |
| 608 | + if (prefetched.empty() && !state->doneFetching_) { |
| 609 | + state->fetch(); |
| 610 | + } |
| 611 | + |
| 612 | + if (prefetched.empty()) { |
| 613 | + AD_CORRECTNESS_CHECK(state->doneFetching_); |
| 614 | + return LoopControl::makeBreak(); |
| 615 | + } |
| 616 | + |
| 617 | + // Make a defensive copy of the values to avoid modification during |
| 618 | + // iteration when yielding. |
| 619 | + auto copy = std::move(prefetched); |
| 620 | + prefetched.clear(); |
| 621 | + |
| 622 | + // Yield all the newly found values |
| 623 | + return LoopControl::yieldAll(std::move(copy)); |
| 624 | + }}; |
| 625 | + return Result::LazyResult{std::move(range)}; |
618 | 626 | } |
619 | 627 |
|
620 | 628 | // _____________________________________________________________________________ |
621 | | -Result::Generator IndexScan::createPrefilteredIndexScanSide( |
| 629 | +Result::LazyResult IndexScan::createPrefilteredIndexScanSide( |
622 | 630 | std::shared_ptr<SharedGeneratorState> innerState) { |
623 | | - if (innerState->hasUndef()) { |
624 | | - for (auto& pair : chunkedIndexScan()) { |
625 | | - co_yield pair; |
626 | | - } |
627 | | - co_return; |
628 | | - } |
629 | | - LazyScanMetadata metadata; |
630 | | - auto& pendingBlocks = innerState->pendingBlocks_; |
631 | | - while (true) { |
632 | | - if (pendingBlocks.empty()) { |
633 | | - if (innerState->doneFetching_) { |
634 | | - metadata.numBlocksAll_ = innerState->metaBlocks_.sizeBlockMetadata_; |
635 | | - updateRuntimeInfoForLazyScan(metadata); |
636 | | - co_return; |
637 | | - } |
638 | | - innerState->fetch(); |
639 | | - } |
640 | | - auto scan = getLazyScan(std::move(pendingBlocks)); |
641 | | - AD_CORRECTNESS_CHECK(pendingBlocks.empty()); |
642 | | - for (IdTable& idTable : scan) { |
643 | | - co_yield {std::move(idTable), LocalVocab{}}; |
644 | | - } |
645 | | - metadata.aggregate(scan.details()); |
646 | | - } |
| 631 | + using LoopControl = ad_utility::LoopControl<Result::IdTableVocabPair>; |
| 632 | + |
| 633 | + auto range = ad_utility::InputRangeFromLoopControlGet{ |
| 634 | + [this, state = std::move(innerState), |
| 635 | + metadata = LazyScanMetadata{}]() mutable { |
| 636 | + // Handle UNDEF case using LoopControl pattern |
| 637 | + if (state->hasUndef()) { |
| 638 | + return LoopControl::breakWithYieldAll(chunkedIndexScan()); |
| 639 | + } |
| 640 | + |
| 641 | + auto& pendingBlocks = state->pendingBlocks_; |
| 642 | + |
| 643 | + while (pendingBlocks.empty()) { |
| 644 | + if (state->doneFetching_) { |
| 645 | + metadata.numBlocksAll_ = state->metaBlocks_.sizeBlockMetadata_; |
| 646 | + updateRuntimeInfoForLazyScan(metadata); |
| 647 | + return LoopControl::makeBreak(); |
| 648 | + } |
| 649 | + state->fetch(); |
| 650 | + } |
| 651 | + |
| 652 | + // We now have non-empty pending blocks |
| 653 | + auto scan = getLazyScan(std::move(pendingBlocks)); |
| 654 | + AD_CORRECTNESS_CHECK(pendingBlocks.empty()); |
| 655 | + |
| 656 | + // Capture scan details by reference so we get the updated values |
| 657 | + const auto& scanDetails = scan.details(); |
| 658 | + |
| 659 | + // Transform the scan to Result::IdTableVocabPair and yield all |
| 660 | + auto transformedScan = ad_utility::CachingTransformInputRange( |
| 661 | + std::move(scan), [](auto& table) { |
| 662 | + return Result::IdTableVocabPair{std::move(table), LocalVocab{}}; |
| 663 | + }); |
| 664 | + |
| 665 | + // Use CallbackOnEndView to aggregate metadata after scan is consumed |
| 666 | + auto callback = ad_utility::makeAssignableLambda( |
| 667 | + [&metadata, &scanDetails]() mutable { |
| 668 | + metadata.aggregate(scanDetails); |
| 669 | + }); |
| 670 | + |
| 671 | + auto scanWithCallback = ad_utility::CallbackOnEndView{ |
| 672 | + std::move(transformedScan), std::move(callback)}; |
| 673 | + return LoopControl::yieldAll(std::move(scanWithCallback)); |
| 674 | + }}; |
| 675 | + return Result::LazyResult{std::move(range)}; |
647 | 676 | } |
648 | 677 |
|
649 | 678 | // _____________________________________________________________________________ |
650 | | -std::pair<Result::Generator, Result::Generator> IndexScan::prefilterTables( |
| 679 | +std::pair<Result::LazyResult, Result::LazyResult> IndexScan::prefilterTables( |
651 | 680 | Result::LazyResult input, ColumnIndex joinColumn) { |
652 | 681 | AD_CORRECTNESS_CHECK(numVariables_ <= 3 && numVariables_ > 0); |
653 | 682 | auto metaBlocks = getMetadataForScan(); |
654 | 683 |
|
655 | 684 | if (!metaBlocks.has_value()) { |
656 | | - return {Result::Generator{}, Result::Generator{}}; |
| 685 | + // Return empty results |
| 686 | + return {Result::LazyResult{}, Result::LazyResult{}}; |
657 | 687 | } |
| 688 | + |
658 | 689 | auto state = std::make_shared<SharedGeneratorState>( |
659 | 690 | std::move(input), joinColumn, std::move(metaBlocks.value())); |
660 | 691 | return {createPrefilteredJoinSide(state), |
|
0 commit comments