Source code for potpyri.instruments.instrument

"""Base instrument class and shared reduction logic for POTPyRI.

Defines detector keywords, calibration flags, file naming, and methods for
importing/processing science frames, building master bias/dark/flat, and
loading static masks. Subclasses (GMOS, LRIS, BINOSPEC, etc.) set
instrument-specific attributes.
"""
from potpyri._version import __version__

import os
import astropy
import datetime
import copy
import ccdproc
import numpy as np

from photutils.background import Background2D
from photutils.background import MeanBackground

import astropy.units as u
from astropy.io import fits
from astropy.modeling import models
from astropy.nddata import CCDData
from astropy.stats import sigma_clipped_stats
from astropy.stats import SigmaClip
from astropy.time import Time

# WCS-related keywords to remove from calibration headers so saved files never
# trigger InvalidTransformError (e.g. DEC=-100, ill-conditioned CD matrix).
_CAL_HEADER_WCS_KEYS = (
    'CTYPE1', 'CTYPE2', 'CRVAL1', 'CRVAL2', 'CRPIX1', 'CRPIX2',
    'CD1_1', 'CD1_2', 'CD2_1', 'CD2_2', 'CDELT1', 'CDELT2', 'CROTA2',
    'RADECSYS', 'RADESYS', 'EQUINOX', 'RA', 'DEC', 'LONPOLE', 'LATPOLE',
    'CUNIT1', 'CUNIT2', 'MJD-OBS',
    'LTM1_1', 'LTM1_2', 'LTM2_1', 'LTM2_2', 'LTV1', 'LTV2',
    'DTV1', 'DTV2', 'DTM1_1', 'DTM2_2',
)


def _sanitize_calibration_header(header):
    """Remove WCS and coordinate keywords from a header so it can be written safely.

    Calibration frames (bias, dark, flat, sky) inherit headers from input frames
    that may contain invalid or ill-conditioned WCS (e.g. DEC=-100). Removing
    these keywords ensures saved calibration files do not cause
    InvalidTransformError when later loaded by code that parses WCS.
    Modifies header in place.
    """
    for key in _CAL_HEADER_WCS_KEYS:
        if key in header:
            del header[key]
    # Remove PV* distortion keywords (can be ill-conditioned)
    to_del = [k for k in header.keys() if k.startswith('PV')]
    for k in to_del:
        del header[k]


def _read_calibration_ccd(path, unit, hdu_index=0):
    """Read a calibration FITS into CCDData without parsing WCS.

    Master bias/dark/flat frames often have WCS keywords that are ill-conditioned
    or copied from templates. Parsing them can raise InvalidTransformError and
    log 'Ill-conditioned coordinate transformation parameter'. Calibration
    frames do not need WCS, so we load data and header explicitly with wcs=None.
    """
    with fits.open(path) as hdu_list:
        hdu = hdu_list[hdu_index]
        return CCDData(hdu.data, meta=hdu.header.copy(), unit=unit, wcs=None)


