[Data] 2 phase commit for checkpointing to avoid duplicates #60983
xinyuangui2 wants to merge 2 commits into ray-project:master
Signed-off-by: xgui <xgui@anyscale.com>
Signed-off-by: xgui <xgui@anyscale.com>
Code Review
This pull request introduces a robust 2-phase commit mechanism for checkpointing file-based datasinks, effectively preventing data duplication upon failures. The implementation is well-structured, separating the logic for file-based and non-file-based datasinks, and includes pre-write (prepare), write, and post-write (commit) phases. The recovery logic correctly handles the cleanup of pending checkpoints and associated orphaned data files, even for complex scenarios like partitioned outputs. Additionally, the deprecation of block_index in FilenameProvider in favor of the more deterministic get_filename_for_task is a sensible improvement. The accompanying tests are comprehensive, covering a wide range of failure scenarios and edge cases, which instills confidence in the correctness of this critical feature. I have one suggestion for a minor performance optimization.
        data_context, op
    )

    if isinstance(datasink, _FileDatasink):
2PC path crashes RowBasedFileDatasink with custom FilenameProvider
Medium Severity
The isinstance(datasink, _FileDatasink) check at line 114 routes RowBasedFileDatasink (used by write_images) into the 2-phase commit path. This path calls _generate_base_filename, which invokes get_filename_for_task(). The base class default delegates to get_filename_for_block(None, ...), which raises NotImplementedError for custom FilenameProvider implementations that only override get_filename_for_row() — the only method the RowBasedFileDatasink contract actually requires. This causes a runtime crash when checkpointing is enabled with write_images and a custom provider (like the ImageFilenameProvider example in the docs).
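The failure mode can be reproduced without Ray using stand-in classes. This is only a sketch of the delegation chain described above: the class bodies, the simplified method signatures, and the `generate_base_filename` helper are assumptions made for illustration, not the actual Ray code.

```python
class FilenameProvider:
    """Stand-in base class; signatures are simplified assumptions."""

    def get_filename_for_block(self, block, task_index, block_index):
        raise NotImplementedError

    def get_filename_for_row(self, row, task_index, block_index, row_index):
        raise NotImplementedError

    def get_filename_for_task(self, task_index):
        # Default behavior as described in the review: delegate to the
        # block-level method with no block.
        return self.get_filename_for_block(None, task_index, 0)


class ImageFilenameProvider(FilenameProvider):
    """Row-only provider, which is all a row-based datasink requires."""

    def get_filename_for_row(self, row, task_index, block_index, row_index):
        return f"{row['label']}_{task_index:06}_{row_index:06}.png"


def generate_base_filename(provider, task_index):
    # Hypothetical stand-in for the filename step on the 2PC path.
    return provider.get_filename_for_task(task_index)


provider = ImageFilenameProvider()

# The row-level method works as the provider's author intended.
row_name = provider.get_filename_for_row({"label": "cat"}, 0, 0, 3)

# The task-level call falls through to the unimplemented block-level method.
try:
    generate_base_filename(provider, 0)
    crashed = False
except NotImplementedError:
    crashed = True
```

If the real classes behave like these stand-ins, the row-based datasink would need either a separate branch before the `isinstance` check or a task-level default that does not go through `get_filename_for_block`.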
Summary
Reproduce script: https://gist.github.com/xinyuangui2/0be60bb7fd629afdf462a480519be86c
Current Issue
This is a typical Ray Data pipeline: read_parquet -> map_batches -> write. In the checkpointing introduced in #59409, every block is first written and then checkpointed. If a failure occurs after the data is written but before the checkpoint is saved, the block is written again on restart and the final result contains duplicates.
Solution: 2-Phase Commit
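Only for file-based datasinks (_FileDatasink and its subclasses) for now.
Write Stage
1. Prepare (pre-write): determine the result filenames up front with FilenameProvider. Create pending checkpoint files ({id}.pending.parquet) and store the result filenames in the parquet metadata.
2. Write: write the data files.
3. Commit (post-write): rename {id}.pending.parquet to {id}.parquet.
Restore Stage
1. Find pending checkpoints (*.pending.parquet).
2. Clean up each pending checkpoint together with the orphaned data files it lists.
Correctness Guarantees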
The 2-phase commit ensures correctness because:
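As a rough illustration of that argument, the sketch below simulates the write and restore stages on a local directory and injects a crash between the data write and the commit. It is not the PR's implementation: it uses JSON files instead of parquet, and the helper names (`prepare`, `write_data`, `commit`, `recover`, `committed_tasks`) and the `part-N.json` filenames are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def prepare(ckpt_dir, task_id, filenames):
    """Phase 1: record the planned output filenames in a pending marker."""
    pending = ckpt_dir / f"{task_id}.pending.json"
    pending.write_text(json.dumps({"files": filenames}))
    return pending


def write_data(out_dir, filenames, rows):
    """Write the data files that were named during prepare."""
    for name in filenames:
        (out_dir / name).write_text(json.dumps(rows))


def commit(pending):
    """Phase 2: promote the pending marker with a single rename."""
    name = pending.name.replace(".pending.json", ".json")
    pending.rename(pending.with_name(name))


def recover(ckpt_dir, out_dir):
    """On restore: delete files listed by pending markers, then the markers."""
    for pending in ckpt_dir.glob("*.pending.json"):
        for name in json.loads(pending.read_text())["files"]:
            (out_dir / name).unlink(missing_ok=True)
        pending.unlink()


def committed_tasks(ckpt_dir):
    """Task ids that have a committed (non-pending) checkpoint."""
    return {
        p.stem
        for p in ckpt_dir.glob("*.json")
        if not p.name.endswith(".pending.json")
    }


tasks = {"task-0": ("part-0.json", [1, 2]), "task-1": ("part-1.json", [3, 4])}

with tempfile.TemporaryDirectory() as tmp:
    ckpt_dir, out_dir = Path(tmp, "ckpt"), Path(tmp, "out")
    ckpt_dir.mkdir()
    out_dir.mkdir()

    # task-0 completes all three steps.
    name, rows = tasks["task-0"]
    marker = prepare(ckpt_dir, "task-0", [name])
    write_data(out_dir, [name], rows)
    commit(marker)

    # task-1 "crashes" after writing data but before commit.
    name, rows = tasks["task-1"]
    prepare(ckpt_dir, "task-1", [name])
    write_data(out_dir, [name], rows)

    # Restart: clean up, then rerun only tasks without a committed checkpoint.
    recover(ckpt_dir, out_dir)
    done = committed_tasks(ckpt_dir)
    files_after_recovery = sorted(p.name for p in out_dir.iterdir())
    for task_id, (name, rows) in tasks.items():
        if task_id not in done:
            marker = prepare(ckpt_dir, task_id, [name])
            write_data(out_dir, [name], rows)
            commit(marker)

    final_rows = sorted(
        r for p in out_dir.iterdir() for r in json.loads(p.read_text())
    )
```

In this simulation the orphaned `part-1.json` is removed before task-1 is rerun, so each row appears once in the final output. The cleanup only works because the filenames were recorded before any data was written, which is why the filenames have to be predictable up front.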
Other Changes
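Deprecate block_index Parameter in FilenameProvider
The block_index parameter in FilenameProvider.get_filename_for_block() is always 0 in _FileDatasink because datasinks merge all blocks into one before writing. Additionally, this parameter makes fetching filenames inside pending checkpoints tricky (we need to predict the filename before writing).
A new method get_filename_for_task() is introduced without the block_index parameter. Custom FilenameProvider implementations should not depend on block_index to ensure checkpointing correctness.
Release tests (to add)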
get_filename_for_task()is introduced without theblock_indexparameter. CustomFilenameProviderimplementations should not depend onblock_indexto ensure checkpointing correctness.Release tests (to add)