2424import java .util .Map ;
2525import models .JobExecution ;
2626import models .JobExecution .ExecutionState ;
27- import models .TuningJobExecution ;
28- import models .TuningJobExecution .ParamSetStatus ;
27+ import models .JobSuggestedParamSet ;
28+ import models .JobSuggestedParamSet .ParamSetStatus ;
29+ import models .TuningJobExecutionParamSet ;
2930import org .apache .log4j .Logger ;
3031
3132
@@ -39,26 +40,27 @@ public class AzkabanJobCompleteDetector extends JobCompleteDetector {
3940 private AzkabanJobStatusUtil _azkabanJobStatusUtil ;
4041
4142 public enum AzkabanJobStatus {
42- FAILED , CANCELLED , KILLED , SUCCEEDED
43+ FAILED , CANCELLED , KILLED , SUCCEEDED , SKIPPED
4344 }
4445
4546 /**
4647 * Returns the list of completed executions
47- * @param jobExecutions Started Execution list
48+ * @param inProgressExecutionParamSet List of executions (with corresponding param set) in progress
4849 * @return List of completed executions
49- * @throws MalformedURLException
50- * @throws URISyntaxException
50+ * @throws MalformedURLException MalformedURLException
51+ * @throws URISyntaxException URISyntaxException
5152 */
52- protected List <TuningJobExecution > getCompletedExecutions (List <TuningJobExecution > jobExecutions )
53+ protected List <JobExecution > getCompletedExecutions (List <TuningJobExecutionParamSet > inProgressExecutionParamSet )
5354 throws MalformedURLException , URISyntaxException {
5455 logger .info ("Fetching the list of executions completed since last iteration" );
55- List <TuningJobExecution > completedExecutions = new ArrayList <TuningJobExecution >();
56+ List <JobExecution > completedExecutions = new ArrayList <JobExecution >();
5657 try {
57- for (TuningJobExecution tuningJobExecution : jobExecutions ) {
58+ for (TuningJobExecutionParamSet tuningJobExecutionParamSet : inProgressExecutionParamSet ) {
5859
59- JobExecution jobExecution = tuningJobExecution .jobExecution ;
60+ JobSuggestedParamSet jobSuggestedParamSet = tuningJobExecutionParamSet .jobSuggestedParamSet ;
61+ JobExecution jobExecution = tuningJobExecutionParamSet .jobExecution ;
6062
61- logger .info ("Checking current status of started execution: " + tuningJobExecution . jobExecution .jobExecId );
63+ logger .info ("Checking current status of started execution: " + jobExecution .jobExecId );
6264
6365 if (_azkabanJobStatusUtil == null ) {
6466 logger .info ("Initializing AzkabanJobStatusUtil" );
@@ -72,23 +74,32 @@ protected List<TuningJobExecution> getCompletedExecutions(List<TuningJobExecutio
7274 logger .info ("Job Found:" + job .getKey () + ". Status: " + job .getValue ());
7375 if (job .getKey ().equals (jobExecution .job .jobName )) {
7476 if (job .getValue ().equals (AzkabanJobStatus .FAILED .toString ())) {
75- tuningJobExecution .paramSetState = ParamSetStatus .EXECUTED ;
77+ if (jobSuggestedParamSet .paramSetState .equals (ParamSetStatus .SENT )) {
78+ jobSuggestedParamSet .paramSetState = ParamSetStatus .EXECUTED ;
79+ }
7680 jobExecution .executionState = ExecutionState .FAILED ;
77- }
78- if (job .getValue ().equals (AzkabanJobStatus .CANCELLED .toString ()) || job .getValue ()
79- .equals (AzkabanJobStatus .KILLED .toString ())) {
80- tuningJobExecution .paramSetState = ParamSetStatus .EXECUTED ;
81- jobExecution .executionState = ExecutionState .CANCELLED ;
82- }
83- if (job .getValue ().equals (AzkabanJobStatus .SUCCEEDED .toString ())) {
84- tuningJobExecution .paramSetState = ParamSetStatus .EXECUTED ;
81+ } else if (job .getValue ().equals (AzkabanJobStatus .SUCCEEDED .toString ())) {
82+ if (jobSuggestedParamSet .paramSetState .equals (ParamSetStatus .SENT )) {
83+ jobSuggestedParamSet .paramSetState = ParamSetStatus .EXECUTED ;
84+ }
8585 jobExecution .executionState = ExecutionState .SUCCEEDED ;
86+ } else if (job .getValue ().equals (AzkabanJobStatus .CANCELLED .toString ()) || job .getValue ()
87+ .equals (AzkabanJobStatus .KILLED .toString ()) || job .getValue ()
88+ .equals (AzkabanJobStatus .SKIPPED .toString ())) {
89+ if (jobSuggestedParamSet .paramSetState .equals (ParamSetStatus .SENT )) {
90+ jobSuggestedParamSet .paramSetState = ParamSetStatus .EXECUTED ;
91+ }
92+ jobExecution .executionState = ExecutionState .CANCELLED ;
8693 }
87- if (tuningJobExecution .paramSetState .equals (ParamSetStatus .EXECUTED )) {
88- completedExecutions .add (tuningJobExecution );
89- logger .info ("Execution " + tuningJobExecution .jobExecution .jobExecId + " is completed" );
94+
95+ if (jobExecution .executionState .equals (ExecutionState .SUCCEEDED ) || jobExecution .executionState .equals (
96+ ExecutionState .FAILED ) || jobExecution .executionState .equals (ExecutionState .CANCELLED )) {
97+ jobExecution .update ();
98+ jobSuggestedParamSet .update ();
99+ completedExecutions .add (jobExecution );
100+ logger .info ("Execution " + jobExecution .jobExecId + " is completed" );
90101 } else {
91- logger .info ("Execution " + tuningJobExecution . jobExecution .jobExecId + " is still in running state" );
102+ logger .info ("Execution " + jobExecution .jobExecId + " is still in running state" );
92103 }
93104 }
94105 }
0 commit comments