1
1
# Continuous Queries
2
2
3
- Continuous Queries are great way to get update notifications on your filters.
3
+ Query tasks have a special option to create Continuous queries.
4
+ Continuous queries are great way to get update notifications on your filters.
4
5
Continuous query is started by creating a query task service with CONTINUOUS
5
- option. And then setting up a notification on that service . A continuous query
6
+ option, and setting up a notification on that query task instance . A continuous query
6
7
task does two things:
7
- 1 . It will give the historical, existing data as the first notification
8
- 2 . It will give you any further updates
8
+ 1 . It will self patch, causing the initial notification, once the initial query results are available, providing the documents currently in the index.
9
+ 2 . On any additional index updates that match the query associated with the
10
+ task, it will once again self patch, with the result being the document that
11
+ changed or was added. Once again this will cause notifications to be sent.
9
12
10
13
In the remainder of the document we expect you have basic understanding of [[ QueryTaskService]] .
11
14
@@ -28,8 +31,8 @@ Here are the basic steps required to efficiently use the continuous query tasks.
28
31
29
32
We can avoid setting up the subscription with query task here, and instead do
30
33
the polling on this continues query task service for updates. But that would
31
- NOT be efficient. Instead we recommend using subscription model here and get
32
- the results whenever they are available from this friend on the other side .
34
+ not be efficient. Instead we recommend using subscription model here and get
35
+ the results whenever they are available.
33
36
34
37
In rest of the document we will go over the steps mentioned above.
35
38
@@ -39,14 +42,17 @@ Following is the simple example of QueryTask in which CONTINUOUS query option is
39
42
This query task is filtering the results based on Kind field clause.
40
43
41
44
``` java
42
- QueryTask . Query query = QueryTask . Query . Builder . create()
43
- .addKindFieldClause(Employee . class)
44
- .build();
45
-
46
- QueryTask queryTask = QueryTask . Builder . create()
47
- .addOption(QueryOption . EXPAND_CONTENT )
48
- .addOption(QueryOption . CONTINUOUS )
49
- .setQuery(query). build();
45
+ public QueryTask createContinuousQuery() {
46
+ QueryTask . Query query = QueryTask . Query . Builder . create()
47
+ .addKindFieldClause(EmployeeService . Employee . class)
48
+ .build();
49
+
50
+ QueryTask queryTask = QueryTask . Builder . create()
51
+ .addOption(QueryOption . EXPAND_CONTENT )
52
+ .addOption(QueryOption . CONTINUOUS )
53
+ .setQuery(query). build();
54
+ return queryTask;
55
+ }
50
56
```
51
57
52
58
By default, query tasks services expire after few (10 at the time of writing)
@@ -93,96 +99,191 @@ Json payload of above query.
93
99
}
94
100
```
95
101
96
- ## Send the request
102
+ ## Sending the request and subscribing to notifications
97
103
98
- After sending the query we need to capture the returned query task service link
99
- and subscribe to it for any updates. * NOTE:* One important thing to note here
100
- is that continuous query task service being a long running service that queries the
101
- index regularly for updates. Hence this should be used with care and should be
102
- called by a single host. If following code is running on a replicated service
103
- on multiple nodes, then we would be triggering this continues query task
104
- multiple times on our targetHost, which would be a over kill and can be big
105
- performance hit on your poor hosts.
104
+ After creating the query task, we POST it to the local-query-tasks factory service
105
+ with specific documentSelfLink (QUERY_TASK_LINK) that will create a query task with
106
+ that name, helping us to point to this query task service easily in future.
107
+
108
+ On creation of this continuous query task service we call ` subscribeToContinuousQuery `
109
+ to create the subscription to get notifications from query task service with updates.
110
+
111
+ ** Note:** We always use a * local* query task (CORE_LOCAL_QUERY_TASKS) for continuous queries!
112
+
113
+ * See also* [[ DeferredResult-Tutorial]]
106
114
107
115
``` java
108
- Operation post = Operation . createPost(targetHost, ServiceUriPaths . CORE_LOCAL_QUERY_TASKS )
109
- .setBody(queryTask)
110
- .setReferer(this . clientHost. getUri());
111
-
112
- post. setCompletion((o, e) - > {
113
- if (e != null ) {
114
- System . out. printf(" Query failed %s" , e. toString());
115
- return ;
116
- }
117
- QueryTask queryResponse = o. getBody(QueryTask . class);
118
- subscribeToContinuousQueryTask(targetHost, queryResponse. documentSelfLink);
119
- );
120
-
121
- this . clientHost. sendRequest(post);
116
+ public void createAndSubscribeToContinuousQuery(Operation op) {
117
+ QueryTask queryTask = createContinuousQuery();
118
+ queryTask. documentSelfLink = QUERY_TASK_LINK ;
119
+ Operation post = Operation . createPost(getHost(), ServiceUriPaths . CORE_LOCAL_QUERY_TASKS )
120
+ .setBody(queryTask)
121
+ .setReferer(getHost(). getUri());
122
+
123
+ getHost(). sendWithDeferredResult(post)
124
+ .thenAccept((state) - > subscribeToContinuousQuery())
125
+ .whenCompleteNotify(op);
126
+ }
122
127
```
123
128
124
- ## Subscribe to the results
129
+ Now let us see the code for ` subscribeToContinuousQuery ` which will create start
130
+ subscription service to listen for any notifications from continuous query task service
131
+ we created earlier. We are using ` startSubscriptionService ` API of the ServiceHost
132
+ to setup the subscription service. This API is creating a subscription service with a
133
+ callback URI, and registring that URI with the continuous query task service.
125
134
126
- Notice above, in the completion handler we are calling
127
- `subscribeToContinuousQueryTask` (shown below) method with the self- Link of the
128
- query task service.
135
+ Please read the section "Details!" for mode details on creating a robust solution.
129
136
130
137
``` java
131
- public void subscribeToContinuousQueryTask(ServiceHost host, String serviceLink) {
132
- Consumer<Operation > target = this :: processResults;
138
+ public void subscribeToContinuousQuery() {
139
+ Operation post = Operation
140
+ .createPost(UriUtils . buildUri(getHost(), QUERY_TASK_LINK ))
141
+ .setReferer(getHost(). getUri());
133
142
134
- Operation subPost = Operation
135
- .createPost(UriUtils . buildUri(host, serviceLink))
136
- .setReferer(host. getUri());
137
-
138
- host. startSubscriptionService(subPost, target);
143
+ URI subscriptionUri = getHost(). startSubscriptionService(post, this :: processResults);
144
+ updateSubscriptionLink(subscriptionUri);
139
145
}
140
-
141
146
```
142
147
143
- `subscribeToContinuousQueryTask` is just calling `startSubscriptionService`
144
- method on the local host that will listen for any notifications from the the
145
- target service and call our target method(`processResults`).
148
+
149
+ * See also section on Subscriptions in [[ Programming-Model]] *
146
150
147
151
## Process the results
148
152
149
153
Following is the basic implementation of ` processResults ` that would be called
150
154
whenever the query task service on the target host has any new data for us to
151
155
process. We check for presence of the results and then loop over all the result
152
- documents to process.
156
+ documents to process. During the processing we can check ` state.documentUpdateAction `
157
+ to see what was the last action (created, deleted) on that service.
153
158
154
159
``` java
155
160
public void processResults(Operation op) {
156
- QueryTask body = op. getBody(QueryTask . class);
161
+ QueryTask body = op. getBody(QueryTask . class);
162
+
163
+ if (body. results == null || body. results. documentLinks. isEmpty()) {
164
+ return ;
165
+ }
166
+
167
+ for (Object doc : body. results. documents. values()) {
168
+ EmployeeService . Employee state = Utils . fromJson(doc, EmployeeService . Employee . class);
169
+ getHost(). log(Level . INFO , " Employee Name: %s, Action: %s" , state. name, state. documentUpdateAction);
170
+ }
171
+ }
172
+ ```
173
+
174
+ ## Details!
175
+ So far we have went over the basics of continuous query tasks. But there are
176
+ few things, we need to be aware of, when we are writing code for production.
177
+
178
+ In a multi-node environment, one important thing to note
179
+ is that continuous query task service, being a long running service, that hits the
180
+ index regularly for updates. Hence this should be used with care and should be
181
+ called by a single host. If above code is running on a replicated service
182
+ on multiple nodes, then we would be triggering this continuous query task
183
+ multiple times on our host, which would be a over kill and can be big
184
+ performance hit on your hosts.
185
+
186
+ That means when we are creating continuous queries, we need to make
187
+ sure, that the caller with subscription to notifications is fault-tolarant and
188
+ load-balanced. We do that by making a stateful service which is replicated,
189
+ and owner-selected. Lets call this service a watch service.
190
+
191
+ Because it will be a replicated service, it will give us
192
+ fault-tolerance in case one of the replica goes down. And beause this is
193
+ owner-selected we can make sure that only one node is doing the heavy lifting
194
+ at anyone time, and other nodes are not dublicating the work. That watch
195
+ service will create the continuous query task and watch for notifications from
196
+ it. But not just that, we would want it to do this only on one of the
197
+ replicated nodes, not on all nodes. How do we make sure that stateful service
198
+ is only creating the query task in one of the replcas? We can do that by
199
+ checking for the ownership of its state in handleNodeGroupMaintenance method.
200
+
201
+ What is ` handleNodeGroupMaintenance ` ? This is overridable method in stateful
202
+ services that is invoked by the Xenon runtime when there is change in the
203
+ ownership of the state of the service.
204
+
205
+ We can override this method in our watch service and check if we are owner or
206
+ not using ` hasOption(ServiceOption.DOCUMENT_OWNER) ` , and based on that create
207
+ continuous query task if we are owner, and delete running continuous query task
208
+ service and subscription if we are not the owner anymore.
209
+ Following is the example code that implements ` handleNodeGroupMaintenance `
210
+
211
+ ```
212
+ @Override
213
+ public void handleNodeGroupMaintenance(Operation op) {
214
+ // Create continuous queries and subscriptions in case of change in node group topology.
215
+ if (hasOption(ServiceOption.DOCUMENT_OWNER)) {
216
+ createAndSubscribeToContinuousQuery(op);
217
+ } else {
218
+ deleteSubscriptionAndContinuousQuery(op);
219
+ }
220
+ }
221
+ ```
157
222
158
- if ( body . results == null || body . results . documentLinks . isEmpty ()) {
159
- return ;
160
- }
223
+ We have already seen the basic implementation of
224
+ ` createAndSubscribeToContinuousQueryTask ` earlier. Now lets see what
225
+ ` deleteSubscriptionAndContinuousQuery ` might look like in this watch service.
161
226
162
- for (Object doc : body .results .documents .values ()) {
163
- Employee state = Utils . fromJson(doc, Employee . class);
164
- System . out. printf(" Name %s \n " , state. name);
165
- }
227
+ ```
228
+ private void deleteSubscriptionAndContinuousQuery(Operation op) {
229
+ Operation unsubscribeOperation = Operation.createPost(UriUtils.buildUri(getHost(), QUERY_TASK_LINK))
230
+ .setReferer(getUri())
231
+ .setCompletion((o, e) -> {
232
+ updateSubscriptionLink(null);
233
+ deleteContinuousQuery();
234
+ });
235
+
236
+ getStateAndApply(state -> getHost().stopSubscriptionService(unsubscribeOperation,
237
+ UriUtils.buildUri(state.subscriptionLink)));
166
238
}
167
239
```
240
+ In above method we are first creating a ` Operation ` to unsubscribe the
241
+ subscription service being created by the runtime. ` stopSubscriptionService ` deletes
242
+ both the subscriber entry in the query task service and also deletes the subscription callback
243
+ endpoint.
244
+ On the completion of these deletions we are deleting the continuous query service by
245
+ calling ` deleteContinuousQuery ` .
246
+
247
+ Following is the implementation of ` deleteContinuousQuery ` .
248
+ ```
249
+ private void deleteContinuousQuery() {
250
+ getHost().sendRequest(Operation
251
+ .createDelete(UriUtils.buildUri(getHost(), QUERY_TASK_LINK))
252
+ .setReferer(getUri()));
253
+ }
254
+ ```
255
+
256
+ Following is the implementation of ` getStateAndApply ` .
257
+ ```
258
+ private void getStateAndApply(Consumer<? super State> action) {
259
+ Operation get = Operation
260
+ .createGet(this, this.getSelfLink())
261
+ .setReferer(getUri());
262
+
263
+ getHost().sendWithDeferredResult(get, State.class)
264
+ .thenAccept(action)
265
+ .whenCompleteNotify(get);
266
+ }
267
+ ```
268
+
168
269
## FAQ
169
270
170
271
#### Can I do continuous query on a stateless service?
171
272
No, because queries work on stateful and persisted services.
172
273
173
- #### How can I see list of all created continuous query services?
174
- You can do curl on `http: // host/core/query-tasks` and `http://host/core/ local-query-tasks` to see list of all query task services.
274
+ #### How can I see the list of all created continuous query services?
275
+ You can do curl on ` http://[ host] /core/local-query-tasks ` to see list of all local query task services.
175
276
176
- #### My continuous query task is not there after few minutes. Why it got disappeared ?
277
+ #### My continuous query task is terminated after few minutes?
177
278
It got expired after 10 minutes. Set it to never expire using ` querytask.documentExpirationTimeMicros = Long.MAX_VALUE; `
178
279
179
280
#### Why I am not getting any results in subscription of my continues query tasks service?
180
281
Subscription handler will be called when there are any updates. Make sure your
181
282
updates are being reflected on the index.
182
283
183
284
#### Let’s say I’ve got a query for “all example services”, and one of the example services is deleted: do I learn that it’s no longer in the query result?
184
- You will get a PATCH , with the body being the last version of the example
185
- service when it was deleted. The documentUpdateAction will be DELETE .
285
+ You will get a ` PATCH ` , with the body being the last version of the example
286
+ service when it was deleted. The ` documentUpdateAction ` will be ` DELETE ` .
186
287
187
288
#### You said, any update that satisfies the query filter will cause the results to be updated and a self PATCH to be sent on the service. Does that mean I’ll see a diff of new things that match that query?
188
289
@@ -192,21 +293,11 @@ with the specific update to a service that matched the query if your query no
192
293
longer match anything, you get no notifications. You can cancel it, have it
193
294
expire, etc.
194
295
195
- #### I want to get notification on every single create, update, delete on all services on my host. Will a single continuous query task do that?
196
- Please don't try this at your home(production). Yes , you can do that, but it
197
- will be a big performance hit on your host for the duration of your query task.
198
- Make sure you set expire to be short .
199
-
200
296
#### When does a continuous query ends?
201
297
It never ends, until it gets expired. You will keep getting notification as
202
- long as there are updates that fulfil continuous query task' s filter.
298
+ long as there are updates that fulfil continuous query task's filter or until it gets expired .
203
299
204
300
#### How do you calculate a total sum using a continuous query when you don't know when your are done getting update notifications?
205
- Well a total sum implies you know the full set that you want to compute it
301
+ Well, a total sum implies you know the full set that you want to compute it
206
302
over, which means that you can use a normal query. If you want to keep
207
303
counting, and do a running sum, then you need to use a continuous query.
208
-
209
- #### Ok , I am still confused. What does a continuous query actually do ?
210
- A continuous query does two things:
211
- 1. It will give the historical, existing data as the first notification
212
- 2. It will give you any further updates
0 commit comments