Skip to content

Commit

Permalink
remove lock and use atomic in latestProcessedBlock
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo committed Sep 23, 2024
1 parent eaa6eeb commit 75413e6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 44 deletions.
28 changes: 10 additions & 18 deletions rollup/rollup_sync_service/rollup_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"math/big"
"os"
"reflect"
"sync"
"runtime/internal/atomic"

Check failure on line 10 in rollup/rollup_sync_service/rollup_sync_service.go

View workflow job for this annotation

GitHub Actions / build-mock-ccc-geth

use of internal package runtime/internal/atomic not allowed

Check failure on line 10 in rollup/rollup_sync_service/rollup_sync_service.go

View workflow job for this annotation

GitHub Actions / test

use of internal package runtime/internal/atomic not allowed
"time"

"github.com/scroll-tech/da-codec/encoding"
Expand Down Expand Up @@ -57,15 +57,13 @@ type RollupSyncService struct {
cancel context.CancelFunc
client *L1Client
db ethdb.Database
latestProcessedBlock uint64
latestProcessedBlock atomic.Uint64
scrollChainABI *abi.ABI
l1CommitBatchEventSignature common.Hash
l1RevertBatchEventSignature common.Hash
l1FinalizeBatchEventSignature common.Hash
bc *core.BlockChain
stack *node.Node

stateMu sync.Mutex // protects the service state, e.g. db and latestProcessedBlock updates
}

func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig, db ethdb.Database, l1Client sync_service.EthClient, bc *core.BlockChain, stack *node.Node) (*RollupSyncService, error) {
Expand Down Expand Up @@ -109,7 +107,6 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig
cancel: cancel,
client: client,
db: db,
latestProcessedBlock: latestProcessedBlock,
scrollChainABI: scrollChainABI,
l1CommitBatchEventSignature: scrollChainABI.Events["CommitBatch"].ID,
l1RevertBatchEventSignature: scrollChainABI.Events["RevertBatch"].ID,
Expand All @@ -118,6 +115,8 @@ func NewRollupSyncService(ctx context.Context, genesisConfig *params.ChainConfig
stack: stack,
}

service.latestProcessedBlock.Store(latestProcessedBlock)

return &service, nil
}

Expand All @@ -126,7 +125,7 @@ func (s *RollupSyncService) Start() {
return
}

log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock)
log.Info("Starting rollup event sync background service", "latest processed block", s.latestProcessedBlock.Load())

go func() {
syncTicker := time.NewTicker(defaultSyncInterval)
Expand All @@ -142,7 +141,7 @@ func (s *RollupSyncService) Start() {
case <-syncTicker.C:
s.fetchRollupEvents()
case <-logTicker.C:
log.Info("Sync rollup events progress update", "latestProcessedBlock", s.latestProcessedBlock)
log.Info("Sync rollup events progress update", "latestProcessedBlock", s.latestProcessedBlock.Load())
}
}
}()
Expand All @@ -166,29 +165,22 @@ func (s *RollupSyncService) ResetToHeight(height uint64) {
return
}

s.stateMu.Lock()
defer s.stateMu.Unlock()

rawdb.WriteRollupEventSyncedL1BlockNumber(s.db, height)
s.latestProcessedBlock = height
s.latestProcessedBlock.Store(height)

log.Info("Reset sync service", "height", height)
}

func (s *RollupSyncService) fetchRollupEvents() {
s.stateMu.Lock()
defer s.stateMu.Unlock()

latestConfirmed, err := s.client.getLatestFinalizedBlockNumber()
if err != nil {
log.Warn("failed to get latest confirmed block number", "err", err)
return
}

log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock, "latest confirmed", latestConfirmed)
log.Trace("Sync service fetch rollup events", "latest processed block", s.latestProcessedBlock.Load(), "latest confirmed", latestConfirmed)

