ari-cake
ari-cake•2mo ago

Work Stealing API

Small library I've now used in two projects to implement work-stealing. Some compute is better handled on your own infrastructure. It's not open-source yet, because putting it in a package is work - but I might do it if people find it interesting.
No description
2 Replies
ari-cake
ari-cakeOP•2mo ago
Goal: Make it simple (<20 lines of code, mostly copy/paste) to add a new work-queue to a convex project. Sample server:
// Server (in convex)
import { v } from "convex/values";
import { createWorkerQueue, workerMutation } from "./worker/createWorkerQueue";

// To define a queue, you use the createWorkerQueue function
// All you need to do is provide an async function that returns all items that have work to do.
//
// The first argument ("encode") is the queue name. The second argument is the
// query that lists the documents still needing work: here, every "queue"
// document with status "done" and encode_completed_prio > 0, read through the
// "by_encode_prio" index in descending index order.
// The four functions returned by createWorkerQueue are exported from this module.
export const { hasWork, stealWork, markDone, clearQueue } = createWorkerQueue(
"encode",
async (ctx) =>
ctx.db
.query("queue")
.withIndex("by_encode_prio", (q) => q.eq("status", "done").gt("encode_completed_prio", 0))
.order("desc")
.collect(),
);

/**
 * Worker mutation that overwrites `encode_completed_prio` on one "queue" document.
 *
 * Args:
 * - `workerKey`: worker credential, consumed by the `workerMutation` wrapper.
 * - `item`: id of the "queue" document to update.
 * - `newPrio`: the new priority value to store.
 */
export const setCompletedPriority = workerMutation({
  args: {
    workerKey: v.string(),
    item: v.id("queue"),
    newPrio: v.number(),
  },
  async handler(ctx, args) {
    // Await the write: `ctx.db.patch` returns a promise, and leaving it floating
    // means the handler can return before the write settles and any failure
    // goes unobserved.
    await ctx.db.patch(args.item, {
      encode_completed_prio: args.newPrio,
    });
  },
});
// Server (in convex)
import { v } from "convex/values";
import { createWorkerQueue, workerMutation } from "./worker/createWorkerQueue";

// To define a queue, you use the createWorkerQueue function
// All you need to do is provide an async function that returns all items that have work to do.
//
// The first argument ("encode") is the queue name. The second argument is the
// query that lists the documents still needing work: here, every "queue"
// document with status "done" and encode_completed_prio > 0, read through the
// "by_encode_prio" index in descending index order.
// The four functions returned by createWorkerQueue are exported from this module.
export const { hasWork, stealWork, markDone, clearQueue } = createWorkerQueue(
"encode",
async (ctx) =>
ctx.db
.query("queue")
.withIndex("by_encode_prio", (q) => q.eq("status", "done").gt("encode_completed_prio", 0))
.order("desc")
.collect(),
);

/**
 * Worker mutation that overwrites `encode_completed_prio` on one "queue" document.
 *
 * Args:
 * - `workerKey`: worker credential, consumed by the `workerMutation` wrapper.
 * - `item`: id of the "queue" document to update.
 * - `newPrio`: the new priority value to store.
 */
export const setCompletedPriority = workerMutation({
  args: {
    workerKey: v.string(),
    item: v.id("queue"),
    newPrio: v.number(),
  },
  async handler(ctx, args) {
    // Await the write: `ctx.db.patch` returns a promise, and leaving it floating
    // means the handler can return before the write settles and any failure
    // goes unobserved.
    await ctx.db.patch(args.item, {
      encode_completed_prio: args.newPrio,
    });
  },
});
Sample Client:
// Client (node.js, bun, ...)
import { ConvexClient } from "convex/browser";
import { runQueue } from "./queue.ts";
import { api } from "../convex/_generated/api";

// Connect to the Convex deployment; both the URL and the worker key are read
// from environment variables (no check is made here that they are set).
const convex = new ConvexClient(process.env.VITE_CONVEX_URL as string, {});
const workerKey = process.env.WORKER_KEY as string;

