"""Aperture and PSF photometry for pipeline stacks.
Uses Source Extractor for initial catalogs, photutils for ePSF building
and PSF fitting. Writes APPPHOT, PSFPHOT, PSFSTARS, PSF, and RESIDUAL
extensions to the stack FITS. Authors: Kerry Paterson, Charlie Kilpatrick.
"""
from potpyri._version import __version__
from photutils.aperture import ApertureStats
from photutils.aperture import CircularAperture
from photutils.detection import DAOStarFinder
from photutils.psf import EPSFBuilder
from photutils.psf import extract_stars
from photutils.psf import PSFPhotometry
from astropy.io import fits
from astropy.io import ascii
from astropy.stats import sigma_clipped_stats
from astropy.stats import SigmaClip
from astropy.nddata import NDData
from astropy.table import Table
from astropy.table import Column
from astropy.table import hstack
from astropy.visualization import simple_norm
from astropy.wcs import WCS
from astropy.modeling import fitting
from astropy.modeling import functional_models
import matplotlib.pyplot as plt
from scipy.stats import sigmaclip
import numpy as np
import copy
import os
import warnings
warnings.filterwarnings('ignore')
[docs]
def create_conv(outfile):
"""Write a 3x3 CONV NORM filter file for Source Extractor.
Parameters
----------
outfile : str
Path to write the convolution kernel file.
Returns
-------
None
"""
with open(outfile, 'w') as f:
f.write('CONV NORM \n')
f.write('1 2 1 \n')
f.write('2 4 2 \n')
f.write('1 2 1 \n')
[docs]
def create_params(outfile):
"""Write Source Extractor parameter file (NUMBER, X_IMAGE, FWHM_IMAGE, etc.).
Parameters
----------
outfile : str
Path to write the parameter file.
Returns
-------
None
"""
with open(outfile, 'w') as f:
f.write('NUMBER \n')
f.write('X_IMAGE \n')
f.write('Y_IMAGE \n')
f.write('ALPHA_J2000 \n')
f.write('DELTA_J2000 \n')
f.write('MAG_AUTO \n')
f.write('MAGERR_AUTO \n')
f.write('FLUX_AUTO \n')
f.write('FLUXERR_AUTO \n')
f.write('FWHM_IMAGE \n')
[docs]
def generate_epsf(img_file, x, y, size=11, oversampling=2, maxiters=11,
log=None):
"""Build an ePSF from cutouts at (x, y) positions in the image.
Parameters
----------
img_file : str
Path to FITS with SCI extension.
x, y : array-like
Star x,y positions (pixels).
size : int, optional
Cutout size in pixels. Default is 11.
oversampling : int, optional
EPSFBuilder oversampling. Default is 2.
maxiters : int, optional
EPSFBuilder max iterations. Default is 11.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
photutils.psf.EPSFModel
Fitted ePSF model.
"""
# Construct stars table from bright
stars_tbl = Table()
stars_tbl['x'] = x
stars_tbl['y'] = y
with fits.open(img_file) as img_hdu:
ndimage = NDData(data=img_hdu['SCI'].data)
stars = extract_stars(ndimage, stars_tbl, size=size)
if log:
log.info(f'Extracted {len(stars)} stars. Building EPSF...')
else:
print(f'Extracted {len(stars)} stars. Building EPSF...')
epsf_builder = EPSFBuilder(oversampling=oversampling,
maxiters=maxiters, progress_bar=True, smoothing_kernel='quadratic',
sigma_clip=SigmaClip(sigma=5, sigma_lower=5, sigma_upper=5,
maxiters=20, cenfunc='median', stdfunc='std', grow=False))
epsf, fitted_stars = epsf_builder(stars)
return(epsf)
[docs]
def run_photometry(img_file, epsf, fwhm, threshold, shape, stars):
"""Run PSF photometry and aperture photometry; append result tables to FITS.
Parameters
----------
img_file : str
Path to FITS with SCI, MASK, ERROR extensions.
epsf : photutils.psf.EPSFModel
ePSF model for PSF fit.
fwhm : float
FWHM in pixels (for aperture radius).
threshold : float
Detection threshold (unused in this wrapper; for compatibility).
shape : int
Fit shape (pixels) for PSFPhotometry.
stars : astropy.table.Table
Star table with xcentroid, ycentroid, flux_best for initial guesses.
Returns
-------
tuple
(result_table, residual_image). result_table has flux_fit, x_fit, y_fit, etc.
"""
with fits.open(img_file) as img_hdu:
image = img_hdu['SCI'].data
ndimage = NDData(data=img_hdu['SCI'].data)
mask = img_hdu['MASK'].data.astype(bool)
error = img_hdu['ERROR'].data
psf = copy.copy(epsf)
# Generate initial guesses for star centroid and flux from aperture table
stars_tbl = Table()
stars_tbl['x_0'] = stars['xcentroid']
stars_tbl['y_0'] = stars['ycentroid']
stars_tbl['flux_0'] = stars['flux_best']
stars_tbl['local_bkg'] = np.array([0.]*len(stars))
photometry = PSFPhotometry(psf_model=psf, fit_shape=(shape,shape),
aperture_radius=int(shape*1.5), progress_bar=True)
result_tab = photometry(image, mask=mask, error=error,
init_params=stars_tbl)
# Also generate a residual image for quality control
residual_image = photometry.make_residual_image(image,
psf_shape=(shape, shape), include_localbkg=True)
# Mask results table for sources with bad flux error, fit flux, or centroid
mask = ~np.isnan(result_tab['flux_err'])
result_tab = result_tab[mask]
mask = result_tab['flux_fit'] > 0.
result_tab = result_tab[mask]
mask = ~np.isnan(result_tab['x_err'])
result_tab = result_tab[mask]
mask = ~np.isnan(result_tab['y_err'])
result_tab = result_tab[mask]
return(result_tab, residual_image)
# Identifies stars and gets aperture photometry and statistics using
# photutils.aperture.ApertureStats
[docs]
def get_star_catalog(img_data, img_mask, img_error, fwhm_init=5.0,
threshold=50.0, log=None):
"""Detect stars with DAOStarFinder and compute aperture stats; return star table.
Parameters
----------
img_data : np.ndarray
Science image data.
img_mask : np.ndarray
Boolean mask (True = masked).
img_error : np.ndarray
Per-pixel error array.
fwhm_init : float, optional
DAOStarFinder FWHM and aperture scale. Default is 5.0.
threshold : float, optional
DAOStarFinder detection threshold. Default is 50.0.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
astropy.table.Table
Star table with centroid, flux, fwhm, flux_best, etc.
"""
# Construct the finder with input FWHM and threshold
daofind = DAOStarFinder(fwhm=fwhm_init, threshold=threshold,
exclude_border=True, min_separation=fwhm_init)
# Get initial set of stars in image with iraffind
if log:
log.info('Finding stars...')
else:
print('Finding stars...')
# Do the finding...
stars = daofind(img_data, mask=img_mask)
# Ignore stars whose peak is below the background-subtracted level
mask = stars['peak'] > 0.
stars = stars[mask]
stars.sort('flux')
# Extract the aperture stats from each star and append to the output catalog
stats = extract_aperture_stats(img_data, img_mask, img_error, stars,
aperture_radius=2.5*fwhm_init, log=log)
stars = hstack([stars, stats])
return(stars)
[docs]
def do_phot(img_file,
fwhm_scale_psf=4.5, oversampling=1,
star_param={'snthresh_psf': 20.0, 'fwhm_init': 8.0, 'snthresh_final': 10.0},
save_psf_img=False,
save_residual_hdu=False,
log=None):
"""Run full PSF and aperture photometry on a stack; write extensions to FITS.
Builds ePSF, finds stars, runs PSF and aperture photometry, and appends
APPPHOT, PSFPHOT, EPSF, and optional residual extensions to the FITS file.
Parameters
----------
img_file : str
Path to stack FITS (SCI, MASK, ERROR).
fwhm_scale_psf : float, optional
Scale factor for PSF fit shape. Default is 4.5.
oversampling : int, optional
ePSF oversampling. Default is 1.
star_param : dict, optional
Keys: snthresh_psf, fwhm_init, snthresh_final. Defaults as shown.
save_psf_img : bool, optional
If True, save ePSF image extension. Default is False.
save_residual_hdu : bool, optional
If True, save residual image extension. Default is False.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
None
img_file is updated in place with new extensions.
"""
img_hdu = fits.open(img_file)
data = img_hdu['SCI'].data
mask = img_hdu['MASK'].data.astype(bool)
error = img_hdu['ERROR'].data
# Get sky statistics
mean, median, std_sky = sigma_clipped_stats(data[~mask], sigma=5.0,
maxiters=21, grow=1)
# Threshold for constructing star catalog
threshold = std_sky*star_param['snthresh_final']/2.0
stars = get_star_catalog(data, mask, error,
fwhm_init=star_param['fwhm_init'], threshold=threshold, log=log)
metadata={'SKYADU':median, 'SKYSIG': std_sky}
if log:
log.info(f'Found {len(stars)} stars')
else:
print(f'Found {len(stars)} stars')
med_sharp = np.median(stars['sharpness'])
med_round = np.median(stars['roundness1'])
std_sharp = np.std(stars['sharpness'])
std_round = np.std(stars['roundness1'])
mask = ((stars['sharpness'] < med_sharp + std_sharp) &\
(stars['roundness1'] < med_round + 3*std_round) &\
(stars['roundness1'] > med_round - 3*std_round))
fwhm_stars = stars[mask]
mask = ~np.isnan(fwhm_stars['fwhm'])
fwhm_stars = fwhm_stars[mask]
# Clip fwhm_stars by fwhm
fwhm_clipped, _, _ = sigmaclip(fwhm_stars['fwhm'])
fwhm = np.median(fwhm_clipped)
std_fwhm = np.std(fwhm_clipped)
mask = (fwhm_stars['fwhm'] > fwhm-3*std_fwhm) &\
(fwhm_stars['fwhm'] < fwhm+3*std_fwhm)
fwhm_stars = fwhm_stars[mask]
fwhm = np.median(fwhm_stars['fwhm'])
fwhm = float('%.4f'%fwhm)
std_fwhm = float('%.4f'%std_fwhm)
if log:
log.info(f'Masked to {len(fwhm_stars)} stars based on sharpness, roundness, FWHM')
else:
print(f'Masked to {len(fwhm_stars)} stars based on sharpness, roundness, FWHM')
mask = (fwhm_stars['signal_to_noise'] > star_param['snthresh_psf'])
bright = fwhm_stars[mask]
if len(bright) < 40:
if log:
log.info('Bright stars for PSF are <90, lowering S/N thresh to 5.')
else:
print('Bright stars for PSF are <90, lowering S/N tresh to 5.')
mask = (fwhm_stars['signal_to_noise'] > 5)
bright = fwhm_stars[mask]
if log:
log.info(f'Masked to {len(bright)} PSF stars based on flux.')
else:
print(f'Masked to {len(bright)} PSF stars based on flux.')
metadata['NPSFSTAR']=len(bright)
# Instantiate EPSF
size=int(fwhm*fwhm_scale_psf)
if size%2==0: size=size+1
if log:
log.info(f'EPSF size will be {size} pixels')
else:
print(f'EPSF size will be {size} pixels')
epsf = generate_epsf(img_file, bright['xcentroid'], bright['ycentroid'],
size=size, oversampling=oversampling, maxiters=11, log=log)
fwhm = extract_fwhm_from_epsf(epsf, fwhm*oversampling)
# Scale by oversampling
fwhm = fwhm/oversampling
fwhm = float('%.4f'%fwhm)
if log:
log.info(f'FWHM={fwhm} pixels')
else:
print(f'FWHM={fwhm} pixels')
metadata['FWHM']=fwhm
# Also update img_hdu['PRIMARY'] with FWHM
img_hdu['PRIMARY'].header['FWHM']=(fwhm,
'Full-width at half-maximum [pixels]')
mask = (stars['signal_to_noise'] > star_param['snthresh_final'])
final_stars = stars[mask]
photometry, residual_image = run_photometry(img_file, epsf, fwhm,
star_param['snthresh_final'], size, final_stars)
# Format final stars table and add as APPPHOT to img_hdu
w = WCS(img_hdu['SCI'].header)
coords = w.pixel_to_world(final_stars['Xpos'], final_stars['Ypos'])
final_stars['flux'] = final_stars['flux_best']
final_stars['flux_err'] = final_stars['flux_best_err']
final_stars['SN'] = final_stars['flux']/final_stars['flux_best_err']
final_stars['mag'] = -2.5*np.log10(final_stars['flux'])
final_stars['mag_err'] = 2.5/np.log(10) * 1./final_stars['SN']
final_stars['RA'] = [c.ra.degree for c in coords]
final_stars['Dec'] = [c.dec.degree for c in coords]
final_stars['sky'] = np.array([0.]*len(final_stars))
final_stars['FWHM'] = final_stars['fwhm']
# Sort from brightest to faintest
final_stars.sort('flux', reverse=True)
# Get final list of column names we want in catalog in order and the number
# of significant figures they should all have
colnames=['Xpos','Ypos','Xpos_err','Ypos_err','mag','mag_err','flux',
'flux_err','SN','FWHM','sky','RA','Dec']
sigfig=[4,4,4,4,4,4,4,4,4,4,4,7,7]
final_stars = final_stars[*colnames]
for col,sig in zip(colnames, sigfig):
final_stars[col] = np.array([float(f'%.{sig}f'%val)
for val in final_stars[col].data])
# Create a new HDU for the photometry table
newhdu = fits.BinTableHDU(final_stars)
newhdu.header.update(metadata)
newhdu.name='APPPHOT'
if newhdu.name in [h.name for h in img_hdu]:
img_hdu[newhdu.name] = newhdu
else:
img_hdu.append(newhdu)
# Record final number of objects to write out
metadata['NOBJECT']=len(photometry)
if log:
log.info(f'Final catalog is {len(photometry)} stars')
else:
print(f'Final catalog is {len(photometry)} stars')
# Get RA/Dec from final positions
w = WCS(img_hdu['SCI'].header)
coords = w.pixel_to_world(photometry['x_fit'], photometry['y_fit'])
# Join the photometry and all star catalogs and rename columns
flux_err = np.asarray(photometry['flux_err'], dtype=float)
sn = np.full_like(flux_err, np.nan)
np.divide(photometry['flux_fit'], flux_err, out=sn, where=flux_err > 0)
photometry['SN'] = sn
photometry['mag'] = -2.5*np.log10(photometry['flux_fit'])
photometry['mag_err'] = 2.5/np.log(10) * 1./photometry['SN']
photometry['RA'] = [c.ra.degree for c in coords]
photometry['Dec'] = [c.dec.degree for c in coords]
photometry.add_column(Column([fwhm]*len(photometry), name='FWHM'))
photometry.rename_column('x_fit', 'Xpos')
photometry.rename_column('y_fit', 'Ypos')
photometry.rename_column('x_err', 'Xpos_err')
photometry.rename_column('y_err', 'Ypos_err')
photometry.rename_column('flux_fit','flux')
photometry.rename_column('local_bkg','sky')
# Sort from brightest to faintest
photometry.sort('flux', reverse=True)
# Get final list of column names we want in catalog in order and the number
# of significant figures they should all have
colnames=['Xpos','Ypos','Xpos_err','Ypos_err','mag','mag_err','flux',
'flux_err','SN','FWHM','sky','RA','Dec']
sigfig=[4,4,4,4,4,4,4,4,4,4,4,7,7]
photometry = photometry[*colnames]
for col,sig in zip(colnames, sigfig):
photometry[col] = np.array([float(f'%.{sig}f'%val)
for val in photometry[col].data])
# Update primary header with metadata
img_hdu['PRIMARY'].header.update(metadata)
# Create a new HDU for the photometry table
newhdu = fits.BinTableHDU(photometry)
newhdu.header.update(metadata)
newhdu.name='PSFPHOT'
if newhdu.name in [h.name for h in img_hdu]:
img_hdu[newhdu.name] = newhdu
else:
img_hdu.append(newhdu)
# Add PSF stars to img_hdu
bright.sort('flux')
newhdu = fits.BinTableHDU(bright)
newhdu.name='PSFSTARS'
if newhdu.name in [h.name for h in img_hdu]:
img_hdu[newhdu.name] = newhdu
else:
img_hdu.append(newhdu)
# Write out a EPSF quality image and add raw data to img_hdu
# Useful for validating the photometry methods
if save_psf_img:
norm = simple_norm(epsf.data, 'log', percent=99.)
plt.imshow(epsf.data, norm=norm, origin='lower', cmap='viridis')
plt.colorbar()
outname = img_file.replace('.fits','.epsf.png')
plt.savefig(outname)
plt.clf()
# Add the residual image to img_hdu
# Can be used to validate the quality of PSF fitting to point sources
if save_residual_hdu:
newhdu = fits.ImageHDU(residual_image)
newhdu.name = 'RESIDUAL'
if newhdu.name in [h.name for h in img_hdu]:
img_hdu[newhdu.name] = newhdu
else:
img_hdu.append(newhdu)
# Always add the PSF to the image as an extension
newhdu = fits.ImageHDU(epsf.data)
newhdu.name='PSF'
if newhdu.name in [h.name for h in img_hdu]:
img_hdu[newhdu.name] = newhdu
else:
img_hdu.append(newhdu)
# Finally, write out img_hdu to save all data
img_hdu.writeto(img_file, overwrite=True)
img_hdu.close()
[docs]
def photloop(stack, phot_sn_min=3.0, phot_sn_max=40.0, fwhm_init=5.0, log=None):
"""Try PSF photometry with decreasing S/N threshold until do_phot succeeds.
Parameters
----------
stack : str
Path to stack FITS (SCI, MASK, ERROR).
phot_sn_min : float, optional
Minimum S/N threshold to try. Default is 3.0.
phot_sn_max : float, optional
Initial S/N threshold. Default is 40.0.
fwhm_init : float, optional
Initial FWHM for star finding. Default is 5.0.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
None
Stack FITS is updated in place when do_phot succeeds.
"""
signal_to_noise = phot_sn_max
epsf = None ; fwhm = None
while signal_to_noise > phot_sn_min:
if log: log.info(f'Trying PSF generation with S/N={signal_to_noise}')
star_param = {'snthresh_psf': signal_to_noise*2.0,
'fwhm_init': fwhm_init,
'snthresh_final': signal_to_noise}
try:
do_phot(stack, star_param=star_param)
except Exception as e:
log.error(e)
signal_to_noise = signal_to_noise / 2.0
continue
break