Skip to content

Conversation

leandrodamascena
Copy link
Contributor

Issue number: #2981

Summary

Changes

This pull request introduces a new feature to the SqsFifoProcessor class, allowing for continuous message processing by adding a flag. With this enhancement, users can choose to maintain message processing without halting after an initial failure.

User experience

payload.json

{
   "Records":[
      {
         "messageId":"24045de4-a92e-4277-840e-994236b646f6",
         "receiptHandle":"AQEBL8wKtlV8KYnf1Jgz2tdTrAfBSWIhZ/0snUM1FrCK2BsWOq02i0f+L7ufkQHHebbWJ/ZRuc8AvGeB90n/q86llnPhZJu5hSSndYhhviDQXZf/TsIb/waNVWdW70AAp8Mg5MBWAkcdWNm4nz4xqKKi+ntjqh1wwM8RwFTrJezwxxBaWRcTZUS2XsaYX+gGhQClnWnppfJ8OUiU+l9mOd92ciDRDIlAKKcFgfAoqrkDig8OQzsB+ajwAnM2gdwWyK0JjTYS1vgR6i8quepgesY+hg==",
         "body": "{\"item\": {\"laptop\": \"amd\"}}",
         "attributes":{
            "ApproximateReceiveCount":"1",
            "SentTimestamp":"1703675223472",
            "SequenceNumber":"18882884930918384129",
            "MessageGroupId":"2",
            "SenderId":"AIDAS5S4WFUBFAUCROFBE",
            "MessageDeduplicationId":"ae0eee0c918b8b88835554d31d9473ad04465a3866e48dba7b1e0fab51d2c4a4",
            "ApproximateFirstReceiveTimestamp":"1703675223484"
         },
         "messageAttributes":{
            
         },
         "md5OfBody":"29a62446b7883b1840cd85bd79091e44",
         "eventSource":"aws:sqs",
         "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo",
         "awsRegion":"us-west-1"
      },
      {
         "messageId":"455fc143-6eb4-4663-bbda-0c19dd04f756",
         "receiptHandle":"AQEB7EmRsjoiaFSDeZxDkq9PKfcpamqUrshhfK5vGsX/jZE09+b21OHfG0zsHkML4VDsrgskEzp2AaNfyaZXk7dfa31hEHMRa2EscRIrj6CSRF78iHSRwVsxrRSpx8w5NW51Z6PGOtVK+Vi5Cqto28CACcGjN69tWiNFHYwczVVb0oSRtNSk6CGTijKKNb5BzVXfzGmukEEk+E2+Qqmh8vEoKiqVLqF7Q9Wbj5znMxkUMFbs/1r8KEpj74D2s5Ch9Mi3wOapY1fY0km9mElZp4OprA==",
         "body": "{\"item\": {\"laptop\": \"amd\"}}",
         "attributes":{
            "ApproximateReceiveCount":"1",
            "SentTimestamp":"1703675223472",
            "SequenceNumber":"18882884930918384130",
            "MessageGroupId":"2",
            "SenderId":"AIDAS5S4WFUBFAUCROFBE",
            "MessageDeduplicationId":"28ce33b1552d6ee566c48eb91b4fc29bded1fa7fea62485ca7152c3ec68e79f0",
            "ApproximateFirstReceiveTimestamp":"1703675223484"
         },
         "messageAttributes":{
            
         },
         "md5OfBody":"05c7dd78a740a557a3d8712fa28650a6",
         "eventSource":"aws:sqs",
         "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo",
         "awsRegion":"us-west-1"
      },
      {
         "messageId":"f7188ab6-ae5e-47a2-83bf-7da7ca110994",
         "receiptHandle":"AQEBCeLYV6Rxc40DA95XwfQGTZtxlGYLhpQTEERZCObtx7kr0HvDBFOjmnvKdbf5ksTSDj9ElNHHAEP2PhLDqhFhpdx8qgbgoPgXtTlcLpadgd2YtGb+bCPSJb5AoMtsz5vBmcSw1EjigYwE8+SHqCeP+rhQXl432MJtHxTsu3HNSG/1sc3ICw/uwM7K9LQ9tw0BS13WNLs3TwHgbc2FqWXBqooSAKLE0y0yPIDGvY/39AL6npBt3+65PIYMRgeAXz6NAxE0g579d+iX7nnPudDnXw==",
         "body": "{\"item\": {\"laptop\": \"amd\"}}",
         "attributes":{
            "ApproximateReceiveCount":"1",
            "SentTimestamp":"1703675223472",
            "SequenceNumber":"18882884930918384131",
            "MessageGroupId":"4",
            "SenderId":"AIDAS5S4WFUBFAUCROFBE",
            "MessageDeduplicationId":"7b46f0df14493ebf41b7ef5c6d1da625859a2205324351e4022db9203b000295",
            "ApproximateFirstReceiveTimestamp":"1703675223484"
         },
         "messageAttributes":{
            
         },
         "md5OfBody":"7d6fbfc49a1f8d404729fbea3807fb80",
         "eventSource":"aws:sqs",
         "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo",
         "awsRegion":"us-west-1"
      },
      {
         "messageId":"c627fea1-e548-43b4-b23b-fd33d93f96e8",
         "receiptHandle":"AQEBrrpYtiVCMqkLpc6L9oZrh/MRNlZxaE9tLTygm7fTVyQA0UNFfeNZslPemxbnx1XsQVhe/pXfpuzPzuKWSsbhP44+39D5IZI8l8Xnx4nweznqolih+JaDCt/7m3BvCXrPxqWDAHlGgmIZcoFMS9lU58zY5tUU2dALlz+j3Ju4x90grsr3fCiIziipX/RYRC3gzoUoLVxTfIrQLQ3Jgvsxm4UA6LIuR7gj/gcwpjMYt507FAuhBJdSB8kKlBoYMtul0vLegfMPl55BfPXdNEMdLg==",
         "body": "{\"item\": {\"laptop\": \"amd\"}}",
         "attributes":{
            "ApproximateReceiveCount":"1",
            "SentTimestamp":"1703675223472",
            "SequenceNumber":"18882884930918384133",
            "MessageGroupId":"4",
            "SenderId":"AIDAS5S4WFUBFAUCROFBE",
            "MessageDeduplicationId":"1eea03c3f7e782c7bdc2f2a917f40389314733ff39f5ab16219580c0109ade98",
            "ApproximateFirstReceiveTimestamp":"1703675223484"
         },
         "messageAttributes":{
            
         },
         "md5OfBody":"6aae5e0be0dc30894275756efda242e5",
         "eventSource":"aws:sqs",
         "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo",
         "awsRegion":"us-west-1"
      },
      {
         "messageId":"c627fea1-e548-43b4-b23b-fd33d93f9GR6",
         "receiptHandle":"AQEBrrpYtiVCMqkLpc6L9oZrh/MRNlZxaE9tLTygm7fTVyQA0UNFfeNZslPemxbnx1XsQVhe/pXfpuzPzuKWSsbhP44+39D5IZI8l8Xnx4nweznqolih+JaDCt/7m3BvCXrPxqWDAHlGgmIZcoFMS9lU58zY5tUU2dALlz+j3Ju4x90grsr3fCiIziipX/RYRC3gzoUoLVxTfIrQLQ3Jgvsxm4UA6LIuR7gj/gcwpjMYt507FAuhBJdSB8kKlBoYMtul0vLegfMPl55BfPXdNEMdLg==",
         "body": "{\"item\": {\"laptop\": \"amd\"}}",
         "attributes":{
            "ApproximateReceiveCount":"1",
            "SentTimestamp":"1703675223472",
            "SequenceNumber":"18882884930918384133",
            "MessageGroupId":"6",
            "SenderId":"AIDAS5S4WFUBFAUCROFBE",
            "MessageDeduplicationId":"1eea03c3f7e782c7bdc2f2a917f40389314733ff39f5ab16219580c0109ade98",
            "ApproximateFirstReceiveTimestamp":"1703675223484"
         },
         "messageAttributes":{
            
         },
         "md5OfBody":"6aae5e0be0dc30894275756efda242e5",
         "eventSource":"aws:sqs",
         "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo",
         "awsRegion":"us-west-1"
      },
      {
         "messageId":"d8b02313-c025-4c48-bd36-73555186baa2",
         "receiptHandle":"AQEB9dBiqbg3IFxWJDw8BI6jWdknnHSQM/COWjyiWQwhlWqd1WTxm0o3zFokJDkR4LtKrCLe03PH4LDNF5x66fCdj8YHXHS8nYOqC2/bfDyc6ewDS3lLNvbWDI5KhHZtcn0dgTfDoX16ron8KBcYmUztEpRFb/Ytj85eKFg6yIJhVgx0sYpCR4k39ueaWGqo6YzYXJuyDJ9q07T1wMnaAnFPoOnBfa0dOf0q/IGj+OcNHldTVTwRxvfQEQRkRq4lbORfmtjlH6utuEM/rm+5hR2Xfg==",
         "body": "{\"item\": {\"laptop\": \"amd\"}}",
         "attributes":{
            "ApproximateReceiveCount":"1",
            "SentTimestamp":"1703675223472",
            "SequenceNumber":"18882884930918384135",
            "MessageGroupId":"1",
            "SenderId":"AIDAS5S4WFUBFAUCROFBE",
            "MessageDeduplicationId":"e92f65051dc808cb4a9bf23a1c9f143ae4cf3029acd7502f5fb020c3ab0fc502",
            "ApproximateFirstReceiveTimestamp":"1703675223484"
         },
         "messageAttributes":{
            
         },
         "md5OfBody":"737c6afff14f8ec8db181e8c7cb18589",
         "eventSource":"aws:sqs",
         "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo",
         "awsRegion":"us-west-1"
      },
      {
         "messageId":"bf3a1af3-b849-4f9f-abc4-d254f741ea1e",
         "receiptHandle":"AQEBfBDdLA6E4PBWMo7ReyPW1O/BrY86m55eicY8fwT7U+0dkSLdY6GGYp3EeOxBoUucHBkXTgceujq/TO/O5NilJAN7KN1xyHwBEU26rOpQLE2n4UuscbLEVmSh5YzT877CfHRVtQ29QaHseNIrmzpCG3KTFWexZbBj00VOXTDf6+YzcJ+QlvtRSoF37yS4B99a69zMTevaJ/7+UeUE9l24ltNOzIZrN4p6ylPRKb8Vbk7ns2ZUWZC3p/yPK+fDhbCmoOOH5LEWIKUVLF/QIJVuNg==",
         "body": "{\"item\": {\"laptop\": \"amd\"}}",
         "attributes":{
            "ApproximateReceiveCount":"1",
            "SentTimestamp":"1703675223472",
            "SequenceNumber":"18882884930918384136",
            "MessageGroupId":"1",
            "SenderId":"AIDAS5S4WFUBFAUCROFBE",
            "MessageDeduplicationId":"33d2b5ce7da1725dc7b73195aa3264a087363808af93fddabbd83a447e66ce85",
            "ApproximateFirstReceiveTimestamp":"1703675223484"
         },
         "messageAttributes":{
            
         },
         "md5OfBody":"40c58ee85cc790638034668b73f8b444",
         "eventSource":"aws:sqs",
         "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo",
         "awsRegion":"us-west-1"
      },
      {
         "messageId":"bf3a1af3-b849-4f9f-abc4-ssssa",
         "receiptHandle":"AQEBfBDdLA6E4PBWMo7ReyPW1O/BrY86m55eicY8fwT7U+0dkSLdY6GGYp3EeOxBoUucHBkXTgceujq/TO/O5NilJAN7KN1xyHwBEU26rOpQLE2n4UuscbLEVmSh5YzT877CfHRVtQ29QaHseNIrmzpCG3KTFWexZbBj00VOXTDf6+YzcJ+QlvtRSoF37yS4B99a69zMTevaJ/7+UeUE9l24ltNOzIZrN4p6ylPRKb8Vbk7ns2ZUWZC3p/yPK+fDhbCmoOOH5LEWIKUVLF/QIJVuNg==",
         "body": "{\"item\": {\"laptop\": \"amd\"}}",
         "attributes":{
            "ApproximateReceiveCount":"1",
            "SentTimestamp":"1703675223472",
            "SequenceNumber":"18882884930918384136",
            "MessageGroupId":"3",
            "SenderId":"AIDAS5S4WFUBFAUCROFBE",
            "MessageDeduplicationId":"33d2b5ce7da1725dc7b73195aa3264a087363808af93fddabbd83a447e66ce85",
            "ApproximateFirstReceiveTimestamp":"1703675223484"
         },
         "messageAttributes":{
            
         },
         "md5OfBody":"40c58ee85cc790638034668b73f8b444",
         "eventSource":"aws:sqs",
         "eventSourceARN":"arn:aws:sqs:us-west-1:123456789012:powertools.fifo",
         "awsRegion":"us-west-1"
      }
      
   ]
}

