Skip to content

Commit 44b2a28

Browse files
committed
fix(s3stream): Set default aws provider as fallback provider
- (AutoMQ#2026)
1 parent d90d73e commit 44b2a28

File tree

1 file changed

+20
-8
lines changed

1 file changed

+20
-8
lines changed

s3stream/src/main/java/com/automq/stream/s3/operator/AwsObjectStorage.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
4646
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
4747
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
48+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
4849
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
4950
import software.amazon.awssdk.core.async.AsyncRequestBody;
5051
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
@@ -91,6 +92,7 @@ public class AwsObjectStorage extends AbstractObjectStorage {
9192
public static final String AUTH_TYPE_KEY = "authType";
9293
public static final String STATIC_AUTH_TYPE = "static";
9394
public static final String INSTANCE_AUTH_TYPE = "instance";
95+
public static final String DEFAULT_AUTH_TYPE = "default";
9496
public static final String CHECKSUM_ALGORITHM_KEY = "checksumAlgorithm";
9597

9698
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
@@ -372,24 +374,35 @@ protected DeleteObjectsAccumulator newDeleteObjectsAccumulator() {
372374
}
373375

374376
protected List<AwsCredentialsProvider> credentialsProviders() {
375-
String authType = bucketURI.extensionString(AUTH_TYPE_KEY, STATIC_AUTH_TYPE);
377+
String authType = bucketURI.extensionString(AUTH_TYPE_KEY, DEFAULT_AUTH_TYPE);
376378
switch (authType) {
377379
case STATIC_AUTH_TYPE: {
378-
String accessKey = bucketURI.extensionString(BucketURI.ACCESS_KEY_KEY, System.getenv("KAFKA_S3_ACCESS_KEY"));
379-
String secretKey = bucketURI.extensionString(BucketURI.SECRET_KEY_KEY, System.getenv("KAFKA_S3_SECRET_KEY"));
380-
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
381-
return Collections.emptyList();
382-
}
383-
return List.of(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
380+
AwsCredentialsProvider acp = staticProfileCredentialsProvider();
381+
return acp != null ? List.of(acp) : Collections.emptyList();
384382
}
385383
case INSTANCE_AUTH_TYPE: {
386384
return List.of(instanceProfileCredentialsProvider());
387385
}
386+
case DEFAULT_AUTH_TYPE: {
387+
AwsCredentialsProvider acp = staticProfileCredentialsProvider();
388+
if (acp == null)
389+
return List.of(DefaultCredentialsProvider.create());
390+
return List.of(acp, DefaultCredentialsProvider.create());
391+
}
388392
default:
389393
throw new UnsupportedOperationException("Unsupported auth type: " + authType);
390394
}
391395
}
392396

397+
protected AwsCredentialsProvider staticProfileCredentialsProvider() {
398+
String accessKey = bucketURI.extensionString(BucketURI.ACCESS_KEY_KEY, System.getenv("KAFKA_S3_ACCESS_KEY"));
399+
String secretKey = bucketURI.extensionString(BucketURI.SECRET_KEY_KEY, System.getenv("KAFKA_S3_SECRET_KEY"));
400+
if (StringUtils.isBlank(accessKey) || StringUtils.isBlank(secretKey)) {
401+
return null;
402+
}
403+
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
404+
}
405+
393406
protected AwsCredentialsProvider instanceProfileCredentialsProvider() {
394407
if (instanceProfileCredentialsProvider == null) {
395408
synchronized (AwsObjectStorage.class) {
@@ -438,7 +451,6 @@ protected ClientOverrideConfiguration clientOverrideConfiguration() {
438451
private AwsCredentialsProvider newCredentialsProviderChain(List<AwsCredentialsProvider> credentialsProviders) {
439452
List<AwsCredentialsProvider> providers = new ArrayList<>(credentialsProviders);
440453
// Add default providers to the end of the chain
441-
providers.add(InstanceProfileCredentialsProvider.create());
442454
providers.add(AnonymousCredentialsProvider.create());
443455
return AwsCredentialsProviderChain.builder()
444456
.reuseLastProviderEnabled(true)

0 commit comments

Comments
 (0)