Skip to content

Commit 5a57823

Browse files
committed
User defined functions
1 parent 5b990a7 commit 5a57823

File tree

10 files changed

+328
-97
lines changed

10 files changed

+328
-97
lines changed

src/execute/execute.js

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { executeJoins } from './join.js'
99
import { compareForTerm, defaultDerivedAlias, stringify } from './utils.js'
1010

1111
/**
12-
* @import { AsyncCells, AsyncDataSource, AsyncRow, ExecuteSqlOptions, OrderByItem, QueryHints, SelectStatement, SqlPrimitive } from '../types.js'
12+
* @import { AsyncCells, AsyncDataSource, AsyncRow, ExecuteSqlOptions, OrderByItem, QueryHints, SelectStatement, SqlPrimitive, UserDefinedFunction } from '../types.js'
1313
*/
1414

1515
/**
@@ -18,8 +18,8 @@ import { compareForTerm, defaultDerivedAlias, stringify } from './utils.js'
1818
* @param {ExecuteSqlOptions} options - the execution options
1919
* @yields {AsyncRow} async generator yielding result rows
2020
*/
21-
export async function* executeSql({ tables, query, signal }) {
22-
const select = typeof query === 'string' ? parseSql({ query }) : query
21+
export async function* executeSql({ tables, query, functions, signal }) {
22+
const select = typeof query === 'string' ? parseSql({ query, functions }) : query
2323

2424
// Check for unsupported operations
2525
if (!select.from) {
@@ -40,7 +40,7 @@ export async function* executeSql({ tables, query, signal }) {
4040
}
4141
}
4242

43-
yield* executeSelect({ select, tables: normalizedTables, signal })
43+
yield* executeSelect({ select, tables: normalizedTables, functions, signal })
4444
}
4545

4646
/**
@@ -49,10 +49,11 @@ export async function* executeSql({ tables, query, signal }) {
4949
* @param {Object} options
5050
* @param {SelectStatement} options.select
5151
* @param {Record<string, AsyncDataSource>} options.tables
52+
* @param {Record<string, UserDefinedFunction>} [options.functions]
5253
* @param {AbortSignal} [options.signal]
5354
* @yields {AsyncRow}
5455
*/
55-
export async function* executeSelect({ select, tables, signal }) {
56+
export async function* executeSelect({ select, tables, functions, signal }) {
5657
/** @type {AsyncDataSource} */
5758
let dataSource
5859
/** @type {string} */
@@ -68,15 +69,15 @@ export async function* executeSelect({ select, tables, signal }) {
6869
} else {
6970
// Nested subquery - recursively resolve
7071
leftTable = select.from.alias
71-
dataSource = generatorSource(executeSelect({ select: select.from.query, tables, signal }))
72+
dataSource = generatorSource(executeSelect({ select: select.from.query, tables, functions, signal }))
7273
}
7374

7475
// Execute JOINs if present
7576
if (select.joins.length) {
76-
dataSource = await executeJoins({ leftSource: dataSource, joins: select.joins, leftTable, tables })
77+
dataSource = await executeJoins({ leftSource: dataSource, joins: select.joins, leftTable, tables, functions })
7778
}
7879

79-
yield* evaluateSelectAst({ select, dataSource, tables, signal })
80+
yield* evaluateSelectAst({ select, dataSource, tables, functions, signal })
8081
}
8182

8283
/**
@@ -127,9 +128,10 @@ async function applyDistinct(rows, distinct) {
127128
* @param {AsyncRow[]} options.rows - the input rows
128129
* @param {OrderByItem[]} options.orderBy - the sort specifications
129130
* @param {Record<string, AsyncDataSource>} options.tables
131+
* @param {Record<string, UserDefinedFunction>} [options.functions]
130132
* @returns {Promise<AsyncRow[]>} the sorted rows
131133
*/
132-
async function sortRows({ rows, orderBy, tables }) {
134+
async function sortRows({ rows, orderBy, tables, functions }) {
133135
if (!orderBy.length) return rows
134136

135137
// Cache for evaluated values: evaluatedValues[rowIdx][colIdx]
@@ -160,6 +162,7 @@ async function sortRows({ rows, orderBy, tables }) {
160162
node: term.expr,
161163
row: rows[idx],
162164
tables,
165+
functions,
163166
})
164167
}
165168
}
@@ -211,10 +214,11 @@ async function sortRows({ rows, orderBy, tables }) {
211214
* @param {SelectStatement} options.select
212215
* @param {AsyncDataSource} options.dataSource
213216
* @param {Record<string, AsyncDataSource>} options.tables
217+
* @param {Record<string, UserDefinedFunction>} [options.functions]
214218
* @param {AbortSignal} [options.signal]
215219
* @yields {AsyncRow}
216220
*/
217-
async function* evaluateSelectAst({ select, dataSource, tables, signal }) {
221+
async function* evaluateSelectAst({ select, dataSource, tables, functions, signal }) {
218222
// SQL priority: from, where, group by, having, select, order by, offset, limit
219223

220224
const hasAggregate = select.columns.some(col => col.kind === 'derived' && containsAggregate(col.expr))
@@ -223,10 +227,10 @@ async function* evaluateSelectAst({ select, dataSource, tables, signal }) {
223227

224228
if (needsBuffering) {
225229
// BUFFERING PATH: Collect all rows, process, then yield
226-
yield* evaluateBuffered({ select, dataSource, tables, hasAggregate, useGrouping, signal })
230+
yield* evaluateBuffered({ select, dataSource, tables, functions, hasAggregate, useGrouping, signal })
227231
} else {
228232
// STREAMING PATH: Yield rows one by one
229-
yield* evaluateStreaming({ select, dataSource, tables, signal })
233+
yield* evaluateStreaming({ select, dataSource, tables, functions, signal })
230234
}
231235
}
232236

@@ -238,10 +242,11 @@ async function* evaluateSelectAst({ select, dataSource, tables, signal }) {
238242
* @param {SelectStatement} options.select
239243
* @param {AsyncDataSource} options.dataSource
240244
* @param {Record<string, AsyncDataSource>} options.tables
245+
* @param {Record<string, UserDefinedFunction>} [options.functions]
241246
* @param {AbortSignal} [options.signal]
242247
* @yields {AsyncRow}
243248
*/
244-
async function* evaluateStreaming({ select, dataSource, tables, signal }) {
249+
async function* evaluateStreaming({ select, dataSource, tables, functions, signal }) {
245250
let rowsYielded = 0
246251
let rowsSkipped = 0
247252
let rowIndex = 0
@@ -266,7 +271,7 @@ async function* evaluateStreaming({ select, dataSource, tables, signal }) {
266271
rowIndex++
267272
// WHERE filter
268273
if (select.where) {
269-
const pass = await evaluateExpr({ node: select.where, row, tables, rowIndex })
274+
const pass = await evaluateExpr({ node: select.where, row, tables, functions, rowIndex })
270275
if (!pass) continue
271276
}
272277

@@ -291,7 +296,7 @@ async function* evaluateStreaming({ select, dataSource, tables, signal }) {
291296
} else if (col.kind === 'derived') {
292297
const alias = col.alias ?? defaultDerivedAlias(col.expr)
293298
columns.push(alias)
294-
cells[alias] = () => evaluateExpr({ node: col.expr, row, tables, rowIndex: currentRowIndex })
299+
cells[alias] = () => evaluateExpr({ node: col.expr, row, tables, functions, rowIndex: currentRowIndex })
295300
}
296301
}
297302

@@ -322,12 +327,13 @@ async function* evaluateStreaming({ select, dataSource, tables, signal }) {
322327
* @param {SelectStatement} options.select
323328
* @param {AsyncDataSource} options.dataSource
324329
* @param {Record<string, AsyncDataSource>} options.tables
330+
* @param {Record<string, UserDefinedFunction>} [options.functions]
325331
* @param {boolean} options.hasAggregate
326332
* @param {boolean} options.useGrouping
327333
* @param {AbortSignal} [options.signal]
328334
* @yields {AsyncRow}
329335
*/
330-
async function* evaluateBuffered({ select, dataSource, tables, hasAggregate, useGrouping, signal }) {
336+
async function* evaluateBuffered({ select, dataSource, tables, functions, hasAggregate, useGrouping, signal }) {
331337
// Build hints for data source optimization
332338
// Note: limit/offset not passed here since buffering needs all rows for sorting/grouping
333339
/** @type {QueryHints} */
@@ -351,7 +357,7 @@ async function* evaluateBuffered({ select, dataSource, tables, hasAggregate, use
351357
const row = working[i]
352358
const rowIndex = i + 1 // 1-based
353359
if (select.where) {
354-
const passes = await evaluateExpr({ node: select.where, row, tables, rowIndex })
360+
const passes = await evaluateExpr({ node: select.where, row, tables, functions, rowIndex })
355361

356362
if (!passes) {
357363
continue
@@ -376,7 +382,7 @@ async function* evaluateBuffered({ select, dataSource, tables, hasAggregate, use
376382
/** @type {string[]} */
377383
const keyParts = []
378384
for (const expr of select.groupBy) {
379-
const v = await evaluateExpr({ node: expr, row, tables })
385+
const v = await evaluateExpr({ node: expr, row, tables, functions })
380386
keyParts.push(stringify(v))
381387
}
382388
const key = keyParts.join('|')
@@ -421,15 +427,15 @@ async function* evaluateBuffered({ select, dataSource, tables, hasAggregate, use
421427
columns.push(alias)
422428
// Pass group to evaluateExpr so it can handle aggregate functions within expressions
423429
// For empty groups, still provide an empty row context for aggregates to return appropriate values
424-
cells[alias] = () => evaluateExpr({ node: col.expr, row: group[0] ?? { columns: [], cells: {} }, tables, rows: group })
430+
cells[alias] = () => evaluateExpr({ node: col.expr, row: group[0] ?? { columns: [], cells: {} }, tables, functions, rows: group })
425431
continue
426432
}
427433
}
428434
const asyncRow = { columns, cells }
429435

430436
// Apply HAVING filter before adding to projected results
431437
if (select.having) {
432-
if (!await evaluateHavingExpr({ expr: select.having, row: asyncRow, group, tables })) {
438+
if (!await evaluateHavingExpr({ expr: select.having, row: asyncRow, group, tables, functions })) {
433439
continue
434440
}
435441
}
@@ -439,7 +445,7 @@ async function* evaluateBuffered({ select, dataSource, tables, hasAggregate, use
439445
} else {
440446
// No grouping, simple projection
441447
// Sort before projection so ORDER BY can access columns not in SELECT
442-
const sorted = await sortRows({ rows: filtered, orderBy: select.orderBy, tables })
448+
const sorted = await sortRows({ rows: filtered, orderBy: select.orderBy, tables, functions })
443449

444450
// OPTIMIZATION: For non-DISTINCT queries, apply OFFSET/LIMIT before projection
445451
// to avoid reading expensive cells for rows that won't be in the final result
@@ -463,7 +469,7 @@ async function* evaluateBuffered({ select, dataSource, tables, hasAggregate, use
463469
} else if (col.kind === 'derived') {
464470
const alias = col.alias ?? defaultDerivedAlias(col.expr)
465471
columns.push(alias)
466-
cells[alias] = () => evaluateExpr({ node: col.expr, row, tables })
472+
cells[alias] = () => evaluateExpr({ node: col.expr, row, tables, functions })
467473
}
468474
}
469475
projected.push({ columns, cells })
@@ -475,7 +481,7 @@ async function* evaluateBuffered({ select, dataSource, tables, hasAggregate, use
475481

476482
// Step 5: ORDER BY (final sort for grouped queries)
477483
if (useGrouping) {
478-
projected = await sortRows({ rows: projected, orderBy: select.orderBy, tables })
484+
projected = await sortRows({ rows: projected, orderBy: select.orderBy, tables, functions })
479485
}
480486

481487
// Step 6: OFFSET and LIMIT

0 commit comments

Comments
 (0)