// As soon as there's work, it gets assigned to this worker.
// In this case, up to 5 jobs are executed in parallel by this worker
await runQueue(convex, workerKey, 5, api.encodeQueue, async (item) => {
// Log which item was picked up (the value printed after "prio: " is item.etaSecs).
console.log(now(), "working on: ", item._id, "prio: ", item.etaSecs);

await new Promise((resolve) => setTimeout(resolve, 5000)); // Fake doing actual work.

// And here's the *one* invariant you have to keep in mind:
// By the time this callback finishes, the database must be in a state
// where the function passed to createWorkerQueue doesn't return the item anymore.
// Here that is done by setting the item's priority to 0.
await convex.mutation(api.encodeQueue.setCompletedPriority, {
workerKey,
item: item._id,
newPrio: 0,
});

console.log("item", item._id, "complete!");
});

/** Formats the current time with `Date#toLocaleTimeString`; used as a log prefix. */
function now() {
  const timestamp = new Date();
  return timestamp.toLocaleTimeString();
}
// Client (node.js, bun, ...)
import { ConvexClient } from "convex/browser";
import { runQueue } from "./queue.ts";
import { api } from "../convex/_generated/api";

// Connect to the Convex deployment; both the URL and the worker key are read
// from environment variables (no check is made here that they are set).
const convex = new ConvexClient(process.env.VITE_CONVEX_URL as string, {});
const workerKey = process.env.WORKER_KEY as string;

// As soon as there's work, it gets assigned to this worker.
// In this case, up to 5 jobs are executed in parallel by this worker
await runQueue(convex, workerKey, 5, api.encodeQueue, async (item) => {
// Log which item was picked up (the value printed after "prio: " is item.etaSecs).
console.log(now(), "working on: ", item._id, "prio: ", item.etaSecs);

await new Promise((resolve) => setTimeout(resolve, 5000)); // Fake doing actual work.

// And here's the *one* invariant you have to keep in mind:
// By the time this callback finishes, the database must be in a state
// where the function passed to createWorkerQueue doesn't return the item anymore.
// Here that is done by setting the item's priority to 0.
await convex.mutation(api.encodeQueue.setCompletedPriority, {
workerKey,
item: item._id,
newPrio: 0,
});

console.log("item", item._id, "complete!");
});

/** Formats the current time with `Date#toLocaleTimeString`; used as a log prefix. */
function now() {
  const timestamp = new Date();
  return timestamp.toLocaleTimeString();
}
Why work stealing? Video encoding needs tools like ffmpeg, and I also had issues with yt-dlp, etc. With this approach you can also call out to other programming languages. In short: it removes the restrictions of Convex's environments and lets you use cheap compute where needed. The performance hit is very low - in my experience jobs start with a latency of <50ms (depending on ping), and the results are pushed to the server instantly. It's just as reactive, so it's transparent to the user. It's quite a nifty approach that is, IMO, almost easier to use than scheduled actions, and it gets rid of some of their restrictions. It also supports multiple workers on different machines processing the same queue. Actually, here's the code (MIT / Public Domain): Worker Side:
import type { Id } from "../convex/_generated/dataModel";
import type { ConvexClient } from "convex/browser";
import type { FunctionReference } from "convex/server";

/**
 * References to the four public Convex functions that make up one worker queue.
 * `Task` is the type of the queue item handed to the worker.
 * Every function takes the worker's `workerKey` as an argument.
 */
interface QueueApi<Task> {
/** Query returning a boolean; `stealWork` subscribes to it while waiting for a job. */
hasWork: FunctionReference<"query", "public", { workerKey: string }, boolean>;
/** Mutation that returns a claimed task (work id plus the queue item), or `null` if nothing was claimed. */
stealWork: FunctionReference<
"mutation",
"public",
{ workerKey: string },
{
taskId: Id<"workerWip">;
queueItem: Task;
} | null
>;
/** Mutation that reports a task as finished; `error` is passed when processing failed. */
markDone: FunctionReference<
"mutation",
"public",
{ workerKey: string; workId: Id<"workerWip">; error?: string | undefined },
boolean
>;
/** Mutation returning a number; `runQueue` calls it once on start-up and logs the result as the count of removed docs. */
clearQueue: FunctionReference<"mutation", "public", { workerKey: string }, number>;
}

