From 0633640840aae9b87d1c25c719f3e696dcc68232 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Mon, 11 Nov 2019 11:17:20 -0500 Subject: [PATCH] ipns(pubsub): utilize persistent pubsub value store --- core/node/groups.go | 5 ++++- core/node/libp2p/pubsub.go | 11 ++++++----- core/node/libp2p/routing.go | 13 +++++++++---- core/node/libp2p/topicdiscovery.go | 31 ++++++++++++++++++++++++++++++ go.mod | 3 ++- go.sum | 4 +++- 6 files changed, 55 insertions(+), 12 deletions(-) create mode 100644 core/node/libp2p/topicdiscovery.go diff --git a/core/node/groups.go b/core/node/groups.go index 5eb222ba5ac..b567f34fac8 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -67,8 +67,10 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { // parse PubSub config - ps := fx.Options() + ps, disc := fx.Options(), fx.Options() if bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps") { + disc = fx.Provide(libp2p.TopicDiscovery()) + var pubsubOptions []pubsub.Option pubsubOptions = append( pubsubOptions, @@ -113,6 +115,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { maybeInvoke(libp2p.AutoNATService(cfg.Experimental.QUIC), cfg.Swarm.EnableAutoNATService), connmgr, ps, + disc, ) return opts diff --git a/core/node/libp2p/pubsub.go b/core/node/libp2p/pubsub.go index c065cd7ee40..001a787da71 100644 --- a/core/node/libp2p/pubsub.go +++ b/core/node/libp2p/pubsub.go @@ -1,7 +1,8 @@ package libp2p import ( - host "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/host" pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" @@ -9,13 +10,13 @@ import ( ) func FloodSub(pubsubOptions ...pubsub.Option) interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) { - return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...) + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) { + return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...) } } func GossipSub(pubsubOptions ...pubsub.Option) interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) { - return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...) + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) { + return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...) } } diff --git a/core/node/libp2p/routing.go b/core/node/libp2p/routing.go index e5ba1de30bb..cc32ce4b3ca 100644 --- a/core/node/libp2p/routing.go +++ b/core/node/libp2p/routing.go @@ -3,6 +3,7 @@ package libp2p import ( "context" "sort" + "time" host "github.com/libp2p/go-libp2p-core/host" routing "github.com/libp2p/go-libp2p-core/routing" @@ -83,15 +84,19 @@ type p2pPSRoutingIn struct { PubSub *pubsub.PubSub `optional:"true"` } -func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore) { - psRouter := namesys.NewPubsubValueStore( +func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore, error) { + psRouter, err := namesys.NewPubsubValueStore( helpers.LifecycleCtx(mctx, lc), in.Host, - in.BaseIpfsRouting, in.PubSub, in.Validator, + namesys.WithRebroadcastInterval(time.Minute), ) + if err != nil { + return p2pRouterOut{}, nil, err + } + return p2pRouterOut{ Router: Router{ Routing: &routinghelpers.Compose{ @@ -102,5 +107,5 @@ func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) ( }, Priority: 100, }, - }, psRouter + }, psRouter, nil } diff --git a/core/node/libp2p/topicdiscovery.go b/core/node/libp2p/topicdiscovery.go new file mode 100644 index 00000000000..fd2cbe00637 --- /dev/null +++ b/core/node/libp2p/topicdiscovery.go @@ -0,0 +1,31 @@ +package libp2p + +import ( + "math/rand" + "time" + + "github.com/libp2p/go-libp2p-core/discovery" + "github.com/libp2p/go-libp2p-core/host" + disc "github.com/libp2p/go-libp2p-discovery" + + "github.com/ipfs/go-ipfs/core/node/helpers" + "go.uber.org/fx" +) + +func TopicDiscovery() interface{} { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cr BaseIpfsRouting) (service discovery.Discovery, err error) { + baseDisc := disc.NewRoutingDiscovery(cr) + minBackoff, maxBackoff := time.Second*60, time.Hour + rng := rand.New(rand.NewSource(rand.Int63())) + d, err := disc.NewBackoffDiscovery( + baseDisc, + disc.NewExponentialBackoff(minBackoff, maxBackoff, disc.FullJitter, time.Second, 5.0, 0, rng), + ) + + if err != nil { + return nil, err + } + + return d, nil + } +} diff --git a/go.mod b/go.mod index b3c7aa7341e..49ec9fdd1e9 100644 --- a/go.mod +++ b/go.mod @@ -62,6 +62,7 @@ require ( github.com/libp2p/go-libp2p-circuit v0.1.4 github.com/libp2p/go-libp2p-connmgr v0.1.1 github.com/libp2p/go-libp2p-core v0.2.5 + github.com/libp2p/go-libp2p-discovery v0.2.0 github.com/libp2p/go-libp2p-http v0.1.4 github.com/libp2p/go-libp2p-kad-dht v0.3.1 github.com/libp2p/go-libp2p-kbucket v0.2.1 @@ -70,7 +71,7 @@ require ( github.com/libp2p/go-libp2p-peerstore v0.1.4 github.com/libp2p/go-libp2p-pnet v0.1.0 github.com/libp2p/go-libp2p-pubsub v0.2.4 - github.com/libp2p/go-libp2p-pubsub-router v0.1.0 + github.com/libp2p/go-libp2p-pubsub-router v0.1.1-0.20191111061427-2fcdcceaeaab github.com/libp2p/go-libp2p-quic-transport v0.2.2 github.com/libp2p/go-libp2p-record v0.1.2 github.com/libp2p/go-libp2p-routing-helpers v0.1.0 diff --git a/go.sum b/go.sum index 81c68259b10..65b7e694191 100644 --- a/go.sum +++ b/go.sum @@ -435,10 +435,12 @@ github.com/libp2p/go-libp2p-pnet v0.1.0/go.mod h1:ZkyZw3d0ZFOex71halXRihWf9WH/j3 github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1VZNHYcK8cLgFJLZ4s= github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk= github.com/libp2p/go-libp2p-pubsub v0.1.0/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q= +github.com/libp2p/go-libp2p-pubsub v0.2.1/go.mod h1:Jscj3fk23R5mCrOwb625xjVs5ZEyTZcx/OlTwMDqU+g= github.com/libp2p/go-libp2p-pubsub v0.2.4 h1:O4BcaKpPQ9p82yTBtzIzgDFoOXkqhrQpfcVac3FAywU= github.com/libp2p/go-libp2p-pubsub v0.2.4/go.mod h1:1tJwAfySvZQ49R9uTVlkwtSTMVLeQQdrnLTJrr91gVc= github.com/libp2p/go-libp2p-pubsub-router v0.1.0 h1:xA5B8Sdx64tNlSRIcay2QUngtlu8LpUJClaUk/dYYrg= -github.com/libp2p/go-libp2p-pubsub-router v0.1.0/go.mod h1:PnHOshBr/2I2ZxVfEsqfgCQPsVg09zo+DhSlWkOhPFM= +github.com/libp2p/go-libp2p-pubsub-router v0.1.1-0.20191111061427-2fcdcceaeaab h1:upGMP9YYJ/+IZSVoEQ14E8WOA56h86KDXCMav/g8DjM= +github.com/libp2p/go-libp2p-pubsub-router v0.1.1-0.20191111061427-2fcdcceaeaab/go.mod h1:CeModTwYOlqcWtbc+7N1F3RhG7nbY3h9s3g5iHHe/AQ= github.com/libp2p/go-libp2p-quic-transport v0.2.2 h1:XyGRqFHD1oHdI2k98P1tWWRb9s27fl1SfmCcaX8plso= github.com/libp2p/go-libp2p-quic-transport v0.2.2/go.mod h1:rVzcsiuOFBomAqvNOxeBUcP4vM4wE+NqqRZWvxjkbe0= github.com/libp2p/go-libp2p-record v0.0.1/go.mod h1:grzqg263Rug/sRex85QrDOLntdFAymLDLm7lxMgU79Q=