Trigger.dev v3 - Improved Background Jobs #784
Replies: 9 comments 3 replies
-
Deployment
We'll need to make deploying background jobs as easy as possible.
Vercel Integration
We can create a Vercel integration to automatically deploy alongside your Vercel deployments, sync project environment variables, and more easily set up git access.
GitHub App
We'll create a GitHub app that can be installed in a user's repository. We'll subscribe to push events to a branch (
CLI
Our CLI tool will allow building and deploying, which can then be used either locally or in a CI/CD environment:
npx @trigger.dev/cli@latest deploy
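For the GitHub App, the receiving end of a push subscription could look roughly like the sketch below. It's a minimal illustration, assuming the app has a webhook secret configured and a per-project deploy branch setting (`deployBranch` is a hypothetical name, not an existing Trigger.dev option). GitHub signs each delivery with an HMAC-SHA256 of the raw body in the `X-Hub-Signature-256` header, and push payloads carry the pushed `ref` and the new head commit in `after`.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// The two push-event fields this sketch reads (GitHub sends many more).
interface PushEvent {
  ref: string; // e.g. "refs/heads/main"
  after: string; // SHA of the branch head after the push
}

// Check the X-Hub-Signature-256 header: "sha256=" + hex HMAC-SHA256 of the raw body.
export function verifySignature(secret: string, rawBody: string, header: string | undefined): boolean {
  if (!header || !header.startsWith("sha256=")) return false;
  const expected = Buffer.from("sha256=" + createHmac("sha256", secret).update(rawBody).digest("hex"));
  const received = Buffer.from(header);
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}

// Returns the commit SHA to build and deploy, or null if the delivery should be ignored.
export function commitToDeploy(
  secret: string,
  rawBody: string,
  signatureHeader: string | undefined,
  deployBranch: string, // hypothetical per-project setting, e.g. "main"
): string | null {
  if (!verifySignature(secret, rawBody, signatureHeader)) return null;
  const event = JSON.parse(rawBody) as PushEvent;
  return event.ref === `refs/heads/${deployBranch}` ? event.after : null;
}
```

Verifying against the raw body (before JSON parsing) matters, since re-serializing parsed JSON won't necessarily reproduce the exact bytes GitHub signed.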
-
Alternatives considered & Prior Art
The example code above was not the only version of durable job code that we considered, and there are also quite a few other projects that are doing something similar (in both TypeScript and other languages), so in this section I'm going to catalog those alternative approaches and what we can learn from them.
"Magic tasks"
One thing we considered early on was a version that allowed users to write 100% normal code, without any need for tasks. So for example our Ticket Purchasing code above would hypothetically look like this:
// @ts-nocheck
import { client } from "@/trigger";
import { WebClient } from "@slack/web-api";
import { invokeTrigger, uuid, waitForEvent } from "@trigger.dev/sdk";
import { Resend } from "resend";
import { Stripe } from "stripe";
import { z } from "zod";
import { db } from "~/db.server";
// These SDK clients read their API keys from standard environment variables
const stripe = new Stripe(process.env.STRIPE_SECRET_KEY);
const resend = new Resend(process.env.RESEND_API_KEY);
export const purchaseTicket = client.defineBackgroundJob({
id: "purchase-ticket",
name: "Purchase Ticket",
version: "1.0.1",
trigger: invokeTrigger({
schema: z.object({
ticketId: z.number(),
userId: z.string(),
}),
}),
// This gets called every time before the run function is called, allowing you to do any setup
// like hydrating SDK clients with just-in-time credentials
prepare: async (payload, ctx) => {
const slackToken = await client.auth.resolveAccessToken("slack");
const slack = new WebClient(slackToken);
return {
slack
}
},
run: async (payload, ctx) => {
const undos = [];
try {
// First we need to reserve the ticket
const reservedTicket = await db.ticket.update({
where: {
id: payload.ticketId,
},
data: {
reservedBy: payload.userId,
},
});
// Logs will show up in the Trigger.dev UI
console.log("Reserved ticket", { reservedTicket });
// We can undo this action if the user doesn't complete the checkout
undos.push(() => db.ticket.update({
where: {
id: payload.ticketId,
},
data: {
reservedBy: null,
},
}));
// Give the user a 5 minute window to checkout. Will throw an error if the user doesn't checkout in time
const event = await waitForEvent({
name: "cart.checked-out",
filter: {
items: [payload.ticketId],
userId: [payload.userId],
}
}, {
timeout: 5 * 60 * 1000,
});
console.log("Checked out", { checkout: event.payload });
// Now lookup the cart
const cart = await db.cart.findUnique({
where: {
id: event.payload.cartId,
},
include: {
items: true,
},
});
// Charge the user
const charge = await stripe.charges.create({
amount: cart.total,
currency: "usd",
source: cart.paymentSource,
description: `Ticket purchase for ${cart.items[0].name}`,
}, {
idempotencyKey: uuid(),
});
console.log("Charged user", { charge });
// Refund the charge if there are any errors after this point
undos.push(() => stripe.refunds.create({
charge: charge.id,
}, { idempotencyKey: uuid() }));
// Finalize the ticket
await db.ticket.update({
where: {
id: reservedTicket.id,
},
data: {
purchasedBy: payload.userId,
},
});
console.log("Finalized ticket");
// This will run once the job is completed successfully
try {
await Promise.all([
resend.emails.send({
from: "[email protected]",
to: cart.email,
subject: "Ticket Purchase Confirmation",
text: `Thanks for purchasing a ticket to ${cart.items[0].name}!`,
}),
ctx.slack.chat.postMessage({
channel: "C1234567890",
text: `Someone just purchased a ticket to ${cart.items[0].name}!`,
})
])
} catch (e) {
// If the email fails to send, we don't want to undo the ticket purchase
console.error("Failed to send email", e);
}
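} catch (e) {
// Run the compensating actions in reverse order (refund before releasing the ticket),
// then rethrow so the run is marked as failed instead of silently succeeding
for (const undo of undos.reverse()) {
await undo();
}
throw e;
}
},
});
Notice how the above no longer uses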
In short, there could be an endless list of edge cases that we'd need to handle. So we decided against "general magic tasks" and chose to only insert tasks in very specific situations (e.g.
Alternatives
Inngest
import { inngest } from "./client";
inngest.createFunction(
{ id: "process-contacts-csv-upload" },
{ event: "api/contact_list.uploaded" },
async ({ event, step }) => {
const { isValid, errors } = await step.run(
"validate-upload-contents",
async () => {
const uploadFilename = event.data.filename;
// Download the csv file from object storage, validate columns and data in each row
}
);
if (!isValid) {
return await step.run(
"notify-user-of-invalid-contents",
async () => await sendContactsImportFailedEmail(event.user.id, errors)
);
}
await step.run("enrich-address-information", async () => {
// Call a third party API service to enrich each contact's address information
// with zip codes, etc., then uploads the enriched data to the object store
});
const { totalUsersAdded } = await step.run(
"create-contacts-in-crm",
async () => {
// Download the enriched file and insert the contacts into the database
}
);
await step.run(
"notify-user-of-successful-import",
async () =>
await sendContactsImportSuccessEmail(event.user.id, totalUsersAdded)
);
}
);
Inngest and Trigger.dev look very similar: as you can see above, everything needs to be in a step (similar to how everything needs to be a task for Trigger.dev), and steps have explicit cache keys.
Temporal
// activities.ts
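// Activities are ordinary Node.js code, so they use a normal timer (the workflow sleep() is only for workflow code)
import { setTimeout as sleep } from 'node:timers/promises';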
// Simulate a payment processing activity
export async function processPayment(orderId: string): Promise<boolean> {
console.log(`Processing payment for order: ${orderId}`);
await sleep(1000); // Simulate async operation
return true; // Payment successful
}
// Simulate an order validation activity
export async function validateOrder(orderId: string): Promise<boolean> {
console.log(`Validating order: ${orderId}`);
await sleep(500); // Simulate async operation
return true; // Validation successful
}
// Simulate inventory update activity
export async function updateInventory(orderId: string): Promise<void> {
console.log(`Updating inventory for order: ${orderId}`);
await sleep(1000); // Simulate async operation
}
// Simulate sending an email activity
export async function sendConfirmationEmail(orderId: string): Promise<void> {
console.log(`Sending confirmation email for order: ${orderId}`);
await sleep(500); // Simulate async operation
}
// workflows.ts
import { proxyActivities } from '@temporalio/workflow';
// Type-only import: the workflow sandbox must not bundle the activity implementations
import type * as activities from './activities';
// Create a proxy to your activities
const { validateOrder, processPayment, updateInventory, sendConfirmationEmail } = proxyActivities<typeof activities>({
startToCloseTimeout: '2 minutes',
});
// Define the workflow (in the Temporal TypeScript SDK, a workflow is an exported async function)
export async function orderProcessingWorkflow(orderId: string): Promise<void> {
const isValidOrder = await validateOrder(orderId);
if (!isValidOrder) throw new Error('Invalid order');
const paymentSuccessful = await processPayment(orderId);
if (!paymentSuccessful) throw new Error('Payment failed');
await updateInventory(orderId);
await sendConfirmationEmail(orderId);
console.log(`Order processing completed for order: ${orderId}`);
}
Restate.dev
import * as restate from "@restatedev/restate-sdk";
import { v4 as uuid } from "uuid";
import { StripeClient } from "./auxiliary/stripe_client";
import { EmailClient } from "./auxiliary/email_client";
const doCheckout = async (
ctx: restate.RpcContext,
request: { userId: string; tickets: string[] }
) => {
// We are a uniform shop where everything costs 40 USD
const totalPrice = request.tickets.length * 40;
// Generate idempotency key for the stripe client
const idempotencyKey = await ctx.sideEffect(async () => uuid());
const stripe = StripeClient.get();
const doPayment = async () => stripe.call(idempotencyKey, totalPrice);
const success = await ctx.sideEffect(doPayment);
const email = EmailClient.get();
if (success) {
console.info("Payment successful. Notifying user about shipment.");
await ctx.sideEffect(async () =>
email.notifyUserOfPaymentSuccess(request.userId)
);
} else {
console.info("Payment failure. Notifying user about it.");
await ctx.sideEffect(async () =>
email.notifyUserOfPaymentFailure(request.userId)
);
}
return success;
};
export const checkoutApi: restate.ServiceApi<typeof checkoutRouter> = {
path: "CheckoutProcess",
};
export const checkoutRouter = restate.router({
checkout: doCheckout,
});
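On the point about Inngest's steps having explicit cache keys: here's a simplified in-memory model (our own, not Inngest's implementation) of why that matters when a run is replayed after its code changed. A first deployment records `validate` and `charge`; a second deployment inserts an `enrich` step between them and replays against the same log. `StepLog` is a stand-in for whatever the platform persists.

```typescript
// Simplified model: the platform persists each step's result in a log.
type StepLog = Map<string, unknown>;
type StepFn = <T>(id: string, fn: () => Promise<T>) => Promise<T>;

// Explicit keys: results are looked up by the id the developer gave the step.
function makeKeyedStep(log: StepLog): StepFn {
  return async <T>(id: string, fn: () => Promise<T>): Promise<T> => {
    if (log.has(id)) return log.get(id) as T; // replay: skip re-execution
    const result = await fn();
    log.set(id, result);
    return result;
  };
}

// Positional keys: results are looked up by call order, ignoring the id.
function makePositionalStep(log: StepLog): StepFn {
  let index = 0;
  return async <T>(_id: string, fn: () => Promise<T>): Promise<T> => {
    const key = String(index++);
    if (log.has(key)) return log.get(key) as T;
    const result = await fn();
    log.set(key, result);
    return result;
  };
}

async function replayAfterCodeChange(makeStep: (log: StepLog) => StepFn) {
  const log: StepLog = new Map();

  // First deployment: two steps complete and are recorded.
  const v1 = makeStep(log);
  await v1("validate", async () => "valid");
  await v1("charge", async () => "ch_123");

  // Second deployment inserts a step in the middle and replays against the same log.
  const v2 = makeStep(log);
  const validate = await v2("validate", async () => "valid");
  const enrich = await v2("enrich", async () => "enriched");
  const charge = await v2("charge", async () => "ch_NEW"); // "ch_NEW" means a second, real charge
  return { validate, enrich, charge };
}
```

With explicit keys the new `enrich` step executes and `charge` is replayed from the log. With positional keys `enrich` is handed the recorded charge result and `charge` runs again, which in a real job would mean charging the customer twice.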
-
TypeScript Decorators
There is another alternative that could be a pretty nice option once TypeScript supports decorators mutating types:
import { client } from "@/trigger";
import { WebClient } from "@slack/web-api";
import { invokeTrigger, task, uuid, waitForEvent } from "@trigger.dev/sdk";
import { Resend } from "resend";
import { Stripe } from "stripe";
import { z } from "zod";
import { db } from "~/db.server";
// These SDK clients read their API keys from standard environment variables
const stripe = new Stripe(process.env.STRIPE_SECRET_KEY);
const resend = new Resend(process.env.RESEND_API_KEY);
class TicketService {
// The @task decorator is used to define durable tasks
@task
async reserveTicket(ticketId: number, userId: string) {
return db.ticket.update({
where: {
id: ticketId,
},
data: {
reservedBy: userId,
},
});
}
@task
async finalizeTicket(ticketId: number, userId: string) {
return db.ticket.update({
where: {
id: ticketId,
},
data: {
purchasedBy: userId,
},
});
}
@task
async releaseTicket(ticketId: number) {
return db.ticket.update({
where: {
id: ticketId,
},
data: {
reservedBy: null,
},
});
}
@task
async findCart(cartId: string) {
return db.cart.findUnique({
where: {
id: cartId,
},
include: {
items: true,
},
});
}
}
const ticketService = new TicketService();
export const purchaseTicket = client.defineBackgroundJob({
id: "purchase-ticket",
name: "Purchase Ticket",
version: "1.0.1",
trigger: invokeTrigger({
schema: z.object({
ticketId: z.number(),
userId: z.string(),
}),
}),
// This gets called every time before the run function is called, allowing you to do any setup
// like hydrating SDK clients with just-in-time credentials
prepare: async (payload, ctx) => {
const slackToken = await client.auth.resolveAccessToken("slack");
const slack = new WebClient(slackToken);
return {
slack
}
},
run: async (payload, ctx) => {
const undos = [];
try {
// First we need to reserve the ticket
const reservedTicket = await ticketService.reserveTicket(payload.ticketId, payload.userId);
// Logs will show up in the Trigger.dev UI
console.log("Reserved ticket", { reservedTicket });
// We can undo this action if the user doesn't complete the checkout
undos.push(() => ticketService.releaseTicket(reservedTicket.id));
// Give the user a 5 minute window to checkout. Will throw an error if the user doesn't checkout in time
const event = await waitForEvent({
name: "cart.checked-out",
filter: {
items: [payload.ticketId],
userId: [payload.userId],
}
}, {
timeout: 5 * 60 * 1000,
});
console.log("Checked out", { checkout: event.payload });
// Now lookup the cart
const cart = await ticketService.findCart(event.payload.cartId);
// Charge the user
const charge = await task.run(() => stripe.charges.create({
amount: cart.total,
currency: "usd",
source: cart.paymentSource,
description: `Ticket purchase for ${cart.items[0].name}`,
}, {
idempotencyKey: uuid(),
}));
console.log("Charged user", { charge });
// Refund the charge if there are any errors after this point
undos.push(() => task.run(() => stripe.refunds.create({
charge: charge.id,
}, { idempotencyKey: uuid() })));
// Finalize the ticket
await ticketService.finalizeTicket(reservedTicket.id, payload.userId);
console.log("Finalized ticket");
// This will run once the job is completed successfully
try {
await Promise.all([
task.run(() => resend.emails.send({
from: "[email protected]",
to: cart.email,
subject: "Ticket Purchase Confirmation",
text: `Thanks for purchasing a ticket to ${cart.items[0].name}!`,
})),
task.run(() => ctx.slack.chat.postMessage({
channel: "C1234567890",
text: `Someone just purchased a ticket to ${cart.items[0].name}!`,
}))
])
} catch (e) {
// If the email fails to send, we don't want to undo the ticket purchase
console.error("Failed to send email", e);
}
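} catch (e) {
// Run the compensating actions in reverse order (refund before releasing the ticket),
// then rethrow so the run is marked as failed instead of silently succeeding
for (const undo of undos.reverse()) {
await undo();
}
throw e;
}
},
});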
-
Environment Variables
As background jobs won't run in the existing deployment, we'll have to decide how to handle environment variables.
-
Task Queues
#787 brings up a good use case for limiting job runs for a given tenant, which we have heard a bunch of times over the last few months. One idea for how to handle this use case is to allow tasks to be enqueued, and not just run:
// Inside of a job
await task.enqueue(() => checkSubscription(subId), { queueName: `tenant:${tenantId}` });
The queue controls could be managed via API calls through our SDK:
await client.queues.update(`tenant:${tenantId}`, { concurrency: 1 });
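To make the proposed semantics concrete, here's a hypothetical in-memory stand-in for named queues with per-queue concurrency. `NamedQueues`, its default concurrency of 10, and the FIFO wake-up order are all assumptions for illustration; in v3 this state would presumably live on the Trigger.dev server rather than in the job process.

```typescript
type Job<T> = () => Promise<T>;

interface QueueState {
  concurrency: number;
  running: number;
  pending: Array<() => void>; // wake-up callbacks for tasks waiting for a slot
}

// Hypothetical stand-in for named queues such as `tenant:${tenantId}`.
class NamedQueues {
  private queues = new Map<string, QueueState>();

  constructor(private defaultConcurrency = 10) {}

  private get(name: string): QueueState {
    let q = this.queues.get(name);
    if (!q) {
      q = { concurrency: this.defaultConcurrency, running: 0, pending: [] };
      this.queues.set(name, q);
    }
    return q;
  }

  // Mirrors the proposed client.queues.update(name, { concurrency }).
  update(name: string, opts: { concurrency: number }): void {
    const q = this.get(name);
    q.concurrency = opts.concurrency;
    this.drain(q);
  }

  // Mirrors the proposed task.enqueue(fn, { queueName }).
  async enqueue<T>(fn: Job<T>, opts: { queueName: string }): Promise<T> {
    const q = this.get(opts.queueName);
    if (q.running < q.concurrency) {
      q.running++;
    } else {
      // Wait until drain() hands us a slot (drain() increments `running` for us).
      await new Promise<void>((resolve) => q.pending.push(resolve));
    }
    try {
      return await fn();
    } finally {
      q.running--;
      this.drain(q);
    }
  }

  // Hand free slots to waiting tasks in FIFO order.
  private drain(q: QueueState): void {
    while (q.running < q.concurrency && q.pending.length > 0) {
      q.running++;
      q.pending.shift()!();
    }
  }
}
```

Because `update()` calls `drain()`, raising a tenant's limit immediately starts waiting tasks, while lowering it only takes effect as running tasks finish.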
-
Logging with OpenTelemetry
Our background functions should log with the OpenTelemetry ecosystem in mind. All logging that goes from a background function to the Trigger.dev server should be over OTEL, and functions that invoke other functions should be in the same trace.
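"Same trace" comes down to propagating W3C Trace Context between invocations. In practice the OpenTelemetry SDK's propagators (e.g. `W3CTraceContextPropagator` from `@opentelemetry/core`) would do this for us; the hand-rolled sketch below only shows what travels on the wire. The caller sends its own span context as a `traceparent` value, and the callee starts a child span with the same trace id and a fresh span id. How that value would be passed between background functions is an open design question, so nothing here is Trigger.dev API.

```typescript
import { randomBytes } from "node:crypto";

// W3C "traceparent": version-traceId-parentId-flags, e.g.
// 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
interface SpanContext {
  traceId: string; // 32 lowercase hex chars, shared by every span in the trace
  spanId: string; // 16 lowercase hex chars, unique per span
  sampled: boolean;
}

const TRACEPARENT = /^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

export function parseTraceparent(header: string): SpanContext | null {
  const m = TRACEPARENT.exec(header);
  if (!m) return null;
  const [, traceId, spanId, flags] = m;
  // All-zero ids are invalid per the spec.
  if (/^0+$/.test(traceId) || /^0+$/.test(spanId)) return null;
  return { traceId, spanId, sampled: (parseInt(flags, 16) & 1) === 1 };
}

export function formatTraceparent(ctx: SpanContext): string {
  return `00-${ctx.traceId}-${ctx.spanId}-${ctx.sampled ? "01" : "00"}`;
}

// A new trace, e.g. for a run started by an external trigger.
export function newRootSpan(): SpanContext {
  return { traceId: randomBytes(16).toString("hex"), spanId: randomBytes(8).toString("hex"), sampled: true };
}

// Callee side: continue the caller's trace with a fresh span id.
export function childOf(parent: SpanContext): SpanContext {
  return { traceId: parent.traceId, spanId: randomBytes(8).toString("hex"), sampled: parent.sampled };
}
```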
-
Long running servers
One thing not mentioned above is that v3 would come with #430 built in, which would unify the architecture no matter how users deploy their code.
-
Client-less definitions
We currently have everything defined against a
In v3 this won't be necessary, and will allow us to simplify the definitions. I think we can also further simplify by removing the need for an
We could use a similar style as Next.js/Vercel in defining endpoints, with customization done via named exports:
import { stripe } from "@trigger.dev/sdk/stripe";
// Default export is the background function, with a stripe trigger
export default stripe.on("price.created").run(async ({ payload, ctx }) => {
// payload type is inferred from the trigger
// This always runs, and allows just-in-time access tokens to hydrate clients (no need for prepare)
const slack = await ctx.always(async () => {
const slackToken = await client.auth.resolveAccessToken("slack");
return new WebClient(slackToken);
});
});
export const maxConcurrency = 1;
Or we could add an optional second parameter for configuration, instead of the separate exports (and allow more than one background function to be defined in a file):
import { stripe } from "@trigger.dev/sdk/stripe";
// Named export is a background function, with a stripe trigger
export const updateProducts = stripe.on("price.created").run(({ payload, ctx }) => {
// payload type is inferred from the trigger
}, { maxConcurrency: 1 });
// > 1 background function supported in a single file
export const updateCustomer = stripe.on("subscription.updated").run(({ payload, ctx }) => {
// payload type is inferred from the trigger
}, { maxConcurrency: 10 });
Or we could keep the current way we do things with a single object parameter:
import { $function } from "@trigger.dev/sdk";
import { stripe } from "@trigger.dev/sdk/stripe";
// Named export is a background function, with a stripe trigger
export const updateProducts = $function({
trigger: stripe.on("price.created"),
maxConcurrency: 1,
run: async ({ payload, ctx }) => {
// payload type is inferred from the trigger
},
});
-
I'm closing this as v3 is now available in early access: https://trigger.dev/blog/v3-developer-preview-launch/
-
Problem
Trigger.dev v2 provides a way for developers to write background jobs in their existing codebase, but the architecture we've chosen to make this possible has caused too many issues and it's really easy to write code that doesn't work.
One major issue is that the Trigger.dev code developers write is still executed in the existing deployment, and so is not truly "long-running". This makes it hard to communicate what will work as a background job and what won't. Currently we only support something being "long-running" if a job is broken up into many individual (and short) tasks, as no one task can have an execution time longer than the serverless function timeout.
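As a rough mental model of that approach (a simplified sketch, not the actual v2 SDK code), each invocation replays completed tasks from a cache and stops before the platform's time limit, and the server keeps re-invoking the job until it returns. Here `maxNewTasks` is a hypothetical stand-in for the serverless timeout, and `runToCompletion` plays the role of the Trigger.dev server.

```typescript
type RunTask = <T>(key: string, fn: () => Promise<T>) => Promise<T>;
type Job = (runTask: RunTask) => Promise<unknown>;

// Sentinel thrown to end an invocation early so the server can re-invoke it.
const YIELD = new Error("yield: invocation budget exhausted");

// One serverless invocation: replay cached tasks, execute at most `maxNewTasks` new ones.
async function invokeOnce(job: Job, cache: Map<string, unknown>, maxNewTasks: number) {
  let executed = 0;
  const runTask: RunTask = async <T>(key: string, fn: () => Promise<T>): Promise<T> => {
    if (cache.has(key)) return cache.get(key) as T; // completed in an earlier invocation
    if (executed >= maxNewTasks) throw YIELD; // out of time: stop and wait to be re-invoked
    executed++;
    const result = await fn();
    cache.set(key, result);
    return result;
  };
  try {
    return { done: true, output: await job(runTask) };
  } catch (e) {
    if (e === YIELD) return { done: false, output: undefined };
    throw e;
  }
}

// The server side: keep re-invoking with the same task cache until the job finishes.
async function runToCompletion(job: Job, maxNewTasks: number) {
  const cache = new Map<string, unknown>();
  for (let invocations = 1; ; invocations++) {
    const result = await invokeOnce(job, cache, maxNewTasks);
    if (result.done) return { output: result.output, invocations };
  }
}
```

In this model the stop signal is a thrown error, so a user's own try/catch around a task could catch it and carry on, which gives a feel for why try/catch blocks are a problem under this architecture.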
We've added a few workarounds for this, specifically io.backgroundFetch(), which pauses the run while the Trigger.dev server performs the fetch request and then resumes it with the response, because some requests (cough OpenAI cough) take longer than a serverless function is allowed to execute.
Other issues include:
try/catch causes issues.
There are also lots of issues in making this system work. We have to create and maintain adapters for a bunch of frameworks and runtimes, not to mention vastly different setups. We very frequently run into issues trying to make connections "back" to the user's endpoint. Middleware, auth, CDNs, caching, tree-shaking, etc. all make the system very hard to get working in every circumstance.
Proposed Solution
We think we can fix the majority of problems with v3 of Trigger.dev by taking over the responsibility of running the background job code ourselves. This will give us more control over the code, which will in turn allow us to:
try/catch blocks
Promise.all to io.parallel()
Additional improvements
Along with the improvements laid out above, we've learned a lot of lessons over the last 8-10 months while building v2 that we're going to incorporate into v3:
Examples
The following example is of a Ticket Purchase flow where the user has 5 minutes to check out from their cart, after which they are charged in Stripe and their ticket is assigned to them. We also send them an email with the purchase confirmation and, for bonus points, send ourselves a Slack message:
A few notes:
task.wrap, which "taskifies" a function and makes it durable
prepare function gives us a place to hydrate our Slack client with an up-to-date access token
IO. Everything that needs access to information about the current context/run can get access to it through AsyncLocalStorage, so we can just expose functions like task.run, task.wrap, waitForEvent, etc.
Challenges and other considerations
To make this system work we're going to need to bundle and modify user code, as well as provide errors when side effects take place outside of a task (if a task cannot be automatically inserted, e.g. Date.now). This is going to be a considerable DX challenge.
We're also going to need a way to build & run this code locally during development. Ideally, by introspecting the code we should be able to provide better assistance and suggestions for how to create reliable and durable code.
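One way the "errors when side effects take place outside of a task" idea could work, combined with the AsyncLocalStorage-based context from the notes above, is sketched below. It's a minimal illustration under our own assumptions: `runInTask` and `installSideEffectGuard` are hypothetical names, and only `Date.now` is guarded (a real version would also need to cover things like `new Date()` and `Math.random`).

```typescript
import { AsyncLocalStorage } from "node:async_hooks";

// Hypothetical: tracks whether the current async call chain is inside a task.
const taskContext = new AsyncLocalStorage<{ taskKey: string }>();
const originalDateNow = Date.now;

// Runs fn as a "task". The store follows fn through awaits and timers,
// so anything fn calls, directly or asynchronously, sees the task context.
export function runInTask<T>(taskKey: string, fn: () => T): T {
  return taskContext.run({ taskKey }, fn);
}

// Replaces Date.now with a version that throws when called outside a task,
// where its value would differ on every replay. Returns an uninstall function.
export function installSideEffectGuard(): () => void {
  Date.now = () => {
    if (!taskContext.getStore()) {
      throw new Error("Date.now() called outside a task: wrap it in a task so it can be replayed");
    }
    return originalDateNow.call(Date);
  };
  return () => {
    Date.now = originalDateNow;
  };
}
```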
The background code users write will still possibly be shipped with their main app/deploy. For example, the example above exports a job that needs to be manually invoked from the user's backend:
The above won't actually run the code, but will make a request to the Trigger.dev server to invoke the [email protected] with the provided payload. This means our SDK will still need to support multiple runtimes (Bun, CF Workers, Deno, Node.js), even if we don't support multiple runtimes for background jobs.
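Since invoking a job from the user's backend only means making an HTTP request, the runtime-portable part of the SDK could stay small by relying solely on the web-standard `fetch` (available in Node.js 18+, Bun, Deno and Cloudflare Workers). In the sketch below the endpoint path, auth header and body shape are assumptions for illustration, not Trigger.dev's actual API; `fetchImpl` is injectable so it can be tested without a network.

```typescript
// Hypothetical invocation options; field names are illustrative.
interface InvokeOptions {
  apiUrl: string; // base URL of the Trigger.dev server (assumed)
  apiKey: string;
  jobId: string;
  version: string;
  payload: unknown;
}

// Uses only web-standard APIs (URL, fetch, JSON), so it runs unchanged across runtimes.
export async function invokeJob(
  opts: InvokeOptions,
  fetchImpl: typeof fetch = fetch,
): Promise<{ runId: string }> {
  // Assumed endpoint shape, for illustration only.
  const url = new URL(`/api/v3/jobs/${encodeURIComponent(opts.jobId)}/invoke`, opts.apiUrl);
  const response = await fetchImpl(url, {
    method: "POST",
    headers: { Authorization: `Bearer ${opts.apiKey}`, "Content-Type": "application/json" },
    body: JSON.stringify({ version: opts.version, payload: opts.payload }),
  });
  if (!response.ok) throw new Error(`Invocation failed with HTTP ${response.status}`);
  // The job itself runs on Trigger.dev's infrastructure; we only get a run handle back.
  return (await response.json()) as { runId: string };
}
```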
Runtime and dependency support
It's an open question which runtimes we'll need to support for running background job code. We could start with Node.js, but I wouldn't be surprised if we need to quickly start supporting other runtimes (like Bun).
We'll also need to look into support for dependencies, especially ones that are more complicated, like puppeteer and prisma, that might not be so easy to just bundle. There's probably a lot of prior art on this problem though, so hopefully this won't be too difficult.
Migrating and legacy code
Our plan is to support v2 alongside v3, and allow people to adopt v3 on a per-project basis. v3 will initially be rolled out as a "technology preview" without additional billing concerns on the Trigger.dev Cloud. Self-hosters will, at least initially, be responsible for deploying and running their own background job code separate from the Trigger.dev server, but how that will work is still an open question. Also, while v3 is in technology preview, the SDK will be available as @trigger.dev/v3, but will eventually be rolled into @trigger.dev/sdk. At some point we'll deprecate v2, once we can provide the ability to migrate projects.
There is no timeline yet for this new version, but we like to move fast: hopefully we'll have the tech preview in alpha by the end of Q1 2024, with a goal of shipping v3 in the first half of 2024 and scheduling v2 to be deprecated by the end of 2024 (giving plenty of time to migrate).
Feedback
We're still in the exploratory phase of this new version and would really appreciate any feedback from our community. Please feel free to leave a comment below or reach out on our Discord channel.