bruse
bruse2d ago

Mutation without a write still triggers query (python client)

Hi! Just curious about the dynamics of mutations. I have a mutation that is supposed to claim the next pending job, but exits early if there are no such jobs. When it returns early it just returns null, and when there is a job to claim it will patch that entry to update a state variable, plus a few more things. My listening queries in other clients seem to get triggered even if the mutation exited early though. Is this expected? I would have thought that nothing would be part of that mutations write set, thus not triggering any reads/queries. Here's the mutation in question:
export const claimNextPending = mutation({
args: { apiToken: v.string(), workerId: v.string() },
handler: async (ctx, { apiToken, workerId }) => {
if (apiToken !== process.env.NARRATOR_API_TOKEN)
throw new Error('Unauthorized')
const next = await ctx.db
.query('articles')
.withIndex('status', q => q.eq('status', 'pending'))
.first()
if (!next) {
console.debug('No pending articles to claim')
return null
}

// Mark as processing with a lease
await ctx.db.patch(next._id, {
status: 'processing',
claimedBy: workerId,
claimedAt: Date.now(),
lastError: undefined,
})

return { _id: next._id, url: next.url }
},
})
export const claimNextPending = mutation({
args: { apiToken: v.string(), workerId: v.string() },
handler: async (ctx, { apiToken, workerId }) => {
if (apiToken !== process.env.NARRATOR_API_TOKEN)
throw new Error('Unauthorized')
const next = await ctx.db
.query('articles')
.withIndex('status', q => q.eq('status', 'pending'))
.first()
if (!next) {
console.debug('No pending articles to claim')
return null
}

// Mark as processing with a lease
await ctx.db.patch(next._id, {
status: 'processing',
claimedBy: workerId,
claimedAt: Date.now(),
lastError: undefined,
})

return { _id: next._id, url: next.url }
},
})
Sorry, I'm sure this has been asked before, but I honestly couldn't find previous questions about it.
19 Replies
Convex Bot
Convex Bot2d ago
Thanks for posting in <#1088161997662724167>. Reminder: If you have a Convex Pro account, use the Convex Dashboard to file support tickets. - Provide context: What are you trying to achieve, what is the end-user interaction, what are you seeing? (full error message, command output, etc.) - Use search.convex.dev to search Docs, Stack, and Discord all at once. - Additionally, you can post your questions in the Convex Community's <#1228095053885476985> channel to receive a response from AI. - Avoid tagging staff unless specifically instructed. Thank you!
Clever Tagline
What evidence have you seen that proves that this mutation is the cause of those other queries updating? Are there any other mutations that could be affecting them besides this one? Not knowing about your process but just making a guess or two, if a worker is calling claimNextPending because it's available for a job, that could happen because it just finished processing a prior job. If it just finished a job, it likely wrote something to that job's document at the end of the process. If that's an accurate assumption about the process, could the mutation recording that prior job's final update be the cause of those other queries refreshing?
Sara
Sara2d ago
well, your if statements might be invalid, the procces.env.api can be undefined sometimes, and it doesn't "throw error", and the "next" can just be not null sometimes, so the check is also insufficient, so, to sum up, your mutation passed the last write of patch and hence has updated your query, to fix this, fix the if statements 🙂, and to confirm this, write a console.log before the patch, and if it logs then you found your culprate 🙂
Clever Tagline
I'm sorry, but that doesn't make sense to me...
the procces.env.api can be undefined sometimes, and it doesn't "throw error"
If undefined is returned when making the process.env comparison, it would throw an error because it would fail when comparing against the apiToken string.
and the "next" can actually just be null sometimes, so the check is also insufficient
If next is null, then the patch operation would fail with an error because there's no _id property (await ctx.db.patch(next._id, ...). There was no mention of errors in this process, so to me the next check is sufficient.
Sara
Sara2d ago
my mistake, but still the problem is with one or both the if statments, if both are not true, the line skips to the patch statment, and would make it not return early
bruse
bruseOP2d ago
Right now my system is very simple. I have a react frontend which adds items to this list through another mutation when a user (only me for now) adds an article in the GUI. I have a python service which queries articles in the 'PENDING' state:
export const listPending = query({
args: { apiToken: v.string() },
handler: async (ctx, { apiToken }) => {
if (apiToken !== process.env.NARRATOR_API_TOKEN)
throw new Error('Unauthorized')
const items = await ctx.db
.query('articles')
.withIndex('status', q => q.eq('status', 'pending'))
.collect()
return items.length
},
})
export const listPending = query({
args: { apiToken: v.string() },
handler: async (ctx, { apiToken }) => {
if (apiToken !== process.env.NARRATOR_API_TOKEN)
throw new Error('Unauthorized')
const items = await ctx.db
.query('articles')
.withIndex('status', q => q.eq('status', 'pending'))
.collect()
return items.length
},
})
It then runs the mutation above to take a pending article and start processing it. I previously did not check the return value of the listPending query, so I immediately tried to consume from the empty list. The query then returns early, and my python code goes back to consuming from the subscribed query generator:
try:
stream: Iterable[Any] = client.subscribe("articles:listPending", {"apiToken": os.getenv("NARRATOR_API_TOKEN")})
except Exception as e:
raise RuntimeError(
"Subscription to 'articles:listPending' failed."
) from e

