Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions tests/auto_inject/test_auto_inject_workload_selection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import uuid
from scp import SCPClient

from utils import scenarios, context, features, irrelevant, logger
from utils.onboarding.injection_log_parser import command_injection_skipped


class _AutoInjectWorkloadSelectionBaseTest:
"""Base class to test workload selection policies on auto instrumentation."""

def _execute_remote_command(self, ssh_client, command):
"""Execute remote command and get remote log file from the vm. You can use this method using env variables or using injection config file"""

unique_log_name = f"host_injection_{uuid.uuid4()}.log"

command_with_config = f"DD_APM_INSTRUMENTATION_DEBUG=TRUE DD_APM_INSTRUMENTATION_OUTPUT_PATHS=/var/log/datadog_weblog/{unique_log_name} {command}"
logger.info(f"Executing command: [{command_with_config}] associated with log file: [{unique_log_name}]")
log_local_path = context.scenario.host_log_folder + f"/{unique_log_name}"

_, stdout, stderr = ssh_client.exec_command(command_with_config)
logger.info("Command output:")
logger.info(stdout.readlines())
logger.info("Command err output:")
logger.info(stderr.readlines())

scp = SCPClient(ssh_client.get_transport())
scp.get(remote_path=f"/var/log/datadog_weblog/{unique_log_name}", local_path=log_local_path)

return log_local_path


@features.host_block_list
@scenarios.installer_auto_injection
@irrelevant(condition=context.weblog_variant == "test-app-dotnet-iis")
class TestAutoInjectWorkloadSelectionInstallManualHost(_AutoInjectWorkloadSelectionBaseTest):
"""Test that auto instrumentation respects workload selection policies (excluded specific commands and args)."""

# Commands excluded by workload selection policy (should not be instrumented)
# no_language_found_commands = [
# "touch myfile.txt",
# "hello=hola cat myfile.txt",
# "ls -la",
# "mkdir newdir",
# ]

# Commands with args excluded by workload selection policy per language (should not be instrumented)
commands_excluded_by_workload_policy = {
"java": ["java -version", "MY_ENV_VAR=hello java -version"],
"dotnet": [
"dotnet restore",
"dotnet build -c Release",
"dotnet publish",
"MY_ENV_VAR=hello dotnet build -c Release",
],
}

# Commands with args included by workload selection policy per language (should be instrumented)
commands_not_excluded_by_workload_policy = {
"java": [
"java -jar myjar.jar",
"java -jar myjar.jar",
"version=-version java -jar myjar.jar",
"java -Dversion=-version -jar myapp.jar",
],
"dotnet": [
"dotnet run -- -p build",
"dotnet build.dll -- -p build",
"dotnet run myapp.dll -- -p build",
"dotnet publish",
"MY_ENV_VAR=build dotnet myapp.dll",
],
}

# @irrelevant(
# condition="container" in context.weblog_variant
# or "alpine" in context.weblog_variant
# or "buildpack" in context.weblog_variant
# )
# def test_no_language_found_commands(self):
# """Check that commands with no language found are skipped from auto injection."""
# virtual_machine = context.virtual_machine
# logger.info(f"[{virtual_machine.get_ip()}] Executing commands with no language found")
# ssh_client = virtual_machine.get_ssh_connection()
# for command in self.no_language_found_commands:
# local_log_file = self._execute_remote_command(ssh_client, command)
# assert command_injection_skipped(command, local_log_file), (
# f"The command '{command}' was allowed by auto injection but should have been denied"
# )

@irrelevant(
condition="container" in context.weblog_variant
or "alpine" in context.weblog_variant
or "buildpack" in context.weblog_variant
)
def test_commands_denied_by_workload_selection(self):
"""Check that commands are skipped from auto injection based on workload selection policies."""
virtual_machine = context.virtual_machine
logger.info(f"[{virtual_machine.get_ip()}] Executing commands that are denied by workload selection policies")
language = context.library.name
if language not in self.commands_excluded_by_workload_policy:
return
ssh_client = virtual_machine.get_ssh_connection()
for command in self.commands_excluded_by_workload_policy[language]:
local_log_file = self._execute_remote_command(ssh_client, command)
assert command_injection_skipped(command, local_log_file), (
f"The command '{command}' was allowed by auto injection but should have been denied"
)

