"""Absolute photometry zeropoint calibration using catalog magnitudes.
Queries Vizier (e.g. PS1), matches sources, and fits zeropoint via
iterative ODR. Writes ZPTMAG and related keywords to the stack header.
Uses multiple Vizier mirrors (CDS, Tokyo, CfA, INASAN, IUCAA, IDIA) with
fallback when the first attempt fails.
Authors: Kerry Paterson, Charlie Kilpatrick.
"""
from potpyri._version import __version__
import numpy as np
from astroquery.vizier import Vizier
from astropy import units as u
# Vizier mirror servers (hostnames only); tried in order when a query fails.
VIZIER_MIRRORS = [
'vizier.cds.unistra.fr', # CDS / Strasbourg, France
'vizier.nao.ac.jp', # ADAC / Tokyo, Japan
'vizier.cfa.harvard.edu', # CfA / Harvard, USA
'vizier.inasan.ru', # INASAN / Moscow, Russia
'vizier.iucaa.in', # IUCAA / Pune, India
'vizier.idia.ac.za', # IDIA / South Africa
]
from astropy.coordinates import SkyCoord
from astropy.table import Table
from astropy.io import fits
# Internal dependency
from potpyri.utils import utilities
[docs]
class absphot(object):
"""Zeropoint fitter using catalog magnitudes and iterative sigma clipping."""
def __init__(self, iterations=5, sigma=5):
"""Initialize zeropoint fitter.
Parameters
----------
iterations : int, optional
Number of sigma-clip iterations. Default is 5.
sigma : float, optional
Sigma threshold for clipping. Default is 5.
"""
self.iterations = iterations
self.sigma = sigma
self.odr_iter = 2
[docs]
def get_zeropoint(self, flux, fluxerr, mag, magerr):
"""Fit zeropoint (and error) from flux/mag arrays using ODR.
Parameters
----------
flux : array-like
Object fluxes (e.g. from aperture photometry).
fluxerr : array-like
Flux errors.
mag : array-like
Catalog magnitudes.
magerr : array-like
Catalog magnitude errors.
Returns
-------
tuple of float
(zeropoint, zeropoint_error).
"""
import warnings
with warnings.catch_warnings():
warnings.filterwarnings('ignore', message=r'.*scipy\.odr.*', category=DeprecationWarning)
from scipy.odr import ODR
from scipy.odr import Model
from scipy.odr import RealData
def magnitude(zpt, flux):
return(zpt - 2.5*np.log10(flux))
zpt_guess = np.nanmedian(mag + 2.5*np.log10(flux))
data = RealData(np.array(flux, dtype=float),
np.array(mag, dtype=float),
sx=np.array(fluxerr, dtype=float),
sy=np.array(magerr, dtype=float))
model = Model(magnitude)
odr = ODR(data, model, beta0=[zpt_guess], maxit=self.odr_iter)
output = odr.run()
zpt = output.beta[0]
zpterr = output.sd_beta[0]
return(zpt, zpterr)
[docs]
def zpt_iteration(self, flux, fluxerr, mag, magerr, log=None):
"""Iteratively sigma-clip and fit zeropoint via ODR.
Parameters
----------
flux : array-like
Object fluxes.
fluxerr : array-like
Flux errors.
mag : array-like
Catalog magnitudes.
magerr : array-like
Catalog magnitude errors.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
tuple
(zeropoint, zeropoint_error, master_mask). master_mask is boolean
array of sources used in final fit.
"""
flux = np.array(flux)
fluxerr = np.array(fluxerr)
mag = np.array(mag)
magerr = np.array(magerr)
nobj_orig = len(mag)
idx = np.arange(nobj_orig)
for i in np.arange(self.iterations):
nobj = len(flux)
zpt, zpterr = self.get_zeropoint(flux, fluxerr, mag, magerr)
mag_deriv = -2.5*np.log10(flux)+zpt
magerr_deriv = np.array(2.5/np.log(10) * fluxerr/flux)
total_err = np.sqrt(magerr**2+magerr_deriv**2+zpterr**2)
mask = np.abs(mag - mag_deriv) < self.sigma * total_err
idx = idx[mask]
flux=flux[mask] ; fluxerr=fluxerr[mask]
mag=mag[mask] ; magerr=magerr[mask]
zpt = float('%.6f'%zpt)
zpterr = float('%.6f'%zpterr)
message = f'Iteration {i+1}/{self.iterations}: {nobj} obj, '
message += f'zpt={zpt}+/-{zpterr}, '
message += f'{self.sigma}-sigma clip to {len(flux)} obj'
if log:
log.info(message)
else:
print(message)
if log:
log.info(f'{len(flux)} stars used to calculate final zeropoint.')
else:
print(f'{len(flux)} stars used to calculate final zeropoint.')
master_mask = np.array([i in idx for i in np.arange(nobj_orig)])
return(zpt, zpterr, master_mask)
[docs]
def get_catalog(self, coords, catalog, filt, log=None):
"""Query VizieR for catalog magnitudes in a filter around given coordinates.
Parameters
----------
coords : astropy.coordinates.SkyCoord
Target coordinates (used for region size).
catalog : str
Catalog name (e.g. 'PS1', '2MASS').
filt : str
Filter name (e.g. 'g', 'r', 'J').
log : ColoredLogger, optional
Logger for progress.
Returns
-------
tuple or (None, None, None)
(astropy.table.Table, catalog_name, catalog_ID) or (None, None, None)
if query fails.
"""
if log: log.info(f'Searching for catalog {catalog}')
coord_ra = np.median([c.ra.degree for c in coords])
coord_dec = np.median([c.dec.degree for c in coords])
catalog, cat_ID, cat_ra, cat_dec, cat_mag, cat_err = utilities.find_catalog(catalog, filt, coord_ra, coord_dec)
med_coord = SkyCoord(coord_ra, coord_dec, unit='deg')
seps = med_coord.separation(coords)
max_sep = np.max(seps.to(u.deg).value)
cols = [cat_ra, cat_dec, cat_mag, cat_err]
# Add Kron mag if the catalog is PS1
if cat_ID=='II/349':
cols.append(f'{filt}Kmag')
if log:
log.info(f'Getting {catalog} catalog with ID {cat_ID} in filt {filt}')
log.info(f'Querying around {coord_ra}, {coord_dec} deg')
Vizier.clear_cache()
width = np.max([2.0 * max_sep, 0.5])
cat = None
last_error = None
for server in VIZIER_MIRRORS:
try:
vizier = Vizier(columns=cols, vizier_server=server)
vizier.ROW_LIMIT = -1
cat = vizier.query_region(med_coord, width=width*u.degree,
catalog=cat_ID)
if cat is not None and len(cat) > 0:
if log:
log.info(f'Vizier query succeeded via {server}')
break
cat = None
except Exception as e:
last_error = e
if log:
log.warning(f'Vizier mirror {server} failed: {e}')
else:
print(f'Vizier mirror {server} failed: {e}')
cat = None
if cat is None and last_error is not None and log:
log.warning('All Vizier mirrors failed; last error: {}'.format(last_error))
if cat is not None and len(cat)>0:
cat = cat[0]
cat = cat[~np.isnan(cat[cat_mag])]
cat = cat[cat[cat_err]>0.]
if cat_ID=='II/349':
if log:
log.info('Cutting on Kron magnitudes')
else:
print('Cutting on Kron magnitudes')
nsources = len(cat)
cat_kron = f'{filt}Kmag'
mask = cat[cat_mag]-cat[cat_kron] < 0.1
cat = cat[mask]
nkron = len(cat)
if log:
log.info(f'Cut catalog from {nsources} to {nkron}')
else:
print(f'Cut catalog from {nsources} to {nkron}')
cat.rename_column(cat_ra, 'ra')
cat.rename_column(cat_dec, 'dec')
cat.rename_column(cat_mag, 'mag')
cat.rename_column(cat_err, 'mag_err')
# Convert to AB magnitudes
if catalog=='2MASS':
if filt=='J': cat['mag'] = cat['mag'] + (4.56-3.65)
if filt=='H': cat['mag'] = cat['mag'] + (4.71-3.32)
if filt=='K': cat['mag'] = cat['mag'] + (5.14-3.29)
if filt=='Ks': cat['mag'] = cat['mag'] + (5.14-3.29)
if catalog=='2MASS' and filt=='Y':
cat = cat[~np.isnan(cat['Kmag'])]
cat['mag'], cat['mag_err'] = self.Y_band(cat['mag'],
cat['mag_err'], cat['Kmag'], cat['e_Kmag'])
return(cat, catalog, cat_ID)
else:
m='ERROR: cat {0}, ra {1}, dec {2} did not return a catalog'
m=m.format(catalog, coord_ra, coord_dec)
if log:
log.error(m)
else:
print(m)
return(None, None, None)
[docs]
def find_zeropoint(self, cmpfile, tel, match_radius=2.5*u.arcsec,
phottable='APPPHOT', input_catalog=None, log=None):
"""Compute zeropoint from cmpfile photometry and catalog; write to FITS header.
Matches sources to catalog (e.g. PS1), runs iterative ODR fit, and
updates ZPTMAG, ZPTNSTAR, ZPTCAT, etc. in the stack FITS.
Parameters
----------
cmpfile : str
Path to stacked/comparison FITS with SCI and phottable extensions.
tel : Instrument
Instrument instance (for get_catalog).
match_radius : astropy.units.Quantity, optional
Matching radius for catalog. Default is 2.5 arcsec.
phottable : str, optional
FITS extension with photometry table. Default is 'APPPHOT'.
input_catalog : astropy.table.Table, optional
Pre-loaded catalog; if None, catalog is queried via get_catalog.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
None
cmpfile is updated in place.
"""
if log:
log.info(f'Importing catalog from file: {cmpfile}')
else:
print(f'Importing catalog from file: {cmpfile}')
hdu = fits.open(cmpfile)
header = hdu['SCI'].header
filtorig = header['FILTER']
catalog = tel.get_catalog(header)
table = Table(hdu[phottable].data, meta=hdu[phottable].header)
coords = SkyCoord(table['RA'], table['Dec'], unit='deg')
# New metadata to update
metadata = {}
cat = None ; cat_ID = None
filt = self.convert_filter_name(filtorig)
if input_catalog is not None:
cat = input_catalog
else:
if log:
log.info(f'Downloading {catalog} catalog in {filt}')
else:
print(f'Downloading {catalog} catalog in {filt}')
cat, catalog, cat_ID = self.get_catalog(coords, catalog, filt, log=log)
if cat:
min_mag = self.get_minmag(filt)
cat = cat[cat['mag']>min_mag]
coords_cat = SkyCoord(cat['ra'], cat['dec'], unit='deg')
idx, d2, d3 = coords_cat.match_to_catalog_sky(coords)
# Get matches from calibration catalog and cmpfile
mask = d2 < match_radius
cat = cat[mask]
idx = idx[mask]
if len(cat)==0:
if log:
log.info('No star matches within match radius.')
else:
print('No star matches within match radius.')
return(None, None)
match_table = None
for i in idx:
if not match_table:
match_table = Table(table[i])
else:
match_table.add_row(table[i])
# Sort by flux
flux_idx = np.argsort(match_table['flux'])
match_table = match_table[flux_idx]
cat = cat[flux_idx]
# Do basic cuts on flux, fluxerr, catalog magnitude, cat magerr
flux = match_table['flux'].data.astype('float32')
fluxerr = match_table['flux_err'].data.astype('float32')
cat_mag = cat['mag'].data.astype('float32')
cat_magerr = cat['mag_err'].data.astype('float32')
if len(flux)==0:
if log:
log.info('No suitable stars to calculate zeropoint .')
return(None, None)
zpt, zpterr, master_mask = self.zpt_iteration(flux, fluxerr,
cat_mag, cat_magerr, log=log)
# Set header variables
metadata['ZPTNSTAR']=len(flux)
metadata['ZPTMAG']=zpt
metadata['ZPTMUCER']=zpterr
metadata['ZPTCAT']=catalog
metadata['ZPTCATID']=cat_ID
metadata['ZPTPHOT']=phottable
metadata['FILTER']=filtorig
# Add limiting magnitudes
if 'FWHM' in header.keys() and 'SKYSIG' in header.keys():
fwhm = header['FWHM']
sky = header['SKYSIG']
Npix_per_FWHM_Area = 2.5 * 2.5 * fwhm * fwhm
skysig_per_FWHM_Area = np.sqrt(Npix_per_FWHM_Area * (sky*sky))
m3sigma = -2.5*np.log10(3.0*skysig_per_FWHM_Area)+zpt
m5sigma = -2.5*np.log10(5.0*skysig_per_FWHM_Area)+zpt
m10sigma = -2.5*np.log10(10.0*skysig_per_FWHM_Area)+zpt
m3sigma = float('%.6f'%m3sigma)
m5sigma = float('%.6f'%m5sigma)
m10sigma = float('%.6f'%m10sigma)
metadata['M3SIGMA']=m3sigma
metadata['M5SIGMA']=m5sigma
metadata['M10SIGMA']=m10sigma
if log:
log.info(f'3-sigma limiting mag of image is {m3sigma}')
else:
print(f'3-sigma limiting mag of image is {m3sigma}')
hdu['PRIMARY'].header.update(metadata)
hdu['SCI'].header.update(metadata)
hdu[phottable].header.update(metadata)
hdu.writeto(cmpfile, overwrite=True)
elif log:
log.info('No zeropoint calculated.')
[docs]
def Y_band(self, J, J_err, K, K_err):
"""Compute Y-band mag and error from J and K (2MASS relation).
Parameters
----------
J, J_err : array-like or float
J magnitude(s) and error(s).
K, K_err : array-like or float
K magnitude(s) and error(s).
Returns
-------
tuple
(Y_mag, Y_err).
"""
Y = J+0.46*(J-K)
JK_err = np.sqrt(J_err**2+K_err**2)
JKY_err = 0.46*(J-K)*np.sqrt((0.02/0.46)**2+(JK_err/(J-K))**2)
Y_err = np.sqrt(J_err**2+JKY_err**2)
return Y, Y_err
[docs]
def convert_filter_name(self, filt):
"""Map instrument filter names to catalog filter names (e.g. PS1).
Parameters
----------
filt : str
Instrument filter keyword (e.g. 'gG0301', 'RG850').
Returns
-------
str
Catalog filter name ('u', 'g', 'r', 'i', 'z', 'J', etc.).
"""
if filt=='uG0308' or filt=='uG0332' or filt=='U':
return 'u'
if filt=='gG0301' or filt=='gG0325' or filt=='G' or filt=='V' or filt=='B':
return 'g'
if filt=='rG0303' or filt=='rG0326' or filt=='R' or filt=='Rs':
return 'r'
if filt=='iG0302' or filt=='iG0327' or filt=='I':
return 'i'
if filt=='zG0304' or filt=='zG0328' or filt=='Z':
return 'z'
if filt=='RG850':
return 'z'
if filt=='Y':
return 'J'
# K, Ks, Kspec all use 2MASS K-band for calibration
if filt in ('K', 'Ks', 'Kspec'):
return 'K'
else:
return filt
[docs]
def get_minmag(self, filt):
"""Return minimum catalog magnitude to use for zeropoint (bright limit).
Parameters
----------
filt : str
Filter name.
Returns
-------
float
Minimum magnitude (brighter limit).
"""
if filt=='J':
return 15.5
if filt=='K':
return 13.0
if filt=='Y':
return 15.0
else:
return 16.0
[docs]
def find_zeropoint(stack, tel, log=None):
"""Compute and write zeropoint to stack FITS using instrument catalog (e.g. PS1).
Parameters
----------
stack : str
Path to stacked science FITS file (SCI + APPPHOT extensions).
tel : Instrument
Instrument instance (for catalog and filter).
log : ColoredLogger, optional
Logger for progress and errors.
Returns
-------
None
Stack FITS is updated in place with ZPTMAG, ZPTNSTAR, etc.
"""
cal = absphot()
cal.find_zeropoint(stack, tel, log=log)