David Alonso•2mo ago

How to handle migrations that need to run an action?

I need to iterate over a table to backfill some data that needs to be fetched from an external service through an SDK that requires "use node". What is my best path forward?
Convex Bot•2mo ago
Thanks for posting in <#1088161997662724167>. Reminder: If you have a Convex Pro account, use the Convex Dashboard to file support tickets.
- Provide context: What are you trying to achieve, what is the end-user interaction, what are you seeing? (full error message, command output, etc.)
- Use search.convex.dev to search Docs, Stack, and Discord all at once.
- Additionally, you can post your questions in the Convex Community's <#1228095053885476985> channel to receive a response from AI.
- Avoid tagging staff unless specifically instructed.
Thank you!
David Alonso (OP)•2mo ago
I guess another way of phrasing my question is: what's the recommended way to run an action for all documents on a large table reliably?
erquhart•2mo ago
The best real-world implementation example I know of for this sort of thing is Jamie's resend component, which uses the workpool and action retrier components for reliable background task execution. There's a good amount of code in-repo to get everything working well; the sendEmail function is a good first thread to pull: https://github.com/get-convex/resend/blob/903451c490123a3ad0529d3bfa32033e14a8d028/src/component/lib.ts#L64 For the resend component it's adding emails to the db and then queuing them up and sending. Since you're going off of data already in the db you don't need the insert part, but the rest should broadly apply.
ian•2mo ago
Yeah I agree. If you want to run an action per-document, your migration could look like
migrateOne: async (ctx, doc) => workpool.enqueueAction(ctx, internal.foo.myAction, { doc })
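Fleshed out, that could look something like this (a sketch using the migrations and workpool components; the table name, the backfilledField check, and the component names are assumptions):

import { Migrations } from "@convex-dev/migrations";
import { Workpool } from "@convex-dev/workpool";
import { components, internal } from "./_generated/api";
import { DataModel } from "./_generated/dataModel";

const migrations = new Migrations<DataModel>(components.migrations);
const workpool = new Workpool(components.workpool, { maxParallelism: 5 });

export const backfillViaAction = migrations.define({
  table: "myTable", // assumed table name
  migrateOne: async (ctx, doc) => {
    // Check beforehand: skip docs that have already been backfilled.
    if (doc.backfilledField !== undefined) return;
    await workpool.enqueueAction(ctx, internal.foo.myAction, { doc });
  },
});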
Maybe with a check beforehand to see if it's necessary, as in the sketch above. If you want to run actions in batch (highly recommended), the migrations component should have a migrateBatch option but doesn't yet, so you could do:
export const actionMigration = internalMutation(
  async (ctx, args: { context: { cursor?: string; isDone?: boolean; onComplete?: string } }) => {
    const { context: { cursor, isDone, onComplete } } = args;
    if (!isDone) {
      const batch = await ctx.db.query(/* ... */).paginate({ cursor: cursor ?? null, numItems });
      await workpool.enqueueAction(ctx, internal.foo.myAction, { batch: batch.page }, {
        onComplete: internal.foo.actionMigration,
        context: { cursor: batch.continueCursor, isDone: batch.isDone, onComplete },
      });
    } else if (onComplete) {
      await ctx.runMutation(onComplete as any, { /* ... */ });
    }
  }
);
and pass an { onComplete: await createFunctionHandle(internal.foo.onMigrationComplete) } arg so you can run some mutation when the last page finishes. This will run each page in series. If you don't care about knowing when it's done or handling errors, then you could just enqueue the action and immediately do workpool.enqueueMutation(ctx, internal.foo.actionMigration, ...) for the next page.
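Kicking it off might look like this (a sketch; startActionMigration and onMigrationComplete are illustrative names, and createFunctionHandle comes from convex/server):

import { createFunctionHandle } from "convex/server";
import { internal } from "./_generated/api";
import { internalMutation } from "./_generated/server";

// Hypothetical kickoff: seed the paginating mutation with a function handle
// to call once the final page's action has completed.
export const startActionMigration = internalMutation(async (ctx) => {
  const onComplete = await createFunctionHandle(internal.foo.onMigrationComplete);
  await ctx.scheduler.runAfter(0, internal.foo.actionMigration, {
    context: { onComplete },
  });
});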
David Alonso (OP)•2mo ago
You guys are awesome! Will try this out. I'm curious why you'd highly recommend doing this in batches vs enqueueing an action for each document?
ian•2mo ago
Oh, I'm assuming that each document's action might be something like fetching a URL and doing some work, and batching those in one action saves money since the action is mostly idle. Node actions especially are slower / heavier weight. The migration helper runs in batches to help minimize the function-call overhead.
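For illustration, the batched action could look something like this (a sketch; myAction, saveBackfill, and fetchExternal are illustrative names):

"use node";
import { internalAction } from "./_generated/server";
import { internal } from "./_generated/api";
import { Doc } from "./_generated/dataModel";

// Hypothetical batched action: one action processes a whole page, so the
// per-invocation overhead is amortized while the fetches run concurrently.
export const myAction = internalAction(
  async (ctx, { batch }: { batch: Doc<"myTable">[] }) => {
    await Promise.all(
      batch.map(async (doc) => {
        const data = await fetchExternal(doc); // assumed external SDK call
        await ctx.runMutation(internal.foo.saveBackfill, { id: doc._id, data });
      })
    );
  }
);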
David Alonso (OP)•2mo ago
Ah gotcha, so you mean that one action doing something like await Promise.all(<fetch-like operations>) will be more cost effective than triggering a bunch of actions that each do await <fetch-like op>. I'm okay with a cost hit for a one-time migration. My bigger concern is hitting API rate limits when running a migration over a large set of documents that all hit an API. Is there any way to run a migration in stages, or add a delay between steps in the migration?
ian•2mo ago
Each time one page is enqueueing the next, you can do const { retryAfter } = await rateLimiter.limit(..., { reserve: true }); and if retryAfter is non-zero, pass { runAfter: retryAfter } to the workpool so the work doesn't start until there's capacity for it. Then you have ~perfect bandwidth pacing with reserved capacity.
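A sketch of that pattern (assuming the @convex-dev/rate-limiter component registered as components.rateLimiter and a limit named externalApi; the rate numbers are illustrative):

import { RateLimiter, MINUTE } from "@convex-dev/rate-limiter";
import { components, internal } from "./_generated/api";

const rateLimiter = new RateLimiter(components.rateLimiter, {
  externalApi: { kind: "token bucket", rate: 100, period: MINUTE },
});

// Inside actionMigration, before enqueueing the next batch: reserve one
// token per document, then delay the action until that capacity is free.
const { retryAfter } = await rateLimiter.limit(ctx, "externalApi", {
  count: batch.page.length,
  reserve: true,
});
await workpool.enqueueAction(ctx, internal.foo.myAction, { batch: batch.page }, {
  runAfter: retryAfter ?? 0,
});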
djbalin•2mo ago
Wow lol I literally thought "migrate should have a migrateBatch feature" last week as well and had a crack at implementing it. I thought I would open a PR once I thought it made sense. Below is my implementation so far, does this approach seem somewhat reasonable @ian ? It's still a bit of a WIP, but seems to work as expected https://github.com/djbalin/migrations/commit/51c72ffb10204a1a061b3c6970bac522a01d8e09#diff-24f1ac19d22913269d99a8a5afa4c13aa7fd2c8b9858529046424841ca6421da
djbalin•2mo ago
I would ideally avoid having to define the defineBatchMigration method, which mostly mirrors the regular define, but I'm so weak at doing proper overloading/polymorphism in JS 🥲 My take is that it would be nice to have only the migrations.define constructor, which would then have two discriminated signatures: you pass either migrateOne or migrateBatch (see the sketch below). I also need to consider whether parallelize makes sense at all for the migrateBatch case, since you can perform parallelization inside your migrateBatch function - it's probably a bit crazy to do nested parallelization? Could quickly lead to hitting some limits?
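The discriminated signature could be typed roughly like this (illustrative only, not the actual component API):

// Hypothetical: a single define() that accepts exactly one of
// migrateOne or migrateBatch, discriminated at the type level.
type MigrateOneConfig<Ctx, D> = {
  table: string;
  migrateOne: (ctx: Ctx, doc: D) => Promise<Partial<D> | void>;
  migrateBatch?: never;
};
type MigrateBatchConfig<Ctx, D> = {
  table: string;
  migrateBatch: (ctx: Ctx, batch: D[]) => Promise<void>;
  migrateOne?: never;
};
function define<Ctx, D>(config: MigrateOneConfig<Ctx, D> | MigrateBatchConfig<Ctx, D>) {
  if (config.migrateBatch) {
    // route to the batch implementation
  } else {
    // route to the one-by-one implementation
  }
}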
ian•2mo ago
Yeah that's the right direction - and thanks for adding content / UI to the example app. The change I'd make is to share the logic between that and the normal path, so a single implementation either does the batch or goes one-by-one. e.g. the batch definition function would call:
await this._migrationImpl(table, customRange, batchSize, async (ctx, page) => {
  try {
    await migrateBatch(ctx, page);
  } catch (error) {
    console.error(`Batch failed: ${page}`);
    throw error;
  }
});
And the normal path would call:
await this._migrationImpl(table, customRange, batchSize, async (ctx, page) => {
  async function doOne(doc: DocumentByName<DataModel, TableName>) {
    try {
      const next = await migrateOne(
        ctx,
        doc as { _id: GenericId<TableName> }
      );
      if (next && Object.keys(next).length > 0) {
        await ctx.db.patch(doc._id as GenericId<TableName>, next);
      }
    } catch (error) {
      console.error(`Document failed: ${doc._id}`);
      throw error;
    }
  }
  if (parallelize) {
    await Promise.all(page.map(doOne));
  } else {
    for (const doc of page) {
      await doOne(doc);
    }
  }
});
