Skip to content

Commit

Permalink
Address comments to simplify credential creation
Browse files Browse the repository at this point in the history
Signed-off-by: Aindriu Lavelle <[email protected]>
  • Loading branch information
aindriu-aiven committed Oct 10, 2024
1 parent 2df807f commit 7a3073f
Show file tree
Hide file tree
Showing 13 changed files with 45 additions and 96 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id("aiven-apache-kafka-connectors-all.java-conventions") }

val amazonS3Version by extra("1.12.729")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package io.aiven.kafka.connect.iam;

import java.util.Objects;

import io.aiven.kafka.connect.s3.S3BaseConfig;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;

Expand All @@ -32,12 +33,11 @@ public AWSCredentialsProvider getProvider(final S3BaseConfig config) {
if (config.hasAwsStsRole()) {
return getStsProvider(config);
}
final AwsAccessSecret awsCredentials = config.getAwsCredentials();
if (!awsCredentials.isValid()) {
final BasicAWSCredentials awsCredentials = config.getAwsCredentials();
if (Objects.isNull(awsCredentials)) {
return config.getCustomCredentialsProvider();
}
return new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsCredentials.getAccessKeyId().value(),
awsCredentials.getSecretAccessKey().value()));
return new AWSStaticCredentialsProvider(awsCredentials);
}

