
Performance improvements #316

Open
wants to merge 11 commits into base: s3-source-release

Conversation

muralibasani
Contributor

@muralibasani muralibasani commented Oct 15, 2024

  • New iterator approach without big changes
  • Minor fix in the poll method for max poll records
  • New config max.message.bytes for the output bytes format

@muralibasani muralibasani changed the base branch from main to s3-source-release October 15, 2024 19:07
@muralibasani muralibasani changed the title New iterator approach and performance improvement Performance improvements Oct 15, 2024
@muralibasani muralibasani marked this pull request as ready for review October 15, 2024 20:34
@muralibasani muralibasani requested review from a team as code owners October 15, 2024 20:34
@aindriu-aiven aindriu-aiven left a comment

Had a few comments

@Claudenw
Contributor

Claudenw commented Oct 16, 2024 via email

@muralibasani
Contributor Author

You will still skip them.


We should skip the serialization from record to bytes. To do this, we need to pass more arguments like offsetManager and partitionMap, which does not look correct to me.

Contributor

@Claudenw Claudenw left a comment

Good changes. Thank you. Please consider the Transformer change for simplicity.

I also notice that the code converts from S3Object to ConsumerRecord<byte[], byte[]> before converting to AivenS3SourceRecord. The AivenS3SourceRecord has all the data from the ConsumerRecord and more, and all the data is available when the InputStream is pulled from the S3Object. Why not directly convert to AivenS3SourceRecord?

The above is a question and not a request for change. But I would like to see a conversation about this.

@Claudenw
Contributor

Claudenw commented Oct 17, 2024 via email

@Claudenw
Contributor

Claudenw commented Oct 17, 2024 via email

@muralibasani
Contributor Author

What I think the Transformer should do is adhere to a contract that says convert this S3Object to zero or more byte[] that are properly formatted for use in the Kafka message. Pass it the S3Object, the topic, the partition, and the OffsetManager. Let it manage the offset.

On Thu, Oct 17, 2024 at 11:56 AM Claude Warren wrote: If you pass the OffsetManager into the transform you could do that.

On Thu, Oct 17, 2024 at 11:05 AM Murali Basani wrote: But we can still skip this call OutputUtils.serializeAvroRecordToBytes(Collections.singletonList((GenericRecord) record), topic, s3SourceConfig) which is happening in the transformer. In your suggestion, we cannot do that.

Yeah, but why pass the offset manager into it? All the offset handling is in the iterator, and it is more clearly visible if it stays there.
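For illustration only, the contract described above might look something like the sketch below; the interface name and the OffsetManager methods are assumptions, not the connector's existing API.

```java
import java.util.Iterator;

import com.amazonaws.services.s3.model.S3Object;

// Hypothetical contract: turn one S3 object into zero or more Kafka-ready payloads,
// with offset bookkeeping kept inside the transformer rather than the iterator.
interface RecordTransformer {

    /**
     * @param s3Object      object whose InputStream is read lazily
     * @param topic         target Kafka topic
     * @param partition     target Kafka partition
     * @param offsetManager hypothetical component that tracks per-object offsets
     * @return properly formatted message values, one byte[] per Kafka record
     */
    Iterator<byte[]> getRecords(S3Object s3Object, String topic, int partition, OffsetManager offsetManager);

    // Placeholder so the sketch is self-contained; the real class lives in the connector.
    interface OffsetManager {
        long incrementAndGet(String objectKey);
    }
}
```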

@muralibasani
Contributor Author

Good changes. Thank you. Please consider the Transformer change for simplicity.

I also notice that the code converts from S3Object to ConsumerRecord<byte[], byte[]> before converting to AivenS3SourceRecord. The AivenS3SourceRecord has all the data from the ConsumerRecord and more, and all the data is available when the InputStream is pulled from the S3Object. Why not directly convert to AivenS3SourceRecord?

The above is a question and not a request for change. But I would like to see a conversation about this.

Didn't have any reason to keep that intermediate record. Removed it.

@Claudenw
Contributor

Claudenw commented Oct 17, 2024 via email

Contributor

@Claudenw Claudenw left a comment

This looks much better, and while I still disagree on some aspects, once the commented code is removed I would approve this.

@aindriu-aiven aindriu-aiven left a comment

LGTM

Contributor

@AnatolyPopov AnatolyPopov left a comment

Some initial comments, did not manage to finish the review yet, sorry

@@ -51,11 +52,10 @@ public FileReader(final S3SourceConfig s3SourceConfig, final String bucketName,
}

@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
List<S3ObjectSummary> fetchObjectSummaries(final AmazonS3 s3Client) throws IOException {
Iterator<S3ObjectSummary> fetchObjectSummaries(final AmazonS3 s3Client) throws IOException {
Contributor

What does the iterator bring here? Are we going to make this lazy in the future?

Contributor Author

Let me check this.

Contributor Author

Indeed, when we have a large number of files, a lazy iterator or a batch config while listing would be better. I have updated this iterator to be lazy.
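For context, a lazy listing can be built on the paginated ListObjectsV2 API so that only one page of summaries is in memory at a time. This is a sketch under the assumption that plain AWS SDK v1 pagination is acceptable here; the page size of 1000 is illustrative.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;

// Sketch of a lazy object-summary iterator: each page is fetched only when the
// previous one is exhausted, so a large bucket is never listed in one go.
final class LazyObjectSummaryIterator implements Iterator<S3ObjectSummary> {

    private final AmazonS3 s3Client;
    private final String bucketName;
    private Iterator<S3ObjectSummary> currentPage = Collections.emptyIterator();
    private String continuationToken;
    private boolean moreListingsAvailable = true;

    LazyObjectSummaryIterator(final AmazonS3 s3Client, final String bucketName) {
        this.s3Client = s3Client;
        this.bucketName = bucketName;
    }

    @Override
    public boolean hasNext() {
        while (!currentPage.hasNext() && moreListingsAvailable) {
            final ListObjectsV2Result result = s3Client.listObjectsV2(new ListObjectsV2Request()
                    .withBucketName(bucketName)
                    .withMaxKeys(1000)
                    .withContinuationToken(continuationToken));
            currentPage = result.getObjectSummaries().iterator();
            continuationToken = result.getNextContinuationToken();
            moreListingsAvailable = result.isTruncated();
        }
        return currentPage.hasNext();
    }

    @Override
    public S3ObjectSummary next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return currentPage.next();
    }
}
```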

try {
final List<S3ObjectSummary> chunks = fileReader.fetchObjectSummaries(s3Client);
nextFileIterator = chunks.iterator();
nextFileIterator = fileReader.fetchObjectSummaries(s3Client);
Contributor

Why is it called nextFileIterator? Do I understand correctly that it is not the next file but the iterator over all the listed files?

Contributor Author

It is indeed the iterator over all the listed objects. We can rename it to objectIterator.

Comment on lines 210 to +212
if (!recordIterator.hasNext()) {
// If there are still no records, return an empty list
return Collections.emptyList(); // or new ArrayList<>() for mutable list
}

final List<ConsumerRecord<byte[], byte[]>> consumerRecordList = recordIterator.next();
if (consumerRecordList.isEmpty()) {
// LOGGER.error("May be error in reading s3 object " + currentObjectKey);
return Collections.emptyList();
// throw new NoSuchElementException();
}
final List<AivenS3SourceRecord> aivenS3SourceRecordList = new ArrayList<>();

AivenS3SourceRecord aivenS3SourceRecord;
Map<String, Object> offsetMap;
Map<String, Object> partitionMap;
for (final ConsumerRecord<byte[], byte[]> currentRecord : consumerRecordList) {
partitionMap = ConnectUtils.getPartitionMap(currentRecord.topic(), currentRecord.partition(), bucketName);

offsetMap = offsetManager.getOffsetValueMap(currentObjectKey, currentRecord.offset());

aivenS3SourceRecord = new AivenS3SourceRecord(partitionMap, offsetMap, currentRecord.topic(),
currentRecord.partition(), currentRecord.key(), currentRecord.value(), currentObjectKey);

aivenS3SourceRecordList.add(aivenS3SourceRecord);
// If there are still no records, return null or throw an exception
return null; // Or throw new NoSuchElementException();
Contributor

This looks a little questionable to me. Why can we not just remove this if altogether and rely on the recordIterator.next() call, which AFAIU will also throw NoSuchElementException if there is no next element?

Contributor Author

At this stage there is no record/file, hence returning null.

Contributor Author

Indeed it throws NoSuchElementException; if it is not caught, it fails the task.
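To make the trade-off concrete, a guarded pull like the sketch below avoids the exception entirely by treating an exhausted iterator as an empty batch; the method name is illustrative, not the connector's.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class IteratorGuardExample {

    // Returns an empty batch when the iterator is exhausted instead of letting
    // next() throw NoSuchElementException and fail the task.
    static <T> List<T> nextBatchOrEmpty(final Iterator<List<T>> recordIterator) {
        if (!recordIterator.hasNext()) {
            return Collections.emptyList();
        }
        return recordIterator.next();
    }
}
```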

@@ -51,11 +52,10 @@ public FileReader(final S3SourceConfig s3SourceConfig, final String bucketName,
}

@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
List<S3ObjectSummary> fetchObjectSummaries(final AmazonS3 s3Client) throws IOException {
Iterator<S3ObjectSummary> fetchObjectSummaries(final AmazonS3 s3Client) throws IOException {
Contributor

What exactly is throwing IOException here? It's a checked exception, but compilation does not fail if I just remove the throws clause.

Contributor Author

Indeed, not sure why I had IOException. It will be removed.

Contributor Author

Removed.

Comment on lines 78 to 81
try {
final List<S3ObjectSummary> chunks = fileReader.fetchObjectSummaries(s3Client);
nextFileIterator = chunks.iterator();
nextFileIterator = fileReader.fetchObjectSummaries(s3Client);
} catch (IOException e) {
throw new AmazonClientException("Failed to initialize S3 file reader", e);
Contributor

I'm failing to see what is throwing IOException, as mentioned in the other comment. I think the try-catch can just be removed. Otherwise, if we want to handle the exceptions, IMO it should be done in fileReader, not here.

Contributor Author

fileReader, which performs the S3 operations, can throw this exception, hence it is caught here.

Contributor Author

Removed this one too.

return new ConsumerRecord<>(topic, topicPartition, currentOffset, key.orElse(null), value);
final Map<String, Object> offsetMap = offsetManager.getOffsetValueMap(currentObjectKey, currentOffset);

return new AivenS3SourceRecord(partitionMap, offsetMap, topic, topicPartition, key.orElse(null), value,
Contributor

Why are we wrapping a nullable(?) value in an Optional if we then unwrap it back into null when there is no value? Would it not be easier to convert the value right away into byte[] or null and pass that as an argument? Not sure if this is needed at all, since as mentioned in another comment I doubt the value can be null here.

Contributor Author

Updated. Should be ok now.

partitionMap);
}
}

@SuppressWarnings("PMD.CognitiveComplexity")
private Iterator<List<ConsumerRecord<byte[], byte[]>>> getObjectIterator(final InputStream valueInputStream,
final String topic, final int topicPartition, final long startOffset, final OutputWriter outputWriter,
private Iterator<AivenS3SourceRecord> getObjectIterator(final InputStream valueInputStream, final String topic,
Contributor

I'm failing to see the benefits of such an iterator, since under the hood we read the whole stream into memory in one go anyway. IMO if we want to go the iterator way, we need to read lazily.

Contributor Author

This iterator doesn't load all records at once. It uses readNext/hasNext to fetch records only when necessary; it is already lazy.
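A rough sketch of the hasNext/readNext pattern described here, assuming a line-oriented format purely for illustration (the connector's real transformers differ):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch: records are pulled from the stream one at a time; nothing is buffered
// beyond the single record that hasNext() has read ahead.
final class LazyRecordIterator implements Iterator<byte[]> {

    private final BufferedReader reader;
    private String nextLine;

    LazyRecordIterator(final InputStream valueInputStream) {
        this.reader = new BufferedReader(new InputStreamReader(valueInputStream, StandardCharsets.UTF_8));
    }

    @Override
    public boolean hasNext() {
        if (nextLine == null) {
            try {
                nextLine = reader.readLine(); // read-ahead of exactly one record
            } catch (final IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return nextLine != null;
    }

    @Override
    public byte[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        final String current = nextLine;
        nextLine = null;
        return current.getBytes(StandardCharsets.UTF_8);
    }
}
```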

}

@Override
public List<SourceRecord> poll() throws InterruptedException {
LOGGER.info("Polling again");
synchronized (pollLock) {
final List<SourceRecord> results = new ArrayList<>(s3SourceConfig.getInt(MAX_POLL_RECORDS));
Contributor

Instead of passing this as an argument through the chain of calls, can we aggregate the results here?

Contributor Author

Then we end up loading all the records at once and could run into OOM?

Contributor

We are doing that anyway, AFAIU. IMO this will not change anything.

Contributor Author

Let's say there are 1000 records in one file and max poll records is set to 100; then it takes 10 polls to send them back. I probably have a test for this too.
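A minimal sketch of the capping behaviour described above; maxPollRecords and sourceRecordIterator are illustrative names, not the PR's actual fields:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;

final class PollCappingExample {

    // Each poll drains at most maxPollRecords from the shared iterator; a file with
    // 1000 records and maxPollRecords=100 is therefore delivered over 10 polls.
    static List<SourceRecord> pollOnce(final Iterator<SourceRecord> sourceRecordIterator,
            final int maxPollRecords) {
        final List<SourceRecord> results = new ArrayList<>(maxPollRecords);
        while (results.size() < maxPollRecords && sourceRecordIterator.hasNext()) {
            results.add(sourceRecordIterator.next());
        }
        return results;
    }
}
```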

}

@Override
public List<SourceRecord> poll() throws InterruptedException {
LOGGER.info("Polling again");
synchronized (pollLock) {
Contributor

So in the case of big files, we will be blocked until we read the whole file?

Contributor Author

Kind of, yes. We have this kind of pollLock in our JDBC source connector too.

final SourceRecord sourceRecord = createSourceRecord(aivenS3SourceRecord, s3SourceConfig, keyConverter,
valueConverter, conversionConfig, transformer, fileReader, offsetManager);
results.add(sourceRecord);
}
}

LOGGER.info("Number of records sent {}", results.size());
Contributor

The records here are not yet sent, right? Can we move this to poll? Or am I missing something?

Contributor Author

Indeed, they are not yet sent; this method returns to poll, and from poll they are sent.

If we move this to poll, we need to declare a variable.

Contributor

Which variable do you mean? Is it not the same list instance that we are passing from poll, or am I missing something?

Contributor Author

Moved this log statement to poll.

Comment on lines 46 to 47
this.recordKey = Arrays.copyOf(recordKey, recordKey.length);
this.recordValue = Arrays.copyOf(recordValue, recordValue.length);
Contributor

Why is this copied on the way in, and the way out? Who is mutating this that you need to be cautious of?

Contributor Author

When this record is created in the iterator, PMD treats the arrays as mutable and untrusted, and throws an EXPOSE-REP2 error during the build. Instead of Arrays.copyOf, I changed it to clone().

Either we can add this to the PMD exclusion list, or we handle it in code.
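For illustration, the defensive-copy pattern that satisfies this kind of warning looks roughly like the sketch below; the class is a stand-in, not the PR's AivenS3SourceRecord:

```java
final class DefensiveCopyExample {

    private final byte[] recordKey;
    private final byte[] recordValue;

    // Copy on the way in so later mutation of the caller's arrays cannot change
    // the stored record (this is what EXPOSE-REP2-style warnings guard against).
    DefensiveCopyExample(final byte[] recordKey, final byte[] recordValue) {
        this.recordKey = recordKey == null ? null : recordKey.clone();
        this.recordValue = recordValue == null ? null : recordValue.clone();
    }

    // Copy on the way out so callers cannot mutate the internal state either.
    byte[] getRecordKey() {
        return recordKey == null ? null : recordKey.clone();
    }

    byte[] getRecordValue() {
        return recordValue == null ? null : recordValue.clone();
    }
}
```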

@@ -107,8 +110,9 @@ public void start(final Map<String, String> props) {
initializeConverters();
Contributor

It's a little unusual/inflexible to instantiate the key and value converters in the task.

For a source task, the converters are for serializing the data into Kafka, and that can be independent of your input format.

The way we're instantiating the classes directly also leaves them unconfigured, which may be undefined/undesirable for some converters.

For example, imagine someone configured this task with an Avro value.converter. Then their input data needs to already be in Avro format, and has to be readable with an unconfigured Avro converter. That sounds like a very rigid setup.

Contributor

As far as I can tell, the data flow is (solid), and should be (dotted)

flowchart TB
InputStream -- Transformer.getRecords -->  Object -- Transformer.getValueBytes --> firstByte["byte[]"] -- Converter.toConnectData --> firstStruct[Struct] -- SMTs --> secondStruct[Struct] -- Converter.fromConnectData --> secondByte["byte[]"] -- Producer --> Kafka

InputStream -. "Transformer.getRecords" .-> firstStruct

Contributor Author

So getValueBytes transforms (serializes) an Avro record into bytes, and the RecordProcessor converts these bytes back to an Avro record if the Avro converter is configured.
If no converter is configured, the default is ByteArrayConverter and just bytes are returned, in which case the consumer has to deserialize them back to GenericRecord if they want to read them as Avro records.

Maybe I didn't understand your point totally.

We can move the converter initialization to the record processor too.

Contributor

We can move the converter initialization to the record processor too.

My point was not where in the task the key/value converters are instantiated; they shouldn't be instantiated at all.

Your data format in S3 should be decoupled from your data format in Kafka.
e.g. someone has Avro or Parquet or Protobuf or whatever data in S3, and wants to write JSON to Kafka.

How would you configure that in the current implementation? value.converter=JsonConverter and input.format=avro would pass Avro-serialized data from getValueBytes to the JsonConverter, which would throw an exception.
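One way to picture the decoupling being asked for: the task hands the framework schema-aware Connect data, and the worker-configured value.converter does the serialization, so the S3 input format and the Kafka output format stay independent. The schema and field names in this sketch are illustrative only:

```java
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

final class ConverterDecouplingExample {

    // Illustrative schema; in practice it would come from the transformer that
    // parsed the S3 object (Avro, Parquet, JSON, ...).
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name("example.S3Line")
            .field("payload", Schema.STRING_SCHEMA)
            .build();

    // The task only builds Connect data; the value.converter configured on the
    // worker (JSON, Avro, ...) decides how it is serialized into Kafka.
    static SourceRecord toSourceRecord(final Map<String, ?> partitionMap, final Map<String, ?> offsetMap,
            final String topic, final Integer partition, final String payload) {
        final Struct value = new Struct(VALUE_SCHEMA).put("payload", payload);
        return new SourceRecord(partitionMap, offsetMap, topic, partition, VALUE_SCHEMA, value);
    }
}
```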

}

@Override
public List<SourceRecord> poll() throws InterruptedException {
LOGGER.info("Polling again");
synchronized (pollLock) {
final List<SourceRecord> results = new ArrayList<>(s3SourceConfig.getInt(MAX_POLL_RECORDS));

Contributor

Flat sleep on error is not optimal, it should be some kind of exponential backoff.

Contributor

Logging DataException does not fail the task. This will result in dropped records.

Contributor

The exponential backoff system has an existing configuration; use it.

Contributor Author

For DataException, we have another Jira to look at all corrupted records. That should address this, and any dropped records should be reported based on the configuration.

Contributor Author

For exponential backoff, I only found something similar in the sink connector/context but not in the source. Can you provide an example?

Contributor

For exponential backoff, I only found something similar in the sink connector/context but not in the source. Can you provide an example?

It's just a concept, not a specific implementation.

aws.s3.backoff.delay.ms could be specific to the S3 client, so maybe it's not reusable.
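As a concept, the backoff being referred to can be as small as the sketch below; the base and maximum delays are illustrative and would come from configuration:

```java
import java.util.concurrent.ThreadLocalRandom;

final class ExponentialBackoffExample {

    // Delay doubles with each consecutive failure, capped at maxDelayMs, with a
    // little jitter so retries from multiple tasks do not align.
    static long nextDelayMs(final int consecutiveFailures, final long baseDelayMs, final long maxDelayMs) {
        final long exponential = baseDelayMs * (1L << Math.min(consecutiveFailures, 20));
        final long capped = Math.min(exponential, maxDelayMs);
        return capped / 2 + ThreadLocalRandom.current().nextLong(capped / 2 + 1);
    }
}
```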

@@ -179,6 +173,11 @@ private static void addOtherConfig(final S3SourceConfigDef configDef) {
"Value converter", GROUP_OTHER, awsOtherGroupCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.NONE, VALUE_CONVERTER);
configDef.define(MAX_MESSAGE_BYTES_SIZE, ConfigDef.Type.INT, 1_048_588, ConfigDef.Importance.MEDIUM,
Contributor

This is a very confusing name, because it's not doing the same thing as Kafka's configuration.

It's only active for the ByteArrayTransformer, and is a "chunk size" for breaking up large files into multiple messages.

Without some metadata about where a particular chunk of a file is from, or an indication of when a file is truncated, I think this configuration is a huge footgun. Users will get one message of the first 1MB of their file, and may not know that the file was actually larger.

This needs more thought.

Contributor Author

For now I renamed it to EXPECTED_MAX_MESSAGE_BYTES. However, we have another Jira to handle records which exceed the size limit across all formats. I expect we will address this in a more generic way in that story.
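To make the footgun concrete: if large objects are split into fixed-size messages, each chunk needs enough metadata for consumers to reassemble the object or at least detect truncation. A sketch of what that could carry; the Chunk fields are invented for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChunkingExample {

    static final class Chunk {
        final byte[] data;
        final int chunkIndex;    // position of this chunk within the object
        final int totalChunks;   // lets consumers detect a truncated/partial object
        final String objectKey;  // which S3 object the chunk came from

        Chunk(final byte[] data, final int chunkIndex, final int totalChunks, final String objectKey) {
            this.data = data;
            this.chunkIndex = chunkIndex;
            this.totalChunks = totalChunks;
            this.objectKey = objectKey;
        }
    }

    // Splits an object's bytes into messages of at most maxMessageBytes, keeping
    // enough metadata that a consumer can tell the file was split rather than cut off.
    static List<Chunk> split(final byte[] objectBytes, final String objectKey, final int maxMessageBytes) {
        final int totalChunks = (objectBytes.length + maxMessageBytes - 1) / maxMessageBytes;
        final List<Chunk> chunks = new ArrayList<>(totalChunks);
        for (int i = 0; i < totalChunks; i++) {
            final int from = i * maxMessageBytes;
            final int to = Math.min(from + maxMessageBytes, objectBytes.length);
            chunks.add(new Chunk(Arrays.copyOfRange(objectBytes, from, to), i, totalChunks, objectKey));
        }
        return chunks;
    }
}
```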

@@ -179,6 +173,11 @@ private static void addOtherConfig(final S3SourceConfigDef configDef) {
"Value converter", GROUP_OTHER, awsOtherGroupCounter++, // NOPMD
Contributor

It's a bad idea to define configurations with the same name as the framework. One will take precedence, and you can't be sure if the default you're specifying is correct or even present.

Contributor Author

Got it. Removed those configs.

@@ -71,12 +72,12 @@ public class S3SourceTask extends SourceTask {
private S3SourceConfig s3SourceConfig;
Contributor

The default graceful shutdown time is 5 seconds, so a 10 second poll interval will have a pretty high chance of causing ungraceful shutdown: https://github.com/apache/kafka/blob/5313f8eb92a033dc74d8e72a48ac033113b186d4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L101

You need to ensure that the poll() method returns multiple times while you're waiting for back-offs for other operations.
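One hedged way to honour that: poll() never sleeps for the whole backoff window, it either returns what it has or yields quickly so a stop() request can take effect within the graceful-shutdown window. A sketch with illustrative names (backoffRemainingMs and fetchAvailableRecords are hypothetical hooks):

```java
import java.util.Collections;
import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

abstract class ShortPollSketch extends SourceTask {

    // Illustrative: how long one poll() call is allowed to wait before yielding.
    private static final long MAX_WAIT_PER_POLL_MS = 1_000L;

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        if (backoffRemainingMs() > 0) {
            // Wait at most a short slice of the backoff, then hand control back to
            // the framework instead of blocking for the whole backoff duration.
            Thread.sleep(Math.min(backoffRemainingMs(), MAX_WAIT_PER_POLL_MS));
            return Collections.emptyList();
        }
        return fetchAvailableRecords();
    }

    // Hypothetical hooks implemented elsewhere in the task.
    abstract long backoffRemainingMs();

    abstract List<SourceRecord> fetchAvailableRecords();
}
```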

public OutputFormat getOutputFormat() {
return OutputFormat.valueOf(getString(OUTPUT_FORMAT_KEY).toUpperCase(Locale.ROOT));
public InputFormat getInputFormat() {
return InputFormat.valueOf(getString(INPUT_FORMAT_KEY).toUpperCase(Locale.ROOT));
}

Region getAwsS3Region() {
Contributor

I don't understand this connector's partitioning strategy and multi-task strategy.

It looks like the taskConfigs method generates N identical tasks, which will all try to write all objects, causing lots of duplicates.

It looks like the user is expected to configure "topic.partitions", and configure N different connectors to read each partition of the data? That's very unusual.

Also topic.partitions limits the partitions that are loaded by the OffsetManager, but doesn't appear to limit which partitions actually get transferred. Maybe it does, but I couldn't figure out how.

Contributor

Totally agree with this

Contributor Author

Agreed; we had doubts about this too, and we would like to introduce the topic partitioning strategy functionality in a different Jira.
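For that follow-up Jira, one common pattern is to give each task an index in taskConfigs and have each task claim only the objects whose key hashes to its index; a sketch with invented config keys (task.id, task.count):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TaskAssignmentExample {

    // Connector side: emit maxTasks distinct configs instead of N identical ones.
    static List<Map<String, String>> taskConfigs(final Map<String, String> connectorProps, final int maxTasks) {
        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            final Map<String, String> taskProps = new HashMap<>(connectorProps);
            taskProps.put("task.id", Integer.toString(i));           // invented key
            taskProps.put("task.count", Integer.toString(maxTasks)); // invented key
            configs.add(taskProps);
        }
        return configs;
    }

    // Task side: process only the objects assigned to this task, so objects are
    // not duplicated across tasks.
    static boolean isAssignedToThisTask(final String objectKey, final int taskId, final int taskCount) {
        return Math.floorMod(objectKey.hashCode(), taskCount) == taskId;
    }
}
```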
