Skip to content

Commit 4c365fa

Browse files
ShubhamGupta29 and varunsaxena
authored and committed
Spark heuristic modification (#407)
* Revert "Dr. Elephant Tez Support working patch (#313)" This reverts commit a0470a3. * Rerevert "Dr. Elephant Tez Support working patch (#313)" including attribution. This reverts commit e3fd598. Co-authored-by: Abhishek Das <[email protected]> * Auto tuning: Support for parameter set multi-try (#386) * Changes in some of the Spark Heuristics * Adding test for changes executor gc heuristic and unified memory heuristic * Update ExecutorGcHeuristic.scala * Update UnifiedMemoryHeuristic.scala * Changed some hard coded values to variables * Due to strict inequality changing the other threshold levels for executor and driver
1 parent 1b88ffe commit 4c365fa

File tree

9 files changed

+92
-15
lines changed

9 files changed

+92
-15
lines changed

app/com/linkedin/drelephant/ElephantContext.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ private void loadConfiguration() {
110110
loadFetchers();
111111
loadHeuristics();
112112
loadJobTypes();
113-
114113
loadGeneralConf();
115114
loadAutoTuningConf();
116115

app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class SparkRestClient(sparkConf: SparkConf) {
101101
Await.result(futureJobDatas, DEFAULT_TIMEOUT),
102102
Await.result(futureStageDatas, DEFAULT_TIMEOUT),
103103
Await.result(futureExecutorSummaries, Duration(5, SECONDS)),
104+
Seq.empty,
104105
Await.result(futureLogData, Duration(5, SECONDS))
105106
)
106107

app/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristic.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
118118
result.addResultDetail("Executor Overhead Memory", "Please do not specify excessive amount of overhead memory for Executors. Change it in the field " + SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD)
119119
}
120120
if(evaluator.severityExecutorCores != Severity.NONE) {
121-
result.addResultDetail("Executor cores", "The number of executor cores should be <=" + evaluator.DEFAULT_SPARK_CORES_THRESHOLDS.low + ". Please change it in the field " + SPARK_EXECUTOR_CORES_KEY)
121+
result.addResultDetail("Executor cores", "The number of executor cores should be <" + evaluator.DEFAULT_SPARK_CORES_THRESHOLDS.low + ". Please change it in the field " + SPARK_EXECUTOR_CORES_KEY)
122122
}
123123
if(evaluator.severityExecutorMemory != Severity.NONE) {
124124
result.addResultDetail("Executor memory", "Please do not specify excessive amount of executor memory. Change it in the field " + SPARK_EXECUTOR_MEMORY_KEY)
@@ -204,7 +204,7 @@ object ConfigurationHeuristic {
204204
SeverityThresholds(low = MemoryFormatUtils.stringToBytes("10G"), MemoryFormatUtils.stringToBytes("15G"),
205205
severe = MemoryFormatUtils.stringToBytes("20G"), critical = MemoryFormatUtils.stringToBytes("25G"), ascending = true)
206206
val DEFAULT_SPARK_CORES_THRESHOLDS =
207-
SeverityThresholds(low = 4, moderate = 6, severe = 8, critical = 10, ascending = true)
207+
SeverityThresholds(low = 5, moderate = 7, severe = 9, critical = 11, ascending = true)
208208

209209
val severityExecutorMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(executorMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
210210
val severityExecutorCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(executorCores.getOrElse(0).asInstanceOf[Number].intValue)

app/com/linkedin/drelephant/spark/heuristics/DriverHeuristic.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,10 @@ class DriverHeuristic(private val heuristicConfigurationData: HeuristicConfigura
6161
SPARK_DRIVER_MEMORY_KEY,
6262
formatProperty(evaluator.driverMemoryBytes.map(MemoryFormatUtils.bytesToString))
6363
),
64-
new HeuristicResultDetails(
65-
"Ratio of time spent in GC to total time", evaluator.ratio.toString
66-
),
64+
//Removing driver GC heuristics for now
65+
// new HeuristicResultDetails(
66+
// "Ratio of time spent in GC to total time", evaluator.ratio.toString
67+
// ),
6768
new HeuristicResultDetails(
6869
SPARK_DRIVER_CORES_KEY,
6970
formatProperty(evaluator.driverCores.map(_.toString))
@@ -76,6 +77,7 @@ class DriverHeuristic(private val heuristicConfigurationData: HeuristicConfigura
7677
)
7778
if(evaluator.severityJvmUsedMemory != Severity.NONE) {
7879
resultDetails = resultDetails :+ new HeuristicResultDetails("Driver Peak JVM used Memory", "The allocated memory for the driver (in " + SPARK_DRIVER_MEMORY_KEY + ") is much more than the peak JVM used memory by the driver.")
80+
resultDetails = resultDetails :+ new HeuristicResultDetails("Suggested spark.driver.memory", MemoryFormatUtils.roundOffMemoryStringToNextInteger(MemoryFormatUtils.bytesToString(((1 + BUFFER_FRACTION) * (evaluator.maxDriverPeakJvmUsedMemory + reservedMemory)).toLong)))
7981
}
8082
if (evaluator.severityGc != Severity.NONE) {
8183
resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio high", "The driver is spending too much time on GC. We recommend increasing the driver memory.")
@@ -113,6 +115,7 @@ object DriverHeuristic {
113115
val EXECUTION_MEMORY = "executionMemory"
114116
val STORAGE_MEMORY = "storageMemory"
115117
val JVM_USED_MEMORY = "jvmUsedMemory"
118+
val BUFFER_FRACTION = 0.2
116119

117120
// 300 * FileUtils.ONE_MB (300 * 1024 * 1024)
118121
val reservedMemory : Long = 314572800
@@ -172,10 +175,10 @@ object DriverHeuristic {
172175

173176
//The following thresholds are for checking if the memory and cores values (driver) are above normal. These thresholds are experimental, and may change in the future.
174177
val DEFAULT_SPARK_MEMORY_THRESHOLDS =
175-
SeverityThresholds(low = MemoryFormatUtils.stringToBytes("10G"), MemoryFormatUtils.stringToBytes("15G"),
178+
SeverityThresholds(low = MemoryFormatUtils.stringToBytes("10G"), moderate = MemoryFormatUtils.stringToBytes("15G"),
176179
severe = MemoryFormatUtils.stringToBytes("20G"), critical = MemoryFormatUtils.stringToBytes("25G"), ascending = true)
177180
val DEFAULT_SPARK_CORES_THRESHOLDS =
178-
SeverityThresholds(low = 4, moderate = 6, severe = 8, critical = 10, ascending = true)
181+
SeverityThresholds(low = 5, moderate = 7, severe = 9, critical = 11, ascending = true)
179182

180183
val severityDriverMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(driverMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
181184
val severityDriverCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(driverCores.getOrElse(0).asInstanceOf[Number].intValue)

app/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristic.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class ExecutorGcHeuristic(private val heuristicConfigurationData: HeuristicConfi
5959
}
6060
//severityTimeD corresponds to the descending severity calculation
6161
if (evaluator.severityTimeD.getValue > Severity.LOW.getValue) {
62-
resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio low", "The job is spending too less time in GC. Please check if you have asked for more executor memory than required.")
62+
resultDetails = resultDetails :+ new HeuristicResultDetails("Gc ratio low", "The job is spending too little time in GC. Please check if you have asked for more executor memory than required.")
6363
}
6464

6565
val result = new HeuristicResult(
@@ -76,6 +76,7 @@ class ExecutorGcHeuristic(private val heuristicConfigurationData: HeuristicConfi
7676
object ExecutorGcHeuristic {
7777
val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
7878
val SPARK_EXECUTOR_CORES = "spark.executor.cores"
79+
val EXECUTOR_RUNTIME_THRESHOLD_IN_MINUTES = 5
7980

8081
/** The ascending severity thresholds for the ratio of JVM GC Time and executor Run Time (checking whether ratio is above normal)
8182
* These thresholds are experimental and are likely to change */
@@ -107,7 +108,12 @@ object ExecutorGcHeuristic {
107108

108109
var ratio: Double = jvmTime.toDouble / executorRunTimeTotal.toDouble
109110

110-
lazy val severityTimeA: Severity = executorGcHeuristic.gcSeverityAThresholds.severityOf(ratio)
111+
//If the total Executor Runtime is less then 5 minutes then we won't consider for the severity due to GC
112+
lazy val severityTimeA: Severity = if (executorRunTimeTotal >= (EXECUTOR_RUNTIME_THRESHOLD_IN_MINUTES * Statistics.MINUTE_IN_MS))
113+
executorGcHeuristic.gcSeverityAThresholds.severityOf(ratio)
114+
else
115+
Severity.NONE
116+
111117
lazy val severityTimeD: Severity = executorGcHeuristic.gcSeverityDThresholds.severityOf(ratio)
112118

113119
/**
@@ -145,4 +151,3 @@ object ExecutorGcHeuristic {
145151
}
146152
}
147153
}
148-

app/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristic.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ object UnifiedMemoryHeuristic {
7272
val PEAK_UNIFIED_MEMORY_THRESHOLD_KEY = "peak_unified_memory_threshold"
7373
val SPARK_EXECUTOR_MEMORY_THRESHOLD_KEY = "spark_executor_memory_threshold"
7474
val DEFAULT_SPARK_EXECUTOR_MEMORY_THRESHOLD = "2G"
75+
val UNIFIED_MEMORY_ALLOCATED_THRESHOLD = "256M"
76+
val SPARK_MEMORY_FRACTION_THRESHOLD : Double = 0.05
7577

7678
class Evaluator(unifiedMemoryHeuristic: UnifiedMemoryHeuristic, data: SparkApplicationData) {
7779
lazy val appConfigurationProperties: Map[String, String] =
@@ -116,10 +118,15 @@ object UnifiedMemoryHeuristic {
116118
}
117119
}.max
118120

119-
lazy val severity: Severity = if (sparkExecutorMemory <= MemoryFormatUtils.stringToBytes(unifiedMemoryHeuristic.sparkExecutorMemoryThreshold)) {
120-
Severity.NONE
121+
//If sparkMemoryFraction or total Unified Memory allocated is less than their respective thresholds then won't consider for severity
122+
lazy val severity: Severity = if (sparkMemoryFraction > SPARK_MEMORY_FRACTION_THRESHOLD && maxMemory > MemoryFormatUtils.stringToBytes(UNIFIED_MEMORY_ALLOCATED_THRESHOLD)) {
123+
if (sparkExecutorMemory <= MemoryFormatUtils.stringToBytes(unifiedMemoryHeuristic.sparkExecutorMemoryThreshold)) {
124+
Severity.NONE
125+
} else {
126+
PEAK_UNIFIED_MEMORY_THRESHOLDS.severityOf(maxUnifiedMemory)
127+
}
121128
} else {
122-
PEAK_UNIFIED_MEMORY_THRESHOLDS.severityOf(maxUnifiedMemory)
129+
Severity.NONE
123130
}
124131
}
125132
}

test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
143143
it("returns executor cores") {
144144
val details = heuristicResultDetails.get(10)
145145
details.getName should include("Executor cores")
146-
details.getValue should be("The number of executor cores should be <=4. Please change it in the field spark.executor.cores")
146+
details.getValue should be("The number of executor cores should be <5. Please change it in the field spark.executor.cores")
147147
}
148148
}
149149

test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,57 @@ class ExecutorGcHeuristicTest extends FunSpec with Matchers {
6767
)
6868
)
6969

70+
val executorSummaries2 = Seq(
71+
newFakeExecutorSummary(
72+
id = "1",
73+
totalGCTime = 12000,
74+
totalDuration = Duration("4min").toMillis
75+
),
76+
newFakeExecutorSummary(
77+
id = "2",
78+
totalGCTime = 13000,
79+
totalDuration = Duration("1min").toMillis
80+
)
81+
)
82+
83+
val executorSummaries3 = Seq(
84+
newFakeExecutorSummary(
85+
id = "1",
86+
totalGCTime = 9000,
87+
totalDuration = Duration("2min").toMillis
88+
)
89+
)
90+
7091
describe(".apply") {
7192
val data = newFakeSparkApplicationData(executorSummaries)
7293
val data1 = newFakeSparkApplicationData(executorSummaries1)
94+
val data2 = newFakeSparkApplicationData(executorSummaries2)
95+
val data3 = newFakeSparkApplicationData(executorSummaries3)
7396
val heuristicResult = executorGcHeuristic.apply(data)
7497
val heuristicResult1 = executorGcHeuristic.apply(data1)
98+
val heuristicResult2 = executorGcHeuristic.apply(data2)
99+
val heuristicResult3 = executorGcHeuristic.apply(data3)
75100
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails
76101
val heuristicResultDetails1 = heuristicResult1.getHeuristicResultDetails
102+
val heuristicResultDetails2 = heuristicResult2.getHeuristicResultDetails
103+
val heuristicResultDetails3 = heuristicResult3.getHeuristicResultDetails
77104

78105
it("returns the severity") {
79106
heuristicResult.getSeverity should be(Severity.CRITICAL)
80107
}
81108

109+
it("return the low severity") {
110+
heuristicResult2.getSeverity should be(Severity.LOW)
111+
}
112+
113+
it("return NONE severity for runtime less than 5 min") {
114+
heuristicResult2.getSeverity should be(Severity.LOW)
115+
}
116+
117+
it("return none severity") {
118+
heuristicResult3.getSeverity should be(Severity.NONE)
119+
}
120+
82121
it("returns the JVM GC time to Executor Run time duration") {
83122
val details = heuristicResultDetails.get(0)
84123
details.getName should include("GC time to Executor Run time ratio")

test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class UnifiedMemoryHeuristicTest extends FunSpec with Matchers {
2121
val unifiedMemoryHeuristic = new UnifiedMemoryHeuristic(heuristicConfigurationData)
2222
val appConfigurationProperties = Map("spark.executor.memory"->"3147483647")
2323
val appConfigurationProperties1 = Map("spark.executor.memory"->"214567874847")
24+
val appConfigurationProperties2 = Map("spark.executor.memory"->"214567874847", "spark.memory.fraction"->"0.06")
2425

2526
val executorData = Seq(
2627
newDummyExecutorData("1", 999999999, Map("executionMemory" -> 300000, "storageMemory" -> 94567)),
@@ -41,13 +42,27 @@ class UnifiedMemoryHeuristicTest extends FunSpec with Matchers {
4142
newDummyExecutorData("2", 999999999, Map("executionMemory" -> 999999990, "storageMemory" -> 9))
4243
)
4344

45+
val executorData3 = Seq(
46+
newDummyExecutorData("1", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94567)),
47+
newDummyExecutorData("2", 500000, Map("executionMemory" -> 5000, "storageMemory" -> 9))
48+
)
49+
50+
val executorData4 = Seq(
51+
newDummyExecutorData("1", 268435460L, Map("executionMemory" -> 300000, "storageMemory" -> 94567)),
52+
newDummyExecutorData("2", 268435460L, Map("executionMemory" -> 900000, "storageMemory" -> 500000))
53+
)
54+
4455
describe(".apply") {
4556
val data = newFakeSparkApplicationData(appConfigurationProperties, executorData)
4657
val data1 = newFakeSparkApplicationData(appConfigurationProperties1, executorData1)
4758
val data2 = newFakeSparkApplicationData(appConfigurationProperties1, executorData2)
59+
val data3 = newFakeSparkApplicationData(appConfigurationProperties1, executorData3)
60+
val data4 = newFakeSparkApplicationData(appConfigurationProperties2, executorData4)
4861
val heuristicResult = unifiedMemoryHeuristic.apply(data)
4962
val heuristicResult1 = unifiedMemoryHeuristic.apply(data1)
5063
val heuristicResult2 = unifiedMemoryHeuristic.apply(data2)
64+
val heuristicResult3 = unifiedMemoryHeuristic.apply(data3)
65+
val heuristicResult4 = unifiedMemoryHeuristic.apply(data4)
5166
val evaluator = new Evaluator(unifiedMemoryHeuristic, data1)
5267

5368
it("has severity") {
@@ -85,6 +100,14 @@ class UnifiedMemoryHeuristicTest extends FunSpec with Matchers {
85100
it("has no severity when max and allocated memory are the same") {
86101
heuristicResult2.getSeverity should be(Severity.NONE)
87102
}
103+
104+
it("has no severity when maxMemory is less than 256Mb") {
105+
heuristicResult3.getSeverity should be(Severity.NONE)
106+
}
107+
108+
it("has critical severity when maxMemory is greater than 256Mb and spark memory fraction is greater than 0.05") {
109+
heuristicResult4.getSeverity should be(Severity.CRITICAL)
110+
}
88111
}
89112
}
90113

0 commit comments

Comments
 (0)