Luiz, 5mo ago

Buffered queue race

Objective: Concatenate incoming messages and process them 5 seconds after the last message is received.

Solution: Check whether a "processing" task is already scheduled. If so, cancel the existing task, concatenate the new message, and reschedule the task.

Expectation: Convex should internally handle race conditions in the scheduler, ensuring that no more than one processing task is ever scheduled at a time.

Issue: When messages arrive at a slow pace, everything works as expected. However, when messages arrive simultaneously, a race condition occurs, resulting in multiple scheduled tasks.

Test code: see the reply below.
3 Replies
Luiz (OP), 5mo ago
Test Code:

// the processing task expected to run only once after 5 seconds of the last msg
export const testraceprint = internalMutation({
  args: {
    str: v.string(),
  },
  handler: async (ctx, args) => {
    console.log("ran testraceprint: ", args.str);
  },
});

// the batch tester, either set to simulate sequential arrivals (with await) that works fine, and simultaneous arrival which causes me the race
export const batchtestrace = mutation({
  args: {},
  handler: async (ctx, args) => {
    // testrace(ctx, {}).then();
    // testrace(ctx, {}).then();
    // testrace(ctx, {}).then();
    // testrace(ctx, {}).then();
    // testrace(ctx, {}).then();
    await testrace(ctx, {});
    await testrace(ctx, {});
    await testrace(ctx, {});
    await testrace(ctx, {});
    await testrace(ctx, {});
  },
});

// lastly the logic, racetest document has only the scheduledId to keep track of the scheduler.
export const testrace = mutation({
  args: {},
  handler: async (ctx, args) => {
    const racer = await ctx.db.get(
      "m57db885bsm3gkb2qp9c9s4bed706q8x" as Id<"racetest">,
    );
    let scheduled = null;
    if (racer?.scheduledId) {
      // get the scheduled function
      scheduled = await ctx.db.system.get(racer.scheduledId);
      await ctx.scheduler.cancel(racer.scheduledId);
    }
    const newScheduledId = await ctx.scheduler.runAfter(
      5000,
      internal.customers.testraceprint,
      {
        str: scheduled ? scheduled.args[0].str + "x" : "x",
      },
    );
    await ctx.db.patch("m57db885bsm3gkb2qp9c9s4bed706q8x" as Id<"racetest">, {
      scheduledId: newScheduledId,
    });
  },
});
lee, 5mo ago
cool test, i've tested similar things myself. there are two issues with it:

1. Calling mutations directly as functions is not supported. Once you define an export const foo = mutation(...);, it's not a plain function anymore, so it's not intended to be called like foo(). It's intended to be called with useMutation or ctx.runMutation or ctx.scheduler.runAfter. The fact that it sorta works doesn't mean it always will.

2. Convex functions can run JavaScript and TypeScript, so they are Turing complete, so you can write race conditions. Convex can't detect what your function is doing ( https://en.wikipedia.org/wiki/Rice's_theorem ) so it can't prevent races. In this case, you're telling the JavaScript engine to run the functions concurrently inside a single mutation, so they are racing with each other.

If you want Convex to protect against race conditions, you need to run the testrace functions as separate mutations, which you can do with ctx.scheduler.runAfter or the other ways listed above. Then the mutations will run as transactions and you won't have issues with isolation.
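To make the interleaving concrete, here is a minimal sketch in plain TypeScript that imitates the read / cancel / reschedule / patch steps of testrace against an in-memory stand-in. FakeBackend, runConcurrently, and runSerially are hypothetical names made up for this illustration; none of this is Convex API, and it does not model Convex's transactions. It only shows why un-awaited calls inside one handler all read the same stale scheduledId, and why running the steps one at a time avoids that.

```typescript
// Hypothetical in-memory stand-in for the "racetest" document and the scheduler.
// Not a Convex API: every method just yields to the event loop, the way an
// awaited database or scheduler call would.
type Job = { id: number; str: string; canceled: boolean };

class FakeBackend {
  scheduledId: number | null = null; // mirrors racer.scheduledId
  jobs: Job[] = [];

  private tick(): Promise<void> {
    return new Promise<void>((resolve) => setTimeout(resolve, 0));
  }

  async getScheduledId(): Promise<number | null> {
    await this.tick();
    return this.scheduledId;
  }

  async cancel(id: number): Promise<void> {
    await this.tick();
    this.jobs[id].canceled = true;
  }

  async schedule(str: string): Promise<number> {
    await this.tick();
    const id = this.jobs.length;
    this.jobs.push({ id, str, canceled: false });
    return id;
  }

  async patch(id: number): Promise<void> {
    await this.tick();
    this.scheduledId = id;
  }

  // Jobs that would still fire.
  pending(): Job[] {
    return this.jobs.filter((job) => !job.canceled);
  }
}

// Same steps as testrace: read the current id, cancel it, reschedule, store the new id.
async function testrace(backend: FakeBackend): Promise<void> {
  const currentId = await backend.getScheduledId();
  let previous = "";
  if (currentId !== null) {
    previous = backend.jobs[currentId].str;
    await backend.cancel(currentId);
  }
  const newId = await backend.schedule(previous + "x");
  await backend.patch(newId);
}

// Like the commented-out testrace(ctx, {}).then() calls: all started at once.
async function runConcurrently(n: number): Promise<Job[]> {
  const backend = new FakeBackend();
  await Promise.all(Array.from({ length: n }, () => testrace(backend)));
  return backend.pending();
}

// Like the awaited calls: each one finishes before the next starts.
async function runSerially(n: number): Promise<Job[]> {
  const backend = new FakeBackend();
  for (let i = 0; i < n; i++) {
    await testrace(backend);
  }
  return backend.pending();
}
```

In this model, runConcurrently(5) leaves five pending jobs, because every call reads scheduledId before any call has patched it, while runSerially(5) leaves a single pending job. In Convex itself, the equivalent of the serial case would be invoking testrace as separate mutations (useMutation, ctx.runMutation, or ctx.scheduler.runAfter), as described above.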
Luiz (OP), 5mo ago
Perfect! Very clear and to the point. Thank you!