diff --git a/bcs/consensus/tdpos/schedule.go b/bcs/consensus/tdpos/schedule.go index c99a2827..7794f848 100644 --- a/bcs/consensus/tdpos/schedule.go +++ b/bcs/consensus/tdpos/schedule.go @@ -210,10 +210,11 @@ func (s *tdposSchedule) UpdateProposers(height int64) bool { return false } if !common.AddressEqual(nextProposers, s.validators) { - s.log.Debug("tdpos::UpdateProposers", "origin", s.validators, "proposers", nextProposers) + s.log.Debug("tdpos::UpdateProposers", "origin", s.validators, "proposers", nextProposers, "height", height) s.validators = nextProposers return true } + s.log.Debug("tdpos::UpdateProposers", "origin", s.validators, "height", height) return false } @@ -352,7 +353,7 @@ func (s *tdposSchedule) calHisValidators(height int64) ([]string, error) { term, pos, blockPos := s.minerScheduling(block.GetTimestamp()) // 往前回溯的最远距离为internal,即该轮term之前最多生产过多少个区块 internal := pos*s.blockNum + blockPos - begin := block.GetHeight() - internal + begin := block.GetHeight() - internal - 1 if begin <= s.startHeight { begin = s.startHeight } @@ -361,7 +362,9 @@ func (s *tdposSchedule) calHisValidators(height int64) ([]string, error) { if err != nil { return nil, err } - s.log.Debug("tdpos::CalculateProposers::target height.", "height", height, "targetHeight", targetHeight, "term", term) + s.log.Debug("tdpos::CalculateProposers::target height.", "inputHeight", height, "targetHeight", targetHeight, + "begin", begin, "end", block.GetHeight(), "term", term, "pos", pos, "blockPos", blockPos, "internal", internal, + "blockNum", s.blockNum, "block.Timestamp", block.GetTimestamp()) return s.calTopKNominator(targetHeight) } @@ -378,7 +381,7 @@ func (s *tdposSchedule) binarySearch(begin int64, end int64, term int64) (int64, return -1, err } if midTerm < term && nextMidTerm == term { - return mid + 1, nil + return mid, nil } if midTerm < term { begin = mid + 1 diff --git a/bcs/consensus/tdpos/schedule_test.go b/bcs/consensus/tdpos/schedule_test.go index 4f84f6d0..c7d8abb0 100644 --- a/bcs/consensus/tdpos/schedule_test.go +++ b/bcs/consensus/tdpos/schedule_test.go @@ -178,12 +178,12 @@ func TestCalHisValidators(t *testing.T) { return } target, _ = s.binarySearch(int64(1), int64(5), int64(2)) - if target != 4 { + if target != 3 { t.Error("binarySearch cal err2.", "target", target) return } target, _ = s.binarySearch(int64(5), int64(6), int64(3)) - if target != 6 { + if target != 5 { t.Error("binarySearch cal err.", "target", target) return } @@ -198,7 +198,7 @@ func TestCalHisValidators(t *testing.T) { return } target, _ = s.binarySearch(int64(5), int64(11), int64(5)) - if target != 8 { + if target != 7 { t.Error("binarySearch cal err.", "target", target) return } diff --git a/bcs/consensus/tdpos/tdpos.go b/bcs/consensus/tdpos/tdpos.go index 4dbed043..003161dc 100644 --- a/bcs/consensus/tdpos/tdpos.go +++ b/bcs/consensus/tdpos/tdpos.go @@ -177,7 +177,9 @@ func (tp *tdposConsensus) CheckMinerMatch(ctx xcontext.XContext, block cctx.Bloc return false, err } if wantProposers[pos] != string(block.GetProposer()) { - tp.log.Error("consensus:tdpos:CheckMinerMatch: invalid proposer", "want", wantProposers[pos], "have", string(block.GetProposer())) + tp.log.Error("consensus:tdpos:CheckMinerMatch: invalid proposer", + "want", wantProposers[pos], "have", string(block.GetProposer()), + "wantProposers", wantProposers, "pos", pos) return false, ErrInvalidProposer } diff --git a/kernel/consensus/base/driver/chained-bft/saftyrules.go b/kernel/consensus/base/driver/chained-bft/saftyrules.go index 80d2ac0a..ec86b9f4 100644 --- a/kernel/consensus/base/driver/chained-bft/saftyrules.go +++ b/kernel/consensus/base/driver/chained-bft/saftyrules.go @@ -145,6 +145,7 @@ func (s *DefaultSaftyRules) CheckProposal(proposal, parent storage.QuorumCertInt // 检查justify的所有vote签名 justifySigns := parent.GetSignsInfo() + s.Log.Debug("DefaultSaftyRules::CheckProposal", "parent", parent, "justifyValidators", justifyValidators) validCnt := 0 for _, v := range justifySigns { if !isInSlice(v.GetAddress(), justifyValidators) { diff --git a/kernel/consensus/base/driver/chained-bft/smr.go b/kernel/consensus/base/driver/chained-bft/smr.go index 9ce170e8..29538915 100644 --- a/kernel/consensus/base/driver/chained-bft/smr.go +++ b/kernel/consensus/base/driver/chained-bft/smr.go @@ -379,7 +379,12 @@ func (s *Smr) ProcessProposal(viewNumber int64, proposalID []byte, parentID []by s.log.Error("smr::ProcessProposal::NewMessage error") return ErrP2PInternalErr } + go s.p2p.SendMessage(createNewBCtx(), netMsg, p2p.WithAccounts(s.removeLocalValidator(validatesIpInfo))) + s.log.Debug("smr::ProcessProposal::proposal", "localAddress", s.address, "validatesIpInfo", validatesIpInfo, + "ProposalView", proposal.ProposalView, "ProposalId", utils.F(proposal.ProposalId), + "Timestamp", proposal.Timestamp, "JustifyQC", proposal.JustifyQC) + s.localProposal.Store(utils.F(proposalID), proposal.Timestamp) // 若为单候选人情况,则此处需要特殊处理,矿工需要给自己提前签名 if len(validatesIpInfo) == 1 { @@ -518,6 +523,9 @@ func (s *Smr) handleReceivedProposal(msg *xuperp2p.XuperMessage) { // 此处如果失败,仍会执行下层逻辑,因为是多个节点通知该轮Leader,因此若发不出去仍可继续运行 if leader != "" && netMsg != nil && leader != s.address { go s.p2p.SendMessage(createNewBCtx(), netMsg, p2p.WithAccounts([]string{leader})) + s.log.Debug("smr::handleReceivedProposal::proposal", "localAddress", s.address, "leader", leader, + "ProposalView", newProposalMsg.ProposalView, "ProposalId", utils.F(newProposalMsg.ProposalId), + "Timestamp", newProposalMsg.Timestamp, "JustifyQC", newProposalMsg.JustifyQC) } } @@ -556,7 +564,7 @@ func (s *Smr) handleReceivedProposal(msg *xuperp2p.XuperMessage) { // 6.发送一个vote消息给下一个Leader nextLeader := s.election.GetLeader(s.pacemaker.GetCurrentView() + 1) if nextLeader == "" { - s.log.Debug("smr::handleReceivedProposal::empty next leader", "next round", s.pacemaker.GetCurrentView()+1) + s.log.Warn("smr::handleReceivedProposal::empty next leader", "next round", s.pacemaker.GetCurrentView()+1) return } s.voteProposal(newProposalMsg.GetProposalId(), newVote, newLedgerInfo, nextLeader) diff --git a/kernel/consensus/base/driver/chained-bft/smr_test.go b/kernel/consensus/base/driver/chained-bft/smr_test.go index 94296f38..64c6eca2 100644 --- a/kernel/consensus/base/driver/chained-bft/smr_test.go +++ b/kernel/consensus/base/driver/chained-bft/smr_test.go @@ -108,6 +108,7 @@ func NewSMR(node string, log logs.Logger, p2p network.Network, t *testing.T) *Sm saftyrules := &DefaultSaftyRules{ Crypto: cryptoClient, QcTree: q, + Log: log, } election := &ElectionA{ addrs: []string{NodeA, NodeB, NodeC}, diff --git a/kernel/engines/xuperos/miner/miner.go b/kernel/engines/xuperos/miner/miner.go index 4fbcfbbf..991f3f11 100644 --- a/kernel/engines/xuperos/miner/miner.go +++ b/kernel/engines/xuperos/miner/miner.go @@ -24,7 +24,7 @@ import ( const ( tickOnCalcBlock = time.Second - syncOnstatusChangeTimeout = 1 * time.Minute + syncOnStatusChangeTimeout = 1 * time.Minute statusFollowing = 0 statusMining = 1 @@ -65,21 +65,26 @@ func (t *Miner) ProcBlock(ctx xctx.XContext, block *lpb.InternalBlock) error { return nil } +// Start // 启动矿工,周期检查矿工身份 -// 同一时间,矿工状态是唯一的。0:休眠中 1:同步区块中 2:打包区块中 +// 同一时间,矿工状态是唯一的 +// 0:休眠中 1:同步区块中 2:打包区块中 func (t *Miner) Start() { + var err error + // 用于监测退出 t.exitWG.Add(1) defer t.exitWG.Done() - var err error + // 节点初始状态为同步节点 t.status = statusFollowing + // 开启挖矿前先同步区块 ctx := &xctx.BaseCtx{ XLog: t.log, Timer: timer.NewXTimer(), } - t.syncWithNeighbors(ctx) + _ = t.syncWithNeighbors(ctx) // 启动矿工循环 for !t.IsExit() { @@ -124,6 +129,7 @@ func (t *Miner) step() error { Timer: timer.NewXTimer(), } + // 账本和状态机最新区块id不一致,需要进行一次同步 if !bytes.Equal(ledgerTipId, stateTipId) { err := t.ctx.State.Walk(ledgerTipId, false) if err != nil { @@ -136,6 +142,15 @@ func (t *Miner) step() error { ctx.GetLog().Trace("miner step", "ledgerTipHeight", ledgerTipHeight, "ledgerTipId", utils.F(ledgerTipId), "stateTipId", utils.F(stateTipId)) + // 如果上次角色是非矿工,则尝试同步网络最新区块 + // 注意:这里出现错误也要继续执行,防止恶意节点错误出块导致流程无法继续执行 + if t.status == statusFollowing { + err := t.syncWithValidators(ctx, syncOnStatusChangeTimeout) + ctx.GetLog().Trace("miner syncWithValidators before CompeteMaster", "originTipHeight", ledgerTipHeight, + "currentLedgerHeight", t.ctx.Ledger.GetMeta().TrunkHeight, "err", err) + trace("syncUpValidators") + } + // 通过共识检查矿工身份 isMiner, isSync, err := t.ctx.Consensus.CompeteMaster(ledgerTipHeight + 1) trace("competeMaster") @@ -143,30 +158,35 @@ func (t *Miner) step() error { if err != nil { return err } - // 如需要同步,尝试同步网络最新区块 - if isMiner && isSync { - err = t.syncWithValidators(ctx, syncOnstatusChangeTimeout) - if err != nil { - return err - } - } - trace("syncUpValidators") // 如果是矿工,出块 if isMiner { - if t.status == statusFollowing { + if t.status == statusFollowing || isSync { ctx.GetLog().Info("miner change follow=>miner", "miner", t.ctx.Address.Address, "netAddr", t.ctx.EngCtx.Net.PeerInfo().Id, "height", t.ctx.Ledger.GetMeta().GetTrunkHeight(), ) + // 在由非矿工向矿工切换的这次"边沿触发",主动向所有的验证集合的最长链进行一次区块同步 - err = t.syncWithValidators(ctx, syncOnstatusChangeTimeout) + err = t.syncWithValidators(ctx, syncOnStatusChangeTimeout) if err != nil { + ctx.GetLog().Error("miner change follow=>miner syncWithValidators failed", "err", err) return err } + + // 由于同步了最长链,所以这里需要检查链是否增长 + // 由于pos和poa类共识依赖账本高度来判断状态,如果链发生变化则表明CompeteMaster的结果需要重新根据当前最新高度计算 + if ledgerTipHeight != t.ctx.Ledger.GetMeta().TrunkHeight { + ctx.GetLog().Trace("miner change follow=>miner", "originTipHeight", ledgerTipHeight, "currentLedgerHeight", + t.ctx.Ledger.GetMeta().TrunkHeight, "isMiner", isMiner, "isSync", isSync) + return nil + } + trace("syncUpValidators") } t.status = statusMining + + // 开始挖矿 err = t.mining(ctx) if err != nil { return err @@ -192,7 +212,7 @@ func (t *Miner) step() error { return nil } -// 挖矿生产区块 +// mining 挖矿生产区块 func (t *Miner) mining(ctx xctx.XContext) error { ctx.GetLog().Debug("mining start.") @@ -213,6 +233,7 @@ func (t *Miner) mining(ctx xctx.XContext) error { } // 重置高度 height = t.ctx.Ledger.GetMeta().TrunkHeight + 1 + ctx.GetLog().Debug("truncateTarget result", "newHeight", height) } // 2.打包区块 @@ -249,6 +270,8 @@ func (t *Miner) mining(ctx xctx.XContext) error { "blockId", utils.F(block.GetBlockid())) return err } + + // 5.可插拔共识,根据区块高度确认是否需要切换升级共识实例 err = t.ctx.Consensus.SwitchConsensus(block.Height) if err != nil { ctx.GetLog().Warn("SwitchConsensus failed", "bcname", t.ctx.BCName,