Skip to content

Commit

Permalink
cancel peer reader and writer while waiting for bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Sep 12, 2019
1 parent ab7894e commit 752a901
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
21 changes: 15 additions & 6 deletions internal/peerconn/peerreader/peerreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (p *PeerReader) Run() {
return
} else if err == io.ErrUnexpectedEOF {
return
} else if err == errStoppedWhileWaitingBucket {
return
} else if _, ok := err.(*net.OpError); ok {
return
}
Expand Down Expand Up @@ -262,18 +264,23 @@ func (p *PeerReader) readPiece(length uint32) (buf bufferpool.Buffer, err error)
}
}()

r := p.r
if p.bucket != nil {
r = ratelimit.Reader(r, p.bucket)
}

var n, m int
for {
if p.bucket != nil {
d := p.bucket.Take(int64(length))
select {
case <-time.After(d):
case <-p.stopC:
err = errStoppedWhileWaitingBucket
return
}
}

err = p.conn.SetReadDeadline(time.Now().Add(p.pieceTimeout))
if err != nil {
return
}
n, err = io.ReadFull(r, buf.Data[m:])
n, err = io.ReadFull(p.r, buf.Data[m:])
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
// Peer didn't send the full block in allowed time.
Expand All @@ -292,3 +299,5 @@ func (p *PeerReader) readPiece(length uint32) (buf bufferpool.Buffer, err error)
return
}
}

var errStoppedWhileWaitingBucket = errors.New("peer reader stopped while waiting for bucket")
11 changes: 7 additions & 4 deletions internal/peerconn/peerwriter/peerwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,16 @@ func (p *PeerWriter) messageWriter() {
// Put message ID
buf.Bytes()[4] = uint8(msg.ID())

var w io.Writer = p.conn
if _, ok := msg.(Piece); ok && p.bucket != nil {
w = ratelimit.Writer(w, p.bucket)
d := p.bucket.Take(int64(buf.Len()))
select {
case <-time.After(d):
case <-p.stopC:
return
}
}

var n int
n, err = w.Write(buf.Bytes())
n, err := p.conn.Write(buf.Bytes())
if _, ok := msg.(Piece); ok {
p.countUploadBytes(n)
}
Expand Down

0 comments on commit 752a901

Please sign in to comment.