diff --git a/docs/source/contributors.md b/docs/source/contributors.md index a9ab324e..4762aa95 100644 --- a/docs/source/contributors.md +++ b/docs/source/contributors.md @@ -41,12 +41,15 @@ Martin Kilbinger - [Axel Guinot](https://github.com/aguinot) : Primary module developer, validation, performance and debugging - [Martin Kilbinger](https://github.com/martinkilbinger) : Scientific lead, module developer and debugging -## Principal Contributors +## Contributors - [Lucie Baumont](https://github.com/lbaumo) : Testing - [Jérôme Bonnin](https://github.com/jerome-bonnin) : PSF module developer +- [Cail Daley](https://www.cosmostat.org/people/cail-daley) : Container development +- [Fabian Hervas Peters](https://www.cosmostat.org/people/former-members/fabian-hervas-peters) : Simulations, validation - Marc Gentile : File IO, initial architecture -- Xavier Jimenez : External catalogue matching module developer +- [Mike Hudson](https://uwaterloo.ca/physics-astronomy/profile/mjhudson) : Further contributors +- Xavier Jimenez : External catalogue matching - [François Lanusse](https://github.com/eiffl) : Further contributors - [Tobias Liaudat](https://github.com/tobias-liaudat) : PSF module developer and validation - [Austin Peel](https://github.com/austinpeel) : Validation diff --git a/docs/source/pipeline_canfar.md b/docs/source/pipeline_canfar.md index 39111b05..13e7689c 100644 --- a/docs/source/pipeline_canfar.md +++ b/docs/source/pipeline_canfar.md @@ -456,7 +456,6 @@ ln -s ..//unions_shapepipe_comprehensive_struct_2024_v1.6.c.hdf5 unions_shapepip calibrate_comprehensive - ### Create matched star catalogue For diagnostics, a catalogue with multi-epoch shapes measured by ngmix matched with the validation star catalogue is used. diff --git a/pyproject.toml b/pyproject.toml index 7e4e938a..e0a77109 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,8 @@ dependencies = [ "canfar", "sf_tools", "h5py", + "healsparse", + "skyproj", ] @@ -33,6 +35,10 @@ lint = [ "black", "isort" ] +plot = [ + "skyproj", + "matplotlib" +] release = [ "build", "twine", @@ -52,6 +58,12 @@ summary_run = "shapepipe.summary_run:main" canfar_submit_job = "shapepipe.canfar_run:run_job" canfar_monitor = "shapepipe.canfar_run:run_log" canfar_monitor_log = "shapepipe.canfar_run:run_monitor_log" +get_ccd_with_psfs = "shapepipe.get_ccds_run:run_ccd_psf_handler" +download_headers = "shapepipe.coverage_run:run_download_headers" +extract_field_corners = "shapepipe.coverage_run:run_extract_corners" +build_coverage_map = "shapepipe.coverage_run:run_build_coverage" +plot_coverage_map = "shapepipe.coverage_run:run_plot_coverage" +coverage_pipeline = "shapepipe.coverage_run:run_pipeline" [tool.pytest.ini_options] addopts = "--verbose --cov=shapepipe" diff --git a/scripts/python/get_ccds_with_psf.py b/scripts/python/get_ccds_with_psf.py old mode 100644 new mode 100755 index 5f917ad0..e284e264 --- a/scripts/python/get_ccds_with_psf.py +++ b/scripts/python/get_ccds_with_psf.py @@ -9,192 +9,50 @@ """ - import sys -import numpy as np - -from shapepipe.utilities import summary - - -def get_lines(fname): - """Get Lines. - - Return list of lines read from a text file. - - Parameters - ---------- - fname: str - input file name - Returns: - list - IDs +from shapepipe.utilities.ccd_psf_handler import CcdPsfHandler - """ - IDs = [] - with open(fname) as f: - lines = f.readlines() - for line in lines: - IDs.append(line.rstrip()) - - return IDs +def run_ccd_psf_handler(args=None): + """Run CCD PSF Handler. -def get_exp_shdu_missing(patches): - """Get Exp Shdu Missing. - - Returns set of missing CCDs (single-exposure single-HDU IDs) from a list of patches. + Create instance and run the CCD PSF handler. Parameters ---------- - patches: list - input patches + args : list, optional + command line arguments Returns ------- - set - missing CCD IDs + int + exit code """ - exp_shdu_missing_all = set() - - for patch in patches: - - path_exp_shdu_missing = f"{patch}/summary/missing_job_32_all.txt" - exp_shdu_missing = get_lines(path_exp_shdu_missing) - - print(f"Patch {patch}: Found {len(exp_shdu_missing)} missing ccds", end="; ") - - exp_shdu_missing_all.update(exp_shdu_missing) - - print(f"cumulative {len(exp_shdu_missing_all)} missing ccds") - - print() - - return exp_shdu_missing_all - - -def get_exp(patches): - """Get Exp. - - Return set of exposures from a list of patches. - - Parameters - ---------- - patches: list - input patches - - Returns - ------- - set - exposure IDs - - """ - exp_all = set() - - for patch in patches: - - path_exp = f"{patch}/exp_numbers.txt" - exp = get_lines(path_exp) - - print(f"Patch {patch}: Found {len(exp)} exposures", end="; ") - - exp_all.update(exp) - - print(f"cumulative {len(exp_all)} exposures") + # Create instance + obj = CcdPsfHandler() - print() + return obj.run(args=args) - return exp_all +def main(argv=None): + """Main. -def get_ccds_with_psf(patches, n_CCD=40): - """Get CCDs With PSF. - - Return set of CCDs from list of patches. + Main program. Parameters ---------- - patches: list - input patches + argv : list, optional + command line arguments Returns ------- - set - CCD IDs + int + exit code """ - # Get missing CCDs - print("=== get missing CCDs ===") - exp_shdu_missing_all = get_exp_shdu_missing(patches) - - # Get all exposures used in tiles - print("=== get exposures ===") - exp_all = get_exp(patches) - - # Turn exposures into exposure-single-HDU names (CCDs) - exp_shdu_all = summary.get_all_shdus(exp_all, n_CCD) - - print(f"Found {len(exp_shdu_all)} CCDs") - - return exp_shdu_all - - -def get_ccds_with_psf_method_2(patches, n_CCD=40): - - for patch in patches: - directory = f"{patch}/exp_runs" - - for entry in os.scandir(directory): - pass - -def save(IDs, path): - """Save. - - Save list of IDs to text file. - - Parameters - ---------- - IDs: set - input IDs - - path: str - output file name - - """ - with open(path, "w") as f_out: - for ID in IDs: - print(ID, file=f_out) - -def main(argv): - """Main. - - Main program. - - """ - version = "v1.5" - - if version == "v1.4": - n_patch = 7 - elif version == "v1.5": - n_patch = 8 - else: - raise ValueError(f"Invalid version {version}") - - patches = [f'P{x}' for x in np.arange(n_patch) + 1] - - print(f"=== get_ccds_with_psf for version {version}, patches {patches} ===") - - print("=== method 1: exp_list - missing === ") - exp_shdu_all = get_ccds_with_psf(patches) - - save(exp_shdu_all, f"ccds_with_psf_{version}.txt") - - #print("=== method 2: star cats === ") - #exp_shdu_all_method_2 = get_ccds_with_psf_method_2(patches) - #save(exp_shdu_all_method_2, f"ccds_with_psf_{version}_method_2.txt") - - return 0 + return run_ccd_psf_handler(args=argv) if __name__ == "__main__": diff --git a/src/shapepipe/coverage_run.py b/src/shapepipe/coverage_run.py new file mode 100644 index 00000000..3b30ce04 --- /dev/null +++ b/src/shapepipe/coverage_run.py @@ -0,0 +1,149 @@ +"""COVERAGE_RUN + +Call coverage processing classes. + +Author: Martin Kilbinger + +""" + +import sys + +from shapepipe.utilities.header_downloader import HeaderDownloader +from shapepipe.utilities.field_corners_extractor import FieldCornersExtractor +from shapepipe.utilities.coverage_map_builder import CoverageMapBuilder +from shapepipe.utilities.coverage_plotter import CoveragePlotter + + +def run_download_headers(args=None): + """Run Download Headers. + + Download FITS headers from VOSpace for exposures in a CCD list. + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code + + """ + obj = HeaderDownloader() + return obj.run(args=args) + + +def run_extract_corners(args=None): + """Run Extract Corners. + + Extract field corner coordinates from FITS headers. + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code + + """ + obj = FieldCornersExtractor() + return obj.run(args=args) + + +def run_build_coverage(args=None): + """Run Build Coverage. + + Build HealSparse coverage maps from field corner coordinates. + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code + + """ + obj = CoverageMapBuilder() + return obj.run(args=args) + + +def run_plot_coverage(args=None): + """Run Plot Coverage. + + Plot HealSparse coverage map. + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code + + """ + obj = CoveragePlotter() + return obj.run(args=args) + + +def run_pipeline(args=None): + """Run Pipeline. + + Run the complete coverage map pipeline: + 1. Download FITS headers + 2. Extract field corners + 3. Build coverage map + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code + + """ + print("Step 1: Downloading FITS headers") + ret = run_download_headers(args) + if ret != 0: + print("Failed to download headers") + return ret + + print() + print("Step 2: Extracting field corners") + ret = run_extract_corners(args) + if ret != 0: + print("Failed to extract field corners") + return ret + + print() + print("Step 3: Building coverage map") + ret = run_build_coverage(args) + if ret != 0: + print("Failed to build coverage map") + return ret + + print() + print("Pipeline completed successfully") + + return 0 + + +def main(argv=None): + """Main. + + Main program. + + """ + # Scripts to call coverage classes are created by pyproject.toml + return 0 diff --git a/src/shapepipe/get_ccds_run.py b/src/shapepipe/get_ccds_run.py new file mode 100644 index 00000000..fb18b0a7 --- /dev/null +++ b/src/shapepipe/get_ccds_run.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 + +"""GET_CCDS_WITH_PSF + +Obtain list of CCDs (single-exposure single-HDU files) for which valid PSF information +is available. This can serve to create a footprint coverage mask. + +Author: Martin Kilbinger + +""" + +import sys + +from shapepipe.utilities.ccd_psf_handler import CcdPsfHandler + + +def run_ccd_psf_handler(args=None): + """Run CCD PSF Handler. + + Create instance and run the CCD PSF handler. + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code + + """ + # Create instance + + obj = CcdPsfHandler() + + return obj.run(args=args) + + +def main(argv=None): + """Main. + """ + # A scripts to call the ccd psf class is created by pyproject.toml + return 0 diff --git a/src/shapepipe/info.py b/src/shapepipe/info.py index 7ce7f1d7..4748c2bc 100644 --- a/src/shapepipe/info.py +++ b/src/shapepipe/info.py @@ -69,11 +69,8 @@ def shapepipe_logo(colour=False): Martin Kilbinger Main Contributors: - Tobias Liaudat - Morgan Schmitz - Andre Zamorano Vitorelli - Francois Lanusse - Xavier Jimenez + Cail Daley + Fabian Hervas Peters Version: {} diff --git a/src/shapepipe/utilities/ccd_psf_handler.py b/src/shapepipe/utilities/ccd_psf_handler.py new file mode 100644 index 00000000..3ee29303 --- /dev/null +++ b/src/shapepipe/utilities/ccd_psf_handler.py @@ -0,0 +1,378 @@ +"""CCD_PSF_HANDLER + +Obtain list of CCDs (single-exposure single-HDU files) for which valid PSF +information is available. This can serve to create a footprint coverage mask. + +Author: Mike Hudson, Martin Kilbinger + +""" + +import glob +import os +import re +import sys + +import numpy as np + +from cs_util import args as cs_args +from cs_util import logging + +from shapepipe.utilities import summary + + +class CcdPsfHandler(object): + """CCD PSF Handler Class. + + Handles extraction of CCDs with valid PSF information from shapepipe + patches. + """ + + def __init__(self): + """Initialize the handler.""" + self.params_default() + + def params_default(self): + """Set default parameters and command line options.""" + + self._params = { + "version_cat": "v1.6", + "n_CCD": 40, + "output": None, + } + + self._short_options = { + "version_cat": "-V", + "n_CCD": "-n", + "output": "-o", + } + + self._types = { + "n_CCD": "int", + } + + self._help_strings = { + "version_cat": "catalogue major version, allowed are v1.3, v1.4, v1.5, v1.6; default is {}", + + "n_CCD": "number of CCDs per exposure; default is {}", + "output": "output file path; default is ccds_with_psf_.txt", + } + + def set_params_from_command_line(self, args): + """Set Params From Command line. + + Only use when calling using python from command line. + Does not work from ipython or jupyter. + + Parameters + ---------- + args : list + command line arguments + + """ + # Read command line options + options = cs_args.parse_options( + self._params, + self._short_options, + self._types, + self._help_strings, + args=args, + ) + self._params = options + + # Save calling command + logging.log_command(args) + + def update_params(self): + """Update parameters. + + Set derived parameters based on input parameters. + """ + # Determine number of patches based on version + version = self._params["version_cat"] + if version in ("v1.3", "v1.4"): + n_patch = 7 + elif version == "v1.5": + n_patch = 8 + elif version == "v1.6": + n_patch = 9 + else: + raise ValueError(f"Invalid version {version}") + + self._params["n_patch"] = n_patch + self._params["patches"] = [f"P{x}" for x in np.arange(n_patch) + 1] + + # Set output file if not specified + if self._params["output"] is None: + self._params["output"] = f"ccds_with_psf_{version}.txt" + + def check_params(self): + """Check parameters for validity.""" + # Add any parameter validation logic here + pass + + def get_lines(self, fname): + """Get Lines. + + Return list of lines read from a text file. + + Parameters + ---------- + fname : str + input file name + + Returns + ------- + list + IDs + + """ + IDs = [] + with open(fname) as f: + lines = f.readlines() + for line in lines: + IDs.append(line.rstrip()) + + return IDs + + def get_exp_shdu_missing(self, patches): + """Get Exp Shdu Missing. + + Returns set of missing CCDs (single-exposure single-HDU IDs) from a list of patches. + + Parameters + ---------- + patches : list + input patches + + Returns + ------- + set + missing CCD IDs + + """ + exp_shdu_missing_all = set() + + for patch in patches: + + path_exp_shdu_missing = f"{patch}/summary/missing_job_32_all.txt" + exp_shdu_missing = self.get_lines(path_exp_shdu_missing) + + print( + f"Patch {patch}: Found {len(exp_shdu_missing)} missing ccds", + end="; ", + ) + + exp_shdu_missing_all.update(exp_shdu_missing) + + print(f"cumulative {len(exp_shdu_missing_all)} missing ccds") + + print() + + return exp_shdu_missing_all + + def get_exp(self, patches): + """Get Exp. + + Return set of exposures from a list of patches. + + Parameters + ---------- + patches : list + input patches + + Returns + ------- + set + exposure IDs + + """ + exp_all = set() + + for patch in patches: + + path_exp = f"{patch}/exp_numbers.txt" + exp = self.get_lines(path_exp) + + print(f"Patch {patch}: Found {len(exp)} exposures", end="; ") + + exp_all.update(exp) + + print(f"cumulative {len(exp_all)} exposures") + + print() + + return exp_all + + def get_ccds_with_psf(self, patches, n_CCD=40): + """Get CCDs With PSF. + + Return set of CCDs from list of patches. + + Parameters + ---------- + patches : list + input patches + n_CCD : int + number of CCDs per exposure + + Returns + ------- + set + CCD IDs + + """ + # Get missing CCDs + print("=== get missing CCDs ===") + exp_shdu_missing_all = self.get_exp_shdu_missing(patches) + + # Get all exposures used in tiles + print("=== get exposures ===") + exp_all = self.get_exp(patches) + + # Turn exposures into exposure-single-HDU names (CCDs) + exp_shdu_all = summary.get_all_shdus(exp_all, n_CCD) + + print(f"Found {len(exp_shdu_all)} CCDs") + + return exp_shdu_all + + def get_ccds_with_psf_method_v1_3(self, patches, n_CCD=40): + """Get CCDs With PSF Method v1.3. + + Return set of CCDs with valid PSF by scanning run directories. + Finds star_selection files and checks corresponding stat files + for valid (non-nan) FWHM values. + + Parameters + ---------- + patches : list + input patches + n_CCD : int + number of CCDs per exposure (unused, kept for API consistency) + + Returns + ------- + set + CCD IDs with valid PSF + + """ + exp_shdu_all = set() + + for patch in patches: + # Find all star_selection files + mask_pattern = ( + f"{patch}/output/run_sp_exp_SxSePs*/" + f"setools_runner/output/mask/star_selection-*.fits" + ) + mask_files = glob.glob(mask_pattern) + + print( + f"Patch {patch}: Found {len(mask_files)} star_selection FITS" + + " files" + ) + + for mask_file in mask_files: + # Extract exp_shdu from filename + basename = os.path.basename(mask_file) + match = re.match(r"star_selection-(.+)\.fits", basename) + if not match: + print(f"Warning: Non-matching file name {basename}") + continue + exp_shdu = match.group(1) + + # Get corresponding stat file path + # Replace mask dir with stat/stat dir in the path + stat_file = mask_file.replace( + "/mask/star_selection-", + "/stat/star_stat-", + ).replace(".fits", ".txt") + + #print("MKDEBUG ", exp_shdu, stat_file) + + # Check if stat file exists and has valid FWHM + if not os.path.exists(stat_file): + print("MKDEBUG stat file ", stat_file, " does not exist") + continue + + # Read stat file and check for nan in "Mean star fwhm selected" + has_nan = False + with open(stat_file) as f: + for line in f: + if "Mean star fwhm selected" in line: + if "nan" in line.lower(): + has_nan = True + break + + if not has_nan: + #print("MKDEBUG append", exp_shdu) + exp_shdu_all.add(exp_shdu) + + print(f"After patch {patch}: {len(exp_shdu_all)} CCDs with valid PSF") + + print(f"Found {len(exp_shdu_all)} CCDs with valid PSF") + + return exp_shdu_all + + def save(self, IDs, path): + """Save. + + Save list of IDs to text file. + + Parameters + ---------- + IDs : set + input IDs + path : str + output file name + + """ + with open(path, "w") as f_out: + for ID in IDs: + print(ID, file=f_out) + + def run(self, args=None): + """Run. + + Main execution method. + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code (0 for success) + + """ + if args is None: + args = sys.argv + + # Set parameters from command line + self.set_params_from_command_line(args) + self.update_params() + self.check_params() + + # Get parameters + patches = self._params["patches"] + version = self._params["version_cat"] + n_CCD = self._params["n_CCD"] + output = self._params["output"] + + print( + f"=== get_ccds_with_psf for version {version}, patches {patches} ===" + ) + + if self._params["version_cat"] == "v1.3": + print("=== method for 1.3: exp with PSF star sample === ") + exp_shdu_all = self.get_ccds_with_psf_method_v1_3(patches, n_CCD) + else: + print("=== method for >= 1.4: exp_list - missing === ") + exp_shdu_all = self.get_ccds_with_psf(patches, n_CCD) + + self.save(exp_shdu_all, output) + + print(f"Results saved to {output}") + + return 0 diff --git a/src/shapepipe/utilities/coverage_map_builder.py b/src/shapepipe/utilities/coverage_map_builder.py new file mode 100644 index 00000000..152598e1 --- /dev/null +++ b/src/shapepipe/utilities/coverage_map_builder.py @@ -0,0 +1,325 @@ +"""COVERAGE_MAP_BUILDER + +Build HealSparse coverage maps from field corner coordinates. + +Author: Mike Hudson, Martin Kilbinger + +""" + +import sys +from os.path import exists + +import numpy as np +import healsparse as hsp +import hpgeom as hpg + +from cs_util import args as cs_args +from cs_util import logging + +from shapepipe.utilities.coverage_plotter import CoveragePlotter + + +class CoverageMapBuilder(object): + """Coverage Map Builder Class. + + Builds HealSparse coverage maps from field corner coordinates. + """ + + def __init__(self): + """Initialize the builder.""" + self.params_default() + + def params_default(self): + """Set default parameters and command line options.""" + + self._params = { + "input_file": "exp_ra_dec.txt", + "output_file": "coverage.hsp", + "nside_coverage": 32, + "nside": 2048, + "apply_median_filter": False, + "n_median_iterations": 2, + "create_boolean": False, + "boolean_threshold": 3, + "boolean_output": None, + "create_plot": False, + "plot_output": None, + "plot_region": None, + } + + self._short_options = { + "input_file": "-i", + "output_file": "-o", + "nside_coverage": "-c", + "nside": "-n", + "apply_median_filter": "-m", + "n_median_iterations": "-N", + "create_boolean": "-b", + "boolean_threshold": "-t", + "boolean_output": "-B", + "create_plot": "-p", + "plot_output": "-P", + "plot_region": "-g", + } + + self._types = { + "nside_coverage": "int", + "nside": "int", + "apply_median_filter": "bool", + "n_median_iterations": "int", + "create_boolean": "bool", + "boolean_threshold": "int", + "create_plot": "bool", + } + + self._help_strings = { + "input_file": "input file with field corners; default is {}", + "output_file": "output HealSparse map file; default is {}", + "nside_coverage": "HealSparse coverage nside; default is {}", + "nside": "HealSparse map nside; default is {}", + "apply_median_filter": "apply median filter to smooth map; default is {}", + "n_median_iterations": "number of median filter iterations; default is {}", + "create_boolean": "create boolean coverage map; default is {}", + "boolean_threshold": "threshold value for boolean map; default is {}", + "boolean_output": "output file for boolean map; default is _bool.hsp", + "create_plot": "create plot of coverage map; default is {}", + "plot_output": "output file for plot; default is .png", + "plot_region": "predefined region for plot (NGC, SGC, fullsky); default is {}", + } + + def set_params_from_command_line(self, args): + """Set Params From Command line. + + Only use when calling using python from command line. + Does not work from ipython or jupyter. + + Parameters + ---------- + args : list + command line arguments + + """ + # Read command line options + options = cs_args.parse_options( + self._params, + self._short_options, + self._types, + self._help_strings, + args=args, + ) + self._params = options + + # Save calling command + logging.log_command(args) + + def update_params(self): + """Update parameters. + + Set derived parameters based on input parameters. + """ + # Set boolean output filename if not specified + if self._params["boolean_output"] is None: + output_file = self._params["output_file"] + if output_file.endswith(".hsp"): + base = output_file[:-4] + else: + base = output_file + self._params["boolean_output"] = f"{base}_bool.hsp" + + # Set plot output filename if not specified + if self._params["plot_output"] is None: + output_file = self._params["output_file"] + if output_file.endswith(".hsp"): + base = output_file[:-4] + else: + base = output_file + self._params["plot_output"] = f"{base}.png" + + def check_params(self): + """Check parameters for validity.""" + if not exists(self._params["input_file"]): + raise FileNotFoundError( + f"Input file not found: {self._params['input_file']}" + ) + + # Check nside values are powers of 2 + nside_coverage = self._params["nside_coverage"] + nside = self._params["nside"] + + if not (nside_coverage & (nside_coverage - 1) == 0): + raise ValueError( + f"nside_coverage must be a power of 2, got {nside_coverage}" + ) + + if not (nside & (nside - 1) == 0): + raise ValueError(f"nside must be a power of 2, got {nside}") + + def median_filter(self, hsp_map): + """Median Filter. + + Apply median filter to HealSparse map using neighbors. + + Parameters + ---------- + hsp_map : healsparse.HealSparseMap + input map + + Returns + ------- + healsparse.HealSparseMap + filtered map + + """ + nside = hsp_map.nside_sparse + + new_hsp = hsp_map + 0 + pixs = hsp_map.valid_pixels + n = hpg.neighbors(nside, pixs) + n = np.where(n >= 0, n, 0) + pix2 = pixs.reshape(len(pixs), 1) + n = np.hstack([n, pix2]) + mn = hsp_map[n] + new = np.median(mn, axis=1).astype(np.uint16) + new_hsp[pixs] = new + + return new_hsp + + def run(self, args=None): + """Run. + + Main execution method. + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code (0 for success) + + """ + if args is None: + args = sys.argv + + # Set parameters from command line + self.set_params_from_command_line(args) + self.update_params() + self.check_params() + + # Get parameters + input_file = self._params["input_file"] + output_file = self._params["output_file"] + nside_coverage = self._params["nside_coverage"] + nside = self._params["nside"] + apply_median_filter = self._params["apply_median_filter"] + n_median_iterations = self._params["n_median_iterations"] + create_boolean = self._params["create_boolean"] + boolean_threshold = self._params["boolean_threshold"] + boolean_output = self._params["boolean_output"] + create_plot = self._params["create_plot"] + plot_output = self._params["plot_output"] + plot_region = self._params["plot_region"] + verbose = self._params["verbose"] + + if verbose: + print(f"Reading field corners from: {input_file}") + + # Load field corner data + dtype = np.dtype( + [ + ("expnum", int), + ("tlra", float), + ("trra", float), + ("brra", float), + ("blra", float), + ("tldec", float), + ("trdec", float), + ("brdec", float), + ("bldec", float), + ] + ) + rows = np.loadtxt(input_file, dtype=dtype) + + print(f"Loaded {len(rows)} exposure field corners") + + if verbose: + print( + f"Creating HealSparse map (nside_coverage={nside_coverage}, nside={nside})" + ) + + # Create empty map + m = hsp.HealSparseMap.make_empty(nside_coverage, nside, np.uint16) + + # Add polygons for each exposure + if verbose: + print("Adding polygons to map") + + for i, r in enumerate(rows): + ra = [r["tlra"], r["trra"], r["brra"], r["blra"]] + dec = [r["tldec"], r["trdec"], r["brdec"], r["bldec"]] + poly = hsp.Polygon(ra=ra, dec=dec, value=1) + m += poly + + if verbose and i % 1000 == 0: + print(f"{i:6d} / {len(rows):6d}") + + print(f"Added {len(rows)} polygons to map") + + # Apply median filter if requested + if apply_median_filter: + if verbose: + print( + f"Applying median filter ({n_median_iterations} iterations)" + ) + + for i in range(n_median_iterations): + m = self.median_filter(m) + if verbose: + print(f" Iteration {i+1}/{n_median_iterations} complete") + + # Write main coverage map + if verbose: + print(f"Writing coverage map to: {output_file}") + + m.write(output_file, clobber=True) + print(f"Coverage map saved to {output_file}") + + # Create and write boolean map if requested + if create_boolean: + if verbose: + print( + f"Creating boolean map (threshold={boolean_threshold})" + ) + + c = hsp.HealSparseMap.make_empty(nside_coverage, nside, bool) + c[m.valid_pixels] = m[m.valid_pixels] >= boolean_threshold + + if verbose: + print(f"Writing boolean map to: {boolean_output}") + + c.write(boolean_output, clobber=True) + print(f"Boolean map saved to {boolean_output}") + + # Create plot if requested + if create_plot: + if verbose: + print(f"Creating plot of coverage map") + + try: + plotter = CoveragePlotter() + plotter.plot_coverage_map( + m, + plot_output, + region=plot_region, + vmax=boolean_threshold if create_boolean else 3, + colorbar=True, + colorbar_label="Coverage depth", + ) + print(f"Plot saved to {plot_output}") + except ImportError as e: + print(f"Warning: Could not create plot: {e}") + print("Install sp_validation package for plotting support: pip install -e /path/to/sp_validation") + + return 0 diff --git a/src/shapepipe/utilities/coverage_plotter.py b/src/shapepipe/utilities/coverage_plotter.py new file mode 100644 index 00000000..14f84fc6 --- /dev/null +++ b/src/shapepipe/utilities/coverage_plotter.py @@ -0,0 +1,343 @@ +"""COVERAGE_PLOTTER + +Plot HealSparse coverage maps. + +Author: Mike Hudson, Martin Kilbinger + +""" + +import sys +from os.path import exists + +import numpy as np +import healsparse as hsp +import matplotlib +import matplotlib.pyplot as plt + +try: + from cs_util.plots import FootprintPlotter + HAS_FOOTPRINT_PLOTTER = True +except ImportError: + HAS_FOOTPRINT_PLOTTER = False + +from cs_util import args as cs_args +from cs_util import logging + + +class CoveragePlotter(object): + """Coverage Plotter Class. + + Plots HealSparse coverage maps. + """ + + def __init__(self): + """Initialize the plotter.""" + self.params_default() + + def params_default(self): + """Set default parameters and command line options.""" + + self._params = { + "input_file": None, + "output_file": "coverage_plot.png", + "region": None, + "ralo": 100.0, + "rahi": 270.0, + "declo": 28.0, + "dechi": 85.0, + "vmin": 0, + "vmax": 3, + "cmap": "rainbow", + "draw_milky_way": True, + "colorbar": True, + "colorbar_label": "Coverage depth", + "figsize_x": 12, + "figsize_y": 8, + "dpi": 200, + } + + self._short_options = { + "input_file": "-i", + "output_file": "-o", + "region": "-g", + "ralo": "-R", + "rahi": "-r", + "declo": "-D", + "dechi": "-d", + "vmin": "-m", + "vmax": "-M", + "cmap": "-c", + "draw_milky_way": "-w", + "colorbar": "-C", + "colorbar_label": "-L", + "figsize_x": "-x", + "figsize_y": "-y", + "dpi": "-p", + } + + self._types = { + "ralo": "float", + "rahi": "float", + "declo": "float", + "dechi": "float", + "vmin": "int", + "vmax": "int", + "draw_milky_way": "bool", + "colorbar": "bool", + "figsize_x": "float", + "figsize_y": "float", + "dpi": "int", + } + + self._help_strings = { + "input_file": "input HealSparse map file; required", + "output_file": "output plot file (png, pdf, etc.); default is {}", + "region": "predefined region (NGC, SGC, fullsky); overrides RA/Dec limits if specified; default is {}", + "ralo": "minimum RA for plot extent; default is {}", + "rahi": "maximum RA for plot extent; default is {}", + "declo": "minimum Dec for plot extent; default is {}", + "dechi": "maximum Dec for plot extent; default is {}", + "vmin": "minimum value for colormap; default is {}", + "vmax": "maximum value for colormap; default is {}", + "cmap": "matplotlib colormap name; default is {}", + "draw_milky_way": "draw Milky Way outline; default is {}", + "colorbar": "add colorbar to plot; default is {}", + "colorbar_label": "colorbar label; default is {}", + "figsize_x": "figure width in inches; default is {}", + "figsize_y": "figure height in inches; default is {}", + "dpi": "figure DPI; default is {}", + } + + def set_params_from_command_line(self, args): + """Set Params From Command line. + + Only use when calling using python from command line. + Does not work from ipython or jupyter. + + Parameters + ---------- + args : list + command line arguments + + """ + # Read command line options + options = cs_args.parse_options( + self._params, + self._short_options, + self._types, + self._help_strings, + args=args, + ) + self._params = options + + # Save calling command + logging.log_command(args) + + def update_params(self): + """Update parameters. + + Set derived parameters based on input parameters. + """ + pass + + def check_params(self): + """Check parameters for validity.""" + if self._params["input_file"] is None: + raise ValueError("Input file is required (use -i or --input_file)") + + if not exists(self._params["input_file"]): + raise FileNotFoundError( + f"Input file not found: {self._params['input_file']}" + ) + + if not HAS_FOOTPRINT_PLOTTER: + raise ImportError( + "cs_util package with FootprintPlotter is required for plotting. " + "Install with: pip install -e /path/to/cs_util" + ) + + # Check region validity + region = self._params["region"] + if region is not None: + valid_regions = list(FootprintPlotter._regions.keys()) + if region not in valid_regions: + raise ValueError( + f"Invalid region '{region}'. Valid regions are: {', '.join(valid_regions)}" + ) + + def plot_coverage_map( + self, + hsp_map, + output_file, + region=None, + ralo=100.0, + rahi=270.0, + declo=28.0, + dechi=85.0, + vmin=0, + vmax=3, + title=None, + colorbar=True, + colorbar_label="Coverage depth", + figsize=(10, 10), + dpi=200, + ): + """Plot Coverage Map. + + Create a plot of the HealSparse coverage map using FootprintPlotter. + + Parameters + ---------- + hsp_map : healsparse.HealSparseMap + coverage map to plot + output_file : str + output file path + region : str, optional + predefined region name (NGC, SGC, fullsky) + ralo : float + minimum RA for plot extent (used if region is None) + rahi : float + maximum RA for plot extent (used if region is None) + declo : float + minimum Dec for plot extent (used if region is None) + dechi : float + maximum Dec for plot extent (used if region is None) + vmin : int + minimum value for colormap + vmax : int + maximum value for colormap + title : str, optional + plot title + colorbar : bool + add colorbar to plot + colorbar_label : str + colorbar label + figsize : tuple + figure size (width, height) in inches + dpi : int + figure DPI + + """ + # Set up matplotlib parameters + font = {"size": 16} + matplotlib.rc("font", **font) + matplotlib.rcParams["savefig.dpi"] = dpi + matplotlib.rcParams["figure.dpi"] = dpi + matplotlib.rcParams["figure.figsize"] = figsize + + # Create FootprintPlotter instance + plotter = FootprintPlotter( + nside_coverage=hsp_map.nside_coverage, + nside_map=hsp_map.nside_sparse + ) + + # Plot based on region or manual extent + if region is not None: + # Use predefined region + region_params = FootprintPlotter._regions[region] + plotter.plot_region( + hsp_map, + region_params, + outpath=output_file, + title=title, + colorbar=colorbar, + colorbar_label=colorbar_label + ) + else: + # Use manual RA/Dec limits + # Calculate ra_0 for projection + if ralo < rahi: + ra_0 = (ralo + rahi) / 2.0 + else: + ra_0 = 130.0 + + extend = [ralo, rahi, declo, dechi] + plotter.plot_area( + hsp_map, + ra_0=ra_0, + extend=extend, + vmax=vmax, + outpath=output_file, + title=title, + colorbar=colorbar, + colorbar_label=colorbar_label + ) + + def run(self, args=None): + """Run. + + Main execution method. + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code (0 for success) + + """ + if args is None: + args = sys.argv + + # Set parameters from command line + self.set_params_from_command_line(args) + self.update_params() + self.check_params() + + # Get parameters + input_file = self._params["input_file"] + output_file = self._params["output_file"] + region = self._params["region"] + ralo = self._params["ralo"] + rahi = self._params["rahi"] + declo = self._params["declo"] + dechi = self._params["dechi"] + vmin = self._params["vmin"] + vmax = self._params["vmax"] + colorbar = self._params["colorbar"] + colorbar_label = self._params["colorbar_label"] + figsize_x = self._params["figsize_x"] + figsize_y = self._params["figsize_y"] + dpi = self._params["dpi"] + verbose = self._params["verbose"] + + if verbose: + print(f"Reading coverage map from: {input_file}") + + # Load HealSparse map + hsp_map = hsp.HealSparseMap.read(input_file) + + print(f"Loaded map with nside={hsp_map.nside_sparse}") + print( + f"Valid pixels: {len(hsp_map.valid_pixels)} / {hsp_map.nside_sparse**2 * 12}" + ) + + if verbose: + if region: + print(f"Creating plot for region: {region}") + else: + print(f"Creating plot with extent: RA=[{ralo}, {rahi}], Dec=[{declo}, {dechi}]") + + # Create plot + self.plot_coverage_map( + hsp_map, + output_file, + region=region, + ralo=ralo, + rahi=rahi, + declo=declo, + dechi=dechi, + vmin=vmin, + vmax=vmax, + colorbar=colorbar, + colorbar_label=colorbar_label, + figsize=(figsize_x, figsize_y), + dpi=dpi, + ) + + print(f"Plot saved to {output_file}") + + return 0 diff --git a/src/shapepipe/utilities/field_corners_extractor.py b/src/shapepipe/utilities/field_corners_extractor.py new file mode 100644 index 00000000..f0765a04 --- /dev/null +++ b/src/shapepipe/utilities/field_corners_extractor.py @@ -0,0 +1,405 @@ +"""FIELD_CORNERS_EXTRACTOR + +Extract field corner coordinates from FITS headers. + +Author: Mike Hudson, Martin Kilbinger + +""" + +import sys +import os +import glob +import re +from os.path import exists +from multiprocessing import Pool, cpu_count + +import numpy as np +from astropy import wcs +from astropy.io.fits import Header + +from cs_util import args as cs_args +from cs_util import logging + + +class FieldCornersExtractor(object): + """Field Corners Extractor Class. + + Extracts RA/Dec coordinates of field corners from FITS headers. + """ + + def __init__(self): + """Initialize the extractor.""" + self.params_default() + + def params_default(self): + """Set default parameters and command line options.""" + + self._params = { + "input_dir": "header", + "output_file": "exp_ra_dec.txt", + "resume": False, + "n_processes": 1, + } + + self._short_options = { + "input_dir": "-i", + "output_file": "-o", + "resume": "-r", + "n_processes": "-n", + } + + self._types = { + "resume": "bool", + "n_processes": "int", + } + + self._help_strings = { + "input_dir": "input directory containing header files; default is {}", + "output_file": "output file for field corners; default is {}", + "resume": "resume from existing output file; default is {}", + "n_processes": f"number of parallel processes (1=serial, 0=auto={cpu_count()}); default is {{}}", + } + + def set_params_from_command_line(self, args): + """Set Params From Command line. + + Only use when calling using python from command line. + Does not work from ipython or jupyter. + + Parameters + ---------- + args : list + command line arguments + + """ + # Read command line options + options = cs_args.parse_options( + self._params, + self._short_options, + self._types, + self._help_strings, + args=args, + ) + self._params = options + + # Save calling command + logging.log_command(args) + + def update_params(self): + """Update parameters. + + Set derived parameters based on input parameters. + """ + # Ensure input directory ends without trailing slash + if self._params["input_dir"].endswith("/"): + self._params["input_dir"] = self._params["input_dir"][:-1] + + def check_params(self): + """Check parameters for validity.""" + if not exists(self._params["input_dir"]): + raise FileNotFoundError( + f"Input directory not found: {self._params['input_dir']}" + ) + + # Set n_processes to cpu_count if 0 + if self._params["n_processes"] == 0: + self._params["n_processes"] = cpu_count() + + if self._params["n_processes"] < 0: + raise ValueError( + f"n_processes must be >= 0, got {self._params['n_processes']}" + ) + + @staticmethod + def process_single_header(path_and_verbose): + """Process Single Header. + + Worker function to process a single header file. + Static method so it can be pickled for multiprocessing. + + Parameters + ---------- + path_and_verbose : tuple + (path, verbose) where path is header file path and verbose is bool + + Returns + ------- + tuple or None + (expnum, ra_list, dec_list) on success, None on failure + + """ + path, verbose = path_and_verbose + + # Extract exposure number from filename + end = path.find(".txt") + expnum = int(path[end - 6 : end]) + + try: + # Parse header and extract WCS + with open(path, "r") as f: + string = f.read() + tokens = re.split(r"^(END\s+)", string, flags=re.MULTILINE) + n = len(tokens) + w = [] + for i in range(2, n - 1, 2): + h1 = Header.fromstring(tokens[i] + tokens[i + 1], sep="\n") + w.append(wcs.WCS(h1)) + + # Calculate field corners + tl = w[0].pixel_to_world(2079, 0) + tr = w[8].pixel_to_world(32, 0) + br = w[35].pixel_to_world(2079, 0) + bl = w[27].pixel_to_world(32, 0) + + ra = [tl.ra.deg, tr.ra.deg, br.ra.deg, bl.ra.deg] + dec = [tl.dec.deg, tr.dec.deg, br.dec.deg, bl.dec.deg] + + return (expnum, ra, dec) + + except Exception as e: + if verbose: + print(f"Failed to process {expnum}: {e}") + return None + + def get_wcs_from_header(self, ftext): + """Get WCS From Header. + + Parse header text file and extract WCS for all HDUs. + + Parameters + ---------- + ftext : str + path to header text file + + Returns + ------- + list or None + list of WCS objects, one per HDU, or None if parsing failed + + """ + try: + with open(ftext, "r") as f: + string = f.read() + tokens = re.split(r"^(END\s+)", string, flags=re.MULTILINE) + n = len(tokens) + h = [] + w = [] + for i in range(2, n - 1, 2): + h1 = Header.fromstring(tokens[i] + tokens[i + 1], sep="\n") + h.append(h1) + w.append(wcs.WCS(h1)) + return w + except Exception as e: + if self._params["verbose"]: + print(f"Problem opening {ftext}: {e}") + return None + + def get_megacam_field(self, w): + """Get MegaCam Field. + + Calculate RA/Dec of the 4 corners of a MegaCam field. + + Parameters + ---------- + w : list + list of WCS objects for all CCDs + + Returns + ------- + tuple + (ra_list, dec_list) where each is a list of 4 corner coordinates + + """ + # MegaCam field corners from specific chips and pixels + # Note: chips are flipped, so bottom right becomes top left, etc. + tl = w[0].pixel_to_world(2079, 0) # top left + tr = w[8].pixel_to_world(32, 0) # top right + br = w[35].pixel_to_world(2079, 0) # bottom right + bl = w[27].pixel_to_world(32, 0) # bottom left + + return ( + [tl.ra.deg, tr.ra.deg, br.ra.deg, bl.ra.deg], + [tl.dec.deg, tr.dec.deg, br.dec.deg, bl.dec.deg], + ) + + def get_done_exposures(self): + """Get Done Exposures. + + Read exposures already processed from output file. + + Returns + ------- + set + set of exposure numbers already processed + + """ + output_file = self._params["output_file"] + + if not exists(output_file): + return set() + + try: + done = np.loadtxt(output_file, usecols=(0), dtype=int) + return set(done) + except Exception as e: + if self._params["verbose"]: + print(f"Could not read existing output file: {e}") + return set() + + def run(self, args=None): + """Run. + + Main execution method. + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code (0 for success) + + """ + if args is None: + args = sys.argv + + # Set parameters from command line + self.set_params_from_command_line(args) + self.update_params() + self.check_params() + + # Get parameters + input_dir = self._params["input_dir"] + output_file = self._params["output_file"] + resume = self._params["resume"] + verbose = self._params["verbose"] + + # Find all header files + paths = glob.glob(f"{input_dir}/*.txt") + n = len(paths) + + if n == 0: + print(f"No header files found in {input_dir}/") + return 1 + + print(f"{n} header files found") + + # Get list of already processed exposures if resuming + done = set() + if resume: + done = self.get_done_exposures() + print(f"{len(done)} already done") + + # Build todo list + todo = [] + for p in paths: + end = p.find(".txt") + expnum = int(p[end - 6 : end]) + if expnum not in done: + todo.append(p) + + n_todo = len(todo) + print(f"{n_todo} to process") + + if n_todo == 0: + print("Nothing to do") + return 0 + + # Get n_processes + n_processes = self._params["n_processes"] + + # Process headers + if n_processes == 1: + # Serial processing + results = self._process_serial(todo, verbose) + else: + # Parallel processing + print(f"Using {n_processes} parallel processes") + results = self._process_parallel(todo, n_processes, verbose) + + # Filter out failed results (None values) + results = [r for r in results if r is not None] + + # Sort results by exposure number + results.sort(key=lambda x: x[0]) + + # Write results to file + mode = "a" if resume else "w" + with open(output_file, mode, buffering=1) as f: + for expnum, ra, dec in results: + f.write(f"{expnum:6d} ") + np.savetxt(f, ra, fmt="%9.5f", newline=" ") + np.savetxt(f, dec, fmt="%9.5f", newline=" ") + f.write("\n") + + n_success = len(results) + n_failed = n_todo - n_success + + print(f"Processed {n_success} exposures") + if n_failed > 0: + print(f"Failed to process {n_failed} exposures") + + print(f"Results written to {output_file}") + + return 0 + + def _process_serial(self, todo, verbose): + """Process Serial. + + Process headers serially. + + Parameters + ---------- + todo : list + list of header file paths to process + verbose : bool + verbose output + + Returns + ------- + list + list of results (expnum, ra, dec) tuples + + """ + results = [] + n_todo = len(todo) + + for i, p in enumerate(todo): + result = self.process_single_header((p, verbose)) + results.append(result) + + if verbose and i % 100 == 0: + print(f"{i:6d} / {n_todo:6d}") + + return results + + def _process_parallel(self, todo, n_processes, verbose): + """Process Parallel. + + Process headers in parallel using multiprocessing. + + Parameters + ---------- + todo : list + list of header file paths to process + n_processes : int + number of parallel processes + verbose : bool + verbose output + + Returns + ------- + list + list of results (expnum, ra, dec) tuples + + """ + # Prepare arguments for worker function + args = [(p, verbose) for p in todo] + + # Create pool and process + with Pool(processes=n_processes) as pool: + results = pool.map(self.process_single_header, args) + + return results diff --git a/src/shapepipe/utilities/header_downloader.py b/src/shapepipe/utilities/header_downloader.py new file mode 100644 index 00000000..e7cb70e6 --- /dev/null +++ b/src/shapepipe/utilities/header_downloader.py @@ -0,0 +1,247 @@ +"""HEADER_DOWNLOADER + +Download FITS headers from VOSpace for exposures listed in a CCD file. + +Author: Mike Hudson, Martin Kilbinger + +""" + +import sys +import os +from os.path import exists + +import numpy as np +import vos + +from cs_util import args as cs_args +from cs_util import logging + + +class HeaderDownloader(object): + """Header Downloader Class. + + Downloads FITS headers from VOSpace for exposures in a CCD list. + """ + + def __init__(self): + """Initialize the downloader.""" + self.params_default() + + def params_default(self): + """Set default parameters and command line options.""" + + self._params = { + "input_file": None, + "output_dir": "header", + "vospace_path": "vos:cfis/pitcairn", + "overwrite": False, + } + + self._short_options = { + "input_file": "-i", + "output_dir": "-o", + "vospace_path": "-p", + "overwrite": "-O", + } + + self._types = { + "overwrite": "bool", + } + + self._help_strings = { + "input_file": "input CCD list file (txt or csv); required", + "output_dir": "output directory for headers; default is {}", + "vospace_path": "VOSpace base path; default is {}", + "overwrite": "overwrite existing header files; default is {}", + } + + def set_params_from_command_line(self, args): + """Set Params From Command line. + + Only use when calling using python from command line. + Does not work from ipython or jupyter. + + Parameters + ---------- + args : list + command line arguments + + """ + # Read command line options + options = cs_args.parse_options( + self._params, + self._short_options, + self._types, + self._help_strings, + args=args, + ) + self._params = options + + # Save calling command + logging.log_command(args) + + def update_params(self): + """Update parameters. + + Set derived parameters based on input parameters. + """ + # Ensure output directory ends without trailing slash + if self._params["output_dir"].endswith("/"): + self._params["output_dir"] = self._params["output_dir"][:-1] + + def check_params(self): + """Check parameters for validity.""" + if self._params["input_file"] is None: + raise ValueError("Input file is required (use -i or --input_file)") + + if not exists(self._params["input_file"]): + raise FileNotFoundError( + f"Input file not found: {self._params['input_file']}" + ) + + # Create output directory if it doesn't exist + if not exists(self._params["output_dir"]): + os.makedirs(self._params["output_dir"]) + if self._params["verbose"]: + print(f"Created output directory: {self._params['output_dir']}") + + def get_exposures(self, ccd_list_file): + """Get Exposures. + + Extract unique exposure numbers from CCD list file. + + Parameters + ---------- + ccd_list_file : str + path to CCD list file (txt or csv) + + Returns + ------- + np.array + unique exposure numbers + + """ + exps = [] + + # Check if CSV format + if ccd_list_file.endswith(".csv"): + import astropy.table + + t = astropy.table.Table.read(ccd_list_file) + expccd = t["CCD"].data + r = np.char.split(expccd, sep="-") + + for i, r1 in enumerate(r): + exp = int(r1[0]) + exps.append(exp) + else: + # Text format + f = np.loadtxt(ccd_list_file, dtype="str", encoding="ascii") + r = np.char.split(f, sep="-") + + for i, r1 in enumerate(r): + exp = int(r1[0]) + exps.append(exp) + + exps = np.array(exps) + uniq = np.unique(exps) + + return uniq + + def get_fits_header(self, expnum, client): + """Get FITS Header. + + Download FITS header from VOSpace. + + Parameters + ---------- + expnum : int + exposure number + client : vos.Client + VOSpace client + + Returns + ------- + bool + True if successful, False otherwise + + """ + vospace_path = self._params["vospace_path"] + output_dir = self._params["output_dir"] + overwrite = self._params["overwrite"] + + source = f"{vospace_path}/{expnum:d}p.fits.fz" + dest = f"{output_dir}/{expnum:d}.txt" + + if exists(dest) and not overwrite: + return True + + try: + client.copy(source, dest, head=True) + return True + except Exception as e: + print(f"Could not copy {source}: {e}") + return False + + def run(self, args=None): + """Run. + + Main execution method. + + Parameters + ---------- + args : list, optional + command line arguments + + Returns + ------- + int + exit code (0 for success) + + """ + if args is None: + args = sys.argv + + # Set parameters from command line + self.set_params_from_command_line(args) + self.update_params() + self.check_params() + + # Get parameters + input_file = self._params["input_file"] + output_dir = self._params["output_dir"] + verbose = self._params["verbose"] + + if verbose: + print(f"Reading CCD list from: {input_file}") + + # Extract unique exposures + exps = self.get_exposures(input_file) + + print(f"Found {len(exps)} unique exposures") + + if verbose: + print(f"Downloading headers to: {output_dir}/") + + # Initialize VOSpace client once + client = vos.Client() + + # Download headers + n_success = 0 + n_failed = 0 + + for i, exp in enumerate(exps): + success = self.get_fits_header(exp, client) + if success: + n_success += 1 + else: + n_failed += 1 + + if verbose and i % 100 == 0: + print(f"{i:6d} / {len(exps):6d}") + + print(f"Downloaded {n_success} headers") + if n_failed > 0: + print(f"Failed to download {n_failed} headers") + + return 0