-
Notifications
You must be signed in to change notification settings - Fork 1.1k
[arrow-avro] Add AsyncWriter #9241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[arrow-avro] Add AsyncWriter #9241
Conversation
- Introduce AsyncFileWriter trait for async sink abstraction - Implement AsyncWriter<W, F> generic over async sink and format - Provide AsyncAvroWriter and AsyncAvroStreamWriter type aliases - Support OCF and SOE formats with identical API to sync Writer - Add AsyncWriterBuilder for configuration - Include comprehensive tests for OCF, SOE, and batch writing - Gate behind 'async' feature with tokio/futures/bytes dependencies
- Test OCF and stream writing modes - Test multiple batch writing with write_batches - Test builder configuration and capacity settings - Test schema mismatch error handling - Test deflate compression with conditional feature gate - Test into_inner to verify writer consumption
- Add comprehensive feature list in module docs - Add note about future object_store integration - Update code formatting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds a comprehensive async writer API for the arrow-avro crate, providing an idiomatic async counterpart to the existing synchronous writer. The implementation mirrors the sync writer's API while following established Arrow async patterns (consistent with Parquet's async writer).
Changes:
- Added async writer feature with tokio, futures, and bytes dependencies
- Implemented
AsyncFileWritertrait andAsyncWritergeneric struct with type aliases for OCF and SOE formats - Added 7 comprehensive tests covering OCF/SOE round-trips, multiple batches, compression, and error handling
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| arrow-avro/Cargo.toml | Added async feature with optional dependencies for tokio, futures, and bytes |
| arrow-avro/src/writer/mod.rs | Added feature-gated public exports for async writer types |
| arrow-avro/src/writer/async_writer.rs | New module implementing full async writer API with trait, builder, writer struct, and comprehensive tests |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Add detailed schema resolution behavior documentation - Explain metadata key usage and fallback to conversion - Document default settings and customization options - Match documentation level with sync WriterBuilder
Summary
This PR implements a fully functional async Avro writer for
arrow-avro, providing a symmetric and idiomatic async API that mirrors the existing synchronousWriterwhile following Arrow's established async patterns (consistent with Parquet's async writer).Fixes: #9212
Design Overview
New Types
AsyncFileWritertrait: Minimal abstraction for async I/O sinkswrite(Bytes) -> BoxFuture<Result<()>>complete() -> BoxFuture<Result<()>>tokio::io::AsyncWrite + Unpin + SendAsyncWriter<W, F>struct: Generic async writerW: AnyAsyncFileWriter(tokio types, custom implementations, etc.)F: AnyAvroFormat(OCF or SOE)WriterType aliases:
AsyncAvroWriter<W>- OCF (Object Container File) formatAsyncAvroStreamWriter<W>- SOE (Single Object Encoding) formatAsyncWriterBuilder: Configuration builderwith_compression()- All codecs (Deflate, Snappy, ZStandard, etc.)with_fingerprint_strategy()- SOE fingerprintingwith_capacity()- Buffer sizingbuild()methodKey Implementation Details
RecordEncoder- no re-implementation of Avro encodingVec<u8>, converts toBytes, flushes asynchronouslyasyncfeature withtokio,futures,bytesdependenciesAPI Parity
The async writer provides identical methods to the sync writer:
Test Coverage
7 comprehensive tests covering:
into_inner()All tests verify data integrity through round-trip with sync
ReaderBuilder, not byte-for-byte equality (OCF sync markers are random).Feature Gating
All async code is guarded with
#[cfg(feature = "async")].Commits
[arrow-avro] add async writer module with feature gating- Core implementation[arrow-avro] add comprehensive async writer tests- Test coverage[arrow-avro] format code with rustfmt- Formatting[arrow-avro] improve async writer documentation- DocsTesting
All tests pass: ✅
Clippy: No warnings ✅
Rustfmt: Clean ✅
Files Modified
arrow-avro/Cargo.toml- Added async feature and dependenciesarrow-avro/src/writer/mod.rs- Module exportsarrow-avro/src/writer/async_writer.rs- New module (486 lines, 7 tests)Future Work
object_storefeature for cloud storage integration (S3, GCS, Azure)ParquetObjectWriterExample Usage
References
AsyncArrowWriter(parquet/src/arrow/async_writer/)