Skip to content

Commit

Permalink
Enhance orderer channel configuration in mainchannel_controller (#250)
Browse files Browse the repository at this point in the history
* Enhance orderer channel configuration in mainchannel_controller

- Added methods to set various EtcdRaft options including ElectionInterval, HeartbeatTick, TickInterval, SnapshotIntervalSize, and MaxInflightBlocks in the updateOrdererChannelConfigTx function.
- Improved error handling for each configuration setting to ensure robust updates.

These changes facilitate better control over the orderer channel's Raft consensus parameters, enhancing overall system reliability.

Signed-off-by: David VIEJO <[email protected]>

* Refactor orderer block fetching with retry logic and update inspect command to use ledger client

- Replaced manual retry logic in `fetchOrdererChannelBlock` and `fetchConfigBlock` methods with a retry option from the Fabric SDK, improving code readability and maintainability.
- Updated the `inspectChannelCmd` to utilize the new ledger client for querying the configuration block, enhancing the command's functionality and performance.
- Removed deprecated code related to resource management client in the inspect command.

These changes streamline the process of fetching orderer blocks and improve the overall robustness of the channel inspection functionality.

Signed-off-by: David VIEJO <[email protected]>

* Added orderer parameter for inspect

Signed-off-by: David VIEJO <[email protected]>

* don't put all load in the last orderer

Signed-off-by: David VIEJO <[email protected]>

* Cleanup code

Signed-off-by: David VIEJO <[email protected]>

* Fix consenter replace certificate issue

Signed-off-by: dviejokfs <[email protected]>

* Try to fix the error with test kubectl

Signed-off-by: dviejokfs <[email protected]>

* Fix the test issue

Signed-off-by: dviejokfs <[email protected]>

---------

Signed-off-by: David VIEJO <[email protected]>
Signed-off-by: dviejokfs <[email protected]>
  • Loading branch information
dviejokfs authored Feb 6, 2025
1 parent 6475da6 commit 5c96ac0
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 54 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/test-kubectl-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,14 @@ jobs:
ordererOrganizations:
- caName: "ord-ca"
caNamespace: "default"
externalOrderersToJoin:
- host: ord-node1.default
port: 7053
externalOrderersToJoin: []
mspID: OrdererMSP
ordererEndpoints:
- orderer0-ord.localho.st:443
orderersToJoin: []
orderersToJoin:
- name: ord-node1
namespace: default
orderers:
- host: orderer0-ord.localho.st
port: 443
Expand Down
181 changes: 136 additions & 45 deletions controllers/mainchannel/mainchannel_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cb "github.com/hyperledger/fabric-protos-go/common"
sb "github.com/hyperledger/fabric-protos-go/orderer/smartbft"
"github.com/hyperledger/fabric-sdk-go/pkg/client/resmgmt"
"github.com/hyperledger/fabric-sdk-go/pkg/common/errors/retry"
fab2 "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/msp"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config"
Expand Down Expand Up @@ -115,22 +116,43 @@ func (r *FabricMainChannelReconciler) Reconcile(ctx context.Context, req ctrl.Re
return r.handleReconcileError(ctx, fabricMainChannel, err)
}

resmgmtOptions := r.setupResmgmtOptions(fabricMainChannel)
options, endpoints := r.setupResmgmtOptions(fabricMainChannel)

blockBytes, err := r.fetchConfigBlock(resClient, fabricMainChannel, resmgmtOptions)
// Try to get existing channel config
channelBlock, err := r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options)
if err != nil {
return r.handleReconcileError(ctx, fabricMainChannel, err)
}
// Channel doesn't exist, create it and join orderers
log.Infof("Channel %s does not exist, creating it", fabricMainChannel.Spec.Name)
blockBytes, err := r.createNewChannel(fabricMainChannel)
if err != nil {
return r.handleReconcileError(ctx, fabricMainChannel, errors.Wrap(err, "failed to create new channel"))
}

if err := r.joinOrderers(ctx, fabricMainChannel, clientSet, hlfClientSet, blockBytes); err != nil {
return r.handleReconcileError(ctx, fabricMainChannel, err)
// Join orderers to the channel
if err := r.joinOrderers(ctx, fabricMainChannel, clientSet, hlfClientSet, blockBytes); err != nil {
return r.handleReconcileError(ctx, fabricMainChannel, errors.Wrap(err, "failed to join orderers to channel"))
}

// Wait for orderers to stabilize
time.Sleep(5 * time.Second)

// Get the config block after channel creation
channelBlock, err = r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options)
if err != nil {
return r.handleReconcileError(ctx, fabricMainChannel, errors.Wrap(err, "failed to get config block after creating channel"))
}
}
_ = channelBlock

