Luiz
Luiz
CCConvex Community
Created by Luiz on 9/5/2024 in #support-community
Buffered queue race
Perfect! Very clear and to the point. Thank you!
4 replies
CCConvex Community
Created by Luiz on 9/5/2024 in #support-community
Buffered queue race
Test Code: // the processing task expected to run only once after 5 seconds of the last msg export const testraceprint = internalMutation({ args: { str: v.string(), }, handler: async (ctx, args) => { console.log("ran testraceprint: ", args.str); }, }); // the batch tester, either set to simulate sequential arrivals (with await) that works fine, and simultaneous arrival which causes me the race export const batchtestrace = mutation({ args: {}, handler: async (ctx, args) => { // testrace(ctx, {}).then(); // testrace(ctx, {}).then(); // testrace(ctx, {}).then(); // testrace(ctx, {}).then(); // testrace(ctx, {}).then(); await testrace(ctx, {}); await testrace(ctx, {}); await testrace(ctx, {}); await testrace(ctx, {}); await testrace(ctx, {}); }, }); // lastly the logic, racetest document has only the scheduledId to keep track of the scheduler. export const testrace = mutation({ args: {}, handler: async (ctx, args) => { const racer = await ctx.db.get( "m57db885bsm3gkb2qp9c9s4bed706q8x" as Id<"racetest">, ); let scheduled = null; if (racer?.scheduledId) { // get the scheduled function scheduled = await ctx.db.system.get(racer.scheduledId); await ctx.scheduler.cancel(racer.scheduledId); } const newScheduledId = await ctx.scheduler.runAfter( 5000, internal.customers.testraceprint, { str: scheduled ? scheduled.args[0].str + "x" : "x", }, ); await ctx.db.patch("m57db885bsm3gkb2qp9c9s4bed706q8x" as Id<"racetest">, { scheduledId: newScheduledId, }); }, });
4 replies