Tom Redman
Tom Redman
CCConvex Community
Created by Tom Redman on 1/20/2025 in #support-community
Type error when NOT directly calling function (new to Convex 1.18.2)
I see in the update notes, it's not a good practice to call a function directly from another function. However, when rectifying this (e.g. using ctx.runQuery(myApiQuery) vs myApiQuery() I'm getting the stubborn "Function implicitly has return type 'any' because..." error. Any idea why this is now upsetting the type system? I feel like I'm missing something obvious!
export const getForCurrentUser = query({
args: {},
handler: async (ctx) => {
const { user, team } = await ctx.runQuery(
internal.authentication.user.getCurrentUserOrThrow
);
return await ctx.db
.query("mailchimpLists")
.withIndex("by_team", (q) => q.eq("teamId", team._id))
.collect();
},
});
export const getForCurrentUser = query({
args: {},
handler: async (ctx) => {
const { user, team } = await ctx.runQuery(
internal.authentication.user.getCurrentUserOrThrow
);
return await ctx.db
.query("mailchimpLists")
.withIndex("by_team", (q) => q.eq("teamId", team._id))
.collect();
},
});
And then I have this query:
export const getCurrentUserOrThrow = internalQuery({
args: {},
handler: async (ctx): Promise<{ user: Doc<"users">; team: Doc<"teams"> }> => {
const userId = await getAuthUserId(ctx);
if (!userId) {
throw new Error("Not signed in");
}

const user = await ctx.db.get(userId);

if (!user) {
throw new Error("User not found");
}

const teamId = user.teamId;

if (!teamId) {
throw new Error("User is not in a team");
}

const team = await ctx.db.get(teamId);

if (!team) {
throw new Error("Team not found");
}

return { user, team };
},
});
export const getCurrentUserOrThrow = internalQuery({
args: {},
handler: async (ctx): Promise<{ user: Doc<"users">; team: Doc<"teams"> }> => {
const userId = await getAuthUserId(ctx);
if (!userId) {
throw new Error("Not signed in");
}

const user = await ctx.db.get(userId);

if (!user) {
throw new Error("User not found");
}

const teamId = user.teamId;

if (!teamId) {
throw new Error("User is not in a team");
}

const team = await ctx.db.get(teamId);

if (!team) {
throw new Error("Team not found");
}

return { user, team };
},
});
25 replies
CCConvex Community
Created by Tom Redman on 1/10/2025 in #support-community
Is this typically a bad practice? v.union(v.id("thisList"), v.id("thatList"))
I have two different types of lists that a user can select to take a singular action on (in my case, send emails). I have a list of contacts that is a "segment", or a user can select the entire "list". A list is different from a segment, but the same action can be taken on both for all practical purposes. I have my schema like this: audience: v.union(v.id("mailchimpLists"), v.id("mailchimpSegments")), Butttttt I'm immediately realizing that disambiguating this in a function seems hacky at best. Is it better to do this, and then simply manage what's what further upstream?
mailchimpSegment: v.optional(v.id("mailchimpSegments")),
mailchimpList: v.optional(v.id("mailchimpLists")),
mailchimpSegment: v.optional(v.id("mailchimpSegments")),
mailchimpList: v.optional(v.id("mailchimpLists")),
This is fine as well, however, I find myself wanting to enforce that there must be one but not both. I could do something like this:
audienceType: v.union(
v.literal("segment"),
v.literal("list"),
),
mailchimpSegment: v.optional(v.id("mailchimpSegments")),
mailchimpList: v.optional(v.id("mailchimpLists")),
audienceType: v.union(
v.literal("segment"),
v.literal("list"),
),
mailchimpSegment: v.optional(v.id("mailchimpSegments")),
mailchimpList: v.optional(v.id("mailchimpLists")),
But this still feels brittle. I'm leaning toward the audienceType option, but would love to know if there's a better way, or if I'm missing something obvious with the v.union(v.id,v.id) idea.
14 replies
CCConvex Community
Created by Tom Redman on 12/6/2024 in #support-community
Has anybody solved implementing a trigger for _scheduled_functions changes?
I'm creating a job queue, in which I ctx.schedule jobs, and I'd like to update the resulting jobQueue record when it's complete (my jobQueue objects are different and used differently from the primitive _scheduled_functions). I've tried the convex-helpers/trigger, but it doesn't seem to fire on the _scheduled_functions tables, as I don't update it directly.
import { customCtx, customMutation } from 'convex-helpers/server/customFunctions';
import { Triggers } from 'convex-helpers/server/triggers';

/* eslint-disable no-restricted-imports */
import { internalMutation as rawInternalMutation, mutation as rawMutation } from './_generated/server';

/* eslint-enable no-restricted-imports */
import { DataModel } from './_generated/dataModel';

// start using Triggers, with table types from schema.ts
const triggers = new Triggers<DataModel>();

// register a function to run when a `ctx.db.insert`, `ctx.db.patch`, `ctx.db.replace`, or `ctx.db.delete` changes the "users" table
triggers.register('_scheduled_functions', async (ctx, change) => {
console.log('Scheduled function changed', change);
});

console.log('Triggers registered');

// create wrappers that replace the built-in `mutation` and `internalMutation`
// the wrappers override `ctx` so that `ctx.db.insert`, `ctx.db.patch`, etc. run registered trigger functions
export const mutation = customMutation(rawMutation, customCtx(triggers.wrapDB));
export const internalMutation = customMutation(rawInternalMutation, customCtx(triggers.wrapDB));
import { customCtx, customMutation } from 'convex-helpers/server/customFunctions';
import { Triggers } from 'convex-helpers/server/triggers';

/* eslint-disable no-restricted-imports */
import { internalMutation as rawInternalMutation, mutation as rawMutation } from './_generated/server';

/* eslint-enable no-restricted-imports */
import { DataModel } from './_generated/dataModel';

// start using Triggers, with table types from schema.ts
const triggers = new Triggers<DataModel>();

// register a function to run when a `ctx.db.insert`, `ctx.db.patch`, `ctx.db.replace`, or `ctx.db.delete` changes the "users" table
triggers.register('_scheduled_functions', async (ctx, change) => {
console.log('Scheduled function changed', change);
});

console.log('Triggers registered');

