-
Notifications
You must be signed in to change notification settings - Fork 1.1k
[Variant] Avoid extra buffer allocation in ListBuilder #7987
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,6 +70,13 @@ fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes: u | |
| buf[start_pos..start_pos + nbytes as usize].copy_from_slice(&bytes[..nbytes as usize]); | ||
| } | ||
|
|
||
| /// Append `value_size` bytes of given `value` into `dest`. | ||
| fn append_packed_u32(dest: &mut Vec<u8>, value: u32, value_size: usize) { | ||
| let n = dest.len() + value_size; | ||
| dest.extend(value.to_le_bytes()); | ||
| dest.truncate(n); | ||
| } | ||
|
|
||
| /// Wrapper around a `Vec<u8>` that provides methods for appending | ||
| /// primitive values, variant types, and metadata. | ||
| /// | ||
|
|
@@ -112,10 +119,6 @@ impl ValueBuffer { | |
| self.0.push(primitive_header(primitive_type)); | ||
| } | ||
|
|
||
| fn inner(&self) -> &[u8] { | ||
| &self.0 | ||
| } | ||
|
|
||
| fn into_inner(self) -> Vec<u8> { | ||
| self.into() | ||
| } | ||
|
|
@@ -366,36 +369,6 @@ impl ValueBuffer { | |
| Ok(()) | ||
| } | ||
|
|
||
| /// Writes out the header byte for a variant object or list | ||
| fn append_header(&mut self, header_byte: u8, is_large: bool, num_items: usize) { | ||
| let buf = self.inner_mut(); | ||
| buf.push(header_byte); | ||
|
|
||
| if is_large { | ||
| let num_items = num_items as u32; | ||
| buf.extend_from_slice(&num_items.to_le_bytes()); | ||
| } else { | ||
| let num_items = num_items as u8; | ||
| buf.push(num_items); | ||
| }; | ||
| } | ||
|
|
||
| /// Writes out the offsets for an array of offsets, including the final offset (data size). | ||
| fn append_offset_array( | ||
| &mut self, | ||
| offsets: impl IntoIterator<Item = usize>, | ||
| data_size: Option<usize>, | ||
| nbytes: u8, | ||
| ) { | ||
| let buf = self.inner_mut(); | ||
| for offset in offsets { | ||
| write_offset(buf, offset, nbytes); | ||
| } | ||
| if let Some(data_size) = data_size { | ||
| write_offset(buf, data_size, nbytes); | ||
| } | ||
| } | ||
|
|
||
| /// Writes out the header byte for a variant object or list, from the starting position | ||
| /// of the buffer, will return the position after this write | ||
| fn append_header_start_from_buf_pos( | ||
|
|
@@ -609,14 +582,15 @@ enum ParentState<'a> { | |
| List { | ||
| buffer: &'a mut ValueBuffer, | ||
| metadata_builder: &'a mut MetadataBuilder, | ||
| parent_value_offset_base: usize, | ||
| offsets: &'a mut Vec<usize>, | ||
| }, | ||
| Object { | ||
| buffer: &'a mut ValueBuffer, | ||
| metadata_builder: &'a mut MetadataBuilder, | ||
| fields: &'a mut IndexMap<u32, usize>, | ||
| field_name: &'a str, | ||
| parent_offset_base: usize, | ||
| parent_value_offset_base: usize, | ||
| }, | ||
| } | ||
|
|
||
|
|
@@ -650,16 +624,20 @@ impl ParentState<'_> { | |
| fn finish(&mut self, starting_offset: usize) { | ||
| match self { | ||
| ParentState::Variant { .. } => (), | ||
| ParentState::List { offsets, .. } => offsets.push(starting_offset), | ||
| ParentState::List { | ||
| offsets, | ||
| parent_value_offset_base, | ||
| .. | ||
| } => offsets.push(starting_offset - *parent_value_offset_base), | ||
| ParentState::Object { | ||
| metadata_builder, | ||
| fields, | ||
| field_name, | ||
| parent_offset_base: object_start_offset, | ||
| parent_value_offset_base, | ||
| .. | ||
| } => { | ||
| let field_id = metadata_builder.upsert_field_name(field_name); | ||
| let shifted_start_offset = starting_offset - *object_start_offset; | ||
| let shifted_start_offset = starting_offset - *parent_value_offset_base; | ||
| fields.insert(field_id, shifted_start_offset); | ||
| } | ||
| } | ||
|
|
@@ -1121,16 +1099,27 @@ impl VariantBuilder { | |
| pub struct ListBuilder<'a> { | ||
| parent_state: ParentState<'a>, | ||
| offsets: Vec<usize>, | ||
| buffer: ValueBuffer, | ||
| /// The starting offset in the parent's buffer where this list starts | ||
| parent_value_offset_base: usize, | ||
| /// The starting offset in the parent's metadata buffer where this list starts | ||
| /// used to truncate the written fields in `drop` if the current list has not been finished | ||
| parent_metadata_offset_base: usize, | ||
| /// Whether the list has been finished, the written content of the current list | ||
| /// will be truncated in `drop` if `has_been_finished` is false | ||
| has_been_finished: bool, | ||
| validate_unique_fields: bool, | ||
| } | ||
|
|
||
| impl<'a> ListBuilder<'a> { | ||
| fn new(parent_state: ParentState<'a>, validate_unique_fields: bool) -> Self { | ||
| let parent_value_offset_base = parent_state.buffer_current_offset(); | ||
| let parent_metadata_offset_base = parent_state.metadata_current_offset(); | ||
| Self { | ||
| parent_state, | ||
| offsets: vec![], | ||
| buffer: ValueBuffer::default(), | ||
| parent_value_offset_base, | ||
| has_been_finished: false, | ||
| parent_metadata_offset_base, | ||
| validate_unique_fields, | ||
| } | ||
| } | ||
|
|
@@ -1146,9 +1135,12 @@ impl<'a> ListBuilder<'a> { | |
|
|
||
| // Returns validate_unique_fields because we can no longer reference self once this method returns. | ||
| fn parent_state(&mut self) -> (ParentState, bool) { | ||
| let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder(); | ||
|
|
||
| let state = ParentState::List { | ||
| buffer: &mut self.buffer, | ||
| metadata_builder: self.parent_state.metadata_builder(), | ||
| buffer, | ||
| metadata_builder, | ||
| parent_value_offset_base: self.parent_value_offset_base, | ||
| offsets: &mut self.offsets, | ||
| }; | ||
| (state, self.validate_unique_fields) | ||
|
|
@@ -1185,9 +1177,12 @@ impl<'a> ListBuilder<'a> { | |
| &mut self, | ||
| value: T, | ||
| ) -> Result<(), ArrowError> { | ||
| self.offsets.push(self.buffer.offset()); | ||
| self.buffer | ||
| .try_append_variant(value.into(), self.parent_state.metadata_builder())?; | ||
| let (buffer, metadata_builder) = self.parent_state.buffer_and_metadata_builder(); | ||
|
|
||
| let offset = buffer.offset() - self.parent_value_offset_base; | ||
| self.offsets.push(offset); | ||
|
|
||
| buffer.try_append_variant(value.into(), metadata_builder)?; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
@@ -1216,24 +1211,46 @@ impl<'a> ListBuilder<'a> { | |
|
|
||
| /// Finalizes this list and appends it to its parent, which otherwise remains unmodified. | ||
| pub fn finish(mut self) { | ||
| let data_size = self.buffer.offset(); | ||
| let buffer = self.parent_state.buffer(); | ||
|
|
||
| let data_size = buffer | ||
| .offset() | ||
| .checked_sub(self.parent_value_offset_base) | ||
| .expect("Data size overflowed usize"); | ||
|
|
||
| let num_elements = self.offsets.len(); | ||
| let is_large = num_elements > u8::MAX as usize; | ||
| let offset_size = int_size(data_size); | ||
|
|
||
| // Get parent's buffer | ||
| let parent_buffer = self.parent_state.buffer(); | ||
| let starting_offset = parent_buffer.offset(); | ||
| let starting_offset = self.parent_value_offset_base; | ||
|
|
||
| let num_elements_size = if is_large { 4 } else { 1 }; // is_large: 4 bytes, else 1 byte. | ||
| let num_elements = self.offsets.len(); | ||
| let header_size = 1 + // header (i.e., `array_header`) | ||
| num_elements_size + // num_element_size | ||
| (num_elements + 1) * offset_size as usize; // offsets and data size | ||
|
|
||
| // Calculated header size becomes a hint; being wrong only risks extra allocations. | ||
| // Make sure to reserve enough capacity to handle the extra bytes we'll truncate. | ||
|
Comment on lines
+1233
to
+1234
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, can we rephrase the comment, I don't quite get what it means. Do you try to say the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The size if calculated separately, and then the actual bytes are appended. That opens up a bug surface -- any time the two disagree, |
||
| let mut bytes_to_splice = Vec::with_capacity(header_size + 3); | ||
| // Write header | ||
| let header = array_header(is_large, offset_size); | ||
| parent_buffer.append_header(header, is_large, num_elements); | ||
| bytes_to_splice.push(header); | ||
|
|
||
| append_packed_u32(&mut bytes_to_splice, num_elements as u32, num_elements_size); | ||
|
|
||
| for offset in &self.offsets { | ||
| append_packed_u32(&mut bytes_to_splice, *offset as u32, offset_size as usize); | ||
| } | ||
|
|
||
| append_packed_u32(&mut bytes_to_splice, data_size as u32, offset_size as usize); | ||
|
|
||
| buffer | ||
| .inner_mut() | ||
| .splice(starting_offset..starting_offset, bytes_to_splice); | ||
|
|
||
| // Write out the offset array followed by the value bytes | ||
| let offsets = std::mem::take(&mut self.offsets); | ||
| parent_buffer.append_offset_array(offsets, Some(data_size), offset_size); | ||
| parent_buffer.append_slice(self.buffer.inner()); | ||
| self.parent_state.finish(starting_offset); | ||
| self.has_been_finished = true; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1242,7 +1259,18 @@ impl<'a> ListBuilder<'a> { | |
| /// This is to ensure that the list is always finalized before its parent builder | ||
| /// is finalized. | ||
| impl Drop for ListBuilder<'_> { | ||
| fn drop(&mut self) {} | ||
| fn drop(&mut self) { | ||
|
||
| if !self.has_been_finished { | ||
| self.parent_state | ||
| .buffer() | ||
| .inner_mut() | ||
| .truncate(self.parent_value_offset_base); | ||
| self.parent_state | ||
| .metadata_builder() | ||
| .field_names | ||
| .truncate(self.parent_metadata_offset_base); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// A builder for creating [`Variant::Object`] values. | ||
|
|
@@ -1360,7 +1388,7 @@ impl<'a> ObjectBuilder<'a> { | |
| metadata_builder, | ||
| fields: &mut self.fields, | ||
| field_name: key, | ||
| parent_offset_base: self.parent_value_offset_base, | ||
| parent_value_offset_base: self.parent_value_offset_base, | ||
| }; | ||
| (state, validate_unique_fields) | ||
| } | ||
|
|
@@ -2861,8 +2889,7 @@ mod tests { | |
| // Only the second attempt should appear in the final variant | ||
| let (metadata, value) = builder.finish(); | ||
| let metadata = VariantMetadata::try_new(&metadata).unwrap(); | ||
| assert_eq!(metadata.len(), 1); | ||
| assert_eq!(&metadata[0], "name"); // not rolled back | ||
| assert!(metadata.is_empty()); // rolled back | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! |
||
|
|
||
| let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); | ||
| assert_eq!(variant, Variant::Int8(2)); | ||
|
|
@@ -2885,14 +2912,12 @@ mod tests { | |
| object_builder.finish().unwrap(); | ||
| let (metadata, value) = builder.finish(); | ||
| let metadata = VariantMetadata::try_new(&metadata).unwrap(); | ||
| assert_eq!(metadata.len(), 2); | ||
| assert_eq!(&metadata[0], "first"); | ||
| assert_eq!(&metadata[1], "second"); | ||
| assert_eq!(metadata.len(), 1); | ||
| assert_eq!(&metadata[0], "second"); | ||
|
|
||
| let variant = Variant::try_new_with_metadata(metadata, &value).unwrap(); | ||
| let obj = variant.as_object().unwrap(); | ||
| assert_eq!(obj.len(), 2); | ||
| assert_eq!(obj.get("first"), Some(Variant::Int8(1))); | ||
| assert_eq!(obj.len(), 1); | ||
| assert_eq!(obj.get("second"), Some(Variant::Int8(2))); | ||
| } | ||
|
|
||
|
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a good idea