Source code for potpyri.primitives.solve_wcs

"""WCS solution using astrometry.net and Gaia DR3 alignment.

Provides coarse solution via solve-field and fine alignment by matching
detected sources to Gaia. Writes RADISP/DEDISP and updated WCS to FITS.
Authors: Kerry Paterson, Charlie Kilpatrick.
"""
from potpyri._version import __version__

import numpy as np
import subprocess
import os
import progressbar
import requests

import matplotlib.pyplot as plt

from photutils.aperture import ApertureStats
from photutils.aperture import CircularAperture

from astroquery.vizier import Vizier

import astropy.units as u
from astropy.stats import sigma_clipped_stats
from astropy.io import fits
from astropy.coordinates import SkyCoord
from astropy.coordinates import match_coordinates_sky
from astropy.table import Column
from astropy.table import Table
from astropy.utils.exceptions import AstropyWarning
from astropy.wcs import WCS
from astropy.wcs.utils import fit_wcs_from_points

# Internal dependency
from potpyri.utils import utilities
from potpyri.primitives import photometry

#turn Astropy warnings off
import warnings
warnings.simplefilter('ignore', category=AstropyWarning)

def _log_gaia(log, level, message):
    """Small helper to keep Gaia-stage logging consistent."""
    if log:
        getattr(log, level)(message)
    else:
        print(message)

def _write_gaia_fallback_header(hdu, file, reason, log=None, disp_arcsec=1.0):
    """Write fallback astrometric dispersion values when Gaia refinement fails.

    This preserves a coarse astrometry.net WCS solution while making the
    failure mode explicit in the header and logs.
    """
    _log_gaia(log, 'warning',
        f'Gaia alignment fallback for {file}: {reason}. '
        f'Keeping coarse astrometry.net WCS.')
    hdu[0].header['RADISP'] = (disp_arcsec, 'Dispersion in R.A. of WCS [Arcsec]')
    hdu[0].header['DEDISP'] = (disp_arcsec, 'Dispersion in Decl. of WCS [Arcsec]')
    hdu[0].header['GAIAFAIL'] = (reason[:32], 'Gaia fallback reason')
    hdu.writeto(file, overwrite=True, output_verify='silentfix')

def _validate_refined_wcs(w, header, tel, log=None):
    """Validate refined WCS against expected pixel scale and anisotropy.

    Returns
    -------
    tuple
        (is_valid, reason_string)
    """
    try:
        m = np.asarray(w.pixel_scale_matrix, dtype=float)
        _, s, _ = np.linalg.svd(m)
        # singular-value pixel scales in arcsec/pixel
        scales_arcsec = s * 3600.0
        max_scale = float(np.max(scales_arcsec))
        min_scale = float(np.min(scales_arcsec))
        aniso = float(max_scale / max(min_scale, 1e-12))
        det = float(np.linalg.det(m))
    except Exception:
        return (False, 'could not compute WCS pixel-scale matrix diagnostics')

    # Expected scale from instrument + binning in header
    try:
        binn = tel.get_binning(header)
        expected = float(tel.get_pixscale() * int(str(binn)[0]))
    except Exception:
        expected = None

    _log_gaia(log, 'info',
        f'Gaia WCS diagnostics: scales={scales_arcsec}, anisotropy={aniso:.3f}, det={det:.3e}, expected_scale={expected}')

    # Basic physical sanity
    if min_scale <= 0.0 or max_scale <= 0.0:
        return (False, 'non-positive WCS pixel scale')
    if aniso > 3.0:
        return (False, f'excessive WCS anisotropy ({aniso:.2f} > 3.0)')
    if expected is not None:
        if min_scale < 0.5 * expected:
            return (False, f'WCS min scale too small ({min_scale:.4f} < {0.5*expected:.4f} arcsec/pix)')
        if max_scale > 1.5 * expected:
            return (False, f'WCS max scale too large ({max_scale:.4f} > {1.5*expected:.4f} arcsec/pix)')

    return (True, '')

