Skip to content

Commit 853bd60

Browse files
author
arashackdev
committed
feat: add chunk by query methods (#714)
1 parent d029a42 commit 853bd60

File tree

4 files changed

+367
-0
lines changed

4 files changed

+367
-0
lines changed

contracts/database/orm/orm.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ type Query interface {
4343
Begin() (Query, error)
4444
// BeginTransaction begins a new transaction
4545
BeginTransaction() (Query, error)
46+
// Chunk processes a given number of records in batches.
47+
Chunk(count int, callback func([]any) error) error
48+
// Chunk the results of a query by comparing numeric IDs.
49+
ChunkByID(count int, callback func([]any) error) error
50+
// ChunkByIDDesc processes a given number of records in batches, ordered by ID in descending order.
51+
ChunkByIDDesc(count int, callback func([]any) error) error
4652
// Commit commits the changes in a transaction.
4753
Commit() error
4854
// Count retrieve the "count" result of the query.
@@ -120,6 +126,8 @@ type Query interface {
120126
OrderByDesc(column string) Query
121127
// OrderByRaw specifies the order should be raw.
122128
OrderByRaw(raw string) Query
129+
// OrderedChunkByID processes a given number of records in batches, ordered by ID.
130+
OrderedChunkByID(count int, callback func([]any) error, descending bool) error
123131
// OrWhere add an "or where" clause to the query.
124132
OrWhere(query any, args ...any) Query
125133
// OrWhereBetween adds an "or where column between x and y" clause to the query.

database/gorm/query.go

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,175 @@ func (r *Query) Cursor() chan contractsdb.Row {
189189
return cursorChan
190190
}
191191

192+
func (r *Query) Chunk(count int, callback func([]any) error) error {
193+
if count <= 0 {
194+
return errors.OrmQueryChunkZeroOrLess
195+
}
196+
197+
var remaining *int
198+
if r.conditions.limit != nil {
199+
limit := *r.conditions.limit
200+
remaining = &limit
201+
}
202+
203+
query := r.addGlobalScopes().buildConditions()
204+
205+
destType := reflect.TypeOf(r.conditions.model)
206+
sliceType := reflect.SliceOf(reflect.PointerTo(destType))
207+
offset := 0
208+
209+
for {
210+
if remaining != nil && *remaining == 0 {
211+
break
212+
}
213+
214+
chunkLimit := count
215+
if remaining != nil && *remaining < count {
216+
chunkLimit = *remaining
217+
}
218+
219+
results := reflect.New(sliceType).Interface()
220+
221+
chunkQuery := query.Offset(offset).Limit(chunkLimit).(*Query)
222+
err := chunkQuery.Find(results)
223+
if err != nil {
224+
return err
225+
}
226+
227+
resultsValue := reflect.ValueOf(results).Elem()
228+
length := resultsValue.Len()
229+
if length == 0 {
230+
return nil
231+
}
232+
233+
if remaining != nil {
234+
*remaining = max(*remaining-length, 0)
235+
}
236+
237+
values := make([]any, length)
238+
for i := 0; i < length; i++ {
239+
values[i] = resultsValue.Index(i).Interface()
240+
}
241+
242+
if err = callback(values); err != nil {
243+
return err
244+
}
245+
246+
if length < chunkLimit {
247+
return nil
248+
}
249+
250+
offset += chunkLimit
251+
}
252+
253+
return nil
254+
}
255+
256+
func (r *Query) ChunkByID(count int, callback func([]any) error) error {
257+
return r.OrderedChunkByID(count, callback, false)
258+
}
259+
260+
func (r *Query) ChunkByIDDesc(count int, callback func([]any) error) error {
261+
return r.OrderedChunkByID(count, callback, true)
262+
}
263+
264+
func (r *Query) OrderedChunkByID(count int, callback func([]any) error, descending bool) error {
265+
if count <= 0 {
266+
return errors.OrmQueryChunkZeroOrLess
267+
}
268+
269+
column := "id"
270+
initialOffset := 0
271+
if r.conditions.offset != nil {
272+
initialOffset = *r.conditions.offset
273+
}
274+
var remaining *int
275+
if r.conditions.limit != nil {
276+
limit := *r.conditions.limit
277+
remaining = &limit
278+
}
279+
280+
destType := reflect.TypeOf(r.conditions.model)
281+
sliceType := reflect.SliceOf(reflect.PointerTo(destType))
282+
var lastID any
283+
page := 1
284+
285+
for {
286+
if remaining != nil && *remaining == 0 {
287+
break
288+
}
289+
290+
chunkLimit := count
291+
if remaining != nil && *remaining < count {
292+
chunkLimit = *remaining
293+
}
294+
295+
clone := r.addGlobalScopes()
296+
297+
if initialOffset > 0 {
298+
if page > 1 {
299+
clone = clone.Offset(0).(*Query)
300+
} else {
301+
clone = clone.Offset(initialOffset).(*Query)
302+
}
303+
}
304+
305+
if descending {
306+
clone = clone.OrderByDesc(column).(*Query)
307+
} else {
308+
clone = clone.OrderBy(column).(*Query)
309+
}
310+
311+
if lastID != nil {
312+
if descending {
313+
clone = clone.Where(column+" < ?", lastID).(*Query)
314+
} else {
315+
clone = clone.Where(column+" > ?", lastID).(*Query)
316+
}
317+
}
318+
clone = clone.Limit(chunkLimit).(*Query)
319+
320+
query := clone.buildConditions()
321+
results := reflect.New(sliceType).Interface()
322+
323+
err := query.Find(results)
324+
if err != nil {
325+
return err
326+
}
327+
328+
resultsValue := reflect.ValueOf(results).Elem()
329+
length := resultsValue.Len()
330+
331+
if length == 0 {
332+
break
333+
}
334+
335+
if remaining != nil {
336+
*remaining = max(*remaining-length, 0)
337+
}
338+
339+
values := make([]any, length)
340+
for i := 0; i < length; i++ {
341+
values[i] = resultsValue.Index(i).Interface()
342+
}
343+
344+
lastRecord := values[length-1]
345+
lastID = database.GetID(lastRecord)
346+
347+
if err = callback(values); err != nil {
348+
return err
349+
}
350+
351+
if length < chunkLimit {
352+
break
353+
}
354+
355+
page++
356+
}
357+
358+
return nil
359+
}
360+
192361
func (r *Query) DB() (*sql.DB, error) {
193362
return r.instance.DB()
194363
}

errors/list.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ var (
128128
OrmMissingWhereClause = New("WHERE conditions required")
129129
OrmNoDialectorsFound = New("no dialectors found")
130130
OrmQueryAssociationsConflict = New("cannot set orm.Associations and other fields at the same time")
131+
OrmQueryChunkZeroOrLess = New("chunk count must be greater than 0")
131132
OrmQueryConditionRequired = New("query condition is required")
132133
OrmQueryEmptyId = New("id can't be empty")
133134
OrmQueryEmptyRelation = New("relation can't be empty")

0 commit comments

Comments
 (0)