Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[P2P] Refactor P2P submodules #895

Merged
merged 9 commits into from
Jul 13, 2023
11 changes: 3 additions & 8 deletions app/client/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/pokt-network/pocket/app/client/cli/helpers"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/providers/current_height_provider"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/messaging"
Expand Down Expand Up @@ -227,19 +226,15 @@ func fetchPeerstore(cmd *cobra.Command) (typesP2P.Peerstore, error) {
if !ok || bus == nil {
return nil, errors.New("retrieving bus from CLI context")
}
modulesRegistry := bus.GetModulesRegistry()
// TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider
// is retrievable as a proper submodule
pstoreProvider, err := modulesRegistry.GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
pstoreProvider, err := bus.GetModulesRegistry().GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
if err != nil {
return nil, errors.New("retrieving peerstore provider")
}
currentHeightProvider, err := modulesRegistry.GetModule(current_height_provider.ModuleName)
if err != nil {
return nil, errors.New("retrieving currentHeightProvider")
}
currentHeightProvider := bus.GetCurrentHeightProvider()

height := currentHeightProvider.(current_height_provider.CurrentHeightProvider).CurrentHeight()
height := currentHeightProvider.CurrentHeight()
pstore, err := pstoreProvider.(peerstore_provider.PeerstoreProvider).GetStakedPeerstoreAtHeight(height)
if err != nil {
return nil, fmt.Errorf("retrieving peerstore at height %d", height)
Expand Down
23 changes: 15 additions & 8 deletions app/client/cli/helpers/setup.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package helpers

import (
"fmt"

"github.com/spf13/cobra"

"github.com/pokt-network/pocket/app/client/cli/flags"
Expand Down Expand Up @@ -30,7 +32,11 @@ func P2PDependenciesPreRunE(cmd *cobra.Command, _ []string) error {
if err := setupPeerstoreProvider(*runtimeMgr, flags.RemoteCLIURL); err != nil {
return err
}
setupCurrentHeightProvider(*runtimeMgr, flags.RemoteCLIURL)

if err := setupRPCCurrentHeightProvider(*runtimeMgr, flags.RemoteCLIURL); err != nil {
return err
}

setupAndStartP2PModule(*runtimeMgr)

return nil
Expand All @@ -44,15 +50,16 @@ func setupPeerstoreProvider(rm runtime.Manager, rpcURL string) error {
return nil
}

func setupCurrentHeightProvider(rm runtime.Manager, rpcURL string) {
// TECHDEBT(#810): simplify after current height provider is refactored as
// a submodule.
bus := rm.GetBus()
modulesRegistry := bus.GetModulesRegistry()
currentHeightProvider := rpcCHP.NewRPCCurrentHeightProvider(
func setupRPCCurrentHeightProvider(rm runtime.Manager, rpcURL string) error {
// Ensure `CurrentHeightProvider` exists in the modules registry.
_, err := rpcCHP.Create(
rm.GetBus(),
rpcCHP.WithCustomRPCURL(rpcURL),
)
modulesRegistry.RegisterModule(currentHeightProvider)
if err != nil {
return fmt.Errorf("setting up current height provider: %w", err)
}
return nil
}

func setupAndStartP2PModule(rm runtime.Manager) {
Expand Down
6 changes: 6 additions & 0 deletions consensus/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
consensusTelemetry "github.com/pokt-network/pocket/consensus/telemetry"
typesCons "github.com/pokt-network/pocket/consensus/types"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/providers/current_height_provider/consensus"
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/runtime/genesis"
"github.com/pokt-network/pocket/shared/codec"
Expand Down Expand Up @@ -132,6 +133,11 @@ func (*consensusModule) Create(bus modules.Bus, options ...modules.ModuleOption)

bus.RegisterModule(m)

// Ensure `CurrentHeightProvider` submodule is registered.
if _, err = consensus.Create(bus); err != nil {
return nil, fmt.Errorf("failed to create current height provider: %w", err)
}

runtimeMgr := bus.GetRuntimeMgr()

consensusCfg := runtimeMgr.GetConfig().Consensus
Expand Down
45 changes: 30 additions & 15 deletions p2p/background/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/providers"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/unicast"
"github.com/pokt-network/pocket/p2p/utils"
Expand All @@ -27,9 +28,8 @@ import (
)

var (
_ typesP2P.Router = &backgroundRouter{}
_ modules.IntegrableModule = &backgroundRouter{}
_ backgroundRouterFactory = &backgroundRouter{}
_ typesP2P.Router = &backgroundRouter{}
_ backgroundRouterFactory = &backgroundRouter{}
)

type backgroundRouterFactory = modules.FactoryWithConfig[typesP2P.Router, *config.BackgroundConfig]
Expand Down Expand Up @@ -93,7 +93,7 @@ func (*backgroundRouter) Create(bus modules.Bus, cfg *config.BackgroundConfig) (
host: cfg.Host,
cancelReadSubscription: cancel,
}
rtr.SetBus(bus)
bus.RegisterModule(rtr)

bgRouterLogger.Info().Fields(map[string]any{
"host_id": cfg.Host.ID(),
Expand Down Expand Up @@ -127,6 +127,11 @@ func (rtr *backgroundRouter) Close() error {
)
}

// GetModuleName implements the respective `modules.Integrable` interface method.
func (rtr *backgroundRouter) GetModuleName() string {
return typesP2P.UnstakedActorRouterSubmoduleName
}

// Broadcast implements the respective `typesP2P.Router` interface method.
func (rtr *backgroundRouter) Broadcast(pocketEnvelopeBz []byte) error {
backgroundMsg := &typesP2P.BackgroundMessage{
Expand Down Expand Up @@ -215,7 +220,8 @@ func (rtr *backgroundRouter) setupUnicastRouter() error {
return nil
}

func (rtr *backgroundRouter) setupDependencies(ctx context.Context, cfg *config.BackgroundConfig) error {
// TECHBEDT(#810,#811): remove unused `BackgroundConfig`
func (rtr *backgroundRouter) setupDependencies(ctx context.Context, _ *config.BackgroundConfig) error {
// NB: The order in which the internal components are setup below is important
if err := rtr.setupUnicastRouter(); err != nil {
return err
Expand All @@ -237,21 +243,30 @@ func (rtr *backgroundRouter) setupDependencies(ctx context.Context, cfg *config.
return fmt.Errorf("setting up subscription: %w", err)
}

if err := rtr.setupPeerstore(
ctx,
cfg.PeerstoreProvider,
cfg.CurrentHeightProvider,
); err != nil {
if err := rtr.setupPeerstore(ctx); err != nil {
return fmt.Errorf("setting up peerstore: %w", err)
}
return nil
}

func (rtr *backgroundRouter) setupPeerstore(
ctx context.Context,
pstoreProvider providers.PeerstoreProvider,
currentHeightProvider providers.CurrentHeightProvider,
) (err error) {
func (rtr *backgroundRouter) setupPeerstore(ctx context.Context) (err error) {
rtr.logger.Warn().Msg("setting up peerstore...")

// TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider
// is retrievable as a proper submodule
pstoreProviderModule, err := rtr.GetBus().GetModulesRegistry().
GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
if err != nil {
return fmt.Errorf("retrieving peerstore provider: %w", err)
}
pstoreProvider, ok := pstoreProviderModule.(providers.PeerstoreProvider)
if !ok {
return fmt.Errorf("unexpected peerstore provider type: %T", pstoreProviderModule)
}

rtr.logger.Debug().Msg("setupCurrentHeightProvider")
currentHeightProvider := rtr.GetBus().GetCurrentHeightProvider()

// seed initial peerstore with current on-chain peer info (i.e. staked actors)
rtr.pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight(
currentHeightProvider.CurrentHeight(),
Expand Down
31 changes: 22 additions & 9 deletions p2p/background/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (
"github.com/pokt-network/pocket/internal/testutil"
"github.com/pokt-network/pocket/p2p/config"
"github.com/pokt-network/pocket/p2p/protocol"
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
mock_types "github.com/pokt-network/pocket/p2p/types/mocks"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/runtime"
"github.com/pokt-network/pocket/runtime/configs"
"github.com/pokt-network/pocket/runtime/defaults"
cryptoPocket "github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/messaging"
"github.com/pokt-network/pocket/shared/modules"
mockModules "github.com/pokt-network/pocket/shared/modules/mocks"
)

Expand Down Expand Up @@ -399,6 +402,7 @@ func newTestRouter(
return newRouterWithSelfPeerAndHost(t, selfPeer, host, handler)
}

// TECHDEBT(#796): de-dup & refactor
func newRouterWithSelfPeerAndHost(
t *testing.T,
selfPeer typesP2P.Peer,
Expand All @@ -415,16 +419,27 @@ func newRouterWithSelfPeerAndHost(
},
}).AnyTimes()

modulesRegistry := runtime.NewModulesRegistry()
busMock := mockModules.NewMockBus(ctrl)
busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes()
busMock.EXPECT().GetModulesRegistry().Return(modulesRegistry).AnyTimes()
busMock.EXPECT().RegisterModule(gomock.Any()).Do(func(m modules.Submodule) {
modulesRegistry.RegisterModule(m)
m.SetBus(busMock)
}).AnyTimes()

consensusMock := mockModules.NewMockConsensusModule(ctrl)
consensusMock.EXPECT().GetModuleName().Return(modules.ConsensusModuleName).AnyTimes()
consensusMock.EXPECT().GetBus().Return(busMock).AnyTimes()
consensusMock.EXPECT().SetBus(gomock.Any()).AnyTimes()
consensusMock.EXPECT().CurrentHeight().Return(uint64(1)).AnyTimes()
busMock.EXPECT().GetCurrentHeightProvider().Return(consensusMock).AnyTimes()

pstore := make(typesP2P.PeerAddrMap)
pstoreProviderMock := mock_types.NewMockPeerstoreProvider(ctrl)
pstoreProviderMock.EXPECT().GetModuleName().Return(peerstore_provider.PeerstoreProviderSubmoduleName)
pstoreProviderMock.EXPECT().GetStakedPeerstoreAtHeight(gomock.Any()).Return(pstore, nil).AnyTimes()

busMock := mockModules.NewMockBus(ctrl)
busMock.EXPECT().GetConsensusModule().Return(consensusMock).AnyTimes()
busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes()
modulesRegistry.RegisterModule(pstoreProviderMock)

err := pstore.AddPeer(selfPeer)
require.NoError(t, err)
Expand All @@ -434,11 +449,9 @@ func newRouterWithSelfPeerAndHost(
}

router, err := Create(busMock, &config.BackgroundConfig{
Addr: selfPeer.GetAddress(),
PeerstoreProvider: pstoreProviderMock,
CurrentHeightProvider: consensusMock,
Host: host,
Handler: handler,
Addr: selfPeer.GetAddress(),
Host: host,
Handler: handler,
})
require.NoError(t, err)

Expand Down
9 changes: 8 additions & 1 deletion p2p/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,14 @@ func (m *p2pModule) bootstrap() error {
return fmt.Errorf("creating RPC peerstore provider: %w", err)
}

currentHeightProvider := rpcCHP.NewRPCCurrentHeightProvider(rpcCHP.WithCustomRPCURL(bootstrapNode))
currentHeightProvider, err := rpcCHP.Create(
m.GetBus(),
rpcCHP.WithCustomRPCURL(bootstrapNode),
)
if err != nil {
m.logger.Warn().Err(err).Str("endpoint", bootstrapNode).Msg("Error getting current height from bootstrap node")
continue
}

pstore, err = pstoreProvider.GetStakedPeerstoreAtHeight(currentHeightProvider.CurrentHeight())
if err != nil {
Expand Down
53 changes: 17 additions & 36 deletions p2p/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/multierr"

"github.com/pokt-network/pocket/p2p/providers"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/crypto"
"github.com/pokt-network/pocket/shared/modules"
Expand All @@ -20,18 +19,16 @@ var (
_ typesP2P.RouterConfig = &RainTreeConfig{}
)

// baseConfig implements `RouterConfig` using the given libp2p host and current
// height and peerstore providers. Intended for internal use by other `RouterConfig`
// baseConfig implements `RouterConfig` using the given libp2p host, pokt address
// and handler function. Intended for internal use by other `RouterConfig`
// implementations with common config parameters.
//
// NB: intentionally *not* embedding `baseConfig` to improve readability of usages
// of would-be embedders (e.g. `BackgroundConfig`).
type baseConfig struct {
Host host.Host
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error
Host host.Host
Addr crypto.Address
Handler func(data []byte) error
}

type UnicastRouterConfig struct {
Expand All @@ -44,20 +41,16 @@ type UnicastRouterConfig struct {

// BackgroundConfig implements `RouterConfig` for use with `BackgroundRouter`.
type BackgroundConfig struct {
Host host.Host
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error
Host host.Host
Addr crypto.Address
Handler func(data []byte) error
}

// RainTreeConfig implements `RouterConfig` for use with `RainTreeRouter`.
type RainTreeConfig struct {
Host host.Host
Addr crypto.Address
CurrentHeightProvider providers.CurrentHeightProvider
PeerstoreProvider providers.PeerstoreProvider
Handler func(data []byte) error
Host host.Host
Addr crypto.Address
Handler func(data []byte) error
}

// IsValid implements the respective member of the `RouterConfig` interface.
Expand All @@ -66,18 +59,10 @@ func (cfg *baseConfig) IsValid() (err error) {
err = multierr.Append(err, fmt.Errorf("pokt address not configured"))
}

if cfg.CurrentHeightProvider == nil {
err = multierr.Append(err, fmt.Errorf("current height provider not configured"))
}

if cfg.Host == nil {
err = multierr.Append(err, fmt.Errorf("host not configured"))
}

if cfg.PeerstoreProvider == nil {
err = multierr.Append(err, fmt.Errorf("peerstore provider not configured"))
}

if cfg.Handler == nil {
err = multierr.Append(err, fmt.Errorf("handler not configured"))
}
Expand Down Expand Up @@ -111,23 +96,19 @@ func (cfg *UnicastRouterConfig) IsValid() (err error) {
// IsValid implements the respective member of the `RouterConfig` interface.
func (cfg *BackgroundConfig) IsValid() error {
baseCfg := baseConfig{
Host: cfg.Host,
Addr: cfg.Addr,
CurrentHeightProvider: cfg.CurrentHeightProvider,
PeerstoreProvider: cfg.PeerstoreProvider,
Handler: cfg.Handler,
Host: cfg.Host,
Addr: cfg.Addr,
Handler: cfg.Handler,
}
return baseCfg.IsValid()
}

// IsValid implements the respective member of the `RouterConfig` interface.
func (cfg *RainTreeConfig) IsValid() error {
baseCfg := baseConfig{
Host: cfg.Host,
Addr: cfg.Addr,
CurrentHeightProvider: cfg.CurrentHeightProvider,
PeerstoreProvider: cfg.PeerstoreProvider,
Handler: cfg.Handler,
Host: cfg.Host,
Addr: cfg.Addr,
Handler: cfg.Handler,
}
return baseCfg.IsValid()
}
7 changes: 6 additions & 1 deletion p2p/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,12 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error {
}

oldPeerList := m.stakedActorRouter.GetPeerstore().GetPeerList()
updatedPeerstore, err := m.pstoreProvider.GetStakedPeerstoreAtHeight(consensusNewHeightEvent.Height)
pstoreProvider, err := m.getPeerstoreProvider()
if err != nil {
return err
}

updatedPeerstore, err := pstoreProvider.GetStakedPeerstoreAtHeight(consensusNewHeightEvent.Height)
if err != nil {
return err
}
Expand Down
Loading