@@ -97,26 +97,26 @@ public void prepareRun(BatchSourceContext context) throws Exception {
97
97
bucketName = GCSPath .from (path ).getBucket ();
98
98
} catch (IllegalArgumentException e ) {
99
99
collector .addFailure (e .getMessage (), null )
100
- .withStacktrace (e .getStackTrace ());
100
+ .withStacktrace (e .getStackTrace ());
101
101
collector .getOrThrowException ();
102
102
}
103
103
104
104
Boolean isServiceAccountFilePath = config .connection .isServiceAccountFilePath ();
105
105
if (isServiceAccountFilePath == null ) {
106
106
collector .addFailure ("Service account type is undefined." ,
107
- "Must be `filePath` or `JSON`" );
107
+ "Must be `filePath` or `JSON`" );
108
108
collector .getOrThrowException ();
109
109
}
110
110
111
111
Credentials credentials = null ;
112
112
try {
113
113
credentials = config .connection .getServiceAccount () == null ?
114
- null : GCPUtils .loadServiceAccountCredentials (config .connection .getServiceAccount (),
115
- isServiceAccountFilePath );
114
+ null : GCPUtils .loadServiceAccountCredentials (config .connection .getServiceAccount (),
115
+ isServiceAccountFilePath );
116
116
} catch (Exception e ) {
117
117
String errorReason = "Unable to load service account credentials." ;
118
118
collector .addFailure (String .format ("%s %s" , errorReason , e .getMessage ()), null )
119
- .withStacktrace (e .getStackTrace ());
119
+ .withStacktrace (e .getStackTrace ());
120
120
collector .getOrThrowException ();
121
121
}
122
122
@@ -136,10 +136,10 @@ public void prepareRun(BatchSourceContext context) throws Exception {
136
136
location = bucket .getLocation ();
137
137
} catch (StorageException e ) {
138
138
String errorReason = String .format ("Error code: %s, Unable to access GCS bucket '%s'. " ,
139
- e .getCode (), bucketName );
139
+ e .getCode (), bucketName );
140
140
collector .addFailure (String .format ("%s %s" , errorReason , e .getMessage ()),
141
- "Ensure you entered the correct bucket path and have permissions for it." )
142
- .withStacktrace (e .getStackTrace ());
141
+ "Ensure you entered the correct bucket path and have permissions for it." )
142
+ .withStacktrace (e .getStackTrace ());
143
143
collector .getOrThrowException ();
144
144
}
145
145
@@ -153,7 +153,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
153
153
154
154
// set error details provider
155
155
context .setErrorDetailsProvider (
156
- new ErrorDetailsProviderSpec (GCSErrorDetailsProvider .class .getName ()));
156
+ new ErrorDetailsProviderSpec (GCSErrorDetailsProvider .class .getName ()));
157
157
158
158
// super is called down here to avoid instantiating the lineage recorder with a null asset
159
159
super .prepareRun (context );
@@ -162,12 +162,12 @@ public void prepareRun(BatchSourceContext context) throws Exception {
162
162
@ Override
163
163
protected Map <String , String > getFileSystemProperties (BatchSourceContext context ) {
164
164
Map <String , String > properties = GCPUtils .getFileSystemProperties (config .connection , config .getPath (),
165
- new HashMap <>(config .getFileSystemProperties ()));
165
+ new HashMap <>(config .getFileSystemProperties ()));
166
166
if (config .isCopyHeader ()) {
167
167
properties .put (PathTrackingInputFormat .COPY_HEADER , Boolean .TRUE .toString ());
168
168
}
169
169
if (config .getFileEncoding () != null
170
- && !config .getFileEncoding ().equalsIgnoreCase (AbstractFileSourceConfig .DEFAULT_FILE_ENCODING )) {
170
+ && !config .getFileEncoding ().equalsIgnoreCase (AbstractFileSourceConfig .DEFAULT_FILE_ENCODING )) {
171
171
properties .put (PathTrackingInputFormat .SOURCE_FILE_ENCODING , config .getFileEncoding ());
172
172
}
173
173
if (config .getMinSplitSize () != null ) {
@@ -190,17 +190,17 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
190
190
@ Override
191
191
protected void recordLineage (LineageRecorder lineageRecorder , List <String > outputFields ) {
192
192
lineageRecorder .recordRead ("Read" , String .format ("Read %s from Google Cloud Storage." ,
193
- config .isEncrypted () ? " and decrypt " : " " ), outputFields );
193
+ config .isEncrypted () ? " and decrypt " : " " ), outputFields );
194
194
}
195
195
196
196
@ Override
197
197
protected boolean shouldGetSchema () {
198
198
return !config .containsMacro (GCPConnectorConfig .NAME_PROJECT ) &&
199
- !config .containsMacro (GCSSourceConfig .NAME_PATH ) && !config .containsMacro (GCSSourceConfig .NAME_FORMAT ) &&
200
- !config .containsMacro (GCSSourceConfig .NAME_DELIMITER ) &&
201
- !config .containsMacro (GCSSourceConfig .NAME_FILE_SYSTEM_PROPERTIES ) &&
202
- !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_FILE_PATH ) &&
203
- !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_JSON );
199
+ !config .containsMacro (GCSSourceConfig .NAME_PATH ) && !config .containsMacro (GCSSourceConfig .NAME_FORMAT ) &&
200
+ !config .containsMacro (GCSSourceConfig .NAME_DELIMITER ) &&
201
+ !config .containsMacro (GCSSourceConfig .NAME_FILE_SYSTEM_PROPERTIES ) &&
202
+ !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_FILE_PATH ) &&
203
+ !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_JSON );
204
204
}
205
205
206
206
/**
@@ -236,19 +236,19 @@ public static class GCSSourceConfig extends AbstractFileSourceConfig implements
236
236
@ Macro
237
237
@ Nullable
238
238
@ Description ("Whether the data file is encrypted. If it is set to 'true', a associated metadata file needs to be "
239
- + "provided for each data file. Please refer to the Documentation for the details of the metadata file content." )
239
+ + "provided for each data file. Please refer to the Documentation for the details of the metadata file content." )
240
240
private Boolean encrypted ;
241
241
242
242
@ Macro
243
243
@ Nullable
244
244
@ Description ("The file name suffix for the metadata file of the encrypted data file. "
245
- + "The default is '" + DEFAULT_ENCRYPTED_METADATA_SUFFIX + "'." )
245
+ + "The default is '" + DEFAULT_ENCRYPTED_METADATA_SUFFIX + "'." )
246
246
private String encryptedMetadataSuffix ;
247
247
248
248
@ Macro
249
249
@ Nullable
250
250
@ Description ("A list of columns with the corresponding data types for whom the automatic data type detection gets" +
251
- " skipped." )
251
+ " skipped." )
252
252
private String override ;
253
253
254
254
@ Macro
@@ -281,23 +281,23 @@ public void validate(FailureCollector collector) {
281
281
GCSPath .from (path );
282
282
} catch (IllegalArgumentException e ) {
283
283
collector .addFailure (e .getMessage (), null ).withConfigProperty (NAME_PATH )
284
- .withStacktrace (e .getStackTrace ());
284
+ .withStacktrace (e .getStackTrace ());
285
285
}
286
286
}
287
287
if (!containsMacro (NAME_FILE_SYSTEM_PROPERTIES )) {
288
288
try {
289
289
getFileSystemProperties ();
290
290
} catch (Exception e ) {
291
291
collector .addFailure ("File system properties must be a valid json." , null )
292
- .withConfigProperty (NAME_FILE_SYSTEM_PROPERTIES ).withStacktrace (e .getStackTrace ());
292
+ .withConfigProperty (NAME_FILE_SYSTEM_PROPERTIES ).withStacktrace (e .getStackTrace ());
293
293
}
294
294
}
295
295
if (!containsMacro (NAME_FILE_REGEX )) {
296
296
try {
297
297
getFilePattern ();
298
298
} catch (IllegalArgumentException e ) {
299
299
collector .addFailure (e .getMessage (), null ).withConfigProperty (NAME_FILE_REGEX )
300
- .withStacktrace (e .getStackTrace ());
300
+ .withStacktrace (e .getStackTrace ());
301
301
}
302
302
}
303
303
}
@@ -331,7 +331,7 @@ public boolean isEncrypted() {
331
331
332
332
public String getEncryptedMetadataSuffix () {
333
333
return Strings .isNullOrEmpty (encryptedMetadataSuffix ) ?
334
- DEFAULT_ENCRYPTED_METADATA_SUFFIX : encryptedMetadataSuffix ;
334
+ DEFAULT_ENCRYPTED_METADATA_SUFFIX : encryptedMetadataSuffix ;
335
335
}
336
336
337
337
Map <String , String > getFileSystemProperties () {
0 commit comments