Add unit tests #99

Merged · 6 commits · Dec 2, 2022
project/plugins.sbt (1 addition, 1 deletion)
@@ -13,7 +13,7 @@ addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.5")

//addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.0")

//tag::sbtJNIPlugin[]
addSbtPlugin("ch.jodersky" %% "sbt-jni" % "1.0.0-RC3")
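For context on the plugin bump: the sbt-scoverage workflow is unchanged between these versions. A minimal sketch of producing a coverage report with the plugin's standard tasks (the exact report path depends on the project's Scala version):

    sbt clean coverage test coverageReport
    # the HTML report is written under target/scala-<version>/scoverage-report/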
MixedDataset.scala
@@ -89,7 +89,7 @@ class MixedDataset(sqlCtx: SQLContext) {
Dataset[(RawPanda, CoffeeShop)] = {
//tag::joinWith[]
val result: Dataset[(RawPanda, CoffeeShop)] = pandas.joinWith(coffeeShops,
$"zip" === $"zip")
pandas("zip") === coffeeShops("zip"))
Comment from the author:
The log below is the reason I needed to change the join condition as above:

[info] - self join *** FAILED ***
[info]   org.apache.spark.sql.AnalysisException: Reference 'zip' is ambiguous, could be: zip#178, zip#195.;
[info]   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)

//end::joinWith[]
result
}
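To make the ambiguity concrete, here is a minimal, self-contained sketch of the failure and the fix. It uses hypothetical simplified Panda/Shop case classes and a local SparkSession, not the project's actual RawPanda/CoffeeShop types:

    import org.apache.spark.sql.SparkSession

    object JoinWithDisambiguationSketch {
      // Hypothetical simplified records, for illustration only.
      case class Panda(name: String, zip: String)
      case class Shop(shop: String, zip: String)

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]").appName("joinWith-sketch").getOrCreate()
        import spark.implicits._

        val pandas = Seq(Panda("happy", "94110")).toDS()
        val shops = Seq(Shop("Starbucks", "94110")).toDS()

        // Ambiguous: both inputs expose a column named "zip", so the analyzer
        // cannot tell which side $"zip" refers to and raises AnalysisException:
        // pandas.joinWith(shops, $"zip" === $"zip")

        // Unambiguous: qualify each column through the Dataset it belongs to.
        val joined = pandas.joinWith(shops, pandas("zip") === shops("zip"))
        joined.show()
        spark.stop()
      }
    }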
@@ -100,8 +100,8 @@ class MixedDataset(sqlCtx: SQLContext) {
def selfJoin(pandas: Dataset[RawPanda]):
Dataset[(RawPanda, RawPanda)] = {
//tag::selfJoin[]
-    val result: Dataset[(RawPanda, RawPanda)] = pandas.joinWith(pandas,
-      $"zip" === $"zip")
+    val result: Dataset[(RawPanda, RawPanda)] = pandas.as("l").joinWith(pandas.as("r"),
+      $"l.zip" === $"r.zip")
Comment from the author:
ditto

//end::selfJoin[]
result
}
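The self join needs a different trick: both sides come from the same Dataset, so qualifying columns through it cannot distinguish left from right. String aliases give each side a name to qualify against. Continuing the hypothetical sketch above:

    // Aliases "l" and "r" name the two sides of the self join,
    // so $"l.zip" and $"r.zip" resolve unambiguously.
    val selfJoined = pandas.as("l").joinWith(pandas.as("r"), $"l.zip" === $"r.zip")
    selfJoined.show()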
StandAlone.scala
@@ -1,10 +1,12 @@
package com.highperformancespark.examples.ffi

object StandAlone {
// $COVERAGE-OFF$
def main(args: Array[String]) {
//tag::systemLoadLibrary[]
System.loadLibrary("highPerformanceSpark0")
//end::systemLoadLibrary[]
println(new SumJNI().sum(Array(1,2,3)))
}
// $COVERAGE-ON$
}
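The // $COVERAGE-OFF$ / // $COVERAGE-ON$ pairs added here and below are sbt-scoverage's exclusion markers: statements between them are omitted from the coverage report, which is the usual way to keep untested main entry points from dragging coverage down. A minimal sketch of the pattern (hypothetical object, for illustration):

    object CoverageMarkersSketch {
      // $COVERAGE-OFF$ -- everything from here is excluded from the report
      def main(args: Array[String]): Unit =
        println(double(21))
      // $COVERAGE-ON$ -- measurement resumes below

      def double(x: Int): Int = x * 2 // still instrumented, and unit-testable
    }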
SimplePerfTest.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.types._
* A simple performance test to compare a simple sort between DataFrame, and RDD
*/
object SimplePerfTest {
// $COVERAGE-OFF$
def main(args: Array[String]) = {
val sparkConf = new SparkConf().setAppName("simple-perf-test")
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
@@ -81,4 +82,5 @@ object SimplePerfTest {
println(s"Time ${t1 - t0}ns")
(result, t1 - t0)
}
// $COVERAGE-ON$
}
MixedDatasetSuite.scala
@@ -10,10 +10,11 @@ import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.scalatest.Matchers._
import org.scalatest.FunSuite

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random

-class MixedDatasetSuite extends FunSuite with DataFrameSuiteBase {
+class MixedDatasetSuite extends FunSuite with DataFrameSuiteBase with DatasetSuiteBase with RDDComparisons {

val rawPandaList = List(
RawPanda(10L, "94110", "giant", true, Array(1.0, 0.9, 20.0)),
@@ -60,4 +61,111 @@ class MixedDatasetSuite extends FunSuite with DataFrameSuiteBase {
assert(bigPandas.size === 1)
assert(bigPandas(0)._2 === 30.0 +- 0.00001)
}

test("max pandas size per zip scala version") {
val sqlCtx = sqlContext
import sqlCtx.implicits._
val inputDF = sqlCtx.createDataFrame(rawPandaList)
val inputDS = inputDF.as[RawPanda]
val mixedDS = new MixedDataset(sqlCtx)
val bigPandas = mixedDS.maxPandaSizePerZipScala(inputDS).collect()
assert(bigPandas.size === 1)
assert(bigPandas(0)._2 === 30.0 +- 0.00001)
}

test("union pandas") {
val sqlCtx = sqlContext
import sqlCtx.implicits._
val happyPandas = sqlCtx.createDataset(rawPandaList.take(1))
val sadPandas = sqlCtx.createDataset(rawPandaList.drop(1))
val mixedDS = new MixedDataset(sqlCtx)
val unionPandas = mixedDS.unionPandas(happyPandas, sadPandas).collect
assert(unionPandas.toSet == rawPandaList.toSet)
}

test("typed query") {
val sqlCtx = sqlContext
import sqlCtx.implicits._
val inputDF = sqlCtx.createDataFrame(rawPandaList)
val inputDS = inputDF.as[RawPanda]
val mixedDS = new MixedDataset(sqlCtx)
val typedResult = mixedDS.typedQueryExample(inputDS)
assert(typedResult.collect().toList == rawPandaList.map(_.attributes(0)))
}

test("join different dataset") {
val sqlCtx = sqlContext
import sqlCtx.implicits._
val pandaDS = sqlCtx.createDataFrame(rawPandaList).as[RawPanda]
val rawCoffeeShop = List(CoffeeShop("94110", "Starbucks"), CoffeeShop("98765", "Caribou"))
val coffeeShopDS = sqlCtx.createDataFrame(rawCoffeeShop).as[CoffeeShop]
val mixedDS = new MixedDataset(sqlCtx)
val joinResult = mixedDS.joinSample(pandaDS, coffeeShopDS)
val expected = for {
panda <- rawPandaList
coffeeShop <- rawCoffeeShop
if (panda.zip == coffeeShop.zip)
} yield (panda, coffeeShop)
assert(joinResult.collect().toSet == expected.toSet)
}

test("self join") {
val sqlCtx = sqlContext
import sqlCtx.implicits._
val inputDF = sqlCtx.createDataFrame(rawPandaList)
val inputDS = inputDF.as[RawPanda]
val mixedDS = new MixedDataset(sqlCtx)
val selfJoinResult = mixedDS.selfJoin(inputDS)
val expected = for {
left <- rawPandaList
right <- rawPandaList
if (left.zip == right.zip)
} yield (left, right)
assert(selfJoinResult.collect().toSet == expected.toSet)
}

test("convert an RDD to DS") {
val sqlCtx = sqlContext
import sqlCtx.implicits._
val mixedDS = new MixedDataset(sqlCtx)
val rdd = sc.parallelize(rawPandaList)
val result = mixedDS.fromRDD(rdd)
val expected = sqlCtx.createDataFrame(rawPandaList).as[RawPanda]
assertDatasetEquals(expected, result)
}

test("convert a Dataset to an RDD") {
val sqlCtx = sqlContext
import sqlCtx.implicits._
val mixedDS = new MixedDataset(sqlCtx)
val rdd = sc.parallelize(rawPandaList)
val dataset = sqlCtx.createDataFrame(rawPandaList).as[RawPanda]
val result = mixedDS.toRDD(dataset)
val expected = sc.parallelize(rawPandaList)
assertRDDEquals(expected, result)
}

test("convert a Dataset to a DataFrame") {
val sqlCtx = sqlContext
import sqlCtx.implicits._
val mixedDS = new MixedDataset(sqlCtx)
val rdd = sc.parallelize(rawPandaList)
val dataset = sqlCtx.createDataFrame(rawPandaList).as[RawPanda]
val result = mixedDS.toDF(dataset)
val expected = sqlCtx.createDataFrame(rawPandaList)
assertDataFrameEquals(expected, result)
}

  test("convert a DataFrame to a Dataset") {
val sqlCtx = sqlContext
import sqlCtx.implicits._
val mixedDS = new MixedDataset(sqlCtx)
val rdd = sc.parallelize(rawPandaList)
val dataframe = sqlCtx.createDataFrame(rawPandaList)
val result = mixedDS.fromDF(dataframe)
val expected = sqlCtx.createDataFrame(rawPandaList).as[RawPanda]
assertDatasetEquals(expected, result)
}

}
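The assertDatasetEquals, assertRDDEquals, and assertDataFrameEquals calls above come from the spark-testing-base traits this suite now mixes in (DatasetSuiteBase, RDDComparisons, and DataFrameSuiteBase respectively). A minimal standalone sketch, with an illustrative suite name and data:

    import com.holdenkarau.spark.testing.{DatasetSuiteBase, RDDComparisons}
    import org.scalatest.FunSuite

    // Hypothetical suite, for illustration only.
    class EqualityHelpersSketch extends FunSuite
        with DatasetSuiteBase with RDDComparisons {
      test("equal Datasets and RDDs pass the assertions") {
        val sqlCtx = sqlContext
        import sqlCtx.implicits._
        val left = sqlCtx.createDataset(Seq(1, 2, 3))
        val right = sqlCtx.createDataset(Seq(1, 2, 3))
        assertDatasetEquals(left, right)     // provided by DatasetSuiteBase
        assertRDDEquals(left.rdd, right.rdd) // provided by RDDComparisons
      }
    }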
AccumulatorsTest.scala
@@ -8,6 +8,7 @@ import com.highperformancespark.examples.dataframe.RawPanda
import com.holdenkarau.spark.testing._

import org.scalatest.FunSuite
import scala.collection.immutable.HashSet

class AccumulatorsTest extends FunSuite with SharedSparkContext {
test("accumulator max should function") {
@@ -23,4 +24,17 @@ class AccumulatorsTest extends FunSuite with SharedSparkContext {
val (_, sum) = Accumulators.computeTotalFuzzyNess(sc, input)
assert(sum === 5050.0)
}

test("accumulator unique should function") {
val input1 = sc.parallelize(1 to 100).map(x =>
RawPanda(1L, "1", "red", true, Array(x.toDouble))
)

val input2 = sc.parallelize(1 to 100).map(x =>
      RawPanda(2L, "2", "blue", false, Array(x.toDouble))
)

val set = Accumulators.uniquePandas(sc, input1 ++ input2)
assert(set == HashSet(2, 1))
}
}
WordCountTest.scala
@@ -22,4 +22,44 @@ class WordCountTest extends FunSuite with SharedSparkContext {
assert(wordCountsAsMap.contains("ing"))
assert(wordCountsAsMap.get("panda").get.equals(3))
}

test("word count with simple counting") {
val wordRDD = sc.parallelize(
Seq(
"a b c d",
"b c d e"
)
)
val wordCounts = WordCount.simpleWordCount(wordRDD)

val wordCountsAsMap = wordCounts.collectAsMap()

for (character <- 'a' to 'e') {
assert(wordCountsAsMap.contains(character.toString))
}

for (character <- 'b' to 'd') {
assert(wordCountsAsMap.get(character.toString).get == 2)
}
}

test("word count with bad idea") {
val wordRDD = sc.parallelize(
Seq(
"a b c d",
"b c d e"
)
)
val wordCounts = WordCount.badIdea(wordRDD)

val wordCountsAsMap = wordCounts.collectAsMap()

for (character <- 'a' to 'e') {
assert(wordCountsAsMap.contains(character.toString))
}

for (character <- 'b' to 'd') {
assert(wordCountsAsMap.get(character.toString).get == 2)
}
}
}