
feat(taskbroker): Fetch Pending Activations in Batches #582

Merged
george-sentry merged 3 commits into main from george/push-taskbroker/batch-fetch-and-push
Apr 2, 2026

Conversation

@george-sentry
Member

Linear

Completes STREAM-829

Description

Currently, taskworkers pull tasks from taskbrokers via RPC. This approach works, but has some drawbacks. Therefore, we want taskbrokers to push tasks to taskworkers instead. Read this page on Notion for more information.

This PR changes get_pending_activation to return multiple activations (Vec<InflightActivation>) instead of a single one (Option<InflightActivation>) and to accept a limit argument of type Option<i32>. In all unmodified areas of the application, that argument is Some(1). In the fetch loop, it is Some(config.fetch_batch_size).

Once a batch of tasks is fetched, they are submitted to the push pool one by one. Some comments...

  • We still haven't decided what to do when sending a task fails. Right now, when that happens, the upkeep task eventually changes the task's status back to pending and increments its processing attempts. Once processing attempts run out, the task is dropped, which may be a problem. This will be addressed in a future PR.
  • The new limit argument is optional, but we never pass None anywhere in the code, so I'm considering making it required and passing in 1 whenever a single activation is needed.

Details

  • Add a fetch_batch_size field to the configuration with a default value of 1
  • Change get_pending_activations back to get_pending_activations_from_namespaces (its original name)
  • Change get_pending_activation to get_pending_activations, add a limit argument with type Option<i32>, and make it return a vector of tasks instead of one
  • Change all previously existing calls to get_pending_activations to pass Some(1) as the limit
  • Add three new tests to see what happens when...
    • The limit is None
    • The limit is Some(X) with X >= Y, where Y is the number of pending tasks
    • The limit is Some(X) with X < Y, where Y is the number of pending tasks

@george-sentry george-sentry requested a review from a team as a code owner March 31, 2026 00:13
@linear-code

linear-code bot commented Mar 31, 2026


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.


}

backoff = true;
}

Batch loop continues pushing after failure, compounding timeouts

Medium Severity

When pusher.push_task() fails with a PushError::Timeout, the for loop over the batch continues attempting to push all remaining activations instead of breaking out. Since a timeout means the push channel was full for push_queue_timeout_ms (default 5000ms), each subsequent push will also likely timeout. With fetch_batch_size of N, the loop can block for up to N * push_queue_timeout_ms. All N activations have already been atomically claimed from the store, so breaking early doesn't lose work — upkeep recovers them regardless. A break after setting backoff = true would preserve the old single-activation latency behavior while still benefiting from batching under normal conditions.


Member


While this is true I think it's OK for now until we finalize how we want to handle failures.

Member


The compounding timeouts provide a bit of natural backpressure. There is a risk of the compounded time causing processing deadlines to be missed for tasks with shorter deadlines.

Comment on lines 40 to +53
  let inflight = self
      .store
-     .get_pending_activation(application.as_deref(), namespaces, None)
+     .get_pending_activations(application.as_deref(), namespaces, Some(1), None)
      .await;

  match inflight {
-     Ok(Some(inflight)) => {
+     Ok(activations) if activations.is_empty() => {
+         Err(Status::not_found("No pending activation"))
+     }
+
+     Ok(activations) => {
+         let inflight = &activations[0];
          let now = Utc::now();


This is a little shaky: you call a method whose return signature says it could return any number of activations and rely on the limit argument to ensure only one is returned. This is alright, though you should probably add an error or an assertion that the number of returned tasks is actually one. This is a bit defensive, to ensure the subset of the return type we care about is actually respected.

It would be OK to add such a defensive measure, since the system can only be in a broken state if you get more than one activation: the extra activations would be updated in the DB and then ignored, which means data loss.

Member Author


Made some changes in both places where this happens. Now if more than one is fetched, it returns an error.

match inflight {
  ...
  Ok(activations) if activations.len() > 1 => {
    error!(
      count = activations.len(),
      application = ?application,
      namespace = ?namespace,
      "get_pending_activations returned more than one row despite limit of 1",
    );

    Err(Status::internal("Unable to retrieve pending activation"))
  }
  ...
}


  /// Claim pending activations (moves them to processing), optionally filtered by application and namespaces.
- async fn get_pending_activations(
+ async fn get_pending_activations_from_namespaces(


What was the reason for renaming this ?

Member Author


This function was called get_pending_activations_from_namespaces originally, while get_pending_activation was used to get a single activation.

In a previous PR I changed the name of this method to get_pending_activations because it's shorter. But now that get_pending_activation returns multiple tasks, I renamed it to get_pending_activations while renaming this method back to get_pending_activations_from_namespaces.

Member


We've never used the namespace filtering for workers. If it simplifies the implementation for push workers, we could remove support in the broker internals and make using namespace an error in the grpc service methods.

Member Author


I would definitely like to explore refactoring the store API in general at some point. I think it's gotten a bit confusing, especially with all my push taskbroker changes.


  &self,
  application: Option<&str>,
  namespaces: Option<&[String]>,
+ limit: Option<i32>,
Member


Once we deprecate the SQLite code, we can change all of these Option<> to be required instead.

Comment on lines +148 to 155
)
}

backoff = true;
}
}
}

Ok(None) => {
debug!("No pending activations");


Bug: A push timeout on one activation in a batch causes cascading timeouts for all subsequent activations in that batch, stalling the fetch thread without an early exit.
Severity: HIGH

Suggested Fix

Add a break; statement within the error handling block of the activation push loop in src/fetch/mod.rs. This will ensure the loop exits immediately after the first push failure, preventing subsequent timeouts and allowing the fetcher to back off and retry later.

Location: src/fetch/mod.rs#L148-L155

Potential issue: When fetching a batch of activations, the code iterates through them and pushes them to a worker channel. If the channel is full, the push_task operation will time out. However, the loop does not break on failure: it continues to attempt to push the remaining activations in the batch, with each attempt also timing out. This causes the fetch thread to stall for the cumulative duration of all timeouts, during which all activations in the batch are locked in a "Processing" state and cannot be handled by other workers.


@george-sentry george-sentry requested review from evanh and fpacifici April 2, 2026 13:23
Member

@evanh evanh left a comment


This LGTM, minus CI failures.

@george-sentry george-sentry merged commit a985e23 into main Apr 2, 2026
32 of 33 checks passed
@george-sentry george-sentry deleted the george/push-taskbroker/batch-fetch-and-push branch April 2, 2026 18:36