private AWSCredentialsProvider getStsProvider(final S3BaseConfig config) {
Expand All @@ -52,11 +52,8 @@ private AWSCredentialsProvider getStsProvider(final S3BaseConfig config) {

private AWSSecurityTokenService securityTokenService(final S3BaseConfig config) {
if (config.hasStsEndpointConfig()) {
final AwsStsEndpointConfig endpointConfig = config.getStsEndpointConfig();
final AwsClientBuilder.EndpointConfiguration stsConfig = new AwsClientBuilder.EndpointConfiguration(
endpointConfig.getServiceEndpoint(), endpointConfig.getSigningRegion());
final AWSSecurityTokenServiceClientBuilder stsBuilder = AWSSecurityTokenServiceClientBuilder.standard();
stsBuilder.setEndpointConfiguration(stsConfig);
stsBuilder.setEndpointConfiguration(config.getAwsEndpointConfiguration());
return stsBuilder.build();
}
return AWSSecurityTokenServiceClientBuilder.defaultClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@
import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword;
import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator;
import io.aiven.kafka.connect.common.config.validators.UrlValidator;
import io.aiven.kafka.connect.iam.AwsAccessSecret;
import io.aiven.kafka.connect.iam.AwsStsEndpointConfig;
import io.aiven.kafka.connect.iam.AwsStsRole;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.regions.Regions;
Expand Down Expand Up @@ -125,11 +126,11 @@ public class S3BaseConfig extends AbstractConfig {
// in other words we can't use values greater than 30
public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3;

protected S3BaseConfig(ConfigDef definition, Map<?, ?> originals) { // NOPMD UnusedAssignment
super(definition, originals);
protected S3BaseConfig(ConfigDef definition, Map<String, String> originals) { // NOPMD UnusedAssignment
super(definition, handleDeprecatedYyyyUppercase(originals));
}

protected static Map<String, String> handleDeprecatedYyyyUppercase(final Map<String, String> properties) {
private static Map<String, String> handleDeprecatedYyyyUppercase(final Map<String, String> properties) {
if (!properties.containsKey(AWS_S3_PREFIX_CONFIG)) {
return properties;
}
Expand Down Expand Up @@ -216,34 +217,6 @@ protected static void addAwsConfigGroup(final ConfigDef configDef) {
ConfigDef.Width.NONE, AWS_S3_REGION_CONFIG);
}

protected static void addS3SinkConfig(final ConfigDef configDef) {
int awsS3SinkCounter = 0;

configDef.define(AWS_S3_PART_SIZE, Type.INT, DEFAULT_PART_SIZE, new ConfigDef.Validator() {

static final int MAX_BUFFER_SIZE = 2_000_000_000;

@Override
public void ensureValid(final String name, final Object value) {
if (value == null) {
throw new ConfigException(name, null, "Part size must be non-null");
}
final var number = (Number) value;
if (number.longValue() <= 0) {
throw new ConfigException(name, value, "Part size must be greater than 0");
}
if (number.longValue() > MAX_BUFFER_SIZE) {
throw new ConfigException(name, value,
"Part size must be no more: " + MAX_BUFFER_SIZE + " bytes (2GB)");
}
}
}, Importance.MEDIUM,
"The Part Size in S3 Multi-part Uploads in bytes. Maximum is " + Integer.MAX_VALUE
+ " (2GB) and default is " + DEFAULT_PART_SIZE + " (5MB)",
GROUP_AWS, awsS3SinkCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.NONE, AWS_S3_PART_SIZE);
}

protected static void addAwsStsConfigGroup(final ConfigDef configDef) {
int awsStsGroupCounter = 0;
configDef.define(AWS_STS_ROLE_ARN, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
Expand Down Expand Up @@ -395,12 +368,22 @@ public AwsStsEndpointConfig getStsEndpointConfig() {
return new AwsStsEndpointConfig(getString(AWS_STS_CONFIG_ENDPOINT), getString(AWS_S3_REGION_CONFIG));
}

public AwsAccessSecret getNewAwsCredentials() {
return new AwsAccessSecret(getPassword(AWS_ACCESS_KEY_ID_CONFIG), getPassword(AWS_SECRET_ACCESS_KEY_CONFIG));
public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() {
final AwsStsEndpointConfig config = getStsEndpointConfig();
return new AwsClientBuilder.EndpointConfiguration(config.getServiceEndpoint(), config.getSigningRegion());
}

public AwsAccessSecret getAwsCredentials() {
return getNewAwsCredentials().isValid() ? getNewAwsCredentials() : getOldAwsCredentials();
public BasicAWSCredentials getAwsCredentials() {
if (Objects.nonNull(getPassword(AWS_ACCESS_KEY_ID_CONFIG))
&& Objects.nonNull(getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) {
return new BasicAWSCredentials(getPassword(AWS_ACCESS_KEY_ID_CONFIG).value(),
getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value());
} else if (Objects.nonNull(getPassword(AWS_ACCESS_KEY_ID))
&& Objects.nonNull(getPassword(AWS_SECRET_ACCESS_KEY))) {
return new BasicAWSCredentials(getPassword(AWS_ACCESS_KEY_ID).value(),
getPassword(AWS_SECRET_ACCESS_KEY).value());
}
return null;
}

public String getAwsS3EndPoint() {
Expand Down Expand Up @@ -453,11 +436,8 @@ public int getS3RetryBackoffMaxRetries() {
return getInt(AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG);
}

public AwsAccessSecret getOldAwsCredentials() {
return new AwsAccessSecret(getPassword(AWS_ACCESS_KEY_ID), getPassword(AWS_SECRET_ACCESS_KEY));
}

public AWSCredentialsProvider getCustomCredentialsProvider() {
return getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class S3ClientUtils {
public class S3Utility {

private final AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@

public class AwsCredentialConfig extends S3BaseConfig {
public AwsCredentialConfig(final Map<String, String> properties) {
super(configDef(new ConfigDef()), handleDeprecatedYyyyUppercase(properties));
super(configDef(new ConfigDef()), properties);
}

public static ConfigDef configDef(ConfigDef configDef) { // NOPMD UnusedAssignment
addS3RetryPolicies(configDef);
addAwsConfigGroup(configDef);
addAwsStsConfigGroup(configDef);
addDeprecatedConfiguration(configDef);
addS3SinkConfig(configDef);
return configDef;
}
}

This file was deleted.

2 changes: 1 addition & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ dependencyResolutionManagement {

include("commons")

include("s3-connectors-common")
include("s3-commons")

include("gcs-sink-connector")

Expand Down

0 comments on commit 7a3073f

Please sign in to comment.