"""Base instrument class and shared reduction logic for POTPyRI.
Defines detector keywords, calibration flags, file naming, and methods for
importing/processing science frames, building master bias/dark/flat, and
loading static masks. Subclasses (GMOS, LRIS, BINOSPEC, etc.) set
instrument-specific attributes.
"""
from potpyri._version import __version__
import os
import astropy
import datetime
import copy
import ccdproc
import numpy as np
from photutils.background import Background2D
from photutils.background import MeanBackground
import astropy.units as u
from astropy.io import fits
from astropy.modeling import models
from astropy.nddata import CCDData
from astropy.stats import sigma_clipped_stats
from astropy.stats import SigmaClip
from astropy.time import Time
# WCS-related keywords to remove from calibration headers so saved files never
# trigger InvalidTransformError (e.g. DEC=-100, ill-conditioned CD matrix).
_CAL_HEADER_WCS_KEYS = (
'CTYPE1', 'CTYPE2', 'CRVAL1', 'CRVAL2', 'CRPIX1', 'CRPIX2',
'CD1_1', 'CD1_2', 'CD2_1', 'CD2_2', 'CDELT1', 'CDELT2', 'CROTA2',
'RADECSYS', 'RADESYS', 'EQUINOX', 'RA', 'DEC', 'LONPOLE', 'LATPOLE',
'CUNIT1', 'CUNIT2', 'MJD-OBS',
'LTM1_1', 'LTM1_2', 'LTM2_1', 'LTM2_2', 'LTV1', 'LTV2',
'DTV1', 'DTV2', 'DTM1_1', 'DTM2_2',
)
def _sanitize_calibration_header(header):
"""Remove WCS and coordinate keywords from a header so it can be written safely.
Calibration frames (bias, dark, flat, sky) inherit headers from input frames
that may contain invalid or ill-conditioned WCS (e.g. DEC=-100). Removing
these keywords ensures saved calibration files do not cause
InvalidTransformError when later loaded by code that parses WCS.
Modifies header in place.
"""
for key in _CAL_HEADER_WCS_KEYS:
if key in header:
del header[key]
# Remove PV* distortion keywords (can be ill-conditioned)
to_del = [k for k in header.keys() if k.startswith('PV')]
for k in to_del:
del header[k]
def _read_calibration_ccd(path, unit, hdu_index=0):
"""Read a calibration FITS into CCDData without parsing WCS.
Master bias/dark/flat frames often have WCS keywords that are ill-conditioned
or copied from templates. Parsing them can raise InvalidTransformError and
log 'Ill-conditioned coordinate transformation parameter'. Calibration
frames do not need WCS, so we load data and header explicitly with wcs=None.
"""
with fits.open(path) as hdu_list:
hdu = hdu_list[hdu_index]
return CCDData(hdu.data, meta=hdu.header.copy(), unit=unit, wcs=None)
[docs]
class Instrument(object):
"""Base class for all POTPyRI instruments.
Holds detector parameters (gain, rdnoise, pixscale, saturation), header
keyword names, file-sorting rules, and methods for calibration and
science processing. Subclasses override attributes as needed.
"""
def __init__(self):
# Version
self.version = __version__
# Intrument name
self.name = ''
# Extensions for keeping track of general metadata and WCS
self.raw_header_ext = 0
self.wcs_extension = 0
# Detector specific characteristics
self.pixscale = None
self.saturation = 65535
self.min_exptime = 1.0
# Run dark/bias/flat calibration?
self.dark = False
self.bias = False
self.flat = False
# Parameters for handling calibration files
# Run rejection on possible CR pixels in bias
self.cr_bias = True
# Extend header to first file extension for purposes of sort_files
self.extend_header = False
# How to combine images during stacking
self.stack_method = 'median'
self.wavelength = 'OPT'
self.gain = [1.02, 1.08]
self.rdnoise = [4.0, 4.0]
self.datasec = ['[1055:3024,217:3911]','[1067:3033,217:3911]']
self.biassec = ['[1:1054,1:4112]','[3034:4096,1:4112]']
# Keywords for selecting files from Sort_files object
# This allows a single file type to be used for multiple purposes (e.g., for
# generating a flat-field image from science exposures)
self.filetype_keywords = {'SCIENCE':'SCIENCE', 'FLAT':'FLAT',
'DARK':'DARK','BIAS':'BIAS'}
# Header keywords
self.target_keyword = 'OBJECT'
self.exptime_keyword = 'EXPTIME'
self.filter_keyword = 'FILTER'
self.mjd_keyword = 'MJD'
self.bin_keyword = 'CCDSUM'
self.amp_keyword = '2'
# File sorting keywords
self.science_keywords = ['MASK','SCRN']
self.science_values = ['imaging','stowed']
self.flat_keywords = ['MASK','SCRN']
self.flat_values = ['imaging','deployed']
self.bias_keywords = []
self.bias_values = []
self.dark_keywords = []
self.dark_values = []
self.spec_keywords = []
self.spec_values = []
self.bad_keywords = []
self.bad_values = []
self.detrend = True
self.catalog_zp = 'PS1'
# Default final image size for binning of 1x1
self.out_size = 5000
[docs]
def match_type_keywords(self, kwds, file_table):
"""Return a boolean mask of file_table rows whose Type is in kwds.
Parameters
----------
kwds : str
Comma-separated type names (e.g. 'SCIENCE', 'FLAT').
file_table : astropy.table.Table
File list table with 'Type' column.
Returns
-------
np.ndarray
Boolean mask, True where file_table['Type'] is in kwds.
"""
masks = []
for kwd in kwds.split(','):
masks.append(file_table['Type']==kwd)
masks = np.array(masks)
mask = np.any(masks, axis=0)
return(mask)
[docs]
def needs_sky_subtraction(self, filt):
"""Return True if sky subtraction should be run for this filter.
Parameters
----------
filt : str
Filter name (e.g. 'r', 'J').
Returns
-------
bool
True for NIR wavelength instruments, False otherwise.
"""
if self.wavelength=='NIR':
return(True)
else:
return(False)
[docs]
def get_pixscale(self):
"""Return pixel scale in arcsec/pixel.
Returns
-------
float or None
Pixel scale; may vary by mode in subclasses.
"""
return(self.pixscale)
[docs]
def get_rdnoise(self, hdr):
"""Return read noise value(s) for the detector/amplifier.
Parameters
----------
hdr : astropy.io.fits.Header
FITS header (may be used in subclasses for amp-dependent values).
Returns
-------
float or list
Read noise in electrons.
"""
return(self.rdnoise)
[docs]
def get_gain(self, hdr):
"""Return gain value(s) for the detector/amplifier.
Parameters
----------
hdr : astropy.io.fits.Header
FITS header (may be used in subclasses for amp-dependent values).
Returns
-------
float or list
Gain in e-/ADU.
"""
return(self.gain)
[docs]
def get_target(self, hdr):
"""Return target name from header (target_keyword), spaces stripped."""
return(hdr[self.target_keyword].replace(' ',''))
[docs]
def get_filter(self, hdr):
"""Return filter name from header; leading segment before underscore."""
filt = hdr[self.filter_keyword]
filt = filt.replace(' ','')
filt = filt.split('_')[0]
return(filt)
[docs]
def get_exptime(self, hdr):
"""Return exposure time from header (exptime_keyword)."""
return(hdr[self.exptime_keyword])
[docs]
def get_ampl(self, hdr):
"""Return amplifier identifier from header as string."""
if self.amp_keyword in hdr.keys():
return(str(hdr[self.amp_keyword]))
else:
return(str(self.amp_keyword))
[docs]
def get_binning(self, hdr):
"""Return binning string from header (e.g. '11', '22'), normalized and no 'x'."""
if self.bin_keyword in hdr.keys():
binn = str(hdr[self.bin_keyword]).lower().replace(' ','')
binn = binn.replace('x','')
binn = binn.replace(',','')
return(binn)
else:
return(self.bin_keyword)
[docs]
def get_out_size(self, hdr):
"""Return output image size (pixels) for this binning (out_size / binn)."""
binn = int(str(self.get_binning(hdr))[0])
out_size = int(self.out_size/binn)
return(out_size)
[docs]
def get_time(self, hdr):
"""Return MJD from header (mjd_keyword) as float."""
return(float(hdr[self.mjd_keyword]))
[docs]
def get_instrument_name(self, hdr):
"""Return instrument name (lowercase) for paths/filenames."""
return(self.name.lower())
[docs]
def get_staticmask_filename(self, hdr, paths):
"""Return path(s) to the static bad-pixel mask FITS for this instrument/binning.
Parameters
----------
hdr : astropy.io.fits.Header
FITS header (for binning and instrument).
paths : dict
Paths dict with 'code' key (package root).
Returns
-------
list
One-element list with path to staticmask FITS, or [None] if not found.
"""
instname = self.get_instrument_name(hdr)
binn = self.get_binning(hdr)
mask_file = os.path.join(paths['code'], '..', 'data', 'staticmasks',
f'{instname}.{binn}.staticmask.fits.fz')
if os.path.exists(mask_file):
return([mask_file])
else:
return([None])
[docs]
def get_catalog(self, hdr):
"""Return catalog name for zeropoint (e.g. 'PS1')."""
return(self.catalog_zp)
[docs]
def get_stk_name(self, hdr, red_path):
"""Return full path for stacked output FITS (target.filter.utYYMMDD.amp.binn.stk.fits)."""
target = self.get_target(hdr)
fil = self.get_filter(hdr)
amp = self.get_ampl(hdr)
binn = self.get_binning(hdr)
file_time = self.get_time(hdr)
datestr = Time(file_time, format='mjd').datetime.strftime('ut%y%m%d')
filename = f'{target}.{fil}.{datestr}.{amp}.{binn}.stk.fits'
fullfilename = os.path.join(red_path, filename)
return(fullfilename)
[docs]
def get_sci_name(self, hdr, red_path):
"""Return full path for single-extension science FITS."""
target = self.get_target(hdr)
fil = self.get_filter(hdr)
amp = self.get_ampl(hdr)
binn = self.get_binning(hdr)
file_time = self.get_time(hdr)
number = self.get_number(hdr)
datestr = Time(file_time, format='mjd').datetime.strftime('ut%y%m%d')
filename = f'{target}.{fil}.{datestr}.{amp}.{binn}.{number}.fits'
fullfilename = os.path.join(red_path, filename)
return(fullfilename)
[docs]
def get_bkg_name(self, hdr, red_path):
"""Return full path for background FITS (sci name with _bkg.fits)."""
sci_name = self.get_sci_name(hdr, red_path)
bkg_filename = sci_name.replace('.fits','_bkg.fits')
return(bkg_filename)
[docs]
def get_mbias_name(self, paths, amp, binn):
"""Return path for master bias FITS (paths['cal']/mbias_amp_binn.fits)."""
red_path = paths['cal']
return(os.path.join(red_path, f'mbias_{amp}_{binn}.fits'))
[docs]
def get_mdark_name(self, paths, amp, binn):
"""Return path for master dark FITS."""
red_path = paths['cal']
return(os.path.join(red_path, f'mdark_{amp}_{binn}.fits'))
[docs]
def get_mflat_name(self, paths, fil, amp, binn):
"""Return path for master flat FITS (filter, amp, binning)."""
red_path = paths['cal']
return(os.path.join(red_path, f'mflat_{fil}_{amp}_{binn}.fits'))
[docs]
def get_msky_name(self, paths, fil, amp, binn):
"""Return path for master sky FITS."""
red_path = paths['cal']
return(os.path.join(red_path, f'msky_{fil}_{amp}_{binn}.fits'))
[docs]
def load_bias(self, paths, amp, binn):
"""Load master bias CCDData for given amp and binning.
Parameters
----------
paths : dict
Paths dict (cal key).
amp : str
Amplifier identifier.
binn : str
Binning string.
Returns
-------
ccdproc.CCDData
Master bias in electrons.
Raises
------
Exception
If master bias file not found.
"""
bias = self.get_mbias_name(paths, amp, binn)
if os.path.exists(bias):
mbias = _read_calibration_ccd(bias, u.electron, hdu_index=0)
elif os.path.exists(bias+'.fz'):
mbias = _read_calibration_ccd(bias+'.fz', u.electron, hdu_index=1)
else:
raise Exception(f'Could not find bias: {bias}')
return(mbias)
[docs]
def load_dark(self, paths, amp, binn):
"""Load master dark CCDData for given amp and binning.
Parameters
----------
paths : dict
Paths dict (cal key).
amp : str
Amplifier identifier.
binn : str
Binning string.
Returns
-------
ccdproc.CCDData
Master dark in electrons.
Raises
------
Exception
If master dark file not found.
"""
dark = self.get_mdark_name(paths, amp, binn)
if os.path.exists(dark):
mdark = _read_calibration_ccd(dark, u.electron, hdu_index=0)
elif os.path.exists(dark+'.fz'):
mdark = _read_calibration_ccd(dark+'.fz', u.electron, hdu_index=1)
else:
raise Exception(f'Could not find dark: {dark}')
return(mdark)
[docs]
def load_flat(self, paths, fil, amp, binn):
"""Load master flat CCDData for given filter, amp, and binning.
Parameters
----------
paths : dict
Paths dict (cal key).
fil : str
Filter name.
amp : str
Amplifier identifier.
binn : str
Binning string.
Returns
-------
ccdproc.CCDData
Master flat (normalized).
Raises
------
Exception
If master flat file not found.
"""
flat = self.get_mflat_name(paths, fil, amp, binn)
if os.path.exists(flat):
mflat = _read_calibration_ccd(flat, u.dimensionless_unscaled, hdu_index=0)
elif os.path.exists(flat+'.fz'):
mflat = _read_calibration_ccd(flat+'.fz', u.dimensionless_unscaled, hdu_index=1)
else:
raise Exception(f'Could not find flat: {flat}')
return(mflat)
[docs]
def load_sky(self, paths, fil, amp, binn):
"""Load master sky CCDData for given filter, amp, and binning.
Parameters
----------
paths : dict
Paths dict (cal key).
fil : str
Filter name.
amp : str
Amplifier identifier.
binn : str
Binning string.
Returns
-------
ccdproc.CCDData
Master sky (normalized).
Raises
------
Exception
If master sky file not found.
"""
sky = self.get_msky_name(paths, fil, amp, binn)
if os.path.exists(sky):
msky = _read_calibration_ccd(sky, u.dimensionless_unscaled, hdu_index=0)
elif os.path.exists(sky+'.fz'):
msky = _read_calibration_ccd(sky+'.fz', u.dimensionless_unscaled, hdu_index=1)
else:
raise Exception(f'Could not find sky frame: {sky}')
return(msky)
[docs]
def import_image(self, filename, amp, log=None):
"""Load raw multi-extension FITS and return processed CCDData (bias/overscan/trim/gain).
Parameters
----------
filename : str
Path to raw FITS file.
amp : str
Amplifier identifier (number of extensions).
log : ColoredLogger, optional
Logger for progress.
Returns
-------
ccdproc.CCDData
Processed frame in electrons (single combined image).
"""
filename = os.path.abspath(filename)
if log: log.info(f'Loading file: {filename}')
with fits.open(flat) as hdr:
header = hdr[1].header
raw = [CCDData.read(flat, hdu=x+1, unit='adu') for x in range(int(amp))]
red = [ccdproc.ccd_process(x, oscan=self.biassec[k],
oscan_model=models.Chebyshev1D(3),
trim=self.datasec[k], gain=self.gain[k]*u.electron/u.adu,
readnoise=rdnoise(hdr)*u.electron,
gain_corrected=True) for k,x in enumerate(raw)]
frame_full = CCDData(np.concatenate((red[0],
np.empty((red[0].shape[0], 794)), red[1]), axis=1),
header=header,unit=u.electron)
frame_full.header['SATURATE'] = saturation(header)
return(frame_full)
[docs]
def import_sci_image(self, filename, log=None):
"""Load a single science FITS as CCDData (no overscan/trim; already reduced).
Parameters
----------
filename : str
Path to FITS file.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
ccdproc.CCDData
Science frame.
"""
filename = os.path.abspath(filename)
if log: log.info(f'Loading file: {filename}')
frame_full = CCDData.read(filename)
return(frame_full)
[docs]
def create_bias(self, bias_list, amp, binn, paths,
log=None, **kwargs):
"""Combine bias frames into master bias and write to paths['cal'].
Parameters
----------
bias_list : list of str
Paths to bias FITS files.
amp : str
Amplifier identifier.
binn : str
Binning string.
paths : dict
Paths dict (cal key).
log : ColoredLogger, optional
Logger for progress.
**kwargs
Unused; for subclass compatibility.
Returns
-------
None
Master bias written via get_mbias_name.
"""
if log:
log.info(f'Processing bias files with {amp} amps and {binn} binning.')
log.info(f'{len(bias_list)} files found.')
biases = []
for i, bias in enumerate(bias_list):
if log: log.info(f'Importing {bias}')
bias_full = self.import_image(bias, amp, log=log)
# Load static mask for this specific file
staticmask = self.load_staticmask(bias_full.header, paths)
if staticmask is not None:
if log: log.info('Applying static mask')
bias_full.data[staticmask]=np.nan
if self.cr_bias:
# Add a CR mask to the bias image
mean, median, stddev = sigma_clipped_stats(bias_full.data)
mask = bias_full.data > median + 3 * stddev
if bias_full.mask is not None:
bias_full.mask = bias_full.mask | mask
else:
bias_full.mask = mask
bias_full.data[mask]=median
biases.append(bias_full)
mbias = ccdproc.combine(biases, method='median', sigma_clip=True,
clip_extrema=True)
mbias.data[np.isnan(mbias.data)]=np.nanmedian(mbias.data)
if log: log.info(f'Made bias for amp: {amp}, bin: {binn}.')
mbias.header['VER'] = (__version__,
'Version of telescope parameter file used.')
bias_filename = self.get_mbias_name(paths, amp, binn)
_sanitize_calibration_header(mbias.header)
mbias.write(bias_filename, overwrite=True, output_verify='silentfix')
if log: log.info(f'Master bias written to {bias_filename}')
return
[docs]
def create_dark(self, dark_list, amp, binn, paths, mbias=None, log=None):
"""Combine dark frames (with optional bias subtraction) and write master dark.
Parameters
----------
dark_list : list of str
Paths to dark FITS files.
amp : str
Amplifier identifier.
binn : str
Binning string.
paths : dict
Paths dict (cal key).
mbias : ccdproc.CCDData, optional
Master bias to subtract before combining.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
None
Master dark written via get_mdark_name.
"""
if log:
log.info(f'Processing dark files with {amp} amps and {binn} binning.')
log.info(f'{len(dark_list)} files found.')
darks = []
exptimes = []
for dark in dark_list:
if log: log.info(f'Importing {dark}')
dark_full = self.import_image(dark, amp, log=log)
# Load static mask for this specific file
staticmask = self.load_staticmask(dark_full.header, paths)
if staticmask is not None:
if log: log.info('Applying static mask')
dark_full.data[staticmask]=np.nan
exptimes.append(self.get_exptime(dark_full.header))
if mbias is not None:
if log: log.info('Subtracting bias')
dark_full = ccdproc.subtract_bias(dark_full, mbias)
darks.append(dark_full)
if log: log.info('Creating master dark.')
mdark = ccdproc.combine(darks, method='median',
scale=1./np.array(exptimes), sigma_clip=True, clip_extrema=True)
# Rescale to electrons by average of all exposure times
avg_exptime = np.mean(exptimes)
mdark.data *= avg_exptime
mdark.header[self.exptime_keyword]=(avg_exptime,
'Exposure time of master dark (sec).')
# Update other header variables
mdark.header['VER'] = (__version__,
'Version of telescope parameter file used.')
for i,im in enumerate(dark_list):
mdark.header[f'FILE{i+1}']=os.path.basename(im)
darkname = self.get_mdark_name(paths, amp, binn)
if log: log.info(f'Writing master dark to {darkname}')
_sanitize_calibration_header(mdark.header)
mdark.write(darkname, overwrite=True, output_verify='silentfix')
return
[docs]
def create_flat(self, flat_list, fil, amp, binn, paths, mbias=None,
mdark=None, is_science=False, log=None, **kwargs):
"""Combine flat frames (bias/dark subtract, normalize) and write master flat.
Parameters
----------
flat_list : list of str
Paths to flat FITS files.
fil : str
Filter name.
amp : str
Amplifier identifier.
binn : str
Binning string.
paths : dict
Paths dict (cal key).
mbias : ccdproc.CCDData, optional
Master bias to subtract.
mdark : ccdproc.CCDData, optional
Master dark to subtract.
is_science : bool, optional
If True, mask high pixels as for science. Default is False.
log : ColoredLogger, optional
Logger for progress.
**kwargs
Unused; for subclass compatibility.
Returns
-------
None
Master flat written via get_mflat_name.
"""
if log:
log.info(f'Processing files for filter: {fil}')
log.info(f'{len(flat_list)} files found.')
scale = []
flats = []
for i, flat in enumerate(flat_list):
if log: log.info(f'Importing {flat}')
flat_full = self.import_image(flat, amp, log=log)
# Load static mask for this specific file
staticmask = self.load_staticmask(flat_full.header, paths)
if staticmask is not None:
if log: log.info('Applying static mask')
flat_full.data[staticmask]=np.nan
if mbias is not None:
if log: log.info('Subtracting bias')
flat_full = ccdproc.subtract_bias(flat_full, mbias)
if mdark is not None:
if log: log.info('Subtracting dark')
flat_full = ccdproc.subtract_dark(flat_full, mdark,
exposure_time=self.exptime_keyword, exposure_unit=u.second)
# Mask flat_full if the image is a science frame
if is_science:
mean, median, stddev = sigma_clipped_stats(flat_full.data)
mask = flat_full.data > 5*stddev + median
flat_full.mask = mask.astype(np.uint8)
exptime = self.get_exptime(flat_full.header)
if log: log.info(f'Exposure time of image is {exptime}')
norm = 1./np.nanmedian(flat_full.data)
# Vet the flat normalization - it should not be negative
if norm > 0.:
log.info(f'Flat normalization: {norm}')
else:
# Skip this file
log.error(f'Flat normalization: {norm}')
continue
scale.append(norm)
flats.append(flat_full)
mflat = ccdproc.combine(flats, method='median', scale=scale,
sigma_clip=True, clip_extrema=True)
# Mask flat
mflat.data[np.isinf(mflat.data)]=np.nan
mflat.data[mflat.data==0.0]=np.nan
mflat.data[mflat.data<0.0]=np.nan
mean, median, stddev = sigma_clipped_stats(mflat.data)
mask = mflat.data > median + 10 * stddev
mflat.data[mask]=np.nan
# Mask the flat for numerical under/overflows
logflat = np.log10(mflat.data)
mean, median, stddev = sigma_clipped_stats(logflat)
mask = np.abs(logflat - median) > 10 * stddev
mflat.data[mask]=np.nan
# Remove all nan values from flat
mask = np.isnan(mflat.data)
mflat.mask = mask
mflat.data[mask]=1.0
if log:
log.info(f'Made flat for filter: {fil}, amp: {amp}, bin: {binn}.')
mflat.header['VER'] = (__version__,
'Version of telescope parameter file used.')
flat_filename = self.get_mflat_name(paths, fil, amp, binn)
_sanitize_calibration_header(mflat.header)
mflat.write(flat_filename, overwrite=True, output_verify='silentfix')
if log: log.info(f'Master flat written to {flat_filename}')
return
[docs]
def create_sky(self, sky_list, fil, amp, binn, paths, log=None,
sky_sigma_upper=3.0, sky_sigma_lower=4.0, sky_maxiters=5,
sky_n_sigma_high=3.0, sky_n_sigma_low=4.0,
msky_n_sigma_high=5.0, msky_n_sigma_low=5.0, msky_maxiters=5,
**kwargs):
"""Combine sky frames (normalize by median) and write master sky.
Uses iterative sigma clipping to estimate the sky background and mask
bright sources (and optionally low outliers) so the median is not
biased. Per-frame and combined master sky masking use configurable
sigma and iteration limits.
Parameters
----------
sky_list : list of str
Paths to sky FITS files.
fil : str
Filter name.
amp : str
Amplifier identifier.
binn : str
Binning string.
paths : dict
Paths dict (cal key).
log : ColoredLogger, optional
Logger for progress.
sky_sigma_upper : float, optional
Upper sigma for iterative clip when estimating per-frame sky.
Default 3.0.
sky_sigma_lower : float, optional
Lower sigma for iterative clip when estimating per-frame sky.
Default 4.0.
sky_maxiters : int, optional
Max iterations for sigma clipping per sky frame. Default 5.
sky_n_sigma_high : float, optional
Mask pixels above med + this many * std (bright sources). Default 3.0.
sky_n_sigma_low : float, optional
Mask pixels below med - this many * std (defects). Default 4.0.
msky_n_sigma_high : float, optional
Mask master sky pixels above median + this * std. Default 5.0.
msky_n_sigma_low : float, optional
Mask master sky pixels below median - this * std. Default 5.0.
msky_maxiters : int, optional
Max iterations for sigma clipping on combined master sky. Default 5.
**kwargs
Unused; for subclass compatibility.
Returns
-------
None
Master sky written via get_msky_name.
"""
if log:
log.info(f'Processing files for filter: {fil}')
log.info(f'{len(sky_list)} files found.')
skys = []
for i, sky in enumerate(sky_list):
if log: log.info(f'Importing {sky}')
sky_full = self.import_sci_image(sky, log=log)
# Iterative sigma-clipped stats for robust sky level (excludes
# bright sources and defects so median is unbiased)
mean, med, stddev = sigma_clipped_stats(
sky_full.data,
sigma_upper=sky_sigma_upper,
sigma_lower=sky_sigma_lower,
maxiters=sky_maxiters,
)
# Mask bright sources and low outliers so they don't affect norm
mask_high = sky_full.data > med + sky_n_sigma_high * stddev
mask_low = sky_full.data < med - sky_n_sigma_low * stddev
sky_full.data[mask_high] = np.nan
sky_full.data[mask_low] = np.nan
# Recompute stats on masked data (NaNs excluded) for normalization
mean, med, stddev = sigma_clipped_stats(
sky_full.data,
sigma_upper=sky_sigma_upper,
sigma_lower=sky_sigma_lower,
maxiters=sky_maxiters,
)
norm = 1.0 / med
sky_full = sky_full.multiply(norm)
# Vet the sky normalization - it should not be negative
if norm > 0.:
if log: log.info(f'Sky normalization: {norm}')
else:
if log: log.error(f'Sky normalization: {norm}')
continue
nan_mask = np.isnan(sky_full.data)
if sky_full.mask is None:
sky_full.mask = np.zeros(sky_full.data.shape, dtype=bool)
sky_full.mask[nan_mask] = True
sky_full.data[nan_mask] = 1.0
skys.append(sky_full)
msky = ccdproc.combine(skys, method='median', sigma_clip=True,
clip_extrema=True)
# Robust masking of combined master sky: iterative sigma clipping
# for accurate median/std, then mask excess flux and defects
msky.data[np.isinf(msky.data)] = np.nan
msky.data[msky.data == 0.0] = np.nan
mean, median, stddev = sigma_clipped_stats(
msky.data,
sigma_upper=3.0,
sigma_lower=3.0,
maxiters=msky_maxiters,
)
mask_high = msky.data > median + msky_n_sigma_high * stddev
mask_low = msky.data < median - msky_n_sigma_low * stddev
msky.data[mask_high] = 1.0
msky.data[mask_low] = 1.0
# Restore valid fill for normalized sky (1.0 = no correction)
msky.data[np.isnan(msky.data)] = 1.0
if log:
log.info(f'Made sky for filter: {fil}, amp: {amp}, bin: {binn}.')
msky.header['VER'] = (__version__,
'Version of telescope parameter file used.')
sky_filename = self.get_msky_name(paths, fil, amp, binn)
_sanitize_calibration_header(msky.header)
msky.write(sky_filename, overwrite=True, output_verify='silentfix')
if log: log.info(f'Master sky written to {sky_filename}')
return
[docs]
def load_staticmask(self, hdr, paths):
"""Load the static mask array for this instrument and binning.
Parameters
----------
hdr : astropy.io.fits.Header
FITS header (for binning and instrument name).
paths : dict
Paths dict with 'code' key.
Returns
-------
np.ndarray or None
Boolean mask from static mask FITS extension [1], or None if not found.
"""
satmask_filename = self.get_staticmask_filename(hdr, paths)
if (satmask_filename is not None and
len(satmask_filename) > 0 and
satmask_filename[0] is not None and
os.path.exists(satmask_filename[0])):
with fits.open(satmask_filename[0]) as hdu:
satmask = hdu[1].data.astype(bool)
else:
satmask = None
return(satmask)
[docs]
def expand_mask(self, input_data, input_mask=None):
"""Build combined mask from NaN, inf, zero pixels and optional input mask.
Parameters
----------
input_data : ccdproc.CCDData
Image data (input_data.data used).
input_mask : np.ndarray, optional
Boolean mask to OR with bad-pixel mask.
Returns
-------
np.ndarray
Boolean mask (True = bad).
"""
# Get image statistics
mean, median, stddev = sigma_clipped_stats(input_data.data)
# Add to input mask
add_mask = np.isnan(input_data.data)
add_mask = add_mask | np.isinf(input_data.data)
add_mask = add_mask | (input_data.data==0.0)
# Expand with input mask if it was provided
if input_mask is None:
input_mask = add_mask
else:
input_mask = input_mask | add_mask
return(input_mask)
[docs]
def process_science(self, sci_list, fil, amp, binn, paths, mbias=None,
mflat=None, mdark=None, skip_skysub=False, save_bkg=False, log=None):
"""Reduce science frames: bias/dark/flat, optional sky subtraction; return CCDData list.
Parameters
----------
sci_list : list of str
Paths to science FITS files.
fil : str
Filter name.
amp : str
Amplifier identifier.
binn : str
Binning string.
paths : dict
Paths dict (work, cal, etc.).
mbias : ccdproc.CCDData, optional
Master bias.
mflat : ccdproc.CCDData, optional
Master flat.
mdark : ccdproc.CCDData, optional
Master dark.
skip_skysub : bool, optional
If True, skip 2D background subtraction. Default is False.
save_bkg : bool, optional
If True, save background model. Default is False.
log : ColoredLogger, optional
Logger for progress.
Returns
-------
list of ccdproc.CCDData
Reduced science frames (each with FILENAME in header).
"""
processed = []
processed_names = []
for sci in sorted(sci_list):
if log: log.info(f'Importing {sci}')
sci_full = self.import_image(sci, amp, log=log)
# Load static mask for this specific file
staticmask = self.load_staticmask(sci_full.header, paths)
if staticmask is None:
staticmask = np.zeros(sci_full.data.shape).astype(bool)
else:
if log: log.info('Applying static mask')
sci_full.data[staticmask]=np.nan
# Subtract bias
if mbias is not None:
if log: log.info('Subtracting bias')
sci_full = ccdproc.subtract_bias(sci_full, mbias)
# Subtract dark
if mdark is not None:
if log: log.info('Subtracting dark')
sci_full = ccdproc.subtract_dark(sci_full, mdark,
exposure_time=self.exptime_keyword, exposure_unit=u.second)
# Perform flat-fielding
if mflat is not None:
if log: log.info('Flattening image')
sci_full = ccdproc.flat_correct(sci_full, mflat)
# Expand mask wherever sensitivity of the detector is low
# e.g., due to vignetting
staticmask = staticmask | (mflat.data < 0.5)
# Expand input mask
processed_data = sci_full
processed_data.mask = self.expand_mask(processed_data,
input_mask=staticmask)
processed_data.data[processed_data.mask]=np.nan
if log: log.info(f'Wavelength is {self.wavelength}')
if not skip_skysub and not self.needs_sky_subtraction(fil):
if log: log.info('Calculating 2D background.')
approx_background=np.nanmedian(processed_data.data) * u.electron
if log: log.info(f'Approximate background value: {approx_background}')
bkg = Background2D(processed_data, (64, 64), filter_size=(3, 3),
sigma_clip=SigmaClip(sigma=3), exclude_percentile=80,
bkg_estimator=MeanBackground(), mask=processed_data.mask,
fill_value=np.nanmedian(processed_data.data))
med_background = np.nanmedian(bkg.background)
if log: log.info(f'Median background: {med_background}')
if np.isnan(med_background.value):
final_img = processed_data.subtract(approx_background)
final_img.header = processed_data.header
final_img.header['SATURATE'] -= approx_background.value
final_img.header['SKYBKG'] = approx_background.value
else:
if save_bkg:
bkg_filename = self.get_bkg_name(processed_data.header, paths['work'])
if log: log.info(f'Writing background file: {bkg_filename}')
bkg_hdu = fits.PrimaryHDU(bkg.background.value)
bkg_hdu.header = processed_data.header
bkg_hdu.writeto(bkg_filename, overwrite=True,
output_verify='silentfix')
final_img = processed_data.subtract(CCDData(bkg.background,
unit=u.electron), propagate_uncertainties=True,
handle_meta='first_found')
final_img.header['SATURATE'] -= med_background.value
final_img.header['SKYBKG'] = med_background.value
if log: log.info('Updating background and saturation values.')
else:
final_img = processed_data
final_img.header['SKYBKG'] = 0.0
# Apply final masking based on excessively negative values
mean, median, stddev = sigma_clipped_stats(final_img.data)
mask = final_img.data < median - 10 * stddev
final_img.data[mask] = np.nan
final_img.mask = final_img.mask | mask
# Convert data format to float32
final_img.data = final_img.data.astype(np.float32)
final_img.header['BITPIX']=-32
# Get read noise for uncertainty image
rdnoise = np.mean(self.get_rdnoise(final_img.header))
# Add RMS noise to account for sky background
data = final_img.data
rms = 0.5 * (
np.percentile(data[~np.isnan(data)], 84.13)
- np.percentile(data[~np.isnan(data)], 15.86)
)
rdnoise = rdnoise + rms**2
if log: log.info('Creating uncertainty image')
final_img = ccdproc.create_deviation(final_img,
readnoise=rdnoise*u.electron,
disregard_nan=True)
# Delete empty header keys
for key in list(final_img.header.keys()):
if not key.strip():
if key in final_img.header.keys():
del final_img.header[key]
final_filename = self.get_sci_name(final_img.header, paths['work'])
# Edit additional header values
final_img.header['FILENAME']=final_filename
final_img.header['FILTER']=fil
final_img.header['AMPS']=amp
final_img.header['BINNING']=binn
final_img.header['ORGFILE']=sci
final_img.header['EXTNAME']='SCI'
# Get rid of header values if they exist
for key in ['DATASEC','BIASSEC','CCDSEC']:
if key in final_img.header.keys():
del final_img.header[key]
if log: log.info(f'Writing final file: {final_filename}')
final_img.write(final_filename, overwrite=True, output_verify='silentfix')
processed.append(final_img)
processed_names.append(final_filename)
# Create sky image and subtract from every science frame
if self.needs_sky_subtraction(fil):
sky_frame = self.get_msky_name(paths, fil, amp, binn)
if not os.path.exists(sky_frame):
self.create_sky(processed_names, fil, amp, binn, paths,
log=log)
sky_frame = self.load_sky(paths, fil, amp, binn)
for i, frame in enumerate(processed):
# Robust scale: sigma-clipped median so bright sources don't bias sky level
mean, med, stddev = sigma_clipped_stats(
frame.data,
sigma_upper=3.0,
sigma_lower=3.0,
maxiters=5,
)
# Scale normalized sky to same units as science (electrons) so subtract is valid
science_unit = frame.unit if frame.unit is not None else u.electron
frame_sky = sky_frame.multiply(med * science_unit,
propagate_uncertainties=True, handle_meta='first_found')
processed[i] = frame.subtract(frame_sky,
propagate_uncertainties=True, handle_meta='first_found')
processed[i].header['SKYBKG']=med
processed[i].header['SATURATE']-=med
# Rewrite file
final_filename = self.get_sci_name(processed[i].header,
paths['work'])
if log: log.info(f'Writing final file: {final_filename}')
processed[i].write(final_filename, overwrite=True, output_verify='silentfix')
return(processed)