From 197e3a3cb6fb488e6afe8be76aea1dac203b6a18 Mon Sep 17 00:00:00 2001 From: Nathan Jacobson <22107+natecj@users.noreply.github.com> Date: Wed, 20 May 2026 16:27:25 -0400 Subject: [PATCH] feat: add adguard_export plugin and tests Adds a new NetAlertX plugin that syncs known devices from the NetAlertX database to AdGuard Home as persistent clients, keeping names, MACs, IP addresses, and device-type tags in sync. Also fixes the adguard_import config.json description placeholder and a minor indentation inconsistency in that file. Co-Authored-By: Claude Sonnet 4.6 --- front/plugins/adguard_export/README.md | 140 ++++++ front/plugins/adguard_export/config.json | 534 +++++++++++++++++++++++ front/plugins/adguard_export/script.py | 499 +++++++++++++++++++++ front/plugins/adguard_import/config.json | 4 +- test/plugins/test_adguard_export.py | 401 +++++++++++++++++ 5 files changed, 1576 insertions(+), 2 deletions(-) create mode 100644 front/plugins/adguard_export/README.md create mode 100644 front/plugins/adguard_export/config.json create mode 100644 front/plugins/adguard_export/script.py create mode 100644 test/plugins/test_adguard_export.py diff --git a/front/plugins/adguard_export/README.md b/front/plugins/adguard_export/README.md new file mode 100644 index 000000000..e0cdc948c --- /dev/null +++ b/front/plugins/adguard_export/README.md @@ -0,0 +1,140 @@ +# adguard_export — NetAlertX Plugin + +> **Direction:** NetAlertX → AdGuard Home +> Syncs known devices from the NetAlertX database to AdGuard Home as **persistent clients**, keeping device names, MAC addresses, and IP identifiers in sync. + +--- + +## What it does + +On every run the plugin: + +1. Reads all (or only *known*) devices from the NetAlertX database. +2. Fetches the current list of persistent clients from AdGuard Home via its REST API. +3. **Adds** clients that are in NetAlertX but not yet in AdGuard Home. +4. **Updates** clients whose name, identifiers, or device-type tag have changed. +5. Optionally **deletes** clients that have been removed from NetAlertX (see `DELETE` setting). + +Device types set in NetAlertX (e.g. `Smartphone`, `Laptop`, `NAS`) are automatically mapped to the corresponding AdGuard Home `device_*` tags (e.g. `device_phone`, `device_laptop`, `device_nas`). + +--- + +## Requirements + +| Requirement | Notes | +|---|---| +| AdGuard Home | v0.107+ (REST API must be enabled) | +| Python packages | `requests`, `pytz` — already present in the NetAlertX container | +| AdGuard credentials | A user account with permission to manage clients | + +--- + +## Installation + +1. Copy the `adguard_export/` folder into `/app/front/plugins/` inside your NetAlertX container (or mount it as a volume). +2. Restart NetAlertX so the plugin is discovered. +3. Open **Settings → Plugins → AdGuard (Device Export)** and configure the settings below. + +--- + +## Settings + +| Setting key | Default | Description | +|---|---|---| +| `ADGUARDEXP_RUN` | `disabled` | When to run: `disabled`, `once`, or `schedule` | +| `ADGUARDEXP_RUN_SCHD` | `0 * * * *` | Cron schedule (default: hourly) | +| `ADGUARDEXP_URL` | `http://192.168.11.1:3000` | Base URL of AdGuard Home web UI | +| `ADGUARDEXP_USER` | `admin` | AdGuard Home username | +| `ADGUARDEXP_PASSWORD` | *(empty)* | AdGuard Home password | +| `ADGUARDEXP_VERIFYSSL` | `true` | Verify TLS cert; set `false` for self-signed certs | +| `ADGUARDEXP_INCLUDE_OFFLINE` | `true` | When `true`, devices not seen in the last scan are still exported | +| `ADGUARDEXP_INCLUDE_NEW` | `false` | When `false`, devices flagged as new/unknown are excluded until identified | +| `ADGUARDEXP_USEMAC` | `true` | Use MAC address as primary client identifier; falls back to IP | +| `ADGUARDEXP_DELETE` | `false` | ⚠ Delete AdGuard clients no longer present in NetAlertX | + +--- + +## AdGuard Home client identifiers + +AdGuard Home identifies a client by one or more **ids**, which can be: + +- A MAC address (e.g. `aa:bb:cc:dd:ee:ff`) +- An IP address (e.g. `192.168.1.42`) +- A CIDR range +- A ClientID string + +When `ADGUARDEXP_USEMAC=true`, the plugin prefers the device's MAC address and includes the last known IP as a secondary identifier. When `ADGUARDEXP_USEMAC=false`, only the IP address is used. + +--- + +## Device type tags + +The plugin maps NetAlertX device types to valid AdGuard Home `device_*` tags automatically: + +| NetAlertX type | AdGuard tag | +|---|---| +| Smartphone, Phone, Mobile | `device_phone` | +| Laptop, Notebook | `device_laptop` | +| Desktop, Server, Hypervisor | `device_pc` | +| Tablet | `device_tablet` | +| Smart TV, SmartTV, TV | `device_tv` | +| NAS | `device_nas` | +| Printer | `device_printer` | +| IP Camera, Camera | `device_camera` | +| Game Console | `device_gameconsole` | +| Speaker, Assistant, Virtual Assistance | `device_audio` | +| AP, Gateway, Router, House Appliance | `device_other` | + +Devices with an unrecognised or empty type are exported without a tag. + +--- + +## Safe deletion + +When `ADGUARDEXP_DELETE=true`, the plugin only removes clients it previously created — it will never delete clients you added manually in AdGuard Home. Ownership is tracked in a local state file at: + +``` +/app/db/state.ADGUARDEXP.json +``` + +--- + +## Logs + +Plugin logs are written to: + +``` +/tmp/log/plugins/script.ADGUARDEXP.log +``` + +Result rows (used by the NetAlertX UI) are written to: + +``` +/tmp/log/plugins/last_result.ADGUARDEXP.log +``` + +--- + +## Troubleshooting + +| Symptom | Likely cause | +|---|---| +| `Connection failed` in logs | Wrong `ADGUARDEXP_URL` or AdGuard Home is unreachable from the NetAlertX container | +| `HTTP error: 401` | Wrong username / password | +| `HTTP error: 400` | Client already exists with conflicting ids — check AdGuard Home for duplicate entries | +| Devices not appearing | `ADGUARDEXP_INCLUDE_NEW=false` and devices are flagged as new/unknown; identify them in NetAlertX first | +| SSL errors | Set `ADGUARDEXP_VERIFYSSL=false` for self-signed certificates | + +--- + +## Related plugins + +- **adguard_import** — the reverse direction: imports devices *from* AdGuard Home *into* NetAlertX. + +--- + +## Notes + +- Version: 1.0.0 +- Author: [natecj](https://github.com/natecj) +- Release Date: 2026-05-10 diff --git a/front/plugins/adguard_export/config.json b/front/plugins/adguard_export/config.json new file mode 100644 index 000000000..c6da90272 --- /dev/null +++ b/front/plugins/adguard_export/config.json @@ -0,0 +1,534 @@ +{ + "code_name": "adguard_export", + "unique_prefix": "ADGUARDEXP", + "plugin_type": "other", + "execution_order": "Layer_0", + "enabled": true, + "data_source": "script", + "localized": ["display_name", "description", "icon"], + "display_name": [ + { + "language_code": "en_us", + "string": "AdGuard (Device Export)" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Exports known devices from NetAlertX to AdGuard as persistent clients, keeping names and IP/MAC identifiers in sync." + } + ], + "icon": [ + { + "language_code": "en_us", + "string": "" + } + ], + "timeout": 120, + "params": [], + "settings": [ + { + "function": "RUN", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "disabled", + "options": ["disabled", "once", "schedule"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "When to run" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Enable or schedule the export. Use 'schedule' together with the RUN_SCHD setting." + } + ] + }, + { + "function": "RUN_SCHD", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "input", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "0 * * * *", + "options": [], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Schedule" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Cron expression for how often to run. Default: every hour on the hour." + } + ] + }, + { + "function": "CMD", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "input", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "python3 /app/front/plugins/adguard_export/script.py", + "options": [], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Command" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Command to execute. Do not change unless you know what you are doing." + } + ] + }, + { + "function": "URL", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "input", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "http://localhost:3000", + "options": [], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "AdGuard Home URL" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Base URL of your AdGuard Home web interface, e.g. http://192.168.1.1:3000" + } + ] + }, + { + "function": "USER", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "input", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "admin", + "options": [], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "AdGuard Home Username" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Username for AdGuard Home basic authentication." + } + ] + }, + { + "function": "PASSWORD", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "input", + "elementOptions": [{ "type": "password" }], + "transformers": [] + } + ] + }, + "maxLength": 200, + "default_value": "", + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "AdGuard Home Password" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Password for AdGuard Home basic authentication." + } + ] + }, + { + "function": "VERIFYSSL", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "true", + "options": ["true", "false"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Verify SSL Certificate" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Set to 'false' to skip TLS certificate verification (useful for self-signed certs)." + } + ] + }, + { + "function": "INCLUDE_OFFLINE", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "true", + "options": ["true", "false"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Include offline devices" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "When 'true', devices not seen in the last scan are still exported to AdGuard Home." + } + ] + }, + { + "function": "INCLUDE_NEW", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "false", + "options": ["true", "false"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Include new/unknown devices" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "When 'true', devices flagged as new/unknown are also exported. When 'false', only identified devices are exported." + } + ] + }, + { + "function": "USEMAC", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "true", + "options": ["true", "false"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Use MAC address as identifier" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "When 'true', MAC addresses are used as the primary client identifier in AdGuard Home. Falls back to IP when no MAC is available." + } + ] + }, + { + "function": "DELETE", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "select", + "elementOptions": [], + "transformers": [] + } + ] + }, + "default_value": "false", + "options": ["true", "false"], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Delete clients missing from NetAlertX" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "Caution: When 'true', AdGuard Home clients that were previously exported by this plugin but are no longer present in NetAlertX will be deleted. Ownership is tracked in a local state file — manually added AdGuard clients are never deleted." + } + ] + } + ], + "database_column_definitions": [ + { + "column": "Object_PrimaryID", + "mapped_to_column": "cur_MAC", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "filter_info": { + "type": "MAC", + "name": "MAC", + "source": "table" + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "MAC / ID" + } + ] + }, + { + "column": "Object_SecondaryID", + "mapped_to_column": "cur_IP", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "IP Address" + } + ] + }, + { + "column": "Watched_Value1", + "mapped_to_column": "cur_Name", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "Device Name" + } + ] + }, + { + "column": "Watched_Value2", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "MAC Address" + } + ] + }, + { + "column": "Watched_Value3", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "Last IP" + } + ] + }, + { + "column": "Watched_Value4", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "AdGuard URL" + } + ] + }, + { + "column": "Dummy", + "mapped_to_column": "scanSourcePlugin", + "mapped_to_column_data": { + "value": "adguard_export" + }, + "css_classes": "col-sm-2", + "show": false, + "type": "label", + "default_value": "", + "options": [], + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "ADGUARDEXP" + } + ] + }, + { + "column": "Extra", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [ + { + "option": "cssClass", + "value": "text-muted small" + } + ], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "Status" + } + ] + }, + { + "column": "ForeignKey", + "mapped_to_column": "dev_MAC", + "type": { + "dataType": "string", + "elements": [ + { + "elementType": "label", + "elementOptions": [], + "transformers": [] + } + ] + }, + "localized": ["name"], + "name": [ + { + "language_code": "en_us", + "string": "Device link" + } + ] + } + ] +} diff --git a/front/plugins/adguard_export/script.py b/front/plugins/adguard_export/script.py new file mode 100644 index 000000000..432e037c8 --- /dev/null +++ b/front/plugins/adguard_export/script.py @@ -0,0 +1,499 @@ +#!/usr/bin/env python +# adguard_export/script.py +# +# NetAlertX plugin: adguard_export +# Syncs known devices from the NetAlertX database to AdGuard Home as +# persistent clients, keeping names, MACs, and IP addresses in sync. +# +# AdGuard Home API reference: +# GET /control/clients – list all persistent clients +# POST /control/clients/add – create a new persistent client +# POST /control/clients/update – update an existing persistent client +# POST /control/clients/delete – remove a persistent client + +import os +import sys +import json +import requests +from pytz import timezone +import sqlite3 +from typing import Dict, List, Optional, Set, Tuple + +# Define the installation path and extend the system path for plugin imports +INSTALL_PATH = os.getenv('NETALERTX_APP', '/app') +sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) + +from const import dataPath, logPath, fullDbPath # noqa: E402, E261 +from plugin_helper import Plugin_Objects # noqa: E402, E261 +from logger import mylog, Logger # noqa: E402, E261 +from helper import get_setting_value # noqa: E402, E261 +import conf # noqa: E402, E261 + +# ---------------------------- +# Plugin metadata +# ---------------------------- +pluginName = "ADGUARDEXP" + +# Make sure the TIMEZONE for logging is correct +conf.tz = timezone(get_setting_value("TIMEZONE")) + +# Make sure log level is initialized correctly +Logger(get_setting_value("LOG_LEVEL")) + +# Define paths +LOG_PATH = logPath + "/plugins" +RESULT_FILE = os.path.join(LOG_PATH, f"last_result.{pluginName}.log") +STATE_FILE = os.path.join(dataPath, f"state.{pluginName}.json") + +plugin_objects = Plugin_Objects(RESULT_FILE) + + +def load_managed_names() -> Set[str]: + """Return the set of AdGuard client names we previously added.""" + try: + with open(STATE_FILE) as f: + return set(json.load(f).get("managed", [])) + except (FileNotFoundError, json.JSONDecodeError): + return set() + + +def save_managed_names(names: Set[str]) -> None: + with open(STATE_FILE, "w") as f: + json.dump({"managed": sorted(names)}, f, indent=2) + + +# --------------------------------------------------------------------------- +# Device type → AdGuard tag mapping +# --------------------------------------------------------------------------- +_TYPE_TAG_MAP: Dict[str, str] = { + "ap": "device_other", + "desktop": "device_pc", + "game console": "device_gameconsole", + "gameconsole": "device_gameconsole", + "gateway": "device_other", + "house appliance": "device_other", + "hypervisor": "device_pc", + "ip camera": "device_camera", + "camera": "device_camera", + "laptop": "device_laptop", + "notebook": "device_laptop", + "nas": "device_nas", + "printer": "device_printer", + "router": "device_other", + "server": "device_pc", + "smarttv": "device_tv", + "smart tv": "device_tv", + "tv": "device_tv", + "smartphone": "device_phone", + "phone": "device_phone", + "mobile": "device_phone", + "smartwatch": "device_phone", + "watch": "device_phone", + "tablet": "device_tablet", + "virtual assistance": "device_audio", + "assistant": "device_audio", + "speaker": "device_audio", +} + + +def device_type_to_tag(dev_type: str) -> str: + """Map a NetAlertX devType string to a valid AdGuard Home tag, or ''.""" + if not dev_type: + return "" + key = dev_type.strip().lower() + if key in _TYPE_TAG_MAP: + return _TYPE_TAG_MAP[key] + # Substring fallback for partial matches + for pattern, tag in _TYPE_TAG_MAP.items(): + if pattern in key: + return tag + return "" + + +# --------------------------------------------------------------------------- +# AdGuard Home client +# --------------------------------------------------------------------------- +class AdGuardClient: + """Thin wrapper around the AdGuard Home /control/clients* API.""" + + def __init__(self, base_url: str, username: str, password: str, verify_ssl: bool = True): + self.base_url = base_url.rstrip("/") + self.auth = (username, password) + self.verify_ssl = verify_ssl + self.session = requests.Session() + self.session.auth = self.auth + + def _url(self, path: str) -> str: + return f"{self.base_url}/control/{path.lstrip('/')}" + + def get_clients(self) -> List[dict]: + """Return the list of persistent (manually added) clients.""" + resp = self.session.get(self._url("clients"), verify=self.verify_ssl, timeout=15) + resp.raise_for_status() + return resp.json().get("clients") or [] + + def add_client(self, client: dict) -> None: + resp = self.session.post( + self._url("clients/add"), + json=client, + verify=self.verify_ssl, + timeout=15, + ) + resp.raise_for_status() + + def update_client(self, old_name: str, client: dict) -> None: + payload = {"name": old_name, "data": client} + resp = self.session.post( + self._url("clients/update"), + json=payload, + verify=self.verify_ssl, + timeout=15, + ) + resp.raise_for_status() + + def delete_client(self, name: str) -> None: + resp = self.session.post( + self._url("clients/delete"), + json={"name": name}, + verify=self.verify_ssl, + timeout=15, + ) + resp.raise_for_status() + + +# --------------------------------------------------------------------------- +# Database helpers +# --------------------------------------------------------------------------- +def get_netalertx_devices(db_path: str, include_offline: bool, include_new: bool) -> List[dict]: + """ + Query NetAlertX's Devices table and return a list of dicts with the + fields we care about: mac, name, last_ip, dev_type + """ + devices = [] + conn = None + try: + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + cur = conn.cursor() + + clauses = ["devIsArchived = 0"] + if not include_offline: + clauses.append("devPresentLastScan = 1") + if not include_new: + clauses.append("devIsNew = 0") + where = "WHERE " + " AND ".join(clauses) + + cur.execute( + f""" + SELECT devMac AS mac, + devName AS name, + devLastIP AS last_ip, + devType AS dev_type + FROM Devices + {where} + ORDER BY devMac + """ + ) + for row in cur.fetchall(): + mac = (row["mac"] or "").strip() + name = (row["name"] or "").strip() + last_ip = (row["last_ip"] or "").strip() + dev_type = (row["dev_type"] or "").strip() + + # Skip completely empty rows + if not mac and not last_ip: + continue + + # Fall back to MAC as name when no friendly name is set + if not name: + name = mac or last_ip + + devices.append({"mac": mac, "name": name, "last_ip": last_ip, "dev_type": dev_type}) + + except sqlite3.Error as exc: + mylog("verbose", [f"[{pluginName}] ERROR reading NetAlertX database: {exc}"]) + finally: + if conn: + conn.close() + + return devices + + +# --------------------------------------------------------------------------- +# Sync logic +# --------------------------------------------------------------------------- +def build_agrd_client(device: dict, use_mac: bool) -> dict: + """ + Build an AdGuard Home client object from a NetAlertX device row. + + AdGuard Home identifies a client by its 'ids' list, which may contain + MACs, IPs, CIDRs, or ClientIDs. We prefer MAC when available; fall + back to IP otherwise. + """ + ids = [] + if use_mac and device["mac"] and device["mac"] not in ("", "00:00:00:00:00:00"): + ids.append(device["mac"].lower()) + if device["last_ip"] and device["last_ip"] not in ("", "0.0.0.0"): + ids.append(device["last_ip"]) + + if not ids: + return {} # nothing useful to identify the device + + tag = device_type_to_tag(device.get("dev_type", "")) + return { + "name": device["name"], + "ids": ids, + "tags": [tag] if tag else [], + "use_global_settings": True, + "use_global_blocked_services": True, + "filtering_enabled": False, + "parental_enabled": False, + "safebrowsing_enabled": False, + "safesearch_enabled": False, + "blocked_services": [], + "upstreams": [], + } + + +def sync_to_adguard( + agrd: AdGuardClient, + devices: List[dict], + use_mac: bool, + delete_missing: bool, + existing_clients: Optional[List[dict]] = None, +) -> Tuple[int, int, int, int]: + """ + Core sync routine. Returns (added, updated, skipped, deleted). + Pass existing_clients to reuse a list already fetched (avoids a second + round-trip when the caller performed a connectivity check first). + """ + if existing_clients is None: + existing_clients = agrd.get_clients() + mylog("verbose", [f"[{pluginName}] AdGuard Home currently has {len(existing_clients)} persistent client(s)."]) + + # Build a lookup: identifier → client dict + existing_by_id: Dict[str, dict] = {} + for client in existing_clients: + for cid in client.get("ids", []): + existing_by_id[cid.lower()] = client + + # Also index by name for update / delete operations (warn if AdGuard has duplicate names) + existing_by_name: Dict[str, dict] = {} + for c in existing_clients: + if c["name"] in existing_by_name: + mylog("verbose", [f"[{pluginName}] WARNING duplicate client name in AdGuard Home: {c['name']!r}"]) + existing_by_name[c["name"]] = c + + # Load the set of client names we've previously added so that DELETE mode + # only removes clients we created, not manually-added ones. + managed_names = load_managed_names() + + added = updated = skipped = deleted = 0 + + # ----- add / update ----- + for device in devices: + client_data = build_agrd_client(device, use_mac) + if not client_data: + if not use_mac and not device["last_ip"]: + reason = "no IP address (USEMAC is disabled, IP required)" + else: + reason = "no usable MAC or IP" + mylog("verbose", [f"[{pluginName}] SKIP {device['name']!r} – {reason}"]) + skipped += 1 + continue + + # Check whether any of the ids already exist in AdGuard + existing = None + for cid in client_data["ids"]: + if cid.lower() in existing_by_id: + existing = existing_by_id[cid.lower()] + break + + if existing is None and device["name"] in managed_names: + # Fall back to name match only for clients we previously added — avoids + # accidentally matching a manually-created AdGuard client with the same name. + existing = existing_by_name.get(device["name"]) + if existing: + mylog("verbose", [f"[{pluginName}] WARN matched {device['name']!r} by name (no ID match) — verify no duplicate clients"]) + + if existing: + old_name = existing["name"] + # Preserve existing per-client AdGuard settings; we only manage name, ids, tags. + _our_keys = frozenset(("name", "ids", "tags")) + merged_data = {**client_data, **{k: v for k, v in existing.items() if k not in _our_keys}} + # Only call update when something actually changed to avoid noise + if ( + sorted(i.lower() for i in existing.get("ids", [])) != sorted(i.lower() for i in client_data["ids"]) + or existing.get("name") != client_data["name"] + or sorted(existing.get("tags", [])) != sorted(client_data["tags"]) + ): + try: + agrd.update_client(old_name, merged_data) + mylog("verbose", [f"[{pluginName}] UPDATE {old_name!r} → {device['name']!r} ids={client_data['ids']}"]) + managed_names.discard(old_name) + managed_names.add(device["name"]) + updated += 1 + except requests.HTTPError as exc: + mylog("verbose", [f"[{pluginName}] ERROR updating {device['name']!r}: {exc}"]) + skipped += 1 + else: + mylog("verbose", [f"[{pluginName}] SKIP (no change) {device['name']!r}"]) + managed_names.add(device["name"]) + skipped += 1 + else: + try: + agrd.add_client(client_data) + mylog("verbose", [f"[{pluginName}] ADD {device['name']!r} ids={client_data['ids']}"]) + managed_names.add(device["name"]) + added += 1 + except requests.HTTPError as exc: + mylog("verbose", [f"[{pluginName}] ERROR adding {device['name']!r}: {exc}"]) + skipped += 1 + + # ----- optional delete of AdGuard clients no longer in NetAlertX ----- + if delete_missing: + export_names = {d["name"] for d in devices} + for client in existing_clients: + cname = client.get("name", "") + # Only delete clients that we previously added (tracked in state file) + # so we don't accidentally remove manually-added clients. + if cname in managed_names and cname not in export_names: + try: + agrd.delete_client(cname) + mylog("verbose", [f"[{pluginName}] DELETE {cname!r} (no longer in NetAlertX)"]) + managed_names.discard(cname) + deleted += 1 + except requests.HTTPError as exc: + mylog("verbose", [f"[{pluginName}] ERROR deleting {cname!r}: {exc}"]) + + save_managed_names(managed_names) + return added, updated, skipped, deleted + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- +def main(): + mylog("verbose", [f"[{pluginName}] In script"]) + + # ------------------------------------------------------------------ + # Read settings + # ------------------------------------------------------------------ + agrd_url = get_setting_value("ADGUARDEXP_URL") or "http://localhost:3000" + agrd_user = get_setting_value("ADGUARDEXP_USER") or "" + agrd_pass = get_setting_value("ADGUARDEXP_PASSWORD") or "" + verify_ssl_str = get_setting_value("ADGUARDEXP_VERIFYSSL") or "true" + include_offline_str = get_setting_value("ADGUARDEXP_INCLUDE_OFFLINE") or "true" + include_new_str = get_setting_value("ADGUARDEXP_INCLUDE_NEW") or "false" + use_mac_str = get_setting_value("ADGUARDEXP_USEMAC") or "true" + delete_str = get_setting_value("ADGUARDEXP_DELETE") or "false" + + verify_ssl = verify_ssl_str.strip().lower() not in ("false", "0", "no") + include_offline = include_offline_str.strip().lower() not in ("false", "0", "no") + include_new = include_new_str.strip().lower() not in ("false", "0", "no") + use_mac = use_mac_str.strip().lower() not in ("false", "0", "no") + delete_miss = delete_str.strip().lower() not in ("false", "0", "no") + + mylog("verbose", [f"[{pluginName}] " + ("=" * 60)]) + mylog("verbose", [f"[{pluginName}] AdGuard Home URL : {agrd_url}"]) + mylog("verbose", [f"[{pluginName}] Include offline devs: {include_offline}"]) + mylog("verbose", [f"[{pluginName}] Include new/unknown : {include_new}"]) + mylog("verbose", [f"[{pluginName}] Use MAC as id : {use_mac}"]) + mylog("verbose", [f"[{pluginName}] Delete missing : {delete_miss}"]) + mylog("verbose", [f"[{pluginName}] " + ("=" * 60)]) + + # ------------------------------------------------------------------ + # Load devices from NetAlertX + # ------------------------------------------------------------------ + devices = get_netalertx_devices(fullDbPath, include_offline, include_new) + mylog("verbose", [f"[{pluginName}] Loaded {len(devices)} device(s) from NetAlertX database."]) + + if not devices: + mylog("verbose", ["No devices to sync – exiting."]) + plugin_objects.add_object( + primaryId = "adguard_export", + secondaryId = "summary", + watched1 = "0", + watched2 = "0", + watched3 = "0", + watched4 = "0", + extra = "No devices found in NetAlertX", + ) + plugin_objects.write_result_file() + return + + # ------------------------------------------------------------------ + # Connect to AdGuard Home and sync + # ------------------------------------------------------------------ + try: + agrd = AdGuardClient(agrd_url, agrd_user, agrd_pass, verify_ssl) + existing_clients = agrd.get_clients() + except requests.exceptions.ConnectionError as exc: + mylog("verbose", [f"[{pluginName}] ERROR – cannot connect to AdGuard Home at {agrd_url}: {exc}"]) + plugin_objects.add_object( + primaryId = "adguard_export", + secondaryId = "error", + extra = f"Connection failed: {exc}", + ) + plugin_objects.write_result_file() + return + except requests.HTTPError as exc: + mylog("verbose", [f"[{pluginName}] ERROR – AdGuard Home returned an HTTP error: {exc}"]) + plugin_objects.add_object( + primaryId = "adguard_export", + secondaryId = "error", + extra = f"HTTP error: {exc}", + ) + plugin_objects.write_result_file() + return + except Exception as exc: + mylog("verbose", [f"[{pluginName}] ERROR – AdGuard Home returned an unknown error: {exc}"]) + plugin_objects.add_object( + primaryId = "adguard_export", + secondaryId = "error", + extra = f"Unknown error: {exc}", + ) + plugin_objects.write_result_file() + return + + added, updated, skipped, deleted = sync_to_adguard( + agrd, devices, use_mac, delete_miss, existing_clients=existing_clients + ) + + summary = ( + f"Sync complete – added={added} updated={updated} " + f"skipped={skipped} deleted={deleted}" + ) + + # ------------------------------------------------------------------ + # Write plugin result (one summary row + one row per touched device) + # ------------------------------------------------------------------ + plugin_objects.add_object( + primaryId = "adguard_export", + secondaryId = "summary", + watched1 = str(added), + watched2 = str(updated), + watched3 = str(skipped), + watched4 = str(deleted), + extra = summary, + ) + + for device in devices: + plugin_objects.add_object( + primaryId = device["mac"] or device["last_ip"], + secondaryId = device["last_ip"], + watched1 = device["name"], + watched2 = device["mac"], + watched3 = device["last_ip"], + watched4 = agrd_url, + extra = "exported", + foreignKey = device["mac"] or "", + ) + + mylog("verbose", [f"[{pluginName}] {summary}"]) + plugin_objects.write_result_file() + return + + +if __name__ == "__main__": + main() diff --git a/front/plugins/adguard_import/config.json b/front/plugins/adguard_import/config.json index f4df3377c..e4c2a6e17 100644 --- a/front/plugins/adguard_import/config.json +++ b/front/plugins/adguard_import/config.json @@ -26,7 +26,7 @@ "description": [ { "language_code": "en_us", - "string": "Plugin to ..." + "string": "Imports known devices from AdGuard to NetAlertX as persistent devices, keeping names and IP/MAC identifiers in sync." } ], "icon": [ @@ -36,7 +36,7 @@ } ], "params": [], - "settings": [ + "settings": [ { "function": "RUN", "events": ["run"], diff --git a/test/plugins/test_adguard_export.py b/test/plugins/test_adguard_export.py new file mode 100644 index 000000000..98051865b --- /dev/null +++ b/test/plugins/test_adguard_export.py @@ -0,0 +1,401 @@ +""" +Tests for adguard_export/script.py + +Run from inside the NetAlertX container (where the full environment is available), +or locally — in that case the NetAlertX-specific modules are stubbed out +automatically before the script is imported. + + pytest test/plugins/test_adguard_export.py -v +""" + +import json +import os +import sqlite3 +import sys +import tempfile +import types +from unittest.mock import MagicMock, call, patch + +import pytest + +# --------------------------------------------------------------------------- +# Stub NetAlertX-specific modules so tests can run outside the container. +# sys.modules.setdefault() is a no-op when the real module is already loaded, +# so this is safe to run inside the container too. +# --------------------------------------------------------------------------- +_tmp_log = tempfile.mkdtemp() + + +def _stub(name: str, **attrs): + if name not in sys.modules: + mod = types.ModuleType(name) + for k, v in attrs.items(): + setattr(mod, k, v) + sys.modules[name] = mod + + +_stub("pytz", timezone=lambda tz: tz) +_stub("conf") +_stub("const", dataPath=_tmp_log, logPath=_tmp_log, fullDbPath=os.path.join(_tmp_log, "test.db")) +_stub("plugin_helper", Plugin_Objects=MagicMock) +_stub("logger", mylog=lambda *a: None, Logger=MagicMock) +_stub("helper", get_setting_value=lambda k: "") + +# Stub requests only when it isn't installed (e.g. bare system Python locally). +# In the container and CI, the real package is present and will be used. +if "requests" not in sys.modules: + _req = types.ModuleType("requests") + _req.Session = MagicMock + _req.HTTPError = type("HTTPError", (Exception,), {}) + _req_exc = types.ModuleType("requests.exceptions") + _req_exc.ConnectionError = type("ConnectionError", (Exception,), {}) + _req.exceptions = _req_exc + sys.modules["requests"] = _req + sys.modules["requests.exceptions"] = _req_exc + +# --------------------------------------------------------------------------- +# Import the functions under test (must come after the stubs above). +# --------------------------------------------------------------------------- +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "front", "plugins", "adguard_export")) + +from script import ( # noqa: E402 + AdGuardClient, + _TYPE_TAG_MAP, + build_agrd_client, + device_type_to_tag, + get_netalertx_devices, + load_managed_names, + save_managed_names, + sync_to_adguard, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_db(path: str, rows: list[dict]) -> None: + """Create a minimal Devices table and populate it with *rows*.""" + conn = sqlite3.connect(path) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS Devices ( + devMac TEXT, + devName TEXT, + devLastIP TEXT, + devType TEXT, + devIsArchived INTEGER DEFAULT 0, + devPresentLastScan INTEGER DEFAULT 1, + devIsNew INTEGER DEFAULT 0 + ) + """ + ) + for row in rows: + conn.execute( + "INSERT INTO Devices VALUES (?,?,?,?,?,?,?)", + ( + row.get("devMac", ""), + row.get("devName", ""), + row.get("devLastIP", ""), + row.get("devType", ""), + row.get("devIsArchived", 0), + row.get("devPresentLastScan", 1), + row.get("devIsNew", 0), + ), + ) + conn.commit() + conn.close() + + +def _mock_agrd(existing=None) -> MagicMock: + """Return a mock AdGuardClient whose get_clients() returns *existing*.""" + agrd = MagicMock(spec=AdGuardClient) + agrd.get_clients.return_value = existing or [] + return agrd + + +# --------------------------------------------------------------------------- +# device_type_to_tag +# --------------------------------------------------------------------------- + + +class TestDeviceTypeToTag: + def test_empty_string_returns_empty(self): + assert device_type_to_tag("") == "" + + def test_none_returns_empty(self): + assert device_type_to_tag(None) == "" + + def test_exact_match_case_insensitive(self): + assert device_type_to_tag("Smartphone") == "device_phone" + assert device_type_to_tag("LAPTOP") == "device_laptop" + assert device_type_to_tag("nas") == "device_nas" + + def test_substring_fallback(self): + # "gaming smartphone" contains "smartphone" + assert device_type_to_tag("gaming smartphone") == "device_phone" + + def test_unknown_type_returns_empty(self): + assert device_type_to_tag("toaster") == "" + + def test_all_map_values_are_valid_adguard_tags(self): + valid_prefixes = ("device_", "ct_", "os_") + for tag in _TYPE_TAG_MAP.values(): + assert any(tag.startswith(p) for p in valid_prefixes), ( + f"{tag!r} is not a valid AdGuard Home tag" + ) + + +# --------------------------------------------------------------------------- +# build_agrd_client +# --------------------------------------------------------------------------- + + +class TestBuildAgrdClient: + def _device(self, **overrides) -> dict: + base = {"mac": "AA:BB:CC:DD:EE:FF", "name": "My PC", "last_ip": "192.168.1.10", "dev_type": "desktop"} + return {**base, **overrides} + + def test_mac_and_ip_both_included_when_use_mac_true(self): + result = build_agrd_client(self._device(), use_mac=True) + assert "aa:bb:cc:dd:ee:ff" in result["ids"] + assert "192.168.1.10" in result["ids"] + + def test_only_ip_when_use_mac_false(self): + result = build_agrd_client(self._device(), use_mac=False) + assert result["ids"] == ["192.168.1.10"] + + def test_returns_empty_dict_when_no_usable_id(self): + result = build_agrd_client( + {"mac": "", "name": "Ghost", "last_ip": "0.0.0.0", "dev_type": ""}, + use_mac=True, + ) + assert result == {} + + def test_null_mac_falls_back_to_ip(self): + result = build_agrd_client( + {"mac": "00:00:00:00:00:00", "name": "Dev", "last_ip": "10.0.0.5", "dev_type": ""}, + use_mac=True, + ) + assert result["ids"] == ["10.0.0.5"] + + def test_device_type_tag_applied(self): + result = build_agrd_client(self._device(dev_type="smartphone"), use_mac=True) + assert result["tags"] == ["device_phone"] + + def test_unknown_device_type_produces_no_tag(self): + result = build_agrd_client(self._device(dev_type=""), use_mac=True) + assert result["tags"] == [] + + def test_mac_is_lowercased(self): + result = build_agrd_client(self._device(mac="AA:BB:CC:DD:EE:FF"), use_mac=True) + assert "aa:bb:cc:dd:ee:ff" in result["ids"] + + +# --------------------------------------------------------------------------- +# load_managed_names / save_managed_names +# --------------------------------------------------------------------------- + + +class TestManagedNames: + def test_round_trip(self, tmp_path): + state = tmp_path / "state.json" + with patch("script.STATE_FILE", str(state)): + save_managed_names({"alpha", "beta", "gamma"}) + loaded = load_managed_names() + assert loaded == {"alpha", "beta", "gamma"} + + def test_missing_file_returns_empty_set(self, tmp_path): + with patch("script.STATE_FILE", str(tmp_path / "nonexistent.json")): + assert load_managed_names() == set() + + def test_corrupt_file_returns_empty_set(self, tmp_path): + state = tmp_path / "state.json" + state.write_text("not valid json") + with patch("script.STATE_FILE", str(state)): + assert load_managed_names() == set() + + def test_save_sorts_names(self, tmp_path): + state = tmp_path / "state.json" + with patch("script.STATE_FILE", str(state)): + save_managed_names({"zebra", "apple", "mango"}) + data = json.loads(state.read_text()) + assert data["managed"] == ["apple", "mango", "zebra"] + + +# --------------------------------------------------------------------------- +# get_netalertx_devices +# --------------------------------------------------------------------------- + + +class TestGetNetalertxDevices: + def test_basic_query(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [{"devMac": "AA:BB:CC:00:00:01", "devName": "PC", "devLastIP": "10.0.0.1", "devType": "desktop"}]) + result = get_netalertx_devices(db, include_offline=True, include_new=True) + assert len(result) == 1 + assert result[0]["name"] == "PC" + assert result[0]["mac"] == "AA:BB:CC:00:00:01" + + def test_archived_devices_excluded(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [ + {"devMac": "AA:00:00:00:00:01", "devName": "Active", "devLastIP": "10.0.0.1", "devIsArchived": 0}, + {"devMac": "AA:00:00:00:00:02", "devName": "Archived", "devLastIP": "10.0.0.2", "devIsArchived": 1}, + ]) + result = get_netalertx_devices(db, include_offline=True, include_new=True) + assert len(result) == 1 + assert result[0]["name"] == "Active" + + def test_offline_excluded_when_flag_false(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [ + {"devMac": "AA:00:00:00:00:01", "devName": "Online", "devLastIP": "10.0.0.1", "devPresentLastScan": 1}, + {"devMac": "AA:00:00:00:00:02", "devName": "Offline", "devLastIP": "10.0.0.2", "devPresentLastScan": 0}, + ]) + result = get_netalertx_devices(db, include_offline=False, include_new=True) + assert len(result) == 1 + assert result[0]["name"] == "Online" + + def test_new_devices_excluded_when_flag_false(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [ + {"devMac": "AA:00:00:00:00:01", "devName": "Known", "devLastIP": "10.0.0.1", "devIsNew": 0}, + {"devMac": "AA:00:00:00:00:02", "devName": "Unknown", "devLastIP": "10.0.0.2", "devIsNew": 1}, + ]) + result = get_netalertx_devices(db, include_offline=True, include_new=False) + assert len(result) == 1 + assert result[0]["name"] == "Known" + + def test_nameless_device_falls_back_to_mac(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [{"devMac": "BB:CC:DD:EE:FF:00", "devName": "", "devLastIP": "10.0.0.5"}]) + result = get_netalertx_devices(db, include_offline=True, include_new=True) + assert result[0]["name"] == "BB:CC:DD:EE:FF:00" + + def test_row_with_no_mac_and_no_ip_skipped(self, tmp_path): + db = str(tmp_path / "na.db") + _make_db(db, [{"devMac": "", "devName": "Ghost", "devLastIP": ""}]) + result = get_netalertx_devices(db, include_offline=True, include_new=True) + assert result == [] + + def test_missing_db_returns_empty_list(self, tmp_path): + result = get_netalertx_devices(str(tmp_path / "missing.db"), True, True) + assert result == [] + + +# --------------------------------------------------------------------------- +# sync_to_adguard +# --------------------------------------------------------------------------- + + +class TestSyncToAdguard: + def _device(self, name="PC", mac="AA:BB:CC:00:00:01", ip="10.0.0.1", dev_type="desktop") -> dict: + return {"mac": mac, "name": name, "last_ip": ip, "dev_type": dev_type} + + def test_new_device_is_added(self, tmp_path): + agrd = _mock_agrd(existing=[]) + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + added, updated, skipped, deleted = sync_to_adguard( + agrd, [self._device()], use_mac=True, delete_missing=False + ) + assert added == 1 + assert updated == skipped == deleted == 0 + agrd.add_client.assert_called_once() + + def test_unchanged_device_is_skipped(self, tmp_path): + existing = [{"name": "PC", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + added, updated, skipped, deleted = sync_to_adguard( + agrd, [self._device()], use_mac=True, delete_missing=False + ) + assert skipped == 1 + assert added == updated == deleted == 0 + agrd.update_client.assert_not_called() + + def test_renamed_device_is_updated(self, tmp_path): + existing = [{"name": "Old Name", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + added, updated, skipped, deleted = sync_to_adguard( + agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False + ) + assert updated == 1 + agrd.update_client.assert_called_once_with("Old Name", agrd.update_client.call_args[0][1]) + + def test_missing_device_deleted_when_flag_true(self, tmp_path): + state = tmp_path / "state.json" + state.write_text(json.dumps({"managed": ["Gone Device"]})) + existing = [{"name": "Gone Device", "ids": ["10.0.0.99"], "tags": []}] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(state)): + added, updated, skipped, deleted = sync_to_adguard( + agrd, [], use_mac=True, delete_missing=True + ) + assert deleted == 1 + agrd.delete_client.assert_called_once_with("Gone Device") + + def test_unmanaged_device_not_deleted(self, tmp_path): + # State file is empty — we never added this client + state = tmp_path / "state.json" + state.write_text(json.dumps({"managed": []})) + existing = [{"name": "Manual Client", "ids": ["10.0.0.50"], "tags": []}] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(state)): + sync_to_adguard(agrd, [], use_mac=True, delete_missing=True) + agrd.delete_client.assert_not_called() + + def test_device_with_no_usable_id_is_skipped(self, tmp_path): + agrd = _mock_agrd(existing=[]) + device = {"mac": "00:00:00:00:00:00", "name": "Ghost", "last_ip": "0.0.0.0", "dev_type": ""} + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + added, updated, skipped, deleted = sync_to_adguard( + agrd, [device], use_mac=True, delete_missing=False + ) + assert skipped == 1 + agrd.add_client.assert_not_called() + + def test_existing_clients_parameter_avoids_extra_api_call(self, tmp_path): + existing = [] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + sync_to_adguard( + agrd, [self._device()], use_mac=True, delete_missing=False, + existing_clients=existing, + ) + agrd.get_clients.assert_not_called() + + def test_rename_removes_old_name_from_managed_names(self, tmp_path): + state = tmp_path / "state.json" + state.write_text(json.dumps({"managed": ["Old Name"]})) + existing = [{"name": "Old Name", "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], "tags": ["device_pc"]}] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(state)): + sync_to_adguard(agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False) + loaded = load_managed_names() + assert "Old Name" not in loaded + assert "New Name" in loaded + + def test_update_preserves_custom_adguard_settings(self, tmp_path): + existing = [{ + "name": "Old Name", + "ids": ["aa:bb:cc:00:00:01", "10.0.0.1"], + "tags": ["device_pc"], + "filtering_enabled": True, + "use_global_settings": False, + "parental_enabled": True, + "safebrowsing_enabled": False, + "safesearch_enabled": False, + "use_global_blocked_services": False, + "blocked_services": ["youtube.com"], + "upstreams": ["1.1.1.1"], + }] + agrd = _mock_agrd(existing=existing) + with patch("script.STATE_FILE", str(tmp_path / "state.json")): + sync_to_adguard(agrd, [self._device(name="New Name")], use_mac=True, delete_missing=False) + _, sent_payload = agrd.update_client.call_args[0] + assert sent_payload["filtering_enabled"] is True + assert sent_payload["use_global_settings"] is False + assert sent_payload["blocked_services"] == ["youtube.com"] + assert sent_payload["upstreams"] == ["1.1.1.1"]