1
- import Stream from 'stream' ;
2
- import { AbortController } from 'node-abort-controller' ;
3
- import { type ClickHouseClient , type ResponseJSON } from '../../src' ;
4
- import { createTable , createTestClient , guid } from '../utils' ;
5
- import { TestEnv } from '../utils' ;
1
+ import Stream from 'stream'
2
+ import { AbortController } from 'node-abort-controller'
3
+ import { type ClickHouseClient , type ResponseJSON } from '../../src'
4
+ import { createTable , createTestClient , guid } from '../utils'
5
+ import { TestEnv } from '../utils'
6
6
7
7
describe ( 'abort request' , ( ) => {
8
- let client : ClickHouseClient ;
8
+ let client : ClickHouseClient
9
9
beforeAll ( function ( ) {
10
10
// FIXME: Jest does not seem to have it
11
11
// if (process.env.browser) {
12
12
// this.skip();
13
13
// }
14
- } ) ;
14
+ } )
15
15
16
16
beforeEach ( ( ) => {
17
- client = createTestClient ( ) ;
18
- } ) ;
17
+ client = createTestClient ( )
18
+ } )
19
19
20
20
afterEach ( async ( ) => {
21
- await client . close ( ) ;
22
- } ) ;
21
+ await client . close ( )
22
+ } )
23
23
24
24
describe ( 'select' , ( ) => {
25
25
it ( 'cancels a select query before it is sent' , async ( ) => {
26
- const controller = new AbortController ( ) ;
26
+ const controller = new AbortController ( )
27
27
const selectPromise = client . select ( {
28
28
query : 'SELECT sleep(3)' ,
29
29
format : 'CSV' ,
30
30
abort_signal : controller . signal as AbortSignal ,
31
- } ) ;
32
- controller . abort ( ) ;
31
+ } )
32
+ controller . abort ( )
33
33
34
34
await expect ( selectPromise ) . rejects . toEqual (
35
35
expect . objectContaining ( {
36
36
message : expect . stringMatching ( 'The request was aborted' ) ,
37
37
} )
38
- ) ;
39
- } ) ;
38
+ )
39
+ } )
40
40
41
41
it ( 'cancels a select query after it is sent' , async ( ) => {
42
- const controller = new AbortController ( ) ;
42
+ const controller = new AbortController ( )
43
43
const selectPromise = client . select ( {
44
44
query : 'SELECT sleep(3)' ,
45
45
format : 'CSV' ,
46
46
abort_signal : controller . signal as AbortSignal ,
47
- } ) ;
47
+ } )
48
48
49
49
setTimeout ( ( ) => {
50
- controller . abort ( ) ;
51
- } , 50 ) ;
50
+ controller . abort ( )
51
+ } , 50 )
52
52
53
53
await expect ( selectPromise ) . rejects . toEqual (
54
54
expect . objectContaining ( {
55
55
message : expect . stringMatching ( 'The request was aborted' ) ,
56
56
} )
57
- ) ;
58
- } ) ;
57
+ )
58
+ } )
59
59
60
60
it ( 'cancels a select query while reading response' , async ( ) => {
61
- const controller = new AbortController ( ) ;
61
+ const controller = new AbortController ( )
62
62
const selectPromise = client
63
63
. select ( {
64
64
query : 'SELECT * from system.numbers' ,
65
65
format : 'JSONCompactEachRow' ,
66
66
abort_signal : controller . signal as AbortSignal ,
67
67
} )
68
68
. then ( async ( rows ) => {
69
- const stream = rows . asStream ( ) ;
69
+ const stream = rows . asStream ( )
70
70
for await ( const chunk of stream ) {
71
- const [ [ number ] ] = chunk . json ( ) ;
71
+ const [ [ number ] ] = chunk . json ( )
72
72
// abort when reach number 3
73
73
if ( number === '3' ) {
74
- controller . abort ( ) ;
74
+ controller . abort ( )
75
75
}
76
76
}
77
- } ) ;
77
+ } )
78
78
79
79
// There is no assertion against an error message.
80
80
// A race condition on events might lead to
81
81
// Request Aborted or ERR_STREAM_PREMATURE_CLOSE errors.
82
- await expect ( selectPromise ) . rejects . toThrowError ( ) ;
83
- } ) ;
82
+ await expect ( selectPromise ) . rejects . toThrowError ( )
83
+ } )
84
84
85
85
it ( 'cancels a select query while reading response by closing response stream' , async ( ) => {
86
86
const selectPromise = client
@@ -89,48 +89,48 @@ describe('abort request', () => {
89
89
format : 'JSONCompactEachRow' ,
90
90
} )
91
91
. then ( async function ( rows ) {
92
- const stream = rows . asStream ( ) ;
92
+ const stream = rows . asStream ( )
93
93
for await ( const chunk of stream ) {
94
- const [ [ number ] ] = chunk . json ( ) ;
94
+ const [ [ number ] ] = chunk . json ( )
95
95
// abort when reach number 3
96
96
if ( number === '3' ) {
97
- stream . destroy ( ) ;
97
+ stream . destroy ( )
98
98
}
99
99
}
100
- } ) ;
101
- expect ( await selectPromise ) . toEqual ( undefined ) ;
102
- } ) ;
100
+ } )
101
+ expect ( await selectPromise ) . toEqual ( undefined )
102
+ } )
103
103
104
104
// FIXME: it does not work with ClickHouse Cloud.
105
105
// Active queries never contain the long running query unlike local setup.
106
106
it . skip ( 'ClickHouse server must cancel query on abort' , async ( ) => {
107
- const controller = new AbortController ( ) ;
107
+ const controller = new AbortController ( )
108
108
109
- const longRunningQuery = `SELECT sleep(3), '${ guid ( ) } '` ;
110
- console . log ( `Long running query: ${ longRunningQuery } ` ) ;
109
+ const longRunningQuery = `SELECT sleep(3), '${ guid ( ) } '`
110
+ console . log ( `Long running query: ${ longRunningQuery } ` )
111
111
void client . select ( {
112
112
query : longRunningQuery ,
113
113
abort_signal : controller . signal as AbortSignal ,
114
114
format : 'JSONCompactEachRow' ,
115
- } ) ;
115
+ } )
116
116
117
117
await assertActiveQueries ( client , ( queries ) => {
118
- console . log ( `Active queries: ${ JSON . stringify ( queries , null , 2 ) } ` ) ;
119
- return queries . some ( ( q ) => q . query . includes ( longRunningQuery ) ) ;
120
- } ) ;
118
+ console . log ( `Active queries: ${ JSON . stringify ( queries , null , 2 ) } ` )
119
+ return queries . some ( ( q ) => q . query . includes ( longRunningQuery ) )
120
+ } )
121
121
122
- controller . abort ( ) ;
122
+ controller . abort ( )
123
123
124
124
await assertActiveQueries ( client , ( queries ) =>
125
125
queries . every ( ( q ) => ! q . query . includes ( longRunningQuery ) )
126
- ) ;
127
- } ) ;
128
- } ) ;
126
+ )
127
+ } )
128
+ } )
129
129
130
130
describe ( 'insert' , ( ) => {
131
- let tableName : string ;
131
+ let tableName : string
132
132
beforeEach ( async ( ) => {
133
- tableName = `abort_request_insert_test_${ guid ( ) } ` ;
133
+ tableName = `abort_request_insert_test_${ guid ( ) } `
134
134
await createTable ( client , ( env ) => {
135
135
switch ( env ) {
136
136
// ENGINE can be omitted in the cloud statements:
@@ -140,75 +140,75 @@ describe('abort request', () => {
140
140
CREATE TABLE ${ tableName }
141
141
(id UInt64)
142
142
ORDER BY (id)
143
- ` ;
143
+ `
144
144
case TestEnv . LocalSingleNode :
145
145
return `
146
146
CREATE TABLE ${ tableName }
147
147
(id UInt64)
148
148
ENGINE MergeTree()
149
149
ORDER BY (id)
150
- ` ;
150
+ `
151
151
case TestEnv . LocalCluster :
152
152
return `
153
153
CREATE TABLE ${ tableName } ON CLUSTER '{cluster}'
154
154
(id UInt64)
155
155
ENGINE ReplicatedMergeTree('/clickhouse/{cluster}/tables/{database}/{table}/{shard}', '{replica}')
156
156
ORDER BY (id)
157
- ` ;
157
+ `
158
158
}
159
- } ) ;
160
- } ) ;
159
+ } )
160
+ } )
161
161
162
162
it ( 'cancels an insert query before it is sent' , async ( ) => {
163
- const controller = new AbortController ( ) ;
164
- const stream = getStubStream ( ) ;
163
+ const controller = new AbortController ( )
164
+ const stream = getStubStream ( )
165
165
const insertPromise = client . insert ( {
166
166
table : tableName ,
167
167
values : stream ,
168
168
abort_signal : controller . signal as AbortSignal ,
169
- } ) ;
170
- controller . abort ( ) ;
169
+ } )
170
+ controller . abort ( )
171
171
172
172
await expect ( insertPromise ) . rejects . toEqual (
173
173
expect . objectContaining ( {
174
174
message : expect . stringMatching ( 'The request was aborted' ) ,
175
175
} )
176
- ) ;
177
- } ) ;
176
+ )
177
+ } )
178
178
179
179
it ( 'cancels an insert query before it is sent by closing a stream' , async ( ) => {
180
- const stream = getStubStream ( ) ;
181
- stream . push ( null ) ;
180
+ const stream = getStubStream ( )
181
+ stream . push ( null )
182
182
183
183
expect (
184
184
await client . insert ( {
185
185
table : tableName ,
186
186
values : stream ,
187
187
} )
188
- ) . toEqual ( undefined ) ;
189
- } ) ;
188
+ ) . toEqual ( undefined )
189
+ } )
190
190
191
191
it ( 'cancels an insert query after it is sent' , async ( ) => {
192
- const controller = new AbortController ( ) ;
193
- const stream = getStubStream ( ) ;
192
+ const controller = new AbortController ( )
193
+ const stream = getStubStream ( )
194
194
const insertPromise = client . insert ( {
195
195
table : tableName ,
196
196
values : stream ,
197
197
abort_signal : controller . signal as AbortSignal ,
198
- } ) ;
198
+ } )
199
199
200
200
setTimeout ( ( ) => {
201
- controller . abort ( ) ;
202
- } , 50 ) ;
201
+ controller . abort ( )
202
+ } , 50 )
203
203
204
204
await expect ( insertPromise ) . rejects . toEqual (
205
205
expect . objectContaining ( {
206
206
message : expect . stringMatching ( 'The request was aborted' ) ,
207
207
} )
208
- ) ;
209
- } ) ;
210
- } ) ;
211
- } ) ;
208
+ )
209
+ } )
210
+ } )
211
+ } )
212
212
213
213
async function assertActiveQueries (
214
214
client : ClickHouseClient ,
@@ -219,15 +219,15 @@ async function assertActiveQueries(
219
219
const rows = await client . select ( {
220
220
query : 'SELECT query FROM system.processes' ,
221
221
format : 'JSON' ,
222
- } ) ;
222
+ } )
223
223
224
- const queries = await rows . json < ResponseJSON < { query : string } > > ( ) ;
224
+ const queries = await rows . json < ResponseJSON < { query : string } > > ( )
225
225
226
226
if ( assertQueries ( queries . data ) ) {
227
- break ;
227
+ break
228
228
}
229
229
230
- await new Promise ( ( res ) => setTimeout ( res , 100 ) ) ;
230
+ await new Promise ( ( res ) => setTimeout ( res , 100 ) )
231
231
}
232
232
}
233
233
@@ -237,5 +237,5 @@ function getStubStream(): Stream.Readable {
237
237
read ( ) {
238
238
/* stub */
239
239
} ,
240
- } ) ;
240
+ } )
241
241
}
0 commit comments