10 changes: 8 additions & 2 deletions pydeequ/configs.py
@@ -1,8 +1,9 @@
# -*- coding: utf-8 -*-
import logging
from functools import lru_cache
import os
import re

import pyspark

DESIGN: The Deequ version in SPARK_TO_DEEQU_COORD_MAPPING was downgraded from 2.0.8 to 2.0.7 without explanation. This is a regression from the existing codebase, which uses 2.0.8 (as shown in the knowledge base). The pyproject.toml version is 1.5.0, and the README describes 2.0.7 as available with 1.3.0, while 2.0.8 is the latest.

The diff shows the mapping values changed from 2.0.8-spark-3.5 etc. to 2.0.7-spark-3.5 etc. The knowledge base configs.py shows "3.5": "com.amazon.deequ:deequ:2.0.8-spark-3.5".

SPARK_TO_DEEQU_COORD_MAPPING = {
DESIGN: Importing pyspark at module top-level in configs.py creates a hard dependency on pyspark being installed just to read configuration. Previously pyspark was an optional dependency (see pyproject.toml: pyspark = { version = ">=2.4.7,<4.0.0", optional = true }). This import will cause an ImportError if pyspark is not installed, breaking the package for users who haven't installed the optional extra.

pyproject.toml declares pyspark = { version = ">=2.4.7,<4.0.0", optional = true } under [tool.poetry.extras]. Line 8 of configs.py now has import pyspark at module level, which will fail if pyspark is not installed.
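
One way to keep pyspark optional is to defer the import until the environment variable turns out to be missing. The sketch below is only an illustration of that idea, not the PR's code: `resolve_spark_version` is a hypothetical helper name, and returning `None` when neither source is available is an assumption about how the caller would want to handle that case.

```python
import importlib
import os
from typing import Optional


def resolve_spark_version() -> Optional[str]:
    """Return SPARK_VERSION if set, else the installed PySpark version, else None.

    Hypothetical helper: the name and the None fallback are assumptions.
    """
    spark_version = os.getenv("SPARK_VERSION")
    if spark_version:
        return spark_version
    try:
        # Deferred import: pyspark is only needed when the variable is absent,
        # so importing the config module itself never requires pyspark.
        pyspark = importlib.import_module("pyspark")
    except ImportError:
        return None
    return str(pyspark.__version__)
```

With this shape, users who set SPARK_VERSION never trigger the pyspark import at all.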

    "3.5": "com.amazon.deequ:deequ:2.0.7-spark-3.5",
@@ -22,7 +23,12 @@ def _extract_major_minor_versions(full_version: str):
@lru_cache(maxsize=None)
def _get_spark_version() -> str:
    try:
        spark_version = os.environ["SPARK_VERSION"]
        spark_version = os.getenv("SPARK_VERSION")
        if not spark_version:
            spark_version = str(pyspark.__version__)
Comment thread
aagumin marked this conversation as resolved.
            logging.info(
BUG: The except KeyError block on line 30 is now dead code. os.getenv() never raises KeyError; it returns None when the key is missing. If reading pyspark.__version__ fails or the version string is invalid, this handler will not catch it. The except KeyError block should be removed or replaced with error handling that fits the new logic.

Line 25 changed from os.environ["SPARK_VERSION"] (which raises KeyError) to os.getenv("SPARK_VERSION") (which returns None). The except KeyError: on line 30 is now unreachable dead code.
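
A rough sketch of what replacement error handling could look like: test the resolved value explicitly instead of relying on an exception that can no longer occur. The function name `pick_spark_version`, its parameters, and the one-entry `SUPPORTED` mapping are hypothetical and exist only to keep the example self-contained; the error wording is likewise an assumption.

```python
from typing import Mapping, Optional

# Illustrative one-entry stand-in for SPARK_TO_DEEQU_COORD_MAPPING.
SUPPORTED = {"3.5": "com.amazon.deequ:deequ:2.0.7-spark-3.5"}


def pick_spark_version(env: Mapping[str, str], pyspark_version: Optional[str]) -> str:
    """Prefer SPARK_VERSION from env, fall back to the PySpark version.

    Raises RuntimeError explicitly when neither source yields a value,
    rather than depending on a KeyError that Mapping.get never raises.
    """
    spark_version = env.get("SPARK_VERSION") or pyspark_version
    if not spark_version:
        raise RuntimeError(
            "SPARK_VERSION is not set and no PySpark version was found. "
            f"Supported values are: {sorted(SUPPORTED)}"
        )
    return spark_version
```

Passing the environment and the PySpark version in as arguments also makes the function easy to test without patching globals.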

                    f"SPARK_VERSION environment variable is not set, using Spark version from PySpark {spark_version} for Deequ Maven jars"
                )
    except KeyError:
        raise RuntimeError(f"SPARK_VERSION environment variable is required. Supported values are: {SPARK_TO_DEEQU_COORD_MAPPING.keys()}")

31 changes: 30 additions & 1 deletion tests/test_config.py
@@ -1,5 +1,17 @@
import os
from unittest import mock

import pyspark
import pytest
from pydeequ.configs import _extract_major_minor_versions

from pydeequ.configs import _extract_major_minor_versions, _get_spark_version


@pytest.fixture
def mock_env(monkeypatch):
    with mock.patch.dict(os.environ, clear=True):
        monkeypatch.delenv("SPARK_VERSION", raising=False)
        yield


@pytest.mark.parametrize(
@@ -13,3 +25,20 @@
)
def test_extract_major_minor_versions(full_version, major_minor_version):
    assert _extract_major_minor_versions(full_version) == major_minor_version


@pytest.mark.parametrize(
    "spark_version, expected", [("3.2.1", "3.2"), ("3.1", "3.1"), ("3.10.3", "3.10"), ("3.10", "3.10")]
)
BUG: _get_spark_version is decorated with @lru_cache. In test__get_spark_version_without_cache, cache_clear() is called AFTER the assertion (line 34), meaning the first test case caches its result and subsequent parametrized cases will return the stale cached value instead of re-executing the function with the new mock. The cache_clear() should be called BEFORE each invocation (e.g., in a fixture or at the start of the test).

Lines 32-35: assert _get_spark_version() == expected runs first (line 34), then _get_spark_version.cache_clear() runs after (line 35). For the second parametrized case, the cache still holds the result from the first case when _get_spark_version() is called.
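
The ordering issue can be reproduced without pyspark or pytest. In the sketch below, `_source` is a hypothetical stand-in for the mocked `pyspark.__version__` and `get_version` for `_get_spark_version`; the point is only that clearing the cache before the call makes every case independent of the one that ran before it.

```python
from functools import lru_cache

# Stand-in for the mocked pyspark.__version__ (hypothetical, for illustration).
_source = {"version": "3.2.1"}


@lru_cache(maxsize=None)
def get_version() -> str:
    """Cached major.minor extraction, mirroring the shape of _get_spark_version."""
    major, minor = _source["version"].split(".")[:2]
    return f"{major}.{minor}"


def check(version: str) -> str:
    """Run one 'test case': clear the cache first, then call the function."""
    get_version.cache_clear()  # clear BEFORE the call, not after the assertion
    _source["version"] = version
    return get_version()
```

In a pytest suite the same `cache_clear()` call would naturally live in a fixture that runs before each test.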

def test__get_spark_version_without_cache(spark_version, expected, mock_env):
    with mock.patch.object(pyspark, "__version__", spark_version):
        assert _get_spark_version() == expected
        _get_spark_version.cache_clear()

BUG: The test__get_spark_version_with_cache test has incorrect expected values. The parametrize decorator lists ("3.1", "3.2"), ("3.10.3", "3.2"), ("3.10", "3.2"), and these expected values don't match the mocked pyspark.__version__. The test relies on the LRU cache retaining the result from the first parametrized case ("3.2.1" → "3.2"), but @pytest.mark.parametrize creates separate test instances, and the cache state leaks unpredictably between them. The test will pass or fail depending on execution order, making it fragile and non-deterministic.

Lines 38-44: parametrize has ("3.2.1", "3.2"), ("3.1", "3.2"), ("3.10.3", "3.2"), ("3.10", "3.2"). The test never calls cache_clear(), so it depends on the first parametrized case running first and caching "3.2". If tests run in isolation or different order, the assertions for inputs "3.1", "3.10.3", "3.10" expecting "3.2" will fail.
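
If the intent is to verify that the result is cached, one order-independent alternative is to exercise both calls inside a single test body. This is a sketch with hypothetical stand-ins (`_state` for the mocked version, `cached_version` for `_get_spark_version`), not the PR's code.

```python
from functools import lru_cache

# Stand-in for the mocked pyspark.__version__ (hypothetical, for illustration).
_state = {"version": "3.2.1"}


@lru_cache(maxsize=None)
def cached_version() -> str:
    """Cached major.minor extraction from the current stand-in version."""
    return ".".join(_state["version"].split(".")[:2])


def observe_caching() -> tuple:
    """Start from a clean cache, call twice with different inputs, report results."""
    cached_version.cache_clear()        # known starting state
    _state["version"] = "3.2.1"
    first = cached_version()            # computed and stored
    _state["version"] = "3.10.3"
    second = cached_version()           # served from the cache, so still "3.2"
    return first, second, cached_version.cache_info().hits
```

Because the cache is cleared at the start and both calls happen in one place, the outcome no longer depends on which parametrized case ran first.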


@pytest.mark.parametrize(
    "spark_version, expected", [("3.2.1", "3.2"), ("3.1", "3.2"), ("3.10.3", "3.2"), ("3.10", "3.2")]
)
def test__get_spark_version_with_cache(spark_version, expected, mock_env):
    with mock.patch.object(pyspark, "__version__", spark_version):
        assert _get_spark_version() == expected