Skip to content

Commit

Permalink
back out jamf changes
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Oct 17, 2024
1 parent 2251d5f commit ddd9e66
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 21 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015]
- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142]
- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192]
- Don't send redundant documents for non-modified entities in the Jamf and Active Directory entityanalytics input. {pull}41179[41179]
- Don't send redundant documents for non-modified entities in the Active Directory entityanalytics input. {pull}41179[41179]

*Heartbeat*

Expand Down
17 changes: 7 additions & 10 deletions x-pack/filebeat/input/entityanalytics/provider/jamf/jamf.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ func (p *jamfInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.C
return err
}

var last time.Time
for {
select {
case <-inputCtx.Cancelation.Done():
if !errors.Is(inputCtx.Cancelation.Err(), context.Canceled) {
return inputCtx.Cancelation.Err()
}
return nil
case start := <-syncTimer.C:
case <-syncTimer.C:
start := time.Now()
if err := p.runFullSync(inputCtx, store, client); err != nil {
p.logger.Errorw("Error running full sync", "error", err)
p.metrics.syncError.Inc()
Expand All @@ -146,17 +146,16 @@ func (p *jamfInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.C
}
updateTimer.Reset(p.cfg.UpdateInterval)
p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval))
last = start
case start := <-updateTimer.C:
if err := p.runIncrementalUpdate(inputCtx, store, last, client); err != nil {
case <-updateTimer.C:
start := time.Now()
if err := p.runIncrementalUpdate(inputCtx, store, client); err != nil {
p.logger.Errorw("Error running incremental update", "error", err)
p.metrics.updateError.Inc()
}
p.metrics.updateTotal.Inc()
p.metrics.updateProcessingTime.Update(time.Since(start).Nanoseconds())
updateTimer.Reset(p.cfg.UpdateInterval)
p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval))
last = start
}
}
}
Expand Down Expand Up @@ -351,7 +350,7 @@ func (p *jamfInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, clien
// runIncrementalUpdate will run an incremental update. The process is similar
// to full synchronization, except only users which have changed (newly
// discovered, modified, or deleted) will be published.
func (p *jamfInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, last time.Time, client beat.Client) error {
func (p *jamfInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error {
p.logger.Debugf("Running incremental update...")

state, err := newStateStore(store)
Expand All @@ -375,9 +374,7 @@ func (p *jamfInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Sto
if len(updatedDevices) != 0 {
tracker = kvstore.NewTxTracker(ctx)
for _, d := range updatedDevices {
if d.Modified.After(last) {
p.publishComputer(d, inputCtx.ID, client, tracker)
}
p.publishComputer(d, inputCtx.ID, client, tracker)
}
tracker.Wait()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func TestJamfDoFetch(t *testing.T) {
wantComputers = append(wantComputers, &Computer{
Computer: c,
State: Discovered,
Modified: time.Now(),
})
}

Expand Down Expand Up @@ -90,11 +89,8 @@ func TestJamfDoFetch(t *testing.T) {
t.Fatalf("unexpected error from doFetch: %v", err)
}

validTime := cmp.Comparer(func(a, b time.Time) bool {
return !a.IsZero() && !b.IsZero()
})
if wantComputers != nil && !cmp.Equal(wantComputers, got, validTime) {
t.Errorf("unexpected result\n--- want\n+++ got\n%s", cmp.Diff(wantComputers, got, validTime))
if wantComputers != nil && !cmp.Equal(wantComputers, got) {
t.Errorf("unexpected result\n--- want\n+++ got\n%s", cmp.Diff(wantComputers, got))
}
})
}
Expand Down
15 changes: 11 additions & 4 deletions x-pack/filebeat/input/entityanalytics/provider/jamf/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var (
computersBucket = []byte("computers")
stateBucket = []byte("state")

whenChangedKey = []byte("when_changed")
lastSyncKey = []byte("last_sync")
lastUpdateKey = []byte("last_update")
computersLinkKey = []byte("devices_link")
Expand All @@ -35,15 +36,18 @@ const (

type Computer struct {
jamf.Computer `json:"properties"`
State State `json:"state"`
Modified time.Time `json:"modified"`
State State `json:"state"`
}

// stateStore wraps a kvstore.Transaction and provides convenience methods for
// accessing and store relevant data within the kvstore database.
type stateStore struct {
tx *kvstore.Transaction

// whenChanged is the last whenChanged time in the set of
// users and their associated groups.
whenChanged time.Time

// lastSync and lastUpdate are the times of the first update
// or sync operation of users/devices.
lastSync time.Time
Expand Down Expand Up @@ -75,6 +79,10 @@ func newStateStore(store *kvstore.Store) (*stateStore, error) {
if err != nil && !errIsItemNotFound(err) {
return nil, fmt.Errorf("unable to get last update time from state: %w", err)
}
err = s.tx.Get(stateBucket, whenChangedKey, &s.whenChanged)
if err != nil && !errIsItemNotFound(err) {
return nil, fmt.Errorf("unable to get last change time from state: %w", err)
}

err = s.tx.ForEach(computersBucket, func(key, value []byte) error {
var c Computer
Expand Down Expand Up @@ -108,7 +116,7 @@ func (s *stateStore) storeComputer(c jamf.Computer) (_ *Computer, changed bool)
if !ok {
// Whether this is managed or not, it is discovered. The next sync
// will change its state to Deleted if it is unmanaged.
curr := &Computer{Computer: c, State: Discovered, Modified: time.Now()}
curr := &Computer{Computer: c, State: Discovered}
s.computers[*c.Udid] = curr
return curr, true
}
Expand All @@ -121,7 +129,6 @@ func (s *stateStore) storeComputer(c jamf.Computer) (_ *Computer, changed bool)
}
if changed {
stored.State = Modified
stored.Modified = time.Now()
}
return stored, changed
}
Expand Down

0 comments on commit ddd9e66

Please sign in to comment.