diff --git a/broker/client/reader.go b/broker/client/reader.go index 93222a5d..0f7d6dfe 100644 --- a/broker/client/reader.go +++ b/broker/client/reader.go @@ -26,9 +26,9 @@ import ( // seek to the requested offset, and read its content. // // Reader returns EOF if: -// * The broker closes the RPC, eg because its assignment has change or it's shutting down. -// * The requested EndOffset has been read through. -// * A Fragment being read by the Reader reaches EOF. +// - The broker closes the RPC, eg because its assignment has change or it's shutting down. +// - The requested EndOffset has been read through. +// - A Fragment being read by the Reader reaches EOF. // // If Block is true, Read may block indefinitely. Otherwise, ErrOffsetNotYetAvailable // is returned upon reaching the journal write head. @@ -179,6 +179,8 @@ func (r *Reader) Read(p []byte) (n int, err error) { if err != io.EOF { panic(err.Error()) // Status_OK implies graceful stream closure. } + case pb.Status_JOURNAL_NOT_FOUND: + err = ErrJournalNotFound case pb.Status_NOT_JOURNAL_BROKER: err = ErrNotJournalBroker case pb.Status_INSUFFICIENT_JOURNAL_BROKERS: @@ -199,9 +201,10 @@ func (r *Reader) AdjustedOffset(br *bufio.Reader) int64 { } // Seek provides a limited form of seeking support. Specifically, if: -// * A Fragment URL is being directly read, and -// * The Seek offset is ahead of the current Reader offset, and -// * The Fragment also covers the desired Seek offset +// - A Fragment URL is being directly read, and +// - The Seek offset is ahead of the current Reader offset, and +// - The Fragment also covers the desired Seek offset +// // Then a seek is performed by reading and discarding to the seeked offset. // Seek will otherwise return ErrSeekRequiresNewReader. func (r *Reader) Seek(offset int64, whence int) (int64, error) { diff --git a/broker/client/retry_reader.go b/broker/client/retry_reader.go index 72e731b5..c62da1ea 100644 --- a/broker/client/retry_reader.go +++ b/broker/client/retry_reader.go @@ -100,13 +100,24 @@ func (rr *RetryReader) Read(p []byte) (n int, err error) { } else { return // Surface to caller. } - case ErrInsufficientJournalBrokers, ErrNotJournalBroker, ErrJournalNotFound: + case ErrInsufficientJournalBrokers, ErrNotJournalBroker: // Suppress logging for expected errors on first read attempt. // We may be racing a concurrent Etcd watch and assignment of the broker cluster. squelch = attempt == 0 + case ErrJournalNotFound: + // If a Header was attached to the request, the journal was not found + // even after honoring its read-through Etcd revision. If not, + // we allow a handful of attempts to work around expected propagation + // delays of Etcd revisions if the journal was just created. + if rr.Reader.Request.Header != nil || attempt > 3 { + return // Surface to caller. + } case io.EOF: - // Repeated EOF is common if topology changes or authorizations - // expire on a journal with no active appends. + // EOF means we had an established RPC, but it might not have sent + // any data before being closed server-side, so clear `attempts` to + // reduce log noise: it's common to next see ErrNotJournalBroker + // on broker topology changes or when authorizations are refreshed. + attempt = 0 squelch = true default: }