Skip to content

Commit

Permalink
feat: allow changing L1 synced height via admin RPC/CLI (#1044)
Browse files Browse the repository at this point in the history
* feat: allow changing L1 synced height via RPC/CLI

* chore: auto version bump [bot]

* fix typos

* add height validity check

* change implementation

* moved hotfix apis from ScrollAPI to PrivateAdminAPI

* change namespace from scroll to admin

* fix bugs

* add locks

* add a lock to protect latestProcessedBlock and db state

* revert some changes

* remove lock and use atomic in latestProcessedBlock

* fix CI

* tweak

* use cas in updating latestProcessedBlock

* renaming

* also update sync height in error paths

* fix a bug

* use mutex lock

---------

Co-authored-by: colinlyguo <[email protected]>
  • Loading branch information
colinlyguo and colinlyguo authored Sep 24, 2024
1 parent 7ff93ca commit cf1ca84
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 1 deletion.
26 changes: 26 additions & 0 deletions eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,32 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) {
return true, nil
}

// SetRollupEventSyncedL1Height sets the synced L1 height for rollup event synchronization
func (api *PrivateAdminAPI) SetRollupEventSyncedL1Height(height uint64) error {
rollupSyncService := api.eth.GetRollupSyncService()
if rollupSyncService == nil {
return errors.New("RollupSyncService is not available")
}

log.Info("Setting rollup event synced L1 height", "height", height)
rollupSyncService.ResetStartSyncHeight(height)

return nil
}

// SetL1MessageSyncedL1Height sets the synced L1 height for L1 message synchronization
func (api *PrivateAdminAPI) SetL1MessageSyncedL1Height(height uint64) error {
syncService := api.eth.GetSyncService()
if syncService == nil {
return errors.New("SyncService is not available")
}

log.Info("Setting L1 message synced L1 height", "height", height)
syncService.ResetStartSyncHeight(height)

return nil
}

// PublicDebugAPI is the collection of Ethereum full node APIs exposed
// over the public debugging endpoint.
type PublicDebugAPI struct {
Expand Down
12 changes: 12 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,3 +609,15 @@ func (s *Ethereum) Stop() error {

return nil
}

// GetRollupSyncService returns the RollupSyncService of the Ethereum instance.
// It returns nil if the service is not initialized.
func (e *Ethereum) GetRollupSyncService() *rollup_sync_service.RollupSyncService {
return e.rollupSyncService
}

// GetSyncService returns the SyncService of the Ethereum instance.
// It returns nil if the service is not initialized.
func (e *Ethereum) GetSyncService() *sync_service.SyncService {
return e.syncService
}
10 changes: 10 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ web3._extend({
name: 'stopWS',
call: 'admin_stopWS'
}),
new web3._extend.Method({
name: 'setRollupEventSyncedL1Height',
call: 'admin_setRollupEventSyncedL1Height',
params: 1
}),
new web3._extend.Method({
name: 'setL1MessageSyncedL1Height',
call: 'admin_setL1MessageSyncedL1Height',
params: 1
}),
],
properties: [
new web3._extend.Property({
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 7 // Minor version component of the current release
VersionPatch = 21 // Patch version component of the current release
VersionPatch = 22 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down
18 changes: 18 additions & 0 deletions rollup/rollup_sync_service/rollup_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"os"
"reflect"
"sync"
"time"

"github.com/scroll-tech/da-codec/encoding"
Expand Down Expand Up @@ -63,6 +64,7 @@ type RollupSyncService struct {
l1FinalizeBatchEventSignature common.Hash
bc *core.BlockChain
stack *node.Node
stateMu sync.Mutex
}

func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) {
Expand Down Expand Up @@ -157,7 +159,23 @@ func (s *RollupSyncService) Stop() {
}
}

// ResetStartSyncHeight resets the RollupSyncService to a specific L1 block height
func (s *RollupSyncService) ResetStartSyncHeight(height uint64) {
if s == nil {
return
}

s.stateMu.Lock()
defer s.stateMu.Unlock()

s.latestProcessedBlock = height
log.Info("Reset sync service", "height", height)
}

func (s *RollupSyncService) fetchRollupEvents() {
s.stateMu.Lock()
defer s.stateMu.Unlock()

latestConfirmed, err := s.client.getLatestFinalizedBlockNumber()
if err != nil {
log.Warn("failed to get latest confirmed block number", "err", err)
Expand Down
18 changes: 18 additions & 0 deletions rollup/sync_service/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"time"

"github.com/scroll-tech/go-ethereum/core"
Expand Down Expand Up @@ -50,6 +51,7 @@ type SyncService struct {
pollInterval time.Duration
latestProcessedBlock uint64
scope event.SubscriptionScope
stateMu sync.Mutex
}

func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) {
Expand Down Expand Up @@ -139,13 +141,29 @@ func (s *SyncService) Stop() {
}
}

// ResetStartSyncHeight resets the SyncService to a specific L1 block height
func (s *SyncService) ResetStartSyncHeight(height uint64) {
if s == nil {
return
}

s.stateMu.Lock()
defer s.stateMu.Unlock()

s.latestProcessedBlock = height
log.Info("Reset sync service", "height", height)
}

// SubscribeNewL1MsgsEvent registers a subscription of NewL1MsgsEvent and
// starts sending event to the given channel.
func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) event.Subscription {
return s.scope.Track(s.msgCountFeed.Subscribe(ch))
}

func (s *SyncService) fetchMessages() {
s.stateMu.Lock()
defer s.stateMu.Unlock()

latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx)
if err != nil {
log.Warn("Failed to get latest confirmed block number", "err", err)
Expand Down

0 comments on commit cf1ca84

Please sign in to comment.