From be46cb3e9bbbc32b39d3fce81b63ec539fc95fdc Mon Sep 17 00:00:00 2001 From: Anna Shaleva Date: Fri, 12 Jul 2024 19:49:45 +0300 Subject: [PATCH] *: support extra dBFT stage Ref. #112. Signed-off-by: Anna Shaleva --- check.go | 64 +++++++++++++++ commit_ack.go | 8 ++ config.go | 54 +++++++++++++ consensus_message.go | 2 + consensus_message_type.go | 3 + context.go | 71 ++++++++++++++++- dbft.go | 102 ++++++++++++++++++++++-- dbft_test.go | 3 +- helpers_test.go | 1 + internal/consensus/consensus_message.go | 1 + pre_block.go | 27 +++++++ send.go | 43 +++++++++- 12 files changed, 369 insertions(+), 10 deletions(-) create mode 100644 commit_ack.go create mode 100644 pre_block.go diff --git a/check.go b/check.go index 4b4b9e15..128ddd21 100644 --- a/check.go +++ b/check.go @@ -57,6 +57,70 @@ func (d *DBFT[H]) checkCommit() { return } + // TODO: it should be considered: add PreCommit message instead of CommitAck and + // send this PreCommit *before* Commit. This will allow to create a completely + // custom message for keys exchange whereas the final Commit will keep only final + // signature bytes, as with usual dBFT. + if d.isAntiMEVExtensionEnabled() { + d.preBlock = d.CreatePreBlock() + hash := d.preBlock.Hash() + + d.Logger.Info("processing PreBlock", + zap.Uint32("height", d.BlockIndex), + zap.Stringer("preBlock hash", hash), + zap.Int("tx_count", len(d.preBlock.Transactions()))) + + d.preBlockProcessed = true + d.ProcessPreBlock(d.preBlock) + + if d.CommitSent() { // TODO: Do we really need to require Commit sent by *self* or M other's Commits is enough to sent CommitAck? It depends on the keys sharing logic + d.sendCommitAck() + d.changeTimer(d.SecondsPerBlock) + d.checkCommitAck() + } else { + d.Logger.Debug("can't send commitAck since self commit not yet sent") + } + return + } + + d.lastBlockIndex = d.BlockIndex + d.lastBlockTime = d.Timer.Now() + d.block = d.CreateBlock() + hash := d.block.Hash() + + d.Logger.Info("approving block", + zap.Uint32("height", d.BlockIndex), + zap.Stringer("hash", hash), + zap.Int("tx_count", len(d.block.Transactions())), + zap.Stringer("merkle", d.block.MerkleRoot()), + zap.Stringer("prev", d.block.PrevHash())) + + d.blockProcessed = true + d.ProcessBlock(d.block) + + // Do not initialize consensus process immediately. It's the caller's duty to + // start the new block acceptance process and call Reset at the + // new height. +} + +func (d *DBFT[H]) checkCommitAck() { + if !d.hasAllTransactions() { + d.Logger.Debug("check commit: some transactions are missing", zap.Any("hashes", d.MissingTransactions)) + return + } + + count := 0 + for _, msg := range d.CommitAckPayloads { + if msg != nil && msg.ViewNumber() == d.ViewNumber { + count++ + } + } + + if count < d.M() { + d.Logger.Debug("not enough to commit", zap.Int("count", count)) + return + } + d.lastBlockIndex = d.BlockIndex d.lastBlockTime = d.Timer.Now() d.block = d.CreateBlock() diff --git a/commit_ack.go b/commit_ack.go new file mode 100644 index 00000000..dbc5b1cd --- /dev/null +++ b/commit_ack.go @@ -0,0 +1,8 @@ +package dbft + +// CommitAck is an interface for dBFT CommitAck message. +type CommitAck interface { + // Data returns commitAck's data that should be used for the final + // block construction. + Data() []byte +} diff --git a/config.go b/config.go index c34ac35b..49683897 100644 --- a/config.go +++ b/config.go @@ -20,9 +20,14 @@ type Config[H Hash] struct { // if current time is less than that of previous context. // By default use millisecond precision. TimestampIncrement uint64 + // AntiMEVExtensionEnablingHeight denotes the height starting from which dBFT + // Anti-MEV extensions should be enabled. -1 means no extension is enabled. + AntiMEVExtensionEnablingHeight int64 // GetKeyPair returns an index of the node in the list of validators // together with it's key pair. GetKeyPair func([]PublicKey) (int, PrivateKey, PublicKey) + // NewPreBlockFromContext should allocate, fill from Context and return new block.PreBlock. + NewPreBlockFromContext func(ctx *Context[H]) PreBlock[H] // NewBlockFromContext should allocate, fill from Context and return new block.Block. NewBlockFromContext func(ctx *Context[H]) Block[H] // RequestTx is a callback which is called when transaction contained @@ -40,6 +45,8 @@ type Config[H Hash] struct { VerifyBlock func(b Block[H]) bool // Broadcast should broadcast payload m to the consensus nodes. Broadcast func(m ConsensusPayload[H]) + // ProcessBlock is called every time new preBlock is accepted. + ProcessPreBlock func(b PreBlock[H]) // ProcessBlock is called every time new block is accepted. ProcessBlock func(b Block[H]) // GetBlock should return block with hash. @@ -65,6 +72,8 @@ type Config[H Hash] struct { NewChangeView func(newViewNumber byte, reason ChangeViewReason, timestamp uint64) ChangeView // NewCommit is a constructor for payload.Commit. NewCommit func(signature []byte) Commit + // NewCommitAck is a constructor for payload.CommitAck. + NewCommitAck func(signature []byte) CommitAck // NewRecoveryRequest is a constructor for payload.RecoveryRequest. NewRecoveryRequest func(ts uint64) RecoveryRequest // NewRecoveryMessage is a constructor for payload.RecoveryMessage. @@ -73,6 +82,7 @@ type Config[H Hash] struct { VerifyPrepareRequest func(p ConsensusPayload[H]) error // VerifyPrepareResponse performs external PrepareResponse verification and returns nil if it's successful. VerifyPrepareResponse func(p ConsensusPayload[H]) error + // TODO: may be VerifyCommit callback should be added for case when AntiMEV extensions are enabled. } const defaultSecondsPerBlock = time.Second * 15 @@ -101,6 +111,8 @@ func defaultConfig[H Hash]() *Config[H] { VerifyPrepareRequest: func(ConsensusPayload[H]) error { return nil }, VerifyPrepareResponse: func(ConsensusPayload[H]) error { return nil }, + + AntiMEVExtensionEnablingHeight: -1, } } @@ -131,6 +143,20 @@ func checkConfig[H Hash](cfg *Config[H]) error { return errors.New("NewRecoveryRequest is nil") } else if cfg.NewRecoveryMessage == nil { return errors.New("NewRecoveryMessage is nil") + } else if cfg.AntiMEVExtensionEnablingHeight >= 0 { + if cfg.NewPreBlockFromContext == nil { + return errors.New("NewPreBlockFromContext is nil") + } else if cfg.ProcessPreBlock == nil { + return errors.New("ProcessPreBlock is nil") + } else if cfg.NewCommitAck == nil { + return errors.New("NewCommitAck is nil") + } + } else if cfg.NewPreBlockFromContext != nil { + return errors.New("NewPreBlockFromContext is set, but AntiMEVExtensionEnablingHeight is not specified") + } else if cfg.ProcessPreBlock != nil { + return errors.New("ProcessPreBlock is set, but AntiMEVExtensionEnablingHeight is not specified") + } else if cfg.NewCommitAck != nil { + return errors.New("NewCommitAck is set, but AntiMEVExtensionEnablingHeight is not specified") } return nil @@ -164,6 +190,13 @@ func WithSecondsPerBlock[H Hash](d time.Duration) func(config *Config[H]) { } } +// WithAntiMEVExtensionEnablingHeight sets AntiMEVExtensionEnablingHeight. +func WithAntiMEVExtensionEnablingHeight[H Hash](h int64) func(config *Config[H]) { + return func(cfg *Config[H]) { + cfg.AntiMEVExtensionEnablingHeight = h + } +} + // WithTimestampIncrement sets TimestampIncrement. func WithTimestampIncrement[H Hash](u uint64) func(config *Config[H]) { return func(cfg *Config[H]) { @@ -171,6 +204,13 @@ func WithTimestampIncrement[H Hash](u uint64) func(config *Config[H]) { } } +// WithNewPreBlockFromContext sets NewPreBlockFromContext. +func WithNewPreBlockFromContext[H Hash](f func(ctx *Context[H]) PreBlock[H]) func(config *Config[H]) { + return func(cfg *Config[H]) { + cfg.NewPreBlockFromContext = f + } +} + // WithNewBlockFromContext sets NewBlockFromContext. func WithNewBlockFromContext[H Hash](f func(ctx *Context[H]) Block[H]) func(config *Config[H]) { return func(cfg *Config[H]) { @@ -227,6 +267,13 @@ func WithProcessBlock[H Hash](f func(b Block[H])) func(config *Config[H]) { } } +// WithProcessPreBlock sets ProcessPreBlock. +func WithProcessPreBlock[H Hash](f func(b PreBlock[H])) func(config *Config[H]) { + return func(cfg *Config[H]) { + cfg.ProcessPreBlock = f + } +} + // WithGetBlock sets GetBlock. func WithGetBlock[H Hash](f func(h H) Block[H]) func(config *Config[H]) { return func(cfg *Config[H]) { @@ -297,6 +344,13 @@ func WithNewCommit[H Hash](f func(signature []byte) Commit) func(config *Config[ } } +// WithNewCommitAck sets NewCommitAck. +func WithNewCommitAck[H Hash](f func(signature []byte) CommitAck) func(config *Config[H]) { + return func(cfg *Config[H]) { + cfg.NewCommitAck = f + } +} + // WithNewRecoveryRequest sets NewRecoveryRequest. func WithNewRecoveryRequest[H Hash](f func(ts uint64) RecoveryRequest) func(config *Config[H]) { return func(cfg *Config[H]) { diff --git a/consensus_message.go b/consensus_message.go index d8859dea..37cf5676 100644 --- a/consensus_message.go +++ b/consensus_message.go @@ -17,6 +17,8 @@ type ConsensusMessage[H Hash] interface { GetPrepareResponse() PrepareResponse[H] // GetCommit returns payload as if it was Commit. GetCommit() Commit + // GetCommitAck returns payload as if it was CommitAck. + GetCommitAck() CommitAck // GetRecoveryRequest returns payload as if it was RecoveryRequest. GetRecoveryRequest() RecoveryRequest // GetRecoveryMessage returns payload as if it was RecoveryMessage. diff --git a/consensus_message_type.go b/consensus_message_type.go index faed09b7..20bd1ff7 100644 --- a/consensus_message_type.go +++ b/consensus_message_type.go @@ -11,6 +11,7 @@ const ( PrepareRequestType MessageType = 0x20 PrepareResponseType MessageType = 0x21 CommitType MessageType = 0x30 + CommitAckType MessageType = 0x31 RecoveryRequestType MessageType = 0x40 RecoveryMessageType MessageType = 0x41 ) @@ -26,6 +27,8 @@ func (m MessageType) String() string { return "PrepareResponse" case CommitType: return "Commit" + case CommitAckType: + return "CommitAck" case RecoveryRequestType: return "RecoveryRequest" case RecoveryMessageType: diff --git a/context.go b/context.go index 4a7ce73d..81308eb3 100644 --- a/context.go +++ b/context.go @@ -23,13 +23,17 @@ type Context[H Hash] struct { // Pub is node's public key. Pub PublicKey - block Block[H] - header Block[H] + preBlock PreBlock[H] + preHeader PreBlock[H] + block Block[H] + header Block[H] // blockProcessed denotes whether Config.ProcessBlock callback was called for the current // height. If so, then no second call must happen. After new block is received by the user, // dBFT stops any new transaction or messages processing as far as timeouts handling till // the next call to Reset. blockProcessed bool + // TODO: add a comment, t has another meaning than blockProcessed. + preBlockProcessed bool // BlockIndex is current block index. BlockIndex uint32 @@ -65,6 +69,15 @@ type Context[H Hash] struct { // current round, so it's possible to verify Commit against it) or stored till // the corresponding PrepareRequest receiving. CommitPayloads []ConsensusPayload[H] + // CommitAckPayloads stores consensus CommitAck payloads sent through all epochs. + // It is assumed that valid CommitAck payloads can only be sent once by a single + // node per the whole set of consensus epochs for particular block. Invalid + // CommitAck payloads are kicked off this list immediately (if Commit + // [TODO: and PrepareRequest? How do we verify CommitAck payloads: based on + // Commit only or based on PrepareRequest?] was received for the current round, + // so it's possible to verify CommitAck against it) or stored till the + // corresponding Commit receiving. + CommitAckPayloads []ConsensusPayload[H] // ChangeViewPayloads stores consensus ChangeView payloads for the current epoch. ChangeViewPayloads []ConsensusPayload[H] // LastChangeViewPayloads stores consensus ChangeView payloads for the last epoch. @@ -149,6 +162,12 @@ func (c *Context[H]) CommitSent() bool { return !c.WatchOnly() && c.CommitPayloads[c.MyIndex] != nil } +// CommitAckSent returns true iff CommitAck message was sent for the current epoch +// assuming that the node can't go further than current epoch after commit was sent. +func (c *Context[H]) CommitAckSent() bool { + return !c.WatchOnly() && c.CommitAckPayloads[c.MyIndex] != nil +} + // BlockSent returns true iff block was formed AND sent for the current height. // Once block is sent, the consensus stops new transactions and messages processing // as far as timeouts handling. @@ -227,6 +246,7 @@ func (c *Context[H]) reset(view byte, ts uint64) { c.ChangeViewPayloads = make([]ConsensusPayload[H], n) if view == 0 { c.CommitPayloads = make([]ConsensusPayload[H], n) + c.CommitAckPayloads = make([]ConsensusPayload[H], n) } c.PreparationPayloads = make([]ConsensusPayload[H], n) @@ -285,11 +305,42 @@ func (c *Context[H]) CreateBlock() Block[H] { } c.block.SetTransactions(txx) + + // TODO: do we really need this? CreateBlock will be called when all decryption data are available, thus we may + // add all necessary information in MakeHeader or in SetTransactions. For now, I'd skip it. + //if c.isAntiMEVExtensionEnabled() { + // c.block.Finalize() + //} } return c.block } +// CreatePreBlock returns PreBlock for the current epoch. +func (c *Context[H]) CreatePreBlock() PreBlock[H] { + if c.preBlock == nil { + if c.preBlock = c.MakePreHeader(); c.preBlock == nil { + return nil + } + + txx := make([]Transaction[H], len(c.TransactionHashes)) + + for i, h := range c.TransactionHashes { + txx[i] = c.Transactions[h] + } + + c.preBlock.SetTransactions(txx) + } + + return c.preBlock +} + +// isAntiMEVExtensionEnabled returns whether Anti-MEV dBFT extension is enabled +// at the currently processing block height. +func (c *Context[H]) isAntiMEVExtensionEnabled() bool { + return c.Config.AntiMEVExtensionEnablingHeight >= 0 && uint32(c.Config.AntiMEVExtensionEnablingHeight) < c.BlockIndex +} + // MakeHeader returns half-filled block for the current epoch. // All hashable fields will be filled. func (c *Context[H]) MakeHeader() Block[H] { @@ -297,12 +348,28 @@ func (c *Context[H]) MakeHeader() Block[H] { if !c.RequestSentOrReceived() { return nil } + if c.isAntiMEVExtensionEnabled() && c.CountCommitted() < c.M() { // TODO: replace with count committed for *my* view? + return nil + } c.header = c.Config.NewBlockFromContext(c) } return c.header } +// MakePreHeader returns half-filled block for the current epoch. +// All hashable fields will be filled. +func (c *Context[H]) MakePreHeader() PreBlock[H] { + if c.preHeader == nil { + if !c.RequestSentOrReceived() { + return nil + } + c.preHeader = c.Config.NewPreBlockFromContext(c) + } + + return c.preHeader +} + // hasAllTransactions returns true iff all transactions were received // for the proposed block. func (c *Context[H]) hasAllTransactions() bool { diff --git a/dbft.go b/dbft.go index db6cf851..0f61d71d 100644 --- a/dbft.go +++ b/dbft.go @@ -255,6 +255,14 @@ func (d *DBFT[H]) OnReceive(msg ConsensusPayload[H]) { d.onPrepareResponse(msg) case CommitType: d.onCommit(msg) + case CommitAckType: + if !d.isAntiMEVExtensionEnabled() { + d.Logger.Error(fmt.Sprintf("%s message received but AntiMEVExtension is disabled", CommitAckType), + zap.Uint16("from", msg.ValidatorIndex()), + ) + return + } + d.onCommitAck(msg) case RecoveryRequestType: d.onRecoveryRequest(msg) case RecoveryMessageType: @@ -384,13 +392,37 @@ func (d *DBFT[H]) updateExistingPayloads(msg ConsensusPayload[H]) { } } + antiMEVEnabled := d.isAntiMEVExtensionEnabled() for i, m := range d.CommitPayloads { if m != nil && m.ViewNumber() == d.ViewNumber { - if header := d.MakeHeader(); header != nil { - pub := d.Validators[m.ValidatorIndex()] - if header.Verify(pub, m.GetCommit().Signature()) != nil { - d.CommitPayloads[i] = nil - d.Logger.Warn("can't validate commit signature") + if antiMEVEnabled { + if preHeader := d.MakePreHeader(); preHeader != nil { + pub := d.Validators[m.ValidatorIndex()] + if preHeader.Verify(pub, m.GetCommit().Signature()) != nil { + d.CommitPayloads[i] = nil + d.Logger.Warn("can't validate commit signature") + } + } else { + if header := d.MakeHeader(); header != nil { + pub := d.Validators[m.ValidatorIndex()] + if header.Verify(pub, m.GetCommit().Signature()) != nil { + d.CommitPayloads[i] = nil + d.Logger.Warn("can't validate commit signature") + } + } + } + } + } + } + if antiMEVEnabled { + for i, m := range d.CommitAckPayloads { + if m != nil && m.ViewNumber() == d.ViewNumber { + if header := d.MakeHeader(); header != nil { + pub := d.Validators[m.ValidatorIndex()] + if header.Verify(pub, m.GetCommitAck().Data()) != nil { + d.CommitAckPayloads[i] = nil + d.Logger.Warn("can't validate commitAck signature") + } } } } @@ -497,6 +529,23 @@ func (d *DBFT[H]) onCommit(msg ConsensusPayload[H]) { if d.ViewNumber == msg.ViewNumber() { d.Logger.Info("received Commit", zap.Uint("validator", uint(msg.ValidatorIndex()))) d.extendTimer(4) + if d.isAntiMEVExtensionEnabled() { + preHeader := d.MakePreHeader() + if preHeader == nil { + d.CommitPayloads[msg.ValidatorIndex()] = msg + } else { + pub := d.Validators[msg.ValidatorIndex()] + if preHeader.Verify(pub, msg.GetCommit().Signature()) == nil { + d.CommitPayloads[msg.ValidatorIndex()] = msg + d.checkCommit() + } else { + d.Logger.Warn("invalid commit signature", + zap.Uint("validator", uint(msg.ValidatorIndex())), + ) + } + } + return + } header := d.MakeHeader() if header == nil { d.CommitPayloads[msg.ValidatorIndex()] = msg @@ -522,6 +571,49 @@ func (d *DBFT[H]) onCommit(msg ConsensusPayload[H]) { d.CommitPayloads[msg.ValidatorIndex()] = msg } +func (d *DBFT[H]) onCommitAck(msg ConsensusPayload[H]) { + existing := d.CommitAckPayloads[msg.ValidatorIndex()] + if existing != nil { + if existing.Hash() != msg.Hash() { + d.Logger.Warn("rejecting commitAck due to existing", + zap.Uint("validator", uint(msg.ValidatorIndex())), + zap.Uint("existing view", uint(existing.ViewNumber())), + zap.Uint("view", uint(msg.ViewNumber())), + zap.Stringer("existing hash", existing.Hash()), + zap.Stringer("hash", msg.Hash()), + ) + } + return + } + if d.ViewNumber == msg.ViewNumber() { + d.Logger.Info("received CommitAck", zap.Uint("validator", uint(msg.ValidatorIndex()))) + d.extendTimer(4) + header := d.MakeHeader() + if header == nil { + d.CommitAckPayloads[msg.ValidatorIndex()] = msg + } else { + pub := d.Validators[msg.ValidatorIndex()] + if err := header.Verify(pub, msg.GetCommitAck().Data()); err == nil { + d.CommitAckPayloads[msg.ValidatorIndex()] = msg + d.checkCommitAck() + } else { + d.Logger.Warn("invalid commitAck", + zap.Uint("validator", uint(msg.ValidatorIndex())), + zap.Error(err), + ) + } + } + + return + } + + d.Logger.Info("received commitAck for different view", + zap.Uint("validator", uint(msg.ValidatorIndex())), + zap.Uint("view", uint(msg.ViewNumber())), + ) + d.CommitAckPayloads[msg.ValidatorIndex()] = msg +} + func (d *DBFT[H]) onRecoveryRequest(msg ConsensusPayload[H]) { if !d.CommitSent() { // Limit recoveries to be sent from no more than F nodes diff --git a/dbft_test.go b/dbft_test.go index 86fe3c89..adb5cb14 100644 --- a/dbft_test.go +++ b/dbft_test.go @@ -43,7 +43,8 @@ func TestDBFT_OnStartPrimarySendPrepareRequest(t *testing.T) { t.Run("backup sends nothing on start", func(t *testing.T) { s.currHeight = 0 - service, _ := dbft.New[crypto.Uint256](s.getOptions()...) + service, err := dbft.New[crypto.Uint256](s.getOptions()...) + require.NoError(t, err) service.Start(0) require.Nil(t, s.tryRecv()) diff --git a/helpers_test.go b/helpers_test.go index 33a60920..c57e4389 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -51,6 +51,7 @@ func (p payloadStub) GetPrepareResponse() PrepareResponse[hash] { func (p payloadStub) GetCommit() Commit { panic("TODO") } +func (p payloadStub) GetCommitAck() CommitAck { panic("TODO") } func (p payloadStub) GetRecoveryRequest() RecoveryRequest { panic("TODO") } diff --git a/internal/consensus/consensus_message.go b/internal/consensus/consensus_message.go index cd51b967..2720d746 100644 --- a/internal/consensus/consensus_message.go +++ b/internal/consensus/consensus_message.go @@ -88,6 +88,7 @@ func (m message) GetPrepareResponse() dbft.PrepareResponse[crypto.Uint256] { return m.payload.(dbft.PrepareResponse[crypto.Uint256]) } func (m message) GetCommit() dbft.Commit { return m.payload.(dbft.Commit) } +func (m message) GetCommitAck() dbft.CommitAck { return m.payload.(dbft.CommitAck) } func (m message) GetRecoveryRequest() dbft.RecoveryRequest { return m.payload.(dbft.RecoveryRequest) } func (m message) GetRecoveryMessage() dbft.RecoveryMessage[crypto.Uint256] { return m.payload.(dbft.RecoveryMessage[crypto.Uint256]) diff --git a/pre_block.go b/pre_block.go new file mode 100644 index 00000000..61c924d5 --- /dev/null +++ b/pre_block.go @@ -0,0 +1,27 @@ +package dbft + +// PreBlock is a generic interface for a preBlock used by anti-MEV dBFT extension. +type PreBlock[H Hash] interface { + // TODO: filter out unused methods. + // Hash returns block hash. + Hash() H + // PrevHash returns previous block hash. + PrevHash() H + // MerkleRoot returns a merkle root of the transaction hashes. + MerkleRoot() H + // Index returns block index. + Index() uint32 + + // PreSignature returns PreBlock's data CNs need to exchange during Commit phase. + // It's not a final block signature. + PreSignature() []byte // required + // PreSign generates and sets PreBlock's data CNs need to exchange during Commit phase. + PreSign(key PrivateKey) error // required + // Verify checks if signature is correct. + Verify(key PublicKey, preSign []byte) error // 100% required, but signature may be changed. + + // Transactions returns block's transaction list. + Transactions() []Transaction[H] + // SetTransactions sets block's transaction list. + SetTransactions([]Transaction[H]) // needed +} diff --git a/send.go b/send.go index 9ab7ac76..cf53b99b 100644 --- a/send.go +++ b/send.go @@ -104,15 +104,47 @@ func (c *Context[H]) makeCommit() ConsensusPayload[H] { return msg } + if c.isAntiMEVExtensionEnabled() { + if preB := c.MakePreHeader(); preB != nil { + var preSign []byte + if err := preB.PreSign(c.Priv); err == nil { + preSign = preB.PreSignature() + } + + commit := c.Config.NewCommit(preSign) + + return c.Config.NewConsensusPayload(c, CommitType, commit) + } + } else { + if b := c.MakeHeader(); b != nil { + var sign []byte + if err := b.Sign(c.Priv); err == nil { + sign = b.Signature() + } + + commit := c.Config.NewCommit(sign) + + return c.Config.NewConsensusPayload(c, CommitType, commit) + } + } + + return nil +} + +func (c *Context[H]) makeCommitAck() ConsensusPayload[H] { + if msg := c.CommitAckPayloads[c.MyIndex]; msg != nil { + return msg + } + if b := c.MakeHeader(); b != nil { var sign []byte if err := b.Sign(c.Priv); err == nil { sign = b.Signature() } - commit := c.Config.NewCommit(sign) + commitAck := c.Config.NewCommitAck(sign) - return c.Config.NewConsensusPayload(c, CommitType, commit) + return c.Config.NewConsensusPayload(c, CommitAckType, commitAck) } return nil @@ -125,6 +157,13 @@ func (d *DBFT[H]) sendCommit() { d.broadcast(msg) } +func (d *DBFT[H]) sendCommitAck() { + msg := d.makeCommitAck() + d.CommitAckPayloads[d.MyIndex] = msg + d.Logger.Info("sending CommitAck", zap.Uint32("height", d.BlockIndex), zap.Uint("view", uint(d.ViewNumber))) + d.broadcast(msg) +} + func (d *DBFT[H]) sendRecoveryRequest() { // If we're here, something is wrong, we either missing some messages or // transactions or both, so re-request missing transactions here too.