Before

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    SqsFifoPartialProcessor,
    process_partial_response,
)
from aws_lambda_powertools.utilities.parser import BaseModel
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
from aws_lambda_powertools.utilities.parser.types import Json
from aws_lambda_powertools.utilities.typing import LambdaContext

class Order(BaseModel):
    item: dict


class OrderSqsRecord(SqsRecordModel):
    body: Json[Order]

processor = SqsFifoPartialProcessor(model=OrderSqsRecord)  
tracer = Tracer()
logger = Logger()


def record_handler(record: OrderSqsRecord):
    if record.attributes.MessageGroupId == "1" or record.attributes.MessageGroupId == "4":
        raise Exception("bla")


@logger.inject_lambda_context
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
sam local invoke --event events/batch.json --skip-pull-image
Invoking app.lambda_handler (python3.10)                                                                                                                                                                                                                                                                                 
Requested to skip pulling images ...                                                                                                                                                                                                                                                                                     
                                                                                                                                                                                                                                                                                                                         
Mounting /home/leandro/DEVEL-PYTHON/tmp/batchprocessing-fifo/.aws-sam/build/HelloWorldFunction as /var/task:ro,delegated, inside runtime container                                                                                                                                                                       
START RequestId: 3eaed65e-d6db-40e8-93b5-5666d4abe57e Version: $LATEST
START RequestId: 3eaed65e-d6db-40e8-93b5-5666d4abe57e Version: $LATEST
END RequestId: 3eaed65e-d6db-40e8-93b5-5666d4abe57e
REPORT RequestId: 3eaed65e-d6db-40e8-93b5-5666d4abe57e	Init Duration: 0.17 ms	Duration: 629.09 ms	Billed Duration: 630 ms	Memory Size: 128 MB	Max Memory Used: 128 MB	
{"batchItemFailures": [{"itemIdentifier": "f7188ab6-ae5e-47a2-83bf-7da7ca110994"}, {"itemIdentifier": "c627fea1-e548-43b4-b23b-fd33d93f96e8"}, {"itemIdentifier": "c627fea1-e548-43b4-b23b-fd33d93f9GR6"}, {"itemIdentifier": "d8b02313-c025-4c48-bd36-73555186baa2"}, {"itemIdentifier": "bf3a1af3-b849-4f9f-abc4-d254f741ea1e"}, {"itemIdentifier": "bf3a1af3-b849-4f9f-abc4-ssssa"}]}

