Skip to content

Commit 14c5e6b

Browse files
committed
fix(s3stream): Set default aws provider as fallback provider
- (AutoMQ#2026)
1 parent c41faff commit 14c5e6b

File tree

1 file changed

+18
-8
lines changed

1 file changed

+18
-8
lines changed

s3stream/src/main/java/com/automq/stream/s3/operator/AwsObjectStorage.java

+18-8
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@
4343
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
4444
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
4545
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
46+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
4647
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
4748
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
49+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
4850
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
4951
import software.amazon.awssdk.core.async.AsyncRequestBody;
5052
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -91,6 +93,7 @@ public class AwsObjectStorage extends AbstractObjectStorage {
9193
public static final String AUTH_TYPE_KEY = "authType";
9294
public static final String STATIC_AUTH_TYPE = "static";
9395
public static final String INSTANCE_AUTH_TYPE = "instance";
96+
public static final String DEFAULT_AUTH_TYPE = "default";
9497
public static final String CHECKSUM_ALGORITHM_KEY = "checksumAlgorithm";
9598

9699
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
@@ -372,24 +375,32 @@ protected DeleteObjectsAccumulator newDeleteObjectsAccumulator() {
372375
}
373376

374377
protected List<AwsCredentialsProvider> credentialsProviders() {
375-
String authType = bucketURI.extensionString(AUTH_TYPE_KEY, STATIC_AUTH_TYPE);
378+
String authType = bucketURI.extensionString(AUTH_TYPE_KEY, DEFAULT_AUTH_TYPE);
376379
switch (authType) {
377380
case STATIC_AUTH_TYPE: {
378-
String accessKey = bucketURI.extensionString(BucketURI.ACCESS_KEY_KEY, System.getenv("KAFKA_S3_ACCESS_KEY"));
379-
String secretKey = bucketURI.extensionString(BucketURI.SECRET_KEY_KEY, System.getenv("KAFKA_S3_SECRET_KEY"));
380-
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
381-
return Collections.emptyList();
382-
}
383-
return List.of(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
381+
AwsCredentialsProvider acp = staticProfileCredentialsProvider();
382+
return acp != null ? List.of(acp) : Collections.emptyList();
384383
}
385384
case INSTANCE_AUTH_TYPE: {
386385
return List.of(instanceProfileCredentialsProvider());
387386
}
387+
case DEFAULT_AUTH_TYPE: {
388+
return List.of(DefaultCredentialsProvider.create());
389+
}
388390
default:
389391
throw new UnsupportedOperationException("Unsupported auth type: " + authType);
390392
}
391393
}
392394

395+
protected AwsCredentialsProvider staticProfileCredentialsProvider() {
396+
String accessKey = bucketURI.extensionString(BucketURI.ACCESS_KEY_KEY, System.getenv("KAFKA_S3_ACCESS_KEY"));
397+
String secretKey = bucketURI.extensionString(BucketURI.SECRET_KEY_KEY, System.getenv("KAFKA_S3_SECRET_KEY"));
398+
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
399+
return null;
400+
}
401+
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
402+
}
403+
393404
protected AwsCredentialsProvider instanceProfileCredentialsProvider() {
394405
if (instanceProfileCredentialsProvider == null) {
395406
synchronized (AwsObjectStorage.class) {
@@ -438,7 +449,6 @@ protected ClientOverrideConfiguration clientOverrideConfiguration() {
438449
private AwsCredentialsProvider newCredentialsProviderChain(List<AwsCredentialsProvider> credentialsProviders) {
439450
List<AwsCredentialsProvider> providers = new ArrayList<>(credentialsProviders);
440451
// Add default providers to the end of the chain
441-
providers.add(InstanceProfileCredentialsProvider.create());
442452
providers.add(AnonymousCredentialsProvider.create());
443453
return AwsCredentialsProviderChain.builder()
444454
.reuseLastProviderEnabled(true)

0 commit comments

Comments
 (0)