Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Consensus Block Proposal, Fix Multi-BLS Key Leader Triggering Multiple Block Proposals by Introducing Proposal Queue #4810

Draft
wants to merge 4 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/service/blockproposal/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (s *Service) Start() error {
}

func (s *Service) run() {
s.c.WaitForConsensusReadyV2(s.stopChan, s.stoppedChan)
s.c.StartCheckingForNewProposals(s.stopChan, s.stoppedChan)
}

// Stop stops block proposal service.
Expand Down
2 changes: 1 addition & 1 deletion p2p/stream/types/safe_map.go → common/types/safe_map.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sttypes
package types

import (
"sync"
Expand Down
72 changes: 32 additions & 40 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,25 +32,6 @@ const (

var errLeaderPriKeyNotFound = errors.New("leader private key not found locally")

type Proposal struct {
Type ProposalType
Caller string
}

// NewProposal creates a new proposal
func NewProposal(t ProposalType) Proposal {
return Proposal{Type: t, Caller: utils.GetCallStackInfo(2)}
}

// ProposalType is to indicate the type of signal for new block proposal
type ProposalType byte

// Constant of the type of new block proposal
const (
SyncProposal ProposalType = iota
AsyncProposal
)

type DownloadAsync interface {
DownloadAsync()
}
Expand All @@ -74,6 +55,8 @@ type Consensus struct {
prepareBitmap *bls_cosi.Mask
commitBitmap *bls_cosi.Mask

proposalManager *ProposalManager

multiSigBitmap *bls_cosi.Mask // Bitmap for parsing multisig bitmap from validators

pendingCXReceipts map[utils.CXKey]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus
Expand Down Expand Up @@ -101,8 +84,6 @@ type Consensus struct {
mutex *sync.RWMutex
// ViewChange struct
vc *viewChange
// Signal channel for proposing a new block and start new consensus
readySignal chan Proposal
// Channel to send full commit signatures to finish new block proposal
commitSigChannel chan []byte
// verified block to state sync broadcast
Expand Down Expand Up @@ -162,16 +143,27 @@ func (consensus *Consensus) ChainReader() engine.ChainReader {
return consensus.Blockchain()
}

func (consensus *Consensus) ReadySignal(p Proposal, signalSource string, signalReason string) {
func (consensus *Consensus) AddProposal(leaderPubKey *bls_cosi.PublicKeyWrapper, t ProposalType, source string, reason string) error {
bn := consensus.Blockchain().CurrentBlock().NumberU64()
v := consensus.GetViewChangingID()
p := NewProposal(leaderPubKey ,t, v, bn, source, reason)
consensus.proposalManager.AddProposal(p)
return nil
}

func (consensus *Consensus) ReadySignal(t ProposalType, signalSource string, signalReason string) {
if err := consensus.AddProposal(consensus.getLeaderPubKey(), t, signalSource, signalReason); err != nil {
utils.Logger().Debug().Err(err).
Str("signalSource", signalSource).
Str("signalReason", signalReason).
Msg("ReadySignal is failed to add a new proposal")
return
}
utils.Logger().Info().
Str("ProposalType", t.String()).
Str("signalSource", signalSource).
Str("signalReason", signalReason).
Msg("ReadySignal is called to propose new block")
consensus.readySignal <- p
}

func (consensus *Consensus) GetReadySignal() chan Proposal {
return consensus.readySignal
Msg("ReadySignal is called to propose a new block")
}

func (consensus *Consensus) GetCommitSigChannel() chan []byte {
Expand Down Expand Up @@ -287,17 +279,18 @@ func New(
Decider quorum.Decider, minPeers int, aggregateSig bool,
) (*Consensus, error) {
consensus := Consensus{
mutex: &sync.RWMutex{},
ShardID: shard,
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: NewState(Normal),
decider: Decider,
registry: registry,
MinPeers: minPeers,
AggregateSig: aggregateSig,
host: host,
msgSender: NewMessageSender(host),
mutex: &sync.RWMutex{},
ShardID: shard,
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: NewState(Normal),
decider: Decider,
registry: registry,
proposalManager: NewProposalManager(),
MinPeers: minPeers,
AggregateSig: aggregateSig,
host: host,
msgSender: NewMessageSender(host),
// FBFT timeout
consensusTimeout: createTimeout(),
dHelper: downloadAsync{},
Expand All @@ -318,7 +311,6 @@ func New(
// displayed on explorer as Height right now
consensus.setCurBlockViewID(0)
consensus.SlashChan = make(chan slash.Record)
consensus.readySignal = make(chan Proposal)
consensus.commitSigChannel = make(chan []byte)
// channel for receiving newly generated VDF
consensus.RndChannel = make(chan [vdfAndSeedSize]byte)
Expand Down
2 changes: 1 addition & 1 deletion consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (consensus *Consensus) updateConsensusInformation(reason string) Mode {
consensus.GetLogger().Info().
Str("myKey", myPubKeys.SerializeToHexStr()).
Msg("[UpdateConsensusInformation] I am the New Leader")
consensus.ReadySignal(NewProposal(SyncProposal), "updateConsensusInformation", "leader changed and I am the new leader")
consensus.ReadySignal(SyncProposal, "updateConsensusInformation", "leader changed and I am the new leader")
}()
}
return Normal
Expand Down
12 changes: 8 additions & 4 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (consensus *Consensus) finalCommit(isLeader bool) {
// No pipelining
go func() {
consensus.getLogger().Info().Msg("[finalCommit] sending block proposal signal")
consensus.ReadySignal(NewProposal(SyncProposal), "finalCommit", "I am leader and it's the last block in epoch")
consensus.ReadySignal(SyncProposal, "finalCommit", "I am leader and it's the last block in epoch")
}()
} else {
// pipelining
Expand Down Expand Up @@ -354,7 +354,7 @@ func (consensus *Consensus) StartChannel() {
consensus.start = true
consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal")
consensus.mutex.Unlock()
consensus.ReadySignal(NewProposal(SyncProposal), "StartChannel", "consensus channel is started")
consensus.ReadySignal(SyncProposal, "StartChannel", "consensus channel is started")
return
}
consensus.mutex.Unlock()
Expand Down Expand Up @@ -547,6 +547,10 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
return errors.New("block to pre-commit is nil")
}

if !consensus.proposalManager.IsReady() {
return nil
}

leaderPriKey, err := consensus.getConsensusLeaderPrivateKey()
if err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] leader not found")
Expand Down Expand Up @@ -606,7 +610,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
// Send signal to Node to propose the new block for consensus
consensus.getLogger().Info().Msg("[preCommitAndPropose] sending block proposal signal")
consensus.mutex.Unlock()
consensus.ReadySignal(NewProposal(AsyncProposal), "preCommitAndPropose", "proposing new block which will wait on the full commit signatures to finish")
consensus.ReadySignal(AsyncProposal, "preCommitAndPropose", "proposing new block which will wait on the full commit signatures to finish")
}()

return nil
Expand Down Expand Up @@ -848,7 +852,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
blockPeriod := consensus.BlockPeriod
go func() {
<-time.After(blockPeriod)
consensus.ReadySignal(NewProposal(SyncProposal), "setupForNewConsensus", "I am the new leader")
consensus.ReadySignal(SyncProposal, "setupForNewConsensus", "I am the new leader")
}()
}
}
Expand Down
139 changes: 139 additions & 0 deletions consensus/proposal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package consensus

import (
"sync"
"time"

bls_cosi "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/utils"
)

// ProposalType is to indicate the type of signal for new block proposal
type ProposalType byte

// Constant of the type of new block proposal
const (
SyncProposal ProposalType = iota
AsyncProposal
)

func (pt ProposalType) String() string {
if pt == SyncProposal {
return "SyncProposal"
}
return "AsyncProposal"
}

// Proposal represents a new block proposal with associated metadata
type Proposal struct {
leaderPubKey *bls_cosi.PublicKeyWrapper
Type ProposalType
Caller string
Height uint64
ViewID uint64
Source string
Reason string
CreatedAt time.Time
lock *sync.RWMutex
}

// NewProposal creates a new proposal
func NewProposal(leaderPubKey *bls_cosi.PublicKeyWrapper, t ProposalType, viewID uint64, height uint64, source string, reason string) *Proposal {
return &Proposal{
leaderPubKey: leaderPubKey,
Type: t,
Caller: utils.GetCallStackInfo(2),
ViewID: 0,
Height: 0,
Source: source,
Reason: reason,
CreatedAt: time.Now(),
lock: &sync.RWMutex{},
}
}

// Clone returns a copy of proposal
func (p *Proposal) Clone() *Proposal {
p.lock.RLock()
defer p.lock.RUnlock()
return &Proposal{
Type: p.Type,
Caller: p.Caller,
ViewID: p.ViewID,
Height: p.Height,
Source: p.Source,
Reason: p.Reason,
CreatedAt: p.CreatedAt,
lock: &sync.RWMutex{},
}
}

// GetType retrieves the Proposal type
func (p *Proposal) GetType() ProposalType {
p.lock.RLock()
defer p.lock.RUnlock()
return p.Type
}

// SetType updates the Proposal type
func (p *Proposal) SetType(t ProposalType) {
p.lock.Lock()
defer p.lock.Unlock()
p.Type = t
}

// GetCaller retrieves the Proposal caller
func (p *Proposal) GetCaller() string {
p.lock.RLock()
defer p.lock.RUnlock()
return p.Caller
}

// SetCaller updates the Proposal caller
func (p *Proposal) SetCaller(caller string) {
p.lock.Lock()
defer p.lock.Unlock()
p.Caller = caller
}

// GetHeight retrieves the Proposal height
func (p *Proposal) GetHeight() uint64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.Height
}

// SetHeight updates the Proposal height
func (p *Proposal) SetHeight(height uint64) {
p.lock.Lock()
defer p.lock.Unlock()
p.Height = height
}

// GetViewID retrieves the Proposal view ID
func (p *Proposal) GetViewID() uint64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.ViewID
}

// SetViewID updates the Proposal view ID
func (p *Proposal) SetViewID(viewID uint64) {
p.lock.Lock()
defer p.lock.Unlock()
p.ViewID = viewID
}

// GetCreatedAt retrieves the Proposal creation time
func (p *Proposal) GetCreatedAt() time.Time {
p.lock.RLock()
defer p.lock.RUnlock()
return p.CreatedAt
}

// SetCreatedAt updates the Proposal creation time
func (p *Proposal) SetCreatedAt(createdAt time.Time) {
p.lock.Lock()
defer p.lock.Unlock()
p.CreatedAt = createdAt
}
Loading