# Source code for potpyri.primitives.image_procs

"""Image calibration, masking, error arrays, alignment, and stacking.

Provides WCS handling, alignment to a common grid, cosmic-ray rejection,
satellite masking, master calibration combination, and stacked science
output. Authors: Charlie Kilpatrick.
"""
from potpyri._version import __version__

import os
import time
import numpy as np
import logging
import sys
import copy

# Needed for satellite trail detection - dependency can be removed later if 
# we turn off satellite masking or use some other algorithm (e.g., the Rubin one)
import acstools

import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits, ascii
from astropy.stats import sigma_clipped_stats
from astropy.wcs import WCS

from ccdproc import wcs_project
from ccdproc import CCDData
from ccdproc import combine
from ccdproc import cosmicray_lacosmic

# Internal dependencies
from potpyri.primitives import photometry
from potpyri.primitives import solve_wcs
from potpyri.utils import utilities

def remove_pv_distortion(header):
    """Remove PV* distortion keywords from a FITS header (modifies in place).

    Parameters
    ----------
    header : astropy.io.fits.Header
        Header to modify.

    Returns
    -------
    astropy.io.fits.Header
        The same header reference (modified in place).
    """
    # Snapshot the key list before deleting: removing cards from a header
    # (or any mapping) while iterating its live key view is unsafe, which is
    # what the original while/flag restart loop was working around.
    for key in list(header.keys()):
        # Deleting a keyword can remove duplicate cards too, so a key from
        # the snapshot may already be gone by the time we reach it.
        if key.startswith('PV') and key in header:
            del header[key]
    return(header)
def get_fieldcenter(images):
    """Compute mean RA/Dec of image centers from a list of FITS files.

    Parameters
    ----------
    images : list of str
        Paths to FITS files with WCS in primary header.

    Returns
    -------
    list
        [mean_ra_deg, mean_dec_deg].
    """
    center_ras = []
    center_decs = []
    for filename in images:
        with fits.open(filename) as hdulist:
            wcs = WCS(hdulist[0].header)
            ny, nx = hdulist[0].data.shape
            # Sky coordinate of the geometric center of the detector
            sky_center = wcs.pixel_to_world(nx / 2., ny / 2.)
            center_ras.append(sky_center.ra.degree)
            center_decs.append(sky_center.dec.degree)

    # NOTE(review): a plain mean of RA does not handle wraparound at 0/360
    # deg; fields straddling that boundary would get a bogus center.
    return([np.mean(center_ras), np.mean(center_decs)])
def generate_wcs(tel, binn, fieldcenter, out_size):
    """Build a TAN WCS for a given field center and output size.

    Parameters
    ----------
    tel : Instrument
        Instrument instance (for pixel scale).
    binn : str
        Binning string (e.g. '22').
    fieldcenter : sequence
        [ra_deg, dec_deg] or parseable coordinate.
    out_size : int
        Output image size (out_size x out_size pixels).

    Returns
    -------
    astropy.wcs.WCS
        WCS instance.
    """
    # Pixel scale in deg/pix; the first digit of the binning string gives
    # the binning factor along each axis.
    cdelt = tel.get_pixscale() * int(str(binn)[0]) / 3600.0

    coord = utilities.parse_coord(fieldcenter[0], fieldcenter[1])

    # NOTE(review): 'NAXES' looks like a typo for 'NAXIS'; astropy ignores
    # the unknown keyword, so the constructed WCS is unaffected either way.
    header = {
        'NAXES': 2,
        'NAXIS1': out_size,
        'NAXIS2': out_size,
        'EQUINOX': 2000.0,
        'LONPOLE': 180.0,
        'LATPOLE': 0.0,
        # RA increases to the left on the sky, hence the negative CDELT1
        'CDELT1': -1.0 * cdelt,
        'CDELT2': cdelt,
        'CTYPE1': 'RA---TAN',
        'CTYPE2': 'DEC--TAN',
        'CUNIT1': 'deg',
        'CUNIT2': 'deg',
        # Reference pixel at the image center (FITS pixels are 1-indexed)
        'CRPIX1': float(out_size)/2 + 0.5,
        'CRPIX2': float(out_size)/2 + 0.5,
        'CRVAL1': coord.ra.degree,
        'CRVAL2': coord.dec.degree,
    }

    return(WCS(header))
