Reader with ReaderGroup can lose first messages sent to the new topic #1319

Open
datacompboy opened this issue Aug 13, 2024 · 1 comment
@datacompboy

Describe the bug

Consider this scenario ([P] = publisher, [R] = reader):
[P] testtopic is created (empty)
[R] a new Reader is created with StartOffset: LastOffset
=> a new ConsumerGroup is initialized with map[{topic:testtopic partition:0}:-1]
=> a new reader starts, seeks to offset 0, and starts polling
[P] publishes a message to testtopic [the message with offset 0 becomes available]
[R] tries to read the message (reader.read) and gets a connection error; it retries and fails N times in a row
[R] re-establishes the connection
[R] gets the offset from the ConsumerGroup: -1, which means "LastOffset"
[R] starts a reader, which seeks to offset 1
[R] polls from offset 1

At this point the message with offset 0 is lost.
In general, any messages published before the first successful CommitMessages are lost.
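
The scenario above can be reproduced in a toy model. The sketch below is not kafka-go code: `partition`, `resolve`, and `seeksAcrossRejoin` are hypothetical names, and the only assumption is that a start offset of -1 is resolved to the current end of the log each time the reader (re)connects.

```go
package main

import "fmt"

// lastOffset stands in for the "start at the end of the log" sentinel (-1).
const lastOffset = -1

// partition is a toy partition: only the next offset to be written matters.
type partition struct{ next int64 }

// publish appends one message, advancing the end of the log.
func (p *partition) publish() { p.next++ }

// resolve turns a possibly relative start offset into an absolute one,
// as a reader does when it connects or reconnects.
func (p *partition) resolve(start int64) int64 {
	if start == lastOffset {
		return p.next
	}
	return start
}

// seeksAcrossRejoin models the reported sequence: the group holds -1, the
// reader seeks, messages are published while every fetch fails, and the
// reader then rejoins and seeks again from the still-uncommitted -1.
func seeksAcrossRejoin(publishedDuringOutage int) (firstSeek, secondSeek int64) {
	p := &partition{}
	committed := int64(lastOffset) // nothing has been committed yet

	firstSeek = p.resolve(committed)
	for i := 0; i < publishedDuringOutage; i++ {
		p.publish()
	}
	// No message was delivered, so nothing was committed: still -1.
	secondSeek = p.resolve(committed)
	return firstSeek, secondSeek
}

func main() {
	first, second := seeksAcrossRejoin(2)
	fmt.Printf("first seek: %d, seek after rejoin: %d, skipped: %d\n",
		first, second, second-first)
}
```

With two messages published during the outage, the model seeks to 0 first and to 2 after the rejoin, so offsets 0 and 1 are never delivered.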

Kafka Version

kafka-go v0.4.47

To Reproduce

The issue was captured by running a large system under a deterministic distributed simulator.
I am not sure how to construct a hermetic test for this, as I am not familiar with your code.

Expected Behavior

The message with offset 0 is read and processed at least once.

Observed Behavior

The first few messages are lost:

...
00:00:53.23     info    kafka-go@v0.4.47/consumergroup.go:952   joined group READERCONSUMERGROUP as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9 in generation 3
00:00:53.23     info    kafka-go@v0.4.47/consumergroup.go:1012  selected as leader for group, READERCONSUMERGROUP
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:1040  using 'range' balancer to assign group, READERCONSUMERGROUP
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:1042  found member: READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9/[]byte(nil)
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:1045  found topic/partition: TOPICNAME/0
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:966   assigned member/topic/partitions READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9/TOPICNAME/[0]
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:973   joinGroup succeeded for response, READERCONSUMERGROUP.  generationID=3, memberID=READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:806   Joined group READERCONSUMERGROUP as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9 in generation 3
00:00:53.27     info    kafka-go@v0.4.47/consumergroup.go:1139  Syncing 1 assignments for generation 3 as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9
00:00:53.29     info    kafka-go@v0.4.47/consumergroup.go:1104  sync group finished for group, READERCONSUMERGROUP
00:00:53.31     info    kafka-go@v0.4.47/reader.go:141  subscribed to topics and partitions: map[{topic:TOPICNAME partition:0}:-1]
00:00:53.31     info    kafka-go@v0.4.47/consumergroup.go:467   started heartbeat for group, READERCONSUMERGROUP [3s]
00:00:53.31     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset last offset
00:00:53.31     info    kafka-go@v0.4.47/reader.go:274  started commit for group READERCONSUMERGROUP
00:00:53.43     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:00:53.57     info    kafka-go@v0.4.47/reader.go:1375 no messages received from kafka within the allocated time for partition 0 of TOPICNAME at offset 0: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
...
00:03:00.06     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:36119->11.0.0.5:9093: i/o timeout
00:03:00.16     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:00.28     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:00.42     info    kafka-go@v0.4.47/reader.go:1375 no messages received from kafka within the allocated time for partition 0 of TOPICNAME at offset 0: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
00:03:00.52     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:47346->11.0.0.5:9093: i/o timeout
00:03:00.62     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:07.74     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:07.88     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:27880->11.0.0.5:9093: i/o timeout
00:03:07.98     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:09.1      info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:09.24     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:31747->11.0.0.5:9093: i/o timeout
00:03:09.34     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:13.48     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:17.64     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:51137->11.0.0.5:9093: i/o timeout
00:03:17.74     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:24.86     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:28        info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 0: read tcp 11.0.0.4:49546->11.0.0.5:9093: i/o timeout
00:03:28.1      info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 0
00:03:31.24     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 0
00:03:31.31     info    kafka-go@v0.4.47/consumergroup.go:470   stopped heartbeat for group READERCONSUMERGROUP
00:03:31.31     info    kafka-go@v0.4.47/reader.go:277  stopped commit for group READERCONSUMERGROUP
00:03:38.18     info    kafka-go@v0.4.47/reader.go:1302 error initializing the kafka reader for partition 0 of TOPICNAME: read tcp 11.0.0.4:58618->11.0.0.5:9093: i/o timeout
00:03:40.3      info    kafka-go@v0.4.47/consumergroup.go:952   joined group READERCONSUMERGROUP as member READER@READER (github.com/segmentio/kafka-go)-6dab7dfe-da33-4878-a70a-d1cf303436c9 in generation 4
...
00:03:46.42     info    kafka-go@v0.4.47/reader.go:141  subscribed to topics and partitions: map[{topic:TOPICNAME partition:0}:-1]
^^^ still -1 in the ConsumerGroup
00:03:46.42     info    kafka-go@v0.4.47/consumergroup.go:467   started heartbeat for group, READERCONSUMERGROUP [3s]
00:03:46.42     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset last offset
00:03:46.42     info    kafka-go@v0.4.47/reader.go:274  started commit for group READERCONSUMERGROUP
00:03:54.59     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 2
^^^ At this point the messages at offsets 0 and 1 are lost
00:03:57.53     info    kafka-go@v0.4.47/reader.go:1302 error initializing the kafka reader for partition 0 of TOPICNAME: read tcp 11.0.0.4:48613->11.0.0.5:9093: i/o timeout
00:03:57.63     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset last offset
00:03:58.75     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 3
^^^ At this point the message at offset 2 is also lost
00:03:58.89     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 3: read tcp 11.0.0.4:60875->11.0.0.5:9093: i/o timeout
00:03:58.99     info    kafka-go@v0.4.47/reader.go:1272 initializing kafka reader for partition 0 of TOPICNAME starting at offset 3
00:04:00.13     info    kafka-go@v0.4.47/reader.go:1473 the kafka reader for partition 0 of TOPICNAME is seeking to offset 3
00:04:00.27     info    kafka-go@v0.4.47/reader.go:1375 no messages received from kafka within the allocated time for partition 0 of TOPICNAME at offset 4: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
00:04:00.27     info    reader/reader.go:143    FetchMessage result: {Topic:TOPICNAME Partition:0 Offset:3 HighWaterMark:4 Key:[] Value:....}
^^^ the first message delivered to the app is from offset 3
00:04:00.37     info    kafka-go@v0.4.47/reader.go:1427 the kafka reader got an unknown error reading partition 0 of TOPICNAME at offset 4: read tcp 11.0.0.4:22278->11.0.0.5:9093: i/o timeout
...
@datacompboy
Author

I am not sure how good this approach is, but pushing the initial offset to the parent when it differs, and committing it from FetchMessage itself, seems to fix the behaviour:

diff -u ~/go/pkg/mod/cache/download/github.com/segmentio/kafka-go/@v/github.com/segmentio/kafka-go@v0.4.47/reader.go ./reader.go
--- ~/go/pkg/mod/cache/download/github.com/segmentio/kafka-go/@v/github.com/segmentio/kafka-go@v0.4.47/reader.go	1979-12-31 00:00:00.000000000 +0100
+++ ./reader.go	2024-08-14 13:09:29.231441428 +0200
@@ -835,6 +835,16 @@
 				return Message{}, io.EOF
 			}
 
+            // "initial offset" notification
+            if m.error == nil && m.version == version && m.message.HighWaterMark==-1 && m.watermark == -1 {
+                if r.useConsumerGroup() {
+                    if err := r.CommitMessages(ctx, m.message); err != nil {
+                        return Message{}, err
+                    }
+                }
+                continue
+            }
+
 			if m.version >= version {
 				r.mutex.Lock()
 
@@ -1312,6 +1322,14 @@
 
 		// Now we're sure to have an absolute offset number, may anything happen
 		// to the connection we know we'll want to restart from this offset.
+        if offset != start {
+            // If we had to seek from non-absolute offset, inform Reader about initial point
+            // So it get remembered in case of reconnection.
+            if err = r.sendMessage(ctx, Message{Topic:r.topic, Partition:r.partition, Offset:start-1, HighWaterMark:-1}, -1); err != nil {
+                conn.Close()
+                return
+            }
+        }
 		offset = start
 
 		errcount := 0

This is just an idea for a fix. I've verified that it fixes the issue, but I believe the code can and should be better and tested :), so I don't have a ready-to-use patch here.
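
The idea behind the patch can be illustrated with a toy model. This is a hedged sketch, not the patch itself and not kafka-go's API: `group`, `partition`, and `seek` are hypothetical names. It assumes only that, once a relative start offset (-1) has been resolved to an absolute one, that absolute offset is recorded in the group state so that a later rejoin resumes from it.

```go
package main

import "fmt"

// lastOffset stands in for the "start at the end of the log" sentinel (-1).
const lastOffset = -1

// group is a toy consumer group holding one committed offset.
type group struct{ committed int64 }

// partition is a toy partition: next is the offset the next message will get.
type partition struct{ next int64 }

// resolve turns a possibly relative start offset into an absolute one.
func (p *partition) resolve(start int64) int64 {
	if start == lastOffset {
		return p.next
	}
	return start
}

// seek resolves the group's start offset. If the stored offset was relative,
// the absolute result is written back so that later rejoins reuse it.
func seek(g *group, p *partition) int64 {
	abs := p.resolve(g.committed)
	if g.committed != abs {
		g.committed = abs // remember the initial absolute position
	}
	return abs
}

func main() {
	g := &group{committed: lastOffset}
	p := &partition{}

	first := seek(g, p) // topic is empty, so this resolves to 0
	p.next += 2         // two messages are published while fetches fail
	second := seek(g, p)

	fmt.Printf("first seek: %d, seek after rejoin: %d\n", first, second)
}
```

In this model the second seek returns to offset 0 instead of jumping to 2, so the messages published during the outage are still delivered.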
