Skip to content

Commit

Permalink
update: add -commit-latest option
Browse files Browse the repository at this point in the history
When enabled, the run and diff commands commit only after the last (or latest
available) diff file has been imported, instead of after each diff.
  • Loading branch information
olt committed Jul 2, 2024
1 parent 0cd1603 commit 02dd673
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 127 deletions.
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Config struct {
Schemas Schemas `json:"schemas"`
ExpireTilesDir string `json:"expiretiles_dir"`
ExpireTilesZoom int `json:"expiretiles_zoom"`
CommitLatest bool `json:"commit_latest"`
ReplicationURL string `json:"replication_url"`
ReplicationInterval MinutesInterval `json:"replication_interval"`
DiffStateBefore MinutesInterval `json:"diff_state_before"`
Expand Down Expand Up @@ -53,6 +54,7 @@ type Base struct {
Schemas Schemas
ExpireTilesDir string
ExpireTilesZoom int
CommitLatest bool
ReplicationURL string
ReplicationInterval time.Duration
DiffStateBefore time.Duration
Expand Down Expand Up @@ -124,6 +126,10 @@ func (o *Base) updateFromConfig() error {
o.ExpireTilesZoom = 14
}

if !o.CommitLatest {
o.CommitLatest = conf.CommitLatest
}

if conf.ReplicationInterval.Duration != 0 && o.ReplicationInterval == time.Minute {
o.ReplicationInterval = conf.ReplicationInterval.Duration
}
Expand Down Expand Up @@ -248,6 +254,7 @@ func ParseDiffImport(args []string) (Base, []string) {
flags.StringVar(&opts.ExpireTilesDir, "expiretiles-dir", "", "write expire tiles into dir")
flags.IntVar(&opts.ExpireTilesZoom, "expiretiles-zoom", 14, "write expire tiles in this zoom level")
flags.BoolVar(&opts.ForceDiffImport, "force", false, "force import of diff if sequence was already imported")
flags.BoolVar(&opts.CommitLatest, "commit-latest", false, "commit after last diff, instead after each diff")

flags.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s %s [args] [.osc.gz, ...]\n\n", os.Args[0], os.Args[1])
Expand Down Expand Up @@ -290,6 +297,7 @@ func ParseRunImport(args []string) Base {
addBaseFlags(&opts, flags)
flags.StringVar(&opts.ExpireTilesDir, "expiretiles-dir", "", "write expire tiles into dir")
flags.IntVar(&opts.ExpireTilesZoom, "expiretiles-zoom", 14, "write expire tiles in this zoom level")
flags.BoolVar(&opts.CommitLatest, "commit-latest", false, "commit after last diff, instead after each diff")
flags.DurationVar(&opts.ReplicationInterval, "replication-interval", time.Minute, "replication interval as duration (1m, 1h, 24h)")

flags.Usage = func() {
Expand Down
269 changes: 150 additions & 119 deletions update/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/pkg/errors"
)

// LastStateFilename is the name of the state file, located in the diff
// directory, that records the last imported diff.
const LastStateFilename = "last.state.txt"

func Diff(baseOpts config.Base, files []string) {
if baseOpts.Quiet {
log.SetMinLevel(log.LInfo)
Expand All @@ -38,7 +40,7 @@ func Diff(baseOpts config.Base, files []string) {
log.Fatalf("[error] Checking diff files: %v", err)
}

if err := diffImportLoop(baseOpts, nextSeq, commitEach, false); err != nil {
if err := diffImportLoop(baseOpts, nextSeq); err != nil {
log.Fatalf("[error] Importing diffs: %v", err)
}
}
Expand Down Expand Up @@ -71,19 +73,12 @@ func Run(baseOpts config.Base) {
nextSeq := downloader.Sequences()
defer downloader.Stop()

if err := diffImportLoop(baseOpts, nextSeq, commitLatest, true); err != nil {
if err := diffImportLoop(baseOpts, nextSeq); err != nil {
log.Fatalf("[error] Importing diffs: %v", err)
}
}

type commitMode int

const (
commitEach commitMode = iota
commitLatest
)

func diffImportLoop(baseOpts config.Base, nextSeq <-chan replication.Sequence, commit commitMode, run bool) error {
func diffImportLoop(baseOpts config.Base, nextSeq <-chan replication.Sequence) error {
var geometryLimiter *limit.Limiter
if baseOpts.LimitTo != "" {
var err error
Expand Down Expand Up @@ -115,17 +110,14 @@ func diffImportLoop(baseOpts config.Base, nextSeq <-chan replication.Sequence, c
signal.Notify(sigc, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)

var tilelist *expire.TileList
var tileExpireor expire.Expireor
if baseOpts.ExpireTilesDir != "" {
tilelist = expire.NewTileList(baseOpts.ExpireTilesZoom, baseOpts.ExpireTilesDir)
tileExpireor = tilelist
}

tagmapping, err := mapping.FromFile(baseOpts.MappingFile)
if err != nil {
log.Fatalf("[fatal] reading tagmapping: %v", err)
}
var lastStateFile = filepath.Join(baseOpts.DiffDir, LastStateFilename)

dbConf := database.Config{
ConnectionParams: baseOpts.Connection,
Expand All @@ -145,131 +137,175 @@ func diffImportLoop(baseOpts config.Base, nextSeq <-chan replication.Sequence, c
log.Fatalf("[fatal] unable to start transaction: %v", err)
}

var lastDiffFile string
flush := func() error {
if tilelist != nil {
err := tilelist.Flush()
if err != nil {
log.Println("[error] Writing tile expire list", err)
}
}
if err := db.End(); err != nil {
return errors.Wrapf(err, "unable to commit transaction")
}
if err := db.Begin(); err != nil {
return errors.Wrapf(err, "unable to start transaction")
}
if err := osmCache.Coords.Flush(); err != nil {
return errors.Wrapf(err, "flushing coords cache")
}
diffCache.Flush()
u := updater{
baseOpts: baseOpts,
commitEachDiff: baseOpts.CommitLatest == false,

if lastDiffFile != "" {
if err := markImported(lastDiffFile, lastStateFile); err != nil {
log.Println("[error] Unable to write last state:", err)
}
}
db: db.(database.FullDB),
osmCache: osmCache,
diffCache: diffCache,
geometryLimiter: geometryLimiter,
tagmapping: tagmapping,
tilelist: tilelist,

sigc: sigc,
}
defer u.shutdown()

if err := u.importLoop(nextSeq); err != nil {
return err
}

if err := u.flush(); err != nil {
return err
}
return nil

}

// StopImport is the error returned by the import loop when a
// SIGTERM/SIGINT/SIGHUP was received and the update should stop.
var StopImport = errors.New("STOP")

// updater bundles the state needed to import diffs and to commit them.
type updater struct {
// baseOpts holds the options the update was started with.
baseOpts config.Base
// commitEachDiff selects flushing after every imported diff (and retrying
// a failed diff) instead of flushing only after the latest diff.
commitEachDiff bool

// lastDiff is the most recent sequence picked up by importLoop. flush
// records it as the last state and resets it afterwards.
lastDiff replication.Sequence

// db is the database the diffs are imported into.
db database.FullDB
// osmCache and diffCache are flushed in flush and closed in shutdown.
osmCache *cache.OSMCache
diffCache *cache.DiffCache
// geometryLimiter and tagmapping are handed to the diff import as-is.
geometryLimiter *limit.Limiter
tagmapping *mapping.Mapping
// tilelist is the optional tile expire list; nil when no expire tiles are
// written.
tilelist *expire.TileList

// sigc receives the signals that stop the import.
sigc chan os.Signal
}

// flush commits the work done since the previous flush and records the last
// imported diff.
//
// It is a no-op when u.lastDiff is empty, i.e. when no diff was picked up
// since the last flush. Otherwise it writes the tile expire list (errors are
// only logged), ends the current database transaction and begins a new one,
// flushes the coords cache and the diff cache, and updates the last state
// file in baseOpts.DiffDir (errors are only logged). u.lastDiff is reset when
// flush returns, including when it returns an error.
//
// NOTE(review): this text comes from a rendered diff. The lines below that
// use identifiers without the u. receiver (the shutdown closure, the
// newExpBackoff call) look like removed lines of the previous
// implementation — confirm against the actual file.
func (u *updater) flush() error {
if u.lastDiff.Filename == "" {
return nil
}

shutdown := func() error {
osmCache.Close()
diffCache.Close()
if err := db.Abort(); err != nil {
return err
}
if err := db.Close(); err != nil {
return err
defer func() {
// reset to prevent commit without import
u.lastDiff = replication.Sequence{}
}()

if u.tilelist != nil {
err := u.tilelist.Flush()
if err != nil {
log.Println("[error] Writing tile expire list", err)
}
return nil
}
if err := u.db.End(); err != nil {
return errors.Wrapf(err, "unable to commit transaction")
}
if err := u.db.Begin(); err != nil {
return errors.Wrapf(err, "unable to start transaction")
}
if err := u.osmCache.Coords.Flush(); err != nil {
return errors.Wrapf(err, "flushing coords cache")
}
u.diffCache.Flush()

exp := newExpBackoff(2*time.Second, 5*time.Minute)
var lastStateFile = filepath.Join(u.baseOpts.DiffDir, LastStateFilename)
if err := markImported(u.lastDiff, lastStateFile); err != nil {
log.Println("[error] Unable to write last state:", err)
}

return nil
}

// shutdown releases the resources held by the updater: it closes the OSM and
// diff caches, aborts the open database transaction and closes the database.
//
// The database is closed even when aborting the transaction fails, so that a
// failed abort does not leave the database open. The abort error takes
// precedence over the close error in the returned value.
func (u *updater) shutdown() error {
	u.osmCache.Close()
	u.diffCache.Close()

	abortErr := u.db.Abort()
	closeErr := u.db.Close()
	if abortErr != nil {
		return abortErr
	}
	return closeErr
}

// importLoop imports the sequences received from nextSeq, one after another.
//
// It returns StopImport when a signal arrives on u.sigc, nil when a sequence
// with an empty Filename is received (the zero value of a closed channel) or
// when the IMPOSM3_SINGLE_DIFF environment variable is set (after one diff
// was imported), and the error of importDiff when an import fails. Sequences
// that carry an Error are logged and skipped.
//
// NOTE(review): this text comes from a rendered diff. The lines below that
// use sigc, flush(), shutdown() or lastDiffFile without the u. receiver look
// like removed lines of the previous implementation — confirm against the
// actual file.
func (u *updater) importLoop(nextSeq <-chan replication.Sequence) error {
for {
select {
case <-sigc:
case <-u.sigc:
log.Println("[info] Exiting. (SIGTERM/SIGINT/SIGHUP)")
return shutdown()
return StopImport
case seq := <-nextSeq:
if seq.Error != nil {
log.Printf("[error] Get seq #%d: %s", seq.Sequence, seq.Error)
continue
}

if seq.Filename == "" { // seq is zero-value if channel was closed (when all files are imported)
if err := flush(); err != nil {
if err := shutdown(); err != nil {
log.Printf("[error] error during shutdown for following error: %v", err)
}
return err
}
return shutdown()
return nil
}

lastDiffFile = seq.Filename // for last.state.txt update in Flush
u.lastDiff = seq // for last.state.txt update in Flush
if err := u.importDiff(seq); err != nil {
return err
}

logName := seq.Filename
if seq.Sequence != 0 {
logName = "#" + strconv.FormatInt(int64(seq.Sequence), 10)
if os.Getenv("IMPOSM3_SINGLE_DIFF") != "" {
return nil
}
}
}
}

for {
if seq.Sequence != 0 {
log.Printf("[info] Importing %s including changes till %s (%s behind)",
logName, seq.Time, time.Since(seq.Time).Truncate(time.Second))
} else {
log.Printf("[info] Importing %s", logName)
}
// importDiff imports the diff file of a single sequence.
//
// The diff is logged under its sequence number ("#<n>") when one is set and
// under its file name otherwise. When the import fails and commitEachDiff is
// set, the error is logged and the import is retried; without commitEachDiff
// the error is returned to the caller. After a successful import the updater
// is flushed when seq.Latest is set or commitEachDiff is enabled.
//
// It returns StopImport when a signal arrives on u.sigc while a failed import
// is waiting for its retry.
//
// NOTE(review): the retry select has a default case, so it only waits if
// exp.Wait() itself blocks — confirm that the backoff is actually applied
// and the retry does not spin.
//
// NOTE(review): this text comes from a rendered diff. The lines below that
// use baseOpts, db, flush(), shutdown(), sigc or commit without the u.
// receiver look like removed lines of the previous implementation — confirm
// against the actual file.
func (u *updater) importDiff(seq replication.Sequence) error {
logName := seq.Filename
if seq.Sequence != 0 {
logName = "#" + strconv.FormatInt(int64(seq.Sequence), 10)
}

logFinishedImport := log.Step(fmt.Sprintf("Importing %s", logName))
err := Update(baseOpts, seq.Filename, db.(database.FullDB),
tagmapping, geometryLimiter, tileExpireor,
osmCache, diffCache,
)
if err == nil {
exp.Reset()
if seq.Latest || commit == commitEach {
if err := flush(); err != nil {
return err
}
}
}
logFinishedImport()

if err != nil {
if commit == commitEach {
// we can retry if we commited the previous import
log.Printf("[error] Importing %s: %v", logName, err)
log.Println("[info] Retrying in", exp.Duration())
select {
case <-sigc:
log.Println("[info] Exiting. (SIGTERM/SIGINT/SIGHUP)")
return shutdown()
case <-exp.Wait():
default:
}
continue
} else {
// terminate on error, systemd/etc. should restart imposm
if err := shutdown(); err != nil {
log.Printf("[error] error during shutdown for following error: %v", err)
}
return err
}
if seq.Sequence != 0 {
log.Printf("[info] Importing %s including changes till %s (%s behind)",
logName, seq.Time, time.Since(seq.Time).Truncate(time.Second))
} else {
log.Printf("[info] Importing %s", logName)
}
defer log.Step(fmt.Sprintf("Importing %s", logName))()

exp := newExpBackoff(2*time.Second, 5*time.Minute)

var exptiles expire.Expireor
if u.tilelist != nil {
exptiles = u.tilelist
}

for {

if err := importDiffFile(seq.Filename, u.db,
u.tagmapping, u.baseOpts.Srid, u.geometryLimiter, exptiles,
u.osmCache, u.diffCache,
); err != nil {
if u.commitEachDiff {
// we can retry if we commited the previous import
log.Printf("[error] Importing %s: %v", logName, err)
log.Println("[info] Retrying in", exp.Duration())
select {
case <-u.sigc:
log.Println("[info] Exiting. (SIGTERM/SIGINT/SIGHUP)")
return StopImport
case <-exp.Wait():
default:
}
break
continue
} else {
return err
}
if os.Getenv("IMPOSM3_SINGLE_DIFF") != "" {
if err := flush(); err != nil {
if err := shutdown(); err != nil {
log.Printf("[error] error during shutdown for following error: %v", err)
}
return err
}
return shutdown()
}

if seq.Latest || u.commitEachDiff {
if err := u.flush(); err != nil {
return err
}
}
return nil
}
}

Expand Down Expand Up @@ -320,16 +356,11 @@ func sequenceFromFiles(files []string, lastStateFile string, force bool) (<-chan
return c, nil
}

func markImported(oscFile string, lastStateFile string) error {
if !strings.HasSuffix(oscFile, ".osc.gz") {
return errors.New("diff file requires .osc.gz suffix")
}
stateFile := oscFile[:len(oscFile)-len(".osc.gz")] + ".state.txt"
state, err := diffstate.ParseFile(stateFile)
if err != nil {
return errors.Wrapf(err, "reading state %s", stateFile)
func markImported(seq replication.Sequence, lastStateFile string) error {
state := &diffstate.DiffState{
Time: seq.Time,
Sequence: seq.Sequence,
}

lastState, err := diffstate.ParseFile(lastStateFile)
if err == nil {
state.URL = lastState.URL
Expand Down
Loading

0 comments on commit 02dd673

Please sign in to comment.