def align_images(reduced_files, paths, tel, binn, use_wcs=None,
    fieldcenter=None, out_size=None, skip_gaia=False, keep_all_astro=False,
    log=None):
    """Solve WCS and align reduced images to a common grid.

    Runs astrometry.net and optionally Gaia alignment, then reprojects to a
    common WCS.

    Parameters
    ----------
    reduced_files : list of str
        Paths to reduced FITS files.
    paths : dict
        Paths dict from options.add_paths.
    tel : Instrument
        Instrument instance.
    binn : str
        Binning string.
    use_wcs : astropy.wcs.WCS, optional
        If provided, use this WCS instead of solving.
    fieldcenter : sequence, optional
        [ra, dec] for generating WCS.
    out_size : int, optional
        Output image size.
    skip_gaia : bool, optional
        If True, skip Gaia alignment. Default is False.
    keep_all_astro : bool, optional
        If True, keep all images regardless of astrometric dispersion.
    log : ColoredLogger, optional
        Logger for progress.

    Returns
    -------
    tuple
        (aligned_images, aligned_data): list of reprojected file paths and
        list of the corresponding reprojected CCDData objects, or
        (None, None) if no image could be solved.
    """
    solved_images = []
    aligned_images = []
    aligned_data = []
    for file in reduced_files:
        # Coarse WCS solution using astrometry.net
        success = solve_wcs.solve_astrometry(file, tel, binn, paths,
            shift_only=False, log=log)
        if not success: continue

        # Fine WCS solution using Gaia DR3 point sources
        if skip_gaia:
            # Record zero dispersion so the outlier rejection below treats
            # these frames as perfectly aligned
            hdu = fits.open(file)
            hdu[0].header['RADISP']=0.0
            hdu[0].header['DEDISP']=0.0
            hdu.writeto(file, overwrite=True)
        else:
            success = solve_wcs.align_to_gaia(file, tel, log=log)
            if not success: continue

        solved_images.append(file)

    # Reject images from stack where either RADISP or DEDISP is >5-sigma outlier
    if not keep_all_astro:
        # Need at least 3 frames for the clipped statistics to be meaningful
        if len(solved_images)>2:
            radisp = [] ; dedisp = []
            for file in solved_images:
                hdu = fits.open(file, mode='readonly')
                radisp.append(hdu[0].header['RADISP'])
                dedisp.append(hdu[0].header['DEDISP'])

            radisp = np.array(radisp) ; dedisp = np.array(dedisp)
            ramean, ramedian, rastddev = sigma_clipped_stats(radisp)
            demean, demedian, destddev = sigma_clipped_stats(dedisp)

            if log: log.info(f'Median dispersion in R.A.={ramedian}')
            if log: log.info(f'Median dispersion in Decl.={demedian}')

            # One-sided clip: only frames with anomalously HIGH dispersion
            # are rejected; low dispersion is fine
            mask = (radisp-ramedian <= 5 * rastddev) &\
                (dedisp-demedian <= 5 * destddev)

            if log:
                log.info('Rejecting the following images for high astrometric dispersion:')
                if np.all(mask):
                    log.info('No images rejected')
                else:
                    for i,m in enumerate(mask):
                        if not m:
                            log.info(solved_images[i])

            solved_images = np.array(solved_images)[mask]
    else:
        if log: log.info('Keeping all images (even poorly aligned).')
        solved_images = np.array(solved_images)

    if len(solved_images)==0:
        return(None, None)

    # Determine what the value of use_wcs should be
    if use_wcs is None:
        if fieldcenter is None:
            fieldcenter = get_fieldcenter(solved_images)
        use_wcs = generate_wcs(tel, binn, fieldcenter, out_size)

    for file in solved_images:
        if log: log.info(f'Reprojecting {file}...')

        # Create a reprojection of the file in a common astrometric frame
        hdu = fits.open(file)
        # Reprojection handles distortion via the solved WCS; PV keywords
        # would otherwise conflict with the TAN projection
        header = remove_pv_distortion(hdu[0].header)
        wcs = WCS(hdu[0].header)
        if use_wcs is None: use_wcs = wcs

        image = CCDData(hdu[0].data, meta=hdu[0].header, wcs=wcs,
            unit=u.electron)

        if out_size:
            target_shape = (out_size, out_size)
        else:
            target_shape = None

        reprojected = wcs_project(image, target_wcs=use_wcs,
            order='bilinear', target_shape=target_shape)

        # Get rid of mask in data; masking is rebuilt later by create_mask
        reprojected.mask = None

        outfile = file.replace('.fits','_reproj.fits')
        reprojected.write(file.replace('.fits','_reproj.fits'),
            overwrite=True)

        aligned_images.append(outfile)
        aligned_data.append(reprojected)

    return(aligned_images, aligned_data)
