Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 46 additions & 12 deletions cmd/omni/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/cmd/omni/pkg/app"
"github.com/siderolabs/omni/internal/backend/runtime/omni"
"github.com/siderolabs/omni/internal/backend/runtime/omni/sqlite"
"github.com/siderolabs/omni/internal/pkg/auth/actor"
"github.com/siderolabs/omni/internal/pkg/config"
"github.com/siderolabs/omni/internal/version"
Expand Down Expand Up @@ -82,7 +83,18 @@ var rootCmd = &cobra.Command{

ctx = actor.MarkContextAsInternalActor(ctx)

state, err := omni.NewState(ctx, config, logger, prometheus.DefaultRegisterer)
secondaryStorageDB, err := sqlite.OpenDB(config.Storage.SQLite)
if err != nil {
return fmt.Errorf("failed to open secondary storage db: %w", err)
}

defer func() {
if closeErr := secondaryStorageDB.Close(); closeErr != nil {
logger.Error("failed to close secondary storage db", zap.Error(closeErr))
}
}()

state, err := omni.NewState(ctx, config, secondaryStorageDB, logger, prometheus.DefaultRegisterer)
if err != nil {
return err
}
Expand All @@ -101,7 +113,7 @@ var rootCmd = &cobra.Command{
logger.Warn("running debug build")
}

return app.Run(ctx, state, config, logger)
return app.Run(ctx, state, secondaryStorageDB, config, logger)
},
}

Expand Down Expand Up @@ -480,24 +492,44 @@ func defineAuthFlags() {
"Allow OIDC tokens without email_verified claim.")
}

//nolint:staticcheck // defineLogsFlags uses deprecated fields for backwards-compatibility
func defineLogsFlags() {
rootCmd.Flags().IntVar(
&cmdConfig.Logs.Machine.BufferInitialCapacity, "machine-log-buffer-capacity",
cmdConfig.Logs.Machine.BufferInitialCapacity, "initial buffer capacity for machine logs in bytes")
cmdConfig.Logs.Machine.BufferInitialCapacity, "initial buffer capacity for machine logs in bytes, no-op if the sqlite storage is enabled")
rootCmd.Flags().IntVar(&cmdConfig.Logs.Machine.BufferMaxCapacity, "machine-log-buffer-max-capacity",
cmdConfig.Logs.Machine.BufferMaxCapacity, "max buffer capacity for machine logs in bytes")
cmdConfig.Logs.Machine.BufferMaxCapacity, "max buffer capacity for machine logs in bytes, no-op if the sqlite storage is enabled")
rootCmd.Flags().IntVar(&cmdConfig.Logs.Machine.BufferSafetyGap, "machine-log-buffer-safe-gap",
cmdConfig.Logs.Machine.BufferSafetyGap, "safety gap for machine log buffer in bytes")
cmdConfig.Logs.Machine.BufferSafetyGap, "safety gap for machine log buffer in bytes, no-op if the sqlite storage is enabled")
rootCmd.Flags().IntVar(&cmdConfig.Logs.Machine.Storage.NumCompressedChunks, "machine-log-num-compressed-chunks",
cmdConfig.Logs.Machine.Storage.NumCompressedChunks, "number of compressed log chunks to keep")
cmdConfig.Logs.Machine.Storage.NumCompressedChunks, "number of compressed log chunks to keep, no-op if the sqlite storage is enabled")
rootCmd.Flags().BoolVar(&cmdConfig.Logs.Machine.Storage.Enabled, "machine-log-storage-enabled",
cmdConfig.Logs.Machine.Storage.Enabled, "enable machine log storage")
cmdConfig.Logs.Machine.Storage.Enabled, "enable machine log storage, no-op if the sqlite storage is enabled")
rootCmd.Flags().StringVar(&cmdConfig.Logs.Machine.Storage.Path, "machine-log-storage-path",
cmdConfig.Logs.Machine.Storage.Path, "path of the directory for storing machine logs")
cmdConfig.Logs.Machine.Storage.Path, "path of the directory for storing machine logs, no-op if the sqlite storage is enabled")
rootCmd.Flags().DurationVar(&cmdConfig.Logs.Machine.Storage.FlushPeriod, "machine-log-storage-flush-period",
cmdConfig.Logs.Machine.Storage.FlushPeriod, "period for flushing machine logs to disk")
cmdConfig.Logs.Machine.Storage.FlushPeriod, "period for flushing machine logs to disk, no-op if the sqlite storage is enabled")
rootCmd.Flags().Float64Var(&cmdConfig.Logs.Machine.Storage.FlushJitter, "machine-log-storage-flush-jitter",
cmdConfig.Logs.Machine.Storage.FlushJitter, "jitter for the machine log storage flush period")
cmdConfig.Logs.Machine.Storage.FlushJitter, "jitter for the machine log storage flush period, no-op if the sqlite storage is enabled")

