@@ -77,18 +77,20 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
7777 protected final GsonSerDe <List <JobExecutionPlan >> serDe ;
7878 private final JobExecutionPlanDagFactory jobExecPlanDagFactory ;
7979
80- protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
81- + "dag_node_id VARCHAR(" + ServiceConfigKeys .MAX_DAG_NODE_ID_LENGTH
82- + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR("
83- + ServiceConfigKeys .MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
84- + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
85- + "is_failed_dag INT NOT NULL DEFAULT 0, " + "PRIMARY KEY (dag_node_id), "
86- + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))" ;
87-
88- protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
89- + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag" ;
90- protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?" ;
91- protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?" ;
80+ protected static final String CREATE_TABLE_STATEMENT =
81+ "CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys .MAX_DAG_NODE_ID_LENGTH
82+ + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR("
83+ + ServiceConfigKeys .MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
84+ + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
85+ + "is_failed_dag TINYINT(1) DEFAULT 0, " + "PRIMARY KEY (dag_node_id), "
86+ + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))" ;
87+
88+ protected static final String INSERT_STATEMENT =
89+ "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
90+ + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag" ;
91+ protected static final String GET_DAG_NODES_STATEMENT =
92+ "SELECT dag_node FROM %s WHERE parent_dag_id = ?" ;
93+ protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?" ;
9294 protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?" ;
9395 private final ContextAwareCounter totalDagCount ;
9496
@@ -103,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
103105 DataSource dataSource = MysqlDataSourceFactory .get (config , SharedResourcesBrokerFactory .getImplicitBroker ());
104106
105107 try (Connection connection = dataSource .getConnection ();
106- PreparedStatement createStatement = connection .prepareStatement (String .format (CREATE_TABLE_STATEMENT , tableName ))) {
108+ PreparedStatement createStatement = connection .prepareStatement (
109+ String .format (CREATE_TABLE_STATEMENT , tableName ))) {
107110 createStatement .executeUpdate ();
108111 connection .commit ();
109112 } catch (SQLException e ) {
@@ -165,7 +168,8 @@ public void cleanUp(String dagId) throws IOException {
165168 @ Override
166169 public List <Dag <JobExecutionPlan >> getDags () throws IOException {
167170 throw new NotSupportedException (getClass ().getSimpleName () + " does not need this legacy API that originated with "
168- + "the DagManager that is replaced by DagProcessingEngine" ); }
171+ + "the DagManager that is replaced by DagProcessingEngine" );
172+ }
169173
170174 @ Override
171175 public Dag <JobExecutionPlan > getDag (DagManager .DagId dagId ) throws IOException {
@@ -189,14 +193,7 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
189193 if (dagNodes .isEmpty ()) {
190194 return null ;
191195 }
192- Dag <JobExecutionPlan > dag = jobExecPlanDagFactory .createDag (dagNodes .stream ().map (Dag .DagNode ::getValue ).collect (Collectors .toList ()));
193-
194- // if any node of the dag is failed it means that the dag has been marked as failed, update the is_failed_dag field of the dag and it's nodes as true
195- if (dag .getNodes ().stream ().anyMatch (Dag .DagNode ::isFailedDag )) {
196- dag .setFailedDag (true );
197- dag .getNodes ().forEach (node -> node .setFailedDag (true ));
198- }
199- return dag ;
196+ return jobExecPlanDagFactory .createDag (dagNodes .stream ().map (Dag .DagNode ::getValue ).collect (Collectors .toList ()));
200197 }
201198
202199 @ Override
@@ -224,7 +221,7 @@ public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId parentDag
224221 HashSet <Dag .DagNode <JobExecutionPlan >> dagNodes = new HashSet <>();
225222 try (ResultSet rs = getStatement .executeQuery ()) {
226223 while (rs .next ()) {
227- dagNodes .add (new Dag .DagNode <>(this .serDe .deserialize (rs .getString (1 )).get (0 ), rs . getBoolean ( 2 ) ));
224+ dagNodes .add (new Dag .DagNode <>(this .serDe .deserialize (rs .getString (1 )).get (0 )));
228225 }
229226 return dagNodes ;
230227 } catch (SQLException e ) {
@@ -239,7 +236,7 @@ public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId) t
239236 getStatement .setString (1 , dagNodeId .toString ());
240237 try (ResultSet rs = getStatement .executeQuery ()) {
241238 if (rs .next ()) {
242- return Optional .of (new Dag .DagNode <>(this .serDe .deserialize (rs .getString (1 )).get (0 ), rs . getBoolean ( 2 ) ));
239+ return Optional .of (new Dag .DagNode <>(this .serDe .deserialize (rs .getString (1 )).get (0 )));
243240 }
244241 return Optional .empty ();
245242 } catch (SQLException e ) {
0 commit comments