Commit c675ef1

Create rule S7470: PySpark's \"RDD.groupByKey\", when used in conjunction with \"RDD.mapValues\" with a commutative and associative operation, should be replaced by \"RDD.reduceByKey\"
1 parent b36c03c commit c675ef1

2 files changed (+57 −23 lines)


Diff for: rules/S7470/python/metadata.json

+5 −5

@@ -1,12 +1,14 @@
 {
-  "title": "FIXME",
+  "title": "PySpark's \"RDD.groupByKey\", when used in conjunction with \"RDD.mapValues\" with a commutative and associative operation, should be replaced by \"RDD.reduceByKey\"",
   "type": "CODE_SMELL",
   "status": "ready",
   "remediation": {
     "func": "Constant\/Issue",
     "constantCost": "5min"
   },
   "tags": [
+    "data-science",
+    "pyspark"
   ],
   "defaultSeverity": "Major",
   "ruleSpecification": "RSPEC-7470",
@@ -16,10 +18,8 @@
   "quickfix": "unknown",
   "code": {
     "impacts": {
-      "MAINTAINABILITY": "HIGH",
-      "RELIABILITY": "MEDIUM",
-      "SECURITY": "LOW"
+      "MAINTAINABILITY": "MEDIUM"
     },
-    "attribute": "CONVENTIONAL"
+    "attribute": "EFFICIENT"
   }
 }

Diff for: rules/S7470/python/rule.adoc

+52 −18

@@ -1,44 +1,78 @@
-FIXME: add a description
+This rule raises an issue when `RDD.groupByKey` is used in conjunction with `RDD.mapValues`
+and a commutative and associative function instead of `RDD.reduceByKey`.
 
-// If you want to factorize the description uncomment the following line and create the file.
-//include::../description.adoc[]
 
 == Why is this an issue?
 
-FIXME: remove the unused optional headers (that are commented out)
+The PySpark API offers multiple ways of performing aggregation.
+When performing aggregations, data is usually shuffled between partitions;
+this shuffling and its associated cost are needed to compute the result correctly.
+
+There are, however, cases where some aggregation methods are more efficient than others.
+For example, when `RDD.groupByKey` is used in conjunction with `RDD.mapValues` and the function passed to `RDD.mapValues`
+is commutative and associative, it is preferable to use `RDD.reduceByKey` instead.
+The performance gain from `RDD.reduceByKey` comes from the reduced amount of data that needs to be moved between PySpark tasks.
+`RDD.reduceByKey` will reduce the number of rows in a partition before sending the data over the network for further reduction.
+On the other hand, when using `RDD.groupByKey` with `RDD.mapValues`, the reduction is only done
+after the data has been moved around the cluster, slowing down
+the computation by transferring a larger amount of data over the network.
 
-//=== What is the potential impact?
 
 == How to fix it
-//== How to fix it in FRAMEWORK NAME
+
+To fix this issue, replace the calls to `RDD.groupByKey` and `RDD.mapValues` with a single call to `RDD.reduceByKey`.
 
 === Code examples
 
 ==== Noncompliant code example
 
 [source,python,diff-id=1,diff-type=noncompliant]
 ----
-FIXME
+from pyspark import SparkContext
+
+sc = SparkContext("local", "Example")
+rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("b", 3)])
+result = rdd.groupByKey().mapValues(lambda values: sum(values)).collect() # Noncompliant: an associative and commutative operation is used with `groupByKey` and `mapValues`
 ----
 
 ==== Compliant solution
 
 [source,python,diff-id=1,diff-type=compliant]
 ----
-FIXME
+from pyspark import SparkContext
+
+sc = SparkContext("local", "Example")
+rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("b", 3)])
+result = rdd.reduceByKey(lambda x, y: x + y).collect() # Compliant
 ----
 
-//=== How does this work?
+== Resources
+=== Documentation
+
+* PySpark Documentation - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.reduceByKey.html#pyspark.RDD.reduceByKey[pyspark.RDD.reduceByKey]
+* PySpark Documentation - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.groupByKey.html#pyspark.RDD.groupByKey[pyspark.RDD.groupByKey]
+* PySpark Documentation - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.mapValues.html#pyspark.RDD.mapValues[pyspark.RDD.mapValues]
+
+=== Articles & blog posts
+
+* Spark By Examples - https://sparkbyexamples.com/spark/spark-groupbykey-vs-reducebykey/[Spark groupByKey() vs reduceByKey()]
+
+ifdef::env-github,rspecator-view[]
+=== Implementation Specification
+
+As a first implementation, we should focus on simple operations: sum and math.prod.
+
+=== Message
+
+Replace the usage of "RDD.groupByKey" and "RDD.mapValues" with "RDD.reduceByKey".
+
+=== Highlighting
+
+The main location is the `groupByKey` call and the secondary location is the `mapValues` call.
 
-//=== Pitfalls
+=== Quickfix
 
-//=== Going the extra mile
+N/A, as we cannot easily convert the function passed to mapValues into a function passed to reduceByKey.
 
+endif::env-github,rspecator-view[]
 
-//== Resources
-//=== Documentation
-//=== Articles & blog posts
-//=== Conference presentations
-//=== Standards
-//=== External coding guidelines
-//=== Benchmarks
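
The implementation specification above names sum and math.prod as the first operations to detect. A minimal sketch of the same rewrite for a product aggregation, assuming pyspark is installed, no SparkContext is already running, and with illustrative data and variable names:

[source,python]
----
import math
from operator import mul

from pyspark import SparkContext

sc = SparkContext("local", "Example")
rdd = sc.parallelize([("a", 2), ("b", 3), ("a", 4), ("b", 5)])

# Pattern the rule would flag: every value for a key is shuffled to a single
# task before math.prod reduces the grouped values.
grouped = rdd.groupByKey().mapValues(lambda values: math.prod(values)).collect()

# Preferred pattern: multiplication is commutative and associative, so
# reduceByKey can combine values inside each partition before the shuffle.
reduced = rdd.reduceByKey(mul).collect()

assert sorted(grouped) == sorted(reduced)  # both yield [("a", 8), ("b", 15)]
----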
