Skip to content

Commit

Permalink
reimplement cluster connect cmd (GreptimeTeam#180)
Browse files Browse the repository at this point in the history
Signed-off-by: sh2 <[email protected]>
  • Loading branch information
shawnh2 authored Nov 3, 2023
1 parent 412ad43 commit 0c4cb58
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 357 deletions.
59 changes: 19 additions & 40 deletions cmd/gtctl/cluster_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ import (
"context"
"fmt"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"

"github.com/GreptimeTeam/gtctl/pkg/connector"
"github.com/GreptimeTeam/gtctl/pkg/deployer/k8s"
opt "github.com/GreptimeTeam/gtctl/pkg/cluster"
"github.com/GreptimeTeam/gtctl/pkg/cluster/kubernetes"
"github.com/GreptimeTeam/gtctl/pkg/logger"
)

Expand All @@ -34,11 +31,6 @@ type clusterConnectCliOptions struct {
}

func NewConnectCommand(l logger.Logger) *cobra.Command {
const (
connectionProtocolMySQL = "mysql"
connectionProtocolPostgres = "pg"
)

var options clusterConnectCliOptions

cmd := &cobra.Command{
Expand All @@ -50,50 +42,37 @@ func NewConnectCommand(l logger.Logger) *cobra.Command {
return fmt.Errorf("cluster name should be set")
}

k8sDeployer, err := k8s.NewDeployer(l)
if err != nil {
return err
}

var (
ctx = context.TODO()
clusterName = args[0]
namespace = options.Namespace
protocol opt.ConnectProtocol
)

name := types.NamespacedName{
Namespace: options.Namespace,
Name: clusterName,
}.String()
cluster, err := k8sDeployer.GetGreptimeDBCluster(ctx, name, nil)
if err != nil && errors.IsNotFound(err) {
l.Errorf("cluster %s in %s not found\n", clusterName, namespace)
return nil
}

rawCluster, ok := cluster.Raw.(*greptimedbclusterv1alpha1.GreptimeDBCluster)
if !ok {
return fmt.Errorf("invalid cluster type")
cluster, err := kubernetes.NewCluster(l)
if err != nil {
return err
}

switch options.Protocol {
case connectionProtocolMySQL:
if err = connector.MySQLConnectCommand(rawCluster, l); err != nil {
return fmt.Errorf("error connecting to mysql: %v", err)
}
case connectionProtocolPostgres:
if err = connector.PostgresSQLConnectCommand(rawCluster, l); err != nil {
return fmt.Errorf("error connecting to postgres: %v", err)
}
case "mysql":
protocol = opt.MySQL
case "pg", "psql", "postgres":
protocol = opt.Postgres
default:
return fmt.Errorf("database type not supported: %s", options.Protocol)
return fmt.Errorf("unsupported connection protocol: %s", options.Protocol)
}
return nil
connectOptions := &opt.ConnectOptions{
Namespace: options.Namespace,
Name: clusterName,
Protocol: protocol,
}

return cluster.Connect(ctx, connectOptions)
},
}

cmd.Flags().StringVarP(&options.Namespace, "namespace", "n", "default", "Namespace of GreptimeDB cluster.")
cmd.Flags().StringVarP(&options.Protocol, "protocol", "p", "mysql", "Specify a database, like mysql or pg.")
cmd.Flags().StringVarP(&options.Protocol, "protocol", "p", "mysql", "Specify a database protocol, like mysql or pg.")

return cmd
}
3 changes: 2 additions & 1 deletion cmd/gtctl/cluster_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func NewCluster(args []string, options *clusterCreateCliOptions, l logger.Logger
UseGreptimeCNArtifacts: options.UseGreptimeCNArtifacts,
ValuesFile: options.GreptimeDBClusterValuesFile,
},
Spinner: spinner,
}

var cluster opt.Operations
Expand Down Expand Up @@ -211,7 +212,7 @@ func NewCluster(args []string, options *clusterCreateCliOptions, l logger.Logger
}
}

