Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 88 additions & 63 deletions parquet-variant/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes: u
buf[start_pos..start_pos + nbytes as usize].copy_from_slice(&bytes[..nbytes as usize]);
}

/// Append `value_size` bytes of given `value` into `dest`.
fn append_packed_u32(dest: &mut Vec<u8>, value: u32, value_size: usize) {
let n = dest.len() + value_size;
dest.extend(value.to_le_bytes());
dest.truncate(n);
}

/// Wrapper around a `Vec<u8>` that provides methods for appending
/// primitive values, variant types, and metadata.
///
Expand Down Expand Up @@ -112,10 +119,6 @@ impl ValueBuffer {
self.0.push(primitive_header(primitive_type));
}

fn inner(&self) -> &[u8] {
&self.0
}

fn into_inner(self) -> Vec<u8> {
self.into()
}
Expand Down Expand Up @@ -366,36 +369,6 @@ impl ValueBuffer {
Ok(())
}

/// Writes out the header byte for a variant object or list
fn append_header(&mut self, header_byte: u8, is_large: bool, num_items: usize) {
let buf = self.inner_mut();
buf.push(header_byte);

if is_large {
let num_items = num_items as u32;
buf.extend_from_slice(&num_items.to_le_bytes());
} else {
let num_items = num_items as u8;
buf.push(num_items);
};
}

/// Writes out the offsets for an array of offsets, including the final offset (data size).
fn append_offset_array(
&mut self,
offsets: impl IntoIterator<Item = usize>,
data_size: Option<usize>,
nbytes: u8,
) {
let buf = self.inner_mut();
for offset in offsets {
write_offset(buf, offset, nbytes);
}
if let Some(data_size) = data_size {
write_offset(buf, data_size, nbytes);
}
}

/// Writes out the header byte for a variant object or list, from the starting position
/// of the buffer, will return the position after this write
fn append_header_start_from_buf_pos(
Expand Down Expand Up @@ -609,14 +582,15 @@ enum ParentState<'a> {
List {
buffer: &'a mut ValueBuffer,
metadata_builder: &'a mut MetadataBuilder,
parent_value_offset_base: usize,
offsets: &'a mut Vec<usize>,
},
Object {
buffer: &'a mut ValueBuffer,
metadata_builder: &'a mut MetadataBuilder,
fields: &'a mut IndexMap<u32, usize>,
field_name: &'a str,
parent_offset_base: usize,
parent_value_offset_base: usize,
},
}

