Tom Redman
Tom Redman
CCConvex Community
Created by Tom Redman on 12/6/2024 in #support-community
Has anybody solved implementing a trigger for _scheduled_functions changes?
I'm creating a job queue, in which I ctx.schedule jobs, and I'd like to update the resulting jobQueue record when it's complete (my jobQueue objects are different and used differently from the primitive _scheduled_functions). I've tried the convex-helpers/trigger, but it doesn't seem to fire on the _scheduled_functions tables, as I don't update it directly.
import { customCtx, customMutation } from 'convex-helpers/server/customFunctions';
import { Triggers } from 'convex-helpers/server/triggers';

/* eslint-disable no-restricted-imports */
import { internalMutation as rawInternalMutation, mutation as rawMutation } from './_generated/server';

/* eslint-enable no-restricted-imports */
import { DataModel } from './_generated/dataModel';

// start using Triggers, with table types from schema.ts
const triggers = new Triggers<DataModel>();

// register a function to run when a `ctx.db.insert`, `ctx.db.patch`, `ctx.db.replace`, or `ctx.db.delete` changes the "users" table
triggers.register('_scheduled_functions', async (ctx, change) => {
console.log('Scheduled function changed', change);
});

console.log('Triggers registered');

// create wrappers that replace the built-in `mutation` and `internalMutation`
// the wrappers override `ctx` so that `ctx.db.insert`, `ctx.db.patch`, etc. run registered trigger functions
export const mutation = customMutation(rawMutation, customCtx(triggers.wrapDB));
export const internalMutation = customMutation(rawInternalMutation, customCtx(triggers.wrapDB));
import { customCtx, customMutation } from 'convex-helpers/server/customFunctions';
import { Triggers } from 'convex-helpers/server/triggers';

/* eslint-disable no-restricted-imports */
import { internalMutation as rawInternalMutation, mutation as rawMutation } from './_generated/server';

/* eslint-enable no-restricted-imports */
import { DataModel } from './_generated/dataModel';

// start using Triggers, with table types from schema.ts
const triggers = new Triggers<DataModel>();

// register a function to run when a `ctx.db.insert`, `ctx.db.patch`, `ctx.db.replace`, or `ctx.db.delete` changes the "users" table
triggers.register('_scheduled_functions', async (ctx, change) => {
console.log('Scheduled function changed', change);
});

console.log('Triggers registered');

