logging: add OpenSearch job log indexing and UI log viewer #73
Conversation
Force-pushed from f5d0b18 to 55e4802.
for h in app.logger.handlers:
    h.setLevel(app.config["LOGGING_JOBS_LEVEL"])

# Add OpenSearch logging handler if not already added
In the setup, if we pass the handler to `apps` and `api_apps` it seems it gets registered twice, so we check for existence to avoid issues. Alternatively, we could simply pass it to `app` only?
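For illustration, a minimal sketch of that existence check (`OpenSearchJobLogHandler` is a placeholder name, not necessarily the handler class used in this PR):

```python
# Only register the handler once, even if setup runs for both app and api_app.
if not any(isinstance(h, OpenSearchJobLogHandler) for h in app.logger.handlers):
    app.logger.addHandler(OpenSearchJobLogHandler())
```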
For the mapping, I decided to keep the information produced by Python logging, so that if we wanted to we could enrich the message, for instance on errors, to display the name of the function, line, module, etc. Alternatively, we can keep it simpler.
invenio_jobs/services/services.py
Outdated
def search(self, identity, params):
    """Search for app logs."""
    self.require_permission(identity, "search")
    search_after = params.pop("search_after", None)
    search = self._search(
        "search",
        identity,
        params,
        None,
        permission_action="read",
    )
    search = search.sort("timestamp", "_id").extra(size=100)
    if search_after:
        search = search.extra(search_after=search_after)

    final_results = None
    # Keep fetching until no more results
    while True:
        results = search.execute()
        hits = results.hits
        if not hits:
            if final_results is None:
                final_results = results
            break

        if not final_results:
            final_results = results  # keep metadata from first page
        else:
            final_results.hits.extend(hits)
            final_results.hits.hits.extend(hits.hits)

        search = search.extra(search_after=hits[-1].meta.sort)

    return self.result_list(
        self,
        identity,
        final_results,
        links_tpl=self.links_item_tpl,
    )
I am not sure about this function being a search endpoint with optional params. As you can see, it's meant to fetch logs without a limit. I set a batch size of 100 (the default is 10), but it could be increased.
For cases where we don't pass any query, with millions of entries this call would time out. To solve this I can think of:
- Adding a hard limit, let's say 100k?
- Preferred: changing this to a read endpoint that only allows fetching the logs of a concrete run.
WDYT?
For the batch size, you might want to have a look at how performant the query/cluster is with different sizes.
For the limit, from what I saw in other web apps (e.g. GitLab), when the log goes over 100 MB, it is dropped.
Does it make any kind of sense to limit it to, for example, 1 million documents max? How would the web app handle this, and how fast would the rendering be?
- Agree on testing a bit further with the batch size; since logs are pretty small, even 1k batches would really speed things up and limit the number of total requests to the cluster.
- I would also impose a max limit at the beginning of the call, i.e. make a `search.count()` call and, if the results are more than some max limit (e.g. 1 million results), bail with an error (see the sketch right after this list).
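A minimal sketch of that guard, assuming the `Search` object built earlier in this method (the `MAX_LOG_RESULTS` constant is illustrative):

```python
MAX_LOG_RESULTS = 1_000_000  # illustrative cap, to be tuned

# Bail out early if the query would return too many documents.
total = search.count()
if total > MAX_LOG_RESULTS:
    raise ValueError(
        f"Query matches {total} log entries; refusing to fetch more than "
        f"{MAX_LOG_RESULTS}. Narrow the query."
    )
```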
We discussed briefly IRL with @jrcastro2 that there might be an overall alternative "streaming" approach to serving the logs that would also help with memory performance and simplify the code:
- Because we serve a single JSON object inside our usual `hits.hits` response envelope, we need to collect all the logs for a job run to build the object. Even if we set a maximum total size/count, we're going to be serving a relatively big response with an array of up to 100k (small) items.
- This also imposes a UX issue on the client side, since the user will have to wait for a long "Loading logs..." message until the entire response is received, and only afterwards get the entire logs rendered.
- If we take further advantage of the pagination methods that OpenSearch provides, we could provide a "streaming" generator response in JSON Lines format, which would also allow the client to start showing results from the very beginning of rendering the page.
  - This requires the client-side code to change as well, to handle the JSON Lines response.
  - We would use the same response when polling every 5 sec, with the last log's timestamp as the offset.
- Unfortunately `opensearch-py` doesn't have the `Search.iterate(...)` method that the `elasticsearch-py` client has, which would simplify a lot the entire `while True: ...` loop in the code here. We could still use `Search.scan(...)`, but this now uses the Scroll API by default, which is not ideal for sorted results.

This is not meant to be done now in the scope of this PR, since we have a working implementation that's good enough ™️, but it would be a nice future improvement in both performance and UX. I'll shelve this in an issue. A sketch of the idea follows.
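For illustration, a minimal sketch of such a JSON Lines streaming generator, assuming the same `Search`/`search_after` API used in this PR; `build_run_logs_search` is a hypothetical helper that builds the filtered query for one run:

```python
import json

def stream_run_logs(run_id, page_size=1000):
    """Yield one JSON line per log entry, page by page via search_after."""
    search = (
        build_run_logs_search(run_id)  # hypothetical query builder
        .sort("timestamp", "_id")      # deterministic order for search_after
        .extra(size=page_size)
    )
    search_after = None
    while True:
        page = search.extra(search_after=search_after) if search_after else search
        hits = page.execute().hits
        if not hits:
            break  # no more results
        for hit in hits:
            yield json.dumps(hit.to_dict()) + "\n"
        # Resume after the last hit's sort values on the next request
        search_after = list(hits[-1].meta.sort)
```

A Flask view could then wrap this in a streaming response, e.g. `Response(stream_run_logs(run_id), mimetype="application/x-ndjson")`, so the client can render lines as they arrive.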
Increased the batches to 1k and set a hard limit of 50k for now. If we want to allow more, we need to optimize on both sides, backend and frontend; otherwise it starts to become a bit too slow. Given that there is already an issue for it, I haven't spent much time on it right now: #74
# Search for job logs: first set the logger level to INFO
# and log a message after setting the job context
job_context.set(dict(job_id=job_id, run_id=run_id))
app.logger.setLevel("INFO")
app.logger.info("Test log message")
sleep(1)  # Wait for log to be indexed
res = client.get(f"/logs/jobs?q={job_id}")
assert res.status_code == 200
assert res.json["hits"]["total"] == 1
assert res.json["hits"]["hits"][0]["message"] == "Test log message"
Is this enough or do we want to add more tests?
Pull Request Overview
This PR adds support for job log indexing in OpenSearch and a UI log viewer, enhancing logging and administration capabilities for job execution. Key changes include:
- Implementing a context-aware logging handler to index enriched job logs.
- Introducing new resource, service, and configuration classes for job logs.
- Updating UI components and administration views to display job log details and handle the new PARTIAL_SUCCESS run status.
Reviewed Changes
Copilot reviewed 32 out of 34 changed files in this pull request and generated 2 comments.
File | Description
---|---
invenio_jobs/resources/config.py | Adds job log resource config and search args classes.
invenio_jobs/resources/__init__.py | Exports new job log resource and config.
invenio_jobs/proxies.py | Exposes jobs log service proxy.
invenio_jobs/models.py | Introduces PARTIAL_SUCCESS in run status enum.
invenio_jobs/logging/jobs.py | Implements a context-aware OpenSearch logging handler.
invenio_jobs/logging/celery_signals.py | Adds context capture, restoration, and cleanup for Celery tasks.
invenio_jobs/ext.py | Registers job log service and resource.
invenio_jobs/config.py | Configures logging settings for jobs.
invenio_jobs/assets/semantic-ui/js/invenio_jobs/administration/StatusFormatter.js | Adds UI support for Partial Success status.
invenio_jobs/assets/semantic-ui/js/invenio_jobs/administration/RunsSearchResultItemLayout.js | Updates link for run details.
invenio_jobs/assets/semantic-ui/js/invenio_jobs/administration/RunsLogsView.js | Implements the React log viewer setup.
invenio_jobs/assets/semantic-ui/js/invenio_jobs/administration/RunsLogs.js | Implements incremental log fetching and run status monitoring.
invenio_jobs/administration/runs.py | Enhances admin view to display run logs and details.

Files not reviewed (2):
- invenio_jobs/logging/mappings/os-v1/jobslog/log-v1.0.0.json: Language not supported
- invenio_jobs/logging/mappings/os-v2/jobslog/log-v1.0.0.json: Language not supported
{run.formatted_started_at ? (
  <>
    <p>
      <strong>{run.formatted_started_at}</strong>
    </p>
    <p className="description">{runDuration} mins</p>
  </>
) : (
  <p className="description">Not yet started</p>
)}
nit: are the paragraphs needed? With `List.Item` you should use `List.Description` instead of "faking it" with the `description` class name; it will help with readability.
invenio_jobs/services/services.py
Outdated
@@ -215,8 +220,9 @@ def read(self, identity, job_id, run_id):
            self, identity, run_record, links_tpl=self.links_item_tpl
        )

    @with_job_context()
Add a comment to highlight that the order of the decorators is important, to not skip any logging.
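Such a comment could look something like the sketch below; the stated reason is an assumption about why the order matters here, not wording from this PR:

```python
# NOTE: keep @with_job_context() outermost: it must set the job/run context
# before the wrapped method runs, otherwise records logged early would be
# missing job_id/run_id and be skipped from indexing.
@with_job_context()
def read(self, identity, job_id, run_id):
    ...
```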
I have already shared my thoughts with you and @slint, mainly regarding the guard for filtering logs by the job context.
In case you decide to change approach and go for the custom logger (or any other solution), I would suggest using a new custom logger as a context. What I also find not very intuitive is the need to use the `update_context` func: as a developer, it won't be clear to me when I should use it.
In general, it is clearer to define required parameters when creating an object, and not later on via some setter funcs/methods, creating an assumption that a context should be set before using other funcs/methods.
We assume an implicit order of calls, and as a developer, it is not easy to use.
Something like this would be, IMO, easier to use, and would have clearer APIs (pseudocode):
logger = JobLogger(job_id, run_id)
logger.info(...)
The object could be a singleton per job, thread-safe. I am wondering if we should even be using it via some kind of map, e.g. `logger = getJobLogger(run_id)` (or similar), but it might be overkill. Without this, the assumption is that there is one job logger per Python process.
Changing the level of the logger (to have a run in debug mode) could again be via a constructor param, or via a `.setLevel(...)` func (which might require resetting the level at the end of the run).
Happy to discuss this further.
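Concretely, such a constructor-bound logger could be sketched as below. The class is hypothetical and not part of this PR; `logging.LoggerAdapter` is just one possible way to implement the suggestion:

```python
import logging

class JobLogger(logging.LoggerAdapter):
    """Logger bound to one job run at construction time (sketch)."""

    def __init__(self, job_id, run_id, level=logging.INFO):
        # One underlying logger per run; the level can be raised per run
        # (e.g. DEBUG) without affecting other runs.
        logger = logging.getLogger(f"invenio_jobs.run.{run_id}")
        logger.setLevel(level)
        super().__init__(logger, extra={"job_id": job_id, "run_id": run_id})

    def process(self, msg, kwargs):
        # Attach the job/run context to every record, so a handler can
        # index it without relying on any implicit call order.
        kwargs.setdefault("extra", {}).update(self.extra)
        return msg, kwargs

# Usage, as in the pseudocode above:
logger = JobLogger(job_id="...", run_id="...")
logger.info("Test log message")
```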
invenio_jobs/services/config.py
Outdated
class JobLog:
    """Job Log API."""

    index = IndexField("jobslog-log-v1.0.0", search_alias="job-logs")
It feels like this could be a good use case for OS datastreams. @sakshamarora1 is creating classes/methods in `invenio-indexer` to use datastreams instead of classic indices.
Don't forget the retention period: this should IMO be a global config here, where I can set how long we should keep logs. The default could be 30 days. A datastream has a policy to automatically delete them.
The UI should show that no logs have been found, or that they have been deleted (with a nice message; we could check what GH Actions shows in this case).
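A minimal sketch of such a global setting, following the module-level config convention used elsewhere in this package (the variable name is illustrative, not from this PR):

```python
JOBS_LOGS_RETENTION_DAYS = 30
"""Number of days job run logs are kept before being automatically deleted.

Illustrative default; with datastreams this would map to the index
lifecycle policy's delete phase.
"""
```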
Force-pushed from bc21678 to f29a6e6.
- Add custom logging handler using contextvars and OpenSearch
- Define JobLogEntrySchema and LogContextSchema
- Support search_after pagination in log search API
- Fetch logs incrementally from UI using search_after cursor
- Add React log viewer with fade-in and scroll support
- closes inveniosoftware#67
Example of how the different log levels are displayed:
