diff --git a/.github/workflows/test-kubectl-plugin.yml b/.github/workflows/test-kubectl-plugin.yml index fe7c8f45..57841de9 100644 --- a/.github/workflows/test-kubectl-plugin.yml +++ b/.github/workflows/test-kubectl-plugin.yml @@ -313,13 +313,14 @@ jobs: ordererOrganizations: - caName: "ord-ca" caNamespace: "default" - externalOrderersToJoin: - - host: ord-node1.default - port: 7053 + externalOrderersToJoin: [] mspID: OrdererMSP ordererEndpoints: - orderer0-ord.localho.st:443 - orderersToJoin: [] + orderersToJoin: + - name: ord-node1 + namespace: default + orderers: - host: orderer0-ord.localho.st port: 443 diff --git a/controllers/mainchannel/mainchannel_controller.go b/controllers/mainchannel/mainchannel_controller.go index de72420a..b2f244a8 100644 --- a/controllers/mainchannel/mainchannel_controller.go +++ b/controllers/mainchannel/mainchannel_controller.go @@ -23,6 +23,7 @@ import ( cb "github.com/hyperledger/fabric-protos-go/common" sb "github.com/hyperledger/fabric-protos-go/orderer/smartbft" "github.com/hyperledger/fabric-sdk-go/pkg/client/resmgmt" + "github.com/hyperledger/fabric-sdk-go/pkg/common/errors/retry" fab2 "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/msp" "github.com/hyperledger/fabric-sdk-go/pkg/core/config" @@ -115,22 +116,43 @@ func (r *FabricMainChannelReconciler) Reconcile(ctx context.Context, req ctrl.Re return r.handleReconcileError(ctx, fabricMainChannel, err) } - resmgmtOptions := r.setupResmgmtOptions(fabricMainChannel) + options, endpoints := r.setupResmgmtOptions(fabricMainChannel) - blockBytes, err := r.fetchConfigBlock(resClient, fabricMainChannel, resmgmtOptions) + // Try to get existing channel config + channelBlock, err := r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options) if err != nil { - return r.handleReconcileError(ctx, fabricMainChannel, err) - } + // Channel doesn't exist, create it and join orderers + log.Infof("Channel %s does not exist, creating it", fabricMainChannel.Spec.Name) + blockBytes, err := r.createNewChannel(fabricMainChannel) + if err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, errors.Wrap(err, "failed to create new channel")) + } - if err := r.joinOrderers(ctx, fabricMainChannel, clientSet, hlfClientSet, blockBytes); err != nil { - return r.handleReconcileError(ctx, fabricMainChannel, err) + // Join orderers to the channel + if err := r.joinOrderers(ctx, fabricMainChannel, clientSet, hlfClientSet, blockBytes); err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, errors.Wrap(err, "failed to join orderers to channel")) + } + + // Wait for orderers to stabilize + time.Sleep(5 * time.Second) + + // Get the config block after channel creation + channelBlock, err = r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options) + if err != nil { + return r.handleReconcileError(ctx, fabricMainChannel, errors.Wrap(err, "failed to get config block after creating channel")) + } } + _ = channelBlock - if err := r.updateChannelConfig(ctx, fabricMainChannel, resClient, resmgmtOptions, blockBytes, sdk, clientSet); err != nil { + // Update channel config if needed + if err := r.updateChannelConfig(ctx, fabricMainChannel, resClient, options, sdk, clientSet); err != nil { return r.handleReconcileError(ctx, fabricMainChannel, err) } + time.Sleep(3 * time.Second) - if err := r.saveChannelConfig(ctx, fabricMainChannel, resClient, resmgmtOptions); err != nil { + + // Save channel config + if err := r.saveChannelConfig(ctx, fabricMainChannel, resClient); err != nil { return r.handleReconcileError(ctx, fabricMainChannel, err) } @@ -394,21 +416,47 @@ func (r *FabricMainChannelReconciler) joinInternalOrderers(ctx context.Context, return nil } -func (r *FabricMainChannelReconciler) fetchOrdererChannelBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resmgmtOptions []resmgmt.RequestOption) (*common.Block, error) { - var ordererChannelBlock *common.Block - var err error - attemptsLeft := 5 - for { - ordererChannelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...) - if err == nil || attemptsLeft == 0 { - break +func (r *FabricMainChannelReconciler) queryConfigBlockFromOrdererWithRoundRobin(resClient *resmgmt.Client, channelID string, ordererEndpoints []string, resmgmtOptions []resmgmt.RequestOption) (*common.Block, error) { + if len(ordererEndpoints) == 0 { + return nil, fmt.Errorf("no orderer endpoints available") + } + + // Try each orderer in sequence until one succeeds + var lastErr error + for _, endpoint := range ordererEndpoints { + // Create options for this specific orderer + ordererOpts := []resmgmt.RequestOption{ + resmgmt.WithOrdererEndpoint(endpoint), + resmgmt.WithRetry(retry.Opts{ + Attempts: 3, + InitialBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + }), } + + // Add any other options that were passed in (except orderer endpoints) + ordererOpts = append(ordererOpts, resmgmtOptions...) + + log.Infof("Attempting to query config block from orderer %s", endpoint) + block, err := resClient.QueryConfigBlockFromOrderer(channelID, ordererOpts...) if err != nil { - attemptsLeft-- + log.Warnf("Failed to query config block from orderer %s: %v", endpoint, err) + lastErr = err + continue } - log.Infof("Failed to get block %v, attempts left %d", err, attemptsLeft) - time.Sleep(1500 * time.Millisecond) + log.Infof("Successfully queried config block from orderer %s", endpoint) + return block, nil } + + return nil, fmt.Errorf("failed to query config block from all orderers, last error: %v", lastErr) +} + +func (r *FabricMainChannelReconciler) fetchOrdererChannelBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel) (*common.Block, error) { + var ordererChannelBlock *common.Block + var err error + + options, endpoints := r.setupResmgmtOptions(fabricMainChannel) + ordererChannelBlock, err = r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options) if err != nil { return nil, errors.Wrapf(err, "failed to get block from channel %s", fabricMainChannel.Spec.Name) } @@ -483,39 +531,30 @@ func (r *FabricMainChannelReconciler) handleReconcileError(ctx context.Context, return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel) } -func (r *FabricMainChannelReconciler) setupResmgmtOptions(fabricMainChannel *hlfv1alpha1.FabricMainChannel) []resmgmt.RequestOption { +func (r *FabricMainChannelReconciler) setupResmgmtOptions(fabricMainChannel *hlfv1alpha1.FabricMainChannel) ([]resmgmt.RequestOption, []string) { resmgmtOptions := []resmgmt.RequestOption{ resmgmt.WithTimeout(fab2.ResMgmt, 30*time.Second), + resmgmt.WithRetry(retry.Opts{ + Attempts: 3, + InitialBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + }), } + var ordererEndpoints []string for _, ordOrg := range fabricMainChannel.Spec.OrdererOrganizations { - for _, endpoint := range ordOrg.OrdererEndpoints { - resmgmtOptions = append(resmgmtOptions, resmgmt.WithOrdererEndpoint(endpoint)) - } + ordererEndpoints = append(ordererEndpoints, ordOrg.OrdererEndpoints...) } - return resmgmtOptions + return resmgmtOptions, ordererEndpoints } -func (r *FabricMainChannelReconciler) fetchConfigBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resmgmtOptions []resmgmt.RequestOption) ([]byte, error) { - var channelBlock *cb.Block - var err error - - for i := 0; i < 5; i++ { - channelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...) - if err == nil { - break - } - log.Warnf("Attempt %d failed to query config block from orderer: %v retrying in 1 second", i+1, err) - time.Sleep(1 * time.Second) - } - +func (r *FabricMainChannelReconciler) fetchConfigBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel) ([]byte, error) { + options, endpoints := r.setupResmgmtOptions(fabricMainChannel) + channelBlock, err := r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options) if err != nil { - log.Infof("Channel %s does not exist, creating it: %v", fabricMainChannel.Spec.Name, err) - return r.createNewChannel(fabricMainChannel) + return nil, errors.Wrapf(err, "failed to get block from channel %s", fabricMainChannel.Spec.Name) } - - log.Infof("Channel %s already exists", fabricMainChannel.Spec.Name) return proto.Marshal(channelBlock) } @@ -557,8 +596,8 @@ func (r *FabricMainChannelReconciler) joinOrderers(ctx context.Context, fabricMa return nil } -func (r *FabricMainChannelReconciler) updateChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption, blockBytes []byte, sdk *fabsdk.FabricSDK, clientSet *kubernetes.Clientset) error { - ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel, resmgmtOptions) +func (r *FabricMainChannelReconciler) updateChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption, sdk *fabsdk.FabricSDK, clientSet *kubernetes.Clientset) error { + ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel) if err != nil { return err } @@ -641,8 +680,8 @@ func (r *FabricMainChannelReconciler) updateChannelConfig(ctx context.Context, f return nil } -func (r *FabricMainChannelReconciler) saveChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption) error { - ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel, resmgmtOptions) +func (r *FabricMainChannelReconciler) saveChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client) error { + ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel) if err != nil { return err } @@ -1397,20 +1436,41 @@ func updateOrdererChannelConfigTx(currentConfigTX configtx.ConfigTx, newConfigTx } for _, consenter := range ord.EtcdRaft.Consenters { deleted := true + needsUpdate := false + var matchingNewConsenter orderer.Consenter + for _, newConsenter := range newConfigTx.Orderer.EtcdRaft.Consenters { if newConsenter.Address.Host == consenter.Address.Host && newConsenter.Address.Port == consenter.Address.Port { deleted = false + matchingNewConsenter = newConsenter + // Check if TLS certs are different + if !bytes.Equal(newConsenter.ClientTLSCert.Raw, consenter.ClientTLSCert.Raw) || + !bytes.Equal(newConsenter.ServerTLSCert.Raw, consenter.ServerTLSCert.Raw) { + needsUpdate = true + } break } } + if deleted { log.Infof("Removing consenter %s:%d", consenter.Address.Host, consenter.Address.Port) err = currentConfigTX.Orderer().RemoveConsenter(consenter) if err != nil { return errors.Wrapf(err, "failed to remove consenter %s:%d", consenter.Address.Host, consenter.Address.Port) } + } else if needsUpdate { + log.Infof("Updating certificates for consenter %s:%d", consenter.Address.Host, consenter.Address.Port) + err = currentConfigTX.Orderer().RemoveConsenter(consenter) + if err != nil { + return errors.Wrapf(err, "failed to remove consenter %s:%d for cert update", consenter.Address.Host, consenter.Address.Port) + } + err = currentConfigTX.Orderer().AddConsenter(matchingNewConsenter) + if err != nil { + return errors.Wrapf(err, "failed to add updated consenter %s:%d", consenter.Address.Host, consenter.Address.Port) + } } } + for _, newConsenter := range newConfigTx.Orderer.EtcdRaft.Consenters { found := false for _, consenter := range ord.EtcdRaft.Consenters { @@ -1427,6 +1487,36 @@ func updateOrdererChannelConfigTx(currentConfigTX configtx.ConfigTx, newConfigTx } } } + err = currentConfigTX.Orderer().EtcdRaftOptions().SetElectionInterval( + newConfigTx.Orderer.EtcdRaft.Options.ElectionTick, + ) + if err != nil { + return errors.Wrapf(err, "failed to set election interval") + } + err = currentConfigTX.Orderer().EtcdRaftOptions().SetHeartbeatTick( + newConfigTx.Orderer.EtcdRaft.Options.HeartbeatTick, + ) + if err != nil { + return errors.Wrapf(err, "failed to set heartbeat tick") + } + err = currentConfigTX.Orderer().EtcdRaftOptions().SetTickInterval( + newConfigTx.Orderer.EtcdRaft.Options.TickInterval, + ) + if err != nil { + return errors.Wrapf(err, "failed to set tick interval") + } + err = currentConfigTX.Orderer().EtcdRaftOptions().SetSnapshotIntervalSize( + newConfigTx.Orderer.EtcdRaft.Options.SnapshotIntervalSize, + ) + if err != nil { + return errors.Wrapf(err, "failed to set snapshot interval size") + } + err = currentConfigTX.Orderer().EtcdRaftOptions().SetMaxInflightBlocks( + newConfigTx.Orderer.EtcdRaft.Options.MaxInflightBlocks, + ) + if err != nil { + return errors.Wrapf(err, "failed to set max inflight blocks") + } } else if newConfigTx.Orderer.OrdererType == orderer.ConsensusTypeBFT { err = currentConfigTX.Orderer().SetConfiguration(newConfigTx.Orderer) if err != nil { @@ -1583,6 +1673,7 @@ func updateOrdererChannelConfigTx(currentConfigTX configtx.ConfigTx, newConfigTx if err != nil { return errors.Wrapf(err, "failed to set preferred max bytes") } + err = currentConfigTX.Orderer().SetBatchTimeout(newConfigTx.Orderer.BatchTimeout) if err != nil { return errors.Wrapf(err, "failed to set batch timeout") diff --git a/kubectl-hlf/cmd/channel/inspect.go b/kubectl-hlf/cmd/channel/inspect.go index ab3ecc33..361c5e6d 100644 --- a/kubectl-hlf/cmd/channel/inspect.go +++ b/kubectl-hlf/cmd/channel/inspect.go @@ -3,14 +3,15 @@ package channel import ( "bytes" "fmt" + "io" + + "github.com/hyperledger/fabric-config/protolator" "github.com/hyperledger/fabric-sdk-go/pkg/client/resmgmt" "github.com/hyperledger/fabric-sdk-go/pkg/core/config" "github.com/hyperledger/fabric-sdk-go/pkg/fab/resource" "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk" - "github.com/hyperledger/fabric/common/tools/protolator" "github.com/kfsoftware/hlf-operator/kubectl-hlf/cmd/helpers" "github.com/spf13/cobra" - "io" ) type inspectChannelCmd struct { @@ -18,6 +19,7 @@ type inspectChannelCmd struct { peer string channelName string userName string + ordererName string } func (c *inspectChannelCmd) validate() error { @@ -50,9 +52,11 @@ func (c *inspectChannelCmd) run(out io.Writer) error { if err != nil { return err } - block, err := resClient.QueryConfigBlockFromOrderer( - c.channelName, - ) + resmgmtOptions := []resmgmt.RequestOption{} + if c.ordererName != "" { + resmgmtOptions = append(resmgmtOptions, resmgmt.WithOrdererEndpoint(c.ordererName)) + } + block, err := resClient.QueryConfigBlockFromOrderer(c.channelName, resmgmtOptions...) if err != nil { return err } @@ -87,6 +91,7 @@ func newInspectChannelCMD(out io.Writer, errOut io.Writer) *cobra.Command { persistentFlags.StringVarP(&c.userName, "user", "u", "", "User name for the transaction") persistentFlags.StringVarP(&c.channelName, "channel", "c", "", "Channel name") persistentFlags.StringVarP(&c.configPath, "config", "", "", "Configuration file for the SDK") + persistentFlags.StringVarP(&c.ordererName, "orderer", "o", "", "Orderer endpoint to fetch config from (optional)") cmd.MarkPersistentFlagRequired("channel") cmd.MarkPersistentFlagRequired("user") cmd.MarkPersistentFlagRequired("peer")