def image_proc(image_data, tel, paths, skip_skysub=False, fieldcenter=None,
    out_size=None, satellites=True, cosmic_ray=True, skip_gaia=False,
    keep_all_astro=False, relative_calibration=False, log=None):
    """Full image processing: align, mask, stack, and optionally detrend.

    Orchestrates WCS solving, alignment, satellite/cosmic-ray masking,
    stacking, and optional sky subtraction. Optionally calibrates frames to
    each other using SExtractor catalogs and RA/Dec cross-matching before
    stacking.

    Parameters
    ----------
    image_data : astropy.table.Table
        File table subset (e.g. one TargType) with 'File' column.
    tel : Instrument
        Instrument instance.
    paths : dict
        Paths dict from options.add_paths.
    skip_skysub : bool, optional
        If True, skip sky subtraction. Default is False.
    fieldcenter : sequence, optional
        [ra, dec] for alignment.
    out_size : int, optional
        Output image size.
    satellites : bool, optional
        If True, mask satellite trails. Default is True.
    cosmic_ray : bool, optional
        If True, run cosmic-ray rejection. Default is True.
    skip_gaia : bool, optional
        If True, skip Gaia alignment. Default is False.
    keep_all_astro : bool, optional
        If True, keep all images regardless of astrometric dispersion.
    relative_calibration : bool, optional
        If True, run SExtractor on each aligned frame, cross-match sources
        in RA/Dec, and use relative fluxes to set scale factors for the
        stack combine. Default is False.
    log : ColoredLogger, optional
        Logger for progress.

    Returns
    -------
    str or None
        Path to stacked output FITS, or None if processing failed.
    """
    wavelength = tel.wavelength  # NOTE(review): unused in this function
    red_path = paths['red']
    work_path = paths['work']

    # All data in the table should be for one target
    assert np.all([image_data['Target']==image_data['Target'][0]])
    assert np.all([image_data['TargType']==image_data['TargType'][0]])

    cal_type = image_data['TargType'][0]
    target = image_data['Target'][0]
    fil = image_data['Filter'][0]
    amp = image_data['Amp'][0]
    binn = image_data['Binning'][0]

    # Load bias frame
    if tel.bias:
        if log: log.info('Loading master bias.')
        try:
            mbias = tel.load_bias(paths, amp, binn)
        except Exception as e:
            if log:
                log.error('No master bias found for this configuration')
                log.error(f'Skipping reduction for: {cal_type}')
                log.error(e)
            return(None)
    else:
        mbias = None

    # Load dark frame
    if tel.dark:
        if log: log.info('Loading master dark.')
        try:
            mdark = tel.load_dark(paths, amp, binn)
        except Exception as e:
            if log:
                log.error('No master dark found for this configuration')
                log.error(f'Skipping reduction for: {cal_type}')
                log.error(e)
            return(None)
    else:
        mdark = None

    # Load flat frame
    if tel.flat:
        if log: log.info('Loading master flat.')
        try:
            mflat = tel.load_flat(paths, fil, amp, binn)
        except Exception as e:
            if log:
                # BUGFIX: this message previously said "master bias"
                log.error('No master flat found for this configuration')
                log.error(f'Skipping reduction for: {cal_type}')
                log.error(e)
            return(None)
    else:
        mflat = None

    t1 = time.time()
    if log: log.info(f'Processing data for {cal_type}')

    # Bias subtraction, gain correction, dark correction, and flat fielding
    files = image_data['File']
    processed = tel.process_science(files, fil, amp, binn, paths,
        mbias=mbias, mflat=mflat, mdark=mdark, skip_skysub=skip_skysub,
        log=log)

    # Get filenames for output processed data
    reduced_files = [p.header['FILENAME'] for p in processed]

    t2 = time.time()
    if log: log.info(f'Data processed in {t2-t1} sec')
    if log: log.info('Aligning images.')

    # If masking satellite trails, this needs to be done before reprojection
    # so that the acstools algorithm accurately models the detector edges
    if satellites:
        mask_satellites(processed, reduced_files, log=log)

    # Sort files so deepest exposure is first
    exptimes = []
    for file in reduced_files:
        hdu = fits.open(file)
        exptimes.append(tel.get_exptime(hdu[0].header))
    idx = np.flip(np.argsort(exptimes))
    reduced_files = np.array(reduced_files)[idx]

    if out_size is None:
        out_size = tel.get_out_size(processed[0].header)

    if fieldcenter is not None:
        use_wcs = generate_wcs(tel, binn, fieldcenter, out_size)
    else:
        use_wcs = None

    aligned_images, aligned_data = align_images(reduced_files, paths, tel,
        binn, use_wcs=use_wcs, fieldcenter=fieldcenter, out_size=out_size,
        skip_gaia=skip_gaia, keep_all_astro=keep_all_astro, log=log)

    if aligned_images is None or aligned_data is None:
        return(None)

    if log: log.info('Creating mask and error arrays.')
    masks = []
    errors = []
    data_images = []
    for stack_img in aligned_images:
        hdu = fits.open(stack_img, mode='readonly')
        hdr = hdu[0].header

        clean, mask = create_mask(stack_img, hdr['SATURATE'],
            np.mean(tel.get_rdnoise(hdr)), log=log, cosmic_ray=cosmic_ray,
            outpath=work_path)
        error = create_error(stack_img, mask, np.mean(tel.get_rdnoise(hdr)))
        masks.append(mask)
        errors.append(error)

        # Set nan values to 0 now that they are masked
        data = hdu[0].data
        data[np.isnan(data)] = 0.0

        # Create final image triplet before stacking: data, mask, and error
        hdulist = fits.HDUList()
        sci_hdu = fits.ImageHDU()
        sci_hdu.data = data
        sci_hdu.header = hdu[0].header
        msk_hdu = fits.ImageHDU()
        msk_hdu.data = mask.data
        msk_hdu.header = mask.header
        err_hdu = fits.ImageHDU()
        err_hdu.data = error.data
        err_hdu.header = error.header
        hdulist.append(sci_hdu)
        hdulist.append(msk_hdu)
        hdulist.append(err_hdu)
        hdulist[0].name='SCI'
        hdulist[1].name='MASK'
        hdulist[2].name='ERROR'
        hdulist[0].header['EXTNAME']='SCI'
        hdulist[1].header['EXTNAME']='MASK'
        hdulist[2].header['EXTNAME']='ERROR'

        filename = hdulist[0].header['FILENAME'].replace('.fits',
            '_data.fits')
        fullfilename = os.path.join(work_path, filename)
        if log:
            log.info(f'Writing out all file data: {fullfilename}')
        else:
            print(f'Writing out all file data: {fullfilename}')
        hdulist.writeto(fullfilename, overwrite=True,
            output_verify='silentfix')
        data_images.append(fullfilename)

    if log: log.info('Creating median stack.')
    if len(aligned_data)>1:
        relative_scales = None
        if relative_calibration:
            exptimes = np.array([tel.get_exptime(d.header)
                for d in aligned_data])
            relative_scales = compute_relative_scales(data_images, paths,
                exptimes, log=log)

        sci_med = stack_data(aligned_data, tel, masks, errors, log=log,
            relative_scales=relative_scales)
        sci_med = add_stack_mask(sci_med, aligned_data)

        if tel.detrend:
            if log: log.info('Detrending stack')
            sci_med = detrend_stack(sci_med)
        else:
            if log: log.info('Skipping detrending')
    else:
        # Single frame: the per-image triplet already has SCI/MASK/ERROR
        sci_med = fits.open(data_images[0])

    sci_med[0].data = sci_med[0].data.astype(np.float32)
    sci_med[1].data = sci_med[1].data.astype(np.uint8)
    sci_med[2].data = sci_med[2].data.astype(np.float32)
    sci_med[0].header['EXTNAME']='SCI'
    sci_med[1].header['EXTNAME']='MASK'
    sci_med[2].header['EXTNAME']='ERROR'
    sci_med[0].header['BITPIX'] = -32
    sci_med[1].header['BITPIX'] = 8
    sci_med[2].header['BITPIX'] = -32

    # Get time parameters from aligned data
    mid_time = np.average([tel.get_time(d.header) for d in aligned_data])
    exptimes = np.array([tel.get_exptime(d.header) for d in aligned_data])
    # Exposure-time-weighted effective exposure of the (scaled) stack
    eff_time = np.sum(exptimes**2)/np.sum(exptimes)
    total_time = np.sum(exptimes)

    # Rescale both data and error images by effective exposure time so they
    # are in e- instead of e-/s
    sci_med[0].data = sci_med[0].data * eff_time
    sci_med[2].data = sci_med[2].data * eff_time

    # Explicitly note that data and error extensions are in ELECTRONS
    sci_med[0].header['BUNIT'] = 'ELECTRONS'
    sci_med[2].header['BUNIT'] = 'ELECTRONS'

    # Add both formats for code that requires either
    sci_med[0].header['MJD-OBS'] = (mid_time,
        'Mid-MJD of the observation sequence.')
    sci_med[0].header['MJD'] = (mid_time,
        'Mid-MJD of the observation sequence.')

    # Since we generated the stack with a median, the effective exposure is
    # a weighted average of the input exposure times
    sci_med[0].header['EXPTIME'] = (eff_time,
        'Effective exposure time in seconds.')
    sci_med[0].header['EXPTOT'] = (total_time,
        'Total exposure time in seconds')
    sci_med[0].header['GAIN'] = (len(aligned_data),
        'Effective gain for stack.')

    # Calculate read noise of the stack (averaging N frames reduces the
    # per-pixel read noise by sqrt(N))
    rdnoise = np.mean(tel.get_rdnoise(sci_med[0].header))/\
        np.sqrt(len(aligned_data))
    sci_med[0].header['RDNOISE'] = (rdnoise, 'Readnoise of stack.')
    sci_med[0].header['NFILES'] = (len(aligned_data),
        'Number of images in stack')
    sci_med[0].header['FILTER'] = fil
    sci_med[0].header['OBSTYPE'] = 'OBJECT'

    # Generate stack name and write out
    stackname = tel.get_stk_name(sci_med[0].header, red_path)
    sci_med.writeto(stackname, overwrite=True, output_verify='silentfix')

    if log:
        log.info(f'Stack made for {stackname}')
    else:
        print(f'Stack made for {stackname}')

    return(stackname)
