Skip to content

Commit

Permalink
Last week's state sync debugging changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Olshansk committed Aug 7, 2023
1 parent 819f990 commit 6ffe41d
Show file tree
Hide file tree
Showing 18 changed files with 79 additions and 22 deletions.
3 changes: 2 additions & 1 deletion build/config/config.validator1.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@
},
"servicer": {
"enabled": true,
"private_key": "0ca1a40ddecdab4f5b04fa0bfed1d235beaa2b8082e7554425607516f0862075dfe357de55649e6d2ce889acf15eb77e94ab3c5756fe46d3c7538d37f27f115e",
"chains": ["0001"]
},
"ibc": {
"enabled": true,
"stores_dir": "/var/ibc",
"host": {
"private_key": "0ca1a40ddecdab4f5b04fa0bfed1d235beaa2b8082e7554425607516f0862075dfe357de55649e6d2ce889acf15eb77e94ab3c5756fe46d3c7538d37f27f115e"
"private_key": "0ca1a40ddecdab4f5b04fa0bfed1d235beaa2b8082e7554425607516f0862075dfe357de55649e6d2ce889acf15eb77e94ab3c5756fe46d3c7538d37f27f115e"
}
}
}
7 changes: 6 additions & 1 deletion build/localnet/Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,19 @@ deps = [
"build/debug.go",
"consensus",
"p2p",
"persistance",
"persistence",
"rpc",
"runtime",
"shared",
"telemetry",
"utility",
"vendor",
"logger",
"e2e",
"ibc",
"internal",
"state_machine",
"tools",
]

