Skip to content

[Data] Add Iceberg-backend in Checkpointing#61316

Closed
WinkerDu wants to merge 7 commits intoray-project:masterfrom
WinkerDu:github-master-iceberg-checkpoint-3
Closed

[Data] Add Iceberg-backend in Checkpointing#61316
WinkerDu wants to merge 7 commits intoray-project:masterfrom
WinkerDu:github-master-iceberg-checkpoint-3

Conversation

@WinkerDu
Copy link
Copy Markdown

Description

Implement Iceberg-backend Ray data checkpoint for this PR #59870

Related issues

Relates to #59870

Additional information

  1. Introduce new CheckpointBackend ICEBERG
  2. Introduce new param as catalog_kwargs when initializing CheckpointConfig

More usage examples can refers for python/ray/data/tests/test_iceberg_checkpoint.py

Change-Id: Iee31177fe923a779b958ad18aeea738adbf273f7
@WinkerDu WinkerDu requested a review from a team as a code owner February 25, 2026 18:40
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces an Iceberg backend for Ray Data checkpointing, which is a significant feature enhancement. The changes include adding a new ICEBERG checkpoint backend type, implementing the IcebergCheckpointWriter, and updating CheckpointLoader and BatchBasedCheckpointFilter to handle Iceberg-specific logic. The configuration has also been updated to accept catalog_kwargs. The addition of an end-to-end test in test_iceberg_checkpoint.py is excellent for ensuring correctness. Overall, the implementation is well-structured. I've identified one critical issue with exception handling and one opportunity to reduce code duplication for better maintainability.

Change-Id: Ib43c5ef4160fba7b567e8b970edad09568f9aa48
@WinkerDu
Copy link
Copy Markdown
Author

@owenowenisme Please take a review, thx :)

@ray-gardener ray-gardener bot added the community-contribution Contributed by the community label Feb 25, 2026
@TheR1sing3un
Copy link
Copy Markdown

@owenowenisme Please take a review, thx :)

I'm not sure if there are some deviations in my understanding. I think this issue: #59869 aims to apply the checkpoint mechanism that uses iceberg as the pipeline sink. In other words, it means that the checkpoint capability can be utilized in the read and write operations of more data sources. Please confirm. cc @owenowenisme

@WinkerDu
Copy link
Copy Markdown
Author

@TheR1sing3un The existing Ray Data checkpoint mechanism already supports using file-based checkpoints for the Iceberg read/write interfaces. You can refer to the unit tests in this PR for details. Therefore, my understanding is that this issue aims to implement an Iceberg-based checkpoint mechanism.
@owenowenisme Please confirm.

@owenowenisme
Copy link
Copy Markdown
Member

@WinkerDu Correct. The current checkpoint mechanism only supports file-based data sinks. For others, such as Iceberg, we need a different approach because Iceberg requires an explicit commit process.

@owenowenisme owenowenisme added the data Ray Data-related issues label Feb 27, 2026
@WinkerDu
Copy link
Copy Markdown
Author

@WinkerDu Correct. The current checkpoint mechanism only supports file-based data sinks. For others, such as Iceberg, we need a different approach because Iceberg requires an explicit commit process.

@owenowenisme Is the implementation of this patch consistent with the expected behavior?

  1. Introduce new CheckpointBackend ICEBERG
  2. Introduce new param as catalog_kwargs when initializing CheckpointConfig

@xinyuangui2
Copy link
Copy Markdown
Contributor

Could you briefly talk about the design in the description? Thanks

@WinkerDu
Copy link
Copy Markdown
Author

Could you briefly talk about the design in the description? Thanks

@xinyuangui2 Sure.

  1. Overview

The primary objective of this commit is to introduce Iceberg Backend support for Ray Data's Checkpoint mechanism.

Previously, Ray Data Checkpoints primarily supported file-based storage (e.g., Parquet files). Introducing Iceberg as a backend leverages its table format features (such as ACID transactions and metadata management), making checkpoint state management more robust and standardized

  1. Core Flowchart
graph TD
    subgraph Configuration [Configuration Phase]
        A[User Configures CheckpointConfig] -->|backend=ICEBERG| B(CheckpointConfig Object)
        B -->|Contains| C{catalog_kwargs}
        C -->|Passes To| D[Writer & Filter]
    end
