From 1d3898f8808f4d9654c8a5e100a3c91e8bb67af0 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sun, 11 Sep 2022 15:06:06 +0200 Subject: [PATCH] perf: use msgio pooled buffers for received msgs --- comm.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/comm.go b/comm.go index 14e8c773..3e88deff 100644 --- a/comm.go +++ b/comm.go @@ -6,14 +6,14 @@ import ( "io" "time" + "github.com/gogo/protobuf/proto" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" - - pb "github.com/libp2p/go-libp2p-pubsub/pb" - + "github.com/libp2p/go-msgio" "github.com/libp2p/go-msgio/protoio" - "github.com/gogo/protobuf/proto" + pb "github.com/libp2p/go-libp2p-pubsub/pb" ) // get the initial RPC containing all of our subscriptions to send to new peers @@ -60,11 +60,11 @@ func (p *PubSub) handleNewStream(s network.Stream) { p.inboundStreamsMx.Unlock() }() - r := protoio.NewDelimitedReader(s, p.maxMessageSize) + r := msgio.NewVarintReaderSize(s, p.maxMessageSize) for { - rpc := new(RPC) - err := r.ReadMsg(&rpc.RPC) + msgbytes, err := r.ReadMsg() if err != nil { + r.ReleaseMsg(msgbytes) if err != io.EOF { s.Reset() log.Debugf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err) @@ -77,6 +77,15 @@ func (p *PubSub) handleNewStream(s network.Stream) { return } + rpc := new(RPC) + err = rpc.Unmarshal(msgbytes) + r.ReleaseMsg(msgbytes) + if err != nil { + s.Reset() + log.Warnf("bogus rpc from %s: %s", s.Conn().RemotePeer(), err) + return + } + rpc.from = peer select { case p.incoming <- rpc: