Skip to content

Commit

Permalink
Merge pull request #620 from onflow/mpeter/fix-streaming-new-block-he…
Browse files Browse the repository at this point in the history
…aders-response

Introduce dedicated `BlockHeader` type for usage in `newHeads` subscription endpoint
  • Loading branch information
m-Peter authored Oct 23, 2024
2 parents db833bc + 5e57a61 commit 6f99e01
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 19 deletions.
18 changes: 18 additions & 0 deletions api/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,24 @@ type Block struct {
BaseFeePerGas hexutil.Big `json:"baseFeePerGas"`
}

type BlockHeader struct {
Number hexutil.Uint64 `json:"number"`
Hash common.Hash `json:"hash"`
ParentHash common.Hash `json:"parentHash"`
Nonce types.BlockNonce `json:"nonce"`
Sha3Uncles common.Hash `json:"sha3Uncles"`
LogsBloom hexutil.Bytes `json:"logsBloom"`
TransactionsRoot common.Hash `json:"transactionsRoot"`
StateRoot common.Hash `json:"stateRoot"`
ReceiptsRoot common.Hash `json:"receiptsRoot"`
Miner common.Address `json:"miner"`
ExtraData hexutil.Bytes `json:"extraData"`
GasLimit hexutil.Uint64 `json:"gasLimit"`
GasUsed hexutil.Uint64 `json:"gasUsed"`
Timestamp hexutil.Uint64 `json:"timestamp"`
Difficulty hexutil.Uint64 `json:"difficulty"`
}

