1
1
package com .dtstack .taier .scheduler .service ;
2
2
3
3
import com .alibaba .fastjson .JSONObject ;
4
- import com .dtstack .taier .common .enums .EScheduleJobType ;
5
- import com .dtstack .taier .common .env .EnvironmentContext ;
6
4
import com .dtstack .taier .common .exception .TaierDefineException ;
7
- import com .dtstack .taier .common .util .TaskParamsUtils ;
8
5
import com .dtstack .taier .dao .domain .ScheduleJob ;
9
6
import com .dtstack .taier .dao .domain .ScheduleTaskShade ;
10
7
import com .dtstack .taier .dao .dto .ScheduleTaskParamShade ;
11
8
import com .dtstack .taier .pluginapi .constrant .ConfigConstant ;
12
- import com .dtstack .taier .pluginapi .enums .EDeployMode ;
13
9
import com .dtstack .taier .scheduler .PluginWrapper ;
14
- import com .dtstack .taier .scheduler .executor .DatasourceOperator ;
15
10
import com .dtstack .taier .scheduler .server .pipeline .JobParamReplace ;
16
11
import com .dtstack .taier .scheduler .utils .CreateJsonFileUtil ;
17
- import com .dtstack .taier .scheduler .utils .FileUtil ;
18
- import com .dtstack .taier .scheduler .utils .ScriptUtil ;
19
12
import org .apache .commons .collections .CollectionUtils ;
20
13
import org .apache .commons .io .FileUtils ;
21
14
import org .apache .commons .lang3 .StringUtils ;
35
28
@ Service
36
29
public class DataxService {
37
30
38
- @ Autowired
39
- private ClusterService clusterService ;
40
-
41
- @ Autowired
42
- private EnvironmentContext environmentContext ;
43
-
44
- @ Autowired
45
- private DatasourceOperator datasourceOperator ;
46
-
47
31
@ Autowired
48
32
private PluginWrapper pluginWrapper ;
49
33
@@ -74,19 +58,6 @@ public void handDataxParams(Map<String, Object> actionParam, ScheduleTaskShade t
74
58
}
75
59
dealDataxExeParams (actionParam , task , scheduleJob , sqlText );
76
60
}
77
- /**
78
- * 将脚本上传到 hdfs
79
- *
80
- * @param sqlText
81
- * @param task
82
- * @param scheduleJob
83
- * @return
84
- */
85
- private String uploadToHdfs (String sqlText , ScheduleTaskShade task , ScheduleJob scheduleJob ) {
86
- JSONObject pluginInfo = clusterService .pluginInfoJSON (task .getTenantId (), task .getTaskType (), null , null , null );
87
- String hdfsPath = environmentContext .getHdfsTaskPath () + (FileUtil .getUploadFileName (task .getTaskType (), scheduleJob .getJobId ()));
88
- return datasourceOperator .uploadToHdfs (pluginInfo , task .getTenantId (), sqlText , hdfsPath );
89
- }
90
61
91
62
private void dealDataxExeParams (Map <String , Object > actionParam , ScheduleTaskShade task , ScheduleJob scheduleJob ,
92
63
String sqlText ) throws IOException {
@@ -112,7 +83,7 @@ private void dealDataxExeParams(Map<String, Object> actionParam, ScheduleTaskSha
112
83
throw new TaierDefineException ("datax.local.path is null" );
113
84
}
114
85
//生成datax的json文件
115
- String taskTempPath = CreateJsonFileUtil .createJsonFile (task . getSqlText () , tempPath , task .getName ());
86
+ String taskTempPath = CreateJsonFileUtil .createJsonFile (sqlText , tempPath , task .getName ());
116
87
if (StringUtils .isBlank (taskTempPath )) {
117
88
throw new TaierDefineException ("创建datax.json文件失败" );
118
89
}
0 commit comments