if err := r.updateChannelConfig(ctx, fabricMainChannel, resClient, resmgmtOptions, blockBytes, sdk, clientSet); err != nil {
// Update channel config if needed
if err := r.updateChannelConfig(ctx, fabricMainChannel, resClient, options, sdk, clientSet); err != nil {
return r.handleReconcileError(ctx, fabricMainChannel, err)
}

time.Sleep(3 * time.Second)
if err := r.saveChannelConfig(ctx, fabricMainChannel, resClient, resmgmtOptions); err != nil {

// Save channel config
if err := r.saveChannelConfig(ctx, fabricMainChannel, resClient); err != nil {
return r.handleReconcileError(ctx, fabricMainChannel, err)
}

Expand Down Expand Up @@ -394,21 +416,47 @@ func (r *FabricMainChannelReconciler) joinInternalOrderers(ctx context.Context,
return nil
}

func (r *FabricMainChannelReconciler) fetchOrdererChannelBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resmgmtOptions []resmgmt.RequestOption) (*common.Block, error) {
var ordererChannelBlock *common.Block
var err error
attemptsLeft := 5
for {
ordererChannelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...)
if err == nil || attemptsLeft == 0 {
break
func (r *FabricMainChannelReconciler) queryConfigBlockFromOrdererWithRoundRobin(resClient *resmgmt.Client, channelID string, ordererEndpoints []string, resmgmtOptions []resmgmt.RequestOption) (*common.Block, error) {
if len(ordererEndpoints) == 0 {
return nil, fmt.Errorf("no orderer endpoints available")
}

// Try each orderer in sequence until one succeeds
var lastErr error
for _, endpoint := range ordererEndpoints {
// Create options for this specific orderer
ordererOpts := []resmgmt.RequestOption{
resmgmt.WithOrdererEndpoint(endpoint),
resmgmt.WithRetry(retry.Opts{
Attempts: 3,
InitialBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
}),
}

// Add any other options that were passed in (except orderer endpoints)
ordererOpts = append(ordererOpts, resmgmtOptions...)

log.Infof("Attempting to query config block from orderer %s", endpoint)
block, err := resClient.QueryConfigBlockFromOrderer(channelID, ordererOpts...)
if err != nil {
attemptsLeft--
log.Warnf("Failed to query config block from orderer %s: %v", endpoint, err)
lastErr = err
continue
}
log.Infof("Failed to get block %v, attempts left %d", err, attemptsLeft)
time.Sleep(1500 * time.Millisecond)
log.Infof("Successfully queried config block from orderer %s", endpoint)
return block, nil
}

return nil, fmt.Errorf("failed to query config block from all orderers, last error: %v", lastErr)
}

func (r *FabricMainChannelReconciler) fetchOrdererChannelBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel) (*common.Block, error) {
var ordererChannelBlock *common.Block
var err error

options, endpoints := r.setupResmgmtOptions(fabricMainChannel)
ordererChannelBlock, err = r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options)
if err != nil {
return nil, errors.Wrapf(err, "failed to get block from channel %s", fabricMainChannel.Spec.Name)
}
Expand Down Expand Up @@ -483,39 +531,30 @@ func (r *FabricMainChannelReconciler) handleReconcileError(ctx context.Context,
return r.updateCRStatusOrFailReconcile(ctx, r.Log, fabricMainChannel)
}

func (r *FabricMainChannelReconciler) setupResmgmtOptions(fabricMainChannel *hlfv1alpha1.FabricMainChannel) []resmgmt.RequestOption {
func (r *FabricMainChannelReconciler) setupResmgmtOptions(fabricMainChannel *hlfv1alpha1.FabricMainChannel) ([]resmgmt.RequestOption, []string) {
resmgmtOptions := []resmgmt.RequestOption{
resmgmt.WithTimeout(fab2.ResMgmt, 30*time.Second),
resmgmt.WithRetry(retry.Opts{
Attempts: 3,
InitialBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
}),
}

var ordererEndpoints []string
for _, ordOrg := range fabricMainChannel.Spec.OrdererOrganizations {
for _, endpoint := range ordOrg.OrdererEndpoints {
resmgmtOptions = append(resmgmtOptions, resmgmt.WithOrdererEndpoint(endpoint))
}
ordererEndpoints = append(ordererEndpoints, ordOrg.OrdererEndpoints...)
}

return resmgmtOptions
return resmgmtOptions, ordererEndpoints
}

func (r *FabricMainChannelReconciler) fetchConfigBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resmgmtOptions []resmgmt.RequestOption) ([]byte, error) {
var channelBlock *cb.Block
var err error

for i := 0; i < 5; i++ {
channelBlock, err = resClient.QueryConfigBlockFromOrderer(fabricMainChannel.Spec.Name, resmgmtOptions...)
if err == nil {
break
}
log.Warnf("Attempt %d failed to query config block from orderer: %v retrying in 1 second", i+1, err)
time.Sleep(1 * time.Second)
}

func (r *FabricMainChannelReconciler) fetchConfigBlock(resClient *resmgmt.Client, fabricMainChannel *hlfv1alpha1.FabricMainChannel) ([]byte, error) {
options, endpoints := r.setupResmgmtOptions(fabricMainChannel)
channelBlock, err := r.queryConfigBlockFromOrdererWithRoundRobin(resClient, fabricMainChannel.Spec.Name, endpoints, options)
if err != nil {
log.Infof("Channel %s does not exist, creating it: %v", fabricMainChannel.Spec.Name, err)
return r.createNewChannel(fabricMainChannel)
return nil, errors.Wrapf(err, "failed to get block from channel %s", fabricMainChannel.Spec.Name)
}

log.Infof("Channel %s already exists", fabricMainChannel.Spec.Name)
return proto.Marshal(channelBlock)
}

Expand Down Expand Up @@ -557,8 +596,8 @@ func (r *FabricMainChannelReconciler) joinOrderers(ctx context.Context, fabricMa
return nil
}

func (r *FabricMainChannelReconciler) updateChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption, blockBytes []byte, sdk *fabsdk.FabricSDK, clientSet *kubernetes.Clientset) error {
ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel, resmgmtOptions)
func (r *FabricMainChannelReconciler) updateChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption, sdk *fabsdk.FabricSDK, clientSet *kubernetes.Clientset) error {
ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel)
if err != nil {
return err
}
Expand Down Expand Up @@ -641,8 +680,8 @@ func (r *FabricMainChannelReconciler) updateChannelConfig(ctx context.Context, f
return nil
}

func (r *FabricMainChannelReconciler) saveChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client, resmgmtOptions []resmgmt.RequestOption) error {
ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel, resmgmtOptions)
func (r *FabricMainChannelReconciler) saveChannelConfig(ctx context.Context, fabricMainChannel *hlfv1alpha1.FabricMainChannel, resClient *resmgmt.Client) error {
ordererChannelBlock, err := r.fetchOrdererChannelBlock(resClient, fabricMainChannel)
if err != nil {
return err
}
Expand Down Expand Up @@ -1397,20 +1436,41 @@ func updateOrdererChannelConfigTx(currentConfigTX configtx.ConfigTx, newConfigTx
}
for _, consenter := range ord.EtcdRaft.Consenters {
deleted := true
needsUpdate := false
var matchingNewConsenter orderer.Consenter

for _, newConsenter := range newConfigTx.Orderer.EtcdRaft.Consenters {
if newConsenter.Address.Host == consenter.Address.Host && newConsenter.Address.Port == consenter.Address.Port {
deleted = false
matchingNewConsenter = newConsenter
// Check if TLS certs are different
if !bytes.Equal(newConsenter.ClientTLSCert.Raw, consenter.ClientTLSCert.Raw) ||
!bytes.Equal(newConsenter.ServerTLSCert.Raw, consenter.ServerTLSCert.Raw) {
needsUpdate = true
}
break
}
}

if deleted {
log.Infof("Removing consenter %s:%d", consenter.Address.Host, consenter.Address.Port)
err = currentConfigTX.Orderer().RemoveConsenter(consenter)
if err != nil {
return errors.Wrapf(err, "failed to remove consenter %s:%d", consenter.Address.Host, consenter.Address.Port)
}
} else if needsUpdate {
log.Infof("Updating certificates for consenter %s:%d", consenter.Address.Host, consenter.Address.Port)
err = currentConfigTX.Orderer().RemoveConsenter(consenter)
if err != nil {
return errors.Wrapf(err, "failed to remove consenter %s:%d for cert update", consenter.Address.Host, consenter.Address.Port)
}
err = currentConfigTX.Orderer().AddConsenter(matchingNewConsenter)
if err != nil {
return errors.Wrapf(err, "failed to add updated consenter %s:%d", consenter.Address.Host, consenter.Address.Port)
}
}
}

for _, newConsenter := range newConfigTx.Orderer.EtcdRaft.Consenters {
found := false
for _, consenter := range ord.EtcdRaft.Consenters {
Expand All @@ -1427,6 +1487,36 @@ func updateOrdererChannelConfigTx(currentConfigTX configtx.ConfigTx, newConfigTx
}
}
}
err = currentConfigTX.Orderer().EtcdRaftOptions().SetElectionInterval(
newConfigTx.Orderer.EtcdRaft.Options.ElectionTick,
)
if err != nil {
return errors.Wrapf(err, "failed to set election interval")
}
err = currentConfigTX.Orderer().EtcdRaftOptions().SetHeartbeatTick(
newConfigTx.Orderer.EtcdRaft.Options.HeartbeatTick,
)
if err != nil {
return errors.Wrapf(err, "failed to set heartbeat tick")
}
err = currentConfigTX.Orderer().EtcdRaftOptions().SetTickInterval(
newConfigTx.Orderer.EtcdRaft.Options.TickInterval,
)
if err != nil {
return errors.Wrapf(err, "failed to set tick interval")
}
err = currentConfigTX.Orderer().EtcdRaftOptions().SetSnapshotIntervalSize(
newConfigTx.Orderer.EtcdRaft.Options.SnapshotIntervalSize,
)
if err != nil {
return errors.Wrapf(err, "failed to set snapshot interval size")
}
err = currentConfigTX.Orderer().EtcdRaftOptions().SetMaxInflightBlocks(
newConfigTx.Orderer.EtcdRaft.Options.MaxInflightBlocks,
)
if err != nil {
return errors.Wrapf(err, "failed to set max inflight blocks")
}
} else if newConfigTx.Orderer.OrdererType == orderer.ConsensusTypeBFT {
err = currentConfigTX.Orderer().SetConfiguration(newConfigTx.Orderer)
if err != nil {
Expand Down Expand Up @@ -1583,6 +1673,7 @@ func updateOrdererChannelConfigTx(currentConfigTX configtx.ConfigTx, newConfigTx
if err != nil {
return errors.Wrapf(err, "failed to set preferred max bytes")
}

err = currentConfigTX.Orderer().SetBatchTimeout(newConfigTx.Orderer.BatchTimeout)
if err != nil {
return errors.Wrapf(err, "failed to set batch timeout")
Expand Down
15 changes: 10 additions & 5 deletions kubectl-hlf/cmd/channel/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ package channel
import (
"bytes"
"fmt"
"io"

"github.com/hyperledger/fabric-config/protolator"
"github.com/hyperledger/fabric-sdk-go/pkg/client/resmgmt"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/resource"
"github.com/hyperledger/fabric-sdk-go/pkg/fabsdk"
"github.com/hyperledger/fabric/common/tools/protolator"
"github.com/kfsoftware/hlf-operator/kubectl-hlf/cmd/helpers"
"github.com/spf13/cobra"
"io"
)

type inspectChannelCmd struct {
configPath string
peer string
channelName string
userName string
ordererName string
}

func (c *inspectChannelCmd) validate() error {
Expand Down Expand Up @@ -50,9 +52,11 @@ func (c *inspectChannelCmd) run(out io.Writer) error {
if err != nil {
return err
}
block, err := resClient.QueryConfigBlockFromOrderer(
c.channelName,
)
resmgmtOptions := []resmgmt.RequestOption{}
if c.ordererName != "" {
resmgmtOptions = append(resmgmtOptions, resmgmt.WithOrdererEndpoint(c.ordererName))
}
block, err := resClient.QueryConfigBlockFromOrderer(c.channelName, resmgmtOptions...)
if err != nil {
return err
}
Expand Down Expand Up @@ -87,6 +91,7 @@ func newInspectChannelCMD(out io.Writer, errOut io.Writer) *cobra.Command {
persistentFlags.StringVarP(&c.userName, "user", "u", "", "User name for the transaction")
persistentFlags.StringVarP(&c.channelName, "channel", "c", "", "Channel name")
persistentFlags.StringVarP(&c.configPath, "config", "", "", "Configuration file for the SDK")
persistentFlags.StringVarP(&c.ordererName, "orderer", "o", "", "Orderer endpoint to fetch config from (optional)")
cmd.MarkPersistentFlagRequired("channel")
cmd.MarkPersistentFlagRequired("user")
cmd.MarkPersistentFlagRequired("peer")
Expand Down

0 comments on commit 5c96ac0

Please sign in to comment.