Skip to content

Commit

Permalink
treestore tests use testing.T instead of err returns
Browse files Browse the repository at this point in the history
  • Loading branch information
dylanlott committed Jul 17, 2023
1 parent 8839eaa commit 15d5c8a
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 74 deletions.
19 changes: 11 additions & 8 deletions persistence/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,22 @@ type PostgresContext struct {
networkId string
}

func (p *PostgresContext) NewSavePoint(bytes []byte) error {
p.logger.Info().Bool("TODO", true).Msg("NewSavePoint not implemented")
// NewSavePoint generates a new Savepoint for this context.
func (p *PostgresContext) NewSavePoint() error {
if err := p.stateTrees.Savepoint(); err != nil {
return err
}
return nil
}

// TECHDEBT(#327): Guarantee atomicity betweens `prepareBlock`, `insertBlock` and `storeBlock` for save points & rollbacks.
func (p *PostgresContext) RollbackToSavePoint(bytes []byte) error {
p.logger.Info().Bool("TODO", true).Msg("RollbackToSavePoint not fully implemented")
return p.tx.Rollback(context.TODO())
// RollbackToSavepoint triggers a rollback for the current pgx transaction and the underylying submodule stores.
func (p *PostgresContext) RollbackToSavePoint() error {
ctx := context.TODO()
err := p.tx.Rollback(ctx)
p.stateTrees.Rollback()
return err
}

// IMPROVE(#361): Guarantee the integrity of the state
// Full details in the thread from the PR review: https://github.com/pokt-network/pocket/pull/285#discussion_r1018471719
func (p *PostgresContext) ComputeStateHash() (string, error) {
stateHash, err := p.stateTrees.Update(p.tx, uint64(p.Height))
Expand All @@ -58,7 +62,6 @@ func (p *PostgresContext) ComputeStateHash() (string, error) {
return p.stateHash, nil
}

// TECHDEBT(#327): Make sure these operations are atomic
func (p *PostgresContext) Commit(proposerAddr, quorumCert []byte) error {
p.logger.Info().Int64("height", p.Height).Msg("About to commit block & context")

Expand Down
2 changes: 2 additions & 0 deletions persistence/trees/atomic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func TestTreeStore_AtomicUpdates(t *testing.T) {
err := ts.merkleTrees[treeName].tree.Update([]byte("foo"), []byte("bar"))
require.NoError(t, err)
}
err = ts.Commit()
require.NoError(t, err)

hash1 := ts.getStateHash()
require.NotEmpty(t, hash1)
Expand Down
35 changes: 12 additions & 23 deletions persistence/trees/trees_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
Expand Down Expand Up @@ -100,24 +101,16 @@ func newTestPersistenceModule(t *testing.T, databaseUrl string) modules.Persiste
return persistenceMod.(modules.PersistenceModule)
}
func createAndInsertDefaultTestApp(t *testing.T, db *persistence.PostgresContext) (*coreTypes.Actor, error) {
app, err := newTestApp(t)
if err != nil {
return nil, err
}
// TODO(andrew): Avoid the use of `log.Fatal(fmt.Sprintf`
// TODO(andrew): Use `require.NoError` instead of `log.Fatal` in tests`
app := newTestApp(t)

addrBz, err := hex.DecodeString(app.Address)
if err != nil {
t.Errorf("an error occurred converting address to bytes %s", app.Address)
}
require.NoError(t, err)

pubKeyBz, err := hex.DecodeString(app.PublicKey)
if err != nil {
t.Errorf("an error occurred converting pubKey to bytes %s", app.PublicKey)
}
require.NoError(t, err)

outputBz, err := hex.DecodeString(app.Output)
if err != nil {
t.Errorf("an error occurred converting output to bytes %s", app.Output)
}
require.NoError(t, err)
return app, db.InsertApp(
addrBz,
pubKeyBz,
Expand All @@ -130,16 +123,12 @@ func createAndInsertDefaultTestApp(t *testing.T, db *persistence.PostgresContext
DefaultUnstakingHeight)
}

func newTestApp(t *testing.T) (*coreTypes.Actor, error) {
func newTestApp(t *testing.T) *coreTypes.Actor {
operatorKey, err := crypto.GeneratePublicKey()
if err != nil {
t.Errorf("failed to generate test app: %v", err)
}
require.NoError(t, err)

outputAddr, err := crypto.GenerateAddress()
if err != nil {
t.Errorf("failed to generate test app: %v", err)
}
require.NoError(t, err)

return &coreTypes.Actor{
Address: hex.EncodeToString(operatorKey.Address()),
Expand All @@ -149,7 +138,7 @@ func newTestApp(t *testing.T) (*coreTypes.Actor, error) {
PausedHeight: DefaultPauseHeight,
UnstakingHeight: DefaultUnstakingHeight,
Output: hex.EncodeToString(outputAddr),
}, nil
}
}

func NewTestPostgresContext(t testing.TB, height int64, testPersistenceMod modules.PersistenceModule) *persistence.PostgresContext {
Expand Down
4 changes: 2 additions & 2 deletions shared/modules/persistence_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ type PersistenceRWContext interface {
// PersistenceWriteContext has no use-case independent of `PersistenceRWContext`, but is a useful abstraction
type PersistenceWriteContext interface {
// Context Operations
NewSavePoint([]byte) error
RollbackToSavePoint([]byte) error
NewSavePoint() error
RollbackToSavePoint() error
Release()

// Commits (and releases) the current context to disk (i.e. finality).
Expand Down
3 changes: 2 additions & 1 deletion shared/modules/treestore_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ type TreeStoreModule interface {

AtomicStore

// Update returns the new state hash for a given height.
// Update returns the computed state hash for a given height.
// * Height is passed through to the Update function and is used to query the TxIndexer for transactions
// to update into the merkle tree set
// * Passing a higher height will cause a change but repeatedly calling the same or a lower height will
// not incur a change.
// * By nature of it taking a pgx transaction at runtime, Update inherits the pgx transaction's read view of the
// database.
// * Commit must be called after Update to persist any changes it made to disk.
Update(pgtx pgx.Tx, height uint64) (string, error)
// DebugClearAll completely clears the state of the trees. For debugging purposes only.
DebugClearAll() error
Expand Down
33 changes: 9 additions & 24 deletions utility/unit_of_work/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,33 +208,18 @@ func (uow *baseUtilityUnitOfWork) prevBlockByzantineValidators() ([][]byte, erro
return nil, nil
}

// TODO: This has not been tested or investigated in detail
func (uow *baseUtilityUnitOfWork) revertLastSavePoint() coreTypes.Error {
// TODO(@deblasis): Implement this
// if len(u.savePointsSet) == 0 {
// return coreTypes.ErrEmptySavePoints()
// }
// var key []byte
// popIndex := len(u.savePointsList) - 1
// key, u.savePointsList = u.savePointsList[popIndex], u.savePointsList[:popIndex]
// delete(u.savePointsSet, hex.EncodeToString(key))
// if err := u.store.RollbackToSavePoint(key); err != nil {
// return coreTypes.ErrRollbackSavePoint(err)
// }
if err := uow.persistenceRWContext.RollbackToSavePoint(); err != nil {
uow.logger.Err(err).Msgf("failed to rollback to savepoint at height %d", uow.height)
return coreTypes.ErrRollbackSavePoint(err)
}
return nil
}

//nolint:unused // TODO: This has not been tested or investigated in detail
func (uow *baseUtilityUnitOfWork) newSavePoint(txHashBz []byte) coreTypes.Error {
// TODO(@deblasis): Implement this
// if err := u.store.NewSavePoint(txHashBz); err != nil {
// return coreTypes.ErrNewSavePoint(err)
// }
// txHash := hex.EncodeToString(txHashBz)
// if _, exists := u.savePointsSet[txHash]; exists {
// return coreTypes.ErrDuplicateSavePoint()
// }
// u.savePointsList = append(u.savePointsList, txHashBz)
// u.savePointsSet[txHash] = struct{}{}
func (uow *baseUtilityUnitOfWork) newSavePoint() coreTypes.Error {
if err := uow.persistenceRWContext.NewSavePoint(); err != nil {
uow.logger.Err(err).Msgf("failed to create new savepoint at height %d", uow.height)
return coreTypes.ErrNewSavePoint(err)
}
return nil
}
35 changes: 19 additions & 16 deletions utility/unit_of_work/module.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package unit_of_work

import (
"fmt"

coreTypes "github.com/pokt-network/pocket/shared/core/types"
"github.com/pokt-network/pocket/shared/mempool"
"github.com/pokt-network/pocket/shared/modules"
"github.com/pokt-network/pocket/shared/modules/base_modules"
"go.uber.org/multierr"
)

const (
Expand Down Expand Up @@ -48,6 +47,7 @@ func (uow *baseUtilityUnitOfWork) SetProposalBlock(blockHash string, proposerAdd
return nil
}

// ApplyBlock atomically applies a block to the persistence layer for a given height.
func (uow *baseUtilityUnitOfWork) ApplyBlock() error {
log := uow.logger.With().Fields(map[string]interface{}{
"source": "ApplyBlock",
Expand All @@ -58,51 +58,55 @@ func (uow *baseUtilityUnitOfWork) ApplyBlock() error {
return coreTypes.ErrProposalBlockNotSet()
}

if err := uow.newSavePoint(); err != nil {
return err
}

// begin block lifecycle phase
log.Debug().Msg("calling beginBlock")
if err := uow.beginBlock(); err != nil {
return err
}

// processProposalBlockTransactions indexes the transactions into the TxIndexer.
// if it fails, it triggers a rollback to undo the changes that processProposalBlockTransactions
// could have caused.
log.Debug().Msg("processing transactions from proposal block")
txMempool := uow.GetBus().GetUtilityModule().GetMempool()
if err := uow.processProposalBlockTransactions(txMempool); err != nil {
return err
rollErr := uow.revertLastSavePoint()
return multierr.Combine(rollErr, err)
}

// end block lifecycle phase
// end block lifecycle phase calls endBlock and reverts to the last known savepoint if it encounters any errors
log.Debug().Msg("calling endBlock")
if err := uow.endBlock(uow.proposalProposerAddr); err != nil {
return err
rollErr := uow.revertLastSavePoint()
return multierr.Combine(rollErr, err)
}

// return the app hash (consensus module will get the validator set directly)
log.Debug().Msg("computing state hash")
stateHash, err := uow.persistenceRWContext.ComputeStateHash()
if err != nil {
log.Fatal().Err(err).Bool("TODO", true).Msg("Updating the app hash failed. TODO: Look into roll-backing the entire commit...")
return coreTypes.ErrAppHash(err)
rollErr := uow.persistenceRWContext.RollbackToSavePoint()
return coreTypes.ErrAppHash(multierr.Append(err, rollErr))
}

// IMPROVE(#655): this acts as a feature flag to allow tests to ignore the check if needed, ideally the tests should have a way to determine
// the hash and set it into the proposal block it's currently hard to do because the state is different at every test run (non-determinism)
if uow.proposalStateHash != IgnoreProposalBlockCheckHash {
if uow.proposalStateHash != stateHash {
log.Fatal().Bool("TODO", true).
Str("proposalStateHash", uow.proposalStateHash).
Str("stateHash", stateHash).
Msg("State hash mismatch. TODO: Look into roll-backing the entire commit...")
return coreTypes.ErrAppHash(fmt.Errorf("state hash mismatch: expected %s from the proposal, got %s", uow.proposalStateHash, stateHash))
return uow.revertLastSavePoint()
}
}

log.Info().Str("state_hash", stateHash).Msgf("ApplyBlock succeeded!")
log.Info().Str("state_hash", stateHash).Msgf("🧱 ApplyBlock succeeded!")

uow.stateHash = stateHash

return nil
}

// TODO(@deblasis): change tracking here
func (uow *baseUtilityUnitOfWork) Commit(quorumCert []byte) error {
uow.logger.Debug().Msg("committing the rwPersistenceContext...")
if err := uow.persistenceRWContext.Commit(uow.proposalProposerAddr, quorumCert); err != nil {
Expand All @@ -112,7 +116,6 @@ func (uow *baseUtilityUnitOfWork) Commit(quorumCert []byte) error {
return nil
}

// TODO(@deblasis): change tracking reset here
func (uow *baseUtilityUnitOfWork) Release() error {
rwCtx := uow.persistenceRWContext
if rwCtx != nil {
Expand Down

0 comments on commit 15d5c8a

Please sign in to comment.