diff --git a/rollup/cmd/permissionless_batches/app/app.go b/rollup/cmd/permissionless_batches/app/app.go new file mode 100644 index 000000000..983248a72 --- /dev/null +++ b/rollup/cmd/permissionless_batches/app/app.go @@ -0,0 +1,335 @@ +package app + +import ( + "context" + "fmt" + "os" + "os/signal" + + "github.com/prometheus/client_golang/prometheus" + "github.com/scroll-tech/da-codec/encoding" + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core" + "github.com/scroll-tech/go-ethereum/ethclient" + "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rollup/l1" + "github.com/urfave/cli/v2" + "gorm.io/gorm" + + "scroll-tech/common/database" + "scroll-tech/common/observability" + "scroll-tech/common/utils" + "scroll-tech/common/version" + "scroll-tech/database/migrate" + "scroll-tech/rollup/internal/config" + "scroll-tech/rollup/internal/controller/watcher" + "scroll-tech/rollup/internal/orm" +) + +var app *cli.App + +func init() { + // Set up rollup-relayer app info. + app = cli.NewApp() + app.Action = action + app.Name = "permissionless-batches" + app.Usage = "The Scroll Rollup Relayer for permissionless batch production" + app.Version = version.Version + app.Flags = append(app.Flags, utils.CommonFlags...) + app.Flags = append(app.Flags, utils.RollupRelayerFlags...) + app.Commands = []*cli.Command{} + app.Before = func(ctx *cli.Context) error { + return utils.LogSetup(ctx) + } +} + +func action(ctx *cli.Context) error { + // Load config file. + cfgFile := ctx.String(utils.ConfigFileFlag.Name) + cfg, err := config.NewConfig(cfgFile) + if err != nil { + log.Crit("failed to load config file", "config file", cfgFile, "error", err) + } + + subCtx, cancel := context.WithCancel(ctx.Context) + defer cancel() + + db, err := initDB(cfg) + if err != nil { + return fmt.Errorf("failed to init db: %w", err) + } + defer func() { + if err = database.CloseDB(db); err != nil { + log.Crit("failed to close db connection", "error", err) + } + }() + + registry := prometheus.DefaultRegisterer + observability.Server(ctx, db) + + genesisPath := ctx.String(utils.Genesis.Name) + genesis, err := utils.ReadGenesis(genesisPath) + if err != nil { + log.Crit("failed to read genesis", "genesis file", genesisPath, "error", err) + } + + chunkProposer := watcher.NewChunkProposer(subCtx, cfg.L2Config.ChunkProposerConfig, genesis.Config, db, registry) + batchProposer := watcher.NewBatchProposer(subCtx, cfg.L2Config.BatchProposerConfig, genesis.Config, db, registry) + //bundleProposer := watcher.NewBundleProposer(subCtx, cfg.L2Config.BundleProposerConfig, genesis.Config, db, registry) + + fmt.Println(cfg.L1Config) + fmt.Println(cfg.L2Config) + fmt.Println(cfg.DBConfig) + fmt.Println(cfg.RecoveryConfig) + + // Restore minimal previous state required to be able to create new chunks, batches and bundles. + latestFinalizedChunk, latestFinalizedBatch, err := restoreMinimalPreviousState(cfg, chunkProposer, batchProposer) + if err != nil { + return fmt.Errorf("failed to restore minimal previous state: %w", err) + } + + // Fetch and insert the missing blocks from the last block in the latestFinalizedBatch to the latest L2 block. + fromBlock := latestFinalizedChunk.EndBlockNumber + 1 + toBlock, err := fetchL2Blocks(subCtx, cfg, genesis, db, registry, fromBlock, cfg.RecoveryConfig.L2BlockHeightLimit) + if err != nil { + return fmt.Errorf("failed to fetch L2 blocks: %w", err) + } + + fmt.Println(latestFinalizedChunk.Index, latestFinalizedBatch.Index, fromBlock, toBlock) + + // Create chunks for L2 blocks. + log.Info("Creating chunks for L2 blocks", "from", fromBlock, "to", toBlock) + + var latestChunk *orm.Chunk + var count int + for { + if err = chunkProposer.ProposeChunk(); err != nil { + return fmt.Errorf("failed to propose chunk: %w", err) + } + count++ + + latestChunk, err = chunkProposer.ChunkORM().GetLatestChunk(subCtx) + if err != nil { + return fmt.Errorf("failed to get latest latestFinalizedChunk: %w", err) + } + + log.Info("Chunk created", "index", latestChunk.Index, "hash", latestChunk.Hash, "StartBlockNumber", latestChunk.StartBlockNumber, "EndBlockNumber", latestChunk.EndBlockNumber, "TotalL1MessagesPoppedBefore", latestChunk.TotalL1MessagesPoppedBefore) + + // We have created chunks for all available L2 blocks. + if latestChunk.EndBlockNumber >= toBlock { + break + } + } + + log.Info("Chunks created", "count", count, "latest latestFinalizedChunk", latestChunk.Index, "hash", latestChunk.Hash, "StartBlockNumber", latestChunk.StartBlockNumber, "EndBlockNumber", latestChunk.EndBlockNumber, "TotalL1MessagesPoppedBefore", latestChunk.TotalL1MessagesPoppedBefore) + + // Create batch for the created chunks. We only allow 1 batch it needs to be submitted (and finalized) with a proof in a single step. + log.Info("Creating batch for chunks", "from", latestFinalizedChunk.Index+1, "to", latestChunk.Index) + + batchProposer.TryProposeBatch() + latestBatch, err := batchProposer.BatchORM().GetLatestBatch(subCtx) + if err != nil { + return fmt.Errorf("failed to get latest latestFinalizedBatch: %w", err) + } + + if latestBatch.EndChunkIndex != latestChunk.Index { + return fmt.Errorf("latest chunk in produced batch %d != %d, too many L2 blocks - specify less L2 blocks and retry again", latestBatch.EndChunkIndex, latestChunk.Index) + } + + log.Info("Batch created", "index", latestBatch.Index, "hash", latestBatch.Hash, "StartChunkIndex", latestBatch.StartChunkIndex, "EndChunkIndex", latestBatch.EndChunkIndex) + + // Catch CTRL-C to ensure a graceful shutdown. + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + // Wait until the interrupt signal is received from an OS signal. + <-interrupt + + return nil +} + +// Run rollup relayer cmd instance. +func Run() { + if err := app.Run(os.Args); err != nil { + _, _ = fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +func initDB(cfg *config.Config) (*gorm.DB, error) { + // init db connection + db, err := database.InitDB(cfg.DBConfig) + if err != nil { + log.Crit("failed to init db connection", "err", err) + } + + // make sure we are starting from a fresh DB + sqlDB, err := db.DB() + if err != nil { + return nil, fmt.Errorf("failed ") + } + + // reset and init DB + var v int64 + err = migrate.Rollback(sqlDB, &v) + if err != nil { + return nil, fmt.Errorf("failed to rollback db: %w", err) + } + + err = migrate.Migrate(sqlDB) + if err != nil { + return nil, fmt.Errorf("failed to migrate db: %w", err) + } + + return db, nil +} + +func fetchL2Blocks(ctx context.Context, cfg *config.Config, genesis *core.Genesis, db *gorm.DB, registry prometheus.Registerer, fromBlock uint64, l2BlockHeightLimit uint64) (uint64, error) { + if l2BlockHeightLimit > 0 && fromBlock > l2BlockHeightLimit { + return 0, fmt.Errorf("fromBlock (latest finalized L2 block) is higher than specified L2BlockHeightLimit: %d > %d", fromBlock, l2BlockHeightLimit) + } + + log.Info("Fetching L2 blocks with", "fromBlock", fromBlock, "l2BlockHeightLimit", l2BlockHeightLimit) + + // Init l2geth connection + l2client, err := ethclient.Dial(cfg.L2Config.Endpoint) + if err != nil { + return 0, fmt.Errorf("failed to connect to L2geth at RPC=%s: %w", cfg.L2Config.Endpoint, err) + } + + l2Watcher := watcher.NewL2WatcherClient(ctx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, genesis.Config, db, registry) + + // Fetch and insert the missing blocks from the last block in the batch to the latest L2 block. + latestL2Block, err := l2Watcher.Client.BlockNumber(context.Background()) + if err != nil { + return 0, fmt.Errorf("failed to get latest L2 block number: %w", err) + } + + log.Info("Latest L2 block number", "latest L2 block", latestL2Block) + + if l2BlockHeightLimit > latestL2Block { + return 0, fmt.Errorf("l2BlockHeightLimit is higher than the latest L2 block number, not all blocks are available in L2geth: %d > %d", l2BlockHeightLimit, latestL2Block) + } + + toBlock := latestL2Block + if l2BlockHeightLimit > 0 { + toBlock = l2BlockHeightLimit + } + + err = l2Watcher.GetAndStoreBlocks(context.Background(), fromBlock, toBlock) + if err != nil { + return 0, fmt.Errorf("failed to get and store blocks: %w", err) + } + + log.Info("Fetched L2 blocks from", "fromBlock", fromBlock, "toBlock", toBlock) + + return toBlock, nil +} + +// restoreMinimalPreviousState restores the minimal previous state required to be able to create new chunks, batches and bundles. +func restoreMinimalPreviousState(cfg *config.Config, chunkProposer *watcher.ChunkProposer, batchProposer *watcher.BatchProposer) (*orm.Chunk, *orm.Batch, error) { + log.Info("Restoring previous state with", "L1 block height", cfg.RecoveryConfig.L1BlockHeight, "latest finalized batch", cfg.RecoveryConfig.LatestFinalizedBatch) + + // TODO: make these parameters -> part of genesis config? + scrollChainAddress := common.HexToAddress("0x2D567EcE699Eabe5afCd141eDB7A4f2D0D6ce8a0") + l1MessageQueueAddress := common.HexToAddress("0xF0B2293F5D834eAe920c6974D50957A1732de763") + + l1Client, err := ethclient.Dial(cfg.L1Config.Endpoint) + if err != nil { + return nil, nil, fmt.Errorf("failed to connect to L1 client: %w", err) + } + reader, err := l1.NewReader(context.Background(), l1.Config{ + ScrollChainAddress: scrollChainAddress, + L1MessageQueueAddress: l1MessageQueueAddress, + }, l1Client) + if err != nil { + return nil, nil, fmt.Errorf("failed to create L1 reader: %w", err) + } + + // 1. Sanity check user input: Make sure that the user's L1 block height is not higher than the latest finalized block number. + latestFinalizedL1Block, err := reader.GetLatestFinalizedBlockNumber() + if err != nil { + return nil, nil, fmt.Errorf("failed to get latest finalized L1 block number: %w", err) + } + if cfg.RecoveryConfig.L1BlockHeight > latestFinalizedL1Block { + return nil, nil, fmt.Errorf("specified L1 block height is higher than the latest finalized block number: %d > %d", cfg.RecoveryConfig.L1BlockHeight, latestFinalizedL1Block) + } + + log.Info("Latest finalized L1 block number", "latest finalized L1 block", latestFinalizedL1Block) + + // 2. Make sure that the specified batch is indeed finalized on the L1 rollup contract and is the latest finalized batch. + // TODO: enable check + //latestFinalizedBatch, err := reader.LatestFinalizedBatch(latestFinalizedL1Block) + //if cfg.RecoveryConfig.LatestFinalizedBatch != latestFinalizedBatch { + // return nil, nil, fmt.Errorf("batch %d is not the latest finalized batch: %d", cfg.RecoveryConfig.LatestFinalizedBatch, latestFinalizedBatch) + //} + + var batchCommitEvent *l1.CommitBatchEvent + err = reader.FetchRollupEventsInRangeWithCallback(cfg.RecoveryConfig.L1BlockHeight, latestFinalizedL1Block, func(event l1.RollupEvent) bool { + if event.Type() == l1.CommitEventType && event.BatchIndex().Uint64() == cfg.RecoveryConfig.LatestFinalizedBatch { + batchCommitEvent = event.(*l1.CommitBatchEvent) + return false + } + + return true + }) + if batchCommitEvent == nil { + return nil, nil, fmt.Errorf("commit event not found for batch %d", cfg.RecoveryConfig.LatestFinalizedBatch) + } + + log.Info("Found commit event for batch", "batch", batchCommitEvent.BatchIndex(), "hash", batchCommitEvent.BatchHash(), "L1 block height", batchCommitEvent.BlockNumber(), "L1 tx hash", batchCommitEvent.TxHash()) + + // 3. Fetch commit tx data for latest finalized batch. + args, err := reader.FetchCommitTxData(batchCommitEvent) + if err != nil { + return nil, nil, fmt.Errorf("failed to fetch commit tx data: %w", err) + } + + codec, err := encoding.CodecFromVersion(encoding.CodecVersion(args.Version)) + if err != nil { + return nil, nil, fmt.Errorf("failed to get codec: %w", err) + } + + daChunksRawTxs, err := codec.DecodeDAChunksRawTx(args.Chunks) + if err != nil { + return nil, nil, fmt.Errorf("failed to decode DA chunks: %w", err) + } + lastChunk := daChunksRawTxs[len(daChunksRawTxs)-1] + lastBlockInBatch := lastChunk.Blocks[len(lastChunk.Blocks)-1].Number() + + log.Info("Last L2 block in batch", "batch", batchCommitEvent.BatchIndex(), "L2 block", lastBlockInBatch) + + // 4. Get the L1 messages count after the latest finalized batch. + l1MessagesCount, err := reader.FinalizedL1MessageQueueIndex(latestFinalizedL1Block) + if err != nil { + return nil, nil, fmt.Errorf("failed to get L1 messages count: %w", err) + } + // TODO: remove this. only for testing + l1MessagesCount = 220853 + + log.Info("L1 messages count after latest finalized batch", "batch", batchCommitEvent.BatchIndex(), "count", l1MessagesCount) + + // 5. Insert minimal state to DB. + chunk, err := chunkProposer.ChunkORM().InsertChunkRaw(context.Background(), codec.Version(), lastChunk, l1MessagesCount) + if err != nil { + return nil, nil, fmt.Errorf("failed to insert chunk raw: %w", err) + } + + log.Info("Inserted last finalized chunk to DB", "chunk", chunk.Index, "hash", chunk.Hash, "StartBlockNumber", chunk.StartBlockNumber, "EndBlockNumber", chunk.EndBlockNumber, "TotalL1MessagesPoppedBefore", chunk.TotalL1MessagesPoppedBefore) + + batch, err := batchProposer.BatchORM().InsertBatchRaw(context.Background(), batchCommitEvent.BatchIndex(), batchCommitEvent.BatchHash(), codec.Version(), chunk) + if err != nil { + return nil, nil, fmt.Errorf("failed to insert batch raw: %w", err) + } + + log.Info("Inserted last finalized batch to DB", "batch", batch.Index, "hash", batch.Hash) + + return chunk, batch, nil +} + +//docker run --rm -it \ +// -e POSTGRES_HOST_AUTH_METHOD=trust \ +// -e POSTGRES_DB=scroll \ +// -v ${PWD}/db_data:/var/lib/postgresql/data \ +// -p 5432:5432 \ +// postgres diff --git a/rollup/cmd/permissionless_batches/main.go b/rollup/cmd/permissionless_batches/main.go new file mode 100644 index 000000000..95e157492 --- /dev/null +++ b/rollup/cmd/permissionless_batches/main.go @@ -0,0 +1,7 @@ +package main + +import "scroll-tech/rollup/cmd/permissionless_batches/app" + +func main() { + app.Run() +} diff --git a/rollup/cmd/rollup_relayer/app/app.go b/rollup/cmd/rollup_relayer/app/app.go index 939d4b779..bd4a81a2f 100644 --- a/rollup/cmd/rollup_relayer/app/app.go +++ b/rollup/cmd/rollup_relayer/app/app.go @@ -8,18 +8,23 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/scroll-tech/da-codec/encoding" + "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/ethclient" "github.com/scroll-tech/go-ethereum/log" + "github.com/scroll-tech/go-ethereum/rollup/l1" "github.com/urfave/cli/v2" + "gorm.io/gorm" "scroll-tech/common/database" "scroll-tech/common/observability" + "scroll-tech/common/types" "scroll-tech/common/utils" "scroll-tech/common/version" - "scroll-tech/rollup/internal/config" "scroll-tech/rollup/internal/controller/relayer" "scroll-tech/rollup/internal/controller/watcher" + "scroll-tech/rollup/internal/orm" butils "scroll-tech/rollup/internal/utils" ) @@ -90,6 +95,16 @@ func action(ctx *cli.Context) error { l2watcher := watcher.NewL2WatcherClient(subCtx, l2client, cfg.L2Config.Confirmations, cfg.L2Config.L2MessageQueueAddress, cfg.L2Config.WithdrawTrieRootSlot, genesis.Config, db, registry) + if cfg.RecoveryConfig.Enable { + log.Info("Starting rollup-relayer in recovery mode", "version", version.Version) + + if err = restoreFullPreviousState(cfg, db, chunkProposer, batchProposer, bundleProposer, l2watcher); err != nil { + log.Crit("failed to restore full previous state", "error", err) + } + + return nil + } + // Watcher loop to fetch missing blocks go utils.LoopWithContext(subCtx, 2*time.Second, func(ctx context.Context) { number, loopErr := butils.GetLatestConfirmedBlockNumber(ctx, l2client, cfg.L2Config.Confirmations) @@ -97,7 +112,8 @@ func action(ctx *cli.Context) error { log.Error("failed to get block number", "err", loopErr) return } - l2watcher.TryFetchRunningMissingBlocks(number) + // errors are logged in the try method as well + _ = l2watcher.TryFetchRunningMissingBlocks(number) }) go utils.Loop(subCtx, time.Duration(cfg.L2Config.ChunkProposerConfig.ProposeIntervalMilliseconds)*time.Millisecond, chunkProposer.TryProposeChunk) @@ -132,3 +148,319 @@ func Run() { os.Exit(1) } } + +func restoreFullPreviousState(cfg *config.Config, db *gorm.DB, chunkProposer *watcher.ChunkProposer, batchProposer *watcher.BatchProposer, bundleProposer *watcher.BundleProposer, l2Watcher *watcher.L2WatcherClient) error { + log.Info("Restoring full previous state with", "L1 block height", cfg.RecoveryConfig.L1BlockHeight, "latest finalized batch", cfg.RecoveryConfig.LatestFinalizedBatch) + + // DB state should be clean: the latest batch in the DB should be finalized on L1. This function will + // restore all batches between the latest finalized batch in the DB and the latest finalized batch on L1. + + // 1. Get latest finalized batch stored in DB + latestDBBatch, err := batchProposer.BatchORM().GetLatestBatch(context.Background()) + if err != nil { + return fmt.Errorf("failed to get latest batch from DB: %w", err) + } + fmt.Println("latestDBBatch", latestDBBatch) + + // TODO: + // 1. what if it is a fresh start? -> latest batch is nil + //latestDBBatch.CommitTxHash + + log.Info("Latest finalized batch in DB", "batch", latestDBBatch.Index, "hash", latestDBBatch.Hash) + + // TODO: make these parameters -> part of genesis config? + scrollChainAddress := common.HexToAddress("0x2D567EcE699Eabe5afCd141eDB7A4f2D0D6ce8a0") + l1MessageQueueAddress := common.HexToAddress("0xF0B2293F5D834eAe920c6974D50957A1732de763") + + l1Client, err := ethclient.Dial(cfg.L1Config.Endpoint) + if err != nil { + return fmt.Errorf("failed to connect to L1 client: %w", err) + } + reader, err := l1.NewReader(context.Background(), l1.Config{ + ScrollChainAddress: scrollChainAddress, + L1MessageQueueAddress: l1MessageQueueAddress, + }, l1Client) + if err != nil { + return fmt.Errorf("failed to create L1 reader: %w", err) + } + + // 2. Get latest finalized L1 block + latestFinalizedL1Block, err := reader.GetLatestFinalizedBlockNumber() + if err != nil { + return fmt.Errorf("failed to get latest finalized L1 block number: %w", err) + } + + log.Info("Latest finalized L1 block number", "latest finalized L1 block", latestFinalizedL1Block) + + // 3. Get latest finalized batch from contract (at latest finalized L1 block) + latestFinalizedBatch, err := reader.LatestFinalizedBatch(latestFinalizedL1Block) + if err != nil { + return fmt.Errorf("failed to get latest finalized batch: %w", err) + } + + log.Info("Latest finalized batch from L1 contract", "latest finalized batch", latestFinalizedBatch, "at latest finalized L1 block", latestFinalizedL1Block) + + // 4. Get batches one by one from stored in DB to latest finalized batch. + receipt, err := l1Client.TransactionReceipt(context.Background(), common.HexToHash(latestDBBatch.CommitTxHash)) + if err != nil { + return fmt.Errorf("failed to get transaction receipt of latest DB batch finalization transaction: %w", err) + } + fromBlock := receipt.BlockNumber.Uint64() + + log.Info("Fetching rollup events from L1", "from block", fromBlock, "to block", latestFinalizedL1Block, "from batch", latestDBBatch.Index, "to batch", latestFinalizedBatch) + + commitsHeapMap := common.NewHeapMap[uint64, *l1.CommitBatchEvent](func(event *l1.CommitBatchEvent) uint64 { + return event.BatchIndex().Uint64() + }) + batchEventsHeap := common.NewHeap[*batchEvents]() + var bundles [][]*batchEvents + + err = reader.FetchRollupEventsInRangeWithCallback(fromBlock, latestFinalizedL1Block, func(event l1.RollupEvent) bool { + // We're only interested in batches that are newer than the latest finalized batch in the DB. + if event.BatchIndex().Uint64() <= latestDBBatch.Index { + return true + } + + switch event.Type() { + case l1.CommitEventType: + commitEvent := event.(*l1.CommitBatchEvent) + commitsHeapMap.Push(commitEvent) + + case l1.FinalizeEventType: + finalizeEvent := event.(*l1.FinalizeBatchEvent) + + var bundle []*batchEvents + + // with bundles all commited batches until this finalized batch are finalized in the same bundle + for commitsHeapMap.Len() > 0 { + commitEvent := commitsHeapMap.Peek() + if commitEvent.BatchIndex().Uint64() > finalizeEvent.BatchIndex().Uint64() { + break + } + + bEvents := newBatchEvents(commitEvent, finalizeEvent) + commitsHeapMap.Pop() + batchEventsHeap.Push(bEvents) + bundle = append(bundle, bEvents) + } + + bundles = append(bundles, bundle) + + // Stop fetching rollup events if we reached the latest finalized batch. + if finalizeEvent.BatchIndex().Uint64() >= latestFinalizedBatch { + return false + } + + case l1.RevertEventType: + // We ignore reverted batches. + commitsHeapMap.RemoveByKey(event.BatchIndex().Uint64()) + } + + return true + }) + if err != nil { + return fmt.Errorf("failed to fetch rollup events: %w", err) + } + + // 5. Process all finalized batches: fetch L2 blocks and reproduce chunks and batches. + for batchEventsHeap.Len() > 0 { + nextBatch := batchEventsHeap.Pop().Value() + fmt.Println("nextBatch", nextBatch.commit.BatchIndex(), nextBatch.commit.BatchHash(), nextBatch.finalize.BatchIndex(), nextBatch.finalize.BatchHash()) + if err = processFinalizedBatch(db, reader, nextBatch, chunkProposer, batchProposer, l2Watcher); err != nil { + return fmt.Errorf("failed to process finalized batch %d %s: %w", nextBatch.commit.BatchIndex(), nextBatch.commit.BatchHash(), err) + } + + log.Info("Processed finalized batch", "batch", nextBatch.commit.BatchIndex(), "hash", nextBatch.commit.BatchHash()) + } + + // 6. Create bundles if needed. + for _, bundle := range bundles { + var dbBatches []*orm.Batch + var lastBatchInBundle *orm.Batch + + for _, batch := range bundle { + dbBatch, err := batchProposer.BatchORM().GetBatchByIndex(context.Background(), batch.commit.BatchIndex().Uint64()) + if err != nil { + return fmt.Errorf("failed to get batch by index for bundle generation: %w", err) + } + // Bundles are only supported for codec version 3 and above. + if encoding.CodecVersion(dbBatch.CodecVersion) < encoding.CodecV3 { + break + } + + dbBatches = append(dbBatches, dbBatch) + lastBatchInBundle = dbBatch + } + + if len(dbBatches) == 0 { + continue + } + + err = db.Transaction(func(dbTX *gorm.DB) error { + newBundle, err := bundleProposer.BundleORM().InsertBundle(context.Background(), dbBatches, encoding.CodecVersion(lastBatchInBundle.CodecVersion), dbTX) + if err != nil { + return fmt.Errorf("failed to insert bundle to DB: %w", err) + } + if err = batchProposer.BatchORM().UpdateBundleHashInRange(context.Background(), newBundle.StartBatchIndex, newBundle.EndBatchIndex, newBundle.Hash, dbTX); err != nil { + return fmt.Errorf("failed to update bundle_hash %s for batches (%d to %d): %w", newBundle.Hash, newBundle.StartBatchIndex, newBundle.EndBatchIndex, err) + } + + if err = bundleProposer.BundleORM().UpdateFinalizeTxHashAndRollupStatus(context.Background(), newBundle.Hash, lastBatchInBundle.FinalizeTxHash, types.RollupFinalized, dbTX); err != nil { + return fmt.Errorf("failed to update finalize tx hash and rollup status for bundle %s: %w", newBundle.Hash, err) + } + + if err = bundleProposer.BundleORM().UpdateProvingStatus(context.Background(), newBundle.Hash, types.ProvingTaskVerified, dbTX); err != nil { + return fmt.Errorf("failed to update proving status for bundle %s: %w", newBundle.Hash, err) + } + + return nil + }) + if err != nil { + return fmt.Errorf("failed to insert bundle in DB transaction: %w", err) + } + + fmt.Println("bundle", len(bundle), bundle[0].commit.BatchIndex()) + } + + return nil +} + +func processFinalizedBatch(db *gorm.DB, reader *l1.Reader, nextBatch *batchEvents, chunkProposer *watcher.ChunkProposer, batchProposer *watcher.BatchProposer, l2Watcher *watcher.L2WatcherClient) error { + log.Info("Processing finalized batch", "batch", nextBatch.commit.BatchIndex(), "hash", nextBatch.commit.BatchHash()) + + // 5.1. Fetch commit tx data for batch (via commit event). + args, err := reader.FetchCommitTxData(nextBatch.commit) + if err != nil { + return fmt.Errorf("failed to fetch commit tx data: %w", err) + } + + codec, err := encoding.CodecFromVersion(encoding.CodecVersion(args.Version)) + if err != nil { + return fmt.Errorf("failed to get codec: %w", err) + } + + daChunksRawTxs, err := codec.DecodeDAChunksRawTx(args.Chunks) + if err != nil { + return fmt.Errorf("failed to decode DA chunks: %w", err) + } + lastChunk := daChunksRawTxs[len(daChunksRawTxs)-1] + lastBlockInBatch := lastChunk.Blocks[len(lastChunk.Blocks)-1].Number() + + log.Info("Fetching L2 blocks from l2geth", "batch", nextBatch.commit.BatchIndex(), "last L2 block in batch", lastBlockInBatch) + + // 5.2. Fetch L2 blocks for the entire batch. + if err = l2Watcher.TryFetchRunningMissingBlocks(lastBlockInBatch); err != nil { + return fmt.Errorf("failed to fetch L2 blocks: %w", err) + } + + // 5.3. Reproduce chunks. + daChunks := make([]*encoding.Chunk, 0, len(daChunksRawTxs)) + dbChunks := make([]*orm.Chunk, 0, len(daChunksRawTxs)) + for _, daChunkRawTxs := range daChunksRawTxs { + start := daChunkRawTxs.Blocks[0].Number() + end := daChunkRawTxs.Blocks[len(daChunkRawTxs.Blocks)-1].Number() + + blocks, err := l2Watcher.BlockORM().GetL2BlocksInRange(context.Background(), start, end) + if err != nil { + return fmt.Errorf("failed to get L2 blocks in range: %w", err) + } + + log.Info("Reproducing chunk", "start block", start, "end block", end) + + var chunk encoding.Chunk + for _, block := range blocks { + chunk.Blocks = append(chunk.Blocks, block) + } + + metrics, err := butils.CalculateChunkMetrics(&chunk, codec.Version()) + if err != nil { + return fmt.Errorf("failed to calculate chunk metrics: %w", err) + } + + err = db.Transaction(func(dbTX *gorm.DB) error { + dbChunk, err := chunkProposer.ChunkORM().InsertChunk(context.Background(), &chunk, codec.Version(), *metrics, dbTX) + if err != nil { + return fmt.Errorf("failed to insert chunk to DB: %w", err) + } + if err := l2Watcher.BlockORM().UpdateChunkHashInRange(context.Background(), dbChunk.StartBlockNumber, dbChunk.EndBlockNumber, dbChunk.Hash, dbTX); err != nil { + return fmt.Errorf("failed to update chunk_hash for l2_blocks (chunk hash: %s, start block: %d, end block: %d): %w", dbChunk.Hash, dbChunk.StartBlockNumber, dbChunk.EndBlockNumber, err) + } + + if err = chunkProposer.ChunkORM().UpdateProvingStatus(context.Background(), dbChunk.Hash, types.ProvingTaskVerified, dbTX); err != nil { + return fmt.Errorf("failed to update proving status for chunk %s: %w", dbChunk.Hash, err) + } + + daChunks = append(daChunks, &chunk) + dbChunks = append(dbChunks, dbChunk) + + return nil + }) + if err != nil { + return fmt.Errorf("failed to insert chunk in DB transaction: %w", err) + } + } + + // 5.4 Reproduce batch. + dbParentBatch, err := batchProposer.BatchORM().GetLatestBatch(context.Background()) + if err != nil { + return fmt.Errorf("failed to get latest batch from DB: %w", err) + } + + var batch encoding.Batch + batch.Index = dbParentBatch.Index + 1 + batch.ParentBatchHash = common.HexToHash(dbParentBatch.Hash) + batch.TotalL1MessagePoppedBefore = dbChunks[0].TotalL1MessagesPoppedBefore + + for _, chunk := range daChunks { + batch.Chunks = append(batch.Chunks, chunk) + } + + metrics, err := butils.CalculateBatchMetrics(&batch, codec.Version()) + if err != nil { + return fmt.Errorf("failed to calculate batch metrics: %w", err) + } + + err = db.Transaction(func(dbTX *gorm.DB) error { + dbBatch, err := batchProposer.BatchORM().InsertBatch(context.Background(), &batch, codec.Version(), *metrics, dbTX) + if err != nil { + return fmt.Errorf("failed to insert batch to DB: %w", err) + } + if err = chunkProposer.ChunkORM().UpdateBatchHashInRange(context.Background(), dbBatch.StartChunkIndex, dbBatch.EndChunkIndex, dbBatch.Hash, dbTX); err != nil { + return fmt.Errorf("failed to update batch_hash for chunks (batch hash: %s, start chunk: %d, end chunk: %d): %w", dbBatch.Hash, dbBatch.StartChunkIndex, dbBatch.EndChunkIndex, err) + } + + if err = batchProposer.BatchORM().UpdateProvingStatus(context.Background(), dbBatch.Hash, types.ProvingTaskVerified, dbTX); err != nil { + return fmt.Errorf("failed to update proving status for batch %s: %w", dbBatch.Hash, err) + } + if err = batchProposer.BatchORM().UpdateRollupStatusCommitAndFinalizeTxHash(context.Background(), dbBatch.Hash, types.RollupFinalized, nextBatch.commit.TxHash().Hex(), nextBatch.finalize.TxHash().Hex(), dbTX); err != nil { + return fmt.Errorf("failed to update rollup status for batch %s: %w", dbBatch.Hash, err) + } + + return nil + }) + if err != nil { + return fmt.Errorf("failed to insert batch in DB transaction: %w", err) + } + + return nil +} + +type batchEvents struct { + commit *l1.CommitBatchEvent + finalize *l1.FinalizeBatchEvent +} + +func newBatchEvents(commit *l1.CommitBatchEvent, finalize *l1.FinalizeBatchEvent) *batchEvents { + if commit.BatchIndex().Uint64() != finalize.BatchIndex().Uint64() { + panic("commit and finalize batch indexes do not match") + } + + return &batchEvents{ + commit: commit, + finalize: finalize, + } +} + +func (e *batchEvents) CompareTo(other *batchEvents) int { + return e.commit.BatchIndex().Cmp(other.commit.BatchIndex()) +} diff --git a/rollup/go.mod b/rollup/go.mod index 757c04482..b48023d85 100644 --- a/rollup/go.mod +++ b/rollup/go.mod @@ -12,7 +12,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/prometheus/client_golang v1.16.0 github.com/scroll-tech/da-codec v0.1.2 - github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d + github.com/scroll-tech/go-ethereum v1.10.24-0.20241024005353-591534c3790d github.com/smartystreets/goconvey v1.8.0 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 diff --git a/rollup/go.sum b/rollup/go.sum index 6702af857..937aa61c6 100644 --- a/rollup/go.sum +++ b/rollup/go.sum @@ -269,6 +269,8 @@ github.com/scroll-tech/da-codec v0.1.2 h1:QyJ+dQ4zWVVJwuqxNt4MiKyrymVc6rHe4YPtUR github.com/scroll-tech/da-codec v0.1.2/go.mod h1:odz1ck3umvYccCG03osaQBISAYGinZktZYbpk94fYRE= github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d h1:vuv7fGKEDtoeetI6RkKt8RAByJsYZBWk9Vo6gShv65c= github.com/scroll-tech/go-ethereum v1.10.14-0.20241023093931-91c2f9c27f4d/go.mod h1:PWEOTg6LeWlJAlFJauO0msSLXWnpHmE+mVh5txtfeRM= +github.com/scroll-tech/go-ethereum v1.10.24-0.20241024005353-591534c3790d h1:ZxRzFWs1VvAGqkeUyjgkEkUHyWlGUHXibqCpLr81KZI= +github.com/scroll-tech/go-ethereum v1.10.24-0.20241024005353-591534c3790d/go.mod h1:PWEOTg6LeWlJAlFJauO0msSLXWnpHmE+mVh5txtfeRM= github.com/scroll-tech/zktrie v0.8.4 h1:UagmnZ4Z3ITCk+aUq9NQZJNAwnWl4gSxsLb2Nl7IgRE= github.com/scroll-tech/zktrie v0.8.4/go.mod h1:XvNo7vAk8yxNyTjBDj5WIiFzYW4bx/gJ78+NK6Zn6Uk= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= diff --git a/rollup/internal/config/config.go b/rollup/internal/config/config.go index 725d1e492..93fd7e88e 100644 --- a/rollup/internal/config/config.go +++ b/rollup/internal/config/config.go @@ -3,9 +3,10 @@ package config import ( "fmt" "reflect" - "scroll-tech/common/database" "strings" + "scroll-tech/common/database" + "github.com/mitchellh/mapstructure" "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/rpc" @@ -14,9 +15,10 @@ import ( // Config load configuration items. type Config struct { - L1Config *L1Config `json:"l1_config"` - L2Config *L2Config `json:"l2_config"` - DBConfig *database.Config `json:"db_config"` + L1Config *L1Config `json:"l1_config"` + L2Config *L2Config `json:"l2_config"` + DBConfig *database.Config `json:"db_config"` + RecoveryConfig *RecoveryConfig `json:"recovery_config"` } // NewConfig returns a new instance of Config. diff --git a/rollup/internal/config/recovery.go b/rollup/internal/config/recovery.go new file mode 100644 index 000000000..4ed9c0a85 --- /dev/null +++ b/rollup/internal/config/recovery.go @@ -0,0 +1,11 @@ +package config + +// L1Config loads l1eth configuration items. +type RecoveryConfig struct { + Enable bool `json:"enable"` + + LatestFinalizedBatch uint64 `json:"latest_finalized_batch"` + L1BlockHeight uint64 `json:"l1_block_height"` + + L2BlockHeightLimit uint64 `json:"l2_block_height_limit"` +} diff --git a/rollup/internal/controller/watcher/batch_proposer.go b/rollup/internal/controller/watcher/batch_proposer.go index ec25c8f24..e565d88f3 100644 --- a/rollup/internal/controller/watcher/batch_proposer.go +++ b/rollup/internal/controller/watcher/batch_proposer.go @@ -149,6 +149,10 @@ func NewBatchProposer(ctx context.Context, cfg *config.BatchProposerConfig, chai return p } +func (p *BatchProposer) BatchORM() *orm.Batch { + return p.batchOrm +} + // TryProposeBatch tries to propose a new batches. func (p *BatchProposer) TryProposeBatch() { p.batchProposerCircleTotal.Inc() diff --git a/rollup/internal/controller/watcher/bundle_proposer.go b/rollup/internal/controller/watcher/bundle_proposer.go index 686ad580c..ca53da390 100644 --- a/rollup/internal/controller/watcher/bundle_proposer.go +++ b/rollup/internal/controller/watcher/bundle_proposer.go @@ -86,6 +86,10 @@ func NewBundleProposer(ctx context.Context, cfg *config.BundleProposerConfig, ch return p } +func (p *BundleProposer) BundleORM() *orm.Bundle { + return p.bundleOrm +} + // TryProposeBundle tries to propose a new bundle. func (p *BundleProposer) TryProposeBundle() { p.bundleProposerCircleTotal.Inc() diff --git a/rollup/internal/controller/watcher/chunk_proposer.go b/rollup/internal/controller/watcher/chunk_proposer.go index 99eaca9d4..03cd3c25f 100644 --- a/rollup/internal/controller/watcher/chunk_proposer.go +++ b/rollup/internal/controller/watcher/chunk_proposer.go @@ -167,13 +167,17 @@ func NewChunkProposer(ctx context.Context, cfg *config.ChunkProposerConfig, chai // TryProposeChunk tries to propose a new chunk. func (p *ChunkProposer) TryProposeChunk() { p.chunkProposerCircleTotal.Inc() - if err := p.proposeChunk(); err != nil { + if err := p.ProposeChunk(); err != nil { p.proposeChunkFailureTotal.Inc() log.Error("propose new chunk failed", "err", err) return } } +func (p *ChunkProposer) ChunkORM() *orm.Chunk { + return p.chunkOrm +} + func (p *ChunkProposer) updateDBChunkInfo(chunk *encoding.Chunk, codecVersion encoding.CodecVersion, metrics *utils.ChunkMetrics) error { if chunk == nil { return nil @@ -244,7 +248,7 @@ func (p *ChunkProposer) updateDBChunkInfo(chunk *encoding.Chunk, codecVersion en return nil } -func (p *ChunkProposer) proposeChunk() error { +func (p *ChunkProposer) ProposeChunk() error { // unchunkedBlockHeight >= 1, assuming genesis batch with chunk 0, block 0 is committed. unchunkedBlockHeight, err := p.chunkOrm.GetUnchunkedBlockHeight(p.ctx) if err != nil { diff --git a/rollup/internal/controller/watcher/l2_watcher.go b/rollup/internal/controller/watcher/l2_watcher.go index 3a8283acd..2ceba4d04 100644 --- a/rollup/internal/controller/watcher/l2_watcher.go +++ b/rollup/internal/controller/watcher/l2_watcher.go @@ -60,13 +60,17 @@ func NewL2WatcherClient(ctx context.Context, client *ethclient.Client, confirmat const blocksFetchLimit = uint64(10) +func (w *L2WatcherClient) BlockORM() *orm.L2Block { + return w.l2BlockOrm +} + // TryFetchRunningMissingBlocks attempts to fetch and store block traces for any missing blocks. -func (w *L2WatcherClient) TryFetchRunningMissingBlocks(blockHeight uint64) { +func (w *L2WatcherClient) TryFetchRunningMissingBlocks(blockHeight uint64) error { w.metrics.fetchRunningMissingBlocksTotal.Inc() heightInDB, err := w.l2BlockOrm.GetL2BlocksLatestHeight(w.ctx) if err != nil { log.Error("failed to GetL2BlocksLatestHeight", "err", err) - return + return fmt.Errorf("failed to GetL2BlocksLatestHeight: %w", err) } // Fetch and store block traces for missing blocks @@ -77,13 +81,15 @@ func (w *L2WatcherClient) TryFetchRunningMissingBlocks(blockHeight uint64) { to = blockHeight } - if err = w.getAndStoreBlocks(w.ctx, from, to); err != nil { + if err = w.GetAndStoreBlocks(w.ctx, from, to); err != nil { log.Error("fail to getAndStoreBlockTraces", "from", from, "to", to, "err", err) - return + return fmt.Errorf("fail to getAndStoreBlockTraces: %w", err) } w.metrics.fetchRunningMissingBlocksHeight.Set(float64(to)) w.metrics.rollupL2BlocksFetchedGap.Set(float64(blockHeight - to)) } + + return nil } func txsToTxsData(txs gethTypes.Transactions) []*gethTypes.TransactionData { @@ -122,7 +128,7 @@ func txsToTxsData(txs gethTypes.Transactions) []*gethTypes.TransactionData { return txsData } -func (w *L2WatcherClient) getAndStoreBlocks(ctx context.Context, from, to uint64) error { +func (w *L2WatcherClient) GetAndStoreBlocks(ctx context.Context, from, to uint64) error { var blocks []*encoding.Block for number := from; number <= to; number++ { log.Debug("retrieving block", "height", number) diff --git a/rollup/internal/orm/batch.go b/rollup/internal/orm/batch.go index 6f909f296..c7f5b3cf5 100644 --- a/rollup/internal/orm/batch.go +++ b/rollup/internal/orm/batch.go @@ -5,9 +5,11 @@ import ( "encoding/json" "errors" "fmt" + "math/big" "time" "github.com/scroll-tech/da-codec/encoding" + "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/log" "gorm.io/gorm" @@ -324,6 +326,53 @@ func (o *Batch) InsertBatch(ctx context.Context, batch *encoding.Batch, codecVer return &newBatch, nil } +func (o *Batch) InsertBatchRaw(ctx context.Context, batchIndex *big.Int, batchHash common.Hash, codecVersion encoding.CodecVersion, chunk *Chunk) (*Batch, error) { + now := time.Now() + newBatch := &Batch{ + Index: batchIndex.Uint64(), + Hash: batchHash.Hex(), + DataHash: "", + StartChunkIndex: chunk.Index, + StartChunkHash: chunk.Hash, + EndChunkIndex: chunk.Index, + EndChunkHash: chunk.Hash, + StateRoot: "", + WithdrawRoot: "", + ParentBatchHash: "", + BatchHeader: []byte{1, 2, 3}, + CodecVersion: int16(codecVersion), + EnableCompress: false, + BlobBytes: nil, + ChunkProofsStatus: 0, + ProvingStatus: int16(types.ProvingTaskVerified), + Proof: nil, + ProverAssignedAt: nil, + ProvedAt: &now, + ProofTimeSec: 0, + RollupStatus: 0, + CommitTxHash: "", + CommittedAt: nil, + FinalizeTxHash: "", + FinalizedAt: &now, + OracleStatus: 0, + OracleTxHash: "", + BlobDataProof: nil, + BlobSize: 0, + BundleHash: "", + TotalL1CommitGas: 0, + TotalL1CommitCalldataSize: 0, + } + + db := o.db.WithContext(ctx) + db = db.Model(&Batch{}) + + if err := db.Create(newBatch).Error; err != nil { + return nil, fmt.Errorf("Batch.InsertBatchRaw error: %w", err) + } + + return newBatch, nil +} + // UpdateL2GasOracleStatusAndOracleTxHash updates the L2 gas oracle status and transaction hash for a batch. func (o *Batch) UpdateL2GasOracleStatusAndOracleTxHash(ctx context.Context, hash string, status types.GasOracleStatus, txHash string) error { updateFields := make(map[string]interface{}) @@ -368,6 +417,29 @@ func (o *Batch) UpdateProvingStatus(ctx context.Context, hash string, status typ return nil } +func (o *Batch) UpdateRollupStatusCommitAndFinalizeTxHash(ctx context.Context, hash string, status types.RollupStatus, commitTxHash string, finalizeTxHash string, dbTX ...*gorm.DB) error { + updateFields := make(map[string]interface{}) + updateFields["commit_tx_hash"] = commitTxHash + updateFields["committed_at"] = utils.NowUTC() + updateFields["finalize_tx_hash"] = finalizeTxHash + updateFields["finalized_at"] = time.Now() + + updateFields["rollup_status"] = int(status) + + db := o.db + if len(dbTX) > 0 && dbTX[0] != nil { + db = dbTX[0] + } + db = db.WithContext(ctx) + db = db.Model(&Batch{}) + db = db.Where("hash", hash) + + if err := db.Updates(updateFields).Error; err != nil { + return fmt.Errorf("Batch.UpdateRollupStatusCommitAndFinalizeTxHash error: %w, batch hash: %v, status: %v, commitTxHash: %v, finalizeTxHash: %v", err, hash, status.String(), commitTxHash, finalizeTxHash) + } + return nil +} + // UpdateRollupStatus updates the rollup status of a batch. func (o *Batch) UpdateRollupStatus(ctx context.Context, hash string, status types.RollupStatus, dbTX ...*gorm.DB) error { updateFields := make(map[string]interface{}) diff --git a/rollup/internal/orm/bundle.go b/rollup/internal/orm/bundle.go index 6965f6dfa..3d35be29d 100644 --- a/rollup/internal/orm/bundle.go +++ b/rollup/internal/orm/bundle.go @@ -189,7 +189,7 @@ func (o *Bundle) InsertBundle(ctx context.Context, batches []*Batch, codecVersio } // UpdateFinalizeTxHashAndRollupStatus updates the finalize transaction hash and rollup status for a bundle. -func (o *Bundle) UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, hash string, finalizeTxHash string, status types.RollupStatus) error { +func (o *Bundle) UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, hash string, finalizeTxHash string, status types.RollupStatus, dbTX ...*gorm.DB) error { updateFields := make(map[string]interface{}) updateFields["finalize_tx_hash"] = finalizeTxHash updateFields["rollup_status"] = int(status) @@ -197,7 +197,11 @@ func (o *Bundle) UpdateFinalizeTxHashAndRollupStatus(ctx context.Context, hash s updateFields["finalized_at"] = time.Now() } - db := o.db.WithContext(ctx) + db := o.db + if len(dbTX) > 0 && dbTX[0] != nil { + db = dbTX[0] + } + db = db.WithContext(ctx) db = db.Model(&Bundle{}) db = db.Where("hash", hash) diff --git a/rollup/internal/orm/chunk.go b/rollup/internal/orm/chunk.go index 893290f75..e24e7a35b 100644 --- a/rollup/internal/orm/chunk.go +++ b/rollup/internal/orm/chunk.go @@ -7,6 +7,8 @@ import ( "time" "github.com/scroll-tech/da-codec/encoding" + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/crypto" "github.com/scroll-tech/go-ethereum/log" "gorm.io/gorm" @@ -96,8 +98,8 @@ func (o *Chunk) GetChunksInRange(ctx context.Context, startIndex uint64, endInde return chunks, nil } -// getLatestChunk retrieves the latest chunk from the database. -func (o *Chunk) getLatestChunk(ctx context.Context) (*Chunk, error) { +// GetLatestChunk retrieves the latest chunk from the database. +func (o *Chunk) GetLatestChunk(ctx context.Context) (*Chunk, error) { db := o.db.WithContext(ctx) db = db.Model(&Chunk{}) db = db.Order("index desc") @@ -115,7 +117,7 @@ func (o *Chunk) getLatestChunk(ctx context.Context) (*Chunk, error) { // GetUnchunkedBlockHeight retrieves the first unchunked block number. func (o *Chunk) GetUnchunkedBlockHeight(ctx context.Context) (uint64, error) { // Get the latest chunk - latestChunk, err := o.getLatestChunk(ctx) + latestChunk, err := o.GetLatestChunk(ctx) if err != nil { return 0, fmt.Errorf("Chunk.GetUnchunkedBlockHeight error: %w", err) } @@ -186,7 +188,7 @@ func (o *Chunk) InsertChunk(ctx context.Context, chunk *encoding.Chunk, codecVer var totalL1MessagePoppedBefore uint64 var parentChunkHash string var parentChunkStateRoot string - parentChunk, err := o.getLatestChunk(ctx) + parentChunk, err := o.GetLatestChunk(ctx) if err != nil { log.Error("failed to get latest chunk", "err", err) return nil, fmt.Errorf("Chunk.InsertChunk error: %w", err) @@ -202,6 +204,7 @@ func (o *Chunk) InsertChunk(ctx context.Context, chunk *encoding.Chunk, codecVer parentChunkStateRoot = parentChunk.StateRoot } + fmt.Println("insertChunk", totalL1MessagePoppedBefore, chunkIndex, parentChunkHash) chunkHash, err := utils.GetChunkHash(chunk, totalL1MessagePoppedBefore, codecVersion) if err != nil { log.Error("failed to get chunk hash", "err", err) @@ -254,6 +257,52 @@ func (o *Chunk) InsertChunk(ctx context.Context, chunk *encoding.Chunk, codecVer return &newChunk, nil } +func (o *Chunk) InsertChunkRaw(ctx context.Context, codecVersion encoding.CodecVersion, chunk *encoding.DAChunkRawTx, totalL1MessagePoppedBefore uint64) (*Chunk, error) { + // Create some unique identifier. It is not really used for anything except in DB. + var chunkBytes []byte + for _, block := range chunk.Blocks { + blockBytes := block.Encode() + chunkBytes = append(chunkBytes, blockBytes...) + } + hash := crypto.Keccak256Hash(chunkBytes) + + numBlocks := len(chunk.Blocks) + emptyHash := common.Hash{}.Hex() + newChunk := &Chunk{ + Index: 1337, + Hash: hash.Hex(), + StartBlockNumber: chunk.Blocks[0].Number(), + StartBlockHash: emptyHash, + EndBlockNumber: chunk.Blocks[numBlocks-1].Number(), + EndBlockHash: emptyHash, + TotalL2TxGas: 0, + TotalL2TxNum: 0, + TotalL1CommitCalldataSize: 0, + TotalL1CommitGas: 0, + StartBlockTime: chunk.Blocks[0].Timestamp(), + TotalL1MessagesPoppedBefore: totalL1MessagePoppedBefore, + TotalL1MessagesPoppedInChunk: 0, + ParentChunkHash: emptyHash, + StateRoot: emptyHash, + ParentChunkStateRoot: emptyHash, + WithdrawRoot: emptyHash, + CodecVersion: int16(codecVersion), + EnableCompress: false, + ProvingStatus: int16(types.ProvingTaskVerified), + CrcMax: 0, + BlobSize: 0, + } + + db := o.db.WithContext(ctx) + db = db.Model(&Chunk{}) + + if err := db.Create(newChunk).Error; err != nil { + return nil, fmt.Errorf("Chunk.InsertChunk error: %w, chunk hash: %v", err, newChunk.Hash) + } + + return newChunk, nil +} + // UpdateProvingStatus updates the proving status of a chunk. func (o *Chunk) UpdateProvingStatus(ctx context.Context, hash string, status types.ProvingStatus, dbTX ...*gorm.DB) error { updateFields := make(map[string]interface{})