Skip to content

[sql-32] accounts: add migration code from kvdb to SQL #1047

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ itest/itest.test
itest/.logs
itest/*.log

# Failed rapid test runs
accounts/testdata/rapid/*

vendor
*.idea
*.run
Expand Down
272 changes: 272 additions & 0 deletions accounts/sql_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
package accounts

import (
"context"
"database/sql"
"errors"
"fmt"
"math"
"reflect"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/lightning-terminal/db/sqlc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/pmezard/go-difflib/difflib"
)

var (
// ErrMigrationMismatch is returned when the migrated account does not
// match the original account.
ErrMigrationMismatch = fmt.Errorf("migrated account does not match " +
"original account")
)

// MigrateAccountStoreToSQL runs the migration of all accounts and indices from
// the KV database to the SQL database. The migration is done in a single
// transaction to ensure that all accounts are migrated or none at all.
func MigrateAccountStoreToSQL(ctx context.Context, kvStore *BoltStore,
tx SQLQueries) error {

log.Infof("Starting migration of the KV accounts store to SQL")

err := migrateAccountsToSQL(ctx, kvStore, tx)
if err != nil {
return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: wrap with formatted string so we can tell if error is coming from this migration or the next one

}

err = migrateAccountsIndicesToSQL(ctx, kvStore, tx)
if err != nil {
return err
}

return nil
}

// migrateAccountsToSQL runs the migration of all accounts from the KV database
// to the SQL database. The migration is done in a single transaction to ensure
// that all accounts are migrated or none at all.
func migrateAccountsToSQL(ctx context.Context, kvStore *BoltStore,
tx SQLQueries) error {

log.Infof("Starting migration of accounts from KV to SQL")

kvAccounts, err := kvStore.Accounts(ctx)
if err != nil {
return err
}

total := 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is total ever not just len(kvAccounts)?


for i, kvAccount := range kvAccounts {
total++

migratedAccountID, err := migrateSingleAccountToSQL(
ctx, tx, kvAccounts[i],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a big deal but there is mixed use of kvAccount and kvAcounts[i] here. would prefer if we could just stick to 1 just for consistency.

)
if err != nil {
return fmt.Errorf("unable to migrate account(%v): %w",
kvAccount.ID, err)
}

migratedAccount, err := getAndMarshalAccount(
ctx, tx, migratedAccountID,
)
if err != nil {
return fmt.Errorf("unable to fetch migrated "+
"account(%v): %w", kvAccount.ID, err)
}

overrideAccountTimeZone(kvAccount)
overrideAccountTimeZone(migratedAccount)

if !reflect.DeepEqual(kvAccount, migratedAccount) {
diff := difflib.UnifiedDiff{
A: difflib.SplitLines(
spew.Sdump(kvAccount),
),
B: difflib.SplitLines(
spew.Sdump(migratedAccount),
),
FromFile: "Expected",
FromDate: "",
ToFile: "Actual",
ToDate: "",
Context: 3,
}
diffText, _ := difflib.GetUnifiedDiffString(diff)

return fmt.Errorf("%w: %v.\n%v", ErrMigrationMismatch,
kvAccount.ID, diffText)
}
}

log.Infof("All accounts migrated from KV to SQL. Total number of "+
"accounts migrated: %d", total)

return nil
}

// migrateSingleAccountToSQL runs the migration for a single account from the
// KV database to the SQL database.
func migrateSingleAccountToSQL(ctx context.Context,
tx SQLQueries, account *OffChainBalanceAccount) (int64, error) {

insertAccountParams, err := makeInsertAccountParams(account)
if err != nil {
return 0, err
}

sqlId, err := tx.InsertAccount(ctx, insertAccountParams)
if err != nil {
return 0, err
}

for hash := range account.Invoices {
addInvoiceParams := makeAddAccountInvoiceParams(sqlId, hash)

err = tx.AddAccountInvoice(ctx, addInvoiceParams)
if err != nil {
return sqlId, err
}
}

for hash, paymentEntry := range account.Payments {
upsertPaymentParams := makeUpsertAccountPaymentParams(
sqlId, hash, paymentEntry,
)

err = tx.UpsertAccountPayment(ctx, upsertPaymentParams)
if err != nil {
return sqlId, err
}
}

return sqlId, nil
}

// migrateAccountsIndicesToSQL runs the migration for the account indices from
// the KV database to the SQL database.
func migrateAccountsIndicesToSQL(ctx context.Context, kvStore *BoltStore,
tx SQLQueries) error {

log.Infof("Starting migration of accounts indices from KV to SQL")

addIndex, settleIndex, err := kvStore.LastIndexes(ctx)
if errors.Is(err, ErrNoInvoiceIndexKnown) {
log.Infof("No indices found in KV store, skipping migration")
return nil
} else if err != nil {
return err
}

setAddIndexParams, err := makeSetAccountIndexParams(
addIndex, addIndexName,
)
if err != nil {
return err
}

err = tx.SetAccountIndex(ctx, setAddIndexParams)
if err != nil {
return err
}

setSettleIndexParams, err := makeSetAccountIndexParams(
settleIndex, settleIndexName,
)
if err != nil {
return err
}

err = tx.SetAccountIndex(ctx, setSettleIndexParams)
if err != nil {
return err
}

log.Infof("Successfully migratated accounts indices from KV to SQL")

return nil
}

// overrideAccountTimeZone overrides the time zone of the account to the local
// time zone and chops off the nanosecond part for comparison. This is needed
// because KV database stores times as-is which as an unwanted side effect would
// fail migration due to time comparison expecting both the original and
// migrated accounts to be in the same local time zone and in microsecond
// precision. Note that PostgresSQL stores times in microsecond precision while
// SQLite can store times in nanosecond precision if using TEXT storage class.
func overrideAccountTimeZone(account *OffChainBalanceAccount) {
fixTime := func(t time.Time) time.Time {
return t.In(time.Local).Truncate(time.Microsecond)
}

if !account.ExpirationDate.IsZero() {
account.ExpirationDate = fixTime(account.ExpirationDate)
}

if !account.LastUpdate.IsZero() {
account.LastUpdate = fixTime(account.LastUpdate)
}
}

func makeInsertAccountParams(account *OffChainBalanceAccount) (
sqlc.InsertAccountParams, error) {

var labelVal sql.NullString
if len(account.Label) > 0 {
labelVal = sql.NullString{
String: account.Label,
Valid: true,
}
}
Comment on lines +216 to +222
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can just:

Label: sql.NullString{
			String: account.Label,
			Valid:  len(account.Label) > 0,
},


accountId, err := account.ID.ToInt64()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's call it "accountAlias"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or legacyID

if err != nil {
return sqlc.InsertAccountParams{}, err
}

return sqlc.InsertAccountParams{
Type: int16(account.Type),
InitialBalanceMsat: int64(account.InitialBalance),
CurrentBalanceMsat: account.CurrentBalance,
LastUpdated: account.LastUpdate.UTC(),
Label: labelVal,
Alias: accountId,
Expiration: account.ExpirationDate.UTC(),
}, nil
}

func makeAddAccountInvoiceParams(sqlID int64,
hash lntypes.Hash) sqlc.AddAccountInvoiceParams {

return sqlc.AddAccountInvoiceParams{
AccountID: sqlID,
Hash: hash[:],
}
}

func makeUpsertAccountPaymentParams(sqlID int64, hash lntypes.Hash,
Comment on lines +240 to +249
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a huge deal, but i'd just inline these since they dont do anything extra

entry *PaymentEntry) sqlc.UpsertAccountPaymentParams {

return sqlc.UpsertAccountPaymentParams{
AccountID: sqlID,
Hash: hash[:],
Status: int16(entry.Status),
FullAmountMsat: int64(entry.FullAmount),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an overflow is very unrealistic here, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think we have to make a choice here, and for migrations in general:
We can either:

  1. Assume that since this is a migration, the data we are migrating must have already been sanity checked and rule checked during the initial insertion into the kvdb, and therefore errors like that shouldn't be possible to occur during the migration.
  2. We sanity check and check the rules again for all data during the migration.

I have opted for option 1 here and for upcoming migrations, as I do think that should be the case, and we'd only let the migrations use more resources during the migration if we chose option 2 instead.

But I'd very interested to hear from both of you if you think we should instead go with option 2 here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm unsure, maybe we should be extra careful with amounts, but we can leave it as is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are already indirectly checking though: if we went with option 2, we'd check here and error out if it did overflow - but if we go with option 2, we will just fail later on when we do the reflect.DeapEqual check.

so i think for simplicity we dont need to add the redundant check

}
}

func makeSetAccountIndexParams(indexValue uint64,
indexName string) (sqlc.SetAccountIndexParams, error) {

if indexValue > math.MaxInt64 {
return sqlc.SetAccountIndexParams{}, fmt.Errorf("%s:%v is "+
"above max int64 value", indexName, indexValue)
}

return sqlc.SetAccountIndexParams{
Name: indexName,
Value: int64(indexValue),
}, nil
}
Loading
Loading