Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/analyzers.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Here are the current supported functionalities of Analyzers.
| Compliance | Compliance(instance, predicate) | Done|
| Correlation | Correlation(column1, column2) | Done|
| CountDistinct | CountDistinct(columns) | Done|
| CustomSql | CustomSql(expression, disambiguator) | Done|
| Datatype | Datatype(column) | Done|
| Distinctness | Distinctness(columns) | Done|
| Entropy | Entropy(column) | Done|
Expand Down
24 changes: 24 additions & 0 deletions pydeequ/analyzers.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,30 @@ def _analyzer_jvm(self):
return self._deequAnalyzers.CountDistinct(to_scala_seq(self._jvm, self.columns))


class CustomSql(_AnalyzerObject):
"""
A custom SQL-based analyzer executing provided SQL expression.
The expression must return a single value.

:param str expression: A SQL expression to execute.
:param str disambiguator: A label used to distinguish this metric
when running multiple custom SQL analyzers. Defaults to "*".
"""

def __init__(self, expression: str, disambiguator: str = "*"):
self.expression = expression
self.disambiguator = disambiguator

@property
def _analyzer_jvm(self):
"""
Returns the result of SQL expression execution.

:return self
"""
return self._deequAnalyzers.CustomSql(self.expression, self.disambiguator)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDGE_CASE: The CustomSql analyzer passes self.expression and self.disambiguator directly to the Scala CustomSql constructor. However, looking at the Deequ Scala source for CustomSql, the constructor signature is CustomSql(expression: String, disambiguator: Option[String]) in some versions. If the Scala API expects an Option[String] for disambiguator, passing a plain Python string would cause a Py4J type mismatch error at runtime.

Line 385: return self._deequAnalyzers.CustomSql(self.expression, self.disambiguator) passes self.disambiguator as a raw string. Other analyzers in this file wrap optional parameters with self._jvm.scala.Option.apply(...) (e.g., line 181 for ApproxCountDistinct: self._jvm.scala.Option.apply(self.where)). If the Scala CustomSql expects Option[String], this will fail at runtime.


class DataType(_AnalyzerObject):
"""
Data Type Analyzer. Returns the datatypes of column
Expand Down
29 changes: 29 additions & 0 deletions tests/test_analyzers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Compliance,
Correlation,
CountDistinct,
CustomSql,
DataType,
Distinctness,
Entropy,
Expand Down Expand Up @@ -111,6 +112,14 @@ def CountDistinct(self, columns):
df_from_json = self.spark.read.json(self.sc.parallelize([result_json]))
self.assertEqual(df_from_json.select("value").collect(), result_df.select("value").collect())
return result_df.select("value").collect()

def CustomSql(self, expression, disambiguator="*"):
result = self.AnalysisRunner.onData(self.df).addAnalyzer(CustomSql(expression, disambiguator)).run()
result_df = AnalyzerContext.successMetricsAsDataFrame(self.spark, result)
result_json = AnalyzerContext.successMetricsAsJson(self.spark, result)
df_from_json = self.spark.read.json(self.sc.parallelize([result_json]))
self.assertEqual(df_from_json.select("value").collect(), result_df.select("value").collect())
return result_df.select("value", "instance").collect()

def Datatype(self, column, where=None):
result = self.AnalysisRunner.onData(self.df).addAnalyzer(DataType(column, where)).run()
Expand Down Expand Up @@ -298,6 +307,26 @@ def test_CountDistinct(self):
def test_fail_CountDistinct(self):
self.assertEqual(self.CountDistinct("b"), [Row(value=1.0)])

def test_CustomSql(self):
self.df.createOrReplaceTempView("input_table")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDGE_CASE: The test test_CustomSql relies on self.df.createOrReplaceTempView("input_table") being called within the test method, but the helper method CustomSql (line 115) calls self.AnalysisRunner.onData(self.df) which registers the DataFrame for analysis but does NOT register the temp view. If test_CustomSql runs before any other test that might register the view, it works. However, test_fail_CustomSql and test_fail_CustomSql_incorrect_query do NOT call createOrReplaceTempView, so they depend on the temp view persisting from test_CustomSql. If test execution order changes (e.g., running test_fail_CustomSql_incorrect_query in isolation), the temp view won't exist and the test will fail for the wrong reason (missing table, not incorrect query).

Line 309: self.df.createOrReplaceTempView("input_table") is only in test_CustomSql. Lines 320-321 (test_fail_CustomSql) and 324-325 (test_fail_CustomSql_incorrect_query) use "input_table" without registering the view. The setUpClass method (lines 39-47 in full source) does not register a temp view.

self.assertEqual(self.CustomSql("SELECT SUM(b) FROM input_table"), [Row(value=6.0, instance="*")])
self.assertEqual(
self.CustomSql("SELECT AVG(LENGTH(a)) FROM input_table", disambiguator="foo"),
[Row(value=3.0, instance="foo")]
)
self.assertEqual(
self.CustomSql("SELECT MAX(c) FROM input_table", disambiguator="bar"),
[Row(value=6.0, instance="bar")]
)

@pytest.mark.xfail(reason="@unittest.expectedFailure")
def test_fail_CustomSql(self):
self.assertEqual(self.CustomSql("SELECT SUM(b) FROM input_table"), [Row(value=1.0)])

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MISSING_TEST: The test test_fail_CustomSql_incorrect_query uses SELECT SUM(b) without a FROM clause. This is expected to fail, but the xfail marker means the test passes if an exception is raised. However, there's no test that verifies the correct failure mode (e.g., that a specific exception type is raised or that the metric result is NaN/empty). A proper negative test should use pytest.raises or check the specific error behavior rather than relying on any exception being acceptable.

Line 324-325: @pytest.mark.xfail(reason="@unittest.expectedFailure") followed by self.CustomSql("SELECT SUM(b)") - this will pass as long as ANY exception is raised, not specifically validating the expected behavior for an invalid SQL query.

@pytest.mark.xfail(reason="@unittest.expectedFailure")
def test_fail_CustomSql_incorrect_query(self):
self.CustomSql("SELECT SUM(b)")

def test_DataType(self):
self.assertEqual(
self.Datatype("b"),
Expand Down