def detrend_stack(stack):
    """Remove row and column median from stack science data (detrend).

    Parameters
    ----------
    stack : astropy.io.fits.HDUList
        HDU list with [0]=SCI, [1]=MASK. Modified in place.

    Returns
    -------
    astropy.io.fits.HDUList
        The same stack reference (modified in place).
    """
    sci = stack[0].data
    bad = stack[1].data.astype(bool)

    # Row-wise background: sigma-clipped median along each row
    _, row_bkg, _ = sigma_clipped_stats(sci, mask=bad, axis=1,
        sigma_upper=2.5)
    sci = sci - row_bkg[:, None]
    row_level = np.nanmedian(row_bkg)

    # Column-wise background on the row-flattened image
    _, col_bkg, _ = sigma_clipped_stats(sci, mask=bad, axis=0,
        sigma_upper=2.5)
    sci = sci - col_bkg[None, :]
    col_level = np.nanmedian(col_bkg)

    # Reapply mask so flagged pixels stay zeroed
    sci[bad] = 0.0
    stack[0].data = sci

    # The saturation level drops by the typical subtracted background
    stack[0].header['SATURATE'] = stack[0].header['SATURATE'] -\
        (row_level + col_level)

    return(stack)
def compute_relative_scales(data_images, paths, exptimes, log=None,
    match_radius_arcsec=2.0, min_sources=5, min_rel_err=0.01):
    """Compute relative flux scale factors from SExtractor catalogs and
    RA/Dec cross-matching.

    Runs Source Extractor on each aligned science image, cross-matches
    sources between frames in RA/Dec, and uses the inverse-variance-weighted
    mean of flux ratios (using FLUXERR_AUTO) to define scale factors so that
    combine() can stack on a common flux scale. For frames where matching
    fails (too few matches), the scale is set to 1.0/exptime for that frame.

    Parameters
    ----------
    data_images : list of str
        Paths to FITS with SCI extension (e.g. *_data.fits from image_proc).
    paths : dict
        Paths dict; paths.get('source_extractor') used for SExtractor binary.
    exptimes : array-like
        Exposure time in seconds for each image (same order as data_images).
    log : ColoredLogger, optional
        Logger for progress.
    match_radius_arcsec : float, optional
        Match radius in arcsec for cross-matching sources. Default 2.0.
    min_sources : int, optional
        Minimum matched sources per frame to compute scale. Default 5.
    min_rel_err : float, optional
        Minimum relative flux error (err/flux) used in variance of ratio to
        avoid zero variance. Default 0.01.

    Returns
    -------
    np.ndarray or None
        Relative scale factors, one per image (reference frame scale = 1.0).
        Frames that fail get scale = 1.0/exptime. None if catalogs cannot be
        built at all (caller should use exposure-time-only scaling).
    """
    nimg = len(data_images)
    if nimg < 2:
        return None

    exptimes = np.atleast_1d(np.asarray(exptimes, dtype=float))
    if len(exptimes) != nimg:
        if log:
            log.warning('Relative calibration: exptimes length does not match data_images.')
        return None

    sextractor_path = paths.get('source_extractor')

    def _col(cat, name):
        """Get column by case-insensitive name."""
        for c in cat.colnames:
            if c.upper() == name.upper():
                return c
        return None

    # Build a SExtractor catalog for every frame; bail out entirely if any
    # frame cannot supply a usable catalog
    catalogs = []
    for p in data_images:
        cat = photometry.run_sextractor(p, log=log,
            sextractor_path=sextractor_path)
        if cat is None or len(cat) < min_sources:
            if log:
                log.warning(f'Relative calibration: insufficient catalog for {p}, skipping.')
            return None
        ra_col = _col(cat, 'ALPHA_J2000')
        dec_col = _col(cat, 'DELTA_J2000')
        flux_col = _col(cat, 'FLUX_AUTO')
        fluxerr_col = _col(cat, 'FLUXERR_AUTO')
        if not (ra_col and dec_col and flux_col):
            if log:
                log.warning('Relative calibration: catalog missing RA/Dec or FLUX_AUTO.')
            return None
        catalogs.append((cat, ra_col, dec_col, flux_col, fluxerr_col))

    # Reference = frame with most sources
    ref_idx = int(np.argmax([len(c[0]) for c in catalogs]))
    ref, ref_ra, ref_dec, ref_flux, ref_fluxerr = catalogs[ref_idx]
    ref_coords = SkyCoord(ref[ref_ra], ref[ref_dec], unit='deg')
    match_radius = match_radius_arcsec * u.arcsec

    relative_scales = np.ones(nimg, dtype=float)
    # Reference frame scale = 1.0; failed frames will get 1.0/exptime
    for i in range(nimg):
        if i == ref_idx:
            continue
        cat, ra_col, dec_col, flux_col, fluxerr_col = catalogs[i]
        coords = SkyCoord(cat[ra_col], cat[dec_col], unit='deg')

        # For each source in this frame, find nearest in reference
        idx_ref, d2d, _ = coords.match_to_catalog_sky(ref_coords)
        keep = d2d < match_radius
        n_match = np.sum(keep)
        if n_match < min_sources:
            if log:
                log.warning(f'Relative calibration: frame {i} has {n_match} matches (< {min_sources}), using 1/exptime.')
            relative_scales[i] = 1.0 / exptimes[i]
            continue

        flux_ref = np.array(ref[ref_flux][idx_ref[keep]], dtype=float)
        flux_i = np.array(cat[flux_col][keep], dtype=float)

        # Flux errors: use FLUXERR_AUTO if present, else fall back to
        # unweighted
        if ref_fluxerr and fluxerr_col:
            err_ref = np.array(ref[ref_fluxerr][idx_ref[keep]], dtype=float)
            err_i = np.array(cat[fluxerr_col][keep], dtype=float)
        else:
            err_ref = err_i = None

        # Avoid zeros and negatives in flux
        valid = (flux_i > 0) & (flux_ref > 0)
        if np.sum(valid) < min_sources:
            if log:
                log.warning(f'Relative calibration: frame {i} has too few valid flux pairs, using 1/exptime.')
            relative_scales[i] = 1.0 / exptimes[i]
            continue

        ratio = flux_ref[valid] / flux_i[valid]
        if err_ref is not None and err_i is not None:
            err_ref = err_ref[valid]
            err_i = err_i[valid]
            # Relative errors with floor to avoid zero variance
            rel_err_ref = np.maximum(np.abs(err_ref) /
                np.maximum(flux_ref[valid], 1e-30), min_rel_err)
            rel_err_i = np.maximum(np.abs(err_i) /
                np.maximum(flux_i[valid], 1e-30), min_rel_err)
            # Variance of ratio R = A/B:
            # var(R) ≈ R^2 * ( (σ_A/A)^2 + (σ_B/B)^2 )
            var_ratio = ratio ** 2 * (rel_err_ref ** 2 + rel_err_i ** 2)
            var_ratio = np.maximum(var_ratio, 1e-30)
            weight = 1.0 / var_ratio
            relative_scales[i] = float(np.average(ratio, weights=weight))
        else:
            # No error columns: robust unweighted estimate
            relative_scales[i] = float(np.median(ratio))

    if log:
        log.info(f'Relative calibration: scales (ref frame {ref_idx} = 1.0): {relative_scales}')

    return relative_scales
