Skip to content

Commit

Permalink
Add ScanNow()
Browse files Browse the repository at this point in the history
Source: radovskyb#94
  • Loading branch information
dideler committed Apr 21, 2020
1 parent f5989f8 commit e140b68
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 3 deletions.
61 changes: 58 additions & 3 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ var (
// from previously calling Start and not yet calling Close.
ErrWatcherRunning = errors.New("error: watcher is already running")

// ErrWatcherNotRunning occurs when trying to perform a ScanNow
// when the watcher is not running. It will also occur if Close is called
// whilst a ScanNow() is running / pending.
ErrWatcherNotRunning = errors.New("error: watcher is not running")

// ErrWatchedFileDeleted is an error that occurs when a file or folder that was
// being watched has been deleted.
ErrWatchedFileDeleted = errors.New("error: watched file or folder deleted")
Expand Down Expand Up @@ -129,6 +134,7 @@ type Watcher struct {
ops map[Op]struct{} // Op filtering.
ignoreHidden bool // ignore hidden files or not.
maxEvents int // max sent events per cycle
scanNow chan chan struct{} // allows requests for immediate synchronous scans
}

// New creates a new Watcher.
Expand All @@ -147,6 +153,7 @@ func New() *Watcher {
files: make(map[string]os.FileInfo),
ignored: make(map[string]struct{}),
names: make(map[string]bool),
scanNow: make(chan chan struct{}),
}
}

Expand Down Expand Up @@ -550,6 +557,8 @@ func (w *Watcher) Start(d time.Duration) error {
// Unblock w.Wait().
w.wg.Done()

var scanNowRequest chan struct{}

for {
// done lets the inner polling cycle loop know when the
// current cycle's method has finished executing.
Expand Down Expand Up @@ -589,7 +598,7 @@ func (w *Watcher) Start(d time.Duration) error {
}
}
numEvents++
if w.maxEvents > 0 && numEvents > w.maxEvents {
if scanNowRequest == nil && w.maxEvents > 0 && numEvents > w.maxEvents {
close(cancel)
break inner
}
Expand All @@ -604,8 +613,52 @@ func (w *Watcher) Start(d time.Duration) error {
w.files = fileList
w.mu.Unlock()

if scanNowRequest != nil {
close(scanNowRequest)
scanNowRequest = nil
}

// Sleep and then continue to the next loop iteration.
time.Sleep(d)
// If a request to do a full scan is received, handle it and then signal to the requester it is complete.
select {
case <-w.close: // break out of wait early if we get a Close
case scanNowRequest = <-w.scanNow: // sync scan request received
case <-time.After(d): // periodic re-roll time elapsed
}
}
}

// ScanNow can be called on a already running watcher to perform an immediate synchronous scan of all watched files
// and generate the events for any changes. When ScanNow() returns to the caller, all events for any changed files
// have been published. ScanNow() can be used when you know FS changes have occurred and you want to ensure all events
// for the changes have been gathered before continuing, for example, to better process batched updates to groups of
// files.
// You can also specify a very long poll duration and then use ScanNow() to break from the poll wait and perform a scan
// before going back to sleep.
func (w *Watcher) ScanNow() error {
w.mu.Lock()
if !w.running {
w.mu.Unlock()
return ErrWatcherNotRunning
}
w.mu.Unlock()

scanComplete := make(chan struct{})
select {
case w.scanNow <- scanComplete:
case <-w.close:
// if the watcher is no longer running, or is closed whilst we're waiting for our scan to be accepted, return
// an error
return ErrWatcherNotRunning
}

select {
case <-w.close:
// if the watcher is closed whilst we're waiting for our scan to complete, return an error
return ErrWatcherNotRunning
case <-scanComplete:
// scan completed ok
return nil
}
}

Expand Down Expand Up @@ -700,6 +753,7 @@ func (w *Watcher) Wait() {
}

// Close stops a Watcher and unlocks its mutex, then sends a close signal.
// Note, it is not safe to Start() a Watcher again after closing it. You must create a new Watcher.
func (w *Watcher) Close() {
w.mu.Lock()
if !w.running {
Expand All @@ -711,5 +765,6 @@ func (w *Watcher) Close() {
w.names = make(map[string]bool)
w.mu.Unlock()
// Send a close signal to the Start method.
w.close <- struct{}{}
// Use a channel close() rather than a channel write, so that ScanNow() can react to the closure also.
close(w.close)
}
84 changes: 84 additions & 0 deletions watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,90 @@ func TestTriggerEvent(t *testing.T) {
wg.Wait()
}

func TestScanNow(t *testing.T) {
testDir, teardown := setup(t)
defer teardown()

w := New()
w.FilterOps(Create)

// Add the testDir to the watchlist.
if err := w.AddRecursive(testDir); err != nil {
t.Fatal(err)
}

// should not be able to ScanNow() before the watcher is started
if err := w.ScanNow(); err != ErrWatcherNotRunning {
t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one")
}

testFilePath := filepath.Join(testDir, "test_file1.txt")
done := make(chan struct{})
go func() {
evt := <-w.Event
if evt.Op == Create && evt.Path == testFilePath {
close(done)
} else {
t.Fatal("unexpected event")
}
}()

// Start scanning with a very long poll duration
go func() {
if err := w.Start(time.Hour); err != nil {
t.Fatal(err)
}
}()

w.Wait()
defer w.Close()

// perform initial scan, which should yield no changes
// this ensures the initial scan has happened, and means the watcher is now waiting 1hr before scanning again
if err := w.ScanNow(); err != nil {
t.Error(err)
}

// wait for a short period just to ensure no unexpected events arrive
select {
case <-time.After(time.Millisecond * 100):
case <-done:
t.Fatal("should not have received an event as no changes have occurred since ScanNow() completed")
}

// create the test file, we will not receive events due to the 1hr poll duration
if err := ioutil.WriteFile(testFilePath, []byte{}, 0755); err != nil {
t.Error(err)
}

// wait for a short period just to ensure no unexpected events arrive now we've changed a file
select {
case <-time.After(time.Millisecond * 100):
case <-done:
t.Fatal("should not have received an event as a poll duration of 1 hour is used")
}

// issue a scan now, and we will receive the events while ScanNow() is running.
if err := w.ScanNow(); err != nil {
t.Error(err)
}

// all events should have been received *whilst* ScanNow() was running, so the done channel should already be
// closed
select {
case <-done:
default:
t.Fatal("events from ScanNow() should have been received before ScanNow() returned")
}

w.Close()

// issue a scan now after closing, should error
if err := w.ScanNow(); err != ErrWatcherNotRunning {
t.Fatal("expected an ErrWatcherNotRunning error, but didn't get one")
}
}

func TestEventAddFile(t *testing.T) {
testDir, teardown := setup(t)
defer teardown()
Expand Down

0 comments on commit e140b68

Please sign in to comment.