sonandmjy
sonandmjy4d ago

[AI agent] tool call status update

Hello! loving experimenting with the AI agent component, literally built a chatbot in a afternoon, but I am a bit confused when I am supposed to 'complete' a message since after tool calls, it goes to 'pending' then to 'failed' once I do a followup message so not sure where in the flow i am supposed to call the API? For more context, for text messages without tool calls it seems ok currently this is my setup. (truncated for brevity). Any help would be appreciated!
export const kaolinAgent = new Agent(components.agent, {
chat: xai("grok-3"),
name: "Kaolin agent",
...
tools: {
searchContacts: createTool({
description: "Search for a contact by name, email, or phone number",
args: z.object({
searchQuery: z.string(),
}),
handler: async (ctx, args) => {
const program: Effect.Effect<
TSearchContactsHandlerReturn,
never,
never
> = Effect.gen(function* () {
const contacts = yield* Effect.promise(() =>
ctx.runQuery(internal.ai.tools.searchContacts, {
userId: ctx.userId!,
searchQuery: args.searchQuery,
})
);

yield* Effect.promise(() =>
ctx.agent.completeMessage(ctx, {
threadId: ctx.threadId!,
messageId: ctx.messageId!,
result: { kind: "success" },
})
);

return yield* Effect.succeed(contacts);
});

return program.pipe(
Effect.catchAllCause((cause) => {
return Effect.gen(function* () {
yield* Effect.promise(() =>
ctx.agent.completeMessage(ctx, {
threadId: ctx.threadId!,
messageId: ctx.messageId!,
result: { kind: "error", error: cause._tag },
})
);
});
}),
Effect.runPromise
);
},
}),
},
});
export const kaolinAgent = new Agent(components.agent, {
chat: xai("grok-3"),
name: "Kaolin agent",
...
tools: {
searchContacts: createTool({
description: "Search for a contact by name, email, or phone number",
args: z.object({
searchQuery: z.string(),
}),
handler: async (ctx, args) => {
const program: Effect.Effect<
TSearchContactsHandlerReturn,
never,
never
> = Effect.gen(function* () {
const contacts = yield* Effect.promise(() =>
ctx.runQuery(internal.ai.tools.searchContacts, {
userId: ctx.userId!,
searchQuery: args.searchQuery,
})
);

yield* Effect.promise(() =>
ctx.agent.completeMessage(ctx, {
threadId: ctx.threadId!,
messageId: ctx.messageId!,
result: { kind: "success" },
})
);

return yield* Effect.succeed(contacts);
});

return program.pipe(
Effect.catchAllCause((cause) => {
return Effect.gen(function* () {
yield* Effect.promise(() =>
ctx.agent.completeMessage(ctx, {
threadId: ctx.threadId!,
messageId: ctx.messageId!,
result: { kind: "error", error: cause._tag },
})
);
});
}),
Effect.runPromise
);
},
}),
},
});
2 Replies
Convex Bot
Convex Bot4d ago
Thanks for posting in <#1088161997662724167>. Reminder: If you have a Convex Pro account, use the Convex Dashboard to file support tickets. - Provide context: What are you trying to achieve, what is the end-user interaction, what are you seeing? (full error message, command output, etc.) - Use search.convex.dev to search Docs, Stack, and Discord all at once. - Additionally, you can post your questions in the Convex Community's <#1228095053885476985> channel to receive a response from AI. - Avoid tagging staff unless specifically instructed. Thank you!
sonandmjy
sonandmjyOP4d ago
FYI following up, I think it has sth to do with text streaming? Like
export function continueThreadHandler(
ctx: AuthedActionCtx,
args: TContinueThreadHandlerArgs
): Effect.Effect<TContinueThreadHandlerReturn, never, never> {
return Effect.gen(function* () {
const { thread } = yield* Effect.promise(() =>
kaolinAgent.continueThread(ctx, {
threadId: args.threadId,
userId: ctx.user._id,
})
);

const result = yield* Effect.promise(() =>
thread.generateText({
prompt: args.prompt,
})
);
const text = result.text;

return yield* Effect.succeed(text);
});
}
export function continueThreadHandler(
ctx: AuthedActionCtx,
args: TContinueThreadHandlerArgs
): Effect.Effect<TContinueThreadHandlerReturn, never, never> {
return Effect.gen(function* () {
const { thread } = yield* Effect.promise(() =>
kaolinAgent.continueThread(ctx, {
threadId: args.threadId,
userId: ctx.user._id,
})
);

const result = yield* Effect.promise(() =>
thread.generateText({
prompt: args.prompt,
})
);
const text = result.text;

return yield* Effect.succeed(text);
});
}
this works, but this does not change status to success
export function continueThreadHandler(
ctx: AuthedActionCtx,
args: TContinueThreadHandlerArgs
): Effect.Effect<TContinueThreadHandlerReturn, never, never> {
return Effect.gen(function* () {
const { thread } = yield* Effect.promise(() =>
kaolinAgent.continueThread(ctx, {
threadId: args.threadId,
userId: ctx.user._id,
})
);

const result = yield* Effect.promise(() =>
thread.streamText(
{ prompt: args.prompt },
{ saveStreamDeltas: { chunking: "word", throttleMs: 100 } }
)
);

const text = yield* Effect.promise(async () => {
let fullText = "";
for await (const chunk of result.textStream) {
fullText += chunk;
}
return fullText;
});

return yield* Effect.succeed(text);
});
}
export function continueThreadHandler(
ctx: AuthedActionCtx,
args: TContinueThreadHandlerArgs
): Effect.Effect<TContinueThreadHandlerReturn, never, never> {
return Effect.gen(function* () {
const { thread } = yield* Effect.promise(() =>
kaolinAgent.continueThread(ctx, {
threadId: args.threadId,
userId: ctx.user._id,
})
);

const result = yield* Effect.promise(() =>
thread.streamText(
{ prompt: args.prompt },
{ saveStreamDeltas: { chunking: "word", throttleMs: 100 } }
)
);

const text = yield* Effect.promise(async () => {
let fullText = "";
for await (const chunk of result.textStream) {
fullText += chunk;
}
return fullText;
});

return yield* Effect.succeed(text);
});
}
sorry just using this as a notepad now but if i explicitly complete the message after the stream it works. Not sure if that is the intended level of abstraction or if the framework is missing something to update the status once stream has ended?
export function continueThreadHandler(
ctx: AuthedActionCtx,
args: TContinueThreadHandlerArgs
): Effect.Effect<TContinueThreadHandlerReturn, never, never> {
return Effect.gen(function* () {
const { thread } = yield* Effect.promise(() =>
kaolinAgent.continueThread(ctx, {
threadId: args.threadId,
userId: ctx.user._id,
})
);

const result = yield* Effect.promise(() =>
thread.streamText(
{ prompt: args.prompt },
{ saveStreamDeltas: { chunking: "word", throttleMs: 100 } }
)
);

const text = yield* Effect.promise(async () => {
let fullText = "";
for await (const chunk of result.textStream) {
fullText += chunk;
}
return fullText;
});

yield* Effect.promise(() =>
kaolinAgent.completeMessage(ctx, {
threadId: args.threadId,
messageId: result.messageId,
result: { kind: "success" },
})
);

return yield* Effect.succeed(text);
});
}
export function continueThreadHandler(
ctx: AuthedActionCtx,
args: TContinueThreadHandlerArgs
): Effect.Effect<TContinueThreadHandlerReturn, never, never> {
return Effect.gen(function* () {
const { thread } = yield* Effect.promise(() =>
kaolinAgent.continueThread(ctx, {
threadId: args.threadId,
userId: ctx.user._id,
})
);

const result = yield* Effect.promise(() =>
thread.streamText(
{ prompt: args.prompt },
{ saveStreamDeltas: { chunking: "word", throttleMs: 100 } }
)
);

const text = yield* Effect.promise(async () => {
let fullText = "";
for await (const chunk of result.textStream) {
fullText += chunk;
}
return fullText;
});

yield* Effect.promise(() =>
kaolinAgent.completeMessage(ctx, {
threadId: args.threadId,
messageId: result.messageId,
result: { kind: "success" },
})
);

return yield* Effect.succeed(text);
});
}

Did you find this page helpful?