// query in batches
for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += defaultFetchBlockRange {
for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += defaultFetchBlockRange {
if s.ctx.Err() != nil {
log.Info("Context canceled", "reason", s.ctx.Err())
return
Expand All @@ -210,7 +202,7 @@ func (s *RollupSyncService) fetchRollupEvents() {
return
}

s.latestProcessedBlock = to
s.latestProcessedBlock.Store(to)
}
}

Expand Down
44 changes: 18 additions & 26 deletions rollup/sync_service/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"

"github.com/scroll-tech/go-ethereum/core"
Expand Down Expand Up @@ -49,10 +49,8 @@ type SyncService struct {
db ethdb.Database
msgCountFeed event.Feed
pollInterval time.Duration
latestProcessedBlock uint64
latestProcessedBlock atomic.Uint64
scope event.SubscriptionScope

stateMu sync.Mutex // protects the service state, e.g. db and latestProcessedBlock updates
}

func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) {
Expand Down Expand Up @@ -82,14 +80,15 @@ func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, node
ctx, cancel := context.WithCancel(ctx)

service := SyncService{
ctx: ctx,
cancel: cancel,
client: client,
db: db,
pollInterval: DefaultPollInterval,
latestProcessedBlock: latestProcessedBlock,
ctx: ctx,
cancel: cancel,
client: client,
db: db,
pollInterval: DefaultPollInterval,
}

service.latestProcessedBlock.Store(latestProcessedBlock)

return &service, nil
}

Expand All @@ -99,14 +98,14 @@ func (s *SyncService) Start() {
}

// wait for initial sync before starting node
log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock)
log.Info("Starting L1 message sync service", "latestProcessedBlock", s.latestProcessedBlock.Load())

// block node startup during initial sync and print some helpful logs
latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx)
if err == nil && latestConfirmed > s.latestProcessedBlock+1000 {
if err == nil && latestConfirmed > s.latestProcessedBlock.Load()+1000 {
log.Warn("Running initial sync of L1 messages before starting l2geth, this might take a while...")
s.fetchMessages()
log.Info("L1 message initial sync completed", "latestProcessedBlock", s.latestProcessedBlock)
log.Info("L1 message initial sync completed", "latestProcessedBlock", s.latestProcessedBlock.Load())
}

go func() {
Expand Down Expand Up @@ -148,11 +147,7 @@ func (s *SyncService) ResetToHeight(height uint64) {
return
}

s.stateMu.Lock()
defer s.stateMu.Unlock()

rawdb.WriteSyncedL1BlockNumber(s.db, height)
s.latestProcessedBlock = height
s.latestProcessedBlock.Store(height)

log.Info("Reset sync service", "height", height)
}
Expand All @@ -164,16 +159,13 @@ func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) eve
}

func (s *SyncService) fetchMessages() {
s.stateMu.Lock()
defer s.stateMu.Unlock()

latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx)
if err != nil {
log.Warn("Failed to get latest confirmed block number", "err", err)
return
}

log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock, "latestConfirmed", latestConfirmed)
log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock.Load(), "latestConfirmed", latestConfirmed)

// keep track of next queue index we're expecting to see
queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db)
Expand Down Expand Up @@ -203,15 +195,15 @@ func (s *SyncService) fetchMessages() {
numMessagesPendingDbWrite = 0
}

s.latestProcessedBlock = lastBlock
s.latestProcessedBlock.Store(lastBlock)
}

// ticker for logging progress
t := time.NewTicker(LogProgressInterval)
numMsgsCollected := 0

// query in batches
for from := s.latestProcessedBlock + 1; from <= latestConfirmed; from += DefaultFetchBlockRange {
for from := s.latestProcessedBlock.Load() + 1; from <= latestConfirmed; from += DefaultFetchBlockRange {
select {
case <-s.ctx.Done():
// flush pending writes to database
Expand All @@ -220,8 +212,8 @@ func (s *SyncService) fetchMessages() {
}
return
case <-t.C:
progress := 100 * float64(s.latestProcessedBlock) / float64(latestConfirmed)
log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock, "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress)
progress := 100 * float64(s.latestProcessedBlock.Load()) / float64(latestConfirmed)
log.Info("Syncing L1 messages", "processed", s.latestProcessedBlock.Load(), "confirmed", latestConfirmed, "collected", numMsgsCollected, "progress(%)", progress)
default:
}

Expand Down

0 comments on commit 75413e6

Please sign in to comment.