Skip to content

Commit 7775b9e

Browse files
committed
fix(s3stream): Set default aws provider as fallback provider
- (AutoMQ#2026)
1 parent c41faff commit 7775b9e

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

s3stream/src/main/java/com/automq/stream/s3/operator/AwsObjectStorage.java

+17-8
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
4646
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
4747
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
48+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
4849
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
4950
import software.amazon.awssdk.core.async.AsyncRequestBody;
5051
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -91,6 +92,7 @@ public class AwsObjectStorage extends AbstractObjectStorage {
9192
public static final String AUTH_TYPE_KEY = "authType";
9293
public static final String STATIC_AUTH_TYPE = "static";
9394
public static final String INSTANCE_AUTH_TYPE = "instance";
95+
public static final String DEFAULT_AUTH_TYPE = "default";
9496
public static final String CHECKSUM_ALGORITHM_KEY = "checksumAlgorithm";
9597

9698
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
@@ -372,24 +374,32 @@ protected DeleteObjectsAccumulator newDeleteObjectsAccumulator() {
372374
}
373375

374376
protected List<AwsCredentialsProvider> credentialsProviders() {
375-
String authType = bucketURI.extensionString(AUTH_TYPE_KEY, STATIC_AUTH_TYPE);
377+
String authType = bucketURI.extensionString(AUTH_TYPE_KEY, DEFAULT_AUTH_TYPE);
376378
switch (authType) {
377379
case STATIC_AUTH_TYPE: {
378-
String accessKey = bucketURI.extensionString(BucketURI.ACCESS_KEY_KEY, System.getenv("KAFKA_S3_ACCESS_KEY"));
379-
String secretKey = bucketURI.extensionString(BucketURI.SECRET_KEY_KEY, System.getenv("KAFKA_S3_SECRET_KEY"));
380-
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
381-
return Collections.emptyList();
382-
}
383-
return List.of(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
380+
AwsCredentialsProvider acp = staticProfileCredentialsProvider();
381+
return acp != null ? List.of(acp) : Collections.emptyList();
384382
}
385383
case INSTANCE_AUTH_TYPE: {
386384
return List.of(instanceProfileCredentialsProvider());
387385
}
386+
case DEFAULT_AUTH_TYPE: {
387+
return List.of(DefaultCredentialsProvider.create());
388+
}
388389
default:
389390
throw new UnsupportedOperationException("Unsupported auth type: " + authType);
390391
}
391392
}
392393

394+
protected AwsCredentialsProvider staticProfileCredentialsProvider() {
395+
String accessKey = bucketURI.extensionString(BucketURI.ACCESS_KEY_KEY, System.getenv("KAFKA_S3_ACCESS_KEY"));
396+
String secretKey = bucketURI.extensionString(BucketURI.SECRET_KEY_KEY, System.getenv("KAFKA_S3_SECRET_KEY"));
397+
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
398+
return null;
399+
}
400+
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
401+
}
402+
393403
protected AwsCredentialsProvider instanceProfileCredentialsProvider() {
394404
if (instanceProfileCredentialsProvider == null) {
395405
synchronized (AwsObjectStorage.class) {
@@ -438,7 +448,6 @@ protected ClientOverrideConfiguration clientOverrideConfiguration() {
438448
private AwsCredentialsProvider newCredentialsProviderChain(List<AwsCredentialsProvider> credentialsProviders) {
439449
List<AwsCredentialsProvider> providers = new ArrayList<>(credentialsProviders);
440450
// Add default providers to the end of the chain
441-
providers.add(InstanceProfileCredentialsProvider.create());
442451
providers.add(AnonymousCredentialsProvider.create());
443452
return AwsCredentialsProviderChain.builder()
444453
.reuseLastProviderEnabled(true)

0 commit comments

Comments
 (0)