sbkl
sbkl6d ago

Streaming import API takes 3 times longer than regular functions

Just tried the streaming import API and uploaded 100k rows as a test. With regular queries (for a database consistency check) and mutations it took around 2 min, but with the import API it takes 8 min, even though it's supposed to be faster and more efficient at uploading big datasets than the regular function API. Below is the code I am using within an action.
Convex HTTP API | Convex Developer Hub
Connecting to Convex directly with HTTP
4 Replies
Convex Bot
Convex Bot6d ago
Thanks for posting in <#1088161997662724167>. Reminder: If you have a Convex Pro account, use the Convex Dashboard to file support tickets.
- Provide context: What are you trying to achieve, what is the end-user interaction, what are you seeing? (full error message, command output, etc.)
- Use search.convex.dev to search Docs, Stack, and Discord all at once.
- Additionally, you can post your questions in the Convex Community's <#1228095053885476985> channel to receive a response from AI.
- Avoid tagging staff unless specifically instructed.
Thank you!
sbkl
sbklOP6d ago
// index/primary key was added with /api/streaming_import/add_primary_key_indexes and confirmed active with /api/streaming_import/primary_key_indexes_ready
const primaryKey = [
  ["organisationId"],
  ["collectionId"],
  ["regionId"],
  ["marketId"],
  ["storeId"],
  ["materialExternalId"],
  ["articleExternalId"],
];

// upload the rows in batches of 10k per request
for (let i = 0; i < batches.length; i += 10000) {
  const batch = batches.slice(i, i + 10000);
  const data = {
    tables: {
      collectionAllocations: {
        primaryKey,
        jsonSchema: {
          type: "object",
          properties: {
            organisationId: {
              type: "id",
              tableName: "organisations",
            },
            collectionId: { type: "id", tableName: "collections" },
            regionId: { type: "id", tableName: "regions" },
            marketId: { type: "id", tableName: "markets" },
            storeId: { type: "id", tableName: "stores" },
            materialExternalId: { type: "string" },
            articleExternalId: { type: "string" },
            units: {
              type: "union",
              value: [
                { type: "number" },
                { type: "null" },
                { type: "literal", value: "x" },
              ],
            },
          },
        },
      },
    },
    messages: batch.map((row) => {
      return {
        tableName: "collectionAllocations",
        data: row,
      };
    }),
  };

  // one import_airbyte_records call per 10k-row batch
  const response = await fetch(
    `${env.CONVEX_URL}/api/streaming_import/import_airbyte_records`,
    {
      method: "POST",
      headers: {
        Accept: "application/json",
        "Content-Type": "application/json",
        "Convex-Client": "streaming-import-0.1.0",
        Authorization: `Convex ${env.DEPLOY_KEY}`,
      },
      body: JSON.stringify(data),
    },
  );
}
Batched by 10k rows, and all rows are uploaded to the table, so the jsonSchema, primaryKey, and messages seem to work fine. Am I missing anything?
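One small thing worth noting about the loop above: it never reads `response`, so a failed or slow batch would go unnoticed. A minimal sketch of a check that could sit right after the fetch inside the loop (it reuses `response`, `i`, and `batch` from the code above; the log line is only there to see how long each batch takes in the action logs):

// right after the fetch inside the loop above
if (!response.ok) {
  // surface the error body returned by the endpoint instead of silently continuing
  throw new Error(
    `Batch starting at row ${i} failed: ${response.status} ${await response.text()}`,
  );
}
console.log(`Uploaded batch of ${batch.length} rows starting at row ${i}`);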
sbkl.
sbkl.6d ago
And my implementation with the regular queries and mutations within an action also does an upsert check and deletes relevant records.
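For comparison, a rough sketch of what that upsert could look like as a regular Convex mutation (purely illustrative: the function name, the index name `by_store_and_article`, and the choice to match on `storeId` + `articleExternalId` are assumptions, not the actual implementation):

// convex/collectionAllocations.ts — hypothetical upsert mutation called from the action
import { mutation } from "./_generated/server";
import { v } from "convex/values";

export const upsertAllocation = mutation({
  args: {
    organisationId: v.id("organisations"),
    collectionId: v.id("collections"),
    regionId: v.id("regions"),
    marketId: v.id("markets"),
    storeId: v.id("stores"),
    materialExternalId: v.string(),
    articleExternalId: v.string(),
    units: v.union(v.number(), v.null(), v.literal("x")),
  },
  handler: async (ctx, args) => {
    // assumes an index on ["storeId", "articleExternalId"] defined in schema.ts
    const existing = await ctx.db
      .query("collectionAllocations")
      .withIndex("by_store_and_article", (q) =>
        q.eq("storeId", args.storeId).eq("articleExternalId", args.articleExternalId),
      )
      .unique();
    if (existing) {
      await ctx.db.patch(existing._id, args); // update the existing row in place
    } else {
      await ctx.db.insert("collectionAllocations", args);
    }
  },
});

The delete of stale records mentioned above would be a separate query plus ctx.db.delete pass and is left out here.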
erquhart
erquhart5d ago
Commented this on your other post as well, but I'll add it here: I don't know if streaming import is supposed to be faster or more efficient; the point of it is to support large dataset imports, which is difficult to do reliably with regular functions, as you've seen. Apart from the time it takes, is everything working with the import?
