Skip to content

Commit

Permalink
removed more queues
Browse files Browse the repository at this point in the history
  • Loading branch information
caffix committed Feb 27, 2025
1 parent 9e0a1c2 commit 41a6f32
Showing 1 changed file with 5 additions and 7 deletions.
12 changes: 5 additions & 7 deletions engine/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type dis struct {
mgr et.SessionManager
done chan struct{}
dchan chan *et.Event
pchan chan *et.Event
cchan chan *et.EventDataElement
}

Expand All @@ -41,7 +40,6 @@ func NewDispatcher(l *slog.Logger, r et.Registry, mgr et.SessionManager) et.Disp
mgr: mgr,
done: make(chan struct{}),
dchan: make(chan *et.Event, MinPipelineQueueSize),
pchan: make(chan *et.Event, MinPipelineQueueSize),
cchan: make(chan *et.EventDataElement, MinPipelineQueueSize),
}

Expand Down Expand Up @@ -89,10 +87,6 @@ loop:
}
case e := <-d.cchan:
d.completedCallback(e)
case e := <-d.pchan:
if err := d.appendToPipeline(e); err != nil {
d.logger.Error(fmt.Sprintf("Failed to append to a data pipeline: %s", err.Error()))
}
}
}
}
Expand Down Expand Up @@ -120,11 +114,14 @@ func (d *dis) fillPipelineQueues() {
for _, atype := range ptypes {
if entities, err := s.Queue().Next(atype, numRequested); err == nil && len(entities) > 0 {
for _, entity := range entities {
d.pchan <- &et.Event{
e := &et.Event{
Name: fmt.Sprintf("%s - %s", string(atype), entity.Asset.Key()),
Entity: entity,
Session: s,
}
if err := d.appendToPipeline(e); err != nil {
d.logger.Error(fmt.Sprintf("Failed to append to a data pipeline: %s", err.Error()))
}
}
}
}
Expand Down Expand Up @@ -173,6 +170,7 @@ func (d *dis) safeDispatch(e *et.Event) error {

if qlen := ap.Queue.Len(); e.Meta != nil || qlen < MinPipelineQueueSize {
if err := d.appendToPipeline(e); err != nil {
d.logger.Error(fmt.Sprintf("Failed to append to a data pipeline: %s", err.Error()))
return err
}
}
Expand Down

0 comments on commit 41a6f32

Please sign in to comment.