Skip to content

Commit

Permalink
Add a series of docs about operation (#523)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanahmily authored Sep 2, 2024
1 parent c2a39bb commit bc003cb
Show file tree
Hide file tree
Showing 31 changed files with 581 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Release Notes.
- Add web-ui interacting guide.
- Add bydbctl interacting guide.
- Add cluster management guide.
- Add operation related documents: configuration, troubleshooting, system, upgrade, and observability.

### Chores

Expand Down
12 changes: 7 additions & 5 deletions banyand/measure/merger_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,29 @@ package measure
import (
"math"
"sort"

"github.com/apache/skywalking-banyandb/pkg/run"
)

// MergePolicy aims to choose an optimal combination
// that has the lowest write amplification.
type mergePolicy struct {
maxParts int
minMergeMultiplier float64
maxFanOutSize uint64
maxFanOutSize run.Bytes
}

// NewDefaultMergePolicy create a MergePolicy with default parameters.
func newDefaultMergePolicy() *mergePolicy {
return newMergePolicy(8, 1.7, math.MaxUint64)
return newMergePolicy(8, 1.7, run.Bytes(math.MaxInt64))
}

func newDefaultMergePolicyForTesting() *mergePolicy {
return newMergePolicy(4, 1.7, math.MaxUint64)
return newMergePolicy(4, 1.7, run.Bytes(math.MaxInt64))
}

// NewMergePolicy creates a MergePolicy with given parameters.
func newMergePolicy(maxParts int, minMergeMul float64, maxFanOutSize uint64) *mergePolicy {
func newMergePolicy(maxParts int, minMergeMul float64, maxFanOutSize run.Bytes) *mergePolicy {
return &mergePolicy{
maxParts: maxParts,
minMergeMultiplier: minMergeMul,
Expand All @@ -53,7 +55,7 @@ func (l *mergePolicy) getPartsToMerge(dst, src []*partWrapper, freeDiskSize uint
return dst
}

maxFanOut := min(freeDiskSize, l.maxFanOutSize)
maxFanOut := min(freeDiskSize, uint64(l.maxFanOutSize))
// Filter out too big parts.
// This should reduce N for O(N^2) algorithm below.
maxInPartBytes := uint64(float64(maxFanOut) / l.minMergeMultiplier)
Expand Down
3 changes: 1 addition & 2 deletions banyand/measure/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package measure

import (
"context"
"math"
"path"

"github.com/pkg/errors"
Expand Down Expand Up @@ -82,7 +81,7 @@ func (s *service) FlagSet() *run.FlagSet {
flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of database")
flagS.DurationVar(&s.option.flushTimeout, "measure-flush-timeout", defaultFlushTimeout, "the memory data timeout of measure")
s.option.mergePolicy = newDefaultMergePolicy()
flagS.Uint64Var(&s.option.mergePolicy.maxFanOutSize, "max-fan-out-size", math.MaxUint64, "the upper bound of a single file size after merge")
flagS.VarP(&s.option.mergePolicy.maxFanOutSize, "measure-max-fan-out-size", "", "the upper bound of a single file size after merge of measure")
return flagS
}

Expand Down
2 changes: 1 addition & 1 deletion banyand/observability/metrics_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/meter"
)

var log = logger.GetLogger("observability", "metrics", "system")
var log = logger.GetLogger("metrics")

var (
cpuCount = 0
Expand Down
2 changes: 1 addition & 1 deletion banyand/queue/pub/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (*pub) Name() string {
}

func (p *pub) PreRun(context.Context) error {
p.log = logger.GetLogger("server-queue")
p.log = logger.GetLogger("server-queue-pub")
p.metadata.RegisterHandler("queue-client", schema.KindNode, p)
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions banyand/queue/sub/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func NewServer(omr observability.MetricsRegistry) queue.Server {
}

func (s *server) PreRun(_ context.Context) error {
s.log = logger.GetLogger("server-queue")
s.log = logger.GetLogger("server-queue-sub")
s.metrics = newMetrics(s.omr.With(queueSubScope))
return nil
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func (s *server) Validate() error {
if s.addr == ":" {
return errNoAddr
}
s.healthAddr = net.JoinHostPort("127.0.0.1", strconv.FormatUint(uint64(s.healthPort), 10))
s.healthAddr = net.JoinHostPort(s.host, strconv.FormatUint(uint64(s.healthPort), 10))
if s.healthAddr == ":" {
return errNoAddr
}
Expand Down Expand Up @@ -194,10 +194,10 @@ func (s *server) Serve() run.StopNotify {
}
gwMux := runtime.NewServeMux(runtime.WithHealthzEndpoint(client))
mux := chi.NewRouter()
mux.Mount("/api", gwMux)
mux.Mount("/api", http.StripPrefix("/api", gwMux))
s.healthSrv = &http.Server{
Addr: s.healthAddr,
Handler: chi.NewRouter(),
Handler: mux,
ReadHeaderTimeout: 3 * time.Second,
}
var wg sync.WaitGroup
Expand Down
12 changes: 7 additions & 5 deletions banyand/stream/merger_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,33 @@ package stream
import (
"math"
"sort"

"github.com/apache/skywalking-banyandb/pkg/run"
)

// MergePolicy aims to choose an optimal combination
// that has the lowest write amplification.
type mergePolicy struct {
maxParts int
minMergeMultiplier float64
maxFanOutSize uint64
maxFanOutSize run.Bytes
}

// NewDefaultMergePolicy create a MergePolicy with default parameters.
func newDefaultMergePolicy() *mergePolicy {
return newMergePolicy(15, 1.7, math.MaxUint64)
return newMergePolicy(15, 1.7, run.Bytes(math.MaxInt64))
}

func newDefaultMergePolicyForTesting() *mergePolicy {
return newMergePolicy(4, 1.7, math.MaxUint64)
return newMergePolicy(4, 1.7, run.Bytes(math.MaxInt64))
}

func newDisabledMergePolicyForTesting() *mergePolicy {
return newMergePolicy(0, 0, 0)
}

// NewMergePolicy creates a MergePolicy with given parameters.
func newMergePolicy(maxParts int, minMergeMul float64, maxFanOutSize uint64) *mergePolicy {
func newMergePolicy(maxParts int, minMergeMul float64, maxFanOutSize run.Bytes) *mergePolicy {
return &mergePolicy{
maxParts: maxParts,
minMergeMultiplier: minMergeMul,
Expand All @@ -57,7 +59,7 @@ func (l *mergePolicy) getPartsToMerge(dst, src []*partWrapper, freeDiskSize uint
return dst
}

maxFanOut := min(freeDiskSize, l.maxFanOutSize)
maxFanOut := min(freeDiskSize, uint64(l.maxFanOutSize))
// Filter out too big parts.
// This should reduce N for O(N^2) algorithm below.
maxInPartBytes := uint64(float64(maxFanOut) / l.minMergeMultiplier)
Expand Down
3 changes: 1 addition & 2 deletions banyand/stream/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package stream

import (
"context"
"math"
"path"

"github.com/pkg/errors"
Expand Down Expand Up @@ -82,7 +81,7 @@ func (s *service) FlagSet() *run.FlagSet {
flagS.DurationVar(&s.option.flushTimeout, "stream-flush-timeout", defaultFlushTimeout, "the memory data timeout of stream")
flagS.DurationVar(&s.option.elementIndexFlushTimeout, "element-index-flush-timeout", defaultFlushTimeout, "the elementIndex timeout of stream")
s.option.mergePolicy = newDefaultMergePolicy()
flagS.Uint64Var(&s.option.mergePolicy.maxFanOutSize, "max-fan-out-size", math.MaxUint64, "the upper bound of a single file size after merge")
flagS.VarP(&s.option.mergePolicy.maxFanOutSize, "stream-max-fan-out-size", "", "the upper bound of a single file size after merge of stream")
return flagS
}

Expand Down
10 changes: 5 additions & 5 deletions bydbctl/internal/cmd/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func newGroupCmd() *cobra.Command {
fmt.Printf("group %s is created", reqBody.name)
fmt.Println()
return nil
}, enableTLS, insecure, grpcCert)
}, enableTLS, insecure, cert)
},
}

Expand Down Expand Up @@ -90,7 +90,7 @@ func newGroupCmd() *cobra.Command {
fmt.Printf("group %s is updated", reqBody.name)
fmt.Println()
return nil
}, enableTLS, insecure, grpcCert)
}, enableTLS, insecure, cert)
},
}
bindFileFlag(createCmd, updateCmd)
Expand All @@ -102,7 +102,7 @@ func newGroupCmd() *cobra.Command {
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request) (*resty.Response, error) {
return request.req.SetPathParam("group", request.group).Get(getPath("/api/v1/group/schema/{group}"))
}, yamlPrinter, enableTLS, insecure, grpcCert)
}, yamlPrinter, enableTLS, insecure, cert)
},
}