if err = cluster.Create(ctx, createOptions, spinner); err != nil {
if err = cluster.Create(ctx, createOptions); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/artifacts/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
// GreptimeChartReleaseDownloadURL is the URL of the Greptime charts that stored in the GitHub release.
GreptimeChartReleaseDownloadURL = "https://github.com/GreptimeTeam/helm-charts/releases/download"

// Greptime release bucket public endpoint in CN region.
// GreptimeReleaseBucketCN releases bucket public endpoint in CN region.
GreptimeReleaseBucketCN = "https://downloads.greptime.cn/releases"

// GreptimeCNCharts is the URL of the Greptime charts that stored in the S3 bucket of the CN region.
Expand Down
19 changes: 14 additions & 5 deletions pkg/cluster/baremetal/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,27 @@ import (
"github.com/GreptimeTeam/gtctl/pkg/artifacts"
opt "github.com/GreptimeTeam/gtctl/pkg/cluster"
"github.com/GreptimeTeam/gtctl/pkg/logger"
"github.com/GreptimeTeam/gtctl/pkg/status"
fileutils "github.com/GreptimeTeam/gtctl/pkg/utils/file"
)

func (c *Cluster) Create(ctx context.Context, options *opt.CreateOptions, spinner *status.Spinner) error {
func (c *Cluster) Create(ctx context.Context, options *opt.CreateOptions) error {
spinner := options.Spinner

withSpinner := func(target string, f func(context.Context, *opt.CreateOptions) error) error {
spinner.Start(fmt.Sprintf("Installing %s...", target))
if spinner != nil {
spinner.Start(fmt.Sprintf("Installing %s...", target))
}

if err := f(ctx, options); err != nil {
spinner.Stop(false, fmt.Sprintf("Installing %s failed", target))
if spinner != nil {
spinner.Stop(false, fmt.Sprintf("Installing %s failed", target))
}
return err
}
spinner.Stop(true, fmt.Sprintf("Installing %s successfully 🎉", target))

if spinner != nil {
spinner.Stop(true, fmt.Sprintf("Installing %s successfully 🎉", target))
}
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/cluster/baremetal/not_implemented.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ func (c *Cluster) List(ctx context.Context, options *opt.ListOptions) error {
func (c *Cluster) Scale(ctx context.Context, options *opt.ScaleOptions) error {
return fmt.Errorf("do not support")
}

func (c *Cluster) Connect(ctx context.Context, options *opt.ConnectOptions) error {
return fmt.Errorf("do not support")
}
61 changes: 61 additions & 0 deletions pkg/cluster/kubernetes/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kubernetes

import (
"context"
"fmt"
"strconv"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"

opt "github.com/GreptimeTeam/gtctl/pkg/cluster"
"github.com/GreptimeTeam/gtctl/pkg/connector"
)

func (c *Cluster) Connect(ctx context.Context, options *opt.ConnectOptions) error {
cluster, err := c.get(ctx, &opt.GetOptions{
Namespace: options.Namespace,
Name: options.Name,
})
if err != nil && errors.IsNotFound(err) {
c.logger.V(0).Infof("cluster %s in %s not found", options.Name, options.Namespace)
return nil
}

switch options.Protocol {
case opt.MySQL:
if err = c.connectMySQL(cluster); err != nil {
return fmt.Errorf("error connecting to mysql: %v", err)
}
case opt.Postgres:
if err = c.connectPostgres(cluster); err != nil {
return fmt.Errorf("error connecting to postgres: %v", err)
}
default:
return fmt.Errorf("unsupported connect protocol type")
}

return nil
}

func (c *Cluster) connectMySQL(cluster *greptimedbclusterv1alpha1.GreptimeDBCluster) error {
return connector.Mysql(strconv.Itoa(int(cluster.Spec.MySQLServicePort)), cluster.Name, c.logger)
}

func (c *Cluster) connectPostgres(cluster *greptimedbclusterv1alpha1.GreptimeDBCluster) error {
return connector.PostgresSQL(strconv.Itoa(int(cluster.Spec.PostgresServicePort)), cluster.Name, c.logger)
}
17 changes: 12 additions & 5 deletions pkg/cluster/kubernetes/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/GreptimeTeam/gtctl/pkg/artifacts"
opt "github.com/GreptimeTeam/gtctl/pkg/cluster"
"github.com/GreptimeTeam/gtctl/pkg/helm"
"github.com/GreptimeTeam/gtctl/pkg/status"
)

const (
Expand All @@ -30,17 +29,25 @@ const (
disableRBACConfig = "auth.rbac.create=false,auth.rbac.token.enabled=false,"
)

func (c *Cluster) Create(ctx context.Context, options *opt.CreateOptions, spinner *status.Spinner) error {
func (c *Cluster) Create(ctx context.Context, options *opt.CreateOptions) error {
spinner := options.Spinner

withSpinner := func(target string, f func(context.Context, *opt.CreateOptions) error) error {
if !c.dryRun {
if !c.dryRun && spinner != nil {
spinner.Start(fmt.Sprintf("Installing %s...", target))
}

if err := f(ctx, options); err != nil {
spinner.Stop(false, fmt.Sprintf("Installing %s failed", target))
if spinner != nil {
spinner.Stop(false, fmt.Sprintf("Installing %s failed", target))
}
return err
}

if !c.dryRun {
spinner.Stop(true, fmt.Sprintf("Installing %s successfully 🎉", target))
if spinner != nil {
spinner.Stop(true, fmt.Sprintf("Installing %s successfully 🎉", target))
}
}
return nil
}
Expand Down
20 changes: 19 additions & 1 deletion pkg/cluster/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ type Operations interface {
Scale(ctx context.Context, options *ScaleOptions) error

// Create creates a new cluster.
Create(ctx context.Context, options *CreateOptions, spinner *status.Spinner) error
Create(ctx context.Context, options *CreateOptions) error

// Delete deletes a specific cluster.
Delete(ctx context.Context, options *DeleteOptions) error

// Connect connects to a specific cluster.
Connect(ctx context.Context, options *ConnectOptions) error
}

type GetOptions struct {
Expand Down Expand Up @@ -74,6 +77,8 @@ type CreateOptions struct {
Cluster *CreateClusterOptions
Operator *CreateOperatorOptions
Etcd *CreateEtcdOptions

Spinner *status.Spinner
}

// CreateClusterOptions is the options to create a GreptimeDB cluster.
Expand Down Expand Up @@ -114,3 +119,16 @@ type CreateEtcdOptions struct {
EtcdStorageSize string `helm:"persistence.size"`
ConfigValues string `helm:"*"`
}

type ConnectProtocol int

const (
MySQL ConnectProtocol = iota
Postgres
)

type ConnectOptions struct {
Namespace string
Name string
Protocol ConnectProtocol
}
13 changes: 3 additions & 10 deletions pkg/connector/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ import (
"net"
"os"
"os/exec"
"strconv"
"sync"
"syscall"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/go-sql-driver/mysql"

"github.com/GreptimeTeam/gtctl/pkg/logger"
Expand All @@ -43,16 +41,11 @@ const (
portForward = "port-forward"
)

// MySQLConnectCommand connects to a GreptimeDB cluster
func MySQLConnectCommand(rawCluster *greptimedbclusterv1alpha1.GreptimeDBCluster, l logger.Logger) error {
return mysqlConnect(strconv.Itoa(int(rawCluster.Spec.MySQLServicePort)), rawCluster.Name, l)
}

// mysqlConnect connects to a GreptimeDB cluster
func mysqlConnect(port, clusterName string, l logger.Logger) error {
// Mysql connects to a GreptimeDB cluster using mysql protocol.
func Mysql(port, clusterName string, l logger.Logger) error {
waitGroup := sync.WaitGroup{}

// TODO(sh2): is there any elegant way to enable port-forward?
// TODO: is there any elegant way to enable port-forward?
cmd := exec.CommandContext(context.Background(), kubectl, portForward, "-n", "default", "svc/"+clusterName+"-frontend", fmt.Sprintf("%s:%s", port, port))
if err := cmd.Start(); err != nil {
l.Errorf("Error starting port-forwarding: %v", err)
Expand Down
11 changes: 3 additions & 8 deletions pkg/connector/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@ import (
"net"
"os"
"os/exec"
"strconv"
"sync"
"syscall"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/go-pg/pg/v10"

"github.com/GreptimeTeam/gtctl/pkg/logger"
Expand All @@ -41,14 +39,11 @@ const (
postgresSQLDatabaseArg = "-d"
)

func PostgresSQLConnectCommand(rawCluster *greptimedbclusterv1alpha1.GreptimeDBCluster, l logger.Logger) error {
return postgresSQLConnect(strconv.Itoa(int(rawCluster.Spec.PostgresServicePort)), rawCluster.Name, l)
}

func postgresSQLConnect(port, clusterName string, l logger.Logger) error {
// PostgresSQL connects to a GreptimeDB cluster using postgres protocol.
func PostgresSQL(port, clusterName string, l logger.Logger) error {
waitGroup := sync.WaitGroup{}

// TODO(sh2): is there any elegant way to enable port-forward?
// TODO: is there any elegant way to enable port-forward?
cmd := exec.CommandContext(context.Background(), kubectl, portForward, "-n", "default", "svc/"+clusterName+"-frontend", fmt.Sprintf("%s:%s", port, port))
if err := cmd.Start(); err != nil {
l.Errorf("Error starting port-forwarding: %v", err)
Expand Down
Loading

0 comments on commit 0c4cb58

Please sign in to comment.