Ray runner significantly slower than native runner #5812
-
@yuchaoran2011 Can you add to the test cases here? cc @colin-ho
-
Some additional information: I printed out the logical plans under both the Ray runner and the native runner. It turns out they are identical, except that in the Ray case the number of partitions is much larger. An example: the native runner plan for … The same partition mismatch also happens for other dataframes defined in the tutorial (e.g. …).
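For reference, a minimal sketch of how the plans can be printed for comparison under each runner, assuming Daft's DataFrame.explain API (the show_all flag and the exact output format may differ across versions; the input path is a placeholder, not the tutorial's data):

```python
import daft

# Placeholder input standing in for the dataframes built in the
# minhash-dedupe tutorial; the point is only how to dump the plans.
df = daft.read_parquet("data/corpus.parquet")

# With show_all=True, explain prints the unoptimized, optimized, and
# physical plans; the partition counts discussed above show up there.
df.explain(show_all=True)
```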
-
Figured it out, thanks to a kind community member I talked to off GitHub! The trick was to manually repartition the dataframes involved in a join before joining them. Now the script completes successfully in about 3 minutes, as opposed to over an hour. I was hoping Daft would intelligently decide on the number of partitions to use, but it looks like manual tuning is still required.
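A minimal sketch of that workaround, assuming Daft's repartition and join APIs; the input paths, the join key "id", and the partition count N are placeholders for illustration, not values from the tutorial:

```python
import daft

daft.set_runner_ray()

# Placeholder inputs standing in for the two sides of the join in the
# dedupe pipeline.
left = daft.read_parquet("data/docs.parquet")
right = daft.read_parquet("data/candidates.parquet")

# Manually repartition both sides on the join key before joining, so the
# Ray runner works with a modest partition count instead of the much
# larger one it was choosing by default.
N = 8  # roughly the number of available CPU cores
left = left.repartition(N, "id")
right = right.repartition(N, "id")

joined = left.join(right, on="id")
joined.collect()
```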
-
Hi all, I have tried to run a file deduplication pipeline with Daft, borrowing most of the code from this tutorial: https://docs.daft.ai/en/stable/examples/minhash-dedupe/. The code works well on the native Swordfish runner: processing a 100 MB dataset takes a couple of seconds. Now I'm ready to run the same code in cluster mode on Ray. I kept the code intact except for adding daft.set_runner_ray() at the beginning of the script. For the same input dataset it became extremely slow; it's now been 30 minutes and the job still hasn't finished.

I've tested that on a much smaller dataset (10 MB) the Ray runner runs the script successfully in about a minute. But why does performance degrade so much on a larger dataset? Initially I thought Ray worker communication overhead might be dominating, so I tried ray.init(num_cpus=2) to limit the number of CPUs, but it didn't help.

From the console output, I can see that (InMemoryScan, InMemoryScan)->HashJoin->Project->UnGroupedAggregate is the most time-consuming stage, which is understandable but still doesn't explain the huge discrepancy with the native runner. I should also add that no errors are reported. Any ideas?
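For completeness, a sketch of the only change made relative to the native-runner script, under the assumptions stated above (the input path is a placeholder; the rest of the pipeline follows the minhash-dedupe tutorial):

```python
import daft
import ray

# Experiment from the question: cap Ray's CPU count to rule out
# worker-communication overhead (it did not help).
ray.init(num_cpus=2)

# The runner must be selected before any dataframe work; this is the one
# line added to the otherwise unchanged tutorial script.
daft.set_runner_ray()

df = daft.read_parquet("data/corpus.parquet")  # ~100 MB dataset
# ... minhash/dedupe pipeline from the tutorial ...
df.collect()
```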