diff --git a/README.md b/README.md index 593dbd6..0935536 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,7 @@ docgen validate --pre-push # validate all outputs before committing | `docgen tts [--segment 01] [--dry-run]` | Generate TTS audio | | `docgen manim [--scene StackDAGScene]` | Render Manim animations | | `docgen vhs [--tape 02-quickstart.tape] [--strict]` | Render VHS terminal recordings | +| `docgen sync-vhs [--segment 01] [--dry-run]` | Rewrite VHS `Sleep` values from `animations/timing.json` | | `docgen compose [01 02 03] [--ffmpeg-timeout 900]` | Compose segments (audio + video) | | `docgen validate [--max-drift 2.75] [--pre-push]` | Run all validation checks | | `docgen concat [--config full-demo]` | Concatenate full demo files | @@ -75,6 +76,13 @@ manim: vhs: vhs_path: "" # optional explicit binary path (relative to docgen.yaml or absolute) + sync_from_timing: false # opt-in: allow tape Sleep rewrites from timing.json + typing_ms_per_char: 55 # typing estimate used by sync-vhs + max_typing_sec: 3.0 # per block cap for typing estimate + min_sleep_sec: 0.2 # floor for rewritten Sleep values + +pipeline: + sync_vhs_after_timestamps: false # opt-in: run sync-vhs automatically in generate-all/rebuild-after-audio compose: ffmpeg_timeout_sec: 300 # can also be overridden with: docgen compose --ffmpeg-timeout N @@ -83,6 +91,15 @@ compose: If you edit a `.tape` file, run `docgen vhs` before `docgen compose` so compose does not use stale rendered terminal video. 
+To auto-align tape pacing with generated narration: + +```bash +docgen timestamps +docgen sync-vhs --dry-run +docgen sync-vhs +docgen vhs +docgen compose +``` ## System dependencies - **ffmpeg** — composition and probing diff --git a/docs/demos/docgen.yaml b/docs/demos/docgen.yaml index 333999c..921a136 100644 --- a/docs/demos/docgen.yaml +++ b/docs/demos/docgen.yaml @@ -72,6 +72,10 @@ manim: vhs: vhs_path: "" + sync_from_timing: true + typing_ms_per_char: 45 + max_typing_sec: 2.0 + min_sleep_sec: 0.2 compose: ffmpeg_timeout_sec: 300 diff --git a/src/docgen/cli.py b/src/docgen/cli.py index 72bb8df..c889e7c 100644 --- a/src/docgen/cli.py +++ b/src/docgen/cli.py @@ -122,6 +122,18 @@ def vhs(ctx: click.Context, tape: str | None, strict: bool) -> None: click.echo(f" {e}") +@main.command("sync-vhs") +@click.option("--segment", default=None, help="Sync tape(s) for one segment ID/name.") +@click.option("--dry-run", is_flag=True, help="Preview updates without writing files.") +@click.pass_context +def sync_vhs(ctx: click.Context, segment: str | None, dry_run: bool) -> None: + """Sync VHS Sleep durations from animations/timing.json.""" + from docgen.tape_sync import TapeSynchronizer + + cfg = ctx.obj["config"] + TapeSynchronizer(cfg).sync(segment=segment, dry_run=dry_run) + + @main.command() @click.argument("segments", nargs=-1) @click.option( @@ -225,22 +237,35 @@ def pages(ctx: click.Context, force: bool) -> None: @click.option("--skip-tts", is_flag=True) @click.option("--skip-manim", is_flag=True) @click.option("--skip-vhs", is_flag=True) +@click.option("--skip-tape-sync", is_flag=True, help="Skip optional sync-vhs stage after timestamps.") @click.pass_context -def generate_all(ctx: click.Context, skip_tts: bool, skip_manim: bool, skip_vhs: bool) -> None: +def generate_all( + ctx: click.Context, + skip_tts: bool, + skip_manim: bool, + skip_vhs: bool, + skip_tape_sync: bool, +) -> None: """Run full pipeline: TTS -> Manim -> VHS -> compose -> validate -> concat -> 
pages.""" from docgen.pipeline import Pipeline cfg = ctx.obj["config"] pipeline = Pipeline(cfg) - pipeline.run(skip_tts=skip_tts, skip_manim=skip_manim, skip_vhs=skip_vhs) + pipeline.run( + skip_tts=skip_tts, + skip_manim=skip_manim, + skip_vhs=skip_vhs, + skip_tape_sync=skip_tape_sync, + ) @main.command("rebuild-after-audio") +@click.option("--skip-tape-sync", is_flag=True, help="Skip optional sync-vhs stage after timestamps.") @click.pass_context -def rebuild_after_audio(ctx: click.Context) -> None: +def rebuild_after_audio(ctx: click.Context, skip_tape_sync: bool) -> None: """Rebuild everything after new audio: Manim -> VHS -> compose -> validate -> concat.""" from docgen.pipeline import Pipeline cfg = ctx.obj["config"] pipeline = Pipeline(cfg) - pipeline.run(skip_tts=True) + pipeline.run(skip_tts=True, skip_tape_sync=skip_tape_sync) diff --git a/src/docgen/config.py b/src/docgen/config.py index 8c8f9d3..243fcf1 100644 --- a/src/docgen/config.py +++ b/src/docgen/config.py @@ -94,12 +94,46 @@ def manim_path(self) -> str | None: value = self.raw.get("manim", {}).get("manim_path") return str(value) if value else None + @property + def vhs_config(self) -> dict[str, Any]: + defaults: dict[str, Any] = { + "vhs_path": "", + "sync_from_timing": False, + "typing_ms_per_char": 35, + "max_typing_sec": 3.0, + "min_sleep_sec": 0.2, + } + defaults.update(self.raw.get("vhs", {})) + return defaults + @property def vhs_path(self) -> str | None: """Optional absolute/relative path to the VHS executable.""" - value = self.raw.get("vhs", {}).get("vhs_path") + value = self.vhs_config.get("vhs_path") return str(value) if value else None + @property + def sync_from_timing(self) -> bool: + return bool(self.vhs_config.get("sync_from_timing", False)) + + @property + def typing_ms_per_char(self) -> int: + return int(self.vhs_config.get("typing_ms_per_char", 35)) + + @property + def max_typing_sec(self) -> float: + return float(self.vhs_config.get("max_typing_sec", 3.0)) + + @property + def 
min_sleep_sec(self) -> float: + return float(self.vhs_config.get("min_sleep_sec", 0.2)) + + @property + def sync_vhs_after_timestamps(self) -> bool: + pipeline_cfg = self.raw.get("pipeline", {}) + if "sync_vhs_after_timestamps" in pipeline_cfg: + return bool(pipeline_cfg.get("sync_vhs_after_timestamps")) + return self.sync_from_timing # -- Compose ---------------------------------------------------------------- @property diff --git a/src/docgen/init.py b/src/docgen/init.py index 4267cff..7a518e1 100644 --- a/src/docgen/init.py +++ b/src/docgen/init.py @@ -257,6 +257,10 @@ def _write_config(plan: InitPlan) -> str: }, "vhs": { "vhs_path": "", + "sync_from_timing": False, + "typing_ms_per_char": 55, + "max_typing_sec": 3.0, + "min_sleep_sec": 0.2, }, "compose": { "ffmpeg_timeout_sec": 300, diff --git a/src/docgen/pipeline.py b/src/docgen/pipeline.py index e1bf6ce..ecfa8c8 100644 --- a/src/docgen/pipeline.py +++ b/src/docgen/pipeline.py @@ -17,6 +17,7 @@ def run( skip_tts: bool = False, skip_manim: bool = False, skip_vhs: bool = False, + skip_tape_sync: bool = False, ) -> None: if not skip_tts: print("\n=== Stage: TTS ===") @@ -27,6 +28,11 @@ def run( from docgen.timestamps import TimestampExtractor TimestampExtractor(self.config).extract_all() + if self.config.sync_vhs_after_timestamps and not skip_tape_sync: + print("\n=== Stage: Sync VHS tape sleep timings ===") + from docgen.tape_sync import TapeSynchronizer + TapeSynchronizer(self.config).sync() + if not skip_manim: print("\n=== Stage: Manim ===") from docgen.manim_runner import ManimRunner diff --git a/src/docgen/tape_sync.py b/src/docgen/tape_sync.py new file mode 100644 index 0000000..2d0bdd9 --- /dev/null +++ b/src/docgen/tape_sync.py @@ -0,0 +1,342 @@ +"""Sync VHS Sleep values from animations/timing.json.""" + +from __future__ import annotations + +import json +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from 
docgen.config import Config + + +@dataclass +class TapeSyncChange: + line_no: int + old_sleep_sec: float + new_sleep_sec: float + old_line: str + new_line: str + + +@dataclass +class TapeSyncResult: + tape: str + timing_key: str | None = None + duration_sec: float = 0.0 + blocks_found: int = 0 + changes: list[TapeSyncChange] = field(default_factory=list) + warnings: list[str] = field(default_factory=list) + wrote_file: bool = False + + +@dataclass +class _TapeBlock: + type_idx: int + enter_idx: int + sleep_idx: int + typed_text: str + + +class TapeSynchronizer: + def __init__(self, config: Config) -> None: + self.config = config + + def sync(self, segment: str | None = None, dry_run: bool = False) -> list[TapeSyncResult]: + timing = self._load_timing_json() + if not timing: + print("[sync-vhs] No timing.json data found. Run `docgen timestamps` first.") + return [] + + targets = self._collect_targets(segment=segment) + if not targets: + if segment: + print(f"[sync-vhs] No VHS targets matched segment filter '{segment}'.") + else: + print("[sync-vhs] No VHS tapes found to sync.") + return [] + + results: list[TapeSyncResult] = [] + for tape_path, timing_keys in targets: + result = self._sync_one( + tape_path=tape_path, + timing=timing, + timing_keys=timing_keys, + dry_run=dry_run, + ) + self._print_result(result, dry_run=dry_run) + results.append(result) + + changed = sum(1 for r in results if r.changes) + wrote = sum(1 for r in results if r.wrote_file) + print( + f"[sync-vhs] Done: {len(results)} tape(s), {changed} with changes, " + f"{wrote} file(s) written." 
+ ) + return results + + def _collect_targets(self, segment: str | None) -> list[tuple[Path, list[str]]]: + query = segment.lower().strip() if segment else None + targets: list[tuple[Path, list[str]]] = [] + + for seg_id in sorted(self.config.visual_map): + vmap = self.config.visual_map.get(seg_id, {}) + if str(vmap.get("type", "")).lower() != "vhs": + continue + seg_name = self.config.resolve_segment_name(seg_id) + tape_name = str(vmap.get("tape", "")).strip() + if not tape_name: + source_name = str(vmap.get("source", "")).strip() + if source_name: + tape_name = f"{Path(source_name).stem}.tape" + if not tape_name: + continue + + tape_path = self.config.terminal_dir / tape_name + tape_stem = tape_path.stem + if query and query not in {seg_id.lower(), seg_name.lower(), tape_stem.lower()}: + continue + + timing_keys = [seg_name, seg_id, tape_stem] + targets.append((tape_path, self._unique_strings(timing_keys))) + + if targets: + return targets + + # Fallback for legacy projects without visual_map tape metadata. 
+ for tape_path in sorted(self.config.terminal_dir.glob("*.tape")): + tape_stem = tape_path.stem + if query and query != tape_stem.lower(): + continue + targets.append((tape_path, [tape_stem])) + return targets + + @staticmethod + def _unique_strings(values: list[str]) -> list[str]: + seen: set[str] = set() + out: list[str] = [] + for value in values: + if value and value not in seen: + seen.add(value) + out.append(value) + return out + + def _load_timing_json(self) -> dict[str, Any]: + path = self.config.animations_dir / "timing.json" + if not path.exists(): + return {} + try: + data = json.loads(path.read_text(encoding="utf-8")) + return data if isinstance(data, dict) else {} + except json.JSONDecodeError: + print(f"[sync-vhs] Invalid JSON: {path}") + return {} + + def _sync_one( + self, + tape_path: Path, + timing: dict[str, Any], + timing_keys: list[str], + dry_run: bool, + ) -> TapeSyncResult: + result = TapeSyncResult(tape=tape_path.name) + if not tape_path.exists(): + result.warnings.append(f"missing tape: {tape_path}") + return result + + timing_key = next((k for k in timing_keys if k in timing), None) + if not timing_key: + result.warnings.append(f"no timing key found (tried: {', '.join(timing_keys)})") + return result + result.timing_key = timing_key + + duration_sec = self._timing_duration_sec(timing[timing_key]) + result.duration_sec = duration_sec + if duration_sec <= 0: + result.warnings.append(f"timing data for '{timing_key}' has zero duration") + return result + + lines = tape_path.read_text(encoding="utf-8").splitlines() + blocks = self._find_blocks(lines) + result.blocks_found = len(blocks) + if not blocks: + result.warnings.append("no Type/Enter/Sleep blocks found after first Show") + return result + + window_sec = duration_sec / len(blocks) + ms_per_char = max(1, self.config.typing_ms_per_char) + max_typing_sec = max(0.0, self.config.max_typing_sec) + min_sleep_sec = max(0.0, self.config.min_sleep_sec) + + for block in blocks: + old_line = 
lines[block.sleep_idx] + old_sleep = self._parse_sleep_sec(old_line) + if old_sleep is None: + continue + + typing_est = min(max_typing_sec, (len(block.typed_text) * ms_per_char) / 1000.0) + new_sleep = max(min_sleep_sec, window_sec - typing_est) + new_line = self._format_sleep_line(new_sleep) + if new_line == old_line.strip(): + continue + + lines[block.sleep_idx] = new_line + result.changes.append( + TapeSyncChange( + line_no=block.sleep_idx + 1, + old_sleep_sec=old_sleep, + new_sleep_sec=new_sleep, + old_line=old_line, + new_line=new_line, + ) + ) + + if result.changes and not dry_run: + tape_path.write_text("\n".join(lines) + "\n", encoding="utf-8") + result.wrote_file = True + return result + + @staticmethod + def _timing_duration_sec(entry: Any) -> float: + if not isinstance(entry, dict): + return 0.0 + + max_end = 0.0 + for key in ("words", "segments"): + values = entry.get(key, []) + if not isinstance(values, list): + continue + for item in values: + if not isinstance(item, dict): + continue + try: + max_end = max(max_end, float(item.get("end", 0))) + except (TypeError, ValueError): + continue + return max_end + + @staticmethod + def _find_blocks(lines: list[str]) -> list[_TapeBlock]: + show_idx = 0 + for i, line in enumerate(lines): + if line.strip().startswith("Show"): + show_idx = i + break + + blocks: list[_TapeBlock] = [] + i = show_idx + 1 + while i < len(lines): + current = lines[i].strip() + if not current.startswith("Type "): + i += 1 + continue + + type_idx = i + typed_text = TapeSynchronizer._extract_typed_text(current) + enter_idx: int | None = None + sleep_idx: int | None = None + + j = i + 1 + while j < len(lines): + nxt = lines[j].strip() + if nxt.startswith("Type "): + break + if enter_idx is None and nxt.startswith("Enter"): + enter_idx = j + elif enter_idx is not None and nxt.startswith("Sleep "): + sleep_idx = j + break + j += 1 + + if enter_idx is not None and sleep_idx is not None: + blocks.append( + _TapeBlock( + type_idx=type_idx, + 
enter_idx=enter_idx, + sleep_idx=sleep_idx, + typed_text=typed_text, + ) + ) + i = max(j, i + 1) + + return blocks + + @staticmethod + def _extract_typed_text(type_line: str) -> str: + payload = type_line[len("Type "):].strip() + return TapeSynchronizer._unquote(payload) + + @staticmethod + def _unquote(value: str) -> str: + if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}: + return value[1:-1] + return value + + @staticmethod + def _parse_sleep_sec(line: str) -> float | None: + match = re.match(r"^\s*Sleep\s+([0-9]*\.?[0-9]+)\s*(ms|s)?\s*$", line) + if not match: + return None + value = float(match.group(1)) + unit = (match.group(2) or "s").lower() + if unit == "ms": + return value / 1000.0 + return value + + @staticmethod + def _format_sleep_line(seconds: float) -> str: + if seconds < 1.0: + ms = max(1, int(round(seconds * 1000))) + return f"Sleep {ms}ms" + + rounded = round(seconds, 2) + if abs(rounded - round(rounded)) < 1e-9: + return f"Sleep {int(round(rounded))}s" + return f"Sleep {rounded:.2f}s" + + @staticmethod + def _print_result(result: TapeSyncResult, dry_run: bool) -> None: + prefix = "[sync-vhs] DRY-RUN" if dry_run else "[sync-vhs]" + key_msg = result.timing_key or "no timing key" + print( + f"{prefix} {result.tape}: key={key_msg}, duration={result.duration_sec:.2f}s, " + f"blocks={result.blocks_found}, changes={len(result.changes)}" + ) + for warning in result.warnings: + print(f"{prefix} WARN: {warning}") + for change in result.changes[:10]: + print( + f"{prefix} L{change.line_no}: {change.old_line.strip()} -> " + f"{change.new_line}" + ) + + +def sync_single_tape_from_timing( + tape_path: str | Path, + timing_entry: dict[str, Any], + *, + typing_ms_per_char: int = 45, + max_typing_sec: float = 3.0, + min_sleep_sec: float = 0.15, + dry_run: bool = False, +) -> TapeSyncResult: + """Pure helper used by tests and external callers.""" + path = Path(tape_path) + fake_cfg = type( + "_Cfg", + (), + { + "typing_ms_per_char": 
typing_ms_per_char, + "max_typing_sec": max_typing_sec, + "min_sleep_sec": min_sleep_sec, + }, + )() + syncer = TapeSynchronizer(fake_cfg) # type: ignore[arg-type] + return syncer._sync_one( # noqa: SLF001 - intentional internal reuse + tape_path=path, + timing={path.stem: timing_entry}, + timing_keys=[path.stem], + dry_run=dry_run, + ) diff --git a/tests/test_config.py b/tests/test_config.py index 25d5054..b616d94 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -78,8 +78,9 @@ def test_resolved_dirs(tmp_config): def test_binary_paths_and_compose_config(tmp_path): cfg = { "manim": {"manim_path": "/opt/bin/manim"}, - "vhs": {"vhs_path": "/opt/bin/vhs"}, + "vhs": {"vhs_path": "/opt/bin/vhs", "sync_from_timing": True, "typing_ms_per_char": 40}, "compose": {"ffmpeg_timeout_sec": 900, "warn_stale_vhs": False}, + "pipeline": {"sync_vhs_after_timestamps": True}, } p = tmp_path / "docgen.yaml" p.write_text(yaml.dump(cfg), encoding="utf-8") @@ -88,3 +89,6 @@ def test_binary_paths_and_compose_config(tmp_path): assert c.vhs_path == "/opt/bin/vhs" assert c.ffmpeg_timeout_sec == 900 assert c.warn_stale_vhs is False + assert c.sync_from_timing is True + assert c.sync_vhs_after_timestamps is True + assert c.typing_ms_per_char == 40 diff --git a/tests/test_tape_sync.py b/tests/test_tape_sync.py new file mode 100644 index 0000000..c443732 --- /dev/null +++ b/tests/test_tape_sync.py @@ -0,0 +1,138 @@ +"""Tests for syncing VHS Sleep values from timing.json.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import yaml + +from docgen.config import Config +from docgen.tape_sync import TapeSynchronizer + + +def _write_cfg(tmp_path: Path, cfg: dict) -> Config: + path = tmp_path / "docgen.yaml" + path.write_text(yaml.dump(cfg), encoding="utf-8") + return Config.from_yaml(path) + + +def test_sync_rewrites_sleep_values(tmp_path: Path) -> None: + cfg = { + "dirs": { + "audio": "audio", + "animations": "animations", + "terminal": "terminal", + 
"recordings": "recordings", + "narration": "narration", + }, + "segments": {"default": ["01"], "all": ["01"]}, + "segment_names": {"01": "01-demo"}, + "visual_map": {"01": {"type": "vhs", "tape": "01-demo.tape", "source": "01-demo.mp4"}}, + "vhs": { + "sync_from_timing": True, + "typing_ms_per_char": 100, + "max_typing_sec": 0.5, + "min_sleep_sec": 0.1, + }, + } + c = _write_cfg(tmp_path, cfg) + (tmp_path / "animations").mkdir(parents=True, exist_ok=True) + (tmp_path / "terminal").mkdir(parents=True, exist_ok=True) + + timing = { + "01-demo": { + "segments": [ + {"start": 0.0, "end": 2.0}, + {"start": 2.0, "end": 4.0}, + ] + } + } + (tmp_path / "animations" / "timing.json").write_text(json.dumps(timing), encoding="utf-8") + + tape = tmp_path / "terminal" / "01-demo.tape" + tape.write_text( + "\n".join( + [ + 'Set Shell "bash"', + "Show", + 'Type "echo one"', + "Enter", + "Sleep 5s", + 'Type "echo two"', + "Enter", + "Sleep 4s", + "", + ] + ), + encoding="utf-8", + ) + + results = TapeSynchronizer(c).sync() + assert len(results) == 1 + assert results[0].changes + new_text = tape.read_text(encoding="utf-8") + assert "Sleep 5s" not in new_text + assert "Sleep 4s" not in new_text + assert "Sleep " in new_text + + +def test_sync_dry_run_does_not_write(tmp_path: Path) -> None: + cfg = { + "dirs": {"animations": "animations", "terminal": "terminal"}, + "segments": {"default": ["01"], "all": ["01"]}, + "segment_names": {"01": "01-demo"}, + "visual_map": {"01": {"type": "vhs", "tape": "01-demo.tape", "source": "01-demo.mp4"}}, + } + c = _write_cfg(tmp_path, cfg) + (tmp_path / "animations").mkdir(parents=True, exist_ok=True) + (tmp_path / "terminal").mkdir(parents=True, exist_ok=True) + (tmp_path / "animations" / "timing.json").write_text( + json.dumps({"01-demo": {"segments": [{"start": 0.0, "end": 2.0}]}}), + encoding="utf-8", + ) + tape = tmp_path / "terminal" / "01-demo.tape" + original = "\n".join(['Show', 'Type "echo one"', "Enter", "Sleep 5s", ""]) + "\n" + 
tape.write_text(original, encoding="utf-8") + + results = TapeSynchronizer(c).sync(dry_run=True) + assert len(results) == 1 + assert results[0].changes + assert tape.read_text(encoding="utf-8") == original + + +def test_sync_segment_filter(tmp_path: Path) -> None: + cfg = { + "dirs": {"animations": "animations", "terminal": "terminal"}, + "segments": {"default": ["01", "02"], "all": ["01", "02"]}, + "segment_names": {"01": "01-demo", "02": "02-demo"}, + "visual_map": { + "01": {"type": "vhs", "tape": "01-demo.tape", "source": "01-demo.mp4"}, + "02": {"type": "vhs", "tape": "02-demo.tape", "source": "02-demo.mp4"}, + }, + } + c = _write_cfg(tmp_path, cfg) + (tmp_path / "animations").mkdir(parents=True, exist_ok=True) + (tmp_path / "terminal").mkdir(parents=True, exist_ok=True) + (tmp_path / "animations" / "timing.json").write_text( + json.dumps( + { + "01-demo": {"segments": [{"start": 0.0, "end": 2.0}]}, + "02-demo": {"segments": [{"start": 0.0, "end": 2.0}]}, + } + ), + encoding="utf-8", + ) + (tmp_path / "terminal" / "01-demo.tape").write_text( + "\n".join(["Show", 'Type "a"', "Enter", "Sleep 4s", ""]) + "\n", + encoding="utf-8", + ) + (tmp_path / "terminal" / "02-demo.tape").write_text( + "\n".join(["Show", 'Type "b"', "Enter", "Sleep 4s", ""]) + "\n", + encoding="utf-8", + ) + + results = TapeSynchronizer(c).sync(segment="01") + assert len(results) == 1 + assert results[0].tape == "01-demo.tape" +