File tree Expand file tree Collapse file tree 5 files changed +36
-2
lines changed
spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example Expand file tree Collapse file tree 5 files changed +36
-2
lines changed Original file line number Diff line number Diff line change
1
package io.dataflint.example

import org.apache.spark.sql.SparkSession

/**
 * Example app for the `spark.dataflint.alert.disabled` config: it repartitions
 * 10,000 rows into 10,000 partitions (one-row tasks, which would normally
 * trigger the smallTasks alert) while disabling the `smallTasks` and
 * `idleCoresTooHigh` alerts, so the DataFlint UI should show neither.
 */
object SchedulingSmallTasksSkipAlerts extends App {
  val spark = SparkSession
    .builder()
    .appName("SchedulingSmallTasks")
    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin")
    // NOTE: the original set this key twice (once positionally, once with
    // `value =`); a single assignment is sufficient and avoids confusion.
    .config("spark.dataflint.telemetry.enabled", value = false)
    .config("spark.ui.port", "10000")
    .config("spark.sql.maxMetadataStringLength", "10000")
    .config("spark.dataflint.alert.disabled", "smallTasks,idleCoresTooHigh")
    .master("local[*]")
    .getOrCreate()

  // Deliberately tiny tasks: 10,000 partitions for only 10,000 rows.
  val numbers = spark.range(0, 10000).repartition(10000).count()

  println(s"count numbers to 10000: $numbers")

  // Block so the Spark UI (port 10000) stays up until the user presses Enter.
  scala.io.StdIn.readLine()
  spark.stop()
}
Original file line number Diff line number Diff line change @@ -87,6 +87,7 @@ export interface ConfigEntry {
87
87
export type ConfigEntries = ConfigEntry [ ] ;
88
88
89
89
export interface ConfigStore {
90
+ alertDisabled : string | undefined ;
90
91
resourceControlType : ResourceMode ;
91
92
configs : ConfigEntries ;
92
93
executorMemoryBytes : number ;
Original file line number Diff line number Diff line change @@ -18,6 +18,7 @@ import { reduceSQLInputOutputAlerts } from "./Alerts/MemorySQLInputOutputAlerts"
18
18
import { reducePartitionSkewAlert } from "./Alerts/PartitionSkewAlert" ;
19
19
import { reduceSmallTasksAlert } from "./Alerts/SmallTasksAlert" ;
20
20
import { reduceWastedCoresAlerts } from "./Alerts/WastedCoresAlertsReducer" ;
21
+ import { parseAlertDisabledConfig } from "../utils/ConfigParser" ;
21
22
22
23
export function reduceAlerts (
23
24
sqlStore : SparkSQLStore ,
@@ -39,8 +40,9 @@ export function reduceAlerts(
39
40
reduceJoinToBroadcastAlert ( sqlStore , alerts ) ;
40
41
reduceLargeCrossJoinScanAlert ( sqlStore , alerts ) ;
41
42
reduceMaxPartitionToBigAlert ( sqlStore , stageStore , alerts ) ;
42
-
43
+ const disabledAlerts = parseAlertDisabledConfig ( config . alertDisabled ) ;
44
+ const filteredAlerts = alerts . filter ( alert => ! disabledAlerts . has ( alert . name ) ) ;
43
45
return {
44
- alerts : alerts ,
46
+ alerts : filteredAlerts ,
45
47
} ;
46
48
}
Original file line number Diff line number Diff line change @@ -134,6 +134,7 @@ export function extractConfig(
134
134
const resourceControlType = findResourceControlType ( sparkPropertiesObj ) ;
135
135
136
136
const appName = sparkPropertiesObj [ "spark.app.name" ] ;
137
+ const alertDisabled = sparkPropertiesObj [ "spark.dataflint.alert.disabled" ] || undefined ;
137
138
const config : ConfigEntries = [
138
139
{
139
140
name : "app name" ,
@@ -381,6 +382,7 @@ export function extractConfig(
381
382
return [
382
383
appName ,
383
384
{
385
+ alertDisabled : alertDisabled ,
384
386
resourceControlType : resourceControlType ,
385
387
configs : config ,
386
388
executorMemoryOverheadViaConfigString : memoryOverheadViaConfigString ,
Original file line number Diff line number Diff line change
1
+ // Utility to parse the spark.dataflint.alert.disabled config
2
+ export function parseAlertDisabledConfig ( config : string | undefined ) : Set < string > {
3
+ if ( ! config ) return new Set ( ) ;
4
+ return new Set ( config . split ( ',' ) . map ( x => x . trim ( ) ) . filter ( Boolean ) ) ;
5
+ }
You can’t perform that action at this time.
0 commit comments