AFTER

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import (
    BatchProcessor,
    EventType,
    SqsFifoPartialProcessor,
    process_partial_response,
)
from aws_lambda_powertools.utilities.parser import BaseModel
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
from aws_lambda_powertools.utilities.parser.types import Json
from aws_lambda_powertools.utilities.typing import LambdaContext

class Order(BaseModel):
    item: dict


class OrderSqsRecord(SqsRecordModel):
    body: Json[Order]

processor = SqsFifoPartialProcessor(return_on_first_error=False, model=OrderSqsRecord)  
tracer = Tracer()
logger = Logger()


def record_handler(record: OrderSqsRecord):
    if record.attributes.MessageGroupId == "1" or record.attributes.MessageGroupId == "4":
        raise Exception("bla")


@logger.inject_lambda_context
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
sam local invoke --event events/batch.json --skip-pull-image
Invoking app.lambda_handler (python3.10)                                                                                                                                                                                                                                                                                 
Requested to skip pulling images ...                                                                                                                                                                                                                                                                                     
                                                                                                                                                                                                                                                                                                                         
Mounting /home/leandro/DEVEL-PYTHON/tmp/batchprocessing-fifo/.aws-sam/build/HelloWorldFunction as /var/task:ro,delegated, inside runtime container                                                                                                                                                                       
START RequestId: 0a839f5a-d6e2-44ca-9be5-8b9733da93a6 Version: $LATEST
END RequestId: 0a839f5a-d6e2-44ca-9be5-8b9733da93a6
REPORT RequestId: 0a839f5a-d6e2-44ca-9be5-8b9733da93a6	Init Duration: 0.32 ms	Duration: 607.58 ms	Billed Duration: 608 ms	Memory Size: 128 MB	Max Memory Used: 128 MB	
{"batchItemFailures": [{"itemIdentifier": "f7188ab6-ae5e-47a2-83bf-7da7ca110994"}, {"itemIdentifier": "c627fea1-e548-43b4-b23b-fd33d93f96e8"}, {"itemIdentifier": "d8b02313-c025-4c48-bd36-73555186baa2"}, {"itemIdentifier": "bf3a1af3-b849-4f9f-abc4-d254f741ea1e"}]}

