Skip to content

Commit

Permalink
limit max tx count perpeer
Browse files Browse the repository at this point in the history
* limit tx count of per peer

* remove useles code

* change code location

* can not limit tx count when limit == 0
  • Loading branch information
BananaLF committed Dec 11, 2023
1 parent 6b97af6 commit 6afd1c1
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 4 deletions.
21 changes: 21 additions & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ type OecConfig struct {
enableMempoolSimGuFactor bool

maxSubscriptionClients int

maxTxLimitPerPeer uint64
}

const (
Expand Down Expand Up @@ -178,6 +180,7 @@ const (
FlagCommitGapOffset = "commit-gap-offset"
FlagEnableMempoolSimGuFactor = "enable-mem-sim-gu-factor"
FlagMaxSubscriptionClients = "max-subscription-clients"
FlagMaxTxLimitPerPeer = "mempool.max_tx_limit_per_peer"
)

var (
Expand Down Expand Up @@ -297,6 +300,7 @@ func (c *OecConfig) loadFromConfig() {
c.SetMempoolFlush(viper.GetBool(FlagMempoolFlush))
c.SetMempoolCheckTxCost(viper.GetBool(FlagMempoolCheckTxCost))
c.SetMaxTxNumPerBlock(viper.GetInt64(FlagMaxTxNumPerBlock))
c.SetMaxTxLimitPerPeer(int64(viper.GetUint64(FlagMaxTxLimitPerPeer)))
c.SetEnableDeleteMinGPTx(viper.GetBool(FlagMempoolEnableDeleteMinGPTx))
c.SetPendingPoolBlacklist(viper.GetString(FlagPendingPoolBlacklist))
c.SetMaxGasUsedPerBlock(viper.GetInt64(FlagMaxGasUsedPerBlock))
Expand Down Expand Up @@ -491,6 +495,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) {
return
}
c.SetMaxTxNumPerBlock(r)
case FlagMaxTxLimitPerPeer:
r, err := strconv.ParseInt(v, 10, 64)
if err != nil {
return
}
c.SetMaxTxLimitPerPeer(r)
case FlagMempoolEnableDeleteMinGPTx:
r, err := strconv.ParseBool(v)
if err != nil {
Expand Down Expand Up @@ -1196,3 +1206,14 @@ func (c *OecConfig) SetPendingPoolBlacklist(v string) {
func (c *OecConfig) GetPendingPoolBlacklist() string {
return c.pendingPoolBlacklist
}

func (c *OecConfig) SetMaxTxLimitPerPeer(maxTxLimitPerPeer int64) {
if maxTxLimitPerPeer < 0 {
return
}
c.maxTxLimitPerPeer = uint64(maxTxLimitPerPeer)
}

func (c *OecConfig) GetMaxTxLimitPerPeer() uint64 {
return c.maxTxLimitPerPeer
}
3 changes: 2 additions & 1 deletion dev/testnet/testnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ run() {
--enable-wtx=${WRAPPEDTX} \
--mempool.node_key_whitelist ${WHITE_LIST} \
--p2p.pex=false \
--mempool.max_tx_limit_per_peer=1 \
--p2p.addr_book_strict=false \
$p2p_seed_opt $p2p_seed_arg \
--p2p.laddr tcp://${IP}:${p2pport} \
Expand All @@ -158,7 +159,7 @@ run() {
--chain-id ${CHAIN_ID} \
--upload-delta=false \
--enable-gid \
--consensus.timeout_commit 3800ms \
--consensus.timeout_commit 10000ms \
--enable-blockpart-ack=false \
--append-pid=true \
${LOG_SERVER} \
Expand Down
5 changes: 5 additions & 0 deletions libs/tendermint/cmd/tendermint/commands/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ func AddNodeFlags(cmd *cobra.Command) {
config.Mempool.PendingRemoveEvent,
"Push event when remove a pending tx",
)
cmd.Flags().Uint64(
"mempool.max_tx_limit_per_peer",
config.Mempool.MaxTxLimitPerPeer,
"Max tx limit per peer. If set 0 ,this flag disable",
)

cmd.Flags().String(
"mempool.node_key_whitelist",
Expand Down
10 changes: 7 additions & 3 deletions libs/tendermint/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ type MempoolConfig struct {
PendingPoolMaxTxPerAddress int `mapstructure:"pending_pool_max_tx_per_address"`
NodeKeyWhitelist []string `mapstructure:"node_key_whitelist"`
PendingRemoveEvent bool `mapstructure:"pending_remove_event"`
MaxTxLimitPerPeer uint64 `mapstructure:"max_tx_limit_per_peer"`
}

// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
Expand All @@ -715,6 +716,7 @@ func DefaultMempoolConfig() *MempoolConfig {
PendingPoolMaxTxPerAddress: 100,
NodeKeyWhitelist: []string{},
PendingRemoveEvent: false,
MaxTxLimitPerPeer: 100,
}
}

Expand Down Expand Up @@ -953,12 +955,14 @@ func (cfg *ConsensusConfig) ValidateBasic() error {
return nil
}

//-----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
// TxIndexConfig
// Remember that Event has the following structure:
// type: [
// key: value,
// ...
//
// key: value,
// ...
//
// ]
//
// CompositeKeys are constructed by `type.key`
Expand Down
5 changes: 5 additions & 0 deletions libs/tendermint/config/dynamic_config_okchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type IDynamicConfig interface {
GetEnableMempoolSimGuFactor() bool
GetMaxSubscriptionClients() int
GetPendingPoolBlacklist() string
GetMaxTxLimitPerPeer() uint64
}

var DynamicConfig IDynamicConfig = MockDynamicConfig{}
Expand Down Expand Up @@ -228,3 +229,7 @@ func (d *MockDynamicConfig) SetMaxSubscriptionClients(value int) {
func (d MockDynamicConfig) GetPendingPoolBlacklist() string {
return ""
}

func (c MockDynamicConfig) GetMaxTxLimitPerPeer() uint64 {
return DefaultMempoolConfig().MaxTxLimitPerPeer
}
41 changes: 41 additions & 0 deletions libs/tendermint/mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ type CListMempool struct {

gpo *Oracle

peersTxCountMtx sync.RWMutex
peersTxCount map[string]uint64
}

info pguInfo
}

Expand Down Expand Up @@ -152,6 +156,7 @@ func NewCListMempool(
txs: txQueue,
simQueue: make(chan *mempoolTx, 200000),
gpo: gpo,
peersTxCount: make(map[string]uint64, 0),
}

if config.PendingRemoveEvent {
Expand Down Expand Up @@ -286,6 +291,38 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
return mem.txs.TxsWaitChan()
}

func (mem *CListMempool) validatePeerCount(txInfo TxInfo) error {
if cfg.DynamicConfig.GetMaxTxLimitPerPeer() == 0 {
return nil
}
mem.peersTxCountMtx.Lock()
defer mem.peersTxCountMtx.Unlock()
if len(txInfo.SenderP2PID) != 0 {
peerTxCount, ok := mem.peersTxCount[string(txInfo.SenderP2PID)]
if !ok {
peerTxCount = 0
}
if peerTxCount >= cfg.DynamicConfig.GetMaxTxLimitPerPeer() {
mem.logger.Debug(fmt.Sprintf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer()))
return fmt.Errorf("%s has been over %d transaction, please wait a few second", txInfo.SenderP2PID, cfg.DynamicConfig.GetMaxTxLimitPerPeer())
}
peerTxCount++
mem.peersTxCount[string(txInfo.SenderP2PID)] = peerTxCount
}
return nil
}

func (mem *CListMempool) resetPeerCount() {
if cfg.DynamicConfig.GetMaxTxLimitPerPeer() == 0 {
return
}
mem.peersTxCountMtx.Lock()
defer mem.peersTxCountMtx.Unlock()
for key := range mem.peersTxCount {
delete(mem.peersTxCount, key)
}
}

// It blocks if we're waiting on Update() or Reap().
// cb: A callback from the CheckTx command.
//
Expand All @@ -295,6 +332,9 @@ func (mem *CListMempool) TxsWaitChan() <-chan struct{} {
//
// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error {
if err := mem.validatePeerCount(txInfo); err != nil {
return err
}
timeStart := int64(0)
if cfg.DynamicConfig.GetMempoolCheckTxCost() {
timeStart = time.Now().UnixMicro()
Expand Down Expand Up @@ -993,6 +1033,7 @@ func (mem *CListMempool) Update(
preCheck PreCheckFunc,
postCheck PostCheckFunc,
) error {
mem.resetPeerCount()
// no need to update when mempool is unavailable
if mem.config.Sealed {
return mem.updateSealed(height, txs, deliverTxResponses)
Expand Down

0 comments on commit 6afd1c1

Please sign in to comment.