// create wrappers that replace the built-in `mutation` and `internalMutation`
// the wrappers override `ctx` so that `ctx.db.insert`, `ctx.db.patch`, etc. run registered trigger functions
export const mutation = customMutation(rawMutation, customCtx(triggers.wrapDB));
export const internalMutation = customMutation(rawInternalMutation, customCtx(triggers.wrapDB));
Here's how I'm using the queue:
try {
await ctx.runMutation(internal.jobs.jobProcessing.markJobStarted, { jobId });
const job = await ctx.runQuery(internal.jobs.jobQueue.getJob, { jobId });
if (!job) throw new Error('Job not found');

console.log(job);

switch (job.fnType) {
case 'action':
await ctx.scheduler.runAfter(0, job.fnHandle as FunctionHandle<'action'>, job.args);
break;
case 'mutation':
await ctx.scheduler.runAfter(0, job.fnHandle as FunctionHandle<'mutation'>, job.args);
break;
}
} catch (error: any) {
await ctx.runMutation(internal.jobs.jobProcessing.markJobFailed, {
error: {
message: error.message,
stack: error.stack,
},
jobId,
});
}
try {
await ctx.runMutation(internal.jobs.jobProcessing.markJobStarted, { jobId });
const job = await ctx.runQuery(internal.jobs.jobQueue.getJob, { jobId });
if (!job) throw new Error('Job not found');

console.log(job);

switch (job.fnType) {
case 'action':
await ctx.scheduler.runAfter(0, job.fnHandle as FunctionHandle<'action'>, job.args);
break;
case 'mutation':
await ctx.scheduler.runAfter(0, job.fnHandle as FunctionHandle<'mutation'>, job.args);
break;
}
} catch (error: any) {
await ctx.runMutation(internal.jobs.jobProcessing.markJobFailed, {
error: {
message: error.message,
stack: error.stack,
},
jobId,
});
}
4 replies
CCConvex Community
Created by Tom Redman on 12/6/2024 in #support-community
Not sure why this is throwing a "multiple paginated queries" error
export const getJobsCountWithQueueAndStatus = internalQuery({
args: {
queue: v.string(),
status: v.union(
v.literal('pending'),
v.literal('processing'),
v.literal('completed'),
v.literal('failed'),
v.literal('cancelled'),
),
},
handler: async (ctx, { queue, status }) => {
let continueCursor = null;
let isDone = false;
let page;
const pageSize = 5;

let jobsCount = 0;

while (!isDone) {
console.log(queue, status);

({ continueCursor, isDone, page } = await ctx.db
.query('jobs')
.withIndex('by_queue_status', (q) => q.eq('queue', queue).eq('status', status))
.paginate({
cursor: continueCursor,
numItems: pageSize,
}));
jobsCount += page.length;
}

return jobsCount;
},
});
export const getJobsCountWithQueueAndStatus = internalQuery({
args: {
queue: v.string(),
status: v.union(
v.literal('pending'),
v.literal('processing'),
v.literal('completed'),
v.literal('failed'),
v.literal('cancelled'),
),
},
handler: async (ctx, { queue, status }) => {
let continueCursor = null;
let isDone = false;
let page;
const pageSize = 5;

let jobsCount = 0;

while (!isDone) {
console.log(queue, status);

({ continueCursor, isDone, page } = await ctx.db
.query('jobs')
.withIndex('by_queue_status', (q) => q.eq('queue', queue).eq('status', status))
.paginate({
cursor: continueCursor,
numItems: pageSize,
}));
jobsCount += page.length;
}

return jobsCount;
},
});
Schema:
jobs: defineTable({
args: v.any(),
attempt: v.number(),
dependencies: v.optional(v.array(v.id('jobs'))),
error: v.optional(
v.object({
attempt: v.number(),
message: v.string(),
stack: v.optional(v.string()),
timestamp: v.number(),
}),
),
fnArgs: v.any(),
fnHandle: v.string(),
fnName: v.string(),
fnType: v.union(v.literal('action'), v.literal('mutation'), v.literal('query')),
maxAttempts: v.number(),
metadata: v.optional(v.any()),
nextJobId: v.optional(v.id('jobs')),
priority: v.number(),
progress: v.optional(
v.object({
current: v.number(),
status: v.string(),
total: v.number(),
}),
),
queue: v.string(),
resourceRequirements: v.optional(
v.object({
cpu: v.number(),
memory: v.number(),
timeout: v.number(),
}),
),
result: v.optional(v.any()),
status: v.union(
v.literal('pending'),
v.literal('processing'),
v.literal('completed'),
v.literal('failed'),
v.literal('cancelled'),
),
timing: v.object({
completedAt: v.optional(v.number()),
createdAt: v.number(),
duration: v.optional(v.number()),
processingTime: v.optional(v.number()),
queueTime: v.optional(v.number()),
startedAt: v.optional(v.number()),
}),
}).index('by_queue_status', ['queue', 'status']),
jobs: defineTable({
args: v.any(),
attempt: v.number(),
dependencies: v.optional(v.array(v.id('jobs'))),
error: v.optional(
v.object({
attempt: v.number(),
message: v.string(),
stack: v.optional(v.string()),
timestamp: v.number(),
}),
),
fnArgs: v.any(),
fnHandle: v.string(),
fnName: v.string(),
fnType: v.union(v.literal('action'), v.literal('mutation'), v.literal('query')),
maxAttempts: v.number(),
metadata: v.optional(v.any()),
nextJobId: v.optional(v.id('jobs')),
priority: v.number(),
progress: v.optional(
v.object({
current: v.number(),
status: v.string(),
total: v.number(),
}),
),
queue: v.string(),
resourceRequirements: v.optional(
v.object({
cpu: v.number(),
memory: v.number(),
timeout: v.number(),
}),
),
result: v.optional(v.any()),
status: v.union(
v.literal('pending'),
v.literal('processing'),
v.literal('completed'),
v.literal('failed'),
v.literal('cancelled'),
),
timing: v.object({
completedAt: v.optional(v.number()),
createdAt: v.number(),
duration: v.optional(v.number()),
processingTime: v.optional(v.number()),
queueTime: v.optional(v.number()),
startedAt: v.optional(v.number()),
}),
}).index('by_queue_status', ['queue', 'status']),
Error:
Error: [CONVEX Q(jobs/jobQueue:getJobsCountWithQueueAndStatus)] [Request ID: e735d416db91c2a6] Server Error
Uncaught Error: This query or mutation function ran multiple paginated queries. Convex only supports a single paginated query in each function.
at async handler (../../convex/jobs/jobQueue.ts:54:18)

Called by client
Error: [CONVEX Q(jobs/jobQueue:getJobsCountWithQueueAndStatus)] [Request ID: e735d416db91c2a6] Server Error
Uncaught Error: This query or mutation function ran multiple paginated queries. Convex only supports a single paginated query in each function.
at async handler (../../convex/jobs/jobQueue.ts:54:18)

Called by client
Calling once with arguments:
{
queue: "instagramApi",
status: "completed",
}
{
queue: "instagramApi",
status: "completed",
}
10 replies