Skip to content

Commit 9232f6d

Browse files
author
arashackdev
committed
feat: add chunk by query methods (#714)
1 parent d029a42 commit 9232f6d

File tree

4 files changed

+359
-0
lines changed

4 files changed

+359
-0
lines changed

contracts/database/orm/orm.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ type Query interface {
4343
Begin() (Query, error)
4444
// BeginTransaction begins a new transaction
4545
BeginTransaction() (Query, error)
46+
// Chunk processes a given number of records in batches.
47+
Chunk(count int, callback func([]any) error) error
48+
// Chunk the results of a query by comparing numeric IDs.
49+
ChunkByID(count int, callback func([]any) error) error
50+
// ChunkByIDDesc processes a given number of records in batches, ordered by ID in descending order.
51+
ChunkByIDDesc(count int, callback func([]any) error) error
4652
// Commit commits the changes in a transaction.
4753
Commit() error
4854
// Count retrieve the "count" result of the query.
@@ -120,6 +126,8 @@ type Query interface {
120126
OrderByDesc(column string) Query
121127
// OrderByRaw specifies the order should be raw.
122128
OrderByRaw(raw string) Query
129+
// OrderedChunkByID processes a given number of records in batches, ordered by ID.
130+
OrderedChunkByID(count int, callback func([]any) error, descending bool) error
123131
// OrWhere add an "or where" clause to the query.
124132
OrWhere(query any, args ...any) Query
125133
// OrWhereBetween adds an "or where column between x and y" clause to the query.

database/gorm/query.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,167 @@ func (r *Query) Cursor() chan contractsdb.Row {
189189
return cursorChan
190190
}
191191

192+
func (r *Query) Chunk(count int, callback func([]any) error) error {
193+
if count <= 0 {
194+
return errors.OrmQueryChunkZeroOrLess
195+
}
196+
197+
var remaining *int
198+
if r.conditions.limit != nil {
199+
limit := *r.conditions.limit
200+
remaining = &limit
201+
}
202+
203+
query := r.addGlobalScopes().buildConditions()
204+
205+
destType := reflect.TypeOf(r.conditions.model)
206+
sliceType := reflect.SliceOf(reflect.PointerTo(destType))
207+
offset := 0
208+
209+
for remaining == nil || *remaining > 0 {
210+
chunkLimit := count
211+
if remaining != nil && *remaining < count {
212+
chunkLimit = *remaining
213+
}
214+
215+
results := reflect.New(sliceType).Interface()
216+
217+
chunkQuery := query.Offset(offset).Limit(chunkLimit).(*Query)
218+
err := chunkQuery.Find(results)
219+
if err != nil {
220+
return err
221+
}
222+
223+
resultsValue := reflect.ValueOf(results).Elem()
224+
length := resultsValue.Len()
225+
if length == 0 {
226+
return nil
227+
}
228+
229+
if remaining != nil {
230+
*remaining = max(*remaining-length, 0)
231+
}
232+
233+
values := make([]any, length)
234+
for i := 0; i < length; i++ {
235+
values[i] = resultsValue.Index(i).Interface()
236+
}
237+
238+
if err = callback(values); err != nil {
239+
return err
240+
}
241+
242+
if length < chunkLimit {
243+
return nil
244+
}
245+
246+
offset += chunkLimit
247+
}
248+
249+
return nil
250+
}
251+
252+
func (r *Query) ChunkByID(count int, callback func([]any) error) error {
253+
return r.OrderedChunkByID(count, callback, false)
254+
}
255+
256+
func (r *Query) ChunkByIDDesc(count int, callback func([]any) error) error {
257+
return r.OrderedChunkByID(count, callback, true)
258+
}
259+
260+
func (r *Query) OrderedChunkByID(count int, callback func([]any) error, descending bool) error {
261+
if count <= 0 {
262+
return errors.OrmQueryChunkZeroOrLess
263+
}
264+
265+
column := "id"
266+
initialOffset := 0
267+
if r.conditions.offset != nil {
268+
initialOffset = *r.conditions.offset
269+
}
270+
var remaining *int
271+
if r.conditions.limit != nil {
272+
limit := *r.conditions.limit
273+
remaining = &limit
274+
}
275+
276+
destType := reflect.TypeOf(r.conditions.model)
277+
sliceType := reflect.SliceOf(reflect.PointerTo(destType))
278+
var lastID any
279+
page := 1
280+
281+
for remaining == nil || *remaining > 0 {
282+
chunkLimit := count
283+
if remaining != nil && *remaining < count {
284+
chunkLimit = *remaining
285+
}
286+
287+
clone := r.addGlobalScopes()
288+
289+
if initialOffset > 0 {
290+
if page > 1 {
291+
clone = clone.Offset(0).(*Query)
292+
} else {
293+
clone = clone.Offset(initialOffset).(*Query)
294+
}
295+
}
296+
297+
if descending {
298+
clone = clone.OrderByDesc(column).(*Query)
299+
} else {
300+
clone = clone.OrderBy(column).(*Query)
301+
}
302+
303+
if lastID != nil {
304+
if descending {
305+
clone = clone.Where(column+" < ?", lastID).(*Query)
306+
} else {
307+
clone = clone.Where(column+" > ?", lastID).(*Query)
308+
}
309+
}
310+
clone = clone.Limit(chunkLimit).(*Query)
311+
312+
query := clone.buildConditions()
313+
results := reflect.New(sliceType).Interface()
314+
315+
err := query.Find(results)
316+
if err != nil {
317+
return err
318+
}
319+
320+
resultsValue := reflect.ValueOf(results).Elem()
321+
length := resultsValue.Len()
322+
323+
if length == 0 {
324+
break
325+
}
326+
327+
if remaining != nil {
328+
*remaining = max(*remaining-length, 0)
329+
}
330+
331+
values := make([]any, length)
332+
for i := 0; i < length; i++ {
333+
values[i] = resultsValue.Index(i).Interface()
334+
}
335+
336+
lastRecord := values[length-1]
337+
lastID = database.GetID(lastRecord)
338+
339+
if err = callback(values); err != nil {
340+
return err
341+
}
342+
343+
if length < chunkLimit {
344+
break
345+
}
346+
347+
page++
348+
}
349+
350+
return nil
351+
}
352+
192353
func (r *Query) DB() (*sql.DB, error) {
193354
return r.instance.DB()
194355
}

errors/list.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ var (
128128
OrmMissingWhereClause = New("WHERE conditions required")
129129
OrmNoDialectorsFound = New("no dialectors found")
130130
OrmQueryAssociationsConflict = New("cannot set orm.Associations and other fields at the same time")
131+
OrmQueryChunkZeroOrLess = New("chunk count must be greater than 0")
131132
OrmQueryConditionRequired = New("query condition is required")
132133
OrmQueryEmptyId = New("id can't be empty")
133134
OrmQueryEmptyRelation = New("relation can't be empty")

0 commit comments

Comments
 (0)