feature: Support Spark expression: arrays_zip #3643

Open
hsiang-c wants to merge 12 commits into apache:main from hsiang-c:arrays_zip

Conversation

@hsiang-c
Contributor

@hsiang-c hsiang-c commented Mar 7, 2026

Which issue does this PR close?

Closes #3151 and #3575

Rationale for this change

  • This PR adds support for the Spark-compatible arrays_zip SQL function

What changes are included in this PR?

scala> spark.sql("SELECT arrays_zip(array(1, 2, 3), array(1), NULL)").show(100, false)
+------------------------------------------+
|arrays_zip(array(1, 2, 3), array(1), NULL)|
+------------------------------------------+
|NULL                                      |
+------------------------------------------+

scala> spark.sql("SELECT arrays_zip(NULL, array(1, 2, 3))").show(100, false)
+--------------------------------+
|arrays_zip(NULL, array(1, 2, 3))|
+--------------------------------+
|NULL                            |
+--------------------------------+

How are these changes tested?

Via SQL file tests, we covered cases such as a single array argument, null arguments, and custom field names in the resulting struct. Here is an example of custom field names:

scala> spark.sql("SELECT arrays_zip(b, a) FROM (SELECT array(1, 2, 3) as a, array(1, 2) as b)").show(100, false)
+---------------------------+
|arrays_zip(b, a)           |
+---------------------------+
|[{1, 1}, {2, 2}, {NULL, 3}]|
+---------------------------+
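The behavior shown above (a NULL argument makes the whole result NULL; the shorter array b is padded with NULL up to the length of a) can be sketched over plain vectors. This is only an illustration of the semantics, not the Arrow-based implementation in this PR:

```rust
// Illustrative sketch of Spark's arrays_zip semantics over plain vectors.
// The PR implements this against Arrow arrays; this only models the rules:
//   1. a NULL array argument makes the whole result NULL, and
//   2. shorter arrays are padded with NULL up to the longest input.
fn arrays_zip(inputs: &[Option<Vec<i32>>]) -> Option<Vec<Vec<Option<i32>>>> {
    // Rule 1: NULL in, NULL out.
    if inputs.iter().any(|a| a.is_none()) {
        return None;
    }
    let arrays: Vec<&Vec<i32>> = inputs.iter().filter_map(|a| a.as_ref()).collect();
    // Rule 2: the result length is the length of the longest input array.
    let len = arrays.iter().map(|a| a.len()).max().unwrap_or(0);
    Some(
        (0..len)
            .map(|i| arrays.iter().map(|a| a.get(i).copied()).collect())
            .collect(),
    )
}
```

For example, zipping [1, 2] with [1, 2, 3] yields rows (1, 1), (2, 2), (NULL, 3), matching the arrays_zip(b, a) output above.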


scala> spark.sql("SELECT arrays_zip(b, a) FROM (SELECT array(1, 2, 3) as a, array(1, 2) as b)").printSchema
root
 |-- arrays_zip(b, a): array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- b: integer (nullable = true)
 |    |    |-- a: integer (nullable = true)

@comphead
Contributor

comphead commented Mar 7, 2026

#3575

@hsiang-c
Contributor Author

hsiang-c commented Mar 7, 2026

Thanks @comphead

@hsiang-c hsiang-c force-pushed the arrays_zip branch 2 times, most recently from 97d31a4 to be5dfce on March 20, 2026
@hsiang-c hsiang-c force-pushed the arrays_zip branch 2 times, most recently from a261445 to 3a21e25 on April 13, 2026
@hsiang-c hsiang-c marked this pull request as ready for review on April 13, 2026
Contributor

@comphead comphead left a comment


Thanks @hsiang-c, I'm not sure this implementation actually uses DataFusion.

@hsiang-c
Contributor Author

@comphead Thanks for your review. This implementation doesn't use DataFusion for now because I need to pass the names argument from Spark to arrays_zip_inner to parameterize the field keys in the final struct.

// mimic Spark's ArraysZip behavior: returns NULL if any argument is NULL
val combinedNullCheck = expr.children.map(child => IsNotNull(child)).reduce(And)
val isNotNullExpr = exprToProtoInternal(combinedNullCheck, inputs, binding)
val nullLiteralProto = exprToProto(Literal(null, BooleanType), Seq.empty)
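
As a side note, the map-then-reduce pattern above (one IsNotNull per child, folded with And) can be sketched with a toy expression type; the Expr enum here is hypothetical, not Comet's actual serde IR:

```rust
// Toy expression tree, for illustration only (not Comet's protobuf IR).
#[derive(Debug, PartialEq)]
enum Expr {
    Column(&'static str),
    IsNotNull(Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

// Mirrors the Scala `children.map(IsNotNull).reduce(And)` above:
// wrap each child in a null check, then fold the checks into one conjunction.
// Returns None for an empty child list, where Scala's reduce would throw.
fn combined_null_check(children: Vec<Expr>) -> Option<Expr> {
    children
        .into_iter()
        .map(|c| Expr::IsNotNull(Box::new(c)))
        .reduce(|l, r| Expr::And(Box::new(l), Box::new(r)))
}
```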
Contributor


The null literal here uses BooleanType, but elsewhere in this file (e.g., CometArrayAppend at line 88) we use the return type of the expression. DataFusion expects all arms of case/when to have compatible types, so this may cause an error.

object CometArraysZip extends CometExpressionSerde[ArraysZip] {
  override def getSupportLevel(expr: ArraysZip): SupportLevel = {
    expr.dataType match {
      case _: ArrayType => Compatible()
Contributor


We should probably check the element type here. There have been issues noted in the past. See this for instance - #1308

let fields = self.fields(input_schema)?;
Ok(List(Arc::new(Field::new_list_field(
    DataType::Struct(Fields::from(fields)),
    true,
Contributor


There is a slight mismatch here. Spark has this defined as non-nullable.

)))
}
ExprStruct::ArraysZip(expr) => {
assert!(!expr.values.is_empty());
Contributor


Better to return Err instead of asserting (which will cause a panic).

return Err(GeneralError("arrays_zip requires at least one argument".to_string()))

If you want to be extra safe, then you can also check

expr.values.len() == expr.names.len()
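
A minimal sketch of the suggested guard, using a stand-in GeneralError struct (the actual Comet error type differs):

```rust
// Stand-in for Comet's error type, for illustration only; the real code
// uses the crate's own error variants rather than this struct.
#[derive(Debug, PartialEq)]
struct GeneralError(String);

// Validate the deserialized message up front instead of asserting,
// so a malformed plan surfaces as an Err rather than a panic.
fn validate_arrays_zip(values_len: usize, names_len: usize) -> Result<(), GeneralError> {
    if values_len == 0 {
        return Err(GeneralError(
            "arrays_zip requires at least one argument".to_string(),
        ));
    }
    if values_len != names_len {
        return Err(GeneralError(format!(
            "arrays_zip expects one name per value, got {values_len} values and {names_len} names"
        )));
    }
    Ok(())
}
```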

SELECT arrays_zip(a) FROM (SELECT array(1, 2, 3) as a, null as b)

query
SELECT arrays_zip(b) FROM (SELECT array(1, 2, 3) as a, null as b)
\ No newline at end of file
Contributor


nit: newline needed at end of file

message ArraysZip {
repeated Expr values = 1;
repeated string names = 2;
}
\ No newline at end of file
Contributor


nit: newline needed at end of file
