Indy
Indy2w ago

Workpool

If you're using the oncomplete to schedule the next batch, then Workpool might be a better answer. Depending on what you're inserting (can they be inserted in parallel, have to be linear) you may need to do this: https://www.convex.dev/components/workpool#optimize-occ-errors
Convex
Workpool
Workpools give critical tasks priority by organizing async operations into separate, customizable queues.
3 Replies
dejeszio
dejeszio2w ago
Hey @Indy , so I currently use this code - as you can see, I use the action-retrier, the caching of an action result, alongside the oncomplete. This seems to do the job just fine, but I am unsure whether this follows the best practices, I think it does - but if you have any input that would be nice, thanks. I don't think this needs a workpool to be successful?
const CHUNK_SIZE = 1000;

export const kickOffCoinsSyncActions = internalAction({
handler: async (ctx) => {
const response = await callCoinGeckoApiHelper(`coins/list?include_platform=true`, { method: "GET" }, 3);
const total = response.data.length;
console.log(`🔍 total coins to process: ${total}`);
const numChunks = Math.ceil(total / CHUNK_SIZE);

for (let chunkIndex = 0; chunkIndex < numChunks; chunkIndex++) {
await retrier.run(
ctx,
internal.coingeckov2.fetchCoins.fetchCoinsFromCoinGeckoAction,
{ chunkIndex },
{
onComplete: internal.coingeckov2.fetchCoins.fetchCoinsFromCoinGeckoCallback,
base: 2,
initialBackoffMs: 1000,
logLevel: "DEBUG",
maxFailures: 2,
}
);
}
}
});

// The action that fetches the full coin list from CoinGecko
export const fetchAllCoinsFromCoinGecko = internalAction({
handler: async (ctx, args) => {
const response = await callCoinGeckoApiHelper(`coins/list?include_platform=true`, { method: "GET" }, 3);
return response.data; // or response, depending on your helper
}
});

export const fetchCoinsFromCoinGeckoAction = internalAction({
args: { chunkIndex: v.number() },
handler: async (ctx, { chunkIndex }) => {
// Only fetch the chunk you need
const chunk: CoinGeckoCoinResponse[] = await coinsChunkCache.fetch(ctx, { chunkIndex });
return chunk;
}
});

export const fetchAllCoinsChunkFromCoinGecko = internalAction({
args: { chunkIndex: v.number() },
handler: async (ctx, { chunkIndex }) => {
const response = await callCoinGeckoApiHelper(`coins/list?include_platform=true`, { method: "GET" }, 3);
const allCoins: CoinGeckoCoinResponse[] = response.data;
// Split into chunks
const start = chunkIndex * CHUNK_SIZE;
const end = Math.min(start + CHUNK_SIZE, allCoins.length);
return allCoins.slice(start, end);
}
});

const coinsChunkCache = new ActionCache(components.actionCache, {
action: internal.coingeckov2.fetchCoins.fetchAllCoinsChunkFromCoinGecko,
name: "coingecko-coins-chunk-v1",
ttl: 1000 * 60 * 10, // 10 minutes
});

export const fetchCoinsFromCoinGeckoCallback = internalMutation({
args: onCompleteValidator,
handler: async (ctx, args) => {
if (args.result.type === "success") {
const coins: CoinGeckoCoinResponse[] = args.result.returnValue;
if (!coins || coins.length === 0) {
throw new Error(`❌ No coins returned from CoinGecko in callback ${args.runId}`);
}
await CoinGeckoCoins.upsertBatch(ctx, coins.map(coin => ({
id: coin.id,
symbol: coin.symbol,
name: coin.name,
platforms: coin.platforms,
})));
}
else if (args.result.type === "failed") {
console.error(`❌ Failed to fetch coins from CoinGecko ${args.runId}`);
}
else if (args.result.type === "canceled") {
console.error(`❌ Canceled fetching coins from CoinGecko ${args.runId}`);
}
}
});
const CHUNK_SIZE = 1000;

export const kickOffCoinsSyncActions = internalAction({
handler: async (ctx) => {
const response = await callCoinGeckoApiHelper(`coins/list?include_platform=true`, { method: "GET" }, 3);
const total = response.data.length;
console.log(`🔍 total coins to process: ${total}`);
const numChunks = Math.ceil(total / CHUNK_SIZE);

for (let chunkIndex = 0; chunkIndex < numChunks; chunkIndex++) {
await retrier.run(
ctx,
internal.coingeckov2.fetchCoins.fetchCoinsFromCoinGeckoAction,
{ chunkIndex },
{
onComplete: internal.coingeckov2.fetchCoins.fetchCoinsFromCoinGeckoCallback,
base: 2,
initialBackoffMs: 1000,
logLevel: "DEBUG",
maxFailures: 2,
}
);
}
}
});

