Skip to content

Commit 41f0e90

Browse files
committed
Add replicate flags to pump Litestream manually
For some special setups it is sometimes useful to run Litestream manually instead of letting it replicate in the background. This commit implements the following flags for replicate: * -once for doing synchronous replication and then exit * -force-snapshot to force a snapshot during -once * -enforce-retention to enforce retention rules during -once Because running once does not respect the snapshot interval the caller is expected to use -force-snapshot and -enforce-retention regularly to ensure the replication targets stay clean. For this to work correctly with a live database it needs to be opened with auto checkpointing disabled and SQLITE_FCNTL_PERSIST_WAL. Other uses include only using -force-snapshot to create regular backups of the database instead of live replication. Fixes benbjohnson#486
1 parent 6a3b929 commit 41f0e90

File tree

3 files changed

+105
-14
lines changed

3 files changed

+105
-14
lines changed

Diff for: cmd/litestream/main.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -92,19 +92,19 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
9292

9393
// Wait for signal to stop program.
9494
select {
95-
case err = <-c.execCh:
95+
case err = <-c.runCh:
9696
fmt.Println("subprocess exited, litestream shutting down")
9797
case sig := <-signalCh:
9898
fmt.Println("signal received, litestream shutting down")
9999

100-
if c.cmd != nil {
100+
if c.runSignal != nil {
101101
fmt.Println("sending signal to exec process")
102-
if err := c.cmd.Process.Signal(sig); err != nil {
102+
if err := c.runSignal(sig); err != nil {
103103
return fmt.Errorf("cannot signal exec process: %w", err)
104104
}
105105

106106
fmt.Println("waiting for exec process to close")
107-
if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") {
107+
if err := <-c.runCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") {
108108
return fmt.Errorf("cannot wait for exec process: %w", err)
109109
}
110110
}

Diff for: cmd/litestream/replicate.go

+100-9
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@ import (
2323

2424
// ReplicateCommand represents a command that continuously replicates SQLite databases.
2525
type ReplicateCommand struct {
26-
cmd *exec.Cmd // subcommand
27-
execCh chan error // subcommand error channel
26+
runSignal func(os.Signal) error // run cancel signaler
27+
runCh chan error // run error channel
28+
29+
once bool // replicate once and exit
30+
forceSnapshot bool // force snapshot to all replicas
31+
enforceRetention bool // enforce retention of old snapshots
2832

2933
Config Config
3034

@@ -34,14 +38,17 @@ type ReplicateCommand struct {
3438

3539
func NewReplicateCommand() *ReplicateCommand {
3640
return &ReplicateCommand{
37-
execCh: make(chan error),
41+
runCh: make(chan error),
3842
}
3943
}
4044

4145
// ParseFlags parses the CLI flags and loads the configuration file.
4246
func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) {
4347
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
4448
execFlag := fs.String("exec", "", "execute subcommand")
49+
onceFlag := fs.Bool("once", false, "replicate once and exit")
50+
forceSnapshotFlag := fs.Bool("force-snapshot", false, "force snapshot when replicating once")
51+
enforceRetentionFlag := fs.Bool("enforce-retention", false, "enforce retention of old snapshots")
4552
configPath, noExpandEnv := registerConfigFlag(fs)
4653
fs.Usage = c.Usage
4754
if err := fs.Parse(args); err != nil {
@@ -79,6 +86,22 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e
7986
c.Config.Exec = *execFlag
8087
}
8188

89+
// Once is mutually exclusive with exec
90+
c.once = *onceFlag
91+
if c.once && c.Config.Exec != "" {
92+
return fmt.Errorf("cannot specify -once flag with exec")
93+
}
94+
95+
c.forceSnapshot = *forceSnapshotFlag
96+
if !c.once && c.forceSnapshot {
97+
return fmt.Errorf("cannot specify -force-snapshot flag without -once")
98+
}
99+
100+
c.enforceRetention = *enforceRetentionFlag
101+
if !c.once && c.enforceRetention {
102+
return fmt.Errorf("cannot specify -enforce-retention flag without -once")
103+
}
104+
82105
return nil
83106
}
84107

@@ -98,6 +121,14 @@ func (c *ReplicateCommand) Run() (err error) {
98121
return err
99122
}
100123

124+
// Disable monitors if we're running once.
125+
if c.once {
126+
db.MonitorInterval = 0
127+
for _, r := range db.Replicas {
128+
r.MonitorEnabled = false
129+
}
130+
}
131+
101132
// Open database & attach to program.
102133
if err := db.Open(); err != nil {
103134
return err
@@ -152,14 +183,65 @@ func (c *ReplicateCommand) Run() (err error) {
152183
return fmt.Errorf("cannot parse exec command: %w", err)
153184
}
154185

155-
c.cmd = exec.Command(execArgs[0], execArgs[1:]...)
156-
c.cmd.Env = os.Environ()
157-
c.cmd.Stdout = os.Stdout
158-
c.cmd.Stderr = os.Stderr
159-
if err := c.cmd.Start(); err != nil {
186+
cmd := exec.Command(execArgs[0], execArgs[1:]...)
187+
cmd.Env = os.Environ()
188+
cmd.Stdout = os.Stdout
189+
cmd.Stderr = os.Stderr
190+
if err := cmd.Start(); err != nil {
160191
return fmt.Errorf("cannot start exec command: %w", err)
161192
}
162-
go func() { c.execCh <- c.cmd.Wait() }()
193+
c.runSignal = cmd.Process.Signal
194+
go func() { c.runCh <- cmd.Wait() }()
195+
} else if c.once {
196+
// Run replication once for each replica with cancel.
197+
ctx, cancel := context.WithCancel(context.Background())
198+
c.runSignal = func(s os.Signal) error {
199+
cancel()
200+
return nil
201+
}
202+
203+
go func() {
204+
var err error
205+
206+
defer func() {
207+
cancel()
208+
c.runCh <- err
209+
}()
210+
211+
for _, db := range c.DBs {
212+
if c.forceSnapshot {
213+
// Force next index with RESTART checkpoint.
214+
db.MaxCheckpointPageN = 1
215+
}
216+
217+
if err = db.Sync(ctx); err != nil {
218+
return
219+
}
220+
221+
// Prevent checkpointing on Close()
222+
db.MinCheckpointPageN = 0
223+
db.MaxCheckpointPageN = 0
224+
db.TruncatePageN = 0
225+
db.CheckpointInterval = 0
226+
227+
for _, r := range db.Replicas {
228+
if c.forceSnapshot {
229+
_, err = r.Snapshot(ctx)
230+
} else {
231+
err = r.Sync(ctx)
232+
}
233+
if err != nil {
234+
return
235+
}
236+
237+
if c.enforceRetention {
238+
if err = r.EnforceRetention(ctx); err != nil {
239+
return
240+
}
241+
}
242+
}
243+
}
244+
}()
163245
}
164246

165247
return nil
@@ -202,6 +284,15 @@ Arguments:
202284
Executes a subcommand. Litestream will exit when the child
203285
process exits. Useful for simple process management.
204286
287+
-once
288+
Execute replication once and exit.
289+
290+
-force-snapshot
291+
When replicating once, force taking a snapshot to all replicas.
292+
293+
-enforce-retention
294+
When replicating once, enforce rentention of old snapshots.
295+
205296
-no-expand-env
206297
Disables environment variable expansion in configuration file.
207298

Diff for: db.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -775,7 +775,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
775775
checkpoint, checkpointMode = true, CheckpointModeTruncate
776776
} else if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
777777
checkpoint, checkpointMode = true, CheckpointModeRestart
778-
} else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
778+
} else if db.MinCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
779779
checkpoint = true
780780
} else if db.CheckpointInterval > 0 && !info.dbModTime.IsZero() && time.Since(info.dbModTime) > db.CheckpointInterval && newWALSize > calcWALSize(db.pageSize, 1) {
781781
checkpoint = true

0 commit comments

Comments
 (0)