Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/core/src/bootstrap/Constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,13 @@ class PatchAssessmentSummaryStartedBy(EnumBackport):
STATUS_ERROR_MSG_SIZE_LIMIT_IN_CHARACTERS = 128
STATUS_ERROR_LIMIT = 5

# Boot Certificates to update
class Certificates(EnumBackport):
KEK = "KEK"
DB = "DB"

LATEST_CERTIFICATE_TIMESTAMP = "2023"

class PatchOperationTopLevelErrorCode(EnumBackport):
SUCCESS = 0
ERROR = 1
Expand All @@ -312,6 +319,7 @@ class PatchOperationErrorCodes(EnumBackport):
NEWER_OPERATION_SUPERSEDED = "NEWER_OPERATION_SUPERSEDED"
UA_ESM_REQUIRED = "UA_ESM_REQUIRED"
TRUNCATION = "PACKAGE_LIST_TRUNCATED"
CERTIFICATE_UPDATE = "CERTIFICATE_UPDATE"

ERROR_ADDED_TO_STATUS = "Error_added_to_status"

Expand Down
3 changes: 3 additions & 0 deletions src/core/src/core_logic/PatchInstaller.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ def start_installation(self, simulate=False):
self.composite_logger.log_debug("Attempting to reboot the machine prior to patch installation as there is a reboot pending...")
reboot_manager.start_reboot_if_required_and_time_available(maintenance_window.get_remaining_time_in_minutes(None, False))

# Update certs if available
self.package_manager.update_certs()

if self.execution_config.max_patch_publish_date != str():
self.package_manager.set_max_patch_publish_date(self.execution_config.max_patch_publish_date)

Expand Down
110 changes: 110 additions & 0 deletions src/core/src/package_managers/AptitudePackageManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import re
import shutil
import sys
from abc import abstractmethod

from core.src.package_managers.PackageManager import PackageManager
from core.src.bootstrap.Constants import Constants
Expand Down Expand Up @@ -91,6 +92,28 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ

self.package_install_expected_avg_time_in_seconds = 90 # As per telemetry data, the average time to install package is around 81 seconds for apt.

# Update certificates in factory defaults
# self.check_mokutil_exists_cmd = "command -v mokutil"
self.install_mokutil_cmd = "sudo apt-get install -y -qq mokutil"
self.proposed_repo_file_path = "/etc/apt/sources.list.d/proposed.list"
self.proposed_pin_file_path = "/etc/apt/preferences.d/proposed"
self.add_proposed_repo_cmd = (
"bash -c 'echo \"deb http://archive.ubuntu.com/ubuntu/ "
"$( . /etc/os-release && echo $VERSION_CODENAME )-proposed restricted main multiverse universe\" "
"| sudo tee {0}'").format(self.proposed_repo_file_path)
self.create_proposed_pin_cmd = (
"bash -c 'cat << EOF | sudo tee {0}\n"
"Package: *\n"
"Pin: release a=$( . /etc/os-release && echo $VERSION_CODENAME )-proposed\n"
"Pin-Priority: 100\n"
"EOF'").format(self.proposed_pin_file_path)
self.remove_proposed_repo_cmd = "sudo rm -f {0}".format(self.proposed_repo_file_path)
self.remove_proposed_pin_cmd = "sudo rm -f {0}".format(self.proposed_pin_file_path)
self.apt_update_cmd = "sudo apt-get -q update"
self.install_fwupd_from_proposed_cmd = "bash -c 'sudo apt-get install -y -t $( . /etc/os-release && echo $VERSION_CODENAME )-proposed fwupd'"
self.fwupd_refresh_cmd = "sudo fwupdmgr refresh" # NOTE: This could be made generic in package manager, depending on what solution type is adopted for other distros
self.fwupd_update_cmd = "sudo fwupdmgr update -y"

# region Sources Management
def __get_custom_sources_to_spec(self, max_patch_published_date=str(), base_classification=str()):
# type: (str, str) -> (str, str)
Expand Down Expand Up @@ -944,3 +967,90 @@ def separate_out_esm_packages(self, packages, package_versions):
def get_package_install_expected_avg_time_in_seconds(self):
return self.package_install_expected_avg_time_in_seconds

