-
Notifications
You must be signed in to change notification settings - Fork 33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[P2P] [Tooling] Peer discovery peer list
subcommand
#892
base: main
Are you sure you want to change the base?
Changes from 36 commits
4badf3a
2b83d32
eac7695
bca000b
5e963be
a883f08
83c3604
6ecca53
d570b35
e952365
e80843c
8f90e22
6e691cd
430db08
440b59a
fcfa837
04dc0aa
3925c71
1bbad38
64abbc0
d8b6296
9ecc9e5
1cbc249
ffbc539
0cff1d2
39af37c
c22011c
1fc2bb4
764e171
9eb5a7e
7380260
c03aa27
da62de1
12e22e1
f8f5da5
66a1347
870805f
64ec990
ccec195
90385f0
6d0d300
c6488a5
2317d42
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package flags | ||
|
||
import ( | ||
"github.com/spf13/cobra" | ||
"github.com/spf13/viper" | ||
|
||
"github.com/pokt-network/pocket/runtime/configs" | ||
) | ||
|
||
var Cfg *configs.Config | ||
|
||
func ParseConfigAndFlags(_ *cobra.Command, _ []string) error { | ||
// by this time, the config path should be set | ||
Cfg = configs.ParseConfig(ConfigPath) | ||
|
||
// set final `remote_cli_url` value; order of precedence: flag > env var > config > default | ||
RemoteCLIURL = viper.GetString("remote_cli_url") | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
package peer | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/spf13/cobra" | ||
"google.golang.org/protobuf/types/known/anypb" | ||
|
||
"github.com/pokt-network/pocket/app/client/cli/helpers" | ||
"github.com/pokt-network/pocket/logger" | ||
"github.com/pokt-network/pocket/p2p/debug" | ||
"github.com/pokt-network/pocket/shared/messaging" | ||
) | ||
|
||
var ( | ||
listCmd = &cobra.Command{ | ||
Use: "List", | ||
Short: "List the known peers", | ||
Long: "Prints a table of the Peer ID, Pokt Address and Service URL of the known peers", | ||
Aliases: []string{"list", "ls"}, | ||
RunE: listRunE, | ||
} | ||
|
||
ErrRouterType = fmt.Errorf("must specify one of --staked, --unstaked, or --all") | ||
) | ||
|
||
func init() { | ||
PeerCmd.AddCommand(listCmd) | ||
} | ||
|
||
func listRunE(cmd *cobra.Command, _ []string) error { | ||
var routerType debug.RouterType | ||
|
||
bus, err := helpers.GetBusFromCmd(cmd) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
switch { | ||
case stakedFlag && !unstakedFlag && !allFlag: | ||
routerType = debug.StakedRouterType | ||
case unstakedFlag && !stakedFlag && !allFlag: | ||
routerType = debug.UnstakedRouterType | ||
case stakedFlag || unstakedFlag: | ||
return ErrRouterType | ||
// even if `allFlag` is false, we still want to print all connections | ||
default: | ||
routerType = debug.AllRouterTypes | ||
} | ||
|
||
debugMsg := &messaging.DebugMessage{ | ||
Action: messaging.DebugMessageAction_DEBUG_P2P_PRINT_PEER_LIST, | ||
Type: messaging.DebugMessageRoutingType_DEBUG_MESSAGE_TYPE_BROADCAST, | ||
Message: &anypb.Any{ | ||
Value: []byte(routerType), | ||
}, | ||
} | ||
debugMsgAny, err := anypb.New(debugMsg) | ||
if err != nil { | ||
return fmt.Errorf("error creating anypb from debug message: %w", err) | ||
} | ||
|
||
if localFlag { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my comments in #801. They're similar to this as well There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Olshansk can you clarify which comment you are referring to? And what you think should change here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. E.g. #801 (comment) @h5law Can you take a stab at #801 first? I (accidently) reviewed it before this one so I avoid repeating some of the stylistic recommendations here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you. This one is on me There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have gone through #801 and I think I have already addressed most of the relavent comments in this PR as is |
||
if err := debug.PrintPeerList(bus, routerType); err != nil { | ||
return fmt.Errorf("error printing peer list: %w", err) | ||
} | ||
return nil | ||
} | ||
|
||
// TECHDEBT(#811): will need to wait for DHT bootstrapping to complete before | ||
// p2p broadcast can be used with to reach unstaked actors. | ||
// CONSIDERATION: add the peer commands to the interactive CLI as the P2P module | ||
// instance could persist between commands. Other interactive CLI commands which | ||
// rely on unstaked actor router broadcast are working as expected. | ||
|
||
// TECHDEBT(#811): use broadcast instead to reach all peers. | ||
return sendToStakedPeers(cmd, debugMsgAny) | ||
} | ||
|
||
func sendToStakedPeers(cmd *cobra.Command, debugMsgAny *anypb.Any) error { | ||
bus, err := helpers.GetBusFromCmd(cmd) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
pstore, err := helpers.FetchPeerstore(cmd) | ||
if err != nil { | ||
logger.Global.Fatal().Err(err).Msg("unable to retrieve the pstore") | ||
} | ||
|
||
if pstore.Size() == 0 { | ||
logger.Global.Fatal().Msg("no validators found") | ||
} | ||
|
||
for _, peer := range pstore.GetPeerList() { | ||
if err := bus.GetP2PModule().Send(peer.GetAddress(), debugMsgAny); err != nil { | ||
logger.Global.Error().Err(err).Msg("failed to send debug message") | ||
} | ||
} | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package peer | ||
|
||
import ( | ||
"github.com/spf13/cobra" | ||
|
||
"github.com/pokt-network/pocket/app/client/cli/helpers" | ||
) | ||
|
||
var ( | ||
allFlag, | ||
stakedFlag, | ||
unstakedFlag, | ||
localFlag bool | ||
|
||
PeerCmd = &cobra.Command{ | ||
Use: "peer", | ||
Short: "Manage peers", | ||
PersistentPreRunE: helpers.P2PDependenciesPreRunE, | ||
} | ||
) | ||
|
||
func init() { | ||
PeerCmd.PersistentFlags().BoolVarP(&allFlag, "all", "a", false, "operations apply to both staked & unstaked router peerstores (default)") | ||
PeerCmd.PersistentFlags().BoolVarP(&stakedFlag, "staked", "s", false, "operations only apply to staked router peerstore (i.e. raintree)") | ||
PeerCmd.PersistentFlags().BoolVarP(&unstakedFlag, "unstaked", "u", false, "operations only apply to unstaked (including staked as a subset) router peerstore (i.e. gossipsub)") | ||
PeerCmd.PersistentFlags().BoolVarP(&localFlag, "local", "l", false, "operations apply to the local (CLI binary's) P2P module instead of being broadcast") | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,6 @@ import ( | |
"github.com/pokt-network/pocket/logger" | ||
"github.com/pokt-network/pocket/p2p/config" | ||
"github.com/pokt-network/pocket/p2p/protocol" | ||
"github.com/pokt-network/pocket/p2p/providers" | ||
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider" | ||
typesP2P "github.com/pokt-network/pocket/p2p/types" | ||
"github.com/pokt-network/pocket/p2p/unicast" | ||
|
@@ -36,8 +35,8 @@ var ( | |
// TECHDEBT: Make these values configurable | ||
// TECHDEBT: Consider using an exponential backoff instead | ||
const ( | ||
connectMaxRetries = 5 | ||
connectRetryTimeout = time.Second * 2 | ||
connectMaxRetries = 7 | ||
connectRetryTimeout = time.Second * 3 | ||
) | ||
|
||
type backgroundRouterFactory = modules.FactoryWithConfig[typesP2P.Router, *config.BackgroundConfig] | ||
|
@@ -73,7 +72,7 @@ type backgroundRouter struct { | |
subscription *pubsub.Subscription | ||
// kadDHT is a kademlia distributed hash table used for routing and peer discovery. | ||
kadDHT *dht.IpfsDHT | ||
// TECHDEBT: `pstore` will likely be removed in future refactoring / simplification | ||
// TECHDEBT(#747, #749): `pstore` will likely be removed in future refactoring / simplification | ||
// of the `Router` interface. | ||
// pstore is the background router's peerstore. Assigned in `backgroundRouter#setupPeerstore()`. | ||
pstore typesP2P.Peerstore | ||
|
@@ -258,18 +257,11 @@ func (rtr *backgroundRouter) setupDependencies(ctx context.Context, _ *config.Ba | |
} | ||
|
||
func (rtr *backgroundRouter) setupPeerstore(ctx context.Context) (err error) { | ||
rtr.logger.Warn().Msg("setting up peerstore...") | ||
|
||
// TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider | ||
// TECHDEBT(#811): use `bus.GetPeerstoreProvider()` after peerstore provider | ||
// is retrievable as a proper submodule | ||
pstoreProviderModule, err := rtr.GetBus().GetModulesRegistry(). | ||
GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) | ||
pstoreProvider, err := peerstore_provider.GetPeerstoreProvider(rtr.GetBus()) | ||
if err != nil { | ||
return fmt.Errorf("retrieving peerstore provider: %w", err) | ||
} | ||
pstoreProvider, ok := pstoreProviderModule.(providers.PeerstoreProvider) | ||
if !ok { | ||
return fmt.Errorf("unexpected peerstore provider type: %T", pstoreProviderModule) | ||
return err | ||
} | ||
|
||
rtr.logger.Debug().Msg("setupCurrentHeightProvider") | ||
|
@@ -284,10 +276,7 @@ func (rtr *backgroundRouter) setupPeerstore(ctx context.Context) (err error) { | |
} | ||
|
||
// TECHDEBT(#859): integrate with `p2pModule#bootstrap()`. | ||
if err := rtr.bootstrap(ctx); err != nil { | ||
return fmt.Errorf("bootstrapping peerstore: %w", err) | ||
} | ||
|
||
rtr.bootstrap(ctx) | ||
return nil | ||
} | ||
|
||
|
@@ -343,33 +332,38 @@ func (rtr *backgroundRouter) setupSubscription() (err error) { | |
} | ||
|
||
// TECHDEBT(#859): integrate with `p2pModule#bootstrap()`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
func (rtr *backgroundRouter) bootstrap(ctx context.Context) error { | ||
func (rtr *backgroundRouter) bootstrap(ctx context.Context) { | ||
// CONSIDERATION: add `GetPeers` method, which returns a map, | ||
// to the `PeerstoreProvider` interface to simplify this loop. | ||
for _, peer := range rtr.pstore.GetPeerList() { | ||
peerList := rtr.pstore.GetPeerList() | ||
for _, peer := range peerList { | ||
if err := utils.AddPeerToLibp2pHost(rtr.host, peer); err != nil { | ||
return err | ||
rtr.logger.Error().Err(err).Msg("error adding peer to libp2p host") | ||
continue | ||
} | ||
|
||
libp2pAddrInfo, err := utils.Libp2pAddrInfoFromPeer(peer) | ||
if err != nil { | ||
return fmt.Errorf( | ||
"converting peer info, pokt address: %s: %w", | ||
peer.GetAddress(), | ||
err, | ||
) | ||
rtr.logger.Error().Err(err).Msg("error converting peer info") | ||
continue | ||
} | ||
|
||
// don't attempt to connect to self | ||
if rtr.host.ID() == libp2pAddrInfo.ID { | ||
return nil | ||
rtr.logger.Debug().Msg("not bootstrapping against self") | ||
continue | ||
} | ||
|
||
rtr.logger.Debug().Fields(map[string]any{ | ||
"peer_id": libp2pAddrInfo.ID.String(), | ||
"peer_addr": libp2pAddrInfo.Addrs[0].String(), | ||
h5law marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"num_peers": len(peerList) - 1, // -1 as includes self | ||
}).Msg("connecting to peer") | ||
if err := rtr.connectWithRetry(ctx, libp2pAddrInfo); err != nil { | ||
return fmt.Errorf("connecting to peer: %w", err) | ||
rtr.logger.Error().Err(err).Msg("error connecting to bootstrap peer") | ||
continue | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
// connectWithRetry attempts to connect to the given peer, retrying up to connectMaxRetries times | ||
|
@@ -382,11 +376,11 @@ func (rtr *backgroundRouter) connectWithRetry(ctx context.Context, libp2pAddrInf | |
return nil | ||
} | ||
|
||
fmt.Printf("Failed to connect (attempt %d), retrying in %v...\n", i+1, connectRetryTimeout) | ||
rtr.logger.Error().Msgf("failed to connect (attempt %d), retrying in %v...", i+1, connectRetryTimeout) | ||
time.Sleep(connectRetryTimeout) | ||
} | ||
|
||
return fmt.Errorf("failed to connect after %d attempts, last error: %w", 5, err) | ||
return fmt.Errorf("failed to connect after %d attempts, last error: %w", connectMaxRetries, err) | ||
} | ||
|
||
// topicValidator is used in conjunction with libp2p-pubsub's notion of "topic | ||
|
@@ -430,7 +424,6 @@ func (rtr *backgroundRouter) readSubscription(ctx context.Context) { | |
return | ||
} | ||
msg, err := rtr.subscription.Next(ctx) | ||
|
||
if err != nil { | ||
rtr.logger.Error().Err(err). | ||
Msg("error reading from background topic subscription") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we do this for the other
flags.go
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intended to revert this change. This is the direction I started when investigating #891 (comment), I thought I had reverted it but apparently failed.
To answer your question, the reason for this is because viper's only integration with flags is to support setting a viper key based on the flag, but not the other way around. I.e: Viper won't update the flag value. This only applies to bound flags
This means that we have to do one of the following consistently for bound flags:
viper.GetString("<flag key>")
(or a helper containing it) anywhere we need the valueI opted for the latter option as I felt it was more conventional and easier to read and maintain.