def stack_data(stacking_data, tel, masks, errors, mem_limit=8.0e9, log=None,
    relative_scales=None):
    """Combine aligned CCDData list with exposure scaling (median or average).

    Masks are applied (masked pixels set to nan) before combination.
    Optionally uses relative flux scales from source cross-matching so that
    frames are combined on a common flux scale.

    Parameters
    ----------
    stacking_data : list of ccdproc.CCDData
        Aligned science images.
    tel : Instrument
        Instrument instance (for stack_method and get_exptime).
    masks : list of astropy.io.fits.ImageHDU
        Per-image masks.
    errors : list of astropy.io.fits.ImageHDU
        Per-image error arrays.
    mem_limit : float, optional
        Memory limit in bytes for ccdproc.combine. Default is 8e9.
    log : ColoredLogger, optional
        Logger for progress.
    relative_scales : np.ndarray, optional
        Relative flux scale factors (one per image; reference = 1.0). If
        provided, scale = relative_scales (exposure-time scaling is not
        applied).

    Returns
    -------
    astropy.io.fits.HDUList
        HDU list [science, mask, error] (mask/error from first image).
    """
    stack_method = tel.stack_method
    if not stack_method:
        if log:
            log.critical('Could not get stacking method for these images')
            logging.shutdown()
            sys.exit(-1)
        else:
            raise Exception('Could not get stacking method for these images')

    exposure_times = []
    for idx, frame in enumerate(stacking_data):
        bad = masks[idx].data.astype(bool)
        # NaN out masked pixels so the combiner ignores them
        frame.data[bad] = np.nan
        exposure_times.append(float(tel.get_exptime(frame.header)))

    # Default scaling puts each frame in e-/s
    scale = 1.0 / np.array(exposure_times)
    if relative_scales is not None:
        scale = np.asarray(relative_scales, dtype=float)

    # A single frame cannot be median-combined; fall back to average
    if len(scale) == 1:
        stack_method = 'average'

    combined = combine(stacking_data, scale=scale, method=stack_method,
        mem_limit=mem_limit)

    return(combined.to_hdu())
