Merge pull request #366 from LSSTDESC/rail-tomography
Rail tomographic summarizer
joezuntz authored Aug 29, 2024
2 parents 5750d47 + 8f5b65c commit 79aa3e5
Showing 5 changed files with 108 additions and 100 deletions.
13 changes: 8 additions & 5 deletions examples/metacal/config.yml
@@ -314,11 +314,12 @@ PZRailSummarizeLens:
zmax: 3.0
nzbins: 50
name: PZRailSummarizeLens
catalog_group: "photometry"
catalog_group: "lens"
tomography_name: "lens"
bands: ugrizy
aliases:
tomography_catalog: lens_tomography_catalog
photometry_catalog: photometry_catalog
binned_catalog: binned_lens_catalog
model: lens_direct_calibration_model
photoz_stack: lens_photoz_stack
model: None
@@ -331,10 +332,12 @@ PZRailSummarizeSource:
nzbins: 50
mag_prefix: "/shear/mcal_mag_"
tomography_name: "source"
catalog_group: shear
name: PZRailSummarizeSource
bands: riz
aliases:
tomography_catalog: shear_tomography_catalog
photometry_catalog: shear_catalog
binned_catalog: binned_shear_catalog
model: source_direct_calibration_model
photoz_stack: shear_photoz_stack

@@ -388,15 +391,15 @@ TXParqetToHDF:

NZDirInformerSource:
name: NZDirInformerSource
usecols: [r, i, z]
usecols: riz
hdf5_groupname: photometry
aliases:
input: spectroscopic_catalog
model: source_direct_calibration_model

NZDirInformerLens:
name: NZDirInformerLens
usecols: [u, g, r, i, z, "y"]
usecols: ugrizy
hdf5_groupname: photometry
aliases:
input: spectroscopic_catalog
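As an aside, here is a hedged sketch (not part of the commit) of how these new configuration options are combined inside the updated PZRailSummarize stage, mirroring the logic in txpipe/rail/summarize.py further down: catalog_group plus a bin name form the HDF5 group the NZDir sub-stage reads, and bands expands to the mag_* columns. The values below follow the lens configuration above; the bin name is illustrative.

# Illustrative only: how catalog_group, a bin name, and bands are combined
# (mirrors the sub_config / hdf5_groupname logic in txpipe/rail/summarize.py).
catalog_group = "lens"
bands = "ugrizy"
bin_name = "bin_0"  # e.g. one of the bins listed in binned_lens_catalog

hdf5_groupname = f"/{catalog_group}/{bin_name}"
usecols = [f"mag_{b}" for b in bands]
print(hdf5_groupname)  # /lens/bin_0
print(usecols)         # ['mag_u', 'mag_g', 'mag_r', 'mag_i', 'mag_z', 'mag_y']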
12 changes: 7 additions & 5 deletions examples/metadetect/config.yml
@@ -128,7 +128,6 @@ PZEstimatorLens:
mag_y: 27.05



# Mock version of stacking:
TXSourceTrueNumberDensity:
nz: 301
@@ -319,11 +318,12 @@ PZRailSummarizeLens:
zmax: 3.0
nzbins: 50
name: PZRailSummarizeLens
catalog_group: "photometry"
catalog_group: lens
tomography_name: "lens"
bands: ugrizy
aliases:
tomography_catalog: lens_tomography_catalog
photometry_catalog: photometry_catalog
binned_catalog: binned_lens_catalog
model: lens_direct_calibration_model
photoz_stack: lens_photoz_stack
photoz_realizations: lens_photoz_realizations
@@ -337,11 +337,13 @@ PZRailSummarizeSource:
nzbins: 50
nsamples: 100
mag_prefix: "/shear/00/mag_"
tomography_name: "source"
catalog_group: shear
tomography_name: source
bands: riz
name: PZRailSummarizeSource
aliases:
tomography_catalog: shear_tomography_catalog
photometry_catalog: shear_catalog
binned_catalog: binned_shear_catalog
model: source_direct_calibration_model
photoz_stack: shear_photoz_stack
photoz_realizations: source_photoz_realizations
7 changes: 5 additions & 2 deletions examples/metadetect_source_only/config.yml
@@ -190,12 +190,15 @@ PZRailSummarizeSource:
zmin: 0.0
zmax: 3.0
nzbins: 50
nsamples: 100
mag_prefix: "/shear/00/mag_"
tomography_name: "source"
catalog_group: shear
tomography_name: source
bands: riz
name: PZRailSummarizeSource
aliases:
tomography_catalog: shear_tomography_catalog
photometry_catalog: shear_catalog
binned_catalog: binned_shear_catalog
model: source_direct_calibration_model
photoz_stack: shear_photoz_stack
photoz_realizations: source_photoz_realizations
14 changes: 14 additions & 0 deletions txpipe/data_types/types.py
@@ -112,6 +112,20 @@ def get_bands(self):
return bands


class BinnedCatalog(HDFFile):
required_datasets = []
def get_bins(self, group_name):
group = self.file[group_name]
info = dict(group.attrs)
bins = []
for i in range(info["nbin"]):
code = info[f"bin_{i}"]
name = f"bin_{code}"
bins.append(name)
return bins



class TomographyCatalog(HDFFile):
required_datasets = []

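For reference, a minimal sketch (not part of the commit) of the group-attribute layout that BinnedCatalog.get_bins assumes: an nbin count plus bin_<i> codes, with a non-tomographic "all" code allowed. The file name, group name, and attribute values below are illustrative assumptions, not taken from the repository.

# Illustrative only: build a toy binned catalog whose group attributes follow
# the layout get_bins expects, then read the bin names back the same way.
import h5py

with h5py.File("toy_binned_lens_catalog.hdf5", "w") as f:
    g = f.create_group("lens")
    g.attrs["nbin"] = 3
    # each attribute maps an index to a bin "code"; the last one here is the
    # special non-tomographic bin, usually called bin_all
    g.attrs["bin_0"] = "0"
    g.attrs["bin_1"] = "1"
    g.attrs["bin_2"] = "all"
    for code in ("0", "1", "all"):
        g.create_group(f"bin_{code}")

with h5py.File("toy_binned_lens_catalog.hdf5", "r") as f:
    info = dict(f["lens"].attrs)
    bins = [f"bin_{info[f'bin_{i}']}" for i in range(info["nbin"])]
    print(bins)  # ['bin_0', 'bin_1', 'bin_all']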
162 changes: 74 additions & 88 deletions txpipe/rail/summarize.py
@@ -1,13 +1,13 @@
from ..base_stage import PipelineStage
from ..data_types import HDFFile, PickleFile, PNGFile, QPMultiFile, QPNOfZFile, TomographyCatalog
from ..data_types import HDFFile, PickleFile, PNGFile, QPMultiFile, QPNOfZFile, BinnedCatalog
import numpy as np
import collections
import os

class PZRailSummarize(PipelineStage):
name = "PZRailSummarize"
inputs = [
("tomography_catalog", TomographyCatalog),
("photometry_catalog", HDFFile),
("binned_catalog", BinnedCatalog),
("model", PickleFile),

]
@@ -18,123 +18,109 @@ class PZRailSummarize(PipelineStage):

# pull these out automatically
config_options = {
"catalog_group": str,
"mag_prefix": "photometry/mag_",
"tomography_name": str,
"bands": "ugrizy",
}

def run(self):
import rail.estimation
import pickle
from rail.estimation.algos.nz_dir import NZDirSummarizer
from rail.core.data import TableHandle
from rail.core.data import DataStore
import qp


model_filename = self.get_input("model")
# Get the list of bins from the binned catalog.
# Mostly these are numbered, bin_0, bin_1, etc.
# But there is also often a special bin_all
# that is non-tomographic
group_name = self.config["catalog_group"]
with self.open_input("binned_catalog", wrapper=True) as f:
bins = f.get_bins(group_name)

# The usual way of opening pickle files puts a bunch
# of provenance at the start of them. External pickle files
# like the ones from RAIL don't have this, so the file content
# comes out wrong.
# Once we've moved the provenance stuff from TXPipe to ceci
# then prov tracking should be harmonized, then we can replace
# these two lines:
with open(model_filename, "rb") as f:
model = pickle.load(f)
# with these:
# with self.open_input("nz_dir_model", wrapper=True) as f:
# model = f.read()
# These parameters are common to all the per-bin runs
cat_name = self.get_input("binned_catalog")
model = self.get_input("model")

# We will run the ensemble estimation for each bin, so we want a separate output file for each.
# We use the main file name as the base name for this, but add the bin name to the end.
# To do this we split the file name into root and extension, and then append the bin name
# to the root.
out1_root, out1_ext = os.path.splitext(self.get_output("photoz_realizations", final_name=True) )
out2_root, out2_ext = os.path.splitext(self.get_output("photoz_stack", final_name=True) )

bands = model["szusecols"]
prefix = self.config["mag_prefix"]

# This is the bit that will not work with realistically sized
# data sets. Need the RAIL parallel interface when ready.
with self.open_input("photometry_catalog") as f:
full_data = {b: f[f"{prefix}{b}"][:] for b in bands}

with self.open_input("tomography_catalog") as f:
g = f['tomography']
nbin = g.attrs[f'nbin']
bins = g[f'bin'][:]


# Generate the configuration for RAIL. Anything set in the
# config.yml file will also be put in here by the bit below,
# so we don't need to try all possible RAIL options.
# Configuration options that will be needed by the sub-stage
sub_config = {
"model": model,
"usecols": bands,
"hdf5_groupname": "",
"phot_weightcol":"",
"output_mode": "none", # actually anything except "default" will work here
"usecols": [f"mag_{b}" for b in self.config["bands"]],
"phot_weightcol":"weight",
"output_mode": "none",
"comm": self.comm,
"input": cat_name,
}

# TODO: Make this flexible
# We want to make this customizable.
substage_class = NZDirSummarizer

# Move relevant configuration items from the config
# for this stage into the substage config
for k, v in self.config.items():
if k in substage_class.config_options:
sub_config[k] = v


# Just do things with the first bin to begin with
qp_per_bin = []
realizations_per_bin = {}

for i in range(nbin):

# Extract the chunk of the data assigned to this tomographic
# bin.
index = (bins==i)
nobj = index.sum()
data = {b: full_data[b][index] for b in bands}
print(f"Computing n(z) for bin {i}: {nobj} objects")

# Store this data set. Once this is parallelised will presumably have to delete
# it afterwards to avoid running out of memory.
data_handle = substage_class.data_store.add_data(f"tomo_bin_{i}", data, TableHandle)
substage = substage_class.make_stage(name=f"NZDir_{i}", **sub_config)
substage.set_data('input', data_handle)
substage.run()

if self.rank == 0:
realizations_per_bin[f'bin_{i}'] = substage.get_handle('output').data
bin_qp = substage.get_handle('single_NZ').data
qp_per_bin.append(bin_qp)

# now we do the 2D case
index = bins >= 0
nobj = index.sum()
data = {b: full_data[b][index] for b in bands}
print(f"Computing n(z) for bin {i}: {nobj} objects")

# Store this data set. Once this is parallelised will presumably have to delete
# it afterwards to avoid running out of memory.
data_handle = substage_class.data_store.add_data(f"tomo_bin_2d", data, TableHandle)
substage = substage_class.make_stage(name=f"NZDir_2d", **sub_config)
substage.set_data('input', data_handle)
substage.run()
if self.rank == 0:
realizations_2d = substage.get_handle('output').data
qp_2d = substage.get_handle('single_NZ').data

# Only the root process writes the output
if self.rank > 0:
# We collate ensemble results for both a single primary n(z) estimate,
# and for a suite of realizations, both for each bin.
main_ensemble_results = []
realizations_results = []

for b in bins:
print("Running ensemble estimation for bin: ", b)

# Make the file names for this bin
realizations_output = "{}_{}{}".format(out1_root, b, out1_ext)
stack_output = "{}_{}{}".format(out2_root, b, out2_ext)

# Create the sub-stage.
run_nzdir = substage_class.make_stage(hdf5_groupname=f"/{group_name}/{b}",
output=realizations_output,
single_NZ=stack_output,
**sub_config
)
# I have never really understood the RAIL DataStore.
# But if I don't do this the system complains that there is already an
# output with the name we have given when we get to the second iteration
ds = DataStore()
run_nzdir.data_store = ds

# actually run and finalize the stage
run_nzdir.run()
run_nzdir.finalize()

# read the result. This is small so can stay in memory.
main_ensemble_results.append(qp.read(stack_output))
realizations_results.append(qp.read(realizations_output))

# Only the root process writes the data, so the others are done now.
if self.rank != 0:
return

combined_qp = qp.concatenate(qp_per_bin + [qp_2d])

# Collate the results from all the bins into a single file
# and write to the main output file.
combined_qp = qp.concatenate(main_ensemble_results)
with self.open_output("photoz_stack", wrapper=True) as f:
f.write_ensemble(combined_qp)


# Write the realizations to their own file, again collating all the bins
with self.open_output("photoz_realizations", wrapper=True) as f:
for key, realizations in realizations_per_bin.items():
f.write_ensemble(realizations, key)
f.write_ensemble(realizations_2d, "bin_2d")
f.file['qps'].attrs['nbin'] = nbin
for b, realizations in zip(bins, realizations_results):
f.write_ensemble(realizations, b)





class PZRealizationsPlot(PipelineStage):
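A standalone sketch (not part of the commit) of the per-bin output naming used in run() above: the bin name is spliced between each output file's root and its extension. Paths are illustrative.

import os

def per_bin_path(path, bin_name):
    # split e.g. "lens_photoz_stack.hdf5" into root + ".hdf5", insert the bin name
    root, ext = os.path.splitext(path)
    return "{}_{}{}".format(root, bin_name, ext)

for b in ["bin_0", "bin_1", "bin_all"]:
    print(per_bin_path("lens_photoz_stack.hdf5", b))
# lens_photoz_stack_bin_0.hdf5
# lens_photoz_stack_bin_1.hdf5
# lens_photoz_stack_bin_all.hdf5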
