From ea1d2fded734ae7d6f7d3bbae063b0e84eb4e36c Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Wed, 18 Sep 2024 03:53:13 +0800 Subject: [PATCH 01/19] feat: allow changing L1 synced height via RPC/CLI --- eth/api.go | 14 ++++++++++++++ internal/web3ext/web3ext.go | 10 ++++++++++ 2 files changed, 24 insertions(+) diff --git a/eth/api.go b/eth/api.go index 1ba4d6a3f1b9..a10870a7f424 100644 --- a/eth/api.go +++ b/eth/api.go @@ -833,3 +833,17 @@ func (api *ScrollAPI) CalculateRowConsumptionByBlockNumber(ctx context.Context, asyncChecker.Wait() return rawdb.ReadBlockRowConsumption(api.eth.ChainDb(), block.Hash()), checkErr } + +// SetRollupEventSyncL1Height sets the synced L1 height for rollup event synchronization +func (api *ScrollAPI) SetRollupEventSyncedL1Height(height uint64) error { + log.Info("Setting rollup event synced L1 height", "height", height) + rawdb.WriteRollupEventSyncedL1BlockNumber(api.eth.ChainDb(), height) + return nil +} + +// SetL1MessageSyncL1Height sets the synced L1 height for L1 message synchronization +func (api *ScrollAPI) SetL1MessageSyncedL1Height(height uint64) error { + rawdb.WriteSyncedL1BlockNumber(api.eth.ChainDb(), height) + log.Info("Setting L1 message synced L1 height", "height", height) + return nil +} diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index c13fbb8be6d7..5c6f77d8d833 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -932,6 +932,16 @@ web3._extend({ params: 1, inputFormatter: [web3._extend.formatters.inputBlockNumberFormatter] }), + new web3._extend.Method({ + name: 'setRollupEventSyncedL1Height', + call: 'scroll_setRollupEventSyncedL1Height', + params: 1 + }), + new web3._extend.Method({ + name: 'setL1MessageSyncedL1Height', + call: 'scroll_setL1MessageSyncedL1Height', + params: 1 + }), ], properties: [ From 47fb5c5d0577dc9485f11236ee6a813a91aa5dc0 Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Tue, 17 Sep 2024 20:05:24 +0000 Subject: [PATCH 02/19] =?UTF-8?q?chore:=20auto=20version=20bump=E2=80=89[b?= =?UTF-8?q?ot]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- params/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/params/version.go b/params/version.go index 9da09c308064..f527ef360186 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ import ( const ( VersionMajor = 5 // Major version component of the current release VersionMinor = 7 // Minor version component of the current release - VersionPatch = 17 // Patch version component of the current release + VersionPatch = 18 // Patch version component of the current release VersionMeta = "mainnet" // Version metadata to append to the version string ) From 0751c861fe3ada6de9aab5ebb2592ff19169ec60 Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Wed, 18 Sep 2024 04:06:04 +0800 Subject: [PATCH 03/19] fix typos --- eth/api.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/api.go b/eth/api.go index a10870a7f424..1a14c555ae79 100644 --- a/eth/api.go +++ b/eth/api.go @@ -834,14 +834,14 @@ func (api *ScrollAPI) CalculateRowConsumptionByBlockNumber(ctx context.Context, return rawdb.ReadBlockRowConsumption(api.eth.ChainDb(), block.Hash()), checkErr } -// SetRollupEventSyncL1Height sets the synced L1 height for rollup event synchronization +// SetRollupEventSyncedL1Height sets the synced L1 height for rollup event synchronization func (api *ScrollAPI) SetRollupEventSyncedL1Height(height uint64) error { log.Info("Setting rollup event synced L1 height", "height", height) rawdb.WriteRollupEventSyncedL1BlockNumber(api.eth.ChainDb(), height) return nil } -// SetL1MessageSyncL1Height sets the synced L1 height for L1 message synchronization +// SetL1MessageSyncedL1Height sets the synced L1 height for L1 message synchronization func (api *ScrollAPI) SetL1MessageSyncedL1Height(height uint64) error { rawdb.WriteSyncedL1BlockNumber(api.eth.ChainDb(), height) log.Info("Setting L1 message synced L1 height", "height", height) From 41ab50d399f0b6599e52b2d9ea0cd61c7a6edd4e Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Wed, 18 Sep 2024 12:40:19 +0800 Subject: [PATCH 04/19] add height validity check --- eth/api.go | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/eth/api.go b/eth/api.go index 1a14c555ae79..37a9730a138d 100644 --- a/eth/api.go +++ b/eth/api.go @@ -836,6 +836,19 @@ func (api *ScrollAPI) CalculateRowConsumptionByBlockNumber(ctx context.Context, // SetRollupEventSyncedL1Height sets the synced L1 height for rollup event synchronization func (api *ScrollAPI) SetRollupEventSyncedL1Height(height uint64) error { + prevHeightPtr := rawdb.ReadRollupEventSyncedL1BlockNumber(api.eth.ChainDb()) + + if prevHeightPtr == nil { + log.Warn("No previous rollup event synced L1 height found in database") + return fmt.Errorf("no previous rollup event synced L1 height found in database") + } + + prevHeight := *prevHeightPtr + if height >= prevHeight { + log.Warn("New rollup event synced L1 height is not lower than previous height", "newHeight", height, "prevHeight", prevHeight) + return fmt.Errorf("new rollup event synced L1 height (%d) is not lower than previous height (%d)", height, prevHeight) + } + log.Info("Setting rollup event synced L1 height", "height", height) rawdb.WriteRollupEventSyncedL1BlockNumber(api.eth.ChainDb(), height) return nil @@ -843,7 +856,20 @@ func (api *ScrollAPI) SetRollupEventSyncedL1Height(height uint64) error { // SetL1MessageSyncedL1Height sets the synced L1 height for L1 message synchronization func (api *ScrollAPI) SetL1MessageSyncedL1Height(height uint64) error { - rawdb.WriteSyncedL1BlockNumber(api.eth.ChainDb(), height) + prevHeightPtr := rawdb.ReadSyncedL1BlockNumber(api.eth.ChainDb()) + + if prevHeightPtr == nil { + log.Warn("No previous L1 message synced L1 height found in database") + return fmt.Errorf("no previous L1 message synced L1 height found in database") + } + + prevHeight := *prevHeightPtr + if height >= prevHeight { + log.Warn("New L1 message synced L1 height is not lower than previous height", "newHeight", height, "prevHeight", prevHeight) + return fmt.Errorf("new L1 message synced L1 height (%d) is not lower than previous height (%d)", height, prevHeight) + } + log.Info("Setting L1 message synced L1 height", "height", height) + rawdb.WriteSyncedL1BlockNumber(api.eth.ChainDb(), height) return nil } From d38f65619b441b073564ede3ae039e2b07b2010e Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Thu, 19 Sep 2024 03:43:17 +0800 Subject: [PATCH 05/19] change implementation --- eth/api.go | 36 +++++++------------ eth/backend.go | 12 +++++++ .../rollup_sync_service.go | 13 +++++++ rollup/sync_service/sync_service.go | 13 +++++++ 4 files changed, 50 insertions(+), 24 deletions(-) diff --git a/eth/api.go b/eth/api.go index 37a9730a138d..7950de701e4d 100644 --- a/eth/api.go +++ b/eth/api.go @@ -836,40 +836,28 @@ func (api *ScrollAPI) CalculateRowConsumptionByBlockNumber(ctx context.Context, // SetRollupEventSyncedL1Height sets the synced L1 height for rollup event synchronization func (api *ScrollAPI) SetRollupEventSyncedL1Height(height uint64) error { - prevHeightPtr := rawdb.ReadRollupEventSyncedL1BlockNumber(api.eth.ChainDb()) - - if prevHeightPtr == nil { - log.Warn("No previous rollup event synced L1 height found in database") - return fmt.Errorf("no previous rollup event synced L1 height found in database") - } - - prevHeight := *prevHeightPtr - if height >= prevHeight { - log.Warn("New rollup event synced L1 height is not lower than previous height", "newHeight", height, "prevHeight", prevHeight) - return fmt.Errorf("new rollup event synced L1 height (%d) is not lower than previous height (%d)", height, prevHeight) + rollupSyncService := api.eth.GetRollupSyncService() + if rollupSyncService == nil { + return errors.New("RollupSyncService is not available") } log.Info("Setting rollup event synced L1 height", "height", height) - rawdb.WriteRollupEventSyncedL1BlockNumber(api.eth.ChainDb(), height) + + rollupSyncService.ResetToHeight(height) + return nil } // SetL1MessageSyncedL1Height sets the synced L1 height for L1 message synchronization func (api *ScrollAPI) SetL1MessageSyncedL1Height(height uint64) error { - prevHeightPtr := rawdb.ReadSyncedL1BlockNumber(api.eth.ChainDb()) - - if prevHeightPtr == nil { - log.Warn("No previous L1 message synced L1 height found in database") - return fmt.Errorf("no previous L1 message synced L1 height found in database") - } - - prevHeight := *prevHeightPtr - if height >= prevHeight { - log.Warn("New L1 message synced L1 height is not lower than previous height", "newHeight", height, "prevHeight", prevHeight) - return fmt.Errorf("new L1 message synced L1 height (%d) is not lower than previous height (%d)", height, prevHeight) + syncService := api.eth.GetSyncService() + if syncService == nil { + return errors.New("SyncService is not available") } log.Info("Setting L1 message synced L1 height", "height", height) - rawdb.WriteSyncedL1BlockNumber(api.eth.ChainDb(), height) + + syncService.ResetToHeight(height) + return nil } diff --git a/eth/backend.go b/eth/backend.go index 7651c72f29c6..ae198bceefe8 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -609,3 +609,15 @@ func (s *Ethereum) Stop() error { return nil } + +// GetRollupSyncService returns the RollupSyncService of the Ethereum instance. +// It returns nil if the service is not initialized. +func (e *Ethereum) GetRollupSyncService() *rollup_sync_service.RollupSyncService { + return e.rollupSyncService +} + +// GetSyncService returns the SyncService of the Ethereum instance. +// It returns nil if the service is not initialized. +func (e *Ethereum) GetSyncService() *sync_service.SyncService { + return e.syncService +} diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index 3991debcb1c6..c3662c72c465 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -157,6 +157,19 @@ func (s *RollupSyncService) Stop() { } } +// ResetToHeight resets the RollupSyncService to a specific L1 block height +func (s *RollupSyncService) ResetToHeight(height uint64) { + s.Stop() + + s.latestProcessedBlock = height + + rawdb.WriteRollupEventSyncedL1BlockNumber(s.db, height) + + log.Info("Reset rollup sync service", "height", height) + + go s.Start() +} + func (s *RollupSyncService) fetchRollupEvents() { latestConfirmed, err := s.client.getLatestFinalizedBlockNumber() if err != nil { diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 091f2d19691f..92d325894f69 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -139,6 +139,19 @@ func (s *SyncService) Stop() { } } +// ResetToHeight resets the SyncService to a specific L1 block height +func (s *SyncService) ResetToHeight(height uint64) { + s.Stop() + + s.latestProcessedBlock = height + + rawdb.WriteSyncedL1BlockNumber(s.db, height) + + log.Info("Reset sync service", "height", height) + + go s.Start() +} + // SubscribeNewL1MsgsEvent registers a subscription of NewL1MsgsEvent and // starts sending event to the given channel. func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) event.Subscription { From e39cdb16f2c8910ce9d84e1240552c9cb51e1e0b Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Thu, 19 Sep 2024 14:35:10 +0800 Subject: [PATCH 06/19] moved hotfix apis from ScrollAPI to PrivateAdminAPI --- eth/api.go | 56 +++++++++++++++++++++++++++--------------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/eth/api.go b/eth/api.go index 7950de701e4d..a00b09b62b16 100644 --- a/eth/api.go +++ b/eth/api.go @@ -254,6 +254,34 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) { return true, nil } +// SetRollupEventSyncedL1Height sets the synced L1 height for rollup event synchronization +func (api *PrivateAdminAPI) SetRollupEventSyncedL1Height(height uint64) error { + rollupSyncService := api.eth.GetRollupSyncService() + if rollupSyncService == nil { + return errors.New("RollupSyncService is not available") + } + + log.Info("Setting rollup event synced L1 height", "height", height) + + rollupSyncService.ResetToHeight(height) + + return nil +} + +// SetL1MessageSyncedL1Height sets the synced L1 height for L1 message synchronization +func (api *PrivateAdminAPI) SetL1MessageSyncedL1Height(height uint64) error { + syncService := api.eth.GetSyncService() + if syncService == nil { + return errors.New("SyncService is not available") + } + + log.Info("Setting L1 message synced L1 height", "height", height) + + syncService.ResetToHeight(height) + + return nil +} + // PublicDebugAPI is the collection of Ethereum full node APIs exposed // over the public debugging endpoint. type PublicDebugAPI struct { @@ -833,31 +861,3 @@ func (api *ScrollAPI) CalculateRowConsumptionByBlockNumber(ctx context.Context, asyncChecker.Wait() return rawdb.ReadBlockRowConsumption(api.eth.ChainDb(), block.Hash()), checkErr } - -// SetRollupEventSyncedL1Height sets the synced L1 height for rollup event synchronization -func (api *ScrollAPI) SetRollupEventSyncedL1Height(height uint64) error { - rollupSyncService := api.eth.GetRollupSyncService() - if rollupSyncService == nil { - return errors.New("RollupSyncService is not available") - } - - log.Info("Setting rollup event synced L1 height", "height", height) - - rollupSyncService.ResetToHeight(height) - - return nil -} - -// SetL1MessageSyncedL1Height sets the synced L1 height for L1 message synchronization -func (api *ScrollAPI) SetL1MessageSyncedL1Height(height uint64) error { - syncService := api.eth.GetSyncService() - if syncService == nil { - return errors.New("SyncService is not available") - } - - log.Info("Setting L1 message synced L1 height", "height", height) - - syncService.ResetToHeight(height) - - return nil -} From e124dce5f94559e1376725bd07eeb8db9f989264 Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Thu, 19 Sep 2024 18:38:37 +0800 Subject: [PATCH 07/19] change namespace from scroll to admin --- internal/web3ext/web3ext.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 5c6f77d8d833..d43e01467ee3 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -190,6 +190,16 @@ web3._extend({ name: 'stopWS', call: 'admin_stopWS' }), + new web3._extend.Method({ + name: 'setRollupEventSyncedL1Height', + call: 'admin_setRollupEventSyncedL1Height', + params: 1 + }), + new web3._extend.Method({ + name: 'setL1MessageSyncedL1Height', + call: 'admin_setL1MessageSyncedL1Height', + params: 1 + }), ], properties: [ new web3._extend.Property({ @@ -932,16 +942,6 @@ web3._extend({ params: 1, inputFormatter: [web3._extend.formatters.inputBlockNumberFormatter] }), - new web3._extend.Method({ - name: 'setRollupEventSyncedL1Height', - call: 'scroll_setRollupEventSyncedL1Height', - params: 1 - }), - new web3._extend.Method({ - name: 'setL1MessageSyncedL1Height', - call: 'scroll_setL1MessageSyncedL1Height', - params: 1 - }), ], properties: [ From dc03c126b4a5e979798afe55d9db00195a3eacea Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Thu, 19 Sep 2024 21:03:32 +0800 Subject: [PATCH 08/19] fix bugs --- rollup/rollup_sync_service/rollup_sync_service.go | 9 +++++++-- rollup/sync_service/sync_service.go | 9 +++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index c3662c72c465..51cf31c4bad5 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -52,6 +52,7 @@ const ( // RollupSyncService collects ScrollChain batch commit/revert/finalize events and stores metadata into db. type RollupSyncService struct { + originalCtx context.Context ctx context.Context cancel context.CancelFunc client *L1Client @@ -99,10 +100,11 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig latestProcessedBlock = *block } - ctx, cancel := context.WithCancel(ctx) + serviceCtx, cancel := context.WithCancel(ctx) service := RollupSyncService{ - ctx: ctx, + originalCtx: ctx, + ctx: serviceCtx, cancel: cancel, client: client, db: db, @@ -161,6 +163,9 @@ func (s *RollupSyncService) Stop() { func (s *RollupSyncService) ResetToHeight(height uint64) { s.Stop() + newCtx, newCancel := context.WithCancel(s.originalCtx) + s.ctx = newCtx + s.cancel = newCancel s.latestProcessedBlock = height rawdb.WriteRollupEventSyncedL1BlockNumber(s.db, height) diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 92d325894f69..1de228453a3a 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -42,6 +42,7 @@ var ( // SyncService collects all L1 messages and stores them in a local database. type SyncService struct { + originalCtx context.Context ctx context.Context cancel context.CancelFunc client *BridgeClient @@ -76,10 +77,11 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node latestProcessedBlock = *block } - ctx, cancel := context.WithCancel(ctx) + serviceCtx, cancel := context.WithCancel(ctx) service := SyncService{ - ctx: ctx, + originalCtx: ctx, + ctx: serviceCtx, cancel: cancel, client: client, db: db, @@ -143,6 +145,9 @@ func (s *SyncService) Stop() { func (s *SyncService) ResetToHeight(height uint64) { s.Stop() + newCtx, newCancel := context.WithCancel(s.originalCtx) + s.ctx = newCtx + s.cancel = newCancel s.latestProcessedBlock = height rawdb.WriteSyncedL1BlockNumber(s.db, height) From b6670b5d7877bc0949d4e40cee1fbebfb23e6d5c Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Thu, 19 Sep 2024 23:15:26 +0800 Subject: [PATCH 09/19] add locks --- .../rollup_sync_service/rollup_sync_service.go | 17 +++++++++++++++++ rollup/sync_service/sync_service.go | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index 51cf31c4bad5..282d3529757f 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -7,6 +7,7 @@ import ( "math/big" "os" "reflect" + "sync" "time" "github.com/scroll-tech/da-codec/encoding" @@ -64,6 +65,9 @@ type RollupSyncService struct { l1FinalizeBatchEventSignature common.Hash bc *core.BlockChain stack *node.Node + + stateMu sync.Mutex // protects the service state + resetMu sync.Mutex // protects critical sections during reset operation } func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) { @@ -125,6 +129,9 @@ func (s *RollupSyncService) Start() { return } + s.stateMu.Lock() + defer s.stateMu.Unlock() + log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock) go func() { @@ -152,6 +159,9 @@ func (s *RollupSyncService) Stop() { return } + s.stateMu.Lock() + defer s.stateMu.Unlock() + log.Info("Stopping rollup event sync background service") if s.cancel != nil { @@ -161,6 +171,13 @@ func (s *RollupSyncService) Stop() { // ResetToHeight resets the RollupSyncService to a specific L1 block height func (s *RollupSyncService) ResetToHeight(height uint64) { + if s == nil { + return + } + + s.resetMu.Lock() + defer s.resetMu.Unlock() + s.Stop() newCtx, newCancel := context.WithCancel(s.originalCtx) diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 1de228453a3a..a040b9ac566c 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "sync" "time" "github.com/scroll-tech/go-ethereum/core" @@ -51,6 +52,9 @@ type SyncService struct { pollInterval time.Duration latestProcessedBlock uint64 scope event.SubscriptionScope + + stateMu sync.Mutex // protects the service state + resetMu sync.Mutex // protects critical sections during reset operation } func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) { @@ -97,6 +101,9 @@ func (s *SyncService) Start() { return } + s.stateMu.Lock() + defer s.stateMu.Unlock() + // wait for initial sync before starting node log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock) @@ -131,6 +138,9 @@ func (s *SyncService) Stop() { return } + s.stateMu.Lock() + defer s.stateMu.Unlock() + log.Info("Stopping sync service") // Unsubscribe all subscriptions registered @@ -143,6 +153,13 @@ func (s *SyncService) Stop() { // ResetToHeight resets the SyncService to a specific L1 block height func (s *SyncService) ResetToHeight(height uint64) { + if s == nil { + return + } + + s.resetMu.Lock() + defer s.resetMu.Unlock() + s.Stop() newCtx, newCancel := context.WithCancel(s.originalCtx) From c588232a753e61f3c8201b922c34d9db14ccb318 Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Mon, 23 Sep 2024 19:35:08 +0800 Subject: [PATCH 10/19] add a lock to protect latestProcessedBlock and db state --- .../rollup_sync_service.go | 24 +++++++------------ rollup/sync_service/sync_service.go | 22 ++++++----------- 2 files changed, 15 insertions(+), 31 deletions(-) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index 282d3529757f..f40debb49696 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -53,7 +53,6 @@ const ( // RollupSyncService collects ScrollChain batch commit/revert/finalize events and stores metadata into db. type RollupSyncService struct { - originalCtx context.Context ctx context.Context cancel context.CancelFunc client *L1Client @@ -66,8 +65,7 @@ type RollupSyncService struct { bc *core.BlockChain stack *node.Node - stateMu sync.Mutex // protects the service state - resetMu sync.Mutex // protects critical sections during reset operation + stateMu sync.Mutex // protects the service state, e.g. db and latestProcessedBlock updates } func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) { @@ -107,7 +105,6 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig serviceCtx, cancel := context.WithCancel(ctx) service := RollupSyncService{ - originalCtx: ctx, ctx: serviceCtx, cancel: cancel, client: client, @@ -175,24 +172,19 @@ func (s *RollupSyncService) ResetToHeight(height uint64) { return } - s.resetMu.Lock() - defer s.resetMu.Unlock() - - s.Stop() - - newCtx, newCancel := context.WithCancel(s.originalCtx) - s.ctx = newCtx - s.cancel = newCancel - s.latestProcessedBlock = height + s.stateMu.Lock() + defer s.stateMu.Unlock() rawdb.WriteRollupEventSyncedL1BlockNumber(s.db, height) + s.latestProcessedBlock = height - log.Info("Reset rollup sync service", "height", height) - - go s.Start() + log.Info("Reset sync service", "height", height) } func (s *RollupSyncService) fetchRollupEvents() { + s.stateMu.Lock() + defer s.stateMu.Unlock() + latestConfirmed, err := s.client.getLatestFinalizedBlockNumber() if err != nil { log.Warn("failed to get latest confirmed block number", "err", err) diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index a040b9ac566c..66d6f098d5f8 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -43,7 +43,6 @@ var ( // SyncService collects all L1 messages and stores them in a local database. type SyncService struct { - originalCtx context.Context ctx context.Context cancel context.CancelFunc client *BridgeClient @@ -53,8 +52,7 @@ type SyncService struct { latestProcessedBlock uint64 scope event.SubscriptionScope - stateMu sync.Mutex // protects the service state - resetMu sync.Mutex // protects critical sections during reset operation + stateMu sync.Mutex // protects the service state, e.g. db and latestProcessedBlock updates } func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) { @@ -84,7 +82,6 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node serviceCtx, cancel := context.WithCancel(ctx) service := SyncService{ - originalCtx: ctx, ctx: serviceCtx, cancel: cancel, client: client, @@ -157,21 +154,13 @@ func (s *SyncService) ResetToHeight(height uint64) { return } - s.resetMu.Lock() - defer s.resetMu.Unlock() - - s.Stop() - - newCtx, newCancel := context.WithCancel(s.originalCtx) - s.ctx = newCtx - s.cancel = newCancel - s.latestProcessedBlock = height + s.stateMu.Lock() + defer s.stateMu.Unlock() rawdb.WriteSyncedL1BlockNumber(s.db, height) + s.latestProcessedBlock = height log.Info("Reset sync service", "height", height) - - go s.Start() } // SubscribeNewL1MsgsEvent registers a subscription of NewL1MsgsEvent and @@ -181,6 +170,9 @@ func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) eve } func (s *SyncService) fetchMessages() { + s.stateMu.Lock() + defer s.stateMu.Unlock() + latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx) if err != nil { log.Warn("Failed to get latest confirmed block number", "err", err) From eaa6eebecf28cfbf890313ee9772840bc2c1b14b Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Mon, 23 Sep 2024 20:03:56 +0800 Subject: [PATCH 11/19] revert some changes --- rollup/rollup_sync_service/rollup_sync_service.go | 10 ++-------- rollup/sync_service/sync_service.go | 10 ++-------- 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index f40debb49696..49e6fb1b74eb 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -102,10 +102,10 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig latestProcessedBlock = *block } - serviceCtx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) service := RollupSyncService{ - ctx: serviceCtx, + ctx: ctx, cancel: cancel, client: client, db: db, @@ -126,9 +126,6 @@ func (s *RollupSyncService) Start() { return } - s.stateMu.Lock() - defer s.stateMu.Unlock() - log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock) go func() { @@ -156,9 +153,6 @@ func (s *RollupSyncService) Stop() { return } - s.stateMu.Lock() - defer s.stateMu.Unlock() - log.Info("Stopping rollup event sync background service") if s.cancel != nil { diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 66d6f098d5f8..89f6b8a36865 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -79,10 +79,10 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node latestProcessedBlock = *block } - serviceCtx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) service := SyncService{ - ctx: serviceCtx, + ctx: ctx, cancel: cancel, client: client, db: db, @@ -98,9 +98,6 @@ func (s *SyncService) Start() { return } - s.stateMu.Lock() - defer s.stateMu.Unlock() - // wait for initial sync before starting node log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock) @@ -135,9 +132,6 @@ func (s *SyncService) Stop() { return } - s.stateMu.Lock() - defer s.stateMu.Unlock() - log.Info("Stopping sync service") // Unsubscribe all subscriptions registered From 75413e676b59f6f528436ab3282dbefbf2d47e76 Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Mon, 23 Sep 2024 20:49:42 +0800 Subject: [PATCH 12/19] remove lock and use atomic in latestProcessedBlock --- .../rollup_sync_service.go | 28 +++++------- rollup/sync_service/sync_service.go | 44 ++++++++----------- 2 files changed, 28 insertions(+), 44 deletions(-) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index 49e6fb1b74eb..a379c23ee4aa 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -7,7 +7,7 @@ import ( "math/big" "os" "reflect" - "sync" + "runtime/internal/atomic" "time" "github.com/scroll-tech/da-codec/encoding" @@ -57,15 +57,13 @@ type RollupSyncService struct { cancel context.CancelFunc client *L1Client db ethdb.Database - latestProcessedBlock uint64 + latestProcessedBlock atomic.Uint64 scrollChainABI *abi.ABI l1CommitBatchEventSignature common.Hash l1RevertBatchEventSignature common.Hash l1FinalizeBatchEventSignature common.Hash bc *core.BlockChain stack *node.Node - - stateMu sync.Mutex // protects the service state, e.g. db and latestProcessedBlock updates } func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) { @@ -109,7 +107,6 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig cancel: cancel, client: client, db: db, - latestProcessedBlock: latestProcessedBlock, scrollChainABI: scrollChainABI, l1CommitBatchEventSignature: scrollChainABI.Events["CommitBatch"].ID, l1RevertBatchEventSignature: scrollChainABI.Events["RevertBatch"].ID, @@ -118,6 +115,8 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig stack: stack, } + service.latestProcessedBlock.Store(latestProcessedBlock) + return &service, nil } @@ -126,7 +125,7 @@ func (s *RollupSyncService) Start() { return } - log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock) + log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock.Load()) go func() { syncTicker := time.NewTicker(defaultSyncInterval) @@ -142,7 +141,7 @@ func (s *RollupSyncService) Start() { case <-syncTicker.C: s.fetchRollupEvents() case <-logTicker.C: - log.Info("Sync rollup events progress update", "latestProcessedBlock", s.latestProcessedBlock) + log.Info("Sync rollup events progress update", "latestProcessedBlock", s.latestProcessedBlock.Load()) } } }() @@ -166,29 +165,22 @@ func (s *RollupSyncService) ResetToHeight(height uint64) { return } - s.stateMu.Lock() - defer s.stateMu.Unlock() - - rawdb.WriteRollupEventSyncedL1BlockNumber(s.db, height) - s.latestProcessedBlock = height + s.latestProcessedBlock.Store(height) log.Info("Reset sync service", "height", height) } func (s *RollupSyncService) fetchRollupEvents() { - s.stateMu.Lock() - defer s.stateMu.Unlock() - latestConfirmed, err := s.client.getLatestFinalizedBlockNumber() if err != nil { log.Warn("failed to get latest confirmed block number", "err", err) return } - log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock, "latest confirmed", latestConfirmed) + log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock.Load(), "latest confirmed", latestConfirmed) // query in batches - for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange { + for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += defaultFetchBlockRange { if s.ctx.Err() != nil { log.Info("Context canceled", "reason", s.ctx.Err()) return @@ -210,7 +202,7 @@ func (s *RollupSyncService) fetchRollupEvents() { return } - s.latestProcessedBlock = to + s.latestProcessedBlock.Store(to) } } diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 89f6b8a36865..05efe802e05a 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "reflect" - "sync" + "sync/atomic" "time" "github.com/scroll-tech/go-ethereum/core" @@ -49,10 +49,8 @@ type SyncService struct { db ethdb.Database msgCountFeed event.Feed pollInterval time.Duration - latestProcessedBlock uint64 + latestProcessedBlock atomic.Uint64 scope event.SubscriptionScope - - stateMu sync.Mutex // protects the service state, e.g. db and latestProcessedBlock updates } func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) { @@ -82,14 +80,15 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node ctx, cancel := context.WithCancel(ctx) service := SyncService{ - ctx: ctx, - cancel: cancel, - client: client, - db: db, - pollInterval: DefaultPollInterval, - latestProcessedBlock: latestProcessedBlock, + ctx: ctx, + cancel: cancel, + client: client, + db: db, + pollInterval: DefaultPollInterval, } + service.latestProcessedBlock.Store(latestProcessedBlock) + return &service, nil } @@ -99,14 +98,14 @@ func (s *SyncService) Start() { } // wait for initial sync before starting node - log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock) + log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock.Load()) // block node startup during initial sync and print some helpful logs latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx) - if err == nil && latestConfirmed > s.latestProcessedBlock+1000 { + if err == nil && latestConfirmed > s.latestProcessedBlock.Load()+1000 { log.Warn("Running initial sync of L1 messages before starting l2geth, this might take a while...") s.fetchMessages() - log.Info("L1 message initial sync completed", "latestProcessedBlock", s.latestProcessedBlock) + log.Info("L1 message initial sync completed", "latestProcessedBlock", s.latestProcessedBlock.Load()) } go func() { @@ -148,11 +147,7 @@ func (s *SyncService) ResetToHeight(height uint64) { return } - s.stateMu.Lock() - defer s.stateMu.Unlock() - - rawdb.WriteSyncedL1BlockNumber(s.db, height) - s.latestProcessedBlock = height + s.latestProcessedBlock.Store(height) log.Info("Reset sync service", "height", height) } @@ -164,16 +159,13 @@ func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) eve } func (s *SyncService) fetchMessages() { - s.stateMu.Lock() - defer s.stateMu.Unlock() - latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx) if err != nil { log.Warn("Failed to get latest confirmed block number", "err", err) return } - log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock, "latestConfirmed", latestConfirmed) + log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock.Load(), "latestConfirmed", latestConfirmed) // keep track of next queue index we're expecting to see queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db) @@ -203,7 +195,7 @@ func (s *SyncService) fetchMessages() { numMessagesPendingDbWrite = 0 } - s.latestProcessedBlock = lastBlock + s.latestProcessedBlock.Store(lastBlock) } // ticker for logging progress @@ -211,7 +203,7 @@ func (s *SyncService) fetchMessages() { numMsgsCollected := 0 // query in batches - for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { + for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { select { case <-s.ctx.Done(): // flush pending writes to database @@ -220,8 +212,8 @@ func (s *SyncService) fetchMessages() { } return case <-t.C: - progress := 100 * float64(s.latestProcessedBlock) / float64(latestConfirmed) - log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) + progress := 100 * float64(s.latestProcessedBlock.Load()) / float64(latestConfirmed) + log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock.Load(), "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) default: } From bc40b7a9e55ec06b4554f0cca83cd1b9cb028aa1 Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Mon, 23 Sep 2024 20:55:13 +0800 Subject: [PATCH 13/19] fix CI --- rollup/rollup_sync_service/rollup_sync_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index a379c23ee4aa..3c9f58bb0eb5 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -7,7 +7,7 @@ import ( "math/big" "os" "reflect" - "runtime/internal/atomic" + "sync/atomic" "time" "github.com/scroll-tech/da-codec/encoding" From 2662c6de2fbf839c39c60fefc75472d7d20b8746 Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Mon, 23 Sep 2024 21:04:22 +0800 Subject: [PATCH 14/19] tweak --- eth/api.go | 6 ++---- rollup/rollup_sync_service/rollup_sync_service.go | 5 ++--- rollup/sync_service/sync_service.go | 5 ++--- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/eth/api.go b/eth/api.go index a00b09b62b16..74672347823c 100644 --- a/eth/api.go +++ b/eth/api.go @@ -262,8 +262,7 @@ func (api *PrivateAdminAPI) SetRollupEventSyncedL1Height(height uint64) error { } log.Info("Setting rollup event synced L1 height", "height", height) - - rollupSyncService.ResetToHeight(height) + rollupSyncService.ResetStartSyncHeight(height) return nil } @@ -276,8 +275,7 @@ func (api *PrivateAdminAPI) SetL1MessageSyncedL1Height(height uint64) error { } log.Info("Setting L1 message synced L1 height", "height", height) - - syncService.ResetToHeight(height) + syncService.ResetStartSyncHeight(height) return nil } diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index 3c9f58bb0eb5..bfc8bc08ef58 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -159,14 +159,13 @@ func (s *RollupSyncService) Stop() { } } -// ResetToHeight resets the RollupSyncService to a specific L1 block height -func (s *RollupSyncService) ResetToHeight(height uint64) { +// ResetStartSyncHeight resets the RollupSyncService to a specific L1 block height +func (s *RollupSyncService) ResetStartSyncHeight(height uint64) { if s == nil { return } s.latestProcessedBlock.Store(height) - log.Info("Reset sync service", "height", height) } diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 05efe802e05a..f32979671211 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -141,14 +141,13 @@ func (s *SyncService) Stop() { } } -// ResetToHeight resets the SyncService to a specific L1 block height -func (s *SyncService) ResetToHeight(height uint64) { +// ResetStartSyncHeight resets the SyncService to a specific L1 block height +func (s *SyncService) ResetStartSyncHeight(height uint64) { if s == nil { return } s.latestProcessedBlock.Store(height) - log.Info("Reset sync service", "height", height) } From 014207e66155265e841fc18b59714ff790919dfa Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Tue, 24 Sep 2024 17:35:27 +0800 Subject: [PATCH 15/19] use cas in updating latestProcessedBlock --- rollup/rollup_sync_service/rollup_sync_service.go | 9 +++++++-- rollup/sync_service/sync_service.go | 15 ++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index bfc8bc08ef58..f90aa2695e63 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -178,8 +178,11 @@ func (s *RollupSyncService) fetchRollupEvents() { log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock.Load(), "latest confirmed", latestConfirmed) + latestProcessedBlock := s.latestProcessedBlock.Load() + updatedLatestProcessedBlock := latestProcessedBlock + // query in batches - for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += defaultFetchBlockRange { + for from := latestProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange { if s.ctx.Err() != nil { log.Info("Context canceled", "reason", s.ctx.Err()) return @@ -201,8 +204,10 @@ func (s *RollupSyncService) fetchRollupEvents() { return } - s.latestProcessedBlock.Store(to) + updatedLatestProcessedBlock = to } + + s.latestProcessedBlock.CompareAndSwap(latestProcessedBlock, updatedLatestProcessedBlock) } func (s *RollupSyncService) parseAndUpdateRollupEventLogs(logs []types.Log, endBlockNumber uint64) error { diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index f32979671211..9fc50d68a9c7 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -164,7 +164,10 @@ func (s *SyncService) fetchMessages() { return } - log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock.Load(), "latestConfirmed", latestConfirmed) + latestProcessedBlock := s.latestProcessedBlock.Load() + updatedLatestProcessedBlock := latestProcessedBlock + + log.Trace("Sync service fetchMessages", "latestProcessedBlock", latestProcessedBlock, "latestConfirmed", latestConfirmed) // keep track of next queue index we're expecting to see queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db) @@ -194,7 +197,7 @@ func (s *SyncService) fetchMessages() { numMessagesPendingDbWrite = 0 } - s.latestProcessedBlock.Store(lastBlock) + updatedLatestProcessedBlock = lastBlock } // ticker for logging progress @@ -202,7 +205,7 @@ func (s *SyncService) fetchMessages() { numMsgsCollected := 0 // query in batches - for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { + for from := updatedLatestProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { select { case <-s.ctx.Done(): // flush pending writes to database @@ -211,8 +214,8 @@ func (s *SyncService) fetchMessages() { } return case <-t.C: - progress := 100 * float64(s.latestProcessedBlock.Load()) / float64(latestConfirmed) - log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock.Load(), "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) + progress := 100 * float64(updatedLatestProcessedBlock) / float64(latestConfirmed) + log.Info("Syncing L1 messages", "processed", updatedLatestProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) default: } @@ -256,4 +259,6 @@ func (s *SyncService) fetchMessages() { flush(to) } } + + s.latestProcessedBlock.CompareAndSwap(latestProcessedBlock, updatedLatestProcessedBlock) } From 20805d0ad8700dfe785ea3ffac6d43ae31b8529a Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Tue, 24 Sep 2024 17:52:36 +0800 Subject: [PATCH 16/19] renaming --- .../rollup_sync_service/rollup_sync_service.go | 12 ++++++------ rollup/sync_service/sync_service.go | 16 ++++++++-------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index f90aa2695e63..d651379debe7 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -176,13 +176,13 @@ func (s *RollupSyncService) fetchRollupEvents() { return } - log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock.Load(), "latest confirmed", latestConfirmed) + initialProcessedBlock := s.latestProcessedBlock.Load() + currentProcessedBlock := initialProcessedBlock - latestProcessedBlock := s.latestProcessedBlock.Load() - updatedLatestProcessedBlock := latestProcessedBlock + log.Trace("Sync service fetch rollup events", "latest processed block", currentProcessedBlock, "latest confirmed", latestConfirmed) // query in batches - for from := latestProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange { + for from := currentProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange { if s.ctx.Err() != nil { log.Info("Context canceled", "reason", s.ctx.Err()) return @@ -204,10 +204,10 @@ func (s *RollupSyncService) fetchRollupEvents() { return } - updatedLatestProcessedBlock = to + currentProcessedBlock = to } - s.latestProcessedBlock.CompareAndSwap(latestProcessedBlock, updatedLatestProcessedBlock) + s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) } func (s *RollupSyncService) parseAndUpdateRollupEventLogs(logs []types.Log, endBlockNumber uint64) error { diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 9fc50d68a9c7..45854c8e9d3c 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -164,10 +164,10 @@ func (s *SyncService) fetchMessages() { return } - latestProcessedBlock := s.latestProcessedBlock.Load() - updatedLatestProcessedBlock := latestProcessedBlock + initialProcessedBlock := s.latestProcessedBlock.Load() + currentProcessedBlock := initialProcessedBlock - log.Trace("Sync service fetchMessages", "latestProcessedBlock", latestProcessedBlock, "latestConfirmed", latestConfirmed) + log.Trace("Sync service fetchMessages", "latestProcessedBlock", currentProcessedBlock, "latestConfirmed", latestConfirmed) // keep track of next queue index we're expecting to see queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db) @@ -197,7 +197,7 @@ func (s *SyncService) fetchMessages() { numMessagesPendingDbWrite = 0 } - updatedLatestProcessedBlock = lastBlock + currentProcessedBlock = lastBlock } // ticker for logging progress @@ -205,7 +205,7 @@ func (s *SyncService) fetchMessages() { numMsgsCollected := 0 // query in batches - for from := updatedLatestProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { + for from := currentProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { select { case <-s.ctx.Done(): // flush pending writes to database @@ -214,8 +214,8 @@ func (s *SyncService) fetchMessages() { } return case <-t.C: - progress := 100 * float64(updatedLatestProcessedBlock) / float64(latestConfirmed) - log.Info("Syncing L1 messages", "processed", updatedLatestProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) + progress := 100 * float64(currentProcessedBlock) / float64(latestConfirmed) + log.Info("Syncing L1 messages", "processed", currentProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) default: } @@ -260,5 +260,5 @@ func (s *SyncService) fetchMessages() { } } - s.latestProcessedBlock.CompareAndSwap(latestProcessedBlock, updatedLatestProcessedBlock) + s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) } From e42e4c44e8aeae668e86496c672fcdcd852b1c5f Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Tue, 24 Sep 2024 18:12:31 +0800 Subject: [PATCH 17/19] also update sync height in error paths --- rollup/rollup_sync_service/rollup_sync_service.go | 3 +-- rollup/sync_service/sync_service.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index d651379debe7..186643530aca 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -178,6 +178,7 @@ func (s *RollupSyncService) fetchRollupEvents() { initialProcessedBlock := s.latestProcessedBlock.Load() currentProcessedBlock := initialProcessedBlock + defer s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) log.Trace("Sync service fetch rollup events", "latest processed block", currentProcessedBlock, "latest confirmed", latestConfirmed) @@ -206,8 +207,6 @@ func (s *RollupSyncService) fetchRollupEvents() { currentProcessedBlock = to } - - s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) } func (s *RollupSyncService) parseAndUpdateRollupEventLogs(logs []types.Log, endBlockNumber uint64) error { diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 45854c8e9d3c..2c5210d37a59 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -166,6 +166,7 @@ func (s *SyncService) fetchMessages() { initialProcessedBlock := s.latestProcessedBlock.Load() currentProcessedBlock := initialProcessedBlock + defer s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) log.Trace("Sync service fetchMessages", "latestProcessedBlock", currentProcessedBlock, "latestConfirmed", latestConfirmed) @@ -259,6 +260,4 @@ func (s *SyncService) fetchMessages() { flush(to) } } - - s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) } From 05567250b852b20225f5a622375bb7d992602ecf Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Tue, 24 Sep 2024 18:16:54 +0800 Subject: [PATCH 18/19] fix a bug --- rollup/rollup_sync_service/rollup_sync_service.go | 5 ++++- rollup/sync_service/sync_service.go | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index 186643530aca..a413498bc310 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -178,7 +178,10 @@ func (s *RollupSyncService) fetchRollupEvents() { initialProcessedBlock := s.latestProcessedBlock.Load() currentProcessedBlock := initialProcessedBlock - defer s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) + // func() ensures using final currentProcessedBlock value when function returns + defer func() { + s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) + }() log.Trace("Sync service fetch rollup events", "latest processed block", currentProcessedBlock, "latest confirmed", latestConfirmed) diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 2c5210d37a59..9a682566a0ab 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -166,7 +166,10 @@ func (s *SyncService) fetchMessages() { initialProcessedBlock := s.latestProcessedBlock.Load() currentProcessedBlock := initialProcessedBlock - defer s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) + // func() ensures using final currentProcessedBlock value when function returns + defer func() { + s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) + }() log.Trace("Sync service fetchMessages", "latestProcessedBlock", currentProcessedBlock, "latestConfirmed", latestConfirmed) From a178e9e8a693021dc9b72dbc450fa77e0e951137 Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Tue, 24 Sep 2024 22:13:11 +0800 Subject: [PATCH 19/19] use mutex lock --- .../rollup_sync_service.go | 33 ++++++------- rollup/sync_service/sync_service.go | 49 +++++++++---------- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/rollup/rollup_sync_service/rollup_sync_service.go b/rollup/rollup_sync_service/rollup_sync_service.go index a413498bc310..c03d63e05c47 100644 --- a/rollup/rollup_sync_service/rollup_sync_service.go +++ b/rollup/rollup_sync_service/rollup_sync_service.go @@ -7,7 +7,7 @@ import ( "math/big" "os" "reflect" - "sync/atomic" + "sync" "time" "github.com/scroll-tech/da-codec/encoding" @@ -57,13 +57,14 @@ type RollupSyncService struct { cancel context.CancelFunc client *L1Client db ethdb.Database - latestProcessedBlock atomic.Uint64 + latestProcessedBlock uint64 scrollChainABI *abi.ABI l1CommitBatchEventSignature common.Hash l1RevertBatchEventSignature common.Hash l1FinalizeBatchEventSignature common.Hash bc *core.BlockChain stack *node.Node + stateMu sync.Mutex } func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) { @@ -107,6 +108,7 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig cancel: cancel, client: client, db: db, + latestProcessedBlock: latestProcessedBlock, scrollChainABI: scrollChainABI, l1CommitBatchEventSignature: scrollChainABI.Events["CommitBatch"].ID, l1RevertBatchEventSignature: scrollChainABI.Events["RevertBatch"].ID, @@ -115,8 +117,6 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig stack: stack, } - service.latestProcessedBlock.Store(latestProcessedBlock) - return &service, nil } @@ -125,7 +125,7 @@ func (s *RollupSyncService) Start() { return } - log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock.Load()) + log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock) go func() { syncTicker := time.NewTicker(defaultSyncInterval) @@ -141,7 +141,7 @@ func (s *RollupSyncService) Start() { case <-syncTicker.C: s.fetchRollupEvents() case <-logTicker.C: - log.Info("Sync rollup events progress update", "latestProcessedBlock", s.latestProcessedBlock.Load()) + log.Info("Sync rollup events progress update", "latestProcessedBlock", s.latestProcessedBlock) } } }() @@ -165,28 +165,27 @@ func (s *RollupSyncService) ResetStartSyncHeight(height uint64) { return } - s.latestProcessedBlock.Store(height) + s.stateMu.Lock() + defer s.stateMu.Unlock() + + s.latestProcessedBlock = height log.Info("Reset sync service", "height", height) } func (s *RollupSyncService) fetchRollupEvents() { + s.stateMu.Lock() + defer s.stateMu.Unlock() + latestConfirmed, err := s.client.getLatestFinalizedBlockNumber() if err != nil { log.Warn("failed to get latest confirmed block number", "err", err) return } - initialProcessedBlock := s.latestProcessedBlock.Load() - currentProcessedBlock := initialProcessedBlock - // func() ensures using final currentProcessedBlock value when function returns - defer func() { - s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) - }() - - log.Trace("Sync service fetch rollup events", "latest processed block", currentProcessedBlock, "latest confirmed", latestConfirmed) + log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock, "latest confirmed", latestConfirmed) // query in batches - for from := currentProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange { + for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange { if s.ctx.Err() != nil { log.Info("Context canceled", "reason", s.ctx.Err()) return @@ -208,7 +207,7 @@ func (s *RollupSyncService) fetchRollupEvents() { return } - currentProcessedBlock = to + s.latestProcessedBlock = to } } diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 9a682566a0ab..9239e210b8ed 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "reflect" - "sync/atomic" + "sync" "time" "github.com/scroll-tech/go-ethereum/core" @@ -49,8 +49,9 @@ type SyncService struct { db ethdb.Database msgCountFeed event.Feed pollInterval time.Duration - latestProcessedBlock atomic.Uint64 + latestProcessedBlock uint64 scope event.SubscriptionScope + stateMu sync.Mutex } func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) { @@ -80,15 +81,14 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node ctx, cancel := context.WithCancel(ctx) service := SyncService{ - ctx: ctx, - cancel: cancel, - client: client, - db: db, - pollInterval: DefaultPollInterval, + ctx: ctx, + cancel: cancel, + client: client, + db: db, + pollInterval: DefaultPollInterval, + latestProcessedBlock: latestProcessedBlock, } - service.latestProcessedBlock.Store(latestProcessedBlock) - return &service, nil } @@ -98,14 +98,14 @@ func (s *SyncService) Start() { } // wait for initial sync before starting node - log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock.Load()) + log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock) // block node startup during initial sync and print some helpful logs latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx) - if err == nil && latestConfirmed > s.latestProcessedBlock.Load()+1000 { + if err == nil && latestConfirmed > s.latestProcessedBlock+1000 { log.Warn("Running initial sync of L1 messages before starting l2geth, this might take a while...") s.fetchMessages() - log.Info("L1 message initial sync completed", "latestProcessedBlock", s.latestProcessedBlock.Load()) + log.Info("L1 message initial sync completed", "latestProcessedBlock", s.latestProcessedBlock) } go func() { @@ -147,7 +147,10 @@ func (s *SyncService) ResetStartSyncHeight(height uint64) { return } - s.latestProcessedBlock.Store(height) + s.stateMu.Lock() + defer s.stateMu.Unlock() + + s.latestProcessedBlock = height log.Info("Reset sync service", "height", height) } @@ -158,20 +161,16 @@ func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) eve } func (s *SyncService) fetchMessages() { + s.stateMu.Lock() + defer s.stateMu.Unlock() + latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx) if err != nil { log.Warn("Failed to get latest confirmed block number", "err", err) return } - initialProcessedBlock := s.latestProcessedBlock.Load() - currentProcessedBlock := initialProcessedBlock - // func() ensures using final currentProcessedBlock value when function returns - defer func() { - s.latestProcessedBlock.CompareAndSwap(initialProcessedBlock, currentProcessedBlock) - }() - - log.Trace("Sync service fetchMessages", "latestProcessedBlock", currentProcessedBlock, "latestConfirmed", latestConfirmed) + log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock, "latestConfirmed", latestConfirmed) // keep track of next queue index we're expecting to see queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db) @@ -201,7 +200,7 @@ func (s *SyncService) fetchMessages() { numMessagesPendingDbWrite = 0 } - currentProcessedBlock = lastBlock + s.latestProcessedBlock = lastBlock } // ticker for logging progress @@ -209,7 +208,7 @@ func (s *SyncService) fetchMessages() { numMsgsCollected := 0 // query in batches - for from := currentProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { + for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange { select { case <-s.ctx.Done(): // flush pending writes to database @@ -218,8 +217,8 @@ func (s *SyncService) fetchMessages() { } return case <-t.C: - progress := 100 * float64(currentProcessedBlock) / float64(latestConfirmed) - log.Info("Syncing L1 messages", "processed", currentProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) + progress := 100 * float64(s.latestProcessedBlock) / float64(latestConfirmed) + log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress) default: }