def add_stack_mask(stack, stacking_data):
    """Update stack mask from per-image masks and saturation; set masked
    pixels to 0.

    Parameters
    ----------
    stack : astropy.io.fits.HDUList
        HDU list with [0]=SCI, [1]=MASK, [2]=ERROR. Modified in place.
    stacking_data : list of ccdproc.CCDData
        Per-image data (headers used for SATURATE).

    Returns
    -------
    astropy.io.fits.HDUList
        The same stack reference (modified in place).
    """
    sci = stack[0].data
    msk = stack[1].data
    err = stack[2].data

    # Pixels unusable for photometry: previously flagged, zeroed,
    # non-finite, or with zero/NaN error
    bad = (msk > 0) | (sci == 0.0) | np.isnan(sci) | np.isnan(msk) |\
        np.isnan(err) | (err == 0.0) | np.isinf(sci)

    # Rebuild the mask from scratch with additive flags
    new_mask = np.zeros(msk.shape).astype(np.uint8)
    new_mask[bad] = 1

    # Saturation flag (4): median SATURATE over input frames
    sat = np.median([d.header['SATURATE'] for d in stacking_data])
    stack[0].header['SATURATE'] = sat
    new_mask[sci >= sat] = 4

    # Zero every flagged pixel in both science and error planes
    flagged = new_mask > 0
    sci[flagged] = 0.0
    err[flagged] = 0.0

    # Make sure that nan and inf values are removed from data
    sci[np.isnan(sci)] = 0.0
    sci[np.isinf(sci)] = 0.0

    stack[0].data = sci
    stack[1].data = new_mask
    stack[2].data = err

    return(stack)
def mask_satellites(images, filenames, log=None):
    """Detect and mask satellite trails in science images using
    acstools.satdet.

    Parameters
    ----------
    images : list of ccdproc.CCDData
        Science images; data is modified in place (trail pixels set to nan).
    filenames : list of str
        Paths corresponding to images (used for temp files).
    log : ColoredLogger, optional
        Logger for progress.

    Returns
    -------
    None
    """
    for i, science_data in enumerate(images):
        if log:
            log.info(f'Checking for satellite trails in: {filenames[i]}')

        data = np.ascontiguousarray(science_data.data.astype(np.float32))

        # detsat works on files, so write a temporary copy of the frame
        tmphdu = fits.PrimaryHDU()
        tmpfile = filenames[i].replace('.fits','.tmp.fits')
        tmphdu.data = data
        tmphdu.writeto(tmpfile, overwrite=True)

        # Edge buffer scales with image size
        buf = int(np.round(np.max(data.shape)/10.))
        results, errors = acstools.satdet.detsat(tmpfile, chips=[0],
            sigma=4, h_thresh=0.1, buf=buf, verbose=False)

        trail_coords = results[(tmpfile, 0)]
        if len(trail_coords)>0:
            n_trails = len(trail_coords)
            trail_masks = []
            # BUGFIX: use a distinct index here; the original reused i and
            # shadowed the outer loop variable
            for j, coord in enumerate(trail_coords):
                if log:
                    log.info(f'Masking for satellite trail {j+1}/{n_trails}')
                try:
                    mask = acstools.satdet.make_mask(tmpfile, 0, coord,
                        verbose=False)
                except ValueError:
                    # make_mask can fail on marginal detections; skip them
                    continue
                trail_masks.append(mask)

            # BUGFIX: guard against every make_mask call failing, which
            # would previously crash np.any on an empty list
            if trail_masks:
                trail_mask = np.any(trail_masks, axis=0)
                # Set actual pixel values to nan so they don't contribute
                # to the stacked image
                science_data.data[trail_mask] = np.nan

        os.remove(tmpfile)
