[Data] Add checkpoint support for Iceberg read/write #61753
WinkerDu wants to merge 10 commits into ray-project:master from
Conversation
Code Review
This pull request introduces checkpointing support for Iceberg read and write operations in Ray Data, which is a great enhancement for fault tolerance. The implementation for the write path correctly checkpoints IcebergWriteResult metadata, allowing for recovery and a unified commit on retry. The read path is also updated to filter data based on checkpoints. The changes are well-structured and include comprehensive tests for recovery scenarios. I have a few minor suggestions to improve code clarity and remove dead code.
@owenowenisme @xinyuangui2 please take a review, thx :)
owenowenisme left a comment
Hey @WinkerDu, thanks for your contribution!
I think we should take a step back before digging into the current design.
A few questions based on the PR description:

- Why checkpoint `IcebergWriteResult` metadata per write task? The checkpoint of `.meta.pkl` happens in the same task, on the same worker, at nearly the same time as the data file write. If the worker can fail, it can fail at any point — including during or before the metadata checkpoint write. So the metadata checkpoint doesn't provide a stronger durability guarantee than the data file write itself. This approach only covers the narrow case where the driver crashes after all workers fully complete but before `on_write_complete()` commits to the Iceberg catalog.
- Can we just clean up uncommitted files instead? (Like rollback.) If a job fails before the Iceberg catalog commit, the written data files are orphans — they exist on storage but aren't referenced by any snapshot, so they're invisible to queries. On retry, we can just use the existing checkpoint IDs (`.parquet` files) to skip already-committed rows, and either proactively delete orphaned files or let Iceberg's built-in `remove_orphan_files` handle cleanup.
This would be significantly simpler while still achieving the same correctness guarantees.
Curious to hear your thoughts — there may be context I'm missing that motivated this design.
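The retry flow suggested in the comment could be sketched roughly as follows. This is a minimal illustration with hypothetical names; in the real system the checkpointed IDs would be loaded from the persisted `.parquet` checkpoint files.

```python
def filter_already_committed(rows, checkpointed_ids, id_column):
    """Skip rows whose checkpoint ID was already persisted by a previous
    attempt, so the retry does not rewrite already-completed data."""
    return [row for row in rows if row[id_column] not in checkpointed_ids]


# Example: on retry, IDs 1 and 2 were checkpointed by the failed attempt,
# so only the row with id 3 still needs to be written.
rows = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 3, "v": "c"}]
remaining = filter_already_committed(rows, checkpointed_ids={1, 2}, id_column="id")
```

Orphaned data files from the failed attempt would then be handled separately, e.g. by Iceberg's orphan-file cleanup, since they are not referenced by any snapshot.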
@owenowenisme Thank you for the reply.
@owenowenisme Did you just leave a comment and edit or delete it afterward? I couldn't find it in the PR. :-)
owenowenisme left a comment
I don't think we need to modify the datasource/datasink to support checkpointing here. When we added checkpoint support for parquet (PRs #59409 and #61821), the parquet datasource and datasink were left completely untouched — the only change was a minor filename API refactor to make filenames deterministic. All checkpoint logic lives in planner/checkpoint/ and checkpoint/, implemented as pre/post-write transforms that wrap around the existing write operator. The datasinks remain entirely unaware of checkpointing.
This PR breaks that pattern by adding checkpoint-specific code into three places:

- `IcebergDatasink.write()` — builds per-block write results and stores them in `ctx.kwargs`
- `IcebergDatasink.on_write_complete()` — loads `.meta.pkl` checkpoint files and merges with write returns
- `IcebergDatasource.get_read_tasks()` — filters plan files using a checkpoint partition filter

All of this can be done without modifying the datasource/datasink:

- Read-side filtering: `plan_read_op.py` already injects a post-read transform that filters out checkpointed row IDs — no need to touch `get_read_tasks()`.
- Write-side checkpointing: `plan_write_op.py` already injects pre/post-write transforms. The `IcebergWriteResult` (with `DataFile` paths) is available in `write_returns` after the write completes — the post-write transform can extract what it needs from there.
- Recovery in `on_write_complete`: This can be handled by a checkpoint-aware wrapper at the planner level that merges recovered results into the `WriteResult` before calling the original `on_write_complete`. The datasink just sees a complete `WriteResult` and does its normal schema reconciliation and catalog commit.
Checkpoint logic should stay where it belongs — datasources and datasinks should be unaware of its existence.
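The planner-level wrapper described in the comment could look roughly like this. Names such as `load_recovered_results` and the dict-based write result are illustrative assumptions, not the actual Ray Data API; the point is that the datasink only ever sees a complete result.

```python
def wrap_on_write_complete(datasink, load_recovered_results):
    """Wrap datasink.on_write_complete at the planner level so that
    results recovered from checkpoints are merged in before the datasink
    commits. The datasink stays entirely unaware of checkpointing."""
    original = datasink.on_write_complete

    def wrapped(write_result):
        # write_result is modeled here as a dict with a "write_returns" list.
        recovered = load_recovered_results()  # e.g. read *.meta.pkl files
        merged = dict(write_result)
        merged["write_returns"] = recovered + list(write_result["write_returns"])
        return original(merged)

    datasink.on_write_complete = wrapped
```

The datasink then performs its usual schema reconciliation and single catalog commit over the merged result.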
> return checkpoint_ds.sort(self.id_column)
>
> class IcebergCheckpointLoader:
Can we create a new file (something like `iceberg_checkpoint.py`) and move Iceberg-related logic there, and maybe try to extend the base checkpoint mechanism? The checkpoint filter here should be general.
We can also move `iceberg_datasource.py` and `iceberg_datasink.py` under the folder `_internal/datasource/iceberg`.
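One way to keep the filter general, as the comment suggests: a base loader owns the generic ID-filtering logic, and an Iceberg-specific subclass adds only the Iceberg pieces. Class and method names below are hypothetical, sketched to show the extension point rather than the actual code.

```python
class BaseCheckpointLoader:
    """Generic checkpoint loader: turns a set of checkpointed IDs into
    a row filter, independent of any particular datasource."""

    def __init__(self, checkpointed_ids):
        self._ids = set(checkpointed_ids)

    def filter_rows(self, rows, id_column):
        # Drop rows already covered by a checkpoint.
        return [r for r in rows if r[id_column] not in self._ids]


class IcebergCheckpointLoader(BaseCheckpointLoader):
    """Iceberg-specific extension: additionally prunes planned scan
    files, e.g. via a checkpoint path/partition filter."""

    def __init__(self, checkpointed_ids, path_partition_filter=None):
        super().__init__(checkpointed_ids)
        self._path_filter = path_partition_filter

    def filter_plan_files(self, file_paths):
        if self._path_filter is None:
            return list(file_paths)
        return [p for p in file_paths if self._path_filter(p)]
```

The base class would live with the existing checkpoint code; only the subclass would sit in an Iceberg-specific module.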
> return
>
> file_name = f"{uuid.uuid4()}.parquet"
> file_id = str(uuid.uuid4())
Same problem here: we don't want to make the `checkpoint_writer` Iceberg-specific; try to extend on top of the base mechanism instead.
> # Serialize first so pickling errors don't occur after the parquet file
> # has already been persisted.
> #
> # Write metadata before parquet to avoid a "parquet-only" partial state:
What if the error happens after `.meta.pkl` is created and before or during the checkpoint parquet write? I think that would leave a duplicate pkl file. Why not just fuse this behavior with the current checkpoint 2PC?
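One way to avoid the partial state discussed here is a two-phase publish: stage both artifacts under temporary names, then rename them into place with the metadata file last, so its presence acts as the commit marker. A rough stdlib-only sketch, with assumed file layout and names:

```python
import os
import pickle


def commit_checkpoint(ckpt_dir, file_id, parquet_bytes, meta_obj):
    """Two-phase publish: write data and metadata to staging names first,
    then rename them into place (rename is atomic on POSIX). The
    .meta.pkl rename goes last, so a crash at any earlier point leaves
    only *.tmp files that a cleanup pass can delete without risking
    duplicates or a "parquet-only" partial state."""
    data_tmp = os.path.join(ckpt_dir, f"{file_id}.parquet.tmp")
    data_final = os.path.join(ckpt_dir, f"{file_id}.parquet")
    meta_tmp = os.path.join(ckpt_dir, f"{file_id}.meta.pkl.tmp")
    meta_final = os.path.join(ckpt_dir, f"{file_id}.meta.pkl")

    # Phase 1: serialize first, then stage both artifacts.
    meta_bytes = pickle.dumps(meta_obj)
    with open(data_tmp, "wb") as f:
        f.write(parquet_bytes)
    with open(meta_tmp, "wb") as f:
        f.write(meta_bytes)

    # Phase 2: publish; metadata last, acting as the commit marker.
    os.rename(data_tmp, data_final)
    os.rename(meta_tmp, meta_final)
    return data_final, meta_final
```

Whether this maps cleanly onto object stores without atomic rename is a separate question; on such backends the existing checkpoint 2PC would be the natural place to fold this into, as the comment suggests.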
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Reviewed by Cursor Bugbot for commit 10adb61.
> Uses class name check to avoid importing IcebergDatasink at module level.
> """
> return type(datasink).__name__ in ("IcebergDatasink", "_FailOnceIcebergDatasink")
Production code hardcodes test-only class name
High Severity
_is_iceberg_datasink checks type(datasink).__name__ in ("IcebergDatasink", "_FailOnceIcebergDatasink"). The _FailOnceIcebergDatasink name is a test-only class from the test file. More critically, type().__name__ only matches the exact class — any user-defined or future subclass of IcebergDatasink won't be recognized, silently skipping the _wrap_iceberg_on_write_complete call and breaking checkpoint recovery for those subclasses. Checking the MRO (e.g. any(c.__name__ == "IcebergDatasink" for c in type(datasink).__mro__)) would correctly handle all subclasses without embedding test class names in production code.
> )
> return original_on_write_complete(write_result)
>
> datasink.on_write_complete = wrapped_on_write_complete
Repeated wrapping accumulates on datasink reuse across retries
Medium Severity
_wrap_iceberg_on_write_complete monkey-patches datasink.on_write_complete each time plan_write_op_with_checkpoint_writer is called. When a user reuses the same datasink instance across retries (as the test does), each execution adds another wrapper layer. On the Nth retry, merge_recovered_iceberg_write_results is called N times per commit, each redundantly loading all .meta.pkl files from the checkpoint directory. There's no guard against re-wrapping an already-wrapped method.
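A guard against re-wrapping could look like the following sketch; the sentinel attribute name is illustrative.

```python
def wrap_once(datasink, make_wrapper):
    """Monkey-patch datasink.on_write_complete at most once: a sentinel
    attribute on the wrapper marks it as already wrapped, so repeated
    planning calls (e.g. across retries on a reused datasink) do not
    stack wrapper layers."""
    current = datasink.on_write_complete
    if getattr(current, "_checkpoint_wrapped", False):
        return  # already wrapped; nothing to do
    wrapped = make_wrapper(current)
    wrapped._checkpoint_wrapped = True
    datasink.on_write_complete = wrapped
```

On the Nth retry the wrapper then runs exactly once per commit, and the checkpoint directory is scanned only a single time.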
> )
> all_ids.update(table[id_col].to_pylist())
>
> return all_ids
Unused get_checkpoint_ids method only used in tests
Low Severity
The get_checkpoint_ids method on IcebergCheckpointLoader is defined in production code but only called from the test file test_checkpoint_for_iceberg.py. No production code path invokes it. The PR reviewer also noted this concern ("this function is only used in UT").


Description
This PR adds checkpoint integration for Ray Data Iceberg read/write to improve fault tolerance during distributed execution.
On the write path, we persist row-level checkpoint IDs (via `CheckpointConfig.id_column`) and also checkpoint `IcebergWriteResult` metadata per write task. If the job fails after data files are written but before the driver commit completes, a retry can load all previously checkpointed write results and perform a single unified Iceberg commit, avoiding duplicate commits and ensuring consistency.

On the read path, we integrate the existing checkpoint filter so read tasks can skip already-processed rows by `id_column`. For Iceberg reads, we also support filtering planned scan files via `checkpoint_path_partition_filter` to reduce unnecessary work during restore.

Related issues
Fixes #59870
Additional information
- Requires that `id_column` exists in blocks
- Writes `{uuid}.parquet` with checkpointed IDs
- Writes `{uuid}.meta.pkl` containing the write task's `IcebergWriteResult` for recovery
- `IcebergDatasink.on_write_complete()` loads checkpointed `IcebergWriteResult` entries (when enabled), merges them with current results, and commits once.
- `IcebergDatasource.get_read_tasks()` optionally filters planned files when `checkpoint_path_partition_filter` is provided.
- Adds `python/ray/data/tests/test_checkpoint_for_iceberg.py` to cover: `upsert_keys` handling