Skip to content

Commit

Permalink
chore: upgraded fsnotify to v1.4.9
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosroman committed Aug 28, 2020
1 parent 2bc4b3d commit c201a61
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 66 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/docker/go-units v0.4.0 // indirect
github.com/evanphx/json-patch v4.5.0+incompatible
github.com/fortytw2/leaktest v1.3.1-0.20190606143808-d73c753520d9
github.com/fsnotify/fsnotify v0.9.3
github.com/fsnotify/fsnotify v1.4.9
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-ole/go-ole v1.2.1 // indirect
github.com/gogo/protobuf v1.1.2-0.20181116123445-07eab6a8298c // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ github.com/fortytw2/leaktest v1.3.1-0.20190606143808-d73c753520d9 h1:tKHw9zBEj0r
github.com/fortytw2/leaktest v1.3.1-0.20190606143808-d73c753520d9/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v0.9.3 h1:ohbr5HT+EHaq7KpTqPxkowzWw0vTFj25E8HVrVVHNIA=
github.com/fsnotify/fsnotify v0.9.3/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-fsnotify/fsnotify v0.0.0-20180321022601-755488143dae h1:PeVNzgTRtWGm6fVic5i21t+n5ptPGCZuMcSPVMyTWjs=
Expand Down Expand Up @@ -135,6 +137,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
6 changes: 3 additions & 3 deletions internal/agent/bulk_inventories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
const maxInventoryDataSize = 3 * 1000 * 1000

var plugin = &delta.PluginInfo{
Source: "metadata/plugin",
Plugin: "metadata",
FileName: "plugin.json",
Source: "metadata/plugin",
Plugin: "metadata",
FileName: "plugin.json",
}