def create_mask(science_data, saturation, rdnoise, sigclip=3.0, sigfrac=0.10,
    objlim=4.0, niter=6, outpath='', grow=0, cosmic_ray=True,
    fsmode='convolve', cleantype='medmask', log=None):
    """Build a bad-pixel/saturation/cosmic-ray mask using LACosmic and
    optional growth.

    Parameters
    ----------
    science_data : str
        Path to science FITS file.
    saturation : float
        Saturation level (ADU); pixels above this get flag 4.
    rdnoise : float
        Read noise for LACosmic (electrons).
    sigclip : float, optional
        LACosmic sigma clipping. Default is 3.0.
    sigfrac : float, optional
        LACosmic sigfrac. Default is 0.10.
    objlim : float, optional
        LACosmic objlim. Default is 4.0.
    niter : int, optional
        LACosmic iterations. Default is 6.
    outpath : str, optional
        Output path for debug files. Default is ''.
    grow : int, optional
        Pixels to grow mask (adds flag 8). Default is 0.
    cosmic_ray : bool, optional
        If True, run LACosmic. Default is True.
    fsmode : str, optional
        LACosmic fsmode. Default is 'convolve'.
    cleantype : str, optional
        LACosmic cleantype. Default is 'medmask'.
    log : ColoredLogger, optional
        Logger for progress.

    Returns
    -------
    tuple
        (cleaned_data_ndarray, mask_ImageHDU). Mask uses additive flags:
        1=bad, 2=cosmic ray, 4=saturated, 8=neighbor.
    """
    t_start = time.time()

    if log:
        log.info(f'Running astroscrappy on {science_data}')
    else:
        print(f'Running astroscrappy on {science_data}')

    hdu = fits.open(science_data)
    data = np.ascontiguousarray(hdu[0].data.astype(np.float32))

    # Estimate FWHM size for LACosmic's PSF kernel
    table = photometry.run_sextractor(science_data, log=log)
    if table is not None:
        # Sigma-clipped statistics of source FWHMs (mean is used)
        fwhm, meanfwhm, stdfwhm = sigma_clipped_stats(table['FWHM_IMAGE'])
        if log:
            log.info(f'Using FWHM for cosmic rays: {fwhm}')
        else:
            print(f'Using FWHM for cosmic rays: {fwhm}')
    else:
        # No catalog: fall back to a nominal FWHM
        fwhm = 3.5

    # Astroscrappy requires added sky background, so add this value back.
    # Set the sky background to some nominal value if it is too low for CR
    # rejection.
    skybkg = hdu[0].header['SKYBKG']
    if log:
        log.info(f'Sky background in science frame is {skybkg}')
    else:
        print(f'Sky background in science frame is {skybkg}')
    if skybkg < 2000.0:
        skybkg = 2000.0
        if log:
            log.info('Setting sky background to 2000.0')
        else:
            print('Setting sky background to 2000.0')

    # This needs to be done before data is modified to preserve bad pixels
    mask_bp = (data==0.0) | np.isnan(data)

    # Set data background to skybkg
    data = data + skybkg
    # Also need to adjust the saturation level by SKYBKG for saturated
    # pixels
    saturation += skybkg

    if log: log.info('Masking saturated pixels.')
    mask_sat = np.zeros(data.shape).astype(bool) # create empty mask
    mask_sat = mask_sat.astype(np.uint8) # set saturated star mask type
    mask_sat[data >= saturation] = 4 # set saturated pixel flag

    if log: log.info('Cleaning and masking cosmic rays.')
    if log: log.info(f'Using sigclip={sigclip}, sigfrac={sigfrac}, objlim={objlim}')

    # Clean science image of cosmic rays and create cosmic ray mask
    inmask = (mask_sat+mask_bp).astype(bool)
    scidata = CCDData(data, unit=u.electron, mask=inmask,
        wcs=WCS(hdu[0].header), meta=hdu[0].header)

    mask_cr = np.zeros(data.shape)
    mask_cr = mask_cr.astype(np.uint8)

    # PSF kernel size must be an odd integer, ~2.5x the FWHM
    psfsize = int(np.round(2.5*fwhm))
    if psfsize%2==0: psfsize+=1

    if cosmic_ray:
        newdata, mask_cr = cosmicray_lacosmic(data, readnoise=rdnoise,
            satlevel=saturation, verbose=True, sigclip=sigclip,
            sigfrac=sigfrac, objlim=objlim, niter=niter, psffwhm=fwhm,
            psfsize=psfsize, fsmode=fsmode, cleantype=cleantype)
        mask_cr = mask_cr.astype(np.uint8) # set cosmic ray mask type
    else:
        newdata = copy.copy(data)
    mask_cr[mask_cr == 1] = 2 # set cosmic ray flag

    # Combine bad pixel, cosmic ray, and saturated star masks
    mask = mask_bp+mask_sat+mask_cr

    if grow>0:
        shape = mask.shape
        grow_mask = np.zeros(shape)
        # BUGFIX: grow around currently-masked pixels. The original seeded
        # the growth from (data > 0), which after adding the sky background
        # is essentially every pixel and would have flagged the entire
        # frame as neighbor pixels.
        xs, ys = np.where(mask > 0)
        # Grow mask by grow factor
        g = int(np.ceil((grow-1)/2))
        for p in zip(xs, ys):
            grow_mask[np.max([p[0]-g,0]):np.min([p[0]+g,shape[0]-1]),
                np.max([p[1]-g,0]):np.min([p[1]+g,shape[1]-1])]=8
        grow_mask = grow_mask.astype(np.uint8)
        mask = mask_bp+mask_sat+mask_cr+grow_mask

    # Get number of bad, cosmic-ray flagged, and saturated pixels
    nbad = np.sum(mask & 1 == 1)
    ncr = np.sum(mask & 2 == 2)
    nsat = np.sum(mask & 4 == 4)
    ngrow = np.sum(mask & 8 == 8)

    mask_hdu = fits.ImageHDU(mask) # create mask HDU
    mask_hdu.header['VER'] = (__version__,
        'Version of image procedures used.')
    mask_hdu.header['USE'] = 'Complex mask using additive flags.'
    mask_hdu.header['M-BP'] = (1, 'Value of masked bad pixels.')
    mask_hdu.header['M-BPNUM'] = (nbad, 'Number of bad pixels.')
    mask_hdu.header['M-CR'] = (2, 'Value of masked cosmic ray pixels.')
    mask_hdu.header['M-CRNUM'] = (ncr, 'Number of cosmic ray pixels.')
    mask_hdu.header['SATURATE'] = (saturation, 'Level of saturation.')
    mask_hdu.header['M-SP'] = (4, 'Value of masked saturated pixels.')
    mask_hdu.header['M-SPNUM'] = (nsat, 'Number of saturated pixels.')
    mask_hdu.header['M-NE'] = (8, 'Value of masked neighbor pixels.')
    mask_hdu.header['M-NENUM'] = (ngrow,
        'Number of neighboring masked pixels.')

    if log:
        log.info('Mask created.')
        log.info(f'{nbad} bad, {ncr} cosmic-ray, and {nsat} saturated pixels')
        log.info(f'{ngrow} neighbor masked pixels.')
    else:
        print('Mask created.')
        print(f'{nbad} bad, {ncr} cosmic-ray, and {nsat} saturated pixels')
        print(f'{ngrow} neighbor masked pixels.')

    t_end = time.time()
    if log:
        log.info(f'Mask creation completed in {t_end-t_start} sec')
    else:
        print(f'Mask creation completed in {t_end-t_start} sec')

    return(newdata, mask_hdu)
