Skip to content

Commit 2cc9134

Browse files
committed
feat: add synchronous evaluator for pipe functions
1 parent cea0eb3 commit 2cc9134

File tree

4 files changed

+69
-37
lines changed

4 files changed

+69
-37
lines changed

src/evaluator/evaluate.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,11 +366,13 @@ const EXECUTORS: ExecutorMap = {
366366
PipeFuncCall: {
367367
async executeAsync({func, base, args}, scope) {
368368
const baseValue = await executeAsync(base, scope)
369+
if (baseValue.type !== 'stream' && baseValue.type !== 'array') return NULL_VALUE
369370
return func.executeAsync({base: baseValue, args}, scope)
370371
},
371372

372373
executeSync({func, base, args}, scope) {
373374
const baseValue = executeSync(base, scope)
375+
if (baseValue.type !== 'array') return NULL_VALUE
374376
return func.executeSync({base: baseValue, args}, scope)
375377
},
376378
},

src/evaluator/functions/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {type ExprNode} from '../../nodeTypes'
2-
import {type Value} from '../../values'
2+
import {StreamValue, type ArrayValue} from '../../values'
33
import type {Executor} from '../types'
44
import array from './array'
55
import dateTime from './dateTime'
@@ -35,7 +35,10 @@ export type FunctionSet = Record<string, WithOptions<GroqFunction> | undefined>
3535

3636
export type NamespaceSet = Record<string, FunctionSet | undefined>
3737

38-
export type GroqPipeFunction = Executor<{base: Value; args: ExprNode[]}>
38+
export type GroqPipeFunction = Executor<
39+
{base: ArrayValue | StreamValue; args: ExprNode[]},
40+
{base: ArrayValue; args: ExprNode[]}
41+
>
3942

4043
export const namespaces: NamespaceSet = {
4144
global: _global,

src/evaluator/functions/pipeFunctions.ts

Lines changed: 60 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,21 @@
11
import type {GroqPipeFunction, WithOptions} from '.'
2-
import {fromJS, NULL_VALUE} from '../../values'
3-
import {asyncOnlyExecutor, executeAsync} from '../evaluate'
2+
import type {ExprNode} from '../../nodeTypes'
3+
import {fromArray, fromJS, NULL_VALUE} from '../../values'
4+
import {asyncOnlyExecutor, executeAsync, executeSync} from '../evaluate'
45
import {totalCompare} from '../ordering'
56
import {evaluateScore} from '../scoring'
67

78
type ObjectWithScore = Record<string, unknown> & {_score: number}
89

9-
const pipeFunctions: {[key: string]: WithOptions<GroqPipeFunction>} = {}
10-
11-
pipeFunctions['order'] = asyncOnlyExecutor(async function order({base, args}, scope) {
12-
// eslint-disable-next-line max-len
13-
// This is a workaround for https://github.com/rpetrich/babel-plugin-transform-async-to-promises/issues/59
14-
await true
15-
16-
if (!base.isArray()) {
17-
return NULL_VALUE
18-
}
10+
type Direction = 'asc' | 'desc'
11+
type AuxItem = [unknown, number, ...unknown[]]
1912

13+
function extractOrderArgs(args: ExprNode[]): {mappers: ExprNode[]; directions: Direction[]} {
2014
const mappers = []
21-
const directions: string[] = []
22-
let n = 0
15+
const directions: Direction[] = []
2316

2417
for (let mapper of args) {
25-
let direction = 'asc'
18+
let direction: Direction = 'asc'
2619

2720
if (mapper.type === 'Desc') {
2821
direction = 'desc'
@@ -33,25 +26,13 @@ pipeFunctions['order'] = asyncOnlyExecutor(async function order({base, args}, sc
3326

3427
mappers.push(mapper)
3528
directions.push(direction)
36-
n++
37-
}
38-
39-
const aux = []
40-
let idx = 0
41-
42-
for await (const value of base) {
43-
const newScope = scope.createNested(value)
44-
const tuple = [await value.get(), idx]
45-
for (let i = 0; i < n; i++) {
46-
const result = await executeAsync(mappers[i], newScope)
47-
tuple.push(await result.get())
48-
}
49-
aux.push(tuple)
50-
idx++
5129
}
30+
return {mappers, directions}
31+
}
5232

33+
function sortArray(aux: AuxItem[], directions: Direction[]): unknown[] {
5334
aux.sort((aTuple, bTuple) => {
54-
for (let i = 0; i < n; i++) {
35+
for (let i = 0; i < directions.length; i++) {
5536
let c = totalCompare(aTuple[i + 2], bTuple[i + 2])
5637
if (directions[i] === 'desc') {
5738
c = -c
@@ -64,8 +45,54 @@ pipeFunctions['order'] = asyncOnlyExecutor(async function order({base, args}, sc
6445
return aTuple[1] - bTuple[1]
6546
})
6647

67-
return fromJS(aux.map((v) => v[0]))
68-
})
48+
return aux.map((v) => v[0])
49+
}
50+
51+
const pipeFunctions: {[key: string]: WithOptions<GroqPipeFunction>} = {}
52+
53+
pipeFunctions['order'] = {
54+
executeSync({base, args}, scope) {
55+
const {mappers, directions} = extractOrderArgs(args)
56+
const aux: AuxItem[] = []
57+
58+
let idx = 0
59+
const n = directions.length
60+
61+
for (const value of base.data) {
62+
const newScope = scope.createNested(fromJS(value))
63+
const tuple: AuxItem = [value, idx]
64+
for (let i = 0; i < n; i++) {
65+
const result = executeSync(mappers[i]!, newScope)
66+
tuple.push(result.data)
67+
}
68+
aux.push(tuple)
69+
idx++
70+
}
71+
72+
return fromArray(sortArray(aux, directions))
73+
},
74+
75+
async executeAsync({base, args}, scope) {
76+
const {mappers, directions} = extractOrderArgs(args)
77+
const aux: AuxItem[] = []
78+
79+
let idx = 0
80+
const n = directions.length
81+
82+
for await (const value of base) {
83+
const newScope = scope.createNested(value)
84+
const tuple: AuxItem = [await value.get(), idx]
85+
for (let i = 0; i < n; i++) {
86+
const result = await executeAsync(mappers[i]!, newScope)
87+
tuple.push(await result.get())
88+
}
89+
aux.push(tuple)
90+
idx++
91+
}
92+
93+
return fromArray(sortArray(aux, directions))
94+
},
95+
}
6996
pipeFunctions['order'].arity = (count) => count >= 1
7097

7198
// eslint-disable-next-line require-await

src/evaluator/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ import type {ExprNode} from '../nodeTypes'
22
import type {AnyStaticValue, Value} from '../values'
33
import {Scope} from './scope'
44

5-
export type Executor<N = ExprNode> = {
6-
executeSync(node: N, scope: Scope): AnyStaticValue
5+
export type Executor<N = ExprNode, Sync = N> = {
6+
executeSync(node: Sync, scope: Scope): AnyStaticValue
77
executeAsync(node: N, scope: Scope): Promise<Value>
88
}
99

0 commit comments

Comments
 (0)