Skip to content

Commit

Permalink
controlling the queues a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
caffix committed Feb 27, 2025
1 parent a220634 commit 9e0a1c2
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 49 deletions.
94 changes: 50 additions & 44 deletions engine/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"os"
"time"

"github.com/caffix/queue"
et "github.com/owasp-amass/amass/v4/engine/types"
oam "github.com/owasp-amass/open-asset-model"
)
Expand All @@ -22,11 +21,13 @@ const (
)

type dis struct {
logger *slog.Logger
reg et.Registry
mgr et.SessionManager
done chan struct{}
completed queue.Queue
logger *slog.Logger
reg et.Registry
mgr et.SessionManager
done chan struct{}
dchan chan *et.Event
pchan chan *et.Event
cchan chan *et.EventDataElement
}

func NewDispatcher(l *slog.Logger, r et.Registry, mgr et.SessionManager) et.Dispatcher {
Expand All @@ -35,11 +36,13 @@ func NewDispatcher(l *slog.Logger, r et.Registry, mgr et.SessionManager) et.Disp
}

d := &dis{
logger: l,
reg: r,
mgr: mgr,
done: make(chan struct{}),
completed: queue.NewQueue(),
logger: l,
reg: r,
mgr: mgr,
done: make(chan struct{}),
dchan: make(chan *et.Event, MinPipelineQueueSize),
pchan: make(chan *et.Event, MinPipelineQueueSize),
cchan: make(chan *et.EventDataElement, MinPipelineQueueSize),
}

go d.maintainPipelines()
Expand All @@ -55,29 +58,43 @@ func (d *dis) Shutdown() {
close(d.done)
}

func (d *dis) DispatchEvent(e *et.Event) error {
if e == nil {
return errors.New("the event is nil")
} else if e.Session == nil {
return errors.New("the event has no associated session")
} else if e.Session.Done() {
return errors.New("the associated session has been terminated")
} else if e.Entity == nil || e.Entity.Asset == nil {
return errors.New("the event has no associated entity or asset")
}

d.dchan <- e
return nil
}

func (d *dis) maintainPipelines() {
ctick := time.NewTicker(5 * time.Second)
defer ctick.Stop()
qtick := time.NewTicker(time.Second)
defer qtick.Stop()
loop:
for {
select {
case <-d.done:
break loop
case <-qtick.C:
case <-ctick.C:
d.fillPipelineQueues()
case <-d.completed.Signal():
if element, ok := d.completed.Next(); ok {
d.completedCallback(element)
case e := <-d.dchan:
if err := d.safeDispatch(e); err != nil {
d.logger.Error(fmt.Sprintf("Failed to dispatch event: %s", err.Error()))
}
case <-ctick.C:
if element, ok := d.completed.Next(); ok {
d.completedCallback(element)
case e := <-d.cchan:
d.completedCallback(e)
case e := <-d.pchan:
if err := d.appendToPipeline(e); err != nil {
d.logger.Error(fmt.Sprintf("Failed to append to a data pipeline: %s", err.Error()))
}
}
}
d.completed.Process(d.completedCallback)
}

func (d *dis) fillPipelineQueues() {
Expand All @@ -103,14 +120,11 @@ func (d *dis) fillPipelineQueues() {
for _, atype := range ptypes {
if entities, err := s.Queue().Next(atype, numRequested); err == nil && len(entities) > 0 {
for _, entity := range entities {
event := &et.Event{
d.pchan <- &et.Event{
Name: fmt.Sprintf("%s - %s", string(atype), entity.Asset.Key()),
Entity: entity,
Session: s,
}
if err := d.appendToPipelineQueue(event); err != nil {
s.Log().WithGroup("event").With("name", event.Name).Error(err.Error())
}
}
}
}
Expand All @@ -134,46 +148,38 @@ func (d *dis) completedCallback(data interface{}) {
}
}

func (d *dis) DispatchEvent(e *et.Event) error {
if e == nil {
return errors.New("the event is nil")
} else if e.Session == nil {
return errors.New("the event has no associated session")
} else if e.Session.Done() {
return errors.New("the associated session has been terminated")
} else if e.Entity == nil || e.Entity.Asset == nil {
return errors.New("the event has no associated entity or asset")
func (d *dis) safeDispatch(e *et.Event) error {
ap, err := d.reg.GetPipeline(e.Entity.Asset.AssetType())
if err != nil {
return err
}

// do not schedule the same asset more than once
if e.Session.Queue().Has(e.Entity) {
return errors.New("this event was processed previously")
return nil
}

err := e.Session.Queue().Append(e.Entity)
err = e.Session.Queue().Append(e.Entity)
if err != nil {
return err
}

// increment the number of events processed in the session
if stats := e.Session.Stats(); stats != nil {
stats.Lock()
stats.WorkItemsTotal++
stats.Unlock()
}

ap, err := d.reg.GetPipeline(e.Entity.Asset.AssetType())
if err != nil {
return err
}

if qlen := ap.Queue.Len(); e.Meta != nil || qlen < MinPipelineQueueSize {
if err := d.appendToPipelineQueue(e); err != nil {
if err := d.appendToPipeline(e); err != nil {
return err
}
}
return nil
}

func (d *dis) appendToPipelineQueue(e *et.Event) error {
func (d *dis) appendToPipeline(e *et.Event) error {
if e == nil || e.Session == nil || e.Entity == nil || e.Entity.Asset == nil {
return errors.New("the event is nil")
}
Expand All @@ -186,7 +192,7 @@ func (d *dis) appendToPipelineQueue(e *et.Event) error {
e.Dispatcher = d
if data := et.NewEventDataElement(e); data != nil {
_ = e.Session.Queue().Processed(e.Entity)
data.Queue = d.completed
data.Queue = d.cchan
ap.Queue.Append(data)
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions engine/registry/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func makeSink() pipeline.SinkFunc {
return errors.New("pipeline sink failed to extract the EventDataElement")
}

ede.Queue.Append(ede)
ede.Queue <- ede
return nil
})
}
Expand All @@ -106,11 +106,11 @@ func handlerTask(h *et.Handler) pipeline.TaskFunc {

select {
case <-ctx.Done():
ede.Queue.Append(ede)
ede.Queue <- ede
return nil, nil
default:
if ede.Event.Session.Done() {
ede.Queue.Append(ede)
ede.Queue <- ede
return nil, nil
}
}
Expand Down
3 changes: 1 addition & 2 deletions engine/types/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package types

import (
"github.com/caffix/pipeline"
"github.com/caffix/queue"
"github.com/google/uuid"
dbt "github.com/owasp-amass/asset-db/types"
oam "github.com/owasp-amass/open-asset-model"
Expand Down Expand Up @@ -44,7 +43,7 @@ type Asset struct {
type EventDataElement struct {
Event *Event
Error error
Queue queue.Queue
Queue chan *EventDataElement
}

func NewEventDataElement(e *Event) *EventDataElement {
Expand Down

0 comments on commit 9e0a1c2

Please sign in to comment.