Tom RedmanT
Convex Community14mo ago
3 replies
Tom Redman

Has anybody solved implementing a trigger for _scheduled_functions changes?

I'm creating a job queue, in which I ctx.schedule jobs, and I'd like to update the resulting jobQueue record when it's complete (my jobQueue objects are different and used differently from the primitive _scheduled_functions).

I've tried the convex-helpers/trigger, but it doesn't seem to fire on the _scheduled_functions tables, as I don't update it directly.

import { customCtx, customMutation } from 'convex-helpers/server/customFunctions';
import { Triggers } from 'convex-helpers/server/triggers';

/* eslint-disable no-restricted-imports */
import { internalMutation as rawInternalMutation, mutation as rawMutation } from './_generated/server';

/* eslint-enable no-restricted-imports */
import { DataModel } from './_generated/dataModel';

// start using Triggers, with table types from schema.ts
const triggers = new Triggers<DataModel>();

// register a function to run when a `ctx.db.insert`, `ctx.db.patch`, `ctx.db.replace`, or `ctx.db.delete` changes the "users" table
triggers.register('_scheduled_functions', async (ctx, change) => {
  console.log('Scheduled function changed', change);
});

console.log('Triggers registered');

// create wrappers that replace the built-in `mutation` and `internalMutation`
// the wrappers override `ctx` so that `ctx.db.insert`, `ctx.db.patch`, etc. run registered trigger functions
export const mutation = customMutation(rawMutation, customCtx(triggers.wrapDB));
export const internalMutation = customMutation(rawInternalMutation, customCtx(triggers.wrapDB));


Here's how I'm using the queue:

try {
      await ctx.runMutation(internal.jobs.jobProcessing.markJobStarted, { jobId });
      const job = await ctx.runQuery(internal.jobs.jobQueue.getJob, { jobId });
      if (!job) throw new Error('Job not found');

      console.log(job);

      switch (job.fnType) {
        case 'action':
          await ctx.scheduler.runAfter(0, job.fnHandle as FunctionHandle<'action'>, job.args);
          break;
        case 'mutation':
          await ctx.scheduler.runAfter(0, job.fnHandle as FunctionHandle<'mutation'>, job.args);
          break;
      }
    } catch (error: any) {
      await ctx.runMutation(internal.jobs.jobProcessing.markJobFailed, {
        error: {
          message: error.message,
          stack: error.stack,
        },
        jobId,
      });
    }
Was this page helpful?