Introduce Insights API #1610
Signed-off-by: Jackie <[email protected]>
Signed-off-by: Jackie <[email protected]>
Signed-off-by: Jackie <[email protected]>
Signed-off-by: Jackie <[email protected]>
Signed-off-by: Jackie <[email protected]>
Signed-off-by: Jackie <[email protected]>
4883d42 to
77fcda9
Compare
Signed-off-by: Jackie <[email protected]>
Signed-off-by: Jackie <[email protected]>
0eba385 to
49018e0
Compare
CI failed due to jacoco changes in build.gradle. Not sure how to fix it. One naive way is to add the correlation request, response, and Action in AD to avoid the ml-commons dependency.
kaituo
left a comment
partial review
kaituo
left a comment
partial review
src/main/java/org/opensearch/ad/rest/RestInsightsJobAction.java
| builder.startObject(); | ||
|
|
||
| // Task metadata | ||
| builder.field("task_id", "task_" + ADCommonName.INSIGHTS_JOB_NAME + "_" + UUID.randomUUID().toString()); |
Why do you need task id? AD task id is the doc id of state index.
|
|
||
| if (parts.length > 1) { | ||
| String seriesKey = parts[1]; | ||
| seriesKeys.add(seriesKey); |
Is the entities set redundant with seriesKeys set?
We don't necessarily need it. I just followed the current practice of having a logical run identifier for the insights generation. Maybe it will be useful in the future when we integrate with Investigation, so we can refer to a specific insights run by this ID.
if we need it in the future, we can add it later. Currently we can remove it.
We need to remove it from the frontend.
| // Use MAX score if multiple anomalies in same bucket | ||
| double currentScore = bucketScores.getOrDefault(bucketIndex, 0.0); | ||
| double newScore = anomaly.getAnomalyScore(); | ||
| bucketScores.put(bucketIndex, Math.max(currentScore, newScore)); |
Should we consider the interval? Our anomalies are interval anomalies. We can apply the anomaly score to all of the buckets that overlap the current interval [data start, data end]. If you have already done this, can you point me to the code? I cannot find it.
synced offline, no changes needed here
I still don't know where your code is. Can you point me to the location?
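For reference, a minimal sketch of the interval-aware bucketing asked about above: each anomaly's score is applied to every bucket that overlaps its [data start, data end] interval, keeping the MAX score per bucket. `IntervalBucketingSketch`, `IntervalAnomaly`, `bucketScores`, and the choice to treat the interval end as exclusive are assumptions made for illustration, not code from this PR.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class IntervalBucketingSketch {

    /** Hypothetical stand-in for an anomaly result covering [dataStart, dataEnd). */
    static final class IntervalAnomaly {
        final Instant dataStart;
        final Instant dataEnd;
        final double score;

        IntervalAnomaly(Instant dataStart, Instant dataEnd, double score) {
            this.dataStart = dataStart;
            this.dataEnd = dataEnd;
            this.score = score;
        }
    }

    /** Spreads each score over all overlapping buckets; bucketSize is assumed to be positive. */
    static Map<Integer, Double> bucketScores(
        List<IntervalAnomaly> anomalies, Instant windowStart, Duration bucketSize, int numBuckets) {
        Map<Integer, Double> scores = new TreeMap<>();
        long bucketMillis = bucketSize.toMillis();
        for (IntervalAnomaly a : anomalies) {
            long startOffset = Duration.between(windowStart, a.dataStart).toMillis();
            long endOffset = Duration.between(windowStart, a.dataEnd).toMillis();
            // End is exclusive; a zero-length interval still covers the bucket of its start.
            long lastOffset = Math.max(startOffset, endOffset - 1);
            // Clamp to the window so intervals that start early or end late are truncated.
            int first = (int) Math.max(0, Math.floorDiv(startOffset, bucketMillis));
            int last = (int) Math.min(numBuckets - 1, Math.floorDiv(lastOffset, bucketMillis));
            for (int b = first; b <= last; b++) {
                // Use MAX score if multiple anomalies touch the same bucket.
                scores.merge(b, a.score, Math::max);
            }
        }
        return scores;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        List<IntervalAnomaly> anomalies = List.of(
            new IntervalAnomaly(t0.plusSeconds(30), t0.plusSeconds(150), 0.4),
            new IntervalAnomaly(t0.plusSeconds(60), t0.plusSeconds(120), 0.9));
        // prints {0=0.4, 1=0.9, 2=0.4}
        System.out.println(bucketScores(anomalies, t0, Duration.ofMinutes(1), 5));
    }
}
```

Anomalies that fall entirely outside the window produce an empty bucket range and are skipped.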
Signed-off-by: Jackie <[email protected]>
kaituo
left a comment
partial review
src/main/java/org/opensearch/ad/transport/InsightsJobTransportAction.java
| .sort("generated_at", SortOrder.DESC) | ||
| ); | ||
|
|
||
| client.search(searchRequest, ActionListener.wrap(searchResponse -> { |
Do you need to add backend role filtering before search? Please add security tests with backend role filtering on.
We need a tenant-isolated search, but not necessarily backend role filtering here. Insights generation is a background job, so I followed the existing pattern for background work: impersonate the stored user via InjectSecurity, then execute the search directly. User-facing search APIs, such as the search anomaly result transport action, instead read the current user from the thread context and then add backend role filtering.
I will add security tests in the next revision.
So we have three kinds of auth now:
- fgac role
- backend-role filtering
- resource sharing.
We will need to cover all three of them.
| try { | ||
| injectSecurity.inject(user, roles); | ||
|
|
||
| localClient |
We should verify whether the mapping has been changed by the customer before writing. If it has, report an error, stop the job, and stop writing.
good catch, updated in the new revision
src/main/java/org/opensearch/ad/rest/handler/InsightsJobActionHandler.java
Signed-off-by: Jackie <[email protected]>
3076f15 to
1356920
Compare
Signed-off-by: Jackie <[email protected]>
kaituo
left a comment
partial review
src/main/java/org/opensearch/ad/rest/handler/InsightsJobActionHandler.java
|
|
||
| log.info("Running Insights job for time window: {} to {}", executionStartTime, executionEndTime); | ||
|
|
||
| querySystemResultIndex(jobParameter, lockService, lock, executionStartTime, executionEndTime); |
Querying only the system result index would hinder your ability to go GA alone. You would have to tie insights to Auto AD creation. One route to GA is to add a text box to the AD overview page. That would add a summary on top of the existing detectors' results.
kaituo
left a comment
partial review
| fetchDetectorMetadataAndProceed(allAnomalies, jobParameter, lockService, lock, executionStartTime, executionEndTime); | ||
| } else { | ||
| log.info("No anomalies found in time window, skipping ML correlation"); | ||
| releaseLock(jobParameter, lockService, lock); |
Having releaseLock scattered through InsightsJobProcessor as it is right now is fragile. It’s easy to add a new early-return or error path and forget to release the lock. It is also hard for others to maintain.
How about:

    private void runInsightsJob(Job job, LockService lockService, LockModel lock,
                                Instant start, Instant end) {
        if (lock == null) {
            log.warn("Can't run Insights job due to null lock for {}", job.getName());
            return;
        }
        ActionListener<Void> lockReleasing = guardedLockReleasingListener(job, lockService, lock);
        // Top-level listener for “anomalies finished”
        ActionListener<List<AnomalyResult>> anomaliesListener = ActionListener.wrap(
            anomalies -> {
                if (anomalies.isEmpty()) {
                    log.info("No anomalies, skipping ML correlation");
                    lockReleasing.onResponse(null);
                    return;
                }
                fetchDetectorMetadataAndProceed(anomalies, job, start, end, lockReleasing);
            },
            lockReleasing::onFailure
        );
        querySystemResultIndex(job, start, end, anomaliesListener);
    }

    private ActionListener<Void> guardedLockReleasingListener(Job job, LockService lockService, LockModel lock) {
        AtomicBoolean done = new AtomicBoolean(false);
        return ActionListener.wrap(
            r -> {
                if (done.compareAndSet(false, true)) {
                    releaseLock(job, lockService, lock);
                } else {
                    log.warn("Lock already released for Insights job {}", job.getName());
                }
            },
            e -> {
                if (done.compareAndSet(false, true)) {
                    log.error("Insights job failed", e);
                    releaseLock(job, lockService, lock);
                } else {
                    log.warn("Lock already released for Insights job {} (got extra failure)", job.getName(), e);
                }
            }
        );
    }

Now:
- querySystemResultIndex(...) only gets an ActionListener<List<AnomalyResult>> listener.
- fetchPagedAnomalies(...) only gets an ActionListener<List<AnomalyResult>> listener.
They know nothing about locks and nothing about completion; they just follow the normal “always call your listener” rule.
This would reduce the “special” call sites to:
- runInsightsJob (creates lockReleasing).
- A few terminal branches in the top-level logic (no anomalies, ML disabled, write succeeded/failed, etc.).
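To show the release-once guard in isolation, here is a self-contained sketch that compiles without OpenSearch on the classpath. The `Listener` interface is a hypothetical stand-in for `ActionListener`, and `GuardedReleaseSketch` and `releaseOnce` are illustrative names, not part of this PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class GuardedReleaseSketch {

    /** Minimal stand-in for OpenSearch's ActionListener so the sketch compiles on its own. */
    interface Listener<T> {
        void onResponse(T response);

        void onFailure(Exception e);
    }

    /** Returns a listener that runs the release action at most once, whichever callback fires first. */
    static <T> Listener<T> releaseOnce(Runnable release, Consumer<Exception> onError) {
        AtomicBoolean done = new AtomicBoolean(false);
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                if (done.compareAndSet(false, true)) {
                    release.run();
                }
            }

            @Override
            public void onFailure(Exception e) {
                if (done.compareAndSet(false, true)) {
                    onError.accept(e);
                    release.run();
                }
            }
        };
    }

    public static void main(String[] args) {
        AtomicInteger releases = new AtomicInteger();
        Listener<Void> lockReleasing =
            releaseOnce(releases::incrementAndGet, e -> System.err.println("job failed: " + e));
        lockReleasing.onResponse(null);                                      // releases the lock
        lockReleasing.onFailure(new IllegalStateException("late failure")); // ignored by the guard
        System.out.println("releases = " + releases.get());                 // prints releases = 1
    }
}
```

Because the guard lives in one place, sub-steps only need to call the listener they were given, and a duplicate or late callback cannot release the lock twice.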
Refactored. It now has a single guarded lock-releasing listener, and the sub-processes are lock-free.
I still see a lot of releaseLock calls. Maybe you didn't upload the latest changes?
Signed-off-by: Jackie <[email protected]>
kaituo
left a comment
Finished one round of review.
| * @param listener Action listener for the response | ||
| */ | ||
| public void startInsightsJob(String frequency, ActionListener<InsightsJobResponse> listener) { | ||
| logger.info("Starting insights job with frequency: {}", frequency); |
To be safe, you will need to verify that users have search permission on the results: run the search API with their credentials and check whether a security exception is thrown.
You mean we should check whether users have search permission on the insights result index? The _results API already checks that. _start only starts the job, so no insights result search is involved.
I meant that when you run the query in InsightsJobProcessor.fetchPagedAnomalies, you stash the context and use superadmin to run the query. Once you change to a custom result index (see my previous comment on why we want this change), you will need to assume the cx role to run the search query. Also, it is better to run the search API and check for a security exception before _start returns success.
| // entity level task to track just one specific entity's state, init progress, error etc. | ||
| HISTORICAL_HC_ENTITY; | ||
| HISTORICAL_HC_ENTITY, | ||
| INSIGHTS; |
Where do you use the new task type?
added it while implementing but ended up not using it, removed
src/main/java/org/opensearch/timeseries/rest/handler/IndexJobActionHandler.java
| } | ||
|
|
||
| public void testInsightsApisUseSystemContextForJobIndex() throws IOException { | ||
| // Use a non-admin user with AD access (alice) to exercise Insights APIs end-to-end under security |
Can you also use non-admin user (e.g., bobUser) to check if things would fail?
added in the new revision
| Response stopResp = TestHelpers.makeRequest(aliceClient, "POST", stopPath, ImmutableMap.of(), "", null); | ||
| assertEquals("Stop insights job failed", RestStatus.OK, TestHelpers.restStatus(stopResp)); | ||
| } | ||
|
|
Can you have alice create detectors, check if results are generated, start insights, check if any insights can be generated after a few minutes; query insights using normal user; then stop insights?
src/main/java/org/opensearch/ad/ml/MLMetricsCorrelationInputBuilder.java
Any idea why the "Build and Test anomaly detection" CI checks are skipped? I saw it happen yesterday and today.
kaituo
left a comment
a few more comments.
| "series_keys": { | ||
| "type": "keyword" | ||
| } |
remove it
| "type": "date", | ||
| "format": "strict_date_time||epoch_millis" | ||
| }, | ||
| "text": { |
text is an OpenSearch keyword. Try not to use it as a field name.
| IndexRequest indexRequest = new IndexRequest(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS).source(insightsDoc); | ||
|
|
||
| // Before writing, validate that the insights result index mapping has not been modified. | ||
| indexManagement.validateResultIndexMapping(ADCommonName.INSIGHTS_RESULT_INDEX_ALIAS, ActionListener.wrap(valid -> { |
This method is for ad/forecast result. Not for insights result.
| pageSource.searchAfter(searchAfter); | ||
| } | ||
|
|
||
| SearchRequest pageRequest = new SearchRequest(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS).source(pageSource); |
For GA, I plan not to support default result index in AD service as it is a security challenge to write to a system index from outside. I'll ask Amit to change to create user result index instead. Can you change here too?
| Instant executionStartTime, | ||
| Instant executionEndTime | ||
| ) { | ||
| MLMetricsCorrelationInput input = MLMetricsCorrelationInputBuilder |
Can you put a max limit on the size of the time series sent to ML-commons? How about 10 MB? If there is more, you can switch to Pearson, as it only needs anomaly points instead of a series.
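A rough sketch of what such a guard could look like. Everything here is an assumption for illustration: the class and method names, the size estimate (one 8-byte double per bucket per series, ignoring serialization overhead), and the way the Pearson fallback encodes its input as plain arrays (for example, one anomaly indicator or score per bucket).

```java
final class CorrelationInputLimitSketch {

    /** The 10 MB cap suggested in the review. */
    static final long MAX_PAYLOAD_BYTES = 10L * 1024 * 1024;

    /** Rough payload estimate: one double per bucket per series, ignoring JSON overhead. */
    static long estimateBytes(int seriesCount, int bucketsPerSeries) {
        return (long) seriesCount * bucketsPerSeries * Double.BYTES;
    }

    /** True if the estimated payload is small enough to send to ML-commons. */
    static boolean fitsMlCommons(int seriesCount, int bucketsPerSeries) {
        return estimateBytes(seriesCount, bucketsPerSeries) <= MAX_PAYLOAD_BYTES;
    }

    /** Pearson correlation of two equal-length arrays; returns NaN if either array is constant. */
    static double pearson(double[] x, double[] y) {
        if (x.length != y.length || x.length == 0) {
            throw new IllegalArgumentException("arrays must be non-empty and of equal length");
        }
        int n = x.length;
        double meanX = 0, meanY = 0;
        for (int i = 0; i < n; i++) {
            meanX += x[i];
            meanY += y[i];
        }
        meanX /= n;
        meanY /= n;
        double cov = 0, varX = 0, varY = 0;
        for (int i = 0; i < n; i++) {
            double dx = x[i] - meanX;
            double dy = y[i] - meanY;
            cov += dx * dy;
            varX += dx * dx;
            varY += dy * dy;
        }
        return cov / Math.sqrt(varX * varY);
    }

    public static void main(String[] args) {
        System.out.println(fitsMlCommons(1000, 1000)); // 8,000,000 bytes: true
        System.out.println(fitsMlCommons(2000, 1000)); // 16,000,000 bytes: false
        System.out.println(pearson(new double[] { 1, 2, 3 }, new double[] { 2, 4, 6 })); // 1.0
    }
}
```

A caller would check `fitsMlCommons` before building the ML-commons input and take the pairwise Pearson path only when the estimate exceeds the cap.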
| private final Client client; | ||
| private final Gson gson; | ||
|
|
||
| public MLCommonsClient(Client client, NamedXContentRegistry xContentRegistry) { |
xContentRegistry is not used
| // Raw ML output (stored but not indexed) | ||
| builder.startObject("mlc_raw"); | ||
| String rawJson = gson.toJson(mlOutput.getRawOutput()); | ||
| java.io.InputStream is = new java.io.ByteArrayInputStream(rawJson.getBytes(java.nio.charset.StandardCharsets.UTF_8)); |
Use try-with-resources on is.
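A minimal sketch of the suggested pattern, using only the JDK. `RawJsonStreamSketch` and `countBytes` are hypothetical names; in the PR the stream would be handed to whatever parses the raw JSON into the builder rather than being counted.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

final class RawJsonStreamSketch {

    /** Reads the JSON back through a stream that is closed automatically on every exit path. */
    static int countBytes(String rawJson) throws IOException {
        try (InputStream is = new ByteArrayInputStream(rawJson.getBytes(StandardCharsets.UTF_8))) {
            // All use of the stream stays inside the try block.
            return is.readAllBytes().length;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(countBytes("{\"a\":1}")); // prints 7
    }
}
```

Closing a ByteArrayInputStream has no effect, but the try-with-resources form stays correct if the stream type is later swapped for one that holds real resources.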

Description
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
--signoff.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.