17
17
package io .cdap .plugin .gcp .gcs .source ;
18
18
19
19
import com .google .auth .Credentials ;
20
+ import com .google .cloud .storage .Bucket ;
20
21
import com .google .cloud .storage .Storage ;
21
22
import com .google .cloud .storage .StorageException ;
22
23
import com .google .common .base .Strings ;
55
56
import java .util .HashMap ;
56
57
import java .util .List ;
57
58
import java .util .Map ;
59
+ import java .util .Objects ;
58
60
import java .util .regex .Pattern ;
59
61
import javax .annotation .Nullable ;
60
62
@@ -95,40 +97,49 @@ public void prepareRun(BatchSourceContext context) throws Exception {
95
97
bucketName = GCSPath .from (path ).getBucket ();
96
98
} catch (IllegalArgumentException e ) {
97
99
collector .addFailure (e .getMessage (), null )
98
- .withStacktrace (e .getStackTrace ());
100
+ .withStacktrace (e .getStackTrace ());
99
101
collector .getOrThrowException ();
100
102
}
101
103
102
104
Boolean isServiceAccountFilePath = config .connection .isServiceAccountFilePath ();
103
105
if (isServiceAccountFilePath == null ) {
104
106
collector .addFailure ("Service account type is undefined." ,
105
- "Must be `filePath` or `JSON`" );
107
+ "Must be `filePath` or `JSON`" );
106
108
collector .getOrThrowException ();
107
109
}
108
110
109
111
Credentials credentials = null ;
110
112
try {
111
113
credentials = config .connection .getServiceAccount () == null ?
112
- null : GCPUtils .loadServiceAccountCredentials (config .connection .getServiceAccount (),
113
- isServiceAccountFilePath );
114
+ null : GCPUtils .loadServiceAccountCredentials (config .connection .getServiceAccount (),
115
+ isServiceAccountFilePath );
114
116
} catch (Exception e ) {
115
117
String errorReason = "Unable to load service account credentials." ;
116
118
collector .addFailure (String .format ("%s %s" , errorReason , e .getMessage ()), null )
117
- .withStacktrace (e .getStackTrace ());
119
+ .withStacktrace (e .getStackTrace ());
118
120
collector .getOrThrowException ();
119
121
}
120
122
121
123
Storage storage = GCPUtils .getStorage (config .connection .getProject (), credentials );
122
124
String location = null ;
123
125
try {
124
126
// Get location of the source for lineage
125
- location = storage .get (bucketName ).getLocation ();
127
+ Bucket bucket = storage .get (bucketName );
128
+ if (Objects .isNull (bucket )) {
129
+ String errorReason = String .format ("Unable to access GCS bucket '%s'." ,
130
+ bucketName );
131
+ collector .addFailure (
132
+ String .format ("%s Ensure you entered the correct bucket path." , errorReason ),
133
+ null );
134
+ collector .getOrThrowException ();
135
+ }
136
+ location = bucket .getLocation ();
126
137
} catch (StorageException e ) {
127
138
String errorReason = String .format ("Error code: %s, Unable to access GCS bucket '%s'. " ,
128
- e .getCode (), bucketName );
139
+ e .getCode (), bucketName );
129
140
collector .addFailure (String .format ("%s %s" , errorReason , e .getMessage ()),
130
- "Ensure you entered the correct bucket path and have permissions for it." )
131
- .withStacktrace (e .getStackTrace ());
141
+ "Ensure you entered the correct bucket path and have permissions for it." )
142
+ .withStacktrace (e .getStackTrace ());
132
143
collector .getOrThrowException ();
133
144
}
134
145
@@ -142,7 +153,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
142
153
143
154
// set error details provider
144
155
context .setErrorDetailsProvider (
145
- new ErrorDetailsProviderSpec (GCSErrorDetailsProvider .class .getName ()));
156
+ new ErrorDetailsProviderSpec (GCSErrorDetailsProvider .class .getName ()));
146
157
147
158
// super is called down here to avoid instantiating the lineage recorder with a null asset
148
159
super .prepareRun (context );
@@ -151,12 +162,12 @@ public void prepareRun(BatchSourceContext context) throws Exception {
151
162
@ Override
152
163
protected Map <String , String > getFileSystemProperties (BatchSourceContext context ) {
153
164
Map <String , String > properties = GCPUtils .getFileSystemProperties (config .connection , config .getPath (),
154
- new HashMap <>(config .getFileSystemProperties ()));
165
+ new HashMap <>(config .getFileSystemProperties ()));
155
166
if (config .isCopyHeader ()) {
156
167
properties .put (PathTrackingInputFormat .COPY_HEADER , Boolean .TRUE .toString ());
157
168
}
158
169
if (config .getFileEncoding () != null
159
- && !config .getFileEncoding ().equalsIgnoreCase (AbstractFileSourceConfig .DEFAULT_FILE_ENCODING )) {
170
+ && !config .getFileEncoding ().equalsIgnoreCase (AbstractFileSourceConfig .DEFAULT_FILE_ENCODING )) {
160
171
properties .put (PathTrackingInputFormat .SOURCE_FILE_ENCODING , config .getFileEncoding ());
161
172
}
162
173
if (config .getMinSplitSize () != null ) {
@@ -179,17 +190,17 @@ protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
179
190
@ Override
180
191
protected void recordLineage (LineageRecorder lineageRecorder , List <String > outputFields ) {
181
192
lineageRecorder .recordRead ("Read" , String .format ("Read %s from Google Cloud Storage." ,
182
- config .isEncrypted () ? " and decrypt " : " " ), outputFields );
193
+ config .isEncrypted () ? " and decrypt " : " " ), outputFields );
183
194
}
184
195
185
196
@ Override
186
197
protected boolean shouldGetSchema () {
187
198
return !config .containsMacro (GCPConnectorConfig .NAME_PROJECT ) &&
188
- !config .containsMacro (GCSSourceConfig .NAME_PATH ) && !config .containsMacro (GCSSourceConfig .NAME_FORMAT ) &&
189
- !config .containsMacro (GCSSourceConfig .NAME_DELIMITER ) &&
190
- !config .containsMacro (GCSSourceConfig .NAME_FILE_SYSTEM_PROPERTIES ) &&
191
- !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_FILE_PATH ) &&
192
- !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_JSON );
199
+ !config .containsMacro (GCSSourceConfig .NAME_PATH ) && !config .containsMacro (GCSSourceConfig .NAME_FORMAT ) &&
200
+ !config .containsMacro (GCSSourceConfig .NAME_DELIMITER ) &&
201
+ !config .containsMacro (GCSSourceConfig .NAME_FILE_SYSTEM_PROPERTIES ) &&
202
+ !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_FILE_PATH ) &&
203
+ !config .containsMacro (GCPConnectorConfig .NAME_SERVICE_ACCOUNT_JSON );
193
204
}
194
205
195
206
/**
@@ -225,19 +236,19 @@ public static class GCSSourceConfig extends AbstractFileSourceConfig implements
225
236
@ Macro
226
237
@ Nullable
227
238
@ Description ("Whether the data file is encrypted. If it is set to 'true', a associated metadata file needs to be "
228
- + "provided for each data file. Please refer to the Documentation for the details of the metadata file content." )
239
+ + "provided for each data file. Please refer to the Documentation for the details of the metadata file content." )
229
240
private Boolean encrypted ;
230
241
231
242
@ Macro
232
243
@ Nullable
233
244
@ Description ("The file name suffix for the metadata file of the encrypted data file. "
234
- + "The default is '" + DEFAULT_ENCRYPTED_METADATA_SUFFIX + "'." )
245
+ + "The default is '" + DEFAULT_ENCRYPTED_METADATA_SUFFIX + "'." )
235
246
private String encryptedMetadataSuffix ;
236
247
237
248
@ Macro
238
249
@ Nullable
239
250
@ Description ("A list of columns with the corresponding data types for whom the automatic data type detection gets" +
240
- " skipped." )
251
+ " skipped." )
241
252
private String override ;
242
253
243
254
@ Macro
@@ -270,23 +281,23 @@ public void validate(FailureCollector collector) {
270
281
GCSPath .from (path );
271
282
} catch (IllegalArgumentException e ) {
272
283
collector .addFailure (e .getMessage (), null ).withConfigProperty (NAME_PATH )
273
- .withStacktrace (e .getStackTrace ());
284
+ .withStacktrace (e .getStackTrace ());
274
285
}
275
286
}
276
287
if (!containsMacro (NAME_FILE_SYSTEM_PROPERTIES )) {
277
288
try {
278
289
getFileSystemProperties ();
279
290
} catch (Exception e ) {
280
291
collector .addFailure ("File system properties must be a valid json." , null )
281
- .withConfigProperty (NAME_FILE_SYSTEM_PROPERTIES ).withStacktrace (e .getStackTrace ());
292
+ .withConfigProperty (NAME_FILE_SYSTEM_PROPERTIES ).withStacktrace (e .getStackTrace ());
282
293
}
283
294
}
284
295
if (!containsMacro (NAME_FILE_REGEX )) {
285
296
try {
286
297
getFilePattern ();
287
298
} catch (IllegalArgumentException e ) {
288
299
collector .addFailure (e .getMessage (), null ).withConfigProperty (NAME_FILE_REGEX )
289
- .withStacktrace (e .getStackTrace ());
300
+ .withStacktrace (e .getStackTrace ());
290
301
}
291
302
}
292
303
}
@@ -320,7 +331,7 @@ public boolean isEncrypted() {
320
331
321
332
public String getEncryptedMetadataSuffix () {
322
333
return Strings .isNullOrEmpty (encryptedMetadataSuffix ) ?
323
- DEFAULT_ENCRYPTED_METADATA_SUFFIX : encryptedMetadataSuffix ;
334
+ DEFAULT_ENCRYPTED_METADATA_SUFFIX : encryptedMetadataSuffix ;
324
335
}
325
336
326
337
Map <String , String > getFileSystemProperties () {
@@ -330,4 +341,4 @@ Map<String, String> getFileSystemProperties() {
330
341
return GSON .fromJson (fileSystemProperties , MAP_STRING_STRING_TYPE );
331
342
}
332
343
}
333
- }
344
+ }
0 commit comments