Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion crates/iceberg/src/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{Error, ErrorKind, Result, ensure_data_valid};

const ELEMENT_ID: &str = "element-id";
const FIELD_ID_PROP: &str = "field-id";
const ICEBERG_FIELD_NAME_PROP: &str = "iceberg-field-name";
const KEY_ID: &str = "key-id";
const VALUE_ID: &str = "value-id";
const MAP_LOGICAL_TYPE: &str = "map";
Expand Down Expand Up @@ -442,8 +443,14 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {

let optional = is_avro_optional(&avro_field.schema);

let field_name = avro_field
.custom_attributes
.get(ICEBERG_FIELD_NAME_PROP)
.and_then(|v| v.as_str())
.unwrap_or(&avro_field.name);

let mut field =
NestedField::new(field_id, &avro_field.name, field_type.unwrap(), !optional);
NestedField::new(field_id, field_name, field_type.unwrap(), !optional);

if let Some(doc) = &avro_field.doc {
field = field.with_doc(doc);
Expand Down Expand Up @@ -1212,4 +1219,31 @@ mod tests {
converter.primitive(&avro_schema).unwrap().unwrap()
);
}

/// A field carrying the `iceberg-field-name` attribute is exposed under that name,
/// while a field without the attribute keeps its Avro name.
#[test]
fn test_avro_to_iceberg_uses_iceberg_field_name_property() {
    let schema_json = r#"{
"type": "record",
"name": "test_schema",
"fields": [
{
"name": "_123column",
"type": "string",
"field-id": 1,
"iceberg-field-name": "123column"
},
{
"name": "normal_field",
"type": "int",
"field-id": 2
}
]
}"#;
    let parsed = AvroSchema::parse_str(schema_json).unwrap();
    let converted = avro_schema_to_schema(&parsed).unwrap();

    // Expected Iceberg names, in declaration order of the Avro fields above.
    let fields = converted.as_struct().fields();
    for (position, expected_name) in ["123column", "normal_field"].into_iter().enumerate() {
        assert_eq!(fields[position].name, expected_name);
    }
}
}
65 changes: 65 additions & 0 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
use crate::table::Table;
use crate::transaction::snapshot::{
DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
check_no_duplicate_paths_in_batch,
};
use crate::transaction::{ActionCommit, TransactionAction};

Expand Down Expand Up @@ -84,6 +85,10 @@ impl FastAppendAction {
#[async_trait]
impl TransactionAction for FastAppendAction {
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
if self.check_duplicate {
check_no_duplicate_paths_in_batch(&self.added_data_files)?;
}

let snapshot_producer = SnapshotProducer::new(
table,
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
Expand Down Expand Up @@ -259,6 +264,66 @@ mod tests {
assert!(Arc::new(action).commit(&table).await.is_err());
}

/// Committing a fast append whose batch repeats one file path must fail with
/// `DataInvalid`, and the error message must name the offending path.
#[tokio::test]
async fn test_fast_append_rejects_intra_batch_duplicate_paths() {
    let table = make_v2_minimal_table();
    let tx = Transaction::new(&table);

    // Every file points at the same path; only size and record count differ.
    let spec_id = table.metadata().default_partition_spec_id();
    let duplicates: Vec<_> = [(100, 10), (200, 20), (300, 30)]
        .into_iter()
        .map(|(size, records): (u64, u64)| {
            DataFileBuilder::default()
                .content(DataContentType::Data)
                .file_path("test/dup.parquet".to_string())
                .file_format(DataFileFormat::Parquet)
                .file_size_in_bytes(size)
                .record_count(records)
                .partition_spec_id(spec_id)
                .partition(Struct::from_iter([Some(Literal::long(1))]))
                .build()
                .unwrap()
        })
        .collect();

    let action = tx.fast_append().add_data_files(duplicates);
    let Err(err) = Arc::new(action).commit(&table).await else {
        panic!("expected duplicate paths to be rejected");
    };
    assert_eq!(err.kind(), crate::ErrorKind::DataInvalid);

    let msg = err.message();
    let mentions_duplicate = msg.contains("duplicate file path");
    let mentions_path = msg.contains("test/dup.parquet");
    assert!(
        mentions_duplicate && mentions_path,
        "unexpected error message: {msg}"
    );
}

/// With `with_check_duplicate(false)`, a batch that repeats the same file path
/// commits successfully.
#[tokio::test]
async fn test_fast_append_intra_batch_duplicate_check_can_be_disabled() {
    let table = make_v2_minimal_table();
    let tx = Transaction::new(&table);

    let spec_id = table.metadata().default_partition_spec_id();
    let same_path_file = || {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path("test/dup.parquet".to_string())
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(10)
            .partition_spec_id(spec_id)
            .partition(Struct::from_iter([Some(Literal::long(1))]))
            .build()
            .unwrap()
    };
    // Two identical entries are enough to form an intra-batch duplicate.
    let batch: Vec<_> = std::iter::repeat_with(same_path_file).take(2).collect();

    let action = tx
        .fast_append()
        .with_check_duplicate(false)
        .add_data_files(batch);
    let outcome = Arc::new(action).commit(&table).await;
    assert!(outcome.is_ok());
}

#[tokio::test]
async fn test_fast_append() {
let table = make_v2_minimal_table();
Expand Down
16 changes: 16 additions & 0 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,19 @@ impl<'a> SnapshotProducer<'a> {
Ok(ActionCommit::new(updates, requirements))
}
}

pub(super) fn check_no_duplicate_paths_in_batch(files: &[DataFile]) -> Result<()> {
let mut seen: HashSet<&str> = HashSet::with_capacity(files.len());
for f in files {
if !seen.insert(f.file_path.as_str()) {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Cannot add duplicate file path within the same commit: {}",
f.file_path
),
));
}
}
Ok(())
}
Loading