[Data] Add Iceberg-backend in Checkpointing #61316
WinkerDu wants to merge 7 commits into ray-project:master
Change-Id: Iee31177fe923a779b958ad18aeea738adbf273f7
Code Review
This pull request introduces an Iceberg backend for Ray Data checkpointing, which is a significant feature enhancement. The changes include adding a new ICEBERG checkpoint backend type, implementing the IcebergCheckpointWriter, and updating CheckpointLoader and BatchBasedCheckpointFilter to handle Iceberg-specific logic. The configuration has also been updated to accept catalog_kwargs. The addition of an end-to-end test in test_iceberg_checkpoint.py is excellent for ensuring correctness. Overall, the implementation is well-structured. I've identified one critical issue with exception handling and one opportunity to reduce code duplication for better maintainability.
Change-Id: Ib43c5ef4160fba7b567e8b970edad09568f9aa48
@owenowenisme Please take a look, thx :)
I'm not sure whether I've misunderstood something. I think issue #59869 aims to make the checkpoint mechanism work when Iceberg is the pipeline sink. In other words, the checkpoint capability could then be used with reads and writes on more data sources. Please confirm. cc @owenowenisme
@TheR1sing3un The existing Ray Data checkpoint mechanism already supports using file-based checkpoints for the Iceberg read/write interfaces. You can refer to the unit tests in this PR for details. Therefore, my understanding is that this issue aims to implement an Iceberg-based checkpoint mechanism.
@WinkerDu Correct. The current checkpoint mechanism only supports file-based data sinks. For others, such as Iceberg, we need a different approach because Iceberg requires an explicit commit process.
@owenowenisme Is the implementation of this patch consistent with the expected behavior?
Could you briefly talk about the design in the description? Thanks
@xinyuangui2 Sure.
The primary objective of this commit is to introduce Iceberg backend support for Ray Data's checkpoint mechanism. Previously, Ray Data checkpoints primarily supported file-based storage (e.g., Parquet files). Introducing Iceberg as a backend leverages its table format features (such as ACID transactions and metadata management), making checkpoint state management more robust and standardized.
graph TD
    subgraph Configuration [Configuration Phase]
        A[User Configures CheckpointConfig] -->|backend=ICEBERG| B(CheckpointConfig Object)
        B -->|Contains| C{catalog_kwargs}
        C -->|Passes To| D[Writer & Filter]
    end

graph TD
    subgraph Write_Phase [Write Phase - IcebergCheckpointWriter]
        D[Task Processes Block] --> E{Checkpoint Enabled?}
        E -- Yes --> F[IcebergCheckpointWriter.write_block_checkpoint]
        F -->|First Time| G[Load or Create Iceberg Table]
        G -->|Schema: id_col int64| H[(Iceberg Table)]
        F -->|Extract ID Column| I[Build ID Block]
        I --> J[IcebergDatasink.write]
        J -->|Immediate Commit| H
    end

graph TD
    subgraph Read_Resume_Phase [Read/Resume Phase - CheckpointFilter]
        K[Job Start/Resume] --> L[BatchBasedCheckpointFilter]
        L --> M[CheckpointLoader.load_checkpoint]
        M -->|backend=ICEBERG| N[pyiceberg.catalog.load_table]
        N --> O[Scan & Select ID Column]
        O --> P[Convert to Ray Dataset]
        Q[Input Data Source] --> R[Filter Logic]
        P --> R
        R -->|Filter Processed IDs| S[Unprocessed Data]
    end
Step 2: Checkpoint Writing (Iceberg Table)
Step 3: Checkpoint Reading & Filtering
Step 4: Testing & Stability
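To make the write and resume phases from the diagrams more concrete, here is a minimal sketch in plain Python. It does not use Ray or pyiceberg: `InMemoryCheckpointTable`, `write_block_checkpoint`, and `filter_unprocessed` are hypothetical stand-ins invented for illustration, and the list-backed table only imitates the "append, then commit immediately" behavior of the Iceberg table. It is not the code in this PR.

```python
from typing import Dict, Iterable, List, Set


class InMemoryCheckpointTable:
    """Hypothetical stand-in for the Iceberg checkpoint table (a single ID column)."""

    def __init__(self) -> None:
        # One entry per committed append, loosely analogous to one snapshot each.
        self._committed: List[List[int]] = []

    def append(self, ids: List[int]) -> None:
        # Each append is committed right away, like the "Immediate Commit" edge.
        self._committed.append(list(ids))

    def scan_ids(self) -> Set[int]:
        # Like "Scan & Select ID Column" when a job resumes.
        return {i for batch in self._committed for i in batch}


def write_block_checkpoint(
    table: InMemoryCheckpointTable, block: List[Dict], id_column: str
) -> None:
    """Extract the ID column from a processed block and commit it."""
    table.append([row[id_column] for row in block])


def filter_unprocessed(
    rows: Iterable[Dict], processed: Set[int], id_column: str
) -> List[Dict]:
    """Keep only the rows whose ID has not been checkpointed yet."""
    return [row for row in rows if row[id_column] not in processed]


table = InMemoryCheckpointTable()
source = [{"id": i, "value": i * 10} for i in range(6)]

# First run: only the first block finishes before the job stops.
write_block_checkpoint(table, source[:3], id_column="id")

# Resume: load the committed IDs and skip rows that were already processed.
remaining = filter_unprocessed(source, table.scan_ids(), id_column="id")
remaining_ids = [row["id"] for row in remaining]
```

In this sketch the checkpoint stores only IDs, never the processed data itself, which matches the single-column schema shown in the write-phase diagram.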
Change-Id: I058a20dad17a1d1653befc7d52ecf29709a53e58
Change-Id: Iab693a43fbe0ff3e07f42a4039c6e76b232a1e32
Change-Id: Ie91b8235fd9d61b1521e204b66241d395b9a47ce
Change-Id: Ib0fed415eb864e140c6aaee82d584d945f0c7402
    ) -> Tuple[CheckpointBackend, Optional["pyarrow.fs.FileSystem"]]:
        try:
            if override_backend == CheckpointBackend.ICEBERG:
                return CheckpointBackend.ICEBERG, None
New ICEBERG backend requires use of deprecated parameter
Medium Severity
The newly added CheckpointBackend.ICEBERG can only be activated by passing override_backend=CheckpointBackend.ICEBERG, but override_backend emits a deprecation warning stating it "will be removed in August 2025." There is no alternative path (e.g., auto-inference from catalog_kwargs) to select the ICEBERG backend without triggering this warning. This means every user of the new Iceberg checkpoint feature sees a confusing deprecation notice for a newly introduced capability, and if override_backend is actually removed, the ICEBERG backend becomes unusable.
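One possible shape for the auto-inference path mentioned above, sketched in plain Python. Everything here is hypothetical: `SketchBackend` and `infer_backend` are illustrative names, not Ray Data APIs, and the rule "non-empty `catalog_kwargs` implies ICEBERG" is an assumption about how the selection could work, not what this PR implements.

```python
import warnings
from enum import Enum
from typing import Any, Dict, Optional


class SketchBackend(Enum):
    # Hypothetical stand-in for the real backend enum in Ray Data.
    FILE = "FILE"
    ICEBERG = "ICEBERG"


def infer_backend(
    catalog_kwargs: Optional[Dict[str, Any]] = None,
    override_backend: Optional[SketchBackend] = None,
) -> SketchBackend:
    """Pick a backend without requiring the deprecated override parameter."""
    if override_backend is not None:
        # The explicit override still works, but it is the only path that warns.
        warnings.warn(
            "override_backend is deprecated", DeprecationWarning, stacklevel=2
        )
        return override_backend
    if catalog_kwargs:
        # Assumption: supplying catalog settings signals an Iceberg checkpoint.
        return SketchBackend.ICEBERG
    return SketchBackend.FILE


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    inferred = infer_backend(catalog_kwargs={"name": "demo"})
    explicit = infer_backend(override_backend=SketchBackend.ICEBERG)
    default = infer_backend()

deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)]
```

With this shape, users who only pass `catalog_kwargs` would never see the deprecation notice, and the ICEBERG backend would remain reachable if `override_backend` were removed.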


Description
Implement an Iceberg backend for Ray Data checkpointing (see #59870).
Related issues
Relates to #59870
Additional information
Set the backend to ICEBERG and pass catalog_kwargs when initializing CheckpointConfig. For more usage examples, refer to
python/ray/data/tests/test_iceberg_checkpoint.py