Loading
graph TD
    subgraph Write_Phase [Write Phase - IcebergCheckpointWriter]
        D[Task Processes Block] --> E{Checkpoint Enabled?}
        E -- Yes --> F[IcebergCheckpointWriter.write_block_checkpoint]
        F -->|First Time| G[Load or Create Iceberg Table]
        G -->|Schema: id_col int64| H[(Iceberg Table)]
        F -->|Extract ID Column| I[Build ID Block]
        I --> J[IcebergDatasink.write]
        J -->|Immediate Commit| H
    end
Loading
graph TD
    subgraph Read_Resume_Phase [Read/Resume Phase - CheckpointFilter]
        K[Job Start/Resume] --> L[BatchBasedCheckpointFilter]
        L --> M[CheckpointLoader.load_checkpoint]
        M -->|backend=ICEBERG| N[pyiceberg.catalog.load_table]
        N --> O[Scan & Select ID Column]
        O --> P[Convert to Ray Dataset]
        Q[Input Data Source] --> R[Filter Logic]
        P --> R
        R -->|Filter Processed IDs| S[Unprocessed Data]
    end
Loading
  1. Detailed Steps
    Step 1: Configuration Support
  • Action : Enable ICEBERG backend and catalog_kwargs in CheckpointConfig .
  • File : interfaces.py
  • Logic : Added CheckpointBackend.ICEBERG enum and catalog_kwargs field. Modified backend inference to skip filesystem detection for Iceberg, delegating storage handling to the Iceberg SDK.

Step 2: Checkpoint Writing (Iceberg Table)

  • Action : Implement writing processed IDs to an Iceberg table.
  • File : checkpoint_writer.py
  • Logic : Implemented IcebergCheckpointWriter. It initializes the Iceberg table (auto-creating with an ID column if missing) and uses IcebergDatasink to write and commit ID blocks directly from workers.

Step 3: Checkpoint Reading & Filtering

  • Action : Load IDs from Iceberg to filter input data.
  • File : checkpoint_filter.py
  • Logic : Updated CheckpointLoader to scan the Iceberg table (ID column only) via pyiceberg, converting it to a Ray Dataset for filtering. Added drop_table logic for checkpoint deletion.

Step 4: Testing & Stability

  • Action : Verify end-to-end flow and ensure thread safety.
  • Files : test_iceberg_checkpoint.py, iceberg_datasource.py
  • Logic : Added a full test cycle (Write -> Checkpoint -> New Data -> Filter). Fixed potential side-effects by safely copying kwargs in IcebergDatasource.

Change-Id: Ib148e684b6e193a307b0a3ddd02d5627ac17209a
@WinkerDu WinkerDu changed the title [Data] Add Iceberg-backend in Checkpointing (#59870) [Data] Add Iceberg-backend in Checkpointing Mar 3, 2026
Change-Id: I058a20dad17a1d1653befc7d52ecf29709a53e58
Change-Id: Iab693a43fbe0ff3e07f42a4039c6e76b232a1e32
Change-Id: Ie91b8235fd9d61b1521e204b66241d395b9a47ce
Change-Id: Ib0fed415eb864e140c6aaee82d584d945f0c7402
Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

) -> Tuple[CheckpointBackend, Optional["pyarrow.fs.FileSystem"]]:
try:
if override_backend == CheckpointBackend.ICEBERG:
return CheckpointBackend.ICEBERG, None
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New ICEBERG backend requires use of deprecated parameter

Medium Severity

The newly added CheckpointBackend.ICEBERG can only be activated by passing override_backend=CheckpointBackend.ICEBERG, but override_backend emits a deprecation warning stating it "will be removed in August 2025." There is no alternative path (e.g., auto-inference from catalog_kwargs) to select the ICEBERG backend without triggering this warning. This means every user of the new Iceberg checkpoint feature sees a confusing deprecation notice for a newly introduced capability, and if override_backend is actually removed, the ICEBERG backend becomes unusable.

Additional Locations (1)

Fix in Cursor Fix in Web

@WinkerDu
Copy link
Copy Markdown
Author

close this pr for wrong understanding for #59870 . Explicit Iceberg commit process with checkpoint support can be seen in #61753

@WinkerDu WinkerDu closed this Mar 15, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants