From 9140a158440cf58102417f4c10f1a97a73f352f2 Mon Sep 17 00:00:00 2001 From: "evan.han" Date: Wed, 13 Dec 2023 19:55:31 +0800 Subject: [PATCH 1/2] limit more rpc api --- app/rpc/apis.go | 2 +- app/rpc/namespaces/eth/api.go | 59 ++++++++++++++++++++++++++++++++--- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/app/rpc/apis.go b/app/rpc/apis.go index d399293da8..ef8db4ee50 100644 --- a/app/rpc/apis.go +++ b/app/rpc/apis.go @@ -50,7 +50,7 @@ func GetAPIs(clientCtx context.CLIContext, log log.Logger, keys ...ethsecp256k1. rateLimiters := getRateLimiter() disableAPI := getDisableAPI() ethBackend = backend.New(clientCtx, log, rateLimiters, disableAPI) - ethAPI := eth.NewAPI(clientCtx, log, ethBackend, nonceLock, keys...) + ethAPI := eth.NewAPI(rateLimiters, clientCtx, log, ethBackend, nonceLock, keys...) if evmtypes.GetEnableBloomFilter() { ethBackend.StartBloomHandlers(evmtypes.BloomBitsBlocks, evmtypes.GetIndexer().GetDB()) } diff --git a/app/rpc/namespaces/eth/api.go b/app/rpc/namespaces/eth/api.go index aa0482c90b..d7faae49f4 100644 --- a/app/rpc/namespaces/eth/api.go +++ b/app/rpc/namespaces/eth/api.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "golang.org/x/time/rate" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" @@ -58,6 +60,8 @@ import ( "github.com/okex/exchain/x/vmbridge" ) +var ErrServerBusy = errors.New("server is too busy") + const ( CacheOfEthCallLru = 40960 @@ -90,10 +94,18 @@ type PublicEthereumAPI struct { callCache *lru.Cache cdc *codec.Codec fastQueryThreshold uint64 + rateLimiters map[string]*rate.Limiter +} + +func (api *PublicEthereumAPI) GetRateLimiter(apiName string) *rate.Limiter { + if api.rateLimiters == nil { + return nil + } + return api.rateLimiters[apiName] } // NewAPI creates an instance of the public ETH Web3 API. -func NewAPI( +func NewAPI(rateLimiters map[string]*rate.Limiter, clientCtx clientcontext.CLIContext, log log.Logger, backend backend.Backend, nonceLock *rpctypes.AddrLocker, keys ...ethsecp256k1.PrivKey, ) *PublicEthereumAPI { @@ -115,6 +127,7 @@ func NewAPI( wrappedBackend: watcher.NewQuerier(), watcherBackend: watcher.NewWatcher(log), fastQueryThreshold: viper.GetUint64(FlagFastQueryThreshold), + rateLimiters: rateLimiters, } api.evmFactory = simulation.NewEvmFactory(clientCtx.ChainID, api.wrappedBackend) module := evm.AppModuleBasic{} @@ -506,6 +519,10 @@ func (api *PublicEthereumAPI) getStorageAt(address common.Address, key []byte, b func (api *PublicEthereumAPI) GetStorageAt(address common.Address, key string, blockNrOrHash rpctypes.BlockNumberOrHash) (hexutil.Bytes, error) { monitor := monitor.GetMonitor("eth_getStorageAt", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd("address", address, "key", key, "block number", blockNrOrHash) + rateLimiter := api.GetRateLimiter("eth_getStorageAt") + if rateLimiter != nil && !rateLimiter.Allow() { + return nil, ErrServerBusy + } blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash) if err != nil { return nil, err @@ -522,7 +539,10 @@ func (api *PublicEthereumAPI) GetStorageAtInternal(address common.Address, key [ func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockNrOrHash rpctypes.BlockNumberOrHash) (*hexutil.Uint64, error) { monitor := monitor.GetMonitor("eth_getTransactionCount", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd("address", address, "block number", blockNrOrHash) - + rateLimiter := api.GetRateLimiter("eth_getTransactionCount") + if rateLimiter != nil && !rateLimiter.Allow() { + return nil, ErrServerBusy + } blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash) if err != nil { return nil, err @@ -553,6 +573,10 @@ func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockN func (api *PublicEthereumAPI) GetBlockTransactionCountByHash(hash common.Hash) *hexutil.Uint { monitor := monitor.GetMonitor("eth_getBlockTransactionCountByHash", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd("hash", hash) + rateLimiter := api.GetRateLimiter("eth_getBlockTransactionCountByHash") + if rateLimiter != nil && !rateLimiter.Allow() { + return nil + } res, _, err := api.clientCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex())) if err != nil { return nil @@ -636,6 +660,10 @@ func (api *PublicEthereumAPI) GetUncleCountByBlockNumber(_ rpctypes.BlockNumber) func (api *PublicEthereumAPI) GetCode(address common.Address, blockNrOrHash rpctypes.BlockNumberOrHash) (hexutil.Bytes, error) { monitor := monitor.GetMonitor("eth_getCode", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd("address", address, "block number", blockNrOrHash) + rateLimiter := api.GetRateLimiter("eth_getCode") + if rateLimiter != nil && !rateLimiter.Allow() { + return nil, ErrServerBusy + } blockNumber, err := api.backend.ConvertToBlockNumber(blockNrOrHash) if err != nil { return nil, err @@ -683,6 +711,10 @@ func (api *PublicEthereumAPI) GetCodeByHash(hash common.Hash) (hexutil.Bytes, er // GetTransactionLogs returns the logs given a transaction hash. func (api *PublicEthereumAPI) GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error) { api.logger.Debug("eth_getTransactionLogs", "hash", txHash) + rateLimiter := api.GetRateLimiter("eth_getTransactionLogs") + if rateLimiter != nil && !rateLimiter.Allow() { + return nil, ErrServerBusy + } return api.backend.GetTransactionLogs(txHash) } @@ -864,7 +896,10 @@ func (api *PublicEthereumAPI) addCallCache(key common.Hash, data []byte) { func (api *PublicEthereumAPI) Call(args rpctypes.CallArgs, blockNrOrHash rpctypes.BlockNumberOrHash, overrides *evmtypes.StateOverrides) (hexutil.Bytes, error) { monitor := monitor.GetMonitor("eth_call", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd("args", args, "block number", blockNrOrHash) - + rateLimiter := api.GetRateLimiter("eth_call") + if rateLimiter != nil && !rateLimiter.Allow() { + return nil, ErrServerBusy + } if overrides != nil { if err := overrides.Check(); err != nil { return nil, err @@ -1092,7 +1127,10 @@ func (api *PublicEthereumAPI) simDoCall(args rpctypes.CallArgs, cap uint64) (uin func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint64, error) { monitor := monitor.GetMonitor("eth_estimateGas", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd("args", args) - + rateLimiter := api.GetRateLimiter("eth_estimateGas") + if rateLimiter != nil && !rateLimiter.Allow() { + return 0, ErrServerBusy + } params, err := api.getEvmParams() if err != nil { return 0, TransformDataError(err, "eth_estimateGas") @@ -1136,6 +1174,10 @@ func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint6 func (api *PublicEthereumAPI) GetBlockByHash(hash common.Hash, fullTx bool) (*watcher.Block, error) { monitor := monitor.GetMonitor("eth_getBlockByHash", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd("hash", hash, "full", fullTx) + rateLimiter := api.GetRateLimiter("eth_getBlockByHash") + if rateLimiter != nil && !rateLimiter.Allow() { + return nil, ErrServerBusy + } blockRes, err := api.backend.GetBlockByHash(hash, fullTx) if err != nil { return nil, TransformDataError(err, RPCEthGetBlockByHash) @@ -1195,7 +1237,10 @@ func (api *PublicEthereumAPI) getBlockByNumber(blockNum rpctypes.BlockNumber, fu func (api *PublicEthereumAPI) GetBlockByNumber(blockNum rpctypes.BlockNumber, fullTx bool) (*watcher.Block, error) { monitor := monitor.GetMonitor("eth_getBlockByNumber", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd("number", blockNum, "full", fullTx) - + rateLimiter := api.GetRateLimiter("eth_getBlockByNumber") + if rateLimiter != nil && !rateLimiter.Allow() { + return nil, ErrServerBusy + } blockRes, err := api.getBlockByNumber(blockNum, fullTx) return blockRes, err } @@ -1306,6 +1351,10 @@ func (api *PublicEthereumAPI) getTransactionByBlockAndIndex(block *tmtypes.Block func (api *PublicEthereumAPI) GetTransactionReceipt(hash common.Hash) (*watcher.TransactionReceipt, error) { monitor := monitor.GetMonitor("eth_getTransactionReceipt", api.logger, api.Metrics).OnBegin() defer monitor.OnEnd("hash", hash) + rateLimiter := api.GetRateLimiter("eth_getTransactionReceipt") + if rateLimiter != nil && !rateLimiter.Allow() { + return nil, ErrServerBusy + } res, e := api.wrappedBackend.GetTransactionReceipt(hash) if e == nil { return res, nil From 66d81bb17899fcacefc815c9d353a28e47e70813 Mon Sep 17 00:00:00 2001 From: "evan.han" Date: Wed, 13 Dec 2023 20:45:36 +0800 Subject: [PATCH 2/2] optimize code --- app/rpc/namespaces/eth/api.go | 20 +++++++++----------- app/rpc/namespaces/eth/filters/api.go | 11 +++++------ app/rpc/types/utils.go | 1 + 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/app/rpc/namespaces/eth/api.go b/app/rpc/namespaces/eth/api.go index d7faae49f4..5604fbb79f 100644 --- a/app/rpc/namespaces/eth/api.go +++ b/app/rpc/namespaces/eth/api.go @@ -60,8 +60,6 @@ import ( "github.com/okex/exchain/x/vmbridge" ) -var ErrServerBusy = errors.New("server is too busy") - const ( CacheOfEthCallLru = 40960 @@ -521,7 +519,7 @@ func (api *PublicEthereumAPI) GetStorageAt(address common.Address, key string, b defer monitor.OnEnd("address", address, "key", key, "block number", blockNrOrHash) rateLimiter := api.GetRateLimiter("eth_getStorageAt") if rateLimiter != nil && !rateLimiter.Allow() { - return nil, ErrServerBusy + return nil, rpctypes.ErrServerBusy } blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash) if err != nil { @@ -541,7 +539,7 @@ func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockN defer monitor.OnEnd("address", address, "block number", blockNrOrHash) rateLimiter := api.GetRateLimiter("eth_getTransactionCount") if rateLimiter != nil && !rateLimiter.Allow() { - return nil, ErrServerBusy + return nil, rpctypes.ErrServerBusy } blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash) if err != nil { @@ -662,7 +660,7 @@ func (api *PublicEthereumAPI) GetCode(address common.Address, blockNrOrHash rpct defer monitor.OnEnd("address", address, "block number", blockNrOrHash) rateLimiter := api.GetRateLimiter("eth_getCode") if rateLimiter != nil && !rateLimiter.Allow() { - return nil, ErrServerBusy + return nil, rpctypes.ErrServerBusy } blockNumber, err := api.backend.ConvertToBlockNumber(blockNrOrHash) if err != nil { @@ -713,7 +711,7 @@ func (api *PublicEthereumAPI) GetTransactionLogs(txHash common.Hash) ([]*ethtype api.logger.Debug("eth_getTransactionLogs", "hash", txHash) rateLimiter := api.GetRateLimiter("eth_getTransactionLogs") if rateLimiter != nil && !rateLimiter.Allow() { - return nil, ErrServerBusy + return nil, rpctypes.ErrServerBusy } return api.backend.GetTransactionLogs(txHash) } @@ -898,7 +896,7 @@ func (api *PublicEthereumAPI) Call(args rpctypes.CallArgs, blockNrOrHash rpctype defer monitor.OnEnd("args", args, "block number", blockNrOrHash) rateLimiter := api.GetRateLimiter("eth_call") if rateLimiter != nil && !rateLimiter.Allow() { - return nil, ErrServerBusy + return nil, rpctypes.ErrServerBusy } if overrides != nil { if err := overrides.Check(); err != nil { @@ -1129,7 +1127,7 @@ func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint6 defer monitor.OnEnd("args", args) rateLimiter := api.GetRateLimiter("eth_estimateGas") if rateLimiter != nil && !rateLimiter.Allow() { - return 0, ErrServerBusy + return 0, rpctypes.ErrServerBusy } params, err := api.getEvmParams() if err != nil { @@ -1176,7 +1174,7 @@ func (api *PublicEthereumAPI) GetBlockByHash(hash common.Hash, fullTx bool) (*wa defer monitor.OnEnd("hash", hash, "full", fullTx) rateLimiter := api.GetRateLimiter("eth_getBlockByHash") if rateLimiter != nil && !rateLimiter.Allow() { - return nil, ErrServerBusy + return nil, rpctypes.ErrServerBusy } blockRes, err := api.backend.GetBlockByHash(hash, fullTx) if err != nil { @@ -1239,7 +1237,7 @@ func (api *PublicEthereumAPI) GetBlockByNumber(blockNum rpctypes.BlockNumber, fu defer monitor.OnEnd("number", blockNum, "full", fullTx) rateLimiter := api.GetRateLimiter("eth_getBlockByNumber") if rateLimiter != nil && !rateLimiter.Allow() { - return nil, ErrServerBusy + return nil, rpctypes.ErrServerBusy } blockRes, err := api.getBlockByNumber(blockNum, fullTx) return blockRes, err @@ -1353,7 +1351,7 @@ func (api *PublicEthereumAPI) GetTransactionReceipt(hash common.Hash) (*watcher. defer monitor.OnEnd("hash", hash) rateLimiter := api.GetRateLimiter("eth_getTransactionReceipt") if rateLimiter != nil && !rateLimiter.Allow() { - return nil, ErrServerBusy + return nil, rpctypes.ErrServerBusy } res, e := api.wrappedBackend.GetTransactionReceipt(hash) if e == nil { diff --git a/app/rpc/namespaces/eth/filters/api.go b/app/rpc/namespaces/eth/filters/api.go index 095b546437..45c01f3478 100644 --- a/app/rpc/namespaces/eth/filters/api.go +++ b/app/rpc/namespaces/eth/filters/api.go @@ -27,7 +27,6 @@ import ( ) var ( - ErrServerBusy = errors.New("server is too busy") ErrMethodNotAllowed = errors.New("the method is not allowed") NameSpace = "filters" ) @@ -138,7 +137,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { } rateLimiter := api.backend.GetRateLimiter("eth_newPendingTransactionFilter") if rateLimiter != nil && !rateLimiter.Allow() { - return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", ErrServerBusy.Error())) + return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", rpctypes.ErrServerBusy.Error())) } pendingTxSub, cancelSubs, err := api.events.SubscribePendingTxs() if err != nil { @@ -235,7 +234,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { } rateLimiter := api.backend.GetRateLimiter("eth_newBlockFilter") if rateLimiter != nil && !rateLimiter.Allow() { - return rpc.ID(fmt.Sprintf("error creating block filter: %s", ErrServerBusy.Error())) + return rpc.ID(fmt.Sprintf("error creating block filter: %s", rpctypes.ErrServerBusy.Error())) } headerSub, cancelSubs, err := api.events.SubscribeNewHeads() if err != nil { @@ -402,7 +401,7 @@ func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID, } rateLimiter := api.backend.GetRateLimiter("eth_newFilter") if rateLimiter != nil && !rateLimiter.Allow() { - return rpc.ID(""), ErrServerBusy + return rpc.ID(""), rpctypes.ErrServerBusy } var ( filterID = rpc.ID("") @@ -468,7 +467,7 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, criteria filters.Filter } rateLimiter := api.backend.GetRateLimiter("eth_getLogs") if rateLimiter != nil && !rateLimiter.Allow() { - return nil, ErrServerBusy + return nil, rpctypes.ErrServerBusy } var filter *Filter if criteria.BlockHash != nil { @@ -574,7 +573,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { } rateLimiter := api.backend.GetRateLimiter("eth_getFilterChanges") if rateLimiter != nil && !rateLimiter.Allow() { - return nil, ErrServerBusy + return nil, rpctypes.ErrServerBusy } api.filtersMu.Lock() defer api.filtersMu.Unlock() diff --git a/app/rpc/types/utils.go b/app/rpc/types/utils.go index dc7b3c491a..1a6b7317fb 100644 --- a/app/rpc/types/utils.go +++ b/app/rpc/types/utils.go @@ -25,6 +25,7 @@ import ( ) var ( + ErrServerBusy = errors.New("server is too busy, please try again later") // static gas limit for all blocks defaultGasLimit = hexutil.Uint64(int64(^uint32(0))) defaultGasUsed = hexutil.Uint64(0)