onlyayep
onlyayep3mo ago

Hi here there an example for persistance

Hi here there an example for persistance text streaming which has the has the thread and messages schema. Iam finding an issue where i need to pass my threadId to use with useStream() from "@convex-dev/persistent-text-streaming/react". My goal is to list all message base on my threadId, Is there a way for me to pass a body?
/**
* React hook for persistent text streaming.
*
* @param getPersistentBody - A query function reference that returns the body
* of a stream using the component's `getStreamBody` method.
* @param streamUrl - The URL of the http action that will kick off the stream
* generation and stream the result back to the client using the component's
* `stream` method.
* @param driven - Whether this particular session is driving the stream. Set this
* to true if this is the client session that first created the stream using the
* component's `createStream` method. If you're simply reloading an existing
* stream, set this to false.
* @param streamId - The ID of the stream. If this is not provided, the return
* value will be an empty string for the stream body and the status will be
* `pending`.
* @returns The body and status of the stream.
*/
export function useStream(
getPersistentBody: FunctionReference<
"query",
"public",
{ streamId: string },
StreamBody
>,
streamUrl: URL,
driven: boolean,
streamId: StreamId | undefined,
opts?: {
// If provided, this will be passed as the Authorization header.
authToken?: string | null;
// If provided, these will be passed as additional headers.
headers?: Record<string, string>;
}
) {
/**
* React hook for persistent text streaming.
*
* @param getPersistentBody - A query function reference that returns the body
* of a stream using the component's `getStreamBody` method.
* @param streamUrl - The URL of the http action that will kick off the stream
* generation and stream the result back to the client using the component's
* `stream` method.
* @param driven - Whether this particular session is driving the stream. Set this
* to true if this is the client session that first created the stream using the
* component's `createStream` method. If you're simply reloading an existing
* stream, set this to false.
* @param streamId - The ID of the stream. If this is not provided, the return
* value will be an empty string for the stream body and the status will be
* `pending`.
* @returns The body and status of the stream.
*/
export function useStream(
getPersistentBody: FunctionReference<
"query",
"public",
{ streamId: string },
StreamBody
>,
streamUrl: URL,
driven: boolean,
streamId: StreamId | undefined,
opts?: {
// If provided, this will be passed as the Authorization header.
authToken?: string | null;
// If provided, these will be passed as additional headers.
headers?: Record<string, string>;
}
) {
3 Replies
Barrel Of Lube
Barrel Of Lube3mo ago
if you jus want to list all messages by thread id, make use of indexes in you schema for example .index("by_stream_updated", ["streamId", "updatedAt"] and then you can query withIndex like
export const getAll = query({
args: {
paginationOpts: v.optional(paginationOptsValidator),
filters: v.optional(
v.object({
pinned: v.optional(v.boolean()),
}),
),
},
handler: async (ctx, args) => {
const { userId } = await requireAuth(ctx);

const userChats = await ctx.db
.query("chats")
.withIndex("by_user_updated", (q) => q.eq("userId", userId))
.filter((q) =>
args.filters?.pinned === undefined
? true
: q.eq(q.field("pinned"), args.filters.pinned),
)
.order("desc")
.paginate(args.paginationOpts ?? { numItems: 10, cursor: null });

return userChats;
},
});
export const getAll = query({
args: {
paginationOpts: v.optional(paginationOptsValidator),
filters: v.optional(
v.object({
pinned: v.optional(v.boolean()),
}),
),
},
handler: async (ctx, args) => {
const { userId } = await requireAuth(ctx);

const userChats = await ctx.db
.query("chats")
.withIndex("by_user_updated", (q) => q.eq("userId", userId))
.filter((q) =>
args.filters?.pinned === undefined
? true
: q.eq(q.field("pinned"), args.filters.pinned),
)
.order("desc")
.paginate(args.paginationOpts ?? { numItems: 10, cursor: null });

return userChats;
},
});
(replace by_user_updated wth by_stream_updated) the first field of index is necessary and the 2nd is optional. .order arranges based on _creation time, to fetch the latest/updated we add additional field "updatedAt" which will order the results base don updatedAt, 2 birds wth 1 stone.
onlyayep
onlyayepOP3mo ago
iam curious, isnt streamId should be unique per message? So if that's the case, how are we going to list all messages base on threadId in these case than? i have a thread & message table in these case I've figure it out, my approach is to use searchParams instead for my use case to find all message by thread id
// client side code
const authToken = useAuthToken();
const threadId = message.threadId;
const streamId = message.responseStreamId as StreamId;
const url = new URL(
`${ENV.NEXT_PUBLIC_CONVEX_URL.replace("cloud", "site")}/api/chat?threadId=${threadId}`
);
const { text, status } = useStream(
api.functions.chat.queries.getChatStreamBody,
url,
isDriven,
streamId,
{ authToken }
);
// client side code
const authToken = useAuthToken();
const threadId = message.threadId;
const streamId = message.responseStreamId as StreamId;
const url = new URL(
`${ENV.NEXT_PUBLIC_CONVEX_URL.replace("cloud", "site")}/api/chat?threadId=${threadId}`
);
const { text, status } = useStream(
api.functions.chat.queries.getChatStreamBody,
url,
isDriven,
streamId,
{ authToken }
);
Barrel Of Lube
Barrel Of Lube3mo ago
mb, i have a custom streaming implementation

Did you find this page helpful?