feat(dlq): add dlq support (no-op)#277
Conversation
Semver Impact of This PR🟡 Minor (new features) 📋 Changelog PreviewThis is how your changes will appear in the changelog. New Features ✨
Internal Changes 🔧Deps
🤖 This preview updates automatically when you update the PR. |
|
|
||
| enabled: bool | ||
| topic: str | ||
| producer_config: "KafkaProducerConfig" | ||
|
|
There was a problem hiding this comment.
Bug: The Python adapter in rust_arroyo.py doesn't read the dlq configuration or pass it to the Rust ArroyoConsumer, rendering the DLQ feature non-functional.
Severity: CRITICAL
Suggested Fix
Update rust_arroyo.py to read the dlq configuration from the source config. If present, construct the corresponding Rust DlqConfig object and pass it as the dlq_config argument when initializing the ArroyoConsumer. Add integration tests to verify the DLQ functionality.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: sentry_streams/sentry_streams/config_types.py#L17-L21
Potential issue: The pull request adds Dead Letter Queue (DLQ) support, with a
`DlqConfig` defined in Python and the Rust consumer expecting a `dlq_config` parameter.
However, the Python adapter in `rust_arroyo.py` that instantiates the `ArroyoConsumer`
never reads the `dlq` key from the configuration dictionary and does not pass it during
initialization. As a result, any user-provided DLQ configuration will be silently
ignored, and the DLQ functionality will not work. Invalid messages will be dropped
instead of being routed to the configured dead-letter topic.
Did we get this right? 👍 / 👎 to inform future reviews.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Python type stub missing new DLQ class and parameter
- Updated
rust_streams.pyito addPyDlqConfigand the optionaldlq_configargument onArroyoConsumer.__init__to match the Rust-exposed interface.
- Updated
Or push these changes by commenting:
@cursor push cb8d770063
Preview (cb8d770063)
diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi
--- a/sentry_streams/sentry_streams/rust_streams.pyi
+++ b/sentry_streams/sentry_streams/rust_streams.pyi
@@ -41,6 +41,12 @@
override_params: Mapping[str, str],
) -> None: ...
+class PyDlqConfig:
+ topic: str
+ producer_config: PyKafkaProducerConfig
+
+ def __init__(self, topic: str, producer_config: PyKafkaProducerConfig) -> None: ...
+
class PyMetricConfig:
def __init__(
self,
@@ -105,6 +111,7 @@
schema: str | None,
metric_config: PyMetricConfig | None = None,
write_healthcheck: bool = False,
+ dlq_config: PyDlqConfig | None = None,
) -> None: ...
def add_step(self, step: RuntimeOperator) -> None: ...
def run(self) -> None: ...This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
Adds DLQ configuration support throughout the streaming platform: - Add PyDlqConfig type stub and build_dlq_config() helper - Wire DLQ config from StreamSource to Rust ArroyoConsumer - Support DLQ override from YAML deployment configuration - Add comprehensive test coverage for DLQ functionality Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
fpacifici
left a comment
There was a problem hiding this comment.
Please see my comments on the unit tests and on the defaults.
Also please adjust the example pipelines to set up the DLQ. Since this cannot be tested with unit test consider adapting https://github.com/getsentry/streams/blob/main/sentry_streams/integration_tests/test_example_pipelines.py
sentry_streams/src/consumer.rs
Outdated
| /// Builds the DLQ policy if dlq_config is provided. | ||
| /// Returns None if DLQ is not configured. | ||
| fn build_dlq_policy(&self) -> Option<DlqPolicy<KafkaPayload>> { | ||
| match &self.dlq_config { |
There was a problem hiding this comment.
Making this a standalone funciton also allows you not to have this case for when the config is not provided.
sentry_streams/src/consumer.rs
Outdated
| /// When provided, invalid messages will be sent to the DLQ topic. | ||
| #[pyclass] | ||
| #[derive(Debug, Clone)] | ||
| pub struct PyDlqConfig { |
There was a problem hiding this comment.
Why PyDlqConfig rather than DlqConfig? Is there a rust version that we need to distinguish from ?
There was a problem hiding this comment.
I just saw that it's a naming convention we have in the repo, like PyKafkaConsumerConfig. I assume it means "this Rust struct is meant for coming from Python"
There was a problem hiding this comment.
That is not supposed to be a coding convention. We had PyKafkaConsumerConfig to distinguish it from KafkaConsumerConfig which already existed in the Rust code base. Under normal operations you do not need to add Py in front of an object you expose to python.
| topic=dlq_data["topic"], | ||
| bootstrap_servers=dlq_data["bootstrap_servers"], |
There was a problem hiding this comment.
What if the user wants to override only the topic name and leave the bootstrap_servers as they are?
Also should we make the bootstrap_servers default to the same one we use for the StreamingSource ?
There was a problem hiding this comment.
added individual field overriding ability
There was a problem hiding this comment.
default value i was planning on handling in the next PR, see my PR description
| topic=step.dlq_config.topic, | ||
| producer_config=PyKafkaProducerConfig( | ||
| bootstrap_servers=step.dlq_config.bootstrap_servers, | ||
| override_params=None, |
There was a problem hiding this comment.
This cannot be hardcoded to None, we almsot always apply some override parameters.
See for example all producers in s4s2 have authentication arguments
https://github.com/getsentry/ops/blob/master/k8s/services/super-big-consumers/_errors_config.yaml#L5-L10
| """ | ||
|
|
||
| topic: str | ||
| bootstrap_servers: Sequence[str] |
There was a problem hiding this comment.
See my comment above. If you conenct to kafka, either as producer or consumer, you need to allow the user to override the connection parameters. https://github.com/getsentry/ops/blob/master/k8s/services/super-big-consumers/_errors_config.yaml#L5-L10
sentry_streams/src/consumer.rs
Outdated
| /// When provided, invalid messages will be sent to the DLQ topic. | ||
| #[pyclass] | ||
| #[derive(Debug, Clone)] | ||
| pub struct PyDlqConfig { |
There was a problem hiding this comment.
That is not supposed to be a coding convention. We had PyKafkaConsumerConfig to distinguish it from KafkaConsumerConfig which already existed in the Rust code base. Under normal operations you do not need to add Py in front of an object you expose to python.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: DLQ producer config drops override_params needed for auth
- I added
override_paramsto DLQ pipeline config propagation and updated DLQ producer construction and tests so auth-related override parameters are preserved end-to-end.
- I added
Or push these changes by commenting:
@cursor push 212acf9bdc
Preview (212acf9bdc)
diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
--- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
+++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -186,7 +186,7 @@
topic=step.dlq_config.topic,
producer_config=PyKafkaProducerConfig(
bootstrap_servers=step.dlq_config.bootstrap_servers,
- override_params=None,
+ override_params=step.dlq_config.override_params,
),
)
diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py
--- a/sentry_streams/sentry_streams/pipeline/pipeline.py
+++ b/sentry_streams/sentry_streams/pipeline/pipeline.py
@@ -2,7 +2,7 @@
from abc import ABC, abstractmethod
from collections import defaultdict
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from functools import partial
@@ -235,6 +235,7 @@
topic: str
bootstrap_servers: Sequence[str]
+ override_params: Mapping[str, str] = field(default_factory=dict)
@dataclass
@@ -267,6 +268,9 @@
servers = producer_config.get("bootstrap_servers") or (
self.dlq_config.bootstrap_servers if self.dlq_config else None
)
+ override_params = producer_config.get("override_params")
+ if override_params is None:
+ override_params = self.dlq_config.override_params if self.dlq_config else {}
if topic is None or servers is None:
raise ValueError("DLQ config requires both 'topic' and 'bootstrap_servers'")
@@ -274,6 +278,7 @@
self.dlq_config = DlqConfig(
topic=topic,
bootstrap_servers=servers,
+ override_params=cast(Mapping[str, str], override_params),
)
diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py
--- a/sentry_streams/tests/test_dlq.py
+++ b/sentry_streams/tests/test_dlq.py
@@ -68,21 +68,28 @@
@pytest.mark.parametrize(
- "dlq_config, expected_topic, expected_bootstrap_servers",
+ "dlq_config, expected_topic, expected_bootstrap_servers, expected_override_params",
[
- pytest.param(None, None, None, id="no_dlq_config"),
+ pytest.param(None, None, None, None, id="no_dlq_config"),
pytest.param(
- DlqConfig(topic="test-dlq", bootstrap_servers=["localhost:9092"]),
+ DlqConfig(
+ topic="test-dlq",
+ bootstrap_servers=["localhost:9092"],
+ override_params={"sasl.username": "test"},
+ ),
"test-dlq",
["localhost:9092"],
+ {"sasl.username": "test"},
id="single_bootstrap_server",
),
pytest.param(
DlqConfig(
- topic="my-dlq", bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"]
+ topic="my-dlq",
+ bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"],
),
"my-dlq",
["broker1:9092", "broker2:9092", "broker3:9092"],
+ {},
id="multiple_bootstrap_servers",
),
],
@@ -91,6 +98,7 @@
dlq_config: DlqConfig | None,
expected_topic: str | None,
expected_bootstrap_servers: list[str] | None,
+ expected_override_params: dict[str, str] | None,
) -> None:
"""Test build_dlq_config returns correct PyDlqConfig for various inputs."""
source = StreamSource(
@@ -109,47 +117,75 @@
assert result.topic == expected_topic
assert result.producer_config is not None
assert result.producer_config.bootstrap_servers == expected_bootstrap_servers
- assert result.producer_config.override_params is None
+ assert result.producer_config.override_params == expected_override_params
@pytest.mark.parametrize(
- "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers",
+ "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers, expected_override_params",
[
pytest.param(
None,
- {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["broker1:9092"]}},
+ {
+ "topic": "new-dlq",
+ "producer_config": {
+ "bootstrap_servers": ["broker1:9092"],
+ "override_params": {"security.protocol": "SASL_SSL"},
+ },
+ },
"new-dlq",
["broker1:9092"],
+ {"security.protocol": "SASL_SSL"},
id="create_new_config",
),
pytest.param(
- DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
+ DlqConfig(
+ topic="old-dlq",
+ bootstrap_servers=["old-broker:9092"],
+ override_params={"old.param": "old-value"},
+ ),
{"topic": "new-dlq"},
"new-dlq",
["old-broker:9092"],
+ {"old.param": "old-value"},
id="override_topic_only",
),
pytest.param(
- DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
- {"producer_config": {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}},
+ DlqConfig(
+ topic="old-dlq",
+ bootstrap_servers=["old-broker:9092"],
+ override_params={"old.param": "old-value"},
+ ),
+ {
+ "producer_config": {
+ "bootstrap_servers": ["new-broker:9092", "new-broker2:9092"],
+ "override_params": {"new.param": "new-value"},
+ }
+ },
"old-dlq",
["new-broker:9092", "new-broker2:9092"],
+ {"new.param": "new-value"},
id="override_bootstrap_servers_only",
),
pytest.param(
- DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
+ DlqConfig(
+ topic="old-dlq",
+ bootstrap_servers=["old-broker:9092"],
+ override_params={"old.param": "old-value"},
+ ),
{"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["new-broker:9092"]}},
"new-dlq",
["new-broker:9092"],
+ {"old.param": "old-value"},
id="override_both_fields",
),
],
)
def test_stream_source_override_config_dlq(
initial_dlq_config: DlqConfig | None,
- override_dlq: dict[str, str | list[str]],
+ override_dlq: dict[str, object],
expected_topic: str,
expected_bootstrap_servers: list[str],
+ expected_override_params: dict[str, str],
) -> None:
"""Test that StreamSource.override_config correctly overrides DLQ settings."""
source = StreamSource(
@@ -163,3 +199,4 @@
assert source.dlq_config is not None
assert source.dlq_config.topic == expected_topic
assert source.dlq_config.bootstrap_servers == expected_bootstrap_servers
+ assert source.dlq_config.override_params == expected_override_paramsThis Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.
| override_params = producer_config.get("override_params") or ( | ||
| self.dlq_config.producer_config.override_params if self.dlq_config else None |
There was a problem hiding this comment.
Bug: The use of the or operator for merging DLQ configurations causes an empty dictionary for override_params to be ignored, preventing users from clearing these settings.
Severity: MEDIUM
Suggested Fix
Explicitly check if the value from producer_config.get("override_params") is None instead of relying on its truthiness. This will correctly handle an empty dictionary {} as a valid configuration value, allowing users to clear override parameters as intended.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: sentry_streams/sentry_streams/pipeline/pipeline.py#L257-L258
Potential issue: In the `override_config` method, the `or` operator is used for fallback
merging of the `override_params` configuration. In Python, an empty dictionary `{}` is
falsy. When a user explicitly provides an empty dictionary to clear existing override
parameters, such as authentication credentials, the `or` operator incorrectly treats it
as falsy and falls back to the existing configuration value. This prevents users from
clearing these parameters, which can cause the DLQ producer to silently continue using
stale, invalid credentials, leading to potential authentication failures.
| ) | ||
| override_params = producer_config.get("override_params") or ( | ||
| self.dlq_config.producer_config.override_params if self.dlq_config else None | ||
| ) |
There was a problem hiding this comment.
Falsy override_params silently ignored by or operator
Low Severity
The or operator for merging override_params treats an explicitly provided empty dict {} as falsy, causing it to silently fall back to the existing self.dlq_config.producer_config.override_params value. Unlike topic and servers (where empty/falsy values are always invalid), an empty override_params is semantically valid — it means "no extra Kafka settings." Using is not None instead of or would correctly distinguish between "not provided" (None) and "explicitly empty" ({}).
Reviewed by Cursor Bugbot for commit cc13df3. Configure here.
There was a problem hiding this comment.
if someone explicitly uses empty dict as override params, then the system is still behaving as intended. I'm ignoring this comment
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 6b4468a. Configure here.
| """ | ||
|
|
||
| topic: str | ||
| producer_config: "KafkaProducerConfig" |
There was a problem hiding this comment.
DLQ TypedDict references wrong producer config type
Low Severity
The DlqConfig TypedDict's producer_config field is typed as KafkaProducerConfig, which defines additional_settings: Mapping[str, Any]. However, the JSON schema (DlqProducerConfig) and the override_config code in pipeline.py both expect override_params, not additional_settings. This type mismatch means type checkers and developers looking at this TypedDict will see the wrong field name for the DLQ producer configuration, potentially causing incorrect deployment YAML configs in future PRs that wire everything together.
Reviewed by Cursor Bugbot for commit 6b4468a. Configure here.



ticket
PR1 (this):
PR2: default config
PR3: wire everything together, end to end testing
PR4: enable by default, deploy some default topics