Skip to content

Commit 75f89b8

Browse files
feat(integrations): add support for cluster clients from redis sdk (#2394)
This change adds support for cluster clients from the redis sdk (as opposed to the rediscluster library). This has also been tested in my own app which uses clusters (but not asyncio clusters). Fixes GH-2523 * feat(integrations): add support for cluster clients from redis sdk * fix: review round 1 * fix: explicit `is not None` checks * fix: explicit `is not None` checks, take 2 * fix: add try/except to _set_db_data * fix: handle additional spans and breadcrumbs caused by rediscluster initialization * fix: typing for redis integration * fix: simplify assertions * add `capture_internal_exceptions` Co-authored-by: Matthieu Devlin <matt@zumper.com> * rerun CI --------- Co-authored-by: Daniel Szoke <szokeasaurusrex@users.noreply.github.com>
1 parent 22bdc4d commit 75f89b8

File tree

6 files changed

+435
-41
lines changed

6 files changed

+435
-41
lines changed

sentry_sdk/integrations/redis/__init__.py

Lines changed: 126 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,13 @@
1313
)
1414

1515
if TYPE_CHECKING:
16+
from collections.abc import Callable
1617
from typing import Any, Dict, Sequence
18+
from redis import Redis, RedisCluster
19+
from redis.asyncio.cluster import (
20+
RedisCluster as AsyncRedisCluster,
21+
ClusterPipeline as AsyncClusterPipeline,
22+
)
1723
from sentry_sdk.tracing import Span
1824

1925
_SINGLE_KEY_COMMANDS = frozenset(
@@ -83,8 +89,7 @@ def _set_pipeline_data(
8389
):
8490
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
8591
span.set_tag("redis.is_cluster", is_cluster)
86-
transaction = is_transaction if not is_cluster else False
87-
span.set_tag("redis.transaction", transaction)
92+
span.set_tag("redis.transaction", is_transaction)
8893

8994
commands = []
9095
for i, arg in enumerate(command_stack):
@@ -118,7 +123,7 @@ def _set_client_data(span, is_cluster, name, *args):
118123
span.set_tag("redis.key", args[0])
119124

120125

121-
def _set_db_data(span, connection_params):
126+
def _set_db_data_on_span(span, connection_params):
122127
# type: (Span, Dict[str, Any]) -> None
123128
span.set_data(SPANDATA.DB_SYSTEM, "redis")
124129

@@ -135,8 +140,43 @@ def _set_db_data(span, connection_params):
135140
span.set_data(SPANDATA.SERVER_PORT, port)
136141

137142

138-
def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
139-
# type: (Any, bool, Any) -> None
143+
def _set_db_data(span, redis_instance):
144+
# type: (Span, Redis[Any]) -> None
145+
try:
146+
_set_db_data_on_span(span, redis_instance.connection_pool.connection_kwargs)
147+
except AttributeError:
148+
pass # connections_kwargs may be missing in some cases
149+
150+
151+
def _set_cluster_db_data(span, redis_cluster_instance):
152+
# type: (Span, RedisCluster[Any]) -> None
153+
default_node = redis_cluster_instance.get_default_node()
154+
if default_node is not None:
155+
_set_db_data_on_span(
156+
span, {"host": default_node.host, "port": default_node.port}
157+
)
158+
159+
160+
def _set_async_cluster_db_data(span, async_redis_cluster_instance):
161+
# type: (Span, AsyncRedisCluster[Any]) -> None
162+
default_node = async_redis_cluster_instance.get_default_node()
163+
if default_node is not None and default_node.connection_kwargs is not None:
164+
_set_db_data_on_span(span, default_node.connection_kwargs)
165+
166+
167+
def _set_async_cluster_pipeline_db_data(span, async_redis_cluster_pipeline_instance):
168+
# type: (Span, AsyncClusterPipeline[Any]) -> None
169+
with capture_internal_exceptions():
170+
_set_async_cluster_db_data(
171+
span,
172+
# the AsyncClusterPipeline has always had a `_client` attr but it is private so potentially problematic and mypy
173+
# does not recognize it - see https://github.com/redis/redis-py/blame/v5.0.0/redis/asyncio/cluster.py#L1386
174+
async_redis_cluster_pipeline_instance._client, # type: ignore[attr-defined]
175+
)
176+
177+
178+
def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn):
179+
# type: (Any, bool, Any, Callable[[Span, Any], None]) -> None
140180
old_execute = pipeline_cls.execute
141181

142182
def sentry_patched_execute(self, *args, **kwargs):
@@ -150,12 +190,12 @@ def sentry_patched_execute(self, *args, **kwargs):
150190
op=OP.DB_REDIS, description="redis.pipeline.execute"
151191
) as span:
152192
with capture_internal_exceptions():
153-
_set_db_data(span, self.connection_pool.connection_kwargs)
193+
set_db_data_fn(span, self)
154194
_set_pipeline_data(
155195
span,
156196
is_cluster,
157197
get_command_args_fn,
158-
self.transaction,
198+
False if is_cluster else self.transaction,
159199
self.command_stack,
160200
)
161201