[docs] class Instrument(object): """Base class for all POTPyRI instruments. Holds detector parameters (gain, rdnoise, pixscale, saturation), header keyword names, file-sorting rules, and methods for calibration and science processing. Subclasses override attributes as needed. """ def __init__(self): # Version self.version = __version__ # Intrument name self.name = '' # Extensions for keeping track of general metadata and WCS self.raw_header_ext = 0 self.wcs_extension = 0 # Detector specific characteristics self.pixscale = None self.saturation = 65535 self.min_exptime = 1.0 # Run dark/bias/flat calibration? self.dark = False self.bias = False self.flat = False # Parameters for handling calibration files # Run rejection on possible CR pixels in bias self.cr_bias = True # Extend header to first file extension for purposes of sort_files self.extend_header = False # How to combine images during stacking self.stack_method = 'median' self.wavelength = 'OPT' self.gain = [1.02, 1.08] self.rdnoise = [4.0, 4.0] self.datasec = ['[1055:3024,217:3911]','[1067:3033,217:3911]'] self.biassec = ['[1:1054,1:4112]','[3034:4096,1:4112]'] # Keywords for selecting files from Sort_files object # This allows a single file type to be used for multiple purposes (e.g., for # generating a flat-field image from science exposures) self.filetype_keywords = {'SCIENCE':'SCIENCE', 'FLAT':'FLAT', 'DARK':'DARK','BIAS':'BIAS'} # Header keywords self.target_keyword = 'OBJECT' self.exptime_keyword = 'EXPTIME' self.filter_keyword = 'FILTER' self.mjd_keyword = 'MJD' self.bin_keyword = 'CCDSUM' self.amp_keyword = '2' # File sorting keywords self.science_keywords = ['MASK','SCRN'] self.science_values = ['imaging','stowed'] self.flat_keywords = ['MASK','SCRN'] self.flat_values = ['imaging','deployed'] self.bias_keywords = [] self.bias_values = [] self.dark_keywords = [] self.dark_values = [] self.spec_keywords = [] self.spec_values = [] self.bad_keywords = [] self.bad_values = [] self.detrend = True self.catalog_zp = 'PS1' # Default final image size for binning of 1x1 self.out_size = 5000
[docs] def match_type_keywords(self, kwds, file_table): """Return a boolean mask of file_table rows whose Type is in kwds. Parameters ---------- kwds : str Comma-separated type names (e.g. 'SCIENCE', 'FLAT'). file_table : astropy.table.Table File list table with 'Type' column. Returns ------- np.ndarray Boolean mask, True where file_table['Type'] is in kwds. """ masks = [] for kwd in kwds.split(','): masks.append(file_table['Type']==kwd) masks = np.array(masks) mask = np.any(masks, axis=0) return(mask)
[docs] def needs_sky_subtraction(self, filt): """Return True if sky subtraction should be run for this filter. Parameters ---------- filt : str Filter name (e.g. 'r', 'J'). Returns ------- bool True for NIR wavelength instruments, False otherwise. """ if self.wavelength=='NIR': return(True) else: return(False)
[docs] def get_pixscale(self): """Return pixel scale in arcsec/pixel. Returns ------- float or None Pixel scale; may vary by mode in subclasses. """ return(self.pixscale)
[docs] def get_rdnoise(self, hdr): """Return read noise value(s) for the detector/amplifier. Parameters ---------- hdr : astropy.io.fits.Header FITS header (may be used in subclasses for amp-dependent values). Returns ------- float or list Read noise in electrons. """ return(self.rdnoise)
[docs] def get_gain(self, hdr): """Return gain value(s) for the detector/amplifier. Parameters ---------- hdr : astropy.io.fits.Header FITS header (may be used in subclasses for amp-dependent values). Returns ------- float or list Gain in e-/ADU. """ return(self.gain)
[docs] def get_target(self, hdr): """Return target name from header (target_keyword), spaces stripped.""" return(hdr[self.target_keyword].replace(' ',''))
[docs] def get_filter(self, hdr): """Return filter name from header; leading segment before underscore.""" filt = hdr[self.filter_keyword] filt = filt.replace(' ','') filt = filt.split('_')[0] return(filt)
[docs] def get_exptime(self, hdr): """Return exposure time from header (exptime_keyword).""" return(hdr[self.exptime_keyword])
[docs] def get_ampl(self, hdr): """Return amplifier identifier from header as string.""" if self.amp_keyword in hdr.keys(): return(str(hdr[self.amp_keyword])) else: return(str(self.amp_keyword))
[docs] def get_binning(self, hdr): """Return binning string from header (e.g. '11', '22'), normalized and no 'x'.""" if self.bin_keyword in hdr.keys(): binn = str(hdr[self.bin_keyword]).lower().replace(' ','') binn = binn.replace('x','') binn = binn.replace(',','') return(binn) else: return(self.bin_keyword)
[docs] def get_out_size(self, hdr): """Return output image size (pixels) for this binning (out_size / binn).""" binn = int(str(self.get_binning(hdr))[0]) out_size = int(self.out_size/binn) return(out_size)
[docs] def get_time(self, hdr): """Return MJD from header (mjd_keyword) as float.""" return(float(hdr[self.mjd_keyword]))
[docs] def get_instrument_name(self, hdr): """Return instrument name (lowercase) for paths/filenames.""" return(self.name.lower())
[docs] def get_staticmask_filename(self, hdr, paths): """Return path(s) to the static bad-pixel mask FITS for this instrument/binning. Parameters ---------- hdr : astropy.io.fits.Header FITS header (for binning and instrument). paths : dict Paths dict with 'code' key (package root). Returns ------- list One-element list with path to staticmask FITS, or [None] if not found. """ instname = self.get_instrument_name(hdr) binn = self.get_binning(hdr) mask_file = os.path.join(paths['code'], '..', 'data', 'staticmasks', f'{instname}.{binn}.staticmask.fits.fz') if os.path.exists(mask_file): return([mask_file]) else: return([None])
[docs] def get_catalog(self, hdr): """Return catalog name for zeropoint (e.g. 'PS1').""" return(self.catalog_zp)
[docs] def format_datasec(self, sec_string, binning=1): """Convert datasec string to binned pixel bounds (e.g. '[1:100,1:200]'). Parameters ---------- sec_string : str Data section string like '[x1:x2,y1:y2]'. binning : int, optional Binning factor. Default is 1. Returns ------- str Reformatted section string with binned limits. """ sec_string = sec_string.replace('[','').replace(']','') x,y = sec_string.split(',') x1,x2 = x.split(':') y1,y2 = y.split(':') x1 = float(x1) ; x2 = float(x2) ; y1 = float(y1) ; y2 = float(y2) x1 = np.max([int(1.0*x1/binning),1]) x2 = int(1.0*x2/binning) y1 = np.max([int(1.0*y1/binning),1]) y2 = int(1.0*y2/binning) sec_string = f'[{x1}:{x2},{y1}:{y2}]' return(sec_string)
[docs] def raw_format(self, proc): """Return glob pattern for raw files (e.g. 'sci_img_*.fits' or 'sci_img*[!proc].fits').""" if proc: return('sci_img_*.fits') else: return('sci_img*[!proc].fits')
[docs] def get_stk_name(self, hdr, red_path): """Return full path for stacked output FITS (target.filter.utYYMMDD.amp.binn.stk.fits).""" target = self.get_target(hdr) fil = self.get_filter(hdr) amp = self.get_ampl(hdr) binn = self.get_binning(hdr) file_time = self.get_time(hdr) datestr = Time(file_time, format='mjd').datetime.strftime('ut%y%m%d') filename = f'{target}.{fil}.{datestr}.{amp}.{binn}.stk.fits' fullfilename = os.path.join(red_path, filename) return(fullfilename)
[docs] def get_sci_name(self, hdr, red_path): """Return full path for single-extension science FITS.""" target = self.get_target(hdr) fil = self.get_filter(hdr) amp = self.get_ampl(hdr) binn = self.get_binning(hdr) file_time = self.get_time(hdr) number = self.get_number(hdr) datestr = Time(file_time, format='mjd').datetime.strftime('ut%y%m%d') filename = f'{target}.{fil}.{datestr}.{amp}.{binn}.{number}.fits' fullfilename = os.path.join(red_path, filename) return(fullfilename)
[docs] def get_bkg_name(self, hdr, red_path): """Return full path for background FITS (sci name with _bkg.fits).""" sci_name = self.get_sci_name(hdr, red_path) bkg_filename = sci_name.replace('.fits','_bkg.fits') return(bkg_filename)
[docs] def get_mbias_name(self, paths, amp, binn): """Return path for master bias FITS (paths['cal']/mbias_amp_binn.fits).""" red_path = paths['cal'] return(os.path.join(red_path, f'mbias_{amp}_{binn}.fits'))
[docs] def get_mdark_name(self, paths, amp, binn): """Return path for master dark FITS.""" red_path = paths['cal'] return(os.path.join(red_path, f'mdark_{amp}_{binn}.fits'))
[docs] def get_mflat_name(self, paths, fil, amp, binn): """Return path for master flat FITS (filter, amp, binning).""" red_path = paths['cal'] return(os.path.join(red_path, f'mflat_{fil}_{amp}_{binn}.fits'))
[docs] def get_msky_name(self, paths, fil, amp, binn): """Return path for master sky FITS.""" red_path = paths['cal'] return(os.path.join(red_path, f'msky_{fil}_{amp}_{binn}.fits'))
[docs] def load_bias(self, paths, amp, binn): """Load master bias CCDData for given amp and binning. Parameters ---------- paths : dict Paths dict (cal key). amp : str Amplifier identifier. binn : str Binning string. Returns ------- ccdproc.CCDData Master bias in electrons. Raises ------ Exception If master bias file not found. """ bias = self.get_mbias_name(paths, amp, binn) if os.path.exists(bias): mbias = _read_calibration_ccd(bias, u.electron, hdu_index=0) elif os.path.exists(bias+'.fz'): mbias = _read_calibration_ccd(bias+'.fz', u.electron, hdu_index=1) else: raise Exception(f'Could not find bias: {bias}') return(mbias)
[docs] def load_dark(self, paths, amp, binn): """Load master dark CCDData for given amp and binning. Parameters ---------- paths : dict Paths dict (cal key). amp : str Amplifier identifier. binn : str Binning string. Returns ------- ccdproc.CCDData Master dark in electrons. Raises ------ Exception If master dark file not found. """ dark = self.get_mdark_name(paths, amp, binn) if os.path.exists(dark): mdark = _read_calibration_ccd(dark, u.electron, hdu_index=0) elif os.path.exists(dark+'.fz'): mdark = _read_calibration_ccd(dark+'.fz', u.electron, hdu_index=1) else: raise Exception(f'Could not find dark: {dark}') return(mdark)
[docs] def load_flat(self, paths, fil, amp, binn): """Load master flat CCDData for given filter, amp, and binning. Parameters ---------- paths : dict Paths dict (cal key). fil : str Filter name. amp : str Amplifier identifier. binn : str Binning string. Returns ------- ccdproc.CCDData Master flat (normalized). Raises ------ Exception If master flat file not found. """ flat = self.get_mflat_name(paths, fil, amp, binn) if os.path.exists(flat): mflat = _read_calibration_ccd(flat, u.dimensionless_unscaled, hdu_index=0) elif os.path.exists(flat+'.fz'): mflat = _read_calibration_ccd(flat+'.fz', u.dimensionless_unscaled, hdu_index=1) else: raise Exception(f'Could not find flat: {flat}') return(mflat)
[docs] def load_sky(self, paths, fil, amp, binn): """Load master sky CCDData for given filter, amp, and binning. Parameters ---------- paths : dict Paths dict (cal key). fil : str Filter name. amp : str Amplifier identifier. binn : str Binning string. Returns ------- ccdproc.CCDData Master sky (normalized). Raises ------ Exception If master sky file not found. """ sky = self.get_msky_name(paths, fil, amp, binn) if os.path.exists(sky): msky = _read_calibration_ccd(sky, u.dimensionless_unscaled, hdu_index=0) elif os.path.exists(sky+'.fz'): msky = _read_calibration_ccd(sky+'.fz', u.dimensionless_unscaled, hdu_index=1) else: raise Exception(f'Could not find sky frame: {sky}') return(msky)
[docs] def import_image(self, filename, amp, log=None): """Load raw multi-extension FITS and return processed CCDData (bias/overscan/trim/gain). Parameters ---------- filename : str Path to raw FITS file. amp : str Amplifier identifier (number of extensions). log : ColoredLogger, optional Logger for progress. Returns ------- ccdproc.CCDData Processed frame in electrons (single combined image). """ filename = os.path.abspath(filename) if log: log.info(f'Loading file: {filename}') with fits.open(flat) as hdr: header = hdr[1].header raw = [CCDData.read(flat, hdu=x+1, unit='adu') for x in range(int(amp))] red = [ccdproc.ccd_process(x, oscan=self.biassec[k], oscan_model=models.Chebyshev1D(3), trim=self.datasec[k], gain=self.gain[k]*u.electron/u.adu, readnoise=rdnoise(hdr)*u.electron, gain_corrected=True) for k,x in enumerate(raw)] frame_full = CCDData(np.concatenate((red[0], np.empty((red[0].shape[0], 794)), red[1]), axis=1), header=header,unit=u.electron) frame_full.header['SATURATE'] = saturation(header) return(frame_full)
[docs] def import_sci_image(self, filename, log=None): """Load a single science FITS as CCDData (no overscan/trim; already reduced). Parameters ---------- filename : str Path to FITS file. log : ColoredLogger, optional Logger for progress. Returns ------- ccdproc.CCDData Science frame. """ filename = os.path.abspath(filename) if log: log.info(f'Loading file: {filename}') frame_full = CCDData.read(filename) return(frame_full)
[docs] def create_bias(self, bias_list, amp, binn, paths, log=None, **kwargs): """Combine bias frames into master bias and write to paths['cal']. Parameters ---------- bias_list : list of str Paths to bias FITS files. amp : str Amplifier identifier. binn : str Binning string. paths : dict Paths dict (cal key). log : ColoredLogger, optional Logger for progress. **kwargs Unused; for subclass compatibility. Returns ------- None Master bias written via get_mbias_name. """ if log: log.info(f'Processing bias files with {amp} amps and {binn} binning.') log.info(f'{len(bias_list)} files found.') biases = [] for i, bias in enumerate(bias_list): if log: log.info(f'Importing {bias}') bias_full = self.import_image(bias, amp, log=log) # Load static mask for this specific file staticmask = self.load_staticmask(bias_full.header, paths) if staticmask is not None: if log: log.info('Applying static mask') bias_full.data[staticmask]=np.nan if self.cr_bias: # Add a CR mask to the bias image mean, median, stddev = sigma_clipped_stats(bias_full.data) mask = bias_full.data > median + 3 * stddev if bias_full.mask is not None: bias_full.mask = bias_full.mask | mask else: bias_full.mask = mask bias_full.data[mask]=median biases.append(bias_full) mbias = ccdproc.combine(biases, method='median', sigma_clip=True, clip_extrema=True) mbias.data[np.isnan(mbias.data)]=np.nanmedian(mbias.data) if log: log.info(f'Made bias for amp: {amp}, bin: {binn}.') mbias.header['VER'] = (__version__, 'Version of telescope parameter file used.') bias_filename = self.get_mbias_name(paths, amp, binn) _sanitize_calibration_header(mbias.header) mbias.write(bias_filename, overwrite=True, output_verify='silentfix') if log: log.info(f'Master bias written to {bias_filename}') return
[docs] def create_dark(self, dark_list, amp, binn, paths, mbias=None, log=None): """Combine dark frames (with optional bias subtraction) and write master dark. Parameters ---------- dark_list : list of str Paths to dark FITS files. amp : str Amplifier identifier. binn : str Binning string. paths : dict Paths dict (cal key). mbias : ccdproc.CCDData, optional Master bias to subtract before combining. log : ColoredLogger, optional Logger for progress. Returns ------- None Master dark written via get_mdark_name. """ if log: log.info(f'Processing dark files with {amp} amps and {binn} binning.') log.info(f'{len(dark_list)} files found.') darks = [] exptimes = [] for dark in dark_list: if log: log.info(f'Importing {dark}') dark_full = self.import_image(dark, amp, log=log) # Load static mask for this specific file staticmask = self.load_staticmask(dark_full.header, paths) if staticmask is not None: if log: log.info('Applying static mask') dark_full.data[staticmask]=np.nan exptimes.append(self.get_exptime(dark_full.header)) if mbias is not None: if log: log.info('Subtracting bias') dark_full = ccdproc.subtract_bias(dark_full, mbias) darks.append(dark_full) if log: log.info('Creating master dark.') mdark = ccdproc.combine(darks, method='median', scale=1./np.array(exptimes), sigma_clip=True, clip_extrema=True) # Rescale to electrons by average of all exposure times avg_exptime = np.mean(exptimes) mdark.data *= avg_exptime mdark.header[self.exptime_keyword]=(avg_exptime, 'Exposure time of master dark (sec).') # Update other header variables mdark.header['VER'] = (__version__, 'Version of telescope parameter file used.') for i,im in enumerate(dark_list): mdark.header[f'FILE{i+1}']=os.path.basename(im) darkname = self.get_mdark_name(paths, amp, binn) if log: log.info(f'Writing master dark to {darkname}') _sanitize_calibration_header(mdark.header) mdark.write(darkname, overwrite=True, output_verify='silentfix') return
[docs] def create_flat(self, flat_list, fil, amp, binn, paths, mbias=None, mdark=None, is_science=False, log=None, **kwargs): """Combine flat frames (bias/dark subtract, normalize) and write master flat. Parameters ---------- flat_list : list of str Paths to flat FITS files. fil : str Filter name. amp : str Amplifier identifier. binn : str Binning string. paths : dict Paths dict (cal key). mbias : ccdproc.CCDData, optional Master bias to subtract. mdark : ccdproc.CCDData, optional Master dark to subtract. is_science : bool, optional If True, mask high pixels as for science. Default is False. log : ColoredLogger, optional Logger for progress. **kwargs Unused; for subclass compatibility. Returns ------- None Master flat written via get_mflat_name. """ if log: log.info(f'Processing files for filter: {fil}') log.info(f'{len(flat_list)} files found.') scale = [] flats = [] for i, flat in enumerate(flat_list): if log: log.info(f'Importing {flat}') flat_full = self.import_image(flat, amp, log=log) # Load static mask for this specific file staticmask = self.load_staticmask(flat_full.header, paths) if staticmask is not None: if log: log.info('Applying static mask') flat_full.data[staticmask]=np.nan if mbias is not None: if log: log.info('Subtracting bias') flat_full = ccdproc.subtract_bias(flat_full, mbias) if mdark is not None: if log: log.info('Subtracting dark') flat_full = ccdproc.subtract_dark(flat_full, mdark, exposure_time=self.exptime_keyword, exposure_unit=u.second) # Mask flat_full if the image is a science frame if is_science: mean, median, stddev = sigma_clipped_stats(flat_full.data) mask = flat_full.data > 5*stddev + median flat_full.mask = mask.astype(np.uint8) exptime = self.get_exptime(flat_full.header) if log: log.info(f'Exposure time of image is {exptime}') norm = 1./np.nanmedian(flat_full.data) # Vet the flat normalization - it should not be negative if norm > 0.: log.info(f'Flat normalization: {norm}') else: # Skip this file log.error(f'Flat normalization: {norm}') continue scale.append(norm) flats.append(flat_full) mflat = ccdproc.combine(flats, method='median', scale=scale, sigma_clip=True, clip_extrema=True) # Mask flat mflat.data[np.isinf(mflat.data)]=np.nan mflat.data[mflat.data==0.0]=np.nan mflat.data[mflat.data<0.0]=np.nan mean, median, stddev = sigma_clipped_stats(mflat.data) mask = mflat.data > median + 10 * stddev mflat.data[mask]=np.nan # Mask the flat for numerical under/overflows logflat = np.log10(mflat.data) mean, median, stddev = sigma_clipped_stats(logflat) mask = np.abs(logflat - median) > 10 * stddev mflat.data[mask]=np.nan # Remove all nan values from flat mask = np.isnan(mflat.data) mflat.mask = mask mflat.data[mask]=1.0 if log: log.info(f'Made flat for filter: {fil}, amp: {amp}, bin: {binn}.') mflat.header['VER'] = (__version__, 'Version of telescope parameter file used.') flat_filename = self.get_mflat_name(paths, fil, amp, binn) _sanitize_calibration_header(mflat.header) mflat.write(flat_filename, overwrite=True, output_verify='silentfix') if log: log.info(f'Master flat written to {flat_filename}') return
[docs] def create_sky(self, sky_list, fil, amp, binn, paths, log=None, sky_sigma_upper=3.0, sky_sigma_lower=4.0, sky_maxiters=5, sky_n_sigma_high=3.0, sky_n_sigma_low=4.0, msky_n_sigma_high=5.0, msky_n_sigma_low=5.0, msky_maxiters=5, **kwargs): """Combine sky frames (normalize by median) and write master sky. Uses iterative sigma clipping to estimate the sky background and mask bright sources (and optionally low outliers) so the median is not biased. Per-frame and combined master sky masking use configurable sigma and iteration limits. Parameters ---------- sky_list : list of str Paths to sky FITS files. fil : str Filter name. amp : str Amplifier identifier. binn : str Binning string. paths : dict Paths dict (cal key). log : ColoredLogger, optional Logger for progress. sky_sigma_upper : float, optional Upper sigma for iterative clip when estimating per-frame sky. Default 3.0. sky_sigma_lower : float, optional Lower sigma for iterative clip when estimating per-frame sky. Default 4.0. sky_maxiters : int, optional Max iterations for sigma clipping per sky frame. Default 5. sky_n_sigma_high : float, optional Mask pixels above med + this many * std (bright sources). Default 3.0. sky_n_sigma_low : float, optional Mask pixels below med - this many * std (defects). Default 4.0. msky_n_sigma_high : float, optional Mask master sky pixels above median + this * std. Default 5.0. msky_n_sigma_low : float, optional Mask master sky pixels below median - this * std. Default 5.0. msky_maxiters : int, optional Max iterations for sigma clipping on combined master sky. Default 5. **kwargs Unused; for subclass compatibility. Returns ------- None Master sky written via get_msky_name. """ if log: log.info(f'Processing files for filter: {fil}') log.info(f'{len(sky_list)} files found.') skys = [] for i, sky in enumerate(sky_list): if log: log.info(f'Importing {sky}') sky_full = self.import_sci_image(sky, log=log) # Iterative sigma-clipped stats for robust sky level (excludes # bright sources and defects so median is unbiased) mean, med, stddev = sigma_clipped_stats( sky_full.data, sigma_upper=sky_sigma_upper, sigma_lower=sky_sigma_lower, maxiters=sky_maxiters, ) # Mask bright sources and low outliers so they don't affect norm mask_high = sky_full.data > med + sky_n_sigma_high * stddev mask_low = sky_full.data < med - sky_n_sigma_low * stddev sky_full.data[mask_high] = np.nan sky_full.data[mask_low] = np.nan # Recompute stats on masked data (NaNs excluded) for normalization mean, med, stddev = sigma_clipped_stats( sky_full.data, sigma_upper=sky_sigma_upper, sigma_lower=sky_sigma_lower, maxiters=sky_maxiters, ) norm = 1.0 / med sky_full = sky_full.multiply(norm) # Vet the sky normalization - it should not be negative if norm > 0.: if log: log.info(f'Sky normalization: {norm}') else: if log: log.error(f'Sky normalization: {norm}') continue nan_mask = np.isnan(sky_full.data) if sky_full.mask is None: sky_full.mask = np.zeros(sky_full.data.shape, dtype=bool) sky_full.mask[nan_mask] = True sky_full.data[nan_mask] = 1.0 skys.append(sky_full) msky = ccdproc.combine(skys, method='median', sigma_clip=True, clip_extrema=True) # Robust masking of combined master sky: iterative sigma clipping # for accurate median/std, then mask excess flux and defects msky.data[np.isinf(msky.data)] = np.nan msky.data[msky.data == 0.0] = np.nan mean, median, stddev = sigma_clipped_stats( msky.data, sigma_upper=3.0, sigma_lower=3.0, maxiters=msky_maxiters, ) mask_high = msky.data > median + msky_n_sigma_high * stddev mask_low = msky.data < median - msky_n_sigma_low * stddev msky.data[mask_high] = 1.0 msky.data[mask_low] = 1.0 # Restore valid fill for normalized sky (1.0 = no correction) msky.data[np.isnan(msky.data)] = 1.0 if log: log.info(f'Made sky for filter: {fil}, amp: {amp}, bin: {binn}.') msky.header['VER'] = (__version__, 'Version of telescope parameter file used.') sky_filename = self.get_msky_name(paths, fil, amp, binn) _sanitize_calibration_header(msky.header) msky.write(sky_filename, overwrite=True, output_verify='silentfix') if log: log.info(f'Master sky written to {sky_filename}') return
[docs] def load_staticmask(self, hdr, paths): """Load the static mask array for this instrument and binning. Parameters ---------- hdr : astropy.io.fits.Header FITS header (for binning and instrument name). paths : dict Paths dict with 'code' key. Returns ------- np.ndarray or None Boolean mask from static mask FITS extension [1], or None if not found. """ satmask_filename = self.get_staticmask_filename(hdr, paths) if (satmask_filename is not None and len(satmask_filename) > 0 and satmask_filename[0] is not None and os.path.exists(satmask_filename[0])): with fits.open(satmask_filename[0]) as hdu: satmask = hdu[1].data.astype(bool) else: satmask = None return(satmask)
[docs] def expand_mask(self, input_data, input_mask=None): """Build combined mask from NaN, inf, zero pixels and optional input mask. Parameters ---------- input_data : ccdproc.CCDData Image data (input_data.data used). input_mask : np.ndarray, optional Boolean mask to OR with bad-pixel mask. Returns ------- np.ndarray Boolean mask (True = bad). """ # Get image statistics mean, median, stddev = sigma_clipped_stats(input_data.data) # Add to input mask add_mask = np.isnan(input_data.data) add_mask = add_mask | np.isinf(input_data.data) add_mask = add_mask | (input_data.data==0.0) # Expand with input mask if it was provided if input_mask is None: input_mask = add_mask else: input_mask = input_mask | add_mask return(input_mask)
[docs] def process_science(self, sci_list, fil, amp, binn, paths, mbias=None, mflat=None, mdark=None, skip_skysub=False, save_bkg=False, log=None): """Reduce science frames: bias/dark/flat, optional sky subtraction; return CCDData list. Parameters ---------- sci_list : list of str Paths to science FITS files. fil : str Filter name. amp : str Amplifier identifier. binn : str Binning string. paths : dict Paths dict (work, cal, etc.). mbias : ccdproc.CCDData, optional Master bias. mflat : ccdproc.CCDData, optional Master flat. mdark : ccdproc.CCDData, optional Master dark. skip_skysub : bool, optional If True, skip 2D background subtraction. Default is False. save_bkg : bool, optional If True, save background model. Default is False. log : ColoredLogger, optional Logger for progress. Returns ------- list of ccdproc.CCDData Reduced science frames (each with FILENAME in header). """ processed = [] processed_names = [] for sci in sorted(sci_list): if log: log.info(f'Importing {sci}') sci_full = self.import_image(sci, amp, log=log) # Load static mask for this specific file staticmask = self.load_staticmask(sci_full.header, paths) if staticmask is None: staticmask = np.zeros(sci_full.data.shape).astype(bool) else: if log: log.info('Applying static mask') sci_full.data[staticmask]=np.nan # Subtract bias if mbias is not None: if log: log.info('Subtracting bias') sci_full = ccdproc.subtract_bias(sci_full, mbias) # Subtract dark if mdark is not None: if log: log.info('Subtracting dark') sci_full = ccdproc.subtract_dark(sci_full, mdark, exposure_time=self.exptime_keyword, exposure_unit=u.second) # Perform flat-fielding if mflat is not None: if log: log.info('Flattening image') sci_full = ccdproc.flat_correct(sci_full, mflat) # Expand mask wherever sensitivity of the detector is low # e.g., due to vignetting staticmask = staticmask | (mflat.data < 0.5) # Expand input mask processed_data = sci_full processed_data.mask = self.expand_mask(processed_data, input_mask=staticmask) processed_data.data[processed_data.mask]=np.nan if log: log.info(f'Wavelength is {self.wavelength}') if not skip_skysub and not self.needs_sky_subtraction(fil): if log: log.info('Calculating 2D background.') approx_background=np.nanmedian(processed_data.data) * u.electron if log: log.info(f'Approximate background value: {approx_background}') bkg = Background2D(processed_data, (64, 64), filter_size=(3, 3), sigma_clip=SigmaClip(sigma=3), exclude_percentile=80, bkg_estimator=MeanBackground(), mask=processed_data.mask, fill_value=np.nanmedian(processed_data.data)) med_background = np.nanmedian(bkg.background) if log: log.info(f'Median background: {med_background}') if np.isnan(med_background.value): final_img = processed_data.subtract(approx_background) final_img.header = processed_data.header final_img.header['SATURATE'] -= approx_background.value final_img.header['SKYBKG'] = approx_background.value else: if save_bkg: bkg_filename = self.get_bkg_name(processed_data.header, paths['work']) if log: log.info(f'Writing background file: {bkg_filename}') bkg_hdu = fits.PrimaryHDU(bkg.background.value) bkg_hdu.header = processed_data.header bkg_hdu.writeto(bkg_filename, overwrite=True, output_verify='silentfix') final_img = processed_data.subtract(CCDData(bkg.background, unit=u.electron), propagate_uncertainties=True, handle_meta='first_found') final_img.header['SATURATE'] -= med_background.value final_img.header['SKYBKG'] = med_background.value if log: log.info('Updating background and saturation values.') else: final_img = processed_data final_img.header['SKYBKG'] = 0.0 # Apply final masking based on excessively negative values mean, median, stddev = sigma_clipped_stats(final_img.data) mask = final_img.data < median - 10 * stddev final_img.data[mask] = np.nan final_img.mask = final_img.mask | mask # Convert data format to float32 final_img.data = final_img.data.astype(np.float32) final_img.header['BITPIX']=-32 # Get read noise for uncertainty image rdnoise = np.mean(self.get_rdnoise(final_img.header)) # Add RMS noise to account for sky background data = final_img.data rms = 0.5 * ( np.percentile(data[~np.isnan(data)], 84.13) - np.percentile(data[~np.isnan(data)], 15.86) ) rdnoise = rdnoise + rms**2 if log: log.info('Creating uncertainty image') final_img = ccdproc.create_deviation(final_img, readnoise=rdnoise*u.electron, disregard_nan=True) # Delete empty header keys for key in list(final_img.header.keys()): if not key.strip(): if key in final_img.header.keys(): del final_img.header[key] final_filename = self.get_sci_name(final_img.header, paths['work']) # Edit additional header values final_img.header['FILENAME']=final_filename final_img.header['FILTER']=fil final_img.header['AMPS']=amp final_img.header['BINNING']=binn final_img.header['ORGFILE']=sci final_img.header['EXTNAME']='SCI' # Get rid of header values if they exist for key in ['DATASEC','BIASSEC','CCDSEC']: if key in final_img.header.keys(): del final_img.header[key] if log: log.info(f'Writing final file: {final_filename}') final_img.write(final_filename, overwrite=True, output_verify='silentfix') processed.append(final_img) processed_names.append(final_filename) # Create sky image and subtract from every science frame if self.needs_sky_subtraction(fil): sky_frame = self.get_msky_name(paths, fil, amp, binn) if not os.path.exists(sky_frame): self.create_sky(processed_names, fil, amp, binn, paths, log=log) sky_frame = self.load_sky(paths, fil, amp, binn) for i, frame in enumerate(processed): # Robust scale: sigma-clipped median so bright sources don't bias sky level mean, med, stddev = sigma_clipped_stats( frame.data, sigma_upper=3.0, sigma_lower=3.0, maxiters=5, ) # Scale normalized sky to same units as science (electrons) so subtract is valid science_unit = frame.unit if frame.unit is not None else u.electron frame_sky = sky_frame.multiply(med * science_unit, propagate_uncertainties=True, handle_meta='first_found') processed[i] = frame.subtract(frame_sky, propagate_uncertainties=True, handle_meta='first_found') processed[i].header['SKYBKG']=med processed[i].header['SATURATE']-=med # Rewrite file final_filename = self.get_sci_name(processed[i].header, paths['work']) if log: log.info(f'Writing final file: {final_filename}') processed[i].write(final_filename, overwrite=True, output_verify='silentfix') return(processed)