@GaneshPatil7517
Summary

This PR adds an async Avro writer to arrow-avro. Its API mirrors the existing synchronous Writer and follows the async pattern already used elsewhere in Arrow, in particular Parquet's async writer.

Fixes: #9212

Design Overview

New Types

  • AsyncFileWriter trait: Minimal abstraction for async I/O sinks

    • write(Bytes) -> BoxFuture<Result<()>>
    • complete() -> BoxFuture<Result<()>>
    • Blanket impl for tokio::io::AsyncWrite + Unpin + Send
    • Matches Parquet async_writer pattern
  • AsyncWriter<W, F> struct: Generic async writer

    • W: Any AsyncFileWriter (tokio types, custom implementations, etc.)
    • F: Any AvroFormat (OCF or SOE)
    • Full API parity with sync Writer
  • Type aliases:

    • AsyncAvroWriter<W> - OCF (Object Container File) format
    • AsyncAvroStreamWriter<W> - SOE (Single Object Encoding) format
  • AsyncWriterBuilder: Configuration builder

    • with_compression() - All codecs (Deflate, Snappy, ZStandard, etc.)
    • with_fingerprint_strategy() - SOE fingerprinting
    • with_capacity() - Buffer sizing
    • Async build() method
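To make the shape of the sink abstraction concrete, here is a minimal std-only sketch of a trait with the two methods listed above. It is not the code in this PR: `Vec<u8>` stands in for `bytes::Bytes`, `String` for the crate's error type, the `BoxFuture` alias for `futures::future::BoxFuture`, and `MemorySink`, `block_on`, and `write_all_chunks` are hypothetical helpers added only so the example runs without tokio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Stand-in for `futures::future::BoxFuture`, so the sketch needs only std.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Hypothetical mirror of the trait described above; `Vec<u8>` stands in
/// for `bytes::Bytes` and `String` for the crate's error type.
trait AsyncFileWriter: Send {
    /// Append one chunk of encoded bytes to the sink.
    fn write(&mut self, bs: Vec<u8>) -> BoxFuture<'_, Result<(), String>>;
    /// Flush and close the sink; no writes are expected afterwards.
    fn complete(&mut self) -> BoxFuture<'_, Result<(), String>>;
}

/// In-memory sink used only for illustration.
#[derive(Default)]
struct MemorySink {
    data: Vec<u8>,
    completed: bool,
}

impl AsyncFileWriter for MemorySink {
    fn write(&mut self, bs: Vec<u8>) -> BoxFuture<'_, Result<(), String>> {
        Box::pin(async move {
            if self.completed {
                return Err("write after complete".to_string());
            }
            self.data.extend_from_slice(&bs);
            Ok(())
        })
    }

    fn complete(&mut self) -> BoxFuture<'_, Result<(), String>> {
        Box::pin(async move {
            self.completed = true;
            Ok(())
        })
    }
}

/// Waker that does nothing; enough for futures that complete immediately.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling executor for the sketch only; real code would run on tokio.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Drives the sink the way a writer would: write chunks, then complete.
fn write_all_chunks(chunks: Vec<Vec<u8>>) -> Result<MemorySink, String> {
    let mut sink = MemorySink::default();
    block_on(async {
        for chunk in chunks {
            sink.write(chunk).await?;
        }
        sink.complete().await
    })?;
    Ok(sink)
}
```

Returning a boxed future keeps the trait object-safe, which is presumably why the Parquet writer uses the same shape; the cost is one allocation per call.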

Key Implementation Details

  1. Sync Encoding Reuse: Leverages existing RecordEncoder - no re-implementation of Avro encoding
  2. Buffer Staging: Encodes to Vec<u8>, converts to Bytes, flushes asynchronously
  3. Header on Construction: OCF headers written and flushed immediately in builder
  4. Compression Preserved: Identical logic to sync writer for compression application
  5. Feature Gated: Requires async feature with tokio, futures, bytes dependencies
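The encode-then-flush split in items 1 and 2 can be sketched as follows. This is an illustration under assumptions, not the PR's code: `Staging` and its methods are hypothetical names, and only Avro `long` values are encoded (zig-zag plus variable-length bytes, as the Avro specification defines) to keep the example short. The real writer delegates encoding to RecordEncoder.

```rust
/// Zig-zag + variable-length encoding of an Avro `long`, per the Avro spec.
fn encode_long(value: i64, out: &mut Vec<u8>) {
    // Zig-zag maps small negative numbers to small unsigned ones.
    let mut n = ((value << 1) ^ (value >> 63)) as u64;
    // Emit 7 bits per byte, setting the high bit while more bytes follow.
    while n >= 0x80 {
        out.push((n as u8 & 0x7f) | 0x80);
        n >>= 7;
    }
    out.push(n as u8);
}

/// Hypothetical staging buffer: synchronous encoding appends here, and the
/// async side takes the filled buffer out to hand it to the sink.
struct Staging {
    buf: Vec<u8>,
    capacity: usize,
}

impl Staging {
    fn with_capacity(capacity: usize) -> Self {
        Self {
            buf: Vec::with_capacity(capacity),
            capacity,
        }
    }

    /// Synchronous step: encode a column of longs into the buffer.
    fn encode_longs(&mut self, values: &[i64]) {
        for &v in values {
            encode_long(v, &mut self.buf);
        }
    }

    /// Hand-off step: move the bytes out without copying and leave a fresh
    /// buffer behind for the next batch.
    fn take(&mut self) -> Vec<u8> {
        std::mem::replace(&mut self.buf, Vec::with_capacity(self.capacity))
    }
}
```

Because the encoder only ever sees a `Vec<u8>`, it stays fully synchronous, and the only await point is the hand-off of the taken buffer to the sink.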

API Parity

The async writer exposes the same methods as the sync writer; constructors and I/O methods are awaited:

// Create writers
let mut writer = AsyncAvroWriter::new(sink, schema).await?;
let mut writer = AsyncAvroStreamWriter::new(sink, schema).await?;

// Write batches
writer.write(&batch).await?;
writer.write_batches(&[&batch1, &batch2]).await?;

// Finish and retrieve sink
writer.finish().await?;
let sink = writer.into_inner();
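The two constructors above differ in the framing they emit. As background for the stream writer, the sketch below shows single-object encoding as the Avro specification defines it: a 2-byte marker (`C3 01`), the 8-byte little-endian CRC-64-AVRO fingerprint of the schema, then the Avro binary body. The function names are hypothetical, and whether the writer in this PR emits exactly this prefix is assumed to depend on the configured fingerprint strategy.

```rust
/// Initial value and polynomial constant of CRC-64-AVRO (Avro spec).
const EMPTY: u64 = 0xc15d_213a_a4d7_a795;

/// CRC-64-AVRO ("Rabin") fingerprint of a byte string, normally the
/// schema's Parsing Canonical Form.
fn fingerprint64(data: &[u8]) -> u64 {
    // Build the 256-entry lookup table as in the spec's reference code.
    let mut table = [0u64; 256];
    for (i, slot) in table.iter_mut().enumerate() {
        let mut fp = i as u64;
        for _ in 0..8 {
            fp = (fp >> 1) ^ (EMPTY & (fp & 1).wrapping_neg());
        }
        *slot = fp;
    }
    // Fold each input byte into the running fingerprint.
    let mut fp = EMPTY;
    for &b in data {
        fp = (fp >> 8) ^ table[((fp ^ b as u64) & 0xff) as usize];
    }
    fp
}

/// Frames one encoded record as a single-object-encoded message:
/// 2-byte marker, 8-byte little-endian fingerprint, then the Avro body.
fn frame_single_object(schema_canonical_form: &[u8], body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(10 + body.len());
    out.extend_from_slice(&[0xC3, 0x01]);
    out.extend_from_slice(&fingerprint64(schema_canonical_form).to_le_bytes());
    out.extend_from_slice(body);
    out
}
```

The table is rebuilt on every call to keep the sketch short; production code would compute it once.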

Test Coverage

7 comprehensive tests covering:

  • ✅ OCF round-trip with sync reader
  • ✅ SOE stream writing
  • ✅ Multiple batch accumulation
  • ✅ Builder configuration
  • ✅ Writer consumption with into_inner()
  • ✅ Schema mismatch error handling
  • ✅ Deflate compression

Data-integrity tests read the output back with the sync ReaderBuilder rather than comparing bytes, because each OCF file carries a randomly generated sync marker and two otherwise identical files therefore differ byte for byte.
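A small sketch of why byte comparison is unreliable for OCF output. Per the Avro specification, each data block is a record count, a payload size (both zig-zag varint longs), the payload, and the file's 16-byte sync marker. The helper names are hypothetical, and the sketch assumes an uncompressed payload.

```rust
/// Avro zig-zag varint encoding of a `long`.
fn write_long(value: i64, out: &mut Vec<u8>) {
    let mut n = ((value << 1) ^ (value >> 63)) as u64;
    while n >= 0x80 {
        out.push((n as u8 & 0x7f) | 0x80);
        n >>= 7;
    }
    out.push(n as u8);
}

/// Decodes a zig-zag varint, returning the value and bytes consumed.
fn read_long(buf: &[u8]) -> Option<(i64, usize)> {
    let mut n: u64 = 0;
    for (i, &b) in buf.iter().enumerate().take(10) {
        n |= ((b & 0x7f) as u64) << (7 * i);
        if b & 0x80 == 0 {
            let value = ((n >> 1) as i64) ^ -((n & 1) as i64);
            return Some((value, i + 1));
        }
    }
    None
}

/// One OCF data block: record count, payload size, payload, sync marker.
fn frame_block(records: i64, payload: &[u8], sync: [u8; 16]) -> Vec<u8> {
    let mut out = Vec::new();
    write_long(records, &mut out);
    write_long(payload.len() as i64, &mut out);
    out.extend_from_slice(payload);
    out.extend_from_slice(&sync);
    out
}

/// Parses a block back into (record count, payload), ignoring the marker's
/// value but requiring its 16 bytes to be present.
fn parse_block(frame: &[u8]) -> Option<(i64, Vec<u8>)> {
    let (records, a) = read_long(frame)?;
    let (size, b) = read_long(&frame[a..])?;
    let start = a + b;
    let end = start.checked_add(usize::try_from(size).ok()?)?;
    if frame.len() != end.checked_add(16)? {
        return None;
    }
    Some((records, frame[start..end].to_vec()))
}
```

Two blocks framed with different markers compare unequal as byte strings but parse to the same record count and payload, which is the property the round-trip tests rely on.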

Feature Gating

[features]
async = ["tokio", "futures", "bytes"]

[dependencies]
tokio = { version = "1", features = ["io-util"], optional = true }
futures = { version = "0.3.31", optional = true }
bytes = { version = "1.10.1", optional = true }

All async code is guarded with #[cfg(feature = "async")].
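As a rough illustration of the gating pattern (module and function names here are placeholders, not the PR's actual layout), an item behind `#[cfg(feature = "async")]` is removed from the build entirely when the feature is off, while `cfg!` only yields a compile-time boolean:

```rust
/// Placeholder module layout; the real crate's structure may differ.
pub mod writer {
    /// Always compiled: stands in for the synchronous writer path.
    pub fn sync_available() -> bool {
        true
    }

    /// Only compiled when the crate is built with `--features async`.
    #[cfg(feature = "async")]
    pub mod async_writer {
        pub fn available() -> bool {
            true
        }
    }

    /// The re-export follows the same gate, so downstream code that names
    /// the async items fails to compile unless the feature is enabled.
    #[cfg(feature = "async")]
    pub use self::async_writer::available as async_available;

    /// `cfg!` evaluates to a compile-time boolean without removing code.
    pub fn async_enabled() -> bool {
        cfg!(feature = "async")
    }
}
```

With the feature disabled, the optional tokio, futures, and bytes dependencies are not compiled at all, so sync-only users pay nothing for the async module.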

Commits

  1. [arrow-avro] add async writer module with feature gating - Core implementation
  2. [arrow-avro] add comprehensive async writer tests - Test coverage
  3. [arrow-avro] format code with rustfmt - Formatting
  4. [arrow-avro] improve async writer documentation - Docs

Testing

# Run async writer tests
cargo test -p arrow-avro --lib --features async async_writer

# Build with all features
cargo build -p arrow-avro --all-features

# Check documentation
cargo doc -p arrow-avro --features async --no-deps

All tests pass: ✅
Clippy: No warnings ✅
Rustfmt: Clean ✅

Files Modified

  • arrow-avro/Cargo.toml - Added async feature and dependencies
  • arrow-avro/src/writer/mod.rs - Module exports
  • arrow-avro/src/writer/async_writer.rs - New module (486 lines, 7 tests)

Future Work

  • Optional object_store feature for cloud storage integration (S3, GCS, Azure)
    • Would follow same pattern as Parquet's ParquetObjectWriter
    • Can be added in follow-up PR

Example Usage

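use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_avro::writer::AsyncAvroWriter;
use arrow_schema::{DataType, Field, Schema};
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative single-column schema and batch.
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    let file = File::create("output.avro").await?;
    let mut writer = AsyncAvroWriter::new(file, schema).await?;

    // Each write encodes the batch and flushes it to the file.
    writer.write(&batch).await?;
    writer.write(&batch).await?;

    writer.finish().await?;
    Ok(())
}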

References

- Introduce AsyncFileWriter trait for async sink abstraction
- Implement AsyncWriter<W, F> generic over async sink and format
- Provide AsyncAvroWriter and AsyncAvroStreamWriter type aliases
- Support OCF and SOE formats with identical API to sync Writer
- Add AsyncWriterBuilder for configuration
- Include comprehensive tests for OCF, SOE, and batch writing
- Gate behind 'async' feature with tokio/futures/bytes dependencies
- Test OCF and stream writing modes
- Test multiple batch writing with write_batches
- Test builder configuration and capacity settings
- Test schema mismatch error handling
- Test deflate compression with conditional feature gate
- Test into_inner to verify writer consumption
- Add comprehensive feature list in module docs
- Add note about future object_store integration
- Update code formatting
Copilot AI review requested due to automatic review settings January 21, 2026 15:16
@github-actions github-actions bot added arrow Changes to the arrow crate arrow-avro arrow-avro crate labels Jan 21, 2026

Copilot AI left a comment

Pull request overview

This PR adds a comprehensive async writer API for the arrow-avro crate, providing an idiomatic async counterpart to the existing synchronous writer. The implementation mirrors the sync writer's API while following established Arrow async patterns (consistent with Parquet's async writer).

Changes:

  • Added async writer feature with tokio, futures, and bytes dependencies
  • Implemented AsyncFileWriter trait and AsyncWriter generic struct with type aliases for OCF and SOE formats
  • Added 7 comprehensive tests covering OCF/SOE round-trips, multiple batches, compression, and error handling

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

  • arrow-avro/Cargo.toml - Added async feature with optional dependencies for tokio, futures, and bytes
  • arrow-avro/src/writer/mod.rs - Added feature-gated public exports for async writer types
  • arrow-avro/src/writer/async_writer.rs - New module implementing full async writer API with trait, builder, writer struct, and comprehensive tests

GaneshPatil7517 and others added 4 commits January 21, 2026 21:14
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Add detailed schema resolution behavior documentation
- Explain metadata key usage and fallback to conversion
- Document default settings and customization options
- Match documentation level with sync WriterBuilder
Development

Successfully merging this pull request may close these issues.

[arrow-avro] Add AsyncWriter