79
79
import java .util .concurrent .TimeUnit ;
80
80
import java .util .stream .Collectors ;
81
81
import java .util .stream .StreamSupport ;
82
+ import javax .annotation .Nullable ;
82
83
83
84
/**
84
85
* Tests reading from GCS (Google Cloud Storage) and writing to GCS from within a Dataproc cluster.
@@ -811,9 +812,7 @@ public void testMultiSinkContentType() throws Exception {
811
812
Schema outputSchema = Schema .recordOf ("output.schema" ,
812
813
Schema .Field .of ("id" , Schema .of (Schema .Type .INT )),
813
814
Schema .Field .of ("name" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))),
814
- Schema .Field .of ("email" , Schema .nullableOf (Schema .of (Schema .Type .STRING )))
815
- );
816
-
815
+ Schema .Field .of ("email" , Schema .nullableOf (Schema .of (Schema .Type .STRING ))));
817
816
818
817
String line1 =
"1,Marilyn Hawkins,[email protected] ,DepartmentA" ;
819
818
String line2 =
"2,Terry Perez,[email protected] ,DepartmentB" ;
@@ -822,7 +821,6 @@ public void testMultiSinkContentType() throws Exception {
822
821
823
822
bucket .create (inputPath , String .join ("\n " , Arrays .asList (line1 , line2 , line3 )).getBytes (StandardCharsets .UTF_8 ));
824
823
825
-
826
824
Map <String , String > inputSourceConfig = new HashMap <>();
827
825
inputSourceConfig .put ("schema" , schema .toString ());
828
826
inputSourceConfig .put ("format" , "${sourceFormat}" );
@@ -838,7 +836,7 @@ public void testMultiSinkContentType() throws Exception {
838
836
ETLBatchConfig .Builder pipelineConfig = ETLBatchConfig .builder ().addStage (source );
839
837
840
838
String path = createPath (bucket , OUTPUT_BLOB_NAME );
841
- ETLStage sink = new ETLStage ("multsink" , createMultiSinkPlugin (path , schema , "departament " ));
839
+ ETLStage sink = new ETLStage ("multsink" , createMultiSinkPlugin ("csv " ));
842
840
pipelineConfig .addStage (sink ).addConnection (source .getName (), sink .getName ());
843
841
844
842
AppRequest <ETLBatchConfig > appRequest = getBatchAppRequestV2 (pipelineConfig .build ());
@@ -851,7 +849,10 @@ public void testMultiSinkContentType() throws Exception {
851
849
args .put (multisink1 , outputSchema .toString ());
852
850
args .put (multisink2 , outputSchema .toString ());
853
851
args .put ("sourceFormat" , "csv" );
854
- args .put ("sinkFormat" , "csv" );
852
+ args .put ("multiSinkPath" , path );
853
+ args .put ("multiSinkProjectId" , getProjectId ());
854
+ args .put ("multiSinkSchema" , schema .toString ());
855
+ args .put ("multiSinkSplitField" , "departament" );
855
856
args .put ("contentType" , CSV_CONTENT_TYPE );
856
857
startWorkFlow (appManager , ProgramRunStatus .COMPLETED , args );
857
858
@@ -879,7 +880,7 @@ private ETLPlugin createSinkPlugin(String format, String path, Schema schema) {
879
880
return createSinkPlugin (format , path , schema , null );
880
881
}
881
882
882
- private ETLPlugin createSinkPlugin (String format , String path , Schema schema , String contentType ) {
883
+ private ETLPlugin createSinkPlugin (String format , String path , Schema schema ,@ Nullable String contentType ) {
883
884
ImmutableMap .Builder <String , String > propertyBuilder = new ImmutableMap .Builder <String , String >()
884
885
.put ("path" , path )
885
886
.put ("format" , format )
@@ -892,14 +893,16 @@ private ETLPlugin createSinkPlugin(String format, String path, Schema schema, St
892
893
return new ETLPlugin (SINK_PLUGIN_NAME , BatchSink .PLUGIN_TYPE , propertyBuilder .build (), GOOGLE_CLOUD_ARTIFACT );
893
894
}
894
895
895
- private ETLPlugin createMultiSinkPlugin (String path , Schema schema , String splitField ) {
896
+ private ETLPlugin createMultiSinkPlugin (String sinkFormat ) {
896
897
Map <String , String > map = new HashMap <>();
897
- map .put ("path" , path );
898
- map .put ("format" , "${sinkFormat}" );
899
- map .put ("project" , getProjectId ());
900
- map .put ("schema" , schema .toString ());
898
+ map .put ("path" , "${multiSinkPath}" );
899
+ map .put ("format" , sinkFormat );
900
+ //todo make macro when https://cdap.atlassian.net/browse/PLUGIN-553 is fixed
901
+ //map.put("format", "${sinkFormat}");
902
+ map .put ("project" , "${multiSinkProjectId}" );
903
+ map .put ("schema" , "${multiSinkSchema}" );
901
904
map .put ("referenceName" , "gcs-multi-input" );
902
- map .put ("splitField" , splitField );
905
+ map .put ("splitField" , "${multiSinkSplitField}" );
903
906
map .put ("contentType" , "${contentType}" );
904
907
return new ETLPlugin (MULTI_SINK_PLUGIN_NAME , BatchSink .PLUGIN_TYPE , map , GOOGLE_CLOUD_ARTIFACT );
905
908
}
0 commit comments