|
43 | 43 | import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; |
44 | 44 | import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; |
45 | 45 | import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; |
| 46 | +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; |
46 | 47 | import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider; |
47 | 48 | import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; |
48 | 49 | import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; |
@@ -91,6 +92,7 @@ public class AwsObjectStorage extends AbstractObjectStorage { |
91 | 92 | public static final String AUTH_TYPE_KEY = "authType"; |
92 | 93 | public static final String STATIC_AUTH_TYPE = "static"; |
93 | 94 | public static final String INSTANCE_AUTH_TYPE = "instance"; |
| 95 | + public static final String DEFAULT_AUTH_TYPE = "default"; |
94 | 96 | public static final String CHECKSUM_ALGORITHM_KEY = "checksumAlgorithm"; |
95 | 97 |
|
96 | 98 | // https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html |
@@ -372,24 +374,35 @@ protected DeleteObjectsAccumulator newDeleteObjectsAccumulator() { |
372 | 374 | } |
373 | 375 |
|
374 | 376 | protected List<AwsCredentialsProvider> credentialsProviders() { |
375 | | - String authType = bucketURI.extensionString(AUTH_TYPE_KEY, STATIC_AUTH_TYPE); |
| 377 | + String authType = bucketURI.extensionString(AUTH_TYPE_KEY, DEFAULT_AUTH_TYPE); |
376 | 378 | switch (authType) { |
377 | 379 | case STATIC_AUTH_TYPE: { |
378 | | - String accessKey = bucketURI.extensionString(BucketURI.ACCESS_KEY_KEY, System.getenv("KAFKA_S3_ACCESS_KEY")); |
379 | | - String secretKey = bucketURI.extensionString(BucketURI.SECRET_KEY_KEY, System.getenv("KAFKA_S3_SECRET_KEY")); |
380 | | - if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) { |
381 | | - return Collections.emptyList(); |
382 | | - } |
383 | | - return List.of(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))); |
| 380 | + AwsCredentialsProvider acp = staticProfileCredentialsProvider(); |
| 381 | + return acp != null ? List.of(acp) : Collections.emptyList(); |
384 | 382 | } |
385 | 383 | case INSTANCE_AUTH_TYPE: { |
386 | 384 | return List.of(instanceProfileCredentialsProvider()); |
387 | 385 | } |
| 386 | + case DEFAULT_AUTH_TYPE: { |
| 387 | + AwsCredentialsProvider acp = staticProfileCredentialsProvider(); |
| 388 | + if (acp == null) |
| 389 | + return List.of(DefaultCredentialsProvider.create()); |
| 390 | + return List.of(acp, DefaultCredentialsProvider.create()); |
| 391 | + } |
388 | 392 | default: |
389 | 393 | throw new UnsupportedOperationException("Unsupported auth type: " + authType); |
390 | 394 | } |
391 | 395 | } |
392 | 396 |
|
| 397 | + protected AwsCredentialsProvider staticProfileCredentialsProvider() { |
| 398 | + String accessKey = bucketURI.extensionString(BucketURI.ACCESS_KEY_KEY, System.getenv("KAFKA_S3_ACCESS_KEY")); |
| 399 | + String secretKey = bucketURI.extensionString(BucketURI.SECRET_KEY_KEY, System.getenv("KAFKA_S3_SECRET_KEY")); |
| 400 | + if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) { |
| 401 | + return null; |
| 402 | + } |
| 403 | + return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)); |
| 404 | + } |
| 405 | + |
393 | 406 | protected AwsCredentialsProvider instanceProfileCredentialsProvider() { |
394 | 407 | if (instanceProfileCredentialsProvider == null) { |
395 | 408 | synchronized (AwsObjectStorage.class) { |
@@ -438,7 +451,6 @@ protected ClientOverrideConfiguration clientOverrideConfiguration() { |
438 | 451 | private AwsCredentialsProvider newCredentialsProviderChain(List<AwsCredentialsProvider> credentialsProviders) { |
439 | 452 | List<AwsCredentialsProvider> providers = new ArrayList<>(credentialsProviders); |
440 | 453 | // Add default providers to the end of the chain |
441 | | - providers.add(InstanceProfileCredentialsProvider.create()); |
442 | 454 | providers.add(AnonymousCredentialsProvider.create()); |
443 | 455 | return AwsCredentialsProviderChain.builder() |
444 | 456 | .reuseLastProviderEnabled(true) |
|
0 commit comments