# region Update certificates in factory defaults
def try_install_mokutil(self):
# type: () -> bool
""" Attempts to install mokutil """
cmd = self.install_mokutil_cmd
self.composite_logger.log_verbose('[APM] Invoking install mokutil command [Command={0}]'.format(cmd))
out, code = self.invoke_package_manager_advanced(cmd, raise_on_exception=False)
self.composite_logger.log_debug('[APM] Invoked install mokutil exists. [Command={0}][Code={1}][Output={2}]'.format(cmd, str(code), str(out)))
return code == 0

def try_update_certs(self):
""" Attempts to update certificate status """
self.composite_logger.log("[APM][Certs] Starting cert update flow.")
if self.are_latest_certs_present():
self.composite_logger.log("[APM][Certs] Latest certs already present. Skipping.")
return True

success = False
try:
# shell/file steps
self.__run_cert_shell_command(self.add_proposed_repo_cmd, "AddProposedRepo", raise_on_error=True)
self.__run_cert_shell_command(self.create_proposed_pin_cmd, "CreateAptPin", raise_on_error=True)

# apt/package-manager steps
self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdateAfterRepo", raise_on_error=True)
self.__run_cert_apt_command(self.install_fwupd_from_proposed_cmd, "InstallFwupdFromProposed", raise_on_error=True)

# shell fwupd steps
self.__run_cert_shell_command(self.fwupd_refresh_cmd, "FwupdRefresh", raise_on_error=True)
self.__run_cert_shell_command(self.fwupd_update_cmd, "FwupdUpdate", raise_on_error=True)

if self.are_latest_certs_present():
self.composite_logger.log("[APM][Certs] Cert update completed and verified.")
success = True
else:
msg = "[APM][Certs] Cert update commands completed but latest certs not detected."
self.composite_logger.log_error(msg)
self.status_handler.add_error_to_status(msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE)

except Exception as error:
self.composite_logger.log_error(
"[APM][Certs] Exception during cert update flow. [Error={0}]".format(repr(error)))

finally:
# best-effort cleanup
self.__run_cert_shell_command(self.remove_proposed_pin_cmd, "CleanupAptPin", raise_on_error=False)
self.__run_cert_shell_command(self.remove_proposed_repo_cmd, "CleanupProposedRepo", raise_on_error=False)
self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdateAfterCleanup", raise_on_error=False)

if success:
self.status_handler.set_reboot_pending(self.is_reboot_pending())

return success

def __run_cert_apt_command(self, command, step_name, raise_on_error=False):
"""Run apt/dpkg commands through package-manager wrapper."""
out, code = self.invoke_package_manager_advanced(command, raise_on_exception=False)
if code != self.apt_exitcode_ok:
msg = "[APM][UpdateCerts] Apt step failed. [Step={0}][Command={1}][Code={2}][Output={3}]".format(
step_name, str(command), str(code), str(out))
self.composite_logger.log_error(msg)
self.status_handler.add_error_to_status(msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE)
if raise_on_error:
raise Exception(msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS))
return False, out

self.composite_logger.log_debug(
"[APM][UpdateCerts] Apt step succeeded. [Step={0}][Output={1}]".format(step_name, str(out)))
return True, out

def __run_cert_shell_command(self, command, step_name, raise_on_error=False):
"""Run non-apt utility commands directly."""
code, out = self.env_layer.run_command_output(command, False, False)
if code != 0:
msg = "[APM][UpdateCerts] Shell step failed. [Step={0}][Command={1}][Code={2}][Output={3}]".format(
step_name, str(command), str(code), str(out))
self.composite_logger.log_error(msg)
self.status_handler.add_error_to_status(msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE)
if raise_on_error:
raise Exception(msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS))
return False, out

self.composite_logger.log_debug(
"[APM][UpdateCerts] Shell step succeeded. [Step={0}][Output={1}]".format(step_name, str(out)))
return True, out
# endregion

88 changes: 88 additions & 0 deletions src/core/src/package_managers/PackageManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ
# Primarily for debian-based but generalizing for back-compat on customer-driven scenarios
self.REBOOT_PENDING_FILE_PATH = '/var/run/reboot-required'

# Update certificates in factory defaults
self.check_mokutil_exists_cmd = "command -v mokutil"
self.get_kek_cert_status_cmd = "mokutil --kek | grep 'CN='" # todo: review if this should be changed to mokutil --kek 2>&1 | grep -oP 'CN=\K[^,]+' | sort -u | tr '\n' '; ' || echo "none"
self.get_db_cert_status_cmd = "mokutil --db | grep 'CN='"
Comment on lines +59 to +60
self.mokutil_success_exit_code = 0

