Revert "Compute backlog time."
This reverts commit 7a17548.
seizethedave committed Oct 18, 2024
1 parent 5d619d3 commit 271992a
Showing 5 changed files with 14 additions and 106 deletions.
2 changes: 1 addition & 1 deletion pkg/blockbuilder/blockbuilder.go
@@ -218,7 +218,7 @@ func (b *BlockBuilder) getLagForPartition(ctx context.Context, partition int32)
})
var lastErr error
for boff.Ongoing() {
- groupLag, err := GetGroupLag(ctx, kadm.NewClient(b.kafkaClient), b.cfg.Kafka.Topic, b.cfg.ConsumerGroup, b.fallbackOffsetMillis)
+ groupLag, err := getGroupLag(ctx, kadm.NewClient(b.kafkaClient), b.cfg.Kafka.Topic, b.cfg.ConsumerGroup, b.fallbackOffsetMillis)
if err != nil {
lastErr = fmt.Errorf("get consumer group lag: %w", err)
} else {
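For illustration only, not part of this commit: the getGroupLag call above sits inside a dskit backoff loop, the same retry pattern the reverted builderLag function used. A minimal standalone sketch of that pattern follows; doFetch is a hypothetical stand-in for a call like getGroupLag, and the retry limits mirror the ones used in this file.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/grafana/dskit/backoff"
)

// fetchWithRetry keeps calling doFetch under a backoff policy until it
// succeeds, the context is cancelled, or the retries are exhausted.
func fetchWithRetry(ctx context.Context, doFetch func(context.Context) (int64, error)) (int64, error) {
	boff := backoff.New(ctx, backoff.Config{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: time.Second,
		MaxRetries: 10,
	})
	var lastErr error
	for boff.Ongoing() {
		v, err := doFetch(ctx)
		if err == nil {
			return v, nil
		}
		lastErr = fmt.Errorf("fetch: %w", err)
		boff.Wait()
	}
	return 0, lastErr
}

func main() {
	attempts := 0
	v, err := fetchWithRetry(context.Background(), func(context.Context) (int64, error) {
		attempts++
		if attempts < 3 {
			return 0, errors.New("transient failure") // simulate two failed attempts
		}
		return 42, nil
	})
	fmt.Println(v, err, "after", attempts, "attempts")
}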
4 changes: 2 additions & 2 deletions pkg/blockbuilder/kafkautil.go
@@ -12,15 +12,15 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
)

- // GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants.
+ // getGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants.
// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
//
// The lag is the difference between the last produced offset (high watermark) and an offset in the "past".
// If the block builder committed an offset for a given partition to the consumer group at least once, then
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise, if the block builder didn't commit an offset for a given partition yet (e.g. block builder is
// running for the first time), then the lag is the difference between the last produced offset and fallbackOffsetMillis.
- func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error) {
+ func getGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error) {
offsets, err := admClient.FetchOffsets(ctx, group)
if err != nil {
if !errors.Is(err, kerr.GroupIDNotFound) {
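For illustration only, not part of this commit: the doc comment above defines lag as the last produced offset (high watermark) minus the group's committed offset, with a fallback when the group has no commit for a partition. A simplified per-partition sketch of that calculation with franz-go's kadm admin client follows. The broker address, exampleTopic, and exampleGroup are placeholders, and partitions without a commit are merely reported here rather than resolved against fallbackOffsetMillis as getGroupLag does.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// printGroupLag prints, for every partition of topic, the difference between
// the partition's end offset and the offset committed by the consumer group.
func printGroupLag(ctx context.Context, adm *kadm.Client, topic, group string) error {
	committed, err := adm.FetchOffsets(ctx, group) // offsets committed by the group
	if err != nil {
		return fmt.Errorf("fetch offsets: %w", err)
	}
	ends, err := adm.ListEndOffsets(ctx, topic) // high watermark of each partition
	if err != nil {
		return fmt.Errorf("list end offsets: %w", err)
	}
	ends.Each(func(end kadm.ListedOffset) {
		c, ok := committed.Lookup(end.Topic, end.Partition)
		if !ok || c.At < 0 {
			fmt.Printf("partition %d: no commit yet, lag would come from the fallback offset\n", end.Partition)
			return
		}
		fmt.Printf("partition %d: lag %d\n", end.Partition, end.Offset-c.At)
	})
	return nil
}

func main() {
	client, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092")) // placeholder broker
	if err != nil {
		panic(err)
	}
	defer client.Close()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := printGroupLag(ctx, kadm.NewClient(client), "exampleTopic", "exampleGroup"); err != nil {
		panic(err)
	}
}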
10 changes: 5 additions & 5 deletions pkg/blockbuilder/kafkautil_test.go
@@ -77,7 +77,7 @@ func TestKafkaGetGroupLag(t *testing.T) {
// get the timestamp of the last produced record
rec := producedRecords[len(producedRecords)-1]
fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli()
- groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
+ groupLag, err := getGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
require.NoError(t, err)

require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
@@ -89,7 +89,7 @@ func TestKafkaGetGroupLag(t *testing.T) {
// get the timestamp of third to last produced record (record before earliest in partition 1)
rec := producedRecords[len(producedRecords)-3]
fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli()
- groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
+ groupLag, err := getGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
require.NoError(t, err)

require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
@@ -98,7 +98,7 @@ func TestKafkaGetGroupLag(t *testing.T) {
})

t.Run("fallbackOffset=0", func(t *testing.T) {
- groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, 0)
+ groupLag, err := getGroupLag(ctx, admClient, testTopic, testGroup, 0)
require.NoError(t, err)

require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
@@ -107,12 +107,12 @@ func TestKafkaGetGroupLag(t *testing.T) {
})

t.Run("fallbackOffset=wrong", func(t *testing.T) {
- _, err := GetGroupLag(ctx, admClient, testTopic, testGroup, -1)
+ _, err := getGroupLag(ctx, admClient, testTopic, testGroup, -1)
require.Error(t, err)
})

t.Run("group=unknown", func(t *testing.T) {
- groupLag, err := GetGroupLag(ctx, admClient, testTopic, "unknown", 0)
+ groupLag, err := getGroupLag(ctx, admClient, testTopic, "unknown", 0)
require.NoError(t, err)

// This group doesn't have any commits, so it must calc its lag from the fallback.
5 changes: 0 additions & 5 deletions pkg/blockbuilder/scheduler/metrics.go
@@ -11,7 +11,6 @@ type schedulerMetrics struct {
monitorPartitionsDuration prometheus.Histogram
partitionStartOffsets *prometheus.GaugeVec
partitionEndOffsets *prometheus.GaugeVec
- partitionBacklogTime *prometheus.GaugeVec
}

func newSchedulerMetrics(reg prometheus.Registerer) schedulerMetrics {
@@ -30,9 +29,5 @@ func newSchedulerMetrics(reg prometheus.Registerer) schedulerMetrics {
Name: "cortex_blockbuilder_scheduler_partition_end_offsets",
Help: "The observed end offset of each partition.",
}, []string{"partition"}),
- partitionBacklogTime: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
- Name: "cortex_blockbuilder_scheduler_partition_backlog_time_seconds",
- Help: "The computed backlog time of each partition.",
- }, []string{"partition"}),
}
}
99 changes: 6 additions & 93 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -4,27 +4,23 @@ package scheduler

import (
"context"
"errors"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/mimir/pkg/blockbuilder"
"github.com/grafana/mimir/pkg/storage/ingest"
)

type BlockBuilderScheduler struct {
services.Service

kafkaClient *kgo.Client
- adminClient *kadm.Client
cfg Config
logger log.Logger
register prometheus.Registerer
@@ -57,7 +53,6 @@ func (s *BlockBuilderScheduler) starting(context.Context) (err error) {
if err != nil {
return fmt.Errorf("creating kafka reader: %w", err)
}
- s.adminClient = kadm.NewClient(s.kafkaClient)
return nil
}

@@ -82,106 +77,24 @@ func (s *BlockBuilderScheduler) running(ctx context.Context) error {
// monitorPartitions updates knowledge of all active partitions.
func (s *BlockBuilderScheduler) monitorPartitions(ctx context.Context) {
startTime := time.Now()
+ defer s.metrics.monitorPartitionsDuration.Observe(time.Since(startTime).Seconds())
+ // Eventually this will also include job computation. But for now, collect partition data.
+ admin := kadm.NewClient(s.kafkaClient)

- startOffsets, err := s.adminClient.ListStartOffsets(ctx, s.cfg.Kafka.Topic)
+ startOffsets, err := admin.ListStartOffsets(ctx, s.cfg.Kafka.Topic)
if err != nil {
level.Warn(s.logger).Log("msg", "failed to list start offsets", "err", err)
}
- endOffsets, err := s.adminClient.ListEndOffsets(ctx, s.cfg.Kafka.Topic)
+ endOffsets, err := admin.ListEndOffsets(ctx, s.cfg.Kafka.Topic)
if err != nil {
level.Warn(s.logger).Log("msg", "failed to list end offsets", "err", err)
}

- s.metrics.monitorPartitionsDuration.Observe(time.Since(startTime).Seconds())

startOffsets.Each(func(o kadm.ListedOffset) {
s.metrics.partitionStartOffsets.WithLabelValues(fmt.Sprint(o.Partition)).Set(float64(o.Offset))
})
endOffsets.Each(func(o kadm.ListedOffset) {
s.metrics.partitionEndOffsets.WithLabelValues(fmt.Sprint(o.Partition)).Set(float64(o.Offset))
})

- // Any errors from here on are ones that cause us to bail from this monitoring iteration.
-
- consumedOffsets, err := s.adminClient.FetchOffsets(ctx, s.cfg.BuilderConsumerGroup)
- if err != nil {
- level.Warn(s.logger).Log("msg", "failed to list consumed offsets", "err", err)
- return
- }
-
- backlogTime, err := s.computeBacklogTime(ctx, consumedOffsets.KOffsets(), endOffsets.KOffsets())
- if err != nil {
- level.Warn(s.logger).Log("msg", "failed to compute backlog time", "err", err)
- return
- }
-
- for p, t := range backlogTime {
- s.metrics.partitionBacklogTime.WithLabelValues(fmt.Sprint(p)).Set(t.Seconds())
- }
}

- func (s *BlockBuilderScheduler) computeBacklogTime(ctx context.Context, consumedOffsets, endOffsets map[string]map[int32]kgo.Offset) (map[int32]time.Duration, error) {
- // fetch the (consumed, end) records for each partition, and store the difference in the returned map.
-
- loRecords, err := s.fetchSingleRecords(ctx, consumedOffsets)
- if err != nil {
- return nil, fmt.Errorf("fetch consumed: %w", err)
- }
- hiRecords, err := s.fetchSingleRecords(ctx, endOffsets)
- if err != nil {
- return nil, fmt.Errorf("fetch end records: %w", err)
- }
-
- backlogTime := make(map[int32]time.Duration, len(consumedOffsets))
- for p, loRecord := range loRecords {
- if hiRecord, ok := hiRecords[p]; ok {
- backlogTime[p] = hiRecord.Timestamp.Sub(loRecord.Timestamp)
- }
- }
- return backlogTime, nil
- }

- // fetchSingleRecords fetches the first record from each partition starting at the given offsets.
- func (s *BlockBuilderScheduler) fetchSingleRecords(ctx context.Context, offsets map[string]map[int32]kgo.Offset) (map[int32]*kgo.Record, error) {
- s.kafkaClient.AddConsumePartitions(offsets)
- defer s.kafkaClient.PurgeTopicsFromConsuming(s.cfg.Kafka.Topic)
- out := make(map[int32]*kgo.Record)
- f := s.kafkaClient.PollFetches(ctx)
- f.EachError(func(_ string, _ int32, err error) {
- if !errors.Is(err, context.Canceled) {
- level.Error(s.logger).Log("msg", "failed to fetch records", "err", err)
- }
- })
- f.EachPartition(func(tp kgo.FetchTopicPartition) {
- if _, seen := out[tp.Partition]; seen {
- return
- }
- if len(tp.Records) > 0 {
- // We're only interested in the first record per partition.
- out[tp.Partition] = tp.Records[0]
- }
- })
- return out, nil
- }

- // builderLag computes the lag for block-builder's consumer group, for all partitions.
- func (s *BlockBuilderScheduler) builderLag(ctx context.Context) (kadm.GroupLag, error) {
- boff := backoff.New(ctx, backoff.Config{
- MinBackoff: 100 * time.Millisecond,
- MaxBackoff: time.Second,
- MaxRetries: 10,
- })
- var lastErr error
- for boff.Ongoing() {
- groupLag, err := blockbuilder.GetGroupLag(ctx, s.adminClient, s.cfg.Kafka.Topic, s.cfg.BuilderConsumerGroup, (-1 * time.Hour).Milliseconds())
- if err != nil {
- lastErr = fmt.Errorf("get consumer group lag: %w", err)
- } else {
- return groupLag, nil
- }
-
- level.Warn(s.logger).Log("msg", "failed to get consumer group lag; will retry", "err", lastErr)
- boff.Wait()
- }
-
- return nil, lastErr
- }
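
For illustration only, not part of this commit: the reverted computeBacklogTime and fetchSingleRecords above estimate, per partition, how far behind the consumer group is in wall-clock terms by subtracting the timestamp of the first record the group has not consumed yet from the timestamp of the record at the partition's end. A minimal sketch of that core calculation follows; the records are hypothetical and supplied directly rather than fetched from Kafka.

package main

import (
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// computeBacklog returns, per partition, the timestamp difference between the
// record at the partition's end and the first record not yet consumed.
func computeBacklog(consumed, end map[int32]*kgo.Record) map[int32]time.Duration {
	backlog := make(map[int32]time.Duration, len(consumed))
	for p, lo := range consumed {
		if hi, ok := end[p]; ok && lo != nil && hi != nil {
			backlog[p] = hi.Timestamp.Sub(lo.Timestamp)
		}
	}
	return backlog
}

func main() {
	now := time.Now()
	// Hypothetical records for partition 0: the group is roughly 5 minutes behind.
	consumed := map[int32]*kgo.Record{0: {Timestamp: now.Add(-5 * time.Minute)}}
	end := map[int32]*kgo.Record{0: {Timestamp: now}}
	for p, d := range computeBacklog(consumed, end) {
		fmt.Printf("partition %d backlog %s\n", p, d)
	}
}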
