diff --git a/src/core/src/bootstrap/Constants.py b/src/core/src/bootstrap/Constants.py index 85b430f6..11dd4656 100644 --- a/src/core/src/bootstrap/Constants.py +++ b/src/core/src/bootstrap/Constants.py @@ -299,6 +299,13 @@ class PatchAssessmentSummaryStartedBy(EnumBackport): STATUS_ERROR_MSG_SIZE_LIMIT_IN_CHARACTERS = 128 STATUS_ERROR_LIMIT = 5 + # Boot Certificates to update + class Certificates(EnumBackport): + KEK = "KEK" + DB = "DB" + + LATEST_CERTIFICATE_TIMESTAMP = "2023" + class PatchOperationTopLevelErrorCode(EnumBackport): SUCCESS = 0 ERROR = 1 @@ -312,6 +319,7 @@ class PatchOperationErrorCodes(EnumBackport): NEWER_OPERATION_SUPERSEDED = "NEWER_OPERATION_SUPERSEDED" UA_ESM_REQUIRED = "UA_ESM_REQUIRED" TRUNCATION = "PACKAGE_LIST_TRUNCATED" + CERTIFICATE_UPDATE = "CERTIFICATE_UPDATE" ERROR_ADDED_TO_STATUS = "Error_added_to_status" diff --git a/src/core/src/core_logic/PatchInstaller.py b/src/core/src/core_logic/PatchInstaller.py index 16a26adc..7c857029 100644 --- a/src/core/src/core_logic/PatchInstaller.py +++ b/src/core/src/core_logic/PatchInstaller.py @@ -77,6 +77,9 @@ def start_installation(self, simulate=False): self.composite_logger.log_debug("Attempting to reboot the machine prior to patch installation as there is a reboot pending...") reboot_manager.start_reboot_if_required_and_time_available(maintenance_window.get_remaining_time_in_minutes(None, False)) + # Update certs if available + self.package_manager.update_certs() + if self.execution_config.max_patch_publish_date != str(): self.package_manager.set_max_patch_publish_date(self.execution_config.max_patch_publish_date) diff --git a/src/core/src/package_managers/AptitudePackageManager.py b/src/core/src/package_managers/AptitudePackageManager.py index cdc1b2ec..bc432558 100644 --- a/src/core/src/package_managers/AptitudePackageManager.py +++ b/src/core/src/package_managers/AptitudePackageManager.py @@ -20,6 +20,7 @@ import re import shutil import sys +from abc import abstractmethod from core.src.package_managers.PackageManager import PackageManager from core.src.bootstrap.Constants import Constants @@ -91,6 +92,28 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ self.package_install_expected_avg_time_in_seconds = 90 # As per telemetry data, the average time to install package is around 81 seconds for apt. + # Update certificates in factory defaults + # self.check_mokutil_exists_cmd = "command -v mokutil" + self.install_mokutil_cmd = "sudo apt-get install -y -qq mokutil" + self.proposed_repo_file_path = "/etc/apt/sources.list.d/proposed.list" + self.proposed_pin_file_path = "/etc/apt/preferences.d/proposed" + self.add_proposed_repo_cmd = ( + "bash -c 'echo \"deb http://archive.ubuntu.com/ubuntu/ " + "$( . /etc/os-release && echo $VERSION_CODENAME )-proposed restricted main multiverse universe\" " + "| sudo tee {0}'").format(self.proposed_repo_file_path) + self.create_proposed_pin_cmd = ( + "bash -c 'cat << EOF | sudo tee {0}\n" + "Package: *\n" + "Pin: release a=$( . /etc/os-release && echo $VERSION_CODENAME )-proposed\n" + "Pin-Priority: 100\n" + "EOF'").format(self.proposed_pin_file_path) + self.remove_proposed_repo_cmd = "sudo rm -f {0}".format(self.proposed_repo_file_path) + self.remove_proposed_pin_cmd = "sudo rm -f {0}".format(self.proposed_pin_file_path) + self.apt_update_cmd = "sudo apt-get -q update" + self.install_fwupd_from_proposed_cmd = "bash -c 'sudo apt-get install -y -t $( . /etc/os-release && echo $VERSION_CODENAME )-proposed fwupd'" + self.fwupd_refresh_cmd = "sudo fwupdmgr refresh" # NOTE: This could be made generic in package manager, depending on what solution type is adopted for other distros + self.fwupd_update_cmd = "sudo fwupdmgr update -y" + # region Sources Management def __get_custom_sources_to_spec(self, max_patch_published_date=str(), base_classification=str()): # type: (str, str) -> (str, str) @@ -944,3 +967,90 @@ def separate_out_esm_packages(self, packages, package_versions): def get_package_install_expected_avg_time_in_seconds(self): return self.package_install_expected_avg_time_in_seconds + # region Update certificates in factory defaults + def try_install_mokutil(self): + # type: () -> bool + """ Attempts to install mokutil """ + cmd = self.install_mokutil_cmd + self.composite_logger.log_verbose('[APM] Invoking install mokutil command [Command={0}]'.format(cmd)) + out, code = self.invoke_package_manager_advanced(cmd, raise_on_exception=False) + self.composite_logger.log_debug('[APM] Invoked install mokutil exists. [Command={0}][Code={1}][Output={2}]'.format(cmd, str(code), str(out))) + return code == 0 + + def try_update_certs(self): + """ Attempts to update certificate status """ + self.composite_logger.log("[APM][Certs] Starting cert update flow.") + if self.are_latest_certs_present(): + self.composite_logger.log("[APM][Certs] Latest certs already present. Skipping.") + return True + + success = False + try: + # shell/file steps + self.__run_cert_shell_command(self.add_proposed_repo_cmd, "AddProposedRepo", raise_on_error=True) + self.__run_cert_shell_command(self.create_proposed_pin_cmd, "CreateAptPin", raise_on_error=True) + + # apt/package-manager steps + self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdateAfterRepo", raise_on_error=True) + self.__run_cert_apt_command(self.install_fwupd_from_proposed_cmd, "InstallFwupdFromProposed", raise_on_error=True) + + # shell fwupd steps + self.__run_cert_shell_command(self.fwupd_refresh_cmd, "FwupdRefresh", raise_on_error=True) + self.__run_cert_shell_command(self.fwupd_update_cmd, "FwupdUpdate", raise_on_error=True) + + if self.are_latest_certs_present(): + self.composite_logger.log("[APM][Certs] Cert update completed and verified.") + success = True + else: + msg = "[APM][Certs] Cert update commands completed but latest certs not detected." + self.composite_logger.log_error(msg) + self.status_handler.add_error_to_status(msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE) + + except Exception as error: + self.composite_logger.log_error( + "[APM][Certs] Exception during cert update flow. [Error={0}]".format(repr(error))) + + finally: + # best-effort cleanup + self.__run_cert_shell_command(self.remove_proposed_pin_cmd, "CleanupAptPin", raise_on_error=False) + self.__run_cert_shell_command(self.remove_proposed_repo_cmd, "CleanupProposedRepo", raise_on_error=False) + self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdateAfterCleanup", raise_on_error=False) + + if success: + self.status_handler.set_reboot_pending(self.is_reboot_pending()) + + return success + + def __run_cert_apt_command(self, command, step_name, raise_on_error=False): + """Run apt/dpkg commands through package-manager wrapper.""" + out, code = self.invoke_package_manager_advanced(command, raise_on_exception=False) + if code != self.apt_exitcode_ok: + msg = "[APM][UpdateCerts] Apt step failed. [Step={0}][Command={1}][Code={2}][Output={3}]".format( + step_name, str(command), str(code), str(out)) + self.composite_logger.log_error(msg) + self.status_handler.add_error_to_status(msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE) + if raise_on_error: + raise Exception(msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + return False, out + + self.composite_logger.log_debug( + "[APM][UpdateCerts] Apt step succeeded. [Step={0}][Output={1}]".format(step_name, str(out))) + return True, out + + def __run_cert_shell_command(self, command, step_name, raise_on_error=False): + """Run non-apt utility commands directly.""" + code, out = self.env_layer.run_command_output(command, False, False) + if code != 0: + msg = "[APM][UpdateCerts] Shell step failed. [Step={0}][Command={1}][Code={2}][Output={3}]".format( + step_name, str(command), str(code), str(out)) + self.composite_logger.log_error(msg) + self.status_handler.add_error_to_status(msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE) + if raise_on_error: + raise Exception(msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + return False, out + + self.composite_logger.log_debug( + "[APM][UpdateCerts] Shell step succeeded. [Step={0}][Output={1}]".format(step_name, str(out))) + return True, out + # endregion + diff --git a/src/core/src/package_managers/PackageManager.py b/src/core/src/package_managers/PackageManager.py index ee177c25..5a9cdee5 100644 --- a/src/core/src/package_managers/PackageManager.py +++ b/src/core/src/package_managers/PackageManager.py @@ -54,6 +54,12 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ # Primarily for debian-based but generalizing for back-compat on customer-driven scenarios self.REBOOT_PENDING_FILE_PATH = '/var/run/reboot-required' + # Update certificates in factory defaults + self.check_mokutil_exists_cmd = "command -v mokutil" + self.get_kek_cert_status_cmd = "mokutil --kek | grep 'CN='" # todo: review if this should be changed to mokutil --kek 2>&1 | grep -oP 'CN=\K[^,]+' | sort -u | tr '\n' '; ' || echo "none" + self.get_db_cert_status_cmd = "mokutil --db | grep 'CN='" + self.mokutil_success_exit_code = 0 + __metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta) @abstractmethod @@ -491,3 +497,85 @@ def get_package_install_expected_avg_time_in_seconds(self): """Retrieves average time to install package in seconds.""" pass + # region Update certificates in factory defaults + def update_certs(self): + # 1. Fetch and Log current certs on the VM NOTE: Common across distros + # ensure mokutil exists + # if not, install mokutil + # If success, fetch cert detail + # If not, throw exception and stop + # log output + # 2. If 2023 certs already exists, do nothing + # 3. If not, perform all the steps to update certs + # 4. Validate and log new certs were installed + + self.composite_logger.log_verbose("[PM] Updating current certificates if needed...") + is_mokutil_installed = self.is_mokutil_installed() + try_install_mokutil_status = False + if not is_mokutil_installed: + try_install_mokutil_status = self.try_install_mokutil() + + if is_mokutil_installed or try_install_mokutil_status: + self.try_update_certs() + else: + error_msg = "[PM][UpdateCerts] Mokutil is not installed or could not be installed. Cannot fetch current certs." + self.composite_logger.log_error(error_msg) + self.status_handler.add_error_to_status(error_msg, + Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE) + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + + def is_mokutil_installed(self): + # type: () -> bool + """check if mokutil is installed""" + cmd = self.check_mokutil_exists_cmd + self.composite_logger.log_verbose('[PM] Invoking check mokutil exists command [Command={0}]'.format(cmd)) + code, out = self.env_layer.run_command_output(cmd, False, False) + self.composite_logger.log_debug('[PM] Invoked check mokutil exists. [Command={0}][Code={1}][Output={2}]'.format(cmd, str(code), str(out))) + return code == 0 + + def are_latest_certs_present(self): + kek_code, kek_out = self.fetch_current_certs(Constants.Certificates.KEK, self.get_kek_cert_status_cmd, raise_on_exception=False) + db_code, db_out = self.fetch_current_certs(Constants.Certificates.DB, self.get_db_cert_status_cmd, raise_on_exception=False) + return self.is_latest_cert_installed(kek_code, kek_out) and self.is_latest_cert_installed(db_code, db_out) + + def fetch_current_certs(self, cert_type, get_cert_status_cmd, raise_on_exception=False): + # type: (str, str, bool) -> (int, str) + """ Fetches the status of the certificates on the machine """ + cmd = get_cert_status_cmd + self.composite_logger.log_verbose('[PM] Invoking mokutil to fetch certificate details [Certificate={0}][Command={1}]'.format(cert_type, cmd)) + code, out = self.env_layer.run_command_output(cmd, False, False) + + if code != self.mokutil_success_exit_code: + self.composite_logger.log_warning('[ERROR] Customer environment error. Error invoking mokutil to fetch certificate status. ' + '[Certificate={0}][Command={1}][Code={2}][Output={3}]'.format(cert_type, cmd, str(code), str(out))) + error_msg = ("Customer environment error: Investigate and resolve unexpected return code ({0}) for certificate ({1}) on command: {2}" + .format(str(code), cert_type, cmd)) + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE) + if raise_on_exception: + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + + else: + self.composite_logger.log_debug('[PM] Invoked mokutil to fetch certificate details. [Certificate={0}][Command={1}][Code={2}][Output={3}]' + .format(cert_type, cmd, str(code), str(out))) + + return code, out + + def is_latest_cert_installed(self, status_code, status_output): + # type: (int, str) -> bool + """ Checks if the latest certs are already installed on the machine based on mokutil output """ + if status_code != self.mokutil_success_exit_code: + return False + + return Constants.LATEST_CERTIFICATE_TIMESTAMP in status_output + + @abstractmethod + def try_install_mokutil(self): + """ Attempts to install mokutil """ + pass + + @abstractmethod + def try_update_certs(self): + """ Attempts to update certificate status """ + pass + # endregion + diff --git a/src/core/src/package_managers/TdnfPackageManager.py b/src/core/src/package_managers/TdnfPackageManager.py index f19c43d2..a23d79e5 100644 --- a/src/core/src/package_managers/TdnfPackageManager.py +++ b/src/core/src/package_managers/TdnfPackageManager.py @@ -765,3 +765,13 @@ def separate_out_esm_packages(self, packages, package_versions): def get_package_install_expected_avg_time_in_seconds(self): return self.package_install_expected_avg_time_in_seconds + # region Update certificates in factory defaults + def try_install_mokutil(self): + """ Attempts to install mokutil """ + pass + + def try_update_certs(self): + """ Attempts to update certificate status """ + pass + # endregion + diff --git a/src/core/src/package_managers/YumPackageManager.py b/src/core/src/package_managers/YumPackageManager.py index 9e99cd41..359db4d6 100644 --- a/src/core/src/package_managers/YumPackageManager.py +++ b/src/core/src/package_managers/YumPackageManager.py @@ -18,6 +18,7 @@ import json import os import re +from abc import abstractmethod from core.src.package_managers.PackageManager import PackageManager from core.src.bootstrap.Constants import Constants @@ -1121,3 +1122,13 @@ def separate_out_esm_packages(self, packages, package_versions): def get_package_install_expected_avg_time_in_seconds(self): return self.package_install_expected_avg_time_in_seconds + # region Update certificates in factory defaults + def try_install_mokutil(self): + """ Attempts to install mokutil """ + pass + + def try_update_certs(self): + """ Attempts to update certificate status """ + pass + # endregion + diff --git a/src/core/src/package_managers/ZypperPackageManager.py b/src/core/src/package_managers/ZypperPackageManager.py index 365eb1da..e6e799ee 100644 --- a/src/core/src/package_managers/ZypperPackageManager.py +++ b/src/core/src/package_managers/ZypperPackageManager.py @@ -19,6 +19,7 @@ import os import re import time +from abc import abstractmethod from core.src.package_managers.PackageManager import PackageManager from core.src.bootstrap.Constants import Constants @@ -873,3 +874,13 @@ def separate_out_esm_packages(self, packages, package_versions): def get_package_install_expected_avg_time_in_seconds(self): return self.package_install_expected_avg_time_in_seconds + # region Update certificates in factory defaults + def try_install_mokutil(self): + """ Attempts to install mokutil """ + pass + + def try_update_certs(self): + """ Attempts to update certificate status """ + pass + # endregion + diff --git a/src/core/tests/Test_EnvLayer.py b/src/core/tests/Test_EnvLayer.py index 3559ec04..34ab878f 100644 --- a/src/core/tests/Test_EnvLayer.py +++ b/src/core/tests/Test_EnvLayer.py @@ -152,31 +152,31 @@ def test_platform(self): self.envlayer.platform.cpu_arch() self.envlayer.platform.vm_name() - def test_get_package_manager_azure_linux_4_and_rhel10_not_supported(self): - """Test that Azure Linux 4 and RHEL 10 log unsupported message""" - self.backup_platform_system = platform.system - self.backup_linux_distribution = self.envlayer.platform.linux_distribution - self.backup_distro_os_release_attr = distro.os_release_attr - - platform.system = self.mock_platform_system - test_input_output_table = [ - [self.mock_linux_distribution_to_return_azure_linux_4, self.mock_distro_os_release_attr_return_azure_linux_4, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Microsoft Azure Linux][Version=4.0][Code=]\n"], - [self.mock_linux_distribution_to_return_rhel_10, self.mock_distro_os_release_attr_return_rhel_10, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Red Hat][Version=10.0][Code=abc]\n"], - ] - - for row in test_input_output_table: - self.envlayer.platform.linux_distribution = row[0] - distro.os_release_attr = row[1] - - captured_output = io.StringIO() - sys.stdout = captured_output - result = self.envlayer.get_package_manager() - sys.stdout = sys.__stdout__ - self.assertEqual(row[2], captured_output.getvalue()) - self.assertEqual(result, "") - - # restore - self.__restore_mocks() + # def test_get_package_manager_azure_linux_4_and_rhel10_not_supported(self): + # """Test that Azure Linux 4 and RHEL 10 log unsupported message""" + # self.backup_platform_system = platform.system + # self.backup_linux_distribution = self.envlayer.platform.linux_distribution + # self.backup_distro_os_release_attr = distro.os_release_attr + # + # platform.system = self.mock_platform_system + # test_input_output_table = [ + # [self.mock_linux_distribution_to_return_azure_linux_4, self.mock_distro_os_release_attr_return_azure_linux_4, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Microsoft Azure Linux][Version=4.0][Code=]\n"], + # [self.mock_linux_distribution_to_return_rhel_10, self.mock_distro_os_release_attr_return_rhel_10, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Red Hat][Version=10.0][Code=abc]\n"], + # ] + # + # for row in test_input_output_table: + # self.envlayer.platform.linux_distribution = row[0] + # distro.os_release_attr = row[1] + # + # captured_output = io.StringIO() + # sys.stdout = captured_output + # result = self.envlayer.get_package_manager() + # sys.stdout = sys.__stdout__ + # self.assertEqual(row[2], captured_output.getvalue()) + # self.assertEqual(result, "") + # + # # restore + # self.__restore_mocks() def __restore_mocks(self): """Restore backed up mocks to their original state"""