Skip to content

Commit

Permalink
Update S3-Sink to use the S3-commons package
Browse files Browse the repository at this point in the history
Signed-off-by: Aindriu Lavelle <[email protected]>
  • Loading branch information
aindriu-aiven committed Oct 17, 2024
1 parent 7a3073f commit d62f736
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 634 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;

import io.aiven.kafka.connect.common.config.AivenCommonConfig;
import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator;
import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword;
import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator;
Expand All @@ -47,7 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3BaseConfig extends AbstractConfig {
public class S3BaseConfig extends AivenCommonConfig {
public static final Logger LOGGER = LoggerFactory.getLogger(S3BaseConfig.class);

@Deprecated
Expand Down Expand Up @@ -87,6 +87,10 @@ public class S3BaseConfig extends AbstractConfig {
@Deprecated
public static final String AWS_S3_PREFIX = "aws_s3_prefix";

// FIXME since we support so far both old style and new style of property names
// Importance was set to medium,
// as soon we will migrate to new values it must be set to HIGH
// same for default value
public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id";
public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key";
public static final String AWS_CREDENTIALS_PROVIDER_CONFIG = "aws.credentials.provider";
Expand Down Expand Up @@ -376,10 +380,12 @@ public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() {
public BasicAWSCredentials getAwsCredentials() {
if (Objects.nonNull(getPassword(AWS_ACCESS_KEY_ID_CONFIG))
&& Objects.nonNull(getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) {

return new BasicAWSCredentials(getPassword(AWS_ACCESS_KEY_ID_CONFIG).value(),
getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value());
} else if (Objects.nonNull(getPassword(AWS_ACCESS_KEY_ID))
&& Objects.nonNull(getPassword(AWS_SECRET_ACCESS_KEY))) {
LOGGER.warn("Config options {} and {} are deprecated", AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY);
return new BasicAWSCredentials(getPassword(AWS_ACCESS_KEY_ID).value(),
getPassword(AWS_SECRET_ACCESS_KEY).value());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ final class AwsCredentialProviderFactoryTest {
public void setUp() {
factory = new AwsCredentialProviderFactory();
props = new HashMap<>();
props.put(S3BaseConfig.AWS_S3_BUCKET_NAME_CONFIG, "anyBucket");
props.put(S3BaseConfig.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket");
props.put(S3BaseConfig.FILE_NAME_TEMPLATE_CONFIG, "{{topic}}-{{partition}}-{{start_offset}}");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,36 @@

import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.AivenCommonConfig;
import io.aiven.kafka.connect.common.config.CompressionType;
import io.aiven.kafka.connect.common.config.OutputFieldType;
import io.aiven.kafka.connect.s3.S3BaseConfig;

public class AwsCredentialConfig extends S3BaseConfig {
public AwsCredentialConfig(final Map<String, String> properties) {
super(configDef(new ConfigDef()), properties);
super(configDef(), properties);
}

public static ConfigDef configDef(ConfigDef configDef) { // NOPMD UnusedAssignment
public static ConfigDef configDef() { // NOPMD UnusedAssignment
final ConfigDef configDef = getBaseConfigDefinition();
addS3RetryPolicies(configDef);
addAwsConfigGroup(configDef);
addAwsStsConfigGroup(configDef);
addDeprecatedConfiguration(configDef);
return configDef;
}

private static ConfigDef getBaseConfigDefinition() {
final ConfigDef definition = new ConfigDef();
addOutputFieldsFormatConfigGroup(definition, OutputFieldType.VALUE);
definition.define(AivenCommonConfig.FILE_NAME_TEMPLATE_CONFIG, ConfigDef.Type.STRING, null,
ConfigDef.Importance.MEDIUM, "File name template");
definition.define(AivenCommonConfig.FILE_COMPRESSION_TYPE_CONFIG, ConfigDef.Type.STRING,
CompressionType.NONE.name, ConfigDef.Importance.MEDIUM, "File compression");
definition.define(AivenCommonConfig.FILE_MAX_RECORDS, ConfigDef.Type.INT, 0, ConfigDef.Importance.MEDIUM,
"The maximum number of records to put in a single file. " + "Must be a non-negative integer number. "
+ "0 is interpreted as \"unlimited\", which is the default.");
return definition;
}

}
2 changes: 2 additions & 0 deletions s3-sink-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ dependencies {
compileOnly(apache.kafka.connect.runtime)

implementation(project(":commons"))
implementation(project(":s3-commons"))

implementation(tools.spotbugs.annotations)
implementation(logginglibs.slf4j)
Expand All @@ -73,6 +74,7 @@ dependencies {

testImplementation(compressionlibs.snappy)
testImplementation(compressionlibs.zstd.jni)
testImplementation(project(":s3-commons"))

testImplementation(apache.kafka.connect.api)
testImplementation(apache.kafka.connect.runtime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
import io.aiven.kafka.connect.common.output.OutputWriter;
import io.aiven.kafka.connect.common.templating.VariableTemplatePart;
import io.aiven.kafka.connect.s3.config.AwsCredentialProviderFactory;
import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory;
import io.aiven.kafka.connect.s3.config.S3SinkConfig;

import com.amazonaws.PredefinedClientConfigurations;
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit d62f736

Please sign in to comment.