Merge pull request #719 from libp2p/fix/fullrt-log-progress
fix: fullrt dht bug fixes
aschmahmann authored May 27, 2021
2 parents eac1b5e + daab800 commit 301a6e4
Showing 2 changed files with 211 additions and 59 deletions.
185 changes: 126 additions & 59 deletions fullrt/dht.go
@@ -464,7 +464,7 @@ func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts
})
err := dht.protoMessenger.PutValue(ctx, p, rec)
return err
}, peers)
}, peers, true)

if successes == 0 {
return fmt.Errorf("failed to complete put")
@@ -751,7 +751,7 @@ func (dht *FullRT) getValues(ctx context.Context, key string, stopQuery chan str
return nil
}

dht.execOnMany(ctx, queryFn, peers)
dht.execOnMany(ctx, queryFn, peers, false)
lookupResCh <- &lookupWithFollowupResult{peers: peers}
}()
return valCh, lookupResCh
@@ -817,7 +817,7 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
err := dht.protoMessenger.PutProvider(ctx, p, keyMH, dht.h)
return err
}, peers)
}, peers, true)

if exceededDeadline {
return context.DeadlineExceeded
@@ -830,41 +830,71 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
return ctx.Err()
}

func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID) int {
putctx, cancel := context.WithCancel(ctx)
defer cancel()
// execOnMany executes the given function on each of the peers, although it may only wait for a certain chunk of peers
// to respond before considering the results "good enough" and returning.
//
// If sloppyExit is true then this function will return without waiting for all of its internal goroutines to close.
// If sloppyExit is true then the passed in function MUST be able to safely complete an arbitrary amount of time after
// execOnMany has returned (e.g. do not write to resources that might get closed or set to nil and therefore result in
// a panic instead of just returning an error).
func (dht *FullRT) execOnMany(ctx context.Context, fn func(context.Context, peer.ID) error, peers []peer.ID, sloppyExit bool) int {
if len(peers) == 0 {
return 0
}

waitAllCh := make(chan struct{}, len(peers))
// having a buffer that can take all of the elements is basically a hack to allow for sloppy exits that clean up
// the goroutines after the function is done rather than before
errCh := make(chan error, len(peers))
numSuccessfulToWaitFor := int(float64(len(peers)) * dht.waitFrac)
waitSuccessCh := make(chan struct{}, numSuccessfulToWaitFor)

putctx, cancel := context.WithTimeout(ctx, dht.timeoutPerOp)
defer cancel()

for _, p := range peers {
go func(p peer.ID) {
fnCtx, fnCancel := context.WithTimeout(putctx, dht.timeoutPerOp)
defer fnCancel()
err := fn(fnCtx, p)
if err != nil {
logger.Debug(err)
} else {
waitSuccessCh <- struct{}{}
}
waitAllCh <- struct{}{}
errCh <- fn(putctx, p)
}(p)
}

numSuccess, numDone := 0, 0
t := time.NewTimer(time.Hour)
for numDone != len(peers) {
var numDone, numSuccess, successSinceLastTick int
var ticker *time.Ticker
var tickChan <-chan time.Time

for numDone < len(peers) {
select {
case <-waitAllCh:
case err := <-errCh:
numDone++
case <-waitSuccessCh:
if numSuccess >= numSuccessfulToWaitFor {
t.Reset(time.Millisecond * 500)
if err == nil {
numSuccess++
if numSuccess >= numSuccessfulToWaitFor && ticker == nil {
// Once there are enough successes, wait a little longer
ticker = time.NewTicker(time.Millisecond * 500)
defer ticker.Stop()
tickChan = ticker.C
successSinceLastTick = numSuccess
}
// This is equivalent to numSuccess * 2 + numFailures >= len(peers) and is a heuristic that seems to be
// performing reasonably.
// TODO: Make this metric more configurable
// TODO: Have better heuristics in this function whether determined from observing static network
// properties or dynamically calculating them
if numSuccess+numDone >= len(peers) {
cancel()
if sloppyExit {
return numSuccess
}
}
}
case <-tickChan:
if numSuccess > successSinceLastTick {
// If there were additional successes, then wait another tick
successSinceLastTick = numSuccess
} else {
cancel()
if sloppyExit {
return numSuccess
}
}
numSuccess++
numDone++
case <-t.C:
cancel()
}
}
return numSuccess
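
Because the diff above interleaves removed and added lines, the new execOnMany control flow is easier to read consolidated. The sketch below is illustrative only: peer.ID is replaced with string so it compiles on its own, and runOnMany, waitFrac, and timeoutPerOp are stand-ins for the method and dht fields referenced in the hunk. The shape of the logic is the same: fan out to every peer, collect results on a channel buffered to len(peers) (so stragglers can still deliver after a sloppy exit), and once waitFrac of the calls have succeeded start a 500ms ticker; a tick with no further successes, or the numSuccess+numDone heuristic, ends the wait.

package main

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// runOnMany is an illustrative stand-in for execOnMany: it runs fn against every
// peer, waits for roughly waitFrac of the calls to succeed, then gives the rest a
// short quiet period before returning the success count.
func runOnMany(ctx context.Context, fn func(context.Context, string) error, peers []string,
	waitFrac float64, timeoutPerOp time.Duration, sloppyExit bool) int {
	if len(peers) == 0 {
		return 0
	}

	// Buffered so late goroutines can always deliver their result, even if we
	// returned early (the "sloppy exit" case).
	errCh := make(chan error, len(peers))
	needed := int(float64(len(peers)) * waitFrac)

	opCtx, cancel := context.WithTimeout(ctx, timeoutPerOp)
	defer cancel()

	for _, p := range peers {
		go func(p string) { errCh <- fn(opCtx, p) }(p)
	}

	var numDone, numSuccess, successSinceLastTick int
	var ticker *time.Ticker
	var tickChan <-chan time.Time

	for numDone < len(peers) {
		select {
		case err := <-errCh:
			numDone++
			if err == nil {
				numSuccess++
				if numSuccess >= needed && ticker == nil {
					// Enough successes: give the remaining calls a short grace period.
					ticker = time.NewTicker(500 * time.Millisecond)
					defer ticker.Stop()
					tickChan = ticker.C
					successSinceLastTick = numSuccess
				}
				// Equivalent to 2*numSuccess + numFailures >= len(peers): numDone
				// already counts this success, so numSuccess+numDone double-counts it.
				if numSuccess+numDone >= len(peers) {
					cancel()
					if sloppyExit {
						return numSuccess
					}
				}
			}
		case <-tickChan:
			if numSuccess > successSinceLastTick {
				// Progress since the last tick: wait one more tick.
				successSinceLastTick = numSuccess
			} else {
				cancel()
				if sloppyExit {
					return numSuccess
				}
			}
		}
	}
	return numSuccess
}

func main() {
	// Simulated operation: random latency, roughly one in four calls fails.
	fn := func(ctx context.Context, p string) error {
		select {
		case <-time.After(time.Duration(rand.Intn(300)) * time.Millisecond):
			if rand.Intn(4) == 0 {
				return errors.New("simulated failure")
			}
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	peers := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
	n := runOnMany(context.Background(), fn, peers, 0.3, 2*time.Second, true)
	fmt.Printf("%d/%d calls succeeded\n", n, len(peers))
}
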
@@ -898,7 +928,7 @@ func (dht *FullRT) ProvideMany(ctx context.Context, keys []multihash.Multihash)
pmes.ProviderPeers = pbPeers

return dht.messageSender.SendMessage(ctx, p, pmes)
}, peers)
}, peers, true)
if successes == 0 {
return fmt.Errorf("no successful provides")
}
@@ -941,7 +971,7 @@ func (dht *FullRT) PutMany(ctx context.Context, keys []string, values [][]byte)
successes := dht.execOnMany(ctx, func(ctx context.Context, p peer.ID) error {
keyStr := string(k)
return dht.protoMessenger.PutValue(ctx, p, record.MakePutRecord(keyStr, keyRecMap[keyStr]))
}, peers)
}, peers, true)
if successes == 0 {
return fmt.Errorf("no successful puts")
}
@@ -962,39 +992,43 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(
var numSendsSuccessful uint64 = 0

wg := sync.WaitGroup{}
wg.Add(dht.bulkSendParallelism)
chunkSize := len(sortedKeys) / dht.bulkSendParallelism
onePctKeys := uint64(len(sortedKeys)) / 100
for i := 0; i < dht.bulkSendParallelism; i++ {
var chunk []peer.ID
end := (i + 1) * chunkSize
if end > len(sortedKeys) {
chunk = sortedKeys[i*chunkSize:]
} else {
chunk = sortedKeys[i*chunkSize : end]
}

go func() {
defer wg.Done()
for _, key := range chunk {
sendsSoFar := atomic.AddUint64(&numSends, 1)
if sendsSoFar%onePctKeys == 0 {
logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys))
}
if err := fn(ctx, key); err != nil {
var l interface{}
if isProvRec {
l = internal.LoggableProviderRecordBytes(key)
} else {
l = internal.LoggableRecordKeyString(key)
}
logger.Infof("failed to complete bulk sending of key :%v. %v", l, err)
bulkSendFn := func(chunk []peer.ID) {
defer wg.Done()
for _, key := range chunk {
if ctx.Err() != nil {
break
}

sendsSoFar := atomic.AddUint64(&numSends, 1)
if onePctKeys > 0 && sendsSoFar%onePctKeys == 0 {
logger.Infof("bulk sending goroutine: %.1f%% done - %d/%d done", 100*float64(sendsSoFar)/float64(len(sortedKeys)), sendsSoFar, len(sortedKeys))
}
if err := fn(ctx, key); err != nil {
var l interface{}
if isProvRec {
l = internal.LoggableProviderRecordBytes(key)
} else {
atomic.AddUint64(&numSendsSuccessful, 1)
l = internal.LoggableRecordKeyString(key)
}
logger.Infof("failed to complete bulk sending of key :%v. %v", l, err)
} else {
atomic.AddUint64(&numSendsSuccessful, 1)
}
}()
}
}

// divide the keys into groups so that we can talk to more peers at a time, because the keys are sorted in
// XOR/Kademlia space, consecutive puts will be to the same, or nearly the same, set of peers. Working in parallel
// means less waiting on individual dials to complete and also continuing to make progress even if one segment of
// the network is being slow, or we are maxing out the connection, stream, etc. to those peers.
keyGroups := divideIntoGroups(sortedKeys, dht.bulkSendParallelism)
wg.Add(len(keyGroups))
for _, chunk := range keyGroups {
go bulkSendFn(chunk)
}

wg.Wait()

if numSendsSuccessful == 0 {
@@ -1006,6 +1040,39 @@ func (dht *FullRT) bulkMessageSend(ctx context.Context, keys []peer.ID, fn func(
return nil
}

// divideIntoGroups divides the set of keys into (at most) the number of groups
func divideIntoGroups(keys []peer.ID, groups int) [][]peer.ID {
var keyGroups [][]peer.ID
if len(keys) < groups {
for i := 0; i < len(keys); i++ {
keyGroups = append(keyGroups, keys[i:i+1])
}
return keyGroups
}

chunkSize := len(keys) / groups
remainder := len(keys) % groups

start := 0
end := chunkSize
for i := 0; i < groups; i++ {
var chunk []peer.ID
// distribute the remainder as one extra entry per parallel thread
if remainder > 0 {
chunk = keys[start : end+1]
remainder--
start = end + 1
end = end + 1 + chunkSize
} else {
chunk = keys[start:end]
start = end
end = end + chunkSize
}
keyGroups = append(keyGroups, chunk)
}
return keyGroups
}
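
A quick way to see what the remainder handling in divideIntoGroups does: for n keys and g groups (when n >= g), the first n mod g groups get one extra key, so group sizes never differ by more than one. The snippet below is a standalone illustration (plain ints instead of peer.IDs, a hypothetical groupSizes helper) that just reproduces the arithmetic; the tests in dht_test.go below exercise the real function.

package main

import "fmt"

// groupSizes mirrors the chunking arithmetic in divideIntoGroups: the first
// n%g groups take one extra key so the remainder is spread evenly.
func groupSizes(n, g int) []int {
	if n < g {
		// Fewer keys than groups: one key per group, n groups in total.
		sizes := make([]int, n)
		for i := range sizes {
			sizes[i] = 1
		}
		return sizes
	}
	sizes := make([]int, g)
	chunk, rem := n/g, n%g
	for i := range sizes {
		sizes[i] = chunk
		if i < rem {
			sizes[i]++
		}
	}
	return sizes
}

func main() {
	fmt.Println(groupSizes(10, 3))  // [4 3 3]
	fmt.Println(groupSizes(10, 2))  // [5 5]
	fmt.Println(groupSizes(10, 11)) // [1 1 1 1 1 1 1 1 1 1]
}
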

// FindProviders searches until the context expires.
func (dht *FullRT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
if !dht.enableProviders {
@@ -1129,7 +1196,7 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
return nil
}

dht.execOnMany(queryctx, fn, peers)
dht.execOnMany(queryctx, fn, peers, false)
}

// FindPeer searches for a peer with given ID.
@@ -1214,7 +1281,7 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, e
return nil
}

dht.execOnMany(queryctx, fn, peers)
dht.execOnMany(queryctx, fn, peers, false)

close(addrsCh)
wg.Wait()
85 changes: 85 additions & 0 deletions fullrt/dht_test.go
@@ -0,0 +1,85 @@
package fullrt

import (
"strconv"
"testing"

"github.com/libp2p/go-libp2p-core/peer"
)

func TestDivideIntoGroups(t *testing.T) {
var keys []peer.ID
for i := 0; i < 10; i++ {
keys = append(keys, peer.ID(strconv.Itoa(i)))
}

convertToStrings := func(peers []peer.ID) []string {
var out []string
for _, p := range peers {
out = append(out, string(p))
}
return out
}

pidsEquals := func(a, b []string) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
}
}
return true
}

t.Run("Divides", func(t *testing.T) {
gr := divideIntoGroups(keys, 2)
if len(gr) != 2 {
t.Fatal("incorrect number of groups")
}
if g1, expected := convertToStrings(gr[0]), []string{"0", "1", "2", "3", "4"}; !pidsEquals(g1, expected) {
t.Fatalf("expected %v, got %v", expected, g1)
}
if g2, expected := convertToStrings(gr[1]), []string{"5", "6", "7", "8", "9"}; !pidsEquals(g2, expected) {
t.Fatalf("expected %v, got %v", expected, g2)
}
})
t.Run("Remainder", func(t *testing.T) {
gr := divideIntoGroups(keys, 3)
if len(gr) != 3 {
t.Fatal("incorrect number of groups")
}
if g, expected := convertToStrings(gr[0]), []string{"0", "1", "2", "3"}; !pidsEquals(g, expected) {
t.Fatalf("expected %v, got %v", expected, g)
}
if g, expected := convertToStrings(gr[1]), []string{"4", "5", "6"}; !pidsEquals(g, expected) {
t.Fatalf("expected %v, got %v", expected, g)
}
if g, expected := convertToStrings(gr[2]), []string{"7", "8", "9"}; !pidsEquals(g, expected) {
t.Fatalf("expected %v, got %v", expected, g)
}
})
t.Run("OneEach", func(t *testing.T) {
gr := divideIntoGroups(keys, 10)
if len(gr) != 10 {
t.Fatal("incorrect number of groups")
}
for i := 0; i < 10; i++ {
if g, expected := convertToStrings(gr[i]), []string{strconv.Itoa(i)}; !pidsEquals(g, expected) {
t.Fatalf("expected %v, got %v", expected, g)
}
}
})
t.Run("TooManyGroups", func(t *testing.T) {
gr := divideIntoGroups(keys, 11)
if len(gr) != 10 {
t.Fatal("incorrect number of groups")
}
for i := 0; i < 10; i++ {
if g, expected := convertToStrings(gr[i]), []string{strconv.Itoa(i)}; !pidsEquals(g, expected) {
t.Fatalf("expected %v, got %v", expected, g)
}
}
})
}
