"""WCS solution using astrometry.net and Gaia DR3 alignment.
Provides coarse solution via solve-field and fine alignment by matching
detected sources to Gaia. Writes RADISP/DEDISP and updated WCS to FITS.
Authors: Kerry Paterson, Charlie Kilpatrick.
"""
from potpyri._version import __version__
import numpy as np
import subprocess
import os
import progressbar
import requests
import matplotlib.pyplot as plt
from photutils.aperture import ApertureStats
from photutils.aperture import CircularAperture
from astroquery.vizier import Vizier
import astropy.units as u
from astropy.stats import sigma_clipped_stats
from astropy.io import fits
from astropy.coordinates import SkyCoord
from astropy.coordinates import match_coordinates_sky
from astropy.table import Column
from astropy.table import Table
from astropy.utils.exceptions import AstropyWarning
from astropy.wcs import WCS
from astropy.wcs.utils import fit_wcs_from_points
# Internal dependency
from potpyri.utils import utilities
from potpyri.primitives import photometry
#turn Astropy warnings off
import warnings
warnings.simplefilter('ignore', category=AstropyWarning)
def _log_gaia(log, level, message):
    """Emit *message* through the logger at *level*, or print when no logger.

    Parameters
    ----------
    log : ColoredLogger or None
        Logger instance; ``None`` falls back to ``print``.
    level : str
        Name of the logger method to call (e.g. ``'info'``, ``'warning'``).
    message : str
        Text to emit.
    """
    if not log:
        print(message)
        return
    emit = getattr(log, level)
    emit(message)
def _write_gaia_fallback_header(hdu, file, reason, log=None, disp_arcsec=1.0):
    """Record a Gaia-alignment failure in the FITS header and rewrite the file.

    Keeps the coarse astrometry.net WCS solution in place while stamping
    nominal dispersion values and the failure reason into the primary header,
    so the failure mode is explicit to downstream code and in the logs.

    Parameters
    ----------
    hdu : astropy.io.fits.HDUList
        Open HDU list whose primary header is updated in place.
    file : str
        Path the modified HDU list is written back to.
    reason : str
        Human-readable failure reason; truncated to 32 chars for the card.
    log : ColoredLogger, optional
        Logger for the warning message.
    disp_arcsec : float, optional
        Fallback astrometric dispersion recorded in arcsec. Default 1.0.
    """
    _log_gaia(log, 'warning',
        f'Gaia alignment fallback for {file}: {reason}. '
        f'Keeping coarse astrometry.net WCS.')
    primary = hdu[0].header
    primary['RADISP'] = (disp_arcsec, 'Dispersion in R.A. of WCS [Arcsec]')
    primary['DEDISP'] = (disp_arcsec, 'Dispersion in Decl. of WCS [Arcsec]')
    # FITS card values are limited; keep the reason short and predictable.
    primary['GAIAFAIL'] = (reason[:32], 'Gaia fallback reason')
    hdu.writeto(file, overwrite=True, output_verify='silentfix')
def _validate_refined_wcs(w, header, tel, log=None):
    """Validate a refined WCS against the expected pixel scale and anisotropy.

    Computes singular-value pixel scales of the WCS pixel-scale matrix and
    rejects solutions that are non-positive, excessively anisotropic, or far
    from the instrument's nominal (binned) pixel scale.

    Returns
    -------
    tuple
        (is_valid, reason_string)
    """
    try:
        matrix = np.asarray(w.pixel_scale_matrix, dtype=float)
        _, singvals, _ = np.linalg.svd(matrix)
        # singular-value pixel scales in arcsec/pixel
        scales_arcsec = singvals * 3600.0
        max_scale = float(np.max(scales_arcsec))
        min_scale = float(np.min(scales_arcsec))
        # Guard the ratio against a degenerate (near-zero) minor axis.
        aniso = float(max_scale / max(min_scale, 1e-12))
        det = float(np.linalg.det(matrix))
    except Exception:
        return (False, 'could not compute WCS pixel-scale matrix diagnostics')

    # Expected scale from instrument + binning in header; None if unavailable.
    try:
        binn = tel.get_binning(header)
        expected = float(tel.get_pixscale() * int(str(binn)[0]))
    except Exception:
        expected = None

    _log_gaia(log, 'info',
        f'Gaia WCS diagnostics: scales={scales_arcsec}, anisotropy={aniso:.3f}, det={det:.3e}, expected_scale={expected}')

    # Basic physical sanity
    if min_scale <= 0.0 or max_scale <= 0.0:
        return (False, 'non-positive WCS pixel scale')
    if aniso > 3.0:
        return (False, f'excessive WCS anisotropy ({aniso:.2f} > 3.0)')
    if expected is not None and min_scale < 0.5 * expected:
        return (False, f'WCS min scale too small ({min_scale:.4f} < {0.5*expected:.4f} arcsec/pix)')
    if expected is not None and max_scale > 1.5 * expected:
        return (False, f'WCS max scale too large ({max_scale:.4f} > {1.5*expected:.4f} arcsec/pix)')
    return (True, '')
