diff --git a/contracts/database/orm/orm.go b/contracts/database/orm/orm.go
index caece7feb..617bc9729 100644
--- a/contracts/database/orm/orm.go
+++ b/contracts/database/orm/orm.go
@@ -43,6 +43,13 @@ type Query interface {
 	Begin() (Query, error)
 	// BeginTransaction begins a new transaction
 	BeginTransaction() (Query, error)
+	// Chunk processes a given number of records in batches.
+	Chunk(count int, callback func([]any) error) error
+	// ChunkByID processes records in batches by comparing numeric IDs in ascending order.
+	// This avoids issues with offset-based pagination when records are added or deleted during processing.
+	ChunkByID(count int, callback func([]any) error) error
+	// ChunkByIDDesc processes a given number of records in batches, ordered by ID in descending order.
+	ChunkByIDDesc(count int, callback func([]any) error) error
 	// Commit commits the changes in a transaction.
 	Commit() error
 	// Count retrieve the "count" result of the query.
diff --git a/database/gorm/query.go b/database/gorm/query.go
index fbd61ba3f..66f99ced5 100644
--- a/database/gorm/query.go
+++ b/database/gorm/query.go
@@ -189,6 +189,208 @@ func (r *Query) Cursor() chan contractsdb.Row {
 	return cursorChan
 }
 
+// Chunk runs the query in offset-based pages of at most count records and
+// passes each non-empty page to callback. A limit or offset already set on the
+// query bounds the total number of records visited and the starting position.
+// It fails when count <= 0 or no model is set, and otherwise returns the first
+// error produced by the query or by callback.
+// NOTE(review): no ORDER BY is added here, so stable paging presumably relies
+// on the caller having set one - confirm.
+func (r *Query) Chunk(count int, callback func([]any) error) error {
+	if count <= 0 {
+		return errors.OrmQueryChunkZeroOrLess
+	}
+
+	if r.conditions.model == nil {
+		return errors.OrmQueryInvalidModel.Args("nil")
+	}
+
+	initialOffset := 0
+	if r.conditions.offset != nil {
+		initialOffset = *r.conditions.offset
+	}
+	// remaining stays nil when the query has no limit, i.e. no upper bound.
+	var remaining *int
+	if r.conditions.limit != nil {
+		limit := *r.conditions.limit
+		remaining = &limit
+	}
+
+	query := r.addGlobalScopes().buildConditions()
+
+	// Every page is scanned into a new slice of pointers to the model's type.
+	destType := reflect.TypeOf(r.conditions.model)
+	sliceType := reflect.SliceOf(reflect.PointerTo(destType))
+	offset := initialOffset
+
+	for remaining == nil || *remaining > 0 {
+		chunkLimit := count
+		if remaining != nil && *remaining < count {
+			chunkLimit = *remaining
+		}
+
+		results := reflect.New(sliceType).Interface()
+
+		chunkQuery := query.Offset(offset).Limit(chunkLimit).(*Query)
+		err := chunkQuery.Find(results)
+		if err != nil {
+			return err
+		}
+
+		resultsValue := reflect.ValueOf(results).Elem()
+		length := resultsValue.Len()
+		if length == 0 {
+			return nil
+		}
+
+		if remaining != nil {
+			*remaining = max(*remaining-length, 0)
+		}
+
+		values := make([]any, length)
+		for i := 0; i < length; i++ {
+			values[i] = resultsValue.Index(i).Interface()
+		}
+
+		if err = callback(values); err != nil {
+			return err
+		}
+
+		// A short page means there is nothing left to fetch.
+		if length < chunkLimit {
+			return nil
+		}
+
+		offset += chunkLimit
+	}
+
+	return nil
+}
+
+// ChunkByID processes the query in batches of at most count records, ordered
+// by the id column ascending.
+func (r *Query) ChunkByID(count int, callback func([]any) error) error {
+	return r.orderedChunkByID(count, callback, false)
+}
+
+// ChunkByIDDesc processes the query in batches of at most count records,
+// ordered by the id column descending.
+func (r *Query) ChunkByIDDesc(count int, callback func([]any) error) error {
+	return r.orderedChunkByID(count, callback, true)
+}
+
+// orderedChunkByID pages through the query ordered by the "id" column, using
+// the id of the last record of each page as the exclusive bound of the next
+// page. An offset set on the query applies to the first page only; a limit
+// caps the total number of records visited. It returns errors.OrmQueryEmptyId
+// when another page is needed but the last record yielded no id, and otherwise
+// the first error produced by the query or by callback.
+func (r *Query) orderedChunkByID(count int, callback func([]any) error, descending bool) error {
+	if count <= 0 {
+		return errors.OrmQueryChunkZeroOrLess
+	}
+
+	if r.conditions.model == nil {
+		return errors.OrmQueryInvalidModel.Args("nil")
+	}
+
+	column := "id"
+	initialOffset := 0
+	if r.conditions.offset != nil {
+		initialOffset = *r.conditions.offset
+	}
+	// remaining stays nil when the query has no limit, i.e. no upper bound.
+	var remaining *int
+	if r.conditions.limit != nil {
+		limit := *r.conditions.limit
+		remaining = &limit
+	}
+
+	destType := reflect.TypeOf(r.conditions.model)
+	sliceType := reflect.SliceOf(reflect.PointerTo(destType))
+	var lastID any
+	page := 1
+
+	for remaining == nil || *remaining > 0 {
+		chunkLimit := count
+		if remaining != nil && *remaining < count {
+			chunkLimit = *remaining
+		}
+
+		clone := r.addGlobalScopes()
+
+		// The caller's offset only applies to the first page; later pages are
+		// positioned by the id bound instead.
+		if initialOffset > 0 {
+			if page > 1 {
+				clone = clone.Offset(0).(*Query)
+			} else {
+				clone = clone.Offset(initialOffset).(*Query)
+			}
+		}
+
+		if descending {
+			clone = clone.OrderByDesc(column).(*Query)
+		} else {
+			clone = clone.OrderBy(column).(*Query)
+		}
+
+		if lastID != nil {
+			if descending {
+				clone = clone.Where(column+" < ?", lastID).(*Query)
+			} else {
+				clone = clone.Where(column+" > ?", lastID).(*Query)
+			}
+		}
+		clone = clone.Limit(chunkLimit).(*Query)
+
+		query := clone.buildConditions()
+		results := reflect.New(sliceType).Interface()
+
+		err := query.Find(results)
+		if err != nil {
+			return err
+		}
+
+		resultsValue := reflect.ValueOf(results).Elem()
+		length := resultsValue.Len()
+
+		if length == 0 {
+			break
+		}
+
+		if remaining != nil {
+			*remaining = max(*remaining-length, 0)
+		}
+
+		values := make([]any, length)
+		for i := 0; i < length; i++ {
+			values[i] = resultsValue.Index(i).Interface()
+		}
+
+		lastRecord := values[length-1]
+		lastID = database.GetID(lastRecord)
+
+		if err = callback(values); err != nil {
+			return err
+		}
+
+		if length < chunkLimit {
+			break
+		}
+
+		// With a nil id the next query would carry no id bound and start over
+		// from the first records, so fail instead of repeating them.
+		if lastID == nil && (remaining == nil || *remaining > 0) {
+			return errors.OrmQueryEmptyId
+		}
+
+		page++
+	}
+
+	return nil
+}
+
 func (r *Query) DB() (*sql.DB, error) {
 	return r.instance.DB()
 }
diff --git a/errors/list.go b/errors/list.go
index a274ae218..61c488947 100644
--- a/errors/list.go
+++ b/errors/list.go
@@ -128,6 +128,7 @@ var (
 	OrmMissingWhereClause        = New("WHERE conditions required")
 	OrmNoDialectorsFound         = New("no dialectors found")
 	OrmQueryAssociationsConflict = New("cannot set orm.Associations and other fields at the same time")
+	OrmQueryChunkZeroOrLess      = New("chunk count must be greater than 0")
 	OrmQueryConditionRequired    = New("query condition is required")
 	OrmQueryEmptyId              = New("id can't be empty")
 	OrmQueryEmptyRelation        = New("relation can't be empty")
diff --git a/mocks/database/orm/Query.go b/mocks/database/orm/Query.go
index b09772e27..84e76199e 100644
--- a/mocks/database/orm/Query.go
+++ b/mocks/database/orm/Query.go
@@ -233,6 +233,147 @@ func (_c *Query_BeginTransaction_Call) RunAndReturn(run func() (orm.Query, error
 	return _c
 }
 
+// Chunk provides a mock function with given fields: count, callback
+func (_m *Query) Chunk(count int, callback func([]interface{}) error) error {
+	ret := _m.Called(count, callback)
+
+	if len(ret) == 0 {
+		panic("no return value specified for Chunk")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(int, func([]interface{}) error) error); ok {
+		r0 = rf(count, callback)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// Query_Chunk_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Chunk'
+type Query_Chunk_Call struct {
+	*mock.Call
+}
+
+// Chunk is a helper method to define mock.On call
+//   - count int
+//   - callback func([]interface{}) error
+func (_e *Query_Expecter) Chunk(count interface{}, callback interface{}) *Query_Chunk_Call {
+	return &Query_Chunk_Call{Call: _e.mock.On("Chunk", count, callback)}
+}
+
+func (_c *Query_Chunk_Call) Run(run func(count int, callback func([]interface{}) error)) *Query_Chunk_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int), args[1].(func([]interface{}) error))
+	})
+	return _c
+}
+
+func (_c *Query_Chunk_Call) Return(_a0 error) *Query_Chunk_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *Query_Chunk_Call) RunAndReturn(run func(int, func([]interface{}) error) error) *Query_Chunk_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// ChunkByID provides a mock function with given fields: count, callback
+func (_m *Query) ChunkByID(count int, callback func([]interface{}) error) error {
+	ret := _m.Called(count, callback)
+
+	if len(ret) == 0 {
+		panic("no return value specified for ChunkByID")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(int, func([]interface{}) error) error); ok {
+		r0 = rf(count, callback)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// Query_ChunkByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ChunkByID'
+type Query_ChunkByID_Call struct {
+	*mock.Call
+}
+
+// ChunkByID is a helper method to define mock.On call
+//   - count int
+//   - callback func([]interface{}) error
+func (_e *Query_Expecter) ChunkByID(count interface{}, callback interface{}) *Query_ChunkByID_Call {
+	return &Query_ChunkByID_Call{Call: _e.mock.On("ChunkByID", count, callback)}
+}
+
+func (_c *Query_ChunkByID_Call) Run(run func(count int, callback func([]interface{}) error)) *Query_ChunkByID_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int), args[1].(func([]interface{}) error))
+	})
+	return _c
+}
+
+func (_c *Query_ChunkByID_Call) Return(_a0 error) *Query_ChunkByID_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *Query_ChunkByID_Call) RunAndReturn(run func(int, func([]interface{}) error) error) *Query_ChunkByID_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// ChunkByIDDesc provides a mock function with given fields: count, callback
+func (_m *Query) ChunkByIDDesc(count int, callback func([]interface{}) error) error {
+	ret := _m.Called(count, callback)
+
+	if len(ret) == 0 {
+		panic("no return value specified for ChunkByIDDesc")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(int, func([]interface{}) error) error); ok {
+		r0 = rf(count, callback)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// Query_ChunkByIDDesc_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ChunkByIDDesc'
+type Query_ChunkByIDDesc_Call struct {
+	*mock.Call
+}
+
+// ChunkByIDDesc is a helper method to define mock.On call
+//   - count int
+//   - callback func([]interface{}) error
+func (_e *Query_Expecter) ChunkByIDDesc(count interface{}, callback interface{}) *Query_ChunkByIDDesc_Call {
+	return &Query_ChunkByIDDesc_Call{Call: _e.mock.On("ChunkByIDDesc", count, callback)}
+}
+
+func (_c *Query_ChunkByIDDesc_Call) Run(run func(count int, callback func([]interface{}) error)) *Query_ChunkByIDDesc_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int), args[1].(func([]interface{}) error))
+	})
+	return _c
+}
+
+func (_c *Query_ChunkByIDDesc_Call) Return(_a0 error) *Query_ChunkByIDDesc_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *Query_ChunkByIDDesc_Call) RunAndReturn(run func(int, func([]interface{}) error) error) *Query_ChunkByIDDesc_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // Commit provides a mock function with no fields
 func (_m *Query) Commit() error {
 	ret := _m.Called()