Skip to content

Commit

Permalink
core: improve conflict records storage scheme
Browse files Browse the repository at this point in the history
Implement the neo-project/neo#2907 (comment)
and port a part of neo-project/neo#2913.

Signed-off-by: Anna Shaleva <[email protected]>
  • Loading branch information
AnnaShaleva committed Oct 10, 2023
1 parent 19c59c3 commit 88ff18f
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 76 deletions.
14 changes: 8 additions & 6 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2498,7 +2498,7 @@ func (bc *Blockchain) verifyAndPoolTx(t *transaction.Transaction, pool *mempool.
return fmt.Errorf("%w: net fee is %v, need %v", ErrTxSmallNetworkFee, t.NetworkFee, needNetworkFee)
}
// check that current tx wasn't included in the conflicts attributes of some other transaction which is already in the chain
if err := bc.dao.HasTransaction(t.Hash(), t.Signers); err != nil {
if err := bc.dao.HasTransaction(t.Hash(), t.Signers, height, bc.config.MaxTraceableBlocks); err != nil {
switch {
case errors.Is(err, dao.ErrAlreadyExists):
return ErrAlreadyExists
Expand Down Expand Up @@ -2594,8 +2594,8 @@ func (bc *Blockchain) verifyTxAttributes(d *dao.Simple, tx *transaction.Transact
case transaction.ConflictsT:
conflicts := tx.Attributes[i].Value.(*transaction.Conflicts)
// Only fully-qualified dao.ErrAlreadyExists error bothers us here, thus, we
// can safely omit the payer argument to HasTransaction call to improve performance a bit.
if err := bc.dao.HasTransaction(conflicts.Hash, nil); errors.Is(err, dao.ErrAlreadyExists) {
// can safely omit the signers, current index and MTB arguments to HasTransaction call to improve performance a bit.
if err := bc.dao.HasTransaction(conflicts.Hash, nil, 0, 0); errors.Is(err, dao.ErrAlreadyExists) {
return fmt.Errorf("%w: conflicting transaction %s is already on chain", ErrInvalidAttribute, conflicts.Hash.StringLE())
}
case transaction.NotaryAssistedT:
Expand All @@ -2621,14 +2621,16 @@ func (bc *Blockchain) verifyTxAttributes(d *dao.Simple, tx *transaction.Transact
// was already done so we don't need to check basic things like size, input/output
// correctness, presence in blocks before the new one, etc.
func (bc *Blockchain) IsTxStillRelevant(t *transaction.Transaction, txpool *mempool.Pool, isPartialTx bool) bool {
var recheckWitness bool
var curheight = bc.BlockHeight()
var (
recheckWitness bool
curheight = bc.BlockHeight()
)

if t.ValidUntilBlock <= curheight {
return false
}
if txpool == nil {
if bc.dao.HasTransaction(t.Hash(), t.Signers) != nil {
if bc.dao.HasTransaction(t.Hash(), t.Signers, curheight, bc.config.MaxTraceableBlocks) != nil {
return false
}
} else if txpool.HasConflicts(t, bc) {
Expand Down
116 changes: 56 additions & 60 deletions pkg/core/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/encoding/bigint"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
Expand Down Expand Up @@ -610,14 +611,15 @@ func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction,
if err != nil {
return nil, 0, err
}
if len(b) < 6 {
if len(b) < 1 {
return nil, 0, errors.New("bad transaction bytes")
}
if b[0] != storage.ExecTransaction {
// It may be a block.
return nil, 0, storage.ErrKeyNotFound
}
if b[5] == transaction.DummyVersion {
if len(b) == 1+4 { // storage.ExecTransaction + index
// It's a conflict record stub.

Check warning on line 622 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L622

Added line #L622 was not covered by tests
return nil, 0, storage.ErrKeyNotFound
}
r := io.NewBinReaderFromBuf(b)
Expand Down Expand Up @@ -686,46 +688,48 @@ func (dao *Simple) StoreHeaderHashes(hashes []util.Uint256, height uint32) error
// or in the list of conflicting transactions. If non-zero signers are specified,
// then additional check against the conflicting transaction signers intersection
// is held. Do not omit signers in case if it's important to check the validity
// of a supposedly conflicting on-chain transaction.
func (dao *Simple) HasTransaction(hash util.Uint256, signers []transaction.Signer) error {
// of a supposedly conflicting on-chain transaction. The retrieved conflict isn't
// checked against the maxTraceableBlocks setting if signers are omitted.
// HasTransaction does not consider the case of block executable.
func (dao *Simple) HasTransaction(hash util.Uint256, signers []transaction.Signer, currentIndex uint32, maxTraceableBlocks uint32) error {
key := dao.makeExecutableKey(hash)
bytes, err := dao.Store.Get(key)
if err != nil {
return nil
}

if len(bytes) < 6 {
if len(bytes) < 5 { // (storage.ExecTransaction + index) for conflict record
return nil
}
if bytes[5] != transaction.DummyVersion {
return ErrAlreadyExists
if len(bytes) != 5 {
return ErrAlreadyExists // fully-qualified transaction
}
if len(signers) == 0 {
return ErrHasConflicts
}

sMap := make(map[util.Uint160]struct{}, len(signers))
if !isTraceableBlock(bytes[1:], currentIndex, maxTraceableBlocks) {
// The most fresh conflict record is already outdated.
return nil
}

for _, s := range signers {
sMap[s.Account] = struct{}{}
}
br := io.NewBinReaderFromBuf(bytes[6:])
for {
var u util.Uint160
u.DecodeBinary(br)
if br.Err != nil {
if errors.Is(br.Err, iocore.EOF) {
break
v, err := dao.Store.Get(append(key, s.Account.BytesBE()...))
if err == nil {
if isTraceableBlock(v[1:], currentIndex, maxTraceableBlocks) {
return ErrHasConflicts
}
return fmt.Errorf("failed to decode conflict record: %w", err)
}
if _, ok := sMap[u]; ok {
return ErrHasConflicts
}
}

return nil
}

func isTraceableBlock(indexBytes []byte, height, maxTraceableBlocks uint32) bool {
index := binary.LittleEndian.Uint32(indexBytes)
return index <= height && index+maxTraceableBlocks > height
}

// StoreAsBlock stores given block as DataBlock. It can reuse given buffer for
// the purpose of value serialization.
func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult) error {
Expand Down Expand Up @@ -768,7 +772,30 @@ func (dao *Simple) DeleteBlock(h util.Uint256) error {
for _, attr := range tx.GetAttributes(transaction.ConflictsT) {
hash := attr.Value.(*transaction.Conflicts).Hash
copy(key[1:], hash.BytesBE())
dao.Store.Delete(key)

v, err := dao.Store.Get(key)
if err != nil {
return fmt.Errorf("failed to retrieve conflict record stub for %s (height %d, conflict %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), err)
}
index := binary.LittleEndian.Uint32(v[1:])
// We can check for `<=` here, but use equality comparison to be more precise
// and do not touch earlier conflict records (if any). Their removal must be triggered
// by the caller code.
if index == b.Index {
dao.Store.Delete(key)
}

Check warning on line 786 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L775-L786

Added lines #L775 - L786 were not covered by tests

for _, s := range tx.Signers {
sKey := append(key, s.Account.BytesBE()...)
v, err := dao.Store.Get(sKey)
if err != nil {
return fmt.Errorf("failed to retrieve conflict record for %s (height %d, conflict %s, signer %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), address.Uint160ToString(s.Account), err)
}
index = binary.LittleEndian.Uint32(v[1:])
if index == b.Index {
dao.Store.Delete(sKey)
}

Check warning on line 797 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L788-L797

Added lines #L788 - L797 were not covered by tests
}
}
}

Expand Down Expand Up @@ -826,51 +853,20 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32,
if buf.Err != nil {
return buf.Err
}
dao.Store.Put(key, buf.Bytes())
val := buf.Bytes()
dao.Store.Put(key, val)

var (
valuePrefix []byte
newSigners []byte
)
val = val[:5] // storage.ExecTransaction (1 byte) + index (4 bytes)
attrs := tx.GetAttributes(transaction.ConflictsT)
for _, attr := range attrs {
// Conflict record stub.
hash := attr.Value.(*transaction.Conflicts).Hash
copy(key[1:], hash.BytesBE())

old, err := dao.Store.Get(key)
if err != nil && !errors.Is(err, storage.ErrKeyNotFound) {
return fmt.Errorf("failed to retrieve previous conflict record for %s: %w", hash.StringLE(), err)
}
if err == nil {
if len(old) <= 6 { // storage.ExecTransaction + U32LE index + transaction.DummyVersion
return fmt.Errorf("invalid conflict record format of length %d", len(old))
}
}
buf.Reset()
buf.WriteBytes(old)
if len(old) == 0 {
if len(valuePrefix) != 0 {
buf.WriteBytes(valuePrefix)
} else {
buf.WriteB(storage.ExecTransaction)
buf.WriteU32LE(index)
buf.WriteB(transaction.DummyVersion)
}
}
newSignersOffset := buf.Len()
if len(newSigners) == 0 {
for _, s := range tx.Signers {
s.Account.EncodeBinary(buf.BinWriter)
}
} else {
buf.WriteBytes(newSigners)
}
val := buf.Bytes()
dao.Store.Put(key, val)

if len(attrs) > 1 && len(valuePrefix) == 0 {
valuePrefix = slice.Copy(val[:6])
newSigners = slice.Copy(val[newSignersOffset:])
// Conflicting signers.
for _, s := range tx.Signers {
dao.Store.Put(append(key, s.Account.BytesBE()...), val)
}
}
return nil
Expand Down
25 changes: 15 additions & 10 deletions pkg/core/dao/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestStoreAsTransaction(t *testing.T) {
}
err := dao.StoreAsTransaction(tx, 0, aer)
require.NoError(t, err)
err = dao.HasTransaction(hash, nil)
err = dao.HasTransaction(hash, nil, 0, 0)
require.ErrorIs(t, err, ErrAlreadyExists)
gotAppExecResult, err := dao.GetAppExecResults(hash, trigger.All)
require.NoError(t, err)
Expand Down Expand Up @@ -229,7 +229,8 @@ func TestStoreAsTransaction(t *testing.T) {
Stack: []stackitem.Item{},
},
}
err := dao.StoreAsTransaction(tx1, 0, aer1)
const blockIndex = 5
err := dao.StoreAsTransaction(tx1, blockIndex, aer1)
require.NoError(t, err)
aer2 := &state.AppExecResult{
Container: hash2,
Expand All @@ -239,31 +240,35 @@ func TestStoreAsTransaction(t *testing.T) {
Stack: []stackitem.Item{},
},
}
err = dao.StoreAsTransaction(tx2, 0, aer2)
err = dao.StoreAsTransaction(tx2, blockIndex, aer2)
require.NoError(t, err)
err = dao.HasTransaction(hash1, nil)
err = dao.HasTransaction(hash1, nil, 0, 0)
require.ErrorIs(t, err, ErrAlreadyExists)
err = dao.HasTransaction(hash2, nil)
err = dao.HasTransaction(hash2, nil, 0, 0)
require.ErrorIs(t, err, ErrAlreadyExists)

// Conflicts: unimportant payer.
err = dao.HasTransaction(conflictsH, nil)
err = dao.HasTransaction(conflictsH, nil, 0, 0)
require.ErrorIs(t, err, ErrHasConflicts)

// Conflicts: payer is important, conflict isn't malicious, test signer #1.
err = dao.HasTransaction(conflictsH, []transaction.Signer{{Account: signer1}})
err = dao.HasTransaction(conflictsH, []transaction.Signer{{Account: signer1}}, blockIndex+1, 5)
require.ErrorIs(t, err, ErrHasConflicts)

// Conflicts: payer is important, conflict isn't malicious, test signer #2.
err = dao.HasTransaction(conflictsH, []transaction.Signer{{Account: signer2}})
err = dao.HasTransaction(conflictsH, []transaction.Signer{{Account: signer2}}, blockIndex+1, 5)
require.ErrorIs(t, err, ErrHasConflicts)

// Conflicts: payer is important, conflict isn't malicious, test signer #3.
err = dao.HasTransaction(conflictsH, []transaction.Signer{{Account: signer3}})
err = dao.HasTransaction(conflictsH, []transaction.Signer{{Account: signer3}}, blockIndex+1, 5)
require.ErrorIs(t, err, ErrHasConflicts)

// Conflicts: payer is important, conflict isn't malicious, but the conflict is far away than MTB.
err = dao.HasTransaction(conflictsH, []transaction.Signer{{Account: signer3}}, blockIndex+10, 5)
require.NoError(t, err)

// Conflicts: payer is important, conflict is malicious.
err = dao.HasTransaction(conflictsH, []transaction.Signer{{Account: signerMalicious}})
err = dao.HasTransaction(conflictsH, []transaction.Signer{{Account: signerMalicious}}, blockIndex+1, 5)
require.NoError(t, err)

gotAppExecResult, err := dao.GetAppExecResults(hash1, trigger.All)
Expand Down

0 comments on commit 88ff18f

Please sign in to comment.