Skip to content

Commit 8a395d4

Browse files
committed
Merge pull request #49 from mahmoudhanafy/port-HappyPandas-to-Java
Port HappyPandas to Java
2 parents 09b3d85 + d7de259 commit 8a395d4

File tree

13 files changed

+789
-28
lines changed

13 files changed

+789
-28
lines changed

.travis.yml

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ cache:
1111
- $HOME/.sbt/launchers
1212
scala:
1313
- 2.11.6
14+
jdk:
15+
- oraclejdk8
1416
apt:
1517
sources:
1618
- ubuntu-toolchain-r-test

build.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ scalaVersion := "2.11.6"
1010

1111
crossScalaVersions := Seq("2.11.6")
1212

13-
javacOptions ++= Seq("-source", "1.7", "-target", "1.7")
13+
javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
1414

1515
sparkVersion := "1.6.1"
1616

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package com.highperformancespark.examples.dataframe;
2+
3+
import org.apache.spark.api.java.JavaRDD;
4+
import org.apache.spark.api.java.JavaSparkContext;
5+
import org.apache.spark.sql.Column;
6+
import org.apache.spark.sql.DataFrame;
7+
import org.apache.spark.sql.SQLContext;
8+
import org.apache.spark.sql.expressions.Window;
9+
import org.apache.spark.sql.expressions.WindowSpec;
10+
import org.apache.spark.sql.hive.HiveContext;
11+
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
15+
import static org.apache.spark.sql.functions.*;
16+
17+
public class JavaHappyPandas {
18+
19+
/**
20+
* Creates SQLContext with an existing SparkContext.
21+
*/
22+
public static SQLContext sqlContext(JavaSparkContext jsc) {
23+
SQLContext sqlContext = new SQLContext(jsc);
24+
return sqlContext;
25+
}
26+
27+
/**
28+
* Creates HiveContext with an existing SparkContext.
29+
*/
30+
public static HiveContext hiveContext(JavaSparkContext jsc) {
31+
HiveContext hiveContext = new HiveContext(jsc);
32+
return hiveContext;
33+
}
34+
35+
/**
36+
* Illustrate loading some JSON data.
37+
*/
38+
public static DataFrame loadDataSimple(JavaSparkContext jsc, SQLContext sqlContext, String path) {
39+
DataFrame df1 = sqlContext.read().json(path);
40+
41+
DataFrame df2 = sqlContext.read().format("json").option("samplingRatio", "1.0").load(path);
42+
43+
JavaRDD<String> jsonRDD = jsc.textFile(path);
44+
DataFrame df3 = sqlContext.read().json(jsonRDD);
45+
46+
return df1;
47+
}
48+
49+
public static DataFrame jsonLoadFromRDD(SQLContext sqlContext, JavaRDD<String> input) {
50+
JavaRDD<String> rdd = input.filter(e -> e.contains("panda"));
51+
DataFrame df = sqlContext.read().json(rdd);
52+
return df;
53+
}
54+
55+
// Here will be some examples on PandaInfo DataFrame
56+
57+
/**
58+
* Gets the percentage of happy pandas per place.
59+
*
60+
* @param pandaInfo the input DataFrame
61+
* @return Returns DataFrame of (place, percentage of happy pandas)
62+
*/
63+
public static DataFrame happyPandasPercentage(DataFrame pandaInfo) {
64+
DataFrame happyPercentage = pandaInfo.select(pandaInfo.col("place"),
65+
(pandaInfo.col("happyPandas").divide(pandaInfo.col("totalPandas"))).as("percentHappy"));
66+
return happyPercentage;
67+
}
68+
69+
/**
70+
* Encodes pandaType to Integer values instead of String values.
71+
*
72+
* @param pandaInfo the input DataFrame
73+
* @return Returns a DataFrame of pandaId and integer value for pandaType.
74+
*/
75+
public static DataFrame encodePandaType(DataFrame pandaInfo) {
76+
DataFrame encodedDF = pandaInfo.select(pandaInfo.col("id"),
77+
when(pandaInfo.col("pt").equalTo("giant"), 0).
78+
when(pandaInfo.col("pt").equalTo("red"), 1).
79+
otherwise(2).as("encodedType"));
80+
81+
return encodedDF;
82+
}
83+
84+
/**
85+
* Gets places with happy pandas more than minHappinessBound.
86+
*/
87+
public static DataFrame minHappyPandas(DataFrame pandaInfo, int minHappyPandas) {
88+
return pandaInfo.filter(pandaInfo.col("happyPandas").geq(minHappyPandas));
89+
}
90+
91+
/**
92+
* Find pandas that are sad.
93+
*/
94+
public static DataFrame sadPandas(DataFrame pandaInfo) {
95+
return pandaInfo.filter(pandaInfo.col("happy").notEqual(true));
96+
}
97+
98+
/**
99+
* Find pandas that are happy and fuzzier than squishy.
100+
*/
101+
public static DataFrame happyFuzzyPandas(DataFrame pandaInfo) {
102+
DataFrame df = pandaInfo.filter(
103+
pandaInfo.col("happy").and(pandaInfo.col("attributes").apply(0)).gt(pandaInfo.col("attributes").apply(1))
104+
);
105+
106+
return df;
107+
}
108+
109+
/**
110+
* Gets places that contains happy pandas more than unhappy pandas.
111+
*/
112+
public static DataFrame happyPandasPlaces(DataFrame pandaInfo) {
113+
return pandaInfo.filter(pandaInfo.col("happyPandas").geq(pandaInfo.col("totalPandas").divide(2)));
114+
}
115+
116+
/**
117+
* Remove duplicate pandas by id.
118+
*/
119+
public static DataFrame removeDuplicates(DataFrame pandas) {
120+
DataFrame df = pandas.dropDuplicates(new String[]{"id"});
121+
return df;
122+
}
123+
124+
public static DataFrame describePandas(DataFrame pandas) {
125+
return pandas.describe();
126+
}
127+
128+
public static DataFrame maxPandaSizePerZip(DataFrame pandas) {
129+
return pandas.groupBy(pandas.col("zip")).max("pandaSize");
130+
}
131+
132+
public static DataFrame minMaxPandaSizePerZip(DataFrame pandas) {
133+
return pandas.groupBy(pandas.col("zip")).agg(min("pandaSize"), max("pandaSize"));
134+
}
135+
136+
public static DataFrame minPandaSizeMaxAgePerZip(DataFrame pandas) {
137+
Map<String, String> map = new HashMap<>();
138+
map.put("pandaSize", "min");
139+
map.put("age", "max");
140+
141+
DataFrame df = pandas.groupBy(pandas.col("zip")).agg(map);
142+
return df;
143+
}
144+
145+
public static DataFrame minMeanSizePerZip(DataFrame pandas) {
146+
return pandas.groupBy(pandas.col("zip")).agg(min(pandas.col("pandaSize")), mean(pandas.col("pandaSize")));
147+
}
148+
149+
public static DataFrame simpleSqlExample(DataFrame pandas) {
150+
SQLContext sqlContext = pandas.sqlContext();
151+
pandas.registerTempTable("pandas");
152+
153+
DataFrame miniPandas = sqlContext.sql("SELECT * FROM pandas WHERE pandaSize < 12");
154+
return miniPandas;
155+
}
156+
157+
/**
158+
* Orders pandas by size ascending and by age descending.
159+
* Pandas will be sorted by "size" first and if two pandas
160+
* have the same "size" will be sorted by "age".
161+
*/
162+
public static DataFrame orderPandas(DataFrame pandas) {
163+
return pandas.orderBy(pandas.col("pandaSize").asc(), pandas.col("age").desc());
164+
}
165+
166+
public static DataFrame computeRelativePandaSizes(DataFrame pandas) {
167+
//tag::relativePandaSizesWindow[]
168+
WindowSpec windowSpec = Window
169+
.orderBy(pandas.col("age"))
170+
.partitionBy(pandas.col("zip"))
171+
.rowsBetween(-10, 10); // can use rangeBetween for range instead
172+
//end::relativePandaSizesWindow[]
173+
174+
//tag::relativePandaSizesQuery[]
175+
Column pandaRelativeSizeCol = pandas.col("pandaSize").minus(avg(pandas.col("pandaSize")).over(windowSpec));
176+
177+
return pandas.select(pandas.col("name"), pandas.col("zip"), pandas.col("pandaSize"),
178+
pandas.col("age"), pandaRelativeSizeCol.as("panda_relative_size"));
179+
//end::relativePandaSizesQuery[]
180+
}
181+
182+
public static void joins(DataFrame df1, DataFrame df2) {
183+
//tag::innerJoin[]
184+
// Inner join implicit
185+
df1.join(df2, df1.col("name").equalTo(df2.col("name")));
186+
// Inner join explicit
187+
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "inner");
188+
//end::innerJoin[]
189+
190+
//tag::leftouterJoin[]
191+
// Left outer join explicit
192+
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "left_outer");
193+
//end::leftouterJoin[]
194+
195+
//tag::rightouterJoin[]
196+
// Right outer join explicit
197+
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "right_outer");
198+
//end::rightouterJoin[]
199+
200+
//tag::leftsemiJoin[]
201+
// Left semi join explicit
202+
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "leftsemi");
203+
//end::leftsemiJoin[]
204+
}
205+
206+
public static DataFrame selfJoin(DataFrame df) {
207+
return (df.as("a")).join(df.as("b")).where("a.name = b.name");
208+
}
209+
210+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package com.highperformancespark.examples.dataframe;
2+
3+
import com.highperformancespark.examples.objects.JavaPandaPlace;
4+
import com.highperformancespark.examples.objects.JavaRawPanda;
5+
import org.apache.spark.api.java.JavaRDD;
6+
import org.apache.spark.sql.*;
7+
import org.apache.spark.sql.types.*;
8+
9+
import java.util.List;
10+
import java.util.Properties;
11+
import java.util.stream.Collectors;
12+
13+
public class JavaLoadSave {
14+
private SQLContext sqlContext;
15+
16+
public JavaLoadSave(SQLContext sqlContext) {
17+
this.sqlContext = sqlContext;
18+
}
19+
20+
//tag::createFromRDD[]
21+
public DataFrame createFromJavaBean(JavaRDD<JavaPandaPlace> input) {
22+
// Create DataFrame using Java Bean
23+
DataFrame df1 = sqlContext.createDataFrame(input, JavaPandaPlace.class);
24+
25+
// Create DataFrame using JavaRDD<Row>
26+
JavaRDD<Row> rowRDD = input.map(pm -> RowFactory.create(pm.getName(),
27+
pm.getPandas().stream()
28+
.map(pi -> RowFactory.create(pi.getId(), pi.getZip(), pi.isHappy(), pi.getAttributes()))
29+
.collect(Collectors.toList())));
30+
31+
ArrayType pandasType = DataTypes.createArrayType(new StructType(
32+
new StructField[]{
33+
new StructField("id", DataTypes.LongType, true, Metadata.empty()),
34+
new StructField("zip", DataTypes.StringType, true, Metadata.empty()),
35+
new StructField("happy", DataTypes.BooleanType, true, Metadata.empty()),
36+
new StructField("attributes", DataTypes.createArrayType(DataTypes.FloatType), true, Metadata.empty())
37+
}
38+
));
39+
40+
StructType schema = new StructType(new StructField[]{
41+
new StructField("name", DataTypes.StringType, true, Metadata.empty()),
42+
new StructField("pandas", pandasType, true, Metadata.empty())
43+
});
44+
45+
DataFrame df2 = sqlContext.createDataFrame(rowRDD, schema);
46+
return df2;
47+
}
48+
//end::createFromRDD[]
49+
50+
//tag::createFromLocal[]
51+
public DataFrame createFromLocal(List<PandaPlace> input) {
52+
return sqlContext.createDataFrame(input, PandaPlace.class);
53+
}
54+
//end::createFromLocal[]
55+
56+
//tag::collectResults[]
57+
public Row[] collectDF(DataFrame df) {
58+
return df.collect();
59+
}
60+
//end::collectResults[]
61+
62+
//tag::toRDD[]
63+
public JavaRDD<JavaRawPanda> toRDD(DataFrame input) {
64+
JavaRDD<JavaRawPanda> rdd = input.javaRDD().map(row -> new JavaRawPanda(row.getLong(0), row.getString(1),
65+
row.getString(2), row.getBoolean(3), row.getList(4)));
66+
return rdd;
67+
}
68+
//end::toRDD[]
69+
70+
//tag::partitionedOutput[]
71+
public void writeOutByZip(DataFrame input) {
72+
input.write().partitionBy("zipcode").format("json").save("output/");
73+
}
74+
//end::partitionedOutput[]
75+
76+
//tag::saveAppend[]
77+
public void writeAppend(DataFrame input) {
78+
input.write().mode(SaveMode.Append).save("output/");
79+
}
80+
//end::saveAppend[]
81+
82+
public DataFrame createJDBC() {
83+
//tag::createJDBC[]
84+
DataFrame df1 = sqlContext.read().jdbc("jdbc:dialect:serverName;user=user;password=pass",
85+
"table", new Properties());
86+
87+
DataFrame df2 = sqlContext.read().format("jdbc")
88+
.option("url", "jdbc:dialect:serverName")
89+
.option("dbtable", "table").load();
90+
91+
return df2;
92+
//end::createJDBC[]
93+
}
94+
95+
public void writeJDBC(DataFrame df) {
96+
//tag::writeJDBC[]
97+
df.write().jdbc("jdbc:dialect:serverName;user=user;password=pass",
98+
"table", new Properties());
99+
100+
df.write().format("jdbc")
101+
.option("url", "jdbc:dialect:serverName")
102+
.option("user", "user")
103+
.option("password", "pass")
104+
.option("dbtable", "table").save();
105+
//end::writeJDBC[]
106+
}
107+
108+
//tag::loadParquet[]
109+
public DataFrame loadParquet(String path) {
110+
// Configure Spark to read binary data as string, note: must be configured on SQLContext
111+
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true");
112+
113+
// Load parquet data using merge schema (configured through option)
114+
DataFrame df = sqlContext.read()
115+
.option("mergeSchema", "true")
116+
.format("parquet")
117+
.load(path);
118+
119+
return df;
120+
}
121+
//end::loadParquet[]
122+
123+
//tag::writeParquet[]
124+
public void writeParquet(DataFrame df, String path) {
125+
df.write().format("parquet").save(path);
126+
}
127+
//end::writeParquet[]
128+
129+
//tag::loadHiveTable[]
130+
public DataFrame loadHiveTable() {
131+
return sqlContext.read().table("pandas");
132+
}
133+
//end::loadHiveTable[]
134+
135+
//tag::saveManagedTable[]
136+
public void saveManagedTable(DataFrame df) {
137+
df.write().saveAsTable("pandas");
138+
}
139+
//end::saveManagedTable[]
140+
}

0 commit comments

Comments
 (0)