
## 3 Performance and Optimization

### 3.1 How Many Partitions Does an RDD Have?

&emsp;&emsp;When debugging or troubleshooting, it is often useful to know how many partitions an `RDD` has. There are a few ways to find this information:

#### Using the UI to view the number of tasks executed per partition

&emsp;&emsp;When a `stage` executes, you can see the number of partitions for that `stage` in the `Spark UI`. In the example below, a simple job creates an `RDD` of 100 elements across 4 partitions, then dispatches a `map` task over those elements before collecting them to the `driver`:

```scala
scala> val someRDD = sc.parallelize(1 to 100, 4)
someRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> someRDD.map(x => x).collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
```

&emsp;&emsp;In the `Spark` application `UI`, the "Total Tasks" value shown in the screenshot below is the number of partitions.

<div align="center"><img src="imgs/partitions-as-tasks.png" alt="partitions-as-tasks" align="center" /></div>

#### Using the UI to view cached partitions

&emsp;&emsp;When persisting an `RDD`, it is often useful to know how many of its partitions have been stored. The example below is the same as the previous one, except that this time we cache the `RDD`. After the action completes, we can use the `UI` to see what the operation caused us to store.

```scala
scala> someRDD.setName("toy").cache
res2: someRDD.type = toy ParallelCollectionRDD[0] at parallelize at <console>:12

scala> someRDD.map(x => x).collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37,
38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
```

&emsp;&emsp;Note that in the screenshot below, 4 partitions are cached.

<div align="center"><img src="imgs/cached-partitions.png" alt="cached-partitions" align="center" /></div>
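&emsp;&emsp;The same information is also available programmatically. Below is a minimal sketch, assuming the cached `toy` RDD from the example above has already been materialized by an action; it uses `sc.getRDDStorageInfo`, a developer API whose exact fields may differ across Spark versions:

```scala
// Reports, for each persisted RDD, how many of its partitions are cached.
sc.getRDDStorageInfo
  .filter(_.name == "toy") // the name assigned with setName above
  .foreach(info => println(
    s"${info.name}: ${info.numCachedPartitions} of ${info.numPartitions} partitions cached"))
```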

#### Inspecting RDD partitions programmatically

&emsp;&emsp;In the `Scala API`, an `RDD` holds a reference to its array of partitions, which you can use to find out how many partitions it has:

```scala
scala> val someRDD = sc.parallelize(1 to 100, 30)
someRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> someRDD.partitions.size
res0: Int = 30
```

&emsp;&emsp;In the `Python API`, there is a method that explicitly returns the number of partitions:

```python
In [1]: someRDD = sc.parallelize(range(101),30)
In [2]: someRDD.getNumPartitions()
Out[2]: 30
```

### 3.2 Data Locality

&emsp;&emsp;`Spark` is a parallel data processing framework, which means tasks should execute as close to the data as possible (i.e., with minimal data transfer).

#### Checking locality

&emsp;&emsp;The best way to check whether tasks ran locally is to look at the `stage` details in the `Spark UI`. Note the `Locality Level` column in the screenshot below, which shows where each task ran.

<div align="center"><img src="imgs/locality.png" alt="locality" align="center" /></div>
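&emsp;&emsp;The driver log carries the same information: `TaskSetManager` logs the locality level of each task it launches. A representative line is sketched below (the exact format varies between Spark versions, and the TID and host are placeholders):

```
INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, ...)
```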

#### Adjusting locality configuration

&emsp;&emsp;You can adjust how long `Spark` waits at each data locality level (`data local --> process local --> node local --> rack local --> Any`) before falling back to the next one. For more detail on the parameters, see the `spark.locality.*` settings in the `Scheduling` section of the application configuration documentation.
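&emsp;&emsp;As a minimal sketch, these waits can be set on a `SparkConf` before the context is created. The keys are the real `spark.locality.*` settings; the values and application name below are illustrative only, not recommendations:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("locality-tuning") // hypothetical application name
  .set("spark.locality.wait", "3s")       // base wait applied at every level
  .set("spark.locality.wait.node", "10s") // per-level overrides default to the base wait
  .set("spark.locality.wait.rack", "1s")
```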

## 4 Spark Streaming

### ERROR OneForOneStrategy

&emsp;&emsp;If you enable `checkpointing` in `Spark Streaming`, any object used in a `forEachRDD` function must be serializable (`Serializable`). Otherwise you will see an exception like "ERROR OneForOneStrategy: ... java.io.NotSerializableException:"

```java
JavaStreamingContext jssc = new JavaStreamingContext(sc, INTERVAL);

// This enables checkpointing.
jssc.checkpoint("/tmp/checkpoint_test");

JavaDStream<String> dStream = jssc.socketTextStream("localhost", 9999);

NotSerializable notSerializable = new NotSerializable();
dStream.foreachRDD(rdd -> {
    if (rdd.count() == 0) {
        return null;
    }
    String first = rdd.first();

    // The lambda captures notSerializable from the enclosing scope, so
    // checkpointing tries to serialize it and fails.
    notSerializable.doSomething(first);
    return null;
});

// This does not work!!!!
```

&emsp;&emsp;The code above will only run correctly after one of the following changes is made:

- Turn off checkpointing by removing the `jssc.checkpoint` line.
- Make the object being used serializable (a minimal sketch follows the corrected code below).
- Declare the `NotSerializable` object inside the `forEachRDD` function. The sample code below runs correctly:

```java
JavaStreamingContext jssc = new JavaStreamingContext(sc, INTERVAL);

jssc.checkpoint("/tmp/checkpoint_test");

JavaDStream<String> dStream = jssc.socketTextStream("localhost", 9999);

dStream.foreachRDD(rdd -> {
    if (rdd.count() == 0) {
        return null;
    }
    String first = rdd.first();
    NotSerializable notSerializable = new NotSerializable();
    notSerializable.doSomething(first);
    return null;
});

// This code snippet is fine since the NotSerializable object
// is declared and only used within the forEachRDD function.
```
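&emsp;&emsp;For the second option, it is often enough to have the class implement `java.io.Serializable`. The class below is a hypothetical stand-in for whatever `NotSerializable` actually does:

```java
import java.io.Serializable;

// Hypothetical stand-in for NotSerializable. Once the class implements
// Serializable, Spark can serialize the closure that captures it.
public class NowSerializable implements Serializable {
    public void doSomething(String first) {
        System.out.println("Processing: " + first);
    }
}
```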