From c6150384c0985d25ffe96f573a5c9df38fd0c5aa Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Mon, 11 May 2026 16:42:54 -0400 Subject: [PATCH 1/6] Skeleton PR for dnf5 linux4 implementation --- .../src/bootstrap/ConfigurationFactory.py | 6 +- src/core/src/bootstrap/Constants.py | 1 + src/core/src/bootstrap/EnvLayer.py | 26 +- .../package_managers/AzL4DnfPackageManager.py | 232 ++++++++++++++++++ 4 files changed, 258 insertions(+), 7 deletions(-) create mode 100644 src/core/src/package_managers/AzL4DnfPackageManager.py diff --git a/src/core/src/bootstrap/ConfigurationFactory.py b/src/core/src/bootstrap/ConfigurationFactory.py index 30bf7db0..db0972b5 100644 --- a/src/core/src/bootstrap/ConfigurationFactory.py +++ b/src/core/src/bootstrap/ConfigurationFactory.py @@ -38,6 +38,7 @@ from core.src.package_managers.AptitudePackageManager import AptitudePackageManager from core.src.package_managers.AzL3TdnfPackageManager import AzL3TdnfPackageManager +from core.src.package_managers.AzL4DnfPackageManager import AzL4DnfPackageManager from core.src.package_managers.YumPackageManager import YumPackageManager from core.src.package_managers.ZypperPackageManager import ZypperPackageManager @@ -70,16 +71,19 @@ def __init__(self, log_file_path, events_folder, telemetry_supported): self.configurations = { 'apt_prod_config': self.new_prod_configuration(Constants.APT, AptitudePackageManager), + 'dnf_prod_config': self.new_prod_configuration(Constants.DNF, AzL4DnfPackageManager), 'tdnf_prod_config': self.new_prod_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_prod_config': self.new_prod_configuration(Constants.YUM, YumPackageManager), 'zypper_prod_config': self.new_prod_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_dev_config': self.new_dev_configuration(Constants.APT, AptitudePackageManager), + 'dnf_dev_config': self.new_dev_configuration(Constants.DNF, AzL4DnfPackageManager), 'tdnf_dev_config': self.new_dev_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_dev_config': self.new_dev_configuration(Constants.YUM, YumPackageManager), 'zypper_dev_config': self.new_dev_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_test_config': self.new_test_configuration(Constants.APT, AptitudePackageManager), + 'dnf_test_config': self.new_test_configuration(Constants.DNF, AzL4DnfPackageManager), 'tdnf_test_config': self.new_test_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_test_config': self.new_test_configuration(Constants.YUM, YumPackageManager), 'zypper_test_config': self.new_test_configuration(Constants.ZYPPER, ZypperPackageManager) @@ -116,7 +120,7 @@ def get_configuration(self, env, package_manager_name): print ("Error: Environment configuration not supported - " + str(env)) return None - if str(package_manager_name) not in [Constants.APT, Constants.TDNF, Constants.YUM, Constants.ZYPPER]: + if str(package_manager_name) not in [Constants.APT, Constants.DNF, Constants.TDNF, Constants.YUM, Constants.ZYPPER]: print ("Error: Package manager configuration not supported - " + str(package_manager_name)) return None diff --git a/src/core/src/bootstrap/Constants.py b/src/core/src/bootstrap/Constants.py index 51da02c5..46fb0fe0 100644 --- a/src/core/src/bootstrap/Constants.py +++ b/src/core/src/bootstrap/Constants.py @@ -201,6 +201,7 @@ class StatusTruncationConfig(EnumBackport): # Package Managers APT = 'apt' + DNF = 'dnf' TDNF = 'tdnf' YUM = 'yum' ZYPPER = 'zypper' diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index f79f7b2c..f5dc35ff 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -49,7 +49,7 @@ def is_distro_azure_linux(distro_name): def is_distro_azure_linux_3_or_beyond(self): # type: () -> bool - """ Checks if the current distro is Azure Linux 3 """ + """ Checks if the current distro is Azure Linux 3 or greater""" if self.is_distro_azure_linux(self.platform.linux_distribution()): version = distro.os_release_attr('version') major = version.split('.')[0] if version else None @@ -63,12 +63,26 @@ def get_package_manager(self): return Constants.APT if self.is_distro_azure_linux(str(self.platform.linux_distribution())): - code, out = self.run_command_output('which tdnf', False, False) - if code == 0: - return Constants.TDNF + # Determine package manager based on Azure Linux version + version = distro.os_release_attr('version') + major = version.split('.')[0] if version else None + + # Azure Linux 4 uses DNF + if major is not None and int(major) >= 4: + code, out = self.run_command_output('which dnf', False, False) + if code == 0: + return 'dnf' + else: + print("Error: Expected package manager dnf not found on this Azure Linux 4 VM.") + return str() + # Azure Linux 3 uses TDNF else: - print("Error: Expected package manager tdnf not found on this Azure Linux VM.") - return str() + code, out = self.run_command_output('which tdnf', False, False) + if code == 0: + return Constants.TDNF + else: + print("Error: Expected package manager tdnf not found on this Azure Linux VM.") + return str() # choose default package manager package_manager_map = (('apt-get', Constants.APT), diff --git a/src/core/src/package_managers/AzL4DnfPackageManager.py b/src/core/src/package_managers/AzL4DnfPackageManager.py new file mode 100644 index 00000000..c16d1048 --- /dev/null +++ b/src/core/src/package_managers/AzL4DnfPackageManager.py @@ -0,0 +1,232 @@ +# Copyright 2026 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ + +"""AzL4DnfPackageManager for Azure Linux L4""" +from abc import ABCMeta +from core.src.bootstrap.Constants import Constants +from core.src.package_managers.PackageManager import PackageManager + + +class AzL4DnfPackageManager(PackageManager): + """Implementation of Azure Linux L4 DNF package management operations""" + + def __init__(self, env_layer, execution_config, composite_logger, telemetry_writer, status_handler): + super(AzL4DnfPackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) + # TODO: Add AzL4 DNF specific initialization + self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf') + + __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) + + # ConfigurePatch Method + def refresh_repo(self): + """Refreshes the DNF repository cache and lists available updates by cleaning expired cache entries + Commands: + - sudo dnf clean expire-cache (cleans expired cache entries) + - sudo dnf -q check-update (checks for available updates) + """ + pass + + # AssessPatch method + def invoke_package_manager_advanced(self, command, raise_on_exception=True): + """Invokes the DNF package manager with standardized command execution, logging, and error handling + Parameters: + - command (string): The DNF command to execute + - raise_on_exception (boolean): Whether to raise exception on non-zero exit code + Returns: + - Tuple of (output, return_code) from the command execution + """ + pass + + # AssessPatch method + def get_all_updates(self, cached=False): + """Gets all missing updates available for the system and returns the cached updates list and versions list + Cache Check Logic: + - If cached=True and cache has data, return cached updates and versions immediately (high performance reuse) + - If cache miss or cached=False, execute the DNF command to get fresh updates and populate cache + Command: + - sudo dnf -q check-update (checks for all available updates) + 1. If cached=True and cache has data, return cached results + 2. Execute command, parse output, cache results + 3. Return all_updates_cached and all_update_versions_cached + """ + return [], [] + + # AssessPatch method + def get_security_updates(self): + """Gets all missing security updates available for the system and returns packages and versions list + Command: + - sudo dnf -q check-update --security (checks for available security updates only) + Returns: + - List of security package names + - List of corresponding security package versions + """ + pass + + # AssessPatch method + def get_other_updates(self): + """Gets missing (non-security) updates. Record log and return + """ + return [], [] + + def set_max_patch_publish_date(self, max_patch_publish_date=str()): + pass + + # Install Patch method + def get_composite_package_identifier(self, package_name, package_version): + """Creates a version+architecture-specific package identifier for install commands + Parameters: + - package_name (string): Name of the package (may include architecture) + - package_version (string): Version of the package + Returns: + - String: Composite package identifier (e.g., "package-1.0.0.x86_64") + """ + pass + + def install_updates_fail_safe(self, excluded_packages): + pass + + # AssessPatch method + def get_all_available_versions_of_package(self, package_name): + """Returns a list of all available versions of a package + Parameters: + - package_name (string): Name of the package to get versions for + Commands used: + - sudo dnf list --available (lists all available versions of the package) + Returns: + - List of all available package versions + """ + return [] + + # AssessPatch method + def is_package_version_installed(self, package_name, package_version): + """Checks if a specific package version is installed + Parameters: + - package_name (string): Name of the package + - package_version (string): Version of the package to check + Commands used: + - sudo dnf list installed (checks if specific package version is installed) + Returns: + - Boolean: True if the specific package version is installed, False otherwise + """ + pass + + + def get_dependent_list(self, packages): + """Returns dependent list for the list of packages + Parameters: + - packages (list): List of package names to get dependencies for + Commands used: + - sudo dnf install --assumeno --skip-broken (simulates installation to find dependencies without actually installing) + Returns: List of dependency package names required for the input packages + """ + pass + + def get_product_name(self, package_name): + pass + + def get_package_size(self, output): + """Retrieves package size from installation output string + Parameters: + - output (string): The output string from DNF installation command + Returns: + - String: Package size (e.g., "15 M") or UNKNOWN_PACKAGE_SIZE if not found + """ + pass + + # Install Patch method + def install_security_updates_azgps_coordinated(self): + """Installs security updates in Azure Linux 4 following strict safe deployment practices + Commands used: + - sudo dnf -y upgrade --security --skip-broken (installs security updates only) + Returns: + - Tuple of (return code, output) from the command execution + """ + pass + + def try_meet_azgps_coordinated_requirements(self): + """ + Do we need this for dnf5? + """ + return False + + # ConfigurePatch Method + def get_current_auto_os_patch_state(self): + """ Gets the current auto OS update patch state on the machine """ + pass + + # ConfigurePatch Method + def disable_auto_os_update(self): + """ + Disables auto OS updates on the machine only if they are enabled + Comments from yashna : The current VM with AzLinux4 installed doesnt have dnf automatic/auto OS updates installed. + Will we have this installed in other machines which leads to my question on whether we need this or not ? + """ + pass + + def backup_image_default_patch_configuration_if_not_exists(self): + """ + This method saves the original auto-update configuration so it can be restored later. + """ + pass + + def is_image_default_patch_configuration_backup_valid(self, image_default_patch_configuration_backup): + pass + + def update_os_patch_configuration_sub_setting(self, patch_configuration_sub_setting, value, patch_configuration_sub_setting_pattern_to_match): + pass + + # Post Install method/ Install Patch + def is_reboot_pending(self): + """Checks if there is a pending reboot on the machine + Returns: + - Boolean: True if reboot is pending, False otherwise + """ + pass + + # Post Install method / Install Patch + def do_processes_require_restart(self): + """Checks if processes require a restart due to updates + Commands used: + - sudo dnf -y install dnf-utils (installs dnf-utils if not already present) + - sudo LANG=en_US.UTF8 needs-restarting -r (checks if processes require restart) + Returns: + - Boolean: True if processes require restart, False otherwise + """ + pass + + def add_arch_dependencies(self, package_manager, package, version, packages, package_versions, package_and_dependencies, package_and_dependency_versions): + """ + Unnecessary for DNF because the package manager already handles multi-architecture dependencies automatically + Command Used to confirm above: sudo dnf -y install jq + """ + pass + + def set_security_esm_package_status(self, operation, packages): + """No-op for dnf, tdnf, yum and zypper """ + pass + + def separate_out_esm_packages(self, packages, package_versions): + """No-op for dnf, tdnf, yum and zypper """ + pass + + def get_package_install_expected_avg_time_in_seconds(self): + pass + + # ConfigurePatch method + def revert_auto_os_update_to_system_default(self): + """ Reverts the auto OS update patch state on the machine to its system default value, if one exists in our backup file """ + pass + From f11560ee475791f2bf77d9324f7b401f2286ae9a Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Mon, 11 May 2026 17:05:48 -0400 Subject: [PATCH 2/6] Address Copilot comments --- src/core/src/bootstrap/EnvLayer.py | 2 +- .../package_managers/AzL4DnfPackageManager.py | 54 +++++++++---------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index f5dc35ff..8a8abf03 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -71,7 +71,7 @@ def get_package_manager(self): if major is not None and int(major) >= 4: code, out = self.run_command_output('which dnf', False, False) if code == 0: - return 'dnf' + return Constants.DNF else: print("Error: Expected package manager dnf not found on this Azure Linux 4 VM.") return str() diff --git a/src/core/src/package_managers/AzL4DnfPackageManager.py b/src/core/src/package_managers/AzL4DnfPackageManager.py index c16d1048..388a58e5 100644 --- a/src/core/src/package_managers/AzL4DnfPackageManager.py +++ b/src/core/src/package_managers/AzL4DnfPackageManager.py @@ -37,7 +37,7 @@ def refresh_repo(self): - sudo dnf clean expire-cache (cleans expired cache entries) - sudo dnf -q check-update (checks for available updates) """ - pass + raise NotImplementedError("DNF: refresh_repo not implemented yet") # AssessPatch method def invoke_package_manager_advanced(self, command, raise_on_exception=True): @@ -48,7 +48,7 @@ def invoke_package_manager_advanced(self, command, raise_on_exception=True): Returns: - Tuple of (output, return_code) from the command execution """ - pass + raise NotImplementedError("DNF: invoke_package_manager_advanced not implemented yet") # AssessPatch method def get_all_updates(self, cached=False): @@ -62,7 +62,7 @@ def get_all_updates(self, cached=False): 2. Execute command, parse output, cache results 3. Return all_updates_cached and all_update_versions_cached """ - return [], [] + raise NotImplementedError("DNF: get_all_updates not implemented yet") # AssessPatch method def get_security_updates(self): @@ -73,7 +73,7 @@ def get_security_updates(self): - List of security package names - List of corresponding security package versions """ - pass + raise NotImplementedError("DNF: get_security_updates not implemented yet") # AssessPatch method def get_other_updates(self): @@ -82,7 +82,7 @@ def get_other_updates(self): return [], [] def set_max_patch_publish_date(self, max_patch_publish_date=str()): - pass + raise NotImplementedError("DNF: set_max_patch_publish_date not implemented yet") # Install Patch method def get_composite_package_identifier(self, package_name, package_version): @@ -93,10 +93,10 @@ def get_composite_package_identifier(self, package_name, package_version): Returns: - String: Composite package identifier (e.g., "package-1.0.0.x86_64") """ - pass + raise NotImplementedError("DNF: get_composite_package_identifier not implemented yet") def install_updates_fail_safe(self, excluded_packages): - pass + raise NotImplementedError("DNF: install_updates_fail_safe not implemented yet") # AssessPatch method def get_all_available_versions_of_package(self, package_name): @@ -108,7 +108,7 @@ def get_all_available_versions_of_package(self, package_name): Returns: - List of all available package versions """ - return [] + raise NotImplementedError("DNF: get_all_available_versions_of_package not implemented yet") # AssessPatch method def is_package_version_installed(self, package_name, package_version): @@ -121,7 +121,7 @@ def is_package_version_installed(self, package_name, package_version): Returns: - Boolean: True if the specific package version is installed, False otherwise """ - pass + raise NotImplementedError("DNF: is_package_version_installed not implemented yet") def get_dependent_list(self, packages): @@ -132,10 +132,10 @@ def get_dependent_list(self, packages): - sudo dnf install --assumeno --skip-broken (simulates installation to find dependencies without actually installing) Returns: List of dependency package names required for the input packages """ - pass + raise NotImplementedError("DNF: get_dependent_list not implemented yet") def get_product_name(self, package_name): - pass + raise NotImplementedError("DNF: get_product_name not implemented yet") def get_package_size(self, output): """Retrieves package size from installation output string @@ -144,7 +144,7 @@ def get_package_size(self, output): Returns: - String: Package size (e.g., "15 M") or UNKNOWN_PACKAGE_SIZE if not found """ - pass + raise NotImplementedError("DNF: get_package_size not implemented yet") # Install Patch method def install_security_updates_azgps_coordinated(self): @@ -154,18 +154,18 @@ def install_security_updates_azgps_coordinated(self): Returns: - Tuple of (return code, output) from the command execution """ - pass + raise NotImplementedError("DNF: install_security_updates_azgps_coordinated not implemented yet") def try_meet_azgps_coordinated_requirements(self): """ - Do we need this for dnf5? + Do we need this for dnf? """ - return False + raise NotImplementedError("DNF: try_meet_azgps_coordinated_requirements not implemented yet") # ConfigurePatch Method def get_current_auto_os_patch_state(self): """ Gets the current auto OS update patch state on the machine """ - pass + raise NotImplementedError("DNF: get_current_auto_os_patch_state not implemented yet") # ConfigurePatch Method def disable_auto_os_update(self): @@ -174,19 +174,19 @@ def disable_auto_os_update(self): Comments from yashna : The current VM with AzLinux4 installed doesnt have dnf automatic/auto OS updates installed. Will we have this installed in other machines which leads to my question on whether we need this or not ? """ - pass + raise NotImplementedError("DNF: disable_auto_os_update not implemented yet") def backup_image_default_patch_configuration_if_not_exists(self): """ This method saves the original auto-update configuration so it can be restored later. """ - pass + raise NotImplementedError("DNF: backup_image_default_patch_configuration_if_not_exists not implemented yet") def is_image_default_patch_configuration_backup_valid(self, image_default_patch_configuration_backup): - pass + raise NotImplementedError("DNF: is_image_default_patch_configuration_backup_valid not implemented yet") def update_os_patch_configuration_sub_setting(self, patch_configuration_sub_setting, value, patch_configuration_sub_setting_pattern_to_match): - pass + raise NotImplementedError("DNF: update_os_patch_configuration_sub_setting not implemented yet") # Post Install method/ Install Patch def is_reboot_pending(self): @@ -194,7 +194,7 @@ def is_reboot_pending(self): Returns: - Boolean: True if reboot is pending, False otherwise """ - pass + raise NotImplementedError("DNF: is_reboot_pending not implemented yet") # Post Install method / Install Patch def do_processes_require_restart(self): @@ -205,28 +205,28 @@ def do_processes_require_restart(self): Returns: - Boolean: True if processes require restart, False otherwise """ - pass + raise NotImplementedError("DNF: do_processes_require_restart not implemented yet") def add_arch_dependencies(self, package_manager, package, version, packages, package_versions, package_and_dependencies, package_and_dependency_versions): """ Unnecessary for DNF because the package manager already handles multi-architecture dependencies automatically Command Used to confirm above: sudo dnf -y install jq """ - pass + return def set_security_esm_package_status(self, operation, packages): """No-op for dnf, tdnf, yum and zypper """ - pass + return def separate_out_esm_packages(self, packages, package_versions): """No-op for dnf, tdnf, yum and zypper """ - pass + return def get_package_install_expected_avg_time_in_seconds(self): - pass + raise NotImplementedError("DNF: get_package_install_expected_avg_time_in_seconds not implemented yet") # ConfigurePatch method def revert_auto_os_update_to_system_default(self): """ Reverts the auto OS update patch state on the machine to its system default value, if one exists in our backup file """ - pass + raise NotImplementedError("DNF: revert_auto_os_update_to_system_default not implemented yet") From 1b80cf7603a6f4ef61510f1c3d644ac215c37f9c Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Tue, 19 May 2026 13:31:10 -0400 Subject: [PATCH 3/6] Update class name --- src/core/src/bootstrap/ConfigurationFactory.py | 8 ++++---- .../{AzL4DnfPackageManager.py => DnfPackageManager.py} | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) rename src/core/src/package_managers/{AzL4DnfPackageManager.py => DnfPackageManager.py} (96%) diff --git a/src/core/src/bootstrap/ConfigurationFactory.py b/src/core/src/bootstrap/ConfigurationFactory.py index db0972b5..27897574 100644 --- a/src/core/src/bootstrap/ConfigurationFactory.py +++ b/src/core/src/bootstrap/ConfigurationFactory.py @@ -38,7 +38,7 @@ from core.src.package_managers.AptitudePackageManager import AptitudePackageManager from core.src.package_managers.AzL3TdnfPackageManager import AzL3TdnfPackageManager -from core.src.package_managers.AzL4DnfPackageManager import AzL4DnfPackageManager +from core.src.package_managers.DnfPackageManager import DnfPackageManager from core.src.package_managers.YumPackageManager import YumPackageManager from core.src.package_managers.ZypperPackageManager import ZypperPackageManager @@ -71,19 +71,19 @@ def __init__(self, log_file_path, events_folder, telemetry_supported): self.configurations = { 'apt_prod_config': self.new_prod_configuration(Constants.APT, AptitudePackageManager), - 'dnf_prod_config': self.new_prod_configuration(Constants.DNF, AzL4DnfPackageManager), + 'dnf_prod_config': self.new_prod_configuration(Constants.DNF, DnfPackageManager), 'tdnf_prod_config': self.new_prod_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_prod_config': self.new_prod_configuration(Constants.YUM, YumPackageManager), 'zypper_prod_config': self.new_prod_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_dev_config': self.new_dev_configuration(Constants.APT, AptitudePackageManager), - 'dnf_dev_config': self.new_dev_configuration(Constants.DNF, AzL4DnfPackageManager), + 'dnf_dev_config': self.new_dev_configuration(Constants.DNF, DnfPackageManager), 'tdnf_dev_config': self.new_dev_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_dev_config': self.new_dev_configuration(Constants.YUM, YumPackageManager), 'zypper_dev_config': self.new_dev_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_test_config': self.new_test_configuration(Constants.APT, AptitudePackageManager), - 'dnf_test_config': self.new_test_configuration(Constants.DNF, AzL4DnfPackageManager), + 'dnf_test_config': self.new_test_configuration(Constants.DNF, DnfPackageManager), 'tdnf_test_config': self.new_test_configuration(Constants.TDNF, AzL3TdnfPackageManager), 'yum_test_config': self.new_test_configuration(Constants.YUM, YumPackageManager), 'zypper_test_config': self.new_test_configuration(Constants.ZYPPER, ZypperPackageManager) diff --git a/src/core/src/package_managers/AzL4DnfPackageManager.py b/src/core/src/package_managers/DnfPackageManager.py similarity index 96% rename from src/core/src/package_managers/AzL4DnfPackageManager.py rename to src/core/src/package_managers/DnfPackageManager.py index 388a58e5..aa9c70a2 100644 --- a/src/core/src/package_managers/AzL4DnfPackageManager.py +++ b/src/core/src/package_managers/DnfPackageManager.py @@ -14,18 +14,18 @@ # # Requires Python 2.7+ -"""AzL4DnfPackageManager for Azure Linux L4""" +"""DnfPackageManager for Azure Linux L4 and RHEL 10""" from abc import ABCMeta from core.src.bootstrap.Constants import Constants from core.src.package_managers.PackageManager import PackageManager -class AzL4DnfPackageManager(PackageManager): - """Implementation of Azure Linux L4 DNF package management operations""" +class DnfPackageManager(PackageManager): + """Implementation of Azure Linux L4/RHEL 10 DNF package management operations""" def __init__(self, env_layer, execution_config, composite_logger, telemetry_writer, status_handler): - super(AzL4DnfPackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) - # TODO: Add AzL4 DNF specific initialization + super(DnfPackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) + # TODO: Add AzL4/Red hat 10 DNF specific initialization self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf') __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) From e42e6013300e3c027b8439d876b05882fba53ccb Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Wed, 20 May 2026 09:05:08 -0400 Subject: [PATCH 4/6] Iternation_1 --- .../src/package_managers/DnfPackageManager.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/core/src/package_managers/DnfPackageManager.py b/src/core/src/package_managers/DnfPackageManager.py index aa9c70a2..a419f5be 100644 --- a/src/core/src/package_managers/DnfPackageManager.py +++ b/src/core/src/package_managers/DnfPackageManager.py @@ -28,6 +28,9 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ # TODO: Add AzL4/Red hat 10 DNF specific initialization self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf') + # commands for DNF Automatic updates service + self.__init_constants_for_dnf_automatic() + __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) # ConfigurePatch Method @@ -230,3 +233,17 @@ def revert_auto_os_update_to_system_default(self): """ Reverts the auto OS update patch state on the machine to its system default value, if one exists in our backup file """ raise NotImplementedError("DNF: revert_auto_os_update_to_system_default not implemented yet") + # region auto OS updates + def __init_constants_for_dnf5_automatic(self): + self.dnf5_automatic_configuration_file_path = None + self.dnf5_automatic_install_check_cmd = 'rpm -qa | grep dnf5-plugin-automatic' + self.dnf5_automatic_enable_on_reboot_check_cmd = 'systemctl is-enabled dnf5-automatic.timer' + self.dnf5_automatic_disable_on_reboot_cmd = 'systemctl disable --now dnf5-automatic.timer' + self.dnf5_automatic_enable_on_reboot_cmd = 'systemctl enable --now dnf5-automatic.timer' + self.dnf5_automatic_config_pattern_match_text = None + # Detect them from ExecStart flags instead of a file: + self.dnf5_automatic_download_updates_identifier_text = '--downloadupdates' + self.dnf5_automatic_apply_updates_identifier_text = '--installupdates' + self.dnf5_automatic_enable_on_reboot_identifier_text = "enable_on_reboot" + self.dnf5_automatic_installation_state_identifier_text = "installation_state" + self.dnf5_auto_os_update_service = "dnf5-automatic" \ No newline at end of file From 9b9fd31d4aa81216eed14e8530840423fa5efcc8 Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Wed, 20 May 2026 14:29:36 -0400 Subject: [PATCH 5/6] First Iteration PR for Enablement/Disablement changes --- .../src/package_managers/DnfPackageManager.py | 141 ++++++++++++++++-- src/core/tests/Test_DnfPackageManager.py | 59 ++++++++ 2 files changed, 186 insertions(+), 14 deletions(-) create mode 100644 src/core/tests/Test_DnfPackageManager.py diff --git a/src/core/src/package_managers/DnfPackageManager.py b/src/core/src/package_managers/DnfPackageManager.py index a419f5be..fcab868b 100644 --- a/src/core/src/package_managers/DnfPackageManager.py +++ b/src/core/src/package_managers/DnfPackageManager.py @@ -28,8 +28,16 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ # TODO: Add AzL4/Red hat 10 DNF specific initialization self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf') + # auto OS updates + self.current_auto_os_update_service = None + self.enable_on_reboot_identifier_text = "" + self.enable_on_reboot_check_cmd = '' + self.enable_on_reboot_cmd = '' + self.installation_state_identifier_text = "" + self.install_check_cmd = "" + # commands for DNF Automatic updates service - self.__init_constants_for_dnf_automatic() + self.__init_constants_for_dnf5_automatic() __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) @@ -168,16 +176,79 @@ def try_meet_azgps_coordinated_requirements(self): # ConfigurePatch Method def get_current_auto_os_patch_state(self): """ Gets the current auto OS update patch state on the machine """ - raise NotImplementedError("DNF: get_current_auto_os_patch_state not implemented yet") + self.composite_logger.log("[DNF] Fetching the current automatic OS patch state on the machine...") + + current_auto_os_patch_state_for_dnf5_automatic = self.__get_current_auto_os_patch_state_for_dnf5_automatic() + + self.composite_logger.log("[DNF] OS patch state per auto OS update service: [dnf5-automatic={0}]".format(str(current_auto_os_patch_state_for_dnf5_automatic))) + + if current_auto_os_patch_state_for_dnf5_automatic == Constants.AutomaticOSPatchStates.ENABLED: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.ENABLED + elif current_auto_os_patch_state_for_dnf5_automatic == Constants.AutomaticOSPatchStates.DISABLED: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.DISABLED + else: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.UNKNOWN + + self.composite_logger.log_debug("[DNF] Overall Auto OS Patch State based on all auto OS update service states [OverallAutoOSPatchState={0}]".format(str(current_auto_os_patch_state))) + return current_auto_os_patch_state + # raise NotImplementedError("DNF: get_current_auto_os_patch_state not implemented yet") + + def __get_current_auto_os_patch_state_for_dnf5_automatic(self): + """ Gets current auto OS update patch state for dnf5-automatic """ + self.composite_logger.log_debug("[DNF] Fetching current automatic OS patch state in dnf5-automatic service. This includes checks on whether the service is installed, current auto patch enable state and whether it is set to enable on reboot") + self.__init_auto_update_for_dnf5_automatic() + + is_service_installed = self.is_auto_update_service_installed(self.install_check_cmd) + enable_on_reboot_value = False + + if is_service_installed: + enable_on_reboot_value = self.is_service_set_to_enable_on_reboot(self.enable_on_reboot_check_cmd) + + if enable_on_reboot_value: + return Constants.AutomaticOSPatchStates.ENABLED + else: + return Constants.AutomaticOSPatchStates.DISABLED # ConfigurePatch Method def disable_auto_os_update(self): - """ - Disables auto OS updates on the machine only if they are enabled - Comments from yashna : The current VM with AzLinux4 installed doesnt have dnf automatic/auto OS updates installed. - Will we have this installed in other machines which leads to my question on whether we need this or not ? - """ - raise NotImplementedError("DNF: disable_auto_os_update not implemented yet") + """ Disables auto OS updates on the machine only if they are enabled and logs the default settings the machine comes with """ + try: + self.composite_logger.log_verbose("[DNF] Disabling auto OS updates in all identified services...") + self.__disable_auto_os_update_for_dnf5_automatic() + self.composite_logger.log_debug("[DNF] Successfully disabled auto OS updates") + + except Exception as error: + self.composite_logger.log_error("[DNF] Could not disable auto OS updates. [Error={0}]".format(repr(error))) + raise + + def __disable_auto_os_update_for_dnf5_automatic(self): + """ Disables auto OS updates, using dnf5-automatic service, and logs the default settings the machine comes with """ + self.composite_logger.log_verbose("[DNF] Disabling auto OS updates using dnf5-automatic") + self.__init_auto_update_for_dnf5_automatic() + + #self.backup_image_default_patch_configuration_if_not_exists() - Will uncomment for later iterations + + if not self.is_auto_update_service_installed(self.dnf5_automatic_install_check_cmd): + self.composite_logger.log_debug("[DNF] Cannot disable as dnf5-automatic is not installed on the machine") + return + + self.composite_logger.log_verbose("[DNF] Preemptively disabling auto OS updates using dnf5-automatic") + self.disable_auto_update_on_reboot(self.dnf5_automatic_disable_on_reboot_cmd) + + self.composite_logger.log_debug("[DNF] Successfully disabled auto OS updates using dnf5-automatic") + + def disable_auto_update_on_reboot(self, command): + """ Disables auto update on reboot by executing systemctl command """ + self.composite_logger.log_verbose("[DNF] Disabling auto update on reboot. [Command={0}] ".format(command)) + code, out = self.env_layer.run_command_output(command, False, False) + + if code != 0: + self.composite_logger.log_error("[DNF][ERROR] Error disabling auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + error_msg = 'Unexpected return code (' + str(code) + ') on command: ' + command + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.OPERATION_FAILED) + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + else: + self.composite_logger.log_debug("[DNF] Disabled auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) def backup_image_default_patch_configuration_if_not_exists(self): """ @@ -235,15 +306,57 @@ def revert_auto_os_update_to_system_default(self): # region auto OS updates def __init_constants_for_dnf5_automatic(self): - self.dnf5_automatic_configuration_file_path = None self.dnf5_automatic_install_check_cmd = 'rpm -qa | grep dnf5-plugin-automatic' self.dnf5_automatic_enable_on_reboot_check_cmd = 'systemctl is-enabled dnf5-automatic.timer' self.dnf5_automatic_disable_on_reboot_cmd = 'systemctl disable --now dnf5-automatic.timer' self.dnf5_automatic_enable_on_reboot_cmd = 'systemctl enable --now dnf5-automatic.timer' - self.dnf5_automatic_config_pattern_match_text = None - # Detect them from ExecStart flags instead of a file: - self.dnf5_automatic_download_updates_identifier_text = '--downloadupdates' - self.dnf5_automatic_apply_updates_identifier_text = '--installupdates' self.dnf5_automatic_enable_on_reboot_identifier_text = "enable_on_reboot" self.dnf5_automatic_installation_state_identifier_text = "installation_state" - self.dnf5_auto_os_update_service = "dnf5-automatic" \ No newline at end of file + self.dnf5_auto_os_update_service = "dnf5-automatic" + + def __init_auto_update_for_dnf5_automatic(self): + """ Initializes all generic auto OS update variables with the config values for dnf5 automatic service """ + self.enable_on_reboot_identifier_text = self.dnf5_automatic_enable_on_reboot_identifier_text + self.installation_state_identifier_text = self.dnf5_automatic_installation_state_identifier_text + self.enable_on_reboot_check_cmd = self.dnf5_automatic_enable_on_reboot_check_cmd + self.enable_on_reboot_cmd = self.dnf5_automatic_enable_on_reboot_cmd + self.install_check_cmd = self.dnf5_automatic_install_check_cmd + self.current_auto_os_update_service = self.dnf5_auto_os_update_service + + def is_auto_update_service_installed(self, install_check_cmd): + """ Checks if the auto update service is installed on the VM """ + code, out = self.env_layer.run_command_output(install_check_cmd, False, False) + self.composite_logger.log_debug("[DNF] Checked if auto update service is installed. [Command={0}][Code={1}][Output={2}]".format(install_check_cmd, str(code), out)) + if len(out.strip()) > 0 and code == 0: + self.composite_logger.log_debug("[DNF] > Auto OS update service is installed on the machine") + return True + else: + self.composite_logger.log_debug("[DNF] > Auto OS update service is NOT installed on the machine") + return False + + def is_service_set_to_enable_on_reboot(self, command): + """ Checking if auto update is set to enable on reboot on the machine. An enable_on_reboot service will be activated (if currently inactive) on machine reboot """ + code, out = self.env_layer.run_command_output(command, False, False) + self.composite_logger.log_debug("[DNF] Checked if auto update service is set to enable on reboot. [Code={0}][Out={1}]".format(str(code), out)) + if len(out.strip()) > 0 and code == 0 and 'enabled' in out: + self.composite_logger.log_debug("[DNF] > Auto OS update service will enable on reboot") + return True + self.composite_logger.log_debug("[DNF] > Auto OS update service will NOT enable on reboot") + return False + + def enable_auto_update_on_reboot(self): + """ Enables machine default auto update on reboot """ + # type () -> None + command = self.enable_on_reboot_cmd + self.composite_logger.log_verbose("[DNF] Enabling auto update on reboot. [Command={0}] ".format(command)) + code, out = self.env_layer.run_command_output(command, False, False) + + if code != 0: + self.composite_logger.log_error("[DNF][ERROR] Error enabling auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + error_msg = 'Unexpected return code (' + str(code) + ') on command: ' + command + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.OPERATION_FAILED) + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + else: + self.composite_logger.log_debug("[DNF] Enabled auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + + # endregion diff --git a/src/core/tests/Test_DnfPackageManager.py b/src/core/tests/Test_DnfPackageManager.py new file mode 100644 index 00000000..e3601213 --- /dev/null +++ b/src/core/tests/Test_DnfPackageManager.py @@ -0,0 +1,59 @@ +# Copyright 2026 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ +import unittest + +from core.src.bootstrap.Constants import Constants +from core.tests.library.ArgumentComposer import ArgumentComposer +from core.tests.library.RuntimeCompositor import RuntimeCompositor + + +class TestDnfPackageManager(unittest.TestCase): + def setUp(self): + self.runtime = RuntimeCompositor(ArgumentComposer().get_composed_arguments(), True, Constants.DNF) + self.container = self.runtime.container + + def tearDown(self): + self.runtime.stop() + + def test_disable_auto_os_updates_with_uninstalled_services(self): + """Test disable_auto_os_update when dnf5-automatic is not installed""" + self.runtime.set_legacy_test_type('SadPath') + package_manager = self.container.get('package_manager') + + # Should complete without error even when service is not installed + package_manager.disable_auto_os_update() + + def test_get_current_auto_os_patch_state_with_uninstalled_services(self): + """Test get_current_auto_os_patch_state when dnf5-automatic is not installed""" + self.runtime.set_legacy_test_type('SadPath') + package_manager = self.container.get('package_manager') + + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) + + def test_get_current_auto_os_patch_state_with_installed_services_and_state_disabled(self): + """Test get_current_auto_os_patch_state when dnf5-automatic is installed but disabled""" + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + # Restore original implementation so package manager logic (rpm + systemctl checks) runs + package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state + + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) + +if __name__ == '__main__': + unittest.main() + From b950de5e8124e183757a5797255fdaebe37e13b6 Mon Sep 17 00:00:00 2001 From: Yashna Parikh Date: Thu, 21 May 2026 18:38:56 -0400 Subject: [PATCH 6/6] Iteration_2 : Auto assessment --- .../src/package_managers/DnfPackageManager.py | 167 ++++++++++++++---- src/core/tests/Test_DnfPackageManager.py | 61 +++++++ 2 files changed, 198 insertions(+), 30 deletions(-) diff --git a/src/core/src/package_managers/DnfPackageManager.py b/src/core/src/package_managers/DnfPackageManager.py index fcab868b..0f1f8db6 100644 --- a/src/core/src/package_managers/DnfPackageManager.py +++ b/src/core/src/package_managers/DnfPackageManager.py @@ -16,17 +16,22 @@ """DnfPackageManager for Azure Linux L4 and RHEL 10""" from abc import ABCMeta +import json from core.src.bootstrap.Constants import Constants from core.src.package_managers.PackageManager import PackageManager class DnfPackageManager(PackageManager): - """Implementation of Azure Linux L4/RHEL 10 DNF package management operations""" + """Implementation of Azure Linux L4/RHEL 10 DNF5 package management operations""" def __init__(self, env_layer, execution_config, composite_logger, telemetry_writer, status_handler): super(DnfPackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) - # TODO: Add AzL4/Red hat 10 DNF specific initialization - self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf') + + self.cmd_clean_cache = "sudo dnf5 -q clean expire-cache" + self.cmd_repo_refresh = "sudo dnf5 -q check-update " + + # TODO: Add AzL4/Red hat 10 DNF5 specific initialization + self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, 'dnf5') # auto OS updates self.current_auto_os_update_service = None @@ -35,31 +40,42 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ self.enable_on_reboot_cmd = '' self.installation_state_identifier_text = "" self.install_check_cmd = "" + self.apply_updates_enabled = "Enabled" + self.apply_updates_disabled = "Disabled" + self.apply_updates_unknown = "Unknown" # commands for DNF Automatic updates service self.__init_constants_for_dnf5_automatic() __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) - # ConfigurePatch Method def refresh_repo(self): - """Refreshes the DNF repository cache and lists available updates by cleaning expired cache entries - Commands: - - sudo dnf clean expire-cache (cleans expired cache entries) - - sudo dnf -q check-update (checks for available updates) - """ - raise NotImplementedError("DNF: refresh_repo not implemented yet") + self.composite_logger.log("[DNF] Refreshing local repo...") + self.invoke_package_manager(self.cmd_clean_cache) + self.invoke_package_manager(self.cmd_repo_refresh) # AssessPatch method def invoke_package_manager_advanced(self, command, raise_on_exception=True): - """Invokes the DNF package manager with standardized command execution, logging, and error handling - Parameters: - - command (string): The DNF command to execute - - raise_on_exception (boolean): Whether to raise exception on non-zero exit code - Returns: - - Tuple of (output, return_code) from the command execution - """ - raise NotImplementedError("DNF: invoke_package_manager_advanced not implemented yet") + self.composite_logger.log_verbose("[DNF] Invoking package manager. [Command={0}]".format(str(command))) + # env_layer.run_command_output returns (code, output) + code, out = self.env_layer.run_command_output(command, False, False) + + # Treat exit code 0 as success. No updates available. + if code == 0: + self.composite_logger.log_debug('[DNF] Invoked package manager. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + + elif code == 100: + # Updates available + self.composite_logger.log_debug( + '[DNF] Updates available. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + else: + self.composite_logger.log_warning('[ERROR] Customer environment error. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + error_msg = "Customer environment error: Investigate and resolve unexpected return code ({0}) from package manager on command: {1}".format(str(code), command) + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.PACKAGE_MANAGER_FAILURE) + if raise_on_exception: + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + + return out, code # AssessPatch method def get_all_updates(self, cached=False): @@ -173,7 +189,6 @@ def try_meet_azgps_coordinated_requirements(self): """ raise NotImplementedError("DNF: try_meet_azgps_coordinated_requirements not implemented yet") - # ConfigurePatch Method def get_current_auto_os_patch_state(self): """ Gets the current auto OS update patch state on the machine """ self.composite_logger.log("[DNF] Fetching the current automatic OS patch state on the machine...") @@ -191,25 +206,22 @@ def get_current_auto_os_patch_state(self): self.composite_logger.log_debug("[DNF] Overall Auto OS Patch State based on all auto OS update service states [OverallAutoOSPatchState={0}]".format(str(current_auto_os_patch_state))) return current_auto_os_patch_state - # raise NotImplementedError("DNF: get_current_auto_os_patch_state not implemented yet") def __get_current_auto_os_patch_state_for_dnf5_automatic(self): """ Gets current auto OS update patch state for dnf5-automatic """ self.composite_logger.log_debug("[DNF] Fetching current automatic OS patch state in dnf5-automatic service. This includes checks on whether the service is installed, current auto patch enable state and whether it is set to enable on reboot") self.__init_auto_update_for_dnf5_automatic() - is_service_installed = self.is_auto_update_service_installed(self.install_check_cmd) - enable_on_reboot_value = False + is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() - if is_service_installed: - enable_on_reboot_value = self.is_service_set_to_enable_on_reboot(self.enable_on_reboot_check_cmd) + if not is_service_installed: + return Constants.AutomaticOSPatchStates.DISABLED if enable_on_reboot_value: return Constants.AutomaticOSPatchStates.ENABLED - else: - return Constants.AutomaticOSPatchStates.DISABLED - # ConfigurePatch Method + return Constants.AutomaticOSPatchStates.DISABLED + def disable_auto_os_update(self): """ Disables auto OS updates on the machine only if they are enabled and logs the default settings the machine comes with """ try: @@ -226,7 +238,7 @@ def __disable_auto_os_update_for_dnf5_automatic(self): self.composite_logger.log_verbose("[DNF] Disabling auto OS updates using dnf5-automatic") self.__init_auto_update_for_dnf5_automatic() - #self.backup_image_default_patch_configuration_if_not_exists() - Will uncomment for later iterations + self.backup_image_default_patch_configuration_if_not_exists() if not self.is_auto_update_service_installed(self.dnf5_automatic_install_check_cmd): self.composite_logger.log_debug("[DNF] Cannot disable as dnf5-automatic is not installed on the machine") @@ -254,10 +266,58 @@ def backup_image_default_patch_configuration_if_not_exists(self): """ This method saves the original auto-update configuration so it can be restored later. """ - raise NotImplementedError("DNF: backup_image_default_patch_configuration_if_not_exists not implemented yet") + try: + self.composite_logger.log_debug("[DNF] Ensuring there is a backup of the default patch state for [AutoOSUpdateService={0}]".format(str(self.current_auto_os_update_service))) + + # read existing backup since it also contains backup from other update services. We need to preserve any existing data within the backup file + image_default_patch_configuration_backup = {} + if self.image_default_patch_configuration_backup_exists(): + try: + image_default_patch_configuration_backup = json.loads(self.env_layer.file_system.read_with_retry(self.image_default_patch_configuration_backup_path)) + except Exception as error: + self.composite_logger.log_error("[DNF] Unable to read backup for default patch state. Will attempt to re-write. [Exception={0}]".format(repr(error))) + + # verify if existing backup is valid if not, write to backup + is_backup_valid = self.is_image_default_patch_configuration_backup_valid(image_default_patch_configuration_backup) + if is_backup_valid: + self.composite_logger.log_debug("[DNF] Since extension has a valid backup, no need to log the current settings again. [Default Auto OS update settings={0}] [File path={1}]" + .format(str(image_default_patch_configuration_backup), self.image_default_patch_configuration_backup_path)) + else: + self.composite_logger.log_debug("[DNF] Since the backup is invalid, will add a new backup with the current auto OS update settings") + self.composite_logger.log_debug("[DNF] Fetching current auto OS update settings for [AutoOSUpdateService={0}]".format(str(self.current_auto_os_update_service))) + is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() + + backup_image_default_patch_configuration_json_to_add = { + self.current_auto_os_update_service: { + self.enable_on_reboot_identifier_text: enable_on_reboot_value, + self.installation_state_identifier_text: is_service_installed + } + } + + image_default_patch_configuration_backup.update(backup_image_default_patch_configuration_json_to_add) + + self.composite_logger.log_debug("[DNF] Logging default system configuration settings for auto OS updates. [Settings={0}] [Log file path={1}]" + .format(str(image_default_patch_configuration_backup), self.image_default_patch_configuration_backup_path)) + self.env_layer.file_system.write_with_retry(self.image_default_patch_configuration_backup_path, '{0}'.format(json.dumps(image_default_patch_configuration_backup)), mode='w+') + except Exception as error: + error_message = "[DNF] Exception during fetching and logging default auto update settings on the machine. [Exception={0}]".format(repr(error)) + self.composite_logger.log_error(error_message) + self.status_handler.add_error_to_status(error_message, Constants.PatchOperationErrorCodes.DEFAULT_ERROR) + raise def is_image_default_patch_configuration_backup_valid(self, image_default_patch_configuration_backup): - raise NotImplementedError("DNF: is_image_default_patch_configuration_backup_valid not implemented yet") + # Validate backup JSON for dnf5 automatic service + try: + if self.dnf5_auto_os_update_service in image_default_patch_configuration_backup \ + and self.dnf5_automatic_enable_on_reboot_identifier_text in image_default_patch_configuration_backup[self.dnf5_auto_os_update_service] \ + and self.dnf5_automatic_installation_state_identifier_text in image_default_patch_configuration_backup[self.dnf5_auto_os_update_service]: + self.composite_logger.log_debug("[DNF] Extension has a valid backup for default dnf5-automatic configuration settings") + return True + else: + self.composite_logger.log_debug("[DNF] Extension does not have a valid backup for default dnf5-automatic configuration settings") + return False + except Exception: + return False def update_os_patch_configuration_sub_setting(self, patch_configuration_sub_setting, value, patch_configuration_sub_setting_pattern_to_match): raise NotImplementedError("DNF: update_os_patch_configuration_sub_setting not implemented yet") @@ -306,16 +366,24 @@ def revert_auto_os_update_to_system_default(self): # region auto OS updates def __init_constants_for_dnf5_automatic(self): + self.dnf5_automatic_configuration_service = 'systemctl cat dnf5-automatic.service' self.dnf5_automatic_install_check_cmd = 'rpm -qa | grep dnf5-plugin-automatic' self.dnf5_automatic_enable_on_reboot_check_cmd = 'systemctl is-enabled dnf5-automatic.timer' self.dnf5_automatic_disable_on_reboot_cmd = 'systemctl disable --now dnf5-automatic.timer' self.dnf5_automatic_enable_on_reboot_cmd = 'systemctl enable --now dnf5-automatic.timer' + self.dnf5_automatic_config_pattern_match_text = None + # Detect them from ExecStart flags instead of a file: + self.dnf5_automatic_download_updates_identifier_text = '--downloadupdates' + self.dnf5_automatic_apply_updates_identifier_text = '--installupdates' self.dnf5_automatic_enable_on_reboot_identifier_text = "enable_on_reboot" self.dnf5_automatic_installation_state_identifier_text = "installation_state" self.dnf5_auto_os_update_service = "dnf5-automatic" def __init_auto_update_for_dnf5_automatic(self): """ Initializes all generic auto OS update variables with the config values for dnf5 automatic service """ + self.os_patch_configuration_settings_read_cmd = self.dnf5_automatic_configuration_service + self.download_updates_identifier_text = self.dnf5_automatic_download_updates_identifier_text + self.apply_updates_identifier_text = self.dnf5_automatic_apply_updates_identifier_text self.enable_on_reboot_identifier_text = self.dnf5_automatic_enable_on_reboot_identifier_text self.installation_state_identifier_text = self.dnf5_automatic_installation_state_identifier_text self.enable_on_reboot_check_cmd = self.dnf5_automatic_enable_on_reboot_check_cmd @@ -323,6 +391,45 @@ def __init_auto_update_for_dnf5_automatic(self): self.install_check_cmd = self.dnf5_automatic_install_check_cmd self.current_auto_os_update_service = self.dnf5_auto_os_update_service + def __get_current_auto_os_updates_setting_on_machine(self): + """ Gets all the update settings related to auto OS updates via dnf """ + try: + download_updates_value = "" + apply_updates_value = "" + is_service_installed = False + enable_on_reboot_value = False + + # get install state + if not self.is_auto_update_service_installed(self.install_check_cmd): + return is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value + + is_service_installed = True + enable_on_reboot_value = self.is_service_set_to_enable_on_reboot(self.enable_on_reboot_check_cmd) + + self.composite_logger.log_debug( + "[DNF] Checking if auto updates are currently enabled...") + + # Check systemd service unit file for ExecStart flags to determine current settings + # Get the dnf5-automatic.service configuration + code, unit_output = self.env_layer.run_command_output(self.os_patch_configuration_settings_read_cmd, False, False) + + if code == 0: + self.composite_logger.log_debug( + "[DNF] Retrieved dnf5-automatic service unit configuration...") + + # ExecStart line format example: ExecStart=/usr/bin/dnf5 automatic --timer + for line in unit_output.split('\n'): + if line.strip().startswith('ExecStart=') and 'dnf5 automatic' in line: + self.composite_logger.log_debug("[DNF] ExecStart line: {0}".format(line)) + break + + return is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value + + except Exception as error: + raise Exception( + "[DNF] Error occurred in fetching current auto OS update settings from the machine (dnf5). [Exception={0}]".format( + repr(error))) + def is_auto_update_service_installed(self, install_check_cmd): """ Checks if the auto update service is installed on the VM """ code, out = self.env_layer.run_command_output(install_check_cmd, False, False) diff --git a/src/core/tests/Test_DnfPackageManager.py b/src/core/tests/Test_DnfPackageManager.py index e3601213..280fe1ac 100644 --- a/src/core/tests/Test_DnfPackageManager.py +++ b/src/core/tests/Test_DnfPackageManager.py @@ -28,6 +28,13 @@ def setUp(self): def tearDown(self): self.runtime.stop() + + def test_refresh_repo(self): + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_manager.refresh_repo_safely() + def test_disable_auto_os_updates_with_uninstalled_services(self): """Test disable_auto_os_update when dnf5-automatic is not installed""" self.runtime.set_legacy_test_type('SadPath') @@ -44,6 +51,58 @@ def test_get_current_auto_os_patch_state_with_uninstalled_services(self): current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) + def test_get_current_auto_os_patch_state_with_installed_services_and_state_enabled(self): + """Test get_current_auto_os_patch_state for DNF when service is installed and enabled""" + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state + + # Mock the systemctl cat output with enabled flags + systemctl_cat_output = '''[Unit] + Description=Run dnf5 automatic updates + After=network-online.target + + [Timer] + OnBootSec=1h + OnUnitActiveSec=24h + AccuracySec=1h + Persistent=true + + [Install] + WantedBy=timers.target + + [Service] + Type=oneshot + ExecStart=/usr/bin/dnf5 automatic --timer + StandardOutput=journal + StandardError=journal + ''' + + # Mock the run_command_output for systemctl cat + backup_run_command_output = self.runtime.env_layer.run_command_output + + def mock_systemctl_cat(cmd, no_output=False, chk_err=False): + if 'rpm -qa | grep dnf5-plugin-automatic' in cmd: + return 0, 'dnf5-plugin-automatic-xyz' + + # Mock timer enabled + elif 'systemctl is-enabled dnf5-automatic.timer' in cmd: + return 0, 'enabled' + + # Mock service file + elif 'systemctl cat dnf5-automatic' in cmd: + return 0, systemctl_cat_output + + return backup_run_command_output(cmd, no_output, chk_err) + + self.runtime.env_layer.run_command_output = mock_systemctl_cat + + try: + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.ENABLED) + finally: + self.runtime.env_layer.run_command_output = backup_run_command_output + def test_get_current_auto_os_patch_state_with_installed_services_and_state_disabled(self): """Test get_current_auto_os_patch_state when dnf5-automatic is installed but disabled""" self.runtime.set_legacy_test_type('HappyPath') @@ -52,6 +111,8 @@ def test_get_current_auto_os_patch_state_with_installed_services_and_state_disab package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + + self.assertFalse(package_manager.image_default_patch_configuration_backup_exists()) self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) if __name__ == '__main__':