@@ -164,8 +204,8 @@ def sentry_patched_execute(self, *args, **kwargs):
164204
pipeline_cls.execute = sentry_patched_execute
165205

166206

167-
def patch_redis_client(cls, is_cluster):
168-
# type: (Any, bool) -> None
207+
def patch_redis_client(cls, is_cluster, set_db_data_fn):
208+
# type: (Any, bool, Callable[[Span, Any], None]) -> None
169209
"""
170210
This function can be used to instrument custom redis client classes or
171211
subclasses.
@@ -189,11 +229,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
189229
description = description[: integration.max_data_size - len("...")] + "..."
190230

191231
with hub.start_span(op=OP.DB_REDIS, description=description) as span:
192-
try:
193-
_set_db_data(span, self.connection_pool.connection_kwargs)
194-
except AttributeError:
195-
pass # connections_kwargs may be missing in some cases
196-
232+
set_db_data_fn(span, self)
197233
_set_client_data(span, is_cluster, name, *args)
198234

199235
return old_execute_command(self, name, *args, **kwargs)
@@ -203,14 +239,16 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
203239

204240
def _patch_redis(StrictRedis, client): # noqa: N803
205241
# type: (Any, Any) -> None
206-
patch_redis_client(StrictRedis, is_cluster=False)
207-
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
242+
patch_redis_client(StrictRedis, is_cluster=False, set_db_data_fn=_set_db_data)
243+
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args, _set_db_data)
208244
try:
209245
strict_pipeline = client.StrictPipeline
210246
except AttributeError:
211247
pass
212248
else:
213-
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
249+
patch_redis_pipeline(
250+
strict_pipeline, False, _get_redis_command_args, _set_db_data
251+
)
214252

215253
try:
216254
import redis.asyncio
@@ -222,8 +260,56 @@ def _patch_redis(StrictRedis, client): # noqa: N803
222260
patch_redis_async_pipeline,
223261
)
224262

225-
patch_redis_async_client(redis.asyncio.client.StrictRedis)
226-
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
263+
patch_redis_async_client(
264+
redis.asyncio.client.StrictRedis,
265+
is_cluster=False,
266+
set_db_data_fn=_set_db_data,
267+
)
268+
patch_redis_async_pipeline(
269+
redis.asyncio.client.Pipeline,
270+
False,
271+
_get_redis_command_args,
272+
set_db_data_fn=_set_db_data,
273+
)
274+
275+
276+
def _patch_redis_cluster():
277+
# type: () -> None
278+
"""Patches the cluster module on redis SDK (as opposed to rediscluster library)"""
279+
try:
280+
from redis import RedisCluster, cluster
281+
except ImportError:
282+
pass
283+
else:
284+
patch_redis_client(RedisCluster, True, _set_cluster_db_data)
285+
patch_redis_pipeline(
286+
cluster.ClusterPipeline,
287+
True,
288+
_parse_rediscluster_command,
289+
_set_cluster_db_data,
290+
)
291+
292+
try:
293+
from redis.asyncio import cluster as async_cluster
294+
except ImportError:
295+
pass
296+
else:
297+
from sentry_sdk.integrations.redis.asyncio import (
298+
patch_redis_async_client,
299+
patch_redis_async_pipeline,
300+
)
301+
302+
patch_redis_async_client(
303+
async_cluster.RedisCluster,
304+
is_cluster=True,
305+
set_db_data_fn=_set_async_cluster_db_data,
306+
)
307+
patch_redis_async_pipeline(
308+
async_cluster.ClusterPipeline,
309+
True,
310+
_parse_rediscluster_command,
311+
set_db_data_fn=_set_async_cluster_pipeline_db_data,
312+
)
227313

228314

229315
def _patch_rb():
@@ -233,9 +319,15 @@ def _patch_rb():
233319
except ImportError:
234320
pass
235321
else:
236-
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
237-
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
238-
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
322+
patch_redis_client(
323+
rb.clients.FanoutClient, is_cluster=False, set_db_data_fn=_set_db_data
324+
)
325+
patch_redis_client(
326+
rb.clients.MappingClient, is_cluster=False, set_db_data_fn=_set_db_data
327+
)
328+
patch_redis_client(
329+
rb.clients.RoutingClient, is_cluster=False, set_db_data_fn=_set_db_data
330+
)
239331

240332

241333
def _patch_rediscluster():
@@ -245,7 +337,9 @@ def _patch_rediscluster():
245337
except ImportError:
246338
return
247339

248-
patch_redis_client(rediscluster.RedisCluster, is_cluster=True)
340+
patch_redis_client(
341+
rediscluster.RedisCluster, is_cluster=True, set_db_data_fn=_set_db_data
342+
)
249343

250344
# up to v1.3.6, __version__ attribute is a tuple
251345
# from v2.0.0, __version__ is a string and VERSION a tuple
@@ -255,11 +349,17 @@ def _patch_rediscluster():
255349
# https://github.com/Grokzen/redis-py-cluster/blob/master/docs/release-notes.rst
256350
if (0, 2, 0) < version < (2, 0, 0):
257351
pipeline_cls = rediscluster.pipeline.StrictClusterPipeline
258-
patch_redis_client(rediscluster.StrictRedisCluster, is_cluster=True)
352+
patch_redis_client(
353+
rediscluster.StrictRedisCluster,
354+
is_cluster=True,
355+
set_db_data_fn=_set_db_data,
356+
)
259357
else:
260358
pipeline_cls = rediscluster.pipeline.ClusterPipeline
261359

262-
patch_redis_pipeline(pipeline_cls, True, _parse_rediscluster_command)
360+
patch_redis_pipeline(
361+
pipeline_cls, True, _parse_rediscluster_command, set_db_data_fn=_set_db_data
362+
)
263363

264364

265365
class RedisIntegration(Integration):
@@ -278,6 +378,7 @@ def setup_once():
278378
raise DidNotEnable("Redis client not installed")
279379

280380
_patch_redis(StrictRedis, client)
381+
_patch_redis_cluster()
281382
_patch_rb()
282383

283384
try:

sentry_sdk/integrations/redis/asyncio.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,25 @@
44
from sentry_sdk.consts import OP
55
from sentry_sdk.integrations.redis import (
66
RedisIntegration,
7-
_get_redis_command_args,
87
_get_span_description,
98
_set_client_data,
10-
_set_db_data,
119
_set_pipeline_data,
1210
)
1311
from sentry_sdk._types import TYPE_CHECKING
12+
from sentry_sdk.tracing import Span
1413
from sentry_sdk.utils import capture_internal_exceptions
1514

1615
if TYPE_CHECKING:
17-
from typing import Any
16+
from collections.abc import Callable
17+
from typing import Any, Union
18+
from redis.asyncio.client import Pipeline, StrictRedis
19+
from redis.asyncio.cluster import ClusterPipeline, RedisCluster
1820

1921

20-
def patch_redis_async_pipeline(pipeline_cls):
21-
# type: (Any) -> None
22+
def patch_redis_async_pipeline(
23+
pipeline_cls, is_cluster, get_command_args_fn, set_db_data_fn
24+
):
25+
# type: (Union[type[Pipeline[Any]], type[ClusterPipeline[Any]]], bool, Any, Callable[[Span, Any], None]) -> None
2226
old_execute = pipeline_cls.execute
2327

2428
async def _sentry_execute(self, *args, **kwargs):
@@ -32,22 +36,22 @@ async def _sentry_execute(self, *args, **kwargs):
3236
op=OP.DB_REDIS, description="redis.pipeline.execute"
3337
) as span:
3438
with capture_internal_exceptions():
35-
_set_db_data(span, self.connection_pool.connection_kwargs)
39+
set_db_data_fn(span, self)
3640
_set_pipeline_data(
3741
span,
38-
False,
39-
_get_redis_command_args,
40-
self.is_transaction,
41-
self.command_stack,
42+
is_cluster,
43+
get_command_args_fn,
44+
False if is_cluster else self.is_transaction,
45+
self._command_stack if is_cluster else self.command_stack,
4246
)
4347

4448
return await old_execute(self, *args, **kwargs)
4549

46-
pipeline_cls.execute = _sentry_execute
50+
pipeline_cls.execute = _sentry_execute # type: ignore[method-assign]
4751

4852

49-
def patch_redis_async_client(cls):
50-
# type: (Any) -> None
53+
def patch_redis_async_client(cls, is_cluster, set_db_data_fn):
54+
# type: (Union[type[StrictRedis[Any]], type[RedisCluster[Any]]], bool, Callable[[Span, Any], None]) -> None
5155
old_execute_command = cls.execute_command
5256

5357
async def _sentry_execute_command(self, name, *args, **kwargs):
@@ -60,9 +64,9 @@ async def _sentry_execute_command(self, name, *args, **kwargs):
6064
description = _get_span_description(name, *args)
6165

6266
with hub.start_span(op=OP.DB_REDIS, description=description) as span:
63-
_set_db_data(span, self.connection_pool.connection_kwargs)
64-
_set_client_data(span, False, name, *args)
67+
set_db_data_fn(span, self)
68+
_set_client_data(span, is_cluster, name, *args)
6569

6670
return await old_execute_command(self, name, *args, **kwargs)
6771

68-
cls.execute_command = _sentry_execute_command
72+
cls.execute_command = _sentry_execute_command # type: ignore[method-assign]
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import pytest
2+
3+
pytest.importorskip("redis.cluster")

0 commit comments

Comments
 (0)