Skip to content

kvs-watch/job-info: support initial sentinel #6880

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from

Conversation

chu11
Copy link
Member

@chu11 chu11 commented Jun 17, 2025

Per discussion in #6872, support the idea of an "Initial Sentinel", that can inform KVS watchers that we are done reading all of the "current data" that exists in the KVS.

I went back and forth on how to do things:

  • settled on sending an empty message to indicate sentinel vs a parameter in an RPC. Since we need to send a sentinel when a value is "empty", it sort of eliminated the parameter idea b/c there might not be any data to send.

  • I went back and forth on how to handle this in terms of the API and eventually settled on adding the following two functions:

flux_kvs_lookup_get_initial_sentinel(flux_future_t *f, const char **value, bool *initial_sentinel)
flux_job_event_watch_get_initial_sentinel(flux_future_t *f, const char **event, bool *initial_sentinel)

they are identical to the get equivalents (i.e. flux_kvs_lookup_get() and flux_job_event_watch_get()) but set the sentinel flag if the sentinel was read.

I originally tried to have a separate function (i.e. flux_kvs_lookup_sentinel()), but that added an extra function call to be called, which I disliked. Also, b/c the RPC returns an empty payload for the sentinel, that messes up the current API that assumes a payload must be formatted a specific way (i.e. we get EPROTO errors), so making me lean to the functions I added above. I don't love this design, but it might be the least bad??? Welcome to other API ideas. (as an aside, I now dislike calling this "initial sentinel", probably should come up with a better name ... thinking of options).

The last commit here is simply a dumb "would --tail work in flux job attach". Largely can ignore it for now. That work will take some refactoring b/c of all the output options that exist and will be spliced out later.

@chu11 chu11 changed the title Issue6872 initial sentinel WIP: kvs-watch/job-info: support initial sentinel Jun 17, 2025
@chu11 chu11 force-pushed the issue6872_initial_sentinel branch 3 times, most recently from 72d3c17 to df57435 Compare June 17, 2025 22:20
@garlick
Copy link
Member

garlick commented Jun 17, 2025

One thought is just to document that a zero length value is returned when the watch stream is up to date, and then let people use flux_kvs_lookup_get_raw() to check the length, or heck just add:

ssize_t flux_kvs_lookup_get_size (flux_future_t *f);

I'm assuming the sentinel only applies when FLUX_KVS_WATCH_APPEND is used, since by definition you're up to date after the first response for the other modes I think.

Edit: or maybe flux_kvs_lookup_get() and flux_job_event_watch_get() could set the OUT parameter to NULL when the sentinel is received?

@chu11
Copy link
Member Author

chu11 commented Jun 17, 2025

I'm assuming the sentinel only applies when FLUX_KVS_WATCH_APPEND is used, since by definition you're up to date after the first response for the other modes I think.

That's true, I did support it with just FLUX_KVS_WATCH, but perhaps that was not necessary.

Edit: or maybe flux_kvs_lookup_get() and flux_job_event_watch_get() could set the OUT parameter to NULL when the sentinel is received?

I didn't do that for flux_kvs_lookup_get() b/c an empty value for a key is reasonable. But A) I should double check if the value returned is NULL or empty string. and B) perhaps that's not as big a deal as I originally thought it would be. If you're watching something, you always expect there to be some value and you can treat "no value" as the sentinel.

I did the kvs-watch first and just copied it over to job-info. It does make sense for flux_job_event_watch_get() since an event is the reasonable thing to always expect from that API.

@chu11
Copy link
Member Author

chu11 commented Jun 17, 2025

Just a note to myself for later, the failing test is b/c the test is a little racy.

  {"timestamp":1750199581.9394929,"name":"init"}
  {"timestamp":1750199581.9406893,"name":"starting"}
  {"timestamp":1750199581.9571524,"name":"shell.init","context":{"service":"1001-shell-f4qLBJAK","leader-rank":0,"size":1}}
  sentinel
  {"timestamp":1750199581.9598074,"name":"shell.start","context":{"taskmap":{"version":1,"map":[[0,1,1,1]]}}}

can't assume that the job has hit shell.start when launching the watcher.

@chu11
Copy link
Member Author

chu11 commented Jun 18, 2025