// The action that fetches the full coin list from CoinGecko
export const fetchAllCoinsFromCoinGecko = internalAction({
handler: async (ctx, args) => {
const response = await callCoinGeckoApiHelper(`coins/list?include_platform=true`, { method: "GET" }, 3);
return response.data; // or response, depending on your helper
}
});

export const fetchCoinsFromCoinGeckoAction = internalAction({
args: { chunkIndex: v.number() },
handler: async (ctx, { chunkIndex }) => {
// Only fetch the chunk you need
const chunk: CoinGeckoCoinResponse[] = await coinsChunkCache.fetch(ctx, { chunkIndex });
return chunk;
}
});

export const fetchAllCoinsChunkFromCoinGecko = internalAction({
args: { chunkIndex: v.number() },
handler: async (ctx, { chunkIndex }) => {
const response = await callCoinGeckoApiHelper(`coins/list?include_platform=true`, { method: "GET" }, 3);
const allCoins: CoinGeckoCoinResponse[] = response.data;
// Split into chunks
const start = chunkIndex * CHUNK_SIZE;
const end = Math.min(start + CHUNK_SIZE, allCoins.length);
return allCoins.slice(start, end);
}
});

const coinsChunkCache = new ActionCache(components.actionCache, {
action: internal.coingeckov2.fetchCoins.fetchAllCoinsChunkFromCoinGecko,
name: "coingecko-coins-chunk-v1",
ttl: 1000 * 60 * 10, // 10 minutes
});

export const fetchCoinsFromCoinGeckoCallback = internalMutation({
args: onCompleteValidator,
handler: async (ctx, args) => {
if (args.result.type === "success") {
const coins: CoinGeckoCoinResponse[] = args.result.returnValue;
if (!coins || coins.length === 0) {
throw new Error(`❌ No coins returned from CoinGecko in callback ${args.runId}`);
}
await CoinGeckoCoins.upsertBatch(ctx, coins.map(coin => ({
id: coin.id,
symbol: coin.symbol,
name: coin.name,
platforms: coin.platforms,
})));
}
else if (args.result.type === "failed") {
console.error(`❌ Failed to fetch coins from CoinGecko ${args.runId}`);
}
else if (args.result.type === "canceled") {
console.error(`❌ Canceled fetching coins from CoinGecko ${args.runId}`);
}
}
});
Indy
IndyOP2w ago
If this works for you great! The main part that gives me pause is this loop you have in the kickOffFunction
for (let chunkIndex = 0; chunkIndex < numChunks; chunkIndex++) {
await retrier.run(
ctx,
internal.coingeckov2.fetchCoins.fetchCoinsFromCoinGeckoAction,
{ chunkIndex },
{
onComplete: internal.coingeckov2.fetchCoins.fetchCoinsFromCoinGeckoCallback,
base: 2,
initialBackoffMs: 1000,
logLevel: "DEBUG",
maxFailures: 2,
}
);
}
for (let chunkIndex = 0; chunkIndex < numChunks; chunkIndex++) {
await retrier.run(
ctx,
internal.coingeckov2.fetchCoins.fetchCoinsFromCoinGeckoAction,
{ chunkIndex },
{
onComplete: internal.coingeckov2.fetchCoins.fetchCoinsFromCoinGeckoCallback,
base: 2,
initialBackoffMs: 1000,
logLevel: "DEBUG",
maxFailures: 2,
}
);
}
Indy
IndyOP2w ago
If numChunks is very large, and your "upsert" function
await CoinGeckoCoins.upsertBatch(ctx, coins.map(coin => ({
await CoinGeckoCoins.upsertBatch(ctx, coins.map(coin => ({
If you are writing over the same rows, then you might be in trouble with repeatedly trying to write on the same records that keep getting invalidated by another mutation. But if they are independent records that you're likely ok. IIUC you'll only be doing 17 of these batches at the same time, so you'll likely be fine w.r.t. parallelization limits. But if you expect there to be a lot of writes that contend with each other, and if the number of batches grows quite a bit more, then you'll want to switch the retrier with a workpool to ensure you fit within the Convex parallel function limits: https://docs.convex.dev/production/state/limits#concurrent-function-executions
Limits | Convex Developer Hub
We’d love for you to have unlimited joy building on Convex but engineering

Did you find this page helpful?