type SyncStatus struct {
StartingBlock hexutil.Uint64 `json:"startingBlock"`
CurrentBlock hexutil.Uint64 `json:"currentBlock"`
Expand Down
55 changes: 49 additions & 6 deletions api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ import (
"context"
"fmt"

evmTypes "github.com/onflow/flow-go/fvm/evm/types"
"github.com/onflow/go-ethereum/common/hexutil"
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/onflow/go-ethereum/eth/filters"
"github.com/onflow/go-ethereum/rpc"
"github.com/rs/zerolog"

"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/models"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-evm-gateway/services/logs"
"github.com/onflow/flow-evm-gateway/storage"
)

type StreamAPI struct {
logger zerolog.Logger
api *BlockChainAPI
config *config.Config
blocks storage.BlockIndexer
transactions storage.TransactionIndexer
Expand All @@ -30,7 +32,6 @@ type StreamAPI struct {
func NewStreamAPI(
logger zerolog.Logger,
config *config.Config,
api *BlockChainAPI,
blocks storage.BlockIndexer,
transactions storage.TransactionIndexer,
receipts storage.ReceiptIndexer,
Expand All @@ -41,7 +42,6 @@ func NewStreamAPI(
return &StreamAPI{
logger: logger,
config: config,
api: api,
blocks: blocks,
transactions: transactions,
receipts: receipts,
Expand All @@ -59,12 +59,12 @@ func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
s.blocksPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(block *models.Block) error {
return func(block *models.Block) error {
response, err := s.api.prepareBlockResponse(block, false)
blockHeader, err := s.prepareBlockHeader(block)
if err != nil {
return fmt.Errorf("failed to get block response: %w", err)
return fmt.Errorf("failed to get block header response: %w", err)
}

return notifier.Notify(sub.ID, response)
return notifier.Notify(sub.ID, blockHeader)
}
},
)
Expand Down Expand Up @@ -121,6 +121,49 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
)
}

func (s *StreamAPI) prepareBlockHeader(
block *models.Block,
) (*BlockHeader, error) {
h, err := block.Hash()
if err != nil {
s.logger.Error().Err(err).Msg("failed to calculate hash for block by number")
return nil, errs.ErrInternal
}

blockHeader := &BlockHeader{
Number: hexutil.Uint64(block.Height),
Hash: h,
ParentHash: block.ParentBlockHash,
Nonce: gethTypes.BlockNonce{0x1},
Sha3Uncles: gethTypes.EmptyUncleHash,
LogsBloom: gethTypes.LogsBloom([]*gethTypes.Log{}),
TransactionsRoot: block.TransactionHashRoot,
ReceiptsRoot: block.ReceiptRoot,
Miner: evmTypes.CoinbaseAddress.ToCommon(),
GasLimit: hexutil.Uint64(blockGasLimit),
Timestamp: hexutil.Uint64(block.Timestamp),
}

txHashes := block.TransactionHashes
if len(txHashes) > 0 {
totalGasUsed := hexutil.Uint64(0)
logs := make([]*gethTypes.Log, 0)
for _, txHash := range txHashes {
txReceipt, err := s.receipts.GetByTransactionID(txHash)
if err != nil {
return nil, err
}
totalGasUsed += hexutil.Uint64(txReceipt.GasUsed)
logs = append(logs, txReceipt.Logs...)
}
blockHeader.GasUsed = totalGasUsed
// TODO(m-Peter): Consider if its worthwhile to move this in storage.
blockHeader.LogsBloom = gethTypes.LogsBloom(logs)
}

return blockHeader, nil
}

func newSubscription[T any](
ctx context.Context,
logger zerolog.Logger,
Expand Down
1 change: 0 additions & 1 deletion bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
streamAPI := api.NewStreamAPI(
b.logger,
b.config,
blockchainAPI,
b.storages.Blocks,
b.storages.Transactions,
b.storages.Receipts,
Expand Down
54 changes: 42 additions & 12 deletions tests/web3js/eth_streaming_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const conf = require('./config')
const helpers = require('./helpers')
const { assert } = require('chai')
const { Web3 } = require('web3')
const web3 = conf.web3

it('streaming of blocks, transactions, logs using filters', async () => {
// this is a failsafe if socket is kept open since test node process won't finish otherwise
Expand All @@ -24,21 +25,21 @@ it('streaming of blocks, transactions, logs using filters', async () => {
// wait for subscription for a bit
await new Promise((res, rej) => setTimeout(() => res(), 1000))

// subscribe to new blocks being produced by bellow transaction submission
let blockTxHashes = []
// subscribe to new blocks being produced by transaction submissions below
let blocksHeaders = []
let subBlocks = await ws.eth.subscribe('newBlockHeaders')
subBlocks.on('error', async (err) => {
assert.fail(err.message)
})
subBlocks.on('data', async (block) => {
blockTxHashes.push(block.transactions[0]) // add received tx hash
blocksHeaders.push(block) // add received tx hash

if (blockTxHashes.length === testValues.length) {
if (blocksHeaders.length === testValues.length) {
subBlocks.unsubscribe()
}
})

// subscribe to all new transaction events being produced by transaction submission bellow
// subscribe to all new transaction events being produced by transaction submissions below
let txHashes = []
let subTx = await ws.eth.subscribe('pendingTransactions')
subTx.on('error', async (err) => {
Expand All @@ -52,18 +53,18 @@ it('streaming of blocks, transactions, logs using filters', async () => {
}
})

// subscribe to events being emitted by a deployed contract and bellow transaction interactions
let logTxHashes = []
// subscribe to events being emitted by a deployed contract and transaction interactions below
let logs = []
let subLog = await ws.eth.subscribe('logs', {
address: contractAddress,
})
subLog.on('error', async err => {
assert.fail(err.message)
})
subLog.on('data', async (log) => {
logTxHashes.push(log.transactionHash)
logs.push(log)

if (logTxHashes.length === testValues.length) {
if (logs.length === testValues.length) {
subLog.unsubscribe()
}
})
Expand All @@ -86,8 +87,37 @@ it('streaming of blocks, transactions, logs using filters', async () => {
await new Promise((res, rej) => setTimeout(() => res(), 1000))

// check that transaction hashes we received when submitting transactions above
// match array of transaction hashes received from events for blocks and txs
assert.deepEqual(blockTxHashes, sentHashes)
// match array of transaction hashes received from subscriptions
assert.deepEqual(txHashes, sentHashes)
assert.deepEqual(logTxHashes, sentHashes)

assert.lengthOf(blocksHeaders, testValues.length)
for (let blockHeader of blocksHeaders) {
let block = await web3.eth.getBlock(blockHeader.number)

assert.equal(blockHeader.number, block.number)
assert.equal(blockHeader.hash, block.hash)
assert.equal(blockHeader.parentHash, block.parentHash)
assert.equal(blockHeader.nonce, block.nonce)
assert.equal(blockHeader.sha3Uncles, block.sha3Uncles)
assert.equal(blockHeader.logsBloom, block.logsBloom)
assert.equal(blockHeader.transactionsRoot, block.transactionsRoot)
assert.equal(blockHeader.stateRoot, block.stateRoot)
assert.equal(blockHeader.receiptsRoot, block.receiptsRoot)
assert.equal(blockHeader.miner, block.miner)
assert.equal(blockHeader.extraData, block.extraData)
assert.equal(blockHeader.gasLimit, block.gasLimit)
assert.equal(blockHeader.gasUsed, block.gasUsed)
assert.equal(blockHeader.timestamp, block.timestamp)
assert.equal(blockHeader.difficulty, block.difficulty)
}

assert.lengthOf(logs, testValues.length)
for (let log of logs) {
let matchingLogs = await web3.eth.getPastLogs({
address: log.address,
blockHash: log.blockHash
})
assert.lengthOf(matchingLogs, 1)
assert.deepEqual(log, matchingLogs[0])
}
})

0 comments on commit 6f99e01

Please sign in to comment.