Edit: or maybe flux_kvs_lookup_get() and flux_job_event_watch_get() could set the OUT parameter to NULL when the sentinel is received?

Now I remember, there were two reasons I didn't do this.

A) an "empty value" is a legitimate return value for flux_kvs_lookup_get() when the value for the KVS is empty. But perhaps we can just ignore that. it could be "empty value " or sentinel. Or we could just put some "is_sentinel" parameter into the response RPC. Lemme mull that a bit more.

B) we'd presumably want to support the the NULL OUT parameter in all of these functions.

int flux_kvs_lookup_get (flux_future_t *f, const char **value);
int flux_kvs_lookup_get_unpack (flux_future_t *f, const char *fmt, ...);
int flux_kvs_lookup_get_raw (flux_future_t *f, const void **data, size_t *len);
int flux_kvs_lookup_get_treeobj (flux_future_t *f, const char **treeobj);
int flux_kvs_lookup_get_dir (flux_future_t *f, const flux_kvsdir_t **dir);
int flux_kvs_lookup_get_symlink (flux_future_t *f,
                                 const char **ns,
                                 const char **target);

Which is plausible for all except for flux_kvs_lookup_get_unpack().

flux_kvs_lookup_get_raw() to check the length

I like your idea of using flux_kvs_lookup_get_raw(), it's not so different than my _initial_sentinel() idea but we're just using the "len" from the raw function instead.

This is how I began doing things at first

Code that was

if (flux_kvs_lookup_get (...) < 0)
   error handling
do good stuff

turned into

if (flux_rpc_get (...) < 0)
   error handling
if (value != NULL) {
   if (flux_kvs_lookup_get (...) < 0)
      error handling
   do good stuff
}
else {
    sentinel path
}

OR

if (flux_kvs_lookup_get (...) < 0) {
     if (errno == EPROTO) {
          if (flux_rpc_get (...) < 0)
              more error handling
          if (value == NULL) {
              handle sentinel
          }
     }
     error handling normal
}
do good stuff

Which I disliked both.

@chu11 chu11 force-pushed the issue6872_initial_sentinel branch 3 times, most recently from 6d99b84 to 3d4b86e Compare June 18, 2025 22:48
@chu11
Copy link
Member Author

chu11 commented Jun 18, 2025

I've re-worked the PR

A) The sentinel only works with FLUX_KVS_WATCH and FLUX_KVS_WATCH_APPEND. So it doesn't work with just watching a value normally.

B) Now going with the following functions:

int flux_kvs_lookup_get (flux_future_t *f, const char **value);
int flux_kvs_lookup_get_raw (flux_future_t *f, const void **data, size_t *len);
int flux_kvs_lookup_get_treeobj (flux_future_t *f, const char **treeobj);

and

int flux_job_event_watch_get (flux_future_t *f, const char **event);

returning NULL for their OUT parameters when the sentinel has been received (FYI, lookup_get_unpack() doesn't work for this style. lookup_get_dir() and lookup_get_symlink() dont' make much sense with FLUX_KVS_WATCH_APPEND and this flag).

Pro: no new extra functions, the API is pretty simple and easy

Con: the primary corner case (which lead me to shy away from this before) is an "empty value" is a legitimate KVS value and that returns "NULL" in flux_kvs_lookup_get(). So having this API style with "sentinels" is mostly b/c users will not use this with any key they believe an "empty value" is feasible. Extra side note, it's really only feasible if a key begins as an empty value, an "empty value" can't be appended to an existing key.

@chu11
Copy link
Member Author

chu11 commented Jun 20, 2025

just pushed a commit on top that implements --tail=N in flux job attach. Everything appears to work as expected.

Lemme work on some tests, add appropriate documentation, and split this up into several PRs.

@chu11 chu11 force-pushed the issue6872_initial_sentinel branch 5 times, most recently from 5a9cd5d to 3d33482 Compare June 30, 2025 20:22
@chu11
Copy link
Member Author

chu11 commented Jun 30, 2025

removing WIP, I think this is good for a review. Added docs and some tests. A few minor changes from prior push.

A) only these kvs functions return NULL if we reach the sentinel w/ kvs-watch

int flux_kvs_lookup_get (flux_future_t *f, const char **value);
int flux_kvs_lookup_get_raw (flux_future_t *f, const void **data, size_t *len);

