Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions pydeequ/verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,30 @@
from pyspark.sql import DataFrame, SparkSession

from pydeequ.analyzers import AnalysisRunBuilder
from pydeequ.checks import Check
from pydeequ.checks import Check, CheckLevel
from pydeequ.pandas_utils import ensure_pyspark_df

# TODO integrate Analyzer context


class AnomalyCheckConfig:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MISSING_TEST: The AnomalyCheckConfig class has no dedicated unit test for its _get_java_object method. If the Deequ JVM class com.amazon.deequ.AnomalyCheckConfig doesn't exist or has a different constructor signature (e.g., different number of default arguments), this will fail at runtime with no clear error message.

Lines 14-28 in verification.py define AnomalyCheckConfig._get_java_object which calls getattr(self._check_java_class, 'apply$default$3')(), apply$default$4, and apply$default$5. There is no isolated test that verifies these JVM calls succeed independently of the full anomaly detection flow.

pass

def __init__(self, level: CheckLevel, description):
self.level = level
self.description = description

def _get_java_object(self, jvm):
self._jvm = jvm
self._java_level = self.level._get_java_object(self._jvm)
self._check_java_class = self._jvm.com.amazon.deequ.AnomalyCheckConfig
self._anomalyCheckConfig_jvm = self._check_java_class(
self._java_level,
self.description,
getattr(self._check_java_class, 'apply$default$3')(),
getattr(self._check_java_class, 'apply$default$4')(),
getattr(self._check_java_class, 'apply$default$5')(),
)
return self._anomalyCheckConfig_jvm


class VerificationResult:
Expand Down Expand Up @@ -187,10 +203,11 @@ def addAnomalyCheck(self, anomaly, analyzer: _AnalyzerObject, anomalyCheckConfig
:param anomalyCheckConfig: Some configuration settings for the Check
:return: Adds an anomaly strategy to the run
"""
anomalyCheckConfig_jvm = None
if anomalyCheckConfig:
raise NotImplementedError("anomalyCheckConfigs have not been implemented yet, using default value")
anomalyCheckConfig_jvm = anomalyCheckConfig._get_java_object(self._jvm)

AnomalyCheckConfig = self._jvm.scala.Option.apply(anomalyCheckConfig)
AnomalyCheckConfig = self._jvm.scala.Option.apply(anomalyCheckConfig_jvm)

anomaly._set_jvm(self._jvm)
anomaly_jvm = anomaly._anomaly_jvm
Expand Down
19 changes: 17 additions & 2 deletions tests/test_anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ def OnlineNormalStrategy(
print(df.collect())
return df.select("check_status").collect()

def SimpleThresholdStrategy(self, df_prev, df_curr, analyzer_func, lowerBound, upperBound):
def SimpleThresholdStrategy(self, df_prev, df_curr, analyzer_func, lowerBound, upperBound,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DESIGN: Only the SimpleThresholdStrategy helper method was updated to accept the anomalyCheckConfig parameter. The first call to addAnomalyCheck (line 189-190 for the previous run) still passes no config, meaning the test only exercises anomalyCheckConfig on the second/current run but not the first. This is inconsistent and doesn't fully test the feature.

Line 189-190: .addAnomalyCheck(SimpleThresholdStrategy(lowerBound, upperBound), analyzer_func).run() — no anomalyCheckConfig passed for the previous run. Line 197: .addAnomalyCheck(SimpleThresholdStrategy(lowerBound, upperBound), analyzer_func, anomalyCheckConfig) — config only passed for the current run.

anomalyCheckConfig: AnomalyCheckConfig = None):
metricsRepository = InMemoryMetricsRepository(self.spark)
previousKey = ResultKey(self.spark, ResultKey.current_milli_time() - 24 * 60 * 1000 * 60)

Expand All @@ -196,7 +197,7 @@ def SimpleThresholdStrategy(self, df_prev, df_curr, analyzer_func, lowerBound, u
.onData(df_curr)
.useRepository(metricsRepository)
.saveOrAppendResult(currKey)
.addAnomalyCheck(SimpleThresholdStrategy(lowerBound, upperBound), analyzer_func)
.addAnomalyCheck(SimpleThresholdStrategy(lowerBound, upperBound), analyzer_func, anomalyCheckConfig)
.run()
)

Expand Down Expand Up @@ -486,6 +487,20 @@ def get_anomalyDetector(self, anomaly):
def test_anomalyDetector(self):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MISSING_TEST: The new tests only cover SimpleThresholdStrategy with AnomalyCheckConfig. There is no test for passing anomalyCheckConfig to addAnomalyCheck with other anomaly strategies (e.g., AbsoluteChangeStrategy, RelativeRateOfChangeStrategy, etc.), and no test for the AnomalyCheckConfig class itself (e.g., verifying _get_java_object returns a valid JVM object, or testing with different parameters).

The diff adds test_SimpleThresholdStrategy_Error and test_SimpleThresholdStrategy_Warning but only the SimpleThresholdStrategy helper method was updated to accept anomalyCheckConfig (line 184). The other helper methods (AbsoluteChangeStrategy, RelativeRateOfChangeStrategy, OnlineNormalStrategy, BatchNormalStrategy, HoltWinters) still hardcode anomalyCheckConfig=None.

self.get_anomalyDetector(SimpleThresholdStrategy(1.0, 3.0))

def test_SimpleThresholdStrategy_Error(self):
config = AnomalyCheckConfig(description='test error case', level=CheckLevel.Error)
# Lower bound is 1 upper bound is 6 (Range: 1-6 rows)
self.assertEqual(
self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 4.0, config), [Row(check_status="Error")]
)

def test_SimpleThresholdStrategy_Warning(self):
config = AnomalyCheckConfig(description='test error case', level=CheckLevel.Warning)
# Lower bound is 1 upper bound is 6 (Range: 1-6 rows)
self.assertEqual(
self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 4.0, config), [Row(check_status="Warning")]
)

#
# def test_RelativeRateOfChangeStrategy(self):
# metricsRepository = InMemoryMetricsRepository(self.spark)
Expand Down