Port HappyPandas to Java #49

Merged
2 changes: 2 additions & 0 deletions .travis.yml
@@ -11,6 +11,8 @@ cache:
- $HOME/.sbt/launchers
scala:
- 2.11.6
jdk:
- oraclejdk8
apt:
sources:
- ubuntu-toolchain-r-test
2 changes: 1 addition & 1 deletion build.sbt
@@ -10,7 +10,7 @@ scalaVersion := "2.11.6"

crossScalaVersions := Seq("2.11.6")

javacOptions ++= Seq("-source", "1.7", "-target", "1.7")
javacOptions ++= Seq("-source", "1.8", "-target", "1.8")

sparkVersion := "1.6.1"

210 changes: 210 additions & 0 deletions JavaHappyPandas.java
@@ -0,0 +1,210 @@
package com.highperformancespark.examples.dataframe;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.hive.HiveContext;

import java.util.HashMap;
import java.util.Map;

import static org.apache.spark.sql.functions.*;

public class JavaHappyPandas {

/**
* Creates SQLContext with an existing SparkContext.
*/
public static SQLContext sqlContext(JavaSparkContext jsc) {
SQLContext sqlContext = new SQLContext(jsc);
return sqlContext;
}

/**
* Creates HiveContext with an existing SparkContext.
*/
public static HiveContext hiveContext(JavaSparkContext jsc) {
HiveContext hiveContext = new HiveContext(jsc);
return hiveContext;
}

/**
* Illustrate loading some JSON data.
*/
public static DataFrame loadDataSimple(JavaSparkContext jsc, SQLContext sqlContext, String path) {
DataFrame df1 = sqlContext.read().json(path);

DataFrame df2 = sqlContext.read().format("json").option("samplingRatio", "1.0").load(path);

JavaRDD<String> jsonRDD = jsc.textFile(path);
DataFrame df3 = sqlContext.read().json(jsonRDD);

return df1;
}

public static DataFrame jsonLoadFromRDD(SQLContext sqlContext, JavaRDD<String> input) {
JavaRDD<String> rdd = input.filter(e -> e.contains("panda"));
DataFrame df = sqlContext.read().json(rdd);
return df;
}

// The following examples operate on the PandaInfo DataFrame.

/**
* Gets the percentage of happy pandas per place.
*
* @param pandaInfo the input DataFrame
* @return Returns DataFrame of (place, percentage of happy pandas)
*/
public static DataFrame happyPandasPercentage(DataFrame pandaInfo) {
DataFrame happyPercentage = pandaInfo.select(pandaInfo.col("place"),
(pandaInfo.col("happyPandas").divide(pandaInfo.col("totalPandas"))).as("percentHappy"));
return happyPercentage;
}

/**
* Encodes pandaType to Integer values instead of String values.
*
* @param pandaInfo the input DataFrame
* @return Returns a DataFrame of pandaId and integer value for pandaType.
*/
public static DataFrame encodePandaType(DataFrame pandaInfo) {
DataFrame encodedDF = pandaInfo.select(pandaInfo.col("id"),
when(pandaInfo.col("pt").equalTo("giant"), 0).
when(pandaInfo.col("pt").equalTo("red"), 1).
otherwise(2).as("encodedType"));

return encodedDF;
}

/**
* Gets places with at least minHappyPandas happy pandas.
*/
public static DataFrame minHappyPandas(DataFrame pandaInfo, int minHappyPandas) {
return pandaInfo.filter(pandaInfo.col("happyPandas").geq(minHappyPandas));
}

/**
* Find pandas that are sad.
*/
public static DataFrame sadPandas(DataFrame pandaInfo) {
return pandaInfo.filter(pandaInfo.col("happy").notEqual(true));
}

/**
* Find pandas that are happy and fuzzier than squishy.
*/
public static DataFrame happyFuzzyPandas(DataFrame pandaInfo) {
DataFrame df = pandaInfo.filter(
pandaInfo.col("happy").and(pandaInfo.col("attributes").apply(0).gt(pandaInfo.col("attributes").apply(1)))
);

return df;
}

/**
* Gets places where at least half of the pandas are happy.
*/
public static DataFrame happyPandasPlaces(DataFrame pandaInfo) {
return pandaInfo.filter(pandaInfo.col("happyPandas").geq(pandaInfo.col("totalPandas").divide(2)));
}

/**
* Remove duplicate pandas by id.
*/
public static DataFrame removeDuplicates(DataFrame pandas) {
DataFrame df = pandas.dropDuplicates(new String[]{"id"});
return df;
}

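/**
* Computes summary statistics (count, mean, stddev, min, max) for the numeric columns.
*/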
public static DataFrame describePandas(DataFrame pandas) {
return pandas.describe();
}

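/**
* Gets the maximum panda size per zip code.
*/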
public static DataFrame maxPandaSizePerZip(DataFrame pandas) {
return pandas.groupBy(pandas.col("zip")).max("pandaSize");
}

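/**
* Gets the minimum and maximum panda size per zip code.
*/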
public static DataFrame minMaxPandaSizePerZip(DataFrame pandas) {
return pandas.groupBy(pandas.col("zip")).agg(min("pandaSize"), max("pandaSize"));
}

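/**
* Gets the minimum panda size and maximum age per zip code,
* using a map of column name to aggregate function.
*/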
public static DataFrame minPandaSizeMaxAgePerZip(DataFrame pandas) {
Map<String, String> map = new HashMap<>();
map.put("pandaSize", "min");
map.put("age", "max");

DataFrame df = pandas.groupBy(pandas.col("zip")).agg(map);
return df;
}

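/**
* Gets the minimum and mean panda size per zip code.
*/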
public static DataFrame minMeanSizePerZip(DataFrame pandas) {
return pandas.groupBy(pandas.col("zip")).agg(min(pandas.col("pandaSize")), mean(pandas.col("pandaSize")));
}

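/**
* Registers the DataFrame as a temporary table and queries it with SQL.
*/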
public static DataFrame simpleSqlExample(DataFrame pandas) {
SQLContext sqlContext = pandas.sqlContext();
pandas.registerTempTable("pandas");

DataFrame miniPandas = sqlContext.sql("SELECT * FROM pandas WHERE pandaSize < 12");
return miniPandas;
}

/**
* Orders pandas by size ascending and by age descending.
* Pandas are sorted by "size" first; if two pandas have
* the same "size", they are sorted by "age".
*/
public static DataFrame orderPandas(DataFrame pandas) {
return pandas.orderBy(pandas.col("pandaSize").asc(), pandas.col("age").desc());
}

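/**
* Computes each panda's size relative to the average size of pandas
* of a similar age in the same zip code, using a window function.
*/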
public static DataFrame computeRelativePandaSizes(DataFrame pandas) {
//tag::relativePandaSizesWindow[]
WindowSpec windowSpec = Window
.orderBy(pandas.col("age"))
.partitionBy(pandas.col("zip"))
.rowsBetween(-10, 10); // can use rangeBetween for range instead
//end::relativePandaSizesWindow[]

//tag::relativePandaSizesQuery[]
Column pandaRelativeSizeCol = pandas.col("pandaSize").minus(avg(pandas.col("pandaSize")).over(windowSpec));

return pandas.select(pandas.col("name"), pandas.col("zip"), pandas.col("pandaSize"),
pandas.col("age"), pandaRelativeSizeCol.as("panda_relative_size"));
//end::relativePandaSizesQuery[]
}

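/**
* Illustrates inner, left outer, right outer, and left semi joins.
*/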
public static void joins(DataFrame df1, DataFrame df2) {
//tag::innerJoin[]
// Inner join implicit
df1.join(df2, df1.col("name").equalTo(df2.col("name")));
// Inner join explicit
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "inner");
//end::innerJoin[]

//tag::leftouterJoin[]
// Left outer join explicit
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "left_outer");
//end::leftouterJoin[]

//tag::rightouterJoin[]
// Right outer join explicit
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "right_outer");
//end::rightouterJoin[]

//tag::leftsemiJoin[]
// Left semi join explicit
df1.join(df2, df1.col("name").equalTo(df2.col("name")), "leftsemi");
//end::leftsemiJoin[]
}

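/**
* Self joins a DataFrame on the name column.
*/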
public static DataFrame selfJoin(DataFrame df) {
return (df.as("a")).join(df.as("b")).where("a.name = b.name");
}

}
140 changes: 140 additions & 0 deletions JavaLoadSave.java
@@ -0,0 +1,140 @@
package com.highperformancespark.examples.dataframe;

import com.highperformancespark.examples.objects.JavaPandaPlace;
import com.highperformancespark.examples.objects.JavaRawPanda;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class JavaLoadSave {
private SQLContext sqlContext;

public JavaLoadSave(SQLContext sqlContext) {
this.sqlContext = sqlContext;
}

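/**
* Creates a DataFrame from a JavaRDD of Java Beans, and again from a
* JavaRDD of Rows with an explicit schema.
*/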
//tag::createFromRDD[]
public DataFrame createFromJavaBean(JavaRDD<JavaPandaPlace> input) {
// Create DataFrame using Java Bean
DataFrame df1 = sqlContext.createDataFrame(input, JavaPandaPlace.class);

// Create DataFrame using JavaRDD<Row>
JavaRDD<Row> rowRDD = input.map(pm -> RowFactory.create(pm.getName(),
pm.getPandas().stream()
.map(pi -> RowFactory.create(pi.getId(), pi.getZip(), pi.isHappy(), pi.getAttributes()))
.collect(Collectors.toList())));

ArrayType pandasType = DataTypes.createArrayType(new StructType(
new StructField[]{
new StructField("id", DataTypes.LongType, true, Metadata.empty()),
new StructField("zip", DataTypes.StringType, true, Metadata.empty()),
new StructField("happy", DataTypes.BooleanType, true, Metadata.empty()),
new StructField("attributes", DataTypes.createArrayType(DataTypes.FloatType), true, Metadata.empty())
}
));

StructType schema = new StructType(new StructField[]{
new StructField("name", DataTypes.StringType, true, Metadata.empty()),
new StructField("pandas", pandasType, true, Metadata.empty())
});

DataFrame df2 = sqlContext.createDataFrame(rowRDD, schema);
return df2;
}
//end::createFromRDD[]

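/**
* Creates a DataFrame from a local collection of Java Beans.
*/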
//tag::createFromLocal[]
public DataFrame createFromLocal(List<JavaPandaPlace> input) {
return sqlContext.createDataFrame(input, JavaPandaPlace.class);
}
//end::createFromLocal[]

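/**
* Collects the DataFrame to the driver as an array of Rows.
*/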
//tag::collectResults[]
public Row[] collectDF(DataFrame df) {
return df.collect();
}
//end::collectResults[]

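/**
* Converts a DataFrame back into a JavaRDD of JavaRawPanda.
*/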
//tag::toRDD[]
public JavaRDD<JavaRawPanda> toRDD(DataFrame input) {
JavaRDD<JavaRawPanda> rdd = input.javaRDD().map(row -> new JavaRawPanda(row.getLong(0), row.getString(1),
row.getString(2), row.getBoolean(3), row.getList(4)));
return rdd;
}
//end::toRDD[]

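/**
* Writes the DataFrame out as JSON, partitioned by zipcode.
*/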
//tag::partitionedOutput[]
public void writeOutByZip(DataFrame input) {
input.write().partitionBy("zipcode").format("json").save("output/");
}
//end::partitionedOutput[]

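/**
* Appends the DataFrame to existing output instead of overwriting it.
*/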
//tag::saveAppend[]
public void writeAppend(DataFrame input) {
input.write().mode(SaveMode.Append).save("output/");
}
//end::saveAppend[]

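/**
* Loads a DataFrame from a JDBC source, both with the jdbc() shortcut
* and with the generic format("jdbc") reader.
*/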
public DataFrame createJDBC() {
//tag::createJDBC[]
DataFrame df1 = sqlContext.read().jdbc("jdbc:dialect:serverName;user=user;password=pass",
"table", new Properties());

DataFrame df2 = sqlContext.read().format("jdbc")
.option("url", "jdbc:dialect:serverName")
.option("dbtable", "table").load();

return df2;
//end::createJDBC[]
}

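/**
* Writes a DataFrame to a JDBC table, both with the jdbc() shortcut
* and with the generic format("jdbc") writer.
*/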
public void writeJDBC(DataFrame df) {
//tag::writeJDBC[]
df.write().jdbc("jdbc:dialect:serverName;user=user;password=pass",
"table", new Properties());

df.write().format("jdbc")
.option("url", "jdbc:dialect:serverName")
.option("user", "user")
.option("password", "pass")
.option("dbtable", "table").save();
//end::writeJDBC[]
}

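/**
* Loads Parquet data with schema merging enabled and binary columns read as strings.
*/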
//tag::loadParquet[]
public DataFrame loadParquet(String path) {
// Configure Spark to read binary data as strings; note that this must be set on the SQLContext
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true");

// Load parquet data using merge schema (configured through option)
DataFrame df = sqlContext.read()
.option("mergeSchema", "true")
.format("parquet")
.load(path);

return df;
}
//end::loadParquet[]

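/**
* Writes the DataFrame out in Parquet format.
*/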
//tag::writeParquet[]
public void writeParquet(DataFrame df, String path) {
df.write().format("parquet").save(path);
}
//end::writeParquet[]

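/**
* Loads a DataFrame from an existing table in the catalog.
*/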
//tag::loadHiveTable[]
public DataFrame loadHiveTable() {
return sqlContext.read().table("pandas");
}
//end::loadHiveTable[]

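/**
* Saves the DataFrame as a managed table.
*/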
//tag::saveManagedTable[]
public void saveManagedTable(DataFrame df) {
df.write().saveAsTable("pandas");
}
//end::saveManagedTable[]
}