15 changes: 14 additions & 1 deletion aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
@@ -1,6 +1,7 @@
 from __future__ import annotations

 import logging
+import zlib
 from typing import TYPE_CHECKING, Any, cast

 from aws_lambda_powertools.utilities.parser.envelopes.base import BaseEnvelope
@@ -45,5 +46,17 @@ def parse(self, data: dict[str, Any] | Any | None, model: type[Model]) -> list[M
         for record in parsed_envelope.Records:
             # We allow either AWS expected contract (bytes) or a custom Model, see #943
             data = cast(bytes, record.kinesis.data)
-            models.append(self._parse(data=data.decode("utf-8"), model=model))
+            try:
+                decoded_data = data.decode("utf-8")
+            except UnicodeDecodeError as ude:
+                try:
+                    logger.debug(
+                        f"{type(ude).__name__}: {str(ude)} encountered. "
+                        "Data will be decompressed with zlib.decompress().",
+                    )
+                    decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32)
+                    decoded_data = decompressed_data.decode("utf-8")
+                except Exception as e:
+                    raise ValueError("Unable to decode and/or decompress data.") from e
+            models.append(self._parse(data=decoded_data, model=model))
         return models
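For context on the fallback above: passing `zlib.MAX_WBITS | 32` as `wbits` tells `zlib.decompress()` to auto-detect both zlib and gzip headers, so the envelope can now handle gzip-compressed record data such as what CloudWatch Logs subscription filters deliver to a Kinesis stream. A minimal sketch (not part of this diff) of the decode-then-decompress path:

```python
# Minimal sketch of the fallback: try UTF-8 first, fall back to zlib/gzip.
import gzip
import zlib

payload = b'{"messageType": "DATA_MESSAGE", "logEvents": []}'
gzipped = gzip.compress(payload)  # gzip framing, as CloudWatch Logs subscriptions emit

try:
    decoded = gzipped.decode("utf-8")  # fails: gzip magic bytes are not valid UTF-8
except UnicodeDecodeError:
    # wbits=MAX_WBITS | 32 auto-detects zlib or gzip headers
    decoded = zlib.decompress(gzipped, zlib.MAX_WBITS | 32).decode("utf-8")

assert decoded == payload.decode("utf-8")
```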
25 changes: 25 additions & 0 deletions tests/unit/parser/_pydantic/test_kinesis.py
@@ -105,3 +105,28 @@ class DummyModel(BaseModel): ...
     for record in stream_data.Records:
         record.kinesis.data = DummyModel()
         record.decompress_zlib_record_data_as_json()


+def test_kinesis_stream_event_with_cloud_watch_logs_data_using_envelope():
+    # GIVEN a Kinesis Data Stream event with compressed data
+    # such as CloudWatch Logs
+    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
+
+    # WHEN parsing using KinesisDataStreamEnvelope to CloudWatchLogsDecode
+    logs = envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)
+
+    # THEN logs should be extracted as CloudWatchLogsDecode objects
+    assert isinstance(logs[0], CloudWatchLogsDecode)
+
+
+def test_kinesis_stream_event_with_cloud_watch_logs_data_fails_using_envelope():
+    # GIVEN a Kinesis Data Stream event with corrupted compressed data
+    # such as CloudWatch Logs
+    raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")
+
+    # WHEN parsing using KinesisDataStreamEnvelope to CloudWatchLogsDecode
+    # and the data is corrupted
+    raw_event["Records"][0]["kinesis"]["data"] = "eyJ4eXoiOiAiYWJjIn0KH4sIAK25JWgAA6tWqqisUrJSUEpMSlaq5QIAbdJPfw8AAAA="
+    # THEN a ValueError should be raised
+    with pytest.raises(ValueError):
+        envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)
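The fixture's exact payload isn't shown in this diff. As a hedged illustration, a `kinesis.data` value of the kind `kinesisStreamCloudWatchLogsEvent.json` presumably carries can be built like this (all field values below are placeholders, not taken from the fixture):

```python
# Sketch: build a base64-encoded, gzip-compressed CloudWatch Logs payload,
# the shape of data that CloudWatch Logs subscriptions write to Kinesis.
import base64
import gzip
import json

cloudwatch_logs_payload = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/aws/lambda/example",
    "logStream": "2024/01/01/[$LATEST]example",
    "subscriptionFilters": ["example-filter"],
    "logEvents": [{"id": "1", "timestamp": 1700000000000, "message": "hello"}],
}

data = base64.b64encode(
    gzip.compress(json.dumps(cloudwatch_logs_payload).encode("utf-8")),
).decode("utf-8")

# `data` can replace raw_event["Records"][0]["kinesis"]["data"] before parsing
# with KinesisDataStreamEnvelope, mirroring the happy-path test above.
```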