# [docs] -- Sphinx viewcode artifact; commented out ('docs' is undefined at runtime)
def get_gaia_catalog(input_file, log=None):
    """Query Gaia DR3 for stars in the field; return filtered catalog table.

    Uses header coordinate keywords (CRVAL1/2, RA/DEC, or OBJCTRA/OBJCTDEC)
    to define a 20 arcmin-wide field, then filters the Vizier result by PSS,
    parallax, and proper motion to keep well-behaved reference stars.

    Parameters
    ----------
    input_file : str
        Path to FITS file; header used for CRVAL1/CRVAL2 or RA/DEC.
    log : ColoredLogger, optional
        Logger for progress and errors.

    Returns
    -------
    astropy.table.Table or None or False
        Filtered Gaia DR3 table (PSS>0.99, Plx<20, PM<10), or None if empty,
        or False if coordinates could not be parsed.

    Raises
    ------
    Exception
        If Gaia query fails after retries.
    """
    # Only the header is needed here; do not hold the pixel data in memory
    # (the original also read hdu[0].data and never used it).
    with fits.open(input_file) as hdu:
        header = hdu[0].header

    # Try coordinate keyword pairs in priority order until one parses.
    check_pairs = [('CRVAL1','CRVAL2'),('RA','DEC'),('OBJCTRA','OBJCTDEC')]
    coord = None
    for ra_key, dec_key in check_pairs:
        if ra_key in header and dec_key in header:
            coord = utilities.parse_coord(header[ra_key], header[dec_key])
            if coord:
                break

    if not coord:
        if log: log.error(f'Could not parse RA/DEC from header of {input_file}')
        return(False)

    vizier = Vizier(columns=['RA_ICRS', 'DE_ICRS', 'Plx', 'PSS','PM'])
    vizier.ROW_LIMIT = -1

    # Retry the Vizier query up to 4 times on timeout or a None response.
    tries = 0
    cat = None
    while tries<4:
        try:
            cat = vizier.query_region(coord, width=20 * u.arcmin,
                catalog='I/355/gaiadr3')
            if cat is None:
                if log: log.error(f'Gaia did not return catalog. Try #{tries+1}')
            else:
                break
        except requests.exceptions.ReadTimeout:
            if log: log.error(f'Gaia catalog timeout. Try #{tries+1}')
        tries += 1

    if cat is None:
        raise Exception('ERROR: could not get Gaia catalog')

    # Vizier returns a TableList; an empty list means no catalog coverage.
    if len(cat)>0:
        cat = cat[0]
    else:
        return(None)

    # Keep probable point sources (PSS) with modest parallax and proper
    # motion so the astrometric reference frame is stable.
    mask = (cat['PSS'] > 0.99) & (cat['Plx']<20) & (cat['PM']<10)
    cat = cat[mask]

    return(cat)
# [docs] -- Sphinx viewcode artifact; commented out ('docs' is undefined at runtime)
def clean_up_astrometry(directory, file, exten):
    """Remove astrometry.net temporary files (.axy, .corr, .solved, .wcs, etc.).

    Parameters
    ----------
    directory : str
        Directory containing the files.
    file : str
        Base filename (with extension); exten is replaced to form temp names.
    exten : str
        File extension (e.g. '.fits') to replace with .axy, .corr, etc.

    Returns
    -------
    None
    """
    # Suffixes of the scratch files solve-field leaves next to the image.
    temp_suffixes = ['.axy', '.corr', '-indx.xyls', '.match', '.rdls',
        '.solved', '.wcs']
    for suffix in temp_suffixes:
        candidate = os.path.join(directory, file.replace(exten, suffix))
        if os.path.exists(candidate):
            os.remove(candidate)