/**
 * Runs a worker loop forever: claims jobs from `queue` and processes up to
 * `concurrency` of them in parallel.
 *
 * On start-up it calls `queue.clearQueue` once. Each claimed job is passed to
 * `process`; afterwards `queue.markDone` is called, with `error` set to
 * `String(e)` when processing (or the success report) threw.
 *
 * @param convex - connected Convex client used for all mutations.
 * @param workerKey - credential sent with every queue call.
 * @param concurrency - maximum number of jobs processed at the same time.
 * @param queue - function references of the queue to work on.
 * @param process - callback doing the actual work for one queue item.
 * @returns never resolves; the loop only ends if claiming work itself fails.
 */
export async function runQueue<Task extends { _id: string }>(
  convex: ConvexClient,
  workerKey: string,
  concurrency: number,
  queue: QueueApi<Task>,
  process: (t: Task) => Promise<void>,
): Promise<never> {
  console.log("clearing queue, just in case");
  const numDeleted = await convex.mutation(queue.clearQueue, { workerKey });
  console.log("Removed", numDeleted, "docs with wip status");

  const limiter = new ConcurrencyLimiter(concurrency);

  while (true) {
    // Wait for a free slot before claiming, so claimed jobs never sit idle.
    await limiter.limit();
    const job = await stealWork(convex, workerKey, queue);

    // The async wrapper guarantees this promise never rejects: a synchronous
    // throw from `process` is treated like any other job failure, and a failure
    // while *reporting* an error is logged instead of becoming an unhandled
    // rejection that could take the whole worker down.
    const promise = (async () => {
      try {
        await process(job.queueItem);
        await convex.mutation(queue.markDone, {
          workerKey,
          workId: job.taskId,
        });
      } catch (e) {
        console.error(e);
        try {
          await convex.mutation(queue.markDone, {
            workerKey,
            workId: job.taskId,
            error: String(e),
          });
        } catch (reportError) {
          console.error("Failed to report error for work", job.taskId, reportError);
        }
      }
    })();
    limiter.addPromise(promise);
  }
}

/**
 * Claims one job from the queue, waiting until one can be claimed.
 *
 * Calls the `stealWork` mutation; when it returns `null`, subscribes to the
 * `hasWork` query, waits for it to report `true`, unsubscribes and tries again.
 *
 * @returns the claimed work id together with its queue item.
 */
async function stealWork<Task>(
  convex: ConvexClient,
  workerKey: string,
  queue: QueueApi<Task>,
): Promise<{
  taskId: Id<"workerWip">;
  queueItem: Task;
}> {
  for (;;) {
    const claimed = await convex.mutation(queue.stealWork, { workerKey });
    if (claimed != null) {
      return claimed;
    }

    // Nothing to claim right now: park until the reactive query reports work.
    const wakeUp = Promise.withResolvers<void>();
    console.log("Waiting for job on queue");
    const stopListening = convex.onUpdate(queue.hasWork, { workerKey }, (workAvailable) => {
      if (workAvailable) {
        wakeUp.resolve();
      }
    });

    await wakeUp.promise;
    console.log("got potential job :)");
    stopListening();
  }
}

/**
 * Tracks in-flight promises so a producer can wait until fewer than
 * `maxParallel` of them are still pending.
 *
 * The limiter only observes settlement. Rejections of tracked promises are
 * ignored here; handle them on the promise you pass in.
 */
class ConcurrencyLimiter {
  #inFlight = new Set<Promise<void>>();
  #maxParallel: number;
  // Resolved (and then replaced) every time a slot frees up.
  #promise = Promise.withResolvers<void>();

  /**
   * @param maxParallel - maximum number of tracked promises pending at once.
   * @throws RangeError if `maxParallel` is not greater than 0, because
   *   `limit()` could then never resolve.
   */
  constructor(maxParallel: number) {
    if (!(maxParallel > 0)) {
      throw new RangeError(`maxParallel must be greater than 0, got ${maxParallel}`);
    }
    this.#maxParallel = maxParallel;
  }

