"""WCS solution using astrometry.net and Gaia DR3 alignment.
Provides coarse solution via solve-field and fine alignment by matching
detected sources to Gaia. Writes RADISP/DEDISP and updated WCS to FITS.
Authors: Kerry Paterson, Charlie Kilpatrick.
"""
from potpyri._version import __version__
import numpy as np
import subprocess
import os
import progressbar
import matplotlib.pyplot as plt
from photutils.aperture import ApertureStats
from photutils.aperture import CircularAperture
import astropy.units as u
from astropy.stats import sigma_clipped_stats
from astropy.io import fits
from astropy.coordinates import SkyCoord
from astropy.coordinates import match_coordinates_sky
from astropy.table import Column
from astropy.table import Table
from astropy.utils.exceptions import AstropyWarning
from astropy.wcs import WCS
from astropy.wcs.utils import fit_wcs_from_points
# Internal dependencies
from potpyri.utils import catalogs
from potpyri.utils import utilities
from potpyri.primitives import photometry
#turn Astropy warnings off
import warnings
warnings.simplefilter('ignore', category=AstropyWarning)
def _log_fine_align(log, level, message):
"""Small helper to keep fine-alignment logging consistent."""
if log:
getattr(log, level)(message)
else:
print(message)
def _write_fine_align_fallback_header(hdu, file, reason, log=None, disp_arcsec=1.0):
"""Write fallback astrometric dispersion when fine catalog alignment fails.
Preserves the coarse astrometry.net WCS while recording the failure in the
header and logs.
"""
_log_fine_align(log, 'warning',
f'Fine alignment fallback for {file}: {reason}. '
f'Keeping coarse astrometry.net WCS.')
hdu[0].header['RADISP'] = (disp_arcsec, 'Dispersion in R.A. of WCS [Arcsec]')
hdu[0].header['DEDISP'] = (disp_arcsec, 'Dispersion in Decl. of WCS [Arcsec]')
hdu[0].header['ALGNFAIL'] = (reason[:32], 'Fine alignment fallback reason')
hdu.writeto(file, overwrite=True, output_verify='silentfix')
def _validate_refined_wcs(w, header, tel, log=None):
"""Validate refined WCS against expected pixel scale and anisotropy.
Returns
-------
tuple
(is_valid, reason_string)
"""
try:
m = np.asarray(w.pixel_scale_matrix, dtype=float)
_, s, _ = np.linalg.svd(m)
# singular-value pixel scales in arcsec/pixel
scales_arcsec = s * 3600.0
max_scale = float(np.max(scales_arcsec))
min_scale = float(np.min(scales_arcsec))
aniso = float(max_scale / max(min_scale, 1e-12))
det = float(np.linalg.det(m))
except Exception:
return (False, 'could not compute WCS pixel-scale matrix diagnostics')
# Expected scale from instrument + binning in header
try:
binn = tel.get_binning(header)
expected = float(tel.get_pixscale() * int(str(binn)[0]))
except Exception:
expected = None
_log_fine_align(log, 'info',
f'Fine-align WCS diagnostics: scales={scales_arcsec}, anisotropy={aniso:.3f}, det={det:.3e}, expected_scale={expected}')
# Basic physical sanity
if min_scale <= 0.0 or max_scale <= 0.0:
return (False, 'non-positive WCS pixel scale')
if aniso > 3.0:
return (False, f'excessive WCS anisotropy ({aniso:.2f} > 3.0)')
if expected is not None:
if min_scale < 0.5 * expected:
return (False, f'WCS min scale too small ({min_scale:.4f} < {0.5*expected:.4f} arcsec/pix)')
if max_scale > 1.5 * expected:
return (False, f'WCS max scale too large ({max_scale:.4f} > {1.5*expected:.4f} arcsec/pix)')
return (True, '')
def _field_center_from_fits_primary(input_file, log=None):
"""Parse a field-center SkyCoord from the primary FITS header.
Returns
-------
SkyCoord or False
ICRS center, or False if RA/Dec could not be parsed.
"""
with fits.open(input_file) as hdu:
header = hdu[0].header
check_pairs = [('CRVAL1', 'CRVAL2'), ('RA', 'DEC'), ('OBJCTRA', 'OBJCTDEC')]
for pair in check_pairs:
if pair[0] in header.keys() and pair[1] in header.keys():
ra = header[pair[0]]
dec = header[pair[1]]
coord = utilities.parse_coord(ra, dec)
if coord:
return coord
if log:
log.error(f'Could not parse RA/DEC from header of {input_file}')
return False
[docs]
def get_fine_align_reference_catalog(input_file, catalog='gaia', radius_deg=0.5,
log=None):
"""Query the configured reference catalog for fine WCS alignment.
Parameters
----------
input_file : str
Path to FITS file; primary header used for field center.
catalog : str, optional
Catalog key (see :data:`potpyri.utils.catalogs.FINE_ALIGN_CATALOG_CHOICES`).
Default is ``'gaia'``.
radius_deg : float, optional
Half-width of the VizieR query box in degrees. Default 0.5.
log : ColoredLogger, optional
Logger.
Returns
-------
astropy.table.Table or None or False
Table with ``RA_ICRS``, ``DE_ICRS`` in degrees, None if empty or query
failed, False if field center could not be parsed from the header.
"""
coord = _field_center_from_fits_primary(input_file, log=log)
if coord is False:
return False
field_width = float(radius_deg) * u.deg
return catalogs.fetch_astrometry_reference_table(
coord, catalog, field_width, log=log)
[docs]
def clean_up_astrometry(directory, file, exten):
"""Remove astrometry.net temporary files (.axy, .corr, .solved, .wcs, etc.).
Parameters
----------
directory : str
Directory containing the files.
file : str
Base filename (with extension); exten is replaced to form temp names.
exten : str
File extension (e.g. '.fits') to replace with .axy, .corr, etc.
Returns
-------
None
"""
filelist = [file.replace(exten,'.axy'),
file.replace(exten,'.corr'),
file.replace(exten,'-indx.xyls'),
file.replace(exten,'.match'),
file.replace(exten,'.rdls'),
file.replace(exten,'.solved'),
file.replace(exten,'.wcs')]
filelist = [os.path.join(directory, f) for f in filelist]
for f in filelist:
if os.path.exists(f):
os.remove(f)
[docs]
def solve_astrometry(file, tel, binn, paths, radius=0.5, replace=True,
shift_only=False, index=None, log=None):
"""Run astrometry.net (solve-field) to get coarse WCS; optionally use custom index.
Parameters
----------
file : str
Path to FITS file to solve.
tel : Instrument
Instrument instance (name, get_pixscale).
binn : str
Binning string (e.g. '22') for pixel scale.
paths : dict
Paths dict (source_extractor key for solve-field).
radius : float, optional
Search radius in degrees. Default is 0.5.
replace : bool, optional
If True, write WCS into original file; else write .solved.fits.
shift_only : bool, optional
If True, only update CRPIX/CRVAL. Default is False.
index : str, optional
Path to custom index file or directory for solve-field.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
bool
True if solve succeeded, False otherwise.
"""
# Starting solve, print file and directory for reference
fullfile = os.path.abspath(file)
directory = os.path.dirname(file)
if log:
log.info(f'Trying to solve file: {file}')
else:
print(f'Trying to solve file: {file}')
if not os.path.exists(fullfile):
return(False)
with fits.open(fullfile) as hdu:
data = hdu[0].data
header = hdu[0].header
hkeys = list(header.keys())
exten = '.'+file.split('.')[-1]
if not replace:
if os.path.exists(fullfile.replace(exten,'.solved.fits')):
if log:
log.info(f'SUCCESS: solved {fullfile}')
else:
print(f'SUCCESS: solved {fullfile}')
return(True)
exten = '.'+file.split('.')[-1]
if tel.name.upper=='BINOSPEC':
check_pairs = [('CRVAL1','CRVAL2'),('RA','DEC'),('OBJCTRA','OBJCTDEC')]
else:
check_pairs = [('RA','DEC'),('OBJCTRA','OBJCTDEC'),('TARGRA','TARGDEC'),
('CRVAL1','CRVAL2')]
coord = None
for pair in check_pairs:
if pair[0] in header.keys() and pair[1] in header.keys():
ra = header[pair[0]]
dec = header[pair[1]]
coord = utilities.parse_coord(ra, dec)
if log: log.info(f'Got coord={coord} for RA key {pair[0]}, DEC key {pair[1]}')
if log: log.info(f'RA value is {ra}, DEC value is {dec}')
if coord:
break
if not coord:
if log: log.error(f'Could not parse RA/DEC from header of {file}')
return(False)
if 'solved.fits' in fullfile:
newfile = 'tmp.fits'
else:
newfile = fullfile.replace(exten,'.solved.fits')
# Handle pixel scale guess
scale = tel.get_pixscale() * int(binn[0])
if scale is not None:
scale = float(scale)
scale_high = float('%.4f'%(scale * 1.2))
scale_low = float('%.4f'%(scale * 0.8))
else:
scale_high = 5.0
scale_low = 0.05
# Get RA and Dec
ra = float('%.6f'%coord.ra.degree)
dec = float('%.6f'%coord.dec.degree)
cmd = 'solve-field'
args = '--scale-units arcsecperpix '
args += f'--scale-low {scale_low} --scale-high {scale_high} '
args += f'--ra {ra} --dec {dec} '
args += f' --radius {radius} --no-plots -T '
args += f'--overwrite -N {newfile} --dir {directory} '
if index and os.path.exists(index):
if os.path.isfile(index):
args += f'--index-file {index} '
elif os.path.isdir(index):
args += f'--index-dir {index} '
# Test for --use-source-extractor flag
p = subprocess.run(['solve-field','-h'],capture_output=True)
data = p.stdout.decode().lower()
if '--use-source-extractor' in data:
source_extractor_path = paths['source_extractor']
args += '--use-source-extractor '
args += f'--source-extractor-path {source_extractor_path} '
elif '--use-sextractor' in data:
args += '--use-sextractor '
extra_opts = '--downsample 2 --no-verify --odds-to-tune-up 1e4 --objs 15'
tries = 1
good = False
while tries < 6 and not good:
input_args = f'{args} {extra_opts}'
if log:
log.info(f'Try #{tries} with astrometry.net...')
else:
print(f'Try #{tries} with astrometry.net...')
process = [cmd,fullfile]+input_args.split()
if log:
log.info(' '.join(process))
else:
print(' '.join(process))
with open(os.devnull, 'w') as FNULL:
p = subprocess.Popen(process, stdout=FNULL, stderr=subprocess.STDOUT)
try:
p.wait(90)
except subprocess.TimeoutExpired:
p.kill()
if os.path.exists(newfile):
good = True
else:
tries += 1
if tries==2:
extra_opts='--objs 15 --no-verify'
elif tries==3:
extra_opts='--no-verify'
elif tries==4:
# Try without source extractor
extra_opts='--no-verify'
args = '--scale-units arcsecperpix '
args += f'--scale-low {scale_low} --scale-high {scale_high} '
args += f'--no-plots -T '
args += f'--overwrite -N {newfile} --dir {directory} '
args += f'--ra {ra} --dec {dec} '
args += f' --radius {radius} '
elif tries==5:
# Try with no constraint on RA/Dec
args = '--scale-units arcsecperpix '
args += f'--scale-low {scale_low} --scale-high {scale_high} '
args += f'--no-plots -T '
args += f'--overwrite -N {newfile} --dir {directory} '
extra_opts='--no-verify'
file_exists=os.path.exists(newfile)
if log:
log.info(f'{newfile} exists: {file_exists}')
else:
print(f'{newfile} exists: {file_exists}')
if os.path.exists(newfile):
clean_up_astrometry(directory, file, exten)
if log:
log.info(f'SUCCESS: solved {fullfile}')
else:
print(f'SUCCESS: solved {fullfile}')
if replace or newfile=='tmp.fits':
output_file = fullfile
# astrometry.net replaces extra extensions, so instead of renaming
# copy the new header into the old file
with fits.open(newfile) as newhdu:
with fits.open(output_file) as hdu:
if shift_only:
hdu[0].header['CRPIX1'] = newhdu[0].header['CRPIX1']
hdu[0].header['CRPIX2'] = newhdu[0].header['CRPIX2']
hdu[0].header['CRVAL1'] = newhdu[0].header['CRVAL1']
hdu[0].header['CRVAL2'] = newhdu[0].header['CRVAL2']
else:
hdu[0].header = newhdu[0].header
hdu.writeto(output_file, overwrite=True, output_verify='silentfix')
os.remove(newfile)
else:
output_file = fullfile.replace(exten,'.solved.fits')
with fits.open(output_file) as hdu:
for i,h in enumerate(hdu):
if 'COMMENT' in hdu[i].header.keys():
del hdu[i].header['COMMENT']
if 'HISTORY' in hdu[i].header.keys():
del hdu[i].header['HISTORY']
# Delete other WCS header keys
wcs_key = ['CSYER1','CSYER2','CRDER1','CRDER2','CD1_1','CD1_2',
'CD2_1','CD2_2','CRPIX1','CRPIX2','CUNIT1','CUNIT2','EQUINOX',
'RADESYS','CNAME1','CNAME2','CTYPE1','CTYPE2','WCSNAME',
'CRVAL1','CRVAL2']
for key in [w + 'C' for w in wcs_key]:
if key in list(hdu[i].header.keys()):
del hdu[i].header[key]
for key in [w + 'S' for w in wcs_key]:
if key in list(hdu[i].header.keys()):
del hdu[i].header[key]
# Delete old WCS keywords starting with '_'
for key in list(hdu[i].header.keys()):
if key.startswith('_'):
del hdu[i].header[key]
try:
hdu.writeto(output_file, overwrite=True, output_verify='silentfix')
except TypeError:
if log:
log.error(f'FAILURE: could not save file {fullfile}')
else:
print(f'FAILURE: could not save file {fullfile}')
return(False)
return(True)
else:
if log:
log.error(f'FAILURE: did not solve {fullfile}')
else:
print(f'FAILURE: did not solve {fullfile}')
clean_up_astrometry(directory, file, exten)
return(False)
[docs]
def fine_align_wcs(file, tel, catalog='gaia', radius=0.5,
max_search_radius=5.0*u.arcsec, save_centroids=False, min_cat_match=7, log=None):
"""Refine WCS by matching detections to a reference catalog (fine alignment).
Parameters
----------
file : str
Path to FITS file with WCS (e.g. after solve_astrometry).
tel : Instrument
Instrument instance.
catalog : str, optional
Reference catalog: ``gaia`` (default), ``panstarrs``, ``sdss``, ``legacy``,
``twomass``, or ``skymapper``. See
:data:`potpyri.utils.catalogs.FINE_ALIGN_CATALOG_CHOICES`.
radius : float, optional
Catalog query half-width in degrees. Default is 0.5.
max_search_radius : astropy.units.Quantity, optional
Match radius between detections and catalog positions. Default is 5 arcsec.
save_centroids : bool, optional
If True, save centroid table. Default is False.
min_cat_match : int, optional
Minimum number of in-frame catalog sources required before matching.
Default is 7.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
bool
True if alignment succeeded and WCS was updated, False otherwise.
"""
catalog = catalogs.normalize_fine_align_catalog(catalog)
cat = get_fine_align_reference_catalog(
file, catalog=catalog, radius_deg=radius, log=log)
with fits.open(file) as hdu:
header = hdu[0].header
if cat is False:
_write_fine_align_fallback_header(hdu, file,
'could not parse coordinate keywords', log=log)
return(True)
if cat is None or len(cat) == 0:
_write_fine_align_fallback_header(hdu, file,
f'{catalog} query returned no alignment stars', log=log)
return(True)
_log_fine_align(log, 'info', f'Got {len(cat)} {catalog} alignment stars')
# Estimate sources that land within the image using WCS from header
w = WCS(header, relax=True)
naxis1 = header['NAXIS1']
naxis2 = header['NAXIS2']
ref_coords = SkyCoord(cat['RA_ICRS'], cat['DE_ICRS'], unit=(u.deg, u.deg))
x_pix, y_pix = w.world_to_pixel(ref_coords)
mask = (x_pix > 0) & (x_pix < naxis1) & (y_pix > 0) & (y_pix < naxis2)
x_pix = x_pix[mask]
y_pix = y_pix[mask]
cat = cat[mask]
ref_coords = ref_coords[mask]
if cat is None or len(cat) == 0:
_write_fine_align_fallback_header(hdu, file,
f'no {catalog} stars project into image bounds', log=log)
return(True)
_log_fine_align(log, 'info',
f'{catalog} stars projected into image bounds: {len(cat)}')
if len(cat) < min_cat_match:
_write_fine_align_fallback_header(hdu, file,
f'too few in-frame catalog stars ({len(cat)} < {min_cat_match})',
log=log)
return(True)
sky_cat = photometry.run_sextractor(file, log=log)
if sky_cat is None or len(sky_cat) == 0:
_write_fine_align_fallback_header(hdu, file,
'Source Extractor returned no sources for fine alignment', log=log)
return(True)
_log_fine_align(log, 'info', f'SExtractor sources available for matching: {len(sky_cat)}')
central_pix = (float(naxis1)/2., float(naxis2)/2.)
central_coo = w.pixel_to_world(*central_pix)
_log_fine_align(log, 'info', 'Converging on fine WCS solution')
bar = progressbar.ProgressBar(maxval=10)
bar.start()
succeeded = False
cat_coords_match = None
best_n_match = 0
best_iter = -1
n_iter_no_match = 0
for i in np.arange(10):
bar.update(i+1)
cat_coords = w.pixel_to_world(sky_cat['X_IMAGE'], sky_cat['Y_IMAGE'])
idx, d2d, d3d = match_coordinates_sky(ref_coords, cat_coords)
sep_mask = d2d < max_search_radius
n_match = int(np.sum(sep_mask))
if n_match > best_n_match:
best_n_match = n_match
best_iter = int(i + 1)
if n_match == 0:
n_iter_no_match += 1
continue
sky_coords_match = ref_coords[sep_mask]
cat_coords_match = sky_cat[idx[sep_mask]]
sip_degree = 0
if len(cat_coords_match)>100:
sip_degree = 2
if len(cat_coords_match)>500:
sip_degree = 4
if len(cat_coords_match)>2000:
sip_degree = 6
# Make a fresh table each iteration; avoid duplicate-column failures
cat_coords_match = Table(cat_coords_match)
cat_coords_match['RA_ICRS'] = sky_coords_match.ra.degree
cat_coords_match['DE_ICRS'] = sky_coords_match.dec.degree
fit_coords = SkyCoord(cat_coords_match['RA_ICRS'],
cat_coords_match['DE_ICRS'], unit=(u.deg, u.deg))
xy = (cat_coords_match['X_IMAGE'], cat_coords_match['Y_IMAGE'])
central_coo = w.pixel_to_world(*central_pix)
try:
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", category=RuntimeWarning,
message=".*cdelt will be ignored since cd is present.*")
w = fit_wcs_from_points(xy, fit_coords, proj_point=central_coo,
sip_degree=sip_degree)
succeeded = True
except ValueError:
# Try with next iteration
continue
bar.finish()
_log_fine_align(log, 'info',
f'Fine-align match summary for {file} ({catalog}): '
f'best matches={best_n_match} at iter={best_iter}, '
f'iters with zero matches={n_iter_no_match}/10, '
f'min required={min_cat_match}, match radius={max_search_radius}.')
if not succeeded:
_write_fine_align_fallback_header(hdu, file,
'fine WCS fit failed during iterative catalog matching', log=log)
return(True)
_log_fine_align(log, 'info', f'Solved with SIP degree = {sip_degree}')
if cat_coords_match is None or len(cat_coords_match)==0:
_write_fine_align_fallback_header(hdu, file,
'zero matched catalog/SExtractor sources after iterations', log=log)
return(True)
else:
_log_fine_align(log, 'info', f'Matched to {len(cat_coords_match)} coordinates')
if len(cat_coords_match)<min_cat_match:
_write_fine_align_fallback_header(hdu, file,
f'too few matched stars ({len(cat_coords_match)} < {min_cat_match})',
log=log)
return(True)
if n_iter_no_match >= 8 and len(cat_coords_match) <= (min_cat_match + 1):
_write_fine_align_fallback_header(hdu, file,
f'unstable catalog matching (zero-match iters={n_iter_no_match}/10, matched={len(cat_coords_match)})',
log=log)
return(True)
# Bootstrap WCS solution and get center pixel
ra_cent = [] ; de_cent = []
coords = SkyCoord(cat_coords_match['RA_ICRS'], cat_coords_match['DE_ICRS'],
unit=(u.deg, u.deg))
bar = progressbar.ProgressBar(maxval=1000)
bar.start()
idx = np.arange(len(cat_coords_match))
for i in np.arange(1000):
bar.update(i+1)
if len(idx)>200:
size = 100
else:
size = int(len(idx)/2)
idxs = np.random.choice(idx, size=size, replace=False)
xy = (cat_coords_match['X_IMAGE'][idxs],
cat_coords_match['Y_IMAGE'][idxs])
c = coords[idxs]
central_coo = w.pixel_to_world(*central_pix)
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", category=RuntimeWarning,
message=".*cdelt will be ignored since cd is present.*")
new_wcs = fit_wcs_from_points(xy, c, projection=w)
cent_coord = new_wcs.pixel_to_world(*central_pix)
ra_cent.append(cent_coord.ra.degree)
de_cent.append(cent_coord.dec.degree)
bar.finish()
ra_cent = np.array(ra_cent)
de_cent = np.array(de_cent)
# Estimate systematic precision of new WCS
mean_ra, med_ra, std_ra = sigma_clipped_stats(ra_cent, maxiters=11,
sigma=3)
mean_de, med_de, std_de = sigma_clipped_stats(de_cent, maxiters=11,
sigma=3)
mask = (np.abs(ra_cent-med_ra)<3*std_ra) & (np.abs(de_cent-med_de)<3*std_de)
# Create a scatter plot of the centroid values to validate the astrometry
if save_centroids:
fig, ax = plt.subplots()
ax.scatter(ra_cent[mask], de_cent[mask])
plt.savefig(file.replace('.fits','_centroids.png'))
header = w.to_header()
header['CRPIX1']=central_pix[0]
header['CRPIX2']=central_pix[1]
header['CRVAL1']=med_ra
header['CRVAL2']=med_de
header['CUNIT1']='deg'
header['CUNIT2']='deg'
if sip_degree>0:
header['CTYPE1']='RA---TAN-SIP'
header['CTYPE2']='DEC--TAN-SIP'
for kw, val in w._write_sip_kw().items():
header[kw] = val
# Validate refined WCS; if pathological, fall back to coarse solution.
is_valid, reason = _validate_refined_wcs(w, hdu[0].header, tel, log=log)
if not is_valid:
_write_fine_align_fallback_header(hdu, file, f'pathological refined WCS: {reason}', log=log)
return(True)
# Delete all old WCS keys
delkeys = ['WCSNAME','CUNIT1','CUNIT2','CTYPE1','CTYPE2','CRPIX1','CRPIX2',
'CRVAL1','CRVAL2','CD1_1','CD1_2','CD2_1','CD2_2','RADECSYS',
'PC1_1','PC1_2','PC2_1','PC2_2','LONPOLE','LATPOLE','CDELT1','CDELT2']
while True:
found_key = False
for key in hdu[0].header.keys():
if any([k in key for k in delkeys]):
found_key=True
del hdu[0].header[key]
if not found_key:
break
hdu[0].header.update(header)
ra_disp = std_ra*3600.0*np.cos(np.pi/180.0 * mean_de)
de_disp = std_de*3600.0
ra_disp = float('%.6f'%ra_disp)
de_disp = float('%.6f'%de_disp)
_log_fine_align(log, 'info', f'R.A. dispersion ={ra_disp}\", Decl. dispersion={de_disp}\"')
hdu[0].header['RADISP']=(ra_disp, 'Dispersion in R.A. of WCS [Arcsec]')
hdu[0].header['DEDISP']=(de_disp, 'Dispersion in Decl. of WCS [Arcsec]')
if 'ALGNFAIL' in hdu[0].header:
del hdu[0].header['ALGNFAIL']
hdu.writeto(file, overwrite=True, output_verify='silentfix')
return(True)