"""Image calibration, masking, error arrays, alignment, and stacking.
Provides WCS handling, alignment to a common grid, cosmic-ray rejection,
satellite masking, master calibration combination, and stacked science
output. Authors: Charlie Kilpatrick.
"""
from potpyri._version import __version__
import os
import time
import numpy as np
import logging
import sys
import copy
# Needed for satellite trail detection - dependency can be removed later if
# we turn off satellite masking or use some other algorithm (e.g., the Rubin one)
import acstools
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits, ascii
from astropy.stats import sigma_clipped_stats
from astropy.wcs import WCS
from ccdproc import wcs_project
from ccdproc import CCDData
from ccdproc import combine
from ccdproc import cosmicray_lacosmic
# Internal dependencies
from potpyri.primitives import photometry
from potpyri.primitives import solve_wcs
from potpyri.utils import utilities
[docs]
def remove_pv_distortion(header):
"""Remove PV* distortion keywords from a FITS header (modifies in place).
Parameters
----------
header : astropy.io.fits.Header
Header to modify.
Returns
-------
astropy.io.fits.Header
The same header reference (modified in place).
"""
done = False
while not done:
bad_key = False
for key in header.keys():
if key.startswith('PV'):
if key in header.keys():
bad_key = True
del header[key]
if not bad_key: done = True
return(header)
[docs]
def get_fieldcenter(images):
"""Compute mean RA/Dec of image centers from a list of FITS files.
Parameters
----------
images : list of str
Paths to FITS files with WCS in primary header.
Returns
-------
list
[mean_ra_deg, mean_dec_deg].
"""
ras = [] ; des = []
for file in images:
with fits.open(file) as hdu:
w = WCS(hdu[0].header)
data_shape = hdu[0].data.shape
center_pix = (data_shape[1]/2., data_shape[0]/2.)
coord = w.pixel_to_world(*center_pix)
ras.append(coord.ra.degree)
des.append(coord.dec.degree)
mean_ra = np.mean(ras)
mean_de = np.mean(des)
return([mean_ra, mean_de])
[docs]
def generate_wcs(tel, binn, fieldcenter, out_size):
"""Build a TAN WCS for a given field center and output size.
Parameters
----------
tel : Instrument
Instrument instance (for pixel scale).
binn : str
Binning string (e.g. '22').
fieldcenter : sequence
[ra_deg, dec_deg] or parseable coordinate.
out_size : int
Output image size (out_size x out_size pixels).
Returns
-------
astropy.wcs.WCS
WCS instance.
"""
w = {'NAXES':2, 'NAXIS1': out_size, 'NAXIS2': out_size,
'EQUINOX': 2000.0, 'LONPOLE': 180.0, 'LATPOLE': 0.0}
pixscale = tel.get_pixscale()
cdelt = pixscale * int(str(binn)[0])/3600.0
w['CDELT1'] = -1.0 * cdelt
w['CDELT2'] = cdelt
w['CTYPE1']='RA---TAN'
w['CTYPE2']='DEC--TAN'
w['CUNIT1']='deg'
w['CUNIT2']='deg'
w['CRPIX1']=float(out_size)/2 + 0.5
w['CRPIX2']=float(out_size)/2 + 0.5
coord = utilities.parse_coord(fieldcenter[0], fieldcenter[1])
w['CRVAL1']=coord.ra.degree
w['CRVAL2']=coord.dec.degree
w = WCS(w)
return(w)
[docs]
def align_images(reduced_files, paths, tel, binn, use_wcs=None, fieldcenter=None,
out_size=None, skip_gaia=False, keep_all_astro=False, log=None):
"""Solve WCS and align reduced images to a common grid.
Runs astrometry.net and optionally Gaia alignment, then reprojects to
a common WCS.
Parameters
----------
reduced_files : list of str
Paths to reduced FITS files.
paths : dict
Paths dict from options.add_paths.
tel : Instrument
Instrument instance.
binn : str
Binning string.
use_wcs : astropy.wcs.WCS, optional
If provided, use this WCS instead of solving.
fieldcenter : sequence, optional
[ra, dec] for generating WCS.
out_size : int, optional
Output image size.
skip_gaia : bool, optional
If True, skip Gaia alignment. Default is False.
keep_all_astro : bool, optional
If True, keep all images regardless of astrometric dispersion.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
tuple or None
(aligned_data, masks, errors) as lists, or None if no images aligned.
"""
solved_images = []
aligned_images = []
aligned_data = []
for file in reduced_files:
# Coarse WCS solution using astrometry.net
success = solve_wcs.solve_astrometry(file, tel, binn, paths,
shift_only=False, log=log)
if not success: continue
# Fine WCS solution using Gaia DR3 point sources
if skip_gaia:
hdu = fits.open(file)
hdu[0].header['RADISP']=0.0
hdu[0].header['DEDISP']=0.0
hdu.writeto(file, overwrite=True)
else:
success = solve_wcs.align_to_gaia(file, tel, log=log)
if not success: continue
solved_images.append(file)
# Reject images from stack where either RADISP or DEDISP is >5-sigma outlier
if not keep_all_astro:
if len(solved_images)>2:
radisp = [] ; dedisp = []
for file in solved_images:
hdu = fits.open(file, mode='readonly')
radisp.append(hdu[0].header['RADISP'])
dedisp.append(hdu[0].header['DEDISP'])
radisp = np.array(radisp) ; dedisp = np.array(dedisp)
ramean, ramedian, rastddev = sigma_clipped_stats(radisp)
demean, demedian, destddev = sigma_clipped_stats(dedisp)
if log: log.info(f'Median dispersion in R.A.={ramedian}')
if log: log.info(f'Median dispersion in Decl.={demedian}')
mask = (radisp-ramedian <= 5 * rastddev) &\
(dedisp-demedian <= 5 * destddev)
if log:
log.info('Rejecting the following images for high astrometric dispersion:')
if np.all(mask):
log.info('No images rejected')
else:
for i,m in enumerate(mask):
if not m: log.info(solved_images[i])
solved_images = np.array(solved_images)[mask]
else:
if log:
log.info('Keeping all images (even poorly aligned).')
solved_images = np.array(solved_images)
if len(solved_images)==0:
return(None, None)
# Determine what the value of use_wcs should be
if use_wcs is None:
if fieldcenter is None:
fieldcenter = get_fieldcenter(solved_images)
use_wcs = generate_wcs(tel, binn, fieldcenter, out_size)
for file in solved_images:
if log: log.info(f'Reprojecting {file}...')
# Create a reprojection of the file in a common astrometric frame
hdu = fits.open(file)
header = remove_pv_distortion(hdu[0].header)
wcs = WCS(hdu[0].header)
if use_wcs is None:
use_wcs = wcs
image = CCDData(hdu[0].data, meta=hdu[0].header, wcs=wcs,
unit=u.electron)
if out_size:
target_shape = (out_size, out_size)
else:
target_shape = None
reprojected = wcs_project(image, target_wcs=use_wcs, order='bilinear',
target_shape=target_shape)
# Get rid of mask in data
reprojected.mask = None
outfile = file.replace('.fits','_reproj.fits')
reprojected.write(file.replace('.fits','_reproj.fits'), overwrite=True)
aligned_images.append(outfile)
aligned_data.append(reprojected)
return(aligned_images, aligned_data)
[docs]
def image_proc(image_data, tel, paths, skip_skysub=False,
fieldcenter=None, out_size=None, satellites=True, cosmic_ray=True,
skip_gaia=False, keep_all_astro=False, relative_calibration=False,
log=None):
"""Full image processing: align, mask, stack, and optionally detrend.
Orchestrates WCS solving, alignment, satellite/cosmic-ray masking,
stacking, and optional sky subtraction. Optionally calibrates frames
to each other using SExtractor catalogs and RA/Dec cross-matching before stacking.
Parameters
----------
image_data : astropy.table.Table
File table subset (e.g. one TargType) with 'File' column.
tel : Instrument
Instrument instance.
paths : dict
Paths dict from options.add_paths.
skip_skysub : bool, optional
If True, skip sky subtraction. Default is False.
fieldcenter : sequence, optional
[ra, dec] for alignment.
out_size : int, optional
Output image size.
satellites : bool, optional
If True, mask satellite trails. Default is True.
cosmic_ray : bool, optional
If True, run cosmic-ray rejection. Default is True.
skip_gaia : bool, optional
If True, skip Gaia alignment. Default is False.
keep_all_astro : bool, optional
If True, keep all images regardless of astrometric dispersion.
relative_calibration : bool, optional
If True, run SExtractor on each aligned frame, cross-match sources in
RA/Dec, and use relative fluxes to set scale factors for the stack
combine. Default is False.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
str or None
Path to stacked output FITS, or None if processing failed.
"""
wavelength = tel.wavelength
red_path = paths['red']
work_path = paths['work']
# All data in the table should be for one target
assert np.all([image_data['Target']==image_data['Target'][0]])
assert np.all([image_data['TargType']==image_data['TargType'][0]])
cal_type = image_data['TargType'][0]
target = image_data['Target'][0]
fil = image_data['Filter'][0]
amp = image_data['Amp'][0]
binn = image_data['Binning'][0]
# Load bias frame
if tel.bias:
if log: log.info('Loading master bias.')
try:
mbias = tel.load_bias(paths, amp, binn)
except Exception as e:
if log:
log.error('No master bias found for this configuration')
log.error(f'Skipping reduction for: {cal_type}')
log.error(e)
return(None)
else:
mbias = None
# Load bias frame
if tel.dark:
if log: log.info('Loading master dark.')
try:
mdark = tel.load_dark(paths, amp, binn)
except Exception as e:
if log:
log.error('No master dark found for this configuration')
log.error(f'Skipping reduction for: {cal_type}')
log.error(e)
return(None)
else:
mdark = None
# Load flat frame
if tel.flat:
if log: log.info('Loading master flat.')
try:
mflat = tel.load_flat(paths, fil, amp, binn)
except Exception as e:
if log:
log.error('No master bias found for this configuration')
log.error(f'Skipping reduction for: {cal_type}')
log.error(e)
return(None)
else:
mflat = None
t1 = time.time()
if log: log.info(f'Processing data for {cal_type}')
# Bias subtraction, gain correction, flat correction, and flat fielding
files = image_data['File']
processed = tel.process_science(files, fil, amp, binn, paths,
mbias=mbias, mflat=mflat, mdark=mdark, skip_skysub=skip_skysub, log=log)
# Get filenames for output processed data
reduced_files = [p.header['FILENAME'] for p in processed]
t2 = time.time()
if log: log.info(f'Data processed in {t2-t1} sec')
if log: log.info('Aligning images.')
# If masking satellite trails, this needs to be done before reprojection so
# that the acstools algorithm accurately models the edges of the detector
if satellites:
mask_satellites(processed, reduced_files, log=log)
# Sort files so deepest exposure is first
exptimes = []
for file in reduced_files:
hdu = fits.open(file)
exptimes.append(tel.get_exptime(hdu[0].header))
idx = np.flip(np.argsort(exptimes))
reduced_files = np.array(reduced_files)[idx]
if out_size is None:
out_size = tel.get_out_size(processed[0].header)
if fieldcenter is not None:
use_wcs = generate_wcs(tel, binn, fieldcenter, out_size)
else:
use_wcs = None
aligned_images, aligned_data = align_images(reduced_files, paths, tel, binn,
use_wcs=use_wcs, fieldcenter=fieldcenter, out_size=out_size,
skip_gaia=skip_gaia, keep_all_astro=keep_all_astro, log=log)
if aligned_images is None or aligned_data is None:
return(None)
if log: log.info('Creating mask and error arrays.')
masks = []
errors = []
data_images = []
for stack_img in aligned_images:
hdu = fits.open(stack_img, mode='readonly')
hdr = hdu[0].header
clean, mask = create_mask(stack_img,
hdr['SATURATE'], np.mean(tel.get_rdnoise(hdr)),
log=log, cosmic_ray=cosmic_ray, outpath=work_path)
error = create_error(stack_img, mask, np.mean(tel.get_rdnoise(hdr)))
masks.append(mask)
errors.append(error)
# Set nan values to 0 now that they are masked
data = hdu[0].data
data[np.isnan(data)] = 0.0
# Create final image triplet before stacking with data, mask, and error
hdulist = fits.HDUList()
sci_hdu = fits.ImageHDU()
sci_hdu.data = data
sci_hdu.header = hdu[0].header
msk_hdu = fits.ImageHDU()
msk_hdu.data = mask.data
msk_hdu.header = mask.header
err_hdu = fits.ImageHDU()
err_hdu.data = error.data
err_hdu.header = error.header
hdulist.append(sci_hdu)
hdulist.append(msk_hdu)
hdulist.append(err_hdu)
hdulist[0].name='SCI'
hdulist[1].name='MASK'
hdulist[2].name='ERROR'
hdulist[0].header['EXTNAME']='SCI'
hdulist[1].header['EXTNAME']='MASK'
hdulist[2].header['EXTNAME']='ERROR'
filename = hdulist[0].header['FILENAME'].replace('.fits',
'_data.fits')
fullfilename = os.path.join(work_path, filename)
if log:
log.info(f'Writing out all file data: {fullfilename}')
else:
print(f'Writing out all file data: {fullfilename}')
hdulist.writeto(fullfilename, overwrite=True, output_verify='silentfix')
data_images.append(fullfilename)
if log: log.info('Creating median stack.')
if len(aligned_data)>1:
relative_scales = None
if relative_calibration:
exptimes = np.array([tel.get_exptime(d.header) for d in aligned_data])
relative_scales = compute_relative_scales(data_images, paths, exptimes, log=log)
sci_med = stack_data(aligned_data, tel, masks, errors, log=log,
relative_scales=relative_scales)
sci_med = add_stack_mask(sci_med, aligned_data)
if tel.detrend:
if log: log.info('Detrending stack')
sci_med = detrend_stack(sci_med)
else:
if log: log.info('Skipping detrending')
else:
sci_med = fits.open(data_images[0])
sci_med[0].data = sci_med[0].data.astype(np.float32)
sci_med[1].data = sci_med[1].data.astype(np.uint8)
sci_med[2].data = sci_med[2].data.astype(np.float32)
sci_med[0].header['EXTNAME']='SCI'
sci_med[1].header['EXTNAME']='MASK'
sci_med[2].header['EXTNAME']='ERROR'
sci_med[0].header['BITPIX'] = -32
sci_med[1].header['BITPIX'] = 8
sci_med[2].header['BITPIX'] = -32
# Get time parameters from aligned data
mid_time = np.average([tel.get_time(d.header) for d in aligned_data])
exptimes = np.array([tel.get_exptime(d.header) for d in aligned_data])
eff_time = np.sum(exptimes**2)/np.sum(exptimes)
total_time = np.sum(exptimes)
# Rescale both data and error images by effective exposure times so they
# are in e- instead of e-/s
sci_med[0].data = sci_med[0].data * eff_time
sci_med[2].data = sci_med[2].data * eff_time
# Explicitly note that data and error extensions are in ELECTRONS
sci_med[0].header['BUNIT'] = 'ELECTRONS'
sci_med[2].header['BUNIT'] = 'ELECTRONS'
# Add both formats for code that requires either
sci_med[0].header['MJD-OBS'] = (mid_time,
'Mid-MJD of the observation sequence.')
sci_med[0].header['MJD'] = (mid_time,
'Mid-MJD of the observation sequence.')
# Since we generated stack with median, effective exposure should be a
# weighted average of the input exposure times
sci_med[0].header['EXPTIME'] = (eff_time,
'Effective expsoure time in seconds.')
sci_med[0].header['EXPTOT'] = (total_time,
'Total exposure time in seconds')
sci_med[0].header['GAIN'] = (len(aligned_data),
'Effecetive gain for stack.')
# Calculate read noise
rdnoise = np.mean(tel.get_rdnoise(sci_med[0].header))/np.sqrt(len(aligned_data))
sci_med[0].header['RDNOISE'] = (rdnoise, 'Readnoise of stack.')
sci_med[0].header['NFILES'] = (len(aligned_data),
'Number of images in stack')
sci_med[0].header['FILTER'] = fil
sci_med[0].header['OBSTYPE'] = 'OBJECT'
# Generate stack name and write out
stackname = tel.get_stk_name(sci_med[0].header, red_path)
sci_med.writeto(stackname, overwrite=True, output_verify='silentfix')
if log:
log.info(f'Stack made for {stackname}')
else:
print(f'Stack made for {stackname}')
return(stackname)
[docs]
def detrend_stack(stack):
"""Remove row and column median from stack science data (detrend).
Parameters
----------
stack : astropy.io.fits.HDUList
HDU list with [0]=SCI, [1]=MASK. Modified in place.
Returns
-------
astropy.io.fits.HDUList
The same stack reference (modified in place).
"""
data = stack[0].data
mask = stack[1].data.astype(bool)
mean, med, stddev = sigma_clipped_stats(data, mask=mask, axis=1,
sigma_upper=2.5)
data = data - med[:,None]
row_med = np.nanmedian(med)
mean, med, stddev = sigma_clipped_stats(data, mask=mask, axis=0,
sigma_upper=2.5)
data = data - med[None,:]
col_med = np.nanmedian(med)
# Reapply mask to data
data[mask] = 0.0
stack[0].data = data
stack[0].header['SATURATE'] = stack[0].header['SATURATE'] - (row_med+col_med)
return(stack)
[docs]
def compute_relative_scales(data_images, paths, exptimes, log=None,
match_radius_arcsec=2.0, min_sources=5, min_rel_err=0.01):
"""Compute relative flux scale factors from SExtractor catalogs and RA/Dec cross-matching.
Runs Source Extractor on each aligned science image, cross-matches sources
between frames in RA/Dec, and uses the inverse-variance-weighted mean of
flux ratios (using FLUXERR_AUTO) to define scale factors so that combine()
can stack on a common flux scale. For frames where matching fails (too few
matches), the scale is set to 1.0/exptime for that frame.
Parameters
----------
data_images : list of str
Paths to FITS with SCI extension (e.g. *_data.fits from image_proc).
paths : dict
Paths dict; paths.get('source_extractor') used for SExtractor binary.
exptimes : array-like
Exposure time in seconds for each image (same order as data_images).
log : ColoredLogger, optional
Logger for progress.
match_radius_arcsec : float, optional
Match radius in arcsec for cross-matching sources. Default 2.0.
min_sources : int, optional
Minimum matched sources per frame to compute scale. Default 5.
min_rel_err : float, optional
Minimum relative flux error (err/flux) used in variance of ratio to
avoid zero variance. Default 0.01.
Returns
-------
np.ndarray or None
Relative scale factors, one per image (reference frame scale = 1.0).
Frames that fail get scale = 1.0/exptime. None if catalogs cannot be
built at all (caller should use exposure-time-only scaling).
"""
nimg = len(data_images)
if nimg < 2:
return None
exptimes = np.atleast_1d(np.asarray(exptimes, dtype=float))
if len(exptimes) != nimg:
if log:
log.warning('Relative calibration: exptimes length does not match data_images.')
return None
sextractor_path = paths.get('source_extractor')
def _col(cat, name):
"""Get column by case-insensitive name."""
for c in cat.colnames:
if c.upper() == name.upper():
return c
return None
catalogs = []
for p in data_images:
cat = photometry.run_sextractor(p, log=log, sextractor_path=sextractor_path)
if cat is None or len(cat) < min_sources:
if log:
log.warning(f'Relative calibration: insufficient catalog for {p}, skipping.')
return None
ra_col = _col(cat, 'ALPHA_J2000')
dec_col = _col(cat, 'DELTA_J2000')
flux_col = _col(cat, 'FLUX_AUTO')
fluxerr_col = _col(cat, 'FLUXERR_AUTO')
if not (ra_col and dec_col and flux_col):
if log:
log.warning('Relative calibration: catalog missing RA/Dec or FLUX_AUTO.')
return None
catalogs.append((cat, ra_col, dec_col, flux_col, fluxerr_col))
# Reference = frame with most sources
ref_idx = int(np.argmax([len(c[0]) for c in catalogs]))
ref, ref_ra, ref_dec, ref_flux, ref_fluxerr = catalogs[ref_idx]
ref_coords = SkyCoord(ref[ref_ra], ref[ref_dec], unit='deg')
match_radius = match_radius_arcsec * u.arcsec
relative_scales = np.ones(nimg, dtype=float)
# Reference frame scale = 1.0; failed frames will get 1.0/exptime
for i in range(nimg):
if i == ref_idx:
continue
cat, ra_col, dec_col, flux_col, fluxerr_col = catalogs[i]
coords = SkyCoord(cat[ra_col], cat[dec_col], unit='deg')
# For each source in this frame, find nearest in reference
idx_ref, d2d, _ = coords.match_to_catalog_sky(ref_coords)
keep = d2d < match_radius
n_match = np.sum(keep)
if n_match < min_sources:
if log:
log.warning(f'Relative calibration: frame {i} has {n_match} matches (< {min_sources}), using 1/exptime.')
relative_scales[i] = 1.0 / exptimes[i]
continue
flux_ref = np.array(ref[ref_flux][idx_ref[keep]], dtype=float)
flux_i = np.array(cat[flux_col][keep], dtype=float)
# Flux errors: use FLUXERR_AUTO if present, else fall back to unweighted
if ref_fluxerr and fluxerr_col:
err_ref = np.array(ref[ref_fluxerr][idx_ref[keep]], dtype=float)
err_i = np.array(cat[fluxerr_col][keep], dtype=float)
else:
err_ref = err_i = None
# Avoid zeros and negatives in flux
valid = (flux_i > 0) & (flux_ref > 0)
if np.sum(valid) < min_sources:
if log:
log.warning(f'Relative calibration: frame {i} has too few valid flux pairs, using 1/exptime.')
relative_scales[i] = 1.0 / exptimes[i]
continue
ratio = flux_ref[valid] / flux_i[valid]
if err_ref is not None and err_i is not None:
err_ref = err_ref[valid]
err_i = err_i[valid]
# Relative errors with floor to avoid zero variance
rel_err_ref = np.maximum(np.abs(err_ref) / np.maximum(flux_ref[valid], 1e-30), min_rel_err)
rel_err_i = np.maximum(np.abs(err_i) / np.maximum(flux_i[valid], 1e-30), min_rel_err)
# Variance of ratio R = A/B: var(R) ≈ R^2 * ( (σ_A/A)^2 + (σ_B/B)^2 )
var_ratio = ratio ** 2 * (rel_err_ref ** 2 + rel_err_i ** 2)
var_ratio = np.maximum(var_ratio, 1e-30)
weight = 1.0 / var_ratio
relative_scales[i] = float(np.average(ratio, weights=weight))
else:
relative_scales[i] = float(np.median(ratio))
if log:
log.info(f'Relative calibration: scales (ref frame {ref_idx} = 1.0): {relative_scales}')
return relative_scales
[docs]
def stack_data(stacking_data, tel, masks, errors, mem_limit=8.0e9, log=None,
relative_scales=None):
"""Combine aligned CCDData list with exposure scaling (median or average).
Masks are applied (masked pixels set to nan) before combination.
Optionally uses relative flux scales from source cross-matching so that
frames are combined on a common flux scale.
Parameters
----------
stacking_data : list of ccdproc.CCDData
Aligned science images.
tel : Instrument
Instrument instance (for stack_method and get_exptime).
masks : list of astropy.io.fits.ImageHDU
Per-image masks.
errors : list of astropy.io.fits.ImageHDU
Per-image error arrays.
mem_limit : float, optional
Memory limit in bytes for ccdproc.combine. Default is 8e9.
log : ColoredLogger, optional
Logger for progress.
relative_scales : np.ndarray, optional
Relative flux scale factors (one per image; reference = 1.0). If
provided, scale = relative_scales (exposure-time scaling is not applied).
Returns
-------
astropy.io.fits.HDUList
HDU list [science, mask, error] (mask/error from first image).
"""
stack_method = tel.stack_method
if not stack_method:
if log:
log.critical('Could not get stacking method for these images')
logging.shutdown()
sys.exit(-1)
else:
raise Exception('Could not get stacking method for these images')
exptimes = []
for i,stk in enumerate(stacking_data):
mask = masks[i].data.astype(bool)
stacking_data[i].data[mask] = np.nan
exptimes.append(float(tel.get_exptime(stk.header)))
exptimes = np.array(exptimes)
scale = 1.0 / exptimes
if relative_scales is not None:
scale = np.asarray(relative_scales, dtype=float)
if len(scale)==1:
stack_method='average'
sci_med = combine(stacking_data, scale=scale,
method=stack_method, mem_limit=mem_limit)
sci_med = sci_med.to_hdu()
return(sci_med)
[docs]
def add_stack_mask(stack, stacking_data):
"""Update stack mask from per-image masks and saturation; set masked pixels to 0.
Parameters
----------
stack : astropy.io.fits.HDUList
HDU list with [0]=SCI, [1]=MASK, [2]=ERROR. Modified in place.
stacking_data : list of ccdproc.CCDData
Per-image data (headers used for SATURATE).
Returns
-------
astropy.io.fits.HDUList
The same stack reference (modified in place).
"""
data = stack[0].data
mask = stack[1].data
error = stack[2].data
# Keep track of original masked values
bp_mask = (mask > 0) | (data==0.0) | np.isnan(data) | np.isnan(mask) |\
np.isnan(error) | (error==0.0) | np.isinf(data)
# Reset mask
mask = np.zeros(mask.shape).astype(np.uint8)
# Add bad pixels back to mask
mask[bp_mask] = 1
# Add saturation to mask
sat = np.median([d.header['SATURATE'] for d in stacking_data])
stack[0].header['SATURATE'] = sat
sat_mask = data >= sat
mask[sat_mask] = 4
# Set masked values to 0
data[mask>0] = 0.0
error[mask>0] = 0.0
# Make sure that nan and inf values are removed from data
data[np.isnan(data)]=0.0
data[np.isinf(data)]=0.0
stack[0].data = data
stack[1].data = mask
stack[2].data = error
return(stack)
[docs]
def mask_satellites(images, filenames, log=None):
"""Detect and mask satellite trails in science images using acstools.satdet.
Parameters
----------
images : list of ccdproc.CCDData
Science images; data is modified in place (trail pixels set to nan).
filenames : list of str
Paths corresponding to images (used for temp files).
log : ColoredLogger, optional
Logger for progress.
Returns
-------
None
"""
out_data = []
for i,science_data in enumerate(images):
if log: log.info(f'Checking for satellite trails in: {filenames[i]}')
data = np.ascontiguousarray(science_data.data.astype(np.float32))
tmphdu = fits.PrimaryHDU()
tmpfile = filenames[i].replace('.fits','.tmp.fits')
tmphdu.data = data
tmphdu.writeto(tmpfile, overwrite=True)
tmpshape = data.shape
buf = int(np.round(np.max(tmpshape)/10.))
results, errors = acstools.satdet.detsat(tmpfile,
chips=[0], sigma=4, h_thresh=0.1, buf=buf, verbose=False)
trail_coords = results[(tmpfile, 0)]
if len(trail_coords)>0:
n_trails = len(trail_coords)
trail_masks = []
for i,coord in enumerate(trail_coords):
if log: log.info(f'Masking for satellite trail {i+1}/{n_trails}')
try:
mask = acstools.satdet.make_mask(tmpfile, 0, coord,
verbose=False)
except ValueError:
continue
trail_masks.append(mask)
trail_mask = np.any(trail_masks, axis=0)
# Set actual pixel values to nan so they don't contribute to image
science_data.data[trail_mask] = np.nan
out_data.append(science_data)
os.remove(tmpfile)
[docs]
def create_mask(science_data, saturation, rdnoise, sigclip=3.0,
sigfrac=0.10, objlim=4.0, niter=6, outpath='', grow=0, cosmic_ray=True,
fsmode='convolve', cleantype='medmask', log=None):
"""Build a bad-pixel/saturation/cosmic-ray mask using LACosmic and optional growth.
Parameters
----------
science_data : str
Path to science FITS file.
saturation : float
Saturation level (ADU); pixels above this get flag 4.
rdnoise : float
Read noise for LACosmic (electrons).
sigclip : float, optional
LACosmic sigma clipping. Default is 3.0.
sigfrac : float, optional
LACosmic sigfrac. Default is 0.10.
objlim : float, optional
LACosmic objlim. Default is 4.0.
niter : int, optional
LACosmic iterations. Default is 6.
outpath : str, optional
Output path for debug files. Default is ''.
grow : int, optional
Pixels to grow mask (adds flag 8). Default is 0.
cosmic_ray : bool, optional
If True, run LACosmic. Default is True.
fsmode : str, optional
LACosmic fsmode. Default is 'convolve'.
cleantype : str, optional
LACosmic cleantype. Default is 'medmask'.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
tuple
(cleaned_data_ndarray, mask_ImageHDU). Mask uses additive flags:
1=bad, 2=cosmic ray, 4=saturated, 8=neighbor.
"""
t_start = time.time()
if log:
log.info(f'Running astroscrappy on {science_data}')
else:
print(f'Running astroscrappy on {science_data}')
hdu = fits.open(science_data)
data = np.ascontiguousarray(hdu[0].data.astype(np.float32))
# Estimate FWHM size for LACosmic
table = photometry.run_sextractor(science_data, log=log)
if table is not None:
# Clip fwhm_stars by fwhm
fwhm, meanfwhm, stdfwhm = sigma_clipped_stats(table['FWHM_IMAGE'])
if log:
log.info(f'Using FWHM for cosmic rays: {fwhm}')
else:
print(f'Using FWHM for cosmic rays: {fwhm}')
else:
fwhm = 3.5
# Astroscrappy requires added sky background, so add this value back
# Set the sky background to some nominal value if it is too low for CR rej
skybkg = hdu[0].header['SKYBKG']
if log:
log.info(f'Sky background in science frame is {skybkg}')
else:
print(f'Sky background in science frame is {skybkg}')
if skybkg < 2000.0:
skybkg = 2000.0
if log:
log.info('Setting sky background to 2000.0')
else:
print('Setting sky background to 2000.0')
# This needs to be done before data is modified to preserve bad pixels
mask_bp = (data==0.0) | np.isnan(data)
# Set data background to skybkg
data = data + skybkg
# Also need to adjust the saturation level by SKYBKG for saturated pixels
saturation += skybkg
if log: log.info('Masking saturated pixels.')
mask_sat = np.zeros(data.shape).astype(bool) # create empty mask
mask_sat = mask_sat.astype(np.uint8) #set saturated star mask type
mask_sat[data >= saturation] = 4 #set saturated pixel flag
if log: log.info('Cleaning and masking cosmic rays.')
if log: log.info(f'Using sigclip={sigclip}, sigfrac={sigfrac}, objlim={objlim}')
#clean science image of cosmic rays and create cosmic ray mask
inmask = (mask_sat+mask_bp).astype(bool)
scidata = CCDData(data, unit=u.electron, mask=inmask,
wcs=WCS(hdu[0].header), meta=hdu[0].header)
mask_cr = np.zeros(data.shape)
mask_cr = mask_cr.astype(np.uint8)
psfsize = int(np.round(2.5*fwhm))
if psfsize%2==0: psfsize+=1
if cosmic_ray:
newdata, mask_cr = cosmicray_lacosmic(data,
readnoise=rdnoise, satlevel=saturation, verbose=True,
sigclip=sigclip, sigfrac=sigfrac, objlim=objlim, niter=niter,
psffwhm=fwhm, psfsize=psfsize, fsmode=fsmode,
cleantype=cleantype)
mask_cr = mask_cr.astype(np.uint8) #set cosmic ray mask type
else:
newdata = copy.copy(data)
mask_cr[mask_cr == 1] = 2 #set cosmic ray flag
#combine bad pixel, cosmic ray, saturated star and satellite trail masks
mask = mask_bp+mask_sat+mask_cr
if grow>0:
shape = mask.shape
grow_mask = np.zeros(shape)
shape = mask.shape
mask = data > 0
xs, ys = np.where(mask.astype(bool))
# Grow mask by grow factor
g = int(np.ceil((grow-1)/2))
for p in zip(xs, ys):
grow_mask[np.max([p[0]-g,0]):np.min([p[0]+g,shape[0]-1]),
np.max([p[1]-g,0]):np.min([p[1]+g,shape[1]-1])]=8
grow_mask = grow_mask.astype(np.uint8)
mask = mask_bp+mask_sat+mask_cr+grow_mask
# Get number of bad, cosmic-ray flagged, and saturated pixels
nbad = np.sum(mask & 1 == 1)
ncr = np.sum(mask & 2 == 2)
nsat = np.sum(mask & 4 == 4)
ngrow = np.sum(mask & 8 == 8)
mask_hdu = fits.ImageHDU(mask) #create mask Primary HDU
mask_hdu.header['VER'] = (__version__,
'Version of image procedures used.') #name of log file
mask_hdu.header['USE'] = 'Complex mask using additive flags.'#header comment
mask_hdu.header['M-BP'] = (1, 'Value of masked bad pixels.')
mask_hdu.header['M-BPNUM'] = (nbad, 'Number of bad pixels.')
mask_hdu.header['M-CR'] = (2, 'Value of masked cosmic ray pixels.')
mask_hdu.header['M-CRNUM'] = (ncr, 'Number of cosmic ray pixels.')
mask_hdu.header['SATURATE'] = (saturation, 'Level of saturation.')
mask_hdu.header['M-SP'] = (4, 'Value of masked saturated pixels.')
mask_hdu.header['M-SPNUM'] = (nsat, 'Number of saturated pixels.')
mask_hdu.header['M-NE'] = (8, 'Value of masked neighbor pixels.')
mask_hdu.header['M-NENUM'] = (ngrow, 'Number of neighboring masked pixels.')
if log:
log.info('Mask created.')
log.info(f'{nbad} bad, {ncr} cosmic-ray, and {nsat} saturated pixels')
log.info(f'{ngrow} neighbor masked pixels.')
else:
print('Mask created.')
print(f'{nbad} bad, {ncr} cosmic-ray, and {nsat} saturated pixels')
print(f'{ngrow} neighbor masked pixels.')
t_end = time.time()
if log:
log.info(f'Mask creation completed in {t_end-t_start} sec')
else:
print(f'Mask creation completed in {t_end-t_start} sec')
return(newdata, mask_hdu)
[docs]
def create_error(science_data, mask_data, rdnoise):
"""Compute per-pixel error array from science image, mask, and read noise.
Error = sqrt(poisson + rms^2 + rdnoise^2). Masked pixels are excluded
from RMS estimation. science_data can be a path or HDU; mask_data is
an HDU with mask array.
Parameters
----------
science_data : str or astropy.io.fits.HDUList
Path to science FITS or open HDU list.
mask_data : astropy.io.fits.PrimaryHDU or ImageHDU
HDU containing boolean or integer mask (masked pixels excluded from rms).
rdnoise : float
Read noise in electrons.
Returns
-------
astropy.io.fits.PrimaryHDU
Error array in electrons (BUNIT='ELECTRONS').
"""
with fits.open(science_data) as hdu:
img_data = hdu[0].data.astype(np.float32)
mask = mask_data.data.astype(bool)
# Check for issue with all of the file being masked
if np.all(mask):
rms = hdu[0].header['SATURATE']
else:
rms = 0.5 * (
np.percentile(img_data[~mask], 84.13)
- np.percentile(img_data[~mask], 15.86)
)
poisson = img_data.copy()
poisson[poisson < 0.] = 0.
error = np.sqrt(poisson + rms**2 + rdnoise)
# Sanitize error array
mask = np.isnan(error)
error[mask] = np.nanmedian(error)
mask = error < 0.0
error[mask] = np.nanmedian(error)
maxval = np.float32(hdu[0].header['SATURATE'])
mask = np.isinf(error)
error[mask] = maxval
error_hdu = fits.PrimaryHDU(error) # create mask Primary HDU
error_hdu.header['VER'] = (__version__,
'Version of image procedures used used.')
error_hdu.header['USE'] = 'Error array for Poisson, read, and RMS noise'
error_hdu.header['BUNIT'] = ('ELECTRONS', 'Units of the error array.')
return(error_hdu)