  /** Starts tracking `p`; its slot is released when `p` settles either way. */
  addPromise(p: Promise<void>) {
    this.#inFlight.add(p);

    const release = () => {
      this.#inFlight.delete(p);

      if (this.#inFlight.size < this.#maxParallel) {
        this.#promise.resolve();
        this.#promise = Promise.withResolvers<void>();
      }
    };

    // `then(release, release)` rather than `finally(release)`: the promise
    // returned by `finally` rejects whenever `p` rejects, which would surface
    // as an unhandled rejection even if the caller already handled `p`.
    void p.then(release, release);
  }

  /** Resolves immediately when a slot is free, otherwise when the next slot frees up. */
  limit() {
    if (this.#inFlight.size < this.#maxParallel) {
      return Promise.resolve();
    }

    return this.#promise.promise;
  }
}
import type { Id } from "../convex/_generated/dataModel";
import type { ConvexClient } from "convex/browser";
import type { FunctionReference } from "convex/server";

/**
 * References to the four public Convex functions that make up one worker queue.
 * `Task` is the type of the queue item handed to the worker.
 * Every function takes the worker's `workerKey` as an argument.
 */
interface QueueApi<Task> {
/** Query returning a boolean; `stealWork` subscribes to it while waiting for a job. */
hasWork: FunctionReference<"query", "public", { workerKey: string }, boolean>;
/** Mutation that returns a claimed task (work id plus the queue item), or `null` if nothing was claimed. */
stealWork: FunctionReference<
"mutation",
"public",
{ workerKey: string },
{
taskId: Id<"workerWip">;
queueItem: Task;
} | null
>;
/** Mutation that reports a task as finished; `error` is passed when processing failed. */
markDone: FunctionReference<
"mutation",
"public",
{ workerKey: string; workId: Id<"workerWip">; error?: string | undefined },
boolean
>;
/** Mutation returning a number; `runQueue` calls it once on start-up and logs the result as the count of removed docs. */
clearQueue: FunctionReference<"mutation", "public", { workerKey: string }, number>;
}

/**
 * Runs a worker loop forever: claims jobs from `queue` and processes up to
 * `concurrency` of them in parallel.
 *
 * On start-up it calls `queue.clearQueue` once. Each claimed job is passed to
 * `process`; afterwards `queue.markDone` is called, with `error` set to
 * `String(e)` when processing (or the success report) threw.
 *
 * @param convex - connected Convex client used for all mutations.
 * @param workerKey - credential sent with every queue call.
 * @param concurrency - maximum number of jobs processed at the same time.
 * @param queue - function references of the queue to work on.
 * @param process - callback doing the actual work for one queue item.
 * @returns never resolves; the loop only ends if claiming work itself fails.
 */
export async function runQueue<Task extends { _id: string }>(
  convex: ConvexClient,
  workerKey: string,
  concurrency: number,
  queue: QueueApi<Task>,
  process: (t: Task) => Promise<void>,
): Promise<never> {
  console.log("clearing queue, just in case");
  const numDeleted = await convex.mutation(queue.clearQueue, { workerKey });
  console.log("Removed", numDeleted, "docs with wip status");

  const limiter = new ConcurrencyLimiter(concurrency);

  while (true) {
    // Wait for a free slot before claiming, so claimed jobs never sit idle.
    await limiter.limit();
    const job = await stealWork(convex, workerKey, queue);

    // The async wrapper guarantees this promise never rejects: a synchronous
    // throw from `process` is treated like any other job failure, and a failure
    // while *reporting* an error is logged instead of becoming an unhandled
    // rejection that could take the whole worker down.
    const promise = (async () => {
      try {
        await process(job.queueItem);
        await convex.mutation(queue.markDone, {
          workerKey,
          workId: job.taskId,
        });
      } catch (e) {
        console.error(e);
        try {
          await convex.mutation(queue.markDone, {
            workerKey,
            workId: job.taskId,
            error: String(e),
          });
        } catch (reportError) {
          console.error("Failed to report error for work", job.taskId, reportError);
        }
      }
    })();
    limiter.addPromise(promise);
  }
}

/**
 * Claims one job from the queue, waiting until one can be claimed.
 *
 * Calls the `stealWork` mutation; when it returns `null`, subscribes to the
 * `hasWork` query, waits for it to report `true`, unsubscribes and tries again.
 *
 * @returns the claimed work id together with its queue item.
 */
