-
Notifications
You must be signed in to change notification settings - Fork 208
[Coral-Schema] default value lowercasing #560
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,16 +1,21 @@ | ||
| /** | ||
| * Copyright 2022-2023 LinkedIn Corporation. All rights reserved. | ||
| * Copyright 2022-2026 LinkedIn Corporation. All rights reserved. | ||
| * Licensed under the BSD-2 Clause license. | ||
| * See LICENSE in the project root for license information. | ||
| */ | ||
| package com.linkedin.coral.schema.avro; | ||
|
|
||
| import java.util.LinkedHashMap; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.function.Function; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import com.google.common.collect.Lists; | ||
| import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; | ||
|
|
||
| import org.apache.avro.Schema; | ||
| import org.apache.avro.generic.GenericData; | ||
|
|
||
|
|
||
| /** | ||
|
|
@@ -62,12 +67,163 @@ public Schema primitive(Schema primitive) { | |
| return primitive; | ||
| } | ||
|
|
||
| /** | ||
| * Lowercases a field, including lowercasing any field names within its default value. | ||
| * @param field The original field | ||
| * @param schema The lowercased schema for this field | ||
| * @return A new field with lowercased name and lowercased default value | ||
| */ | ||
| private Schema.Field lowercaseField(Schema.Field field, Schema schema) { | ||
| Object originalDefaultValue = SchemaUtilities.defaultValue(field); | ||
| Object lowercasedDefaultValue = lowercaseDefaultValue(originalDefaultValue, schema); | ||
|
|
||
| Schema.Field lowercasedField = AvroCompatibilityHelper.createSchemaField(field.name().toLowerCase(), schema, | ||
| field.doc(), SchemaUtilities.defaultValue(field), field.order()); | ||
| field.doc(), lowercasedDefaultValue, field.order()); | ||
|
|
||
| SchemaUtilities.replicateFieldProps(field, lowercasedField); | ||
|
|
||
| return lowercasedField; | ||
| } | ||
|
|
||
| /** | ||
| * Recursively lowercases field names within default values based on the schema structure. | ||
| * This handles complex types like records, maps, and arrays where field names appear in default values. | ||
| * | ||
| * @param fieldDefaultValue The original default value for a field (can be null, primitive, Map, List, etc.) | ||
| * @param fieldSchema The schema that describes the structure of this field's default value | ||
| * @return The default value with all field names lowercased | ||
| */ | ||
| @SuppressWarnings("unchecked") | ||
| private Object lowercaseDefaultValue(Object fieldDefaultValue, Schema fieldSchema) { | ||
| if (fieldDefaultValue == null) { | ||
| return null; | ||
| } | ||
|
|
||
| // Handle union types to get the actual schema for processing the default value | ||
| // For nullable unions, extract the non-null type since we know defaultValue is non-null | ||
| Schema actualSchema = SchemaUtilities.extractIfOption(fieldSchema); | ||
| // If still a union after extracting nullable option (i.e., multi-type non-nullable union), | ||
| // the default value corresponds to the first type per Avro specification | ||
| if (actualSchema.getType() == Schema.Type.UNION) { | ||
| actualSchema = actualSchema.getTypes().get(0); | ||
| } | ||
|
|
||
| switch (actualSchema.getType()) { | ||
| case RECORD: | ||
| // For records, the default value can be either a Map or GenericData.Record | ||
| if (fieldDefaultValue instanceof GenericData.Record) { | ||
| GenericData.Record record = (GenericData.Record) fieldDefaultValue; | ||
| return lowercaseRecordDefaultValue(actualSchema, lowercasedFieldName -> { | ||
| // Find the matching field in the original record's schema (case-insensitive) | ||
| Schema.Field originalField = record.getSchema().getField(lowercasedFieldName); | ||
| if (originalField == null) { | ||
| for (Schema.Field f : record.getSchema().getFields()) { | ||
| if (f.name().equalsIgnoreCase(lowercasedFieldName)) { | ||
| originalField = f; | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| return originalField != null ? record.get(originalField.pos()) : null; | ||
| }); | ||
| } else if (fieldDefaultValue instanceof Map) { | ||
| Map<?, ?> recordMap = (Map<?, ?>) fieldDefaultValue; | ||
| return lowercaseRecordDefaultValue(actualSchema, lowercasedFieldName -> { | ||
| // Find the matching key in the original map (case-insensitive) | ||
| String matchingKey = findMatchingKeyForLowercased(recordMap, lowercasedFieldName); | ||
| return matchingKey != null ? recordMap.get(matchingKey) : null; | ||
| }); | ||
| } | ||
| // If neither Map nor GenericData.Record, return as-is | ||
| return fieldDefaultValue; | ||
|
|
||
| case MAP: | ||
| // For maps, lowercase the keys and recursively process values | ||
| if (fieldDefaultValue instanceof Map) { | ||
| Map<?, ?> mapValue = (Map<?, ?>) fieldDefaultValue; // Use wildcards to handle Utf8 keys | ||
| Map<String, Object> lowercasedMap = new LinkedHashMap<>(); | ||
| Schema valueSchema = actualSchema.getValueType(); | ||
|
|
||
| for (Map.Entry<?, ?> entry : mapValue.entrySet()) { | ||
| String originalKey = entry.getKey().toString(); // Handle both String and Utf8 | ||
| String lowercasedKey = originalKey.toLowerCase(); | ||
| Object lowercasedValue = lowercaseDefaultValue(entry.getValue(), valueSchema); | ||
| lowercasedMap.put(lowercasedKey, lowercasedValue); | ||
| } | ||
| return lowercasedMap; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

If the default value was [null, map], are we returning map by any chance? Where's the logic to construct the union schema back? Let's check this for other types too and ensure the spec of this method is returning the same schema as the input, just lower-cased.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

According to avro specs, in this case the default value SHOULD be `null` (the default must correspond to the first type of the union). Not sure what you mean by "where's the logic to construct the union schema back", we aren't constructing any schemas, this is just using the schema to figure out how to parse the default values properly so the relevant parts of the default values can be lowercased |
||
| } | ||
| return fieldDefaultValue; | ||
|
|
||
| case ARRAY: | ||
| // For arrays, recursively process each element | ||
| if (fieldDefaultValue instanceof List) { | ||
| List<Object> arrayValue = (List<Object>) fieldDefaultValue; | ||
| Schema elementSchema = actualSchema.getElementType(); | ||
|
|
||
| return arrayValue.stream().map(element -> lowercaseDefaultValue(element, elementSchema)) | ||
| .collect(Collectors.toList()); | ||
| } | ||
| return fieldDefaultValue; | ||
|
|
||
| case NULL: | ||
| case BOOLEAN: | ||
| case INT: | ||
| case LONG: | ||
| case FLOAT: | ||
| case DOUBLE: | ||
| case BYTES: | ||
| case STRING: | ||
| case ENUM: | ||
| case FIXED: | ||
| default: | ||
| // Primitive types and others: return as-is | ||
| return fieldDefaultValue; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Helper method that extracts the common logic for lowercasing record default values. | ||
| * This handles both GenericData.Record and Map-based default values. | ||
| * | ||
| * @param actualSchema The lowercased schema for the record | ||
| * @param valueExtractor Function that retrieves the original field value given a lowercased field name | ||
| * @return A Map with lowercased field names and recursively lowercased values | ||
| */ | ||
| private Map<String, Object> lowercaseRecordDefaultValue(Schema actualSchema, | ||
| Function<String, Object> valueExtractor) { | ||
| Map<String, Object> lowercasedRecordMap = new LinkedHashMap<>(); | ||
|
|
||
| // Iterate through the lowercased schema fields | ||
| for (Schema.Field field : actualSchema.getFields()) { | ||
| String lowercasedFieldName = field.name(); | ||
| Object fieldValue = valueExtractor.apply(lowercasedFieldName); | ||
|
|
||
| if (fieldValue != null) { | ||
| Object lowercasedFieldValue = lowercaseDefaultValue(fieldValue, field.schema()); | ||
| lowercasedRecordMap.put(lowercasedFieldName, lowercasedFieldValue); | ||
| } | ||
| } | ||
|
|
||
| return lowercasedRecordMap; | ||
| } | ||
|
|
||
| /** | ||
| * Finds a key in the original default value map that matches the lowercased field name. | ||
| * This is needed because the original default value may have field names in mixed case. | ||
| * | ||
| * @param map The map containing the original default value | ||
| * @param lowercasedFieldName The lowercased field name from the transformed schema | ||
| * @return The matching key from the original map, or null if not found | ||
| */ | ||
| private String findMatchingKeyForLowercased(Map<?, ?> map, String lowercasedFieldName) { | ||
| // Try case-insensitive match to find the original key | ||
| for (Object keyObj : map.keySet()) { | ||
| String key = keyObj.toString(); // Handle both String and Utf8 | ||
| if (key.equalsIgnoreCase(lowercasedFieldName)) { | ||
| return key; | ||
| } | ||
| } | ||
|
|
||
| return null; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| { | ||
| "type" : "record", | ||
| "name" : "testrecord", | ||
| "namespace" : "com.test", | ||
| "fields" : [ { | ||
| "name" : "simple_field", | ||
| "type" : "int", | ||
| "default" : 42 | ||
| }, { | ||
| "name" : "struct_field", | ||
| "type" : { | ||
| "type" : "record", | ||
| "name" : "nestedrecord", | ||
| "fields" : [ { | ||
| "name" : "firstname", | ||
| "type" : "string" | ||
| }, { | ||
| "name" : "lastname", | ||
| "type" : "string" | ||
| }, { | ||
| "name" : "age", | ||
| "type" : "int" | ||
| } ] | ||
| }, | ||
| "default" : { | ||
| "firstname" : "John", | ||
| "lastname" : "Doe", | ||
| "age" : 30 | ||
| } | ||
| }, { | ||
| "name" : "map_field", | ||
| "type" : { | ||
| "type" : "map", | ||
| "values" : "string" | ||
| }, | ||
| "default" : { | ||
| "key_one" : "value1", | ||
| "key_two" : "value2" | ||
| } | ||
| }, { | ||
| "name" : "array_field", | ||
| "type" : { | ||
| "type" : "array", | ||
| "items" : { | ||
| "type" : "record", | ||
| "name" : "arrayitem", | ||
| "fields" : [ { | ||
| "name" : "item_name", | ||
| "type" : "string" | ||
| } ] | ||
| } | ||
| }, | ||
| "default" : [ { | ||
| "item_name" : "item1" | ||
| }, { | ||
| "item_name" : "item2" | ||
| } ] | ||
| } ] | ||
| } | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| { | ||
| "type" : "record", | ||
| "name" : "TestRecord", | ||
| "namespace" : "com.test", | ||
| "fields" : [ { | ||
| "name" : "Simple_Field", | ||
| "type" : "int", | ||
| "default" : 42 | ||
| }, { | ||
| "name" : "Struct_Field", | ||
| "type" : { | ||
| "type" : "record", | ||
| "name" : "NestedRecord", | ||
| "fields" : [ { | ||
| "name" : "firstName", | ||
| "type" : "string" | ||
| }, { | ||
| "name" : "lastName", | ||
| "type" : "string" | ||
| }, { | ||
| "name" : "Age", | ||
| "type" : "int" | ||
| } ] | ||
| }, | ||
| "default" : { | ||
| "firstName" : "John", | ||
| "lastName" : "Doe", | ||
| "Age" : 30 | ||
| } | ||
| }, { | ||
| "name" : "Map_Field", | ||
| "type" : { | ||
| "type" : "map", | ||
| "values" : "string" | ||
| }, | ||
| "default" : { | ||
| "Key_One" : "value1", | ||
| "Key_Two" : "value2" | ||
| } | ||
| }, { | ||
| "name" : "Array_Field", | ||
| "type" : { | ||
| "type" : "array", | ||
| "items" : { | ||
| "type" : "record", | ||
| "name" : "ArrayItem", | ||
| "fields" : [ { | ||
| "name" : "Item_Name", | ||
| "type" : "string" | ||
| } ] | ||
| } | ||
| }, | ||
| "default" : [ { | ||
| "Item_Name" : "item1" | ||
| }, { | ||
| "Item_Name" : "item2" | ||
| } ] | ||
| } ] | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need the handling for complex union also? I understand we're stripping away the nulls for the union, but we could still have a nullable complex union?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for complex unions, we rely on the fact that the schema supplied is valid according to specs, see #560 (comment)