Skip to content

Commit e6cd103

Browse files
author
Mahmoud Hanafy
committed
Add pandas test cases
1 parent 2bc36e0 commit e6cd103

File tree

2 files changed

+161
-39
lines changed

2 files changed

+161
-39
lines changed

Diff for: src/main/scala/com/high-performance-spark-examples/dataframe/HappyPandas.scala

+15-14
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ object HappyPanda {
105105

106106

107107
/**
108-
*
109108
* @param name name of panda
110109
* @param zip zip code
111110
* @param pandaSize size of panda in KG
@@ -121,17 +120,22 @@ object HappyPanda {
121120

122121
//tag::minMaxPandasSizePerZip[]
123122
def minMaxPandaSizePerZip(pandas: DataFrame): DataFrame = {
124-
// List of strings
125-
pandas.groupBy(pandas("zip")).agg(("min", "pandaSize"), ("max", "pandaSize"))
126-
// Map of column to aggregate
127-
pandas.groupBy(pandas("zip")).agg(Map("pandaSize" -> "min",
128-
"pandaSize" -> "max"))
129-
// expression literals
123+
pandas.groupBy(pandas("zip")).agg(min("pandaSize"), max("pandaSize"))
130124
}
131125
//end::minMaxPandasSizePerZip[]
132126

127+
def minPandaSizeMaxAgePerZip(pandas: DataFrame): DataFrame = {
128+
// this query can be written in two methods
129+
130+
// 1
131+
pandas.groupBy(pandas("zip")).agg(("pandaSize", "min"), ("age", "max"))
132+
133+
// 2
134+
pandas.groupBy(pandas("zip")).agg(Map("pandaSize" -> "min", "age" -> "max"))
135+
}
136+
133137
//tag::complexAggPerZip[]
134-
def complexAggPerZip(pandas: DataFrame): DataFrame = {
138+
def minMeanSizePerZip(pandas: DataFrame): DataFrame = {
135139
// Compute the min and mean
136140
pandas.groupBy(pandas("zip")).agg(min(pandas("pandaSize")), mean(pandas("pandaSize")))
137141
}
@@ -141,7 +145,7 @@ object HappyPanda {
141145
val sqlCtx = pandas.sqlContext
142146
//tag::pandasSQLQuery[]
143147
pandas.registerTempTable("pandas")
144-
val miniPandas = sqlCtx.sql("SELECT * FROM pandas WHERE pandaSize < 100")
148+
val miniPandas = sqlCtx.sql("SELECT * FROM pandas WHERE pandaSize < 12")
145149
//end::pandasSQLQuery[]
146150
miniPandas
147151
}
@@ -157,9 +161,6 @@ object HappyPanda {
157161
* Orders pandas by size ascending and by age descending.
158162
* Pandas will be sorted by "size" first and if two pandas have the same "size"
159163
* will be sorted by "age".
160-
*
161-
* @param pandas
162-
* @return
163164
*/
164165
def orderPandas(pandas: DataFrame): DataFrame = {
165166
//tag::simpleSort[]
@@ -172,14 +173,14 @@ object HappyPanda {
172173
val windowSpec = Window
173174
.orderBy(pandas("age"))
174175
.partitionBy(pandas("zip"))
175-
.rowsBetween(start = 10, end = 10) // use rangeBetween for range instead
176+
.rowsBetween(start = -1, end = 1) // use rangeBetween for range instead
176177
//end::relativePandaSizesWindow[]
177178

178179
//tag::relativePandaSizesQuery[]
179180
val pandaRelativeSizeFunc = (pandas("pandaSize") -
180181
avg(pandas("pandaSize")).over(windowSpec))
181182

182-
pandas.select(pandas("name"), pandas("zip"), pandas("pandaSize"),
183+
pandas.select(pandas("name"), pandas("zip"), pandas("pandaSize"), pandas("age"),
183184
pandaRelativeSizeFunc.as("panda_relative_size"))
184185
//end::relativePandaSizesQuery[]
185186
}

Diff for: src/test/scala/com/high-performance-spark-examples/dataframe/HappyPandas.scala

+146-25
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
package com.highperformancespark.examples.dataframe
55

6-
import com.highperformancespark.examples.dataframe.HappyPanda.PandaInfo
6+
import com.highperformancespark.examples.dataframe.HappyPanda.{PandaInfo, Pandas}
77
import com.holdenkarau.spark.testing._
88
import org.apache.spark.sql.types._
99
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
@@ -13,26 +13,32 @@ class HappyPandasTest extends DataFrameSuiteBase {
1313
val toronto = "toronto"
1414
val sandiego = "san diego"
1515
val virginia = "virginia"
16-
val pandInfoList = List(PandaInfo(toronto, "giant", 1, 2),
16+
val pandaInfoList = List(PandaInfo(toronto, "giant", 1, 2),
1717
PandaInfo(sandiego, "red", 2, 3),
1818
PandaInfo(virginia, "black", 1, 10))
1919

20+
val pandasList = List(Pandas("bata", "10010", 10, 2),
21+
Pandas("wiza", "10010", 20, 4),
22+
Pandas("dabdob", "11000", 8, 2),
23+
Pandas("hanafy", "11000", 15, 7),
24+
Pandas("hamdi", "11111", 20, 10))
25+
2026
//tag::approxEqualDataFrames[]
2127

2228
test("verify simple happy pandas Percentage") {
2329
val expectedResult = List(Row(toronto, 0.5), Row(sandiego, 2/3.0), Row(virginia, 1/10.0))
2430
val expectedDf = createDF(expectedResult, ("place", StringType),
2531
("percentHappy", DoubleType))
2632

27-
val inputDF = sqlContext.createDataFrame(pandInfoList)
33+
val inputDF = sqlContext.createDataFrame(pandaInfoList)
2834
val result = HappyPanda.happyPandasPercentage(inputDF)
2935

3036
approxEqualDataFrames(expectedDf, result, 1E-5)
3137
}
3238
//end::approxEqualDataFrames[]
3339

3440
test("verify approx by hand") {
35-
val inputDF = sqlContext.createDataFrame(pandInfoList)
41+
val inputDF = sqlContext.createDataFrame(pandaInfoList)
3642
val resultDF = HappyPanda.happyPandasPercentage(inputDF)
3743
val resultRows = resultDF.collect()
3844

@@ -48,7 +54,7 @@ class HappyPandasTest extends DataFrameSuiteBase {
4854
}
4955

5056
test("test encode Panda type") {
51-
val inputDF = sqlContext.createDataFrame(pandInfoList)
57+
val inputDF = sqlContext.createDataFrame(pandaInfoList)
5258
val resultDF = HappyPanda.encodePandaType(inputDF)
5359

5460
val expectedRows = List(Row(toronto, 0), Row(sandiego, 1), Row(virginia, 2))
@@ -61,7 +67,7 @@ class HappyPandasTest extends DataFrameSuiteBase {
6167
//tag::exactEqualDataFrames[]
6268
test("verify exact equality") {
6369
// test minHappyPandas
64-
val inputDF = sqlContext.createDataFrame(pandInfoList)
70+
val inputDF = sqlContext.createDataFrame(pandaInfoList)
6571
val result = HappyPanda.minHappyPandas(inputDF, 2)
6672
val resultRows = result.collect()
6773

@@ -71,7 +77,7 @@ class HappyPandasTest extends DataFrameSuiteBase {
7177
//end::exactEqualDataFrames[]
7278

7379
test("test happyPandasPlaces") {
74-
val inputDF = sqlContext.createDataFrame(pandInfoList)
80+
val inputDF = sqlContext.createDataFrame(pandaInfoList)
7581
val resultDF = HappyPanda.happyPandasPlaces(inputDF)
7682

7783
val expectedRows = List(PandaInfo(toronto, "giant", 1, 2),
@@ -81,26 +87,141 @@ class HappyPandasTest extends DataFrameSuiteBase {
8187
equalDataFrames(expectedDF, resultDF)
8288
}
8389

84-
// Make a test once we have hivectx in the base
85-
def futureTestRrelativePandaSize() {
86-
val sqlCtx = sqlContext
87-
// TODO: Generate some data instead of using the small static data
88-
val inputDF = loadPandaStuffies(sqlCtx)
89-
val result = HappyPanda.computeRelativePandaSizes(inputDF)
90-
val resultRows = result.collect()
91-
assert(List() === resultRows)
90+
test("test maxPandaSizePerZip") {
91+
val inputDF = sqlContext.createDataFrame(pandasList)
92+
val resultDF = HappyPanda.maxPandaSizePerZip(inputDF)
93+
94+
val expectedRows = List(Row(pandasList(1).zip, pandasList(1).pandaSize),
95+
Row(pandasList(3).zip, pandasList(3).pandaSize),
96+
Row(pandasList(4).zip, pandasList(4).pandaSize))
97+
val expectedDF = createDF(expectedRows, ("zip", StringType),
98+
("max(pandaSize)", IntegerType))
99+
100+
equalDataFrames(expectedDF.orderBy("zip"), resultDF.orderBy("zip"))
101+
}
102+
103+
test("test minMaxPandaSizePerZip"){
104+
val inputDF = sqlContext.createDataFrame(pandasList)
105+
val resultDF = HappyPanda.minMaxPandaSizePerZip(inputDF)
106+
107+
val expectedRows = List(
108+
Row(pandasList(1).zip, pandasList(0).pandaSize, pandasList(1).pandaSize),
109+
Row(pandasList(3).zip, pandasList(2).pandaSize, pandasList(3).pandaSize),
110+
Row(pandasList(4).zip, pandasList(4).pandaSize, pandasList(4).pandaSize))
111+
112+
val expectedDF = createDF(expectedRows, ("zip", StringType),
113+
("min(pandaSize)", IntegerType),
114+
("max(pandaSize)", IntegerType))
115+
116+
equalDataFrames(expectedDF.orderBy("zip"), resultDF.orderBy("zip"))
117+
}
118+
119+
test("test minPandaSizeMaxAgePerZip") {
120+
val inputDF = sqlContext.createDataFrame(pandasList)
121+
val resultDF = HappyPanda.minPandaSizeMaxAgePerZip(inputDF)
122+
123+
val expectedRows = List(
124+
Row(pandasList(1).zip, pandasList(0).pandaSize, pandasList(1).age),
125+
Row(pandasList(3).zip, pandasList(2).pandaSize, pandasList(3).age),
126+
Row(pandasList(4).zip, pandasList(4).pandaSize, pandasList(4).age))
127+
128+
val expectedDF = createDF(expectedRows, ("zip", StringType),
129+
("min(pandaSize)", IntegerType),
130+
("max(age)", IntegerType))
131+
132+
equalDataFrames(expectedDF.orderBy("zip"), resultDF.orderBy("zip"))
133+
}
134+
135+
test("test complexAggPerZip") {
136+
val inputDF = sqlContext.createDataFrame(pandasList)
137+
val resultDF = HappyPanda.minMeanSizePerZip(inputDF)
138+
139+
val expectedRows = List(
140+
Row(pandasList(1).zip, pandasList(0).pandaSize, 15.0),
141+
Row(pandasList(3).zip, pandasList(2).pandaSize, 11.5),
142+
Row(pandasList(4).zip, pandasList(4).pandaSize, 20.0))
143+
144+
val expectedDF = createDF(expectedRows, ("zip", StringType),
145+
("min(pandaSize)", IntegerType),
146+
("avg(pandaSize)", DoubleType))
147+
148+
approxEqualDataFrames(expectedDF.orderBy("zip"), resultDF.orderBy("zip"), 1e-5)
149+
}
150+
151+
152+
test("test Simple SQL example") {
153+
val inputDF = sqlContext.createDataFrame(pandasList)
154+
val resultDF = HappyPanda.simpleSqlExample(inputDF)
155+
156+
val expectedRows = List(pandasList(0), pandasList(2))
157+
val expectedDF = sqlContext.createDataFrame(expectedRows)
158+
159+
equalDataFrames(expectedDF, resultDF)
160+
}
161+
162+
test("test Order Pandas") {
163+
val inputDF = sqlContext.createDataFrame(pandasList)
164+
val resultDF = HappyPanda.orderPandas(inputDF)
165+
166+
val expectedRows = List(pandasList(2), pandasList(0), pandasList(3),
167+
pandasList(4), pandasList(1))
168+
val expectedDF = sqlContext.createDataFrame(expectedRows)
169+
170+
equalDataFrames(expectedDF, resultDF)
171+
}
172+
173+
174+
test("test computeRelativePandaSizes") {
175+
val inputDF = loadPandaStuffies()
176+
val resultDF = HappyPanda.computeRelativePandaSizes(inputDF)
177+
178+
val expectedDF = getExpectedPandasRelativeSize()
179+
180+
approxEqualDataFrames(expectedDF.orderBy("name"), resultDF.orderBy("name"), 1e-2)
181+
}
182+
183+
private def getExpectedPandasRelativeSize():DataFrame = {
184+
val expectedRows = List(
185+
Row("name1-1", "zip1", 10, 1, -5.0),
186+
Row("name2-1", "zip1", 20, 2, 5.0),
187+
Row("name3-1", "zip1", 15, 3, 1.6666),
188+
Row("name4-1", "zip1", 5, 4, -5.0),
189+
190+
Row("name1-2", "zip2", 5, 1, -7.5),
191+
Row("name2-2", "zip2", 20, 2, 4.66666),
192+
Row("name3-2", "zip2", 21, 3, 0.5),
193+
194+
Row("name1-3", "zip3", 10, 1, 0.0),
195+
Row("name2-3", "zip3", 10, 2, 0.0),
196+
197+
Row("name1-4", "zip4", 5, 1, 0.0))
198+
199+
val expectedDF = createDF(expectedRows, ("name", StringType),
200+
("zip", StringType),
201+
("pandaSize", IntegerType),
202+
("age", IntegerType),
203+
("panda_relative_size", DoubleType))
204+
205+
expectedDF
92206
}
93207

94-
def loadPandaStuffies(sqlCtx: SQLContext): DataFrame = {
95-
val pandaStuffies = List(Row("ikea", null, 0.2, 94110),
96-
Row("tube", 6, 0.4, 94110),
97-
Row("panda", 6, 0.5, 94110),
98-
Row("real", 30, 77.5, 100000))
99-
val schema = StructType(List(StructField("name", StringType, true),
100-
StructField("age", IntegerType, true),
101-
StructField("pandaSize", DoubleType, true),
102-
StructField("zip", IntegerType, true)))
103-
sqlCtx.createDataFrame(sc.parallelize(pandaStuffies), schema)
208+
private def loadPandaStuffies(): DataFrame = {
209+
val pandaStuffies = List(
210+
Pandas("name1-1", "zip1", 10, 1),
211+
Pandas("name2-1", "zip1", 20, 2),
212+
Pandas("name3-1", "zip1", 15, 3),
213+
Pandas("name4-1", "zip1", 5, 4),
214+
215+
Pandas("name1-2", "zip2", 5, 1),
216+
Pandas("name2-2", "zip2", 20, 2),
217+
Pandas("name3-2", "zip2", 21, 3),
218+
219+
Pandas("name1-3", "zip3", 10, 1),
220+
Pandas("name2-3", "zip3", 10, 2),
221+
222+
Pandas("name1-4", "zip4", 5, 1))
223+
224+
sqlContext.createDataFrame(sc.parallelize(pandaStuffies))
104225
}
105226

106227

0 commit comments

Comments
 (0)