Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc api rate limit #3272

Merged
merged 2 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/rpc/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func GetAPIs(clientCtx context.CLIContext, log log.Logger, keys ...ethsecp256k1.
rateLimiters := getRateLimiter()
disableAPI := getDisableAPI()
ethBackend = backend.New(clientCtx, log, rateLimiters, disableAPI)
ethAPI := eth.NewAPI(clientCtx, log, ethBackend, nonceLock, keys...)
ethAPI := eth.NewAPI(rateLimiters, clientCtx, log, ethBackend, nonceLock, keys...)
if evmtypes.GetEnableBloomFilter() {
ethBackend.StartBloomHandlers(evmtypes.BloomBitsBlocks, evmtypes.GetIndexer().GetDB())
}
Expand Down
57 changes: 52 additions & 5 deletions app/rpc/namespaces/eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -90,10 +92,18 @@ type PublicEthereumAPI struct {
callCache *lru.Cache
cdc *codec.Codec
fastQueryThreshold uint64
rateLimiters map[string]*rate.Limiter
}

func (api *PublicEthereumAPI) GetRateLimiter(apiName string) *rate.Limiter {
if api.rateLimiters == nil {
return nil
}
return api.rateLimiters[apiName]
}

// NewAPI creates an instance of the public ETH Web3 API.
func NewAPI(
func NewAPI(rateLimiters map[string]*rate.Limiter,
clientCtx clientcontext.CLIContext, log log.Logger, backend backend.Backend, nonceLock *rpctypes.AddrLocker,
keys ...ethsecp256k1.PrivKey,
) *PublicEthereumAPI {
Expand All @@ -115,6 +125,7 @@ func NewAPI(
wrappedBackend: watcher.NewQuerier(),
watcherBackend: watcher.NewWatcher(log),
fastQueryThreshold: viper.GetUint64(FlagFastQueryThreshold),
rateLimiters: rateLimiters,
}
api.evmFactory = simulation.NewEvmFactory(clientCtx.ChainID, api.wrappedBackend)
module := evm.AppModuleBasic{}
Expand Down Expand Up @@ -506,6 +517,10 @@ func (api *PublicEthereumAPI) getStorageAt(address common.Address, key []byte, b
func (api *PublicEthereumAPI) GetStorageAt(address common.Address, key string, blockNrOrHash rpctypes.BlockNumberOrHash) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_getStorageAt", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "key", key, "block number", blockNrOrHash)
rateLimiter := api.GetRateLimiter("eth_getStorageAt")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash)
if err != nil {
return nil, err
Expand All @@ -522,7 +537,10 @@ func (api *PublicEthereumAPI) GetStorageAtInternal(address common.Address, key [
func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockNrOrHash rpctypes.BlockNumberOrHash) (*hexutil.Uint64, error) {
monitor := monitor.GetMonitor("eth_getTransactionCount", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "block number", blockNrOrHash)

rateLimiter := api.GetRateLimiter("eth_getTransactionCount")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash)
if err != nil {
return nil, err
Expand Down Expand Up @@ -553,6 +571,10 @@ func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockN
func (api *PublicEthereumAPI) GetBlockTransactionCountByHash(hash common.Hash) *hexutil.Uint {
monitor := monitor.GetMonitor("eth_getBlockTransactionCountByHash", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash)
rateLimiter := api.GetRateLimiter("eth_getBlockTransactionCountByHash")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil
}
res, _, err := api.clientCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex()))
if err != nil {
return nil
Expand Down Expand Up @@ -636,6 +658,10 @@ func (api *PublicEthereumAPI) GetUncleCountByBlockNumber(_ rpctypes.BlockNumber)
func (api *PublicEthereumAPI) GetCode(address common.Address, blockNrOrHash rpctypes.BlockNumberOrHash) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_getCode", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "block number", blockNrOrHash)
rateLimiter := api.GetRateLimiter("eth_getCode")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockNumber, err := api.backend.ConvertToBlockNumber(blockNrOrHash)
if err != nil {
return nil, err
Expand Down Expand Up @@ -683,6 +709,10 @@ func (api *PublicEthereumAPI) GetCodeByHash(hash common.Hash) (hexutil.Bytes, er
// GetTransactionLogs returns the logs given a transaction hash.
func (api *PublicEthereumAPI) GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error) {
api.logger.Debug("eth_getTransactionLogs", "hash", txHash)
rateLimiter := api.GetRateLimiter("eth_getTransactionLogs")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
return api.backend.GetTransactionLogs(txHash)
}

Expand Down Expand Up @@ -864,7 +894,10 @@ func (api *PublicEthereumAPI) addCallCache(key common.Hash, data []byte) {
func (api *PublicEthereumAPI) Call(args rpctypes.CallArgs, blockNrOrHash rpctypes.BlockNumberOrHash, overrides *evmtypes.StateOverrides) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_call", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("args", args, "block number", blockNrOrHash)

rateLimiter := api.GetRateLimiter("eth_call")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
if overrides != nil {
if err := overrides.Check(); err != nil {
return nil, err
Expand Down Expand Up @@ -1092,7 +1125,10 @@ func (api *PublicEthereumAPI) simDoCall(args rpctypes.CallArgs, cap uint64) (uin
func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint64, error) {
monitor := monitor.GetMonitor("eth_estimateGas", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("args", args)

rateLimiter := api.GetRateLimiter("eth_estimateGas")
if rateLimiter != nil && !rateLimiter.Allow() {
return 0, rpctypes.ErrServerBusy
}
params, err := api.getEvmParams()
if err != nil {
return 0, TransformDataError(err, "eth_estimateGas")
Expand Down Expand Up @@ -1136,6 +1172,10 @@ func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint6
func (api *PublicEthereumAPI) GetBlockByHash(hash common.Hash, fullTx bool) (*watcher.Block, error) {
monitor := monitor.GetMonitor("eth_getBlockByHash", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash, "full", fullTx)
rateLimiter := api.GetRateLimiter("eth_getBlockByHash")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockRes, err := api.backend.GetBlockByHash(hash, fullTx)
if err != nil {
return nil, TransformDataError(err, RPCEthGetBlockByHash)
Expand Down Expand Up @@ -1195,7 +1235,10 @@ func (api *PublicEthereumAPI) getBlockByNumber(blockNum rpctypes.BlockNumber, fu
func (api *PublicEthereumAPI) GetBlockByNumber(blockNum rpctypes.BlockNumber, fullTx bool) (*watcher.Block, error) {
monitor := monitor.GetMonitor("eth_getBlockByNumber", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("number", blockNum, "full", fullTx)

rateLimiter := api.GetRateLimiter("eth_getBlockByNumber")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockRes, err := api.getBlockByNumber(blockNum, fullTx)
return blockRes, err
}
Expand Down Expand Up @@ -1306,6 +1349,10 @@ func (api *PublicEthereumAPI) getTransactionByBlockAndIndex(block *tmtypes.Block
func (api *PublicEthereumAPI) GetTransactionReceipt(hash common.Hash) (*watcher.TransactionReceipt, error) {
monitor := monitor.GetMonitor("eth_getTransactionReceipt", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash)
rateLimiter := api.GetRateLimiter("eth_getTransactionReceipt")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
res, e := api.wrappedBackend.GetTransactionReceipt(hash)
if e == nil {
return res, nil
Expand Down
11 changes: 5 additions & 6 deletions app/rpc/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
)

var (
ErrServerBusy = errors.New("server is too busy")
ErrMethodNotAllowed = errors.New("the method is not allowed")
NameSpace = "filters"
)
Expand Down Expand Up @@ -138,7 +137,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
}
rateLimiter := api.backend.GetRateLimiter("eth_newPendingTransactionFilter")
if rateLimiter != nil && !rateLimiter.Allow() {
return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", ErrServerBusy.Error()))
return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", rpctypes.ErrServerBusy.Error()))
}
pendingTxSub, cancelSubs, err := api.events.SubscribePendingTxs()
if err != nil {
Expand Down Expand Up @@ -235,7 +234,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
}
rateLimiter := api.backend.GetRateLimiter("eth_newBlockFilter")
if rateLimiter != nil && !rateLimiter.Allow() {
return rpc.ID(fmt.Sprintf("error creating block filter: %s", ErrServerBusy.Error()))
return rpc.ID(fmt.Sprintf("error creating block filter: %s", rpctypes.ErrServerBusy.Error()))
}
headerSub, cancelSubs, err := api.events.SubscribeNewHeads()
if err != nil {
Expand Down Expand Up @@ -402,7 +401,7 @@ func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID,
}
rateLimiter := api.backend.GetRateLimiter("eth_newFilter")
if rateLimiter != nil && !rateLimiter.Allow() {
return rpc.ID(""), ErrServerBusy
return rpc.ID(""), rpctypes.ErrServerBusy
}
var (
filterID = rpc.ID("")
Expand Down Expand Up @@ -468,7 +467,7 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, criteria filters.Filter
}
rateLimiter := api.backend.GetRateLimiter("eth_getLogs")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, ErrServerBusy
return nil, rpctypes.ErrServerBusy
}
var filter *Filter
if criteria.BlockHash != nil {
Expand Down Expand Up @@ -574,7 +573,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
}
rateLimiter := api.backend.GetRateLimiter("eth_getFilterChanges")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, ErrServerBusy
return nil, rpctypes.ErrServerBusy
}
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions app/rpc/types/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

var (
ErrServerBusy = errors.New("server is too busy, please try again later")
// static gas limit for all blocks
defaultGasLimit = hexutil.Uint64(int64(^uint32(0)))
defaultGasUsed = hexutil.Uint64(0)
Expand Down