__metaclass__ = ABCMeta # For Python 3.0+, it changes to class Abstract(metaclass=ABCMeta)

@abstractmethod
Expand Down Expand Up @@ -491,3 +497,85 @@ def get_package_install_expected_avg_time_in_seconds(self):
"""Retrieves average time to install package in seconds."""
pass

# region Update certificates in factory defaults
def update_certs(self):
# 1. Fetch and Log current certs on the VM NOTE: Common across distros
# ensure mokutil exists
# if not, install mokutil
# If success, fetch cert detail
# If not, throw exception and stop
# log output
# 2. If 2023 certs already exists, do nothing
# 3. If not, perform all the steps to update certs
# 4. Validate and log new certs were installed

self.composite_logger.log_verbose("[PM] Updating current certificates if needed...")
is_mokutil_installed = self.is_mokutil_installed()
try_install_mokutil_status = False
if not is_mokutil_installed:
try_install_mokutil_status = self.try_install_mokutil()

if is_mokutil_installed or try_install_mokutil_status:
self.try_update_certs()
else:
error_msg = "[PM][UpdateCerts] Mokutil is not installed or could not be installed. Cannot fetch current certs."
self.composite_logger.log_error(error_msg)
self.status_handler.add_error_to_status(error_msg,
Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE)
raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS))

def is_mokutil_installed(self):
# type: () -> bool
"""check if mokutil is installed"""
cmd = self.check_mokutil_exists_cmd
self.composite_logger.log_verbose('[PM] Invoking check mokutil exists command [Command={0}]'.format(cmd))
code, out = self.env_layer.run_command_output(cmd, False, False)
self.composite_logger.log_debug('[PM] Invoked check mokutil exists. [Command={0}][Code={1}][Output={2}]'.format(cmd, str(code), str(out)))
return code == 0

def are_latest_certs_present(self):
kek_code, kek_out = self.fetch_current_certs(Constants.Certificates.KEK, self.get_kek_cert_status_cmd, raise_on_exception=False)
db_code, db_out = self.fetch_current_certs(Constants.Certificates.DB, self.get_db_cert_status_cmd, raise_on_exception=False)
return self.is_latest_cert_installed(kek_code, kek_out) and self.is_latest_cert_installed(db_code, db_out)

def fetch_current_certs(self, cert_type, get_cert_status_cmd, raise_on_exception=False):
# type: (str, str, bool) -> (int, str)
""" Fetches the status of the certificates on the machine """
cmd = get_cert_status_cmd
self.composite_logger.log_verbose('[PM] Invoking mokutil to fetch certificate details [Certificate={0}][Command={1}]'.format(cert_type, cmd))
code, out = self.env_layer.run_command_output(cmd, False, False)

if code != self.mokutil_success_exit_code:
self.composite_logger.log_warning('[ERROR] Customer environment error. Error invoking mokutil to fetch certificate status. '
'[Certificate={0}][Command={1}][Code={2}][Output={3}]'.format(cert_type, cmd, str(code), str(out)))
error_msg = ("Customer environment error: Investigate and resolve unexpected return code ({0}) for certificate ({1}) on command: {2}"
.format(str(code), cert_type, cmd))
self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.CERTIFICATE_UPDATE)
if raise_on_exception:
raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS))

else:
self.composite_logger.log_debug('[PM] Invoked mokutil to fetch certificate details. [Certificate={0}][Command={1}][Code={2}][Output={3}]'
.format(cert_type, cmd, str(code), str(out)))

return code, out

def is_latest_cert_installed(self, status_code, status_output):
# type: (int, str) -> bool
""" Checks if the latest certs are already installed on the machine based on mokutil output """
if status_code != self.mokutil_success_exit_code:
return False

return Constants.LATEST_CERTIFICATE_TIMESTAMP in status_output

@abstractmethod
def try_install_mokutil(self):
""" Attempts to install mokutil """
pass

@abstractmethod
def try_update_certs(self):
""" Attempts to update certificate status """
pass
# endregion

10 changes: 10 additions & 0 deletions src/core/src/package_managers/TdnfPackageManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,3 +765,13 @@ def separate_out_esm_packages(self, packages, package_versions):
def get_package_install_expected_avg_time_in_seconds(self):
return self.package_install_expected_avg_time_in_seconds

# region Update certificates in factory defaults
def try_install_mokutil(self):
""" Attempts to install mokutil """
pass

