diff --git a/config/config.go b/config/config.go index 93b360a4..b5b402b9 100644 --- a/config/config.go +++ b/config/config.go @@ -22,6 +22,7 @@ type Config struct { Schemas Schemas `json:"schemas"` ExpireTilesDir string `json:"expiretiles_dir"` ExpireTilesZoom int `json:"expiretiles_zoom"` + CommitLatest bool `json:"commit_latest"` ReplicationURL string `json:"replication_url"` ReplicationInterval MinutesInterval `json:"replication_interval"` DiffStateBefore MinutesInterval `json:"diff_state_before"` @@ -53,6 +54,7 @@ type Base struct { Schemas Schemas ExpireTilesDir string ExpireTilesZoom int + CommitLatest bool ReplicationURL string ReplicationInterval time.Duration DiffStateBefore time.Duration @@ -124,6 +126,10 @@ func (o *Base) updateFromConfig() error { o.ExpireTilesZoom = 14 } + if !o.CommitLatest { + o.CommitLatest = conf.CommitLatest + } + if conf.ReplicationInterval.Duration != 0 && o.ReplicationInterval == time.Minute { o.ReplicationInterval = conf.ReplicationInterval.Duration } @@ -248,6 +254,7 @@ func ParseDiffImport(args []string) (Base, []string) { flags.StringVar(&opts.ExpireTilesDir, "expiretiles-dir", "", "write expire tiles into dir") flags.IntVar(&opts.ExpireTilesZoom, "expiretiles-zoom", 14, "write expire tiles in this zoom level") flags.BoolVar(&opts.ForceDiffImport, "force", false, "force import of diff if sequence was already imported") + flags.BoolVar(&opts.CommitLatest, "commit-latest", false, "commit after last diff, instead after each diff") flags.Usage = func() { fmt.Fprintf(os.Stderr, "Usage: %s %s [args] [.osc.gz, ...]\n\n", os.Args[0], os.Args[1]) @@ -290,6 +297,7 @@ func ParseRunImport(args []string) Base { addBaseFlags(&opts, flags) flags.StringVar(&opts.ExpireTilesDir, "expiretiles-dir", "", "write expire tiles into dir") flags.IntVar(&opts.ExpireTilesZoom, "expiretiles-zoom", 14, "write expire tiles in this zoom level") + flags.BoolVar(&opts.CommitLatest, "commit-latest", false, "commit after last diff, instead after each diff") flags.DurationVar(&opts.ReplicationInterval, "replication-interval", time.Minute, "replication interval as duration (1m, 1h, 24h)") flags.Usage = func() { diff --git a/update/cmd.go b/update/cmd.go index 90623f1a..6add04b4 100644 --- a/update/cmd.go +++ b/update/cmd.go @@ -24,6 +24,8 @@ import ( "github.com/pkg/errors" ) +const LastStateFilename = "last.state.txt" + func Diff(baseOpts config.Base, files []string) { if baseOpts.Quiet { log.SetMinLevel(log.LInfo) @@ -38,7 +40,7 @@ func Diff(baseOpts config.Base, files []string) { log.Fatalf("[error] Checking diff files: %v", err) } - if err := diffImportLoop(baseOpts, nextSeq, commitEach, false); err != nil { + if err := diffImportLoop(baseOpts, nextSeq); err != nil { log.Fatalf("[error] Importing diffs: %v", err) } } @@ -71,19 +73,12 @@ func Run(baseOpts config.Base) { nextSeq := downloader.Sequences() defer downloader.Stop() - if err := diffImportLoop(baseOpts, nextSeq, commitLatest, true); err != nil { + if err := diffImportLoop(baseOpts, nextSeq); err != nil { log.Fatalf("[error] Importing diffs: %v", err) } } -type commitMode int - -const ( - commitEach commitMode = iota - commitLatest -) - -func diffImportLoop(baseOpts config.Base, nextSeq <-chan replication.Sequence, commit commitMode, run bool) error { +func diffImportLoop(baseOpts config.Base, nextSeq <-chan replication.Sequence) error { var geometryLimiter *limit.Limiter if baseOpts.LimitTo != "" { var err error @@ -115,17 +110,14 @@ func diffImportLoop(baseOpts config.Base, nextSeq <-chan replication.Sequence, c signal.Notify(sigc, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) var tilelist *expire.TileList - var tileExpireor expire.Expireor if baseOpts.ExpireTilesDir != "" { tilelist = expire.NewTileList(baseOpts.ExpireTilesZoom, baseOpts.ExpireTilesDir) - tileExpireor = tilelist } tagmapping, err := mapping.FromFile(baseOpts.MappingFile) if err != nil { log.Fatalf("[fatal] reading tagmapping: %v", err) } - var lastStateFile = filepath.Join(baseOpts.DiffDir, LastStateFilename) dbConf := database.Config{ ConnectionParams: baseOpts.Connection, @@ -145,52 +137,103 @@ func diffImportLoop(baseOpts config.Base, nextSeq <-chan replication.Sequence, c log.Fatalf("[fatal] unable to start transaction: %v", err) } - var lastDiffFile string - flush := func() error { - if tilelist != nil { - err := tilelist.Flush() - if err != nil { - log.Println("[error] Writing tile expire list", err) - } - } - if err := db.End(); err != nil { - return errors.Wrapf(err, "unable to commit transaction") - } - if err := db.Begin(); err != nil { - return errors.Wrapf(err, "unable to start transaction") - } - if err := osmCache.Coords.Flush(); err != nil { - return errors.Wrapf(err, "flushing coords cache") - } - diffCache.Flush() + u := updater{ + baseOpts: baseOpts, + commitEachDiff: baseOpts.CommitLatest == false, - if lastDiffFile != "" { - if err := markImported(lastDiffFile, lastStateFile); err != nil { - log.Println("[error] Unable to write last state:", err) - } - } + db: db.(database.FullDB), + osmCache: osmCache, + diffCache: diffCache, + geometryLimiter: geometryLimiter, + tagmapping: tagmapping, + tilelist: tilelist, + + sigc: sigc, + } + defer u.shutdown() + + if err := u.importLoop(nextSeq); err != nil { + return err + } + + if err := u.flush(); err != nil { + return err + } + return nil + +} + +var StopImport = errors.New("STOP") + +type updater struct { + baseOpts config.Base + commitEachDiff bool + + lastDiff replication.Sequence + + db database.FullDB + osmCache *cache.OSMCache + diffCache *cache.DiffCache + geometryLimiter *limit.Limiter + tagmapping *mapping.Mapping + tilelist *expire.TileList + + sigc chan os.Signal +} + +func (u *updater) flush() error { + if u.lastDiff.Filename == "" { return nil } - shutdown := func() error { - osmCache.Close() - diffCache.Close() - if err := db.Abort(); err != nil { - return err - } - if err := db.Close(); err != nil { - return err + defer func() { + // reset to prevent commit without import + u.lastDiff = replication.Sequence{} + }() + + if u.tilelist != nil { + err := u.tilelist.Flush() + if err != nil { + log.Println("[error] Writing tile expire list", err) } - return nil } + if err := u.db.End(); err != nil { + return errors.Wrapf(err, "unable to commit transaction") + } + if err := u.db.Begin(); err != nil { + return errors.Wrapf(err, "unable to start transaction") + } + if err := u.osmCache.Coords.Flush(); err != nil { + return errors.Wrapf(err, "flushing coords cache") + } + u.diffCache.Flush() - exp := newExpBackoff(2*time.Second, 5*time.Minute) + var lastStateFile = filepath.Join(u.baseOpts.DiffDir, LastStateFilename) + if err := markImported(u.lastDiff, lastStateFile); err != nil { + log.Println("[error] Unable to write last state:", err) + } + + return nil +} + +func (u *updater) shutdown() error { + u.osmCache.Close() + u.diffCache.Close() + if err := u.db.Abort(); err != nil { + return err + } + if err := u.db.Close(); err != nil { + return err + } + return nil +} +func (u *updater) importLoop(nextSeq <-chan replication.Sequence) error { for { select { - case <-sigc: + case <-u.sigc: log.Println("[info] Exiting. (SIGTERM/SIGINT/SIGHUP)") - return shutdown() + return StopImport case seq := <-nextSeq: if seq.Error != nil { log.Printf("[error] Get seq #%d: %s", seq.Sequence, seq.Error) @@ -198,78 +241,71 @@ func diffImportLoop(baseOpts config.Base, nextSeq <-chan replication.Sequence, c } if seq.Filename == "" { // seq is zero-value if channel was closed (when all files are imported) - if err := flush(); err != nil { - if err := shutdown(); err != nil { - log.Printf("[error] error during shutdown for following error: %v", err) - } - return err - } - return shutdown() + return nil } - lastDiffFile = seq.Filename // for last.state.txt update in Flush + u.lastDiff = seq // for last.state.txt update in Flush + if err := u.importDiff(seq); err != nil { + return err + } - logName := seq.Filename - if seq.Sequence != 0 { - logName = "#" + strconv.FormatInt(int64(seq.Sequence), 10) + if os.Getenv("IMPOSM3_SINGLE_DIFF") != "" { + return nil } + } + } +} - for { - if seq.Sequence != 0 { - log.Printf("[info] Importing %s including changes till %s (%s behind)", - logName, seq.Time, time.Since(seq.Time).Truncate(time.Second)) - } else { - log.Printf("[info] Importing %s", logName) - } +func (u *updater) importDiff(seq replication.Sequence) error { + logName := seq.Filename + if seq.Sequence != 0 { + logName = "#" + strconv.FormatInt(int64(seq.Sequence), 10) + } - logFinishedImport := log.Step(fmt.Sprintf("Importing %s", logName)) - err := Update(baseOpts, seq.Filename, db.(database.FullDB), - tagmapping, geometryLimiter, tileExpireor, - osmCache, diffCache, - ) - if err == nil { - exp.Reset() - if seq.Latest || commit == commitEach { - if err := flush(); err != nil { - return err - } - } - } - logFinishedImport() - - if err != nil { - if commit == commitEach { - // we can retry if we commited the previous import - log.Printf("[error] Importing %s: %v", logName, err) - log.Println("[info] Retrying in", exp.Duration()) - select { - case <-sigc: - log.Println("[info] Exiting. (SIGTERM/SIGINT/SIGHUP)") - return shutdown() - case <-exp.Wait(): - default: - } - continue - } else { - // terminate on error, systemd/etc. should restart imposm - if err := shutdown(); err != nil { - log.Printf("[error] error during shutdown for following error: %v", err) - } - return err - } + if seq.Sequence != 0 { + log.Printf("[info] Importing %s including changes till %s (%s behind)", + logName, seq.Time, time.Since(seq.Time).Truncate(time.Second)) + } else { + log.Printf("[info] Importing %s", logName) + } + defer log.Step(fmt.Sprintf("Importing %s", logName))() + + exp := newExpBackoff(2*time.Second, 5*time.Minute) + + var exptiles expire.Expireor + if u.tilelist != nil { + exptiles = u.tilelist + } + + for { + + if err := importDiffFile(seq.Filename, u.db, + u.tagmapping, u.baseOpts.Srid, u.geometryLimiter, exptiles, + u.osmCache, u.diffCache, + ); err != nil { + if u.commitEachDiff { + // we can retry if we commited the previous import + log.Printf("[error] Importing %s: %v", logName, err) + log.Println("[info] Retrying in", exp.Duration()) + select { + case <-u.sigc: + log.Println("[info] Exiting. (SIGTERM/SIGINT/SIGHUP)") + return StopImport + case <-exp.Wait(): + default: } - break + continue + } else { + return err } - if os.Getenv("IMPOSM3_SINGLE_DIFF") != "" { - if err := flush(); err != nil { - if err := shutdown(); err != nil { - log.Printf("[error] error during shutdown for following error: %v", err) - } - return err - } - return shutdown() + } + + if seq.Latest || u.commitEachDiff { + if err := u.flush(); err != nil { + return err } } + return nil } } @@ -320,16 +356,11 @@ func sequenceFromFiles(files []string, lastStateFile string, force bool) (<-chan return c, nil } -func markImported(oscFile string, lastStateFile string) error { - if !strings.HasSuffix(oscFile, ".osc.gz") { - return errors.New("diff file requires .osc.gz suffix") - } - stateFile := oscFile[:len(oscFile)-len(".osc.gz")] + ".state.txt" - state, err := diffstate.ParseFile(stateFile) - if err != nil { - return errors.Wrapf(err, "reading state %s", stateFile) +func markImported(seq replication.Sequence, lastStateFile string) error { + state := &diffstate.DiffState{ + Time: seq.Time, + Sequence: seq.Sequence, } - lastState, err := diffstate.ParseFile(lastStateFile) if err == nil { state.URL = lastState.URL diff --git a/update/process.go b/update/process.go index a70a9485..2185abcb 100644 --- a/update/process.go +++ b/update/process.go @@ -9,7 +9,6 @@ import ( osm "github.com/omniscale/go-osm" "github.com/omniscale/go-osm/parser/diff" "github.com/omniscale/imposm3/cache" - "github.com/omniscale/imposm3/config" "github.com/omniscale/imposm3/database" _ "github.com/omniscale/imposm3/database/postgis" "github.com/omniscale/imposm3/expire" @@ -21,13 +20,11 @@ import ( "github.com/omniscale/imposm3/writer" ) -const LastStateFilename = "last.state.txt" - -func Update( - baseOpts config.Base, +func importDiffFile( oscFile string, db database.FullDB, tagmapping *mapping.Mapping, + srid int, geometryLimiter *limit.Limiter, expireor expire.Expireor, osmCache *cache.OSMCache, @@ -80,7 +77,7 @@ func Update( tagmapping.PolygonMatcher, tagmapping.RelationMatcher, tagmapping.RelationMemberMatcher, - baseOpts.Srid) + srid) relWriter.SetLimiter(geometryLimiter) relWriter.SetExpireor(expireor) relWriter.Start() @@ -91,7 +88,7 @@ func Update( progress, tagmapping.PolygonMatcher, tagmapping.LineStringMatcher, - baseOpts.Srid) + srid) wayWriter.SetLimiter(geometryLimiter) wayWriter.SetExpireor(expireor) wayWriter.Start() @@ -99,7 +96,7 @@ func Update( nodeWriter := writer.NewNodeWriter(osmCache, nodes, db, progress, tagmapping.PointMatcher, - baseOpts.Srid) + srid) nodeWriter.SetLimiter(geometryLimiter) nodeWriter.SetExpireor(expireor) nodeWriter.Start()