async function stealWork<Task>(
  convex: ConvexClient,
  workerKey: string,
  queue: QueueApi<Task>,
): Promise<{
  taskId: Id<"workerWip">;
  queueItem: Task;
}> {
  for (;;) {
    const claimed = await convex.mutation(queue.stealWork, { workerKey });
    if (claimed != null) {
      return claimed;
    }

    // Nothing to claim right now: park until the reactive query reports work.
    const wakeUp = Promise.withResolvers<void>();
    console.log("Waiting for job on queue");
    const stopListening = convex.onUpdate(queue.hasWork, { workerKey }, (workAvailable) => {
      if (workAvailable) {
        wakeUp.resolve();
      }
    });

    await wakeUp.promise;
    console.log("got potential job :)");
    stopListening();
  }
}

/**
 * Tracks in-flight promises so a producer can wait until fewer than
 * `maxParallel` of them are still pending.
 *
 * The limiter only observes settlement. Rejections of tracked promises are
 * ignored here; handle them on the promise you pass in.
 */
class ConcurrencyLimiter {
  #inFlight = new Set<Promise<void>>();
  #maxParallel: number;
  // Resolved (and then replaced) every time a slot frees up.
  #promise = Promise.withResolvers<void>();

  /**
   * @param maxParallel - maximum number of tracked promises pending at once.
   * @throws RangeError if `maxParallel` is not greater than 0, because
   *   `limit()` could then never resolve.
   */
  constructor(maxParallel: number) {
    if (!(maxParallel > 0)) {
      throw new RangeError(`maxParallel must be greater than 0, got ${maxParallel}`);
    }
    this.#maxParallel = maxParallel;
  }

  /** Starts tracking `p`; its slot is released when `p` settles either way. */
  addPromise(p: Promise<void>) {
    this.#inFlight.add(p);

    const release = () => {
      this.#inFlight.delete(p);

      if (this.#inFlight.size < this.#maxParallel) {
        this.#promise.resolve();
        this.#promise = Promise.withResolvers<void>();
      }
    };

    // `then(release, release)` rather than `finally(release)`: the promise
    // returned by `finally` rejects whenever `p` rejects, which would surface
    // as an unhandled rejection even if the caller already handled `p`.
    void p.then(release, release);
  }

  /** Resolves immediately when a slot is free, otherwise when the next slot frees up. */
  limit() {
    if (this.#inFlight.size < this.#maxParallel) {
      return Promise.resolve();
    }

    return this.#promise.promise;
  }
}
Server side:
import { customMutation, customQuery } from "convex-helpers/server/customFunctions";
import { ConvexError, v } from "convex/values";
import { mutation, query, type QueryCtx } from "../_generated/server";

/**
 * Query builder for worker-facing queries.
 *
 * Looks up the `workerConfig` document whose `token` equals the `workerKey`
 * argument and throws `ConvexError("Wrong credentials")` when there is none.
 * On success the document's id is added to the context as `ctx.worker`.
 */
export const workerQuery = customQuery(query, {
  args: {
    workerKey: v.string(),
  },
  async input(ctx, args) {
    const workerDoc = await ctx.db
      .query("workerConfig")
      .withIndex("by_token", (q) => q.eq("token", args.workerKey))
      .unique();

    if (workerDoc === null) {
      throw new ConvexError("Wrong credentials");
    }

    const workerCtx = { ...ctx, worker: workerDoc._id };
    return { ctx: workerCtx, args: {} };
  },
});

/**
 * Mutation builder for worker-facing mutations.
 *
 * Looks up the `workerConfig` document whose `token` equals the `workerKey`
 * argument and throws `ConvexError("Wrong credentials")` when there is none.
 * On success the document's id is added to the context as `ctx.worker`.
 */
export const workerMutation = customMutation(mutation, {
  args: {
    workerKey: v.string(),
  },
  async input(ctx, args) {
    const workerDoc = await ctx.db
      .query("workerConfig")
      .withIndex("by_token", (q) => q.eq("token", args.workerKey))
      .unique();

    if (workerDoc === null) {
      throw new ConvexError("Wrong credentials");
    }

    const workerCtx = { ...ctx, worker: workerDoc._id };
    return { ctx: workerCtx, args: {} };
  },
});

