diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index fdbc680977..ee9f60e0e2 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -34,6 +34,7 @@ use crate::{Error, ErrorKind, Result, ensure_data_valid}; const ELEMENT_ID: &str = "element-id"; const FIELD_ID_PROP: &str = "field-id"; +const ICEBERG_FIELD_NAME_PROP: &str = "iceberg-field-name"; const KEY_ID: &str = "key-id"; const VALUE_ID: &str = "value-id"; const MAP_LOGICAL_TYPE: &str = "map"; @@ -442,8 +443,14 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { let optional = is_avro_optional(&avro_field.schema); + let field_name = avro_field + .custom_attributes + .get(ICEBERG_FIELD_NAME_PROP) + .and_then(|v| v.as_str()) + .unwrap_or(&avro_field.name); + let mut field = - NestedField::new(field_id, &avro_field.name, field_type.unwrap(), !optional); + NestedField::new(field_id, field_name, field_type.unwrap(), !optional); if let Some(doc) = &avro_field.doc { field = field.with_doc(doc); @@ -1212,4 +1219,31 @@ mod tests { converter.primitive(&avro_schema).unwrap().unwrap() ); } + + #[test] + fn test_avro_to_iceberg_uses_iceberg_field_name_property() { + let avro_json = r#"{ + "type": "record", + "name": "test_schema", + "fields": [ + { + "name": "_123column", + "type": "string", + "field-id": 1, + "iceberg-field-name": "123column" + }, + { + "name": "normal_field", + "type": "int", + "field-id": 2 + } + ] + }"#; + let avro_schema = AvroSchema::parse_str(avro_json).unwrap(); + let iceberg_schema = avro_schema_to_schema(&avro_schema).unwrap(); + + let fields = iceberg_schema.as_struct().fields(); + assert_eq!(fields[0].name, "123column"); + assert_eq!(fields[1].name, "normal_field"); + } } diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..b51fb117d2 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -26,6 +26,7 @@ use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; use crate::table::Table; use crate::transaction::snapshot::{ DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, + check_no_duplicate_paths_in_batch, }; use crate::transaction::{ActionCommit, TransactionAction}; @@ -84,6 +85,10 @@ impl FastAppendAction { #[async_trait] impl TransactionAction for FastAppendAction { async fn commit(self: Arc, table: &Table) -> Result { + if self.check_duplicate { + check_no_duplicate_paths_in_batch(&self.added_data_files)?; + } + let snapshot_producer = SnapshotProducer::new( table, self.commit_uuid.unwrap_or_else(Uuid::now_v7), @@ -259,6 +264,66 @@ mod tests { assert!(Arc::new(action).commit(&table).await.is_err()); } + #[tokio::test] + async fn test_fast_append_rejects_intra_batch_duplicate_paths() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let make_file = |size: u64, records: u64| { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/dup.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(size) + .record_count(records) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(1))])) + .build() + .unwrap() + }; + + let action = tx.fast_append().add_data_files(vec![ + make_file(100, 10), + make_file(200, 20), + make_file(300, 30), + ]); + let err = match Arc::new(action).commit(&table).await { + Ok(_) => panic!("expected duplicate paths to be rejected"), + Err(e) => e, + }; + assert_eq!(err.kind(), crate::ErrorKind::DataInvalid); + let msg = err.message(); + assert!( + msg.contains("duplicate file path") && msg.contains("test/dup.parquet"), + "unexpected error message: {msg}" + ); + } + + #[tokio::test] + async fn test_fast_append_intra_batch_duplicate_check_can_be_disabled() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let make_file = || { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/dup.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(1))])) + .build() + .unwrap() + }; + + let action = tx + .fast_append() + .with_check_duplicate(false) + .add_data_files(vec![make_file(), make_file()]); + assert!(Arc::new(action).commit(&table).await.is_ok()); + } + #[tokio::test] async fn test_fast_append() { let table = make_v2_minimal_table(); diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8f643a7d1e..763c49f32c 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -508,3 +508,19 @@ impl<'a> SnapshotProducer<'a> { Ok(ActionCommit::new(updates, requirements)) } } + +pub(super) fn check_no_duplicate_paths_in_batch(files: &[DataFile]) -> Result<()> { + let mut seen: HashSet<&str> = HashSet::with_capacity(files.len()); + for f in files { + if !seen.insert(f.file_path.as_str()) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add duplicate file path within the same commit: {}", + f.file_path + ), + )); + } + } + Ok(()) +}