Skip to content

Commit

Permalink
Fixed the poa and pos consensus stopped producing blocks after operat…
Browse files Browse the repository at this point in the history
…ions such as proposal changes (#324)
  • Loading branch information
wangchao0222 authored Jan 28, 2022
1 parent 52b1682 commit dcc1e24
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 24 deletions.
11 changes: 7 additions & 4 deletions bcs/consensus/tdpos/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,11 @@ func (s *tdposSchedule) UpdateProposers(height int64) bool {
return false
}
if !common.AddressEqual(nextProposers, s.validators) {
s.log.Debug("tdpos::UpdateProposers", "origin", s.validators, "proposers", nextProposers)
s.log.Debug("tdpos::UpdateProposers", "origin", s.validators, "proposers", nextProposers, "height", height)
s.validators = nextProposers
return true
}
s.log.Debug("tdpos::UpdateProposers", "origin", s.validators, "height", height)
return false
}

Expand Down Expand Up @@ -352,7 +353,7 @@ func (s *tdposSchedule) calHisValidators(height int64) ([]string, error) {
term, pos, blockPos := s.minerScheduling(block.GetTimestamp())
// 往前回溯的最远距离为internal,即该轮term之前最多生产过多少个区块
internal := pos*s.blockNum + blockPos
begin := block.GetHeight() - internal
begin := block.GetHeight() - internal - 1
if begin <= s.startHeight {
begin = s.startHeight
}
Expand All @@ -361,7 +362,9 @@ func (s *tdposSchedule) calHisValidators(height int64) ([]string, error) {
if err != nil {
return nil, err
}
s.log.Debug("tdpos::CalculateProposers::target height.", "height", height, "targetHeight", targetHeight, "term", term)
s.log.Debug("tdpos::CalculateProposers::target height.", "inputHeight", height, "targetHeight", targetHeight,
"begin", begin, "end", block.GetHeight(), "term", term, "pos", pos, "blockPos", blockPos, "internal", internal,
"blockNum", s.blockNum, "block.Timestamp", block.GetTimestamp())
return s.calTopKNominator(targetHeight)
}

Expand All @@ -378,7 +381,7 @@ func (s *tdposSchedule) binarySearch(begin int64, end int64, term int64) (int64,
return -1, err
}
if midTerm < term && nextMidTerm == term {
return mid + 1, nil
return mid, nil
}
if midTerm < term {
begin = mid + 1
Expand Down
6 changes: 3 additions & 3 deletions bcs/consensus/tdpos/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ func TestCalHisValidators(t *testing.T) {
return
}
target, _ = s.binarySearch(int64(1), int64(5), int64(2))
if target != 4 {
if target != 3 {
t.Error("binarySearch cal err2.", "target", target)
return
}
target, _ = s.binarySearch(int64(5), int64(6), int64(3))
if target != 6 {
if target != 5 {
t.Error("binarySearch cal err.", "target", target)
return
}
Expand All @@ -198,7 +198,7 @@ func TestCalHisValidators(t *testing.T) {
return
}
target, _ = s.binarySearch(int64(5), int64(11), int64(5))
if target != 8 {
if target != 7 {
t.Error("binarySearch cal err.", "target", target)
return
}
Expand Down
4 changes: 3 additions & 1 deletion bcs/consensus/tdpos/tdpos.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ func (tp *tdposConsensus) CheckMinerMatch(ctx xcontext.XContext, block cctx.Bloc
return false, err
}
if wantProposers[pos] != string(block.GetProposer()) {
tp.log.Error("consensus:tdpos:CheckMinerMatch: invalid proposer", "want", wantProposers[pos], "have", string(block.GetProposer()))
tp.log.Error("consensus:tdpos:CheckMinerMatch: invalid proposer",
"want", wantProposers[pos], "have", string(block.GetProposer()),
"wantProposers", wantProposers, "pos", pos)
return false, ErrInvalidProposer
}

Expand Down
1 change: 1 addition & 0 deletions kernel/consensus/base/driver/chained-bft/saftyrules.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (s *DefaultSaftyRules) CheckProposal(proposal, parent storage.QuorumCertInt

// 检查justify的所有vote签名
justifySigns := parent.GetSignsInfo()
s.Log.Debug("DefaultSaftyRules::CheckProposal", "parent", parent, "justifyValidators", justifyValidators)
validCnt := 0
for _, v := range justifySigns {
if !isInSlice(v.GetAddress(), justifyValidators) {
Expand Down
10 changes: 9 additions & 1 deletion kernel/consensus/base/driver/chained-bft/smr.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,12 @@ func (s *Smr) ProcessProposal(viewNumber int64, proposalID []byte, parentID []by
s.log.Error("smr::ProcessProposal::NewMessage error")
return ErrP2PInternalErr
}

go s.p2p.SendMessage(createNewBCtx(), netMsg, p2p.WithAccounts(s.removeLocalValidator(validatesIpInfo)))
s.log.Debug("smr::ProcessProposal::proposal", "localAddress", s.address, "validatesIpInfo", validatesIpInfo,
"ProposalView", proposal.ProposalView, "ProposalId", utils.F(proposal.ProposalId),
"Timestamp", proposal.Timestamp, "JustifyQC", proposal.JustifyQC)

s.localProposal.Store(utils.F(proposalID), proposal.Timestamp)
// 若为单候选人情况,则此处需要特殊处理,矿工需要给自己提前签名
if len(validatesIpInfo) == 1 {
Expand Down Expand Up @@ -518,6 +523,9 @@ func (s *Smr) handleReceivedProposal(msg *xuperp2p.XuperMessage) {
// 此处如果失败,仍会执行下层逻辑,因为是多个节点通知该轮Leader,因此若发不出去仍可继续运行
if leader != "" && netMsg != nil && leader != s.address {
go s.p2p.SendMessage(createNewBCtx(), netMsg, p2p.WithAccounts([]string{leader}))
s.log.Debug("smr::handleReceivedProposal::proposal", "localAddress", s.address, "leader", leader,
"ProposalView", newProposalMsg.ProposalView, "ProposalId", utils.F(newProposalMsg.ProposalId),
"Timestamp", newProposalMsg.Timestamp, "JustifyQC", newProposalMsg.JustifyQC)
}
}

Expand Down Expand Up @@ -556,7 +564,7 @@ func (s *Smr) handleReceivedProposal(msg *xuperp2p.XuperMessage) {
// 6.发送一个vote消息给下一个Leader
nextLeader := s.election.GetLeader(s.pacemaker.GetCurrentView() + 1)
if nextLeader == "" {
s.log.Debug("smr::handleReceivedProposal::empty next leader", "next round", s.pacemaker.GetCurrentView()+1)
s.log.Warn("smr::handleReceivedProposal::empty next leader", "next round", s.pacemaker.GetCurrentView()+1)
return
}
s.voteProposal(newProposalMsg.GetProposalId(), newVote, newLedgerInfo, nextLeader)
Expand Down
1 change: 1 addition & 0 deletions kernel/consensus/base/driver/chained-bft/smr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func NewSMR(node string, log logs.Logger, p2p network.Network, t *testing.T) *Sm
saftyrules := &DefaultSaftyRules{
Crypto: cryptoClient,
QcTree: q,
Log: log,
}
election := &ElectionA{
addrs: []string{NodeA, NodeB, NodeC},
Expand Down
53 changes: 38 additions & 15 deletions kernel/engines/xuperos/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

const (
tickOnCalcBlock = time.Second
syncOnstatusChangeTimeout = 1 * time.Minute
syncOnStatusChangeTimeout = 1 * time.Minute

statusFollowing = 0
statusMining = 1
Expand Down Expand Up @@ -65,21 +65,26 @@ func (t *Miner) ProcBlock(ctx xctx.XContext, block *lpb.InternalBlock) error {
return nil
}

// Start
// 启动矿工,周期检查矿工身份
// 同一时间,矿工状态是唯一的。0:休眠中 1:同步区块中 2:打包区块中
// 同一时间,矿工状态是唯一的
// 0:休眠中 1:同步区块中 2:打包区块中
func (t *Miner) Start() {
var err error

// 用于监测退出
t.exitWG.Add(1)
defer t.exitWG.Done()

var err error
// 节点初始状态为同步节点
t.status = statusFollowing

// 开启挖矿前先同步区块
ctx := &xctx.BaseCtx{
XLog: t.log,
Timer: timer.NewXTimer(),
}
t.syncWithNeighbors(ctx)
_ = t.syncWithNeighbors(ctx)

// 启动矿工循环
for !t.IsExit() {
Expand Down Expand Up @@ -124,6 +129,7 @@ func (t *Miner) step() error {
Timer: timer.NewXTimer(),
}

// 账本和状态机最新区块id不一致,需要进行一次同步
if !bytes.Equal(ledgerTipId, stateTipId) {
err := t.ctx.State.Walk(ledgerTipId, false)
if err != nil {
Expand All @@ -136,37 +142,51 @@ func (t *Miner) step() error {
ctx.GetLog().Trace("miner step", "ledgerTipHeight", ledgerTipHeight, "ledgerTipId",
utils.F(ledgerTipId), "stateTipId", utils.F(stateTipId))

// 如果上次角色是非矿工,则尝试同步网络最新区块
// 注意:这里出现错误也要继续执行,防止恶意节点错误出块导致流程无法继续执行
if t.status == statusFollowing {
err := t.syncWithValidators(ctx, syncOnStatusChangeTimeout)
ctx.GetLog().Trace("miner syncWithValidators before CompeteMaster", "originTipHeight", ledgerTipHeight,
"currentLedgerHeight", t.ctx.Ledger.GetMeta().TrunkHeight, "err", err)
trace("syncUpValidators")
}

// 通过共识检查矿工身份
isMiner, isSync, err := t.ctx.Consensus.CompeteMaster(ledgerTipHeight + 1)
trace("competeMaster")
ctx.GetLog().Trace("compete master result", "height", ledgerTipHeight+1, "isMiner", isMiner, "isSync", isSync, "err", err)
if err != nil {
return err
}
// 如需要同步,尝试同步网络最新区块
if isMiner && isSync {
err = t.syncWithValidators(ctx, syncOnstatusChangeTimeout)
if err != nil {
return err
}
}
trace("syncUpValidators")

// 如果是矿工,出块
if isMiner {
if t.status == statusFollowing {
if t.status == statusFollowing || isSync {
ctx.GetLog().Info("miner change follow=>miner",
"miner", t.ctx.Address.Address,
"netAddr", t.ctx.EngCtx.Net.PeerInfo().Id,
"height", t.ctx.Ledger.GetMeta().GetTrunkHeight(),
)

// 在由非矿工向矿工切换的这次"边沿触发",主动向所有的验证集合的最长链进行一次区块同步
err = t.syncWithValidators(ctx, syncOnstatusChangeTimeout)
err = t.syncWithValidators(ctx, syncOnStatusChangeTimeout)
if err != nil {
ctx.GetLog().Error("miner change follow=>miner syncWithValidators failed", "err", err)
return err
}

// 由于同步了最长链,所以这里需要检查链是否增长
// 由于pos和poa类共识依赖账本高度来判断状态,如果链发生变化则表明CompeteMaster的结果需要重新根据当前最新高度计算
if ledgerTipHeight != t.ctx.Ledger.GetMeta().TrunkHeight {
ctx.GetLog().Trace("miner change follow=>miner", "originTipHeight", ledgerTipHeight, "currentLedgerHeight",
t.ctx.Ledger.GetMeta().TrunkHeight, "isMiner", isMiner, "isSync", isSync)
return nil
}
trace("syncUpValidators")
}
t.status = statusMining

// 开始挖矿
err = t.mining(ctx)
if err != nil {
return err
Expand All @@ -192,7 +212,7 @@ func (t *Miner) step() error {
return nil
}

// 挖矿生产区块
// mining 挖矿生产区块
func (t *Miner) mining(ctx xctx.XContext) error {
ctx.GetLog().Debug("mining start.")

Expand All @@ -213,6 +233,7 @@ func (t *Miner) mining(ctx xctx.XContext) error {
}
// 重置高度
height = t.ctx.Ledger.GetMeta().TrunkHeight + 1
ctx.GetLog().Debug("truncateTarget result", "newHeight", height)
}

// 2.打包区块
Expand Down Expand Up @@ -249,6 +270,8 @@ func (t *Miner) mining(ctx xctx.XContext) error {
"blockId", utils.F(block.GetBlockid()))
return err
}

// 5.可插拔共识,根据区块高度确认是否需要切换升级共识实例
err = t.ctx.Consensus.SwitchConsensus(block.Height)
if err != nil {
ctx.GetLog().Warn("SwitchConsensus failed", "bcname", t.ctx.BCName,
Expand Down

0 comments on commit dcc1e24

Please sign in to comment.