/**
 * Builds the four Convex functions that make up one named work-stealing queue.
 *
 * An item counts as "free" while it is returned by `getTodoItems` and has no
 * `workerWip` row for this queue. Claiming an item inserts such a row; finishing
 * it successfully deletes the row again.
 *
 * @param queueName - name stored on every `workerWip` row of this queue.
 * @param getTodoItems - returns all items that still have work to do, in the
 *   order in which they should be handed out.
 * @returns `hasWork`, `stealWork`, `markDone` and `clearQueue`.
 */
export function createWorkerQueue<T extends { _id: string }>(
  queueName: string,
  getTodoItems: (ctx: QueryCtx) => Promise<T[]>,
) {
  /** Returns the first todo item without a `workerWip` row for this queue, or `null`. */
  async function getFreeItem(ctx: QueryCtx): Promise<T | null> {
    const items = await getTodoItems(ctx);

    for (const item of items) {
      const todoItem = await ctx.db
        .query("workerWip")
        .withIndex("queue", (a) => a.eq("queue", queueName).eq("itemId", item._id))
        .unique();
      if (todoItem == null) {
        return item;
      }
    }

    return null;
  }

  return {
    /** Query: `true` when at least one item could currently be claimed. */
    hasWork: workerQuery({
      args: {
        workerKey: v.string(),
      },
      async handler(ctx) {
        return (await getFreeItem(ctx)) != null;
      },
    }),

    /**
     * Mutation: claims the first free item for the calling worker by inserting
     * a `workerWip` row. Returns `null` when nothing is free.
     */
    stealWork: workerMutation({
      args: {
        workerKey: v.string(),
      },
      async handler(ctx) {
        const workItem = await getFreeItem(ctx);
        if (workItem == null) {
          return null;
        }

        const workId = await ctx.db.insert("workerWip", {
          itemId: workItem._id,
          queue: queueName,
          status: "started",
          worker: ctx.worker,
          heartbeat: Date.now(),
        });

        return {
          taskId: workId,
          queueItem: workItem,
        };
      },
    }),

    /**
     * Mutation: reports a claimed item as finished.
     *
     * With `error` set, the message is stored on the `workerWip` row and the row
     * is kept, so `getFreeItem` keeps treating the item as taken. Without
     * `error`, the row is deleted and the item must no longer be returned by
     * `getTodoItems`.
     *
     * Throws `ConvexError` when the row does not exist, belongs to another
     * queue, or the item is still on the todo list after a successful run.
     */
    markDone: workerMutation({
      args: {
        workerKey: v.string(),
        workId: v.id("workerWip"),
        error: v.optional(v.string()),
      },
      async handler(ctx, args) {
        const item = await ctx.db.get(args.workId);
        if (!item) throw new ConvexError("Work does not exist");

        // `workerWip` ids are shared by all queues; refuse rows of other queues
        // so this queue's invariant check below is never run for a foreign item.
        if (item.queue !== queueName) {
          throw new ConvexError("Work belongs to a different queue");
        }

        // Compare against undefined: an empty error string is still a failure
        // report and must not be treated as success.
        if (args.error !== undefined) {
          await ctx.db.patch(args.workId, {
            error: args.error,
          });
          return true;
        }

        await ctx.db.delete(args.workId);

        // Enforce the queue invariant: a finished item must have left the todo list.
        const todo = await getTodoItems(ctx);
        if (todo.some((a) => a._id === item.itemId)) {
          throw new ConvexError("We marked an item as done, but it's still on the todo list.");
        }

        return true;
      },
    }),

    /**
     * Mutation: deletes every `workerWip` row of the calling worker in this
     * queue and returns how many rows were deleted.
     */
    clearQueue: workerMutation({
      args: { workerKey: v.string() },
      async handler(ctx) {
        const toDelete = await ctx.db
          .query("workerWip")
          .withIndex("worker", (a) => a.eq("worker", ctx.worker).eq("queue", queueName))
          .collect();

        await Promise.all(toDelete.map((doc) => ctx.db.delete(doc._id)));

        return toDelete.length;
      },
    }),
  } as const;
}
import { customMutation, customQuery } from "convex-helpers/server/customFunctions";
import { ConvexError, v } from "convex/values";
import { mutation, query, type QueryCtx } from "../_generated/server";

