Luiz
CCConvex Community
•Created by Luiz on 9/5/2024 in #support-community
Buffered queue race
Perfect! Very clear and to the point. Thank you!
4 replies
CCConvex Community
•Created by Luiz on 9/5/2024 in #support-community
Buffered queue race
Test Code:
// The processing task, expected to run only once, 5 seconds after the last message arrives.
/**
 * Internal mutation that writes the received string to the log.
 *
 * Takes a single argument, `str`, which is printed after the
 * "ran testraceprint: " prefix. Nothing is read from or written to the
 * database.
 */
export const testraceprint = internalMutation({
  args: { str: v.string() },
  handler: async (_ctx, { str }) => {
    console.log("ran testraceprint: ", str);
  },
});
// The batch tester: set it either to simulate sequential arrivals (with await), which works fine, or simultaneous arrivals (without await), which cause the race.
/**
 * Test driver that invokes `testrace` five times, awaiting each call before
 * starting the next one (sequential arrivals).
 *
 * To simulate simultaneous arrivals instead, start the calls without
 * awaiting them, e.g. `testrace(ctx, {}).then();` repeated five times.
 */
export const batchtestrace = mutation({
  args: {},
  handler: async (ctx, _args) => {
    const runs = 5;
    for (let i = 0; i < runs; i += 1) {
      // Each call completes before the next one begins.
      await testrace(ctx, {});
    }
  },
});
// Lastly, the logic. The racetest document holds only the scheduledId, which keeps track of the currently scheduled function.
/**
 * Replaces the currently scheduled `testraceprint` run with a new one that
 * fires 5000 ms from now.
 *
 * Steps, in order:
 *  1. Load the fixed `racetest` document.
 *  2. If it carries a `scheduledId`, read that scheduled function's record
 *     and cancel it.
 *  3. Schedule `internal.customers.testraceprint` with the previous run's
 *     `str` argument plus one more "x" (or just "x" when there was no
 *     previous record).
 *  4. Store the new scheduled id back on the `racetest` document.
 */
export const testrace = mutation({
  args: {},
  handler: async (ctx, _args) => {
    // Single hard-coded document used by this test.
    const racerId = "m57db885bsm3gkb2qp9c9s4bed706q8x" as Id<"racetest">;
    const racer = await ctx.db.get(racerId);

    // Look up the previously scheduled function (if any) before cancelling it,
    // so its arguments can be carried over to the replacement.
    const pendingId = racer?.scheduledId;
    const previousJob = pendingId ? await ctx.db.system.get(pendingId) : null;
    if (pendingId) {
      await ctx.scheduler.cancel(pendingId);
    }

    // Grow the payload by one "x" per reschedule.
    const str = previousJob ? previousJob.args[0].str + "x" : "x";
    const nextId = await ctx.scheduler.runAfter(
      5000,
      internal.customers.testraceprint,
      { str },
    );

    await ctx.db.patch(racerId, { scheduledId: nextId });
  },
});
4 replies