diff options
author | Bjørn Erik Pedersen <[email protected]> | 2021-07-02 09:54:03 +0200 |
---|---|---|
committer | Bjørn Erik Pedersen <[email protected]> | 2021-07-04 16:12:28 +0200 |
commit | 24ce98b6d10b2088af61c15112f5c5ed915a0c35 (patch) | |
tree | 88b4090f4997b1763a1d9a49bcfa4ba92500647e /watcher | |
parent | 0019d60f67b6c4dde085753641a917fcd0aa4c76 (diff) | |
download | hugo-24ce98b6d10b2088af61c15112f5c5ed915a0c35.tar.gz hugo-24ce98b6d10b2088af61c15112f5c5ed915a0c35.zip |
Add polling as a fallback to native filesystem events in server watch
Fixes #8720
Fixes #6849
Fixes #7930
Diffstat (limited to 'watcher')
-rw-r--r-- | watcher/batcher.go | 30 | ||||
-rw-r--r-- | watcher/filenotify/filenotify.go | 49 | ||||
-rw-r--r-- | watcher/filenotify/fsnotify.go | 20 | ||||
-rw-r--r-- | watcher/filenotify/poller.go | 326 | ||||
-rw-r--r-- | watcher/filenotify/poller_test.go | 304 |
5 files changed, 721 insertions, 8 deletions
diff --git a/watcher/batcher.go b/watcher/batcher.go index 12c51940d..718eea73f 100644 --- a/watcher/batcher.go +++ b/watcher/batcher.go @@ -17,11 +17,12 @@ import ( "time" "github.com/fsnotify/fsnotify" + "github.com/gohugoio/hugo/watcher/filenotify" ) // Batcher batches file watch events in a given interval. type Batcher struct { - *fsnotify.Watcher + filenotify.FileWatcher interval time.Duration done chan struct{} @@ -29,12 +30,25 @@ type Batcher struct { } // New creates and starts a Batcher with the given time interval. -func New(interval time.Duration) (*Batcher, error) { - watcher, err := fsnotify.NewWatcher() +// It will fall back to a poll based watcher if native isn's supported. +// To always use polling, set poll to true. +func New(intervalBatcher, intervalPoll time.Duration, poll bool) (*Batcher, error) { + var err error + var watcher filenotify.FileWatcher + + if poll { + watcher = filenotify.NewPollingWatcher(intervalPoll) + } else { + watcher, err = filenotify.New(intervalPoll) + } + + if err != nil { + return nil, err + } batcher := &Batcher{} - batcher.Watcher = watcher - batcher.interval = interval + batcher.FileWatcher = watcher + batcher.interval = intervalBatcher batcher.done = make(chan struct{}, 1) batcher.Events = make(chan []fsnotify.Event, 1) @@ -42,7 +56,7 @@ func New(interval time.Duration) (*Batcher, error) { go batcher.run() } - return batcher, err + return batcher, nil } func (b *Batcher) run() { @@ -51,7 +65,7 @@ func (b *Batcher) run() { OuterLoop: for { select { - case ev := <-b.Watcher.Events: + case ev := <-b.FileWatcher.Events(): evs = append(evs, ev) case <-tick: if len(evs) == 0 { @@ -69,5 +83,5 @@ OuterLoop: // Close stops the watching of the files. func (b *Batcher) Close() { b.done <- struct{}{} - b.Watcher.Close() + b.FileWatcher.Close() } diff --git a/watcher/filenotify/filenotify.go b/watcher/filenotify/filenotify.go new file mode 100644 index 000000000..b9d0d2e14 --- /dev/null +++ b/watcher/filenotify/filenotify.go @@ -0,0 +1,49 @@ +// Package filenotify provides a mechanism for watching file(s) for changes. +// Generally leans on fsnotify, but provides a poll-based notifier which fsnotify does not support. +// These are wrapped up in a common interface so that either can be used interchangeably in your code. +// +// This package is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import ( + "time" + + "github.com/fsnotify/fsnotify" +) + +// FileWatcher is an interface for implementing file notification watchers +type FileWatcher interface { + Events() <-chan fsnotify.Event + Errors() <-chan error + Add(name string) error + Remove(name string) error + Close() error +} + +// New tries to use an fs-event watcher, and falls back to the poller if there is an error +func New(interval time.Duration) (FileWatcher, error) { + if watcher, err := NewEventWatcher(); err == nil { + return watcher, nil + } + return NewPollingWatcher(interval), nil +} + +// NewPollingWatcher returns a poll-based file watcher +func NewPollingWatcher(interval time.Duration) FileWatcher { + return &filePoller{ + interval: interval, + done: make(chan struct{}), + events: make(chan fsnotify.Event), + errors: make(chan error), + } +} + +// NewEventWatcher returns an fs-event based file watcher +func NewEventWatcher() (FileWatcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + return &fsNotifyWatcher{watcher}, nil +} diff --git a/watcher/filenotify/fsnotify.go b/watcher/filenotify/fsnotify.go new file mode 100644 index 000000000..19534128a --- /dev/null +++ b/watcher/filenotify/fsnotify.go @@ -0,0 +1,20 @@ +// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import "github.com/fsnotify/fsnotify" + +// fsNotifyWatcher wraps the fsnotify package to satisfy the FileNotifier interface +type fsNotifyWatcher struct { + *fsnotify.Watcher +} + +// Events returns the fsnotify event channel receiver +func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event { + return w.Watcher.Events +} + +// Errors returns the fsnotify error channel receiver +func (w *fsNotifyWatcher) Errors() <-chan error { + return w.Watcher.Errors +} diff --git a/watcher/filenotify/poller.go b/watcher/filenotify/poller.go new file mode 100644 index 000000000..71d806209 --- /dev/null +++ b/watcher/filenotify/poller.go @@ -0,0 +1,326 @@ +// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "github.com/fsnotify/fsnotify" +) + +var ( + // errPollerClosed is returned when the poller is closed + errPollerClosed = errors.New("poller is closed") + // errNoSuchWatch is returned when trying to remove a watch that doesn't exist + errNoSuchWatch = errors.New("watch does not exist") +) + +// filePoller is used to poll files for changes, especially in cases where fsnotify +// can't be run (e.g. when inotify handles are exhausted) +// filePoller satisfies the FileWatcher interface +type filePoller struct { + // the duration between polls. + interval time.Duration + // watches is the list of files currently being polled, close the associated channel to stop the watch + watches map[string]struct{} + // Will be closed when done. + done chan struct{} + // events is the channel to listen to for watch events + events chan fsnotify.Event + // errors is the channel to listen to for watch errors + errors chan error + // mu locks the poller for modification + mu sync.Mutex + // closed is used to specify when the poller has already closed + closed bool +} + +// Add adds a filename to the list of watches +// once added the file is polled for changes in a separate goroutine +func (w *filePoller) Add(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return errPollerClosed + } + + item, err := newItemToWatch(name) + if err != nil { + return err + } + if item.left.FileInfo == nil { + return os.ErrNotExist + } + + if w.watches == nil { + w.watches = make(map[string]struct{}) + } + if _, exists := w.watches[name]; exists { + return fmt.Errorf("watch exists") + } + w.watches[name] = struct{}{} + + go w.watch(item) + return nil +} + +// Remove stops and removes watch with the specified name +func (w *filePoller) Remove(name string) error { + w.mu.Lock() + defer w.mu.Unlock() + return w.remove(name) +} + +func (w *filePoller) remove(name string) error { + if w.closed { + return errPollerClosed + } + + _, exists := w.watches[name] + if !exists { + return errNoSuchWatch + } + delete(w.watches, name) + return nil +} + +// Events returns the event channel +// This is used for notifications on events about watched files +func (w *filePoller) Events() <-chan fsnotify.Event { + return w.events +} + +// Errors returns the errors channel +// This is used for notifications about errors on watched files +func (w *filePoller) Errors() <-chan error { + return w.errors +} + +// Close closes the poller +// All watches are stopped, removed, and the poller cannot be added to +func (w *filePoller) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + if w.closed { + return nil + } + w.closed = true + close(w.done) + for name := range w.watches { + w.remove(name) + } + + return nil +} + +// sendEvent publishes the specified event to the events channel +func (w *filePoller) sendEvent(e fsnotify.Event) error { + select { + case w.events <- e: + case <-w.done: + return fmt.Errorf("closed") + } + return nil +} + +// sendErr publishes the specified error to the errors channel +func (w *filePoller) sendErr(e error) error { + select { + case w.errors <- e: + case <-w.done: + return fmt.Errorf("closed") + } + return nil +} + +// watch watches item for changes until done is closed. +func (w *filePoller) watch(item *itemToWatch) { + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + case <-w.done: + return + } + + evs, err := item.checkForChanges() + if err != nil { + if err := w.sendErr(err); err != nil { + return + } + } + + item.left, item.right = item.right, item.left + + for _, ev := range evs { + if err := w.sendEvent(ev); err != nil { + return + } + } + + } +} + +// recording records the state of a file or a dir. +type recording struct { + os.FileInfo + + // Set if FileInfo is a dir. + entries map[string]os.FileInfo +} + +func (r *recording) clear() { + r.FileInfo = nil + if r.entries != nil { + for k := range r.entries { + delete(r.entries, k) + } + } +} + +func (r *recording) record(filename string) error { + r.clear() + + fi, err := os.Stat(filename) + if err != nil && !os.IsNotExist(err) { + return err + } + + if fi == nil { + return nil + } + + r.FileInfo = fi + + // If fi is a dir, we watch the files inside that directory (not recursively). + // This matches the behaviour of fsnotity. + if fi.IsDir() { + f, err := os.Open(filename) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + defer f.Close() + + fis, err := f.Readdir(-1) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + for _, fi := range fis { + r.entries[fi.Name()] = fi + } + } + + return nil +} + +// itemToWatch may be a file or a dir. +type itemToWatch struct { + // Full path to the filename. + filename string + + // Snapshots of the stat state of this file or dir. + left *recording + right *recording +} + +func newItemToWatch(filename string) (*itemToWatch, error) { + r := &recording{ + entries: make(map[string]os.FileInfo), + } + err := r.record(filename) + if err != nil { + return nil, err + } + + return &itemToWatch{filename: filename, left: r}, nil + +} + +func (item *itemToWatch) checkForChanges() ([]fsnotify.Event, error) { + if item.right == nil { + item.right = &recording{ + entries: make(map[string]os.FileInfo), + } + } + + err := item.right.record(item.filename) + if err != nil && !os.IsNotExist(err) { + return nil, err + } + + dirOp := checkChange(item.left.FileInfo, item.right.FileInfo) + + if dirOp != 0 { + evs := []fsnotify.Event{fsnotify.Event{Op: dirOp, Name: item.filename}} + return evs, nil + } + + if item.left.FileInfo == nil || !item.left.IsDir() { + // Done. + return nil, nil + } + + leftIsIn := false + left, right := item.left.entries, item.right.entries + if len(right) > len(left) { + left, right = right, left + leftIsIn = true + } + + var evs []fsnotify.Event + + for name, fi1 := range left { + fi2 := right[name] + fil, fir := fi1, fi2 + if leftIsIn { + fil, fir = fir, fil + } + op := checkChange(fil, fir) + if op != 0 { + evs = append(evs, fsnotify.Event{Op: op, Name: filepath.Join(item.filename, name)}) + } + + } + + return evs, nil + +} + +func checkChange(fi1, fi2 os.FileInfo) fsnotify.Op { + if fi1 == nil && fi2 != nil { + return fsnotify.Create + } + if fi1 != nil && fi2 == nil { + return fsnotify.Remove + } + if fi1 == nil && fi2 == nil { + return 0 + } + if fi1.IsDir() || fi2.IsDir() { + return 0 + } + if fi1.Mode() != fi2.Mode() { + return fsnotify.Chmod + } + if fi1.ModTime() != fi2.ModTime() || fi1.Size() != fi2.Size() { + return fsnotify.Write + } + + return 0 +} diff --git a/watcher/filenotify/poller_test.go b/watcher/filenotify/poller_test.go new file mode 100644 index 000000000..b4723c758 --- /dev/null +++ b/watcher/filenotify/poller_test.go @@ -0,0 +1,304 @@ +// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License. +// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9 +package filenotify + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "testing" + "time" + + qt "github.com/frankban/quicktest" + "github.com/fsnotify/fsnotify" + "github.com/gohugoio/hugo/htesting" +) + +const ( + subdir1 = "subdir1" + subdir2 = "subdir2" + watchWaitTime = 200 * time.Millisecond +) + +var ( + isMacOs = runtime.GOOS == "darwin" + isWindows = runtime.GOOS == "windows" + isCI = htesting.IsCI() +) + +func TestPollerAddRemove(t *testing.T) { + c := qt.New(t) + w := NewPollingWatcher(watchWaitTime) + + c.Assert(w.Add("foo"), qt.Not(qt.IsNil)) + c.Assert(w.Remove("foo"), qt.Not(qt.IsNil)) + + f, err := ioutil.TempFile("", "asdf") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(f.Name()) + c.Assert(w.Add(f.Name()), qt.IsNil) + c.Assert(w.Remove(f.Name()), qt.IsNil) + +} + +func TestPollerEvent(t *testing.T) { + c := qt.New(t) + + for _, poll := range []bool{true, false} { + if !(poll || isMacOs) || isCI { + // Only run the fsnotify tests on MacOS locally. + continue + } + method := "fsnotify" + if poll { + method = "poll" + } + + c.Run(fmt.Sprintf("%s, Watch dir", method), func(c *qt.C) { + dir, w := preparePollTest(c, poll) + subdir := filepath.Join(dir, subdir1) + c.Assert(w.Add(subdir), qt.IsNil) + + filename := filepath.Join(subdir, "file1") + + // Write to one file. + c.Assert(ioutil.WriteFile(filename, []byte("changed"), 0600), qt.IsNil) + + var expected []fsnotify.Event + + if poll { + expected = append(expected, fsnotify.Event{Name: filename, Op: fsnotify.Write}) + assertEvents(c, w, expected...) + } else { + // fsnotify sometimes emits Chmod before Write, + // which is hard to test, so skip it here. + drainEvents(c, w) + } + + // Remove one file. + filename = filepath.Join(subdir, "file2") + c.Assert(os.Remove(filename), qt.IsNil) + assertEvents(c, w, fsnotify.Event{Name: filename, Op: fsnotify.Remove}) + + // Add one file. + filename = filepath.Join(subdir, "file3") + c.Assert(ioutil.WriteFile(filename, []byte("new"), 0600), qt.IsNil) + assertEvents(c, w, fsnotify.Event{Name: filename, Op: fsnotify.Create}) + + // Remove entire directory. + subdir = filepath.Join(dir, subdir2) + c.Assert(w.Add(subdir), qt.IsNil) + + c.Assert(os.RemoveAll(subdir), qt.IsNil) + + expected = expected[:0] + + // This looks like a bug in fsnotify on MacOS. There are + // 3 files in this directory, yet we get Remove events + // for one of them + the directory. + if !poll { + expected = append(expected, fsnotify.Event{Name: filepath.Join(subdir, "file2"), Op: fsnotify.Remove}) + } + expected = append(expected, fsnotify.Event{Name: subdir, Op: fsnotify.Remove}) + assertEvents(c, w, expected...) + + }) + + c.Run(fmt.Sprintf("%s, Add should not trigger event", method), func(c *qt.C) { + dir, w := preparePollTest(c, poll) + subdir := filepath.Join(dir, subdir1) + w.Add(subdir) + assertEvents(c, w) + // Create a new sub directory and add it to the watcher. + subdir = filepath.Join(dir, subdir1, subdir2) + c.Assert(os.Mkdir(subdir, 0777), qt.IsNil) + w.Add(subdir) + // This should create only one event. + assertEvents(c, w, fsnotify.Event{Name: subdir, Op: fsnotify.Create}) + }) + + } +} + +func TestPollerClose(t *testing.T) { + c := qt.New(t) + w := NewPollingWatcher(watchWaitTime) + f1, err := ioutil.TempFile("", "f1") + c.Assert(err, qt.IsNil) + f2, err := ioutil.TempFile("", "f2") + c.Assert(err, qt.IsNil) + filename1 := f1.Name() + filename2 := f2.Name() + f1.Close() + f2.Close() + + c.Assert(w.Add(filename1), qt.IsNil) + c.Assert(w.Add(filename2), qt.IsNil) + c.Assert(w.Close(), qt.IsNil) + c.Assert(w.Close(), qt.IsNil) + c.Assert(ioutil.WriteFile(filename1, []byte("new"), 0600), qt.IsNil) + c.Assert(ioutil.WriteFile(filename2, []byte("new"), 0600), qt.IsNil) + // No more event as the watchers are closed. + assertEvents(c, w) + + f2, err = ioutil.TempFile("", "f2") + c.Assert(err, qt.IsNil) + + defer os.Remove(f2.Name()) + + c.Assert(w.Add(f2.Name()), qt.Not(qt.IsNil)) + +} + +func TestCheckChange(t *testing.T) { + c := qt.New(t) + + dir := prepareTestDirWithSomeFiles(c, "check-change") + + stat := func(s ...string) os.FileInfo { + fi, err := os.Stat(filepath.Join(append([]string{dir}, s...)...)) + c.Assert(err, qt.IsNil) + return fi + } + + f0, f1, f2 := stat(subdir2, "file0"), stat(subdir2, "file1"), stat(subdir2, "file2") + d1 := stat(subdir1) + + // Note that on Windows, only the 0200 bit (owner writable) of mode is used. + c.Assert(os.Chmod(filepath.Join(filepath.Join(dir, subdir2, "file1")), 0400), qt.IsNil) + f1_2 := stat(subdir2, "file1") + + c.Assert(ioutil.WriteFile(filepath.Join(filepath.Join(dir, subdir2, "file2")), []byte("changed"), 0600), qt.IsNil) + f2_2 := stat(subdir2, "file2") + + c.Assert(checkChange(f0, nil), qt.Equals, fsnotify.Remove) + c.Assert(checkChange(nil, f0), qt.Equals, fsnotify.Create) + c.Assert(checkChange(f1, f1_2), qt.Equals, fsnotify.Chmod) + c.Assert(checkChange(f2, f2_2), qt.Equals, fsnotify.Write) + c.Assert(checkChange(nil, nil), qt.Equals, fsnotify.Op(0)) + c.Assert(checkChange(d1, f1), qt.Equals, fsnotify.Op(0)) + c.Assert(checkChange(f1, d1), qt.Equals, fsnotify.Op(0)) +} + +func BenchmarkPoller(b *testing.B) { + runBench := func(b *testing.B, item *itemToWatch) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + evs, err := item.checkForChanges() + if err != nil { + b.Fatal(err) + } + if len(evs) != 0 { + b.Fatal("got events") + } + + } + + } + + b.Run("Check for changes in dir", func(b *testing.B) { + c := qt.New(b) + dir := prepareTestDirWithSomeFiles(c, "bench-check") + item, err := newItemToWatch(dir) + c.Assert(err, qt.IsNil) + runBench(b, item) + + }) + + b.Run("Check for changes in file", func(b *testing.B) { + c := qt.New(b) + dir := prepareTestDirWithSomeFiles(c, "bench-check-file") + filename := filepath.Join(dir, subdir1, "file1") + item, err := newItemToWatch(filename) + c.Assert(err, qt.IsNil) + runBench(b, item) + }) + +} + +func prepareTestDirWithSomeFiles(c *qt.C, id string) string { + dir, err := ioutil.TempDir("", fmt.Sprintf("test-poller-dir-%s", id)) + c.Assert(err, qt.IsNil) + c.Assert(os.MkdirAll(filepath.Join(dir, subdir1), 0777), qt.IsNil) + c.Assert(os.MkdirAll(filepath.Join(dir, subdir2), 0777), qt.IsNil) + + for i := 0; i < 3; i++ { + c.Assert(ioutil.WriteFile(filepath.Join(dir, subdir1, fmt.Sprintf("file%d", i)), []byte("hello1"), 0600), qt.IsNil) + } + + for i := 0; i < 3; i++ { + c.Assert(ioutil.WriteFile(filepath.Join(dir, subdir2, fmt.Sprintf("file%d", i)), []byte("hello2"), 0600), qt.IsNil) + } + + c.Cleanup(func() { + os.RemoveAll(dir) + }) + + return dir +} + +func preparePollTest(c *qt.C, poll bool) (string, FileWatcher) { + var w FileWatcher + if poll { + w = NewPollingWatcher(watchWaitTime) + } else { + var err error + w, err = NewEventWatcher() + c.Assert(err, qt.IsNil) + } + + dir := prepareTestDirWithSomeFiles(c, fmt.Sprint(poll)) + + c.Cleanup(func() { + w.Close() + }) + return dir, w +} + +func assertEvents(c *qt.C, w FileWatcher, evs ...fsnotify.Event) { + c.Helper() + i := 0 + check := func() error { + for { + select { + case got := <-w.Events(): + if i > len(evs)-1 { + return fmt.Errorf("got too many event(s): %q", got) + } + expected := evs[i] + i++ + if expected.Name != got.Name { + return fmt.Errorf("got wrong filename, expected %q: %v", expected.Name, got.Name) + } else if got.Op&expected.Op != expected.Op { + return fmt.Errorf("got wrong event type, expected %q: %v", expected.Op, got.Op) + } + case e := <-w.Errors(): + return fmt.Errorf("got unexpected error waiting for events %v", e) + case <-time.After(watchWaitTime + (watchWaitTime / 2)): + return nil + } + } + } + c.Assert(check(), qt.IsNil) + c.Assert(i, qt.Equals, len(evs)) +} + +func drainEvents(c *qt.C, w FileWatcher) { + c.Helper() + check := func() error { + for { + select { + case <-w.Events(): + case e := <-w.Errors(): + return fmt.Errorf("got unexpected error waiting for events %v", e) + case <-time.After(watchWaitTime * 2): + return nil + } + } + } + c.Assert(check(), qt.IsNil) +} |