@@ -77,19 +77,17 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
7777 protected final GsonSerDe <List <JobExecutionPlan >> serDe ;
7878 private final JobExecutionPlanDagFactory jobExecPlanDagFactory ;
7979
80- protected static final String CREATE_TABLE_STATEMENT =
81- "CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys .MAX_DAG_NODE_ID_LENGTH
82- + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR("
83- + ServiceConfigKeys .MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
84- + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
85- + "is_failed_dag INT NOT NULL DEFAULT 0, " + "PRIMARY KEY (dag_node_id), "
86- + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))" ;
87-
88- protected static final String INSERT_STATEMENT =
89- "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
90- + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag" ;
91- protected static final String GET_DAG_NODES_STATEMENT =
92- "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?" ;
80+ protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
81+ + "dag_node_id VARCHAR(" + ServiceConfigKeys .MAX_DAG_NODE_ID_LENGTH
82+ + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR("
83+ + ServiceConfigKeys .MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
84+ + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
85+ + "is_failed_dag INT NOT NULL DEFAULT 0, " + "PRIMARY KEY (dag_node_id), "
86+ + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))" ;
87+
88+ protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
89+ + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag" ;
90+ protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?" ;
9391 protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?" ;
9492 protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?" ;
9593 private final ContextAwareCounter totalDagCount ;
@@ -105,8 +103,7 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
105103 DataSource dataSource = MysqlDataSourceFactory .get (config , SharedResourcesBrokerFactory .getImplicitBroker ());
106104
107105 try (Connection connection = dataSource .getConnection ();
108- PreparedStatement createStatement = connection .prepareStatement (
109- String .format (CREATE_TABLE_STATEMENT , tableName ))) {
106+ PreparedStatement createStatement = connection .prepareStatement (String .format (CREATE_TABLE_STATEMENT , tableName ))) {
110107 createStatement .executeUpdate ();
111108 connection .commit ();
112109 } catch (SQLException e ) {
@@ -168,8 +165,7 @@ public void cleanUp(String dagId) throws IOException {
168165 @ Override
169166 public List <Dag <JobExecutionPlan >> getDags () throws IOException {
170167 throw new NotSupportedException (getClass ().getSimpleName () + " does not need this legacy API that originated with "
171- + "the DagManager that is replaced by DagProcessingEngine" );
172- }
168+ + "the DagManager that is replaced by DagProcessingEngine" ); }
173169
174170 @ Override
175171 public Dag <JobExecutionPlan > getDag (DagManager .DagId dagId ) throws IOException {
@@ -193,13 +189,12 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
193189 if (dagNodes .isEmpty ()) {
194190 return null ;
195191 }
196- Dag <JobExecutionPlan > dag =
197- jobExecPlanDagFactory .createDag (dagNodes .stream ().map (Dag .DagNode ::getValue ).collect (Collectors .toList ()));
198- if (dagNodes .stream ().anyMatch (Dag .DagNode ::isFailedDag )) {
192+ Dag <JobExecutionPlan > dag = jobExecPlanDagFactory .createDag (dagNodes .stream ().map (Dag .DagNode ::getValue ).collect (Collectors .toList ()));
193+
194+ // if any node of the dag is failed it means that the dag has been marked as failed, update the is_failed_dag field of the dag and it's nodes as true
195+ if (dag .getNodes ().stream ().anyMatch (Dag .DagNode ::isFailedDag )) {
199196 dag .setFailedDag (true );
200- for (Dag .DagNode dagNode : dag .getNodes ()) {
201- dagNode .setFailedDag (true );
202- }
197+ dag .getNodes ().forEach (node -> node .setFailedDag (true ));
203198 }
204199 return dag ;
205200 }
0 commit comments