feat(taskbroker): Fetch Pending Activations in Batches #582
george-sentry merged 3 commits into main from
Conversation
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
```rust
    }
    backoff = true;
}
```
Batch loop continues pushing after failure, compounding timeouts
Medium Severity
When pusher.push_task() fails with a PushError::Timeout, the for loop over the batch continues attempting to push all remaining activations instead of breaking out. Since a timeout means the push channel was full for push_queue_timeout_ms (default 5000ms), each subsequent push will also likely timeout. With fetch_batch_size of N, the loop can block for up to N * push_queue_timeout_ms. All N activations have already been atomically claimed from the store, so breaking early doesn't lose work — upkeep recovers them regardless. A break after setting backoff = true would preserve the old single-activation latency behavior while still benefiting from batching under normal conditions.
While this is true I think it's OK for now until we finalize how we want to handle failures.
The compounding timeouts provide a bit of natural backpressure. There is a risk of the compounded time causing processing deadlines to be missed for tasks with shorter deadlines.
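The early-break behavior proposed above can be sketched as follows. This is a minimal, self-contained model, not the actual fetch loop: `push_batch`, the `u32` stand-ins for activations, and the injected `push_task` closure are hypothetical, while `PushError::Timeout` and `backoff` follow the names in the diff.

```rust
// Hypothetical error type mirroring the PushError::Timeout case in the diff.
#[derive(Debug, PartialEq)]
enum PushError {
    Timeout,
}

/// Push a batch of already-claimed activations, breaking on the first
/// timeout so one full channel cannot stall the rest of the batch.
/// Returns how many were pushed and whether the fetcher should back off.
fn push_batch<F>(activations: &[u32], mut push_task: F) -> (usize, bool)
where
    F: FnMut(u32) -> Result<(), PushError>,
{
    let mut pushed = 0;
    let mut backoff = false;
    for &activation in activations {
        match push_task(activation) {
            Ok(()) => pushed += 1,
            Err(PushError::Timeout) => {
                // Remaining activations were already claimed atomically;
                // upkeep will recover them, so breaking early loses no work.
                backoff = true;
                break;
            }
        }
    }
    (pushed, backoff)
}
```

With a `fetch_batch_size` of N, this bounds the stall to one `push_queue_timeout_ms` instead of N of them, which is the latency behavior the single-activation code had.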
```diff
 let inflight = self
     .store
-    .get_pending_activation(application.as_deref(), namespaces, None)
+    .get_pending_activations(application.as_deref(), namespaces, Some(1), None)
     .await;

 match inflight {
-    Ok(Some(inflight)) => {
+    Ok(activations) if activations.is_empty() => {
+        Err(Status::not_found("No pending activation"))
+    }
+    Ok(activations) => {
+        let inflight = &activations[0];
         let now = Utc::now();
```
This is a little shaky: you call a method whose return signature says it could return any number of activations and rely on the limit argument to ensure only one is returned. This is alright, though you should probably add an error or an assertion that the number of returned tasks is actually one. This is a bit defensive, to ensure that the subset of the return type we care about is actually respected.
It would be OK to add such a defensive measure, as the system can only be in a broken state if you get more than one activation: the extras would be updated in the DB and then ignored -> data loss.
Made some changes in both places where this happens. Now if more than one is fetched, it returns an error.
```rust
match inflight {
    // ...
    Ok(activations) if activations.len() > 1 => {
        error!(
            count = activations.len(),
            application = ?application,
            namespace = ?namespace,
            "get_pending_activations returned more than one row despite limit of 1",
        );
        Err(Status::internal("Unable to retrieve pending activation"))
    }
    // ...
}
```
```diff
 /// Claim pending activations (moves them to processing), optionally filtered by application and namespaces.
-async fn get_pending_activations(
+async fn get_pending_activations_from_namespaces(
```
What was the reason for renaming this?
This function was called get_pending_activations_from_namespaces originally, while get_pending_activation was used to get a single activation.
In a previous PR I changed the name of this method to get_pending_activations because it's shorter. But now that get_pending_activation returns multiple tasks, I renamed it to get_pending_activations while renaming this method back to get_pending_activations_from_namespaces.
We've never used the namespace filtering for workers. If it simplifies the implementation for push workers, we could remove support in the broker internals and make using namespace an error in the grpc service methods.
I would definitely like to explore refactoring the store API in general at some point. I think it's gotten a bit confusing, especially with all my push taskbroker changes.
```diff
     &self,
     application: Option<&str>,
     namespaces: Option<&[String]>,
+    limit: Option<i32>,
```
Once we deprecate the SQLite code, we can change all of these Option<> to be required instead.
```rust
                )
            }
            backoff = true;
        }
    }
}

Ok(None) => {
    debug!("No pending activations");
```
Bug: A push timeout on one activation in a batch causes cascading timeouts for all subsequent activations in that batch, stalling the fetch thread without an early exit.
Severity: HIGH
Suggested Fix
Add a break; statement within the error handling block of the activation push loop in src/fetch/mod.rs. This will ensure the loop exits immediately after the first push failure, preventing subsequent timeouts and allowing the fetcher to back off and retry later.
Prompt for AI Agent

Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: src/fetch/mod.rs#L148-L155

Potential issue: When fetching a batch of activations, the code iterates through them and pushes them to a worker channel. If the channel is full, the `push_task` operation will time out. However, the loop does not break on failure. It continues to attempt to push the remaining activations in the batch, with each attempt also timing out. This causes the fetch thread to stall for the cumulative duration of all timeouts, during which all activations in the batch are locked in a "Processing" state and cannot be handled by other workers.


Linear
Completes STREAM-829
Description
Currently, taskworkers pull tasks from taskbrokers via RPC. This approach works, but has some drawbacks. Therefore, we want taskbrokers to push tasks to taskworkers instead. Read this page on Notion for more information.
This PR changes

`get_pending_activation` to return multiple activations (`Vec<InflightActivation>`) instead of a single one (`Option<InflightActivation>`), and to accept a `limit` argument of type `Option<i32>`. In all unmodified areas of the application, that argument is `Some(1)`. In the fetch loop, it is `Some(config.fetch_batch_size)`. Once a batch of tasks is fetched, they are submitted to the push pool one by one.

Some comments: the `limit` argument is optional, but we do not use `None` anywhere in the code, so I'm thinking about making it required and passing in `1` whenever a single activation is required.

Details

- Add a `fetch_batch_size` field to the configuration with a default value of 1
- Rename `get_pending_activations` back to `get_pending_activations_from_namespaces` (what it was originally)
- Rename `get_pending_activation` to `get_pending_activations`, add a `limit` argument with type `Option<i32>`, and make it return a vector of tasks instead of one
- Update callers of `get_pending_activations` to pass `Some(1)` as the limit
- Add tests covering a limit of `None`, `Some(X)` where X >= Y (Y being the number of pending tasks), and `Some(X)` where X < Y
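The limit semantics the tests cover can be illustrated with a toy model. This is not the store implementation: `claim_pending` is an invented name, and a `Vec<u32>` stands in for the pending-activation table; only the `Option<i32>` limit behavior mirrors the PR.

```rust
/// Toy model of claiming with a limit: `None` means no limit,
/// `Some(x)` claims at most x activations, and asking for more than
/// are pending simply drains whatever is there.
fn claim_pending(pending: &mut Vec<u32>, limit: Option<i32>) -> Vec<u32> {
    let n = match limit {
        // Clamp negative limits to 0 and never drain past the end.
        Some(x) => (x.max(0) as usize).min(pending.len()),
        None => pending.len(),
    };
    pending.drain(..n).collect()
}
```

Under this model, making the argument a required `i32` (as discussed above) would only remove the `None` arm, since `None` is never passed anywhere in the code.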