Skip to content

Commit

Permalink
fix(sink): remove max poll records config (#1705)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekike authored Oct 18, 2024
1 parent 14322da commit eb2d1cf
Showing 1 changed file with 0 additions and 16 deletions.
16 changes: 0 additions & 16 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,6 @@ type ConsumerConfigParams struct {
// Available strategies: range, roundrobin, cooperative-sticky.
PartitionAssignmentStrategy string

// The maximum number of records returned in a single call to poll().
// Note, that max.poll.records does not impact the underlying fetching behavior.
// The consumer will cache the records from each fetch request and returns them incrementally from each poll.
// See https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#max-poll-records
MaxPollRecords int

// The maximum delay between invocations of poll() when using consumer group management.
// This places an upper bound on the amount of time that the consumer can be idle before fetching more records.
// If poll() is not called before expiration of this timeout, then the consumer is considered failed and
Expand Down Expand Up @@ -234,10 +228,6 @@ func (c ConsumerConfigParams) Validate() error {
}
}

if c.MaxPollRecords < 0 {
return errors.New("max poll records must be non negative")
}

return nil
}

Expand Down Expand Up @@ -270,12 +260,6 @@ func (c ConsumerConfigParams) AsConfigMap() (kafka.ConfigMap, error) {
}
}

if c.MaxPollRecords > 0 {
if err := m.SetKey("max.poll.records", c.MaxPollRecords); err != nil {
return nil, err
}
}

if c.MaxPollInterval > 0 {
if err := m.SetKey("max.poll.interval.ms", c.MaxPollInterval); err != nil {
return nil, err
Expand Down

0 comments on commit eb2d1cf

Please sign in to comment.