File tree Expand file tree Collapse file tree 5 files changed +36
-2
lines changed
spark-plugin/example_3_5_1/src/main/scala/io/dataflint/example Expand file tree Collapse file tree 5 files changed +36
-2
lines changed Original file line number Diff line number Diff line change 1+ package io .dataflint .example
2+
3+ import org .apache .spark .sql .SparkSession
4+
/**
 * Example application showing how to disable specific DataFlint alerts
 * ("smallTasks" and "idleCoresTooHigh") through the
 * `spark.dataflint.alert.disabled` configuration key.
 *
 * The job deliberately repartitions 10,000 rows into 10,000 partitions,
 * which would normally trigger the small-tasks alert; with the config set,
 * that alert should be suppressed. The app then blocks on stdin so the
 * Spark UI (port 10000) can be inspected before shutdown.
 */
object SchedulingSmallTasksSkipAlerts {

  // Explicit main instead of `extends App` to avoid the App trait's
  // delayed-initialization pitfalls.
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("SchedulingSmallTasks")
      .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin")
      // Original set this key twice (once positionally, once via `value =`);
      // a single named-argument call is sufficient.
      .config("spark.dataflint.telemetry.enabled", value = false)
      .config("spark.ui.port", "10000")
      .config("spark.sql.maxMetadataStringLength", "10000")
      .config("spark.dataflint.alert.disabled", "smallTasks,idleCoresTooHigh")
      .master("local[*]")
      .getOrCreate()

    // 10,000 rows spread over 10,000 partitions => many tiny tasks.
    val numbers = spark.range(0, 10000).repartition(10000).count()

    println(s"count numbers to 10000: $numbers")

    // Keep the application alive for UI inspection; any input terminates it.
    scala.io.StdIn.readLine()
    spark.stop()
  }
}
Original file line number Diff line number Diff line change @@ -87,6 +87,7 @@ export interface ConfigEntry {
8787export type ConfigEntries = ConfigEntry [ ] ;
8888
8989export interface ConfigStore {
90+ alertDisabled : string | undefined ;
9091 resourceControlType : ResourceMode ;
9192 configs : ConfigEntries ;
9293 executorMemoryBytes : number ;
Original file line number Diff line number Diff line change @@ -18,6 +18,7 @@ import { reduceSQLInputOutputAlerts } from "./Alerts/MemorySQLInputOutputAlerts"
1818import { reducePartitionSkewAlert } from "./Alerts/PartitionSkewAlert" ;
1919import { reduceSmallTasksAlert } from "./Alerts/SmallTasksAlert" ;
2020import { reduceWastedCoresAlerts } from "./Alerts/WastedCoresAlertsReducer" ;
21+ import { parseAlertDisabledConfig } from "../utils/ConfigParser" ;
2122
2223export function reduceAlerts (
2324 sqlStore : SparkSQLStore ,
@@ -39,8 +40,9 @@ export function reduceAlerts(
3940 reduceJoinToBroadcastAlert ( sqlStore , alerts ) ;
4041 reduceLargeCrossJoinScanAlert ( sqlStore , alerts ) ;
4142 reduceMaxPartitionToBigAlert ( sqlStore , stageStore , alerts ) ;
42-
43+ const disabledAlerts = parseAlertDisabledConfig ( config . alertDisabled ) ;
44+ const filteredAlerts = alerts . filter ( alert => ! disabledAlerts . has ( alert . name ) ) ;
4345 return {
44- alerts : alerts ,
46+ alerts : filteredAlerts ,
4547 } ;
4648}
Original file line number Diff line number Diff line change @@ -134,6 +134,7 @@ export function extractConfig(
134134 const resourceControlType = findResourceControlType ( sparkPropertiesObj ) ;
135135
136136 const appName = sparkPropertiesObj [ "spark.app.name" ] ;
137+ const alertDisabled = sparkPropertiesObj [ "spark.dataflint.alert.disabled" ] || undefined ;
137138 const config : ConfigEntries = [
138139 {
139140 name : "app name" ,
@@ -381,6 +382,7 @@ export function extractConfig(
381382 return [
382383 appName ,
383384 {
385+ alertDisabled : alertDisabled ,
384386 resourceControlType : resourceControlType ,
385387 configs : config ,
386388 executorMemoryOverheadViaConfigString : memoryOverheadViaConfigString ,
Original file line number Diff line number Diff line change 1+ // Utility to parse the spark.dataflint.alert.disabled config
2+ export function parseAlertDisabledConfig ( config : string | undefined ) : Set < string > {
3+ if ( ! config ) return new Set ( ) ;
4+ return new Set ( config . split ( ',' ) . map ( x => x . trim ( ) ) . filter ( Boolean ) ) ;
5+ }
You can’t perform that action at this time.
0 commit comments