// createDelta creates and stores a delta JSON for a given entity, with a size approximate to the given size
Expand Down
4 changes: 2 additions & 2 deletions internal/agent/delta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func (s *Store) updateLastDeltaSent(entityKey string, dRaw *inventoryapi.RawDelt
// Send again? This is a no-op, set last sent id to one previous.
dslog.WithFields(logrus.Fields{"sendNextID": id, "plugin": p}).
Debug("Requesting to update last delta sent to identical value.")
p.setLastSentID(entityKey, id - 1)
p.setLastSentID(entityKey, id-1)
}
} else if id > p.lastSentID(entityKey) {
p.setLastSentID(entityKey, id)
Expand All @@ -417,7 +417,7 @@ func (s *Store) updateLastDeltaSent(entityKey string, dRaw *inventoryapi.RawDelt

func (s *Store) reconciliateWithBackend(pi *PluginInfo, entityKey string, resultHint *inventoryapi.DeltaState) {
_ = s.clearPluginDeltaStore(pi, entityKey)
pi.setLastSentID(entityKey, resultHint.SendNextID - 1)
pi.setLastSentID(entityKey, resultHint.SendNextID-1)
pi.setDeltaID(entityKey, resultHint.LastStoredID)
}

Expand Down
6 changes: 3 additions & 3 deletions internal/agent/delta/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ func (s *DeltaUtilsCoreSuite) TestCompactStoreTrimSentDelta(c *C) {
const eKey = "entity:ID"

ds := s.SetupSavedState(c)
ds.plugins["metadata/plugin"].setLastSentID(eKey,2)
ds.plugins["metadata/plugin"].setLastSentID(eKey, 2)
err := ds.archivePlugin(ds.plugins["metadata/plugin"], eKey)
c.Check(err, IsNil)
size, err := ds.StorageSize(ds.CacheDir)
Expand Down Expand Up @@ -801,8 +801,8 @@ func (s *DeltaUtilsCoreSuite) TestDeltaFileCorrupt(c *C) {

secondPlugin := newPluginInfo("metadata", "plugin.json")
// break on purpose, so read should fail
secondPlugin.Source = "metadata/plugin2"
secondPlugin.Plugin = "metadata2"
secondPlugin.Source = "metadata/plugin2"
secondPlugin.Plugin = "metadata2"

srcFile2 := ds.SourceFilePath(secondPlugin, eKey)
err = os.MkdirAll(filepath.Dir(srcFile2), 0755)
Expand Down
18 changes: 10 additions & 8 deletions internal/plugins/linux/dpkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (self *DpkgPlugin) Run() {
return
}

err = watcher.WatchFlags("/var/lib/dpkg/lock", FSN_CLOSE_WRITE)
err = watcher.Add("/var/lib/dpkg/lock")
if err != nil {
dpkglog.WithError(err).Error("can't setup trigger file watcher for dpkg")
self.Unregister()
Expand All @@ -147,14 +147,16 @@ func (self *DpkgPlugin) Run() {
ticker := time.NewTicker(1)
for {
select {
case _, ok := <-watcher.Event:
case event, ok := <-watcher.Events:
if ok {
counter = counter + 1
if counter > 1 {
dpkglog.WithFields(logrus.Fields{
"frequency": self.frequency,
"counter": counter,
}).Debug("dpkg plugin oversampling.")
if event.Op&fsnotify.Write == fsnotify.Write {
counter = counter + 1
if counter > 1 {
dpkglog.WithFields(logrus.Fields{
"frequency": self.frequency,
"counter": counter,
}).Debug("dpkg plugin oversampling.")
}
}
} else {
dpkglog.Debug("dpkg lock watcher closed.")
Expand Down
20 changes: 11 additions & 9 deletions internal/plugins/linux/rpm.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,11 @@ func (p *rpmPlugin) Run() {
return
}

err = watcher.WatchFlags("/var/lib/rpm/.rpm.lock", FSN_CLOSE_WRITE)
err = watcher.Add("/var/lib/rpm/.rpm.lock")
if err != nil {
// Some old distros, like SLES 11, do not provide .rpm.lock file, but the same
// effect can be achieved by listening some standard files from the RPM database
err = watcher.WatchFlags("/var/lib/rpm/Installtid", FSN_CLOSE_WRITE)
err = watcher.Add("/var/lib/rpm/Installtid")
if err != nil {
rpmlog.WithError(err).Error("can't setup trigger file watcher for rpm")
p.Unregister()
Expand All @@ -151,14 +151,16 @@ func (p *rpmPlugin) Run() {
ticker := time.NewTicker(1)
for {
select {
case _, ok := <-watcher.Event:
case event, ok := <-watcher.Events:
if ok {
counter = counter + 1
if counter > 1 {
rpmlog.WithFields(logrus.Fields{
"frequency": p.frequency,
"counter": counter,
}).Debug("rpm plugin oversampling.")
if event.Op&fsnotify.Write == fsnotify.Write {
counter = counter + 1
if counter > 1 {
rpmlog.WithFields(logrus.Fields{
"frequency": p.frequency,
"counter": counter,
}).Debug("rpm plugin oversampling.")
}
}
} else {
rpmlog.Debug("rpm lock watcher closed.")
Expand Down
26 changes: 16 additions & 10 deletions internal/plugins/linux/sysctl_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewSysctlSubscriberMonitor(id ids.PluginID, ctx agent.AgentContext) (*Sysct
return nil, errors.Wrap(err, "cannot create sys watcher")
}

err = watcher.WatchFlags(sysPoller.procSysDir, fsnotify.FSN_MODIFY)
err = watcher.Add(sysPoller.procSysDir)
if err != nil {
return nil, errors.Wrap(err, "cannot watch on sys filesystem")
}
Expand Down Expand Up @@ -71,19 +71,25 @@ func (p *SysctlSubscriberPlugin) Run() {
ticker.Stop()
ticker = time.NewTicker(p.frequency)

case e := <-p.watcher.Event:
needsFlush = true
output, err := ioutil.ReadFile(e.Name)
if err != nil {
sclog.WithField("file", e.Name).Debug("Cannot read sys file.")
} else {
deltas = append(deltas, p.newSysctlItem(e.Name, output))
case event, ok := <-p.watcher.Events:
if !ok {
continue
}

if event.Op&fsnotify.Write == fsnotify.Write {
needsFlush = true
output, err := ioutil.ReadFile(event.Name)
if err != nil {
sclog.WithField("file", event.Name).Debug("Cannot read sys file.")
} else {
deltas = append(deltas, p.newSysctlItem(event.Name, output))
}
}
}
}
}

// deprecated, just for testing purposes
func (p *SysctlSubscriberPlugin) EventsCh() chan *fsnotify.FileEvent {
return p.watcher.Event
func (p *SysctlSubscriberPlugin) EventsCh() chan fsnotify.Event {
return p.watcher.Events
}
10 changes: 7 additions & 3 deletions internal/plugins/linux/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (self *UsersPlugin) Run() {
return
}

err = watcher.WatchFlags("/var/run/utmp", fsnotify.FSN_MODIFY)
err = watcher.Add("/var/run/utmp")
if err != nil {
usrlog.WithError(err).Error("can't setup trigger file watcher for users")
self.Unregister()
Expand All @@ -108,8 +108,12 @@ func (self *UsersPlugin) Run() {

for {
select {
case <-watcher.Event:
needsFlush = true
case event, ok := <-watcher.Events:
if ok {
if event.Op&fsnotify.Write == fsnotify.Write {
needsFlush = true
}
}
case <-refreshTimer.C:
{
refreshTimer.Reset(self.frequency)
Expand Down
30 changes: 22 additions & 8 deletions pkg/integrations/v4/logs/cfg_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (ccw *ConfigChangesWatcher) Watch(ctx ctx2.Context, changes chan<- struct{}
return
}
ccw.logger.Debugf("adding path to watching %v", ccw.path)
if err := ccw.watcher.Watch(ccw.path); err != nil {
if err := ccw.watcher.Add(ccw.path); err != nil {
ccw.logger.WithError(err).Warn("cant watch for file changes in folder")
return
}
Expand All @@ -55,9 +55,9 @@ func (ccw *ConfigChangesWatcher) watchForChanges(ctx ctx2.Context, changes chan<
ccw.logger.Debug("Watching for logging config file changes.")
for {
select {
case event := <-ccw.watcher.Event:
ccw.handleFileEvent(event, changes)
case err := <-ccw.watcher.Error:
case event := <-ccw.watcher.Events:
ccw.handleFileEvent(&event, changes)
case err := <-ccw.watcher.Errors:
ccw.logger.WithError(err).Debug("Error occurred while watching for logging config file changes.")
case <-ctx.Done():
ccw.logger.Debug("Stopping logging config changes watcher.")
Expand All @@ -69,7 +69,7 @@ func (ccw *ConfigChangesWatcher) watchForChanges(ctx ctx2.Context, changes chan<
}
}

func (ccw *ConfigChangesWatcher) handleFileEvent(event *fsnotify.FileEvent, signalReload chan<- struct{}) {
func (ccw *ConfigChangesWatcher) handleFileEvent(event *fsnotify.Event, signalReload chan<- struct{}) {
helog := ccw.logger.WithField("function", "handleFileEvent")

if event == nil {
Expand All @@ -81,9 +81,23 @@ func (ccw *ConfigChangesWatcher) handleFileEvent(event *fsnotify.FileEvent, sign
WithField("file_name", event.Name)
elog.Debug("Received File event.")

isDelete := event.IsDelete() || event.IsRename()
isCreate := event.IsCreate()
isWrite := isCreate || event.IsModify()
var eDelete, eCreate, eWrite, eRename bool
if event.Op&fsnotify.Write == fsnotify.Write {
eWrite = true
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
eDelete = true
}
if event.Op&fsnotify.Create == fsnotify.Create {
eCreate = true
}
if event.Op&fsnotify.Rename == fsnotify.Rename {
eRename = true
}

isDelete := eDelete || eRename
isCreate := eCreate
isWrite := isCreate || eWrite
if !isDelete && !isWrite {
elog.Debug("Ignoring File event.")
return
Expand Down
36 changes: 25 additions & 11 deletions pkg/integrations/v4/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,12 @@ func NewManager(cfg Configuration, emitter emitter.Emitter) *Manager {

if watcher != nil {
flog.Debugf("watching %v", folder)
if err := watcher.Watch(folder); err != nil {
if err := watcher.Add(folder); err != nil {
flog.WithError(err).Warn("cant watch for file changes in folder")
}
for i := range configs {
flog.Debugf("watching :%v", i)
if err := watcher.Watch(i); err != nil {
if err := watcher.Add(i); err != nil {
flog.WithError(err).Warn("cant watch for file change")
}
}
Expand Down Expand Up @@ -309,9 +309,9 @@ func (mgr *Manager) watchForChanges() {
wclog.Debug("Watching for integrations file changes.")
for {
select {
case event := <-mgr.watcher.Event:
mgr.handleFileEvent(event)
case err := <-mgr.watcher.Error:
case event := <-mgr.watcher.Events:
mgr.handleFileEvent(&event)
case err := <-mgr.watcher.Errors:
wclog.WithError(err).Debug("Error watching file changes.")
case <-mgr.parent.Done():
wclog.Debug("Parent context has been cancelled. Stopped watching for file changes.")
Expand All @@ -320,7 +320,7 @@ func (mgr *Manager) watchForChanges() {
}
}

func (mgr *Manager) handleFileEvent(event *fsnotify.FileEvent) {
func (mgr *Manager) handleFileEvent(event *fsnotify.Event) {
wclog := illog.WithField("function", "handleFileEvent")

if event == nil {
Expand All @@ -332,9 +332,23 @@ func (mgr *Manager) handleFileEvent(event *fsnotify.FileEvent) {
WithField("file_name", event.Name)
elog.Debug("Received File event.")

isDelete := event.IsDelete() || event.IsRename()
isCreate := event.IsCreate()
isWrite := isCreate || event.IsModify()
var eDelete, eCreate, eWrite, eRename bool
if event.Op&fsnotify.Write == fsnotify.Write {
eWrite = true
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
eDelete = true
}
if event.Op&fsnotify.Create == fsnotify.Create {
eCreate = true
}
if event.Op&fsnotify.Rename == fsnotify.Rename {
eRename = true
}

isDelete := eDelete || eRename
isCreate := eCreate
isWrite := isCreate || eWrite
if !isDelete && !isWrite {
elog.Debug("Ignoring File event.")
return
Expand All @@ -359,14 +373,14 @@ func (mgr *Manager) handleFileEvent(event *fsnotify.FileEvent) {
}

elog.Debugf("file '%v' says deleted but still here", event.Name)
if err := mgr.watcher.Watch(event.Name); err != nil {
if err := mgr.watcher.Add(event.Name); err != nil {
elog.WithError(err).Warn("cant watch for file changes")
}
}

if isCreate {
elog.Debugf("watching file '%v' as brand new", event.Name)
if err := mgr.watcher.Watch(event.Name); err != nil {
if err := mgr.watcher.Add(event.Name); err != nil {
elog.WithError(err).Warn("cant watch for file changes")
}

Expand Down
Loading

0 comments on commit c201a61

Please sign in to comment.