Skip to content

Commit

Permalink
Merge pull request #622 from onflow/mpeter/sort-flow-events-by-event-…
Browse files Browse the repository at this point in the history
…index

Sort Flow EVM events received from Event Streaming API by their `TransactionIndex` & `EventIndex` fields in ascending order
  • Loading branch information
m-Peter authored Oct 25, 2024
2 parents 5774ba3 + 3d4533d commit e95b413
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 9 deletions.
20 changes: 17 additions & 3 deletions models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package models

import (
"fmt"
"strings"
"sort"

"github.com/onflow/cadence"
"github.com/onflow/flow-go-sdk"
Expand All @@ -12,20 +12,25 @@ import (
errs "github.com/onflow/flow-evm-gateway/models/errors"
)

const (
BlockExecutedQualifiedIdentifier = string(events.EventTypeBlockExecuted)
TransactionExecutedQualifiedIdentifier = string(events.EventTypeTransactionExecuted)
)

// isBlockExecutedEvent checks whether the given event contains block executed data.
func isBlockExecutedEvent(event cadence.Event) bool {
if event.EventType == nil {
return false
}
return strings.Contains(event.EventType.ID(), string(events.EventTypeBlockExecuted))
return event.EventType.QualifiedIdentifier == BlockExecutedQualifiedIdentifier
}

// isTransactionExecutedEvent checks whether the given event contains transaction executed data.
func isTransactionExecutedEvent(event cadence.Event) bool {
if event.EventType == nil {
return false
}
return strings.Contains(event.EventType.ID(), string(events.EventTypeTransactionExecuted))
return event.EventType.QualifiedIdentifier == TransactionExecutedQualifiedIdentifier
}

// CadenceEvents contains Flow emitted events containing one or zero evm block executed event,
Expand All @@ -39,6 +44,15 @@ type CadenceEvents struct {

// NewCadenceEvents decodes the events into evm types.
func NewCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
// first we sort all the events in the block, by their TransactionIndex,
// and then we also sort events in the same transaction, by their EventIndex.
sort.Slice(events.Events, func(i, j int) bool {
if events.Events[i].TransactionIndex != events.Events[j].TransactionIndex {
return events.Events[i].TransactionIndex < events.Events[j].TransactionIndex
}
return events.Events[i].EventIndex < events.Events[j].EventIndex
})

e, err := decodeCadenceEvents(events)
if err != nil {
return nil, err
Expand Down
97 changes: 91 additions & 6 deletions models/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestCadenceEvents_Block(t *testing.T) {

// generate txs
for i := 0; i < txCount; i++ {
tx, _, txEvent, err := newTransaction(uint64(i))
tx, _, txEvent, err := newTransaction(uint64(i), uint16(i))
require.NoError(t, err)
hashes[i] = tx.Hash()
events = append(events, txEvent)
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestCadenceEvents_Block(t *testing.T) {
})

t.Run("block with more transaction hashes", func(t *testing.T) {
tx, _, _, err := newTransaction(1)
tx, _, _, err := newTransaction(1, 0)
require.NoError(t, err)

// generate single block
Expand All @@ -153,6 +153,78 @@ func TestCadenceEvents_Block(t *testing.T) {
"block 1 references missing transaction/s",
)
})

t.Run("EVM events are ordered by Flow TransactionIndex & EventIndex", func(t *testing.T) {
txCount := 3
blockEvents := flow.BlockEvents{
BlockID: flow.Identifier{0x1},
Height: 1,
}

// tx1 and tx2 are EVM transactions executed on a single Flow transaction.
tx1, _, txEvent1, err := newTransaction(0, 0)
require.NoError(t, err)
txEvent1.TransactionIndex = 0
txEvent1.EventIndex = 2

tx2, _, txEvent2, err := newTransaction(1, 1)
require.NoError(t, err)
txEvent2.TransactionIndex = 0
txEvent2.EventIndex = 5

// tx3 is a Flow transaction with a single EVM transaction on EventIndex=1
tx3, _, txEvent3, err := newTransaction(2, 0)
require.NoError(t, err)
txEvent3.TransactionIndex = 2
txEvent3.EventIndex = 1

// needed for computing the `TransactionHashRoot` field on
// EVM.BlockExecuted event payload. the order is sensitive.
hashes = []gethCommon.Hash{
tx1.Hash(),
tx2.Hash(),
tx3.Hash(),
}

// add the tx events in a shuffled order
blockEvents.Events = []flow.Event{
txEvent3,
txEvent1,
txEvent2,
}

// generate single block
_, blockEvent, err := newBlock(1, hashes)
require.NoError(t, err)
blockEvent.TransactionIndex = 4
blockEvent.EventIndex = 0
blockEvents.Events = append(blockEvents.Events, blockEvent)

// parse the EventStreaming API response
cdcEvents, err := NewCadenceEvents(blockEvents)
require.NoError(t, err)

// assert that Flow events are sorted by their TransactionIndex and EventIndex fields
assert.Equal(
t,
[]flow.Event{
txEvent1,
txEvent2,
txEvent3,
blockEvent,
},
cdcEvents.events.Events,
)

// assert that EVM transactions & receipts are sorted by their
// TransactionIndex field
for i := 0; i < txCount; i++ {
tx := cdcEvents.transactions[i]
receipt := cdcEvents.receipts[i]
assert.Equal(t, tx.Hash(), receipt.TxHash)
assert.Equal(t, uint(i), receipt.TransactionIndex)
}
})
}

func Test_EventDecoding(t *testing.T) {
Expand All @@ -171,7 +243,7 @@ func Test_EventDecoding(t *testing.T) {
// generate txs
for i := 0; i < txCount; i++ {
var err error
txs[i], results[i], txEvents[i], err = newTransaction(uint64(i))
txs[i], results[i], txEvents[i], err = newTransaction(uint64(i), uint16(i))
require.NoError(t, err)
hashes[i] = txs[i].Hash()
blockEvents.Events = append(blockEvents.Events, txEvents[i])
Expand Down Expand Up @@ -224,12 +296,22 @@ func Test_EventDecoding(t *testing.T) {
}
}

func newTransaction(nonce uint64) (Transaction, *types.Result, flow.Event, error) {
tx := gethTypes.NewTransaction(nonce, gethCommon.HexToAddress("0x1"), big.NewInt(10), uint64(100), big.NewInt(123), nil)
func newTransaction(nonce uint64, txIndex uint16) (Transaction, *types.Result, flow.Event, error) {
tx := gethTypes.NewTransaction(
nonce,
gethCommon.HexToAddress("0x1"),
big.NewInt(10),
uint64(100),
big.NewInt(123),
nil,
)
res := &types.Result{
ValidationError: nil,
VMError: nil,
TxType: tx.Type(),
GasConsumed: 1,
CumulativeGasUsed: 1,
GasRefund: 0,
DeployedContractAddress: &types.Address{0x5, 0x6, 0x7},
ReturnedData: []byte{0x55},
Logs: []*gethTypes.Log{{
Expand All @@ -239,7 +321,10 @@ func newTransaction(nonce uint64) (Transaction, *types.Result, flow.Event, error
Address: gethCommon.Address{0x3, 0x5},
Topics: []gethCommon.Hash{{0x2, 0x66}, {0x7, 0x1}},
}},
TxHash: tx.Hash(),
TxHash: tx.Hash(),
Index: txIndex,
PrecompiledCalls: []byte{},
StateChangeCommitment: []byte{},
}

txEncoded, err := tx.MarshalBinary()
Expand Down

0 comments on commit e95b413

Please sign in to comment.