From fc8972b017e43566adb588792df26bdccd02b4df Mon Sep 17 00:00:00 2001 From: Valentin Staykov <79150443+V-Staykov@users.noreply.github.com> Date: Tue, 23 Jul 2024 17:26:56 +0300 Subject: [PATCH] refactor datastream server (#837) * refactor datastream server * optimization and nil checks * added a datastream check tool * fix rpc to seqencer switching in datastream * move batch end check after injected --- eth/backend.go | 2 +- zk/datastream/client/stream_client.go | 40 +++ zk/datastream/server/data_stream_server.go | 205 +++++++++---- .../server/data_stream_server_utils.go | 33 +- zk/datastream/server/datastream_populate.go | 282 ++++++++++++------ zk/debug_tools/datastream-bytes/main.go | 39 ++- .../datastream-correctness-check/main.go | 123 ++++++++ .../legacy_executor_verifier.go | 66 ++-- zk/stages/stage_dataStreamCatchup.go | 6 +- zk/stages/stage_sequence_execute.go | 42 ++- zk/stages/stage_sequencer_executor_verify.go | 5 +- 11 files changed, 599 insertions(+), 244 deletions(-) create mode 100644 zk/debug_tools/datastream-correctness-check/main.go diff --git a/eth/backend.go b/eth/backend.go index 3be28bef3c6..89a5ff5c7d1 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1078,7 +1078,7 @@ func (s *Ethereum) PreStart() error { // so here we loop and take a brief pause waiting for it to be ready attempts := 0 for { - _, err = zkStages.CatchupDatastream("stream-catchup", tx, s.dataStream, s.chainConfig.ChainID.Uint64(), s.config.DatastreamVersion, s.config.HasExecutors()) + _, err = zkStages.CatchupDatastream(s.sentryCtx, "stream-catchup", tx, s.dataStream, s.chainConfig.ChainID.Uint64(), s.config.DatastreamVersion, s.config.HasExecutors()) if err != nil { if errors.Is(err, datastreamer.ErrAtomicOpNotAllowed) { attempts++ diff --git a/zk/datastream/client/stream_client.go b/zk/datastream/client/stream_client.go index becca1a5721..e97f2d33e56 100644 --- a/zk/datastream/client/stream_client.go +++ b/zk/datastream/client/stream_client.go @@ -181,6 +181,46 @@ func (c *StreamClient) ReadEntries(bookmark *types.BookmarkProto, l2BlocksAmount return fullL2Blocks, gerUpates, batchBookmarks, blockBookmarks, entriesRead, nil } +func (c *StreamClient) ExecutePerFile(bookmark *types.BookmarkProto, function func(file *types.FileEntry) error) error { + // Get header from server + if err := c.GetHeader(); err != nil { + return fmt.Errorf("%s get header error: %v", c.id, err) + } + + protoBookmark, err := bookmark.Marshal() + if err != nil { + return fmt.Errorf("failed to marshal bookmark: %v", err) + } + + if err := c.initiateDownloadBookmark(protoBookmark); err != nil { + return err + } + count := uint64(0) + logTicker := time.NewTicker(10 * time.Second) + + for { + select { + case <-logTicker.C: + fmt.Println("Entries read count: ", count) + default: + } + if c.Header.TotalEntries == count { + break + } + file, err := c.readFileEntry() + if err != nil { + return fmt.Errorf("error reading file entry: %v", err) + } + if err := function(file); err != nil { + return fmt.Errorf("error executing function: %v", err) + + } + count++ + } + + return nil +} + // reads entries to the end of the stream // at end will wait for new entries to arrive func (c *StreamClient) ReadAllEntriesToChannel() error { diff --git a/zk/datastream/server/data_stream_server.go b/zk/datastream/server/data_stream_server.go index 40ea2f3933e..c2c174c594f 100644 --- a/zk/datastream/server/data_stream_server.go +++ b/zk/datastream/server/data_stream_server.go @@ -1,22 +1,23 @@ package server import ( + "fmt" + "github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer" zktypes "github.com/ledgerwatch/erigon/zk/types" "github.com/ledgerwatch/erigon/zk/utils" - "github.com/gateway-fm/cdk-erigon-lib/common" libcommon "github.com/gateway-fm/cdk-erigon-lib/common" "github.com/gateway-fm/cdk-erigon-lib/kv" + "github.com/ledgerwatch/erigon/core/rawdb" eritypes "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" "github.com/ledgerwatch/erigon/zk/datastream/types" - - "github.com/ledgerwatch/erigon/zk/hermez_db" ) type DbReader interface { - GetLocalExitRootForBatchNo(batchNo uint64) (common.Hash, error) + GetL2BlockNosByBatch(batchNo uint64) ([]uint64, error) + GetLocalExitRootForBatchNo(batchNo uint64) (libcommon.Hash, error) GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber uint64) ([]types.GerUpdateProto, error) GetForkId(batchNumber uint64) (uint64, error) GetBlockGlobalExitRoot(blockNumber uint64) (libcommon.Hash, error) @@ -84,10 +85,33 @@ func (d *DataStreamEntries) AddMany(entries []DataStreamEntryProto) { } } +func (d *DataStreamEntries) Size() int { + if d == nil || d.entries == nil { + return 0 + } + return len(d.entries) +} + func (d *DataStreamEntries) Entries() []DataStreamEntryProto { + if d == nil || d.entries == nil { + return []DataStreamEntryProto{} + } return d.entries } +func (d *DataStreamEntries) Marshal() (result []byte, err error) { + var b []byte + for _, entry := range d.entries { + b, err = encodeEntryToBytesProto(entry) + if err != nil { + return nil, err + } + result = append(result, b...) + } + + return result, nil +} + func NewDataStreamEntries(size int) *DataStreamEntries { return &DataStreamEntries{ entries: make([]DataStreamEntryProto, size), @@ -115,85 +139,82 @@ func (srv *DataStreamServer) CommitEntriesToStreamProto(entries []DataStreamEntr } if latestBlockNum != nil { - srv.highestBlockWritten = latestBlockNum + a := *latestBlockNum + srv.highestBlockWritten = &a } if latestBatchNum != nil { - srv.highestBatchWritten = latestBatchNum + a := *latestBatchNum + srv.highestBatchWritten = &a } return nil } func createBlockWithBatchCheckStreamEntriesProto( - chainId uint64, reader DbReader, tx kv.Tx, block, lastBlock *eritypes.Block, batchNumber, - lastBatchNumber uint64, - l1InfoTreeMinTimestamps map[uint64]uint64, - isBatchEnd bool, - transactionsToIncludeByIndex []int, // passing nil here will include all transactions in the blocks -) ([]DataStreamEntryProto, error) { + lastBatchNumber, + chainId, + forkId uint64, + shouldSkipBatchEndEntry bool, +) (*DataStreamEntries, error) { var err error - var startEntriesProto, blockEntriesProto, endEntriesProto []DataStreamEntryProto - - gers, err := reader.GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber) - if err != nil { - return nil, err - } - + var endEntriesProto []DataStreamEntryProto + var startEntriesProto, blockEntries *DataStreamEntries // we might have a series of empty batches to account for, so we need to know the gap batchGap := batchNumber - lastBatchNumber isBatchStart := batchGap > 0 - // filter transactions by indexes that should be included - filteredTransactions := filterTransactionByIndexes(block.Transactions(), transactionsToIncludeByIndex) - - blockNum := block.NumberU64() // batch start // BATCH BOOKMARK if isBatchStart { + gers, err := reader.GetBatchGlobalExitRootsProto(lastBatchNumber, batchNumber) + if err != nil { + return nil, err + } + // the genesis we insert fully, so we would have to skip closing it + if !shouldSkipBatchEndEntry { + localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx) + if err != nil { + return nil, err + } + blockRoot := block.Root() + if endEntriesProto, err = addBatchEndEntriesProto(lastBatchNumber, &blockRoot, gers, &localExitRoot); err != nil { + return nil, err + } + } + if startEntriesProto, err = createBatchStartEntriesProto(reader, tx, batchNumber, lastBatchNumber, batchGap, chainId, block.Root(), gers); err != nil { return nil, err } } - forkId, err := reader.GetForkId(batchNumber) - if err != nil { - return nil, err - } + blockNum := block.NumberU64() + l1InfoTreeMinTimestamps := make(map[uint64]uint64) deltaTimestamp := block.Time() - lastBlock.Time() if blockNum == 1 { deltaTimestamp = block.Time() l1InfoTreeMinTimestamps[0] = 0 } - blockEntries, err := createFullBlockStreamEntriesProto(reader, tx, block, filteredTransactions, forkId, deltaTimestamp, batchNumber, l1InfoTreeMinTimestamps) - if err != nil { + if blockEntries, err = createFullBlockStreamEntriesProto(reader, tx, block, block.Transactions(), forkId, deltaTimestamp, batchNumber, l1InfoTreeMinTimestamps); err != nil { return nil, err } - blockEntriesProto = blockEntries.Entries() - if isBatchEnd { - localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx) - if err != nil { - return nil, err - } - blockRoot := block.Root() - if endEntriesProto, err = addBatchEndEntriesProto(tx, batchNumber, lastBatchNumber, &blockRoot, gers, &localExitRoot); err != nil { - return nil, err - } + if blockEntries.Size() == 0 { + return nil, fmt.Errorf("didn't create any entries for block %d", blockNum) } - entries := NewDataStreamEntries(len(startEntriesProto) + len(blockEntriesProto) + len(endEntriesProto)) - entries.AddMany(startEntriesProto) - entries.AddMany(blockEntriesProto) + entries := NewDataStreamEntries(len(endEntriesProto) + startEntriesProto.Size() + blockEntries.Size()) entries.AddMany(endEntriesProto) + entries.AddMany(startEntriesProto.Entries()) + entries.AddMany(blockEntries.Entries()) - return entries.Entries(), nil + return entries, nil } func createFullBlockStreamEntriesProto( @@ -283,32 +304,102 @@ func createTransactionEntryProto( return txProto, nil } -func CreateAndBuildStreamEntryBytesProto( - chainId uint64, - block *eritypes.Block, - reader *hermez_db.HermezDbReader, +func BuildWholeBatchStreamEntriesProto( tx kv.Tx, - lastBlock *eritypes.Block, + reader DbReader, + chainId uint64, + previousBatchNumber, batchNumber uint64, - lastBatchNumber uint64, + blocks []eritypes.Block, + txsPerBlock map[uint64][]eritypes.Transaction, l1InfoTreeMinTimestamps map[uint64]uint64, - isBatchEnd bool, - transactionsToIncludeByIndex []int, // passing nil here will include all transactions in the blocks -) (result []byte, err error) { - entries, err := createBlockWithBatchCheckStreamEntriesProto(chainId, reader, tx, block, lastBlock, batchNumber, lastBatchNumber, l1InfoTreeMinTimestamps, isBatchEnd, transactionsToIncludeByIndex) +) (allEntries *DataStreamEntries, err error) { + var batchEndEntries []DataStreamEntryProto + var batchStartEntries *DataStreamEntries + + forkId, err := reader.GetForkId(batchNumber) if err != nil { return nil, err } - for _, entry := range entries { - b, err := encodeEntryToBytesProto(entry) + gers, err := reader.GetBatchGlobalExitRootsProto(previousBatchNumber, batchNumber) + if err != nil { + return nil, err + } + + if batchStartEntries, err = createBatchStartEntriesProto(reader, tx, batchNumber, previousBatchNumber, batchNumber-previousBatchNumber, chainId, blocks[0].Root(), gers); err != nil { + return nil, err + } + + prevBatchLastBlock, err := rawdb.ReadBlockByNumber(tx, blocks[0].NumberU64()-1) + if err != nil { + return nil, err + } + + lastBlock := *prevBatchLastBlock + + blocksEntries := make([]DataStreamEntryProto, 0) + + for _, block := range blocks { + blockNum := block.NumberU64() + + deltaTimestamp := block.Time() - lastBlock.Time() + if blockNum == 1 { + deltaTimestamp = block.Time() + l1InfoTreeMinTimestamps[0] = 0 + } + + txForBlock, found := txsPerBlock[blockNum] + if !found { + return nil, fmt.Errorf("no transactions array found for block %d", blockNum) + } + + blockEntries, err := createFullBlockStreamEntriesProto(reader, tx, &block, txForBlock, forkId, deltaTimestamp, batchNumber, l1InfoTreeMinTimestamps) if err != nil { return nil, err } - result = append(result, b...) + blocksEntries = append(blocksEntries, blockEntries.Entries()...) + + lastBlock = block } - return result, nil + // the genesis we insert fully, so we would have to skip closing it + localExitRoot, err := utils.GetBatchLocalExitRootFromSCStorage(batchNumber, reader, tx) + if err != nil { + return nil, err + } + + blockRoot := lastBlock.Root() + + batchEndEntries, err = addBatchEndEntriesProto(batchNumber, &blockRoot, gers, &localExitRoot) + if err != nil { + return nil, err + } + + allEntries = NewDataStreamEntries(batchStartEntries.Size() + len(blocksEntries) + len(batchEndEntries)) + allEntries.AddMany(batchStartEntries.Entries()) + allEntries.AddMany(blocksEntries) + allEntries.AddMany(batchEndEntries) + + return allEntries, nil +} + +func (srv *DataStreamServer) IsLastEntryBatchEnd() (isBatchEnd bool, err error) { + header := srv.stream.GetHeader() + + if header.TotalEntries == 0 { + return false, nil + } + + //find end block entry to delete from it onward + entryNum := header.TotalEntries - 1 + var entry datastreamer.FileEntry + entry, err = srv.stream.GetEntry(entryNum) + if err != nil { + return false, err + } + + return uint32(entry.Type) == uint32(types.EntryTypeBatchEnd), nil } func (srv *DataStreamServer) GetHighestBlockNumber() (uint64, error) { diff --git a/zk/datastream/server/data_stream_server_utils.go b/zk/datastream/server/data_stream_server_utils.go index 71dcc6b7a03..be1814ecb0d 100644 --- a/zk/datastream/server/data_stream_server_utils.go +++ b/zk/datastream/server/data_stream_server_utils.go @@ -134,11 +134,11 @@ func createBatchStartEntriesProto( batchNumber, lastBatchNumber, batchGap, chainId uint64, root libcommon.Hash, gers []types.GerUpdateProto, -) ([]DataStreamEntryProto, error) { +) (*DataStreamEntries, error) { var err error var batchStartEntries []DataStreamEntryProto - entries := make([]DataStreamEntryProto, 0, 2+int(3*(batchGap-1))+len(gers)) + entries := NewDataStreamEntries(2 + int(3*(batchGap-1)) + len(gers)) // if we have a gap of more than 1 batch then we need to write in the batch start and ends for these empty batches if batchGap > 1 { @@ -150,14 +150,14 @@ func createBatchStartEntriesProto( if batchStartEntries, err = addBatchStartEntries(reader, workingBatch, chainId); err != nil { return nil, err } - entries = append(entries, batchStartEntries...) + // entries = append(entries, batchStartEntries...) + entries.AddMany(batchStartEntries) // see if we have any gers to handle for _, ger := range gers { upd := ger.UpdateGER if upd.BatchNumber == workingBatch { - entries = append( - entries, + entries.Add( newGerUpdateProto(upd.BatchNumber, upd.Timestamp, libcommon.BytesToHash(upd.GlobalExitRoot), libcommon.BytesToAddress(upd.Coinbase), upd.ForkId, upd.ChainId, libcommon.BytesToHash(upd.StateRoot)), ) } @@ -167,7 +167,7 @@ func createBatchStartEntriesProto( if localExitRoot, err = utils.GetBatchLocalExitRootFromSCStorage(workingBatch, reader, tx); err != nil { return nil, err } - entries = append(entries, newBatchEndProto(localExitRoot, root, workingBatch)) + entries.Add(newBatchEndProto(localExitRoot, root, workingBatch)) } } @@ -175,13 +175,12 @@ func createBatchStartEntriesProto( if batchStartEntries, err = addBatchStartEntries(reader, batchNumber, chainId); err != nil { return nil, err } - entries = append(entries, batchStartEntries...) + entries.AddMany(batchStartEntries) return entries, nil } func addBatchEndEntriesProto( - tx kv.Tx, - batchNumber, lastBatchNumber uint64, + batchNumber uint64, root *libcommon.Hash, gers []types.GerUpdateProto, localExitRoot *libcommon.Hash, @@ -231,22 +230,6 @@ func addBatchStartEntries(reader DbReader, batchNum, chainId uint64) ([]DataStre return entries, nil } -func filterTransactionByIndexes( - filteredTransactions eritypes.Transactions, - transactionsToIncludeByIndex []int, -) eritypes.Transactions { - if transactionsToIncludeByIndex != nil { - filteredTransactionsBuilder := make(eritypes.Transactions, len(transactionsToIncludeByIndex)) - for i, txIndexInBlock := range transactionsToIncludeByIndex { - filteredTransactionsBuilder[i] = filteredTransactions[txIndexInBlock] - } - - filteredTransactions = filteredTransactionsBuilder - } - - return filteredTransactions -} - const ( PACKET_TYPE_DATA = 2 // NOOP_ENTRY_NUMBER is used because we don't care about the entry number when feeding an atrificial diff --git a/zk/datastream/server/datastream_populate.go b/zk/datastream/server/datastream_populate.go index fd6ed247691..17777cc85be 100644 --- a/zk/datastream/server/datastream_populate.go +++ b/zk/datastream/server/datastream_populate.go @@ -1,6 +1,7 @@ package server import ( + "context" "fmt" "time" @@ -14,71 +15,185 @@ import ( "github.com/ledgerwatch/log/v3" ) -const GenesisForkId = 0 // genesis fork is always 0 in the datastream +const ( + GenesisForkId = 0 // genesis fork is always 0 in the datastream + insertEntryCount = 100_000 + commitEntryCountLimit = 80_000 +) -func (srv *DataStreamServer) WriteBlocksToStream( +// gets the blocks for the said batch from the reader +// writes a bookmarks, batch start, blocks and batch end +// basically writes a whole standalone batch +// plus the GER updates if the batch gap is > 1 +// starts atomicOp and commits it internally +func (srv *DataStreamServer) WriteWholeBatchToStream( + logPrefix string, tx kv.Tx, reader DbReader, - from, to uint64, + prevBatchNum, + batchNum uint64, +) error { + var err error + if err = srv.stream.StartAtomicOp(); err != nil { + return err + } + + blocksForBatch, err := reader.GetL2BlockNosByBatch(batchNum) + if err != nil { + return err + } + + var fromBlockNum, toBlockNum uint64 + for _, blockNum := range blocksForBatch { + if fromBlockNum == 0 || blockNum < fromBlockNum { + fromBlockNum = blockNum + } + if blockNum > toBlockNum { + toBlockNum = blockNum + } + } + + if err = srv.UnwindIfNecessary(logPrefix, reader, fromBlockNum, prevBatchNum, batchNum); err != nil { + return err + } + + blocks := make([]eritypes.Block, 0) + txsPerBlock := make(map[uint64][]eritypes.Transaction) + for blockNumber := fromBlockNum; blockNumber <= toBlockNum; blockNumber++ { + block, err := rawdb.ReadBlockByNumber(tx, blockNumber) + if err != nil { + return err + } + + blocks = append(blocks, *block) + txsPerBlock[blockNumber] = block.Transactions() + } + + entries, err := BuildWholeBatchStreamEntriesProto(tx, reader, srv.GetChainId(), batchNum, batchNum, blocks, txsPerBlock, make(map[uint64]uint64)) + if err != nil { + return err + } + + if err = srv.CommitEntriesToStreamProto(entries.Entries(), &toBlockNum, &batchNum); err != nil { + return err + } + + if err = srv.stream.CommitAtomicOp(); err != nil { + return err + } + + return nil +} + +// writes consecutively blocks from-to +// checks for all batch related stuff in the meantime - batch start, batche end, etc +// starts atomicOp and commits it internally +func (srv *DataStreamServer) WriteBlocksToStreamConsecutively( + ctx context.Context, logPrefix string, + tx kv.Tx, + reader DbReader, + from, to uint64, ) error { + var err error + + // logger stuff t := utils.StartTimer("write-stream", "writeblockstostream") defer t.LogTimer() + logTicker := time.NewTicker(10 * time.Second) + totalToWrite := to - (from - 1) + copyFrom := from + ////////// - var err error + latestbatchNum, err := reader.GetBatchNoByL2Block(from - 1) + if err != nil { + return err + } + + batchNum, err := reader.GetBatchNoByL2Block(from) + if err != nil { + return err + } - logTicker := time.NewTicker(10 * time.Second) - var lastBlock *eritypes.Block if err = srv.stream.StartAtomicOp(); err != nil { return err } - totalToWrite := to - (from - 1) - insertEntryCount := 100_000 - entries := make([]DataStreamEntryProto, insertEntryCount) - index := 0 - copyFrom := from - var latestbatchNum uint64 + + if err = srv.UnwindIfNecessary(logPrefix, reader, from, latestbatchNum, batchNum); err != nil { + return err + } + + // check if a new batch starts and the old needs closing before that + // if it is already closed with a batch end, do not add a new batch end + // this is needed because we have to write a batch end when writing a new block from the next batch + // because at the current block we might not know if it is the last one in the batch + // but we know for certain if it is a 1st block from a new batch + islastEntrybatchEnd, err := srv.IsLastEntryBatchEnd() + if err != nil { + return err + } + + lastBlock, err := rawdb.ReadBlockByNumber(tx, from-1) + if err != nil { + return err + } + + entries := make([]DataStreamEntryProto, 0, insertEntryCount) + var forkId uint64 +LOOP: for currentBlockNumber := from; currentBlockNumber <= to; currentBlockNumber++ { select { case <-logTicker.C: log.Info(fmt.Sprintf("[%s]: progress", logPrefix), "block", currentBlockNumber, "target", to, "%", float64(currentBlockNumber-copyFrom)/float64(totalToWrite)*100) + case <-ctx.Done(): + break LOOP default: } - if lastBlock == nil { - lastBlock, err = rawdb.ReadBlockByNumber(tx, currentBlockNumber-1) + block, err := rawdb.ReadBlockByNumber(tx, currentBlockNumber) + if err != nil { + return err + } + + batchNum, err := reader.GetBatchNoByL2Block(currentBlockNumber) + if err != nil { + return err + } + + // fork id changes only per batch so query it only once per batch + if batchNum != latestbatchNum { + forkId, err = reader.GetForkId(batchNum) if err != nil { return err } } - block, blockEntries, batchNum, err := srv.createBlockStreamEntriesWithBatchCheck(logPrefix, tx, reader, lastBlock, currentBlockNumber) + blockEntries, err := createBlockWithBatchCheckStreamEntriesProto(reader, tx, block, lastBlock, batchNum, latestbatchNum, srv.chainId, forkId, islastEntrybatchEnd) if err != nil { return err } + entries = append(entries, blockEntries.Entries()...) + latestbatchNum = batchNum + lastBlock = block - for _, entry := range blockEntries { - entries[index] = entry - index++ - } + // the check is needed only before the first block + // after that - write batch end before each batch start + islastEntrybatchEnd = false // basically commit once 80% of the entries array is filled - if index+1 >= insertEntryCount*4/5 { + if len(entries) >= commitEntryCountLimit { log.Info(fmt.Sprintf("[%s] Commit count reached, committing entries", logPrefix), "block", currentBlockNumber) - if err = srv.CommitEntriesToStreamProto(entries[:index], ¤tBlockNumber, &batchNum); err != nil { + if err = srv.CommitEntriesToStreamProto(entries, ¤tBlockNumber, &batchNum); err != nil { return err } - entries = make([]DataStreamEntryProto, insertEntryCount) - index = 0 + entries = make([]DataStreamEntryProto, 0, insertEntryCount) } - - lastBlock = block } - if err = srv.CommitEntriesToStreamProto(entries[:index], &to, &latestbatchNum); err != nil { + if err = srv.CommitEntriesToStreamProto(entries, &to, &latestbatchNum); err != nil { return err } @@ -89,41 +204,62 @@ func (srv *DataStreamServer) WriteBlocksToStream( return nil } -func (srv *DataStreamServer) WriteBlockToStream( +// gets other needed data from the reader +// writes a batchBookmark and batch start (if needed), block bookmark, block and txs in it +// basically a full standalone block +func (srv *DataStreamServer) WriteBlockWithBatchStartToStream( logPrefix string, tx kv.Tx, reader DbReader, - batchNum, prevBatchNum, - blockNum uint64, -) error { + forkId, + batchNum, prevBlockBatchNum uint64, + prevBlock, block eritypes.Block, +) (err error) { t := utils.StartTimer("write-stream", "writeblockstostream") defer t.LogTimer() - var err error - - if err = srv.UnwindIfNecessary(logPrefix, reader, blockNum, prevBatchNum, batchNum); err != nil { + if err = srv.stream.StartAtomicOp(); err != nil { return err } - if err = srv.stream.StartAtomicOp(); err != nil { + blockNum := block.NumberU64() + + if err = srv.UnwindIfNecessary(logPrefix, reader, blockNum, prevBlockBatchNum, batchNum); err != nil { return err } - lastBlock, err := rawdb.ReadBlockByNumber(tx, blockNum-1) - if err != nil { - return err + // if start of new batch add batch start entries + var batchStartEntries *DataStreamEntries + if prevBlockBatchNum != batchNum { + gers, err := reader.GetBatchGlobalExitRootsProto(prevBlockBatchNum, batchNum) + if err != nil { + return err + } + + if batchStartEntries, err = createBatchStartEntriesProto(reader, tx, batchNum, prevBlockBatchNum, batchNum-prevBlockBatchNum, srv.GetChainId(), block.Root(), gers); err != nil { + return err + } } - block, err := rawdb.ReadBlockByNumber(tx, blockNum) - if err != nil { - return err + + l1InfoTreeMinTimestamps := make(map[uint64]uint64) + deltaTimestamp := block.Time() - prevBlock.Time() + if blockNum == 1 { + deltaTimestamp = block.Time() + l1InfoTreeMinTimestamps[0] = 0 } - entries, err := createBlockWithBatchCheckStreamEntriesProto(srv.chainId, reader, tx, block, lastBlock, batchNum, prevBatchNum, make(map[uint64]uint64), false, nil) + blockEntries, err := createFullBlockStreamEntriesProto(reader, tx, &block, block.Transactions(), forkId, deltaTimestamp, batchNum, make(map[uint64]uint64)) if err != nil { return err } - if err = srv.CommitEntriesToStreamProto(entries, &blockNum, &batchNum); err != nil { + if batchStartEntries != nil { + if err = srv.CommitEntriesToStreamProto(batchStartEntries.Entries(), &blockNum, &batchNum); err != nil { + return err + } + } + + if err = srv.CommitEntriesToStreamProto(blockEntries.Entries(), &blockNum, &batchNum); err != nil { return err } @@ -134,21 +270,25 @@ func (srv *DataStreamServer) WriteBlockToStream( return nil } -func (srv *DataStreamServer) UnwindIfNecessary(logPrefix string, reader DbReader, blockNum, prevBatchNum, batchNum uint64) error { +// checks if the stream has blocks above the current one +// if there is something, try to unwind it +// in the unwind chek if the block is at batch start +// if it is - unwind to previous batch's end, so it deletes batch stat of current batch as well +func (srv *DataStreamServer) UnwindIfNecessary(logPrefix string, reader DbReader, blockNum, prevBlockBatchNum, batchNum uint64) error { // if from is higher than the last datastream block number - unwind the stream highestDatastreamBlock, err := srv.GetHighestBlockNumber() if err != nil { return err } - //if this is a new batch case, we must unwind to previous batch's batch end + // if this is a new batch case, we must unwind to previous batch's batch end // otherwise it would corrupt the datastream with batch bookmark after a batch start or something similar if highestDatastreamBlock >= blockNum { - if prevBatchNum != batchNum { - log.Warn(fmt.Sprintf("[%s] Datastream must unwind to batch", logPrefix), "prevBatchNum", prevBatchNum, "batchNum", batchNum) + if prevBlockBatchNum != batchNum { + log.Warn(fmt.Sprintf("[%s] Datastream must unwind to batch", logPrefix), "prevBlockBatchNum", prevBlockBatchNum, "batchNum", batchNum) //get latest block in prev batch - lastBlockInPrevbatch, err := reader.GetHighestBlockInBatch(prevBatchNum) + lastBlockInPrevbatch, err := reader.GetHighestBlockInBatch(prevBlockBatchNum) if err != nil { return err } @@ -156,7 +296,7 @@ func (srv *DataStreamServer) UnwindIfNecessary(logPrefix string, reader DbReader // this represents a case where the block we must unwind to is part of a previous batch // this should never happen since previous batch in this use must be already completed if lastBlockInPrevbatch != blockNum-1 { - return fmt.Errorf("datastream must unwind to prev batch, but it would corrupt the datastream: prevBatchNum: %d, abtchNum: %d, blockNum: %d", prevBatchNum, batchNum, blockNum) + return fmt.Errorf("datastream must unwind to prev batch, but it would corrupt the datastream: prevBlockBatchNum: %d, batchNum: %d, blockNum: %d", prevBlockBatchNum, batchNum, blockNum) } if err := srv.UnwindToBatchStart(batchNum); err != nil { @@ -173,8 +313,6 @@ func (srv *DataStreamServer) UnwindIfNecessary(logPrefix string, reader DbReader } func (srv *DataStreamServer) WriteBatchEnd( - logPrefix string, - tx kv.Tx, reader DbReader, batchNumber, lastBatchNumber uint64, @@ -190,7 +328,7 @@ func (srv *DataStreamServer) WriteBatchEnd( return err } - batchEndEntries, err := addBatchEndEntriesProto(tx, batchNumber, lastBatchNumber, stateRoot, gers, localExitRoot) + batchEndEntries, err := addBatchEndEntriesProto(batchNumber, stateRoot, gers, localExitRoot) if err != nil { return err } @@ -206,48 +344,6 @@ func (srv *DataStreamServer) WriteBatchEnd( return nil } -func (srv *DataStreamServer) createBlockStreamEntriesWithBatchCheck( - logPrefix string, - tx kv.Tx, - reader DbReader, - lastBlock *eritypes.Block, - blockNumber uint64, -) (*eritypes.Block, []DataStreamEntryProto, uint64, error) { - block, err := rawdb.ReadBlockByNumber(tx, blockNumber) - if err != nil { - return nil, nil, 0, err - } - - batchNum, err := reader.GetBatchNoByL2Block(blockNumber) - if err != nil { - return nil, nil, 0, err - } - - prevBatchNum, err := reader.GetBatchNoByL2Block(blockNumber - 1) - if err != nil { - return nil, nil, 0, err - } - - if err = srv.UnwindIfNecessary(logPrefix, reader, blockNumber, prevBatchNum, batchNum); err != nil { - return nil, nil, 0, err - } - - nextBatchNum, nextBatchExists, err := reader.CheckBatchNoByL2Block(blockNumber + 1) - if err != nil { - return nil, nil, 0, err - } - - // a 0 next batch num here would mean we don't know about the next batch so must be at the end of the batch - isBatchEnd := !nextBatchExists || nextBatchNum > batchNum - - entries, err := createBlockWithBatchCheckStreamEntriesProto(srv.chainId, reader, tx, block, lastBlock, batchNum, prevBatchNum, make(map[uint64]uint64), isBatchEnd, nil) - if err != nil { - return nil, nil, 0, err - } - - return block, entries, batchNum, nil -} - func (srv *DataStreamServer) WriteGenesisToStream( genesis *eritypes.Block, reader *hermez_db.HermezDbReader, diff --git a/zk/debug_tools/datastream-bytes/main.go b/zk/debug_tools/datastream-bytes/main.go index aecec678716..83fba978f24 100644 --- a/zk/debug_tools/datastream-bytes/main.go +++ b/zk/debug_tools/datastream-bytes/main.go @@ -8,6 +8,7 @@ import ( "github.com/gateway-fm/cdk-erigon-lib/kv" "github.com/gateway-fm/cdk-erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/zk/datastream/server" "github.com/ledgerwatch/erigon/zk/hermez_db" ) @@ -30,41 +31,35 @@ func main() { err := db.View(context.Background(), func(tx kv.Tx) error { hermezDb := hermez_db.NewHermezDbReader(tx) - blocks, err := hermezDb.GetL2BlockNosByBatch(uint64(batchNum)) + blockNumbers, err := hermezDb.GetL2BlockNosByBatch(uint64(batchNum)) if err != nil { return err } - if len(blocks) == 0 { + if len(blockNumbers) == 0 { return fmt.Errorf("no blocks found for batch %d", batchNum) } - lastBlock, err := rawdb.ReadBlockByNumber(tx, blocks[0]-1) - if err != nil { - return err - } - previousBatch := batchNum - 1 + blocks := make([]types.Block, 0, len(blockNumbers)) + txsPerBlock := make(map[uint64][]types.Transaction) - for idx, blockNumber := range blocks { + for _, blockNumber := range blockNumbers { block, err := rawdb.ReadBlockByNumber(tx, blockNumber) if err != nil { return err } - - //gerUpdates := []dstypes.GerUpdate{} - var l1InfoTreeMinTimestamps map[uint64]uint64 - - isBatchEnd := idx == len(blocks)-1 - - sBytes, err := server.CreateAndBuildStreamEntryBytesProto(uint64(chainId), block, hermezDb, tx, lastBlock, uint64(batchNum), uint64(previousBatch), l1InfoTreeMinTimestamps, isBatchEnd, nil) - if err != nil { - return err - } - streamBytes = append(streamBytes, sBytes...) - lastBlock = block - // we only put in the batch bookmark at the start of the stream data once - previousBatch = batchNum + blocks = append(blocks, *block) + txsPerBlock[blockNumber] = block.Transactions() + } + var l1InfoTreeMinTimestamps map[uint64]uint64 + entries, err := server.BuildWholeBatchStreamEntriesProto(tx, hermezDb, uint64(chainId), uint64(previousBatch), uint64(batchNum), blocks, txsPerBlock, l1InfoTreeMinTimestamps) + if err != nil { + return err + } + streamBytes, err = entries.Marshal() + if err != nil { + return err } return nil diff --git a/zk/debug_tools/datastream-correctness-check/main.go b/zk/debug_tools/datastream-correctness-check/main.go new file mode 100644 index 00000000000..39674ea41cd --- /dev/null +++ b/zk/debug_tools/datastream-correctness-check/main.go @@ -0,0 +1,123 @@ +package main + +import ( + "context" + "fmt" + + "github.com/ledgerwatch/erigon/zk/datastream/client" + "github.com/ledgerwatch/erigon/zk/datastream/proto/github.com/0xPolygonHermez/zkevm-node/state/datastream" + "github.com/ledgerwatch/erigon/zk/datastream/types" + "github.com/ledgerwatch/erigon/zk/debug_tools" +) + +func main() { + ctx := context.Background() + cfg, err := debug_tools.GetConf() + if err != nil { + panic(fmt.Sprintf("RPGCOnfig: %s", err)) + } + + // Create client + client := client.NewClient(ctx, cfg.Datastream, 3, 500, 0) + + // Start client (connect to the server) + defer client.Stop() + if err := client.Start(); err != nil { + panic(err) + } + + // create bookmark + bookmark := types.NewBookmarkProto(1, datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK) + + var previousFile *types.FileEntry + progressBatch := uint64(0) + progressBlock := uint64(0) + function := func(file *types.FileEntry) error { + switch file.EntryType { + case types.BookmarkEntryType: + bookmark, err := types.UnmarshalBookmark(file.Data) + if err != nil { + return err + } + if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_BATCH { + progressBatch = bookmark.Value + if previousFile != nil && previousFile.EntryType != types.EntryTypeBatchEnd { + return fmt.Errorf("unexpected entry type before batch bookmark type: %v, bookmark batch number: %d", previousFile.EntryType, bookmark.Value) + } + } + if bookmark.BookmarkType() == datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { + progressBlock = bookmark.Value + if previousFile != nil && + previousFile.EntryType != types.EntryTypeBatchStart && + previousFile.EntryType != types.EntryTypeL2Tx && + previousFile.EntryType != types.EntryTypeL2Block { + return fmt.Errorf("unexpected entry type before block bookmark type: %v, bookmark block number: %d", previousFile.EntryType, bookmark.Value) + } + } + case types.EntryTypeBatchStart: + batchStart, err := types.UnmarshalBatchStart(file.Data) + if err != nil { + return err + } + progressBatch = batchStart.Number + if previousFile != nil { + if previousFile.EntryType != types.BookmarkEntryType { + return fmt.Errorf("unexpected entry type before batch start: %v, batchStart Batch number: %d", previousFile.EntryType, batchStart.Number) + } else { + bookmark, err := types.UnmarshalBookmark(previousFile.Data) + if err != nil { + return err + } + if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_BATCH { + return fmt.Errorf("unexpected bookmark type before batch start: %v, batchStart Batch number: %d", bookmark.BookmarkType(), batchStart.Number) + } + } + } + case types.EntryTypeBatchEnd: + if previousFile != nil && + previousFile.EntryType != types.EntryTypeL2Tx && + previousFile.EntryType != types.EntryTypeL2Block && + previousFile.EntryType != types.EntryTypeBatchStart { + return fmt.Errorf("unexpected entry type before batch end: %v", previousFile.EntryType) + } + case types.EntryTypeL2Tx: + if previousFile != nil && previousFile.EntryType != types.EntryTypeL2Tx && previousFile.EntryType != types.EntryTypeL2Block { + return fmt.Errorf("unexpected entry type before l2 tx: %v", previousFile.EntryType) + } + case types.EntryTypeL2Block: + l2Block, err := types.UnmarshalL2Block(file.Data) + if err != nil { + return err + } + progressBlock = l2Block.L2BlockNumber + if previousFile != nil { + if previousFile.EntryType != types.BookmarkEntryType { + return fmt.Errorf("unexpected entry type before l2 block: %v, block number: %d", previousFile.EntryType, l2Block.L2BlockNumber) + } else { + bookmark, err := types.UnmarshalBookmark(previousFile.Data) + if err != nil { + return err + } + if bookmark.BookmarkType() != datastream.BookmarkType_BOOKMARK_TYPE_L2_BLOCK { + return fmt.Errorf("unexpected bookmark type before l2 block: %v, block number: %d", bookmark.BookmarkType(), l2Block.L2BlockNumber) + } + + } + } + case types.EntryTypeGerUpdate: + return nil + default: + return fmt.Errorf("unexpected entry type: %v", file.EntryType) + } + + previousFile = file + return nil + } + // send start command + err = client.ExecutePerFile(bookmark, function) + fmt.Println("progress block: ", progressBlock) + fmt.Println("progress batch: ", progressBatch) + if err != nil { + panic(fmt.Sprintf("found an error: %s", err)) + } +} diff --git a/zk/legacy_executor_verifier/legacy_executor_verifier.go b/zk/legacy_executor_verifier/legacy_executor_verifier.go index fe9eba22e73..a0631c2bc25 100644 --- a/zk/legacy_executor_verifier/legacy_executor_verifier.go +++ b/zk/legacy_executor_verifier/legacy_executor_verifier.go @@ -17,6 +17,7 @@ import ( "github.com/gateway-fm/cdk-erigon-lib/kv" "github.com/ledgerwatch/erigon/chain" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/erigon/zk/datastream/server" "github.com/ledgerwatch/erigon/zk/hermez_db" @@ -227,7 +228,7 @@ func (v *LegacyExecutorVerifier) AddRequestUnsafe(request *VerifierRequest, sequ hermezDb := hermez_db.NewHermezDbReader(tx) l1InfoTreeMinTimestamps := make(map[uint64]uint64) - streamBytes, err := v.GetStreamBytes(request.BatchNumber, tx, blocks, hermezDb, l1InfoTreeMinTimestamps, nil) + streamBytes, err := v.GetWholeBatchStreamBytes(request.BatchNumber, tx, blocks, hermezDb, l1InfoTreeMinTimestamps, nil) if err != nil { return verifierBundle, err } @@ -329,10 +330,10 @@ func (v *LegacyExecutorVerifier) checkAndWriteToStream(tx kv.Tx, hdb *hermez_db. // check if we have the next batch we're waiting for if latestBatch == newBatch-1 { - v.lowestWrittenBatch = newBatch if err := v.WriteBatchToStream(newBatch, hdb, tx); err != nil { return err } + v.lowestWrittenBatch = newBatch delete(v.responsesToWrite, newBatch) } } @@ -440,12 +441,8 @@ func (v *LegacyExecutorVerifier) IsRequestAddedUnsafe(batch uint64) bool { func (v *LegacyExecutorVerifier) WriteBatchToStream(batchNumber uint64, hdb *hermez_db.HermezDbReader, roTx kv.Tx) error { log.Info("[Verifier] Writing batch to stream", "batch", batchNumber) - blks, err := hdb.GetL2BlockNosByBatch(batchNumber) - if err != nil { - return err - } - if err := v.streamServer.WriteBlocksToStream(roTx, hdb, blks[0], blks[len(blks)-1], "verifier"); err != nil { + if err := v.streamServer.WriteWholeBatchToStream("verifier", roTx, hdb, v.lowestWrittenBatch, batchNumber); err != nil { return err } return nil @@ -498,49 +495,58 @@ func (v *LegacyExecutorVerifier) availableBlocksToProcess(innerCtx context.Conte return blocks, nil } -func (v *LegacyExecutorVerifier) GetStreamBytes( +func (v *LegacyExecutorVerifier) GetWholeBatchStreamBytes( batchNumber uint64, tx kv.Tx, - blocks []uint64, + blockNumbers []uint64, hermezDb *hermez_db.HermezDbReader, l1InfoTreeMinTimestamps map[uint64]uint64, transactionsToIncludeByIndex [][]int, // passing nil here will include all transactions in the blocks -) ([]byte, error) { - lastBlock, err := rawdb.ReadBlockByNumber(tx, blocks[0]-1) - if err != nil { - return nil, err - } - var streamBytes []byte +) (streamBytes []byte, err error) { + blocks := make([]types.Block, 0, len(blockNumbers)) + txsPerBlock := make(map[uint64][]types.Transaction) // as we only ever use the executor verifier for whole batches we can safely assume that the previous batch // will always be the request batch - 1 and that the first block in the batch will be at the batch // boundary so we will always add in the batch bookmark to the stream previousBatch := batchNumber - 1 - for idx, blockNumber := range blocks { + for idx, blockNumber := range blockNumbers { block, err := rawdb.ReadBlockByNumber(tx, blockNumber) if err != nil { return nil, err } + blocks = append(blocks, *block) - var sBytes []byte - - isBatchEnd := idx == len(blocks)-1 - - var transactionsToIncludeByIndexInBlock []int = nil + filteredTransactions := block.Transactions() + // filter transactions by indexes that should be included if transactionsToIncludeByIndex != nil { - transactionsToIncludeByIndexInBlock = transactionsToIncludeByIndex[idx] + filteredTransactions = filterTransactionByIndexes(block.Transactions(), transactionsToIncludeByIndex[idx]) } - sBytes, err = server.CreateAndBuildStreamEntryBytesProto(v.streamServer.GetChainId(), block, hermezDb, tx, lastBlock, batchNumber, previousBatch, l1InfoTreeMinTimestamps, isBatchEnd, transactionsToIncludeByIndexInBlock) - if err != nil { - return nil, err + + txsPerBlock[blockNumber] = filteredTransactions + } + + entries, err := server.BuildWholeBatchStreamEntriesProto(tx, hermezDb, v.streamServer.GetChainId(), batchNumber, previousBatch, blocks, txsPerBlock, l1InfoTreeMinTimestamps) + if err != nil { + return nil, err + } + + return entries.Marshal() +} + +func filterTransactionByIndexes( + filteredTransactions types.Transactions, + transactionsToIncludeByIndex []int, +) types.Transactions { + if transactionsToIncludeByIndex != nil { + filteredTransactionsBuilder := make(types.Transactions, len(transactionsToIncludeByIndex)) + for i, txIndexInBlock := range transactionsToIncludeByIndex { + filteredTransactionsBuilder[i] = filteredTransactions[txIndexInBlock] } - streamBytes = append(streamBytes, sBytes...) - lastBlock = block - // we only put in the batch bookmark at the start of the stream data once - previousBatch = batchNumber + filteredTransactions = filteredTransactionsBuilder } - return streamBytes, nil + return filteredTransactions } diff --git a/zk/stages/stage_dataStreamCatchup.go b/zk/stages/stage_dataStreamCatchup.go index c515534cc42..20100319db3 100644 --- a/zk/stages/stage_dataStreamCatchup.go +++ b/zk/stages/stage_dataStreamCatchup.go @@ -61,7 +61,7 @@ func SpawnStageDataStreamCatchup( createdTx = true } - finalBlockNumber, err := CatchupDatastream(logPrefix, tx, stream, cfg.chainId, cfg.streamVersion, cfg.hasExecutors) + finalBlockNumber, err := CatchupDatastream(ctx, logPrefix, tx, stream, cfg.chainId, cfg.streamVersion, cfg.hasExecutors) if err != nil { return err } @@ -77,7 +77,7 @@ func SpawnStageDataStreamCatchup( return err } -func CatchupDatastream(logPrefix string, tx kv.RwTx, stream *datastreamer.StreamServer, chainId uint64, streamVersion int, hasExecutors bool) (uint64, error) { +func CatchupDatastream(ctx context.Context, logPrefix string, tx kv.RwTx, stream *datastreamer.StreamServer, chainId uint64, streamVersion int, hasExecutors bool) (uint64, error) { srv := server.NewDataStreamServer(stream, chainId) reader := hermez_db.NewHermezDbReader(tx) @@ -143,7 +143,7 @@ func CatchupDatastream(logPrefix string, tx kv.RwTx, stream *datastreamer.Stream } } - if err = srv.WriteBlocksToStream(tx, reader, previousProgress+1, finalBlockNumber, logPrefix); err != nil { + if err = srv.WriteBlocksToStreamConsecutively(ctx, logPrefix, tx, reader, previousProgress+1, finalBlockNumber); err != nil { return 0, err } diff --git a/zk/stages/stage_sequence_execute.go b/zk/stages/stage_sequence_execute.go index 6919f3d9784..288fdd43a57 100644 --- a/zk/stages/stage_sequence_execute.go +++ b/zk/stages/stage_sequence_execute.go @@ -74,6 +74,15 @@ func SpawnSequencingStage( getHeader := func(hash common.Hash, number uint64) *types.Header { return rawdb.ReadHeader(sdb.tx, hash, number) } hasExecutorForThisBatch := !isLastBatchPariallyProcessed && cfg.zk.HasExecutors() + // handle case where batch wasn't closed properly + // close it before starting a new one + // this occurs when sequencer was switched from syncer or sequencer datastream files were deleted + // and datastream was regenerated + isLastEntryBatchEnd, err := cfg.datastreamServer.IsLastEntryBatchEnd() + if err != nil { + return err + } + // injected batch if executionAt == 0 { // set the block height for the fork we're running at to ensure contract interactions are correct @@ -93,8 +102,7 @@ func SpawnSequencingStage( return err } - // write the batch directly to the stream - if err = cfg.datastreamServer.WriteBlocksToStream(tx, sdb.hermezDb.HermezDbReader, injectedBatchBlockNumber, injectedBatchBlockNumber, logPrefix); err != nil { + if err = cfg.datastreamServer.WriteWholeBatchToStream(logPrefix, tx, sdb.hermezDb.HermezDbReader, lastBatch, injectedBatchNumber); err != nil { return err } @@ -107,6 +115,23 @@ func SpawnSequencingStage( return nil } + if !isLastBatchPariallyProcessed && !isLastEntryBatchEnd { + log.Warn(fmt.Sprintf("[%s] Last batch %d was not closed properly, closing it now...", logPrefix, lastBatch)) + ler, err := utils.GetBatchLocalExitRootFromSCStorage(lastBatch, sdb.hermezDb.HermezDbReader, tx) + if err != nil { + return err + } + + lastBlock, err := rawdb.ReadBlockByNumber(sdb.tx, executionAt) + if err != nil { + return err + } + root := lastBlock.Root() + if err = cfg.datastreamServer.WriteBatchEnd(sdb.hermezDb, lastBatch, lastBatch-1, &root, &ler); err != nil { + return err + } + } + if err := utils.UpdateZkEVMBlockCfg(cfg.chainConfig, sdb.hermezDb, logPrefix); err != nil { return err } @@ -501,25 +526,22 @@ func SpawnSequencingStage( // because it would be later added twice counters := batchCounters.CombineCollectorsNoChanges(l1InfoIndex != 0) - err = sdb.hermezDb.WriteBatchCounters(thisBatch, counters.UsedAsMap()) - if err != nil { + if err = sdb.hermezDb.WriteBatchCounters(thisBatch, counters.UsedAsMap()); err != nil { return err } - err = sdb.hermezDb.WriteIsBatchPartiallyProcessed(thisBatch) - if err != nil { + if err = sdb.hermezDb.WriteIsBatchPartiallyProcessed(thisBatch); err != nil { return err } - if err = cfg.datastreamServer.WriteBlockToStream(logPrefix, tx, sdb.hermezDb, thisBatch, lastBatch, blockNumber); err != nil { + if err = cfg.datastreamServer.WriteBlockWithBatchStartToStream(logPrefix, tx, sdb.hermezDb, forkId, thisBatch, lastBatch, *parentBlock, *block); err != nil { return err } if err = tx.Commit(); err != nil { return err } - tx, err = cfg.db.BeginRw(ctx) - if err != nil { + if tx, err = cfg.db.BeginRw(ctx); err != nil { return err } // TODO: This creates stacked up deferrals @@ -563,7 +585,7 @@ func SpawnSequencingStage( if !hasExecutorForThisBatch { blockRoot := block.Root() - if err = cfg.datastreamServer.WriteBatchEnd(logPrefix, tx, sdb.hermezDb, thisBatch, lastBatch, &blockRoot, &ler); err != nil { + if err = cfg.datastreamServer.WriteBatchEnd(sdb.hermezDb, thisBatch, lastBatch, &blockRoot, &ler); err != nil { return err } } diff --git a/zk/stages/stage_sequencer_executor_verify.go b/zk/stages/stage_sequencer_executor_verify.go index 564bb6aa5cc..68299035c7d 100644 --- a/zk/stages/stage_sequencer_executor_verify.go +++ b/zk/stages/stage_sequencer_executor_verify.go @@ -175,8 +175,7 @@ func SpawnSequencerExecutorVerifyStage( } l1InfoTreeMinTimestamps := make(map[uint64]uint64) - _, err = cfg.verifier.GetStreamBytes(response.BatchNumber, tx, blockNumbers, hermezDbReader, l1InfoTreeMinTimestamps, nil) - if err != nil { + if _, err = cfg.verifier.GetWholeBatchStreamBytes(response.BatchNumber, tx, blockNumbers, hermezDbReader, l1InfoTreeMinTimestamps, nil); err != nil { return err } @@ -217,7 +216,7 @@ func SpawnSequencerExecutorVerifyStage( senderMapKey := sender.Hex() blocksForStreamBytes, transactionsToIncludeByIndex := limboStreamBytesBuilderHelper.append(senderMapKey, blockNumber, i) - streamBytes, err := cfg.verifier.GetStreamBytes(response.BatchNumber, tx, blocksForStreamBytes, hermezDbReader, l1InfoTreeMinTimestamps, transactionsToIncludeByIndex) + streamBytes, err := cfg.verifier.GetWholeBatchStreamBytes(response.BatchNumber, tx, blocksForStreamBytes, hermezDbReader, l1InfoTreeMinTimestamps, transactionsToIncludeByIndex) if err != nil { return err }