deps_full_path = [root_dir + "/" + depdir for depdir in deps]
Expand Down
3 changes: 2 additions & 1 deletion build/localnet/cluster-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func init() {
clusterManagerCmd.PersistentFlags().StringVar(
&flags.RemoteCLIURL,
"remote_cli_url",
defaults.Validator1EndpointK8SHostname,
// defaults.Validator1EndpointK8SHostname,
defaults.FullNode1EndpointK8SHostname,
"takes a remote endpoint in the form of <protocol>://<host>:<port> (uses RPC Port)",
)

Expand Down
3 changes: 3 additions & 0 deletions consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func (m *consensusModule) isBlockInMessageValidBasic(msg *typesCons.HotstuffMess

// refreshUtilityUnitOfWork is a helper that creates a new Utility Unit Of Work and clears/nullifies a previous one if it exists
func (m *consensusModule) refreshUtilityUnitOfWork() error {
// m.m.Lock()

Check failure on line 81 in consensus/block.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
// defer m.m.Unlock()

// Catch-all structure to release the previous utility UOW if it wasn't properly cleaned up.
utilityUnitOfWork := m.utilityUnitOfWork

Expand Down
1 change: 1 addition & 0 deletions consensus/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (m *consensusModule) HandleUnsynced(msg *messaging.StateMachineTransitionEv
// HandleSyncMode handles the state machine event received while the node is in Sync Mode.
// It launches the state sync loop on its own goroutine and returns immediately, so the nil
// return only signals that the loop was started, not that syncing has completed.
// The msg argument is not inspected.
func (m *consensusModule) HandleSyncMode(msg *messaging.StateMachineTransitionEvent) error {
	m.logger.Info().Str("source", consensusFSMHandlerSource).Msg("Node is in Sync Mode. About to start synchronous sync loop...")

	// The loop runs in the background; this handler does not wait for it to finish.
	go m.stateSync.StartSynchronousStateSync()

	// Only report that the loop was launched: claiming it "finished" here would be false,
	// since the goroutine above has only just been scheduled.
	m.logger.Info().Str("source", consensusFSMHandlerSource).Msg("Node is in Sync Mode. Started synchronous sync loop in the background")
	return nil
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (handler *HotstuffReplicaMessageHandler) HandleDecideMessage(m *consensusMo
if err := m.commitBlock(m.block); err != nil {
m.logger.Error().Err(err).Msg("Could not commit block")
m.paceMaker.InterruptRound("failed to commit block")
return
return

Check failure on line 166 in consensus/hotstuff_replica.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
}

m.paceMaker.NewHeight()
Expand Down
2 changes: 1 addition & 1 deletion consensus/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (m *consensusModule) loadPersistedState() error {
defer readCtx.Release()

latestHeight, err := readCtx.GetMaximumBlockHeight()
if err != nil || latestHeight == 0 {
if err != nil {
// TODO: Proper state sync not implemented yet
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion consensus/module_consensus_pacemaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (m *consensusModule) BroadcastMessageToValidators(msg *anypb.Any) error {
}

func (m *consensusModule) IsLeader() bool {
return m.leaderId != nil && *m.leaderId == m.nodeId
valMod, err := m.GetBus().GetUtilityModule().GetValidatorModule()
return err == nil && valMod != nil && m.leaderId != nil && *m.leaderId == m.nodeId
}

func (m *consensusModule) IsLeaderSet() bool {
Expand Down
8 changes: 7 additions & 1 deletion consensus/state_sync/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ func (m *stateSync) sendStateSyncMessage(msg *typesCons.StateSyncMessage, dst cr
// For now, aggregating the messages when requests is good enough.
func (m *stateSync) getAggregatedStateSyncMetadata() (minHeight, maxHeight uint64) {
chanLen := len(m.metadataReceived)
m.logger.Info().Msgf("Looping over %d state sync metadata responses", chanLen)
m.logger.Info().
Int16("num_state_sync_metadata_messages", int16(chanLen)).
Msgf("About to loop overstate sync metadata responses")

for i := 0; i < chanLen; i++ {
metadata := <-m.metadataReceived
Expand All @@ -32,5 +34,9 @@ func (m *stateSync) getAggregatedStateSyncMetadata() (minHeight, maxHeight uint6
minHeight = metadata.MinHeight
}
}
m.logger.Info().Fields(map[string]any{
"min_height": minHeight,
"max_height": maxHeight,
}).Msg("Finished aggregating state sync metadata")
return
}
27 changes: 24 additions & 3 deletions consensus/state_sync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
committedBlocsChannelSize = 100
metadataChannelSize = 1000
blocksChannelSize = 1000
metadataSyncPeriod = 45 * time.Second
metadataSyncPeriod = 10 * time.Second
)

type StateSyncModule interface {
Expand Down Expand Up @@ -99,10 +99,21 @@ func (m *stateSync) StartSynchronousStateSync() {
// Get a view into the state of the network
_, maxHeight := m.getAggregatedStateSyncMetadata()

m.logger.Info().
Uint64("current_height", currentHeight).
Uint64("max_height", maxHeight).
Msg("Synchronous state sync is requesting blocks...")

// Synchronously request blocks from the current height up to the aggregated metadata height
// Note that we are using `<=` because:
// - maxHeight is the max * committed * height of the network
// - currentHeight is the latest * committing * height of the node

// We do not need to request the genesis block from anyone
if currentHeight == 0 {
currentHeight += 1
}

for currentHeight <= maxHeight {
m.logger.Info().Msgf("Synchronous state sync is requesting block: %d, ending height: %d", currentHeight, maxHeight)

Expand Down Expand Up @@ -132,7 +143,7 @@ func (m *stateSync) StartSynchronousStateSync() {
case blockHeight := <-m.committedBlocksChannel:
m.logger.Info().Msgf("State sync received event that block %d is committed!", blockHeight)
case <-time.After(blockWaitingPeriod):
m.logger.Warn().Msgf("Timed out waiting for block %d to be committed...", currentHeight)
m.logger.Error().Msgf("Timed out waiting for block %d to be committed...", currentHeight)
}

// Update the height and continue catching up to the latest known state
Expand All @@ -159,8 +170,18 @@ func (m *stateSync) StartSynchronousStateSync() {
}

func (m *stateSync) HandleStateSyncMetadataResponse(res *typesCons.StateSyncMetadataResponse) {
m.logger.Info().Msg("Handling state sync metadata response")
m.logger.Info().Fields(map[string]any{
"peer_address": res.PeerAddress,
"min_height": res.MinHeight,
"max_height": res.MaxHeight,
}).Msg("Handling state sync metadata response")
m.metadataReceived <- res

if res.MaxHeight > 0 && m.GetBus().GetConsensusModule().CurrentHeight() <= res.MaxHeight {
if err := m.GetBus().GetStateMachineModule().SendEvent(coreTypes.StateMachineEvent_Consensus_IsUnsynced); err != nil {
m.logger.Error().Err(err).Msg("Failed to send state machine event")
}
}
}

func (m *stateSync) HandleBlockCommittedEvent(msg *messaging.ConsensusNewHeightEvent) {
Expand Down
10 changes: 10 additions & 0 deletions consensus/state_sync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ func (m *stateSync) HandleStateSyncMetadataRequest(metadataReq *typesCons.StateS
serverNodePeerAddress := consensusMod.GetNodeAddress()
clientPeerAddress := metadataReq.PeerAddress

// No blocks or metadata to share at genesis
if consensusMod.CurrentHeight() == 0 {
return
}

// current height is the height of the block that is being processed, so we need to subtract 1 for the last finalized block
prevPersistedBlockHeight := consensusMod.CurrentHeight() - 1

Expand Down Expand Up @@ -81,6 +86,11 @@ func (m *stateSync) HandleGetBlockRequest(blockReq *typesCons.GetBlockRequest) {
serverNodePeerAddress := consensusMod.GetNodeAddress()
clientPeerAddress := blockReq.PeerAddress

// No blocks or metadata to share at genesis
if consensusMod.CurrentHeight() == 0 {
return
}

// Check if the block should be retrievable based on the node's consensus height
prevPersistedBlockHeight := consensusMod.CurrentHeight() - 1
if prevPersistedBlockHeight < blockReq.Height {
Expand Down
4 changes: 2 additions & 2 deletions consensus/state_sync_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (m *consensusModule) handleStateSyncMessage(stateSyncMessage *typesCons.Sta
switch stateSyncMessage.Message.(type) {

case *typesCons.StateSyncMessage_MetadataReq:
m.logger.Info().Str("proto_type", "MetadataRequest").Msg("Handling StateSyncMessage MetadataReq")
// m.logger.Info().Str("proto_type", "MetadataRequest").Msg("Handling StateSyncMessage MetadataReq")

Check failure on line 36 in consensus/state_sync_handler.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
if !m.consCfg.ServerModeEnabled {
m.logger.Warn().Msg("Node's server module is not enabled")
return nil
Expand All @@ -42,7 +42,7 @@ func (m *consensusModule) handleStateSyncMessage(stateSyncMessage *typesCons.Sta
return nil

case *typesCons.StateSyncMessage_GetBlockReq:
m.logger.Info().Str("proto_type", "GetBlockRequest").Msg("Handling StateSyncMessage GetBlockRequest")
// m.logger.Info().Str("proto_type", "GetBlockRequest").Msg("Handling StateSyncMessage GetBlockRequest")

Check failure on line 45 in consensus/state_sync_handler.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
if !m.consCfg.ServerModeEnabled {
m.logger.Warn().Msg("Node's server module is not enabled")
return nil
Expand Down
3 changes: 2 additions & 1 deletion e2e/tests/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ var (
)

func init() {
defaultRPCHost := runtime.GetEnv("RPC_HOST", defaults.RandomValidatorEndpointK8SHostname)
// defaultRPCHost := runtime.GetEnv("RPC_HOST", defaults.RandomValidatorEndpointK8SHostname)
defaultRPCHost := runtime.GetEnv("RPC_HOST", defaults.FullNode1EndpointK8SHostname)
defaultRPCURL = fmt.Sprintf("http://%s:%s", defaultRPCHost, defaults.DefaultRPCPort)
}

Expand Down
1 change: 1 addition & 0 deletions runtime/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
DefaultRPCHost = "localhost"
Validator1EndpointDockerComposeHostname = "validator1"
Validator1EndpointK8SHostname = "validator-001-pocket"
FullNode1EndpointK8SHostname = "full-node-001-pocket"
RandomValidatorEndpointK8SHostname = "pocket-validators"
)

Expand Down
8 changes: 4 additions & 4 deletions shared/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ func (m *Node) GetBus() modules.Bus {
// TODO: Move all message types this is dependent on to the `messaging` package
func (node *Node) handleEvent(message *messaging.PocketEnvelope) error {
contentType := message.GetContentType()
logger.Global.Debug().Fields(map[string]any{
"message": message,
"contentType": contentType,
}).Msg("node handling event")
// logger.Global.Debug().Fields(map[string]any{
// "message": message,
// "contentType": contentType,
// }).Msg("node handling event")

switch contentType {

Expand Down
3 changes: 3 additions & 0 deletions state_machine/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewNodeFSM(callbacks *fsm.Callbacks, options ...func(*fsm.FSM)) *fsm.FSM {
Name: string(coreTypes.StateMachineEvent_Consensus_IsSyncedNonValidator),
Src: []string{
string(coreTypes.StateMachineState_Consensus_SyncMode),
// string(coreTypes.StateMachineState_Consensus_Synced),
},
Dst: string(coreTypes.StateMachineState_Consensus_Synced),
},
Expand All @@ -60,6 +61,8 @@ func NewNodeFSM(callbacks *fsm.Callbacks, options ...func(*fsm.FSM)) *fsm.FSM {
Src: []string{
string(coreTypes.StateMachineState_Consensus_Pacemaker),
string(coreTypes.StateMachineState_Consensus_Synced),
// string(coreTypes.StateMachineState_Consensus_Unsynced),
string(coreTypes.StateMachineState_Consensus_SyncMode),
string(coreTypes.StateMachineState_P2P_Bootstrapped),
},
Dst: string(coreTypes.StateMachineState_Consensus_Unsynced),
Expand Down
10 changes: 5 additions & 5 deletions utility/unit_of_work/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,19 +307,19 @@ func (u *baseUtilityUnitOfWork) getActorExists(actorType coreTypes.ActorType, ad

// IMPROVE: Need to re-evaluate the design of `Output Address` to support things like "rev-share"
// and multiple output addresses.
func (u *baseUtilityUnitOfWork) getActorOutputAddress(actorType coreTypes.ActorType, operator []byte) ([]byte, coreTypes.Error) {
func (uow *baseUtilityUnitOfWork) getActorOutputAddress(actorType coreTypes.ActorType, operator []byte) ([]byte, coreTypes.Error) {
var outputAddr []byte
var err error

switch actorType {
case coreTypes.ActorType_ACTOR_TYPE_APP:
outputAddr, err = u.persistenceReadContext.GetAppOutputAddress(operator, u.height)
outputAddr, err = uow.persistenceReadContext.GetAppOutputAddress(operator, uow.height)
case coreTypes.ActorType_ACTOR_TYPE_FISH:
outputAddr, err = u.persistenceReadContext.GetFishermanOutputAddress(operator, u.height)
outputAddr, err = uow.persistenceReadContext.GetFishermanOutputAddress(operator, uow.height)
case coreTypes.ActorType_ACTOR_TYPE_SERVICER:
outputAddr, err = u.persistenceReadContext.GetServicerOutputAddress(operator, u.height)
outputAddr, err = uow.persistenceReadContext.GetServicerOutputAddress(operator, uow.height)
case coreTypes.ActorType_ACTOR_TYPE_VAL:
outputAddr, err = u.persistenceReadContext.GetValidatorOutputAddress(operator, u.height)
outputAddr, err = uow.persistenceReadContext.GetValidatorOutputAddress(operator, uow.height)
default:
err = coreTypes.ErrUnknownActorType(actorType.String())
}
Expand Down
3 changes: 3 additions & 0 deletions utility/unit_of_work/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (

var _ modules.UtilityUnitOfWork = &baseUtilityUnitOfWork{}

// TODO: Rename all `u * baseUtilityUnitOfWork` to `uow * baseUtilityUnitOfWork` for consistency
type baseUtilityUnitOfWork struct {
base_modules.IntegrableModule

Expand Down Expand Up @@ -117,6 +118,8 @@ func (uow *baseUtilityUnitOfWork) Commit(quorumCert []byte) error {
}

func (uow *baseUtilityUnitOfWork) Release() error {
uow.logger.Info().Msg("releasing the unit of work...")

rwCtx := uow.persistenceRWContext
if rwCtx != nil {
uow.persistenceRWContext = nil
Expand Down

0 comments on commit 6ffe41d

Please sign in to comment.