Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 86 additions & 120 deletions client/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type ChainClient interface {
SimulateMsg(clientCtx client.Context, msgs ...sdk.Msg) (*txtypes.SimulateResponse, error)
AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error)
SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error)
BroadcastMsg(broadcastMode txtypes.BroadcastMode, msgs ...sdk.Msg) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error)

// Build signed tx with given accNum and accSeq, useful for offline siging
// If simulate is set to false, initialGas will be used
Expand Down Expand Up @@ -681,35 +682,6 @@ func (c *chainClient) GetAccount(ctx context.Context, address string) (*authtype
return res, err
}

// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
func (c *chainClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
res, err := c.broadcastTx(c.ctx, c.txFactory, true, msgs...)

if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, true, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed synchronously broadcast messages:", string(resJSON))
return nil, err
}
}

return res, nil
}

func (c *chainClient) GetFeeDiscountInfo(ctx context.Context, account string) (*exchangetypes.QueryFeeDiscountAccountInfoResponse, error) {
req := &exchangetypes.QueryFeeDiscountAccountInfoRequest{
Account: account,
Expand Down Expand Up @@ -746,36 +718,6 @@ func (c *chainClient) SimulateMsg(clientCtx client.Context, msgs ...sdk.Msg) (*t
return simRes, nil
}

// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
func (c *chainClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
res, err := c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
return nil, err
}
}

return res, nil
}

func (c *chainClient) BuildSignedTx(clientCtx client.Context, accNum, accSeq, initialGas uint64, msgs ...sdk.Msg) ([]byte, error) {
txf := NewTxFactory(clientCtx).WithSequence(accSeq).WithAccountNumber(accNum).WithGas(initialGas)
return c.buildSignedTx(clientCtx, txf, msgs...)
Expand Down Expand Up @@ -890,57 +832,23 @@ func (c *chainClient) AsyncBroadcastSignedTx(txBytes []byte) (*txtypes.Broadcast
func (c *chainClient) broadcastTx(
clientCtx client.Context,
txf tx.Factory,
await bool,
broadcastMode txtypes.BroadcastMode,
msgs ...sdk.Msg,
) (*txtypes.BroadcastTxResponse, error) {
) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error) {
txBytes, err := c.buildSignedTx(clientCtx, txf, msgs...)
if err != nil {
err = errors.Wrap(err, "failed to build signed Tx")
return nil, err
return nil, nil, err
}

req := txtypes.BroadcastTxRequest{
TxBytes: txBytes,
Mode: txtypes.BroadcastMode_BROADCAST_MODE_SYNC,
Mode: broadcastMode,
}

res, err := common.ExecuteCall(context.Background(), c.network.ChainCookieAssistant, c.txClient.BroadcastTx, &req)
if err != nil || res.TxResponse.Code != 0 || !await {
return res, err
}

awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
defer cancelFn()

txHash, _ := hex.DecodeString(res.TxResponse.TxHash)
t := time.NewTimer(defaultBroadcastStatusPoll)

for {
select {
case <-awaitCtx.Done():
err := errors.Wrapf(ErrTimedOut, "%s", res.TxResponse.TxHash)
t.Stop()
return nil, err
case <-t.C:
resultTx, err := clientCtx.Client.Tx(awaitCtx, txHash, false)
if err != nil {
if errRes := client.CheckCometError(err, txBytes); errRes != nil {
return &txtypes.BroadcastTxResponse{TxResponse: errRes}, err
}

t.Reset(defaultBroadcastStatusPoll)
continue

} else if resultTx.Height > 0 {
resResultTx := sdk.NewResponseResultTx(resultTx, res.TxResponse.Tx, res.TxResponse.Timestamp)
res = &txtypes.BroadcastTxResponse{TxResponse: resResultTx}
t.Stop()
return res, err
}
return &req, res, err

t.Reset(defaultBroadcastStatusPoll)
}
}
}

// QueueBroadcastMsg enqueues a list of messages. Messages will added to the queue
Expand Down Expand Up @@ -970,28 +878,7 @@ func (c *chainClient) runBatchBroadcast() {
msgBatch := make([]sdk.Msg, 0, msgCommitBatchSizeLimit)

submitBatch := func(toSubmit []sdk.Msg) {
c.syncMux.Lock()
defer c.syncMux.Unlock()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("broadcastTx with nonce", sequence)
res, err := c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
res, err = c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(toSubmit)).WithError(err).Errorln("failed to broadcast messages batch:", string(resJSON))
return
}
}
res, err := c.SyncBroadcastMsg(toSubmit...)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle the error returned by SyncBroadcastMsg

