From 872e51925efbc4bc1d34197fdc388c1ab78da380 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Thu, 5 Dec 2024 11:31:38 +0800 Subject: [PATCH 1/4] Add queue for block proposal, remove consensus ready signal --- api/service/blockproposal/service.go | 2 +- {p2p/stream => common}/types/safe_map.go | 2 +- consensus/consensus.go | 72 ++++--- consensus/consensus_service.go | 2 +- consensus/consensus_v2.go | 8 +- consensus/proposal.go | 134 ++++++++++++++ consensus/proposal_manager.go | 175 ++++++++++++++++++ consensus/proposer.go | 169 +++++++++++------ consensus/view_change.go | 2 +- .../common/requestmanager/interface_test.go | 5 +- .../common/requestmanager/requestmanager.go | 23 +-- .../requestmanager/requestmanager_test.go | 5 +- 12 files changed, 477 insertions(+), 122 deletions(-) rename {p2p/stream => common}/types/safe_map.go (99%) create mode 100644 consensus/proposal.go create mode 100644 consensus/proposal_manager.go diff --git a/api/service/blockproposal/service.go b/api/service/blockproposal/service.go index 8330d6367e..6f123cf192 100644 --- a/api/service/blockproposal/service.go +++ b/api/service/blockproposal/service.go @@ -30,7 +30,7 @@ func (s *Service) Start() error { } func (s *Service) run() { - s.c.WaitForConsensusReadyV2(s.stopChan, s.stoppedChan) + s.c.StartCheckingForNewProposals(s.stopChan, s.stoppedChan) } // Stop stops block proposal service. diff --git a/p2p/stream/types/safe_map.go b/common/types/safe_map.go similarity index 99% rename from p2p/stream/types/safe_map.go rename to common/types/safe_map.go index e4a5e559c5..001438da76 100644 --- a/p2p/stream/types/safe_map.go +++ b/common/types/safe_map.go @@ -1,4 +1,4 @@ -package sttypes +package types import ( "sync" diff --git a/consensus/consensus.go b/consensus/consensus.go index f341319941..cd0b2597c1 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -32,25 +32,6 @@ const ( var errLeaderPriKeyNotFound = errors.New("leader private key not found locally") -type Proposal struct { - Type ProposalType - Caller string -} - -// NewProposal creates a new proposal -func NewProposal(t ProposalType) Proposal { - return Proposal{Type: t, Caller: utils.GetCallStackInfo(2)} -} - -// ProposalType is to indicate the type of signal for new block proposal -type ProposalType byte - -// Constant of the type of new block proposal -const ( - SyncProposal ProposalType = iota - AsyncProposal -) - type DownloadAsync interface { DownloadAsync() } @@ -74,6 +55,8 @@ type Consensus struct { prepareBitmap *bls_cosi.Mask commitBitmap *bls_cosi.Mask + proposalManager *ProposalManager + multiSigBitmap *bls_cosi.Mask // Bitmap for parsing multisig bitmap from validators pendingCXReceipts map[utils.CXKey]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus @@ -101,8 +84,6 @@ type Consensus struct { mutex *sync.RWMutex // ViewChange struct vc *viewChange - // Signal channel for proposing a new block and start new consensus - readySignal chan Proposal // Channel to send full commit signatures to finish new block proposal commitSigChannel chan []byte // verified block to state sync broadcast @@ -162,16 +143,27 @@ func (consensus *Consensus) ChainReader() engine.ChainReader { return consensus.Blockchain() } -func (consensus *Consensus) ReadySignal(p Proposal, signalSource string, signalReason string) { +func (consensus *Consensus) AddProposal(t ProposalType, source string, reason string) error { + bn := consensus.Blockchain().CurrentBlock().NumberU64() + v := consensus.GetViewChangingID() + p := NewProposal(t, v, bn, source, reason) + consensus.proposalManager.AddProposal(p) + return nil +} + +func (consensus *Consensus) ReadySignal(t ProposalType, signalSource string, signalReason string) { + if err := consensus.AddProposal(t, signalSource, signalReason); err != nil { + utils.Logger().Debug().Err(err). + Str("signalSource", signalSource). + Str("signalReason", signalReason). + Msg("ReadySignal is failed to add a new proposal") + return + } utils.Logger().Info(). + Str("ProposalType", t.String()). Str("signalSource", signalSource). Str("signalReason", signalReason). - Msg("ReadySignal is called to propose new block") - consensus.readySignal <- p -} - -func (consensus *Consensus) GetReadySignal() chan Proposal { - return consensus.readySignal + Msg("ReadySignal is called to propose a new block") } func (consensus *Consensus) GetCommitSigChannel() chan []byte { @@ -287,17 +279,18 @@ func New( Decider quorum.Decider, minPeers int, aggregateSig bool, ) (*Consensus, error) { consensus := Consensus{ - mutex: &sync.RWMutex{}, - ShardID: shard, - fBFTLog: NewFBFTLog(), - phase: FBFTAnnounce, - current: NewState(Normal), - decider: Decider, - registry: registry, - MinPeers: minPeers, - AggregateSig: aggregateSig, - host: host, - msgSender: NewMessageSender(host), + mutex: &sync.RWMutex{}, + ShardID: shard, + fBFTLog: NewFBFTLog(), + phase: FBFTAnnounce, + current: NewState(Normal), + decider: Decider, + registry: registry, + proposalManager: NewProposalManager(), + MinPeers: minPeers, + AggregateSig: aggregateSig, + host: host, + msgSender: NewMessageSender(host), // FBFT timeout consensusTimeout: createTimeout(), dHelper: downloadAsync{}, @@ -318,7 +311,6 @@ func New( // displayed on explorer as Height right now consensus.setCurBlockViewID(0) consensus.SlashChan = make(chan slash.Record) - consensus.readySignal = make(chan Proposal) consensus.commitSigChannel = make(chan []byte) // channel for receiving newly generated VDF consensus.RndChannel = make(chan [vdfAndSeedSize]byte) diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 42e295573e..b25bf0f378 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -498,7 +498,7 @@ func (consensus *Consensus) updateConsensusInformation(reason string) Mode { consensus.GetLogger().Info(). Str("myKey", myPubKeys.SerializeToHexStr()). Msg("[UpdateConsensusInformation] I am the New Leader") - consensus.ReadySignal(NewProposal(SyncProposal), "updateConsensusInformation", "leader changed and I am the new leader") + consensus.ReadySignal(SyncProposal, "updateConsensusInformation", "leader changed and I am the new leader") }() } return Normal diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 3567a324e0..47470738d9 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -278,7 +278,7 @@ func (consensus *Consensus) finalCommit(isLeader bool) { // No pipelining go func() { consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal") - consensus.ReadySignal(NewProposal(SyncProposal), "finalCommit", "I am leader and it's the last block in epoch") + consensus.ReadySignal(SyncProposal, "finalCommit", "I am leader and it's the last block in epoch") }() } else { // pipelining @@ -354,7 +354,7 @@ func (consensus *Consensus) StartChannel() { consensus.start = true consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") consensus.mutex.Unlock() - consensus.ReadySignal(NewProposal(SyncProposal), "StartChannel", "consensus channel is started") + consensus.ReadySignal(SyncProposal, "StartChannel", "consensus channel is started") return } consensus.mutex.Unlock() @@ -606,7 +606,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { // Send signal to Node to propose the new block for consensus consensus.getLogger().Info().Msg("[preCommitAndPropose] sending block proposal signal") consensus.mutex.Unlock() - consensus.ReadySignal(NewProposal(AsyncProposal), "preCommitAndPropose", "proposing new block which will wait on the full commit signatures to finish") + consensus.ReadySignal(AsyncProposal, "preCommitAndPropose", "proposing new block which will wait on the full commit signatures to finish") }() return nil @@ -848,7 +848,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg blockPeriod := consensus.BlockPeriod go func() { <-time.After(blockPeriod) - consensus.ReadySignal(NewProposal(SyncProposal), "setupForNewConsensus", "I am the new leader") + consensus.ReadySignal(SyncProposal, "setupForNewConsensus", "I am the new leader") }() } } diff --git a/consensus/proposal.go b/consensus/proposal.go new file mode 100644 index 0000000000..1a3e6b31f9 --- /dev/null +++ b/consensus/proposal.go @@ -0,0 +1,134 @@ +package consensus + +import ( + "sync" + "time" + + "github.com/harmony-one/harmony/internal/utils" +) + +// ProposalType is to indicate the type of signal for new block proposal +type ProposalType byte + +// Constant of the type of new block proposal +const ( + SyncProposal ProposalType = iota + AsyncProposal +) + +func (pt ProposalType) String() string { + if pt == SyncProposal { + return "SyncProposal" + } + return "AsyncProposal" +} + +// Proposal represents a new block proposal with associated metadata +type Proposal struct { + Type ProposalType + Caller string + Height uint64 + ViewID uint64 + Source string + Reason string + CreatedAt time.Time + lock *sync.RWMutex +} + +// NewProposal creates a new proposal +func NewProposal(t ProposalType, viewID uint64, height uint64, source string, reason string) *Proposal { + return &Proposal{ + Type: t, + Caller: utils.GetCallStackInfo(2), + ViewID: 0, + Height: 0, + Source: source, + Reason: reason, + CreatedAt: time.Now(), + lock: &sync.RWMutex{}, + } +} + +// Clone returns a copy of proposal +func (p *Proposal) Clone() *Proposal { + p.lock.RLock() + defer p.lock.RUnlock() + return &Proposal{ + Type: p.Type, + Caller: p.Caller, + ViewID: p.ViewID, + Height: p.Height, + CreatedAt: p.CreatedAt, + lock: &sync.RWMutex{}, + } +} + +// GetType retrieves the Proposal type +func (p *Proposal) GetType() ProposalType { + p.lock.RLock() + defer p.lock.RUnlock() + return p.Type +} + +// SetType updates the Proposal type +func (p *Proposal) SetType(t ProposalType) { + p.lock.Lock() + defer p.lock.Unlock() + p.Type = t +} + +// GetCaller retrieves the Proposal caller +func (p *Proposal) GetCaller() string { + p.lock.RLock() + defer p.lock.RUnlock() + return p.Caller +} + +// SetCaller updates the Proposal caller +func (p *Proposal) SetCaller(caller string) { + p.lock.Lock() + defer p.lock.Unlock() + p.Caller = caller +} + +// GetHeight retrieves the Proposal height +func (p *Proposal) GetHeight() uint64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.Height +} + +// SetHeight updates the Proposal height +func (p *Proposal) SetHeight(height uint64) { + p.lock.Lock() + defer p.lock.Unlock() + p.Height = height +} + +// GetViewID retrieves the Proposal view ID +func (p *Proposal) GetViewID() uint64 { + p.lock.RLock() + defer p.lock.RUnlock() + return p.ViewID +} + +// SetViewID updates the Proposal view ID +func (p *Proposal) SetViewID(viewID uint64) { + p.lock.Lock() + defer p.lock.Unlock() + p.ViewID = viewID +} + +// GetCreatedAt retrieves the Proposal creation time +func (p *Proposal) GetCreatedAt() time.Time { + p.lock.RLock() + defer p.lock.RUnlock() + return p.CreatedAt +} + +// SetCreatedAt updates the Proposal creation time +func (p *Proposal) SetCreatedAt(createdAt time.Time) { + p.lock.Lock() + defer p.lock.Unlock() + p.CreatedAt = createdAt +} diff --git a/consensus/proposal_manager.go b/consensus/proposal_manager.go new file mode 100644 index 0000000000..b9f954d91a --- /dev/null +++ b/consensus/proposal_manager.go @@ -0,0 +1,175 @@ +package consensus + +import ( + "sync" + + types "github.com/harmony-one/harmony/common/types" +) + +type ProposalCreationStatus int + +const ( + // Ready indicates the consensus is prepared to create a new proposal. + // No ongoing proposal or dependencies are blocking the proposal process. + Ready ProposalCreationStatus = iota + + // WaitingForCommitSigs signifies the consensus is currently waiting for commit signatures + // from the previous block or process. This state can persist for an extended duration, + // typically up to 8 seconds, depending on proposal type and processing time. + WaitingForCommitSigs + + // CreatingNewProposal indicates the consensus is already engaged in creating a new proposal. + // During this state, no additional proposals can be initiated until the current one completes. + CreatingNewPropsal +) + +func (pt ProposalCreationStatus) String() string { + switch pt { + case Ready: + return "Ready" + case WaitingForCommitSigs: + return "WaitingForCommitSigs" + case CreatingNewPropsal: + return "CreatingNewProposal" + default: + return "Unknown" + } +} + +type ProposalManager struct { + history *types.SafeMap[ProposalType, *Proposal] + lastHeight uint64 + status ProposalCreationStatus + lock *sync.RWMutex +} + +// NewProposalManager initializes a new ProposalManager. +func NewProposalManager() *ProposalManager { + return &ProposalManager{ + history: types.NewSafeMap[ProposalType, *Proposal](), + lastHeight: 0, + status: Ready, + lock: &sync.RWMutex{}, + } +} + +// SetLastHeight updates the last processed proposal height. +func (pm *ProposalManager) SetLastHeight(h uint64) { + pm.lock.Lock() + defer pm.lock.Unlock() + if h > pm.lastHeight { + pm.lastHeight = h + } +} + +// GetLastHeight retrieves the last processed proposal height. +func (pm *ProposalManager) GetLastHeight() uint64 { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.lastHeight +} + +// SetStatus sets new proposal creation status. +func (pm *ProposalManager) SetStatus(newStatus ProposalCreationStatus) { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.status = newStatus +} + +// StartWaitingForCommitSigs sets isWaitingForCommitSigs. +func (pm *ProposalManager) SetToWaitingForCommitSigsMode() { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.status = WaitingForCommitSigs +} + +// IsWaitingForCommitSigs returns true if current proposal is waiting for commit sigs. +func (pm *ProposalManager) IsWaitingForCommitSigs() bool { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.status == WaitingForCommitSigs +} + +// StartWaitingForCommitSigs sets isWaitingForCommitSigs. +func (pm *ProposalManager) SetToCreatingNewProposalMode() { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.status = CreatingNewPropsal +} + +// IsCreatingNewProposal returns true if consensus is busy with proposal creation. +func (pm *ProposalManager) IsCreatingNewProposal() bool { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.status == CreatingNewPropsal +} + +// Done sets status to ready. +func (pm *ProposalManager) Done() { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.status = Ready +} + +// IsWaitingForCommitSigs returns true if current proposal is waiting for commit sigs. +func (pm *ProposalManager) IsReady() bool { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.status == Ready +} + +// AddProposal adds a new proposal if valid or updates an existing one if better. +func (pm *ProposalManager) AddProposal(p *Proposal) bool { + pm.lock.Lock() + defer pm.lock.Unlock() + existingProposal, exists := pm.history.Get(p.Type) + if exists { + if p.Height > existingProposal.Height || (p.Height == existingProposal.Height && p.ViewID > existingProposal.ViewID) { + pm.history.Set(p.Type, p) + return true + } + return false + } + pm.history.Set(p.Type, p) + return true +} + +// GetNextProposal retrieves and removes the next proposal based on priority. +func (pm *ProposalManager) GetNextProposal() (*Proposal, error) { + pm.lock.Lock() + defer pm.lock.Unlock() + + syncProposal, syncExist := pm.history.Get(SyncProposal) + asyncProposal, asyncExist := pm.history.Get(AsyncProposal) + + var nextProposal *Proposal + if syncExist { + nextProposal = syncProposal.Clone() + pm.history.Delete(SyncProposal) + } else if asyncExist { + nextProposal = asyncProposal.Clone() + pm.history.Delete(AsyncProposal) + } + + if nextProposal == nil { + // no proposals available + return nil, nil + } + + pm.lastHeight = nextProposal.Height + return nextProposal, nil +} + +// ClearHistory clears all proposals from the history. +func (pm *ProposalManager) ClearHistory() { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.history.Clear() +} + +// Length returns the number of proposals in the history. +func (pm *ProposalManager) Length() int { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.history.Length() +} diff --git a/consensus/proposer.go b/consensus/proposer.go index 6f60887ecf..7026abff2e 100644 --- a/consensus/proposer.go +++ b/consensus/proposer.go @@ -16,9 +16,9 @@ func NewProposer(consensus *Consensus) *Proposer { return &Proposer{consensus} } -// WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus. +// StartCheckingForNewProposals checks the proposal queue and generate new block for consensus. // only leader will receive the ready signal -func (p *Proposer) WaitForConsensusReadyV2(stopChan chan struct{}, stoppedChan chan struct{}) { +func (p *Proposer) StartCheckingForNewProposals(stopChan chan struct{}, stoppedChan chan struct{}) { consensus := p.consensus go func() { // Setup stoppedChan @@ -39,65 +39,116 @@ func (p *Proposer) WaitForConsensusReadyV2(stopChan chan struct{}, stoppedChan c utils.Logger().Warn(). Msg("Consensus new block proposal: STOPPED!") return - case proposal := <-consensus.GetReadySignal(): - for retryCount := 0; retryCount < 3 && consensus.IsLeader(); retryCount++ { - time.Sleep(SleepPeriod) - utils.Logger().Info(). - Uint64("blockNum", consensus.Blockchain().CurrentBlock().NumberU64()+1). - Bool("asyncProposal", proposal.Type == AsyncProposal). - Str("called", proposal.Caller). - Msg("PROPOSING NEW BLOCK ------------------------------------------------") - - // Prepare last commit signatures - newCommitSigsChan := make(chan []byte) - - go func() { - waitTime := 0 * time.Second - if proposal.Type == AsyncProposal { - waitTime = worker.CommitSigReceiverTimeout - } - select { - case <-time.After(waitTime): - if waitTime == 0 { - utils.Logger().Info().Msg("[ProposeNewBlock] Sync block proposal, reading commit sigs directly from DB") - } else { - utils.Logger().Info().Msg("[ProposeNewBlock] Timeout waiting for commit sigs, reading directly from DB") - } - sigs, err := consensus.BlockCommitSigs(consensus.Blockchain().CurrentBlock().NumberU64()) - - if err != nil { - utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block") - } else { - newCommitSigsChan <- sigs - } - case commitSigs := <-consensus.GetCommitSigChannel(): - utils.Logger().Info().Msg("[ProposeNewBlock] received commit sigs asynchronously") - if len(commitSigs) > bls.BLSSignatureSizeInBytes { - newCommitSigsChan <- commitSigs - } - } - }() - newBlock, err := consensus.ProposeNewBlock(newCommitSigsChan) - if err == nil { - utils.Logger().Info(). - Uint64("blockNum", newBlock.NumberU64()). - Uint64("epoch", newBlock.Epoch().Uint64()). - Uint64("viewID", newBlock.Header().ViewID().Uint64()). - Int("numTxs", newBlock.Transactions().Len()). - Int("numStakingTxs", newBlock.StakingTransactions().Len()). - Int("crossShardReceipts", newBlock.IncomingReceipts().Len()). - Msgf("=========Successfully Proposed New Block, shard: %d epoch: %d number: %d ==========", newBlock.ShardID(), newBlock.Epoch().Uint64(), newBlock.NumberU64()) - - // Send the new block to Consensus so it can be confirmed. - consensus.BlockChannel(newBlock) - break - } else { - utils.Logger().Err(err).Int("retryCount", retryCount). - Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") - continue - } + //case proposal := <-consensus.GetReadySignal(): + + case <-time.NewTicker(100 * time.Millisecond).C: + if !consensus.proposalManager.IsReady() { + continue + } + numProposalsInQueue := consensus.proposalManager.Length() + if numProposalsInQueue == 0 { + continue + } + // Send signal every 100ms + proposal, errNewProposal := consensus.proposalManager.GetNextProposal() + if errNewProposal != nil { + utils.Logger().Debug().Err(errNewProposal).Msg("[ProposeNewBlock] Cannot get next proposal") + } + if proposal == nil { + continue + } + if err := p.CreateProposal(proposal); err != nil { + utils.Logger().Warn().Err(err). + Str("Caller", proposal.Caller). + Uint64("Height", proposal.Height). + Uint64("ViewID", proposal.ViewID). + Msg("[ProposeNewBlock] proposal creation failed") + } + if !consensus.proposalManager.IsWaitingForCommitSigs() { + consensus.proposalManager.Done() } + } } }() } + +func (p *Proposer) CreateProposal(proposal *Proposal) error { + + consensus := p.consensus + if consensus == nil { + utils.Logger().Warn().Msg("[CreateProposal] trying to create a new block proposal while consensus is not initialized yet") + return nil + } + + // set proposal manager status status + consensus.proposalManager.SetToCreatingNewProposalMode() + + for retryCount := 0; retryCount < 3 && consensus.IsLeader(); retryCount++ { + time.Sleep(SleepPeriod) + utils.Logger().Info(). + Uint64("blockNum", consensus.Blockchain().CurrentBlock().NumberU64()+1). + Bool("asyncProposal", proposal.Type == AsyncProposal). + Str("called", proposal.Caller). + Msg("PROPOSING NEW BLOCK ------------------------------------------------") + + // Prepare last commit signatures + newCommitSigsChan := make(chan []byte) + + go func() { + waitTime := 0 * time.Second + if proposal.Type == AsyncProposal { + waitTime = worker.CommitSigReceiverTimeout + } + if waitTime > 0 { + consensus.proposalManager.SetToWaitingForCommitSigsMode() + } + select { + case <-time.After(waitTime): + if waitTime == 0 { + utils.Logger().Info().Msg("[ProposeNewBlock] Sync block proposal, reading commit sigs directly from DB") + } else { + utils.Logger().Info().Msg("[ProposeNewBlock] Timeout waiting for commit sigs, reading directly from DB") + } + sigs, err := consensus.BlockCommitSigs(consensus.Blockchain().CurrentBlock().NumberU64()) + + if err != nil { + utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block") + } else { + newCommitSigsChan <- sigs + } + if waitTime > 0 { + consensus.proposalManager.Done() + } + case commitSigs := <-consensus.GetCommitSigChannel(): + utils.Logger().Info().Msg("[ProposeNewBlock] received commit sigs asynchronously") + if len(commitSigs) > bls.BLSSignatureSizeInBytes { + newCommitSigsChan <- commitSigs + if waitTime > 0 { + consensus.proposalManager.Done() + } + } + } + }() + newBlock, err := consensus.ProposeNewBlock(newCommitSigsChan) + if err == nil { + utils.Logger().Info(). + Uint64("blockNum", newBlock.NumberU64()). + Uint64("epoch", newBlock.Epoch().Uint64()). + Uint64("viewID", newBlock.Header().ViewID().Uint64()). + Int("numTxs", newBlock.Transactions().Len()). + Int("numStakingTxs", newBlock.StakingTransactions().Len()). + Int("crossShardReceipts", newBlock.IncomingReceipts().Len()). + Msgf("=========Successfully Proposed New Block, shard: %d epoch: %d number: %d ==========", newBlock.ShardID(), newBlock.Epoch().Uint64(), newBlock.NumberU64()) + + // Send the new block to Consensus so it can be confirmed. + consensus.BlockChannel(newBlock) + break + } else { + utils.Logger().Err(err).Int("retryCount", retryCount). + Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") + continue + } + } + return nil +} diff --git a/consensus/view_change.go b/consensus/view_change.go index 6fb6def705..51e7912bd8 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -440,7 +440,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { consensus.getLogger().Error().Err(err).Msg("[onViewChange] startNewView failed") return } - go consensus.ReadySignal(NewProposal(SyncProposal), "onViewChange", "quorum is achieved by mask and is view change mode and M1 payload is empty") + go consensus.ReadySignal(SyncProposal, "onViewChange", "quorum is achieved by mask and is view change mode and M1 payload is empty") return } diff --git a/p2p/stream/common/requestmanager/interface_test.go b/p2p/stream/common/requestmanager/interface_test.go index 6fc087103e..1449198c0a 100644 --- a/p2p/stream/common/requestmanager/interface_test.go +++ b/p2p/stream/common/requestmanager/interface_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rlp" + types "github.com/harmony-one/harmony/common/types" "github.com/harmony-one/harmony/p2p/stream/common/streammanager" sttypes "github.com/harmony-one/harmony/p2p/stream/types" ) @@ -138,8 +139,8 @@ func makeDummyTestStreams(indexes []int) []sttypes.Stream { return sts } -func makeDummyStreamSets(indexes []int) *sttypes.SafeMap[sttypes.StreamID, *stream] { - m := sttypes.NewSafeMap[sttypes.StreamID, *stream]() +func makeDummyStreamSets(indexes []int) *types.SafeMap[sttypes.StreamID, *stream] { + m := types.NewSafeMap[sttypes.StreamID, *stream]() for _, index := range indexes { st := &testStream{ diff --git a/p2p/stream/common/requestmanager/requestmanager.go b/p2p/stream/common/requestmanager/requestmanager.go index 923668a152..e128244fcb 100644 --- a/p2p/stream/common/requestmanager/requestmanager.go +++ b/p2p/stream/common/requestmanager/requestmanager.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog" "github.com/ethereum/go-ethereum/event" + types "github.com/harmony-one/harmony/common/types" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p/stream/common/streammanager" sttypes "github.com/harmony-one/harmony/p2p/stream/types" @@ -20,10 +21,10 @@ import ( // TODO: each peer is able to have a queue of requests instead of one request at a time. // TODO: add QoS evaluation for each stream type requestManager struct { - streams *sttypes.SafeMap[sttypes.StreamID, *stream] // All streams - available *sttypes.SafeMap[sttypes.StreamID, struct{}] // Streams that are available for request - pendings *sttypes.SafeMap[uint64, *request] // requests that are sent but not received response - waitings requestQueues // double linked list of requests that are on the waiting list + streams *types.SafeMap[sttypes.StreamID, *stream] // All streams + available *types.SafeMap[sttypes.StreamID, struct{}] // Streams that are available for request + pendings *types.SafeMap[uint64, *request] // requests that are sent but not received response + waitings requestQueues // double linked list of requests that are on the waiting list // Stream events sm streammanager.Reader @@ -56,9 +57,9 @@ func newRequestManager(sm streammanager.ReaderSubscriber) *requestManager { logger := utils.Logger().With().Str("module", "request manager").Logger() return &requestManager{ - streams: sttypes.NewSafeMap[sttypes.StreamID, *stream](), - available: sttypes.NewSafeMap[sttypes.StreamID, struct{}](), - pendings: sttypes.NewSafeMap[uint64, *request](), + streams: types.NewSafeMap[sttypes.StreamID, *stream](), + available: types.NewSafeMap[sttypes.StreamID, struct{}](), + pendings: types.NewSafeMap[uint64, *request](), waitings: newRequestQueues(), sm: sm, @@ -355,7 +356,7 @@ func (rm *requestManager) refreshStreams() { } } -func checkStreamUpdates(exists *sttypes.SafeMap[sttypes.StreamID, *stream], targets []sttypes.Stream) (added []sttypes.Stream, removed []*stream) { +func checkStreamUpdates(exists *types.SafeMap[sttypes.StreamID, *stream], targets []sttypes.Stream) (added []sttypes.Stream, removed []*stream) { targetM := make(map[sttypes.StreamID]sttypes.Stream) for _, target := range targets { @@ -401,9 +402,9 @@ func (rm *requestManager) close() { rm.pendings.Iterate(func(key uint64, req *request) { req.doneWithResponse(responseData{err: ErrClosed}) }) - rm.streams = sttypes.NewSafeMap[sttypes.StreamID, *stream]() - rm.available = sttypes.NewSafeMap[sttypes.StreamID, struct{}]() - rm.pendings = sttypes.NewSafeMap[uint64, *request]() + rm.streams = types.NewSafeMap[sttypes.StreamID, *stream]() + rm.available = types.NewSafeMap[sttypes.StreamID, struct{}]() + rm.pendings = types.NewSafeMap[uint64, *request]() rm.waitings = newRequestQueues() close(rm.stopC) } diff --git a/p2p/stream/common/requestmanager/requestmanager_test.go b/p2p/stream/common/requestmanager/requestmanager_test.go index 64ad4458d7..ac6c2333dd 100644 --- a/p2p/stream/common/requestmanager/requestmanager_test.go +++ b/p2p/stream/common/requestmanager/requestmanager_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + types "github.com/harmony-one/harmony/common/types" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/pkg/errors" ) @@ -368,7 +369,7 @@ func TestRequestManager_Concurrency(t *testing.T) { func TestGenReqID(t *testing.T) { retry := 100000 rm := &requestManager{ - pendings: sttypes.NewSafeMap[uint64, *request](), + pendings: types.NewSafeMap[uint64, *request](), } for i := 0; i != retry; i++ { @@ -382,7 +383,7 @@ func TestGenReqID(t *testing.T) { func TestCheckStreamUpdates(t *testing.T) { tests := []struct { - exists *sttypes.SafeMap[sttypes.StreamID, *stream] + exists *types.SafeMap[sttypes.StreamID, *stream] targets []sttypes.Stream expAddedIndexes []int expRemovedIndexes []int From 675b0e9f9906b97df400448a40d88e8b7658e284 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Thu, 5 Dec 2024 22:21:56 +0800 Subject: [PATCH 2/4] improve block proposer, fix spell error --- consensus/proposal_manager.go | 43 +++++++++++++++++++---------------- consensus/proposer.go | 10 ++++---- internal/params/config.go | 4 ++-- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/consensus/proposal_manager.go b/consensus/proposal_manager.go index b9f954d91a..71a352d1dc 100644 --- a/consensus/proposal_manager.go +++ b/consensus/proposal_manager.go @@ -20,7 +20,7 @@ const ( // CreatingNewProposal indicates the consensus is already engaged in creating a new proposal. // During this state, no additional proposals can be initiated until the current one completes. - CreatingNewPropsal + CreatingNewProposal ) func (pt ProposalCreationStatus) String() string { @@ -29,7 +29,7 @@ func (pt ProposalCreationStatus) String() string { return "Ready" case WaitingForCommitSigs: return "WaitingForCommitSigs" - case CreatingNewPropsal: + case CreatingNewProposal: return "CreatingNewProposal" default: return "Unknown" @@ -37,36 +37,39 @@ func (pt ProposalCreationStatus) String() string { } type ProposalManager struct { - history *types.SafeMap[ProposalType, *Proposal] - lastHeight uint64 - status ProposalCreationStatus - lock *sync.RWMutex + history *types.SafeMap[ProposalType, *Proposal] + lasProposal *Proposal + status ProposalCreationStatus + lock *sync.RWMutex } // NewProposalManager initializes a new ProposalManager. func NewProposalManager() *ProposalManager { return &ProposalManager{ - history: types.NewSafeMap[ProposalType, *Proposal](), - lastHeight: 0, - status: Ready, - lock: &sync.RWMutex{}, + history: types.NewSafeMap[ProposalType, *Proposal](), + lasProposal: nil, + status: Ready, + lock: &sync.RWMutex{}, } } -// SetLastHeight updates the last processed proposal height. -func (pm *ProposalManager) SetLastHeight(h uint64) { +// SetlasProposal updates the last processed proposal height. +func (pm *ProposalManager) SetlasProposal(p *Proposal) { + if p == nil { + return + } pm.lock.Lock() defer pm.lock.Unlock() - if h > pm.lastHeight { - pm.lastHeight = h + if pm.lasProposal == nil || p.Height > pm.lasProposal.Height { + pm.lasProposal = p } } -// GetLastHeight retrieves the last processed proposal height. -func (pm *ProposalManager) GetLastHeight() uint64 { +// GetlasProposal retrieves the last processed proposal height. +func (pm *ProposalManager) GetlasProposal() *Proposal { pm.lock.RLock() defer pm.lock.RUnlock() - return pm.lastHeight + return pm.lasProposal } // SetStatus sets new proposal creation status. @@ -94,14 +97,14 @@ func (pm *ProposalManager) IsWaitingForCommitSigs() bool { func (pm *ProposalManager) SetToCreatingNewProposalMode() { pm.lock.Lock() defer pm.lock.Unlock() - pm.status = CreatingNewPropsal + pm.status = CreatingNewProposal } // IsCreatingNewProposal returns true if consensus is busy with proposal creation. func (pm *ProposalManager) IsCreatingNewProposal() bool { pm.lock.RLock() defer pm.lock.RUnlock() - return pm.status == CreatingNewPropsal + return pm.status == CreatingNewProposal } // Done sets status to ready. @@ -156,7 +159,7 @@ func (pm *ProposalManager) GetNextProposal() (*Proposal, error) { return nil, nil } - pm.lastHeight = nextProposal.Height + pm.lasProposal = nextProposal return nextProposal, nil } diff --git a/consensus/proposer.go b/consensus/proposer.go index 7026abff2e..6183c80928 100644 --- a/consensus/proposer.go +++ b/consensus/proposer.go @@ -49,7 +49,6 @@ func (p *Proposer) StartCheckingForNewProposals(stopChan chan struct{}, stoppedC if numProposalsInQueue == 0 { continue } - // Send signal every 100ms proposal, errNewProposal := consensus.proposalManager.GetNextProposal() if errNewProposal != nil { utils.Logger().Debug().Err(errNewProposal).Msg("[ProposeNewBlock] Cannot get next proposal") @@ -64,10 +63,6 @@ func (p *Proposer) StartCheckingForNewProposals(stopChan chan struct{}, stoppedC Uint64("ViewID", proposal.ViewID). Msg("[ProposeNewBlock] proposal creation failed") } - if !consensus.proposalManager.IsWaitingForCommitSigs() { - consensus.proposalManager.Done() - } - } } }() @@ -83,6 +78,11 @@ func (p *Proposer) CreateProposal(proposal *Proposal) error { // set proposal manager status status consensus.proposalManager.SetToCreatingNewProposalMode() + defer func() { + if !consensus.proposalManager.IsWaitingForCommitSigs() { + consensus.proposalManager.Done() + } + }() for retryCount := 0; retryCount < 3 && consensus.IsLeader(); retryCount++ { time.Sleep(SleepPeriod) diff --git a/internal/params/config.go b/internal/params/config.go index 3e774c113f..3cb8b71382 100644 --- a/internal/params/config.go +++ b/internal/params/config.go @@ -312,8 +312,8 @@ var ( SlotsLimitedEpoch: EpochTBD, // epoch to enable HIP-16 CrossShardXferPrecompileEpoch: big.NewInt(1), AllowlistEpoch: EpochTBD, - LeaderRotationInternalValidatorsEpoch: big.NewInt(5), - LeaderRotationExternalValidatorsEpoch: big.NewInt(6), + LeaderRotationInternalValidatorsEpoch: big.NewInt(3), + LeaderRotationExternalValidatorsEpoch: big.NewInt(3), LeaderRotationV2Epoch: EpochTBD, FeeCollectEpoch: big.NewInt(2), ValidatorCodeFixEpoch: big.NewInt(2), From 36c149dd604ec2fdeadcb2b312238264023a95bc Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Fri, 6 Dec 2024 14:01:18 +0800 Subject: [PATCH 3/4] add check for consensus proposal manager before precommit --- consensus/consensus_v2.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 47470738d9..3f8bb57f6a 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -547,6 +547,10 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { return errors.New("block to pre-commit is nil") } + if !consensus.proposalManager.IsReady() { + return nil + } + leaderPriKey, err := consensus.getConsensusLeaderPrivateKey() if err != nil { consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] leader not found") From 8a8b23602860e2d810b61ece3c174158630223c5 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Mon, 9 Dec 2024 11:13:23 +0800 Subject: [PATCH 4/4] improve consensus proposal mamanger --- consensus/consensus.go | 6 +-- consensus/proposal.go | 39 +++++++------- consensus/proposal_manager.go | 95 +++++++++++++++++++++++++++-------- 3 files changed, 100 insertions(+), 40 deletions(-) diff --git a/consensus/consensus.go b/consensus/consensus.go index cd0b2597c1..8b82b91d73 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -143,16 +143,16 @@ func (consensus *Consensus) ChainReader() engine.ChainReader { return consensus.Blockchain() } -func (consensus *Consensus) AddProposal(t ProposalType, source string, reason string) error { +func (consensus *Consensus) AddProposal(leaderPubKey *bls_cosi.PublicKeyWrapper, t ProposalType, source string, reason string) error { bn := consensus.Blockchain().CurrentBlock().NumberU64() v := consensus.GetViewChangingID() - p := NewProposal(t, v, bn, source, reason) + p := NewProposal(leaderPubKey ,t, v, bn, source, reason) consensus.proposalManager.AddProposal(p) return nil } func (consensus *Consensus) ReadySignal(t ProposalType, signalSource string, signalReason string) { - if err := consensus.AddProposal(t, signalSource, signalReason); err != nil { + if err := consensus.AddProposal(consensus.getLeaderPubKey(), t, signalSource, signalReason); err != nil { utils.Logger().Debug().Err(err). Str("signalSource", signalSource). Str("signalReason", signalReason). diff --git a/consensus/proposal.go b/consensus/proposal.go index 1a3e6b31f9..ea5507df99 100644 --- a/consensus/proposal.go +++ b/consensus/proposal.go @@ -4,6 +4,7 @@ import ( "sync" "time" + bls_cosi "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/utils" ) @@ -25,27 +26,29 @@ func (pt ProposalType) String() string { // Proposal represents a new block proposal with associated metadata type Proposal struct { - Type ProposalType - Caller string - Height uint64 - ViewID uint64 - Source string - Reason string - CreatedAt time.Time - lock *sync.RWMutex + leaderPubKey *bls_cosi.PublicKeyWrapper + Type ProposalType + Caller string + Height uint64 + ViewID uint64 + Source string + Reason string + CreatedAt time.Time + lock *sync.RWMutex } // NewProposal creates a new proposal -func NewProposal(t ProposalType, viewID uint64, height uint64, source string, reason string) *Proposal { +func NewProposal(leaderPubKey *bls_cosi.PublicKeyWrapper, t ProposalType, viewID uint64, height uint64, source string, reason string) *Proposal { return &Proposal{ - Type: t, - Caller: utils.GetCallStackInfo(2), - ViewID: 0, - Height: 0, - Source: source, - Reason: reason, - CreatedAt: time.Now(), - lock: &sync.RWMutex{}, + leaderPubKey: leaderPubKey, + Type: t, + Caller: utils.GetCallStackInfo(2), + ViewID: 0, + Height: 0, + Source: source, + Reason: reason, + CreatedAt: time.Now(), + lock: &sync.RWMutex{}, } } @@ -58,6 +61,8 @@ func (p *Proposal) Clone() *Proposal { Caller: p.Caller, ViewID: p.ViewID, Height: p.Height, + Source: p.Source, + Reason: p.Reason, CreatedAt: p.CreatedAt, lock: &sync.RWMutex{}, } diff --git a/consensus/proposal_manager.go b/consensus/proposal_manager.go index 71a352d1dc..35e231f96b 100644 --- a/consensus/proposal_manager.go +++ b/consensus/proposal_manager.go @@ -1,7 +1,9 @@ package consensus import ( + "context" "sync" + "time" types "github.com/harmony-one/harmony/common/types" ) @@ -21,6 +23,9 @@ const ( // CreatingNewProposal indicates the consensus is already engaged in creating a new proposal. // During this state, no additional proposals can be initiated until the current one completes. CreatingNewProposal + + // Consensus is busy with doing other process like Updating Information and so on + ConsensusBusy ) func (pt ProposalCreationStatus) String() string { @@ -37,39 +42,39 @@ func (pt ProposalCreationStatus) String() string { } type ProposalManager struct { - history *types.SafeMap[ProposalType, *Proposal] - lasProposal *Proposal - status ProposalCreationStatus - lock *sync.RWMutex + history *types.SafeMap[ProposalType, *Proposal] + currentProcessingProposal *Proposal + status ProposalCreationStatus + lock *sync.RWMutex } // NewProposalManager initializes a new ProposalManager. func NewProposalManager() *ProposalManager { return &ProposalManager{ - history: types.NewSafeMap[ProposalType, *Proposal](), - lasProposal: nil, - status: Ready, - lock: &sync.RWMutex{}, + history: types.NewSafeMap[ProposalType, *Proposal](), + currentProcessingProposal: nil, + status: Ready, + lock: &sync.RWMutex{}, } } -// SetlasProposal updates the last processed proposal height. -func (pm *ProposalManager) SetlasProposal(p *Proposal) { +// SetCurrentProcessingProposal updates the last processed proposal height. +func (pm *ProposalManager) SetCurrentProcessingProposal(p *Proposal) { if p == nil { return } pm.lock.Lock() defer pm.lock.Unlock() - if pm.lasProposal == nil || p.Height > pm.lasProposal.Height { - pm.lasProposal = p + if pm.currentProcessingProposal == nil || p.Height > pm.currentProcessingProposal.Height { + pm.currentProcessingProposal = p } } -// GetlasProposal retrieves the last processed proposal height. -func (pm *ProposalManager) GetlasProposal() *Proposal { +// GetCurrentProcessingProposal retrieves the last processed proposal height. +func (pm *ProposalManager) GetCurrentProcessingProposal() *Proposal { pm.lock.RLock() defer pm.lock.RUnlock() - return pm.lasProposal + return pm.currentProcessingProposal } // SetStatus sets new proposal creation status. @@ -79,6 +84,13 @@ func (pm *ProposalManager) SetStatus(newStatus ProposalCreationStatus) { pm.status = newStatus } +// GetStatus returns proposal manager status. +func (pm *ProposalManager) GetStatus() ProposalCreationStatus { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.status +} + // StartWaitingForCommitSigs sets isWaitingForCommitSigs. func (pm *ProposalManager) SetToWaitingForCommitSigsMode() { pm.lock.Lock() @@ -107,10 +119,25 @@ func (pm *ProposalManager) IsCreatingNewProposal() bool { return pm.status == CreatingNewProposal } +// SetToConsensusBusyMode sets the status to ConsensusBusy. +func (pm *ProposalManager) SetToConsensusBusyMode() { + pm.lock.Lock() + defer pm.lock.Unlock() + pm.status = ConsensusBusy +} + +// IsConsensusBusy returns true if consensus is busy with other processes than proposal creation +func (pm *ProposalManager) IsConsensusBusy() bool { + pm.lock.RLock() + defer pm.lock.RUnlock() + return pm.status == ConsensusBusy +} + // Done sets status to ready. func (pm *ProposalManager) Done() { pm.lock.Lock() defer pm.lock.Unlock() + pm.currentProcessingProposal = nil pm.status = Ready } @@ -127,6 +154,12 @@ func (pm *ProposalManager) AddProposal(p *Proposal) bool { defer pm.lock.Unlock() existingProposal, exists := pm.history.Get(p.Type) if exists { + if p.leaderPubKey.Object.IsEqual(existingProposal.leaderPubKey.Object) { + return false + } + if pm.currentProcessingProposal != nil && p.leaderPubKey.Object.IsEqual(pm.currentProcessingProposal.leaderPubKey.Object) { + return false + } if p.Height > existingProposal.Height || (p.Height == existingProposal.Height && p.ViewID > existingProposal.ViewID) { pm.history.Set(p.Type, p) return true @@ -146,12 +179,12 @@ func (pm *ProposalManager) GetNextProposal() (*Proposal, error) { asyncProposal, asyncExist := pm.history.Get(AsyncProposal) var nextProposal *Proposal - if syncExist { - nextProposal = syncProposal.Clone() - pm.history.Delete(SyncProposal) - } else if asyncExist { + if asyncExist { nextProposal = asyncProposal.Clone() pm.history.Delete(AsyncProposal) + } else if syncExist { + nextProposal = syncProposal.Clone() + pm.history.Delete(SyncProposal) } if nextProposal == nil { @@ -159,7 +192,7 @@ func (pm *ProposalManager) GetNextProposal() (*Proposal, error) { return nil, nil } - pm.lasProposal = nextProposal + pm.currentProcessingProposal = nextProposal return nextProposal, nil } @@ -176,3 +209,25 @@ func (pm *ProposalManager) Length() int { defer pm.lock.RUnlock() return pm.history.Length() } + +// WaitToBeReady waits until the ProposalManager is ready for a new proposal. +// It blocks until the current status is Ready or the provided context is canceled. +func (pm *ProposalManager) WaitToBeReady(ctx context.Context) error { + for { + pm.lock.RLock() + currentStatus := pm.status + pm.lock.RUnlock() + + if currentStatus == Ready { + return nil + } + + select { + case <-ctx.Done(): + return ctx.Err() // Return error if the context is canceled or times out + default: + // Allow other goroutines to execute + time.Sleep(10 * time.Millisecond) + } + } +}