diff --git a/comm.go b/comm.go index 3e88deff..86380a74 100644 --- a/comm.go +++ b/comm.go @@ -1,12 +1,14 @@ package pubsub import ( - "bufio" "context" + "encoding/binary" "io" "time" "github.com/gogo/protobuf/proto" + pool "github.com/libp2p/go-buffer-pool" + "github.com/multiformats/go-varint" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -156,16 +158,20 @@ func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) { } func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) { - bufw := bufio.NewWriter(s) - wc := protoio.NewDelimitedWriter(bufw) + writeRpc := func(rpc *RPC) error { + size := uint64(rpc.Size()) - writeMsg := func(msg proto.Message) error { - err := wc.WriteMsg(msg) + buf := pool.Get(varint.UvarintSize(size) + int(size)) + defer pool.Put(buf) + + n := binary.PutUvarint(buf, size) + _, err := rpc.MarshalTo(buf[n:]) if err != nil { return err } - return bufw.Flush() + _, err = s.Write(buf) + return err } defer s.Close() @@ -176,7 +182,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou return } - err := writeMsg(&rpc.RPC) + err := writeRpc(rpc) if err != nil { s.Reset() log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)