rootCmd.Flags().MarkDeprecated("machine-log-buffer-capacity", "use --machine-log-sqlite-* flags instead") //nolint:errcheck
rootCmd.Flags().MarkDeprecated("machine-log-buffer-max-capacity", "use --machine-log-sqlite-* flags instead") //nolint:errcheck
rootCmd.Flags().MarkDeprecated("machine-log-buffer-safe-gap", "use --machine-log-sqlite-* flags instead") //nolint:errcheck
rootCmd.Flags().MarkDeprecated("machine-log-num-compressed-chunks", "use --machine-log-sqlite-* flags instead") //nolint:errcheck
rootCmd.Flags().MarkDeprecated("machine-log-storage-enabled", "use --machine-log-sqlite-* flags instead") //nolint:errcheck
rootCmd.Flags().MarkDeprecated("machine-log-storage-path", "use --machine-log-sqlite-* flags instead") //nolint:errcheck
rootCmd.Flags().MarkDeprecated("machine-log-storage-flush-period", "use --machine-log-sqlite-* flags instead") //nolint:errcheck
rootCmd.Flags().MarkDeprecated("machine-log-storage-flush-jitter", "use --machine-log-sqlite-* flags instead") //nolint:errcheck

rootCmd.Flags().BoolVar(&cmdConfig.Logs.Machine.SQLite.Enabled, "machine-log-sqlite-enabled",
cmdConfig.Logs.Machine.SQLite.Enabled,
fmt.Sprintf("enable machine log storage in sqlite database. when enabled, they will be written to --%s", sqliteStoragePathFlag))
rootCmd.Flags().DurationVar(&cmdConfig.Logs.Machine.SQLite.Timeout, "machine-log-sqlite-timeout",
cmdConfig.Logs.Machine.SQLite.Timeout, "sqlite timeout for machine logs")
rootCmd.Flags().DurationVar(&cmdConfig.Logs.Machine.SQLite.CleanupInterval, "machine-log-sqlite-cleanup-interval",
cmdConfig.Logs.Machine.SQLite.CleanupInterval, "interval between machine log cleanup runs")
rootCmd.Flags().DurationVar(&cmdConfig.Logs.Machine.SQLite.CleanupOlderThan, "machine-log-sqlite-cleanup-older-than",
cmdConfig.Logs.Machine.SQLite.CleanupOlderThan, "age threshold for machine log entries to be cleaned up")

rootCmd.Flags().StringSliceVar(
&cmdConfig.Logs.ResourceLogger.Types,
Expand Down Expand Up @@ -540,6 +572,8 @@ func defineLogsFlags() {
}
}

const sqliteStoragePathFlag = "sqlite-storage-path"

func defineStorageFlags() {
rootCmd.Flags().StringVar(
&cmdConfig.Storage.Default.Kind,
Expand Down Expand Up @@ -621,11 +655,11 @@ func defineStorageFlags() {
&cmdConfig.Storage.Secondary.Path, //nolint:staticcheck // backwards compatibility, remove when migration from boltdb to sqlite is done
"secondary-storage-path",
cmdConfig.Storage.Secondary.Path, //nolint:staticcheck // backwards compatibility, remove when migration from boltdb to sqlite is done
"path of the file for boltdb-backed secondary storage for frequently updated data (deprecated, see sqlite-storage-path).",
fmt.Sprintf("path of the file for boltdb-backed secondary storage for frequently updated data (deprecated, see --%s).", sqliteStoragePathFlag),
)
rootCmd.Flags().StringVar(
&cmdConfig.Storage.SQLite.Path,
"sqlite-storage-path",
sqliteStoragePathFlag,
cmdConfig.Storage.SQLite.Path,
"path of the file for sqlite-backed secondary storage for frequently updated data, machine and audit logs.",
)
Expand Down
4 changes: 3 additions & 1 deletion cmd/omni/pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package app

import (
"context"
"database/sql"
"fmt"

"github.com/cosi-project/runtime/pkg/resource"
Expand Down Expand Up @@ -53,7 +54,7 @@ func PrepareConfig(logger *zap.Logger, params ...*config.Params) (*config.Params
}

// Run the Omni service.
func Run(ctx context.Context, state *omni.State, config *config.Params, logger *zap.Logger) error {
func Run(ctx context.Context, state *omni.State, secondaryStorageDB *sql.DB, config *config.Params, logger *zap.Logger) error {
talosClientFactory := talos.NewClientFactory(state.Default(), logger)
talosRuntime := talos.New(talosClientFactory, logger)

Expand Down Expand Up @@ -122,6 +123,7 @@ func Run(ctx context.Context, state *omni.State, config *config.Params, logger *
machineMap := siderolink.NewMachineMap(siderolink.NewStateStorage(state.Default()))

logHandler, err := siderolink.NewLogHandler(
secondaryStorageDB,
machineMap,
state.Default(),
&config.Logs.Machine,
Expand Down
22 changes: 9 additions & 13 deletions internal/backend/grpc/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ import (
"github.com/siderolabs/omni/internal/pkg/config"
"github.com/siderolabs/omni/internal/pkg/ctxstore"
siderolinkinternal "github.com/siderolabs/omni/internal/pkg/siderolink"
"github.com/siderolabs/omni/internal/pkg/xcontext"
)

type talosRuntime interface {
Expand Down Expand Up @@ -238,9 +237,11 @@ func (s *managementServer) Omniconfig(ctx context.Context, _ *emptypb.Empty) (*m
}, nil
}

func (s *managementServer) MachineLogs(request *management.MachineLogsRequest, response grpc.ServerStreamingServer[common.Data]) error {
func (s *managementServer) MachineLogs(request *management.MachineLogsRequest, serv grpc.ServerStreamingServer[common.Data]) error {
ctx := serv.Context()

// getting machine logs is equivalent to reading machine resource
if _, err := auth.CheckGRPC(response.Context(), auth.WithRole(role.Reader)); err != nil {
if _, err := auth.CheckGRPC(ctx, auth.WithRole(role.Reader)); err != nil {
return err
}

Expand All @@ -254,25 +255,20 @@ func (s *managementServer) MachineLogs(request *management.MachineLogsRequest, r
tailLines = optional.Some(request.TailLines)
}

logReader, err := s.logHandler.GetReader(siderolinkinternal.MachineID(machineID), request.Follow, tailLines)
logReader, err := s.logHandler.GetReader(ctx, siderolinkinternal.MachineID(machineID), request.Follow, tailLines)
if err != nil {
return handleError(err)
}

closeRdr := sync.OnceValue(logReader.Close)
defer closeRdr() //nolint:errcheck

// if connection closed, stop reading
stop := xcontext.AfterFuncSync(response.Context(), func() { closeRdr() }) //nolint:errcheck
defer stop()
defer logReader.Close() //nolint:errcheck

for {
line, err := logReader.ReadLine()
line, err := logReader.ReadLine(ctx)
if err != nil {
return handleError(err)
}

if err := response.Send(&common.Data{
if err := serv.Send(&common.Data{
Bytes: line,
}); err != nil {
return err
Expand Down Expand Up @@ -914,7 +910,7 @@ func handleError(err error) error {
switch {
case errors.Is(err, io.EOF):
return nil
case siderolinkinternal.IsBufferNotFoundError(err):
case errors.Is(err, siderolinkinternal.ErrLogStoreNotFound):
return status.Error(codes.NotFound, err.Error())
}

Expand Down
20 changes: 11 additions & 9 deletions internal/backend/grpc/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ import (
)

func (s *managementServer) GetSupportBundle(req *management.GetSupportBundleRequest, serv grpc.ServerStreamingServer[management.GetSupportBundleResponse]) error {
if _, err := auth.CheckGRPC(serv.Context(), auth.WithRole(role.Operator)); err != nil {
ctx := serv.Context()

if _, err := auth.CheckGRPC(ctx, auth.WithRole(role.Operator)); err != nil {
return err
}

resources, err := s.collectClusterResources(serv.Context(), req.Cluster)
resources, err := s.collectClusterResources(ctx, req.Cluster)
if err != nil {
return err
}
Expand Down Expand Up @@ -76,7 +78,7 @@ func (s *managementServer) GetSupportBundle(req *management.GetSupportBundleRequ
cols = collectors.WithSource(cols, "omni")
}

ctx := actor.MarkContextAsInternalActor(serv.Context())
ctx = actor.MarkContextAsInternalActor(ctx)

talosClient, err := s.getTalosClient(ctx, req.Cluster)
if err != nil {
Expand Down Expand Up @@ -116,7 +118,7 @@ func (s *managementServer) GetSupportBundle(req *management.GetSupportBundleRequ
bundle.WithLogOutput(io.Discard),
)

talosCollectors, err := collectors.GetForOptions(serv.Context(), options)
talosCollectors, err := collectors.GetForOptions(ctx, options)
if err != nil {
if err = serv.Send(&management.GetSupportBundleResponse{
Progress: &management.GetSupportBundleResponse_Progress{
Expand Down Expand Up @@ -157,7 +159,7 @@ func (s *managementServer) GetSupportBundle(req *management.GetSupportBundleRequ
eg.Go(func() error {
defer close(progress)

return support.CreateSupportBundle(serv.Context(), options, cols...)
return support.CreateSupportBundle(ctx, options, cols...)
})

if err = eg.Wait(); err != nil {
Expand Down Expand Up @@ -185,10 +187,10 @@ func (s *managementServer) writeResource(res resource.Resource) *collectors.Coll
func (s *managementServer) collectLogs(machineID string) *collectors.Collector {
filename := fmt.Sprintf("omni/machine-logs/%s.log", machineID)

return collectors.NewCollector(filename, func(context.Context, *bundle.Options) ([]byte, error) {
r, err := s.logHandler.GetReader(slink.MachineID(machineID), false, optional.None[int32]())
return collectors.NewCollector(filename, func(ctx context.Context, _ *bundle.Options) ([]byte, error) {
r, err := s.logHandler.GetReader(ctx, slink.MachineID(machineID), false, optional.None[int32]())
if err != nil {
if slink.IsBufferNotFoundError(err) {
if errors.Is(err, slink.ErrLogStoreNotFound) {
return []byte{}, nil
}

Expand All @@ -202,7 +204,7 @@ func (s *managementServer) collectLogs(machineID string) *collectors.Collector {
w := bufio.NewWriter(&b)

for {
l, err := r.ReadLine()
l, err := r.ReadLine(ctx)
if err != nil {
if errors.Is(err, io.EOF) {
break
Expand Down
6 changes: 3 additions & 3 deletions internal/backend/runtime/omni/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package omni

import (
"context"
"database/sql"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (s *State) HandleErrors(ctx context.Context) error {
}

// NewState creates a production Omni state.
func NewState(ctx context.Context, params *config.Params, logger *zap.Logger, metricsRegistry prometheus.Registerer) (*State, error) {
func NewState(ctx context.Context, params *config.Params, secondaryStorageDB *sql.DB, logger *zap.Logger, metricsRegistry prometheus.Registerer) (*State, error) {
var (
defaultPersistentState *PersistentState
err error
Expand All @@ -126,8 +127,7 @@ func NewState(ctx context.Context, params *config.Params, logger *zap.Logger, me

virtualState := virtual.NewState(state.WrapCore(defaultPersistentState.State))

secondaryPersistentState, err := newSQLitePersistentState(
ctx, params.Storage.SQLite, logger)
secondaryPersistentState, err := newSQLitePersistentState(ctx, secondaryStorageDB, logger)
if err != nil {
return nil, fmt.Errorf("failed to create SQLite state for secondary storage: %w", err)
}
Expand Down
16 changes: 4 additions & 12 deletions internal/backend/runtime/omni/state_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package omni

import (
"context"
"database/sql"
"fmt"
"os"

Expand All @@ -20,32 +21,23 @@ import (

"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
omnisqlite "github.com/siderolabs/omni/internal/backend/runtime/omni/sqlite"
"github.com/siderolabs/omni/internal/pkg/config"
)

func newSQLitePersistentState(ctx context.Context, config config.SQLite, logger *zap.Logger) (*PersistentState, error) {
db, err := omnisqlite.OpenDB(config)
if err != nil {
return nil, err
}

func newSQLitePersistentState(ctx context.Context, db *sql.DB, logger *zap.Logger) (*PersistentState, error) {
st, err := sqlite.NewState(ctx, db, store.ProtobufMarshaler{},
sqlite.WithLogger(logger),
sqlite.WithTablePrefix("metrics_"),
)
if err != nil {
db.Close() //nolint:errcheck

return nil, fmt.Errorf("failed to create sqlite state (path %q): %w", config.Path, err)
return nil, fmt.Errorf("failed to create sqlite state: %w", err)
}

return &PersistentState{
State: st,
Close: func() error {
st.Close()

return db.Close()
return nil
},
}, nil
}
Expand Down
25 changes: 23 additions & 2 deletions internal/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ package integration_test

import (
"context"
"database/sql"
"errors"
"flag"
"fmt"
"io"
"net/url"
"os"
"os/exec"
"path/filepath"
"testing"
"time"

Expand All @@ -40,6 +42,7 @@ import (
_ "github.com/siderolabs/omni/cmd/acompat" // this package should always be imported first for init->set env to work
"github.com/siderolabs/omni/cmd/omni/pkg/app"
"github.com/siderolabs/omni/internal/backend/runtime/omni"
"github.com/siderolabs/omni/internal/backend/runtime/omni/sqlite"
"github.com/siderolabs/omni/internal/pkg/auth/actor"
"github.com/siderolabs/omni/internal/pkg/clientconfig"
"github.com/siderolabs/omni/internal/pkg/config"
Expand Down Expand Up @@ -473,7 +476,9 @@ func runOmni(t *testing.T) (string, error) {

omniCtx := actor.MarkContextAsInternalActor(t.Context())

state, err := omni.NewState(omniCtx, config, logger, prometheus.DefaultRegisterer)
secondaryStorageDB := testDB(t)

state, err := omni.NewState(omniCtx, config, secondaryStorageDB, logger, prometheus.DefaultRegisterer)
if err != nil {
return "", err
}
Expand All @@ -485,7 +490,7 @@ func runOmni(t *testing.T) (string, error) {
eg.Go(func() error {
defer cancel()

return app.Run(omniCtx, state, config, logger)
return app.Run(omniCtx, state, secondaryStorageDB, config, logger)
})

t.Log("waiting for Omni to start")
Expand All @@ -502,3 +507,19 @@ func runOmni(t *testing.T) (string, error) {

return sa, nil
}

func testDB(t *testing.T) *sql.DB {
t.Helper()

conf := config.Default().Storage.SQLite
conf.Path = filepath.Join(t.TempDir(), "integration-test.db")

db, err := sqlite.OpenDB(conf)
require.NoError(t, err)

t.Cleanup(func() {
require.NoError(t, db.Close())
})

return db
}
Loading
Loading