diff --git a/README.md b/README.md index f11999d..3350c20 100644 --- a/README.md +++ b/README.md @@ -19,3 +19,27 @@ Set up the project and run the test suite with: uv sync --group dev uv run pytest ``` + +## Catalog pipelines + +Run a pipeline from the Adagio pipeline catalog: + +```bash +adagio pipeline show @adagio/microbial-diversity +adagio run @adagio/microbial-diversity --cache-dir /path/to/cache --arguments run-arguments.json +``` + +`@adagio/` first resolves against a nearby local `adagio-pipelines` +checkout when one is available. If no local catalog is found, Adagio fetches +`pipeline.adg` from `cymis/adagio-pipelines` on GitHub, checking `official` +before `community`. + +During `adagio run`, remote catalog pipelines are downloaded under the selected +`--cache-dir` and reused by source name and slug on later runs. `adagio pipeline +show` uses a temporary download when it fetches from GitHub because it does not +take a cache directory. + +Private GitHub access is explicit: set `GITHUB_TOKEN` or `GH_TOKEN` to a token +that can read `cymis/adagio-pipelines`; with a token, the CLI fetches through +the GitHub contents API. The CLI does not read browser, git, or `gh` credentials +automatically. diff --git a/src/adagio/cli/dynamic.py b/src/adagio/cli/dynamic.py index 5428acd..c53cc5e 100644 --- a/src/adagio/cli/dynamic.py +++ b/src/adagio/cli/dynamic.py @@ -304,7 +304,7 @@ def build_dynamic_run( CliParameter( name=("--pipeline", "-p"), group=command_group, - help="Path to the pipeline JSON file.", + help="Path to the pipeline file or a catalog reference like @adagio/slug.", ), ] } @@ -589,7 +589,7 @@ def run( run.__doc__ = ( "Run an Adagio pipeline.\n\n" "Dynamic inputs, parameters, and outputs are loaded from the pipeline file and exposed as CLI options.\n" - "Use: adagio run --pipeline PATH --help" + "Use: adagio run --pipeline PATH-OR-@ADAGIO/SLUG --help" ) return run diff --git a/src/adagio/cli/main.py b/src/adagio/cli/main.py index 8e23417..93f6e52 100644 --- a/src/adagio/cli/main.py +++ b/src/adagio/cli/main.py @@ -1,5 +1,6 @@ import json import sys +from contextlib import ExitStack from functools import partial from pathlib import Path from typing import Annotated, Any @@ -13,11 +14,16 @@ from ..app.parsers.pipeline import Output as OutputSpec from ..app.parsers.pipeline import Parameter as ParamSpec from ..app.parsers.pipeline import parse_inputs, parse_outputs, parse_parameters -from ..executors.cache_support import CACHE_DIR_HELP, REUSE_HELP +from ..executors.cache_support import CACHE_DIR_HELP, REUSE_HELP, resolve_cache_dir_path from .args import ShowParamsMode, extract_flag_value, promote_positional_pipeline from .config import load_run_config from .dynamic import build_dynamic_run from .pipeline import run_pipeline_cli +from .pipeline_sources import ( + PipelineResolution, + PipelineResolutionError, + resolve_pipeline_reference_details, +) from .qapi import run_qapi from .runner import run_pipeline_from_kwargs @@ -56,6 +62,7 @@ def main(argv: list[str] | None = None) -> None: argv, positional_pipeline = promote_positional_pipeline(argv) pipeline_str = extract_flag_value(argv, "--pipeline", "-p") + cache_dir_str = extract_flag_value(argv, "--cache-dir") show_mode_str = extract_flag_value(argv, "--show-params") try: show_mode = ( @@ -108,7 +115,7 @@ def run( Parameter( name=("--pipeline", "-p"), group=command_group, - help="Path to the pipeline JSON file.", + help="Path to the pipeline file or a catalog reference like @adagio/slug.", ), ], arguments: Annotated[ @@ -155,45 +162,61 @@ def run( ): """Run a pipeline (requires --pipeline; dynamic options come from that file).""" _ = (config, show_params, cache_dir, reuse) - console.print(CycloptsPanel("Missing --pipeline. Try:\n adagio run --pipeline pipeline.json --help")) + console.print( + CycloptsPanel( + "Missing --pipeline. Try:\n" + " adagio run --pipeline pipeline.adg --help\n" + " adagio run @adagio/microbial-diversity --help" + ) + ) sys.exit(1) app(argv) return - pipeline_path = Path(pipeline_str) - data = json.loads(pipeline_path.read_text(encoding="utf-8")) - input_specs = parse_inputs(data) - param_specs = parse_parameters(data) - output_specs = parse_outputs(data) - arguments_path_str = extract_flag_value(argv, "--arguments") - config_path_str = extract_flag_value(argv, "--config") - arguments_data = ( - _load_arguments_data(Path(arguments_path_str), console) if arguments_path_str else None - ) - if config_path_str: - load_run_config(Path(config_path_str)) - visible_inputs, visible_params, visible_outputs = _filter_visible_specs( - input_specs=input_specs, - param_specs=param_specs, - output_specs=output_specs, - show_mode=show_mode, - arguments_data=arguments_data, - ) + with ExitStack() as exit_stack: + pipeline_resolution = _resolve_pipeline( + pipeline_str, + console=console, + exit_stack=exit_stack, + download_cache_dir=_resolve_download_cache_dir(cache_dir_str), + ) + data = json.loads(pipeline_resolution.path.read_text(encoding="utf-8")) + input_specs = parse_inputs(data) + param_specs = parse_parameters(data) + output_specs = parse_outputs(data) + arguments_path_str = extract_flag_value(argv, "--arguments") + config_path_str = extract_flag_value(argv, "--config") + arguments_data = ( + _load_arguments_data(Path(arguments_path_str), console) if arguments_path_str else None + ) + if config_path_str: + load_run_config(Path(config_path_str)) + visible_inputs, visible_params, visible_outputs = _filter_visible_specs( + input_specs=input_specs, + param_specs=param_specs, + output_specs=output_specs, + show_mode=show_mode, + arguments_data=arguments_data, + ) - dynamic_run = build_dynamic_run( - input_specs=input_specs, - param_specs=param_specs, - output_specs=output_specs, - visible_input_names={spec.name for spec in visible_inputs}, - visible_param_names={spec.name for spec in visible_params}, - visible_output_names={spec.name for spec in visible_outputs}, - argument_inputs=arguments_data.get("inputs", {}) if arguments_data else None, - argument_params=arguments_data.get("parameters", {}) if arguments_data else None, - run_handler=partial(run_pipeline_from_kwargs, console=console), - ) - app.command(dynamic_run, name="run") - app(argv) + dynamic_run = build_dynamic_run( + input_specs=input_specs, + param_specs=param_specs, + output_specs=output_specs, + visible_input_names={spec.name for spec in visible_inputs}, + visible_param_names={spec.name for spec in visible_params}, + visible_output_names={spec.name for spec in visible_outputs}, + argument_inputs=arguments_data.get("inputs", {}) if arguments_data else None, + argument_params=arguments_data.get("parameters", {}) if arguments_data else None, + run_handler=partial( + run_pipeline_from_kwargs, + console=console, + resolved_pipeline=pipeline_resolution, + ), + ) + app.command(dynamic_run, name="run") + app(argv) def _filter_visible_specs( @@ -262,5 +285,29 @@ def _is_missing(value: Any) -> bool: return value is None or value == "" or value == "" or value == [] or value == {} +def _resolve_pipeline( + reference: str, + *, + console: Console, + exit_stack: ExitStack, + download_cache_dir: Path | None = None, +) -> PipelineResolution: + try: + return resolve_pipeline_reference_details( + reference, + exit_stack=exit_stack, + download_cache_dir=download_cache_dir, + ) + except PipelineResolutionError as error: + console.print(CycloptsPanel(str(error))) + sys.exit(1) + + +def _resolve_download_cache_dir(raw_value: str | None) -> Path | None: + if raw_value is None: + return None + return resolve_cache_dir_path(cwd=Path.cwd().resolve(), raw_value=raw_value) + + if __name__ == "__main__": main() diff --git a/src/adagio/cli/pipeline.py b/src/adagio/cli/pipeline.py index 0d7168c..5b54713 100644 --- a/src/adagio/cli/pipeline.py +++ b/src/adagio/cli/pipeline.py @@ -1,4 +1,5 @@ import json +from contextlib import ExitStack from pathlib import Path from cyclopts import App @@ -6,6 +7,7 @@ from ..describe import render_pipeline_text from ..model.pipeline import AdagioPipeline +from .pipeline_sources import PipelineResolutionError, resolve_pipeline_reference console = Console() @@ -21,7 +23,13 @@ def run_pipeline_cli(argv: list[str]) -> None: def show_pipeline(pipeline: Path) -> None: """Print a pipeline summary to the terminal.""" - data = json.loads(pipeline.read_text(encoding="utf-8")) - pipeline_data = data.get("spec", data) if isinstance(data, dict) else data - parsed_pipeline = AdagioPipeline.model_validate(pipeline_data) - console.print(render_pipeline_text(parsed_pipeline), soft_wrap=True) + with ExitStack() as exit_stack: + try: + pipeline_path = resolve_pipeline_reference(pipeline, exit_stack=exit_stack) + except PipelineResolutionError as error: + raise SystemExit(str(error)) from error + + data = json.loads(pipeline_path.read_text(encoding="utf-8")) + pipeline_data = data.get("spec", data) if isinstance(data, dict) else data + parsed_pipeline = AdagioPipeline.model_validate(pipeline_data) + console.print(render_pipeline_text(parsed_pipeline), soft_wrap=True) diff --git a/src/adagio/cli/pipeline_sources.py b/src/adagio/cli/pipeline_sources.py new file mode 100644 index 0000000..b2f69aa --- /dev/null +++ b/src/adagio/cli/pipeline_sources.py @@ -0,0 +1,322 @@ +from __future__ import annotations + +import os +import re +from contextlib import ExitStack +from dataclasses import dataclass +from pathlib import Path +from tempfile import TemporaryDirectory +from urllib.error import HTTPError, URLError +from urllib.parse import quote, unquote, urlsplit +from urllib.request import Request, urlopen + + +CATALOG_TIERS = ("official", "community") +DEFAULT_PIPELINE_SOURCE = "adagio" +MAX_REMOTE_PIPELINE_BYTES = 5 * 1024 * 1024 +SOURCE_NAME_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]*") +SLUG_RE = re.compile(r"[a-z0-9][a-z0-9-]*") + + +class PipelineResolutionError(RuntimeError): + """Raised when a pipeline reference cannot be resolved.""" + + +@dataclass(frozen=True) +class LocalCatalogLocation: + root: Path + + def candidate_paths(self, slug: str) -> tuple[Path, ...]: + return tuple( + (self.root / "pipelines" / tier / slug / "pipeline.adg").resolve() + for tier in CATALOG_TIERS + ) + + +@dataclass(frozen=True) +class GitHubCatalogLocation: + owner: str + repo: str + ref: str = "main" + + def candidate_urls(self, slug: str) -> tuple[str, ...]: + quoted_slug = _quote_slug(slug) + return tuple( + f"https://raw.githubusercontent.com/{self.owner}/{self.repo}/{self.ref}/" + f"pipelines/{tier}/{quoted_slug}/pipeline.adg" + for tier in CATALOG_TIERS + ) + + +@dataclass(frozen=True) +class PipelineSource: + name: str + locations: tuple[LocalCatalogLocation | GitHubCatalogLocation, ...] + + +@dataclass(frozen=True) +class PipelineResolution: + path: Path + origin: str + is_remote: bool = False + + +def parse_pipeline_source_reference(reference: str) -> tuple[str, str] | None: + raw = reference.strip() + if not raw: + return None + if raw.startswith(("/", "./", "../", "~")): + return None + if "://" in raw: + return None + if Path(raw).suffix in {".adg", ".json"}: + return None + if not raw.startswith("@"): + return None + + source_name, separator, slug = raw[1:].partition("/") + if not separator or not source_name or not slug: + return None + if not SOURCE_NAME_RE.fullmatch(source_name): + return None + if source_name != DEFAULT_PIPELINE_SOURCE: + return None + if not SLUG_RE.fullmatch(slug): + return None + return source_name, slug + + +def discover_workspace_catalog_roots( + *, search_roots: tuple[Path, ...] | None = None +) -> tuple[Path, ...]: + seen: set[Path] = set() + discovered: list[Path] = [] + anchors = list(search_roots or ()) + anchors.extend([Path.cwd(), Path(__file__).resolve()]) + + for anchor in anchors: + current = anchor.resolve() + if current.is_file(): + current = current.parent + + for parent in (current, *current.parents): + for candidate in _catalog_candidates(parent): + resolved = candidate.resolve() + if resolved in seen: + continue + seen.add(resolved) + discovered.append(resolved) + + return tuple(discovered) + + +def default_pipeline_sources( + *, search_roots: tuple[Path, ...] | None = None +) -> tuple[PipelineSource, ...]: + local_locations = tuple( + LocalCatalogLocation(root=root) + for root in discover_workspace_catalog_roots(search_roots=search_roots) + ) + github_fallback = GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines") + built_in_locations = (*local_locations, github_fallback) + return ( + PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=built_in_locations, + ), + ) + + +def resolve_pipeline_reference( + reference: str | Path, + *, + exit_stack: ExitStack, + sources: tuple[PipelineSource, ...] | None = None, + download_cache_dir: Path | None = None, +) -> Path: + return resolve_pipeline_reference_details( + reference, + exit_stack=exit_stack, + sources=sources, + download_cache_dir=download_cache_dir, + ).path + + +def resolve_pipeline_reference_details( + reference: str | Path, + *, + exit_stack: ExitStack, + sources: tuple[PipelineSource, ...] | None = None, + download_cache_dir: Path | None = None, +) -> PipelineResolution: + raw = str(reference).strip() + if not raw: + raise PipelineResolutionError("Pipeline reference is empty.") + + candidate_path = Path(raw).expanduser() + if candidate_path.exists(): + resolved_path = candidate_path.resolve() + return PipelineResolution(path=resolved_path, origin=str(resolved_path)) + + parsed_reference = parse_pipeline_source_reference(raw) + if parsed_reference is None: + if raw.startswith("@"): + raise PipelineResolutionError( + f"Invalid pipeline reference '{raw}'. Expected @adagio/slug, " + "where slug uses lowercase letters, digits, and hyphens." + ) + raise PipelineResolutionError(f"Pipeline file does not exist: {raw}") + + source_name, slug = parsed_reference + registered_sources = default_pipeline_sources() if sources is None else sources + source_registry = {source.name: source for source in registered_sources} + source = source_registry.get(source_name) + if source is None: + available = ", ".join(sorted(source_registry)) or "none" + raise PipelineResolutionError( + f"Unknown pipeline source '{source_name}'. Available sources: {available}." + ) + + attempted_candidates: list[str] = [] + access_errors: list[str] = [] + + for location in source.locations: + if isinstance(location, LocalCatalogLocation): + for path in location.candidate_paths(slug): + attempted_candidates.append(str(path)) + if path.exists(): + return PipelineResolution(path=path, origin=str(path)) + continue + + cached_path = _cached_remote_pipeline_path( + cache_dir=download_cache_dir, + source_name=source_name, + slug=slug, + ) + if cached_path is not None: + attempted_candidates.append(str(cached_path)) + if cached_path.exists(): + return PipelineResolution(path=cached_path, origin=str(cached_path)) + + for url in location.candidate_urls(slug): + attempted_candidates.append(url) + try: + return PipelineResolution( + path=_download_remote_pipeline( + url=url, + exit_stack=exit_stack, + cache_path=cached_path, + ), + origin=url, + is_remote=True, + ) + except FileNotFoundError: + continue + except PipelineResolutionError as error: + access_errors.append(str(error)) + continue + + message = [f"Pipeline reference '{raw}' was not found."] + if attempted_candidates: + message.append("Looked in:") + message.extend(f" - {candidate}" for candidate in attempted_candidates) + if access_errors: + message.append("Errors:") + message.extend(f" - {error}" for error in access_errors) + raise PipelineResolutionError("\n".join(message)) + + +def _catalog_candidates(parent: Path) -> tuple[Path, ...]: + candidates: list[Path] = [] + if parent.name == "adagio-pipelines" and (parent / "pipelines").is_dir(): + candidates.append(parent) + + sibling = parent / "adagio-pipelines" + if sibling.is_dir() and (sibling / "pipelines").is_dir(): + candidates.append(sibling) + + return tuple(candidates) + + +def _download_remote_pipeline( + *, + url: str, + exit_stack: ExitStack, + cache_path: Path | None = None, +) -> Path: + request = _remote_request(url) + try: + with urlopen(request, timeout=10) as response: + payload = response.read(MAX_REMOTE_PIPELINE_BYTES + 1) + except HTTPError as error: + if error.code == 404: + raise FileNotFoundError(url) from error + raise PipelineResolutionError( + f"Failed to fetch pipeline from {url}: HTTP {error.code}" + ) from error + except URLError as error: + raise PipelineResolutionError( + f"Failed to fetch pipeline from {url}: {error.reason}" + ) from error + + if len(payload) > MAX_REMOTE_PIPELINE_BYTES: + raise PipelineResolutionError( + f"Failed to fetch pipeline from {url}: file exceeds " + f"{MAX_REMOTE_PIPELINE_BYTES} bytes." + ) + + if cache_path is None: + tempdir = Path( + exit_stack.enter_context(TemporaryDirectory(prefix="adagio-pipeline-")) + ) + pipeline_path = tempdir / "pipeline.adg" + else: + pipeline_path = cache_path + pipeline_path.parent.mkdir(parents=True, exist_ok=True) + + pipeline_path.write_bytes(payload) + return pipeline_path + + +def _quote_slug(slug: str) -> str: + return "/".join(quote(part) for part in Path(slug).parts) + + +def _remote_request(url: str) -> Request: + headers = {"User-Agent": "adagio-cli"} + token = os.getenv("GITHUB_TOKEN") or os.getenv("GH_TOKEN") + if token: + headers["Authorization"] = f"Bearer {token}" + github_api_url = _github_contents_api_url(url) + if github_api_url is not None: + headers["Accept"] = "application/vnd.github.raw" + return Request(github_api_url, headers=headers) + return Request(url, headers=headers) + + +def _github_contents_api_url(raw_url: str) -> str | None: + parsed = urlsplit(raw_url) + if parsed.scheme != "https" or parsed.netloc != "raw.githubusercontent.com": + return None + + parts = [unquote(part) for part in parsed.path.split("/") if part] + if len(parts) < 4: + return None + + owner, repo, ref, *path_parts = parts + path = "/".join(quote(part, safe="") for part in path_parts) + return ( + f"https://api.github.com/repos/{quote(owner, safe='')}/" + f"{quote(repo, safe='')}/contents/{path}?ref={quote(ref, safe='')}" + ) + + +def _cached_remote_pipeline_path( + *, + source_name: str, + slug: str, + cache_dir: Path | None, +) -> Path | None: + if cache_dir is None: + return None + return cache_dir / "adagio-pipelines" / source_name / slug / "pipeline.adg" diff --git a/src/adagio/cli/runner.py b/src/adagio/cli/runner.py index 52c8a2c..0f8901d 100644 --- a/src/adagio/cli/runner.py +++ b/src/adagio/cli/runner.py @@ -1,6 +1,7 @@ import json import os import sys +from contextlib import ExitStack from pathlib import Path from typing import Any @@ -10,9 +11,15 @@ from rich.text import Text from .config import load_run_config +from .pipeline_sources import ( + PipelineResolution, + PipelineResolutionError, + resolve_pipeline_reference_details, +) from ..executors.base import TaskEnvironmentOverride from ..executors.cache_support import ( describe_cache_config, + resolve_cache_dir_path, resolve_cache_config, ) @@ -46,6 +53,7 @@ def run_pipeline_from_kwargs( required_params: list[str], *, console: Console, + resolved_pipeline: PipelineResolution | None = None, ) -> None: """Run a pipeline from resolved CLI keyword arguments.""" from ..model.arguments import AdagioArgumentsFile @@ -54,7 +62,21 @@ def run_pipeline_from_kwargs( cache_dir = kwargs.pop("cache_dir", None) reuse = bool(kwargs.pop("reuse", True)) - data = json.loads(pipeline.read_text(encoding="utf-8")) + with ExitStack() as exit_stack: + try: + pipeline_resolution = ( + resolved_pipeline + or resolve_pipeline_reference_details( + pipeline, + exit_stack=exit_stack, + download_cache_dir=_resolve_download_cache_dir(cache_dir), + ) + ) + except PipelineResolutionError as error: + _error_exit(console, str(error)) + + data = json.loads(pipeline_resolution.path.read_text(encoding="utf-8")) + pipeline_data = data.get("spec", data) if isinstance(data, dict) else data parsed_pipeline = AdagioPipeline.model_validate(pipeline_data) arguments = parsed_pipeline.signature.to_default_arguments() @@ -147,6 +169,7 @@ def run_pipeline_from_kwargs( suppress_header = _is_truthy(os.getenv("ADAGIO_SUPPRESS_RUN_HEADER")) if not suppress_header: console.print(f"[bold]Pipeline:[/bold] {pipeline}") + console.print(f"[bold]Resolved from:[/bold] {pipeline_resolution.origin}") cache_config = resolve_cache_config( cwd=Path.cwd().resolve(), @@ -185,6 +208,12 @@ def _is_missing(value: Any) -> bool: return value is None or value == "" or value == "" or value == [] or value == {} +def _resolve_download_cache_dir(raw_value: str | Path | None) -> Path | None: + if raw_value is None: + return None + return resolve_cache_dir_path(cwd=Path.cwd().resolve(), raw_value=raw_value) + + def _is_missing_output(value: Any) -> bool: if not isinstance(value, str): return True diff --git a/src/adagio/io.py b/src/adagio/io.py index cfcf5b3..e43f74e 100644 --- a/src/adagio/io.py +++ b/src/adagio/io.py @@ -4,7 +4,7 @@ @lift_parsl(lambda fut: IndexedProxyArtifact(fut, 0)) def load_input(*, ctx, source: str): - from qiime2.sdk import Results, Artifact + from qiime2.sdk import Artifact from qiime2.sdk import PluginManager PluginManager() diff --git a/src/adagio/model/arguments.py b/src/adagio/model/arguments.py index 4d372dc..8982ff5 100644 --- a/src/adagio/model/arguments.py +++ b/src/adagio/model/arguments.py @@ -1,4 +1,3 @@ -import typing as t from pydantic import BaseModel, Field from .task import AllowableValue diff --git a/tests/test_pipeline_sources.py b/tests/test_pipeline_sources.py new file mode 100644 index 0000000..4c16beb --- /dev/null +++ b/tests/test_pipeline_sources.py @@ -0,0 +1,513 @@ +import io +import json +import os +import tempfile +import unittest +from contextlib import ExitStack +from pathlib import Path +from unittest.mock import patch +from urllib.error import HTTPError, URLError + +from rich.console import Console + +from adagio.cli.pipeline import show_pipeline +from adagio.cli.pipeline_sources import ( + DEFAULT_PIPELINE_SOURCE, + GitHubCatalogLocation, + LocalCatalogLocation, + MAX_REMOTE_PIPELINE_BYTES, + PipelineResolutionError, + PipelineSource, + discover_workspace_catalog_roots, + parse_pipeline_source_reference, + resolve_pipeline_reference, + resolve_pipeline_reference_details, +) + + +def _sample_pipeline_payload() -> dict: + return { + "spec": { + "type": "pipeline", + "signature": { + "inputs": [], + "parameters": [], + "outputs": [], + }, + "graph": [ + { + "id": "task-dada2", + "kind": "plugin-action", + "plugin": "dada2", + "action": "denoise_single", + "inputs": {}, + "parameters": {}, + "outputs": {}, + } + ], + } + } + + +class _FakeResponse: + def __init__(self, payload: bytes) -> None: + self._payload = payload + + def __enter__(self) -> "_FakeResponse": + return self + + def __exit__(self, exc_type, exc, tb) -> bool: + return False + + def read(self, size: int = -1) -> bytes: + if size is None or size < 0: + return self._payload + return self._payload[:size] + + +def _minimal_pipeline_payload() -> bytes: + return ( + b'{"spec": {"type": "pipeline", "signature": ' + b'{"inputs": [], "parameters": [], "outputs": []}, "graph": []}}' + ) + + +def _http_error(code: int) -> HTTPError: + return HTTPError( + url="https://example.invalid/pipeline.adg", + code=code, + msg="server error", + hdrs=None, + fp=None, + ) + + +class PipelineSourceTests(unittest.TestCase): + def test_parse_pipeline_source_reference_recognizes_source_slug_syntax( + self, + ) -> None: + self.assertEqual( + parse_pipeline_source_reference("@adagio/microbial-diversity"), + ("adagio", "microbial-diversity"), + ) + self.assertIsNone( + parse_pipeline_source_reference("@my-personal-channel/denoise") + ) + self.assertIsNone(parse_pipeline_source_reference("adagio/denoise")) + self.assertIsNone(parse_pipeline_source_reference("./pipeline.adg")) + self.assertIsNone(parse_pipeline_source_reference("pipeline.adg")) + self.assertIsNone(parse_pipeline_source_reference("@adagio/../denoise")) + self.assertIsNone(parse_pipeline_source_reference("@adagio/denoise/extra")) + self.assertIsNone(parse_pipeline_source_reference("@adagio/Denoise")) + + def test_discover_workspace_catalog_roots_finds_sibling_repo_from_worktree( + self, + ) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + workspace = Path(tmpdir) + catalog_root = workspace / "adagio-pipelines" + (catalog_root / "pipelines" / "community").mkdir(parents=True) + worktree_root = workspace / ".worktrees" / "adagio-cli-community-pipelines" + worktree_root.mkdir(parents=True) + + discovered = discover_workspace_catalog_roots(search_roots=(worktree_root,)) + + self.assertIn(catalog_root.resolve(), discovered) + + def test_existing_local_path_takes_precedence_over_source_resolution(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + pipeline_path = Path(tmpdir) / DEFAULT_PIPELINE_SOURCE / "denoise" + pipeline_path.parent.mkdir(parents=True) + pipeline_path.write_text("{}", encoding="utf-8") + + with ExitStack() as exit_stack: + resolved = resolve_pipeline_reference( + pipeline_path, exit_stack=exit_stack + ) + + self.assertEqual(resolved, pipeline_path.resolve()) + + def test_source_reference_resolves_from_local_catalog(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + catalog_root = Path(tmpdir) / "adagio-pipelines" + pipeline_path = ( + catalog_root / "pipelines" / "community" / "denoise" / "pipeline.adg" + ) + pipeline_path.parent.mkdir(parents=True) + pipeline_path.write_text("{}", encoding="utf-8") + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(LocalCatalogLocation(root=catalog_root),), + ) + + with ExitStack() as exit_stack: + resolved = resolve_pipeline_reference( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + ) + + self.assertEqual(resolved, pipeline_path.resolve()) + + def test_local_catalog_prefers_official_over_community(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + catalog_root = Path(tmpdir) / "adagio-pipelines" + official_path = ( + catalog_root / "pipelines" / "official" / "denoise" / "pipeline.adg" + ) + community_path = ( + catalog_root / "pipelines" / "community" / "denoise" / "pipeline.adg" + ) + official_path.parent.mkdir(parents=True) + community_path.parent.mkdir(parents=True) + official_path.write_text('{"source": "official"}', encoding="utf-8") + community_path.write_text('{"source": "community"}', encoding="utf-8") + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(LocalCatalogLocation(root=catalog_root),), + ) + + with ExitStack() as exit_stack: + resolved = resolve_pipeline_reference( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + ) + + self.assertEqual(resolved, official_path.resolve()) + + def test_local_catalog_hit_skips_network_fallback(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + catalog_root = Path(tmpdir) / "adagio-pipelines" + pipeline_path = ( + catalog_root / "pipelines" / "official" / "denoise" / "pipeline.adg" + ) + pipeline_path.parent.mkdir(parents=True) + pipeline_path.write_text("{}", encoding="utf-8") + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=( + LocalCatalogLocation(root=catalog_root), + GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines"), + ), + ) + + with patch("adagio.cli.pipeline_sources.urlopen") as mock_urlopen: + with ExitStack() as exit_stack: + resolved = resolve_pipeline_reference( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + ) + + self.assertEqual(resolved, pipeline_path.resolve()) + mock_urlopen.assert_not_called() + + def test_source_reference_falls_back_to_github_when_needed(self) -> None: + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines"),), + ) + + with patch( + "adagio.cli.pipeline_sources.urlopen", + return_value=_FakeResponse( + b'{"spec": {"type": "pipeline", "signature": {"inputs": [], "parameters": [], "outputs": []}, "graph": []}}' + ), + ) as mock_urlopen: + with ExitStack() as exit_stack: + resolved = resolve_pipeline_reference( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + ) + payload = json.loads(resolved.read_text(encoding="utf-8")) + + request = mock_urlopen.call_args.args[0] + self.assertIn("/pipelines/official/denoise/pipeline.adg", request.full_url) + self.assertEqual(payload["spec"]["type"], "pipeline") + + def test_remote_fetch_continues_to_next_tier_after_non_404_error(self) -> None: + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines"),), + ) + + with patch( + "adagio.cli.pipeline_sources.urlopen", + side_effect=[_http_error(500), _FakeResponse(_minimal_pipeline_payload())], + ) as mock_urlopen: + with ExitStack() as exit_stack: + resolved = resolve_pipeline_reference( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + ) + payload = json.loads(resolved.read_text(encoding="utf-8")) + + first_request = mock_urlopen.call_args_list[0].args[0] + second_request = mock_urlopen.call_args_list[1].args[0] + self.assertIn( + "/pipelines/official/denoise/pipeline.adg", first_request.full_url + ) + self.assertIn( + "/pipelines/community/denoise/pipeline.adg", + second_request.full_url, + ) + self.assertEqual(payload["spec"]["type"], "pipeline") + + def test_remote_fetch_adds_github_token_header_when_configured(self) -> None: + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines"),), + ) + + with patch.dict(os.environ, {"GITHUB_TOKEN": "secret-token"}, clear=False): + with patch( + "adagio.cli.pipeline_sources.urlopen", + return_value=_FakeResponse(_minimal_pipeline_payload()), + ) as mock_urlopen: + with ExitStack() as exit_stack: + resolve_pipeline_reference( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + ) + + request = mock_urlopen.call_args.args[0] + self.assertEqual( + request.full_url, + "https://api.github.com/repos/cymis/adagio-pipelines/contents/" + "pipelines/official/denoise/pipeline.adg?ref=main", + ) + self.assertEqual(request.get_header("Authorization"), "Bearer secret-token") + self.assertEqual(request.get_header("Accept"), "application/vnd.github.raw") + + def test_remote_fetch_rejects_oversized_pipeline_files(self) -> None: + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines"),), + ) + + with patch( + "adagio.cli.pipeline_sources.urlopen", + return_value=_FakeResponse(b"x" * (MAX_REMOTE_PIPELINE_BYTES + 2)), + ): + with ExitStack() as exit_stack: + with self.assertRaises(PipelineResolutionError) as error: + resolve_pipeline_reference( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + ) + + self.assertIn("file exceeds", str(error.exception)) + + def test_source_reference_reports_remote_origin(self) -> None: + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines"),), + ) + + with patch( + "adagio.cli.pipeline_sources.urlopen", + return_value=_FakeResponse(b"{}"), + ): + with ExitStack() as exit_stack: + resolved = resolve_pipeline_reference_details( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + ) + + self.assertTrue(resolved.is_remote) + self.assertIn("/pipelines/official/denoise/pipeline.adg", resolved.origin) + + def test_remote_http_error_is_reported(self) -> None: + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines"),), + ) + + with patch( + "adagio.cli.pipeline_sources.urlopen", + side_effect=HTTPError( + url="https://example.invalid/pipeline.adg", + code=500, + msg="server error", + hdrs=None, + fp=None, + ), + ): + with ExitStack() as exit_stack: + with self.assertRaises(PipelineResolutionError) as error: + resolve_pipeline_reference( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + ) + + self.assertIn("HTTP 500", str(error.exception)) + + def test_remote_url_error_is_reported(self) -> None: + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines"),), + ) + + with patch( + "adagio.cli.pipeline_sources.urlopen", + side_effect=URLError("network unavailable"), + ): + with ExitStack() as exit_stack: + with self.assertRaises(PipelineResolutionError) as error: + resolve_pipeline_reference( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + ) + + self.assertIn("network unavailable", str(error.exception)) + + def test_remote_pipeline_downloads_to_cache_dir_when_provided(self) -> None: + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines"),), + ) + with tempfile.TemporaryDirectory() as tmpdir: + cache_dir = Path(tmpdir) / "cache" + expected_path = ( + cache_dir + / "adagio-pipelines" + / DEFAULT_PIPELINE_SOURCE + / "denoise" + / "pipeline.adg" + ) + + with patch( + "adagio.cli.pipeline_sources.urlopen", + return_value=_FakeResponse( + b'{"spec": {"type": "pipeline", "signature": {"inputs": [], "parameters": [], "outputs": []}, "graph": []}}' + ), + ) as mock_urlopen: + with ExitStack() as exit_stack: + resolved = resolve_pipeline_reference_details( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + download_cache_dir=cache_dir, + ) + payload = json.loads(resolved.path.read_text(encoding="utf-8")) + + request = mock_urlopen.call_args.args[0] + self.assertIn("/pipelines/official/denoise/pipeline.adg", request.full_url) + self.assertEqual(resolved.path, expected_path) + self.assertEqual(payload["spec"]["type"], "pipeline") + self.assertTrue(resolved.is_remote) + + def test_cached_remote_pipeline_short_circuits_network(self) -> None: + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(GitHubCatalogLocation(owner="cymis", repo="adagio-pipelines"),), + ) + with tempfile.TemporaryDirectory() as tmpdir: + cache_dir = Path(tmpdir) / "cache" + cached_path = ( + cache_dir + / "adagio-pipelines" + / DEFAULT_PIPELINE_SOURCE + / "denoise" + / "pipeline.adg" + ) + cached_path.parent.mkdir(parents=True) + cached_path.write_text('{"cached": true}', encoding="utf-8") + + with patch("adagio.cli.pipeline_sources.urlopen") as mock_urlopen: + with ExitStack() as exit_stack: + resolved = resolve_pipeline_reference_details( + f"@{DEFAULT_PIPELINE_SOURCE}/denoise", + exit_stack=exit_stack, + sources=(source,), + download_cache_dir=cache_dir, + ) + + self.assertEqual(resolved.path, cached_path) + self.assertEqual(resolved.origin, str(cached_path)) + self.assertFalse(resolved.is_remote) + mock_urlopen.assert_not_called() + + def test_non_adagio_source_is_rejected(self) -> None: + with ExitStack() as exit_stack: + with self.assertRaises(PipelineResolutionError) as error: + resolve_pipeline_reference( + "@my-personal-channel/denoise", + exit_stack=exit_stack, + sources=(), + ) + + self.assertIn("Expected @adagio/slug", str(error.exception)) + + def test_invalid_at_reference_reports_reference_shape(self) -> None: + with ExitStack() as exit_stack: + with self.assertRaises(PipelineResolutionError) as error: + resolve_pipeline_reference( + "@adagio/../secret", + exit_stack=exit_stack, + sources=(), + ) + + self.assertIn("Expected @adagio/slug", str(error.exception)) + + def test_missing_source_reference_reports_attempted_locations(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=( + LocalCatalogLocation(root=Path(tmpdir) / "adagio-pipelines"), + ), + ) + + with ExitStack() as exit_stack: + with self.assertRaises(PipelineResolutionError) as error: + resolve_pipeline_reference( + f"@{DEFAULT_PIPELINE_SOURCE}/missing", + exit_stack=exit_stack, + sources=(source,), + ) + + message = str(error.exception) + self.assertIn("Pipeline reference '@adagio/missing' was not found.", message) + self.assertIn("pipelines/community/missing/pipeline.adg", message) + + +class PipelineSourceIntegrationTests(unittest.TestCase): + def test_pipeline_show_accepts_source_reference(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + catalog_root = Path(tmpdir) / "adagio-pipelines" + pipeline_path = ( + catalog_root / "pipelines" / "community" / "denoise" / "pipeline.adg" + ) + pipeline_path.parent.mkdir(parents=True) + pipeline_path.write_text( + json.dumps(_sample_pipeline_payload()), + encoding="utf-8", + ) + source = PipelineSource( + name=DEFAULT_PIPELINE_SOURCE, + locations=(LocalCatalogLocation(root=catalog_root),), + ) + output = io.StringIO() + console = Console(file=output, width=120, record=True) + + with patch( + "adagio.cli.pipeline_sources.default_pipeline_sources", + return_value=(source,), + ): + with patch("adagio.cli.pipeline.console", console): + show_pipeline(Path(f"@{DEFAULT_PIPELINE_SOURCE}/denoise")) + + self.assertIn("dada2.denoise_single", output.getvalue()) + + +if __name__ == "__main__": + unittest.main()