Skip to content

Commit 87a8944

Browse files
authored
Merge pull request #272 from InjectiveLabs/feat/tx_broadcast_refactoring
feat/tx_broadcast_refactoring
2 parents 48e6836 + 064ae5e commit 87a8944

File tree

12 files changed

+211
-170
lines changed

12 files changed

+211
-170
lines changed

client/chain/chain.go

Lines changed: 95 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type ChainClient interface {
8383
SimulateMsg(clientCtx client.Context, msgs ...sdk.Msg) (*txtypes.SimulateResponse, error)
8484
AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error)
8585
SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error)
86+
BroadcastMsg(broadcastMode txtypes.BroadcastMode, msgs ...sdk.Msg) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error)
8687

8788
// Build signed tx with given accNum and accSeq, useful for offline siging
8889
// If simulate is set to false, initialGas will be used
@@ -681,35 +682,6 @@ func (c *chainClient) GetAccount(ctx context.Context, address string) (*authtype
681682
return res, err
682683
}
683684

684-
// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
685-
func (c *chainClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
686-
c.syncMux.Lock()
687-
defer c.syncMux.Unlock()
688-
689-
sequence := c.getAccSeq()
690-
c.txFactory = c.txFactory.WithSequence(sequence)
691-
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
692-
res, err := c.broadcastTx(c.ctx, c.txFactory, true, msgs...)
693-
694-
if err != nil {
695-
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
696-
c.syncNonce()
697-
sequence := c.getAccSeq()
698-
c.txFactory = c.txFactory.WithSequence(sequence)
699-
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
700-
log.Debugln("retrying broadcastTx with nonce", sequence)
701-
res, err = c.broadcastTx(c.ctx, c.txFactory, true, msgs...)
702-
}
703-
if err != nil {
704-
resJSON, _ := json.MarshalIndent(res, "", "\t")
705-
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed synchronously broadcast messages:", string(resJSON))
706-
return nil, err
707-
}
708-
}
709-
710-
return res, nil
711-
}
712-
713685
func (c *chainClient) GetFeeDiscountInfo(ctx context.Context, account string) (*exchangetypes.QueryFeeDiscountAccountInfoResponse, error) {
714686
req := &exchangetypes.QueryFeeDiscountAccountInfoRequest{
715687
Account: account,
@@ -746,36 +718,6 @@ func (c *chainClient) SimulateMsg(clientCtx client.Context, msgs ...sdk.Msg) (*t
746718
return simRes, nil
747719
}
748720

749-
// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
750-
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
751-
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
752-
func (c *chainClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
753-
c.syncMux.Lock()
754-
defer c.syncMux.Unlock()
755-
756-
sequence := c.getAccSeq()
757-
c.txFactory = c.txFactory.WithSequence(sequence)
758-
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
759-
res, err := c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
760-
if err != nil {
761-
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
762-
c.syncNonce()
763-
sequence := c.getAccSeq()
764-
c.txFactory = c.txFactory.WithSequence(sequence)
765-
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
766-
log.Debugln("retrying broadcastTx with nonce", sequence)
767-
res, err = c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
768-
}
769-
if err != nil {
770-
resJSON, _ := json.MarshalIndent(res, "", "\t")
771-
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
772-
return nil, err
773-
}
774-
}
775-
776-
return res, nil
777-
}
778-
779721
func (c *chainClient) BuildSignedTx(clientCtx client.Context, accNum, accSeq, initialGas uint64, msgs ...sdk.Msg) ([]byte, error) {
780722
txf := NewTxFactory(clientCtx).WithSequence(accSeq).WithAccountNumber(accNum).WithGas(initialGas)
781723
return c.buildSignedTx(clientCtx, txf, msgs...)
@@ -890,57 +832,23 @@ func (c *chainClient) AsyncBroadcastSignedTx(txBytes []byte) (*txtypes.Broadcast
890832
func (c *chainClient) broadcastTx(
891833
clientCtx client.Context,
892834
txf tx.Factory,
893-
await bool,
835+
broadcastMode txtypes.BroadcastMode,
894836
msgs ...sdk.Msg,
895-
) (*txtypes.BroadcastTxResponse, error) {
837+
) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error) {
896838
txBytes, err := c.buildSignedTx(clientCtx, txf, msgs...)
897839
if err != nil {
898840
err = errors.Wrap(err, "failed to build signed Tx")
899-
return nil, err
841+
return nil, nil, err
900842
}
901843

902844
req := txtypes.BroadcastTxRequest{
903845
TxBytes: txBytes,
904-
Mode: txtypes.BroadcastMode_BROADCAST_MODE_SYNC,
846+
Mode: broadcastMode,
905847
}
906848

907849
res, err := common.ExecuteCall(context.Background(), c.network.ChainCookieAssistant, c.txClient.BroadcastTx, &req)
908-
if err != nil || res.TxResponse.Code != 0 || !await {
909-
return res, err
910-
}
911-
912-
awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
913-
defer cancelFn()
914-
915-
txHash, _ := hex.DecodeString(res.TxResponse.TxHash)
916-
t := time.NewTimer(defaultBroadcastStatusPoll)
917-
918-
for {
919-
select {
920-
case <-awaitCtx.Done():
921-
err := errors.Wrapf(ErrTimedOut, "%s", res.TxResponse.TxHash)
922-
t.Stop()
923-
return nil, err
924-
case <-t.C:
925-
resultTx, err := clientCtx.Client.Tx(awaitCtx, txHash, false)
926-
if err != nil {
927-
if errRes := client.CheckCometError(err, txBytes); errRes != nil {
928-
return &txtypes.BroadcastTxResponse{TxResponse: errRes}, err
929-
}
930-
931-
t.Reset(defaultBroadcastStatusPoll)
932-
continue
933-
934-
} else if resultTx.Height > 0 {
935-
resResultTx := sdk.NewResponseResultTx(resultTx, res.TxResponse.Tx, res.TxResponse.Timestamp)
936-
res = &txtypes.BroadcastTxResponse{TxResponse: resResultTx}
937-
t.Stop()
938-
return res, err
939-
}
850+
return &req, res, err
940851

941-
t.Reset(defaultBroadcastStatusPoll)
942-
}
943-
}
944852
}
945853

946854
// QueueBroadcastMsg enqueues a list of messages. Messages will added to the queue
@@ -970,37 +878,20 @@ func (c *chainClient) runBatchBroadcast() {
970878
msgBatch := make([]sdk.Msg, 0, msgCommitBatchSizeLimit)
971879

972880
submitBatch := func(toSubmit []sdk.Msg) {
973-
c.syncMux.Lock()
974-
defer c.syncMux.Unlock()
975-
sequence := c.getAccSeq()
976-
c.txFactory = c.txFactory.WithSequence(sequence)
977-
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
978-
log.Debugln("broadcastTx with nonce", sequence)
979-
res, err := c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
980-
if err != nil {
981-
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
982-
c.syncNonce()
983-
sequence := c.getAccSeq()
984-
c.txFactory = c.txFactory.WithSequence(sequence)
985-
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
986-
log.Debugln("retrying broadcastTx with nonce", sequence)
987-
res, err = c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
988-
}
989-
if err != nil {
990-
resJSON, _ := json.MarshalIndent(res, "", "\t")
991-
c.logger.WithField("size", len(toSubmit)).WithError(err).Errorln("failed to broadcast messages batch:", string(resJSON))
992-
return
993-
}
994-
}
881+
res, err := c.SyncBroadcastMsg(toSubmit...)
995882

996-
if res.TxResponse.Code != 0 {
997-
err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
998-
log.WithField("txHash", res.TxResponse.TxHash).WithError(err).Errorln("failed to broadcast messages batch")
883+
if err != nil {
884+
c.logger.WithError(err)
999885
} else {
1000-
log.WithField("txHash", res.TxResponse.TxHash).Debugln("msg batch broadcasted successfully at height", res.TxResponse.Height)
886+
if res.TxResponse.Code != 0 {
887+
err = errors.Errorf("error %d (%s): %s", res.TxResponse.Code, res.TxResponse.Codespace, res.TxResponse.RawLog)
888+
c.logger.WithField("txHash", res.TxResponse.TxHash).WithError(err).Errorln("failed to broadcast messages batch")
889+
} else {
890+
c.logger.WithField("txHash", res.TxResponse.TxHash).Debugln("msg batch broadcasted successfully at height", res.TxResponse.Height)
891+
}
1001892
}
1002893

1003-
log.Debugln("gas wanted: ", c.gasWanted)
894+
c.logger.Debugln("gas wanted: ", c.gasWanted)
1004895
}
1005896

1006897
for {
@@ -2651,3 +2542,82 @@ func (c *chainClient) FetchVouchersForAddress(ctx context.Context, address strin
26512542
func (c *chainClient) GetNetwork() common.Network {
26522543
return c.network
26532544
}
2545+
2546+
// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
2547+
func (c *chainClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
2548+
req, res, err := c.BroadcastMsg(txtypes.BroadcastMode_BROADCAST_MODE_SYNC, msgs...)
2549+
2550+
if err != nil || res.TxResponse.Code != 0 {
2551+
return res, err
2552+
}
2553+
2554+
awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
2555+
defer cancelFn()
2556+
2557+
txHash, _ := hex.DecodeString(res.TxResponse.TxHash)
2558+
t := time.NewTimer(defaultBroadcastStatusPoll)
2559+
2560+
for {
2561+
select {
2562+
case <-awaitCtx.Done():
2563+
err := errors.Wrapf(ErrTimedOut, "%s", res.TxResponse.TxHash)
2564+
t.Stop()
2565+
return nil, err
2566+
case <-t.C:
2567+
resultTx, err := c.ctx.Client.Tx(awaitCtx, txHash, false)
2568+
if err != nil {
2569+
if errRes := client.CheckCometError(err, req.TxBytes); errRes != nil {
2570+
return &txtypes.BroadcastTxResponse{TxResponse: errRes}, err
2571+
}
2572+
2573+
t.Reset(defaultBroadcastStatusPoll)
2574+
continue
2575+
2576+
} else if resultTx.Height > 0 {
2577+
resResultTx := sdk.NewResponseResultTx(resultTx, res.TxResponse.Tx, res.TxResponse.Timestamp)
2578+
res = &txtypes.BroadcastTxResponse{TxResponse: resResultTx}
2579+
t.Stop()
2580+
return res, err
2581+
}
2582+
2583+
t.Reset(defaultBroadcastStatusPoll)
2584+
}
2585+
}
2586+
}
2587+
2588+
// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
2589+
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
2590+
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
2591+
func (c *chainClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*txtypes.BroadcastTxResponse, error) {
2592+
_, res, err := c.BroadcastMsg(txtypes.BroadcastMode_BROADCAST_MODE_ASYNC, msgs...)
2593+
return res, err
2594+
}
2595+
2596+
// BroadcastMsg submits a group of messages in one transaction to the chain
2597+
// The function uses the broadcast mode specified with the broadcastMode parameter
2598+
func (c *chainClient) BroadcastMsg(broadcastMode txtypes.BroadcastMode, msgs ...sdk.Msg) (*txtypes.BroadcastTxRequest, *txtypes.BroadcastTxResponse, error) {
2599+
c.syncMux.Lock()
2600+
defer c.syncMux.Unlock()
2601+
2602+
sequence := c.getAccSeq()
2603+
c.txFactory = c.txFactory.WithSequence(sequence)
2604+
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
2605+
req, res, err := c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
2606+
if err != nil {
2607+
if c.opts.ShouldFixSequenceMismatch && strings.Contains(err.Error(), "account sequence mismatch") {
2608+
c.syncNonce()
2609+
sequence := c.getAccSeq()
2610+
c.txFactory = c.txFactory.WithSequence(sequence)
2611+
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
2612+
c.logger.Debugln("retrying broadcastTx with nonce", sequence)
2613+
req, res, err = c.broadcastTx(c.ctx, c.txFactory, broadcastMode, msgs...)
2614+
}
2615+
if err != nil {
2616+
resJSON, _ := json.MarshalIndent(res, "", "\t")
2617+
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to asynchronously broadcast messagess:", string(resJSON))
2618+
return nil, nil, err
2619+
}
2620+
}
2621+
2622+
return req, res, nil
2623+
}

0 commit comments

Comments
 (0)