/**
 * Query builder for worker-facing queries.
 *
 * Looks up the `workerConfig` document whose `token` equals the `workerKey`
 * argument and throws `ConvexError("Wrong credentials")` when there is none.
 * On success the document's id is added to the context as `ctx.worker`.
 */
export const workerQuery = customQuery(query, {
  args: {
    workerKey: v.string(),
  },
  async input(ctx, args) {
    const workerDoc = await ctx.db
      .query("workerConfig")
      .withIndex("by_token", (q) => q.eq("token", args.workerKey))
      .unique();

    if (workerDoc === null) {
      throw new ConvexError("Wrong credentials");
    }

    const workerCtx = { ...ctx, worker: workerDoc._id };
    return { ctx: workerCtx, args: {} };
  },
});

/**
 * Mutation builder for worker-facing mutations.
 *
 * Looks up the `workerConfig` document whose `token` equals the `workerKey`
 * argument and throws `ConvexError("Wrong credentials")` when there is none.
 * On success the document's id is added to the context as `ctx.worker`.
 */
export const workerMutation = customMutation(mutation, {
  args: {
    workerKey: v.string(),
  },
  async input(ctx, args) {
    const workerDoc = await ctx.db
      .query("workerConfig")
      .withIndex("by_token", (q) => q.eq("token", args.workerKey))
      .unique();

    if (workerDoc === null) {
      throw new ConvexError("Wrong credentials");
    }

    const workerCtx = { ...ctx, worker: workerDoc._id };
    return { ctx: workerCtx, args: {} };
  },
});

/**
 * Builds the four Convex functions that make up one named work-stealing queue.
 *
 * An item counts as "free" while it is returned by `getTodoItems` and has no
 * `workerWip` row for this queue. Claiming an item inserts such a row; finishing
 * it successfully deletes the row again.
 *
 * @param queueName - name stored on every `workerWip` row of this queue.
 * @param getTodoItems - returns all items that still have work to do, in the
 *   order in which they should be handed out.
 * @returns `hasWork`, `stealWork`, `markDone` and `clearQueue`.
 */
