Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task definition helper function (ability to define per-task defaults + typescript as a first-class citizen) #491

Open
3 of 5 tasks
lauri865 opened this issue Sep 23, 2024 · 10 comments
Labels
enhancement New feature or request help wanted Extra attention is needed

Comments

@lauri865
Copy link

Feature description

It would be great if there was a better helper function to define tasks, such as a createTask, that would enable inline payload validation as well as defining task defaults (retries, and other options).

There's many cases when a task e.g. calls an expensive API, and I only want it to be retried a few times if any, and I need to always configure it at the task caller level, which makes maintenance less straight forward than it needs to be.

This change would also help make typescript a first-class citizen and tremendously improve the DX of creating typescript-based tasks, which currently involves importing helper types and "manually" parsing payloads to validate them, and manually adding types for the payload to make sure that callers are aware of them. I think Zod could easily be a first-party integration here for the helper function to define payloads, which would then help to pass them through for implementations as well.

I really like the first benchmark below.

Motivating example

Benchmarks:

Breaking changes

It doesn't have to come with any breaking changes.

Supporting development

I [tick all that apply]:

  • am interested in building this feature myself (happy to design the helper function, but could use help from the maintainer on extending the DB schema to hold the task defaults)
  • am interested in collaborating on building this feature
  • am willing to help testing this feature before it's released
  • [] am willing to write a test-driven test suite for this feature (before it exists)
  • am a Graphile sponsor ❤️
  • have an active support or consultancy contract with Graphile
@lauri865 lauri865 changed the title Task defnition helper function (ability to define per-task defaults + typescript as a first-class citizen) Task definition helper function (ability to define per-task defaults + typescript as a first-class citizen) Sep 23, 2024
@benjie
Copy link
Member

benjie commented Sep 23, 2024

One issue with this approach in general is that Worker tasks would need to be temporally type safe - i.e. not just type safe now; but if you queued the job "type safely" with the previous version of your worker deploy, and then you updated the validation rule and deployed and then executed that job, the job would need to still conform - and we cannot enforce this with types, nor can we rely on it being true. This is why worker encourages you to validate your payloads at execution time - that way we can test that our assumptions still hold.