B) the flag is now called FLUX_KVS_WATCH_APPEND_INITIAL_SENTINEL

@chu11 chu11 changed the title WIP: kvs-watch/job-info: support initial sentinel kvs-watch/job-info: support initial sentinel Jun 30, 2025
@chu11 chu11 force-pushed the issue6872_initial_sentinel branch from 3d33482 to 9412c01 Compare July 10, 2025 16:38
Copy link
Member

@garlick garlick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few questions about the kvs-watch implementation. Seems like it might need to be reworked based on the design changes?

@@ -36,7 +36,8 @@ enum kvs_op {
FLUX_KVS_WATCH_FULL = 64,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit message: instead of "an empty RPC will be sent", I think you mean an empty response will be sent.

@@ -36,7 +36,8 @@ enum kvs_op {
FLUX_KVS_WATCH_FULL = 64,
FLUX_KVS_WATCH_UNIQ = 128,
FLUX_KVS_WATCH_APPEND = 256,
FLUX_KVS_STREAM = 512
FLUX_KVS_STREAM = 512,
FLUX_KVS_WATCH_APPEND_INITIAL_SENTINEL = 1024
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flag name is long and kind of confusing when taken out of context of the use case (does it append a sentinel?) Maybe something short and use case focused like FLUX_KVS_WATCH_TAIL would be better?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's long. TAIL to me doesn't seem right b/c that usually signifies the data "after" or "later'. I prefixed the flag with "FLUX_KVS_WATCH_APPEND" b/c it only works with that flag. I suppose that isn't entirely necessarily. I could shorten it to just "FLUX_KVS_WATCH_SENTINEL" and documentation would indicate it only works with APPEND.

Comment on lines +258 to +286
if (!s) {
(*sentinel) = true;
return 0;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this code is left over from the earlier design when there was a special sentinel in the API?
Now couldn't we simplify this and just return a NULL payload all the way through?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the sentinel flag is mostly b/c code in lookup_get() and some other family of functions is like this:

    if (parse_response (f, ctx, &sentinel) < 0)                                                                                             
        return -1;                                                                                                                          
    if (sentinel) {                                                                                                                         
        if (value)                                                                                                                          
            *value = NULL;                                                                                                                  
        return 0;                                                                                                                           
    }                                                                                                                                       
    if (!ctx->val_valid) {                                                                                                                  
        if (treeobj_decode_val (ctx->treeobj,                                                                                               
                                &ctx->val_data,                                                                                             
                                &ctx->val_len) < 0)                                                                                         
            return -1;                                                                                                                      
        ctx->val_valid = true;                                                                                                              
        // N.B. val_data includes xtra 0 byte term not reflected in val_len                                                                 
    }                                  

There are some assumptions later on about what values are set, like ctx->treeobj. I suppose alternately code could be updated for "if treeobj NULL then RPC has no payload".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just assumed it was an artifact from before, since now the convention is "empty response" == "sentinel" both in the API and the RPC right? I wasn't sure why the API code needed to set a flag when it could just be unaware and pass the empty payload on to the API user.

Comment on lines 268 to 269
if (w->flags & FLUX_KVS_WATCH
&& w->flags & FLUX_KVS_WATCH_APPEND
&& w->flags & FLUX_KVS_WATCH_APPEND_INITIAL_SENTINEL
&& w->initial_sentinel_sent == false) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't flags be checked for compatibility when the watch request is initially accepted, so that we only have to check for the one flag here?

Copy link
Member Author

@chu11 chu11 Jul 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems we don't (only w/n the kvs-watch API). perhaps we should do that!

Edit: Correction, we check a number of things, but not every "combo" ... we should add more checking.


if (flux_future_aux_get (f, "initial_sentinel"))
send_initial_sentinel (h, w);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there an aux entry for the sentinel? It seems like we have access to the w struct that includes the flags everywhere this is used. I'm probably missing something.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This aux is simply a "flag". It indicates which future is the last "initial" blobref being loaded, thus needing to send the initial sentinel afterwards.

You are correct there could be other ways to do this. We could track the number of responses already sent and using some counting use that to indicate when to send the initial sentinel. But I felt this was easier / simpler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh duh, I see the big block comment now where the aux value is set.

chu11 added 2 commits July 18, 2025 10:00
Problem: A simple assignment to a variable did not have proper
spacing.

Fix the simple whitspace problem.
Problem: In flux-core, function parameters are normally placed
on separate lines if they exceed 80 chars.  Some code in the
libjob did not follow this.

Update code in libjob for consistency.
@chu11 chu11 force-pushed the issue6872_initial_sentinel branch from 9412c01 to af9c248 Compare July 18, 2025 17:10
chu11 added 3 commits July 18, 2025 11:36
Problem: The kvs-watch module does not check that the FLUX_KVS_WATCH
flag is set if other watch flags (i.e. FLUX_KVS_WATCH_APPEND) are set.

Add a check and return EPROTO if flags are set incorrectly.
Problem: There is no coverage for several EPROTO cases in kvs-watch.

Add coverage in t1007-kvs-lookup-watch.t.
Problem: In libkvs the parse_response() function cleans up a lot
of old treeobj information lingering in the context.  This code could
be useful at a later time.

Move the treeobj ctx information into its own function.
chu11 added 8 commits July 18, 2025 16:14
Problem: When watching appends to a key, it would be convenient if
we knew when the "initial" or "current" data has been fully sent before
waiting / watching for future data.  It would take several RPCs to
accomplish this at the moment.

Support a new KVS_WATCH_INITIAL_SENTINEL flag in kvs-watch.
An empty RPC response will be sent after all "initial" or "current"
data has read from the KVS and sent to the caller.

Update flux_kvs_lookup_get() and flux_kvs_lookup_get_raw() to return
NULL (or 0) for OUT parameters when a sentinel has been received.
Problem: There are no tests that cover the new
FLUX_KVS_WATCH_INITIAL_SENTINEL flag.

Solution: Add tests to t1007-kvs-lookup-watch.t and a helper
test file watch_initial_sentinel.c.
Problem: The new FLUX_KVS_WATCH_INITIAL_SENTINEL flag
in libkvs is not documented.

Add documentation in flux_kvs_loopup(3)
Problem: The kvs-watch module recently supported a
FLUX_KVS_WATCH_INITIAL_SENTINEL flag.  The the job-info module
does not take advantage of this flag when watching an eventlog.

Support a new FLUX_JOB_EVENT_WATCH_INITIAL_SENTINEL flag.  Update
flux_job_event_watch_get() to return an empty event if the sentinel
has been reached.

Fixes flux-framework#6872
Problem: There are no tests to cover the new
FLUX_JOB_EVENT_WATCH_INITIAL_SENTINEL flag.

Solution: Add tests to t2231-job-info-eventlog-watch.t and a helper
test file eventlog_watch_initial_sentinel.c.
@chu11 chu11 force-pushed the issue6872_initial_sentinel branch from af9c248 to 082604b Compare July 18, 2025 23:51
Copy link

codecov bot commented Jul 19, 2025

Codecov Report

Attention: Patch coverage is 86.33094% with 19 lines in your changes missing coverage. Please review.

Project coverage is 83.91%. Comparing base (df3c11c) to head (082604b).
Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
src/modules/kvs-watch/kvs-watch.c 66.66% 9 Missing ⚠️
src/common/libkvs/kvs_lookup.c 88.46% 6 Missing ⚠️
src/common/libjob/info.c 84.61% 2 Missing ⚠️
src/modules/job-info/guest_watch.c 95.23% 1 Missing ⚠️
src/modules/job-info/watch.c 96.15% 1 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff            @@
##           master    #6880    +/-   ##
========================================
  Coverage   83.91%   83.91%            
========================================
  Files         540      540            
  Lines       90610    90721   +111     
========================================
+ Hits        76033    76131    +98     
- Misses      14577    14590    +13     
Files with missing lines Coverage Δ
src/modules/job-info/guest_watch.c 76.25% <95.23%> (+1.25%) ⬆️
src/modules/job-info/watch.c 69.43% <96.15%> (+2.05%) ⬆️
src/common/libjob/info.c 91.66% <84.61%> (-4.34%) ⬇️
src/common/libkvs/kvs_lookup.c 86.91% <88.46%> (+0.92%) ⬆️
src/modules/kvs-watch/kvs-watch.c 76.30% <66.66%> (-0.35%) ⬇️

... and 4 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants