MrBithles
MrBithles3d ago

conflict write warnings, and excessive DB reads

I have lots of write warnings, and my function, which is only inserting, is causing high DB read bandwidth usage. This is the code for the function:
No description
No description
No description
3 Replies
MrBithles
MrBithlesOP3d ago
ts

export const createJobRun = serverApiMutation({
args: v.object({
jobRun: v.object(jobRun),
}),
handler: async (ctx, { jobRun }) => {
await ctx.db.insert('jobRun', {
...jobRun,
crawlType: jobRun.crawlType ?? 'fetch',
crawlDuration: jobRun.crawlDuration ?? 0,
});
},
});

ts

export const createJobRun = serverApiMutation({
args: v.object({
jobRun: v.object(jobRun),
}),
handler: async (ctx, { jobRun }) => {
await ctx.db.insert('jobRun', {
...jobRun,
crawlType: jobRun.crawlType ?? 'fetch',
crawlDuration: jobRun.crawlDuration ?? 0,
});
},
});

It’s a little more complex as the mutation is wrapped to also accept an apiKey as arg (why it’s called serverApiMutation) and also have a custom trigger to call aggregate table update.

The code for the trigger looks like this:


ts

triggers.register('jobRun', async (ctx, change) => {
// aggregate updates
await jobRunsByStatus.trigger()(ctx, change);

// business logic on insert to update parent/link record
if (change.operation === 'insert' && change.newDoc) {
const jobRun = change.newDoc;
const job = await ctx.db.get(jobRun.jobId); // <— I think this is the cause for large bandwidth usage, but no idea why

if (job && jobRun) {
// UPDATE JOB WITH LATEST RUN INFO
const jobPatch: Partial<Job> = {
status: jobRun?.status === 'error' ? 'failing' : 'active',
crawlType: jobRun?.crawlType ?? job.crawlType,
};
// Only update job values if they are not already set
if ((!job.name || job.name === job.url) && jobRun?.name) {
jobPatch.name = jobRun.name;
}
if (!job.image && jobRun?.image) {
jobPatch.image = jobRun.image;
}
if (!job.initialPrice && !(jobRun?.price === undefined || jobRun?.price === null)) {
jobPatch.initialPrice = parsePrice(jobRun.price);
}
await ctx.db.patch(job._id, jobPatch); // <— I think this is the cause of the write conflictss
}
}

if (change.operation === 'update' && change.newDoc && change.oldDoc) {
if (typeof change.newDoc.crawlRatingGood === 'boolean') {
ctx.scheduler.runAfter(0, internal.utils.analytics.capture, {
distinctId: change.newDoc._id,
event: 'Crawl Rating',
properties: {
jobId: change.newDoc.jobId,
good: change.newDoc.crawlRatingGood,
},
});
}
}
});

ts

triggers.register('jobRun', async (ctx, change) => {
// aggregate updates
await jobRunsByStatus.trigger()(ctx, change);

// business logic on insert to update parent/link record
if (change.operation === 'insert' && change.newDoc) {
const jobRun = change.newDoc;
const job = await ctx.db.get(jobRun.jobId); // <— I think this is the cause for large bandwidth usage, but no idea why

if (job && jobRun) {
// UPDATE JOB WITH LATEST RUN INFO
const jobPatch: Partial<Job> = {
status: jobRun?.status === 'error' ? 'failing' : 'active',
crawlType: jobRun?.crawlType ?? job.crawlType,
};
// Only update job values if they are not already set
if ((!job.name || job.name === job.url) && jobRun?.name) {
jobPatch.name = jobRun.name;
}
if (!job.image && jobRun?.image) {
jobPatch.image = jobRun.image;
}
if (!job.initialPrice && !(jobRun?.price === undefined || jobRun?.price === null)) {
jobPatch.initialPrice = parsePrice(jobRun.price);
}
await ctx.db.patch(job._id, jobPatch); // <— I think this is the cause of the write conflictss
}
}

if (change.operation === 'update' && change.newDoc && change.oldDoc) {
if (typeof change.newDoc.crawlRatingGood === 'boolean') {
ctx.scheduler.runAfter(0, internal.utils.analytics.capture, {
distinctId: change.newDoc._id,
event: 'Crawl Rating',
properties: {
jobId: change.newDoc.jobId,
good: change.newDoc.crawlRatingGood,
},
});
}
}
});

I have attached a screenshot fo the warnings, what is strange is the job _ids reference in the warning don’t exist. 
 The bandwidth seems strange that the each call to insert a jobRun is reading 102 documents, should only be 1?
Clever Tagline
I haven't used triggers before, but I know the basic concept. Something tells me you've got some recursive triggering going on because writing to the jobRun table triggers further changes in the same table. On the surface it looks like you've checked for this, but there are parts of your logic that you haven't shared here. My only suggestion would be to review everything involved and scrutinize the logic very carefully to see if you can find anything that might lead to recursive behavior.
MrBithles
MrBithlesOP2d ago
I have reviewed code, and nothing obv on my side stood out. I only have one other trigger on the job table to handle aggregates and deleting jobRuns (to work similarly to SQL DB Cascading).
triggers.register('job', async (ctx, change) => {
// Handle aggregate updates
await enabledJobs.trigger()(ctx, change);
await jobsByStatus.trigger()(ctx, change);
await jobsByCrawlType.trigger()(ctx, change);
await jobsByUser.trigger()(ctx, change);

if (change.operation === 'delete') {
for await (const jobRun of ctx.db.query('jobRun').withIndex('by_jobId', (q) => q.eq('jobId', change.id))) {
await ctx.db.delete(jobRun._id);
}
}
});
triggers.register('job', async (ctx, change) => {
// Handle aggregate updates
await enabledJobs.trigger()(ctx, change);
await jobsByStatus.trigger()(ctx, change);
await jobsByCrawlType.trigger()(ctx, change);
await jobsByUser.trigger()(ctx, change);

if (change.operation === 'delete') {
for await (const jobRun of ctx.db.query('jobRun').withIndex('by_jobId', (q) => q.eq('jobId', change.id))) {
await ctx.db.delete(jobRun._id);
}
}
});
I like convex, but the aggregates and trigger solution seem half-complete. For example, I have to constantly remind myself not to edit data in the UI dashboard, else it breaks the whole DB. And triggers do feel like a footgun waiting to happen.

Did you find this page helpful?