Skip to content

Commit

Permalink
Lazy TransitivePath operation (#1595)
Browse files Browse the repository at this point in the history
This PR enables the `TransitivePath` operation to yield its result lazily and to consume its left/right child lazily. Note that the graph which is transitively traversed needs to be fully materialized due to the underlying algorithm. E.g., when computing the (large) result of `wdt:P31/wdt:P279*`, the large result and the `wdt:P31` child can be dealt with lazily, but the full `wdt:P279` predicate needs to be materialized.
  • Loading branch information
RobinTF authored Nov 4, 2024
1 parent 7bd2438 commit a090167
Show file tree
Hide file tree
Showing 7 changed files with 591 additions and 380 deletions.
124 changes: 64 additions & 60 deletions src/engine/TransitivePathBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,76 +63,80 @@ TransitivePathBase::decideDirection() {
}

// _____________________________________________________________________________
void TransitivePathBase::fillTableWithHull(IdTable& table, const Map& hull,
std::vector<Id>& nodes,
size_t startSideCol,
size_t targetSideCol,
const IdTable& startSideTable,
size_t skipCol) const {
CALL_FIXED_SIZE((std::array{table.numColumns(), startSideTable.numColumns()}),
&TransitivePathBase::fillTableWithHullImpl, this, table, hull,
nodes, startSideCol, targetSideCol, startSideTable, skipCol);
// Overload used when columns of an input table are carried over into the
// result. It only dispatches: the runtime widths `inputWidth` and
// `getResultWidth()` are handed to `ad_utility::callFixedSize`, and the lambda
// forwards to the matching `fillTableWithHullImpl<INPUT_WIDTH, OUTPUT_WIDTH>`
// instantiation, whose generator is returned.
Result::Generator TransitivePathBase::fillTableWithHull(
NodeGenerator hull, size_t startSideCol, size_t targetSideCol,
size_t skipCol, bool yieldOnce, size_t inputWidth) const {
return ad_utility::callFixedSize(
std::array{inputWidth, getResultWidth()},
[&]<size_t INPUT_WIDTH, size_t OUTPUT_WIDTH>() {
// Note the argument order: the impl takes `yieldOnce` before `skipCol`,
// whereas this overload takes `skipCol` before `yieldOnce`.
return fillTableWithHullImpl<INPUT_WIDTH, OUTPUT_WIDTH>(
std::move(hull), startSideCol, targetSideCol, yieldOnce, skipCol);
});
}

// _____________________________________________________________________________
template <size_t WIDTH, size_t START_WIDTH>
void TransitivePathBase::fillTableWithHullImpl(
IdTable& tableDyn, const Map& hull, std::vector<Id>& nodes,
size_t startSideCol, size_t targetSideCol, const IdTable& startSideTable,
size_t skipCol) const {
IdTableStatic<WIDTH> table = std::move(tableDyn).toStatic<WIDTH>();
IdTableView<START_WIDTH> startView =
startSideTable.asStaticView<START_WIDTH>();

size_t rowIndex = 0;
for (size_t i = 0; i < nodes.size(); i++) {
Id node = nodes[i];
auto it = hull.find(node);
if (it == hull.end()) {
continue;
}

for (Id otherNode : it->second) {
table.emplace_back();
table(rowIndex, startSideCol) = node;
table(rowIndex, targetSideCol) = otherNode;

copyColumns<START_WIDTH, WIDTH>(startView, table, i, rowIndex, skipCol);

rowIndex++;
}
}

tableDyn = std::move(table).toDynamic();
}

// _____________________________________________________________________________
void TransitivePathBase::fillTableWithHull(IdTable& table, const Map& hull,
size_t startSideCol,
size_t targetSideCol) const {
CALL_FIXED_SIZE((std::array{table.numColumns()}),
&TransitivePathBase::fillTableWithHullImpl, this, table, hull,
startSideCol, targetSideCol);
// Overload without an input-table width. Dispatches on `getResultWidth()`
// only and instantiates the impl with `INPUT_WIDTH == 0`; `skipCol` is not
// passed, so the impl's default argument for it is used.
Result::Generator TransitivePathBase::fillTableWithHull(NodeGenerator hull,
size_t startSideCol,
size_t targetSideCol,
bool yieldOnce) const {
return ad_utility::callFixedSize(getResultWidth(), [&]<size_t WIDTH>() {
return fillTableWithHullImpl<0, WIDTH>(std::move(hull), startSideCol,
targetSideCol, yieldOnce);
});
}

// _____________________________________________________________________________
template <size_t WIDTH>
void TransitivePathBase::fillTableWithHullImpl(IdTable& tableDyn,
const Map& hull,
size_t startSideCol,
size_t targetSideCol) const {
IdTableStatic<WIDTH> table = std::move(tableDyn).toStatic<WIDTH>();
size_t rowIndex = 0;
for (auto const& [node, linkedNodes] : hull) {
// Coroutine that turns the elements yielded by `hull` into result tables.
// For every element, one output row is written per entry of `linkedNodes`:
// `node` goes into `startSideCol`, the linked node into `targetSideCol`, and,
// if the element carries a non-null `idTable`, `copyColumns` is called with
// row `inputRow` of that table and `skipCol`.
//
// If `yieldOnce` is false, a table (together with the element's local vocab)
// is yielded after each element of `hull`. If `yieldOnce` is true, all rows
// are collected into a single table that is yielded once at the end together
// with the merged local vocabs of all elements.
template <size_t INPUT_WIDTH, size_t OUTPUT_WIDTH>
Result::Generator TransitivePathBase::fillTableWithHullImpl(
NodeGenerator hull, size_t startSideCol, size_t targetSideCol,
bool yieldOnce, size_t skipCol) const {
// The timer is created stopped and only continued while an element of
// `hull` is being processed.
ad_utility::Timer timer{ad_utility::Timer::Stopped};
// Index of the next row to be written in the current `table`.
size_t outputRow = 0;
IdTableStatic<OUTPUT_WIDTH> table{getResultWidth(), allocator()};
// Only filled when `yieldOnce` is true: holds the local vocabs of all
// elements until they are merged for the single final yield.
std::vector<LocalVocab> storedLocalVocabs;
for (auto& [node, linkedNodes, localVocab, idTable, inputRow] : hull) {
timer.cont();
// As an optimization nodes without any linked nodes should not get yielded
// in the first place.
AD_CONTRACT_CHECK(!linkedNodes.empty());
// When yielding per element, `table` is fresh at this point and will hold
// exactly `linkedNodes.size()` rows, so the size can be reserved up front.
if (!yieldOnce) {
table.reserve(linkedNodes.size());
}
// Static view on the element's input table; stays empty if the element is
// not associated with a table.
std::optional<IdTableView<INPUT_WIDTH>> inputView = std::nullopt;
if (idTable != nullptr) {
inputView = idTable->template asStaticView<INPUT_WIDTH>();
}
for (Id linkedNode : linkedNodes) {
table.emplace_back();
// NOTE(review): `rowIndex` is not declared anywhere in this function; the
// `rowIndex` lines look like leftovers of the previous implementation
// next to their `outputRow` replacements — confirm and remove.
table(rowIndex, startSideCol) = node;
table(rowIndex, targetSideCol) = linkedNode;
table(outputRow, startSideCol) = node;
table(outputRow, targetSideCol) = linkedNode;

rowIndex++;
if (inputView.has_value()) {
copyColumns<INPUT_WIDTH, OUTPUT_WIDTH>(inputView.value(), table,
inputRow, outputRow, skipCol);
}

outputRow++;
}

if (yieldOnce) {
// Take over the local vocab of this element; it is merged and yielded
// after the loop.
storedLocalVocabs.emplace_back(std::move(localVocab));
} else {
// Stop the timer before suspending at `co_yield`, presumably so that time
// spent by the consumer is not attributed to filling the table.
timer.stop();
runtimeInfo().addDetail("IdTable fill time", timer.msecs());
co_yield {std::move(table).toDynamic(), std::move(localVocab)};
// `table` was moved from above, so start over with a new empty table.
table = IdTableStatic<OUTPUT_WIDTH>{getResultWidth(), allocator()};
outputRow = 0;
}
// In the non-`yieldOnce` branch the timer has already been stopped above.
timer.stop();
}
if (yieldOnce) {
// NOTE(review): `start()` is used here while the loop uses `cont()`. If
// `start()` resets the accumulated value, the time measured in the loop is
// discarded and only the vocab merge is reported — confirm against
// `ad_utility::Timer`.
timer.start();
LocalVocab mergedVocab{};
mergedVocab.mergeWith(storedLocalVocabs);
runtimeInfo().addDetail("IdTable fill time", timer.msecs());
co_yield {std::move(table).toDynamic(), std::move(mergedVocab)};
}
// NOTE(review): `tableDyn` is not a parameter or local of this function; this
// looks like a leftover of the previous implementation — confirm and remove.
tableDyn = std::move(table).toDynamic();
}

// _____________________________________________________________________________
Expand Down Expand Up @@ -405,7 +409,7 @@ void TransitivePathBase::copyColumns(const IdTableView<INPUT_WIDTH>& inputTable,
continue;
}

outputTable(outputRow, outCol) = inputTable(inputRow, inCol);
outputTable.at(outputRow, outCol) = inputTable.at(inputRow, inCol);
inCol++;
outCol++;
}
Expand Down
65 changes: 42 additions & 23 deletions src/engine/TransitivePathBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,31 @@ using Map = std::unordered_map<
Id, Set, HashId, std::equal_to<Id>,
ad_utility::AllocatorWithLimit<std::pair<const Id, Set>>>;

// Helper struct that allows a generator to yield a node and all its
// connected nodes (the `targets`), along with a local vocabulary and the row
// index of the node in the input table. The `IdTable` pointer might be null if
// the `Id` is not associated with a table. In this case the `row` value does
// not represent anything meaningful and should not be used.
//
// NOTE: Consumers destructure this struct with structured bindings (see
// `fillTableWithHullImpl`), so the declaration order of the members below is
// part of the interface.
struct NodeWithTargets {
// The node the targets belong to.
Id node_;
// All nodes connected to `node_`.
Set targets_;
// Local vocabulary that is yielded along with the node.
LocalVocab localVocab_;
// Table that `row_` refers to, or null if the node is not associated with a
// table. This struct never deletes the pointee.
const IdTable* idTable_;
// Row index of `node_` in `*idTable_`; meaningless if `idTable_` is null.
size_t row_;

// Explicitly defined constructor to prevent issues with co_yield and
// lifetime.
// See https://gcc.gnu.org/bugzilla/show_bug.cgi?id=103909 for more info.
// `targets` and `localVocab` are taken by value and moved into the members.
NodeWithTargets(Id node, Set targets, LocalVocab localVocab,
const IdTable* idTable, size_t row)
: node_{node},
targets_{std::move(targets)},
localVocab_{std::move(localVocab)},
idTable_{idTable},
row_{row} {}
};

using NodeGenerator = cppcoro::generator<NodeWithTargets>;

/**
* @class TransitivePathBase
* @brief A common base class for different implementations of the Transitive
Expand Down Expand Up @@ -147,37 +172,36 @@ class TransitivePathBase : public Operation {
* startSideTable to fill in the rest of the columns.
* This function is called if the start side is bound and a variable.
*
* @param table The result table which will be filled.
* @param hull The transitive hull.
* @param nodes The start nodes of the transitive hull. These need to be in
* the same order and amount as the starting side nodes in the startTable.
* @param hull The transitive hull, represented by a generator that yields
* sets of connected nodes with some metadata.
* @param startSideCol The column of the result table for the startSide of the
* hull
* @param targetSideCol The column of the result table for the targetSide of
* the hull
* @param startSideTable An IdTable that holds other results. The other
* results will be transferred to the new result table.
* @param skipCol This column contains the Ids of the start side in the
* startSideTable and will be skipped.
* @param yieldOnce If true, the generator will yield only a single time.
* @param inputWidth The width of the input table that is referenced by the
* elements of `hull`.
*/
void fillTableWithHull(IdTable& table, const Map& hull,
std::vector<Id>& nodes, size_t startSideCol,
size_t targetSideCol, const IdTable& startSideTable,
size_t skipCol) const;
Result::Generator fillTableWithHull(NodeGenerator hull, size_t startSideCol,
size_t targetSideCol, size_t skipCol,
bool yieldOnce, size_t inputWidth) const;

/**
* @brief Fill the given table with the transitive hull.
* This function is called if the sides are unbound or ids.
*
* @param table The result table which will be filled.
* @param hull The transitive hull.
* @param startSideCol The column of the result table for the startSide of the
* hull
* @param targetSideCol The column of the result table for the targetSide of
* the hull
* @param yieldOnce If true, the generator will yield only a single time.
*/
void fillTableWithHull(IdTable& table, const Map& hull, size_t startSideCol,
size_t targetSideCol) const;
Result::Generator fillTableWithHull(NodeGenerator hull, size_t startSideCol,
size_t targetSideCol,
bool yieldOnce) const;

// Copy the columns from the input table to the output table
template <size_t INPUT_WIDTH, size_t OUTPUT_WIDTH>
Expand All @@ -204,16 +228,11 @@ class TransitivePathBase : public Operation {
private:
uint64_t getSizeEstimateBeforeLimit() override;

template <size_t WIDTH, size_t START_WIDTH>
void fillTableWithHullImpl(IdTable& table, const Map& hull,
std::vector<Id>& nodes, size_t startSideCol,
size_t targetSideCol,
const IdTable& startSideTable,
size_t skipCol) const;

template <size_t WIDTH>
void fillTableWithHullImpl(IdTable& table, const Map& hull,
size_t startSideCol, size_t targetSideCol) const;
template <size_t INPUT_WIDTH, size_t OUTPUT_WIDTH>
Result::Generator fillTableWithHullImpl(NodeGenerator hull,
size_t startSideCol,
size_t targetSideCol, bool yieldOnce,
size_t skipCol = 0) const;

public:
size_t getCostEstimate() override;
Expand Down
Loading

0 comments on commit a090167

Please sign in to comment.