Skip to content

Commit

Permalink
skip migration on new db (#59)
Browse files Browse the repository at this point in the history
* create when db file does not exists

* fix UpdateMetadata (#58)

* CreateModel

* fix spelling

* fix tests

* extract fs funcs

* manifest & model etag
  • Loading branch information
gertd authored Nov 17, 2023
1 parent e9ba84f commit a3873d6
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 20 deletions.
23 changes: 4 additions & 19 deletions pkg/bdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package bdb

import (
"net/http"
"os"
"path/filepath"
"time"

"github.com/aserto-dev/azm/cache"
"github.com/aserto-dev/azm/model"
cerr "github.com/aserto-dev/errors"
"github.com/aserto-dev/go-edge-ds/pkg/fs"

"github.com/pkg/errors"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -59,13 +59,9 @@ func (s *BoltDB) Open() error {
}

dbDir := filepath.Dir(s.config.DBPath)
exists, err := filePathExists(dbDir)
if err != nil {
return errors.Wrap(err, "failed to determine if store path/file exists")
}
if !exists {
if err = os.MkdirAll(dbDir, 0700); err != nil {
return errors.Wrapf(err, "failed to create directory '%s'", dbDir)
if !fs.DirExists(dbDir) {
if err := fs.EnsureDirPath(dbDir); err != nil {
return err
}
}

Expand All @@ -91,17 +87,6 @@ func (s *BoltDB) Close() {
}
}

// filePathExists, internal helper function to detect if the file path exists.
func filePathExists(path string) (bool, error) {
if _, err := os.Stat(path); err == nil {
return true, nil
} else if os.IsNotExist(err) {
return false, nil
} else {
return false, errors.Wrapf(err, "failed to stat file [%s]", path)
}
}

func (s *BoltDB) DB() *bolt.DB {
return s.db
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/bdb/migrate/mig/mig.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/aserto-dev/go-edge-ds/pkg/bdb"
"github.com/aserto-dev/go-edge-ds/pkg/fs"
"github.com/rs/zerolog"

"github.com/Masterminds/semver"
Expand Down Expand Up @@ -197,6 +198,13 @@ func Backup(db *bolt.DB, version *semver.Version) error {
}

func OpenDB(cfg *bdb.Config) (*bolt.DB, error) {
dbDir := filepath.Dir(cfg.DBPath)
if !fs.DirExists(dbDir) {
if err := fs.EnsureDirPath(dbDir); err != nil {
return nil, err
}
}

db, err := bolt.Open(cfg.DBPath, 0644, &bolt.Options{
Timeout: cfg.RequestTimeout,
})
Expand Down
66 changes: 66 additions & 0 deletions pkg/bdb/migrate/mig003/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package mig003
import (
"bytes"
"context"
"hash/fnv"
"os"
"path/filepath"
"strconv"

"github.com/aserto-dev/azm/migrate"
v3 "github.com/aserto-dev/azm/v3"
Expand Down Expand Up @@ -39,7 +41,10 @@ const (
)

var fnMap = []func(*zerolog.Logger, *bolt.DB, *bolt.DB) error{
mig.CreateBucket(bdb.SystemPath),

mig.CreateBucket(bdb.ManifestPath),
createModel(),
migrateModel(),

mig.DeleteBucket(bdb.ObjectTypesPath),
Expand Down Expand Up @@ -70,13 +75,62 @@ func Migrate(log *zerolog.Logger, roDB, rwDB *bolt.DB) error {
return nil
}

func createModel() func(*zerolog.Logger, *bolt.DB, *bolt.DB) error {
return func(log *zerolog.Logger, roDB *bolt.DB, rwDB *bolt.DB) error {
// skip when roDB is set.
if roDB != nil {
log.Debug().Msg("SKIP CreateModel")
return nil
}

log.Info().Str("version", Version).Msg("CreateModel")
ctx := context.Background()

manifestBuf := new(bytes.Buffer)

model, err := v3.Load(bytes.NewReader(manifestBuf.Bytes()))
if err != nil {
return err
}

h := fnv.New64a()
h.Reset()
_, _ = h.Write(manifestBuf.Bytes())

md := &dsm3.Metadata{
UpdatedAt: timestamppb.Now(),
Etag: strconv.FormatUint(h.Sum64(), 10),
}
if err := rwDB.Update(func(tx *bolt.Tx) error {
if err := ds.Manifest(md).Set(ctx, tx, manifestBuf); err != nil {
return errors.Errorf("failed to set manifest")
}

if err := ds.Manifest(md).SetModel(ctx, tx, model); err != nil {
return errors.Errorf("failed to set model")
}

return nil
}); err != nil {
return err
}

return nil
}
}

// migrateModel,
// 1) creates a manifest file from the metadata objects in the db
// 2) computes the in-memory model
// 2) persists the manifest file byte-stream in the store
// 3) persists the serialized model in the store.
func migrateModel() func(*zerolog.Logger, *bolt.DB, *bolt.DB) error {
return func(log *zerolog.Logger, roDB *bolt.DB, rwDB *bolt.DB) error {
// skip when roDB is nil.
if roDB == nil {
log.Debug().Msg("SKIP MigrateModel")
return nil
}

log.Info().Str("version", Version).Msg("MigrateModel")
ctx := context.Background()
Expand Down Expand Up @@ -178,6 +232,12 @@ func getMetadata(ctx context.Context, roDB *bolt.DB) (*migrate.Metadata, error)
// updateObjects, read values from read-only backup, write to new bucket.
func updateObjects(path bdb.Path) func(*zerolog.Logger, *bolt.DB, *bolt.DB) error {
return func(log *zerolog.Logger, roDB *bolt.DB, rwDB *bolt.DB) error {
// skip when roDB is nil.
if roDB == nil {
log.Debug().Msg("SKIP UpdateObjects")
return nil
}

log.Info().Str("version", Version).Msg("UpdateObjects")

if err := roDB.View(func(rtx *bolt.Tx) error {
Expand Down Expand Up @@ -228,6 +288,12 @@ func updateObjects(path bdb.Path) func(*zerolog.Logger, *bolt.DB, *bolt.DB) erro
// updateRelations, read values from read-only backup, write to new bucket.
func updateRelations(path bdb.Path, d ds.Direction) func(*zerolog.Logger, *bolt.DB, *bolt.DB) error {
return func(log *zerolog.Logger, roDB *bolt.DB, rwDB *bolt.DB) error {
// skip when roDB is nil.
if roDB == nil {
log.Debug().Msg("SKIP UpdateRelations")
return nil
}

log.Info().Str("version", Version).Msg("UpdateRelations")

if err := roDB.View(func(rtx *bolt.Tx) error {
Expand Down
39 changes: 39 additions & 0 deletions pkg/bdb/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/aserto-dev/go-edge-ds/pkg/bdb/migrate/mig001"
"github.com/aserto-dev/go-edge-ds/pkg/bdb/migrate/mig002"
"github.com/aserto-dev/go-edge-ds/pkg/bdb/migrate/mig003"
"github.com/aserto-dev/go-edge-ds/pkg/fs"

"github.com/Masterminds/semver"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -38,10 +39,18 @@ var (
// higher returns false, error
// errors: returns false, error.
func CheckSchemaVersion(config *bdb.Config, logger *zerolog.Logger, reqVersion *semver.Version) (bool, error) {
if !fs.FileExists(config.DBPath) {
if err := create(config, logger, reqVersion); err != nil {
return false, err
}
return true, nil
}

boltdb, err := bdb.New(config, logger)
if err != nil {
return false, err
}

if err := boltdb.Open(); err != nil {
return false, err
}
Expand All @@ -51,6 +60,7 @@ func CheckSchemaVersion(config *bdb.Config, logger *zerolog.Logger, reqVersion *
if err != nil {
return false, err
}

logger.Info().Str("current", curVersion.String()).Msg("schema_version")

switch {
Expand Down Expand Up @@ -127,6 +137,35 @@ func getCurrent(config *bdb.Config, logger *zerolog.Logger) (*semver.Version, er
return mig.GetVersion(boltdb.DB())
}

func create(config *bdb.Config, log *zerolog.Logger, version *semver.Version) error {
rwDB, err := mig.OpenDB(config)
if err != nil {
return err
}
defer func() {
log.Debug().Str("db_path", rwDB.Path()).Msg("close-rw")
if err := rwDB.Close(); err != nil {
log.Error().Err(err).Msg("close rwDB")
}
rwDB = nil
}()

// create flow is signaled by roDB == nil.
if err := execute(log, nil, rwDB, version); err != nil {
return err
}

if err := mig.SetVersion(rwDB, version); err != nil {
return err
}

if err := rwDB.Sync(); err != nil {
return err
}

return nil
}

func migrate(config *bdb.Config, log *zerolog.Logger, curVersion, nextVersion *semver.Version) error {
rwDB, err := mig.OpenDB(config)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/ds/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (m *manifest) Set(ctx context.Context, tx *bolt.Tx, buf *bytes.Buffer) erro
// SetModel, persists the model cache in the _manifest bucket
// _metadata/{name}/{version}/model.
func (m *manifest) SetModel(ctx context.Context, tx *bolt.Tx, mod *model.Model) error {
if mod.Metadata == nil {
mod.Metadata = &model.Metadata{}
}
mod.Metadata.ETag = m.Metadata.Etag
mod.Metadata.UpdatedAt = m.Metadata.UpdatedAt.AsTime()

if _, err := bdb.SetAny[model.Model](ctx, tx, bdb.ManifestPath, bdb.ModelKey, mod); err != nil {
return err
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/fs/fs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package fs

import (
"os"
)

func FileExists(path string) bool {
fsInfo, err := os.Stat(path)
if err == nil && !fsInfo.IsDir() {
return true
}
return false
}

func DirExists(path string) bool {
fsInfo, err := os.Stat(path)
if err == nil && fsInfo.IsDir() {
return true
}
return false
}

func EnsureDirPath(path string) error {
if !DirExists(path) {
return os.MkdirAll(path, 0700)
}
return nil
}
7 changes: 6 additions & 1 deletion tests/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ func TestMain(m *testing.M) {
ctx := context.Background()
logger := zerolog.New(io.Discard)

dbPath := path.Join(os.TempDir(), "edge-ds", "test-eds.db")
dirPath := os.TempDir()
if err := os.MkdirAll(dirPath, 0700); err != nil {
panic(err)
}

dbPath := path.Join(dirPath, "edge-ds", "test-eds.db")
os.Remove(dbPath)
fmt.Println(dbPath)

Expand Down

0 comments on commit a3873d6

Please sign in to comment.