Check in claimtxmanager to block until l2network is synced (#23)
* Check in claimtxmanager to block until l2network is synced

* add error check
chengzhinei authored Nov 8, 2023
1 parent 30297a5 commit 0682df0
Showing 4 changed files with 66 additions and 26 deletions.
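At a glance: each synchronizer now reports on a shared `chSynced chan uint` once its network has caught up, and the claim tx manager holds off processing GER events until it has seen its own L2 network ID on that channel. Below is a self-contained sketch of that pattern; all names are illustrative, not the repo's API.

```go
package main

import (
	"fmt"
	"time"
)

// synchronizer stands in for ClientSynchronizer.Sync: it signals its network
// ID once on chSynced when it considers itself caught up.
func synchronizer(networkID uint, chSynced chan<- uint) {
	time.Sleep(100 * time.Millisecond) // stand-in for syncing blocks
	chSynced <- networkID
}

// claimTxManager stands in for ClaimTxManager.Start: GER events are skipped
// until the manager's own network reports synced.
func claimTxManager(networkID uint, chSynced <-chan uint, chEvent <-chan string) {
	synced := false
	for {
		select {
		case id := <-chSynced:
			if id == networkID && !synced {
				fmt.Println("NetworkID synced:", id)
				synced = true
			}
		case ev := <-chEvent:
			if !synced {
				fmt.Println("waiting for sync, skipping", ev)
				continue
			}
			fmt.Println("processing", ev)
		}
	}
}

func main() {
	chSynced := make(chan uint)
	chEvent := make(chan string)
	go claimTxManager(1, chSynced, chEvent)
	go synchronizer(1, chSynced)
	chEvent <- "GER-1" // arrives before the sync signal: skipped
	time.Sleep(200 * time.Millisecond)
	chEvent <- "GER-2" // arrives after the sync signal: processed
	time.Sleep(50 * time.Millisecond)
}
```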
52 changes: 38 additions & 14 deletions claimtxman/claimtxman.go
@@ -40,22 +40,25 @@ type ClaimTxManager struct {
bridgeService bridgeServiceInterface
cfg Config
chExitRootEvent chan *etherman.GlobalExitRoot
chSynced chan uint
storage storageInterface
auth *bind.TransactOpts
nonceCache *lru.Cache[string, uint64]
synced bool
}

// NewClaimTxManager creates a new claim transaction manager.
func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, l2NodeURL string, l2NetworkID uint, l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}) (*ClaimTxManager, error) {
client, err := utils.NewClient(context.Background(), l2NodeURL, l2BridgeAddr)
func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint, l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}) (*ClaimTxManager, error) {
ctx := context.Background()
client, err := utils.NewClient(ctx, l2NodeURL, l2BridgeAddr)
if err != nil {
return nil, err
}
cache, err := lru.New[string, uint64](int(cacheSize))
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
auth, err := client.GetSignerFromKeystore(ctx, cfg.PrivateKey)
return &ClaimTxManager{
ctx: ctx,
@@ -65,6 +68,7 @@ func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot
bridgeService: bridgeService,
cfg: cfg,
chExitRootEvent: chExitRootEvent,
chSynced: chSynced,
storage: storage.(storageInterface),
auth: auth,
nonceCache: cache,
@@ -75,19 +79,30 @@ func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot
// send them to the blockchain and keep monitoring them until they
// get mined
func (tm *ClaimTxManager) Start() {
ticker := time.NewTicker(tm.cfg.FrequencyToMonitorTxs.Duration)
for {
select {
case <-tm.ctx.Done():
return
case netID := <-tm.chSynced:
if netID == tm.l2NetworkID && !tm.synced {
log.Info("NetworkID synced: ", netID)
tm.synced = true
}
case ger := <-tm.chExitRootEvent:
go func() {
err := tm.updateDepositsStatus(ger)
if err != nil {
log.Errorf("failed to update deposits status: %v", err)
}
}()
case <-time.After(tm.cfg.FrequencyToMonitorTxs.Duration):
err := tm.monitorTxs(context.Background())
if tm.synced {
log.Debug("UpdateDepositsStatus for ger: ", ger.GlobalExitRoot)
go func() {
err := tm.updateDepositsStatus(ger)
if err != nil {
log.Errorf("failed to update deposits status: %v", err)
}
}()
} else {
log.Infof("Waiting for networkID %d to be synced before processing deposits", tm.l2NetworkID)
}
case <-ticker.C:
err := tm.monitorTxs(tm.ctx)
if err != nil {
log.Errorf("failed to monitor txs: %v", err)
}
@@ -244,6 +259,12 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error {
statusesFilter := []ctmtypes.MonitoredTxStatus{ctmtypes.MonitoredTxStatusCreated}
mTxs, err := tm.storage.GetClaimTxsByStatus(ctx, statusesFilter, dbTx)
if err != nil {
log.Errorf("failed to get created monitored txs: %v", err)
rollbackErr := tm.storage.Rollback(tm.ctx, dbTx)
if rollbackErr != nil {
log.Errorf("claimtxman error rolling back state. RollbackErr: %s, err: %v", rollbackErr.Error(), err)
return rollbackErr
}
return fmt.Errorf("failed to get created monitored txs: %v", err)
}

@@ -281,6 +302,7 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error {
mTxLog.Errorf("failed to get tx %s: %v", txHash.String(), err)
continue
}
log.Infof("tx: %s not mined yet", txHash.String())

allHistoryTxMined = false
continue
@@ -425,17 +447,19 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error {
mTxLog.Errorf("failed to update monitored tx: %v", err)
continue
}
mTxLog.Debugf("signed tx added to the monitored tx history")
mTxLog.Infof("signed tx %s added to the monitored tx history", signedTx.Hash().String())
}
}

err = tm.storage.Commit(tm.ctx, dbTx)
if err != nil {
log.Errorf("UpdateClaimTx committing dbTx, err: %v", err)
rollbackErr := tm.storage.Rollback(tm.ctx, dbTx)
if rollbackErr != nil {
log.Fatalf("claimtxman error rolling back state. RollbackErr: %s, err: %s", rollbackErr.Error(), err.Error())
log.Errorf("claimtxman error rolling back state. RollbackErr: %s, err: %v", rollbackErr.Error(), err)
return rollbackErr
}
log.Fatalf("UpdateClaimTx committing dbTx, err: %s", err.Error())
return err
}
return nil
}
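Aside from the sync gate, the Start loop above also swaps `case <-time.After(tm.cfg.FrequencyToMonitorTxs.Duration)` for a `time.NewTicker` created before the loop, and passes `tm.ctx` instead of `context.Background()` to `monitorTxs`. `time.After` arms a fresh timer on every pass through the select, so steady traffic on the other cases keeps resetting the wait, whereas a ticker fires on a fixed cadence regardless. A rough sketch of the resulting shape (illustrative, not the repo code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// monitorLoop mirrors the new loop shape: one ticker, created once, instead
// of re-arming time.After on every select iteration.
func monitorLoop(ctx context.Context, frequency time.Duration) {
	ticker := time.NewTicker(frequency)
	defer ticker.Stop() // the diff itself doesn't call Stop; added here as good practice
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			fmt.Println("monitor txs") // stand-in for tm.monitorTxs(tm.ctx)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	monitorLoop(ctx, 100*time.Millisecond) // fires roughly three times, then returns
}
```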
13 changes: 8 additions & 5 deletions cmd/run.go
@@ -116,16 +116,17 @@ func startServer(ctx *cli.Context) error {
log.Debug("trusted sequencer URL ", c.Etherman.L2URLs[0])
zkEVMClient := client.NewClient(c.Etherman.L2URLs[0])
chExitRootEvent := make(chan *etherman.GlobalExitRoot)
go runSynchronizer(c.NetworkConfig.GenBlockNumber, bridgeController, l1Etherman, c.Synchronizer, storage, zkEVMClient, chExitRootEvent)
chSynced := make(chan uint)
go runSynchronizer(c.NetworkConfig.GenBlockNumber, bridgeController, l1Etherman, c.Synchronizer, storage, zkEVMClient, chExitRootEvent, chSynced)
for _, client := range l2Ethermans {
go runSynchronizer(0, bridgeController, client, c.Synchronizer, storage, zkEVMClient, chExitRootEvent)
go runSynchronizer(0, bridgeController, client, c.Synchronizer, storage, zkEVMClient, chExitRootEvent, chSynced)
}

if c.ClaimTxManager.Enabled {
for i := 0; i < len(c.Etherman.L2URLs); i++ {
// we should match the orders of L2URLs between etherman and claimtxman
// since we are using the networkIDs in the same order
claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage)
claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage)
if err != nil {
log.Fatalf("error creating claim tx manager for L2 %s. Error: %v", c.Etherman.L2URLs[i], err)
}
@@ -138,6 +139,8 @@ func startServer(ctx *cli.Context) error {
select {
case <-chExitRootEvent:
log.Debug("New GER received")
case netID := <-chSynced:
log.Debug("NetworkID synced: ", netID)
case <-ctx.Context.Done():
log.Debug("Stopping goroutine that listen new GER updates")
return
@@ -205,8 +208,8 @@ func newEthermans(c *config.Config) (*etherman.Client, []*etherman.Client, error
return l1Etherman, l2Ethermans, nil
}

func runSynchronizer(genBlockNumber uint64, brdigeCtrl *bridgectrl.BridgeController, etherman *etherman.Client, cfg synchronizer.Config, storage db.Storage, zkEVMClient *client.Client, chExitRootEvent chan *etherman.GlobalExitRoot) {
sy, err := synchronizer.NewSynchronizer(storage, brdigeCtrl, etherman, zkEVMClient, genBlockNumber, chExitRootEvent, cfg)
func runSynchronizer(genBlockNumber uint64, brdigeCtrl *bridgectrl.BridgeController, etherman *etherman.Client, cfg synchronizer.Config, storage db.Storage, zkEVMClient *client.Client, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint) {
sy, err := synchronizer.NewSynchronizer(storage, brdigeCtrl, etherman, zkEVMClient, genBlockNumber, chExitRootEvent, chSynced, cfg)
if err != nil {
log.Fatal(err)
}
21 changes: 16 additions & 5 deletions synchronizer/synchronizer.go
@@ -31,6 +31,7 @@ type ClientSynchronizer struct {
cfg Config
networkID uint
chExitRootEvent chan *etherman.GlobalExitRoot
chSynced chan uint
zkEVMClient zkEVMClientInterface
synced bool
l1RollupExitRoot common.Hash
@@ -44,6 +45,7 @@ func NewSynchronizer(
zkEVMClient zkEVMClientInterface,
genBlockNumber uint64,
chExitRootEvent chan *etherman.GlobalExitRoot,
chSynced chan uint,
cfg Config) (Synchronizer, error) {
ctx, cancel := context.WithCancel(context.Background())
networkID, err := ethMan.GetNetworkID(ctx)
@@ -70,6 +72,7 @@ func NewSynchronizer(
cfg: cfg,
networkID: networkID,
chExitRootEvent: chExitRootEvent,
chSynced: chSynced,
zkEVMClient: zkEVMClient,
l1RollupExitRoot: ger.ExitRoots[1],
}, nil
@@ -82,6 +85,7 @@ func NewSynchronizer(
cancelCtx: cancel,
genBlockNumber: genBlockNumber,
cfg: cfg,
chSynced: chSynced,
networkID: networkID,
}, nil
}
@@ -115,12 +119,13 @@ func (s *ClientSynchronizer) Sync() error {
log.Debug("synchronizer ctx done. NetworkID: ", s.networkID)
return nil
case <-time.After(waitDuration):
log.Debugf("NetworkID: %d, syncing...", s.networkID)
//Sync L1Blocks
if lastBlockSynced, err = s.syncBlocks(lastBlockSynced); err != nil {
log.Warn("error syncing blocks: ", err)
log.Warnf("networkID: %d, error syncing blocks: ", s.networkID, err)
lastBlockSynced, err = s.storage.GetLastBlock(s.ctx, s.networkID, nil)
if err != nil {
log.Fatal("error getting lastBlockSynced to resume the synchronization... Error: ", err)
log.Fatalf("networkID: %d, error getting lastBlockSynced to resume the synchronization... Error: ", s.networkID, err)
}
if s.ctx.Err() != nil {
continue
@@ -134,9 +139,11 @@
continue
}
lastKnownBlock := header.Number.Uint64()
if lastBlockSynced.BlockNumber == lastKnownBlock {
if lastBlockSynced.BlockNumber == lastKnownBlock && !s.synced {
log.Infof("NetworkID %d Synced!", s.networkID)
waitDuration = s.cfg.SyncInterval.Duration
s.synced = true
s.chSynced <- s.networkID
}
if lastBlockSynced.BlockNumber > lastKnownBlock {
if s.networkID == 0 {
@@ -253,8 +260,12 @@ func (s *ClientSynchronizer) syncBlocks(lastBlockSynced *etherman.Block) (*ether
fromBlock = toBlock + 1

if lastKnownBlock.Cmp(new(big.Int).SetUint64(toBlock)) < 1 {
waitDuration = s.cfg.SyncInterval.Duration
s.synced = true
if !s.synced {
log.Infof("NetworkID %d Synced!", s.networkID)
waitDuration = s.cfg.SyncInterval.Duration
s.synced = true
s.chSynced <- s.networkID
}
break
}
if len(blocks) == 0 { // If there is no events in the checked blocks range and lastKnownBlock > fromBlock.
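Two properties of the new signal in this file are worth noting: the `!s.synced` guard means each synchronizer sends its network ID at most once, and because `chSynced` is unbuffered the send blocks until some receiver takes it, so the channel must always have a consumer (a claim tx manager, the fallback loop in run.go, or the test goroutine). A minimal sketch of that contract, with illustrative names:

```go
package main

import (
	"fmt"
	"time"
)

// notifySyncedOnce mimics the guarded send in Sync(): signal at most once,
// and only while a receiver exists, since the channel is unbuffered and the
// send would otherwise block the sync loop forever.
func notifySyncedOnce(synced *bool, networkID uint, chSynced chan<- uint) {
	if !*synced {
		*synced = true
		chSynced <- networkID // blocks until a consumer receives it
	}
}

func main() {
	chSynced := make(chan uint)
	go func() { // consumer, like the claim tx manager or run.go's drain loop
		for id := range chSynced {
			fmt.Println("NetworkID synced:", id)
		}
	}()
	synced := false
	notifySyncedOnce(&synced, 1, chSynced) // sends the signal
	notifySyncedOnce(&synced, 1, chSynced) // no-op: already synced
	time.Sleep(50 * time.Millisecond)
}
```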
6 changes: 4 additions & 2 deletions synchronizer/synchronizer_test.go
@@ -35,15 +35,17 @@ func TestSyncGer(t *testing.T) {
m.Etherman.On("GetNetworkID", ctx).Return(uint(0), nil)
m.Storage.On("GetLatestL1SyncedExitRoot", context.Background(), nil).Return(&etherman.GlobalExitRoot{}, gerror.ErrStorageNotFound)
chEvent := make(chan *etherman.GlobalExitRoot)
sync, err := NewSynchronizer(m.Storage, m.BridgeCtrl, m.Etherman, m.ZkEVMClient, genBlockNumber, chEvent, cfg)
chSynced := make(chan uint)
sync, err := NewSynchronizer(m.Storage, m.BridgeCtrl, m.Etherman, m.ZkEVMClient, genBlockNumber, chEvent, chSynced, cfg)
require.NoError(t, err)

go func() {
for {
select {
case <-chEvent:
t.Log("New GER received")
return
case netID := <-chSynced:
t.Log("Synced networkID: ", netID)
case <-context.Background().Done():
return
}
