Commit 8108fee

Merge branch 'release/v0.1.0' into fix-claim-010

chengzhinei committed Nov 8, 2023
2 parents: f8e32ec + 0682df0
Showing 19 changed files with 1,677 additions and 1,708 deletions.
1,376 changes: 947 additions & 429 deletions bridgectrl/pb/query.pb.go

Large diffs are not rendered by default.

1,423 changes: 271 additions & 1,152 deletions bridgectrl/pb/query.pb.gw.go

Large diffs are not rendered by default.

147 changes: 104 additions & 43 deletions bridgectrl/pb/query_grpc.pb.go

Large diffs are not rendered by default.

52 changes: 38 additions & 14 deletions claimtxman/claimtxman.go
@@ -40,22 +40,25 @@ type ClaimTxManager struct {
 	bridgeService   bridgeServiceInterface
 	cfg             Config
 	chExitRootEvent chan *etherman.GlobalExitRoot
+	chSynced        chan uint
 	storage         storageInterface
 	auth            *bind.TransactOpts
 	nonceCache      *lru.Cache[string, uint64]
+	synced          bool
 }

 // NewClaimTxManager creates a new claim transaction manager.
-func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, l2NodeURL string, l2NetworkID uint, l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}) (*ClaimTxManager, error) {
-	client, err := utils.NewClient(context.Background(), l2NodeURL, l2BridgeAddr)
+func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint, l2NodeURL string, l2NetworkID uint, l2BridgeAddr common.Address, bridgeService bridgeServiceInterface, storage interface{}) (*ClaimTxManager, error) {
+	ctx := context.Background()
+	client, err := utils.NewClient(ctx, l2NodeURL, l2BridgeAddr)
 	if err != nil {
 		return nil, err
 	}
 	cache, err := lru.New[string, uint64](int(cacheSize))
 	if err != nil {
 		return nil, err
 	}
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(ctx)
 	auth, err := client.GetSignerFromKeystore(ctx, cfg.PrivateKey)
 	return &ClaimTxManager{
 		ctx: ctx,
@@ -65,6 +68,7 @@ func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot
 		bridgeService:   bridgeService,
 		cfg:             cfg,
 		chExitRootEvent: chExitRootEvent,
+		chSynced:        chSynced,
 		storage:         storage.(storageInterface),
 		auth:            auth,
 		nonceCache:      cache,
@@ -75,19 +79,30 @@ func NewClaimTxManager(cfg Config, chExitRootEvent chan *etherman.GlobalExitRoot
 // send then to the blockchain and keep monitoring them until they
 // get mined
 func (tm *ClaimTxManager) Start() {
+	ticker := time.NewTicker(tm.cfg.FrequencyToMonitorTxs.Duration)
 	for {
 		select {
 		case <-tm.ctx.Done():
 			return
+		case netID := <-tm.chSynced:
+			if netID == tm.l2NetworkID && !tm.synced {
+				log.Info("NetworkID synced: ", netID)
+				tm.synced = true
+			}
 		case ger := <-tm.chExitRootEvent:
-			go func() {
-				err := tm.updateDepositsStatus(ger)
-				if err != nil {
-					log.Errorf("failed to update deposits status: %v", err)
-				}
-			}()
-		case <-time.After(tm.cfg.FrequencyToMonitorTxs.Duration):
-			err := tm.monitorTxs(context.Background())
+			if tm.synced {
+				log.Debug("UpdateDepositsStatus for ger: ", ger.GlobalExitRoot)
+				go func() {
+					err := tm.updateDepositsStatus(ger)
+					if err != nil {
+						log.Errorf("failed to update deposits status: %v", err)
+					}
+				}()
+			} else {
+				log.Infof("Waiting for networkID %d to be synced before processing deposits", tm.l2NetworkID)
+			}
+		case <-ticker.C:
+			err := tm.monitorTxs(tm.ctx)
 			if err != nil {
 				log.Errorf("failed to monitor txs: %v", err)
 			}
@@ -385,6 +400,12 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error {
 	statusesFilter := []ctmtypes.MonitoredTxStatus{ctmtypes.MonitoredTxStatusCreated}
 	mTxs, err := tm.storage.GetClaimTxsByStatus(ctx, statusesFilter, dbTx)
 	if err != nil {
+		log.Errorf("failed to get created monitored txs: %v", err)
+		rollbackErr := tm.storage.Rollback(tm.ctx, dbTx)
+		if rollbackErr != nil {
+			log.Errorf("claimtxman error rolling back state. RollbackErr: %s, err: %v", rollbackErr.Error(), err)
+			return rollbackErr
+		}
 		return fmt.Errorf("failed to get created monitored txs: %v", err)
 	}

@@ -422,6 +443,7 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error {
 				mTxLog.Errorf("failed to get tx %s: %v", txHash.String(), err)
 				continue
 			}
+			log.Infof("tx: %s not mined yet", txHash.String())

 			allHistoryTxMined = false
 			continue
@@ -566,17 +588,19 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error {
 				mTxLog.Errorf("failed to update monitored tx: %v", err)
 				continue
 			}
-			mTxLog.Debugf("signed tx added to the monitored tx history")
+			mTxLog.Infof("signed tx %s added to the monitored tx history", signedTx.Hash().String())
 		}
 	}

 	err = tm.storage.Commit(tm.ctx, dbTx)
 	if err != nil {
-		log.Errorf("UpdateClaimTx committing dbTx, err: %v", err)
 		rollbackErr := tm.storage.Rollback(tm.ctx, dbTx)
 		if rollbackErr != nil {
-			log.Fatalf("claimtxman error rolling back state. RollbackErr: %s, err: %s", rollbackErr.Error(), err.Error())
+			log.Errorf("claimtxman error rolling back state. RollbackErr: %s, err: %v", rollbackErr.Error(), err)
+			return rollbackErr
 		}
+		log.Fatalf("UpdateClaimTx committing dbTx, err: %s", err.Error())
 		return err
 	}
 	return nil
 }
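Two behavioral changes land in Start(): GER events are processed only after the manager's own network reports synced on chSynced, and the per-iteration time.After timer is replaced by one reusable ticker driven by tm.ctx. A standalone sketch of this readiness-gate pattern (all identifiers invented for illustration, not code from this repository):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// gate mirrors the pattern above: events are skipped until the
// component sees its own network ID on the synced channel.
func gate(ctx context.Context, networkID uint, synced <-chan uint, events <-chan string) {
	ready := false
	ticker := time.NewTicker(50 * time.Millisecond) // reused, unlike time.After
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case id := <-synced:
			if id == networkID && !ready {
				ready = true
				fmt.Println("network synced:", id)
			}
		case ev := <-events:
			if ready {
				fmt.Println("processing event:", ev)
			} else {
				fmt.Println("skipping event until synced:", ev)
			}
		case <-ticker.C:
			// periodic monitoring work would run here
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	synced := make(chan uint)
	events := make(chan string)
	go gate(ctx, 1, synced, events)
	events <- "ger-1" // skipped: not synced yet
	synced <- 1
	events <- "ger-2" // processed
	<-ctx.Done()
}
```

Note that, as in the diff, an event arriving before sync is not re-queued; it is dropped and later state is picked up on the next event or monitoring tick.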
13 changes: 8 additions & 5 deletions cmd/run.go
@@ -116,16 +116,17 @@ func startServer(ctx *cli.Context) error {
 	log.Debug("trusted sequencer URL ", c.Etherman.L2URLs[0])
 	zkEVMClient := client.NewClient(c.Etherman.L2URLs[0])
 	chExitRootEvent := make(chan *etherman.GlobalExitRoot)
-	go runSynchronizer(c.NetworkConfig.GenBlockNumber, bridgeController, l1Etherman, c.Synchronizer, storage, zkEVMClient, chExitRootEvent)
+	chSynced := make(chan uint)
+	go runSynchronizer(c.NetworkConfig.GenBlockNumber, bridgeController, l1Etherman, c.Synchronizer, storage, zkEVMClient, chExitRootEvent, chSynced)
 	for _, client := range l2Ethermans {
-		go runSynchronizer(0, bridgeController, client, c.Synchronizer, storage, zkEVMClient, chExitRootEvent)
+		go runSynchronizer(0, bridgeController, client, c.Synchronizer, storage, zkEVMClient, chExitRootEvent, chSynced)
 	}

 	if c.ClaimTxManager.Enabled {
 		for i := 0; i < len(c.Etherman.L2URLs); i++ {
 			// we should match the orders of L2URLs between etherman and claimtxman
 			// since we are using the networkIDs in the same order
-			claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage)
+			claimTxManager, err := claimtxman.NewClaimTxManager(c.ClaimTxManager, chExitRootEvent, chSynced, c.Etherman.L2URLs[i], networkIDs[i+1], c.NetworkConfig.L2PolygonBridgeAddresses[i], bridgeService, storage)
 			if err != nil {
 				log.Fatalf("error creating claim tx manager for L2 %s. Error: %v", c.Etherman.L2URLs[i], err)
 			}
@@ -138,6 +139,8 @@ func startServer(ctx *cli.Context) error {
 			select {
 			case <-chExitRootEvent:
 				log.Debug("New GER received")
+			case netID := <-chSynced:
+				log.Debug("NetworkID synced: ", netID)
 			case <-ctx.Context.Done():
 				log.Debug("Stopping goroutine that listen new GER updates")
 				return
@@ -205,8 +208,8 @@ func newEthermans(c *config.Config) (*etherman.Client, []*etherman.Client, error
 	return l1Etherman, l2Ethermans, nil
 }

-func runSynchronizer(genBlockNumber uint64, brdigeCtrl *bridgectrl.BridgeController, etherman *etherman.Client, cfg synchronizer.Config, storage db.Storage, zkEVMClient *client.Client, chExitRootEvent chan *etherman.GlobalExitRoot) {
-	sy, err := synchronizer.NewSynchronizer(storage, brdigeCtrl, etherman, zkEVMClient, genBlockNumber, chExitRootEvent, cfg)
+func runSynchronizer(genBlockNumber uint64, brdigeCtrl *bridgectrl.BridgeController, etherman *etherman.Client, cfg synchronizer.Config, storage db.Storage, zkEVMClient *client.Client, chExitRootEvent chan *etherman.GlobalExitRoot, chSynced chan uint) {
+	sy, err := synchronizer.NewSynchronizer(storage, brdigeCtrl, etherman, zkEVMClient, genBlockNumber, chExitRootEvent, chSynced, cfg)
 	if err != nil {
 		log.Fatal(err)
 	}
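Both chExitRootEvent and chSynced are created unbuffered (make(chan ...)), so a synchronizer's send blocks until someone receives. The new case in the select loop above presumably keeps that drain goroutine from stalling the synchronizers on chSynced when the claim tx managers are not consuming the channels.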
3 changes: 2 additions & 1 deletion config/config.debug.toml
@@ -51,7 +51,8 @@ BridgeVersion = "v1"
 MaxConns = 20
 TableSuffix = ""
 [BridgeServer.Redis]
-Addr = "localhost:6379"
+IsClusterMode = false
+Addrs = ["localhost:6379"]
 Username = ""
 Password = ""
 DB = 0
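The Redis block now takes a list of addresses plus an IsClusterMode flag instead of a single Addr. The client wiring that consumes these fields is not part of this diff; a plausible go-redis construction, with the package and struct names assumed, would be:

```go
package redisstorage // hypothetical package name

import "github.com/go-redis/redis/v8"

// Config mirrors the [BridgeServer.Redis] TOML block in this diff;
// the struct is not shown in the changeset, so field names here are
// an assumption based on the config keys.
type Config struct {
	IsClusterMode bool
	Addrs         []string
	Username      string
	Password      string
	DB            int
}

// newRedisClient picks a cluster or single-node client depending on
// IsClusterMode; redis.UniversalClient covers both implementations.
// Assumes Addrs is non-empty in single-node mode.
func newRedisClient(cfg Config) redis.UniversalClient {
	if cfg.IsClusterMode {
		return redis.NewClusterClient(&redis.ClusterOptions{
			Addrs:    cfg.Addrs,
			Username: cfg.Username,
			Password: cfg.Password,
			// note: cluster mode has no DB selection
		})
	}
	return redis.NewClient(&redis.Options{
		Addr:     cfg.Addrs[0],
		Username: cfg.Username,
		Password: cfg.Password,
		DB:       cfg.DB,
	})
}
```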
3 changes: 2 additions & 1 deletion config/config.local.toml
@@ -52,7 +52,8 @@ SentinelConfigFilePath = "/app/sentinel_config.json"
 MaxConns = 20
 TableSuffix = ""
 [BridgeServer.Redis]
-Addr = "xgon-bridge-redis:6379"
+IsClusterMode = false
+Addrs = ["xgon-bridge-redis:6379"]
 Username = ""
 Password = ""
 DB = 0
2 changes: 1 addition & 1 deletion db/pgstorage/migrations/0008.sql
@@ -12,4 +12,4 @@ CREATE UNIQUE INDEX IF NOT EXISTS deposit_test_uidx ON sync.deposit_test (networ
 CREATE INDEX IF NOT EXISTS deposit_dest_addr_idx ON sync.deposit (dest_addr);
 CREATE INDEX IF NOT EXISTS deposit_test_dest_addr_idx ON sync.deposit_test (dest_addr);
 CREATE INDEX IF NOT EXISTS claim_dest_addr_idx ON sync.claim (dest_addr);
-CREATE INDEX IF NOT EXISTS claim_test_dest_addr_idx ON sync.claim (dest_addr);
+CREATE INDEX IF NOT EXISTS claim_test_dest_addr_idx ON sync.claim_test (dest_addr);
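Before this fix, 0008.sql created claim_test_dest_addr_idx on sync.claim, duplicating claim_dest_addr_idx on the line above and leaving sync.claim_test with no dest_addr index; the change points the index at the intended table.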
62 changes: 62 additions & 0 deletions db/pgstorage/pgstorage.go
@@ -493,6 +493,37 @@ func (p *PostgresStorage) GetPendingTransactions(ctx context.Context, destAddr s
 	return deposits, nil
 }

+// GetNotReadyTransactions returns all the deposit transactions with ready_for_claim = false
+func (p *PostgresStorage) GetNotReadyTransactions(ctx context.Context, limit uint, offset uint, dbTx pgx.Tx) ([]*etherman.Deposit, error) {
+	getDepositsSQL := fmt.Sprintf(`SELECT d.id, leaf_type, orig_net, orig_addr, amount, dest_net, dest_addr, deposit_cnt, block_id, b.block_num, d.network_id, tx_hash, metadata, ready_for_claim, b.received_at
+		FROM sync.deposit%[1]v as d INNER JOIN sync.block%[1]v as b ON d.network_id = b.network_id AND d.block_id = b.id
+		WHERE ready_for_claim = false
+		ORDER BY d.block_id DESC, d.deposit_cnt DESC LIMIT $1 OFFSET $2`, p.tableSuffix)
+
+	rows, err := p.getExecQuerier(dbTx).Query(ctx, getDepositsSQL, limit, offset)
+	if err != nil {
+		return nil, err
+	}
+
+	deposits := make([]*etherman.Deposit, 0, len(rows.RawValues()))
+
+	for rows.Next() {
+		var (
+			deposit etherman.Deposit
+			amount  string
+		)
+		err = rows.Scan(&deposit.Id, &deposit.LeafType, &deposit.OriginalNetwork, &deposit.OriginalAddress, &amount, &deposit.DestinationNetwork, &deposit.DestinationAddress,
+			&deposit.DepositCount, &deposit.BlockID, &deposit.BlockNumber, &deposit.NetworkID, &deposit.TxHash, &deposit.Metadata, &deposit.ReadyForClaim, &deposit.Time)
+		if err != nil {
+			return nil, err
+		}
+		deposit.Amount, _ = new(big.Int).SetString(amount, 10) //nolint:gomnd
+		deposits = append(deposits, &deposit)
+	}
+
+	return deposits, nil
+}
+
 // GetDepositCount gets the deposit count for the destination address.
 func (p *PostgresStorage) GetDepositCount(ctx context.Context, destAddr string, dbTx pgx.Tx) (uint64, error) {
 	getDepositCountSQL := fmt.Sprintf("SELECT COUNT(*) FROM sync.deposit%[1]v WHERE dest_addr = $1", p.tableSuffix)
@@ -606,6 +637,37 @@ func (p *PostgresStorage) GetClaimTxsByStatus(ctx context.Context, statuses []ct
 	return mTxs, nil
 }

+func (p *PostgresStorage) GetClaimTxsByStatusWithLimit(ctx context.Context, statuses []ctmtypes.MonitoredTxStatus, limit uint, offset uint, dbTx pgx.Tx) ([]ctmtypes.MonitoredTx, error) {
+	getMonitoredTxsSQL := fmt.Sprintf("SELECT * FROM sync.monitored_txs%[1]v WHERE status = ANY($1) ORDER BY created_at DESC LIMIT $2 OFFSET $3", p.tableSuffix)
+	rows, err := p.getExecQuerier(dbTx).Query(ctx, getMonitoredTxsSQL, pq.Array(statuses), limit, offset)
+	if errors.Is(err, pgx.ErrNoRows) {
+		return []ctmtypes.MonitoredTx{}, nil
+	} else if err != nil {
+		return nil, err
+	}
+
+	mTxs := make([]ctmtypes.MonitoredTx, 0, len(rows.RawValues()))
+	for rows.Next() {
+		var (
+			value   string
+			history [][]byte
+		)
+		mTx := ctmtypes.MonitoredTx{}
+		err = rows.Scan(&mTx.ID, &mTx.BlockID, &mTx.From, &mTx.To, &mTx.Nonce, &value, &mTx.Data, &mTx.Gas, &mTx.Status, pq.Array(&history), &mTx.CreatedAt, &mTx.UpdatedAt)
+		if err != nil {
+			return mTxs, err
+		}
+		mTx.Value, _ = new(big.Int).SetString(value, 10) //nolint:gomnd
+		mTx.History = make(map[common.Hash]bool)
+		for _, h := range history {
+			mTx.History[common.BytesToHash(h)] = true
+		}
+		mTxs = append(mTxs, mTx)
+	}
+
+	return mTxs, nil
+}
+
 // GetClaimTxById gets the monitored transactions by id (depositCount)
 func (p *PostgresStorage) GetClaimTxById(ctx context.Context, id uint, dbTx pgx.Tx) (*ctmtypes.MonitoredTx, error) {
 	getClaimSql := fmt.Sprintf("SELECT * FROM sync.monitored_txs%[1]v WHERE id = $1", p.tableSuffix)
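The new GetClaimTxsByStatusWithLimit pairs naturally with a paging loop on the caller's side. A sketch of such a caller (illustrative, not part of this changeset; it assumes the surrounding pgstorage package context and that a nil dbTx makes getExecQuerier fall back to the connection pool, which the pattern of the other queries suggests but this diff does not show):

```go
// fetchAllByStatus pages through monitored txs until a short page
// signals the end of the result set.
func fetchAllByStatus(ctx context.Context, p *PostgresStorage, statuses []ctmtypes.MonitoredTxStatus) ([]ctmtypes.MonitoredTx, error) {
	const pageSize = 100 // arbitrary page size for the example
	var all []ctmtypes.MonitoredTx
	for offset := uint(0); ; offset += pageSize {
		page, err := p.GetClaimTxsByStatusWithLimit(ctx, statuses, pageSize, offset, nil)
		if err != nil {
			return nil, err
		}
		all = append(all, page...)
		if len(page) < pageSize {
			return all, nil
		}
	}
}
```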