Skip to content

Commit

Permalink
improvements to the session entity queue
Browse files Browse the repository at this point in the history
  • Loading branch information
caffix committed Feb 26, 2025
1 parent 749476b commit a220634
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 15 deletions.
20 changes: 11 additions & 9 deletions engine/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,11 @@ func (d *dis) completedCallback(data interface{}) {
ede.Event.Session.Log().WithGroup("event").With("name", ede.Event.Name).Error(err.Error())
}
// increment the number of events processed in the session
stats := ede.Event.Session.Stats()
stats.Lock()
stats.WorkItemsCompleted++
stats.Unlock()
if stats := ede.Event.Session.Stats(); stats != nil {
stats.Lock()
stats.WorkItemsCompleted++
stats.Unlock()
}
}

func (d *dis) DispatchEvent(e *et.Event) error {
Expand All @@ -152,6 +153,12 @@ func (d *dis) DispatchEvent(e *et.Event) error {
if err != nil {
return err
}
// increment the number of events processed in the session
if stats := e.Session.Stats(); stats != nil {
stats.Lock()
stats.WorkItemsTotal++
stats.Unlock()
}

ap, err := d.reg.GetPipeline(e.Entity.Asset.AssetType())
if err != nil {
Expand Down Expand Up @@ -181,11 +188,6 @@ func (d *dis) appendToPipelineQueue(e *et.Event) error {
_ = e.Session.Queue().Processed(e.Entity)
data.Queue = d.completed
ap.Queue.Append(data)
// increment the number of events processed in the session
stats := e.Session.Stats()
stats.Lock()
stats.WorkItemsTotal++
stats.Unlock()
}
return nil
}
12 changes: 8 additions & 4 deletions engine/sessions/queuedb/queue_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ type QueueDB struct {

type Element struct {
ID uint64 `gorm:"primaryKey;column:id"`
CreatedAt time.Time `gorm:"index:idx_created_at;column:created_at"`
CreatedAt time.Time `gorm:"index:idx_created_at,sort:asc;column:created_at"`
UpdatedAt time.Time
Type string `gorm:"index:idx_etype;column:etype"`
EntityID string `gorm:"index:idx_entity_id,unique;column:entity_id"`
Processed bool `gorm:"index:idx_processed;column:processed"`
}

func NewQueueDB(dbPath string) (*QueueDB, error) {
db, err := gorm.Open(sqlite.Open(dbPath), &gorm.Config{Logger: logger.Default.LogMode(logger.Silent)})
db, err := gorm.Open(sqlite.Open(dbPath), &gorm.Config{
PrepareStmt: false,
SkipDefaultTransaction: true,
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -101,8 +105,8 @@ func (r *QueueDB) Processed(eid string) error {
func (r *QueueDB) Delete(eid string) error {
var element Element

result := r.db.Model(&Element{}).Where("entity_id = ?", eid).Find(&element)
if err := result.Error; err != nil {
err := r.db.Model(&Element{}).Where("entity_id = ?", eid).Find(&element).Error
if err != nil {
return err
}

Expand Down
8 changes: 8 additions & 0 deletions engine/sessions/queuedb/queue_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func TestNewQueueDB(t *testing.T) {

found := queueDB.db.Migrator().HasTable(&Element{})
assert.True(t, found, "Element table should exist")
found = queueDB.db.Migrator().HasIndex(&Element{}, "idx_created_at")
assert.True(t, found, "Index idx_created_at should exist")
found = queueDB.db.Migrator().HasIndex(&Element{}, "idx_etype")
assert.True(t, found, "Index idx_etype should exist")
found = queueDB.db.Migrator().HasIndex(&Element{}, "idx_entity_id")
assert.True(t, found, "Index idx_entity_id should exist")
found = queueDB.db.Migrator().HasIndex(&Element{}, "idx_processed")
assert.True(t, found, "Index idx_processed should exist")
}

func TestHas(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/cheggaaa/pb/v3 v3.1.6
github.com/fatih/color v1.18.0
github.com/geziyor/geziyor v0.0.0-20240812061556-229b8ca83ac1
github.com/glebarez/sqlite v1.11.0
github.com/go-ini/ini v1.67.0
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
Expand All @@ -36,6 +37,8 @@ require (
go.uber.org/ratelimit v0.3.1
golang.org/x/net v0.35.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/gorm v1.25.12
gorm.io/hints v1.1.2
mvdan.cc/xurls/v2 v2.6.0
)

Expand All @@ -57,7 +60,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/glebarez/go-sqlite v1.22.0 // indirect
github.com/glebarez/sqlite v1.11.0 // indirect
github.com/go-gorp/gorp/v3 v3.1.0 // indirect
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874 // indirect
github.com/go-kit/kit v0.13.0 // indirect
Expand Down Expand Up @@ -109,7 +111,6 @@ require (
gorm.io/datatypes v1.2.5 // indirect
gorm.io/driver/mysql v1.5.7 // indirect
gorm.io/driver/postgres v1.5.11 // indirect
gorm.io/gorm v1.25.12 // indirect
modernc.org/libc v1.61.13 // indirect
modernc.org/mathutil v1.7.1 // indirect
modernc.org/memory v1.8.2 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc=
github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand Down Expand Up @@ -2148,13 +2150,18 @@ gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo=
gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM=
gorm.io/driver/postgres v1.5.11 h1:ubBVAfbKEUld/twyKZ0IYn9rSQh448EdelLYk9Mv314=
gorm.io/driver/postgres v1.5.11/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI=
gorm.io/driver/sqlite v1.5.0/go.mod h1:kDMDfntV9u/vuMmz8APHtHF0b4nyBB7sfCieC6G8k8I=
gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE=
gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4=
gorm.io/driver/sqlserver v1.5.4 h1:xA+Y1KDNspv79q43bPyjDMUgHoYHLhXYmdFcYPobg8g=
gorm.io/driver/sqlserver v1.5.4/go.mod h1:+frZ/qYmuna11zHPlh5oc2O6ZA/lS88Keb0XSH1Zh/g=
gorm.io/gorm v1.24.7-0.20230306060331-85eaf9eeda11/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
gorm.io/hints v1.1.2 h1:b5j0kwk5p4+3BtDtYqqfY+ATSxjj+6ptPgVveuynn9o=
gorm.io/hints v1.1.2/go.mod h1:/ARdpUHAtyEMCh5NNi3tI7FsGh+Cj/MIUlvNxCNCFWg=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down

0 comments on commit a220634

Please sign in to comment.