If you're proposing a double-sided type safety that users can opt into, I'm all for that. I see no reason why such functionality couldn't be implemented through the plugin interface (though we'd need to add more hooks, which is fine!) that way concerns about performance would not be a concern (it's opt-in) and concerns over which specific validation library to use would also be moot. I'd encourage you to start making this, and we can discuss the hooks that are needed to support it.

@benjie benjie added enhancement New feature or request help wanted Extra attention is needed labels Sep 23, 2024
@lauri865
Copy link
Author

lauri865 commented Sep 23, 2024

I didn't actually mean that type-safety is checked at queue-time, but still at run-time. Which would make no difference to the current behaviour where payload is checked at run-time. And any changes would require a similar migration path.

Maybe createTask was poor choice, but rather defineTask, and I agree, this can probably be achieved with the plugin system to a large extent. Albeit, would benefit the DX of graphile-worker greatly to have it baked in as a first-party method. Because the current docs and approach to Typescript drove me off, and the amount of boiler plate I need to write to have fragile type definitions is... a lot.

Anyhow, maybe let's put types aside for a second, and focus on the other part of the suggestion around the ability to define task default properties (that would work both on Node+Postgres level). This is not possible with plugins today, and merits a consideration I believe.

E.g.

export const Task = defineTask("generateOpenAIEmbeddings")
  .options({ retries: 2, priority: 1000 })
  .run((payload) => console.log);

Whenever generateOpenAIEmbeddings is added, it inherits the task definition defaults, unless explicitly overwritten.

Could add additional helpers like:

defineTask("WithAutomaticJobKey").options({ jobKeyPath: "payload.id" }).run(...)

that would resolve dot notation path both in postgres and nodejs.

@lauri865
Copy link
Author

lauri865 commented Sep 23, 2024

And then (later) extending it with payload validation:

export const Task = defineTask("generateOpenAIEmbeddings")
  .input(z.object({ id: z.string().email() })
  .options({ retries: 2, priority: 1000, jobKeyPath: "payload.id" })
  .run((payload) => console.log(payload.id));

then we can in one go infer the payload type based on the input, and also export an automatic helper function with the inferred payload type (that will still be checked at run-time) to add new tasks of that type.

Task.create({ id: 123 })

And without any breaking changes, this could be used as:

addJob(...Task.create({ id: 123 }))

Or if the library is aware of all the available tasks, we can export addJob with types as well, so we have IDE-level autocomplete, but payload is still validated at run-time:

addJob("generateOpenAIEmbeddings", { id: 123 })

@benjie
Copy link
Member

benjie commented Sep 23, 2024

the suggestion around the ability to define task default properties (that would work both on Node+Postgres level).

Could you expand on how you see this working at the postgres level? (By which I assume you mean the graphile_worker.add_job() database function.)

@lauri865
Copy link
Author

lauri865 commented Sep 23, 2024

graphile_worker._private_tasks has another jsonb column default_options

Which the add_job would query against, and then it's a matter of updating the add_job function:
coalesce(spec.max_attempts, 25),
to:
coalesce(spec.max_attempts, task_default.max_attempts, 25),

@lauri865
Copy link
Author

lauri865 commented Sep 23, 2024

The whole motivation around it is two-part:

  1. We have yet to meet a requirement where we need to add job level options for retries, etc., usually it's related to the task at hand. Not being able to set it on task level means that we need to be careful on implementation level rather than task definition level. This gets complex if a job can be added both on the database and server.
  2. add_job in postgres is a function with many args, we often end up with add_job("test", "1", null, null, null, null, null, 10) just to configure retries that could be defined on the task level. And then we end up with cognitive overload trying to understand what the task options actually do.

@benjie
Copy link
Member

benjie commented Sep 23, 2024

graphile_worker._private_tasks has another jsonb column default_options

And you're proposing that a Graphile Worker instance would set the value of this when it starts up? What if a user has multiple Worker Instances with different (but potentially overlapping) task lists - in this case, would the latest booted Worker win, or would they fight, or...?

we need to add job level options for retries, etc.,

The easiest way to handle retries is to do it in your tasks themselves; you should always have something like this - hitting the real max attempts without any cleanup is bad for your database and your performance because you end up with loads of dead records sitting around in the jobs table. Example:

const MAX_ATTEMPTS = 3;

const myTask: Task = async (payload, helpers) => {
  if (helpers.job.attempts > MAX_ATTEMPTS) {
    // We shouldn't try again; permafail now
    
    // Replace this with whatever cleanup actions you need.
    console.log(`Permafail ${JSON.stringify(payload)} due to attempt exhaustion.`);
    
    // Successfully return so the job gets deleted (cleaned up) and is never re-attempted.
    return;
  }
  
  /* Normal task execution goes here */
}

add_job in postgres is a function with many args [...] And then we end up with cognitive overload trying to understand what the task options actually do.

You should use named parameters for that; see: https://worker.graphile.org/docs/sql-add-job

It's recommended that you use PostgreSQL's named parameters for the other parameters so that you only need specify the arguments you're using:

SELECT graphile_worker.add_job('reminder', run_at := NOW() + INTERVAL '2 days');

If you want to run a job after a variable number of seconds according to the database time (rather than the application time), you can use interval multiplication; see run_at in this example:

SELECT graphile_worker.add_job(
  $1,
  payload := $2,
  queue_name := $3,
  max_attempts := $4,
  run_at := NOW() + ($5 * INTERVAL '1 second')
);

@lauri865
Copy link
Author

Thanks for the hint about named params.

And you're proposing that a Graphile Worker instance would set the value of this when it starts up? What if a user has multiple Worker Instances with different (but potentially overlapping) task lists - in this case, would the latest booted Worker win, or would they fight, or...?

Last one wins can work for most cases, but to be on the safer side, it could also be versioned by default based on the last-modified date of the task definition file. And manually overwritten by any sortable string.

Or semver:
by default set as: 0.0.0-$LAST_MODIFIED_TIMESTAMP
manual: 0.0.0 (still higher than the default one, so will overwrite)

@lauri865
Copy link
Author

lauri865 commented Sep 23, 2024

The easiest way to handle retries is to do it in your tasks themselves; you should always have something like this - hitting the real max attempts without any cleanup is bad for your database and your performance because you end up with loads of dead records sitting around in the jobs table.

Opinions vary, but I wouldn't call it particularly easy. We have 100 different tasks, doing custom implementations on retries on each becomes maintenance nightmare down the road. Having dead records around is a feature not a bug in our books, as it's something the developer needs to deal with / fix to make these jobs work in most cases. If performance becomes an issue, we can easily create a DB trigger to send them to a separate table / dead-letter queue. Which is not possible / as easy by handling task retries manually on task definition level.

@benjie
Copy link
Member

benjie commented Sep 23, 2024

Having dead records around is a feature not a bug in our books

It's definitely a bug; over time these will eat into your job queue performance - you should write these perma-fails somewhere else for developers to deal with, and then clear them out of the job queue. Ref: https://worker.graphile.org/docs/scaling#keep-your-jobs-table-small

we can easily create a DB trigger to send them to a separate table / dead-letter queue

I'd strongly advise against that; _private_jobs changes structure from time to time which could break your trigger - in fact, I'm currently contemplating splitting it into 3 separate tables transparently to the user assuming that they respect that these tables are private implementation details. You should use the public interfaces to deal with this, which is the task functions themselves.

I plan to (if I haven't already) make the task executors "wrappable"; so this kind of common concern can be implemented via a plugin/preset and shared across all of your tasks. IMO this is much cleaner than poking into internals with a trigger.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

2 participants