Checklist

If your change doesn't seem to apply, please leave them unchecked.

Is this a breaking change?

RFC issue number:

Checklist:

  • Migration process documented
  • Implement warnings (if it can live side by side)

Acknowledgment

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.

@boring-cyborg boring-cyborg bot added documentation Improvements or additions to documentation tests labels Mar 14, 2024
@pull-request-size pull-request-size bot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Mar 14, 2024
@github-actions github-actions bot added feature New feature or functionality and removed documentation Improvements or additions to documentation labels Mar 14, 2024
@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 14, 2024
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 14, 2024
@codecov-commenter
Copy link

codecov-commenter commented Mar 14, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 96.31%. Comparing base (e14e768) to head (90db3e0).
Report is 203 commits behind head on develop.

❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

Additional details and impacted files
@@             Coverage Diff             @@
##           develop    #3954      +/-   ##
===========================================
- Coverage    96.38%   96.31%   -0.07%     
===========================================
  Files          214      215       +1     
  Lines        10030    10277     +247     
  Branches      1846     1914      +68     
===========================================
+ Hits          9667     9898     +231     
- Misses         259      271      +12     
- Partials       104      108       +4     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 14, 2024
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 14, 2024
@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 14, 2024
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 14, 2024
@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 14, 2024
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 14, 2024
@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 14, 2024
@leandrodamascena leandrodamascena marked this pull request as ready for review March 14, 2024 22:43
@leandrodamascena leandrodamascena requested review from a team and rubenfonseca March 14, 2024 22:43
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 14, 2024
@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 14, 2024
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 14, 2024
@leandrodamascena
Copy link
Contributor Author