# [docs] -- Sphinx viewcode artifact; commented out ('docs' is undefined at runtime)
def solve_astrometry(file, tel, binn, paths, radius=0.5, replace=True,
    shift_only=False, index=None, log=None):
    """Run astrometry.net (solve-field) to get coarse WCS; optionally use custom index.

    Parameters
    ----------
    file : str
        Path to FITS file to solve.
    tel : Instrument
        Instrument instance (name, get_pixscale).
    binn : str
        Binning string (e.g. '22') for pixel scale.
    paths : dict
        Paths dict (source_extractor key for solve-field).
    radius : float, optional
        Search radius in degrees. Default is 0.5.
    replace : bool, optional
        If True, write WCS into original file; else write .solved.fits.
    shift_only : bool, optional
        If True, only update CRPIX/CRVAL. Default is False.
    index : str, optional
        Path to custom index file or directory for solve-field.
    log : ColoredLogger, optional
        Logger for progress.

    Returns
    -------
    bool
        True if solve succeeded, False otherwise.
    """
    # Starting solve, print file and directory for reference
    fullfile = os.path.abspath(file)
    directory = os.path.dirname(file)
    if log:
        log.info(f'Trying to solve file: {file}')
    else:
        print(f'Trying to solve file: {file}')

    if not os.path.exists(fullfile):
        return(False)

    # Only the header is needed for coordinate keywords; do not keep the
    # pixel data around.
    with fits.open(fullfile) as hdu:
        header = hdu[0].header

    exten = '.'+file.split('.')[-1]

    # If we are not replacing and a solved copy already exists, we are done.
    if not replace:
        if os.path.exists(fullfile.replace(exten,'.solved.fits')):
            if log:
                log.info(f'SUCCESS: solved {fullfile}')
            else:
                print(f'SUCCESS: solved {fullfile}')
            return(True)

    # BUG FIX: the original compared the bound method `tel.name.upper` to a
    # string, which is always False, so BINOSPEC never got its intended
    # keyword priority (CRVAL first). Call the method.
    if tel.name.upper()=='BINOSPEC':
        check_pairs = [('CRVAL1','CRVAL2'),('RA','DEC'),('OBJCTRA','OBJCTDEC')]
    else:
        check_pairs = [('RA','DEC'),('OBJCTRA','OBJCTDEC'),('TARGRA','TARGDEC'),
            ('CRVAL1','CRVAL2')]

    # Try coordinate keyword pairs in priority order until one parses.
    coord = None
    for pair in check_pairs:
        if pair[0] in header.keys() and pair[1] in header.keys():
            ra = header[pair[0]]
            dec = header[pair[1]]
            coord = utilities.parse_coord(ra, dec)
            if log: log.info(f'Got coord={coord} for RA key {pair[0]}, DEC key {pair[1]}')
            if log: log.info(f'RA value is {ra}, DEC value is {dec}')
            if coord:
                break

    if not coord:
        if log: log.error(f'Could not parse RA/DEC from header of {file}')
        return(False)

    # Avoid recursively generating .solved.solved.fits names.
    if 'solved.fits' in fullfile:
        newfile = 'tmp.fits'
    else:
        newfile = fullfile.replace(exten,'.solved.fits')

    # Handle pixel scale guess. BUG FIX: check get_pixscale() for None
    # *before* multiplying -- the original multiplied first, so a None
    # pixel scale raised TypeError and the fallback below was unreachable.
    pixscale = tel.get_pixscale()
    if pixscale is not None:
        scale = float(pixscale) * int(binn[0])
        scale_high = float('%.4f'%(scale * 1.2))
        scale_low = float('%.4f'%(scale * 0.8))
    else:
        scale_high = 5.0
        scale_low = 0.05

    # Get RA and Dec
    ra = float('%.6f'%coord.ra.degree)
    dec = float('%.6f'%coord.dec.degree)

    cmd = 'solve-field'
    args = '--scale-units arcsecperpix '
    args += f'--scale-low {scale_low} --scale-high {scale_high} '
    args += f'--ra {ra} --dec {dec} '
    args += f' --radius {radius} --no-plots -T '
    args += f'--overwrite -N {newfile} --dir {directory} '

    if index and os.path.exists(index):
        if os.path.isfile(index):
            args += f'--index-file {index} '
        elif os.path.isdir(index):
            args += f'--index-dir {index} '

    # Test for --use-source-extractor flag (flag name varies by
    # astrometry.net version). Use a dedicated name for the help text
    # rather than shadowing other locals.
    p = subprocess.run(['solve-field','-h'],capture_output=True)
    helptext = p.stdout.decode().lower()
    if '--use-source-extractor' in helptext:
        source_extractor_path = paths['source_extractor']
        args += '--use-source-extractor '
        args += f'--source-extractor-path {source_extractor_path} '
    elif '--use-sextractor' in helptext:
        args += '--use-sextractor '

    # Progressively relax solve-field options over up to 5 tries:
    # drop tuning flags, then the source extractor, then the RA/Dec prior.
    extra_opts = '--downsample 2 --no-verify --odds-to-tune-up 1e4 --objs 15'
    tries = 1
    good = False
    while tries < 6 and not good:
        input_args = f'{args} {extra_opts}'
        if log:
            log.info(f'Try #{tries} with astrometry.net...')
        else:
            print(f'Try #{tries} with astrometry.net...')
        process = [cmd,fullfile]+input_args.split()
        if log:
            log.info(' '.join(process))
        else:
            print(' '.join(process))
        with open(os.devnull, 'w') as FNULL:
            p = subprocess.Popen(process, stdout=FNULL, stderr=subprocess.STDOUT)
            try:
                # Cap each attempt at 90 s; kill and retry with looser options.
                p.wait(90)
            except subprocess.TimeoutExpired:
                p.kill()
        if os.path.exists(newfile):
            good = True
        else:
            tries += 1
            if tries==2:
                extra_opts='--objs 15 --no-verify'
            elif tries==3:
                extra_opts='--no-verify'
            elif tries==4:
                # Try without source extractor
                extra_opts='--no-verify'
                args = '--scale-units arcsecperpix '
                args += f'--scale-low {scale_low} --scale-high {scale_high} '
                args += f'--no-plots -T '
                args += f'--overwrite -N {newfile} --dir {directory} '
                args += f'--ra {ra} --dec {dec} '
                args += f' --radius {radius} '
            elif tries==5:
                # Try with no constraint on RA/Dec
                args = '--scale-units arcsecperpix '
                args += f'--scale-low {scale_low} --scale-high {scale_high} '
                args += f'--no-plots -T '
                args += f'--overwrite -N {newfile} --dir {directory} '
                extra_opts='--no-verify'

    file_exists=os.path.exists(newfile)
    if log:
        log.info(f'{newfile} exists: {file_exists}')
    else:
        print(f'{newfile} exists: {file_exists}')

    if os.path.exists(newfile):
        clean_up_astrometry(directory, file, exten)
        if log:
            log.info(f'SUCCESS: solved {fullfile}')
        else:
            print(f'SUCCESS: solved {fullfile}')
        if replace or newfile=='tmp.fits':
            output_file = fullfile
            # astrometry.net replaces extra extensions, so instead of renaming
            # copy the new header into the old file
            with fits.open(newfile) as newhdu:
                with fits.open(output_file) as hdu:
                    if shift_only:
                        hdu[0].header['CRPIX1'] = newhdu[0].header['CRPIX1']
                        hdu[0].header['CRPIX2'] = newhdu[0].header['CRPIX2']
                        hdu[0].header['CRVAL1'] = newhdu[0].header['CRVAL1']
                        hdu[0].header['CRVAL2'] = newhdu[0].header['CRVAL2']
                    else:
                        hdu[0].header = newhdu[0].header
                    hdu.writeto(output_file, overwrite=True, output_verify='silentfix')
            os.remove(newfile)
        else:
            output_file = fullfile.replace(exten,'.solved.fits')

        # Strip COMMENT/HISTORY and alternate/legacy WCS keywords that
        # astrometry.net adds to each extension.
        with fits.open(output_file) as hdu:
            for i,h in enumerate(hdu):
                if 'COMMENT' in hdu[i].header.keys():
                    del hdu[i].header['COMMENT']
                if 'HISTORY' in hdu[i].header.keys():
                    del hdu[i].header['HISTORY']
                # Delete other WCS header keys
                wcs_key = ['CSYER1','CSYER2','CRDER1','CRDER2','CD1_1','CD1_2',
                    'CD2_1','CD2_2','CRPIX1','CRPIX2','CUNIT1','CUNIT2','EQUINOX',
                    'RADESYS','CNAME1','CNAME2','CTYPE1','CTYPE2','WCSNAME',
                    'CRVAL1','CRVAL2']
                # Alternate WCS versions use trailing letters (here 'C', 'S').
                for key in [w + 'C' for w in wcs_key]:
                    if key in list(hdu[i].header.keys()):
                        del hdu[i].header[key]
                for key in [w + 'S' for w in wcs_key]:
                    if key in list(hdu[i].header.keys()):
                        del hdu[i].header[key]
                # Delete old WCS keywords starting with '_'
                for key in list(hdu[i].header.keys()):
                    if key.startswith('_'):
                        del hdu[i].header[key]
            try:
                hdu.writeto(output_file, overwrite=True, output_verify='silentfix')
            except TypeError:
                if log:
                    log.error(f'FAILURE: could not save file {fullfile}')
                else:
                    print(f'FAILURE: could not save file {fullfile}')
                return(False)
        return(True)
    else:
        if log:
            log.error(f'FAILURE: did not solve {fullfile}')
        else:
            print(f'FAILURE: did not solve {fullfile}')
        clean_up_astrometry(directory, file, exten)
        return(False)
