Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Native Dask implementation for area interpolation (Do not merge) #187

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

darribas
Copy link
Member

This is my first attempt at implementing area interpolation fully in Dask (as opposed to using the single-core logic inside the Dask scheduler). The main motivation for this is to obtain correct estimates for intensive/extensive variables, which currently are erroneous using the chunk-by-chunk code that does work for categorical (as discussed in #185 )

A couple of thoughts on what I learned:

  • My sense for why the approach that works for categoricals doesn't work in intensive/extensive is that the weights are built globally:

den = den + (den == 0)
den = 1.0 / den
n = den.shape[0]
den = den.reshape((n,))
den = diags([den], [0])
weights = den.dot(table) # row standardize table

This means we need to build that table of weights before we split things to each worker for independent work. Now, the weights can be built in parallel (this is essentially about filling in different parts of a matrix which are not all related to each other. This is what I attempt in this PR on id_area_table (which is copied from the single-core implementation, works in chunks and is added to the existing Dask computation graph. This returns a three-column Dask dataframe with source ID, target ID and shared area, where only non-zero values are stored (i.e., you never have a row in this table with a value of 0 in the third column. As @knaaptime suggests, this is not a million miles away from the new graph implementation. id_area_table, which builds the cross-over table, can be run in parallel with minimal inter-worker communication, it's performant and returns what is expected.

  • The main problem comes next when we want to use that table to allocate source values into the target geography. Given we have the cross-over table (AoI in Eli's term) as an adjacency table, this is really a set of SQL-type joins and group-by operations. I attempt that here:

https://github.com/darribas/tobler/blob/5ce79e71a89fc4ede93ec22f6eb84b1acf884e4a/tobler/area_weighted/area_interpolate_dask.py#L346-L360

The logic of the code is as follows:

  1. Row standardisation: grab the area of intersections and divide each by the total area of each source geom. The area of each source geom can be taken by grouping by source id and applying a transform that sums it --> weights
  2. Use the resulting weights for each intersection area and join to them the values to be interpolated through the source IDs.
  3. For extensive variables, multiply the source values by the weights (this is multiplying two values in the same row of our table)
  4. Compile output geoms: group-by the table by target IDs and sum inside each group.
  5. Congratulations, you're done!

Now the issue with the logic above is that. although 1., 3., and potentially 4., are fast and very performant on Dask, 2. involves a significant amount of inter-worker communication and, since it is not done on sorted indices (that I can see), it will be slow.

This is where I stopped. A couple of further thoughts:

  • The bottleneck above in 2. does not apply if all the operation is performed in memory (e.g., as a single pandas.DataFrame or polars.DataFrame, so if we can live with in-memory computation only, I think this logic would speed up our current implementation
  • There may be a way I'm not seeing of getting around 2.
  • The whole logic above may not be the ideal and there might be a way to fully chunk up the computation into smaller pieces that need a lightweight aggregation later (similar to how we do categoricals currently), but I haven't found it.

Very happy to explore further options and discuss alternative views!

@knaaptime
Copy link
Member

knaaptime commented Sep 11, 2023

The main problem comes next when we want to use that table to allocate source values into the target geography. Given we have the cross-over table (AoI in Eli's term) as an adjacency table, this is really a set of SQL-type joins and group-by operations. I attempt that here:

so what about just returning area in line 348 (which gives us the row-transformed values of table), without following on to build tmp. Converting this to a dataframe back in singlecore (no need to worry about geoms anymore), this gives us a multi-index series that we can convert directly to sparse (like Graph). This would also amount to a direct replacement for _area_tables_binning since it would result in a scipy.sparse, and multiplying through that is really cheap, so we dont need to worry about distributing it?

The bottleneck above in 2. does not apply if all the operation is performed in memory (e.g., as a single pandas.DataFrame or polars.DataFrame, so if we can live with in-memory computation only, I think this logic would speed up our current implementation

so i say we go ahead and stop here and move back to memory

@darribas
Copy link
Member Author

That sounds good, I'd just be very clear on the documentation these approaches will not work with larger-than-memory data (which the released version for categoricals does). In reality, I expect most use cases to be fine with that, and it is also true that what we bring up in memory does not have geometries so it is significatly cheaper in terms of memory. But we should make it clear in the documentation?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants