Source code for potpyri.primitives.absphot

"""Absolute photometry zeropoint calibration using catalog magnitudes.

Queries Vizier (e.g. PS1) via :mod:`potpyri.utils.catalogs`, matches sources,
and fits zeropoint via iterative ODR. Writes ZPTMAG and related keywords to
the stack header.
Authors: Kerry Paterson, Charlie Kilpatrick.
"""
from potpyri._version import __version__

import numpy as np

from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.table import Table
from astropy.io import fits

# Internal dependencies
from potpyri.utils import catalogs


def _fits_extension_names(hdulist):
    """Return EXTNAME values for an HDUList (empty string if unset)."""
    return [h.name if h.name else '' for h in hdulist]


def _log_or_print(msg, log, level='info'):
    """Emit *msg* via *log* at *level*, or print if *log* is None."""
    if log is None:
        print(msg, flush=True)
        return
    getattr(log, level)(msg)


[docs] class absphot(object): """Zeropoint fitter using catalog magnitudes and iterative sigma clipping.""" def __init__(self, iterations=5, sigma=5): """Initialize zeropoint fitter. Parameters ---------- iterations : int, optional Number of sigma-clip iterations. Default is 5. sigma : float, optional Sigma threshold for clipping. Default is 5. """ self.iterations = iterations self.sigma = sigma self.odr_iter = 2
[docs] def get_zeropoint(self, flux, fluxerr, mag, magerr): """Fit zeropoint (and error) from flux/mag arrays using ODR. Parameters ---------- flux : array-like Object fluxes (e.g. from aperture photometry). fluxerr : array-like Flux errors. mag : array-like Catalog magnitudes. magerr : array-like Catalog magnitude errors. Returns ------- tuple of float (zeropoint, zeropoint_error). """ import warnings with warnings.catch_warnings(): warnings.filterwarnings('ignore', message=r'.*scipy\.odr.*', category=DeprecationWarning) from scipy.odr import ODR from scipy.odr import Model from scipy.odr import RealData def magnitude(zpt, flux): return(zpt - 2.5*np.log10(flux)) zpt_guess = np.nanmedian(mag + 2.5*np.log10(flux)) data = RealData(np.array(flux, dtype=float), np.array(mag, dtype=float), sx=np.array(fluxerr, dtype=float), sy=np.array(magerr, dtype=float)) model = Model(magnitude) odr = ODR(data, model, beta0=[zpt_guess], maxit=self.odr_iter) output = odr.run() zpt = output.beta[0] zpterr = output.sd_beta[0] return(zpt, zpterr)
[docs] def zpt_iteration(self, flux, fluxerr, mag, magerr, log=None): """Iteratively sigma-clip and fit zeropoint via ODR. Parameters ---------- flux : array-like Object fluxes. fluxerr : array-like Flux errors. mag : array-like Catalog magnitudes. magerr : array-like Catalog magnitude errors. log : ColoredLogger, optional Logger for progress. Returns ------- tuple (zeropoint, zeropoint_error, master_mask). master_mask is boolean array of sources used in final fit. """ flux = np.array(flux) fluxerr = np.array(fluxerr) mag = np.array(mag) magerr = np.array(magerr) nobj_orig = len(mag) idx = np.arange(nobj_orig) for i in np.arange(self.iterations): nobj = len(flux) zpt, zpterr = self.get_zeropoint(flux, fluxerr, mag, magerr) mag_deriv = -2.5*np.log10(flux)+zpt magerr_deriv = np.array(2.5/np.log(10) * fluxerr/flux) total_err = np.sqrt(magerr**2+magerr_deriv**2+zpterr**2) mask = np.abs(mag - mag_deriv) < self.sigma * total_err idx = idx[mask] flux=flux[mask] ; fluxerr=fluxerr[mask] mag=mag[mask] ; magerr=magerr[mask] zpt = float('%.6f'%zpt) zpterr = float('%.6f'%zpterr) message = f'Iteration {i+1}/{self.iterations}: {nobj} obj, ' message += f'zpt={zpt}+/-{zpterr}, ' message += f'{self.sigma}-sigma clip to {len(flux)} obj' if log: log.info(message) else: print(message) if log: log.info(f'{len(flux)} stars used to calculate final zeropoint.') else: print(f'{len(flux)} stars used to calculate final zeropoint.') master_mask = np.array([i in idx for i in np.arange(nobj_orig)]) return(zpt, zpterr, master_mask)
[docs] def get_catalog(self, coords, catalog, filt, log=None): """Query VizieR for catalog magnitudes in a filter around given coordinates. Parameters ---------- coords : astropy.coordinates.SkyCoord Target coordinates (used for region size). catalog : str Catalog name (e.g. 'PS1', '2MASS'). filt : str Filter name (e.g. 'g', 'r', 'J'). log : ColoredLogger, optional Logger for progress. Returns ------- tuple or (None, None, None) (astropy.table.Table, catalog_name, catalog_ID) or (None, None, None) if query fails. """ if log: log.info(f'Searching for catalog {catalog}') coord_ra = np.median([c.ra.degree for c in coords]) coord_dec = np.median([c.dec.degree for c in coords]) catalog, cat_ID, cat_ra, cat_dec, cat_mag, cat_err = catalogs.find_catalog( catalog, filt, coord_ra, coord_dec) med_coord = SkyCoord(coord_ra, coord_dec, unit='deg') seps = med_coord.separation(coords) max_sep = np.max(seps.to(u.deg).value) cols = [cat_ra, cat_dec, cat_mag, cat_err] # Add Kron mag if the catalog is PS1 if cat_ID=='II/349': cols.append(f'{filt}Kmag') if log: log.info(f'Getting {catalog} catalog with ID {cat_ID} in filt {filt}') log.info(f'Querying around {coord_ra}, {coord_dec} deg') width = np.max([2.0 * max_sep, 0.5]) cat = catalogs.query_vizier_region( med_coord, width * u.degree, cat_ID, cols, log=log) if cat is not None: cat = cat[~np.isnan(cat[cat_mag])] cat = cat[cat[cat_err]>0.] if cat_ID=='II/349': if log: log.info('Cutting on Kron magnitudes') else: print('Cutting on Kron magnitudes') nsources = len(cat) cat_kron = f'{filt}Kmag' mask = cat[cat_mag]-cat[cat_kron] < 0.1 cat = cat[mask] nkron = len(cat) if log: log.info(f'Cut catalog from {nsources} to {nkron}') else: print(f'Cut catalog from {nsources} to {nkron}') cat.rename_column(cat_ra, 'ra') cat.rename_column(cat_dec, 'dec') cat.rename_column(cat_mag, 'mag') cat.rename_column(cat_err, 'mag_err') # Convert to AB magnitudes if catalog=='2MASS': if filt=='J': cat['mag'] = cat['mag'] + (4.56-3.65) if filt=='H': cat['mag'] = cat['mag'] + (4.71-3.32) if filt=='K': cat['mag'] = cat['mag'] + (5.14-3.29) if filt=='Ks': cat['mag'] = cat['mag'] + (5.14-3.29) if catalog=='2MASS' and filt=='Y': cat = cat[~np.isnan(cat['Kmag'])] cat['mag'], cat['mag_err'] = self.Y_band(cat['mag'], cat['mag_err'], cat['Kmag'], cat['e_Kmag']) return(cat, catalog, cat_ID) else: m='ERROR: cat {0}, ra {1}, dec {2} did not return a catalog' m=m.format(catalog, coord_ra, coord_dec) if log: log.error(m) else: print(m) return(None, None, None)
[docs] def find_zeropoint(self, cmpfile, tel, match_radius=2.5*u.arcsec, phottable='APPPHOT', input_catalog=None, log=None): """Compute zeropoint from cmpfile photometry and catalog; write to FITS header. Matches sources to catalog (e.g. PS1), runs iterative ODR fit, and updates ZPTMAG, ZPTNSTAR, ZPTCAT, etc. in the stack FITS. Parameters ---------- cmpfile : str Path to stacked/comparison FITS with SCI and phottable extensions. tel : Instrument Instrument instance (for get_catalog). match_radius : astropy.units.Quantity, optional Matching radius for catalog. Default is 2.5 arcsec. phottable : str, optional FITS extension with photometry table. Default is 'APPPHOT'. input_catalog : astropy.table.Table, optional Pre-loaded catalog; if None, catalog is queried via get_catalog. log : ColoredLogger, optional Logger for progress. Returns ------- bool True if ZPT keywords were written; False if photometry or catalog step did not complete (cmpfile may be unchanged). """ _log_or_print(f'Zeropoint: reading stack {cmpfile!r}', log) with fits.open(cmpfile) as hdu: extnames = _fits_extension_names(hdu) if phottable not in hdu: _log_or_print( f'Zeropoint aborted: FITS extension {phottable!r} not found in ' f'{cmpfile!r}. Present extensions (EXTNAME): {extnames!r}. ' 'Run photometry first (photometry.photloop / pipeline photometry ' 'step) so APPPHOT is written; if photometry already ran, check logs ' 'for PhotometryError or earlier tracebacks.', log, level='error', ) return False header = hdu['SCI'].header filtorig = header['FILTER'] catalog = tel.get_catalog(header) table = Table(hdu[phottable].data, meta=hdu[phottable].header) required = ('RA', 'Dec', 'flux', 'flux_err') missing = [c for c in required if c not in table.colnames] if missing: _log_or_print( f'Zeropoint aborted: extension {phottable!r} is missing columns ' f'{missing!r}; have {list(table.colnames)!r}. Photometry output ' 'may be corrupt or from an incompatible version.', log, level='error', ) return False coords = SkyCoord(table['RA'], table['Dec'], unit='deg') # New metadata to update metadata = {} cat = None cat_ID = None filt = self.convert_filter_name(filtorig) if input_catalog is not None: cat = input_catalog else: _log_or_print( f'Zeropoint: querying {catalog} catalog for filter {filt}', log) cat, catalog, cat_ID = self.get_catalog(coords, catalog, filt, log=log) if cat is not None and len(cat) > 0: min_mag = self.get_minmag(filt) cat = cat[cat['mag'] > min_mag] if len(cat) == 0: _log_or_print( f'Zeropoint not written: no catalog sources fainter than ' f'bright limit min_mag={min_mag} for filter {filt!r}.', log, level='warning', ) return False coords_cat = SkyCoord(cat['ra'], cat['dec'], unit='deg') idx, d2, d3 = coords_cat.match_to_catalog_sky(coords) # Get matches from calibration catalog and cmpfile mask = d2 < match_radius cat = cat[mask] idx = idx[mask] if len(cat) == 0: _log_or_print( 'Zeropoint not written: no catalog stars match photometry ' f'within {match_radius} (try a larger match radius or check ' 'WCS/field).', log, level='warning', ) return False match_table = None for i in idx: if not match_table: match_table = Table(table[i]) else: match_table.add_row(table[i]) # Sort by flux flux_idx = np.argsort(match_table['flux']) match_table = match_table[flux_idx] cat = cat[flux_idx] # Do basic cuts on flux, fluxerr, catalog magnitude, cat magerr flux = match_table['flux'].data.astype('float32') fluxerr = match_table['flux_err'].data.astype('float32') cat_mag = cat['mag'].data.astype('float32') cat_magerr = cat['mag_err'].data.astype('float32') if len(flux) == 0: _log_or_print( 'Zeropoint not written: no rows left after flux/magnitude cuts.', log, level='warning', ) return False zpt, zpterr, master_mask = self.zpt_iteration(flux, fluxerr, cat_mag, cat_magerr, log=log) # Set header variables metadata['ZPTNSTAR'] = len(flux) metadata['ZPTMAG'] = zpt metadata['ZPTMUCER'] = zpterr metadata['ZPTCAT'] = catalog metadata['ZPTCATID'] = cat_ID metadata['ZPTPHOT'] = phottable metadata['FILTER'] = filtorig # Add limiting magnitudes if 'FWHM' in header.keys() and 'SKYSIG' in header.keys(): fwhm = header['FWHM'] sky = header['SKYSIG'] Npix_per_FWHM_Area = 2.5 * 2.5 * fwhm * fwhm skysig_per_FWHM_Area = np.sqrt(Npix_per_FWHM_Area * (sky * sky)) m3sigma = -2.5 * np.log10(3.0 * skysig_per_FWHM_Area) + zpt m5sigma = -2.5 * np.log10(5.0 * skysig_per_FWHM_Area) + zpt m10sigma = -2.5 * np.log10(10.0 * skysig_per_FWHM_Area) + zpt m3sigma = float('%.6f' % m3sigma) m5sigma = float('%.6f' % m5sigma) m10sigma = float('%.6f' % m10sigma) metadata['M3SIGMA'] = m3sigma metadata['M5SIGMA'] = m5sigma metadata['M10SIGMA'] = m10sigma _log_or_print(f'3-sigma limiting mag of image is {m3sigma}', log) hdu['PRIMARY'].header.update(metadata) hdu['SCI'].header.update(metadata) hdu[phottable].header.update(metadata) hdu.writeto(cmpfile, overwrite=True) _log_or_print( f'Zeropoint written to {cmpfile!r} (ZPTMAG={zpt:.4f}, N={len(flux)})', log, ) return True if input_catalog is not None: _log_or_print( 'Zeropoint not written: input_catalog is missing or has zero rows.', log, level='warning', ) else: _log_or_print( 'Zeropoint not written: catalog query returned no usable table ' f'(catalog_name={catalog!r}, filter={filt!r}).', log, level='warning', ) return False
[docs] def Y_band(self, J, J_err, K, K_err): """Compute Y-band mag and error from J and K (2MASS relation). Parameters ---------- J, J_err : array-like or float J magnitude(s) and error(s). K, K_err : array-like or float K magnitude(s) and error(s). Returns ------- tuple (Y_mag, Y_err). """ Y = J+0.46*(J-K) JK_err = np.sqrt(J_err**2+K_err**2) JKY_err = 0.46*(J-K)*np.sqrt((0.02/0.46)**2+(JK_err/(J-K))**2) Y_err = np.sqrt(J_err**2+JKY_err**2) return Y, Y_err
[docs] def convert_filter_name(self, filt): """Map instrument filter names to catalog filter names (e.g. PS1). Parameters ---------- filt : str Instrument filter keyword (e.g. 'gG0301', 'RG850'). Returns ------- str Catalog filter name ('u', 'g', 'r', 'i', 'z', 'J', etc.). """ if filt=='uG0308' or filt=='uG0332' or filt=='U': return 'u' if filt=='gG0301' or filt=='gG0325' or filt=='G' or filt=='V' or filt=='B': return 'g' if filt=='rG0303' or filt=='rG0326' or filt=='R' or filt=='Rs': return 'r' if filt=='iG0302' or filt=='iG0327' or filt=='I': return 'i' if filt=='zG0304' or filt=='zG0328' or filt=='Z': return 'z' if filt=='RG850': return 'z' if filt=='Y': return 'J' # K, Ks, Kspec all use 2MASS K-band for calibration if filt in ('K', 'Ks', 'Kspec'): return 'K' else: return filt
[docs] def get_minmag(self, filt): """Return minimum catalog magnitude to use for zeropoint (bright limit). Parameters ---------- filt : str Filter name. Returns ------- float Minimum magnitude (brighter limit). """ if filt=='J': return 15.5 if filt=='K': return 13.0 if filt=='Y': return 15.0 else: return 16.0
[docs] def find_zeropoint(stack, tel, log=None): """Compute and write zeropoint to stack FITS using instrument catalog (e.g. PS1). Parameters ---------- stack : str Path to stacked science FITS file (SCI + APPPHOT extensions). tel : Instrument Instrument instance (for catalog and filter). log : ColoredLogger, optional Logger for progress and errors. Returns ------- bool True if ZPT keywords were written; False otherwise (see log). """ cal = absphot() return cal.find_zeropoint(stack, tel, log=log)