Abhishek
Abhishek•10mo ago

Anthropic SDK with Convex | Claude AI

I was trying to run the anthropic SDK in action and trying to get a streaming response. When I am trying to run the action from the dashboard then the response is not at all coming up. Here is my action code :
export const generatePostGeneration = action({
args: {
userInput: v.string(),
},
async handler(ctx, args) {
const client = new Anthropic({
apiKey:""
});
let body = "";
try {
await client.messages
.stream({
messages: [{ role: "user", content: "Hello" }],
model: "claude-3-sonnet-20240229",
max_tokens: 1024,
})
.on("text", (text: any) => {
body += text;
console.log("Stream:", text);
});
} catch (error) {
console.log("Error:", error);
}
console.log("Body:", body);
return body;
},
});
export const generatePostGeneration = action({
args: {
userInput: v.string(),
},
async handler(ctx, args) {
const client = new Anthropic({
apiKey:""
});
let body = "";
try {
await client.messages
.stream({
messages: [{ role: "user", content: "Hello" }],
model: "claude-3-sonnet-20240229",
max_tokens: 1024,
})
.on("text", (text: any) => {
body += text;
console.log("Stream:", text);
});
} catch (error) {
console.log("Error:", error);
}
console.log("Body:", body);
return body;
},
});
This is only happening for streaming for normal messages I am getting the response. anthropic docs: https://docs.anthropic.com/claude/reference/messages-streaming
Claude
Streaming Messages
When creating a Message, you can set "stream": true to incrementally stream the response using server-sent events (SSE).Streaming with SDKsOur Python and Typescript SDKs offer multiple ways of streaming. The Python SDK allows both sync and async streams. See the documentation in each SDK for details...
20 Replies
ballingt
ballingt•10mo ago
What happens when you use streaming? Are there errors?
erquhart
erquhart•10mo ago
Curious if any of those logs are printing at all
Abhishek
AbhishekOP•10mo ago
Nope no logs at just undefined
sshader
sshader•10mo ago
When I try the code you shared, I see some log lines and the empty string returned, but also a warning that I'm not awaiting a promise. If I instead modify the code like this, the streaming seems to work as intended
const stream = client.messages
.stream({
messages: [{ role: "user", content: "Hello" }],
model: "claude-3-sonnet-20240229",
max_tokens: 1024,
});
stream.on("text", (text: any) => {
body += text;
console.log("Stream:", text);
});
await stream.done()
const stream = client.messages
.stream({
messages: [{ role: "user", content: "Hello" }],
model: "claude-3-sonnet-20240229",
max_tokens: 1024,
});
stream.on("text", (text: any) => {
body += text;
console.log("Stream:", text);
});
await stream.done()
(I'm not sure why the Anthropic docs don't recommend this but .stream(...).on(...) doesn't actually return a promise, so I don't think the await was doing anything)
Abhishek
AbhishekOP•10mo ago
I still can't see any warning in the dashboard logs, but the solution works thanks a ton @sshader
Abhishek
AbhishekOP•10mo ago
@sshader how can I insert the entry to db on every stream messg Like @ian did in this repo https://github.com/ianmacartney/streaming-chat-gpt/blob/main/convex/openai.ts#L37 I tried this approach .on("text", (text: any) => handleText(text, ctx, args, body)); async function handleText(text: any, ctx: any, args: any, body: any) { body += text; console.log("Body: ", body); await ctx.runMutation(internal.postGenration.updatePostGeneration, { postId: args.postId, generatedPostMsg: body, });
GitHub
streaming-chat-gpt/convex/openai.ts at main · ianmacartney/streamin...
An example of streaming ChatGPT via the OpenAI v4.0 node SDK. - ianmacartney/streaming-chat-gpt
Abhishek
AbhishekOP•10mo ago
but got errors in the dashboard like
No description
sshader
sshader•10mo ago
We're getting more into how this anthropic sdk works vs. convex, but the issue is there's nothing awaiting handleText so your action likely ends before running all the mutations. It looks like client.messages.stream returns an async iterable, so I think you might be able to do something like this:
for await (const part of stream) {
if (part.type === "content_block_delta") {
await handleText(part.delta.text, ctx, ...)
}
}
for await (const part of stream) {
if (part.type === "content_block_delta") {
await handleText(part.delta.text, ctx, ...)
}
}
(but at this point I'm just cmd + clicking on the anthropic functions and skimming their docs so your guess is about as good as mine)
Abhishek
AbhishekOP•10mo ago
@sshader so I am constantly hitting this error, can you suggest any work around this documents read from or written to the "campaignPosts" table changed while this mutation was being run and on every subsequent retry. Another call to this mutation changed the document with ID "k17bmk9w76zfdprndbg4e99y3h6nher4". See https://docs.convex.dev/errors#1 It is being caused because we are calling mutation every time with the streaming response from the anthropic.
erquhart
erquhart•10mo ago
Can you share the latest version of the action you're using
Abhishek
AbhishekOP•10mo ago
sure
export const anthropicCall = internalAction({
args: {
postId: v.id("campaignPosts"),
userInput: v.string(),
},
async handler(ctx, args) {
const client = new Anthropic({
apiKey:
"",
});
let body = " ";

try {
const stream = client.messages
.stream({
messages: [ ],
model: "claude-3-sonnet-20240229",
max_tokens: 1024,
temperature: 0.7,
system:
"",
})
.on("text", async (text) => {
body += text;

await ctx.runMutation(internal.postGenration.updatePostGeneration, {
postId: args.postId,
generatedPostMsg: body,
});
});

await stream.done();
} catch (error) {
console.error("Error in anthropic call", error);
throw new Error("Error in anthropic call");
}
return body;
},
});
export const anthropicCall = internalAction({
args: {
postId: v.id("campaignPosts"),
userInput: v.string(),
},
async handler(ctx, args) {
const client = new Anthropic({
apiKey:
"",
});
let body = " ";

try {
const stream = client.messages
.stream({
messages: [ ],
model: "claude-3-sonnet-20240229",
max_tokens: 1024,
temperature: 0.7,
system:
"",
})
.on("text", async (text) => {
body += text;

await ctx.runMutation(internal.postGenration.updatePostGeneration, {
postId: args.postId,
generatedPostMsg: body,
});
});

await stream.done();
} catch (error) {
console.error("Error in anthropic call", error);
throw new Error("Error in anthropic call");
}
return body;
},
});
erquhart
erquhart•10mo ago
You have a lot of concurrent mutations happening there - you'll want to loop the way @sshader mentioned, so each write is waiting for the previous write. (Assuming the stream is in fact an async iterable)
Abhishek
AbhishekOP•10mo ago
loop through in the sense currently, text is returned as a string
ian
ian•10mo ago
Right now each .on isn't blocking on the previous one having finished
Abhishek
AbhishekOP•10mo ago
Correct that why mutation is running so quickly
erquhart
erquhart•10mo ago
const stream = client.messages.stream({...})
for await (const part of stream) {
body += text; // have to figure out how to get text out of `part`
await ctx.runMutation(internal.postGenration.updatePostGeneration, {
postId: args.postId,
generatedPostMsg: body,
});
}
const stream = client.messages.stream({...})
for await (const part of stream) {
body += text; // have to figure out how to get text out of `part`
await ctx.runMutation(internal.postGenration.updatePostGeneration, {
postId: args.postId,
generatedPostMsg: body,
});
}
something like that, but I don't know what kind of object part is.
Abhishek
AbhishekOP•10mo ago
@ian can you help me with a solution, sorry for this dummy question Oh kay let me try this quickyly Didnt work So I solved this So Anyone in future wants to work with Anthropic SDK with Convex, here is the solution:
export const anthropicCall = internalAction({
args: {
postId: v.id("campaignPosts"),
userInput: v.string(),
},
async handler(ctx, args) {
const client = new Anthropic({
apiKey: ""
});
let body = "";

try {
const stream = await client.messages.create({
model: "claude-3-sonnet-20240229",
max_tokens: 1024,
temperature: 0.75,
stream: true,
messages: []
});
for await (const event of stream) {
if (
event.type === "content_block_delta" &&
event.delta.type === "text_delta"
) {
body += event.delta.text;
await ctx.runMutation(internal.postGenration.updatePostGeneration, {
postId: args.postId,
generatedPostMsg: body,
});
}
}
} catch (error) {
console.error("Error in anthropic call", error);
throw new Error("Error in anthropic call");
}
return body;
},
});
export const anthropicCall = internalAction({
args: {
postId: v.id("campaignPosts"),
userInput: v.string(),
},
async handler(ctx, args) {
const client = new Anthropic({
apiKey: ""
});
let body = "";

try {
const stream = await client.messages.create({
model: "claude-3-sonnet-20240229",
max_tokens: 1024,
temperature: 0.75,
stream: true,
messages: []
});
for await (const event of stream) {
if (
event.type === "content_block_delta" &&
event.delta.type === "text_delta"
) {
body += event.delta.text;
await ctx.runMutation(internal.postGenration.updatePostGeneration, {
postId: args.postId,
generatedPostMsg: body,
});
}
}
} catch (error) {
console.error("Error in anthropic call", error);
throw new Error("Error in anthropic call");
}
return body;
},
});
erquhart
erquhart•10mo ago
Thanks for updating! I think client.messages.create() is missing closing brackets
Abhishek
AbhishekOP•10mo ago
Fixed thanks , nice obersvation 👀
jamwt
jamwt•10mo ago
thanks for sending this along. really useful!

Did you find this page helpful?