# [docs] -- Sphinx viewcode artifact; commented out ('docs' is undefined at runtime)
def align_to_gaia(file, tel, radius=0.5, max_search_radius=5.0*u.arcsec,
    save_centroids=False, min_gaia_match=7, log=None):
    """Refine WCS by matching detected sources to Gaia DR3 and fitting WCS.

    Parameters
    ----------
    file : str
        Path to FITS file with WCS (e.g. after solve_astrometry).
    tel : Instrument
        Instrument instance.
    radius : float, optional
        Gaia query radius in degrees. Default is 0.5.
    max_search_radius : astropy.units.Quantity, optional
        Match radius for source-Gaia matching. Default is 5 arcsec.
    save_centroids : bool, optional
        If True, save centroid table. Default is False.
    min_gaia_match : int, optional
        Minimum number of Gaia matches required. Default is 7.
    log : ColoredLogger, optional
        Logger for progress.

    Returns
    -------
    bool
        True if alignment succeeded and WCS was updated, False otherwise.
    """
    cat = get_gaia_catalog(file, log=log)
    with fits.open(file) as hdu:
        header = hdu[0].header
        # Fallback paths keep the coarse WCS but record why refinement
        # stopped; note they all return True (the coarse solution stands).
        if cat is False:
            _write_gaia_fallback_header(hdu, file,
                'could not parse coordinate keywords', log=log)
            return(True)
        if cat is None or len(cat)==0:
            _write_gaia_fallback_header(hdu, file,
                'Gaia query returned no alignment stars', log=log)
            return(True)
        _log_gaia(log, 'info', f'Got {len(cat)} Gaia DR3 alignment stars')
        # Estimate sources that land within the image using WCS from header
        w = WCS(header, relax=True)
        naxis1 = header['NAXIS1']
        naxis2 = header['NAXIS2']
        # Get pixel coordinates of all stars in image
        gaia_coords = SkyCoord(cat['RA_ICRS'], cat['DE_ICRS'], unit=(u.deg, u.deg))
        x_pix, y_pix = w.world_to_pixel(gaia_coords)
        # Mask to stars that are nominally in image
        mask = (x_pix > 0) & (x_pix < naxis1) & (y_pix > 0) & (y_pix < naxis2)
        x_pix = x_pix[mask] ; y_pix = y_pix[mask]
        # Also mask catalog and coordinates
        cat = cat[mask]
        gaia_coords = gaia_coords[mask]
        if cat is None or len(cat)==0:
            _write_gaia_fallback_header(hdu, file,
                'no Gaia stars project into image bounds', log=log)
            return(True)
        _log_gaia(log, 'info',
            f'Gaia stars projected into image bounds: {len(cat)} (of {len(mask)})')
        if len(cat)<min_gaia_match:
            _write_gaia_fallback_header(hdu, file,
                f'too few in-frame Gaia stars ({len(cat)} < {min_gaia_match})',
                log=log)
            return(True)
        sky_cat = photometry.run_sextractor(file, log=log)
        if sky_cat is None or len(sky_cat) == 0:
            _write_gaia_fallback_header(hdu, file,
                'Source Extractor returned no sources for Gaia alignment', log=log)
            return(True)
        _log_gaia(log, 'info', f'SExtractor sources available for matching: {len(sky_cat)}')
        # Get central coordinate of image based on centroiding
        central_pix = (float(naxis1)/2., float(naxis2)/2.)
        central_coo = w.pixel_to_world(*central_pix)
        _log_gaia(log, 'info', 'Converging on fine WCS solution')
        bar = progressbar.ProgressBar(maxval=10)
        bar.start()
        succeeded = False
        cat_coords_match = None
        best_n_match = 0
        best_iter = -1
        n_iter_no_match = 0
        # Iteratively re-project detections with the latest WCS, re-match to
        # Gaia, and refit; `w` is rebound each successful iteration so later
        # iterations match against the refined solution. SIP degree grows
        # with the number of matched stars.
        for i in np.arange(10):
            bar.update(i+1)
            cat_coords = w.pixel_to_world(sky_cat['X_IMAGE'], sky_cat['Y_IMAGE'])
            idx, d2d, d3d = match_coordinates_sky(gaia_coords, cat_coords)
            sep_mask = d2d < max_search_radius
            n_match = int(np.sum(sep_mask))
            if n_match > best_n_match:
                best_n_match = n_match
                best_iter = int(i + 1)
            if n_match == 0:
                n_iter_no_match += 1
                continue
            sky_coords_match = gaia_coords[sep_mask]
            cat_coords_match = sky_cat[idx[sep_mask]]
            sip_degree = 0
            if len(cat_coords_match)>100:
                sip_degree = 2
            if len(cat_coords_match)>500:
                sip_degree = 4
            if len(cat_coords_match)>2000:
                sip_degree = 6
            # Make a fresh table each iteration; avoid duplicate-column failures
            cat_coords_match = Table(cat_coords_match)
            cat_coords_match['RA_ICRS'] = sky_coords_match.ra.degree
            cat_coords_match['DE_ICRS'] = sky_coords_match.dec.degree
            fit_coords = SkyCoord(cat_coords_match['RA_ICRS'],
                cat_coords_match['DE_ICRS'], unit=(u.deg, u.deg))
            xy = (cat_coords_match['X_IMAGE'], cat_coords_match['Y_IMAGE'])
            central_coo = w.pixel_to_world(*central_pix)
            try:
                with warnings.catch_warnings():
                    warnings.filterwarnings(
                        "ignore", category=RuntimeWarning,
                        message=".*cdelt will be ignored since cd is present.*")
                    w = fit_wcs_from_points(xy, fit_coords, proj_point=central_coo,
                        sip_degree=sip_degree)
                succeeded = True
            except ValueError:
                # Try with next iteration
                continue
        bar.finish()
        _log_gaia(log, 'info',
            f'Gaia match summary for {file}: '
            f'best matches={best_n_match} at iter={best_iter}, '
            f'iters with zero matches={n_iter_no_match}/10, '
            f'min required={min_gaia_match}, match radius={max_search_radius}.')
        if not succeeded:
            _write_gaia_fallback_header(hdu, file,
                'fine WCS fit failed during iterative Gaia matching', log=log)
            return(True)
        # sip_degree is guaranteed bound here: succeeded implies at least one
        # loop iteration reached the fit.
        _log_gaia(log, 'info', f'Solved with SIP degree = {sip_degree}')
        if cat_coords_match is None or len(cat_coords_match)==0:
            _write_gaia_fallback_header(hdu, file,
                'zero matched Gaia/SExtractor sources after iterations', log=log)
            return(True)
        else:
            _log_gaia(log, 'info', f'Matched to {len(cat_coords_match)} coordinates')
        if len(cat_coords_match)<min_gaia_match:
            _write_gaia_fallback_header(hdu, file,
                f'too few matched stars ({len(cat_coords_match)} < {min_gaia_match})',
                log=log)
            return(True)
        # If matching collapsed on most iterations and only barely met threshold,
        # this fit is often unstable; prefer coarse astrometry in that case.
        if n_iter_no_match >= 8 and len(cat_coords_match) <= (min_gaia_match + 1):
            _write_gaia_fallback_header(hdu, file,
                f'unstable Gaia matching (zero-match iters={n_iter_no_match}/10, matched={len(cat_coords_match)})',
                log=log)
            return(True)
        # Bootstrap WCS solution and get center pixel
        ra_cent = [] ; de_cent = []
        coords = SkyCoord(cat_coords_match['RA_ICRS'], cat_coords_match['DE_ICRS'],
            unit=(u.deg, u.deg))
        bar = progressbar.ProgressBar(maxval=1000)
        bar.start()
        idx = np.arange(len(cat_coords_match))
        # Refit with 1000 random subsamples to estimate the scatter of the
        # field-center solution (bootstrap over matched stars).
        for i in np.arange(1000):
            bar.update(i+1)
            if len(idx)>200:
                size = 100
            else:
                size = int(len(idx)/2)
            idxs = np.random.choice(idx, size=size, replace=False)
            xy = (cat_coords_match['X_IMAGE'][idxs],
                cat_coords_match['Y_IMAGE'][idxs])
            c = coords[idxs]
            central_coo = w.pixel_to_world(*central_pix)
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore", category=RuntimeWarning,
                    message=".*cdelt will be ignored since cd is present.*")
                new_wcs = fit_wcs_from_points(xy, c, projection=w)
            cent_coord = new_wcs.pixel_to_world(*central_pix)
            ra_cent.append(cent_coord.ra.degree)
            de_cent.append(cent_coord.dec.degree)
        bar.finish()
        ra_cent = np.array(ra_cent)
        de_cent = np.array(de_cent)
        # Estimate systematic precision of new WCS
        mean_ra, med_ra, std_ra = sigma_clipped_stats(ra_cent, maxiters=11,
            sigma=3)
        mean_de, med_de, std_de = sigma_clipped_stats(de_cent, maxiters=11,
            sigma=3)
        mask = (np.abs(ra_cent-med_ra)<3*std_ra) & (np.abs(de_cent-med_de)<3*std_de)
        # Create a scatter plot of the centroid values to validate the astrometry
        if save_centroids:
            fig, ax = plt.subplots()
            ax.scatter(ra_cent[mask], de_cent[mask])
            plt.savefig(file.replace('.fits','_centroids.png'))
        # Build the final header from the fitted WCS, centered on the
        # bootstrap-median field center.
        header = w.to_header()
        header['CRPIX1']=central_pix[0]
        header['CRPIX2']=central_pix[1]
        header['CRVAL1']=med_ra
        header['CRVAL2']=med_de
        header['CUNIT1']='deg'
        header['CUNIT2']='deg'
        if sip_degree>0:
            header['CTYPE1']='RA---TAN-SIP'
            header['CTYPE2']='DEC--TAN-SIP'
            # NOTE(review): _write_sip_kw is a private astropy API used to
            # serialize SIP coefficients -- confirm on astropy upgrades.
            for kw, val in w._write_sip_kw().items():
                header[kw] = val
        # Validate refined WCS; if pathological, fall back to coarse solution.
        is_valid, reason = _validate_refined_wcs(w, hdu[0].header, tel, log=log)
        if not is_valid:
            _write_gaia_fallback_header(hdu, file, f'pathological refined WCS: {reason}', log=log)
            return(True)
        # Delete all old WCS keys
        delkeys = ['WCSNAME','CUNIT1','CUNIT2','CTYPE1','CTYPE2','CRPIX1','CRPIX2',
            'CRVAL1','CRVAL2','CD1_1','CD1_2','CD2_1','CD2_2','RADECSYS',
            'PC1_1','PC1_2','PC2_1','PC2_2','LONPOLE','LATPOLE','CDELT1','CDELT2']
        # The header mutates while being scanned, so repeat full passes until
        # one pass deletes nothing.
        while True:
            found_key = False
            for key in hdu[0].header.keys():
                if any([k in key for k in delkeys]):
                    found_key=True
                    del hdu[0].header[key]
            if not found_key:
                break
        hdu[0].header.update(header)
        # Convert dispersion to arcsec; scale R.A. by cos(Dec) for on-sky size.
        ra_disp = std_ra*3600.0*np.cos(np.pi/180.0 * mean_de)
        de_disp = std_de*3600.0
        ra_disp = float('%.6f'%ra_disp)
        de_disp = float('%.6f'%de_disp)
        _log_gaia(log, 'info', f'R.A. dispersion ={ra_disp}\", Decl. dispersion={de_disp}\"')
        # Add header variables for dispersion in WCS solution
        hdu[0].header['RADISP']=(ra_disp, 'Dispersion in R.A. of WCS [Arcsec]')
        hdu[0].header['DEDISP']=(de_disp, 'Dispersion in Decl. of WCS [Arcsec]')
        # Clear any stale fallback flag left by a previous failed attempt.
        if 'GAIAFAIL' in hdu[0].header:
            del hdu[0].header['GAIAFAIL']
        hdu.writeto(file, overwrite=True, output_verify='silentfix')
        return(True)