def create_error(science_data, mask_data, rdnoise):
    """Compute per-pixel error array from science image, mask, and read
    noise.

    Error = sqrt(poisson + rms^2 + rdnoise^2). Masked pixels are excluded
    from RMS estimation.

    Parameters
    ----------
    science_data : str or astropy.io.fits.HDUList
        Path to science FITS or open HDU list.
    mask_data : astropy.io.fits.PrimaryHDU or ImageHDU
        HDU containing boolean or integer mask (masked pixels excluded from
        rms).
    rdnoise : float
        Read noise in electrons.

    Returns
    -------
    astropy.io.fits.PrimaryHDU
        Error array in electrons (BUNIT='ELECTRONS').
    """
    with fits.open(science_data) as hdu:
        img_data = hdu[0].data.astype(np.float32)
        mask = mask_data.data.astype(bool)

        # Check for issue with all of the file being masked
        if np.all(mask):
            rms = hdu[0].header['SATURATE']
        else:
            # Robust RMS from the 15.86-84.13 interpercentile half-width of
            # the unmasked pixels (approximately 1 sigma for a Gaussian)
            rms = 0.5 * (
                np.percentile(img_data[~mask], 84.13) -
                np.percentile(img_data[~mask], 15.86)
            )

        # Poisson term: the science counts themselves (clipped at zero)
        poisson = img_data.copy()
        poisson[poisson < 0.] = 0.

        # BUGFIX: the read-noise term enters the variance as rdnoise**2;
        # the original added the unsquared read noise, understating the
        # error everywhere.
        error = np.sqrt(poisson + rms**2 + rdnoise**2)

        # Sanitize error array: replace NaN and negative values with the
        # median error, and infinities with the saturation level
        mask = np.isnan(error)
        error[mask] = np.nanmedian(error)
        mask = error < 0.0
        error[mask] = np.nanmedian(error)

        maxval = np.float32(hdu[0].header['SATURATE'])
        mask = np.isinf(error)
        error[mask] = maxval

        error_hdu = fits.PrimaryHDU(error) # create error Primary HDU
        error_hdu.header['VER'] = (__version__,
            'Version of image procedures used.')
        error_hdu.header['USE'] = 'Error array for Poisson, read, and RMS noise'
        error_hdu.header['BUNIT'] = ('ELECTRONS', 'Units of the error array.')

    return(error_hdu)