diff --git a/cmd/bootstrap/utils/md5.go b/cmd/bootstrap/utils/md5.go
index 65823fd6e96..4d4bbe21046 100644
--- a/cmd/bootstrap/utils/md5.go
+++ b/cmd/bootstrap/utils/md5.go
@@ -2,7 +2,8 @@ package utils
 
 // The google storage API only provides md5 and crc32 hence overriding the linter flag for md5
 import (
-	"crypto/md5" //nolint:gosec
+	// #nosec
+	"crypto/md5"
 	"io"
 	"os"
 )
diff --git a/storage/batch.go b/storage/batch.go
index 3147fc5c0e7..bc9c4853294 100644
--- a/storage/batch.go
+++ b/storage/batch.go
@@ -1,13 +1,19 @@
 package storage
 
-import "github.com/dgraph-io/badger/v2"
+import (
+	"github.com/dgraph-io/badger/v2"
+)
 
+// Deprecated: Transaction is being deprecated as part of the transition from Badger to Pebble.
+// Use Writer instead of Transaction for all new code.
 type Transaction interface {
 	Set(key, val []byte) error
 }
 
 // BatchStorage serves as an abstraction over batch storage, adding ability to add ability to add extra
 // callbacks which fire after the batch is successfully flushed.
+// Deprecated: BatchStorage is being deprecated as part of the transition from Badger to Pebble.
+// Use ReaderBatchWriter instead of BatchStorage for all new code.
 type BatchStorage interface {
 	GetWriter() *badger.WriteBatch
diff --git a/storage/operation/badgerimpl/iterator.go b/storage/operation/badgerimpl/iterator.go
new file mode 100644
index 00000000000..5cc5fc50340
--- /dev/null
+++ b/storage/operation/badgerimpl/iterator.go
@@ -0,0 +1,71 @@
+package badgerimpl
+
+import (
+	"bytes"
+
+	"github.com/dgraph-io/badger/v2"
+
+	"github.com/onflow/flow-go/storage"
+)
+
+type badgerIterator struct {
+	iter       *badger.Iterator
+	lowerBound []byte
+	upperBound []byte
+}
+
+var _ storage.Iterator = (*badgerIterator)(nil)
+
+func newBadgerIterator(db *badger.DB, startPrefix, endPrefix []byte, ops storage.IteratorOption) *badgerIterator {
+	options := badger.DefaultIteratorOptions
+	if ops.IterateKeyOnly {
+		options.PrefetchValues = false
+	}
+
+	tx := db.NewTransaction(false)
+	iter := tx.NewIterator(options)
+
+	lowerBound, upperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix)
+
+	return &badgerIterator{
+		iter:       iter,
+		lowerBound: lowerBound,
+		upperBound: upperBound,
+	}
+}
+
+// First seeks to the smallest key greater than or equal to the given key.
+func (i *badgerIterator) First() {
+	i.iter.Seek(i.lowerBound)
+}
+
+// Valid returns whether the iterator is positioned at a valid key-value pair.
+func (i *badgerIterator) Valid() bool {
+	// if it's beyond the upper bound, it's invalid
+	if !i.iter.Valid() {
+		return false
+	}
+	key := i.iter.Item().Key()
+	// "< 0" means "key < upperBound"
+	valid := bytes.Compare(key, i.upperBound) < 0
+	return valid
+}
+
+// Next advances the iterator to the next key-value pair.
+func (i *badgerIterator) Next() {
+	i.iter.Next()
+}
+
+// IterItem returns the current key-value pair, or nil if done.
+func (i *badgerIterator) IterItem() storage.IterItem {
+	return i.iter.Item()
+}
+
+var _ storage.IterItem = (*badger.Item)(nil)
+
+// Close closes the iterator. Iterator must be closed, otherwise it causes a memory leak.
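+//
+// A typical caller pattern (illustrative sketch only; `it` stands for any storage.Iterator
+// obtained from a Reader):
+//
+//	defer it.Close()
+//	for it.First(); it.Valid(); it.Next() {
+//		item := it.IterItem()
+//		// use item.Key() and item.Value(...) here
+//	}
+//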
+// No errors expected during normal operation +func (i *badgerIterator) Close() error { + i.iter.Close() + return nil +} diff --git a/storage/operation/badgerimpl/reader.go b/storage/operation/badgerimpl/reader.go new file mode 100644 index 00000000000..a410067a6b7 --- /dev/null +++ b/storage/operation/badgerimpl/reader.go @@ -0,0 +1,63 @@ +package badgerimpl + +import ( + "errors" + "io" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +type dbReader struct { + db *badger.DB +} + +type noopCloser struct{} + +var _ io.Closer = (*noopCloser)(nil) + +func (noopCloser) Close() error { return nil } + +// Get gets the value for the given key. It returns ErrNotFound if the DB +// does not contain the key. +// other errors are exceptions +// +// The caller should not modify the contents of the returned slice, but it is +// safe to modify the contents of the argument after Get returns. The +// returned slice will remain valid until the returned Closer is closed. On +// success, the caller MUST call closer.Close() or a memory leak will occur. +func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) { + tx := b.db.NewTransaction(false) + defer tx.Discard() + + item, err := tx.Get(key) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil, nil, storage.ErrNotFound + } + return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err) + } + + value, err := item.ValueCopy(nil) + if err != nil { + return nil, nil, irrecoverable.NewExceptionf("could not load value: %w", err) + } + + return value, noopCloser{}, nil +} + +// NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive. +// Specifically, all keys that meet ANY of the following conditions are included in the iteration: +// - have a prefix equal to startPrefix OR +// - have a prefix equal to the endPrefix OR +// - have a prefix that is lexicographically between startPrefix and endPrefix +func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) { + return newBadgerIterator(b.db, startPrefix, endPrefix, ops), nil +} + +// ToReader is a helper function to convert a *badger.DB to a Reader +func ToReader(db *badger.DB) storage.Reader { + return dbReader{db} +} diff --git a/storage/operation/badgerimpl/writer.go b/storage/operation/badgerimpl/writer.go new file mode 100644 index 00000000000..769187166ba --- /dev/null +++ b/storage/operation/badgerimpl/writer.go @@ -0,0 +1,120 @@ +package badgerimpl + +import ( + "fmt" + + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + op "github.com/onflow/flow-go/storage/operation" +) + +type ReaderBatchWriter struct { + globalReader storage.Reader + batch *badger.WriteBatch + + callbacks op.Callbacks +} + +var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil) + +// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation"). +// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed. +// This reader may observe different values for the same key on subsequent reads. +func (b *ReaderBatchWriter) GlobalReader() storage.Reader { + return b.globalReader +} + +// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed. 
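+// For illustration, a minimal usage sketch (assuming `rw` is a *ReaderBatchWriter from NewReaderBatchWriter):
+//
+//	_ = rw.Writer().Set([]byte("key"), []byte("value")) // staged in the pending batch only
+//	err := rw.Commit()                                   // all staged writes are applied atomically here
+//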
+// When we `Write` into the batch, that write operation is added to the pending batch, but not committed.
+// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are.
+// Note:
+// - The writer cannot be used concurrently for writing.
+func (b *ReaderBatchWriter) Writer() storage.Writer {
+	return b
+}
+
+// BadgerWriteBatch returns the badger write batch.
+func (b *ReaderBatchWriter) BadgerWriteBatch() *badger.WriteBatch {
+	return b.batch
+}
+
+// AddCallback adds a callback to execute after the batch has been flushed,
+// regardless of whether the batch update succeeded or failed.
+// The error parameter is the error returned by the batch update.
+func (b *ReaderBatchWriter) AddCallback(callback func(error)) {
+	b.callbacks.AddCallback(callback)
+}
+
+// Commit flushes the batch to the database.
+// No errors expected during normal operation
+func (b *ReaderBatchWriter) Commit() error {
+	err := b.batch.Flush()
+
+	b.callbacks.NotifyCallbacks(err)
+
+	return err
+}
+
+func WithReaderBatchWriter(db *badger.DB, fn func(storage.ReaderBatchWriter) error) error {
+	batch := NewReaderBatchWriter(db)
+
+	err := fn(batch)
+	if err != nil {
+		// fn might use a lock to ensure concurrent safety while reading and writing data,
+		// and that lock is usually released by a callback.
+		// In other words, fn might hold a lock that is released by a callback,
+		// so we need to notify the callbacks before returning the error,
+		// to ensure the locks are released.
+		batch.callbacks.NotifyCallbacks(err)
+		return err
+	}
+
+	return batch.Commit()
+}
+
+func NewReaderBatchWriter(db *badger.DB) *ReaderBatchWriter {
+	return &ReaderBatchWriter{
+		globalReader: ToReader(db),
+		batch:        db.NewWriteBatch(),
+	}
+}
+
+var _ storage.Writer = (*ReaderBatchWriter)(nil)
+
+// Set sets the value for the given key. It overwrites any previous value
+// for that key; a DB is not a multi-map.
+//
+// It is safe to modify the contents of the arguments after Set returns.
+// No errors expected during normal operation
+func (b *ReaderBatchWriter) Set(key, value []byte) error {
+	return b.batch.Set(key, value)
+}
+
+// Delete deletes the value for the given key. Deletes are blind: they will
+// succeed even if the given key does not exist.
+//
+// It is safe to modify the contents of the arguments after Delete returns.
+// No errors expected during normal operation
+func (b *ReaderBatchWriter) Delete(key []byte) error {
+	return b.batch.Delete(key)
+}
+
+// DeleteByRange removes all keys with a prefix that falls within the
+// range [start, end], both inclusive.
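+// For example (illustrative sketch): with startPrefix = []byte{0x10} and endPrefix = []byte{0x20},
+// keys beginning with 0x10, 0x15, or 0x20 are all removed, while keys beginning with 0x09 or 0x21 are kept.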
+// No errors expected during normal operation +func (b *ReaderBatchWriter) DeleteByRange(globalReader storage.Reader, startPrefix, endPrefix []byte) error { + err := operation.IterateKeysInPrefixRange(startPrefix, endPrefix, func(key []byte) error { + err := b.batch.Delete(key) + if err != nil { + return fmt.Errorf("could not add key to delete batch (%v): %w", key, err) + } + return nil + })(globalReader) + + if err != nil { + return fmt.Errorf("could not find keys by range to be deleted: %w", err) + } + return nil +} diff --git a/storage/operation/callbacks.go b/storage/operation/callbacks.go new file mode 100644 index 00000000000..40d414ded91 --- /dev/null +++ b/storage/operation/callbacks.go @@ -0,0 +1,24 @@ +package operation + +import "sync" + +type Callbacks struct { + sync.Mutex // protect callbacks + callbacks []func(error) +} + +func (b *Callbacks) AddCallback(callback func(error)) { + b.Lock() + defer b.Unlock() + + b.callbacks = append(b.callbacks, callback) +} + +func (b *Callbacks) NotifyCallbacks(err error) { + b.Lock() + defer b.Unlock() + + for _, callback := range b.callbacks { + callback(err) + } +} diff --git a/storage/operation/codec.go b/storage/operation/codec.go new file mode 100644 index 00000000000..43dc4c37f7a --- /dev/null +++ b/storage/operation/codec.go @@ -0,0 +1,34 @@ +package operation + +import ( + "encoding/binary" + "fmt" + + "github.com/onflow/flow-go/model/flow" +) + +// EncodeKeyPart encodes a value to be used as a part of a key to be stored in storage. +func EncodeKeyPart(v interface{}) []byte { + switch i := v.(type) { + case uint8: + return []byte{i} + case uint32: + b := make([]byte, 4) + binary.BigEndian.PutUint32(b, i) + return b + case uint64: + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, i) + return b + case string: + return []byte(i) + case flow.Role: + return []byte{byte(i)} + case flow.Identifier: + return i[:] + case flow.ChainID: + return []byte(i) + default: + panic(fmt.Sprintf("unsupported type to convert (%T)", v)) + } +} diff --git a/storage/operation/dbtest/helper.go b/storage/operation/dbtest/helper.go new file mode 100644 index 00000000000..64a166c2390 --- /dev/null +++ b/storage/operation/dbtest/helper.go @@ -0,0 +1,60 @@ +package dbtest + +import ( + "testing" + + "github.com/cockroachdb/pebble" + "github.com/dgraph-io/badger/v2" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation/badgerimpl" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" + "github.com/onflow/flow-go/utils/unittest" +) + +// helper types and functions +type WithWriter func(func(storage.Writer) error) error + +func RunWithStorages(t *testing.T, fn func(*testing.T, storage.Reader, WithWriter)) { + t.Run("BadgerStorage", func(t *testing.T) { + unittest.RunWithBadgerDB(t, func(db *badger.DB) { + withWriter := func(writing func(storage.Writer) error) error { + writer := badgerimpl.NewReaderBatchWriter(db) + err := writing(writer) + if err != nil { + return err + } + + err = writer.Commit() + if err != nil { + return err + } + return nil + } + + reader := badgerimpl.ToReader(db) + fn(t, reader, withWriter) + }) + }) + + t.Run("PebbleStorage", func(t *testing.T) { + unittest.RunWithPebbleDB(t, func(db *pebble.DB) { + withWriter := func(writing func(storage.Writer) error) error { + writer := pebbleimpl.NewReaderBatchWriter(db) + err := writing(writer) + if err != nil { + return err + } + + err = writer.Commit() + if err != nil { + return err + } + return nil + } + + reader := pebbleimpl.ToReader(db) + fn(t, reader, 
withWriter) + }) + }) +} diff --git a/storage/operation/pebbleimpl/iterator.go b/storage/operation/pebbleimpl/iterator.go new file mode 100644 index 00000000000..bc0cd2bae69 --- /dev/null +++ b/storage/operation/pebbleimpl/iterator.go @@ -0,0 +1,78 @@ +package pebbleimpl + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/storage" +) + +type pebbleIterator struct { + iter *pebble.Iterator +} + +var _ storage.Iterator = (*pebbleIterator)(nil) + +func newPebbleIterator(reader pebble.Reader, startPrefix, endPrefix []byte, ops storage.IteratorOption) (*pebbleIterator, error) { + lowerBound, upperBound := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix) + + options := pebble.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + } + + iter, err := reader.NewIter(&options) + if err != nil { + return nil, fmt.Errorf("can not create iterator: %w", err) + } + + return &pebbleIterator{ + iter: iter, + }, nil +} + +// First seeks to the smallest key greater than or equal to the given key. +func (i *pebbleIterator) First() { + i.iter.First() +} + +// Valid returns whether the iterator is positioned at a valid key-value pair. +func (i *pebbleIterator) Valid() bool { + return i.iter.Valid() +} + +// Next advances the iterator to the next key-value pair. +func (i *pebbleIterator) Next() { + i.iter.Next() +} + +// IterItem returns the current key-value pair, or nil if done. +func (i *pebbleIterator) IterItem() storage.IterItem { + return pebbleIterItem{iter: i.iter} +} + +type pebbleIterItem struct { + iter *pebble.Iterator +} + +var _ storage.IterItem = (*pebbleIterItem)(nil) + +func (i pebbleIterItem) Key() []byte { + return i.iter.Key() +} + +func (i pebbleIterItem) Value(fn func([]byte) error) error { + val, err := i.iter.ValueAndErr() + if err != nil { + return err + } + + return fn(val) +} + +// Close closes the iterator. Iterator must be closed, otherwise it causes memory leak. +// No errors expected during normal operation +func (i *pebbleIterator) Close() error { + return i.iter.Close() +} diff --git a/storage/operation/pebbleimpl/reader.go b/storage/operation/pebbleimpl/reader.go new file mode 100644 index 00000000000..cff5a916048 --- /dev/null +++ b/storage/operation/pebbleimpl/reader.go @@ -0,0 +1,60 @@ +package pebbleimpl + +import ( + "errors" + "io" + + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +type dbReader struct { + db *pebble.DB +} + +var _ storage.Reader = (*dbReader)(nil) + +type noopCloser struct{} + +var _ io.Closer = (*noopCloser)(nil) + +func (noopCloser) Close() error { return nil } + +// Get gets the value for the given key. It returns ErrNotFound if the DB +// does not contain the key. +// other errors are exceptions +// +// The caller should not modify the contents of the returned slice, but it is +// safe to modify the contents of the argument after Get returns. The +// returned slice will remain valid until the returned Closer is closed. On +// success, the caller MUST call closer.Close() or a memory leak will occur. 
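+//
+// Typical caller pattern (illustrative sketch; `r` is any storage.Reader):
+//
+//	val, closer, err := r.Get(key)
+//	if err != nil {
+//		return err // storage.ErrNotFound if the key is absent
+//	}
+//	defer closer.Close()
+//	// val must not be used after closer.Close() has been called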
+func (b dbReader) Get(key []byte) ([]byte, io.Closer, error) { + value, closer, err := b.db.Get(key) + + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return nil, nil, storage.ErrNotFound + } + + // exception while checking for the key + return nil, nil, irrecoverable.NewExceptionf("could not load data: %w", err) + } + + return value, closer, nil +} + +// NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive. +// Specifically, all keys that meet ANY of the following conditions are included in the iteration: +// - have a prefix equal to startPrefix OR +// - have a prefix equal to the endPrefix OR +// - have a prefix that is lexicographically between startPrefix and endPrefix +func (b dbReader) NewIter(startPrefix, endPrefix []byte, ops storage.IteratorOption) (storage.Iterator, error) { + return newPebbleIterator(b.db, startPrefix, endPrefix, ops) +} + +// ToReader is a helper function to convert a *pebble.DB to a Reader +func ToReader(db *pebble.DB) storage.Reader { + return dbReader{db} +} diff --git a/storage/operation/pebbleimpl/writer.go b/storage/operation/pebbleimpl/writer.go new file mode 100644 index 00000000000..3525bb59f2c --- /dev/null +++ b/storage/operation/pebbleimpl/writer.go @@ -0,0 +1,108 @@ +package pebbleimpl + +import ( + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/storage" + op "github.com/onflow/flow-go/storage/operation" +) + +type ReaderBatchWriter struct { + globalReader storage.Reader + batch *pebble.Batch + + callbacks op.Callbacks +} + +var _ storage.ReaderBatchWriter = (*ReaderBatchWriter)(nil) + +// GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation"). +// This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed. +// This reader may observe different values for the same key on subsequent reads. +func (b *ReaderBatchWriter) GlobalReader() storage.Reader { + return b.globalReader +} + +// Writer returns a writer associated with a batch of writes. The batch is pending until it is committed. +// When we `Write` into the batch, that write operation is added to the pending batch, but not committed. +// The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are. +// Note: +// - The writer cannot be used concurrently for writing. +func (b *ReaderBatchWriter) Writer() storage.Writer { + return b +} + +func (b *ReaderBatchWriter) PebbleWriterBatch() *pebble.Batch { + return b.batch +} + +// AddCallback adds a callback to execute after the batch has been flush +// regardless the batch update is succeeded or failed. +// The error parameter is the error returned by the batch update. +func (b *ReaderBatchWriter) AddCallback(callback func(error)) { + b.callbacks.AddCallback(callback) +} + +// Commit flushes the batch to the database. +// No errors expected during normal operation +func (b *ReaderBatchWriter) Commit() error { + err := b.batch.Commit(pebble.Sync) + + b.callbacks.NotifyCallbacks(err) + + return err +} + +func WithReaderBatchWriter(db *pebble.DB, fn func(storage.ReaderBatchWriter) error) error { + batch := NewReaderBatchWriter(db) + + err := fn(batch) + if err != nil { + // fn might use lock to ensure concurrent safety while reading and writing data + // and the lock is usually released by a callback. 
+ // in other words, fn might hold a lock to be released by a callback, + // we need to notify the callback for the locks to be released before + // returning the error. + batch.callbacks.NotifyCallbacks(err) + return err + } + + return batch.Commit() +} + +func NewReaderBatchWriter(db *pebble.DB) *ReaderBatchWriter { + return &ReaderBatchWriter{ + globalReader: ToReader(db), + batch: db.NewBatch(), + } +} + +var _ storage.Writer = (*ReaderBatchWriter)(nil) + +// Set sets the value for the given key. It overwrites any previous value +// for that key; a DB is not a multi-map. +// +// It is safe to modify the contents of the arguments after Set returns. +// No errors expected during normal operation +func (b *ReaderBatchWriter) Set(key, value []byte) error { + return b.batch.Set(key, value, pebble.Sync) +} + +// Delete deletes the value for the given key. Deletes are blind all will +// succeed even if the given key does not exist. +// +// It is safe to modify the contents of the arguments after Delete returns. +// No errors expected during normal operation +func (b *ReaderBatchWriter) Delete(key []byte) error { + return b.batch.Delete(key, pebble.Sync) +} + +// DeleteByRange removes all keys with a prefix that falls within the +// range [start, end], both inclusive. +// No errors expected during normal operation +func (b *ReaderBatchWriter) DeleteByRange(_ storage.Reader, startPrefix, endPrefix []byte) error { + // DeleteRange takes the prefix range with start (inclusive) and end (exclusive, note: not inclusive). + // therefore, we need to increment the endPrefix to make it inclusive. + start, end := storage.StartEndPrefixToLowerUpperBound(startPrefix, endPrefix) + return b.batch.DeleteRange(start, end, pebble.Sync) +} diff --git a/storage/operation/reads.go b/storage/operation/reads.go new file mode 100644 index 00000000000..1be299ab9d8 --- /dev/null +++ b/storage/operation/reads.go @@ -0,0 +1,207 @@ +package operation + +import ( + "bytes" + "errors" + "fmt" + + "github.com/vmihailenco/msgpack" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +// CheckFunc is a function that checks if the value should be read and decoded. +// return (true, nil) to read the value and pass it to the CreateFunc and HandleFunc for decoding +// return (false, nil) to skip reading the value +// return (false, err) if running into any exception, the iteration should be stopped. +type CheckFunc func(key []byte) (bool, error) + +// createFunc returns a pointer to an initialized entity that we can potentially +// decode the next value into during a badger DB iteration. +type CreateFunc func() interface{} + +// handleFunc is a function that starts the processing of the current key-value +// pair during a badger iteration. It should be called after the key was checked +// and the entity was decoded. +// No errors are expected during normal operation. Any errors will halt the iteration. 
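+//
+// A minimal IterationFunc sketch (hypothetical msgpack-decodable Entity type; it mirrors the
+// pattern exercised in reads_test.go below):
+//
+//	func() (CheckFunc, CreateFunc, HandleFunc) {
+//		var entity Entity
+//		check := func(key []byte) (bool, error) { return true, nil } // decode every value
+//		create := func() interface{} { return &entity }
+//		handle := func() error { return process(entity) }            // process is a hypothetical handler
+//		return check, create, handle
+//	}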
+type HandleFunc func() error +type IterationFunc func() (CheckFunc, CreateFunc, HandleFunc) + +// IterateKeysInPrefixRange will iterate over all keys with prefixes in the range [startPrefix, endPrefix] (both inclusive) +func IterateKeysInPrefixRange(startPrefix []byte, endPrefix []byte, check func(key []byte) error) func(storage.Reader) error { + return Iterate(startPrefix, endPrefix, func() (CheckFunc, CreateFunc, HandleFunc) { + return func(key []byte) (bool, error) { + err := check(key) + if err != nil { + return false, err + } + return false, nil + }, nil, nil + }, storage.IteratorOption{IterateKeyOnly: true}) +} + +// Iterate will iterate over all keys with prefixes in the given range [startPrefix, endPrefix] (both inclusive) +func Iterate(startPrefix []byte, endPrefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error { + return func(r storage.Reader) error { + + if len(startPrefix) == 0 { + return fmt.Errorf("startPrefix prefix is empty") + } + + if len(endPrefix) == 0 { + return fmt.Errorf("endPrefix prefix is empty") + } + + // Reverse iteration is not supported by pebble + if bytes.Compare(startPrefix, endPrefix) > 0 { + return fmt.Errorf("startPrefix key must be less than or equal to endPrefix key") + } + + it, err := r.NewIter(startPrefix, endPrefix, opt) + if err != nil { + return fmt.Errorf("can not create iterator: %w", err) + } + defer it.Close() + + for it.First(); it.Valid(); it.Next() { + item := it.IterItem() + key := item.Key() + + // initialize processing functions for iteration + check, create, handle := iterFunc() + + keyCopy := make([]byte, len(key)) + + // The underlying database may re-use and modify the backing memory of the returned key. + // Tor safety we proactively make a copy before passing the key to the upper layer. + copy(keyCopy, key) + + // check if we should process the item at all + shouldReadValue, err := check(keyCopy) + if err != nil { + return err + } + if !shouldReadValue { // skip reading value + continue + } + + err = item.Value(func(val []byte) error { + + // decode into the entity + entity := create() + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("could not decode entity: %w", err) + } + + // process the entity + err = handle() + if err != nil { + return fmt.Errorf("could not handle entity: %w", err) + } + + return nil + }) + + if err != nil { + return fmt.Errorf("could not process value: %w", err) + } + } + + return nil + } +} + +// Traverse will iterate over all keys with the given prefix +func Traverse(prefix []byte, iterFunc IterationFunc, opt storage.IteratorOption) func(storage.Reader) error { + return Iterate(prefix, prefix, iterFunc, opt) +} + +// Exists returns true if a key exists in the database. +// No errors are expected during normal operation. +func Exists(key []byte, keyExists *bool) func(storage.Reader) error { + return func(r storage.Reader) error { + _, closer, err := r.Get(key) + if err != nil { + // the key does not exist in the database + if errors.Is(err, storage.ErrNotFound) { + *keyExists = false + return nil + } + // exception while checking for the key + return irrecoverable.NewExceptionf("could not load data: %w", err) + } + defer closer.Close() + + // the key does exist in the database + *keyExists = true + return nil + } +} + +// Retrieve will retrieve the binary data under the given key from the database +// and decode it into the given entity. 
The provided entity needs to be a +// pointer to an initialized entity of the correct type. +// Error returns: +// - storage.ErrNotFound if the key does not exist in the database +// - generic error in case of unexpected failure from the database layer, or failure +// to decode an existing database value +func Retrieve(key []byte, entity interface{}) func(storage.Reader) error { + return func(r storage.Reader) error { + val, closer, err := r.Get(key) + if err != nil { + return err + } + + defer closer.Close() + + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("could not decode entity: %w", err) + } + return nil + } +} + +// FindHighestAtOrBelow finds the highest key with the given prefix and +// height equal to or below the given height. +func FindHighestAtOrBelow(prefix []byte, height uint64, entity interface{}) func(storage.Reader) error { + return func(r storage.Reader) error { + if len(prefix) == 0 { + return fmt.Errorf("prefix must not be empty") + } + + key := append(prefix, EncodeKeyPart(height)...) + it, err := r.NewIter(prefix, key, storage.DefaultIteratorOptions()) + if err != nil { + return fmt.Errorf("can not create iterator: %w", err) + } + defer it.Close() + + var highestKey []byte + // find highest value below the given height + for it.First(); it.Valid(); it.Next() { + highestKey = it.IterItem().Key() + } + + if len(highestKey) == 0 { + return storage.ErrNotFound + } + + // read the value of the highest key + val, closer, err := r.Get(highestKey) + if err != nil { + return err + } + + defer closer.Close() + + err = msgpack.Unmarshal(val, entity) + if err != nil { + return irrecoverable.NewExceptionf("failed to decode value: %w", err) + } + + return nil + } +} diff --git a/storage/operation/reads_test.go b/storage/operation/reads_test.go new file mode 100644 index 00000000000..b9addec418d --- /dev/null +++ b/storage/operation/reads_test.go @@ -0,0 +1,190 @@ +package operation_test + +import ( + "bytes" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" +) + +func TestIterateKeysInPrefixRange(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + // Define the prefix range + prefixStart := []byte{0x10} + prefixEnd := []byte{0x20} + + // Create a range of keys around the prefix start/end values + keys := [][]byte{ + // before start -> not included in range + {0x09, 0xff}, + // within the start prefix -> included in range + {0x10, 0x00}, + {0x10, 0xff}, + // between start and end -> included in range + {0x15, 0x00}, + {0x1A, 0xff}, + // within the end prefix -> included in range + {0x20, 0x00}, + {0x20, 0xff}, + // after end -> not included in range + {0x21, 0x00}, + } + + // The first and last keys are outside the prefix range, so we omit them + // from keysInRange, which is the set of keys we expect in the iteration + keysInRange := keys[1 : len(keys)-1] + + // Insert the keys into the storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, key := range keys { + value := []byte{0x00} // value are skipped, doesn't matter + err := operation.Upsert(key, value)(writer) + if err != nil { + return err + } + } + return nil + })) + + // Forward iteration and check boundaries + var found [][]byte + require.NoError(t, operation.IterateKeysInPrefixRange(prefixStart, prefixEnd, func(key []byte) error { + 
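+			// each key passed to this callback is a copy made by Iterate, so it is safe to retain it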
found = append(found, key) + return nil + })(r), "should iterate forward without error") + require.ElementsMatch(t, keysInRange, found, "forward iteration should return the correct keys in range") + }) +} + +func TestTraverse(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + keyVals := map[[2]byte]uint64{ + {0x41, 0xff}: 3, + {0x42, 0x00}: 11, + {0xff}: 13, + {0x42, 0x56}: 17, + {0x00}: 19, + {0x42, 0xff}: 23, + {0x43, 0x00}: 33, + } + expected := []uint64{11, 23} + + // Insert the keys and values into storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for key, val := range keyVals { + err := operation.Upsert(key[:], val)(writer) + if err != nil { + return err + } + } + return nil + })) + + actual := make([]uint64, 0, len(keyVals)) + + // Define the iteration logic + iterationFunc := func() (operation.CheckFunc, operation.CreateFunc, operation.HandleFunc) { + check := func(key []byte) (bool, error) { + // Skip the key {0x42, 0x56} + return !bytes.Equal(key, []byte{0x42, 0x56}), nil + } + var val uint64 + create := func() interface{} { + return &val + } + handle := func() error { + actual = append(actual, val) + return nil + } + return check, create, handle + } + + // Traverse the keys starting with prefix {0x42} + err := operation.Traverse([]byte{0x42}, iterationFunc, storage.DefaultIteratorOptions())(r) + require.NoError(t, err, "traverse should not return an error") + + // Assert that the actual values match the expected values + require.Equal(t, expected, actual, "traversed values should match expected values") + }) +} + +func TestFindHighestAtOrBelow(t *testing.T) { + // Helper function to insert an entity into the storage + insertEntity := func(writer storage.Writer, prefix []byte, height uint64, entity Entity) error { + key := append(prefix, operation.EncodeKeyPart(height)...) 
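+		// the key is prefix || big-endian height, so byte-wise key ordering matches numeric height ordering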
+ return operation.Upsert(key, entity)(writer) + } + + // Entities to be inserted + entities := []struct { + height uint64 + entity Entity + }{ + {5, Entity{ID: 41}}, + {10, Entity{ID: 42}}, + {15, Entity{ID: 43}}, + } + + // Run test with multiple storage backends + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + prefix := []byte("test_prefix") + + // Insert entities into the storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, e := range entities { + if err := insertEntity(writer, prefix, e.height, e.entity); err != nil { + return err + } + } + return nil + })) + + // Declare entity to store the results of FindHighestAtOrBelow + var entity Entity + + // Test cases + tests := []struct { + name string + height uint64 + expectedValue uint64 + expectError bool + expectedErrMsg string + }{ + {"target first height exists", 5, 41, false, ""}, + {"target height exists", 10, 42, false, ""}, + {"target height above", 11, 42, false, ""}, + {"target height above highest", 20, 43, false, ""}, + {"target height below lowest", 4, 0, true, storage.ErrNotFound.Error()}, + {"empty prefix", 5, 0, true, "prefix must not be empty"}, + } + + // Execute test cases + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + prefixToUse := prefix + + if tt.name == "empty prefix" { + prefixToUse = []byte{} + } + + err := operation.FindHighestAtOrBelow( + prefixToUse, + tt.height, + &entity)(r) + + if tt.expectError { + require.Error(t, err, fmt.Sprintf("expected error but got nil, entity: %v", entity)) + require.Contains(t, err.Error(), tt.expectedErrMsg) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectedValue, entity.ID) + } + }) + } + }) +} diff --git a/storage/operation/writes.go b/storage/operation/writes.go new file mode 100644 index 00000000000..3bbe08d12d2 --- /dev/null +++ b/storage/operation/writes.go @@ -0,0 +1,58 @@ +package operation + +import ( + "github.com/vmihailenco/msgpack" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" +) + +// Upsert will encode the given entity using msgpack and will insert the resulting +// binary data under the provided key. +// If the key already exists, the value will be overwritten. +// Error returns: +// - generic error in case of unexpected failure from the database layer or +// encoding failure. +func Upsert(key []byte, val interface{}) func(storage.Writer) error { + return func(w storage.Writer) error { + value, err := msgpack.Marshal(val) + if err != nil { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } + + err = w.Set(key, value) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + } + + return nil + } +} + +// Remove removes the entity with the given key, if it exists. If it doesn't +// exist, this is a no-op. +// Error returns: +// * generic error in case of unexpected database error +func Remove(key []byte) func(storage.Writer) error { + return func(w storage.Writer) error { + err := w.Delete(key) + if err != nil { + return irrecoverable.NewExceptionf("could not delete item: %w", err) + } + return nil + } +} + +// RemoveByPrefix removes all keys with the given prefix defined by [startPrefix, endPrefix] (both inclusive). +// If no keys exist with the given prefix, this is a no-op. 
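+// For example (illustrative sketch): RemoveByPrefix(reader, []byte{0x10}) removes every key whose first byte is 0x10.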
+// Error returns: +// * generic error in case of unexpected database error +func RemoveByPrefix(reader storage.Reader, key []byte) func(storage.Writer) error { + return func(w storage.Writer) error { + err := w.DeleteByRange(reader, key, key) + if err != nil { + return irrecoverable.NewExceptionf("could not delete item: %w", err) + } + return nil + } +} diff --git a/storage/operation/writes_test.go b/storage/operation/writes_test.go new file mode 100644 index 00000000000..9355b5822db --- /dev/null +++ b/storage/operation/writes_test.go @@ -0,0 +1,276 @@ +package operation_test + +import ( + "encoding/binary" + "errors" + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" +) + +func TestReadWrite(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + e := Entity{ID: 1337} + + // Test read nothing should return not found + var item Entity + err := operation.Retrieve(e.Key(), &item)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error") + + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + + var readBack Entity + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, e, readBack, "expected retrieved value to match written value") + + // Test write again should overwrite + newEntity := Entity{ID: 42} + require.NoError(t, withWriter(operation.Upsert(e.Key(), newEntity))) + + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, newEntity, readBack, "expected overwritten value to be retrieved") + + // Test write should not overwrite a different key + anotherEntity := Entity{ID: 84} + require.NoError(t, withWriter(operation.Upsert(anotherEntity.Key(), anotherEntity))) + + var anotherReadBack Entity + require.NoError(t, operation.Retrieve(anotherEntity.Key(), &anotherReadBack)(r)) + require.Equal(t, anotherEntity, anotherReadBack, "expected different key to return different value") + }) +} + +func TestReadWriteMalformed(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + e := Entity{ID: 1337} + ue := UnencodeableEntity(e) + + // Test write should return encoding error + require.NoError(t, withWriter(func(writer storage.Writer) error { + err := operation.Upsert(e.Key(), ue)(writer) + require.Contains(t, err.Error(), errCantEncode.Error(), "expected encoding error") + return nil + })) + + // Test read should return decoding error + var exists bool + require.NoError(t, operation.Exists(e.Key(), &exists)(r)) + require.False(t, exists, "expected key to not exist") + }) +} + +// Verify multiple entities can be removed in one batch update +func TestBatchWrite(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + // Define multiple entities for batch insertion + entities := []Entity{ + {ID: 1337}, + {ID: 42}, + {ID: 84}, + } + + // Batch write: insert multiple entities in a single transaction + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, e := range entities { + if err := operation.Upsert(e.Key(), e)(writer); err != nil { + return err + } + } + return nil + })) + + // Verify that each entity can be read back + for _, e := range entities { + var readBack Entity + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, 
e, readBack, "expected retrieved value to match written value for entity ID %d", e.ID) + } + + // Batch update: remove multiple entities in a single transaction + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, e := range entities { + if err := operation.Remove(e.Key())(writer); err != nil { + return err + } + } + return nil + })) + + // Verify that each entity has been removed + for _, e := range entities { + var readBack Entity + err := operation.Retrieve(e.Key(), &readBack)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error for entity ID %d after removal", e.ID) + } + }) +} + +func TestRemove(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + e := Entity{ID: 1337} + + var exists bool + require.NoError(t, operation.Exists(e.Key(), &exists)(r)) + require.False(t, exists, "expected key to not exist") + + // Test delete nothing should return OK + require.NoError(t, withWriter(operation.Remove(e.Key()))) + + // Test write, delete, then read should return not found + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + + require.NoError(t, operation.Exists(e.Key(), &exists)(r)) + require.True(t, exists, "expected key to exist") + + require.NoError(t, withWriter(operation.Remove(e.Key()))) + + var item Entity + err := operation.Retrieve(e.Key(), &item)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error after delete") + }) +} + +func TestConcurrentWrite(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + var wg sync.WaitGroup + numWrites := 10 // number of concurrent writes + + for i := 0; i < numWrites; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + e := Entity{ID: uint64(i)} + + // Simulate a concurrent write to a different key + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + + var readBack Entity + require.NoError(t, operation.Retrieve(e.Key(), &readBack)(r)) + require.Equal(t, e, readBack, "expected retrieved value to match written value for key %d", i) + }(i) + } + + wg.Wait() // Wait for all goroutines to finish + }) +} + +func TestConcurrentRemove(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + var wg sync.WaitGroup + numDeletes := 10 // number of concurrent deletions + + // First, insert entities to be deleted concurrently + for i := 0; i < numDeletes; i++ { + e := Entity{ID: uint64(i)} + require.NoError(t, withWriter(operation.Upsert(e.Key(), e))) + } + + // Now, perform concurrent deletes + for i := 0; i < numDeletes; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + e := Entity{ID: uint64(i)} + + // Simulate a concurrent delete + require.NoError(t, withWriter(operation.Remove(e.Key()))) + + // Check that the item is no longer retrievable + var item Entity + err := operation.Retrieve(e.Key(), &item)(r) + require.True(t, errors.Is(err, storage.ErrNotFound), "expected not found error after delete for key %d", i) + }(i) + } + + wg.Wait() // Wait for all goroutines to finish + }) +} + +func TestRemoveRange(t *testing.T) { + dbtest.RunWithStorages(t, func(t *testing.T, r storage.Reader, withWriter dbtest.WithWriter) { + + // Define the prefix + prefix := []byte{0x10} + + // Create a range of keys around the boundaries of the prefix + keys := [][]byte{ + // before prefix -> not included in range + {0x09, 0xff}, + // within the prefix -> included in range + {0x10, 
0x00}, + {0x10, 0x50}, + {0x10, 0xff}, + // after end -> not included in range + {0x11, 0x00}, + {0x1A, 0xff}, + } + + // Keys expected to be in the prefix range + includeStart, includeEnd := 1, 3 + + // Insert the keys into the storage + require.NoError(t, withWriter(func(writer storage.Writer) error { + for _, key := range keys { + value := []byte{0x00} // value are skipped, doesn't matter + err := operation.Upsert(key, value)(writer) + if err != nil { + return err + } + } + return nil + })) + + // Remove the keys in the prefix range + require.NoError(t, withWriter(operation.RemoveByPrefix(r, prefix))) + + // Verify that the keys in the prefix range have been removed + for i, key := range keys { + var exists bool + require.NoError(t, operation.Exists(key, &exists)(r)) + t.Logf("key %x exists: %t", key, exists) + + deleted := includeStart <= i && i <= includeEnd + + // An item that was not deleted must exist + require.Equal(t, !deleted, exists, + "expected key %x to be %s", key, map[bool]string{true: "deleted", false: "not deleted"}) + } + }) +} + +type Entity struct { + ID uint64 +} + +func (e Entity) Key() []byte { + byteSlice := make([]byte, 8) // uint64 is 8 bytes + binary.BigEndian.PutUint64(byteSlice, e.ID) + return byteSlice +} + +type UnencodeableEntity Entity + +var errCantEncode = fmt.Errorf("encoding not supported") +var errCantDecode = fmt.Errorf("decoding not supported") + +func (a UnencodeableEntity) MarshalJSON() ([]byte, error) { + return nil, errCantEncode +} + +func (a *UnencodeableEntity) UnmarshalJSON(b []byte) error { + return errCantDecode +} + +func (a UnencodeableEntity) MarshalMsgpack() ([]byte, error) { + return nil, errCantEncode +} + +func (a UnencodeableEntity) UnmarshalMsgpack(b []byte) error { + return errCantDecode +} diff --git a/storage/operations.go b/storage/operations.go new file mode 100644 index 00000000000..d407da299e7 --- /dev/null +++ b/storage/operations.go @@ -0,0 +1,146 @@ +package storage + +import ( + "io" +) + +// Iterator is an interface for iterating over key-value pairs in a storage backend. +type Iterator interface { + // First seeks to the smallest key greater than or equal to the given key. + First() + + // Valid returns whether the iterator is positioned at a valid key-value pair. + Valid() bool + + // Next advances the iterator to the next key-value pair. + Next() + + // IterItem returns the current key-value pair, or nil if done. + IterItem() IterItem + + // Close closes the iterator. Iterator must be closed, otherwise it causes memory leak. + // No errors expected during normal operation + Close() error +} + +// IterItem is an interface for iterating over key-value pairs in a storage backend. +type IterItem interface { + Key() []byte + + // Value returns the value of the current key-value pair + // The reason it takes a function is to follow badgerDB's API pattern + // No errors expected during normal operation + Value(func(val []byte) error) error +} + +type IteratorOption struct { + IterateKeyOnly bool // default false +} + +func DefaultIteratorOptions() IteratorOption { + return IteratorOption{ + IterateKeyOnly: false, // only needed for badger. ignored by pebble + } +} + +type Reader interface { + // Get gets the value for the given key. It returns ErrNotFound if the DB + // does not contain the key. + // other errors are exceptions + // + // The caller should not modify the contents of the returned slice, but it is + // safe to modify the contents of the argument after Get returns. 
The + // returned slice will remain valid until the returned Closer is closed. On + // success, the caller MUST call closer.Close() or a memory leak will occur. + Get(key []byte) (value []byte, closer io.Closer, err error) + + // NewIter returns a new Iterator for the given key prefix range [startPrefix, endPrefix], both inclusive. + // Specifically, all keys that meet ANY of the following conditions are included in the iteration: + // - have a prefix equal to startPrefix OR + // - have a prefix equal to the endPrefix OR + // - have a prefix that is lexicographically between startPrefix and endPrefix + NewIter(startPrefix, endPrefix []byte, ops IteratorOption) (Iterator, error) +} + +// Writer is an interface for batch writing to a storage backend. +// It cannot be used concurrently for writing. +type Writer interface { + // Set sets the value for the given key. It overwrites any previous value + // for that key; a DB is not a multi-map. + // + // It is safe to modify the contents of the arguments after Set returns. + // No errors expected during normal operation + Set(k, v []byte) error + + // Delete deletes the value for the given key. Deletes are blind all will + // succeed even if the given key does not exist. + // + // It is safe to modify the contents of the arguments after Delete returns. + // No errors expected during normal operation + Delete(key []byte) error + + // DeleteByRange removes all keys with a prefix that falls within the + // range [start, end], both inclusive. + // No errors expected during normal operation + DeleteByRange(globalReader Reader, startPrefix, endPrefix []byte) error +} + +// ReaderBatchWriter is an interface for reading and writing to a storage backend. +// It is useful for performing a related sequence of reads and writes, after which you would like +// to modify some non-database state if the sequence completed successfully (via AddCallback). +// If you are not using AddCallback, avoid using ReaderBatchWriter: use Reader and Writer directly. +type ReaderBatchWriter interface { + // GlobalReader returns a database-backed reader which reads the latest committed global database state ("read-committed isolation"). + // This reader will not read writes written to ReaderBatchWriter.Writer until the write batch is committed. + // This reader may observe different values for the same key on subsequent reads. + GlobalReader() Reader + + // Writer returns a writer associated with a batch of writes. The batch is pending until it is committed. + // When we `Write` into the batch, that write operation is added to the pending batch, but not committed. + // The commit operation is atomic w.r.t. the batch; either all writes are applied to the database, or no writes are. + // Note: + // - The writer cannot be used concurrently for writing. + Writer() Writer + + // AddCallback adds a callback to execute after the batch has been flush + // regardless the batch update is succeeded or failed. + // The error parameter is the error returned by the batch update. + AddCallback(func(error)) +} + +// OnCommitSucceed adds a callback to execute after the batch has been successfully committed. +func OnCommitSucceed(b ReaderBatchWriter, onSuccessFn func()) { + b.AddCallback(func(err error) { + if err == nil { + onSuccessFn() + } + }) +} + +func StartEndPrefixToLowerUpperBound(startPrefix, endPrefix []byte) (lowerBound, upperBound []byte) { + // Return value lowerBound specifies the smallest key to iterate and it's inclusive. 
+ // Return value upperBound specifies the largest key to iterate and it's exclusive (not inclusive) + // in order to match all keys prefixed with `endPrefix`, we increment the bytes of `endPrefix` by 1, + // for instance, to iterate keys between "hello" and "world", + // we use "hello" as LowerBound, "worle" as UpperBound, so that "world", "world1", "worldffff...ffff" + // will all be included. + return startPrefix, PrefixUpperBound(endPrefix) +} + +// PrefixUpperBound returns a key K such that all possible keys beginning with the input prefix +// sort lower than K according to the byte-wise lexicographic key ordering used by Pebble. +// This is used to define an upper bound for iteration, when we want to iterate over +// all keys beginning with a given prefix. +// referred to https://pkg.go.dev/github.com/cockroachdb/pebble#example-Iterator-PrefixIteration +func PrefixUpperBound(prefix []byte) []byte { + end := make([]byte, len(prefix)) + copy(end, prefix) + for i := len(end) - 1; i >= 0; i-- { + // increment the bytes by 1 + end[i] = end[i] + 1 + if end[i] != 0 { + return end[:i+1] + } + } + return nil // no upper-bound +} diff --git a/utils/unittest/unittest.go b/utils/unittest/unittest.go index 4d13b279087..5edcd3d477e 100644 --- a/utils/unittest/unittest.go +++ b/utils/unittest/unittest.go @@ -368,6 +368,11 @@ func TempBadgerDB(t testing.TB) (*badger.DB, string) { return db, dir } +func TempPebbleDB(t testing.TB) (*pebble.DB, string) { + dir := TempDir(t) + return PebbleDB(t, dir), dir +} + func TempPebblePath(t *testing.T) string { return path.Join(TempDir(t), "pebble"+strconv.Itoa(rand.Int())+".db") } @@ -380,6 +385,43 @@ func TempPebbleDBWithOpts(t testing.TB, opts *pebble.Options) (*pebble.DB, strin return db, dbpath } +func RunWithPebbleDB(t testing.TB, f func(*pebble.DB)) { + RunWithTempDir(t, func(dir string) { + db, err := pebble.Open(dir, &pebble.Options{}) + require.NoError(t, err) + defer func() { + assert.NoError(t, db.Close()) + }() + f(db) + }) +} + +func PebbleDB(t testing.TB, dir string) *pebble.DB { + db, err := pebble.Open(dir, &pebble.Options{}) + require.NoError(t, err) + return db +} + +func TypedPebbleDB(t testing.TB, dir string, create func(string, *pebble.Options) (*pebble.DB, error)) *pebble.DB { + db, err := create(dir, &pebble.Options{}) + require.NoError(t, err) + return db +} + +func RunWithTypedPebbleDB( + t testing.TB, + create func(string, *pebble.Options) (*pebble.DB, error), + f func(*pebble.DB)) { + RunWithTempDir(t, func(dir string) { + db, err := create(dir, &pebble.Options{}) + require.NoError(t, err) + defer func() { + assert.NoError(t, db.Close()) + }() + f(db) + }) +} + func Concurrently(n int, f func(int)) { var wg sync.WaitGroup for i := 0; i < n; i++ {