I believe we might refactor the process method a little. I'll ping you offline.

Of course, @rubenfonseca! Let's explore ways to refactor it for better readability and reduced cognitive load.

@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 19, 2024
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 19, 2024
@rubenfonseca
Copy link
Contributor

@leandrodamascena I've pushed a suggestion, and kinda rewrote the whole FIFO processor. Instead of overriding the process method, I override the smaller _process_record. For me the code becomes more easy to follow. Tests are green, but let me know if you like this approach.

@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 19, 2024
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 19, 2024
Copy link
Contributor Author

@leandrodamascena leandrodamascena left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @rubenfonseca! This refactoring is impressive, and the code is much clearer now. However, I believe I've found a bug and would appreciate your insights on it.

Also, I think I need to review the tests because we didn't catch this bug in those tests.

@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 20, 2024
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 20, 2024
@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 20, 2024
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 20, 2024
@rubenfonseca
Copy link
Contributor

@leandrodamascena ready for the next review

@leandrodamascena
Copy link
Contributor Author

@leandrodamascena ready for the next review

Thanks @rubenfonseca! Everything is working as expected, and the codebase is so clean! I've only left two minor comments regarding imports, and then we're ready to merge.

@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 20, 2024
@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 20, 2024
@boring-cyborg boring-cyborg bot added the documentation Improvements or additions to documentation label Mar 20, 2024
Copy link

Quality Gate Passed Quality Gate passed

Issues
0 New issues
0 Accepted issues

Measures
0 Security Hotspots
No data about Coverage
No data about Duplication

See analysis details on SonarCloud

@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Mar 20, 2024
@leandrodamascena leandrodamascena merged commit a0141ed into aws-powertools:develop Mar 20, 2024
@leandrodamascena leandrodamascena deleted the improve-sqs-fifo-batch branch March 20, 2024 12:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature New feature or functionality size/L Denotes a PR that changes 100-499 lines, ignoring generated files. tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feature request: Optimize Circuit-Breaking for SQS FIFO Partial Processor on records with different group IDs
5 participants