for snapshot in stream:
# <-- My new check to see if there are pending articles now goes here
try:
print("Trying to process article...")
process_one(client, worker_id)
except KeyboardInterrupt:
print("Interrupted")
break
except Exception:
traceback.print_exc()
try:
stream: Iterable[Any] = client.subscribe("articles:listPending", {"apiToken": os.getenv("NARRATOR_API_TOKEN")})
except Exception as e:
raise RuntimeError(
"Subscription to 'articles:listPending' failed."
) from e

for snapshot in stream:
# <-- My new check to see if there are pending articles now goes here
try:
print("Trying to process article...")
process_one(client, worker_id)
except KeyboardInterrupt:
print("Interrupted")
break
except Exception:
traceback.print_exc()
So, my evidence: if I add a check for pending articles which skips the process_one call, the process waits for new articles as expected. If I let it run the process_one method, which runs the mutation and returns early without writing, it keeps looping. Spitting out "Trying to process article..." followed by "[DEBUG] 'No pending articles to claim'" continuously. Or of course, I could be mistaken. It does sound like you guys don't think this is expected 🙂 I'll just dig into my code a bit more. Maybe I'm triggering another mutation further down in the processing. Ok well, I've confirmed that there are no other mutations happening, so I guess I'm still confused.
Sara
Sara2d ago
can you take a screenshot of the logs in your dashboard?
bruse
bruseOP2d ago
Just to be clear: I can solve this by just checking if the query returns 0, and then not run my claimNextPending mutation. I just want to understand how convex behaves.
No description
Sara
Sara2d ago
that's ok, we're just as curious as you are if you go to your client, and open up dev tools, click network and filter for ws (websockets), then click messages, when you run the mutation, do you get the query updated?
bruse
bruseOP2d ago
My client is a python server unfortunately. I can dig into what type of debugging functionality there is.
Sara
Sara2d ago
that's ok, I think your function is running according to what you want
bruse
bruseOP2d ago
Maybe I'll put together a minimal reproduction in a new public github repo.
Sara
Sara2d ago
the expected outcome to happen is if your mutation is updating something that your query reads, then the mutation would trigger a write, and the query would be called, the logs are in descending order, so the first log is the latest, it seems that the query hasn't been called so no updates happened
bruse
bruseOP2d ago
So maybe it's a quirk of how the python client works?
try:
stream: Iterable[Any] = client.subscribe("articles:listPending", {"apiToken": os.getenv("NARRATOR_API_TOKEN")})
except Exception as e:
raise RuntimeError(
"Subscription to 'articles:listPending' failed."
) from e

for snapshot in stream:
if snapshot == 0:
continue # <-- This prevents the looping
try:
print("Trying to process article...")
process_one(client, worker_id)
time.sleep(1)
except KeyboardInterrupt:
print("Interrupted")
break
except Exception:
traceback.print_exc()
try:
stream: Iterable[Any] = client.subscribe("articles:listPending", {"apiToken": os.getenv("NARRATOR_API_TOKEN")})
except Exception as e:
raise RuntimeError(
"Subscription to 'articles:listPending' failed."
) from e

