
feat(dlq): add dlq support (no-op)#277

Open
victoria-yining-huang wants to merge 25 commits into main from vic/add_dlq

Conversation

@victoria-yining-huang
Contributor

@victoria-yining-huang victoria-yining-huang commented Mar 20, 2026

ticket

PR1 (this):

  • noop
  • add arroyo dlq support that can reach rust-arroyo
  • json validation

PR2: default config
PR3: wire everything together, end to end testing
PR4: enable by default, deploy some default topics

@victoria-yining-huang victoria-yining-huang requested a review from a team as a code owner March 20, 2026 19:50
@github-actions

github-actions bot commented Mar 20, 2026

Semver Impact of This PR

🟡 Minor (new features)

📋 Changelog Preview

This is how your changes will appear in the changelog.
Entries from this PR are highlighted with a left border (blockquote style).


New Features ✨

  • (dlq) Add dlq support (no-op) by victoria-yining-huang in #277

Internal Changes 🔧

Deps

  • Bump rustls-webpki from 0.103.3 to 0.103.10 in /sentry_streams/sentry_streams/examples/rust_simple_map_filter/rust_transforms by dependabot in #278
  • Bump slab from 0.4.10 to 0.4.12 in /sentry_streams/sentry_streams/examples/rust_simple_map_filter/rust_transforms by dependabot in #281
  • Bump rustls-webpki from 0.103.3 to 0.103.10 in /sentry_streams/tests/rust_test_functions by dependabot in #283

🤖 This preview updates automatically when you update the PR.

Comment on lines +17 to +21

enabled: bool
topic: str
producer_config: "KafkaProducerConfig"


Bug: The Python adapter in rust_arroyo.py doesn't read the dlq configuration or pass it to the Rust ArroyoConsumer, rendering the DLQ feature non-functional.
Severity: CRITICAL

Suggested Fix

Update rust_arroyo.py to read the dlq configuration from the source config. If present, construct the corresponding Rust DlqConfig object and pass it as the dlq_config argument when initializing the ArroyoConsumer. Add integration tests to verify the DLQ functionality.
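The suggested wiring can be sketched in a few lines. This is a hypothetical helper, assuming a `dlq` key in the source config and a simplified `DlqConfig` shape; the real types in `rust_arroyo.py` may differ.

```python
# Hypothetical sketch of reading a `dlq` section from the source config and
# turning it into a config object to pass to the consumer. All names here
# (build_dlq_config, DlqConfig) are illustrative, not the actual API.
from dataclasses import dataclass
from typing import Any, Mapping, Optional, Sequence


@dataclass
class DlqConfig:
    topic: str
    bootstrap_servers: Sequence[str]


def build_dlq_config(source_config: Mapping[str, Any]) -> Optional[DlqConfig]:
    """Return a DlqConfig when the source config carries a `dlq` section."""
    dlq = source_config.get("dlq")
    if dlq is None:
        # No DLQ configured: the consumer runs without a dead-letter policy.
        return None
    return DlqConfig(
        topic=dlq["topic"],
        bootstrap_servers=dlq["bootstrap_servers"],
    )
```

The key point of the fix is that the adapter must actually read the `dlq` key and forward the result as `dlq_config` when constructing the `ArroyoConsumer`; today it never does.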

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: sentry_streams/sentry_streams/config_types.py#L17-L21

Potential issue: The pull request adds Dead Letter Queue (DLQ) support, with a
`DlqConfig` defined in Python and the Rust consumer expecting a `dlq_config` parameter.
However, the Python adapter in `rust_arroyo.py` that instantiates the `ArroyoConsumer`
never reads the `dlq` key from the configuration dictionary and does not pass it during
initialization. As a result, any user-provided DLQ configuration will be silently
ignored, and the DLQ functionality will not work. Invalid messages will be dropped
instead of being routed to the configured dead-letter topic.


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Python type stub missing new DLQ class and parameter
    • Updated rust_streams.pyi to add PyDlqConfig and the optional dlq_config argument on ArroyoConsumer.__init__ to match the Rust-exposed interface.

Create PR

Or push these changes by commenting:

@cursor push cb8d770063
Preview (cb8d770063)
diff --git a/sentry_streams/sentry_streams/rust_streams.pyi b/sentry_streams/sentry_streams/rust_streams.pyi
--- a/sentry_streams/sentry_streams/rust_streams.pyi
+++ b/sentry_streams/sentry_streams/rust_streams.pyi
@@ -41,6 +41,12 @@
         override_params: Mapping[str, str],
     ) -> None: ...
 
+class PyDlqConfig:
+    topic: str
+    producer_config: PyKafkaProducerConfig
+
+    def __init__(self, topic: str, producer_config: PyKafkaProducerConfig) -> None: ...
+
 class PyMetricConfig:
     def __init__(
         self,
@@ -105,6 +111,7 @@
         schema: str | None,
         metric_config: PyMetricConfig | None = None,
         write_healthcheck: bool = False,
+        dlq_config: PyDlqConfig | None = None,
     ) -> None: ...
     def add_step(self, step: RuntimeOperator) -> None: ...
     def run(self) -> None: ...

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

@victoria-yining-huang victoria-yining-huang changed the title from "feat(dlq): add dlq support rust side" to "feat(dlq): add dlq support (no-op)" on Mar 23, 2026
Adds DLQ configuration support throughout the streaming platform:
- Add PyDlqConfig type stub and build_dlq_config() helper
- Wire DLQ config from StreamSource to Rust ArroyoConsumer
- Support DLQ override from YAML deployment configuration
- Add comprehensive test coverage for DLQ functionality

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Collaborator

@fpacifici fpacifici left a comment


Please see my comments on the unit tests and on the defaults.

Also please adjust the example pipelines to set up the DLQ. Since this cannot be tested with unit test consider adapting https://github.com/getsentry/streams/blob/main/sentry_streams/integration_tests/test_example_pipelines.py

/// Builds the DLQ policy if dlq_config is provided.
/// Returns None if DLQ is not configured.
fn build_dlq_policy(&self) -> Option<DlqPolicy<KafkaPayload>> {
match &self.dlq_config {
Collaborator


Making this a standalone function also lets you avoid handling the case where the config is not provided.
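The suggestion can be illustrated in Python (the code under review is Rust): a builder that takes a concrete config pushes the "not configured" branch to the call site, so the builder itself never matches on an absent config. All names here are illustrative.

```python
# Illustrative design: build_dlq_policy takes a real config, so the
# None/absent case lives at the call site rather than inside the builder.
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class DlqConfig:
    topic: str


def build_dlq_policy(config: DlqConfig) -> Dict[str, Any]:
    # Only ever called with a concrete config; no "not configured" branch here.
    return {"dlq_topic": config.topic}


def make_consumer(dlq_config: Optional[DlqConfig]) -> Dict[str, Any]:
    # The caller decides whether a policy exists at all.
    policy = build_dlq_policy(dlq_config) if dlq_config is not None else None
    return {"dlq_policy": policy}
```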

/// When provided, invalid messages will be sent to the DLQ topic.
#[pyclass]
#[derive(Debug, Clone)]
pub struct PyDlqConfig {
Collaborator


Why PyDlqConfig rather than DlqConfig? Is there a rust version that we need to distinguish from ?

Contributor Author


I just saw that it's a naming convention we have in the repo, like PyKafkaConsumerConfig. I assume it means "this Rust struct is meant to be constructed from Python".

Collaborator


That is not supposed to be a coding convention. We had PyKafkaConsumerConfig to distinguish it from KafkaConsumerConfig which already existed in the Rust code base. Under normal operations you do not need to add Py in front of an object you expose to python.

Comment on lines +264 to +265
topic=dlq_data["topic"],
bootstrap_servers=dlq_data["bootstrap_servers"],
Collaborator


What if the user wants to override only the topic name and leave the bootstrap_servers as they are?
Also, should we make the bootstrap_servers default to the same one we use for the StreamingSource?
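A per-field merge along the lines the reviewer asks for could look roughly like this. The helper is hypothetical; field names follow the diff, and the fallback to the source's own servers is the suggested default, not current behavior.

```python
# Hypothetical per-field DLQ override merge: each field falls back to the
# existing value, and bootstrap_servers further defaults to the servers the
# StreamingSource itself uses when neither override nor existing value is set.
from typing import Mapping, Optional, Sequence, Tuple


def merge_dlq_override(
    override: Mapping[str, object],
    current_topic: Optional[str],
    current_servers: Optional[Sequence[str]],
    source_servers: Sequence[str],
) -> Tuple[object, object]:
    topic = override.get("topic", current_topic)
    # Fall back to the source's bootstrap servers when nothing else is set.
    servers = override.get("bootstrap_servers", current_servers) or source_servers
    return topic, servers
```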

Contributor Author


added individual field overriding ability

Contributor Author


The default value I was planning on handling in the next PR, see my PR description.

topic=step.dlq_config.topic,
producer_config=PyKafkaProducerConfig(
bootstrap_servers=step.dlq_config.bootstrap_servers,
override_params=None,
Collaborator


This cannot be hardcoded to None; we almost always apply some override parameters.
See for example all producers in s4s2 have authentication arguments
https://github.com/getsentry/ops/blob/master/k8s/services/super-big-consumers/_errors_config.yaml#L5-L10

"""

topic: str
bootstrap_servers: Sequence[str]
Collaborator


See my comment above. If you connect to Kafka, either as producer or consumer, you need to allow the user to override the connection parameters. https://github.com/getsentry/ops/blob/master/k8s/services/super-big-consumers/_errors_config.yaml#L5-L10



@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: DLQ producer config drops override_params needed for auth
    • I added override_params to DLQ pipeline config propagation and updated DLQ producer construction and tests so auth-related override parameters are preserved end-to-end.

Create PR

Or push these changes by commenting:

@cursor push 212acf9bdc
Preview (212acf9bdc)
diff --git a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
--- a/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
+++ b/sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
@@ -186,7 +186,7 @@
         topic=step.dlq_config.topic,
         producer_config=PyKafkaProducerConfig(
             bootstrap_servers=step.dlq_config.bootstrap_servers,
-            override_params=None,
+            override_params=step.dlq_config.override_params,
         ),
     )
 

diff --git a/sentry_streams/sentry_streams/pipeline/pipeline.py b/sentry_streams/sentry_streams/pipeline/pipeline.py
--- a/sentry_streams/sentry_streams/pipeline/pipeline.py
+++ b/sentry_streams/sentry_streams/pipeline/pipeline.py
@@ -2,7 +2,7 @@
 
 from abc import ABC, abstractmethod
 from collections import defaultdict
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from datetime import timedelta
 from enum import Enum
 from functools import partial
@@ -235,6 +235,7 @@
 
     topic: str
     bootstrap_servers: Sequence[str]
+    override_params: Mapping[str, str] = field(default_factory=dict)
 
 
 @dataclass
@@ -267,6 +268,9 @@
             servers = producer_config.get("bootstrap_servers") or (
                 self.dlq_config.bootstrap_servers if self.dlq_config else None
             )
+            override_params = producer_config.get("override_params")
+            if override_params is None:
+                override_params = self.dlq_config.override_params if self.dlq_config else {}
 
             if topic is None or servers is None:
                 raise ValueError("DLQ config requires both 'topic' and 'bootstrap_servers'")
@@ -274,6 +278,7 @@
             self.dlq_config = DlqConfig(
                 topic=topic,
                 bootstrap_servers=servers,
+                override_params=cast(Mapping[str, str], override_params),
             )
 
 

diff --git a/sentry_streams/tests/test_dlq.py b/sentry_streams/tests/test_dlq.py
--- a/sentry_streams/tests/test_dlq.py
+++ b/sentry_streams/tests/test_dlq.py
@@ -68,21 +68,28 @@
 
 
 @pytest.mark.parametrize(
-    "dlq_config, expected_topic, expected_bootstrap_servers",
+    "dlq_config, expected_topic, expected_bootstrap_servers, expected_override_params",
     [
-        pytest.param(None, None, None, id="no_dlq_config"),
+        pytest.param(None, None, None, None, id="no_dlq_config"),
         pytest.param(
-            DlqConfig(topic="test-dlq", bootstrap_servers=["localhost:9092"]),
+            DlqConfig(
+                topic="test-dlq",
+                bootstrap_servers=["localhost:9092"],
+                override_params={"sasl.username": "test"},
+            ),
             "test-dlq",
             ["localhost:9092"],
+            {"sasl.username": "test"},
             id="single_bootstrap_server",
         ),
         pytest.param(
             DlqConfig(
-                topic="my-dlq", bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"]
+                topic="my-dlq",
+                bootstrap_servers=["broker1:9092", "broker2:9092", "broker3:9092"],
             ),
             "my-dlq",
             ["broker1:9092", "broker2:9092", "broker3:9092"],
+            {},
             id="multiple_bootstrap_servers",
         ),
     ],
@@ -91,6 +98,7 @@
     dlq_config: DlqConfig | None,
     expected_topic: str | None,
     expected_bootstrap_servers: list[str] | None,
+    expected_override_params: dict[str, str] | None,
 ) -> None:
     """Test build_dlq_config returns correct PyDlqConfig for various inputs."""
     source = StreamSource(
@@ -109,47 +117,75 @@
         assert result.topic == expected_topic
         assert result.producer_config is not None
         assert result.producer_config.bootstrap_servers == expected_bootstrap_servers
-        assert result.producer_config.override_params is None
+        assert result.producer_config.override_params == expected_override_params
 
 
 @pytest.mark.parametrize(
-    "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers",
+    "initial_dlq_config, override_dlq, expected_topic, expected_bootstrap_servers, expected_override_params",
     [
         pytest.param(
             None,
-            {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["broker1:9092"]}},
+            {
+                "topic": "new-dlq",
+                "producer_config": {
+                    "bootstrap_servers": ["broker1:9092"],
+                    "override_params": {"security.protocol": "SASL_SSL"},
+                },
+            },
             "new-dlq",
             ["broker1:9092"],
+            {"security.protocol": "SASL_SSL"},
             id="create_new_config",
         ),
         pytest.param(
-            DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
+            DlqConfig(
+                topic="old-dlq",
+                bootstrap_servers=["old-broker:9092"],
+                override_params={"old.param": "old-value"},
+            ),
             {"topic": "new-dlq"},
             "new-dlq",
             ["old-broker:9092"],
+            {"old.param": "old-value"},
             id="override_topic_only",
         ),
         pytest.param(
-            DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
-            {"producer_config": {"bootstrap_servers": ["new-broker:9092", "new-broker2:9092"]}},
+            DlqConfig(
+                topic="old-dlq",
+                bootstrap_servers=["old-broker:9092"],
+                override_params={"old.param": "old-value"},
+            ),
+            {
+                "producer_config": {
+                    "bootstrap_servers": ["new-broker:9092", "new-broker2:9092"],
+                    "override_params": {"new.param": "new-value"},
+                }
+            },
             "old-dlq",
             ["new-broker:9092", "new-broker2:9092"],
+            {"new.param": "new-value"},
             id="override_bootstrap_servers_only",
         ),
         pytest.param(
-            DlqConfig(topic="old-dlq", bootstrap_servers=["old-broker:9092"]),
+            DlqConfig(
+                topic="old-dlq",
+                bootstrap_servers=["old-broker:9092"],
+                override_params={"old.param": "old-value"},
+            ),
             {"topic": "new-dlq", "producer_config": {"bootstrap_servers": ["new-broker:9092"]}},
             "new-dlq",
             ["new-broker:9092"],
+            {"old.param": "old-value"},
             id="override_both_fields",
         ),
     ],
 )
 def test_stream_source_override_config_dlq(
     initial_dlq_config: DlqConfig | None,
-    override_dlq: dict[str, str | list[str]],
+    override_dlq: dict[str, object],
     expected_topic: str,
     expected_bootstrap_servers: list[str],
+    expected_override_params: dict[str, str],
 ) -> None:
     """Test that StreamSource.override_config correctly overrides DLQ settings."""
     source = StreamSource(
@@ -163,3 +199,4 @@
     assert source.dlq_config is not None
     assert source.dlq_config.topic == expected_topic
     assert source.dlq_config.bootstrap_servers == expected_bootstrap_servers
+    assert source.dlq_config.override_params == expected_override_params

This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

Comment on lines +257 to +258
override_params = producer_config.get("override_params") or (
self.dlq_config.producer_config.override_params if self.dlq_config else None

Bug: The use of the or operator for merging DLQ configurations causes an empty dictionary for override_params to be ignored, preventing users from clearing these settings.
Severity: MEDIUM

Suggested Fix

Explicitly check if the value from producer_config.get("override_params") is None instead of relying on its truthiness. This will correctly handle an empty dictionary {} as a valid configuration value, allowing users to clear override parameters as intended.
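A minimal sketch of the suggested fix, distinguishing "not provided" (`None`) from "explicitly empty" (`{}`). The helper name is hypothetical.

```python
# The difference between `or`-fallback and an explicit None check:
# an explicitly supplied empty dict should win over the existing value.
from typing import Mapping, Optional


def resolve_override_params(
    provided: Optional[Mapping[str, str]],
    existing: Mapping[str, str],
) -> Mapping[str, str]:
    # `provided or existing` would discard an explicit {}, because an empty
    # dict is falsy in Python; `is not None` keeps it.
    return provided if provided is not None else existing
```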

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: sentry_streams/sentry_streams/pipeline/pipeline.py#L257-L258

Potential issue: In the `override_config` method, the `or` operator is used for fallback
merging of the `override_params` configuration. In Python, an empty dictionary `{}` is
falsy. When a user explicitly provides an empty dictionary to clear existing override
parameters, such as authentication credentials, the `or` operator incorrectly treats it
as falsy and falls back to the existing configuration value. This prevents users from
clearing these parameters, which can cause the DLQ producer to silently continue using
stale, invalid credentials, leading to potential authentication failures.

)
override_params = producer_config.get("override_params") or (
self.dlq_config.producer_config.override_params if self.dlq_config else None
)

Falsy override_params silently ignored by or operator

Low Severity

The or operator for merging override_params treats an explicitly provided empty dict {} as falsy, causing it to silently fall back to the existing self.dlq_config.producer_config.override_params value. Unlike topic and servers (where empty/falsy values are always invalid), an empty override_params is semantically valid — it means "no extra Kafka settings." Using is not None instead of or would correctly distinguish between "not provided" (None) and "explicitly empty" ({}).


Reviewed by Cursor Bugbot for commit cc13df3. Configure here.

Contributor Author


If someone explicitly passes an empty dict as override params, the system still behaves as intended. I'm ignoring this comment.


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).


Reviewed by Cursor Bugbot for commit 6b4468a. Configure here.

"""

topic: str
producer_config: "KafkaProducerConfig"


DLQ TypedDict references wrong producer config type

Low Severity

The DlqConfig TypedDict's producer_config field is typed as KafkaProducerConfig, which defines additional_settings: Mapping[str, Any]. However, the JSON schema (DlqProducerConfig) and the override_config code in pipeline.py both expect override_params, not additional_settings. This type mismatch means type checkers and developers looking at this TypedDict will see the wrong field name for the DLQ producer configuration, potentially causing incorrect deployment YAML configs in future PRs that wire everything together.
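The mismatch the report describes could be resolved with a DLQ-specific producer TypedDict that exposes `override_params`, matching the JSON schema, instead of reusing `KafkaProducerConfig` with its `additional_settings` field. A sketch, with illustrative names:

```python
# Sketch of the field naming the report describes: a DLQ-specific producer
# config TypedDict exposing `override_params` (as the DlqProducerConfig JSON
# schema and the override_config code expect). Names are illustrative.
from typing import Mapping, Sequence, TypedDict


class DlqProducerConfig(TypedDict):
    bootstrap_servers: Sequence[str]
    override_params: Mapping[str, str]


class DlqConfig(TypedDict):
    topic: str
    producer_config: DlqProducerConfig
```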


Reviewed by Cursor Bugbot for commit 6b4468a. Configure here.