@irrelevant(
condition="container" in context.weblog_variant
or "alpine" in context.weblog_variant
or "buildpack" in context.weblog_variant
)
def test_commands_allowed_by_workload_selection(self):
"""Check that commands are allowed to be instrumented based on workload selection policies."""
virtual_machine = context.virtual_machine
logger.info(f"[{virtual_machine.get_ip()}] Executing commands that are allowed by workload selection policies")
language = context.library.name
if language not in self.commands_not_excluded_by_workload_policy:
return
ssh_client = virtual_machine.get_ssh_connection()
for command in self.commands_not_excluded_by_workload_policy[language]:
local_log_file = self._execute_remote_command(ssh_client, command)
assert command_injection_skipped(command, local_log_file) is False, (
f"The command '{command}' was denied by auto injection but should have been allowed"
)
116 changes: 0 additions & 116 deletions tests/auto_inject/test_blocklist_auto_inject.py

This file was deleted.

6 changes: 3 additions & 3 deletions tests/test_the_test/scenarios.json
Original file line number Diff line number Diff line change
Expand Up @@ -3150,13 +3150,13 @@
"tests/auto_inject/test_auto_inject_install.py::TestContainerAutoInjectInstallScriptAppsec::test_appsec": [
"CONTAINER_AUTO_INJECTION_INSTALL_SCRIPT_APPSEC"
],
"tests/auto_inject/test_blocklist_auto_inject.py::TestAutoInjectBlockListInstallManualHost::test_builtin_block_commands": [
"tests/auto_inject/test_auto_inject_workload_selection.py::TestAutoInjectWorkloadSelectionInstallManualHost::test_commands_excluded_by_workload_policy": [
"INSTALLER_AUTO_INJECTION"
],
"tests/auto_inject/test_blocklist_auto_inject.py::TestAutoInjectBlockListInstallManualHost::test_builtin_block_args": [
"tests/auto_inject/test_auto_inject_workload_selection.py::TestAutoInjectWorkloadSelectionInstallManualHost::test_args_excluded_by_workload_policy": [
"INSTALLER_AUTO_INJECTION"
],
"tests/auto_inject/test_blocklist_auto_inject.py::TestAutoInjectBlockListInstallManualHost::test_builtin_instrument_args": [
"tests/auto_inject/test_auto_inject_workload_selection.py::TestAutoInjectWorkloadSelectionInstallManualHost::test_args_included_by_workload_policy": [
"INSTALLER_AUTO_INJECTION"
],
"tests/debugger/test_debugger_code_origins.py::Test_Debugger_Code_Origins::test_code_origin_entry_present": [
Expand Down
92 changes: 48 additions & 44 deletions utils/onboarding/injection_log_parser.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,52 @@
import re
from collections.abc import Callable
import json
from pathlib import Path

from utils._logger import logger

WLS_DENIED_INJECTION = "Workload selection denied injection"
WLS_ALLOWED_INJECTION = "Workload selection allowed injection: continuing"
NO_KNOWN_RUNTIME = "No known runtime was detected - not injecting!"


def exclude_telemetry_logs_filter(line: str):
return '"command":"telemetry"' not in line and '"caller":"telemetry/' not in line


def command_injection_skipped(command_line: str, log_local_path: str):
"""From parsed log, search on the list of logged commands
if one command has been skipped from the instrumentation
"""Determine if the given command was skipped from auto injection
(e.g. by workload selection policies or no language matched).
"""
command, command_args = _parse_command(command_line)
logger.debug(f"- Checking command: {command_args}")
for command_desc in _get_commands_from_log_file(log_local_path, exclude_telemetry_logs_filter):
# First line contains the name of the intercepted command
first_line_json = json.loads(command_desc[0])
if command in first_line_json["inFilename"]:
# last line contains the skip message. The command was skipped by build-in deny list or by user deny list
last_line_json = json.loads(command_desc[-1])
# pylint: disable=R1705
if last_line_json["msg"] == "not injecting; on deny list":
logger.debug(f" Command {command_args} was skipped by build-in deny list")
return True
elif last_line_json["msg"] == "not injecting; on user deny list":
logger.debug(f" Command {command_args} was skipped by user defined deny process list")
return True
elif last_line_json["msg"] in ["error injecting", "error when parsing", "skipping"] and (
last_line_json["error"].startswith(
(
"skipping due to ignore rules for language",
"error when parsing: skipping due to ignore rules for language",
)
)
):
logger.info(f" Command {command_args} was skipped by ignore arguments")
return True
logger.info(f" Missing injection deny: {last_line_json}")
return False
command, _ = _parse_command(command_line)
logger.debug(f"- Checking command: {command_line}")
for process_logs in _get_process_logs_from_log_file(log_local_path, exclude_telemetry_logs_filter):
process_exe = _get_exe_from_log_line(process_logs[0])
if process_exe is None or command != process_exe:
continue
if _process_chunk_means_skipped(process_logs):
logger.debug(f" Command '{command}' was skipped (denied by WLS or no known runtime)")
return True
logger.info(f" Command '{command}' was allowed and injected")
return False

logger.info(f" Command {command} was NOT FOUND")
raise ValueError(f"Command {command} was NOT FOUND")


def _process_chunk_means_skipped(chunk: list[str]) -> bool:
"""True if injection was skipped: denied by workload selection or no known runtime detected."""
text = "\n".join(chunk)
return WLS_DENIED_INJECTION in text or NO_KNOWN_RUNTIME in text


def _get_exe_from_log_line(line: str) -> str | None:
"""Extract executable name from the log line "process_exe: 'X'"."""
match = re.search(r"process_exe:\s*['\"]([^'\"]+)['\"]", line)
if match:
return Path(match.group(1)).name
return None


def _parse_command(command: str):
command_args = command.split()
command = None
Expand All @@ -64,33 +66,35 @@ def _parse_command(command: str):
return None, None


def _get_commands_from_log_file(log_local_path: str, line_filter: Callable):
"""From instrumentation log file, extract all commands parsed by dd-injection (the log level should be DEBUG)"""
def _get_process_logs_from_log_file(log_local_path: str, line_filter: Callable):
r"""From instrumentation log file, extract all log lines per process.

store_as_command = False
command_lines = []
A process chunk starts at the line containing \"process_exe:\" and runs until
\"injector finished\" (or the next \"process_exe:\"). This includes WLS decision
lines and post-WLS lines like \"No known runtime was detected - not injecting!\".
"""
process_logs = []
with open(log_local_path, encoding="utf-8") as f:
for line in f:
if not line_filter(line):
continue
if "starting process" in line:
store_as_command = True
if "process_exe:" in line:
if process_logs:
yield process_logs.copy()
process_logs = [line]
continue
if "exiting process" in line:
store_as_command = False
yield command_lines.copy()
command_lines = []
if process_logs and (WLS_DENIED_INJECTION in line or WLS_ALLOWED_INJECTION in line):
process_logs.append(line)
yield process_logs.copy()
process_logs = []
continue

if store_as_command:
command_lines.append(line)


def main():
log_file = "logs_onboarding_host_block_list/host_injection_21711f84-86b3-4125-9a5f-cd129195d99a.log"
command = "java -Dversion=-version -jar myapp.jar"
skipped = command_injection_skipped(command, log_file)
logger.info(f"The command was skiped? {skipped}")
logger.info(f"The command was skipped? {skipped}")


if __name__ == "__main__":
Expand Down
Loading