Expand Down Expand Up @@ -650,16 +624,20 @@ impl ParentState<'_> {
fn finish(&mut self, starting_offset: usize) {
match self {
ParentState::Variant { .. } => (),
ParentState::List { offsets, .. } => offsets.push(starting_offset),
ParentState::List {
offsets,
parent_value_offset_base,
..
} => offsets.push(starting_offset - *parent_value_offset_base),
ParentState::Object {
metadata_builder,
fields,
field_name,
parent_offset_base: object_start_offset,
parent_value_offset_base,
..
} => {
let field_id = metadata_builder.upsert_field_name(field_name);
let shifted_start_offset = starting_offset - *object_start_offset;
let shifted_start_offset = starting_offset - *parent_value_offset_base;
fields.insert(field_id, shifted_start_offset);
}
}
Expand Down Expand Up @@ -1121,16 +1099,27 @@ impl VariantBuilder {
pub struct ListBuilder<'a> {
parent_state: ParentState<'a>,
offsets: Vec<usize>,
buffer: ValueBuffer,
/// The starting offset in the parent's buffer where this list starts
parent_value_offset_base: usize,
/// The starting offset in the parent's metadata buffer where this list starts
/// used to truncate the written fields in `drop` if the current list has not been finished
parent_metadata_offset_base: usize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good idea

/// Whether the list has been finished, the written content of the current list
/// will be truncated in `drop` if `has_been_finished` is false
has_been_finished: bool,
validate_unique_fields: bool,
}

impl<'a> ListBuilder<'a> {
fn new(parent_state: ParentState<'a>, validate_unique_fields: bool) -> Self {
let parent_value_offset_base = parent_state.buffer_current_offset();
let parent_metadata_offset_base = parent_state.metadata_current_offset();
Self {
parent_state,
offsets: vec![],
buffer: ValueBuffer::default(),
parent_value_offset_base,
has_been_finished: false,
parent_metadata_offset_base,
validate_unique_fields,
}
}
Expand All @@ -1146,9 +1135,12 @@ impl<'a> ListBuilder<'a> {

// Returns validate_unique_fields because we can no longer reference self once this method returns.
fn parent_state(&mut self) -> (ParentState, bool) {
let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder();

let state = ParentState::List {
buffer: &mut self.buffer,
metadata_builder: self.parent_state.metadata_builder(),
buffer,
metadata_builder,
parent_value_offset_base: self.parent_value_offset_base,
offsets: &mut self.offsets,
};
(state, self.validate_unique_fields)
Expand Down Expand Up @@ -1185,9 +1177,12 @@ impl<'a> ListBuilder<'a> {
&mut self,
value: T,
) -> Result<(), ArrowError> {
self.offsets.push(self.buffer.offset());
self.buffer
.try_append_variant(value.into(), self.parent_state.metadata_builder())?;
let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder();

let offset = buffer.offset() - self.parent_value_offset_base;
self.offsets.push(offset);

buffer.try_append_variant(value.into(), metadata_builder)?;

Ok(())
}
Expand Down Expand Up @@ -1216,24 +1211,46 @@ impl<'a> ListBuilder<'a> {

/// Finalizes this list and appends it to its parent, which otherwise remains unmodified.
pub fn finish(mut self) {
let data_size = self.buffer.offset();
let buffer = self.parent_state.buffer();

let data_size = buffer
.offset()
.checked_sub(self.parent_value_offset_base)
.expect("Data size overflowed usize");

let num_elements = self.offsets.len();
let is_large = num_elements > u8::MAX as usize;
let offset_size = int_size(data_size);

// Get parent's buffer
let parent_buffer = self.parent_state.buffer();
let starting_offset = parent_buffer.offset();
let starting_offset = self.parent_value_offset_base;

let num_elements_size = if is_large { 4 } else { 1 }; // is_large: 4 bytes, else 1 byte.
let num_elements = self.offsets.len();
let header_size = 1 + // header (i.e., `array_header`)
num_elements_size + // num_element_size
(num_elements + 1) * offset_size as usize; // offsets and data size

// Calculated header size becomes a hint; being wrong only risks extra allocations.
// Make sure to reserve enough capacity to handle the extra bytes we'll truncate.
Comment on lines +1233 to +1234
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, can we rephrase the comment, I don't quite get what it means. Do you try to say the header_size is just a hint and we will allocate extra space?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When header_size will be incorrect?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When header_size will be incorrect?

The size if calculated separately, and then the actual bytes are appended. That opens up a bug surface -- any time the two disagree, header_size will be wrong. If the code directly relied on the size being correct, e.g. because we allocate that many bytes and then index them, we could have produce a bad variant value (either because there's an extra run of inserted bytes, or because of a buffer overflow while indexing. But because the calculated size is only a capacity hint for the vec, the cost of being wrong is very low.

let mut bytes_to_splice = Vec::with_capacity(header_size + 3);
// Write header
let header = array_header(is_large, offset_size);
parent_buffer.append_header(header, is_large, num_elements);
bytes_to_splice.push(header);

append_packed_u32(&mut bytes_to_splice, num_elements as u32, num_elements_size);

for offset in &self.offsets {
append_packed_u32(&mut bytes_to_splice, *offset as u32, offset_size as usize);
}

append_packed_u32(&mut bytes_to_splice, data_size as u32, offset_size as usize);

buffer
.inner_mut()
.splice(starting_offset..starting_offset, bytes_to_splice);

// Write out the offset array followed by the value bytes
let offsets = std::mem::take(&mut self.offsets);
parent_buffer.append_offset_array(offsets, Some(data_size), offset_size);
parent_buffer.append_slice(self.buffer.inner());
self.parent_state.finish(starting_offset);
self.has_been_finished = true;
}
}

Expand All @@ -1242,7 +1259,18 @@ impl<'a> ListBuilder<'a> {
/// This is to ensure that the list is always finalized before its parent builder
/// is finalized.
impl Drop for ListBuilder<'_> {
fn drop(&mut self) {}
fn drop(&mut self) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified that the drop for ListBuilder was covered with cargo llvm-cov --html test -p parquet-variant

image

if !self.has_been_finished {
self.parent_state
.buffer()
.inner_mut()
.truncate(self.parent_value_offset_base);
self.parent_state
.metadata_builder()
.field_names
.truncate(self.parent_metadata_offset_base);
}
}
}

/// A builder for creating [`Variant::Object`] values.
Expand Down Expand Up @@ -1360,7 +1388,7 @@ impl<'a> ObjectBuilder<'a> {
metadata_builder,
fields: &mut self.fields,
field_name: key,
parent_offset_base: self.parent_value_offset_base,
parent_value_offset_base: self.parent_value_offset_base,
};
(state, validate_unique_fields)
}
Expand Down Expand Up @@ -2861,8 +2889,7 @@ mod tests {
// Only the second attempt should appear in the final variant
let (metadata, value) = builder.finish();
let metadata = VariantMetadata::try_new(&metadata).unwrap();
assert_eq!(metadata.len(), 1);
assert_eq!(&metadata[0], "name"); // not rolled back
assert!(metadata.is_empty()); // rolled back
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!


let variant = Variant::try_new_with_metadata(metadata, &value).unwrap();
assert_eq!(variant, Variant::Int8(2));
Expand All @@ -2885,14 +2912,12 @@ mod tests {
object_builder.finish().unwrap();
let (metadata, value) = builder.finish();
let metadata = VariantMetadata::try_new(&metadata).unwrap();
assert_eq!(metadata.len(), 2);
assert_eq!(&metadata[0], "first");
assert_eq!(&metadata[1], "second");
assert_eq!(metadata.len(), 1);
assert_eq!(&metadata[0], "second");

let variant = Variant::try_new_with_metadata(metadata, &value).unwrap();
let obj = variant.as_object().unwrap();
assert_eq!(obj.len(), 2);
assert_eq!(obj.get("first"), Some(Variant::Int8(1)));
assert_eq!(obj.len(), 1);
assert_eq!(obj.get("second"), Some(Variant::Int8(2)));
}

Expand Down
Loading