[Data] 2 phase commit for checkpointing to avoid duplicates #60983

Closed
xinyuangui2 wants to merge 2 commits into ray-project:master from xinyuangui2:checkpointing-fault-tolerance

@xinyuangui2

Summary

Reproduce script: https://gist.github.com/xinyuangui2/0be60bb7fd629afdf462a480519be86c

Current Issue

Consider a typical Ray Data pipeline: read_parquet -> map_batches -> write. With the checkpointing introduced in #59409, each block is written first and checkpointed afterward. If a failure occurs after the data is written but before the checkpoint is saved, the block is not recorded as done, so it is written again on restart and the final result contains duplicates.

Solution: 2-Phase Commit

Only for file-based datasinks (_FileDatasink and its subclasses) for now.

Write Stage

  1. Fetch the result filenames using FilenameProvider. Create pending checkpoint files ({id}.pending.parquet) and store the result filenames in the parquet metadata.
  2. Write data to the output path.
  3. Commit the pending checkpoint files by renaming {id}.pending.parquet to {id}.parquet.
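
The three write-stage steps can be sketched as follows. This is a minimal illustration, not the PR's implementation: it assumes a JSON file in place of the parquet checkpoint and its metadata, and the helper name `write_with_two_phase_commit` and its parameters are hypothetical.

```python
import json
import os


def write_with_two_phase_commit(checkpoint_dir, output_dir, task_id, filename, rows):
    """Hypothetical helper: prepare, write, then commit one write task.

    Assumption: a JSON file stands in for the pending parquet checkpoint.
    """
    pending = os.path.join(checkpoint_dir, f"{task_id}.pending.json")
    committed = os.path.join(checkpoint_dir, f"{task_id}.json")

    # Step 1 (prepare): record the output filename before any data is written,
    # so a later restore knows which files this task may have produced.
    with open(pending, "w") as f:
        json.dump({"filenames": [filename]}, f)

    # Step 2: write the data to the output path.
    with open(os.path.join(output_dir, filename), "w") as f:
        f.writelines(f"{row}\n" for row in rows)

    # Step 3 (commit): rename the pending checkpoint to its final name.
    os.replace(pending, committed)
    return committed
```

A crash at any point before the final rename leaves a `*.pending.json` file behind, which is the signal the restore stage looks for.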

Restore Stage

  1. Find any pending parquet files (*.pending.parquet).
  2. Fetch the result filenames from the parquet metadata.
  3. Delete any files that match the filenames (pattern-based matching).
  4. Delete the pending parquet files.
  5. Continue with the existing checkpoint loading stage.
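
A rough sketch of restore steps 1 to 4, under stated assumptions: JSON files stand in for the parquet checkpoints, and "pattern-based matching" is modeled as a simple filename-prefix match. The actual matching rule in the PR is not shown here, and `recover_pending_checkpoints` is a hypothetical name.

```python
import glob
import json
import os


def recover_pending_checkpoints(checkpoint_dir, output_dir):
    """Hypothetical helper: roll back writes whose checkpoint never committed."""
    removed = []
    pattern = os.path.join(glob.escape(checkpoint_dir), "*.pending.json")
    # Step 1: find pending checkpoint files.
    for pending in sorted(glob.glob(pattern)):
        # Step 2: fetch the recorded output filenames.
        with open(pending) as f:
            filenames = json.load(f)["filenames"]
        # Step 3: delete output files matching those names.
        # Assumption: a prefix match stands in for the real pattern.
        for name in filenames:
            match = os.path.join(glob.escape(output_dir), glob.escape(name) + "*")
            for path in sorted(glob.glob(match)):
                os.remove(path)
                removed.append(path)
        # Step 4: delete the pending checkpoint itself.
        os.remove(pending)
    return removed
```

Committed checkpoints are never touched, so step 5 (the existing checkpoint loading) sees only tasks that finished completely.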

Correctness Guarantees

The 2-phase commit ensures correctness because:

  1. A failure at any step of the write stage doesn't affect the final result: if the failure occurs before the commit, the pending checkpoint and the files it lists are cleaned up during restore.
  2. The restore stage is idempotent: running restore multiple times produces the same result.
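
Both guarantees can be checked with a small in-memory simulation. This is only a sketch under assumptions: dictionaries stand in for the checkpoint and output directories, each attempt is assumed to write under a fresh filename (which is what makes duplicates possible without cleanup), and all names are hypothetical.

```python
class SimulatedCrash(Exception):
    """Models a worker dying between the data write and the commit."""


def new_store():
    return {"checkpoints": {}, "files": {}, "attempts": 0}


def restore(store):
    # Drop every pending checkpoint together with the file it lists.
    for task_id, entry in list(store["checkpoints"].items()):
        if entry["state"] == "pending":
            store["files"].pop(entry["filename"], None)
            del store["checkpoints"][task_id]


def run_pipeline(store, tasks, crash_on=None):
    restore(store)
    store["attempts"] += 1
    for task_id, rows in tasks.items():
        if task_id in store["checkpoints"]:
            continue  # Only committed entries survive restore: skip finished work.
        filename = f"{task_id}-attempt{store['attempts']}.out"
        store["checkpoints"][task_id] = {"state": "pending", "filename": filename}
        store["files"][filename] = list(rows)
        if task_id == crash_on:
            raise SimulatedCrash(task_id)
        store["checkpoints"][task_id]["state"] = "committed"


def all_rows(store):
    return sorted(row for rows in store["files"].values() for row in rows)
```

Running the pipeline with `crash_on` set, then running it again without, should yield each input row exactly once; calling `restore` repeatedly after that leaves the store unchanged.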

Other Changes

Deprecate block_index Parameter in FilenameProvider

The block_index parameter in FilenameProvider.get_filename_for_block() is always 0 in _FileDatasink because datasinks merge all blocks into one before writing. Additionally, this parameter makes fetching filenames inside pending checkpoints tricky (we need to predict the filename before writing).

A new method get_filename_for_task() is introduced without the block_index parameter. Custom FilenameProvider implementations should not depend on block_index to ensure checkpointing correctness.
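
To illustrate why dropping block_index helps, here is a stand-alone sketch of a provider whose filename depends only on values known before the write starts. It deliberately does not subclass Ray's FilenameProvider; the class name and the `(write_uuid, task_index)` parameters are assumptions, since the exact signature of get_filename_for_task() is not given in this description.

```python
class TaskFilenameProvider:
    """Stand-in provider (not Ray's class) with a per-task filename."""

    def __init__(self, file_format):
        self.file_format = file_format

    def get_filename_for_task(self, write_uuid, task_index):
        # Assumed parameters: the name is a pure function of its inputs,
        # so it can be computed and recorded before any data is written.
        return f"{write_uuid}_{task_index:06}.{self.file_format}"
```

Because the result is deterministic for a given task, the same name can be stored in the pending checkpoint and later used to locate files for cleanup.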

Release tests (to add)

Signed-off-by: xgui <xgui@anyscale.com>
@xinyuangui2 xinyuangui2 requested a review from a team as a code owner February 11, 2026 21:03
@xinyuangui2 xinyuangui2 added the "go" label (add ONLY when ready to merge, run all tests) Feb 11, 2026

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a robust 2-phase commit mechanism for checkpointing file-based datasinks, effectively preventing data duplication upon failures. The implementation is well-structured, separating the logic for file-based and non-file-based datasinks, and includes pre-write (prepare), write, and post-write (commit) phases. The recovery logic correctly handles the cleanup of pending checkpoints and associated orphaned data files, even for complex scenarios like partitioned outputs. Additionally, the deprecation of block_index in FilenameProvider in favor of the more deterministic get_filename_for_task is a sensible improvement. The accompanying tests are comprehensive, covering a wide range of failure scenarios and edge cases, which instills confidence in the correctness of this critical feature. I have one suggestion for a minor performance optimization.


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 2 potential issues.

data_context, op
)

if isinstance(datasink, _FileDatasink):

2PC path crashes RowBasedFileDatasink with custom FilenameProvider

Medium Severity

The isinstance(datasink, _FileDatasink) check at line 114 routes RowBasedFileDatasink (used by write_images) into the 2-phase commit path. This path calls _generate_base_filename, which invokes get_filename_for_task(). The base class default delegates to get_filename_for_block(None, ...), which raises NotImplementedError for custom FilenameProvider implementations that only override get_filename_for_row() — the only method the RowBasedFileDatasink contract actually requires. This causes a runtime crash when checkpointing is enabled with write_images and a custom provider (like the ImageFilenameProvider example in the docs).
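
The failure mode described above can be reproduced with stand-in classes. This sketch mirrors the delegation chain as the review comment describes it and is not Ray's actual code: `BaseFilenameProvider` is a hypothetical stand-in, and the arguments passed after `None` in the delegation are assumptions.

```python
class BaseFilenameProvider:
    """Stand-in for the base provider, following the review comment."""

    def get_filename_for_block(self, block, write_uuid, task_index, block_index):
        raise NotImplementedError

    def get_filename_for_row(self, row, write_uuid, task_index, block_index, row_index):
        raise NotImplementedError

    def get_filename_for_task(self, write_uuid, task_index):
        # Default delegates to the block-based method with block=None.
        # Assumption: block_index is passed as 0.
        return self.get_filename_for_block(None, write_uuid, task_index, 0)


class ImageFilenameProvider(BaseFilenameProvider):
    """Row-based provider that overrides only get_filename_for_row()."""

    def get_filename_for_row(self, row, write_uuid, task_index, block_index, row_index):
        return f"{row['label']}_{write_uuid}_{task_index:06}_{row_index:06}.png"
```

With these definitions, `ImageFilenameProvider().get_filename_for_task("abc", 0)` raises NotImplementedError even though the provider satisfies the row-based contract, which is the crash reported for the 2PC path.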


