Skip to content

Commit

Permalink
[wip] gets destination backup working for KVStore
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanlott committed Jul 26, 2023
1 parent f6ef7e9 commit 8b4844c
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 19 deletions.
4 changes: 4 additions & 0 deletions persistence/blockstore/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ func (bs *blockStore) Stop() error {
return bs.kv.Stop()
}

func (bs *blockStore) Backup(path string) error {
return bs.kv.Backup(path)
}

func (bs *blockStore) Delete(key []byte) error { return bs.kv.Delete(key) }
func (bs *blockStore) Exists(key []byte) (bool, error) { return bs.kv.Exists(key) }
func (bs *blockStore) GetAll(prefixKey []byte, descending bool) (keys, values [][]byte, err error) {
Expand Down
30 changes: 30 additions & 0 deletions persistence/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ package kvstore

import (
"errors"
"fmt"
"log"
"os"
"path/filepath"

badger "github.com/dgraph-io/badger/v3"
"github.com/pokt-network/smt"
Expand All @@ -22,6 +25,8 @@ type KVStore interface {
GetAll(prefixKey []byte, descending bool) (keys, values [][]byte, err error)
Exists(key []byte) (bool, error)
ClearAll() error

Backup(filepath string) error
}

const (
Expand Down Expand Up @@ -141,6 +146,31 @@ func (store *badgerKVStore) Stop() error {
return store.db.Close()
}

// Backup creates a backup for the badgerDB at the provided path.
// It creates a file
func (store *badgerKVStore) Backup(backupPath string) error {
// create backup directory if it doesn't exist
if err := os.MkdirAll(filepath.Dir(backupPath), os.ModePerm); err != nil {
return fmt.Errorf("failed to create backup directory: %v", err)
}

// create the backup file itself
backupFile, err := os.Create(backupPath)
if err != nil {
return fmt.Errorf("failed to create backup file: %v", err)
}
defer backupFile.Close()

// dump the database to the backup file
// CONSIDER: ??? err := store.db.NewStream().Backup(backupFile, 0) // What's the difference here?
_, err = store.db.Backup(backupFile, 0)
if err != nil {
return err
}

return nil
}

// PrefixEndBytes returns the end byteslice for a noninclusive range
// that would include all byte slices for which the input is the prefix
func prefixEndBytes(prefix []byte) []byte {
Expand Down
59 changes: 59 additions & 0 deletions persistence/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package kvstore

import (
"encoding/hex"
"io"
"io/ioutil"

Check failure on line 6 in persistence/kvstore/kvstore_test.go

View workflow job for this annotation

GitHub Actions / lint

SA1019: "io/ioutil" has been deprecated since Go 1.19: As of Go 1.16, the same functionality is now provided by package io or package os, and those implementations should be preferred in new code. See the specific function documentation for details. (staticcheck)
"os"
"path/filepath"
"strings"
"testing"

Expand Down Expand Up @@ -330,10 +334,65 @@ func TestKVStore_ClearAll(t *testing.T) {
require.NoError(t, err)
}

func TestKVStore_Backup(t *testing.T) {
t.Run("should backup an in-memory database", func(t *testing.T) {
store := NewMemKVStore()
require.NotNil(t, store)

tmpdir := t.TempDir()
path := filepath.Join(tmpdir, "TestKVStore_Backup_InMemory.bak")
require.NoError(t, store.Backup(path))

empty, err := isEmpty(t, tmpdir)
require.NoError(t, err)
require.False(t, empty)

// assert on individual files
files, err := ioutil.ReadDir(tmpdir)
require.NoError(t, err)
require.Equal(t, len(files), 1)
})
t.Run("should backup an on-disk store database", func(t *testing.T) {
tmpdir := t.TempDir()
kvpath := filepath.Join(tmpdir, "TestKVStore_Backup_OnDisk_Source.bak")
store, err := NewKVStore(kvpath)
require.NoError(t, err)
require.NotNil(t, store)

backupDir := t.TempDir()
path := filepath.Join(backupDir, "TestKVStore_Backup_OnDisk_Destination.bak")
require.NoError(t, store.Backup(path))

empty, err := isEmpty(t, backupDir)
require.NoError(t, err)
require.False(t, empty)

// assert on individual files
files, err := ioutil.ReadDir(backupDir)
require.NoError(t, err)
require.Equal(t, len(files), 1)
})
}

func setupStore(t *testing.T, store KVStore) {
t.Helper()
err := store.Set([]byte("foo"), []byte("bar"))
require.NoError(t, err)
err = store.Set([]byte("baz"), []byte("bin"))
require.NoError(t, err)
}

func isEmpty(t *testing.T, dir string) (bool, error) {
t.Helper()
f, err := os.Open(dir)
if err != nil {
return false, err
}
defer f.Close()

_, err = f.Readdirnames(1)
if err == io.EOF {
return true, nil
}
return false, err
}
2 changes: 1 addition & 1 deletion persistence/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (*persistenceModule) Create(bus modules.Bus, options ...modules.ModuleOptio
trees.WithTreeStoreDirectory(persistenceCfg.TreesStoreDir),
trees.WithLogger(m.logger))
if err != nil {
return nil, fmt.Errorf("failed to create TreeStoreModule: %w", err)
return nil, fmt.Errorf("failed to create %s: %w", modules.TreeStoreSubmoduleName, err)
}

m.config = persistenceCfg
Expand Down
69 changes: 67 additions & 2 deletions persistence/trees/atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package trees
import (
"encoding/hex"
"fmt"
"io"
"os"
"testing"

"github.com/pokt-network/pocket/logger"
Expand Down Expand Up @@ -148,8 +150,6 @@ func TestTreeStore_SaveAndLoad(t *testing.T) {
logger: logger.Global.CreateLoggerForModule(modules.TreeStoreSubmoduleName),
treeStoreDir: tmpDir,
}
// TODO IN THIS COMMIT do we need to start this treestore?
// require.NoError(t, ts2.Start())

// Load sets a tree store to the provided worldstate
err = ts2.Load(w)
Expand All @@ -160,3 +160,68 @@ func TestTreeStore_SaveAndLoad(t *testing.T) {
// Assert that hash is unchanged from save and load
require.Equal(t, hash1, hash2)
}

func TestTreeStore_SaveBackup(t *testing.T) {
ts := newTestTreeStore(t)
tmpdir := t.TempDir()
// assert that the directory is empty before backup
ok, err := isEmpty(tmpdir)
require.NoError(t, err)
require.True(t, ok)

// Trigger a backup
require.NoError(t, ts.Backup(tmpdir))

// assert that the directory is not empty after Backup has returned
ok, err = isEmpty(tmpdir)
require.NoError(t, err)
require.False(t, ok)
}

// creates a new tree store with a tmp directory for nodestore persistence
// and then starts the tree store and returns its pointer.
func newTestTreeStore(t *testing.T) *treeStore {
ctrl := gomock.NewController(t)
tmpDir := t.TempDir()

mockTxIndexer := mock_types.NewMockTxIndexer(ctrl)
mockBus := mockModules.NewMockBus(ctrl)
mockPersistenceMod := mockModules.NewMockPersistenceModule(ctrl)

mockBus.EXPECT().GetPersistenceModule().AnyTimes().Return(mockPersistenceMod)
mockPersistenceMod.EXPECT().GetTxIndexer().AnyTimes().Return(mockTxIndexer)

ts := &treeStore{
logger: &zerolog.Logger{},
treeStoreDir: tmpDir,
}
require.NoError(t, ts.Start())
require.NotNil(t, ts.rootTree.tree)

for _, treeName := range stateTreeNames {
err := ts.merkleTrees[treeName].tree.Update([]byte("foo"), []byte("bar"))
require.NoError(t, err)
}

err := ts.Commit()
require.NoError(t, err)

hash1 := ts.getStateHash()
require.NotEmpty(t, hash1)

return ts
}

func isEmpty(dir string) (bool, error) {
f, err := os.Open(dir)
if err != nil {
return false, err
}
defer f.Close()

_, err = f.Readdirnames(1) // Or f.Readdir(1)
if err == io.EOF {
return true, nil
}
return false, err // Either not empty or error, suits both cases
}
7 changes: 7 additions & 0 deletions persistence/trees/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (

var _ modules.TreeStoreModule = &treeStore{}

// Create returns a TreeStoreSubmodule that has been setup with the provided TreeStoreOptions, started,
// and then registered to the bus.
func (*treeStore) Create(bus modules.Bus, options ...modules.TreeStoreOption) (modules.TreeStoreModule, error) {
m := &treeStore{}

for _, option := range options {
option(m)
}

if err := m.Start(); err != nil {
return nil, fmt.Errorf("failed to start %s: %w", modules.TreeStoreSubmoduleName, err)
}

bus.RegisterModule(m)

return m, nil
Expand Down Expand Up @@ -71,6 +77,7 @@ func (t *treeStore) Stop() error {

func (t *treeStore) GetModuleName() string { return modules.TreeStoreSubmoduleName }

// setupTrees is called by Start and it loads the treestore at the given directory
func (t *treeStore) setupTrees() error {
if t.treeStoreDir == ":memory:" {
return t.setupInMemory()
Expand Down
17 changes: 7 additions & 10 deletions persistence/trees/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ func TestTreeStore_Create(t *testing.T) {
treemod, err := trees.Create(mockBus, trees.WithTreeStoreDirectory(":memory:"))
assert.NoError(t, err)

require.NoError(t, treemod.Start())
// Create should setup a value for each tree
for _, v := range stateTreeNames {
root, ns := treemod.GetTree(v)
require.NotEmpty(t, root)
require.NotEmpty(t, ns)
}

got := treemod.GetBus()
assert.Equal(t, got, mockBus)
Expand All @@ -68,21 +73,13 @@ func TestTreeStore_StartAndStop(t *testing.T) {
mockRuntimeMgr := mockModules.NewMockRuntimeMgr(ctrl)
mockBus := createMockBus(t, mockRuntimeMgr)

// Create returns a started TreeStoreSubmodule
treemod, err := trees.Create(
mockBus,
trees.WithTreeStoreDirectory(":memory:"),
trees.WithLogger(&zerolog.Logger{}))
assert.NoError(t, err)

// GetTree should return nil for each tree if Start has not been called
for _, v := range stateTreeNames {
root, ns := treemod.GetTree(v)
require.Empty(t, root)
require.Empty(t, ns)
}
// Should start without error
require.NoError(t, treemod.Start())

// Should stop without error
require.NoError(t, treemod.Stop())

Expand Down
37 changes: 35 additions & 2 deletions persistence/trees/trees.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"hash"
"log"
"path/filepath"

"github.com/jackc/pgx/v5"
"github.com/pokt-network/pocket/persistence/indexer"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
"github.com/pokt-network/smt"
"go.uber.org/multierr"
)

// smtTreeHasher sets the hasher used by the tree SMT trees
Expand Down Expand Up @@ -363,16 +365,47 @@ func (t *treeStore) save() (*worldstate, error) {
w := &worldstate{
TreeStoreDir: t.treeStoreDir,
MerkleRoots: make(map[string][]byte),
MerkleTrees: make(map[string]*stateTree),
RootHash: t.rootTree.tree.Root(),
RootTree: t.rootTree,
}

for treeName, tree := range t.merkleTrees {
w.MerkleRoots[treeName] = tree.tree.Root()
for treeName := range t.merkleTrees {
root, nodeStore := t.GetTree(treeName)
tree := smt.ImportSparseMerkleTree(nodeStore, smtTreeHasher, root)
w.MerkleTrees[treeName] = &stateTree{
name: treeName,
tree: tree,
nodeStore: nodeStore,
}
w.MerkleRoots[treeName] = tree.Root()
}

root, nodeStore := t.GetTree(RootTreeName)
tree := smt.ImportSparseMerkleTree(nodeStore, smtTreeHasher, root)
w.RootTree = &stateTree{
name: RootTreeName,
tree: tree,
nodeStore: nodeStore,
}

return w, nil
}

// Backup creates a new backup of each tree in the tree store to the provided directory.
// Each tree is backed up in an eponymous file in the provided backupDir.
func (t *treeStore) Backup(backupDir string) error {
errs := []error{}
for _, st := range t.merkleTrees {
treePath := filepath.Join(backupDir, st.name)
if err := st.nodeStore.Backup(treePath); err != nil {
t.logger.Err(err).Msgf("failed to backup %s tree: %+v", st.name, err)
errs = append(errs, err)
}
}
return multierr.Combine(errs...)
}

////////////////////////
// Actor Tree Helpers //
////////////////////////
Expand Down
4 changes: 0 additions & 4 deletions persistence/trees/trees_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ func TestTreeStore_Update(t *testing.T) {
pmod := newTestPersistenceModule(t, dbUrl)
context := newTestPostgresContext(t, 0, pmod)

require.NoError(t, context.SetSavePoint())

hash1, err := context.ComputeStateHash()
require.NoError(t, err)
require.NotEmpty(t, hash1)
Expand All @@ -59,8 +57,6 @@ func TestTreeStore_Update(t *testing.T) {
_, err = createAndInsertDefaultTestApp(t, context)
require.NoError(t, err)

require.NoError(t, context.SetSavePoint())

hash2, err := context.ComputeStateHash()
require.NoError(t, err)
require.NotEmpty(t, hash2)
Expand Down

0 comments on commit 8b4844c

Please sign in to comment.