Expand All @@ -118,7 +118,7 @@ func newGroupCmd() *cobra.Command {
fmt.Printf("group %s is deleted", reqBody.group)
fmt.Println()
return nil
}, enableTLS, insecure, grpcCert)
}, enableTLS, insecure, cert)
},
}

Expand All @@ -129,7 +129,7 @@ func newGroupCmd() *cobra.Command {
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(nil, func(request request) (*resty.Response, error) {
return request.req.Get(getPath("/api/v1/group/schema/lists"))
}, yamlPrinter, enableTLS, insecure, grpcCert)
}, yamlPrinter, enableTLS, insecure, cert)
},
}

Expand Down
6 changes: 3 additions & 3 deletions bydbctl/internal/cmd/health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ func newHealthCheckCmd() *cobra.Command {
if grpcAddr == "" {
return rest(nil, func(request request) (*resty.Response, error) {
return request.req.Get(getPath("/api/healthz"))
}, yamlPrinter, enableTLS, insecure, grpcCert)
}, yamlPrinter, enableTLS, insecure, cert)
}
if enableTLS {
config := &tls.Config{
// #nosec G402
InsecureSkipVerify: insecure,
}
if grpcCert != "" {
cert, errRead := os.ReadFile(grpcCert)
if cert != "" {
cert, errRead := os.ReadFile(cert)
if errRead != nil {
return errRead
}
Expand Down
54 changes: 51 additions & 3 deletions bydbctl/internal/cmd/health_check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/setup"
)

var _ = Describe("health check after launching banyandb server", func() {
var _ = Describe("health check after launching banyandb server with gRPC and HTTP TLS", func() {
var deferFunc func()
var grpcAddr, httpAddr, certFile string
var rootCmd *cobra.Command
Expand All @@ -47,16 +47,31 @@ var _ = Describe("health check after launching banyandb server", func() {
})

It("http health check should pass", func() {
rootCmd.SetArgs([]string{"health", "--addr", "http://" + httpAddr})
rootCmd.SetArgs([]string{"health", "--addr", "https://" + httpAddr, "--cert", certFile, "--enable-tls", "true"})
out := capturer.CaptureStdout(func() {
err := rootCmd.Execute()
Expect(err).NotTo(HaveOccurred())
})
Expect(out).To(ContainSubstring("SERVING"))
})

It("http health check should pass with insecure flag set", func() {
rootCmd.SetArgs([]string{"health", "--addr", "https://" + httpAddr, "--insecure", "true", "--enable-tls", "true"})
out := capturer.CaptureStdout(func() {
err := rootCmd.Execute()
Expect(err).NotTo(HaveOccurred())
})
Expect(out).To(ContainSubstring("SERVING"))
})

It("http health check should fail without the proper cert", func() {
rootCmd.SetArgs([]string{"health", "--addr", "https://" + httpAddr, "--enable-tls", "true"})
err := rootCmd.Execute()
Expect(err).To(HaveOccurred())
})

It("grpc health check should pass", func() {
rootCmd.SetArgs([]string{"health", "--grpc-addr", grpcAddr, "--grpc-cert", certFile, "--enable-tls", "true"})
rootCmd.SetArgs([]string{"health", "--grpc-addr", grpcAddr, "--cert", certFile, "--enable-tls", "true"})
out := capturer.CaptureStdout(func() {
err := rootCmd.Execute()
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -84,6 +99,39 @@ var _ = Describe("health check after launching banyandb server", func() {
})
})

var _ = Describe("health check after launching banyandb server", func() {
var deferFunc func()
var grpcAddr, httpAddr string
var rootCmd *cobra.Command
BeforeEach(func() {
grpcAddr, httpAddr, deferFunc = setup.Standalone()
rootCmd = &cobra.Command{Use: "root"}
cmd.RootCmdFlags(rootCmd)
})

It("http health check should pass", func() {
rootCmd.SetArgs([]string{"health", "--addr", "http://" + httpAddr})
out := capturer.CaptureStdout(func() {
err := rootCmd.Execute()
Expect(err).NotTo(HaveOccurred())
})
Expect(out).To(ContainSubstring("SERVING"))
})

It("grpc health check should pass", func() {
rootCmd.SetArgs([]string{"health", "--grpc-addr", grpcAddr})
out := capturer.CaptureStdout(func() {
err := rootCmd.Execute()
Expect(err).NotTo(HaveOccurred())
})
Expect(out).To(ContainSubstring("connected"))
})

AfterEach(func() {
deferFunc()
})
})

var _ = Describe("health check without launching banyandb server", func() {
var rootCmd *cobra.Command
BeforeEach(func() {
Expand Down
10 changes: 5 additions & 5 deletions bydbctl/internal/cmd/index_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newIndexRuleCmd() *cobra.Command {
fmt.Printf("indexRule %s.%s is created", reqBody.group, reqBody.name)
fmt.Println()
return nil
}, enableTLS, insecure, grpcCert)
}, enableTLS, insecure, cert)
},
}

Expand Down Expand Up @@ -95,7 +95,7 @@ func newIndexRuleCmd() *cobra.Command {
fmt.Printf("indexRule %s.%s is updated", reqBody.group, reqBody.name)
fmt.Println()
return nil
}, enableTLS, insecure, grpcCert)
}, enableTLS, insecure, cert)
},
}

Expand All @@ -106,7 +106,7 @@ func newIndexRuleCmd() *cobra.Command {
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request) (*resty.Response, error) {
return request.req.SetPathParam("name", request.name).SetPathParam("group", request.group).Get(getPath(indexRuleSchemaPathWithParams))
}, yamlPrinter, enableTLS, insecure, grpcCert)
}, yamlPrinter, enableTLS, insecure, cert)
},
}

Expand All @@ -121,7 +121,7 @@ func newIndexRuleCmd() *cobra.Command {
fmt.Printf("indexRule %s.%s is deleted", reqBody.group, reqBody.name)
fmt.Println()
return nil
}, enableTLS, insecure, grpcCert)
}, enableTLS, insecure, cert)
},
}
bindNameFlag(getCmd, deleteCmd)
Expand All @@ -133,7 +133,7 @@ func newIndexRuleCmd() *cobra.Command {
RunE: func(_ *cobra.Command, _ []string) (err error) {
return rest(parseFromFlags, func(request request) (*resty.Response, error) {
return request.req.SetPathParam("group", request.group).Get(getPath("/api/v1/index-rule/schema/lists/{group}"))
}, yamlPrinter, enableTLS, insecure, grpcCert)
}, yamlPrinter, enableTLS, insecure, cert)
},
}

Expand Down
Loading

0 comments on commit bc003cb

Please sign in to comment.