Skip to content

Commit f8a1218

Browse files
authored
Merge pull request #927 from pastelnetwork/PSL-1257_enablePragmasOnMetaDB
[PSL-1257] enable pragma to handle db locking on meta-migration db
2 parents 494ae2c + c2a6f56 commit f8a1218

File tree

1 file changed

+53
-10
lines changed

1 file changed

+53
-10
lines changed

p2p/kademlia/store/sqlite/meta_worker.go

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ var (
2424
migrationMetaDB = "data001-migration-meta.sqlite3"
2525
accessUpdateBufferSize = 100000
2626
commitInsertsInterval = 10 * time.Second
27-
metaSyncBatchSize = 10000
27+
metaSyncBatchSize = 5000
2828
lowSpaceThresholdGB = 50 // in GB
2929
minKeysToMigrate = 100
3030

@@ -78,6 +78,10 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
7878
db.SetMaxOpenConns(20)
7979
db.SetMaxIdleConns(10)
8080

81+
if err := setPragmas(db); err != nil {
82+
log.WithContext(ctx).WithError(err).Error("error executing pragmas")
83+
}
84+
8185
p2pDataStore, err := connectP2PDataStore(dataDir)
8286
if err != nil {
8387
log.WithContext(ctx).WithError(err).Error("error connecting p2p store from meta-migration store")
@@ -121,6 +125,33 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
121125
return handler, nil
122126
}
123127

128+
func setPragmas(db *sqlx.DB) error {
129+
// Set journal mode to WAL
130+
_, err := db.Exec("PRAGMA journal_mode=WAL;")
131+
if err != nil {
132+
return err
133+
}
134+
// Set synchronous to NORMAL
135+
_, err = db.Exec("PRAGMA synchronous=NORMAL;")
136+
if err != nil {
137+
return err
138+
}
139+
140+
// Set cache size
141+
_, err = db.Exec("PRAGMA cache_size=-262144;")
142+
if err != nil {
143+
return err
144+
}
145+
146+
// Set busy timeout
147+
_, err = db.Exec("PRAGMA busy_timeout=5000;")
148+
if err != nil {
149+
return err
150+
}
151+
152+
return nil
153+
}
154+
124155
func (d *MigrationMetaStore) migrateMeta() error {
125156
query := `
126157
CREATE TABLE IF NOT EXISTS meta (
@@ -205,17 +236,23 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
205236
return err
206237
}
207238

208-
var batchProcessed bool
209-
210239
tx, err := d.db.Beginx()
211240
if err != nil {
212241
rows.Close()
213242
log.WithContext(ctx).WithError(err).Error("Failed to start transaction")
214243
return err
215244
}
216245

246+
stmt, err := tx.Prepare(insertQuery)
247+
if err != nil {
248+
tx.Rollback()
249+
rows.Close()
250+
log.WithContext(ctx).WithError(err).Error("Failed to prepare statement")
251+
return err
252+
}
253+
254+
var batchProcessed bool
217255
for rows.Next() {
218-
batchProcessed = true
219256
var r Record
220257
var t *time.Time
221258

@@ -227,14 +264,15 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
227264
r.UpdatedAt = *t
228265
}
229266

230-
if _, err := tx.Exec(insertQuery, r.Key, r.UpdatedAt, len(r.Data)); err != nil {
231-
tx.Rollback()
232-
rows.Close()
233-
log.WithContext(ctx).WithError(err).Error("Failed to execute batch insert")
234-
return err
267+
if _, err := stmt.Exec(r.Key, r.UpdatedAt, len(r.Data)); err != nil {
268+
log.WithContext(ctx).WithField("key", r.Key).WithError(err).Error("error inserting key to meta")
269+
continue
235270
}
271+
272+
batchProcessed = true
236273
}
237274

275+
stmt.Close()
238276
if err := rows.Err(); err != nil {
239277
tx.Rollback()
240278
rows.Close()
@@ -255,7 +293,12 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
255293
}
256294

257295
rows.Close()
258-
offset += metaSyncBatchSize
296+
if !batchProcessed {
297+
tx.Rollback()
298+
log.WithContext(ctx).Info("no rows processed, rolling back and breaking.")
299+
break
300+
}
301+
offset += metaSyncBatchSize //
259302
}
260303

261304
return nil

0 commit comments

Comments
 (0)