[docs] def get_gaia_catalog(input_file, log=None): """Query Gaia DR3 for stars in the field; return filtered catalog table. Uses header RA/DEC to define field; filters by PSS, Plx, PM. Parameters ---------- input_file : str Path to FITS file; header used for CRVAL1/CRVAL2 or RA/DEC. log : ColoredLogger, optional Logger for progress and errors. Returns ------- astropy.table.Table or None or False Filtered Gaia DR3 table (PSS>0.99, Plx<20, PM<10), or None if empty, or False if coordinates could not be parsed. Raises ------ Exception If Gaia query fails after retries. """ with fits.open(input_file) as hdu: data = hdu[0].data header = hdu[0].header hkeys = list(header.keys()) check_pairs = [('CRVAL1','CRVAL2'),('RA','DEC'),('OBJCTRA','OBJCTDEC')] coord = None for pair in check_pairs: if pair[0] in header.keys() and pair[1] in header.keys(): ra = header[pair[0]] dec = header[pair[1]] coord = utilities.parse_coord(ra, dec) if coord: break if not coord: if log: log.error(f'Could not parse RA/DEC from header of {input_file}') return(False) vizier = Vizier(columns=['RA_ICRS', 'DE_ICRS', 'Plx', 'PSS','PM']) vizier.ROW_LIMIT = -1 tries = 0 cat = None while tries<4: try: cat = vizier.query_region(coord, width=20 * u.arcmin, catalog='I/355/gaiadr3') if cat is None: if log: log.error(f'Gaia did not return catalog. Try #{tries+1}') else: break except requests.exceptions.ReadTimeout: if log: log.error(f'Gaia catalog timeout. Try #{tries+1}') tries += 1 if cat is None: raise Exception('ERROR: could not get Gaia catalog') if len(cat)>0: cat = cat[0] else: return(None) mask = (cat['PSS'] > 0.99) & (cat['Plx']<20) & (cat['PM']<10) cat = cat[mask] return(cat)
[docs] def clean_up_astrometry(directory, file, exten): """Remove astrometry.net temporary files (.axy, .corr, .solved, .wcs, etc.). Parameters ---------- directory : str Directory containing the files. file : str Base filename (with extension); exten is replaced to form temp names. exten : str File extension (e.g. '.fits') to replace with .axy, .corr, etc. Returns ------- None """ filelist = [file.replace(exten,'.axy'), file.replace(exten,'.corr'), file.replace(exten,'-indx.xyls'), file.replace(exten,'.match'), file.replace(exten,'.rdls'), file.replace(exten,'.solved'), file.replace(exten,'.wcs')] filelist = [os.path.join(directory, f) for f in filelist] for f in filelist: if os.path.exists(f): os.remove(f)
[docs] def solve_astrometry(file, tel, binn, paths, radius=0.5, replace=True, shift_only=False, index=None, log=None): """Run astrometry.net (solve-field) to get coarse WCS; optionally use custom index. Parameters ---------- file : str Path to FITS file to solve. tel : Instrument Instrument instance (name, get_pixscale). binn : str Binning string (e.g. '22') for pixel scale. paths : dict Paths dict (source_extractor key for solve-field). radius : float, optional Search radius in degrees. Default is 0.5. replace : bool, optional If True, write WCS into original file; else write .solved.fits. shift_only : bool, optional If True, only update CRPIX/CRVAL. Default is False. index : str, optional Path to custom index file or directory for solve-field. log : ColoredLogger, optional Logger for progress. Returns ------- bool True if solve succeeded, False otherwise. """ # Starting solve, print file and directory for reference fullfile = os.path.abspath(file) directory = os.path.dirname(file) if log: log.info(f'Trying to solve file: {file}') else: print(f'Trying to solve file: {file}') if not os.path.exists(fullfile): return(False) with fits.open(fullfile) as hdu: data = hdu[0].data header = hdu[0].header hkeys = list(header.keys()) exten = '.'+file.split('.')[-1] if not replace: if os.path.exists(fullfile.replace(exten,'.solved.fits')): if log: log.info(f'SUCCESS: solved {fullfile}') else: print(f'SUCCESS: solved {fullfile}') return(True) exten = '.'+file.split('.')[-1] if tel.name.upper=='BINOSPEC': check_pairs = [('CRVAL1','CRVAL2'),('RA','DEC'),('OBJCTRA','OBJCTDEC')] else: check_pairs = [('RA','DEC'),('OBJCTRA','OBJCTDEC'),('TARGRA','TARGDEC'), ('CRVAL1','CRVAL2')] coord = None for pair in check_pairs: if pair[0] in header.keys() and pair[1] in header.keys(): ra = header[pair[0]] dec = header[pair[1]] coord = utilities.parse_coord(ra, dec) if log: log.info(f'Got coord={coord} for RA key {pair[0]}, DEC key {pair[1]}') if log: log.info(f'RA value is {ra}, DEC value is {dec}') if coord: break if not coord: if log: log.error(f'Could not parse RA/DEC from header of {file}') return(False) if 'solved.fits' in fullfile: newfile = 'tmp.fits' else: newfile = fullfile.replace(exten,'.solved.fits') # Handle pixel scale guess scale = tel.get_pixscale() * int(binn[0]) if scale is not None: scale = float(scale) scale_high = float('%.4f'%(scale * 1.2)) scale_low = float('%.4f'%(scale * 0.8)) else: scale_high = 5.0 scale_low = 0.05 # Get RA and Dec ra = float('%.6f'%coord.ra.degree) dec = float('%.6f'%coord.dec.degree) cmd = 'solve-field' args = '--scale-units arcsecperpix ' args += f'--scale-low {scale_low} --scale-high {scale_high} ' args += f'--ra {ra} --dec {dec} ' args += f' --radius {radius} --no-plots -T ' args += f'--overwrite -N {newfile} --dir {directory} ' if index and os.path.exists(index): if os.path.isfile(index): args += f'--index-file {index} ' elif os.path.isdir(index): args += f'--index-dir {index} ' # Test for --use-source-extractor flag p = subprocess.run(['solve-field','-h'],capture_output=True) data = p.stdout.decode().lower() if '--use-source-extractor' in data: source_extractor_path = paths['source_extractor'] args += '--use-source-extractor ' args += f'--source-extractor-path {source_extractor_path} ' elif '--use-sextractor' in data: args += '--use-sextractor ' extra_opts = '--downsample 2 --no-verify --odds-to-tune-up 1e4 --objs 15' tries = 1 good = False while tries < 6 and not good: input_args = f'{args} {extra_opts}' if log: log.info(f'Try #{tries} with astrometry.net...') else: print(f'Try #{tries} with astrometry.net...') process = [cmd,fullfile]+input_args.split() if log: log.info(' '.join(process)) else: print(' '.join(process)) with open(os.devnull, 'w') as FNULL: p = subprocess.Popen(process, stdout=FNULL, stderr=subprocess.STDOUT) try: p.wait(90) except subprocess.TimeoutExpired: p.kill() if os.path.exists(newfile): good = True else: tries += 1 if tries==2: extra_opts='--objs 15 --no-verify' elif tries==3: extra_opts='--no-verify' elif tries==4: # Try without source extractor extra_opts='--no-verify' args = '--scale-units arcsecperpix ' args += f'--scale-low {scale_low} --scale-high {scale_high} ' args += f'--no-plots -T ' args += f'--overwrite -N {newfile} --dir {directory} ' args += f'--ra {ra} --dec {dec} ' args += f' --radius {radius} ' elif tries==5: # Try with no constraint on RA/Dec args = '--scale-units arcsecperpix ' args += f'--scale-low {scale_low} --scale-high {scale_high} ' args += f'--no-plots -T ' args += f'--overwrite -N {newfile} --dir {directory} ' extra_opts='--no-verify' file_exists=os.path.exists(newfile) if log: log.info(f'{newfile} exists: {file_exists}') else: print(f'{newfile} exists: {file_exists}') if os.path.exists(newfile): clean_up_astrometry(directory, file, exten) if log: log.info(f'SUCCESS: solved {fullfile}') else: print(f'SUCCESS: solved {fullfile}') if replace or newfile=='tmp.fits': output_file = fullfile # astrometry.net replaces extra extensions, so instead of renaming # copy the new header into the old file with fits.open(newfile) as newhdu: with fits.open(output_file) as hdu: if shift_only: hdu[0].header['CRPIX1'] = newhdu[0].header['CRPIX1'] hdu[0].header['CRPIX2'] = newhdu[0].header['CRPIX2'] hdu[0].header['CRVAL1'] = newhdu[0].header['CRVAL1'] hdu[0].header['CRVAL2'] = newhdu[0].header['CRVAL2'] else: hdu[0].header = newhdu[0].header hdu.writeto(output_file, overwrite=True, output_verify='silentfix') os.remove(newfile) else: output_file = fullfile.replace(exten,'.solved.fits') with fits.open(output_file) as hdu: for i,h in enumerate(hdu): if 'COMMENT' in hdu[i].header.keys(): del hdu[i].header['COMMENT'] if 'HISTORY' in hdu[i].header.keys(): del hdu[i].header['HISTORY'] # Delete other WCS header keys wcs_key = ['CSYER1','CSYER2','CRDER1','CRDER2','CD1_1','CD1_2', 'CD2_1','CD2_2','CRPIX1','CRPIX2','CUNIT1','CUNIT2','EQUINOX', 'RADESYS','CNAME1','CNAME2','CTYPE1','CTYPE2','WCSNAME', 'CRVAL1','CRVAL2'] for key in [w + 'C' for w in wcs_key]: if key in list(hdu[i].header.keys()): del hdu[i].header[key] for key in [w + 'S' for w in wcs_key]: if key in list(hdu[i].header.keys()): del hdu[i].header[key] # Delete old WCS keywords starting with '_' for key in list(hdu[i].header.keys()): if key.startswith('_'): del hdu[i].header[key] try: hdu.writeto(output_file, overwrite=True, output_verify='silentfix') except TypeError: if log: log.error(f'FAILURE: could not save file {fullfile}') else: print(f'FAILURE: could not save file {fullfile}') return(False) return(True) else: if log: log.error(f'FAILURE: did not solve {fullfile}') else: print(f'FAILURE: did not solve {fullfile}') clean_up_astrometry(directory, file, exten) return(False)
[docs] def align_to_gaia(file, tel, radius=0.5, max_search_radius=5.0*u.arcsec, save_centroids=False, min_gaia_match=7, log=None): """Refine WCS by matching detected sources to Gaia DR3 and fitting WCS. Parameters ---------- file : str Path to FITS file with WCS (e.g. after solve_astrometry). tel : Instrument Instrument instance. radius : float, optional Gaia query radius in degrees. Default is 0.5. max_search_radius : astropy.units.Quantity, optional Match radius for source-Gaia matching. Default is 5 arcsec. save_centroids : bool, optional If True, save centroid table. Default is False. min_gaia_match : int, optional Minimum number of Gaia matches required. Default is 7. log : ColoredLogger, optional Logger for progress. Returns ------- bool True if alignment succeeded and WCS was updated, False otherwise. """ cat = get_gaia_catalog(file, log=log) with fits.open(file) as hdu: header = hdu[0].header if cat is False: _write_gaia_fallback_header(hdu, file, 'could not parse coordinate keywords', log=log) return(True) if cat is None or len(cat)==0: _write_gaia_fallback_header(hdu, file, 'Gaia query returned no alignment stars', log=log) return(True) _log_gaia(log, 'info', f'Got {len(cat)} Gaia DR3 alignment stars') # Estimate sources that land within the image using WCS from header w = WCS(header, relax=True) naxis1 = header['NAXIS1'] naxis2 = header['NAXIS2'] # Get pixel coordinates of all stars in image gaia_coords = SkyCoord(cat['RA_ICRS'], cat['DE_ICRS'], unit=(u.deg, u.deg)) x_pix, y_pix = w.world_to_pixel(gaia_coords) # Mask to stars that are nominally in image mask = (x_pix > 0) & (x_pix < naxis1) & (y_pix > 0) & (y_pix < naxis2) x_pix = x_pix[mask] ; y_pix = y_pix[mask] # Also mask catalog and coordinates cat = cat[mask] gaia_coords = gaia_coords[mask] if cat is None or len(cat)==0: _write_gaia_fallback_header(hdu, file, 'no Gaia stars project into image bounds', log=log) return(True) _log_gaia(log, 'info', f'Gaia stars projected into image bounds: {len(cat)} (of {len(mask)})') if len(cat)<min_gaia_match: _write_gaia_fallback_header(hdu, file, f'too few in-frame Gaia stars ({len(cat)} < {min_gaia_match})', log=log) return(True) sky_cat = photometry.run_sextractor(file, log=log) if sky_cat is None or len(sky_cat) == 0: _write_gaia_fallback_header(hdu, file, 'Source Extractor returned no sources for Gaia alignment', log=log) return(True) _log_gaia(log, 'info', f'SExtractor sources available for matching: {len(sky_cat)}') # Get central coordinate of image based on centroiding central_pix = (float(naxis1)/2., float(naxis2)/2.) central_coo = w.pixel_to_world(*central_pix) _log_gaia(log, 'info', 'Converging on fine WCS solution') bar = progressbar.ProgressBar(maxval=10) bar.start() succeeded = False cat_coords_match = None best_n_match = 0 best_iter = -1 n_iter_no_match = 0 for i in np.arange(10): bar.update(i+1) cat_coords = w.pixel_to_world(sky_cat['X_IMAGE'], sky_cat['Y_IMAGE']) idx, d2d, d3d = match_coordinates_sky(gaia_coords, cat_coords) sep_mask = d2d < max_search_radius n_match = int(np.sum(sep_mask)) if n_match > best_n_match: best_n_match = n_match best_iter = int(i + 1) if n_match == 0: n_iter_no_match += 1 continue sky_coords_match = gaia_coords[sep_mask] cat_coords_match = sky_cat[idx[sep_mask]] sip_degree = 0 if len(cat_coords_match)>100: sip_degree = 2 if len(cat_coords_match)>500: sip_degree = 4 if len(cat_coords_match)>2000: sip_degree = 6 # Make a fresh table each iteration; avoid duplicate-column failures cat_coords_match = Table(cat_coords_match) cat_coords_match['RA_ICRS'] = sky_coords_match.ra.degree cat_coords_match['DE_ICRS'] = sky_coords_match.dec.degree fit_coords = SkyCoord(cat_coords_match['RA_ICRS'], cat_coords_match['DE_ICRS'], unit=(u.deg, u.deg)) xy = (cat_coords_match['X_IMAGE'], cat_coords_match['Y_IMAGE']) central_coo = w.pixel_to_world(*central_pix) try: with warnings.catch_warnings(): warnings.filterwarnings( "ignore", category=RuntimeWarning, message=".*cdelt will be ignored since cd is present.*") w = fit_wcs_from_points(xy, fit_coords, proj_point=central_coo, sip_degree=sip_degree) succeeded = True except ValueError: # Try with next iteration continue bar.finish() _log_gaia(log, 'info', f'Gaia match summary for {file}: ' f'best matches={best_n_match} at iter={best_iter}, ' f'iters with zero matches={n_iter_no_match}/10, ' f'min required={min_gaia_match}, match radius={max_search_radius}.') if not succeeded: _write_gaia_fallback_header(hdu, file, 'fine WCS fit failed during iterative Gaia matching', log=log) return(True) _log_gaia(log, 'info', f'Solved with SIP degree = {sip_degree}') if cat_coords_match is None or len(cat_coords_match)==0: _write_gaia_fallback_header(hdu, file, 'zero matched Gaia/SExtractor sources after iterations', log=log) return(True) else: _log_gaia(log, 'info', f'Matched to {len(cat_coords_match)} coordinates') if len(cat_coords_match)<min_gaia_match: _write_gaia_fallback_header(hdu, file, f'too few matched stars ({len(cat_coords_match)} < {min_gaia_match})', log=log) return(True) # If matching collapsed on most iterations and only barely met threshold, # this fit is often unstable; prefer coarse astrometry in that case. if n_iter_no_match >= 8 and len(cat_coords_match) <= (min_gaia_match + 1): _write_gaia_fallback_header(hdu, file, f'unstable Gaia matching (zero-match iters={n_iter_no_match}/10, matched={len(cat_coords_match)})', log=log) return(True) # Bootstrap WCS solution and get center pixel ra_cent = [] ; de_cent = [] coords = SkyCoord(cat_coords_match['RA_ICRS'], cat_coords_match['DE_ICRS'], unit=(u.deg, u.deg)) bar = progressbar.ProgressBar(maxval=1000) bar.start() idx = np.arange(len(cat_coords_match)) for i in np.arange(1000): bar.update(i+1) if len(idx)>200: size = 100 else: size = int(len(idx)/2) idxs = np.random.choice(idx, size=size, replace=False) xy = (cat_coords_match['X_IMAGE'][idxs], cat_coords_match['Y_IMAGE'][idxs]) c = coords[idxs] central_coo = w.pixel_to_world(*central_pix) with warnings.catch_warnings(): warnings.filterwarnings( "ignore", category=RuntimeWarning, message=".*cdelt will be ignored since cd is present.*") new_wcs = fit_wcs_from_points(xy, c, projection=w) cent_coord = new_wcs.pixel_to_world(*central_pix) ra_cent.append(cent_coord.ra.degree) de_cent.append(cent_coord.dec.degree) bar.finish() ra_cent = np.array(ra_cent) de_cent = np.array(de_cent) # Estimate systematic precision of new WCS mean_ra, med_ra, std_ra = sigma_clipped_stats(ra_cent, maxiters=11, sigma=3) mean_de, med_de, std_de = sigma_clipped_stats(de_cent, maxiters=11, sigma=3) mask = (np.abs(ra_cent-med_ra)<3*std_ra) & (np.abs(de_cent-med_de)<3*std_de) # Create a scatter plot of the centroid values to validate the astrometry if save_centroids: fig, ax = plt.subplots() ax.scatter(ra_cent[mask], de_cent[mask]) plt.savefig(file.replace('.fits','_centroids.png')) header = w.to_header() header['CRPIX1']=central_pix[0] header['CRPIX2']=central_pix[1] header['CRVAL1']=med_ra header['CRVAL2']=med_de header['CUNIT1']='deg' header['CUNIT2']='deg' if sip_degree>0: header['CTYPE1']='RA---TAN-SIP' header['CTYPE2']='DEC--TAN-SIP' for kw, val in w._write_sip_kw().items(): header[kw] = val # Validate refined WCS; if pathological, fall back to coarse solution. is_valid, reason = _validate_refined_wcs(w, hdu[0].header, tel, log=log) if not is_valid: _write_gaia_fallback_header(hdu, file, f'pathological refined WCS: {reason}', log=log) return(True) # Delete all old WCS keys delkeys = ['WCSNAME','CUNIT1','CUNIT2','CTYPE1','CTYPE2','CRPIX1','CRPIX2', 'CRVAL1','CRVAL2','CD1_1','CD1_2','CD2_1','CD2_2','RADECSYS', 'PC1_1','PC1_2','PC2_1','PC2_2','LONPOLE','LATPOLE','CDELT1','CDELT2'] while True: found_key = False for key in hdu[0].header.keys(): if any([k in key for k in delkeys]): found_key=True del hdu[0].header[key] if not found_key: break hdu[0].header.update(header) ra_disp = std_ra*3600.0*np.cos(np.pi/180.0 * mean_de) de_disp = std_de*3600.0 ra_disp = float('%.6f'%ra_disp) de_disp = float('%.6f'%de_disp) _log_gaia(log, 'info', f'R.A. dispersion ={ra_disp}\", Decl. dispersion={de_disp}\"') # Add header variables for dispersion in WCS solution hdu[0].header['RADISP']=(ra_disp, 'Dispersion in R.A. of WCS [Arcsec]') hdu[0].header['DEDISP']=(de_disp, 'Dispersion in Decl. of WCS [Arcsec]') if 'GAIAFAIL' in hdu[0].header: del hdu[0].header['GAIAFAIL'] hdu.writeto(file, overwrite=True, output_verify='silentfix') return(True)