This module provides an example of processing event data using Apache Spark.

## Getting Started

This example assumes that you're running a CDH5.1 or later cluster (such as the [Cloudera Quickstart VM][getvm]) that has Spark configured. This example requires the `spark-submit` command to execute the Spark job on the cluster. If you're using the Quickstart VM, run this example from the VM rather than the host computer.

[getvm]: http://www.cloudera.com/content/support/en/downloads/quickstart_vms.html

On the cluster, check out a copy of the code and navigate to the `spark` directory using the following commands in a terminal window.

```bash
git clone https://github.com/kite-sdk/kite-examples.git
cd kite-examples
cd spark
```

## Building the Application

To build the project, enter the following command in a terminal window.

```bash
mvn install
```

## Creating and Populating the Events Dataset

In this example, you store raw events in a Hive-backed dataset so that you can also process the results using Hive. Use the `CreateEvents` tool, provided with the demo, to create the dataset and populate it with random event records. Execute the following command from a terminal window in the `kite-examples/spark` directory.

```bash
mvn exec:java -Dexec.mainClass="org.kitesdk.examples.spark.CreateEvents"
```
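
Under the hood, `CreateEvents` uses the Kite datasets API. The following is a minimal sketch of the approach, not the demo's exact code; the `randomEvent()` helper is hypothetical and stands in for the demo's record-generation logic.

```java
// Sketch of the CreateEvents approach (not the demo's exact code).
// Create a Hive-backed dataset for StandardEvent records, then write
// randomly generated events to it.
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
    .schema(StandardEvent.class)
    .build();
Dataset<StandardEvent> events = Datasets.create(
    "dataset:hive:events", descriptor, StandardEvent.class);

DatasetWriter<StandardEvent> writer = events.newWriter();
try {
  for (int i = 0; i < 100; i++) {
    writer.write(randomEvent()); // hypothetical helper that builds one StandardEvent
  }
} finally {
  writer.close();
}
```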

You can browse the generated events using [Hue on the Quickstart VM](http://localhost:8888/metastore/table/default/events/read).

## Using Spark to Correlate Events

In this example, you use Spark to correlate events generated from the same IP address within a five-minute window. Begin by configuring Spark to use the Kryo serialization library.

Register your Avro classes with the following Scala class to use Avro's specific binary serialization for both the `StandardEvent` and `CorrelatedEvents` classes.

### AvroKyroRegistrator.scala

```scala
class AvroKyroRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // AvroSerializer comes from the chill-avro library
    kryo.register(classOf[StandardEvent],
      AvroSerializer.SpecificRecordBinarySerializer[StandardEvent])
    kryo.register(classOf[CorrelatedEvents],
      AvroSerializer.SpecificRecordBinarySerializer[CorrelatedEvents])
  }
}
```

### Highlights from CorrelateEventsTask.class

The following snippets show examples of code you use to configure and invoke Spark tasks.

Configure Kryo to automatically serialize Avro objects.

```java
// Create the Spark configuration and get a Java context
SparkConf sparkConf = new SparkConf()
.setAppName("Correlate Events")
// Configure the use of Kryo serialization including the Avro registrator
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.kitesdk.examples.spark.AvroKyroRegistrator");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
```

To access Hive-backed datasets from remote Spark tasks,
register JARs in the Spark equivalent of the Hadoop DistributedCache:

```java
// Register classes needed for remote Spark tasks
addJarFromClass(sparkContext, getClass());
addJars(sparkContext, System.getenv("HIVE_HOME"), "lib");
sparkContext.addFile(System.getenv("HIVE_HOME")+"/conf/hive-site.xml");
```
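
The `addJarFromClass` and `addJars` helpers are part of the example code. The sketch below shows one plausible implementation (using `java.io.File`), assuming each helper simply locates JAR files on the local filesystem and passes them to `JavaSparkContext.addJar`; the example's actual implementations may differ.

```java
// Plausible sketch of the helpers used above, not the example's exact code.
private static void addJarFromClass(JavaSparkContext context, Class<?> klass) {
  // Ship the JAR that contains the given class to the Spark executors
  String jar = klass.getProtectionDomain().getCodeSource().getLocation().getPath();
  context.addJar(jar);
}

private static void addJars(JavaSparkContext context, String home, String subDir) {
  // Add every JAR under home/subDir (for example, $HIVE_HOME/lib)
  File[] files = new File(home, subDir).listFiles();
  if (files != null) {
    for (File file : files) {
      if (file.getName().endsWith(".jar")) {
        context.addJar(file.getAbsolutePath());
      }
    }
  }
}
```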

Configure the MapReduce `DatasetKeyInputFormat` to enable the application to read from the `events` dataset. Use Spark's built-in support to generate an RDD (Resilient Distributed Dataset) from the input format.

```java
Configuration conf = new Configuration();
// eventsUri is the dataset URI, for example "dataset:hive:events"
DatasetKeyInputFormat.configure(conf).readFrom(eventsUri).withType(StandardEvent.class);
JavaPairRDD<StandardEvent, Void> events = sparkContext.newAPIHadoopRDD(conf,
    DatasetKeyInputFormat.class, StandardEvent.class, Void.class);
```
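
The correlation logic itself runs between reading the input and writing the output. The following is a minimal sketch of one way to group events from the same IP address into five-minute buckets; the `getIp` and `getTimestamp` accessors are assumed from the generated `StandardEvent` class, and the demo's actual pairing logic may differ.

```java
// Sketch only: key each event by IP address and five-minute bucket, then
// group events that share a key.
JavaPairRDD<String, StandardEvent> keyed = events.mapToPair(
    new PairFunction<Tuple2<StandardEvent, Void>, String, StandardEvent>() {
      @Override
      public Tuple2<String, StandardEvent> call(Tuple2<StandardEvent, Void> pair) {
        StandardEvent event = pair._1();
        long bucket = event.getTimestamp() / (5 * 60 * 1000); // five-minute bucket
        return new Tuple2<String, StandardEvent>(event.getIp() + "/" + bucket, event);
      }
    });
JavaPairRDD<String, Iterable<StandardEvent>> grouped = keyed.groupByKey();
```

Note that fixed buckets only approximate a sliding five-minute window; events close in time can land in adjacent buckets.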

The application can now process the events as needed. When you have the final RDD, configure `DatasetKeyOutputFormat` in the same way and use `saveAsNewAPIHadoopFile` to persist the data to the output dataset.

```java
DatasetKeyOutputFormat.configure(conf).writeTo(correlatedEventsUri).withType(CorrelatedEvents.class);
// matches is the final JavaPairRDD<CorrelatedEvents, Void> of correlated events
matches.saveAsNewAPIHadoopFile("dummy", CorrelatedEvents.class, Void.class,
    DatasetKeyOutputFormat.class, conf);
```

In a terminal window, run the Spark job using the following command.

```bash
spark-submit --class org.kitesdk.examples.spark.CorrelateEvents --jars $(mvn dependency:build-classpath | grep -v '^\[' | sed -e 's/:/,/g') target/kite-spark-demo-*.jar
```

You can browse the correlated events using [Hue on the Quickstart VM](http://localhost:8888/metastore/table/default/correlated_events/read).

## Deleting the Datasets

When you're done, or if you want to run the example again, delete the datasets using the Kite CLI `delete` command.

```bash
curl http://central.maven.org/maven2/org/kitesdk/kite-tools/0.17.0/kite-tools-0.17.0-binary.jar -o kite-dataset
chmod +x kite-dataset
./kite-dataset delete events
./kite-dataset delete correlated_events
```

## Troubleshooting

The following are known issues and their solutions.

### ClassNotFoundException

The first time you execute `spark-submit`, the process might not find `CorrelateEvents`.

```
java.lang.ClassNotFoundException: org.kitesdk.examples.spark.CorrelateEvents
```

Execute the command a second time to get past this exception.

### AccessControlException

On some VMs, you might receive the following exception.

```
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): \
Permission denied: user=cloudera, access=EXECUTE, inode="/user/spark":spark:spark:drwxr-x---
```

In a terminal window, update permissions using the following commands.

```bash
$ sudo su - hdfs
$ hadoop fs -chmod -R 777 /user/spark
```