From d62f7365eebfb74f42d9cfdcb038f5c197b83546 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Thu, 17 Oct 2024 11:13:19 +0100 Subject: [PATCH] Update S3-Sink to use the S3-commons package Signed-off-by: Aindriu Lavelle --- .../aiven/kafka/connect/s3/S3BaseConfig.java | 10 +- .../iam/AwsCredentialProviderFactoryTest.java | 2 +- .../connect/tools/AwsCredentialConfig.java | 22 +- s3-sink-connector/build.gradle.kts | 2 + .../io/aiven/kafka/connect/s3/S3SinkTask.java | 2 +- .../connect/s3/config/AwsAccessSecret.java | 43 --- .../config/AwsCredentialProviderFactory.java | 61 --- .../s3/config/AwsStsEndpointConfig.java | 43 --- .../kafka/connect/s3/config/AwsStsRole.java | 62 ---- .../kafka/connect/s3/config/S3SinkConfig.java | 349 +----------------- .../s3/AwsCredentialProviderFactoryTest.java | 80 ---- .../kafka/connect/s3/S3SinkTaskTest.java | 2 +- .../connect/s3/config/S3SinkConfigTest.java | 12 +- .../config/S3SinkCredentialsConfigTest.java | 2 +- 14 files changed, 58 insertions(+), 634 deletions(-) delete mode 100644 s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsAccessSecret.java delete mode 100644 s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsCredentialProviderFactory.java delete mode 100644 s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsStsEndpointConfig.java delete mode 100644 s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsStsRole.java delete mode 100644 s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/AwsCredentialProviderFactoryTest.java diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3BaseConfig.java b/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3BaseConfig.java index ee2e64de..8f91d78a 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3BaseConfig.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/s3/S3BaseConfig.java @@ -24,12 +24,12 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigException; +import io.aiven.kafka.connect.common.config.AivenCommonConfig; import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator; import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword; import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator; @@ -47,7 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class S3BaseConfig extends AbstractConfig { +public class S3BaseConfig extends AivenCommonConfig { public static final Logger LOGGER = LoggerFactory.getLogger(S3BaseConfig.class); @Deprecated @@ -87,6 +87,10 @@ public class S3BaseConfig extends AbstractConfig { @Deprecated public static final String AWS_S3_PREFIX = "aws_s3_prefix"; + // FIXME since we support so far both old style and new style of property names + // Importance was set to medium, + // as soon we will migrate to new values it must be set to HIGH + // same for default value public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id"; public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key"; public static final String AWS_CREDENTIALS_PROVIDER_CONFIG = "aws.credentials.provider"; @@ -376,10 +380,12 @@ public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() { public BasicAWSCredentials getAwsCredentials() { if (Objects.nonNull(getPassword(AWS_ACCESS_KEY_ID_CONFIG)) && Objects.nonNull(getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) { + return new BasicAWSCredentials(getPassword(AWS_ACCESS_KEY_ID_CONFIG).value(), getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value()); } else if (Objects.nonNull(getPassword(AWS_ACCESS_KEY_ID)) && Objects.nonNull(getPassword(AWS_SECRET_ACCESS_KEY))) { + LOGGER.warn("Config options {} and {} are deprecated", AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY); return new BasicAWSCredentials(getPassword(AWS_ACCESS_KEY_ID).value(), getPassword(AWS_SECRET_ACCESS_KEY).value()); } diff --git a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java index 3ce92566..17b4e94d 100644 --- a/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java +++ b/s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java @@ -39,8 +39,8 @@ final class AwsCredentialProviderFactoryTest { public void setUp() { factory = new AwsCredentialProviderFactory(); props = new HashMap<>(); - props.put(S3BaseConfig.AWS_S3_BUCKET_NAME_CONFIG, "anyBucket"); props.put(S3BaseConfig.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket"); + props.put(S3BaseConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}"); } @Test diff --git a/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialConfig.java b/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialConfig.java index 49b14e82..48e0f0a5 100644 --- a/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialConfig.java +++ b/s3-commons/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialConfig.java @@ -20,18 +20,36 @@ import org.apache.kafka.common.config.ConfigDef; +import io.aiven.kafka.connect.common.config.AivenCommonConfig; +import io.aiven.kafka.connect.common.config.CompressionType; +import io.aiven.kafka.connect.common.config.OutputFieldType; import io.aiven.kafka.connect.s3.S3BaseConfig; public class AwsCredentialConfig extends S3BaseConfig { public AwsCredentialConfig(final Map properties) { - super(configDef(new ConfigDef()), properties); + super(configDef(), properties); } - public static ConfigDef configDef(ConfigDef configDef) { // NOPMD UnusedAssignment + public static ConfigDef configDef() { // NOPMD UnusedAssignment + final ConfigDef configDef = getBaseConfigDefinition(); addS3RetryPolicies(configDef); addAwsConfigGroup(configDef); addAwsStsConfigGroup(configDef); addDeprecatedConfiguration(configDef); return configDef; } + + private static ConfigDef getBaseConfigDefinition() { + final ConfigDef definition = new ConfigDef(); + addOutputFieldsFormatConfigGroup(definition, OutputFieldType.VALUE); + definition.define(AivenCommonConfig.FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null, + ConfigDef.Importance.MEDIUM, "File name template"); + definition.define(AivenCommonConfig.FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING, + CompressionType.NONE.name, ConfigDef.Importance.MEDIUM, "File compression"); + definition.define(AivenCommonConfig.FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM, + "The maximum number of records to put in a single file. " + "Must be a non-negative integer number. " + + "0 is interpreted as \"unlimited\", which is the default."); + return definition; + } + } diff --git a/s3-sink-connector/build.gradle.kts b/s3-sink-connector/build.gradle.kts index 4302d33a..7527bda5 100644 --- a/s3-sink-connector/build.gradle.kts +++ b/s3-sink-connector/build.gradle.kts @@ -65,6 +65,7 @@ dependencies { compileOnly(apache.kafka.connect.runtime) implementation(project(":commons")) + implementation(project(":s3-commons")) implementation(tools.spotbugs.annotations) implementation(logginglibs.slf4j) @@ -73,6 +74,7 @@ dependencies { testImplementation(compressionlibs.snappy) testImplementation(compressionlibs.zstd.jni) + testImplementation(project(":s3-commons")) testImplementation(apache.kafka.connect.api) testImplementation(apache.kafka.connect.runtime) diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java index 2a266e83..f7abaefa 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/S3SinkTask.java @@ -41,7 +41,7 @@ import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory; import io.aiven.kafka.connect.common.output.OutputWriter; import io.aiven.kafka.connect.common.templating.VariableTemplatePart; -import io.aiven.kafka.connect.s3.config.AwsCredentialProviderFactory; +import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; import io.aiven.kafka.connect.s3.config.S3SinkConfig; import com.amazonaws.PredefinedClientConfigurations; diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsAccessSecret.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsAccessSecret.java deleted file mode 100644 index 206c04ce..00000000 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsAccessSecret.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2021 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.s3.config; - -import java.util.Objects; - -import org.apache.kafka.common.config.types.Password; - -final class AwsAccessSecret { - private final Password accessKeyId; - private final Password secretAccessKey; - - public AwsAccessSecret(final Password accessKeyId, final Password secretAccessKey) { - this.accessKeyId = accessKeyId; - this.secretAccessKey = secretAccessKey; - } - - public Password getAccessKeyId() { - return accessKeyId; - } - - public Password getSecretAccessKey() { - return secretAccessKey; - } - - public Boolean isValid() { - return Objects.nonNull(accessKeyId) && Objects.nonNull(secretAccessKey); - } -} diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsCredentialProviderFactory.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsCredentialProviderFactory.java deleted file mode 100644 index fe67a6dd..00000000 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsCredentialProviderFactory.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2021 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.s3.config; - -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.securitytoken.AWSSecurityTokenService; -import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; - -public class AwsCredentialProviderFactory { - public AWSCredentialsProvider getProvider(final S3SinkConfig config) { - if (config.hasAwsStsRole()) { - return getStsProvider(config); - } - final AwsAccessSecret awsCredentials = config.getAwsCredentials(); - if (!awsCredentials.isValid()) { - return config.getCustomCredentialsProvider(); - } - return new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsCredentials.getAccessKeyId().value(), - awsCredentials.getSecretAccessKey().value())); - } - - private AWSCredentialsProvider getStsProvider(final S3SinkConfig config) { - final AwsStsRole awsstsRole = config.getStsRole(); - final AWSSecurityTokenService sts = securityTokenService(config); - return new STSAssumeRoleSessionCredentialsProvider.Builder(awsstsRole.getArn(), awsstsRole.getSessionName()) - .withStsClient(sts) - .withExternalId(awsstsRole.getExternalId()) - .withRoleSessionDurationSeconds(awsstsRole.getSessionDurationSeconds()) - .build(); - } - - private AWSSecurityTokenService securityTokenService(final S3SinkConfig config) { - if (config.hasStsEndpointConfig()) { - final AwsStsEndpointConfig endpointConfig = config.getStsEndpointConfig(); - final AwsClientBuilder.EndpointConfiguration stsConfig = new AwsClientBuilder.EndpointConfiguration( - endpointConfig.getServiceEndpoint(), endpointConfig.getSigningRegion()); - final AWSSecurityTokenServiceClientBuilder stsBuilder = AWSSecurityTokenServiceClientBuilder.standard(); - stsBuilder.setEndpointConfiguration(stsConfig); - return stsBuilder.build(); - } - return AWSSecurityTokenServiceClientBuilder.defaultClient(); - } -} diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsStsEndpointConfig.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsStsEndpointConfig.java deleted file mode 100644 index 735a02c3..00000000 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsStsEndpointConfig.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2021 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.s3.config; - -import java.util.Objects; - -final class AwsStsEndpointConfig { - public static final String AWS_STS_GLOBAL_ENDPOINT = "https://sts.amazonaws.com"; - - private final String serviceEndpoint; - private final String signingRegion; - - public AwsStsEndpointConfig(final String serviceEndpoint, final String signingRegion) { - this.serviceEndpoint = serviceEndpoint; - this.signingRegion = signingRegion; - } - - public String getServiceEndpoint() { - return serviceEndpoint; - } - - public String getSigningRegion() { - return signingRegion; - } - - public Boolean isValid() { - return Objects.nonNull(signingRegion); - } -} diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsStsRole.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsStsRole.java deleted file mode 100644 index f2e1bd4b..00000000 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/AwsStsRole.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2021 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.s3.config; - -import java.util.Objects; - -import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; - -final class AwsStsRole { - - // AssumeRole request limit details here: - // https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html - public static final int MIN_SESSION_DURATION = STSAssumeRoleSessionCredentialsProvider.DEFAULT_DURATION_SECONDS; - public static final int MAX_SESSION_DURATION = 43_200; - - private final String arn; - private final String externalId; - private final String sessionName; - private final int sessionDurationSeconds; - - public AwsStsRole(final String arn, final String externalId, final String sessionName, - final int sessionDurationSeconds) { - this.arn = arn; - this.externalId = externalId; - this.sessionName = sessionName; - this.sessionDurationSeconds = sessionDurationSeconds; - } - - public String getArn() { - return arn; - } - - public String getExternalId() { - return externalId; - } - - public String getSessionName() { - return sessionName; - } - - public int getSessionDurationSeconds() { - return sessionDurationSeconds; - } - - public Boolean isValid() { - return Objects.nonNull(arn) && Objects.nonNull(sessionName); - } -} diff --git a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java index 7f428e58..38d5c511 100644 --- a/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java +++ b/s3-sink-connector/src/main/java/io/aiven/kafka/connect/s3/config/S3SinkConfig.java @@ -31,7 +31,6 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigException; -import io.aiven.kafka.connect.common.config.AivenCommonConfig; import io.aiven.kafka.connect.common.config.CompressionType; import io.aiven.kafka.connect.common.config.FixedSetRecommender; import io.aiven.kafka.connect.common.config.OutputField; @@ -40,15 +39,15 @@ import io.aiven.kafka.connect.common.config.TimestampSource; import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator; import io.aiven.kafka.connect.common.config.validators.FilenameTemplateValidator; -import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword; -import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator; import io.aiven.kafka.connect.common.config.validators.TimeZoneValidator; import io.aiven.kafka.connect.common.config.validators.TimestampSourceValidator; -import io.aiven.kafka.connect.common.config.validators.UrlValidator; import io.aiven.kafka.connect.common.templating.Template; +import io.aiven.kafka.connect.iam.AwsStsEndpointConfig; +import io.aiven.kafka.connect.iam.AwsStsRole; +import io.aiven.kafka.connect.s3.S3BaseConfig; import io.aiven.kafka.connect.s3.S3OutputStream; -import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.regions.Region; import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.Regions; @@ -57,80 +56,12 @@ import org.slf4j.LoggerFactory; @SuppressWarnings({ "PMD.TooManyMethods", "PMD.GodClass", "PMD.ExcessiveImports" }) -final public class S3SinkConfig extends AivenCommonConfig { +final public class S3SinkConfig extends S3BaseConfig { public static final Logger LOGGER = LoggerFactory.getLogger(S3SinkConfig.class); - @Deprecated - public static final String AWS_ACCESS_KEY_ID = "aws_access_key_id"; - @Deprecated - public static final String AWS_SECRET_ACCESS_KEY = "aws_secret_access_key"; - @Deprecated - public static final String AWS_S3_BUCKET = "aws_s3_bucket"; - @Deprecated - public static final String AWS_S3_ENDPOINT = "aws_s3_endpoint"; - @Deprecated - public static final String AWS_S3_REGION = "aws_s3_region"; - @Deprecated - public static final String AWS_S3_PREFIX = "aws_s3_prefix"; - - @Deprecated - public static final String OUTPUT_COMPRESSION = "output_compression"; - @Deprecated - public static final String OUTPUT_COMPRESSION_TYPE_GZIP = "gzip"; - @Deprecated - public static final String OUTPUT_COMPRESSION_TYPE_NONE = "none"; - - @Deprecated - public static final String OUTPUT_FIELDS = "output_fields"; - @Deprecated - public static final String TIMESTAMP_TIMEZONE = "timestamp.timezone"; - @Deprecated - public static final String TIMESTAMP_SOURCE = "timestamp.source"; - @Deprecated - public static final String OUTPUT_FIELD_NAME_KEY = "key"; - @Deprecated - public static final String OUTPUT_FIELD_NAME_OFFSET = "offset"; - @Deprecated - public static final String OUTPUT_FIELD_NAME_TIMESTAMP = "timestamp"; - @Deprecated - public static final String OUTPUT_FIELD_NAME_VALUE = "value"; - @Deprecated - public static final String OUTPUT_FIELD_NAME_HEADERS = "headers"; - @Deprecated - public static final String AWS_S3_PREFIX_CONFIG = "aws.s3.prefix"; - - public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id"; - public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key"; - public static final String AWS_CREDENTIALS_PROVIDER_CONFIG = "aws.credentials.provider"; - public static final String AWS_CREDENTIAL_PROVIDER_DEFAULT = "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"; - public static final String AWS_S3_BUCKET_NAME_CONFIG = "aws.s3.bucket.name"; - public static final String AWS_S3_SSE_ALGORITHM_CONFIG = "aws.s3.sse.algorithm"; - public static final String AWS_S3_ENDPOINT_CONFIG = "aws.s3.endpoint"; - public static final String AWS_S3_REGION_CONFIG = "aws.s3.region"; - public static final String AWS_S3_PART_SIZE = "aws.s3.part.size.bytes"; - - // FIXME since we support so far both old style and new style of property names - // Importance was set to medium, - // as soon we will migrate to new values it must be set to HIGH - // same for default value - - public static final String AWS_STS_ROLE_ARN = "aws.sts.role.arn"; - public static final String AWS_STS_ROLE_EXTERNAL_ID = "aws.sts.role.external.id"; - public static final String AWS_STS_ROLE_SESSION_NAME = "aws.sts.role.session.name"; - public static final String AWS_STS_ROLE_SESSION_DURATION = "aws.sts.role.session.duration"; - public static final String AWS_STS_CONFIG_ENDPOINT = "aws.sts.config.endpoint"; - private static final String GROUP_AWS = "AWS"; private static final String GROUP_FILE = "File"; - private static final String GROUP_FORMAT = "Format"; // NOPMD UnusedPrivateField - private static final String GROUP_AWS_STS = "AWS STS"; - - private static final String GROUP_S3_RETRY_BACKOFF_POLICY = "S3 retry backoff policy"; - - public static final String AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG = "aws.s3.backoff.delay.ms"; - public static final String AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG = "aws.s3.backoff.max.delay.ms"; - public static final String AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG = "aws.s3.backoff.max.retries"; // Default values from AWS SDK, since they are hidden public static final int AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT = 100; public static final int AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 20_000; @@ -182,6 +113,7 @@ private static Map handleDeprecatedYyyyUppercase(final Map props; - - @BeforeEach - public void setUp() { - factory = new AwsCredentialProviderFactory(); - props = new HashMap<>(); - props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "anyBucket"); - props.put(S3SinkConfig.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket"); - } - - @Test - void createsStsCredentialProviderIfSpecified() { - props.put(S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); - props.put(S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); - props.put(S3SinkConfig.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask"); - props.put(S3SinkConfig.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME"); - props.put(S3SinkConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName()); - props.put(S3SinkConfig.AWS_STS_CONFIG_ENDPOINT, "https://sts.us-east-1.amazonaws.com"); - - final var config = new S3SinkConfig(props); - - final var credentialProvider = factory.getProvider(config); - assertThat(credentialProvider).isInstanceOf(STSAssumeRoleSessionCredentialsProvider.class); - } - - @Test - void createStaticCredentialProviderByDefault() { - props.put(S3SinkConfig.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); - props.put(S3SinkConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); - - final var config = new S3SinkConfig(props); - - final var credentialProvider = factory.getProvider(config); - assertThat(credentialProvider).isInstanceOf(AWSStaticCredentialsProvider.class); - } - - @Test - void createDefaultCredentialsWhenNoCredentialsSpecified() { - final var config = new S3SinkConfig(props); - - final var credentialProvider = factory.getProvider(config); - assertThat(credentialProvider).isInstanceOf(DefaultAWSCredentialsProviderChain.class); - } -} diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java index 74eeffc0..33ed66d5 100644 --- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java +++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/S3SinkTaskTest.java @@ -59,7 +59,7 @@ import org.apache.kafka.connect.sink.SinkTaskContext; import io.aiven.kafka.connect.common.config.CompressionType; -import io.aiven.kafka.connect.s3.config.AwsCredentialProviderFactory; +import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; import io.aiven.kafka.connect.s3.config.S3SinkConfig; import io.aiven.kafka.connect.s3.testutils.BucketAccessor; diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java index a174e4bb..7c40a4da 100644 --- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java +++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkConfigTest.java @@ -69,8 +69,8 @@ void correctFullConfig() { final var conf = new S3SinkConfig(props); final var awsCredentials = conf.getAwsCredentials(); - assertThat(awsCredentials.getAccessKeyId().value()).isEqualTo("AWS_ACCESS_KEY_ID"); - assertThat(awsCredentials.getSecretAccessKey().value()).isEqualTo("AWS_SECRET_ACCESS_KEY"); + assertThat(awsCredentials.getAWSAccessKeyId()).isEqualTo("AWS_ACCESS_KEY_ID"); + assertThat(awsCredentials.getAWSSecretKey()).isEqualTo("AWS_SECRET_ACCESS_KEY"); assertThat(conf.getAwsS3BucketName()).isEqualTo("the-bucket"); assertThat(conf.getAwsS3Prefix()).isEqualTo("AWS_S3_PREFIX"); assertThat(conf.getAwsS3EndPoint()).isEqualTo("AWS_S3_ENDPOINT"); @@ -113,8 +113,8 @@ void correctFullConfigForOldStyleConfigParameters() { final var conf = new S3SinkConfig(props); final var awsCredentials = conf.getAwsCredentials(); - assertThat(awsCredentials.getAccessKeyId().value()).isEqualTo("AWS_ACCESS_KEY_ID"); - assertThat(awsCredentials.getSecretAccessKey().value()).isEqualTo("AWS_SECRET_ACCESS_KEY"); + assertThat(awsCredentials.getAWSAccessKeyId()).isEqualTo("AWS_ACCESS_KEY_ID"); + assertThat(awsCredentials.getAWSSecretKey()).isEqualTo("AWS_SECRET_ACCESS_KEY"); assertThat(conf.getAwsS3BucketName()).isEqualTo("the-bucket"); assertThat(conf.getAwsS3Prefix()).isEqualTo("AWS_S3_PREFIX"); assertThat(conf.getAwsS3EndPoint()).isEqualTo("AWS_S3_ENDPOINT"); @@ -161,8 +161,8 @@ void newConfigurationPropertiesHaveHigherPriorityOverOldOne() { final var conf = new S3SinkConfig(props); final var awsCredentials = conf.getAwsCredentials(); - assertThat(awsCredentials.getAccessKeyId().value()).isEqualTo("AWS_ACCESS_KEY_ID"); - assertThat(awsCredentials.getSecretAccessKey().value()).isEqualTo("AWS_SECRET_ACCESS_KEY"); + assertThat(awsCredentials.getAWSAccessKeyId()).isEqualTo("AWS_ACCESS_KEY_ID"); + assertThat(awsCredentials.getAWSSecretKey()).isEqualTo("AWS_SECRET_ACCESS_KEY"); assertThat(conf.getAwsS3BucketName()).isEqualTo("the-bucket"); assertThat(conf.getAwsS3Prefix()).isEqualTo("AWS_S3_PREFIX"); assertThat(conf.getAwsS3EndPoint()).isEqualTo("AWS_S3_ENDPOINT"); diff --git a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkCredentialsConfigTest.java b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkCredentialsConfigTest.java index 88e6c9a7..46fe84b0 100644 --- a/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkCredentialsConfigTest.java +++ b/s3-sink-connector/src/test/java/io/aiven/kafka/connect/s3/config/S3SinkCredentialsConfigTest.java @@ -69,7 +69,7 @@ void emptyAwsSecretAccessKey() { void defaultCredentials() { final Map props = Map.of(AWS_S3_BUCKET_NAME_CONFIG, "test-bucket"); final S3SinkConfig config = new S3SinkConfig(props); - assertThat(config.getAwsCredentials().isValid()).isFalse(); + assertThat(config.getAwsCredentials()).isNull(); assertThat(config.getCustomCredentialsProvider()).isInstanceOf(DefaultAWSCredentialsProviderChain.class); } }