// create wrappers that replace the built-in `mutation` and `internalMutation`
// the wrappers override `ctx` so that `ctx.db.insert`, `ctx.db.patch`, etc. run registered trigger functions
export const mutation = customMutation(rawMutation, customCtx(triggers.wrapDB));
export const internalMutation = customMutation(rawInternalMutation, customCtx(triggers.wrapDB));
Here's how I'm using the queue:
try {
await ctx.runMutation(internal.jobs.jobProcessing.markJobStarted, { jobId });
const job = await ctx.runQuery(internal.jobs.jobQueue.getJob, { jobId });
if (!job) throw new Error('Job not found');

console.log(job);

switch (job.fnType) {
case 'action':
await ctx.scheduler.runAfter(0, job.fnHandle as FunctionHandle<'action'>, job.args);
break;
case 'mutation':
await ctx.scheduler.runAfter(0, job.fnHandle as FunctionHandle<'mutation'>, job.args);
break;
}
} catch (error: any) {
await ctx.runMutation(internal.jobs.jobProcessing.markJobFailed, {
error: {
message: error.message,
stack: error.stack,
},
jobId,
});
}
try {
await ctx.runMutation(internal.jobs.jobProcessing.markJobStarted, { jobId });
const job = await ctx.runQuery(internal.jobs.jobQueue.getJob, { jobId });
if (!job) throw new Error('Job not found');

console.log(job);

switch (job.fnType) {
case 'action':
await ctx.scheduler.runAfter(0, job.fnHandle as FunctionHandle<'action'>, job.args);
break;
case 'mutation':
await ctx.scheduler.runAfter(0, job.fnHandle as FunctionHandle<'mutation'>, job.args);
break;
}
} catch (error: any) {
await ctx.runMutation(internal.jobs.jobProcessing.markJobFailed, {
error: {
message: error.message,
stack: error.stack,
},
jobId,
});
}
4 replies
CCConvex Community
Created by Tom Redman on 12/6/2024 in #support-community
Not sure why this is throwing a "multiple paginated queries" error
export const getJobsCountWithQueueAndStatus = internalQuery({
args: {
queue: v.string(),
status: v.union(
v.literal('pending'),
v.literal('processing'),
v.literal('completed'),
v.literal('failed'),
v.literal('cancelled'),
),
},
handler: async (ctx, { queue, status }) => {
let continueCursor = null;
let isDone = false;
let page;
const pageSize = 5;

let jobsCount = 0;

while (!isDone) {
console.log(queue, status);

({ continueCursor, isDone, page } = await ctx.db
.query('jobs')
.withIndex('by_queue_status', (q) => q.eq('queue', queue).eq('status', status))
.paginate({
cursor: continueCursor,
numItems: pageSize,
}));
jobsCount += page.length;
}

return jobsCount;
},
});
export const getJobsCountWithQueueAndStatus = internalQuery({
args: {
queue: v.string(),
status: v.union(
v.literal('pending'),
v.literal('processing'),
v.literal('completed'),
v.literal('failed'),
v.literal('cancelled'),
),
},
handler: async (ctx, { queue, status }) => {
let continueCursor = null;
let isDone = false;
let page;
const pageSize = 5;

let jobsCount = 0;

while (!isDone) {
console.log(queue, status);

({ continueCursor, isDone, page } = await ctx.db
.query('jobs')
.withIndex('by_queue_status', (q) => q.eq('queue', queue).eq('status', status))
.paginate({
cursor: continueCursor,
numItems: pageSize,
}));
jobsCount += page.length;
}

return jobsCount;
},
});
Schema:
jobs: defineTable({
args: v.any(),
attempt: v.number(),
dependencies: v.optional(v.array(v.id('jobs'))),
error: v.optional(
v.object({
attempt: v.number(),
message: v.string(),
stack: v.optional(v.string()),
timestamp: v.number(),
}),
),
fnArgs: v.any(),
fnHandle: v.string(),
fnName: v.string(),
fnType: v.union(v.literal('action'), v.literal('mutation'), v.literal('query')),
maxAttempts: v.number(),
metadata: v.optional(v.any()),
nextJobId: v.optional(v.id('jobs')),
priority: v.number(),
progress: v.optional(
v.object({
current: v.number(),
status: v.string(),
total: v.number(),
}),
),
queue: v.string(),
resourceRequirements: v.optional(
v.object({
cpu: v.number(),
memory: v.number(),
timeout: v.number(),
}),
),
result: v.optional(v.any()),
status: v.union(
v.literal('pending'),
v.literal('processing'),
v.literal('completed'),
v.literal('failed'),
v.literal('cancelled'),
),
timing: v.object({
completedAt: v.optional(v.number()),
createdAt: v.number(),
duration: v.optional(v.number()),
processingTime: v.optional(v.number()),
queueTime: v.optional(v.number()),
startedAt: v.optional(v.number()),
}),
}).index('by_queue_status', ['queue', 'status']),
jobs: defineTable({
args: v.any(),
attempt: v.number(),
dependencies: v.optional(v.array(v.id('jobs'))),
error: v.optional(
v.object({
attempt: v.number(),
message: v.string(),
stack: v.optional(v.string()),
timestamp: v.number(),
}),
),
fnArgs: v.any(),
fnHandle: v.string(),
fnName: v.string(),
fnType: v.union(v.literal('action'), v.literal('mutation'), v.literal('query')),
maxAttempts: v.number(),
metadata: v.optional(v.any()),
nextJobId: v.optional(v.id('jobs')),
priority: v.number(),
progress: v.optional(
v.object({
current: v.number(),
status: v.string(),
total: v.number(),
}),
),
queue: v.string(),
resourceRequirements: v.optional(
v.object({
cpu: v.number(),
memory: v.number(),
timeout: v.number(),
}),
),
result: v.optional(v.any()),
status: v.union(
v.literal('pending'),
v.literal('processing'),
v.literal('completed'),
v.literal('failed'),
v.literal('cancelled'),
),
timing: v.object({
completedAt: v.optional(v.number()),
createdAt: v.number(),
duration: v.optional(v.number()),
processingTime: v.optional(v.number()),
queueTime: v.optional(v.number()),
startedAt: v.optional(v.number()),
}),
}).index('by_queue_status', ['queue', 'status']),
Error:
Error: [CONVEX Q(jobs/jobQueue:getJobsCountWithQueueAndStatus)] [Request ID: e735d416db91c2a6] Server Error
Uncaught Error: This query or mutation function ran multiple paginated queries. Convex only supports a single paginated query in each function.
at async handler (../../convex/jobs/jobQueue.ts:54:18)

Called by client
Error: [CONVEX Q(jobs/jobQueue:getJobsCountWithQueueAndStatus)] [Request ID: e735d416db91c2a6] Server Error
Uncaught Error: This query or mutation function ran multiple paginated queries. Convex only supports a single paginated query in each function.
at async handler (../../convex/jobs/jobQueue.ts:54:18)

Called by client
Calling once with arguments:
{
queue: "instagramApi",
status: "completed",
}
{
queue: "instagramApi",
status: "completed",
}
10 replies