zid
zid6mo ago

Best practice for testing recursive, potentially costly mutation

I'm nervous as there seems to be no manual kill switch for a recursive mutation call in convex. I've setup basic logic to exit out, however still, I'd appreciate a second look along with some ideas for best practices regarding these costly fns.
const MAX_RECURSION_DEPTH = 20;

export const deactivateThreads = internalMutation({
args: {
cursor: v.optional(v.union(v.string(), v.null())),
userId: v.id("users"),
depth: v.optional(v.number()),
},
handler: async ({ db, scheduler }, { cursor, userId, depth = 0 }) => {
if (depth > MAX_RECURSION_DEPTH) {
throw new Error("Maximum recursion depth exceeded");
}
try {
const user = await db.get(userId);
const savedCursor =
cursor ||
(user?.deactivation?.nextCursor ? user.deactivation.nextCursor : null);

const batchSize = 2;

const { page, continueCursor, isDone, pageStatus } = await db
.query("threads")
.withIndex("by_userId", (q) => q.eq("userId", userId))
.paginate({
numItems: batchSize,
cursor: savedCursor,
});

for (let i = 0; i < page.length; i++) {
const currentThread = page[i];
await db.delete(currentThread._id);
}

if (isDone) {
// await resetCursor(db);
await db.patch(userId, {
deactivation: {
...user.deactivation,
nextCursor: null,
},
});
return;
}

if (!isDone) {

await db.patch(userId, {
deactivation: {
count: user?.deactivation?.count ? user.deactivation.count + 1 : 0,
nextCursor: continueCursor,
},
});

await scheduler.runAfter(0, internal.internalUser.deactivateThreads, {
cursor: continueCursor,
userId: userId,
depth: depth + 1,
});
}
} catch (error) {
throw error;
}
},
});
const MAX_RECURSION_DEPTH = 20;

export const deactivateThreads = internalMutation({
args: {
cursor: v.optional(v.union(v.string(), v.null())),
userId: v.id("users"),
depth: v.optional(v.number()),
},
handler: async ({ db, scheduler }, { cursor, userId, depth = 0 }) => {
if (depth > MAX_RECURSION_DEPTH) {
throw new Error("Maximum recursion depth exceeded");
}
try {
const user = await db.get(userId);
const savedCursor =
cursor ||
(user?.deactivation?.nextCursor ? user.deactivation.nextCursor : null);

const batchSize = 2;

const { page, continueCursor, isDone, pageStatus } = await db
.query("threads")
.withIndex("by_userId", (q) => q.eq("userId", userId))
.paginate({
numItems: batchSize,
cursor: savedCursor,
});

for (let i = 0; i < page.length; i++) {
const currentThread = page[i];
await db.delete(currentThread._id);
}

if (isDone) {
// await resetCursor(db);
await db.patch(userId, {
deactivation: {
...user.deactivation,
nextCursor: null,
},
});
return;
}

if (!isDone) {

await db.patch(userId, {
deactivation: {
count: user?.deactivation?.count ? user.deactivation.count + 1 : 0,
nextCursor: continueCursor,
},
});

await scheduler.runAfter(0, internal.internalUser.deactivateThreads, {
cursor: continueCursor,
userId: userId,
depth: depth + 1,
});
}
} catch (error) {
throw error;
}
},
});
2 Replies
lee
lee6mo ago
this code looks good to me. i like how the depth argument guards against infinite loops. if you do end up with an infinite loop, note you can always go to Convex dashboard -> Settings -> Pause Deployment, which will stop all scheduled jobs but still let you cancel them or update your code with npx convex dev.
zid
zidOP6mo ago
oh so there is a kill switch, nice! And thanks for reviewing the code, @lee . I have a follow up question somewhat related to this but i'll create a separate ticket.

Did you find this page helpful?