export function createWorkerQueue<T extends { _id: string }>(
  queueName: string,
  getTodoItems: (ctx: QueryCtx) => Promise<T[]>,
) {
  /** Returns the first todo item without a `workerWip` row for this queue, or `null`. */
  async function getFreeItem(ctx: QueryCtx): Promise<T | null> {
    const items = await getTodoItems(ctx);

    for (const item of items) {
      const todoItem = await ctx.db
        .query("workerWip")
        .withIndex("queue", (a) => a.eq("queue", queueName).eq("itemId", item._id))
        .unique();
      if (todoItem == null) {
        return item;
      }
    }

    return null;
  }

  return {
    /** Query: `true` when at least one item could currently be claimed. */
    hasWork: workerQuery({
      args: {
        workerKey: v.string(),
      },
      async handler(ctx) {
        return (await getFreeItem(ctx)) != null;
      },
    }),

    /**
     * Mutation: claims the first free item for the calling worker by inserting
     * a `workerWip` row. Returns `null` when nothing is free.
     */
    stealWork: workerMutation({
      args: {
        workerKey: v.string(),
      },
      async handler(ctx) {
        const workItem = await getFreeItem(ctx);
        if (workItem == null) {
          return null;
        }

        const workId = await ctx.db.insert("workerWip", {
          itemId: workItem._id,
          queue: queueName,
          status: "started",
          worker: ctx.worker,
          heartbeat: Date.now(),
        });

        return {
          taskId: workId,
          queueItem: workItem,
        };
      },
    }),

    /**
     * Mutation: reports a claimed item as finished.
     *
     * With `error` set, the message is stored on the `workerWip` row and the row
     * is kept, so `getFreeItem` keeps treating the item as taken. Without
     * `error`, the row is deleted and the item must no longer be returned by
     * `getTodoItems`.
     *
     * Throws `ConvexError` when the row does not exist, belongs to another
     * queue, or the item is still on the todo list after a successful run.
     */
    markDone: workerMutation({
      args: {
        workerKey: v.string(),
        workId: v.id("workerWip"),
        error: v.optional(v.string()),
      },
      async handler(ctx, args) {
        const item = await ctx.db.get(args.workId);
        if (!item) throw new ConvexError("Work does not exist");

        // `workerWip` ids are shared by all queues; refuse rows of other queues
        // so this queue's invariant check below is never run for a foreign item.
        if (item.queue !== queueName) {
          throw new ConvexError("Work belongs to a different queue");
        }

        // Compare against undefined: an empty error string is still a failure
        // report and must not be treated as success.
        if (args.error !== undefined) {
          await ctx.db.patch(args.workId, {
            error: args.error,
          });
          return true;
        }

        await ctx.db.delete(args.workId);

        // Enforce the queue invariant: a finished item must have left the todo list.
        const todo = await getTodoItems(ctx);
        if (todo.some((a) => a._id === item.itemId)) {
          throw new ConvexError("We marked an item as done, but it's still on the todo list.");
        }

        return true;
      },
    }),

    /**
     * Mutation: deletes every `workerWip` row of the calling worker in this
     * queue and returns how many rows were deleted.
     */
    clearQueue: workerMutation({
      args: { workerKey: v.string() },
      async handler(ctx) {
        const toDelete = await ctx.db
          .query("workerWip")
          .withIndex("worker", (a) => a.eq("worker", ctx.worker).eq("queue", queueName))
          .collect();

        await Promise.all(toDelete.map((doc) => ctx.db.delete(doc._id)));

        return toDelete.length;
      },
    }),
  } as const;
}
Schema:
import { defineTable } from "convex/server";
import { v } from "convex/values";

/**
 * Table definitions used by the worker queue.
 * NOTE(review): presumably meant to be spread into the app's schema definition - confirm.
 */
export const workerSchema = {
// One document per worker; `token` is the credential looked up through "by_token".
workerConfig: defineTable({
token: v.string(),
type: v.string(),
}).index("by_token", ["token"]),
// One document per claimed work item.
workerWip: defineTable({
// Name of the queue the item was claimed from.
queue: v.string(),
// `_id` of the claimed item, stored as a plain string.
itemId: v.string(),
// Free-form status string.
status: v.string(),
// Numeric timestamp. NOTE(review): presumably epoch milliseconds - confirm against the writer.
heartbeat: v.number(),
// The worker that claimed the item.
worker: v.id("workerConfig"),
// Optional error message for the item.
error: v.optional(v.string()),
})
// Lookup of a specific item within a queue.
.index("queue", ["queue", "itemId"])
// Lookup of all rows held by one worker within a queue.
.index("worker", ["worker", "queue"]),
};
import { defineTable } from "convex/server";
import { v } from "convex/values";

/**
 * Table definitions used by the worker queue.
 * NOTE(review): presumably meant to be spread into the app's schema definition - confirm.
 */
export const workerSchema = {
// One document per worker; `token` is the credential looked up through "by_token".
workerConfig: defineTable({
token: v.string(),
type: v.string(),
}).index("by_token", ["token"]),
// One document per claimed work item.
workerWip: defineTable({
// Name of the queue the item was claimed from.
queue: v.string(),
// `_id` of the claimed item, stored as a plain string.
itemId: v.string(),
// Free-form status string.
status: v.string(),
// Numeric timestamp. NOTE(review): presumably epoch milliseconds - confirm against the writer.
heartbeat: v.number(),
// The worker that claimed the item.
worker: v.id("workerConfig"),
// Optional error message for the item.
error: v.optional(v.string()),
})
// Lookup of a specific item within a queue.
.index("queue", ["queue", "itemId"])
// Lookup of all rows held by one worker within a queue.
.index("worker", ["worker", "queue"]),
};
Could this be a component? Uh, sure! But convex doesn't document how to make those soooo yea
胡洪伟
胡洪伟•2mo ago
🔥

Did you find this page helpful?