Raco already has rules to:

- remove redundant Shuffles when two operators require hashing on the same key, e.g. a join and an aggregate
- decompose an aggregate into local agg -> shuffle -> global agg (sketched below)

What Raco does not do is distribute the join across the decomposed aggregate.
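For intuition, here is a minimal Python sketch of that second rule under an assumed worker/shuffle model (the function names and the toy data are hypothetical, not Raco's API): each worker pre-aggregates its own tuples, only the partial sums are shuffled by key, and a final aggregate merges them.

```python
from collections import defaultdict

# Hypothetical per-worker data: lists of (key, value) tuples.
workers = [[("a", 1), ("b", 2), ("a", 3)],
           [("b", 4), ("c", 5)]]

def local_agg(tuples):
    """Pre-aggregate on one worker before any communication."""
    acc = defaultdict(int)
    for key, value in tuples:
        acc[key] += value
    return list(acc.items())

def shuffle(partials_per_worker, n_workers):
    """Hash-partition partial results so equal keys land on one worker."""
    buckets = [[] for _ in range(n_workers)]
    for partials in partials_per_worker:
        for key, value in partials:
            buckets[hash(key) % n_workers].append((key, value))
    return buckets

def global_agg(bucket):
    """Merge the partial sums that arrived at one worker."""
    acc = defaultdict(int)
    for key, value in bucket:
        acc[key] += value
    return dict(acc)

partials = [local_agg(w) for w in workers]
print([global_agg(b) for b in shuffle(partials, 2)])
```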
PageRank case study
Consider the following MyriaL program.
```
-- Simplified PageRank; assumes that all nodes have out degree > 0
alpha = [.85];
epsilon = [.0001];
Edge = SCAN(public:adhoc:edges);
--Vertex = SCAN(public:adhoc:vertices);
-- the distinct and union query to calculate Vertex
srcs = select src as id, src as fake from Edge;
dsts = select dst as id, dst as fake from Edge;
dups = UNIONALL(srcs, dsts);
vertexfake = select id, max(fake) from dups; -- dedup
Vertex = select id from vertexfake;
N = [FROM Vertex EMIT COUNT(id) AS val];
min_rank = [(1 - *alpha) / *N];
OutDegree = [FROM Edge EMIT Edge.src AS id, COUNT(Edge.dst) AS cnt];
PageRank = [FROM Vertex EMIT Vertex.id AS id, 1.0 / *N AS rank];

DO
    -- Calculate each node's outbound PageRank contribution
    PrOut = [FROM PageRank, OutDegree WHERE PageRank.id == OutDegree.id
             EMIT PageRank.id AS id, PageRank.rank / OutDegree.cnt AS out_rank];
    -- Compute the inbound summands for each node
    Summand = [FROM PrOut, Edge, Vertex
               WHERE Edge.dst == Vertex.id AND Edge.src == PrOut.id
               EMIT Vertex.id AS id, PrOut.out_rank AS summand];
    -- Sum up the summands; adjust by alpha
    NewPageRank = [FROM Summand EMIT id AS id,
                   *min_rank + *alpha * SUM(Summand.summand) AS rank];
    Delta = [FROM NewPageRank, PageRank WHERE NewPageRank.id == PageRank.id
             EMIT ABS(NewPageRank.rank - PageRank.rank) AS val];
    Continue = [FROM Delta EMIT MAX(Delta.val) > *epsilon];
    PageRank = NewPageRank;
WHILE Continue;

STORE(PageRank, OUTPUT);
```
The resulting Myria plan is attached, with the relevant subquery highlighted: myria_plan.pdf
This subquery is responsible for the real work. The PageRank of a target vertex is the sum of the contributions from the vertices that point to it. In relational algebra, this is a join between the vertex relation (target vertices), the edge relation, and the PageRank relation (vertices that point to the target), followed by a SUM over those contributions, grouped by target vertex.
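As a sanity check on that reading, here is a minimal single-node Python sketch of one iteration of the update, a join followed by a grouped SUM (the names mirror the MyriaL program above; this shows the semantics only, not how Myria executes it):

```python
def pagerank_step(edges, rank, out_degree, alpha, min_rank):
    """One iteration: edges is a list of (src, dst); rank and
    out_degree are dicts keyed by vertex id."""
    # JOIN PageRank with OutDegree on id: each source's per-edge contribution.
    out_rank = {v: rank[v] / out_degree[v] for v in rank}
    # JOIN with Edge on src, then GroupBy dst and SUM the contributions.
    sums = {}
    for src, dst in edges:
        sums[dst] = sums.get(dst, 0.0) + out_rank[src]
    # Adjust by alpha and min_rank, as NewPageRank does in the MyriaL program.
    return {v: min_rank + alpha * sums.get(v, 0.0) for v in rank}

# Two-node toy graph: 1 -> 2, 2 -> 1 (each out-degree 1).
print(pagerank_step([(1, 2), (2, 1)], {1: 0.5, 2: 0.5}, {1: 1, 2: 1},
                    0.85, 0.075))   # stays at the fixed point {1: 0.5, 2: 0.5}
```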
Notice that the entire join between source vertex, edge, and destination vertex is computed before any aggregation, so Myria never gets to aggregate local PageRank contributions before communicating them. Instead, Raco could have produced JOIN(vertex, edge, prout) -> GroupBy(SUM) -> Shuffle -> JOIN(vertex, edge, prout) -> GroupBy(SUM): aggregate contributions locally so that only partial sums, rather than the full join output, cross the shuffle.
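A hedged Python sketch of that rewritten plan (the partitioning and worker model are assumptions for illustration, not Raco's implementation): each partition joins its edges with the contributions and sums per destination locally, so at most one partial sum per (destination, partition) pair is communicated.

```python
from collections import defaultdict

def local_join_and_agg(edge_partition, out_rank):
    """JOIN this partition's edges with out_rank, then GroupBy(SUM) on dst
    locally, before anything is communicated."""
    partial = defaultdict(float)
    for src, dst in edge_partition:
        partial[dst] += out_rank[src]   # assumes out_rank is reachable per partition
    # At most one tuple per distinct dst leaves this partition, not one per edge.
    return partial.items()

def shuffled_then_summed(edge_partitions, out_rank):
    """Stands in for Shuffle(dst) followed by the global GroupBy(SUM)."""
    final = defaultdict(float)
    for part in edge_partitions:
        for dst, s in local_join_and_agg(part, out_rank):
            final[dst] += s
    return dict(final)

parts = [[(1, 2), (1, 3)], [(2, 3)]]          # edges split across two partitions
print(shuffled_then_summed(parts, {1: 0.5, 2: 1.0}))   # {2: 0.5, 3: 1.5}
```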
Here are some benchmarking results with Radish. Pay attention to the "good join order" and "w/o group by opt" configurations: "good join order" uses the same plan as the Myria plan above, and "w/o group by opt" just removes optimization 2 from the top of this issue.
Notice that the difference between not doing aggregate decomposition (optimization 2 at the top of this issue) and doing it is small. I hypothesize that this is because of the subject of this issue: in both plans we still have to communicate O(V*Degree) tuples, and all of that communication has already happened by the time the join completes, so decomposing the aggregate afterwards does not help much. We really want O(V*Partitions), which is what the handwritten code achieves. I think the proposal in this issue would get us a lot closer to the handwritten code.
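To make the communication gap concrete, here is a toy Python model of shuffled-tuple counts under both plans (the min(in_degree, partitions) bound is my own simplification, and the numbers are illustrative, not from the benchmark):

```python
def shuffled_tuple_counts(in_degrees, n_partitions):
    """in_degrees: dict of vertex id -> in-degree (illustrative model)."""
    # Current plan: every joined tuple crosses the network, one per edge.
    per_edge = sum(in_degrees.values())                      # O(V * Degree)
    # With the join distributed across the decomposed aggregate, each vertex
    # receives at most one partial sum per partition that touches it.
    per_partition = sum(min(d, n_partitions)
                        for d in in_degrees.values())        # O(V * Partitions)
    return per_edge, per_partition

# A hub with in-degree 10,000 receives 10,000 tuples today, but at most
# 64 partial sums on a 64-partition cluster.
print(shuffled_tuple_counts({"hub": 10_000, "leaf": 3}, 64))   # (10003, 67)
```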