Performance improvements #316

Open · wants to merge 11 commits into base: s3-source-release
@@ -21,6 +21,7 @@
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_ENDPOINT_CONFIG;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_S3_PREFIX_CONFIG;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
+import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_MESSAGE_BYTES_SIZE;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.OUTPUT_FORMAT_KEY;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.SCHEMA_REGISTRY_URL;
 import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.TARGET_TOPICS;
@@ -146,6 +147,7 @@ void bytesTest(final TestInfo testInfo) throws ExecutionException, InterruptedException {
     final var topicName = IntegrationBase.topicName(testInfo);
     final Map<String, String> connectorConfig = getConfig(basicConnectorConfig(CONNECTOR_NAME), topicName);

+    connectorConfig.put(MAX_MESSAGE_BYTES_SIZE, "2");
     connectRunner.createConnector(connectorConfig);
     connectorConfig.put(OUTPUT_FORMAT_KEY, OutputFormat.BYTES.getValue());
@@ -40,6 +40,7 @@
 import io.aiven.kafka.connect.s3.source.output.OutputWriter;
 import io.aiven.kafka.connect.s3.source.output.OutputWriterFactory;
 import io.aiven.kafka.connect.s3.source.utils.AivenS3SourceRecord;
+import io.aiven.kafka.connect.s3.source.utils.FileReader;
 import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
 import io.aiven.kafka.connect.s3.source.utils.RecordProcessor;
 import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
@@ -54,7 +55,7 @@
  * S3SourceTask is a Kafka Connect SourceTask implementation that reads from source-s3 buckets and generates Kafka
  * Connect records.
  */
-@SuppressWarnings("PMD.TooManyMethods")
+@SuppressWarnings({ "PMD.TooManyMethods", "PMD.ExcessiveImports" })
 public class S3SourceTask extends SourceTask {

     private static final Logger LOGGER = LoggerFactory.getLogger(S3SourceTask.class);
@@ -71,7 +72,7 @@ public class S3SourceTask extends SourceTask {
     private S3SourceConfig s3SourceConfig;
Contributor: The default graceful shutdown time is 5 seconds, so a 10 second poll interval will have a pretty high chance of causing an ungraceful shutdown: https://github.com/apache/kafka/blob/5313f8eb92a033dc74d8e72a48ac033113b186d4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L101

You need to ensure that the poll() method returns multiple times while you are waiting out back-offs for other operations.

     private AmazonS3 s3Client;

-    private Iterator<List<AivenS3SourceRecord>> sourceRecordIterator;
+    private Iterator<AivenS3SourceRecord> sourceRecordIterator;
     private Optional<Converter> keyConverter;

     private Converter valueConverter;
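
To illustrate the reviewer's back-off point above: a minimal, self-contained sketch of waiting in short slices so that poll() keeps returning inside the 5 second graceful-shutdown budget. This is not code from this PR; the class and member names are assumptions for the sketch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical helper: sleeps in short slices instead of one long interval, so a
// stop() request is honoured well within the default 5 s shutdown window.
final class ResponsiveBackoff {
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    // Returns an empty batch early if the task is stopped mid back-off. Returning
    // no records is always safe: Kafka Connect simply calls poll() again.
    List<SourceRecord> waitOrReturnEmpty(final long backoffMs) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + backoffMs;
        while (System.currentTimeMillis() < deadline && !stopped.get()) {
            Thread.sleep(Math.min(500L, Math.max(0L, deadline - System.currentTimeMillis())));
        }
        return new ArrayList<>();
    }

    void stop() {
        stopped.set(true);
    }
}
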
@@ -132,8 +133,9 @@ private void initializeS3Client() {
     }

     private void prepareReaderFromOffsetStorageReader() {
+        final FileReader fileReader = new FileReader(s3SourceConfig, this.s3Bucket, failedObjectKeys);
         sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, s3Client, this.s3Bucket, offsetManager,
-                this.outputWriter, failedObjectKeys);
+                this.outputWriter, fileReader);
     }

     @Override
@@ -86,6 +86,8 @@ final public class S3SourceConfig extends AbstractConfig {
     public static final String TARGET_TOPICS = "topics";
     public static final String FETCH_PAGE_SIZE = "aws.s3.fetch.page.size";
     public static final String MAX_POLL_RECORDS = "max.poll.records";
+
+    public static final String MAX_MESSAGE_BYTES_SIZE = "max.message.bytes";
     public static final String KEY_CONVERTER = "key.converter";
     public static final String VALUE_CONVERTER = "value.converter";
     public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3;
@@ -179,6 +181,11 @@ private static void addOtherConfig(final S3SourceConfigDef configDef) {
                 "Value converter", GROUP_OTHER, awsOtherGroupCounter++, // NOPMD
Contributor: It's a bad idea to define configurations with the same name as the framework's. One will take precedence, and you can't be sure the default you're specifying is correct or even present.

Contributor Author: Got it. Removed those configs.

                 // UnusedAssignment
                 ConfigDef.Width.NONE, VALUE_CONVERTER);
+        configDef.define(MAX_MESSAGE_BYTES_SIZE, ConfigDef.Type.INT, 1_048_588, ConfigDef.Importance.MEDIUM,
Contributor: This is a very confusing name, because it's not doing the same thing as Kafka's configuration. It's only active for the ByteArrayTransformer, and is a "chunk size" for breaking up large files into multiple messages.

Without some metadata about where a particular chunk of a file came from, or an indication of when a file has been truncated, I think this configuration is a huge footgun. Users will get one message containing the first 1 MB of their file, and may not know that the file was actually larger. This needs more thought.

Contributor Author: For now I have renamed it to EXPECTED_MAX_MESSAGE_BYTES. However, we have another Jira ticket to handle records that exceed the size limit across all formats; I expect we will address this in a more generic way in that story.

"The largest record batch size allowed by Kafka config max.message.bytes", GROUP_OTHER,
awsOtherGroupCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.NONE, MAX_MESSAGE_BYTES_SIZE);
}

private static void addAwsStsConfigGroup(final ConfigDef configDef) {
Expand Down
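
On the naming concern in the thread above: a hedged sketch of the direction the author describes, using a connector-specific key that cannot shadow the framework's max.message.bytes. The exact key string, class name, and default below are assumptions for illustration, not this PR's final code.

import org.apache.kafka.common.config.ConfigDef;

// Hypothetical sketch: a namespaced, connector-level key instead of reusing the
// framework name "max.message.bytes". Name and default are illustrative only.
final class SourceConfigSketch {
    static final String EXPECTED_MAX_MESSAGE_BYTES = "expected.max.message.bytes";

    static ConfigDef addChunkConfig(final ConfigDef configDef) {
        return configDef.define(EXPECTED_MAX_MESSAGE_BYTES, ConfigDef.Type.INT, 1_048_588,
                ConfigDef.Importance.MEDIUM,
                "Upper bound, in bytes, for a single record produced from an S3 object");
    }
}
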
@@ -46,7 +46,8 @@ public void configureValueConverter(final Map<String, String> config, final S3SourceConfig s3SourceConfig) {
     }

     @Override
-    public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition) {
+    public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition,
+            final S3SourceConfig s3SourceConfig) {
         final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
         return readAvroRecords(inputStream, datumReader);
     }
@@ -16,14 +16,16 @@

 package io.aiven.kafka.connect.s3.source.output;

+import static io.aiven.kafka.connect.s3.source.config.S3SourceConfig.MAX_MESSAGE_BYTES_SIZE;
+
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

 import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;

-import com.amazonaws.util.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -35,18 +37,33 @@ public void configureValueConverter(final Map<String, String> config, final S3SourceConfig s3SourceConfig) {

     }

@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
@Override
public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition) {
return List.of(inputStream);
}
public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition,
final S3SourceConfig s3SourceConfig) {

@Override
public byte[] getValueBytes(final Object record, final String topic, final S3SourceConfig s3SourceConfig) {
final int maxMessageBytesSize = s3SourceConfig.getInt(MAX_MESSAGE_BYTES_SIZE);
final byte[] buffer = new byte[maxMessageBytesSize];
int bytesRead;

final List<Object> chunks = new ArrayList<>();
try {
return IOUtils.toByteArray((InputStream) record);
bytesRead = inputStream.read(buffer);
while (bytesRead != -1) {
final byte[] chunk = new byte[bytesRead];
System.arraycopy(buffer, 0, chunk, 0, bytesRead);
chunks.add(chunk);
bytesRead = inputStream.read(buffer);
}
} catch (IOException e) {
LOGGER.error("Error in reading s3 object stream " + e.getMessage());
return new byte[0];
LOGGER.error("Error reading from input stream: " + e.getMessage(), e);
muralibasani marked this conversation as resolved.
Show resolved Hide resolved
}

return chunks;
}

@Override
public byte[] getValueBytes(final Object record, final String topic, final S3SourceConfig s3SourceConfig) {
return (byte[]) record;
}
}
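
To make the new chunking loop concrete, here is a standalone sketch of the same read pattern. With a 2-byte buffer, as in the integration test earlier in this PR that sets MAX_MESSAGE_BYTES_SIZE to "2", a 5-byte object becomes three records of 2, 2, and 1 bytes. The class name is an assumption; this is a demo, not the connector's code.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public final class ChunkingDemo {
    public static void main(final String[] args) throws IOException {
        final InputStream inputStream = new ByteArrayInputStream(new byte[] { 1, 2, 3, 4, 5 });
        final byte[] buffer = new byte[2]; // stands in for max.message.bytes
        final List<byte[]> chunks = new ArrayList<>();
        int bytesRead = inputStream.read(buffer);
        while (bytesRead != -1) {
            final byte[] chunk = new byte[bytesRead]; // copy only the bytes actually read
            System.arraycopy(buffer, 0, chunk, 0, bytesRead);
            chunks.add(chunk);
            bytesRead = inputStream.read(buffer);
        }
        System.out.println(chunks.size()); // prints 3: chunks of 2, 2 and 1 bytes
    }
}
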
@@ -43,7 +43,8 @@ public void configureValueConverter(final Map<String, String> config, final S3SourceConfig s3SourceConfig) {
     }

     @Override
-    public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition) {
+    public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition,
+            final S3SourceConfig s3SourceConfig) {
         final List<Object> jsonNodeList = new ArrayList<>();
         JsonNode jsonNode;
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
@@ -31,7 +31,7 @@ public interface OutputWriter {

     void configureValueConverter(Map<String, String> config, S3SourceConfig s3SourceConfig);

-    List<Object> getRecords(InputStream inputStream, String topic, int topicPartition);
+    List<Object> getRecords(InputStream inputStream, String topic, int topicPartition, S3SourceConfig s3SourceConfig);

     byte[] getValueBytes(Object record, String topic, S3SourceConfig s3SourceConfig);
 }
@@ -50,7 +50,8 @@ public void configureValueConverter(final Map<String, String> config, final S3SourceConfig s3SourceConfig) {
     }

     @Override
-    public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition) {
+    public List<Object> getRecords(final InputStream inputStream, final String topic, final int topicPartition,
+            final S3SourceConfig s3SourceConfig) {
         return getParquetRecords(inputStream, topic, topicPartition);
     }
@@ -16,7 +16,6 @@

 package io.aiven.kafka.connect.s3.source.utils;

-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;

@@ -43,7 +42,8 @@ public final class RecordProcessor {
     private RecordProcessor() {

     }
-    public static List<SourceRecord> processRecords(final Iterator<List<AivenS3SourceRecord>> sourceRecordIterator,
+
+    public static List<SourceRecord> processRecords(final Iterator<AivenS3SourceRecord> sourceRecordIterator,
             final List<SourceRecord> results, final S3SourceConfig s3SourceConfig,
             final Optional<Converter> keyConverter, final Converter valueConverter,
             final AtomicBoolean connectorStopped, final OutputWriter outputWriter, final Set<String> failedObjectKeys,
@@ -53,44 +53,39 @@ public static List<SourceRecord> processRecords(final Iterator<List<AivenS3SourceRecord>> sourceRecordIterator,
         final int maxPollRecords = s3SourceConfig.getInt(S3SourceConfig.MAX_POLL_RECORDS);

         for (int i = 0; sourceRecordIterator.hasNext() && i < maxPollRecords && !connectorStopped.get(); i++) {
-            final List<AivenS3SourceRecord> recordList = sourceRecordIterator.next();
-            final List<SourceRecord> sourceRecords = createSourceRecords(recordList, s3SourceConfig, keyConverter,
-                    valueConverter, conversionConfig, outputWriter, failedObjectKeys, offsetManager);
-            results.addAll(sourceRecords);
+            final AivenS3SourceRecord aivenS3SourceRecord = sourceRecordIterator.next();
+            if (aivenS3SourceRecord != null) {
+                final SourceRecord sourceRecord = createSourceRecord(aivenS3SourceRecord, s3SourceConfig, keyConverter,
+                        valueConverter, conversionConfig, outputWriter, failedObjectKeys, offsetManager);
+                results.add(sourceRecord);
+            }
         }

         LOGGER.info("Number of records sent {}", results.size());
Contributor: The records here are not yet sent, right? Can we move this log statement to poll()? Or am I missing something?

Contributor Author: Indeed, they are not yet sent: this method returns to poll(), and from poll() they are sent. If we moved this to poll(), we would need to declare a variable.

Contributor: Which variable do you mean? Isn't it the same list instance that we are passing in from poll(), or am I missing something?

Contributor Author: Moved this log statement to poll().

         return results;
     }

-    @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
-    static List<SourceRecord> createSourceRecords(final List<AivenS3SourceRecord> aivenS3SourceRecordList,
+    static SourceRecord createSourceRecord(final AivenS3SourceRecord aivenS3SourceRecord,
             final S3SourceConfig s3SourceConfig, final Optional<Converter> keyConverter, final Converter valueConverter,
             final Map<String, String> conversionConfig, final OutputWriter outputWriter,
             final Set<String> failedObjectKeys, final OffsetManager offsetManager) {

-        final List<SourceRecord> sourceRecordList = new ArrayList<>();
-        for (final AivenS3SourceRecord aivenS3SourceRecord : aivenS3SourceRecordList) {
-            LOGGER.info(" ******* CSR key ******** {}", aivenS3SourceRecord.getObjectKey());
-            final String topic = aivenS3SourceRecord.getToTopic();
-            final Optional<SchemaAndValue> keyData = keyConverter
-                    .map(c -> c.toConnectData(topic, aivenS3SourceRecord.key()));
-
-            outputWriter.configureValueConverter(conversionConfig, s3SourceConfig);
-            valueConverter.configure(conversionConfig, false);
-            try {
-                final SchemaAndValue schemaAndValue = valueConverter.toConnectData(topic, aivenS3SourceRecord.value());
-                offsetManager.updateCurrentOffsets(aivenS3SourceRecord.getPartitionMap(),
-                        aivenS3SourceRecord.getOffsetMap());
-                aivenS3SourceRecord.setOffsetMap(offsetManager.getOffsets().get(aivenS3SourceRecord.getPartitionMap()));
-                sourceRecordList.add(aivenS3SourceRecord.getSourceRecord(topic, keyData, schemaAndValue));
-            } catch (DataException e) {
-                LOGGER.error("Error in reading s3 object stream " + e.getMessage());
-                failedObjectKeys.add(aivenS3SourceRecord.getObjectKey());
-                throw e;
-            }
-        }
-
-        return sourceRecordList;
+        final String topic = aivenS3SourceRecord.getToTopic();
+        final Optional<SchemaAndValue> keyData = keyConverter
+                .map(c -> c.toConnectData(topic, aivenS3SourceRecord.key()));
+
+        outputWriter.configureValueConverter(conversionConfig, s3SourceConfig);
+        valueConverter.configure(conversionConfig, false);
+        try {
+            final SchemaAndValue schemaAndValue = valueConverter.toConnectData(topic, aivenS3SourceRecord.value());
+            offsetManager.updateCurrentOffsets(aivenS3SourceRecord.getPartitionMap(),
+                    aivenS3SourceRecord.getOffsetMap());
+            aivenS3SourceRecord.setOffsetMap(offsetManager.getOffsets().get(aivenS3SourceRecord.getPartitionMap()));
+            return aivenS3SourceRecord.getSourceRecord(topic, keyData, schemaAndValue);
+        } catch (DataException e) {
+            LOGGER.error("Error in reading s3 object stream " + e.getMessage());
+            failedObjectKeys.add(aivenS3SourceRecord.getObjectKey());
+            throw e;
+        }
     }
 }
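
Returning to the truncation footgun raised in the config review: one possible shape for the missing chunk metadata is record headers attached where source records are built, so a consumer can detect a truncated or partially delivered file and reassemble chunks in order. This is a hypothetical sketch; the header names and helper class are assumptions, not part of this PR.

import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;

// Hypothetical helper: tags each chunk record with its source object and position.
final class ChunkMetadata {
    static Headers headersFor(final String objectKey, final int chunkIndex, final boolean lastChunk) {
        final Headers headers = new ConnectHeaders();
        headers.addString("s3.source.object.key", objectKey);
        headers.addInt("s3.source.chunk.index", chunkIndex);
        headers.addBoolean("s3.source.chunk.last", lastChunk);
        return headers;
    }
}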