
dreamorosi (Contributor)

Summary

Changes


This PR refactors the way keys, values, and headers are deserialized and parsed so that they are processed lazily when the customer accesses them, and not eagerly before they're passed to the AWS Lambda handler.

This approach allows customers to write code like the example below and add their own error handling for different types of errors:

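```typescript
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import { z } from 'zod';

// Example schemas, for illustration only
const valueZodSchema = z.object({ id: z.number() });
const keyZodSchema = z.string();

export const handler = kafkaConsumer(
  async (event) => {
    for (const record of event.records) {
      try {
        const { value, key } = record; // decoding and parsing now happen here, on access
        // ... your business logic
      } catch (error) {
        // handle error types, maybe send to a DLQ
      }
    }
  },
  {
    value: {
      type: SchemaType.JSON,
      parserSchema: valueZodSchema,
    },
    key: {
      type: SchemaType.JSON,
      parserSchema: keyZodSchema,
    },
  }
);
```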
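As an illustration of the error handling this enables, here is a minimal, self-contained sketch that does not use the Powertools API. `RecordError`, `DeserializationError`, `ParseError`, `makeRecord`, and `processRecords` are hypothetical names, and the lazy record is simulated with a plain object getter. A failure on one record surfaces inside the loop, where it can be classified and routed (for example, toward a DLQ), instead of failing the whole batch before the handler runs.

```typescript
// Hypothetical error classes for illustration; the names the package
// actually uses may differ.
class RecordError extends Error {
  constructor(message: string) {
    super(message);
    this.name = new.target.name;
    // Keeps `instanceof` working even when compiled to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
class DeserializationError extends RecordError {}
class ParseError extends RecordError {}

interface LazyRecord {
  readonly value: { id: number };
}

// Simulates a record whose value is decoded and validated only when read.
function makeRecord(raw: string): LazyRecord {
  return {
    get value() {
      let parsed: unknown;
      try {
        parsed = JSON.parse(raw);
      } catch {
        throw new DeserializationError(`invalid JSON: ${raw}`);
      }
      const id = (parsed as { id?: unknown } | null)?.id;
      if (typeof id !== 'number') {
        throw new ParseError('value does not match the expected schema');
      }
      return { id };
    },
  };
}

// Routes each failing record by error type instead of failing the batch.
function processRecords(records: LazyRecord[]) {
  const processed: number[] = [];
  const dlq: string[] = [];
  for (const record of records) {
    try {
      const { value } = record; // decoding and validation run here
      processed.push(value.id);
    } catch (error) {
      if (error instanceof DeserializationError) {
        dlq.push(`deserialization: ${error.message}`);
      } else if (error instanceof ParseError) {
        dlq.push(`parse: ${error.message}`);
      } else {
        throw error; // unknown failures are not swallowed
      }
    }
  }
  return { processed, dlq };
}

const result = processRecords([
  makeRecord('{"id":1}'),
  makeRecord('not json'),
  makeRecord('{"id":"two"}'),
  makeRecord('{"id":3}'),
]);
console.log(result.processed); // [ 1, 3 ]
console.log(result.dlq.length); // 2
```

Rethrowing anything that is not a known record error keeps genuine bugs from being silently routed to the DLQ.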

On a more technical note, the lazy processing relies on custom getters defined on the record objects. A getter lets us run custom logic just in time, when the customer accesses the property.
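A minimal sketch of the getter mechanism, assuming a simplified record with a plain-string `value` (no base64 decoding, no schema validation). `RawRecord`, `LazyRecord`, and `toLazyRecord` are hypothetical names for illustration, not the package's internals. `Object.defineProperty` installs a getter, so the deserializer runs only when `value` is read.

```typescript
// Minimal raw record shape for this sketch; real Kafka events carry more
// fields and base64-encoded payloads, which are skipped here.
interface RawRecord {
  topic: string;
  value: string;
}

type LazyRecord<T> = { topic: string; readonly value: T };

// Builds a record whose `value` is computed by a getter on every read.
function toLazyRecord<T>(raw: RawRecord, deserialize: (data: string) => T): LazyRecord<T> {
  const record = { topic: raw.topic } as LazyRecord<T>;
  Object.defineProperty(record, 'value', {
    enumerable: true,
    // Runs only when `record.value` is accessed, not when the record is built.
    get: () => deserialize(raw.value),
  });
  return record;
}

let calls = 0;
const countingJson = (data: string): unknown => {
  calls++;
  return JSON.parse(data);
};

const good = toLazyRecord({ topic: 'orders', value: '{"id":1}' }, countingJson);
const bad = toLazyRecord({ topic: 'orders', value: '{broken' }, countingJson);
console.log(calls); // 0: building the records deserialized nothing

console.log(good.value); // { id: 1 }
console.log(calls); // 1

try {
  void bad.value; // the malformed payload only fails here
} catch (error) {
  console.log(error instanceof SyntaxError); // true
}
```

The PR description does not say whether results are cached after the first read; this sketch re-runs the deserializer on every access, and memoizing the first result would be a simple variation.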

The downside of this approach is that getters can only be synchronous. This is not an issue for deserialization itself, since all of those operations are synchronous. However, we were dynamically importing some of the deserializers to avoid introducing extra dependencies for customers, and dynamic imports are asynchronous. To keep the getters synchronous, I had to significantly refactor the logic that chooses and loads the deserializers: the extra modules are now loaded asynchronously and captured in closures, so the functions the getters call stay synchronous.
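The loading pattern described above can be sketched as follows, under stated assumptions: `getDeserializer`, `loadOptionalModule`, `OptionalModule`, and the `'custom'` schema kind are hypothetical, and `loadOptionalModule` stands in for a real dynamic `import()` of an optional dependency. The asynchronous work happens once in a factory, and what it returns is a plain synchronous function that a getter can safely call.

```typescript
// A synchronous deserializer: the only kind a property getter can call.
type Deserializer = (data: string) => unknown;

// Hypothetical optional dependency, loaded on demand.
interface OptionalModule {
  decode(data: string): unknown;
}

// Stand-in for `await import('some-optional-lib')` (hypothetical package):
// like a dynamic import, it resolves asynchronously.
const loadOptionalModule = async (): Promise<OptionalModule> => ({
  decode: (data: string) => data.split(','),
});

type SchemaKind = 'json' | 'custom';

// Async factory: any awaiting happens here, once, and the caller gets back
// a plain synchronous closure over the loaded module.
async function getDeserializer(kind: SchemaKind): Promise<Deserializer> {
  if (kind === 'json') {
    return (data: string) => JSON.parse(data);
  }
  const mod = await loadOptionalModule();
  return (data: string) => mod.decode(data);
}

async function main(): Promise<void> {
  // Resolve the deserializer before any record is accessed.
  const deserialize = await getDeserializer('custom');
  const raw = 'a,b,c';
  const record = {
    get value(): unknown {
      return deserialize(raw); // synchronous, so it is legal inside a getter
    },
  };
  console.log(record.value); // [ 'a', 'b', 'c' ]
}

void main();
```

The trade-off is that the factory must be awaited before any record is read, which a wrapper around an async handler could do once before calling it.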

As part of the refactor I also improved the error handling to make it consistent and added some unit test cases.


Issue number: closes #4067


By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.

dreamorosi self-assigned this on Jun 19, 2025
The pull-request-size bot added the size/L label (PRs between 100-499 LOC) on Jun 19, 2025
leandrodamascena (Contributor)

Taking a look.

leandrodamascena (Contributor) left a comment


Great improvement @dreamorosi! APPROVED!

dreamorosi merged commit ef9bb52 into main on Jun 19, 2025
50 checks passed
dreamorosi deleted the feat/kafka_lazy_deserialize branch on June 19, 2025, 12:51
Development

Successfully merging this pull request may close these issues.

Feature request: decode key/val lazily to allow error handling