diff --git a/storage/operation/badgerimpl/iterator.go b/storage/operation/badgerimpl/iterator.go
index e9f8b5dc6be..5cc5fc50340 100644
--- a/storage/operation/badgerimpl/iterator.go
+++ b/storage/operation/badgerimpl/iterator.go
@@ -34,10 +34,12 @@ func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage
 	}
 }
 
+// First seeks to the smallest key greater than or equal to the given key.
 func (i *badgerIterator) First() {
 	i.iter.Seek(i.lowerBound)
 }
 
+// Valid returns whether the iterator is positioned at a valid key-value pair.
 func (i *badgerIterator) Valid() bool {
 	// if it's beyond the upper bound, it's invalid
 	if !i.iter.Valid() {
@@ -49,16 +51,20 @@ func (i *badgerIterator) Valid() bool {
 	return valid
 }
 
+// Next advances the iterator to the next key-value pair.
 func (i *badgerIterator) Next() {
 	i.iter.Next()
 }
 
+// IterItem returns the current key-value pair, or nil if done.
 func (i *badgerIterator) IterItem() storage.IterItem {
 	return i.iter.Item()
 }
 
 var _ storage.IterItem = (*badger.Item)(nil)
 
+// Close closes the iterator. The iterator must be closed; otherwise it causes a memory leak.
+// No errors are expected during normal operation.
 func (i *badgerIterator) Close() error {
 	i.iter.Close()
 	return nil
diff --git a/storage/operation/badgerimpl/reader.go b/storage/operation/badgerimpl/reader.go
index 8d7d982d65e..a410067a6b7 100644
--- a/storage/operation/badgerimpl/reader.go
+++ b/storage/operation/badgerimpl/reader.go
@@ -20,6 +20,14 @@ var _ io.Closer = (*noopCloser)(nil)
 
 func (noopCloser) Close() error { return nil }
 
+// Get gets the value for the given key. It returns ErrNotFound if the DB
+// does not contain the key.
+// Other errors are exceptions.
+//
+// The caller should not modify the contents of the returned slice, but it is
+// safe to modify the contents of the argument after Get returns. The
+// returned slice will remain valid until the returned Closer is closed. On
+// success, the caller MUST call closer.Close() or a memory leak will occur.
 func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
 	tx := b.db.NewTransaction(false)
 	defer tx.Discard()
@@ -40,6 +48,11 @@ func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
 	return value, noopCloser{}, nil
 }
 
+// NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive.
+// Specifically, all keys that meet ANY of the following conditions are included in the iteration:
+//   - have a prefix equal to startPrefix, OR
+//   - have a prefix equal to endPrefix, OR
+//   - have a prefix that is lexicographically between startPrefix and endPrefix.
 func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) {
 	return newBadgerIterator(b.db, startPrefix, endPrefix, ops), nil
 }
diff --git a/storage/operation/badgerimpl/writer.go b/storage/operation/badgerimpl/writer.go
index 3837be3917f..769187166ba 100644
--- a/storage/operation/badgerimpl/writer.go
+++ b/storage/operation/badgerimpl/writer.go
@@ -19,22 +19,36 @@ type ReaderBatchWriter struct {
 
 var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)
 
+// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
+// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
+// This reader may observe different values for the same key on subsequent reads.
 func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
 	return b.globalReader
 }
 
+// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed.
+// When we `Write` into the batch, that write operation is added to the pending batch, but not committed.
+// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are.
+// Note:
+// - The writer cannot be used concurrently for writing.
 func (b *ReaderBatchWriter) Writer() storage.Writer {
 	return b
 }
 
+// BadgerWriteBatch returns the badger write batch.
 func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch {
 	return b.batch
 }
 
+// AddCallback adds a callback to execute after the batch has been flushed,
+// regardless of whether the batch update succeeded or failed.
+// The error parameter is the error returned by the batch update.
 func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
 	b.callbacks.AddCallback(callback)
 }
 
+// Commit flushes the batch to the database.
+// No errors are expected during normal operation.
 func (b *ReaderBatchWriter) Commit() error {
 	err := b.batch.Flush()
 
@@ -69,14 +83,27 @@ func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter {
 
 var _ storage.Writer = (*ReaderBatchWriter)(nil)
 
+// Set sets the value for the given key. It overwrites any previous value
+// for that key; a DB is not a multi-map.
+//
+// It is safe to modify the contents of the arguments after Set returns.
+// No errors are expected during normal operation.
 func (b *ReaderBatchWriter) Set(key, value []byte) error {
 	return b.batch.Set(key, value)
 }
 
+// Delete deletes the value for the given key. Deletes are blind and will
+// succeed even if the given key does not exist.
+//
+// It is safe to modify the contents of the arguments after Delete returns.
+// No errors are expected during normal operation.
 func (b *ReaderBatchWriter) Delete(key []byte) error {
 	return b.batch.Delete(key)
 }
 
+// DeleteByRange removes all keys with a prefix that falls within the
+// range [startPrefix, endPrefix], both inclusive.
+// No errors are expected during normal operation.
 func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error {
 	err := operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error {
 		err := b.batch.Delete(key)
diff --git a/storage/operation/pebbleimpl/iterator.go b/storage/operation/pebbleimpl/iterator.go
index b1b6630cc51..bc0cd2bae69 100644
--- a/storage/operation/pebbleimpl/iterator.go
+++ b/storage/operation/pebbleimpl/iterator.go
@@ -32,18 +32,22 @@ func newPebbleIterator(reader pebble.Reader, startPrefix, endPrefix []byte, ops
 	}, nil
 }
 
+// First seeks to the smallest key greater than or equal to the given key.
 func (i *pebbleIterator) First() {
 	i.iter.First()
 }
 
+// Valid returns whether the iterator is positioned at a valid key-value pair.
 func (i *pebbleIterator) Valid() bool {
 	return i.iter.Valid()
 }
 
+// Next advances the iterator to the next key-value pair.
 func (i *pebbleIterator) Next() {
 	i.iter.Next()
 }
 
+// IterItem returns the current key-value pair, or nil if done.
 func (i *pebbleIterator) IterItem() storage.IterItem {
 	return pebbleIterItem{iter: i.iter}
 }
@@ -67,6 +71,8 @@ func (i pebbleIterItem) Value(fn func([]byte) error) error {
 	return fn(val)
 }
 
+// Close closes the iterator. The iterator must be closed; otherwise it causes a memory leak.
+// No errors are expected during normal operation.
 func (i *pebbleIterator) Close() error {
 	return i.iter.Close()
 }
diff --git a/storage/operation/pebbleimpl/reader.go b/storage/operation/pebbleimpl/reader.go
index 6cfdfd93da5..cff5a916048 100644
--- a/storage/operation/pebbleimpl/reader.go
+++ b/storage/operation/pebbleimpl/reader.go
@@ -22,6 +22,14 @@ var _ io.Closer = (*noopCloser)(nil)
 
 func (noopCloser) Close() error { return nil }
 
+// Get gets the value for the given key. It returns ErrNotFound if the DB
+// does not contain the key.
+// Other errors are exceptions.
+//
+// The caller should not modify the contents of the returned slice, but it is
+// safe to modify the contents of the argument after Get returns. The
+// returned slice will remain valid until the returned Closer is closed. On
+// success, the caller MUST call closer.Close() or a memory leak will occur.
 func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
 	value, closer, err := b.db.Get(key)
 
@@ -37,6 +45,11 @@ func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) {
 	return value, closer, nil
 }
 
+// NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive.
+// Specifically, all keys that meet ANY of the following conditions are included in the iteration:
+//   - have a prefix equal to startPrefix, OR
+//   - have a prefix equal to endPrefix, OR
+//   - have a prefix that is lexicographically between startPrefix and endPrefix.
 func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) {
 	return newPebbleIterator(b.db, startPrefix, endPrefix, ops)
 }
diff --git a/storage/operation/pebbleimpl/writer.go b/storage/operation/pebbleimpl/writer.go
index c6ccdff06b9..3525bb59f2c 100644
--- a/storage/operation/pebbleimpl/writer.go
+++ b/storage/operation/pebbleimpl/writer.go
@@ -16,10 +16,18 @@ type ReaderBatchWriter struct {
 
 var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil)
 
+// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation").
+// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed.
+// This reader may observe different values for the same key on subsequent reads.
 func (b *ReaderBatchWriter) GlobalReader() storage.Reader {
 	return b.globalReader
 }
 
+// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed.
+// When we `Write` into the batch, that write operation is added to the pending batch, but not committed.
+// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are.
+// Note:
+// - The writer cannot be used concurrently for writing.
 func (b *ReaderBatchWriter) Writer() storage.Writer {
 	return b
 }
@@ -28,10 +36,15 @@ func (b *ReaderBatchWriter) PebbleWriterBatch() *pebble.Batch {
 	return b.batch
 }
 
+// AddCallback adds a callback to execute after the batch has been flushed,
+// regardless of whether the batch update succeeded or failed.
+// The error parameter is the error returned by the batch update.
 func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
 	b.callbacks.AddCallback(callback)
 }
 
+// Commit flushes the batch to the database.
+// No errors are expected during normal operation.
 func (b *ReaderBatchWriter) Commit() error {
 	err := b.batch.Commit(pebble.Sync)
 
@@ -66,15 +79,27 @@ func NewReaderBatchWriter(db *pebble.DB) *ReaderBatchWriter {
 
 var _ storage.Writer = (*ReaderBatchWriter)(nil)
 
+// Set sets the value for the given key. It overwrites any previous value
+// for that key; a DB is not a multi-map.
+//
+// It is safe to modify the contents of the arguments after Set returns.
+// No errors are expected during normal operation.
 func (b *ReaderBatchWriter) Set(key, value []byte) error {
 	return b.batch.Set(key, value, pebble.Sync)
 }
 
+// Delete deletes the value for the given key. Deletes are blind and will
+// succeed even if the given key does not exist.
+//
+// It is safe to modify the contents of the arguments after Delete returns.
+// No errors are expected during normal operation.
 func (b *ReaderBatchWriter) Delete(key []byte) error {
 	return b.batch.Delete(key, pebble.Sync)
 }
 
-// DeleteByRange deletes all keys with a prefix in the range [startPrefix, endPrefix] (both inclusive).
+// DeleteByRange removes all keys with a prefix that falls within the
+// range [startPrefix, endPrefix], both inclusive.
+// No errors are expected during normal operation.
 func (b *ReaderBatchWriter) DeleteByRange(_ storage.Reader, startPrefix, endPrefix []byte) error {
 	// DeleteRange takes the prefix range with start (inclusive) and end (exclusive, note: not inclusive).
 	// therefore, we need to increment the endPrefix to make it inclusive.
diff --git a/storage/operations.go b/storage/operations.go
index e0fe101f636..d407da299e7 100644
--- a/storage/operations.go
+++ b/storage/operations.go
@@ -63,6 +63,7 @@ type Reader interface {
 }
 
 // Writer is an interface for batch writing to a storage backend.
+// It cannot be used concurrently for writing.
 type Writer interface {
 	// Set sets the value for the given key. It overwrites any previous value
 	// for that key; a DB is not a multi-map.
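// Usage sketch for the Reader/Writer contracts documented above. This is a
// minimal illustration under stated assumptions, not code from this patch:
// it assumes the flow-go package paths shown in the diff, that
// storage.ErrNotFound is the sentinel returned by Get, and that the zero
// value of storage.IteratorOption is sufficient here; keys and values are
// made up for the example.
package example

import (
	"errors"
	"fmt"

	"github.com/cockroachdb/pebble"

	"github.com/onflow/flow-go/storage"
	"github.com/onflow/flow-go/storage/operation/pebbleimpl"
)

func demo(db *pebble.DB) error {
	rw := pebbleimpl.NewReaderBatchWriter(db)

	// Writes are staged in the pending batch; nothing is visible until Commit.
	if err := rw.Writer().Set([]byte{0x01, 0xaa}, []byte("v1")); err != nil {
		return err
	}

	// GlobalReader only sees committed state, so the staged key is not found yet.
	val, closer, err := rw.GlobalReader().Get([]byte{0x01, 0xaa})
	switch {
	case errors.Is(err, storage.ErrNotFound):
		// expected before Commit: the write above is still pending
	case err != nil:
		return err // other errors are exceptions
	default:
		fmt.Printf("committed value: %x\n", val)
		_ = closer.Close() // on success the closer MUST be closed
	}

	// The callback fires after the batch flush, whether it succeeded or failed.
	rw.AddCallback(func(err error) {
		fmt.Println("batch committed, err =", err)
	})

	if err := rw.Commit(); err != nil {
		return err
	}

	// Iterate all keys whose prefix lies in [startPrefix, endPrefix], both inclusive.
	it, err := rw.GlobalReader().NewIter([]byte{0x01}, []byte{0x02}, storage.IteratorOption{})
	if err != nil {
		return err
	}
	defer it.Close() // iterators must be closed to avoid leaks

	for it.First(); it.Valid(); it.Next() {
		if err := it.IterItem().Value(func(v []byte) error {
			fmt.Printf("value: %x\n", v)
			return nil
		}); err != nil {
			return err
		}
	}
	return nil
}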