At line 881, the error returned by c.SyncBroadcastMsg(toSubmit...) is assigned to err but not used. Ignoring errors can lead to silent failures and unexpected behaviors. It's important to check and handle errors appropriately.

Apply this diff to handle the error:

 func (c *chainClient) runBatchBroadcast() {
     // ... existing code ...

     submitBatch := func(toSubmit []sdk.Msg) {
-        res, err := c.SyncBroadcastMsg(toSubmit...)
+        res, err := c.SyncBroadcastMsg(toSubmit...)

+        if err != nil {
+            c.logger.WithError(err).Errorln("failed to broadcast messages batch")
+            return
+        }

         if res.TxResponse.Code != 0 {
             err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
             log.WithField("txHash", res.TxResponse.TxHash).WithError(err).Errorln("failed to broadcast messages batch")

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Actions: pre-commit

[error] 881-881: Ineffectual assignment to err variable. The error value is assigned but never used.


if res.TxResponse.Code != 0 {
err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
Expand Down Expand Up @@ -2651,3 +2538,82 @@ func (c *chainClient) FetchVouchersForAddress(ctx context.Context, address strin
func (c *chainClient) GetNetwork() common.Network {
return c.network
}

// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
func (c *chainClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
req, res, err := c.BroadcastMsg(txtypes.BroadcastMode_BROADCAST_MODE_SYNC, msgs...)

if err != nil || res.TxResponse.Code != 0 {
return res, err
}

awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
defer cancelFn()

txHash, _ := hex.DecodeString(res.TxResponse.TxHash)
t := time.NewTimer(defaultBroadcastStatusPoll)

for {
select {
case <-awaitCtx.Done():
err := errors.Wrapf(ErrTimedOut, "%s", res.TxResponse.TxHash)
t.Stop()
return nil, err
case <-t.C:
resultTx, err := c.ctx.Client.Tx(awaitCtx, txHash, false)
if err != nil {
if errRes := client.CheckCometError(err, req.TxBytes); errRes != nil {
return &txtypes.BroadcastTxResponse{TxResponse: errRes}, err
}

t.Reset(defaultBroadcastStatusPoll)
continue

} else if resultTx.Height > 0 {
resResultTx := sdk.NewResponseResultTx(resultTx, res.TxResponse.Tx, res.TxResponse.Timestamp)
res = &txtypes.BroadcastTxResponse{TxResponse: resResultTx}
t.Stop()
return res, err
}

t.Reset(defaultBroadcastStatusPoll)
}
}
}

// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
func (c *chainClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
_, res, err := c.BroadcastMsg(txtypes.BroadcastMode_BROADCAST_MODE_ASYNC, msgs...)
return res, err
}

// BroadcastMsg submits a group of messages in one transaction to the chain
// The function uses the broadcast mode specified with the broadcastMode parameter
func (c *chainClient) BroadcastMsg(broadcastMode txtypes.BroadcastMode, msgs ...sdk.Msg) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
req, res, err := c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
if err != nil {
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
req, res, err = c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
return nil, nil, err
}
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid potential panic due to nil res in error handling

In the BroadcastMsg method, when logging the error, there's a possibility that res might be nil if the broadcastTx function returns an error before obtaining a response. Accessing res fields when res is nil can cause a panic.

Apply this diff to prevent nil pointer dereference:

 if err != nil {
     if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
         // retry logic
     }
     if err != nil {
-        resJSON, _ := json.MarshalIndent(res, "", "\t")
-        c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
+        var resJSON []byte
+        if res != nil {
+            resJSON, _ = json.MarshalIndent(res, "", "\t")
+        }
+        c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to broadcast messages:", string(resJSON))
         return nil, nil, err
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
req, res, err = c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
return nil, nil, err
}
}
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
sequence := c.getAccSeq()
c.txFactory = c.txFactory.WithSequence(sequence)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", sequence)
req, res, err = c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
}
if err != nil {
var resJSON []byte
if res != nil {
resJSON, _ = json.MarshalIndent(res, "", "\t")
}
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to broadcast messages:", string(resJSON))
return nil, nil, err
}
}

return req, res, nil
}
Loading
Loading