From 5c96ac0d93c76b7da73968fef2b885bc153e3149 Mon Sep 17 00:00:00 2001 From: David Viejo Date: Thu, 6 Feb 2025 22:17:02 +0100 Subject: [PATCH] Enhance orderer channel configuration in mainchannel_controller (#250) * Enhance orderer channel configuration in mainchannel_controller - Added methods to set various EtcdRaft options including ElectionInterval, HeartbeatTick, TickInterval, SnapshotIntervalSize, and MaxInflightBlocks in the updateOrdererChannelConfigTx function. - Improved error handling for each configuration setting to ensure robust updates. These changes facilitate better control over the orderer channel's Raft consensus parameters, enhancing overall system reliability. Signed-off-by: David VIEJO * Refactor orderer block fetching with retry logic and update inspect command to use ledger client - Replaced manual retry logic in `fetchOrdererChannelBlock` and `fetchConfigBlock` methods with a retry option from the Fabric SDK, improving code readability and maintainability. - Updated the `inspectChannelCmd` to utilize the new ledger client for querying the configuration block, enhancing the command's functionality and performance. - Removed deprecated code related to resource management client in the inspect command. These changes streamline the process of fetching orderer blocks and improve the overall robustness of the channel inspection functionality. Signed-off-by: David VIEJO * Added orderer parameter for inspect Signed-off-by: David VIEJO * don't put all load in the last orderer Signed-off-by: David VIEJO * Cleanup code Signed-off-by: David VIEJO * Fix consenter replace certificate issue Signed-off-by: dviejokfs * Try to fix the error with test kubectl Signed-off-by: dviejokfs * Fix the test issue Signed-off-by: dviejokfs --------- Signed-off-by: David VIEJO Signed-off-by: dviejokfs --- .github/workflows/test-kubectl-plugin.yml | 9 +- .../mainchannel/mainchannel_controller.go | 181 +++++++++++++----- kubectl-hlf/cmd/channel/inspect.go | 15 +- 3 files changed, 151 insertions(+), 54 deletions(-) diff --git a/.github/workflows/test-kubectl-plugin.yml b/.github/workflows/test-kubectl-plugin.yml index fe7c8f45..57841de9 100644 --- a/.github/workflows/test-kubectl-plugin.yml +++ b/.github/workflows/test-kubectl-plugin.yml @@ -313,13 +313,14 @@ jobs: ordererOrganizations: - caName: "ord-ca" caNamespace: "default" - externalOrderersToJoin: - - host: ord-node1.default - port: 7053 + externalOrderersToJoin: [] mspID: OrdererMSP ordererEndpoints: - orderer0-ord.localho.st:443 - orderersToJoin: [] + orderersToJoin: + - name: ord-node1 + namespace: default + orderers: - host: orderer0-ord.localho.st port: 443 diff --git a/controllers/mainchannel/mainchannel_controller.go b/controllers/mainchannel/mainchannel_controller.go index de72420a..b2f244a8 100644 --- a/controllers/mainchannel/mainchannel_controller.go +++ b/controllers/mainchannel/mainchannel_controller.go @@ -23,6 +23,7 @@ import ( cb "github.com/hyperledger/fabric-protos-go/common" sb "github.com/hyperledger/fabric-protos-go/orderer/smartbft" "github.com/hyperledger/fabric-sdk-go/pkg/client/resmgmt" + "github.com/hyperledger/fabric-sdk-go/pkg/common/errors/retry" fab2 "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/msp" "github.com/hyperledger/fabric-sdk-go/pkg/core/config" @@ -115,22 +116,43 @@ func (r *FabricMainChannelReconciler) Reconcile(ctx context.Context, req ctrl.Re return r.handleReconcileError(ctx, fabricMainChannel, err) } - resmgmtOptions := r.setupResmgmtOptions(fabricMainChannel) + options, endpoints := r.setupResmgmtOptions(fabricMainChannel) - blockBytes, err := r.fetchConfigBlock(resClient, fabricMainChannel, resmgmtOptions) + // Try to get existing channel config + channelBlock, err := r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options) if err != nil { - return r.handleReconcileError(ctx, fabricMainChannel, err) - } + // Channel doesn't exist, create it and join orderers + log.Infof("Channel %s does not exist, creating it", fabricMainChannel.Spec.Name) + blockBytes, err := r.createNewChannel(fabricMainChannel) + if err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, errors.Wrap(err, "failed to create new channel")) + } - if err := r.joinOrderers(ctx, fabricMainChannel, clientSet, hlfClientSet, blockBytes); err != nil { - return r.handleReconcileError(ctx, fabricMainChannel, err) + // Join orderers to the channel + if err := r.joinOrderers(ctx, fabricMainChannel, clientSet, hlfClientSet, blockBytes); err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, errors.Wrap(err, "failed to join orderers to channel")) + } + + // Wait for orderers to stabilize + time.Sleep(5 * time.Second) + + // Get the config block after channel creation + channelBlock, err = r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options) + if err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, errors.Wrap(err, "failed to get config block after creating channel")) + } } + _ = channelBlock - if err := r.updateChannelConfig(ctx, fabricMainChannel, resClient, resmgmtOptions, blockBytes, sdk, clientSet); err != nil { + // Update channel config if needed + if err := r.updateChannelConfig(ctx, fabricMainChannel, resClient, options, sdk, clientSet); err != nil { return r.handleReconcileError(ctx, fabricMainChannel, err) } + time.Sleep(3 * time.Second) - if err := r.saveChannelConfig(ctx, fabricMainChannel, resClient, resmgmtOptions); err != nil { + + // Save channel config + if err := r.saveChannelConfig(ctx, fabricMainChannel, resClient); err != nil { return r.handleReconcileError(ctx, fabricMainChannel, err) } @@ -394,21 +416,47 @@ func (r *FabricMainChannelReconciler) joinInternalOrderers(ctx context.Context, return nil } -func (r *FabricMainChannelReconciler) fetchOrdererChannelBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resmgmtOptions []resmgmt.RequestOption) (*common.Block, error) { - var ordererChannelBlock *common.Block - var err error - attemptsLeft := 5 - for { - ordererChannelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...) - if err == nil || attemptsLeft == 0 { - break +func (r *FabricMainChannelReconciler) queryConfigBlockFromOrdererWithRoundRobin(resClient *resmgmt.Client, channelID string, ordererEndpoints []string, resmgmtOptions []resmgmt.RequestOption) (*common.Block, error) { + if len(ordererEndpoints) == 0 { + return nil, fmt.Errorf("no orderer endpoints available") + } + + // Try each orderer in sequence until one succeeds + var lastErr error + for _, endpoint := range ordererEndpoints { + // Create options for this specific orderer + ordererOpts := []resmgmt.RequestOption{ + resmgmt.WithOrdererEndpoint(endpoint), + resmgmt.WithRetry(retry.Opts{ + Attempts: 3, + InitialBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + }), } + + // Add any other options that were passed in (except orderer endpoints) + ordererOpts = append(ordererOpts, resmgmtOptions...) + + log.Infof("Attempting to query config block from orderer %s", endpoint) + block, err := resClient.QueryConfigBlockFromOrderer(channelID, ordererOpts...) if err != nil { - attemptsLeft-- + log.Warnf("Failed to query config block from orderer %s: %v", endpoint, err) + lastErr = err + continue } - log.Infof("Failed to get block %v, attempts left %d", err, attemptsLeft) - time.Sleep(1500 * time.Millisecond) + log.Infof("Successfully queried config block from orderer %s", endpoint) + return block, nil } + + return nil, fmt.Errorf("failed to query config block from all orderers, last error: %v", lastErr) +} + +func (r *FabricMainChannelReconciler) fetchOrdererChannelBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel) (*common.Block, error) { + var ordererChannelBlock *common.Block + var err error + + options, endpoints := r.setupResmgmtOptions(fabricMainChannel) + ordererChannelBlock, err = r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options) if err != nil { return nil, errors.Wrapf(err, "failed to get block from channel %s", fabricMainChannel.Spec.Name) } @@ -483,39 +531,30 @@ func (r *FabricMainChannelReconciler) handleReconcileError(ctx context.Context, return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) } -func (r *FabricMainChannelReconciler) setupResmgmtOptions(fabricMainChannel *hlfv1alpha1.FabricMainChannel) []resmgmt.RequestOption { +func (r *FabricMainChannelReconciler) setupResmgmtOptions(fabricMainChannel *hlfv1alpha1.FabricMainChannel) ([]resmgmt.RequestOption, []string) { resmgmtOptions := []resmgmt.RequestOption{ resmgmt.WithTimeout(fab2.ResMgmt, 30*time.Second), + resmgmt.WithRetry(retry.Opts{ + Attempts: 3, + InitialBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + }), } + var ordererEndpoints []string for _, ordOrg := range fabricMainChannel.Spec.OrdererOrganizations { - for _, endpoint := range ordOrg.OrdererEndpoints { - resmgmtOptions = append(resmgmtOptions, resmgmt.WithOrdererEndpoint(endpoint)) - } + ordererEndpoints = append(ordererEndpoints, ordOrg.OrdererEndpoints...) } - return resmgmtOptions + return resmgmtOptions, ordererEndpoints } -func (r *FabricMainChannelReconciler) fetchConfigBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resmgmtOptions []resmgmt.RequestOption) ([]byte, error) { - var channelBlock *cb.Block - var err error - - for i := 0; i < 5; i++ { - channelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...) - if err == nil { - break - } - log.Warnf("Attempt %d failed to query config block from orderer: %v retrying in 1 second", i+1, err) - time.Sleep(1 * time.Second) - } - +func (r *FabricMainChannelReconciler) fetchConfigBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel) ([]byte, error) { + options, endpoints := r.setupResmgmtOptions(fabricMainChannel) + channelBlock, err := r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options) if err != nil { - log.Infof("Channel %s does not exist, creating it: %v", fabricMainChannel.Spec.Name, err) - return r.createNewChannel(fabricMainChannel) + return nil, errors.Wrapf(err, "failed to get block from channel %s", fabricMainChannel.Spec.Name) } - - log.Infof("Channel %s already exists", fabricMainChannel.Spec.Name) return proto.Marshal(channelBlock) } @@ -557,8 +596,8 @@ func (r *FabricMainChannelReconciler) joinOrderers(ctx context.Context, fabricMa return nil } -func (r *FabricMainChannelReconciler) updateChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption, blockBytes []byte, sdk *fabsdk.FabricSDK, clientSet *kubernetes.Clientset) error { - ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel, resmgmtOptions) +func (r *FabricMainChannelReconciler) updateChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption, sdk *fabsdk.FabricSDK, clientSet *kubernetes.Clientset) error { + ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel) if err != nil { return err } @@ -641,8 +680,8 @@ func (r *FabricMainChannelReconciler) updateChannelConfig(ctx context.Context, f return nil } -func (r *FabricMainChannelReconciler) saveChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption) error { - ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel, resmgmtOptions) +func (r *FabricMainChannelReconciler) saveChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client) error { + ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel) if err != nil { return err } @@ -1397,20 +1436,41 @@ func updateOrdererChannelConfigTx(currentConfigTX configtx.ConfigTx, newConfigTx } for _, consenter := range ord.EtcdRaft.Consenters { deleted := true + needsUpdate := false + var matchingNewConsenter orderer.Consenter + for _, newConsenter := range newConfigTx.Orderer.EtcdRaft.Consenters { if newConsenter.Address.Host == consenter.Address.Host && newConsenter.Address.Port == consenter.Address.Port { deleted = false + matchingNewConsenter = newConsenter + // Check if TLS certs are different + if !bytes.Equal(newConsenter.ClientTLSCert.Raw, consenter.ClientTLSCert.Raw) || + !bytes.Equal(newConsenter.ServerTLSCert.Raw, consenter.ServerTLSCert.Raw) { + needsUpdate = true + } break } } + if deleted { log.Infof("Removing consenter %s:%d", consenter.Address.Host, consenter.Address.Port) err = currentConfigTX.Orderer().RemoveConsenter(consenter) if err != nil { return errors.Wrapf(err, "failed to remove consenter %s:%d", consenter.Address.Host, consenter.Address.Port) } + } else if needsUpdate { + log.Infof("Updating certificates for consenter %s:%d", consenter.Address.Host, consenter.Address.Port) + err = currentConfigTX.Orderer().RemoveConsenter(consenter) + if err != nil { + return errors.Wrapf(err, "failed to remove consenter %s:%d for cert update", consenter.Address.Host, consenter.Address.Port) + } + err = currentConfigTX.Orderer().AddConsenter(matchingNewConsenter) + if err != nil { + return errors.Wrapf(err, "failed to add updated consenter %s:%d", consenter.Address.Host, consenter.Address.Port) + } } } + for _, newConsenter := range newConfigTx.Orderer.EtcdRaft.Consenters { found := false for _, consenter := range ord.EtcdRaft.Consenters { @@ -1427,6 +1487,36 @@ func updateOrdererChannelConfigTx(currentConfigTX configtx.ConfigTx, newConfigTx } } } + err = currentConfigTX.Orderer().EtcdRaftOptions().SetElectionInterval( + newConfigTx.Orderer.EtcdRaft.Options.ElectionTick, + ) + if err != nil { + return errors.Wrapf(err, "failed to set election interval") + } + err = currentConfigTX.Orderer().EtcdRaftOptions().SetHeartbeatTick( + newConfigTx.Orderer.EtcdRaft.Options.HeartbeatTick, + ) + if err != nil { + return errors.Wrapf(err, "failed to set heartbeat tick") + } + err = currentConfigTX.Orderer().EtcdRaftOptions().SetTickInterval( + newConfigTx.Orderer.EtcdRaft.Options.TickInterval, + ) + if err != nil { + return errors.Wrapf(err, "failed to set tick interval") + } + err = currentConfigTX.Orderer().EtcdRaftOptions().SetSnapshotIntervalSize( + newConfigTx.Orderer.EtcdRaft.Options.SnapshotIntervalSize, + ) + if err != nil { + return errors.Wrapf(err, "failed to set snapshot interval size") + } + err = currentConfigTX.Orderer().EtcdRaftOptions().SetMaxInflightBlocks( + newConfigTx.Orderer.EtcdRaft.Options.MaxInflightBlocks, + ) + if err != nil { + return errors.Wrapf(err, "failed to set max inflight blocks") + } } else if newConfigTx.Orderer.OrdererType == orderer.ConsensusTypeBFT { err = currentConfigTX.Orderer().SetConfiguration(newConfigTx.Orderer) if err != nil { @@ -1583,6 +1673,7 @@ func updateOrdererChannelConfigTx(currentConfigTX configtx.ConfigTx, newConfigTx if err != nil { return errors.Wrapf(err, "failed to set preferred max bytes") } + err = currentConfigTX.Orderer().SetBatchTimeout(newConfigTx.Orderer.BatchTimeout) if err != nil { return errors.Wrapf(err, "failed to set batch timeout") diff --git a/kubectl-hlf/cmd/channel/inspect.go b/kubectl-hlf/cmd/channel/inspect.go index ab3ecc33..361c5e6d 100644 --- a/kubectl-hlf/cmd/channel/inspect.go +++ b/kubectl-hlf/cmd/channel/inspect.go @@ -3,14 +3,15 @@ package channel import ( "bytes" "fmt" + "io" + + "github.com/hyperledger/fabric-config/protolator" "github.com/hyperledger/fabric-sdk-go/pkg/client/resmgmt" "github.com/hyperledger/fabric-sdk-go/pkg/core/config" "github.com/hyperledger/fabric-sdk-go/pkg/fab/resource" "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk" - "github.com/hyperledger/fabric/common/tools/protolator" "github.com/kfsoftware/hlf-operator/kubectl-hlf/cmd/helpers" "github.com/spf13/cobra" - "io" ) type inspectChannelCmd struct { @@ -18,6 +19,7 @@ type inspectChannelCmd struct { peer string channelName string userName string + ordererName string } func (c *inspectChannelCmd) validate() error { @@ -50,9 +52,11 @@ func (c *inspectChannelCmd) run(out io.Writer) error { if err != nil { return err } - block, err := resClient.QueryConfigBlockFromOrderer( - c.channelName, - ) + resmgmtOptions := []resmgmt.RequestOption{} + if c.ordererName != "" { + resmgmtOptions = append(resmgmtOptions, resmgmt.WithOrdererEndpoint(c.ordererName)) + } + block, err := resClient.QueryConfigBlockFromOrderer(c.channelName, resmgmtOptions...) if err != nil { return err } @@ -87,6 +91,7 @@ func newInspectChannelCMD(out io.Writer, errOut io.Writer) *cobra.Command { persistentFlags.StringVarP(&c.userName, "user", "u", "", "User name for the transaction") persistentFlags.StringVarP(&c.channelName, "channel", "c", "", "Channel name") persistentFlags.StringVarP(&c.configPath, "config", "", "", "Configuration file for the SDK") + persistentFlags.StringVarP(&c.ordererName, "orderer", "o", "", "Orderer endpoint to fetch config from (optional)") cmd.MarkPersistentFlagRequired("channel") cmd.MarkPersistentFlagRequired("user") cmd.MarkPersistentFlagRequired("peer")