Mordsith · 7mo ago

Database seed with 70,000 items

I've been trying to get past this error: "You have an outstanding query call. Operations should be awaited or they might not run. Not awaiting promises might result in unexpected failures. See https://docs.convex.dev/functions/actions#dangling-promises for more information." This is my code; I've tried writing it in many ways. I have an npm script that seeds data into a database. The seed is an internal action that fetches a JSON response (approximately 7MB) from an external source. When this action runs, I want to run a mutation that adds each item to a table. I'm not able to get past this issue with Promises.
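For context, the warning refers to the "dangling promise" pattern: a promise created inside a handler but never awaited before the handler returns. A minimal, generic sketch (hypothetical names, not this app's code):

```typescript
// A stand-in for any async operation (e.g. a ctx.runQuery call).
async function fetchCount(): Promise<number> {
  return 42;
}

// Floating promise: the handler can return before fetchCount settles,
// which is the situation the "outstanding query call" warning flags.
async function withDanglingPromise(): Promise<void> {
  void fetchCount(); // created but never awaited
}

// Awaited version: the result is guaranteed before the handler returns.
async function withAwait(): Promise<number> {
  return await fetchCount();
}
```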
23 Replies
Mordsith (OP) · 7mo ago
The seed function looks like this:
export const seed = internalAction({
  args: {},
  handler: async (ctx) => {
    console.log("Start Diagnosis seed...")
    // Diagnosis Data is about 7-8MB, run this seed once to speed up deployment
    // and to prevent seeding large amounts of data all the time.
    try {
      // Fetch first 10 results
      const diagnosisUploaded = await ctx.runQuery(internal.routes.diagnosis.internal_queries.getDiagnosisInternal, {
        paginationOpts: {
          numItems: 10,
          cursor: null
        }
      })
      // Check if no diagnosis codes are present
      if (diagnosisUploaded.page.length < 1) {
        // Run with scheduler
        await ctx.scheduler.runAfter(0, internal.init.seedDiagnosis)
        console.log("Diagnosis Data Upload in progress")
      }
    } catch (err) {
      console.error("Error seeding diagnosis", err)
    }
  }
})
The scheduler calls this action with the code below. No console log appears, but I've confirmed it fetches the JSON data with 70,000+ results.
seedDiagnosis action

for (let i = 0; i < diagnosis.length; i += 1) {
  await Promise.all(
    await diagnosis.map(async ({ code, desc }) => {
      const diagnosisFromCode = await ctx.runQuery(internal.routes.diagnosis.internal_queries.getDiagnosisByCode, {
        code
      })
      console.log(diagnosisFromCode)
    })
  );
}

asyncMap(diagnosis, ({ code, desc }) => {
  const diagnosisFromCode = await
  ctx.runQuery(internal.routes.diagnosis.internal_queries.getDiagnosisByCode, {
    code
  })
  console.log(diagnosisFromCode)
})
lee · 7mo ago
you have a ctx.runQuery call that is missing an await (at the bottom of the code you pasted)
Mordsith (OP) · 7mo ago
@lee That was a typo; I still get the same error with await before ctx.runQuery. The asyncMap was a second approach I tried, the for loop was the first. They're meant to achieve the same thing, and both return the same error.
lee · 7mo ago
you also need to await the asyncMap
Mordsith (OP) · 7mo ago
@lee No luck still, error is the same
lee · 7mo ago
Interesting. Can you try simplifying to just the loop? No .map or Promise.all or asyncMap. The "you have an outstanding query" thing isn't actually an error; it's a warning. It's happening because of a separate error (which unhelpfully appears to have the message "Error") that is short-circuiting the Promise.all. I would investigate by making everything serial (remove the Promise.all parallelism) and tracking down the error.
Mordsith (OP) · 7mo ago
@lee I only get the error when I run the Convex query inside a loop / map. The first null value is the same query that was inside the loop.
lee · 7mo ago
Can you call the query inside the loop? Like this
for (const {code, desc} of diagnosis) {
  const diagnosisFromCode = await ctx.runQuery(internal.routes.diagnosis.internal_queries.getDiagnosisByCode, {
    code
  })
  console.log(diagnosisFromCode)
}
For my own debugging, is this action in a file with "use node" at the top?
Mordsith (OP) · 7mo ago
@lee Yes, there is "use node" at the top. This ran without the error
lee · 7mo ago
Gotcha thanks for checking. If you remove the log line, does it execute successfully without erroring?
lee · 7mo ago
I think you're running into the limit on concurrent operations within node: https://docs.convex.dev/functions/actions#limits
lee · 7mo ago
(i'm creating internal tasks because this error message is not helpful)
Mordsith (OP) · 7mo ago
@lee Yes, it's currently running. I'm seeding 70,000 records; this action is triggered from a scheduler. It's currently at 2k plus. I see from the link that:
Actions can do up to 1000 concurrent operations, such as executing queries, mutations or performing fetch requests.
In my case, and from the code shared above, which of the operations were concurrent? Was it referring to the operation inside the loop?
lee · 7mo ago
when you do ctx.runQuery and await it in a Promise.all with other ctx.runQuery promises, then they are running concurrently
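A minimal sketch of one way to stay under that limit, using a generic chunking helper (not from this app's code); only `chunkSize` workers are ever in flight in a given Promise.all:

```typescript
// Process items in fixed-size chunks: each Promise.all awaits at most
// `chunkSize` concurrent operations before the next chunk starts, which
// keeps the total concurrency well under the 1000-operation action limit.
async function processInChunks<T, R>(
  items: T[],
  chunkSize: number,
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = [];
  for (let i = 0; i < items.length; i += chunkSize) {
    const chunk = items.slice(i, i + chunkSize);
    // Only chunk.length promises run concurrently here.
    results.push(...(await Promise.all(chunk.map(worker))));
  }
  return results;
}
```

In the seeding scenario, `worker` would be the per-item `ctx.runQuery`/`ctx.runMutation` pair.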
Mordsith (OP) · 7mo ago
I get it, thanks for that information @lee. I was able to seed 9,000+ records before hitting this error
Mordsith (OP) · 7mo ago
Seeding stopped
lee · 7mo ago
hmm i'm not sure what that transient error could be. I also don't really understand the flow here. Can you describe how seedDiagnosis works? It looks to me like it does a fetch request but doesn't save the data into Convex
Mordsith (OP) · 7mo ago
@lee
"use node";

// Seed convex data
// https://stack.convex.dev/seeding-data-for-preview-deployments#2-importing-from-the-cli
//
// International Diagnostics Code:
// https://gist.githubusercontent.com/cryocaustik/b86de96e66489ada97c25fc25f755de0
import { internalAction } from "@/convex/_generated/server";
import { internal } from "./_generated/api";
import type { Doc } from "./_generated/dataModel";

const DIAGNOSIS_CODES_URL =
  "https://gist.githubusercontent.com/cryocaustik/b86de96e66489ada97c25fc25f755de0/raw/b31a549638a609004e9a45f8933c3f37bdf4c27d/icd10_codes.json";

export const seed = internalAction({
  args: {},
  handler: async (ctx) => {
    console.log("Start Diagnosis seed...");
    // Diagnosis Data is about 7-8MB, run this seed once to speed up deployment
    // and to prevent seeding large amounts of data all the time.
    try {
      // Fetch first 10 results
      const diagnosisUploaded = await ctx.runQuery(
        internal.routes.diagnosis.internal_queries.getDiagnosisInternal,
        {
          paginationOpts: {
            numItems: 10,
            cursor: null,
          },
        },
      );
      // Check if no diagnosis codes are present
      if (diagnosisUploaded.page.length < 1) {
        // Run with scheduler
        await ctx.scheduler.runAfter(0, internal.init.seedDiagnosis);
        console.log("Diagnosis Data Upload in progress");
      }
    } catch (err) {
      console.error("Error seeding diagnosis", err);
    }
  },
});
export const seedDiagnosis = internalAction({
  args: {},
  handler: async (ctx) => {
    try {
      // Fetch all diagnosis codes and data. Total: 70,000+
      const request = await fetch(DIAGNOSIS_CODES_URL);
      const diagnosis: Pick<Doc<"diagnosis">, "code" | "desc">[] =
        await request.json();

      for (const { code, desc } of diagnosis) {
        const diagnosisFromCode = await ctx.runQuery(
          internal.routes.diagnosis.internal_queries.getDiagnosisByCode,
          {
            code,
          },
        );
        // Prevent adding duplicates
        if (!diagnosisFromCode) {
          await ctx.runMutation(
            internal.routes.diagnosis.internal_mutations.addDiagnosis,
            {
              code,
              desc,
            },
          );
        }
      }
      console.log("Diagnosis Data Uploaded");
    } catch (err) {
      console.error("Error saving diagnosis", err);
    }
  },
});
Mordsith (OP) · 7mo ago
I tried to run it today and got this error:
Mordsith (OP) · 7mo ago
import { internalMutation } from "@/convex/_generated/server";
import { diagnosisFields } from "./schema";

export const addDiagnosis = internalMutation({
  args: diagnosisFields,
  handler: async (ctx, params) => {
    const diagnosis = await ctx.db.insert("diagnosis", params);
    return diagnosis;
  },
});
lee · 7mo ago
thanks for sharing, now i understand the flow. I would expect this to work, so i'm confused why you're getting this error. But here are some ideas to try:
- have addDiagnosis take in a batch of ~100 diagnosis documents, and insert them all in a loop. Having fewer mutations may help them avoid conflicts.
- check getDiagnosisByCode to make sure it's using an index. I wouldn't expect a query to contribute to the conflict, but making the query more efficient can't hurt.
- since it's a one-time operation, you could try a different flow where you curl the endpoint from your computer, construct a csv or jsonl, and use npx convex import to upload the data. This will do the efficient patterns.
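A minimal sketch of the first idea, with hypothetical names and an in-memory Map standing in for ctx.db so it runs standalone; in Convex the same loop would live inside an internalMutation (e.g. a hypothetical addDiagnosisBatch) that receives ~100 documents per call:

```typescript
type Diagnosis = { code: string; desc: string };

// Insert a batch of documents in one call, skipping duplicates by code.
// The Map models a table keyed by code; the has() check stands in for the
// getDiagnosisByCode index lookup. Returns how many were actually inserted.
function insertBatch(db: Map<string, Diagnosis>, batch: Diagnosis[]): number {
  let inserted = 0;
  for (const d of batch) {
    if (!db.has(d.code)) {
      db.set(d.code, d);
      inserted += 1;
    }
  }
  return inserted;
}
```

Batching this way replaces ~100 mutation calls with one, which reduces the number of transactions that can conflict with each other.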
Mordsith (OP) · 7mo ago
@lee getDiagnosisByCode uses an index, by_code. The batch solution worked for me and was way faster, thank you very much. I batched with 1,000, which creates 70 scheduled functions; each function runs a loop of 1,000 items that calls the mutation. All 70k items are in. Are there any violations with using schedulers like this?
"use node";

// Seed convex data
// https://stack.convex.dev/seeding-data-for-preview-deployments#2-importing-from-the-cli
//
// International Diagnostics Code:
// https://gist.githubusercontent.com/cryocaustik/b86de96e66489ada97c25fc25f755de0
import { internalAction } from "@/convex/_generated/server";
import { internal } from "./_generated/api";
import type { Doc } from "./_generated/dataModel";
import { diagnosisFields } from "./routes/diagnosis/schema";
import { pick } from "convex-helpers";
import { v } from "convex/values";

const DIAGNOSIS_CODES_URL =
  "https://gist.githubusercontent.com/cryocaustik/b86de96e66489ada97c25fc25f755de0/raw/b31a549638a609004e9a45f8933c3f37bdf4c27d/icd10_codes.json";

const TOTAL_DIAGNOSIS = 71000
const CHUNK_SIZE = 1000
export const seed = internalAction({
  args: {},
  handler: async (ctx) => {
    console.log("Start Diagnosis seed...");
    // Diagnosis Data is about 7-8MB, run this seed once to speed up deployment
    // and to prevent seeding large amounts of data all the time.
    try {
      const diagnosisUploaded = await ctx.runQuery(internal.routes.diagnosis.internal_queries.getDiagnosisCount)
      // Check whether all diagnosis codes are already present
      if (diagnosisUploaded < TOTAL_DIAGNOSIS) {
        // Fetch all diagnosis codes and data. Total: 70,000+
        const request = await fetch(DIAGNOSIS_CODES_URL);
        const diagnosis: Pick<Doc<"diagnosis">, "code" | "desc">[] =
          await request.json();
        // Split and schedule in batches of CHUNK_SIZE (1,000) items
        for (let i = 0; i < diagnosis.length; i += CHUNK_SIZE) {
          // This chunk contains at most 1,000 items
          const chunk = diagnosis.slice(i, i + CHUNK_SIZE);
          await ctx.scheduler.runAfter(0, internal.init.processBatch, {
            diagnosis: chunk
          })
        }
        console.log("Diagnosis Data Upload in progress");
      }
    } catch (err) {
      console.error("Error seeding diagnosis", err);
    }
    console.log("Seed run complete")
  },
});
export const processBatch = internalAction({
  args: {
    diagnosis: v.array(v.object(pick(diagnosisFields, ["code", "desc"])))
  },
  handler: async (ctx, { diagnosis }) => {
    for (const { code, desc } of diagnosis) {
      // Run the seedDiagnosis action for each item
      try {
        await ctx.runAction(internal.init.seedDiagnosis, {
          code,
          desc
        });
      } catch (err) {
        console.log("Diagnosis Seed Batch Process Error", err)
      }
    }
  }
})

export const seedDiagnosis = internalAction({
  args: pick(diagnosisFields, ["code", "desc"]),
  handler: async (ctx, { code, desc }) => {
    try {
      const diagnosisFromCode = await ctx.runQuery(
        internal.routes.diagnosis.internal_queries.getDiagnosisByCode,
        {
          code,
        },
      );
      // Prevent adding duplicates
      if (!diagnosisFromCode) {
        await ctx.runMutation(
          internal.routes.diagnosis.internal_mutations.addDiagnosis,
          {
            code,
            desc,
            searchable: `${code} ${desc}`
          },
        );
      } else if (!diagnosisFromCode.searchable) {
        await ctx.runMutation(
          internal.routes.diagnosis.internal_mutations.updateDiagnosis,
          {
            diagnosisId: diagnosisFromCode._id,
            searchable: `${code} ${desc}`
          },
        );
      }
      console.log("Diagnosis Data Uploaded");
    } catch (err) {
      console.error("Error saving diagnosis", err);
    }
  },
});
lee · 7mo ago
Awesome! Sounds like a good usage of the scheduler to me