for snapshot in stream:
if snapshot == 0:
continue # <-- This prevents the looping
try:
print("Trying to process article...")
process_one(client, worker_id)
time.sleep(1)
except KeyboardInterrupt:
print("Interrupted")
break
except Exception:
traceback.print_exc()
I'm not sure if I understand why adding this check prevents the looping, if the mutation doesn't actually have any effect. I mean, without the check we run the mutation that does not do a write. And then the stream generator yields a new query result. If I have the check the mutation is not run, and the stream generator does not yield a new result.
Sara
Sara2d ago
I don't know how to answer that since i've never used the python client before, I'd leave it to someone else to answer 🤷🏻 but to me it seems to be acting correctly, maybe i'm missing something It would help to change the title to have the "python client" or something related
bruse
bruseOP2d ago
Thanks for the help, anyway! I'll minimize my code, and see if it matches what I expect. I'll probably also see if I can replicate it in TS.
Clever Tagline
I've not used the Python client either, but it's possible that it could be some quirk there, as I've never seen that behavior with the TS side of things.
bruse
bruseOP2d ago
I think I might just have misunderstood the API after all. Every time I do any interaction with convex within the subscription handler it will trigger a new yield from the generator/iterable. So even if I just query another table the client will not pause on the next iteration of the stream. If I don't interact with the client then it does pause. And my fail-early mutation from above does not trigger the listening queries in react/ts, as expected.
def main():
client = ConvexClient(os.getenv("CONVEX_URL"))
try:
stream: Iterable[Any] = client.subscribe("tasks:numNonComplete", {})
except Exception as e:
raise RuntimeError("Subscription to 'tasks:numNonComplete' failed") from e

for numNonComplete in stream:
print("numNonComplete", int(numNonComplete))
try:
# The query below will trigger a new yield in the stream above,
# if it is commented out the loop will pause
q = client.query("todos:get", {})
except Exception as e:
print("Failed", e)

time.sleep(1)
def main():
client = ConvexClient(os.getenv("CONVEX_URL"))
try:
stream: Iterable[Any] = client.subscribe("tasks:numNonComplete", {})
except Exception as e:
raise RuntimeError("Subscription to 'tasks:numNonComplete' failed") from e

for numNonComplete in stream:
print("numNonComplete", int(numNonComplete))
try:
# The query below will trigger a new yield in the stream above,
# if it is commented out the loop will pause
q = client.query("todos:get", {})
except Exception as e:
print("Failed", e)

time.sleep(1)
Indeed the rust client has the same behavior
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::from_filename(".env.local").ok();
dotenvy::dotenv().ok();

let deployment_url = env::var("CONVEX_URL").unwrap();

let mut client = ConvexClient::new(&deployment_url).await.unwrap();
let mut sub = client.subscribe("tasks:numNonComplete", maplit::btreemap!{}).await?;
while let Some(result) = sub.next().await {
println!("numNonComplete {result:?}");
// The query below will trigger a new yield in the stream above
// if it is commented out the loop will pause
let todos = client.query("todos:get", BTreeMap::new()).await.unwrap();
}

Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::from_filename(".env.local").ok();
dotenvy::dotenv().ok();

let deployment_url = env::var("CONVEX_URL").unwrap();

let mut client = ConvexClient::new(&deployment_url).await.unwrap();
let mut sub = client.subscribe("tasks:numNonComplete", maplit::btreemap!{}).await?;
while let Some(result) = sub.next().await {
println!("numNonComplete {result:?}");
// The query below will trigger a new yield in the stream above
// if it is commented out the loop will pause
let todos = client.query("todos:get", BTreeMap::new()).await.unwrap();
}

Ok(())
}
bruse
bruseOP2d ago
Here's the minimized code if anyone is interested: https://github.com/bruse/convex-client-yield
GitHub
GitHub - bruse/convex-client-yield
Contribute to bruse/convex-client-yield development by creating an account on GitHub.

Did you find this page helpful?