
[Data] Add checkpoint support for Iceberg read/write #61753

Open
WinkerDu wants to merge 10 commits into ray-project:master from WinkerDu:master-iceberg-checkpoint-3

Conversation

@WinkerDu

Description

This PR adds checkpoint integration for Ray Data Iceberg read/write to improve fault tolerance during distributed execution.

On the write path, we persist row-level checkpoint IDs (via CheckpointConfig.id_column) and also checkpoint IcebergWriteResult metadata per write task. If the job fails after data files are written but before the driver commit completes, a retry can load all previously checkpointed write results and perform a single unified Iceberg commit, avoiding duplicate commits and ensuring consistency.

On the read path, we integrate the existing checkpoint filter so read tasks can skip already-processed rows by id_column. For Iceberg reads, we also support filtering planned scan files via checkpoint_path_partition_filter to reduce unnecessary work during restore.

Related issues

Fixes #59870

Additional information

  • Main changes:
    • Write planning adds a checkpoint-writing transform that:
      • validates id_column exists in blocks
      • writes {uuid}.parquet with checkpointed IDs
      • writes {uuid}.meta.pkl containing the write task’s IcebergWriteResult for recovery
    • IcebergDatasink.on_write_complete() loads checkpointed IcebergWriteResult entries (when enabled), merges them with current results, and commits once.
    • IcebergDatasource.get_read_tasks() optionally filters planned files when checkpoint_path_partition_filter is provided.
  • Tests:
    • Adds/updates python/ray/data/tests/test_checkpoint_for_iceberg.py to cover:
      • checkpoint ID + write-result metadata persistence and loading
      • failure-before-commit recovery with unified commit
      • UPSERT recovery including upsert_keys handling

@WinkerDu WinkerDu requested a review from a team as a code owner March 15, 2026 16:36
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces checkpointing support for Iceberg read and write operations in Ray Data, which is a great enhancement for fault tolerance. The implementation for the write path correctly checkpoints IcebergWriteResult metadata, allowing for recovery and a unified commit on retry. The read path is also updated to filter data based on checkpoints. The changes are well-structured and include comprehensive tests for recovery scenarios. I have a few minor suggestions to improve code clarity and remove dead code.

@WinkerDu WinkerDu force-pushed the master-iceberg-checkpoint-3 branch from 10735a0 to e933ed1 Compare March 15, 2026 19:03
@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Mar 15, 2026
@WinkerDu WinkerDu force-pushed the master-iceberg-checkpoint-3 branch from e933ed1 to 04f2433 Compare March 15, 2026 20:05
@WinkerDu WinkerDu force-pushed the master-iceberg-checkpoint-3 branch from 04f2433 to a1bf03f Compare March 19, 2026 09:21
@WinkerDu
Author

@owenowenisme @xinyuangui2 please take a review, thx :)

@owenowenisme owenowenisme self-assigned this Mar 24, 2026
Member

@owenowenisme owenowenisme left a comment


Hey @WinkerDu, thanks for your contribution!

I think we should take a step back before digging into the current design.

A few questions based on the PR description:

  1. Why checkpoint IcebergWriteResult metadata per write task? The checkpoint of .meta.pkl happens in the same task, on the same worker, at nearly the same time as the data file write. If the worker can fail, it can fail at any point — including during or before the metadata checkpoint write. So the metadata checkpoint doesn't provide a stronger durability guarantee than the data file write itself. This approach only covers the narrow case where the driver crashes after all workers fully complete but before on_write_complete() commits to the Iceberg catalog.

  2. Can we just clean up uncommitted files instead (like a rollback)? If a job fails before the Iceberg catalog commit, the written data files are orphans — they exist on storage but aren't referenced by any snapshot, so they're invisible to queries. On retry, we can just use the existing checkpoint IDs (.parquet files) to skip already-committed rows, and either proactively delete orphaned files or let Iceberg's built-in remove_orphan_files handle cleanup.

This would be significantly simpler while still achieving the same correctness guarantees.

Curious to hear your thoughts — there may be context I'm missing that motivated this design.

@WinkerDu
Author


@owenowenisme Thank you for the reply.

  • Iceberg writes are a two-step protocol: workers write data files to storage, then the driver performs a catalog commit that references those files in a new snapshot. Until the catalog commit happens, files are orphans and are invisible to queries.

  • Checkpoint IDs only capture which input rows were processed, not which Iceberg data files were produced (i.e., the DataFile objects / file metadata required for the commit).

  • In the crash-before-commit scenario, it is possible to have:

    • data files already written to storage, and
    • checkpoint IDs already persisted,
    • but no completed Iceberg catalog commit.
  • On retry, if we “just use existing checkpoint IDs to skip rows”, we will not reprocess those rows, which means we also will not regenerate the corresponding DataFile metadata for them.

  • Without the DataFile metadata, the driver has nothing to commit for the skipped rows, so those previously written orphan files remain unreferenced by any snapshot and never become visible. The job can succeed while the table is missing data, i.e., silent data loss.

  • Therefore, under the existing Iceberg write + commit semantics, “skip already-processed checkpoint IDs” by itself is not sufficient: skipping work must be paired with a way to recover the exact set of data files that should be committed.
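The recovery path argued for above — load every checkpointed per-task write result and perform one unified commit — could be sketched like this. Helper names are hypothetical and plain dicts stand in for the real `IcebergWriteResult` objects; `commit_fn` stands in for the driver's catalog commit.

```python
import pickle
from pathlib import Path


def load_checkpointed_write_results(ckpt_dir: str) -> list:
    """Load every per-task write result persisted as {uuid}.meta.pkl."""
    results = []
    for meta_path in sorted(Path(ckpt_dir).glob("*.meta.pkl")):
        with meta_path.open("rb") as f:
            results.append(pickle.load(f))
    return results


def unified_commit(current_results, ckpt_dir: str, commit_fn) -> None:
    """Merge results recovered from the checkpoint dir with this run's
    results, then commit the full set exactly once."""
    recovered = load_checkpointed_write_results(ckpt_dir)
    commit_fn(recovered + list(current_results))
```

This is the piece that skipping rows by checkpoint ID alone cannot provide: without the persisted metadata, the recovered data files would have nothing to contribute to the commit.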

@WinkerDu
Author

@owenowenisme Did you just leave a comment and edit or delete it afterward? I couldn’t find it in the PR. :-)

Member

@owenowenisme owenowenisme left a comment


I don't think we need to modify the datasource/datasink to support checkpointing here. When we added checkpoint support for parquet (PRs #59409 and #61821), the parquet datasource and datasink were left completely untouched — the only change was a minor filename API refactor to make filenames deterministic. All checkpoint logic lives in planner/checkpoint/ and checkpoint/, implemented as pre/post-write transforms that wrap around the existing write operator. The datasinks remain entirely unaware of checkpointing.

This PR breaks that pattern by adding checkpoint-specific code into three places:

  1. IcebergDatasink.write() — builds per-block write results and stores them in ctx.kwargs
  2. IcebergDatasink.on_write_complete() — loads .meta.pkl checkpoint files and merges with write returns
  3. IcebergDatasource.get_read_tasks() — filters plan files using a checkpoint partition filter

All of this can be done without modifying the datasource/datasink:

  • Read-side filtering: plan_read_op.py already injects a post-read transform that filters out checkpointed row IDs — no need to touch get_read_tasks().
  • Write-side checkpointing: plan_write_op.py already injects pre/post-write transforms. The IcebergWriteResult (with DataFile paths) is available in write_returns after the write completes — the post-write transform can extract what it needs from there.
  • Recovery in on_write_complete: This can be handled by a checkpoint-aware wrapper at the planner level that merges recovered results into the WriteResult before calling the original on_write_complete. The datasink just sees a complete WriteResult and does its normal schema reconciliation and catalog commit.

Checkpoint logic should stay where it belongs — datasources and datasinks should be unaware of its existence.
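The planner-level wrapper suggested here might look roughly like this. It is a sketch under the simplifying assumption that the write result is modeled as a list of per-task results; `load_recovered` and the function names are illustrative, not Ray APIs.

```python
def wrap_on_write_complete(datasink, load_recovered) -> None:
    """Planner-level wrapper: merge recovered per-task results into the
    write result before the datasink's own on_write_complete runs.
    The datasink itself stays entirely checkpoint-unaware."""
    original = datasink.on_write_complete

    def wrapped(write_returns):
        # Prepend results recovered from checkpoint storage; the datasink
        # just sees a complete result set and commits normally.
        return original(load_recovered() + list(write_returns))

    datasink.on_write_complete = wrapped
```

The key property is that all checkpoint knowledge lives in the wrapper; the wrapped datasink performs its usual schema reconciliation and catalog commit on what looks like an ordinary, complete write result.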

return checkpoint_ds.sort(self.id_column)


class IcebergCheckpointLoader:
Member


Can we create a new file (something like iceberg_checkpoint.py), move the Iceberg-related logic there, and try to extend the base checkpoint mechanism? The checkpoint filter here should stay general.

We can also move iceberg_datasource.py & iceberg_datasink.py under the folder _internal/datasource/iceberg

return

file_name = f"{uuid.uuid4()}.parquet"
file_id = str(uuid.uuid4())
Member


Same problem here: we don't want to make the checkpoint_writer Iceberg-specific. Try to extend on top of it instead.

# Serialize first so pickling errors don't occur after the parquet file
# has already been persisted.
#
# Write metadata before parquet to avoid a "parquet-only" partial state:
Member


What if the error happens after .meta.pkl is created but before or during the checkpoint parquet write? I think that would leave us with a dangling .pkl file. Why not just fuse this behavior with the current checkpoint 2PC?

WinkerDu added 2 commits April 6, 2026 20:36

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.


Reviewed by Cursor Bugbot for commit 10adb61.


Uses class name check to avoid importing IcebergDatasink at module level.
"""
return type(datasink).__name__ in ("IcebergDatasink", "_FailOnceIcebergDatasink")


Production code hardcodes test-only class name

High Severity

_is_iceberg_datasink checks type(datasink).__name__ in ("IcebergDatasink", "_FailOnceIcebergDatasink"). The _FailOnceIcebergDatasink name is a test-only class from the test file. More critically, type().__name__ only matches the exact class — any user-defined or future subclass of IcebergDatasink won't be recognized, silently skipping the _wrap_iceberg_on_write_complete call and breaking checkpoint recovery for those subclasses. Checking the MRO (e.g. any(c.__name__ == "IcebergDatasink" for c in type(datasink).__mro__)) would correctly handle all subclasses without embedding test class names in production code.
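The MRO-based check suggested in the report, as a standalone sketch (the helper name mirrors the one in the PR; `_FailOnceIcebergDatasink` no longer needs special-casing because subclasses match automatically):

```python
def is_iceberg_datasink(datasink) -> bool:
    """Match IcebergDatasink and any subclass by walking the MRO by name,
    which avoids importing the Iceberg integration at module level."""
    return any(c.__name__ == "IcebergDatasink" for c in type(datasink).__mro__)
```

Matching by class name in the MRO keeps the lazy-import property of the original check while covering user-defined subclasses, at the cost of treating any unrelated class that happens to be named `IcebergDatasink` as a match.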



)
return original_on_write_complete(write_result)

datasink.on_write_complete = wrapped_on_write_complete


Repeated wrapping accumulates on datasink reuse across retries

Medium Severity

_wrap_iceberg_on_write_complete monkey-patches datasink.on_write_complete each time plan_write_op_with_checkpoint_writer is called. When a user reuses the same datasink instance across retries (as the test does), each execution adds another wrapper layer. On the Nth retry, merge_recovered_iceberg_write_results is called N times per commit, each redundantly loading all .meta.pkl files from the checkpoint directory. There's no guard against re-wrapping an already-wrapped method.
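A sentinel attribute on the bound wrapper is one way to make the monkey-patching idempotent, so retries don't stack wrapper layers. This is an illustrative sketch, not the PR's code; `wrapper_factory` stands in for whatever builds the checkpoint-aware wrapper.

```python
_WRAP_SENTINEL = "_checkpoint_wrapped"


def wrap_once(datasink, wrapper_factory) -> None:
    """Monkey-patch datasink.on_write_complete at most once: a sentinel
    attribute on the installed wrapper makes repeat calls a no-op."""
    current = datasink.on_write_complete
    if getattr(current, _WRAP_SENTINEL, False):
        return  # already wrapped on a previous attempt/retry
    wrapped = wrapper_factory(current)
    setattr(wrapped, _WRAP_SENTINEL, True)
    datasink.on_write_complete = wrapped
```

With this guard, the Nth retry still calls the recovery merge exactly once per commit instead of N times.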



)
all_ids.update(table[id_col].to_pylist())

return all_ids


Unused get_checkpoint_ids method only used in tests

Low Severity

The get_checkpoint_ids method on IcebergCheckpointLoader is defined in production code but only called from the test file test_checkpoint_for_iceberg.py. No production code path invokes it. The PR reviewer also noted this concern ("this function is only used in UT").





Development

Successfully merging this pull request may close these issues.

[Data] Support Iceberg in Checkpointing
