broker/client: surface JOURNAL_NOT_FOUND from RetryReader
JOURNAL_NOT_FOUND is generally not a retry-able condition. It was
originally made so with a "shrug", on the premise that users may create
journals after they had started their consumers. In practice,
I'm not aware of any utility for it.

Also tweak logging to reduce noise in environments that have many
journals with no current append volume, where read re-authorizations
are somewhat frequently obtained.
jgraettinger committed Sep 6, 2024
1 parent 0463356 commit a6f7fe7
Showing 2 changed files with 23 additions and 9 deletions.
15 changes: 9 additions & 6 deletions broker/client/reader.go
@@ -26,9 +26,9 @@ import (
 // seek to the requested offset, and read its content.
 //
 // Reader returns EOF if:
-// * The broker closes the RPC, eg because its assignment has change or it's shutting down.
-// * The requested EndOffset has been read through.
-// * A Fragment being read by the Reader reaches EOF.
+// - The broker closes the RPC, eg because its assignment has change or it's shutting down.
+// - The requested EndOffset has been read through.
+// - A Fragment being read by the Reader reaches EOF.
 //
 // If Block is true, Read may block indefinitely. Otherwise, ErrOffsetNotYetAvailable
 // is returned upon reaching the journal write head.
@@ -179,6 +179,8 @@ func (r *Reader) Read(p []byte) (n int, err error) {
 	if err != io.EOF {
 		panic(err.Error()) // Status_OK implies graceful stream closure.
 	}
+case pb.Status_JOURNAL_NOT_FOUND:
+	err = ErrJournalNotFound
 case pb.Status_NOT_JOURNAL_BROKER:
 	err = ErrNotJournalBroker
 case pb.Status_INSUFFICIENT_JOURNAL_BROKERS:
@@ -199,9 +201,10 @@ func (r *Reader) AdjustedOffset(br *bufio.Reader) int64 {
 }

 // Seek provides a limited form of seeking support. Specifically, if:
-// * A Fragment URL is being directly read, and
-// * The Seek offset is ahead of the current Reader offset, and
-// * The Fragment also covers the desired Seek offset
+// - A Fragment URL is being directly read, and
+// - The Seek offset is ahead of the current Reader offset, and
+// - The Fragment also covers the desired Seek offset
+//
 // Then a seek is performed by reading and discarding to the seeked offset.
 // Seek will otherwise return ErrSeekRequiresNewReader.
 func (r *Reader) Seek(offset int64, whence int) (int64, error) {
17 changes: 14 additions & 3 deletions broker/client/retry_reader.go
@@ -100,13 +100,24 @@ func (rr *RetryReader) Read(p []byte) (n int, err error) {
 	} else {
 		return // Surface to caller.
 	}
-case ErrInsufficientJournalBrokers, ErrNotJournalBroker, ErrJournalNotFound:
+case ErrInsufficientJournalBrokers, ErrNotJournalBroker:
 	// Suppress logging for expected errors on first read attempt.
 	// We may be racing a concurrent Etcd watch and assignment of the broker cluster.
 	squelch = attempt == 0
+case ErrJournalNotFound:
+	// If a Header was attached to the request, the journal was not found
+	// even after honoring its read-through Etcd revision. If not,
+	// we allow a handful of attempts to work around expected propagation
+	// delays of Etcd revisions if the journal was just created.
+	if rr.Reader.Request.Header != nil || attempt > 3 {
+		return // Surface to caller.
+	}
 case io.EOF:
-	// Repeated EOF is common if topology changes or authorizations
-	// expire on a journal with no active appends.
+	// EOF means we had an established RPC, but it might not have sent
+	// any data before being closed server-side, so clear `attempts` to
+	// reduce log noise: it's common to next see ErrNotJournalBroker
+	// on broker topology changes or when authorizations are refreshed.
+	attempt = 0
 	squelch = true
 default:
 }
