Source code for tests.run_200x200_pixel_regions

"""
Similar to legacypipe/py/test/runbrick_test.py, this runs and analyzes the
output of legacypipe running on 200x200 pixels regions that obiwan
has injected simulated sources intoself.

See 'test_200x200_pixel_regions.py' for how to use this file
"""
from __future__ import print_function
if __name__ == "__main__":
    import matplotlib
    matplotlib.use('Agg')
import os
import matplotlib.pyplot as plt
import numpy as np
import re
import sys

import fitsio

from obiwan.qa.visual import plotImage, readImage
from obiwan.common import get_brickinfo_hack

# try:
from astrometry.util.fits import fits_table
from astrometry.libkd.spherematch import match_radec
from legacypipe.survey import LegacySurveyData, wcs_for_brick
from obiwan.kenobi import get_parser, get_checkpoint_fn
from obiwan.kenobi import main as main_kenobi
# except ImportError:
    # pass

SURVEYS= ['decals','bass_mzls']
DATASETS= ['dr3','dr5','dr6','cosmos']


def nanomag2mag(nmgy):
    return -2.5 * (np.log10(nmgy) - 9)

[docs]class run_kenobi_main(object): """Runs main() in kenobi.py, which runs legacypipe Works for either DECaLS or MzLS/BASS. Use setup_testcase() on the result Args: survey: decals or bass_mzls dataset: string, 'DR3', 'DR5', bands: 'grz','z' obj: 'elg','star' rowstart: default is 0 because reads randoms table from 1st row add_noise: to add Poisson noise to simulated galaxy profiles all_blobs: to fit models to all blobs, not just the blobs containing sims on_edge: to add randoms at edge of region, not well within the boundaries early_coadds: creates coadds and stops. useful for ML training/test samples checkpoint: whether to save model fitting (fitblobs) checkpoints skip_ccd_cuts= True to use all ccds in the survey-ccds fits tables no_cleanup: at the end of running, obiwan reorganzes outputs, True means don't do this stage: runbrick.py stages, default is None which runs everything """ def __init__(self, survey=None, dataset=None, bands='grz', obj='elg',rowstart=0, add_noise=False,all_blobs=False, on_edge=False, early_coadds=False, checkpoint=False,skip_ccd_cuts=False, no_cleanup=False,stage=None): assert(survey in SURVEYS) assert(dataset in DATASETS) self.survey= survey self.dataset= dataset self.bands= bands self.obj= obj self.rowstart= rowstart self.all_blobs= all_blobs self.add_noise= add_noise self.on_edge= on_edge self.early_coadds= early_coadds self.checkpoint= checkpoint self.no_cleanup=no_cleanup self.stage=stage self.skip_ccd_cuts= skip_ccd_cuts self.testcase_dir= os.path.join(os.path.dirname(__file__), 'testcase_%s_%s' % (survey,bands)) self.outname= 'out_testcase_%s_%s_%s_%s' % (survey,bands, self.dataset,self.obj) if self.all_blobs: self.outname += '_allblobs' if self.add_noise: self.outname += '_addnoise' if self.on_edge: self.outname += '_on_edge' if self.early_coadds: self.outname += '_coadds' if self.checkpoint: self.outname += '_ckpt' self.outdir= os.path.join(os.path.dirname(__file__), self.outname) self.logfn=os.path.join(self.outdir,'log.txt') if self.survey == 'decals' and self.bands == 'grz': self.brick='0285m165' self.zoom= [3077, 3277, 2576, 2776] elif self.survey == 'decals' and self.bands == 'z': self.brick='1741p242' self.zoom= [90, 290, 2773, 2973] elif self.survey == 'bass_mzls': self.brick='2176p330' self.zoom= [2776,2976, 2900,3100] else: raise ValueError('bands= %s no allowed' % bands) os.environ["LEGACY_SURVEY_DIR"]= self.testcase_dir
[docs] def run(self): """Add the sources and run legacypipe Args: no_cleanup: don't run cleanup step """ print('Running testcase: %s' % self.outname) extra_cmd_line = [] if self.add_noise: extra_cmd_line += ['--add_sim_noise'] if self.all_blobs: extra_cmd_line += ['--all_blobs'] if self.early_coadds: extra_cmd_line += ['--early_coadds'] if self.checkpoint: extra_cmd_line += ['--checkpoint'] if self.stage: extra_cmd_line += ['--stage',self.stage] if self.no_cleanup: extra_cmd_line += ['--no_cleanup'] if self.skip_ccd_cuts: extra_cmd_line += ['--skip_ccd_cuts'] randoms_fn= os.path.join(os.environ["LEGACY_SURVEY_DIR"], 'randoms_%s.fits' % self.obj) if self.on_edge: randoms_fn= randoms_fn.replace('.fits','_on_edge.fits') cmd_line=['--dataset', self.dataset, '-b', self.brick, '-rs',str(self.rowstart), '-n', '4', '--zoom', str(self.zoom[0]), str(self.zoom[1]), str(self.zoom[2]), str(self.zoom[3]), '-o', self.obj, '--outdir', self.outdir, '--randoms_from_fits', randoms_fn] + extra_cmd_line parser= get_parser() args = parser.parse_args(args=cmd_line) if self.checkpoint: # Globally redirect stdout if os.path.exists(self.logfn): os.remove(self.logfn) try: os.makedirs(os.path.dirname(self.logfn)) except OSError: pass # already exists sys.stdout = open(self.logfn, "w") main_kenobi(args=args) if self.checkpoint: # Reset stdout sys.stdout = sys.__stdout__ # The checkpoint file and log should exist ckpt_fn= get_checkpoint_fn(self.outdir, self.brick,self.rowstart) assert(os.path.exists(ckpt_fn)) assert(os.path.exists(self.logfn))
[docs]class run_kenobi_main_cosmos(object): """Supports subsets 60-69""" def __init__(self, survey=None): self.dataset='cosmos' self.survey=survey self.rowstart=0 self.obj='elg' if self.survey == 'decals': self.brick='1501p020' else: raise ValueError('survey = bass_mzls not supported yet') os.environ["LEGACY_SURVEY_DIR"]= os.path.join(os.path.dirname(__file__), 'testcase_cosmos') def run(self): self.subset=60 print('WARNING: testcase_cosmos runs subset 60, only, for simplicity') print('but obiwan works for subsets 60-69 when running on a full dataset') self.outdir= os.path.join(os.path.dirname(__file__), 'out_testcase_%s_cosmos_subset%d' % \ (self.survey,self.subset)) # download data if not os.path.exists(os.environ["LEGACY_SURVEY_DIR"]): print('Do the following to download the testcase, they are 500 MB') print('cd %s' % os.path.dirname(__file__)) print('wget http://portal.nersc.gov/project/desi/users/kburleigh/obiwan/testcase_cosmos.tar.gz') print('tar -xzvf testcase_cosmos.tar.gz') print("testcase_cosmos is also on kaylanb's hpss at nersc") return 0 #sys.exit(0) # exit for success randoms_fn= os.path.join(os.environ["LEGACY_SURVEY_DIR"], 'randoms_%s.fits' % self.obj) cmd_line=['--subset', str(self.subset), '--dataset', self.dataset, '-b', self.brick, '-rs',str(self.rowstart), '-n', '4', '-o', self.obj, '--outdir', self.outdir, '--randoms_from_fits', randoms_fn] parser= get_parser() args = parser.parse_args(args=cmd_line) main_kenobi(args=args)
[docs]class Tolerances(object): """Tolerances on Tractor measured mags and test_flux_shape_measurements Args: survey: decals, bass_mzls obj: star,elg bands: grz, z tolerance dict: mw_transmission_input_minus_measured: obiwan looks up mw_trans before injecting, it should agree with tractor's at that ra,dec mag_psql_minus_input_corrected_for_ext: in AB mag, ext added when inject, so compare what have in db to what injected w/out ext mag_input_minus_measured: in AB mag rhalf_input_minus_measured: arcsec that rhalf measurement can be off by """ @staticmethod def get(survey=None,obj=None,bands=None): assert survey in SURVEYS assert(bands in ['grz','z']) assert(obj in ['elg','star']) return getattr(Tolerances(),survey)(obj,bands) #elif survey == 'bass_mzls': # return Tolerances().bass_mzls(obj,bands)
[docs] @staticmethod def decals(obj=None,bands=None): """Returns dict of Tolerances Args: bands: either 'grz' or 'z' obj: either 'elg' or 'star' """ tol= dict(mw_transmission_input_minus_measured=2.e-5, mag_psql_minus_input_corrected_for_ext= 1e-14, mag_input_minus_measured=dict(g=0.12, r=0.1, z=0.2), rhalf_input_minus_measured= 0.15) if bands == 'grz': tol.update(rhalf_input_minus_measured=0.35) if obj == 'star': tol.update(mag_input_minus_measured=dict(g=0.02, r=0.02, z=0.02)) return tol
# if bands == 'z': # rhalf= 0.15 # else: # rhalf= 0.65
[docs] @staticmethod def bass_mzls(obj=None,bands=None): """Returns dict of Tolerances Args: bands: 'grz' obj: either 'elg' or 'star' """ return dict(mw_transmission_input_minus_measured=5.e-6, mag_psql_minus_input_corrected_for_ext= 1e-14, mag_input_minus_measured=dict(g=2.6, r=2.6, z=0.11), rhalf_input_minus_measured= 0.1)
[docs]class analysis_setup(run_kenobi_main): """Loads the outputs from run_kenobi_main().run() and measurement tolerances Args: kwargs: the same inputs to run_kenobi_main Attributes: same as run_kenobi_main tol: tolerance dict for flux, rhalf, etc. """ def __init__(self,**kw): super(analysis_setup, self).__init__(**kw) # raise ValueError('hey') self.tol= Tolerances().get(survey=self.survey, bands=self.bands,obj=self.obj) self.config_dir= os.path.join(os.path.dirname(self.outdir), 'testcase_%s_%s' % \ (kw['survey'],kw['bands'])) self.rsdir='rs0' survey = LegacySurveyData() brickinfo= get_brickinfo_hack(survey,self.brick) self.brickwcs = wcs_for_brick(brickinfo)
[docs] def load_outputs(self): """Loads every output we could possibly need to evaluate a test Attributes: simcat, obitractor: jpg_coadds: fits_coadds """ print('Loading from %s' % self.config_dir) self.randoms= fits_table(os.path.join(self.config_dir,'randoms_elg.fits')) print('Loading from %s' % self.outdir) dr= '%s/%s/%s' % (self.brick[:3],self.brick,self.rsdir) if not self.early_coadds: self.obitractor= fits_table(os.path.join(self.outdir,'tractor', dr,'tractor-%s.fits' % self.brick)) self.blobs= fitsio.FITS(os.path.join(self.outdir,'metrics', dr,'blobs-%s.fits.gz' % self.brick))[0].read() self.model_jpg= readImage(os.path.join(self.outdir,'coadd', dr,'legacysurvey-%s-model.jpg' % self.brick), jpeg=True) self.resid_jpg= readImage(os.path.join(self.outdir,'coadd', dr,'legacysurvey-%s-resid.jpg' % self.brick), jpeg=True) self.simcat= fits_table(os.path.join(self.outdir,'obiwan', dr,'simcat-%s-%s.fits' % (self.obj,self.brick))) self.img_jpg= readImage(os.path.join(self.outdir,'coadd', dr,'legacysurvey-%s-image.jpg' % self.brick), jpeg=True) self.img_fits,self.ivar_fits,self.sims_fits= {},{},{} for b in self.bands: self.img_fits[b]= readImage(os.path.join(self.outdir,'coadd', dr,'legacysurvey-%s-image-%s.fits.fz' % \ (self.brick,b))) self.ivar_fits[b]= readImage(os.path.join(self.outdir,'coadd', dr,'legacysurvey-%s-invvar-%s.fits.fz' % \ (self.brick,b))) self.sims_fits[b]= readImage(os.path.join(self.outdir,'coadd', dr,'legacysurvey-%s-sims-%s.fits.fz' % \ (self.brick,b)))
[docs]def at_most_N_is_false(bool_array,N=1): """Returns True if bool_array has at most N elements that are False Example, all(bool_array) is equivalent to at_most_N_is_false(bool_array,N=0) Args: N: the number of false elements allowed""" return bool_array.sum() >= len(bool_array)-1
[docs]class test_flux_shape_measurements(analysis_setup): def __init__(self,**kw): super(test_flux_shape_measurements, self).__init__(**kw) self.load_outputs() self.run_test() # def load_outputs(self): # """Each output from the testcase becomes an attribute # # Attributes: # simcat, obitractor: # jpg_coadds: # fits_coadds # """ # print('Loading from %s' % self.outdir) # dr= '%s/%s/%s' % (self.brick[:3],self.brick,self.rsdir) # self.obitractor= fits_table(os.path.join(self.outdir,'tractor', # dr,'tractor-%s.fits' % self.brick)) # self.simcat= fits_table(os.path.join(self.outdir,'obiwan', # dr,'simcat-%s-%s.fits' % (self.obj,self.brick)))
[docs] def run_test(self): """Compare input flux and shape parameters to Tractor's""" isim,itrac,d = match_radec(self.simcat.ra,self.simcat.dec, self.obitractor.ra,self.obitractor.dec, 1./3600,nearest=True) print('mw_transmission_input_minus_measured') for b in self.bands: diff= self.simcat.get('mw_transmission_%s' % b)[isim] -\ self.obitractor.get('mw_transmission_%s' % b)[itrac] print(b+' :',diff) assert(np.all(np.abs(diff) < self.tol['mw_transmission_input_minus_measured'])) print('PASS') print('mag_psql_minus_input_corrected_for_ext') for b in self.bands: dmag= self.randoms.get(b) - \ nanomag2mag(self.simcat.get(b+'flux')/self.simcat.get('mw_transmission_'+b)) print(b+' :',dmag) if (self.on_edge) or (self.obj == 'star'): assert(np.all(np.abs(dmag) < 1.e-3)) else: assert(np.all(np.abs(dmag) < self.tol['mag_psql_minus_input_corrected_for_ext'])) print('PASS') print('mag_input_minus_measured') for b in self.bands: dmag= nanomag2mag(self.simcat.get(b+'flux')[isim]) - \ nanomag2mag(self.obitractor.get('flux_'+b)[itrac]) print(b+': ',dmag) assert(np.all(np.abs(dmag) < self.tol['mag_input_minus_measured'][b])) print('PASS') print('rhalf_input_minus_measured') if self.obj != 'star': rhalf= np.max((self.obitractor[itrac].shapeexp_r, self.obitractor[itrac].shapedev_r),axis=0) diff= self.simcat.rhalf - rhalf print(diff) assert(at_most_N_is_false(np.abs(diff) < self.tol['rhalf_input_minus_measured'], N=1)) print('PASS')
[docs]class test_detected_simulated_and_real_sources(analysis_setup): def __init__(self,**kw): super(test_detected_simulated_and_real_sources, self).__init__(**kw) self.load_outputs() self.run_test()
[docs] def get_index_of_real_galaxy_at_center(self): """There is a real galaxy at center, which tractor cat index is it?""" # real galaxy to tractor ra_real,dec_real= self.brickwcs.pixelxy2radec( [self.zoom[0] + 100.], [self.zoom[2] + 100.]) _,ireal,d= match_radec(ra_real, dec_real, self.obitractor.ra, self.obitractor.dec, 10./3600,nearest=True) return ireal
def run_test(self): isim,itrac,d = match_radec(self.simcat.ra,self.simcat.dec, self.obitractor.ra,self.obitractor.dec, 1./3600,nearest=True) print('all_input_sources_recovered_by_tractor') assert((len(isim) == 4) & (len(itrac) == 4)) print('PASS') ireal= self.get_index_of_real_galaxy_at_center() print('real_galaxy_at_center_recovered_by_tractor') if self.all_blobs: assert(len(ireal) ==1) # found the real galaxy else: if self.bands == 'grz': assert(len(ireal) == 1) # central galaxy in blob on a sim elif self.bands == 'z': assert(len(ireal) == 0) print('PASS')
[docs]class test_draw_circles_around_sources_check_by_eye(analysis_setup): def __init__(self,**kw): super(test_draw_circles_around_sources_check_by_eye, self).__init__(**kw) self.load_outputs() # Add x,y pix to simcat table, juts like tractor has bx,by _,x,y=self.brickwcs.radec2pixelxy(self.simcat.ra,self.simcat.dec) self.simcat.set('x',x - self.zoom[0]) self.simcat.set('y',y - self.zoom[2]) # make the plots self.run_test() def run_test(self): if not self.early_coadds: fig,ax=plt.subplots(2,2,figsize=(6,6)) plotImage().imshow(self.blobs,ax[0,0],qs=None) plotImage().imshow(self.img_jpg,ax[0,1],qs=None) plotImage().imshow(self.model_jpg,ax[1,0],qs=None) plotImage().imshow(self.resid_jpg,ax[1,1],qs=None) fn=os.path.join(self.outdir,'blob_img_mod_res.png') plt.savefig(fn,dpi=150) print('Wrote %s' % fn) plt.close() fig,ax=plt.subplots(3,3,figsize=(7,6)) for i,b in enumerate(self.bands) : plotImage().imshow(self.img_fits[b],ax[0,i]) plotImage().imshow(self.sims_fits[b],ax[1,i],qs=None) plotImage().circles(self.obitractor.bx,self.obitractor.by,ax[1,i], img_shape=self.sims_fits[b].shape, r_pixels=5./0.262,color='y') plotImage().circles(self.simcat.x,self.simcat.y,ax[1,i], img_shape=self.sims_fits[b].shape, r_pixels=4./0.262,color='m') plotImage().imshow(self.img_fits[b] - self.sims_fits[b],ax[2,i]) fn=os.path.join(self.outdir,'fits_coadd_img_sims_res.png') plt.savefig(fn,dpi=150) print('Wrote %s' % fn) plt.close() print("test_draw_circles_around_sources_check_by_eye\nLOOK AT THE PNGs") else: for name in ['tractor','metrics']: assert(not os.path.exists(os.path.join(self.outdir,name))) for name in ['obiwan','coadd']: assert(os.path.exists(os.path.join(self.outdir,name))) dr= '%s/%s/%s' % (self.brick[:3],self.brick,self.rsdir) for name in ['model','resid']: assert(not os.path.exists(os.path.join(self.outdir,'coadd', dr,'legacysurvey-%s-%s.jpg' % (self.brick,name)))) for name in ['image']: assert(os.path.exists(os.path.join(self.outdir,'coadd', dr,'legacysurvey-%s-%s.jpg' % (self.brick,name)))) print("test_draw_circles_around_sources_check_by_eye: PASS")