diff --git a/app/config/config.go b/app/config/config.go index 25a366c942..67f069b012 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -135,6 +135,8 @@ type OecConfig struct { enableMempoolSimGuFactor bool maxSubscriptionClients int + + maxTxLimitPerPeer uint64 } const ( @@ -178,6 +180,7 @@ const ( FlagCommitGapOffset = "commit-gap-offset" FlagEnableMempoolSimGuFactor = "enable-mem-sim-gu-factor" FlagMaxSubscriptionClients = "max-subscription-clients" + FlagMaxTxLimitPerPeer = "mempool.max_tx_limit_per_peer" ) var ( @@ -297,6 +300,7 @@ func (c *OecConfig) loadFromConfig() { c.SetMempoolFlush(viper.GetBool(FlagMempoolFlush)) c.SetMempoolCheckTxCost(viper.GetBool(FlagMempoolCheckTxCost)) c.SetMaxTxNumPerBlock(viper.GetInt64(FlagMaxTxNumPerBlock)) + c.SetMaxTxLimitPerPeer(int64(viper.GetUint64(FlagMaxTxLimitPerPeer))) c.SetEnableDeleteMinGPTx(viper.GetBool(FlagMempoolEnableDeleteMinGPTx)) c.SetPendingPoolBlacklist(viper.GetString(FlagPendingPoolBlacklist)) c.SetMaxGasUsedPerBlock(viper.GetInt64(FlagMaxGasUsedPerBlock)) @@ -491,6 +495,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) { return } c.SetMaxTxNumPerBlock(r) + case FlagMaxTxLimitPerPeer: + r, err := strconv.ParseInt(v, 10, 64) + if err != nil { + return + } + c.SetMaxTxLimitPerPeer(r) case FlagMempoolEnableDeleteMinGPTx: r, err := strconv.ParseBool(v) if err != nil { @@ -1196,3 +1206,14 @@ func (c *OecConfig) SetPendingPoolBlacklist(v string) { func (c *OecConfig) GetPendingPoolBlacklist() string { return c.pendingPoolBlacklist } + +func (c *OecConfig) SetMaxTxLimitPerPeer(maxTxLimitPerPeer int64) { + if maxTxLimitPerPeer < 0 { + return + } + c.maxTxLimitPerPeer = uint64(maxTxLimitPerPeer) +} + +func (c *OecConfig) GetMaxTxLimitPerPeer() uint64 { + return c.maxTxLimitPerPeer +} diff --git a/dev/testnet/testnet.sh b/dev/testnet/testnet.sh index d049d9494d..2c28b69c43 100755 --- a/dev/testnet/testnet.sh +++ b/dev/testnet/testnet.sh @@ -150,6 +150,7 @@ run() { --enable-wtx=${WRAPPEDTX} \ --mempool.node_key_whitelist ${WHITE_LIST} \ --p2p.pex=false \ + --mempool.max_tx_limit_per_peer=1 \ --p2p.addr_book_strict=false \ $p2p_seed_opt $p2p_seed_arg \ --p2p.laddr tcp://${IP}:${p2pport} \ @@ -158,7 +159,7 @@ run() { --chain-id ${CHAIN_ID} \ --upload-delta=false \ --enable-gid \ - --consensus.timeout_commit 3800ms \ + --consensus.timeout_commit 10000ms \ --enable-blockpart-ack=false \ --append-pid=true \ ${LOG_SERVER} \ diff --git a/libs/tendermint/cmd/tendermint/commands/run_node.go b/libs/tendermint/cmd/tendermint/commands/run_node.go index a576e84763..e3194d1b99 100644 --- a/libs/tendermint/cmd/tendermint/commands/run_node.go +++ b/libs/tendermint/cmd/tendermint/commands/run_node.go @@ -195,6 +195,11 @@ func AddNodeFlags(cmd *cobra.Command) { config.Mempool.PendingRemoveEvent, "Push event when remove a pending tx", ) + cmd.Flags().Uint64( + "mempool.max_tx_limit_per_peer", + config.Mempool.MaxTxLimitPerPeer, + "Max tx limit per peer. If set 0 ,this flag disable", + ) cmd.Flags().String( "mempool.node_key_whitelist", diff --git a/libs/tendermint/config/config.go b/libs/tendermint/config/config.go index ae3f587780..b5559335b3 100644 --- a/libs/tendermint/config/config.go +++ b/libs/tendermint/config/config.go @@ -689,6 +689,7 @@ type MempoolConfig struct { PendingPoolMaxTxPerAddress int `mapstructure:"pending_pool_max_tx_per_address"` NodeKeyWhitelist []string `mapstructure:"node_key_whitelist"` PendingRemoveEvent bool `mapstructure:"pending_remove_event"` + MaxTxLimitPerPeer uint64 `mapstructure:"max_tx_limit_per_peer"` } // DefaultMempoolConfig returns a default configuration for the Tendermint mempool @@ -715,6 +716,7 @@ func DefaultMempoolConfig() *MempoolConfig { PendingPoolMaxTxPerAddress: 100, NodeKeyWhitelist: []string{}, PendingRemoveEvent: false, + MaxTxLimitPerPeer: 100, } } @@ -953,12 +955,14 @@ func (cfg *ConsensusConfig) ValidateBasic() error { return nil } -//----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- // TxIndexConfig // Remember that Event has the following structure: // type: [ -// key: value, -// ... +// +// key: value, +// ... +// // ] // // CompositeKeys are constructed by `type.key` diff --git a/libs/tendermint/config/dynamic_config_okchain.go b/libs/tendermint/config/dynamic_config_okchain.go index a7ae0d9da7..d990f8b012 100644 --- a/libs/tendermint/config/dynamic_config_okchain.go +++ b/libs/tendermint/config/dynamic_config_okchain.go @@ -40,6 +40,7 @@ type IDynamicConfig interface { GetEnableMempoolSimGuFactor() bool GetMaxSubscriptionClients() int GetPendingPoolBlacklist() string + GetMaxTxLimitPerPeer() uint64 } var DynamicConfig IDynamicConfig = MockDynamicConfig{} @@ -228,3 +229,7 @@ func (d *MockDynamicConfig) SetMaxSubscriptionClients(value int) { func (d MockDynamicConfig) GetPendingPoolBlacklist() string { return "" } + +func (c MockDynamicConfig) GetMaxTxLimitPerPeer() uint64 { + return DefaultMempoolConfig().MaxTxLimitPerPeer +} diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index 291928ef75..bcd7274b13 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -104,6 +104,10 @@ type CListMempool struct { gpo *Oracle + peersTxCountMtx sync.RWMutex + peersTxCount map[string]uint64 +} + info pguInfo } @@ -152,6 +156,7 @@ func NewCListMempool( txs: txQueue, simQueue: make(chan *mempoolTx, 200000), gpo: gpo, + peersTxCount: make(map[string]uint64, 0), } if config.PendingRemoveEvent { @@ -286,6 +291,38 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { return mem.txs.TxsWaitChan() } +func (mem *CListMempool) validatePeerCount(txInfo TxInfo) error { + if cfg.DynamicConfig.GetMaxTxLimitPerPeer() == 0 { + return nil + } + mem.peersTxCountMtx.Lock() + defer mem.peersTxCountMtx.Unlock() + if len(txInfo.SenderP2PID) != 0 { + peerTxCount, ok := mem.peersTxCount[string(txInfo.SenderP2PID)] + if !ok { + peerTxCount = 0 + } + if peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() { + mem.logger.Debug(fmt.Sprintf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer())) + return fmt.Errorf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer()) + } + peerTxCount++ + mem.peersTxCount[string(txInfo.SenderP2PID)] = peerTxCount + } + return nil +} + +func (mem *CListMempool) resetPeerCount() { + if cfg.DynamicConfig.GetMaxTxLimitPerPeer() == 0 { + return + } + mem.peersTxCountMtx.Lock() + defer mem.peersTxCountMtx.Unlock() + for key := range mem.peersTxCount { + delete(mem.peersTxCount, key) + } +} + // It blocks if we're waiting on Update() or Reap(). // cb: A callback from the CheckTx command. // @@ -295,6 +332,9 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} { // // Safe for concurrent use by multiple goroutines. func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error { + if err := mem.validatePeerCount(txInfo); err != nil { + return err + } timeStart := int64(0) if cfg.DynamicConfig.GetMempoolCheckTxCost() { timeStart = time.Now().UnixMicro() @@ -993,6 +1033,7 @@ func (mem *CListMempool) Update( preCheck PreCheckFunc, postCheck PostCheckFunc, ) error { + mem.resetPeerCount() // no need to update when mempool is unavailable if mem.config.Sealed { return mem.updateSealed(height, txs, deliverTxResponses)