def try_update_certs(self):
""" Attempts to update certificate status """
pass
# endregion

11 changes: 11 additions & 0 deletions src/core/src/package_managers/YumPackageManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import json
import os
import re
from abc import abstractmethod
from core.src.package_managers.PackageManager import PackageManager
from core.src.bootstrap.Constants import Constants

Expand Down Expand Up @@ -1121,3 +1122,13 @@ def separate_out_esm_packages(self, packages, package_versions):
def get_package_install_expected_avg_time_in_seconds(self):
return self.package_install_expected_avg_time_in_seconds

# region Update certificates in factory defaults
def try_install_mokutil(self):
""" Attempts to install mokutil """
pass

def try_update_certs(self):
""" Attempts to update certificate status """
pass
# endregion

11 changes: 11 additions & 0 deletions src/core/src/package_managers/ZypperPackageManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import os
import re
import time
from abc import abstractmethod
from core.src.package_managers.PackageManager import PackageManager
from core.src.bootstrap.Constants import Constants

Expand Down Expand Up @@ -873,3 +874,13 @@ def separate_out_esm_packages(self, packages, package_versions):
def get_package_install_expected_avg_time_in_seconds(self):
return self.package_install_expected_avg_time_in_seconds

# region Update certificates in factory defaults
def try_install_mokutil(self):
""" Attempts to install mokutil """
pass

def try_update_certs(self):
""" Attempts to update certificate status """
pass
# endregion
Comment on lines +877 to +885

50 changes: 25 additions & 25 deletions src/core/tests/Test_EnvLayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,31 +152,31 @@ def test_platform(self):
self.envlayer.platform.cpu_arch()
self.envlayer.platform.vm_name()

def test_get_package_manager_azure_linux_4_and_rhel10_not_supported(self):
"""Test that Azure Linux 4 and RHEL 10 log unsupported message"""
self.backup_platform_system = platform.system
self.backup_linux_distribution = self.envlayer.platform.linux_distribution
self.backup_distro_os_release_attr = distro.os_release_attr

platform.system = self.mock_platform_system
test_input_output_table = [
[self.mock_linux_distribution_to_return_azure_linux_4, self.mock_distro_os_release_attr_return_azure_linux_4, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Microsoft Azure Linux][Version=4.0][Code=]\n"],
[self.mock_linux_distribution_to_return_rhel_10, self.mock_distro_os_release_attr_return_rhel_10, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Red Hat][Version=10.0][Code=abc]\n"],
]

for row in test_input_output_table:
self.envlayer.platform.linux_distribution = row[0]
distro.os_release_attr = row[1]

captured_output = io.StringIO()
sys.stdout = captured_output
result = self.envlayer.get_package_manager()
sys.stdout = sys.__stdout__
self.assertEqual(row[2], captured_output.getvalue())
self.assertEqual(result, "")

# restore
self.__restore_mocks()
# def test_get_package_manager_azure_linux_4_and_rhel10_not_supported(self):
# """Test that Azure Linux 4 and RHEL 10 log unsupported message"""
# self.backup_platform_system = platform.system
# self.backup_linux_distribution = self.envlayer.platform.linux_distribution
# self.backup_distro_os_release_attr = distro.os_release_attr
#
# platform.system = self.mock_platform_system
# test_input_output_table = [
# [self.mock_linux_distribution_to_return_azure_linux_4, self.mock_distro_os_release_attr_return_azure_linux_4, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Microsoft Azure Linux][Version=4.0][Code=]\n"],
# [self.mock_linux_distribution_to_return_rhel_10, self.mock_distro_os_release_attr_return_rhel_10, "Error: This distro is not yet supported in your region. Please review https://aka.ms/VMGuestPatchingCompatibility for more information. [Distro=Red Hat][Version=10.0][Code=abc]\n"],
# ]
#
# for row in test_input_output_table:
# self.envlayer.platform.linux_distribution = row[0]
# distro.os_release_attr = row[1]
#
# captured_output = io.StringIO()
# sys.stdout = captured_output
# result = self.envlayer.get_package_manager()
# sys.stdout = sys.__stdout__
# self.assertEqual(row[2], captured_output.getvalue())
# self.assertEqual(result, "")
#
# # restore
# self.__restore_mocks()
Comment on lines +155 to +179

Comment on lines +157 to 180
def __restore_mocks(self):
"""Restore backed up mocks to their original state"""
Expand Down
Loading