Source code for graphnet.data.extractors.icecube.i3truthextractor
"""I3Extractor class(es) for extracting truth-level information."""
import numpy as np
import matplotlib.path as mpath
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from .i3extractor import I3Extractor
from .utilities.frames import (
frame_is_montecarlo,
frame_is_noise,
)
from graphnet.utilities.imports import has_icecube_package
if has_icecube_package() or TYPE_CHECKING:
from icecube import (
dataclasses,
icetray,
phys_services,
) # pyright: reportMissingImports=false
[docs]
class I3TruthExtractor(I3Extractor):
"""Class for extracting truth-level information."""
def __init__(
self,
name: str = "truth",
borders: Optional[List[np.ndarray]] = None,
mctree: Optional[str] = "I3MCTree",
):
"""Construct I3TruthExtractor.
Args:
name: Name of the `I3Extractor` instance.
borders: Array of boundaries of the detector volume as ((x,y),z)-
coordinates, for identifying, e.g., particles starting and
stopping within the detector. Defaults to hard-coded boundary
coordinates.
mctree: Str of which MCTree to use for truth values.
"""
# Base class constructor
super().__init__(name)
if borders is None:
border_xy = np.array(
[
(-256.1400146484375, -521.0800170898438),
(-132.8000030517578, -501.45001220703125),
(-9.13000011444092, -481.739990234375),
(114.38999938964844, -461.989990234375),
(237.77999877929688, -442.4200134277344),
(361.0, -422.8299865722656),
(405.8299865722656, -306.3800048828125),
(443.6000061035156, -194.16000366210938),
(500.42999267578125, -58.45000076293945),
(544.0700073242188, 55.88999938964844),
(576.3699951171875, 170.9199981689453),
(505.2699890136719, 257.8800048828125),
(429.760009765625, 351.0199890136719),
(338.44000244140625, 463.7200012207031),
(224.5800018310547, 432.3500061035156),
(101.04000091552734, 412.7900085449219),
(22.11000061035156, 509.5),
(-101.05999755859375, 490.2200012207031),
(-224.08999633789062, 470.8599853515625),
(-347.8800048828125, 451.5199890136719),
(-392.3800048828125, 334.239990234375),
(-437.0400085449219, 217.8000030517578),
(-481.6000061035156, 101.38999938964844),
(-526.6300048828125, -15.60000038146973),
(-570.9000244140625, -125.13999938964844),
(-492.42999267578125, -230.16000366210938),
(-413.4599914550781, -327.2699890136719),
(-334.79998779296875, -424.5),
]
)
border_z = np.array([-512.82, 524.56])
self._borders = [border_xy, border_z]
else:
self._borders = borders
self._mctree = mctree
def __call__(
self, frame: "icetray.I3Frame", padding_value: Any = -1
) -> Dict[str, Any]:
"""Extract truth-level information."""
is_mc = frame_is_montecarlo(frame, self._mctree)
is_noise = frame_is_noise(frame, self._mctree)
sim_type = self._find_data_type(is_mc, self._i3_file)
output = {
"energy": padding_value,
"position_x": padding_value,
"position_y": padding_value,
"position_z": padding_value,
"azimuth": padding_value,
"zenith": padding_value,
"pid": padding_value,
"event_time": frame["I3EventHeader"].start_time.utc_daq_time,
"sim_type": sim_type,
"interaction_type": padding_value,
"elasticity": padding_value,
"RunID": frame["I3EventHeader"].run_id,
"SubrunID": frame["I3EventHeader"].sub_run_id,
"EventID": frame["I3EventHeader"].event_id,
"SubEventID": frame["I3EventHeader"].sub_event_id,
"dbang_decay_length": padding_value,
"track_length": padding_value,
"stopped_muon": padding_value,
"energy_track": padding_value,
"energy_cascade": padding_value,
"inelasticity": padding_value,
"DeepCoreFilter_13": padding_value,
"CascadeFilter_13": padding_value,
"MuonFilter_13": padding_value,
"OnlineL2Filter_17": padding_value,
"L3_oscNext_bool": padding_value,
"L4_oscNext_bool": padding_value,
"L5_oscNext_bool": padding_value,
"L6_oscNext_bool": padding_value,
"L7_oscNext_bool": padding_value,
}
# Only InIceSplit P frames contain ML appropriate
# for example I3RecoPulseSeriesMap, etc.
# At low levels i3 files contain several other P frame splits
# (e.g NullSplit). We remove those here.
if frame["I3EventHeader"].sub_event_stream not in [
"InIceSplit",
"Final",
]:
return output
if "FilterMask" in frame:
if "DeepCoreFilter_13" in frame["FilterMask"]:
output["DeepCoreFilter_13"] = int(
bool(frame["FilterMask"]["DeepCoreFilter_13"])
)
if "CascadeFilter_13" in frame["FilterMask"]:
output["CascadeFilter_13"] = int(
bool(frame["FilterMask"]["CascadeFilter_13"])
)
if "MuonFilter_13" in frame["FilterMask"]:
output["MuonFilter_13"] = int(
bool(frame["FilterMask"]["MuonFilter_13"])
)
if "OnlineL2Filter_17" in frame["FilterMask"]:
output["OnlineL2Filter_17"] = int(
bool(frame["FilterMask"]["OnlineL2Filter_17"])
)
elif "DeepCoreFilter_13" in frame:
output["DeepCoreFilter_13"] = int(bool(frame["DeepCoreFilter_13"]))
if "L3_oscNext_bool" in frame:
output["L3_oscNext_bool"] = int(bool(frame["L3_oscNext_bool"]))
if "L4_oscNext_bool" in frame:
output["L4_oscNext_bool"] = int(bool(frame["L4_oscNext_bool"]))
if "L5_oscNext_bool" in frame:
output["L5_oscNext_bool"] = int(bool(frame["L5_oscNext_bool"]))
if "L6_oscNext_bool" in frame:
output["L6_oscNext_bool"] = int(bool(frame["L6_oscNext_bool"]))
if "L7_oscNext_bool" in frame:
output["L7_oscNext_bool"] = int(bool(frame["L7_oscNext_bool"]))
if is_mc and (not is_noise):
(
MCInIcePrimary,
interaction_type,
elasticity,
) = self._get_primary_particle_interaction_type_and_elasticity(
frame, sim_type
)
try:
(
energy_track,
energy_cascade,
inelasticity,
) = self._get_primary_track_energy_and_inelasticity(frame)
except (
RuntimeError
): # track energy fails on northeren tracks with ""Hadrons"
# has no mass implemented. Cannot get total energy."
energy_track, energy_cascade, inelasticity = (
padding_value,
padding_value,
padding_value,
)
output.update(
{
"energy": MCInIcePrimary.energy,
"position_x": MCInIcePrimary.pos.x,
"position_y": MCInIcePrimary.pos.y,
"position_z": MCInIcePrimary.pos.z,
"azimuth": MCInIcePrimary.dir.azimuth,
"zenith": MCInIcePrimary.dir.zenith,
"pid": MCInIcePrimary.pdg_encoding,
"interaction_type": interaction_type,
"elasticity": elasticity,
"dbang_decay_length": self._extract_dbang_decay_length(
frame, padding_value
),
"energy_track": energy_track,
"energy_cascade": energy_cascade,
"inelasticity": inelasticity,
}
)
if abs(output["pid"]) == 13:
output.update(
{
"track_length": MCInIcePrimary.length,
}
)
muon_final = self._muon_stopped(output, self._borders)
output.update(
{
"position_x": muon_final["x"],
# position_xyz has no meaning for muons.
# These will now be updated to muon final position,
# given track length/azimuth/zenith
"position_y": muon_final["y"],
"position_z": muon_final["z"],
"stopped_muon": muon_final["stopped"],
}
)
return output
def _extract_dbang_decay_length(
self, frame: "icetray.I3Frame", padding_value: float = -1
) -> float:
mctree = frame[self._mctree]
try:
p_true = mctree.primaries[0]
p_daughters = mctree.get_daughters(p_true)
if len(p_daughters) == 2:
for p_daughter in p_daughters:
if p_daughter.type == dataclasses.I3Particle.Hadrons:
casc_0_true = p_daughter
else:
hnl_true = p_daughter
hnl_daughters = mctree.get_daughters(hnl_true)
else:
decay_length = padding_value
hnl_daughters = []
if len(hnl_daughters) > 0:
for count_hnl_daughters, hnl_daughter in enumerate(
hnl_daughters
):
if not count_hnl_daughters:
casc_1_true = hnl_daughter
else:
assert casc_1_true.pos == hnl_daughter.pos
casc_1_true.energy = (
casc_1_true.energy + hnl_daughter.energy
)
decay_length = (
phys_services.I3Calculator.distance(
casc_0_true, casc_1_true
)
/ icetray.I3Units.m
)
else:
decay_length = padding_value
return decay_length
except: # noqa: E722
return padding_value
def _muon_stopped(
self,
truth: Dict[str, Any],
borders: List[np.ndarray],
shrink_horizontally: float = 100.0,
shrink_vertically: float = 100.0,
) -> Dict[str, Any]:
"""Calculate whether a simulated muon within the detector volume.
IMPORTANT: The final position of the muon is saved in truth extractor/
databases as position_x, position_y and position_z. This is analogouos
to the neutrinos whose interaction vertex is saved under the same name.
Args:
truth: Dictionary of already extracted truth-level information.
borders: The first entry are the (x,y) coordinates, the second
entry is the z-axis min/max depths. See I3TruthExtractor
constructor for hard-code example.
shrink_horizontally: Shrink (x,y)-plane further with exclusion
zone. Defaults to 100 meters. shrink_vertically: Further shrink
detector depth with exclusion height. Defaults to 100 meters.
Returns:
Dictionary containing the (x,y,z)-coordinates of final the muon
position as well as a boolean indicating whether the muon
stopped within the chosen fiducial volume.
"""
# @TODO: Remove hard-coded border coords and replace with GCD file
# contents using string no's
border = mpath.Path(borders[0])
start_pos = np.array(
[truth["position_x"], truth["position_y"], truth["position_z"]]
)
travel_vec = -1 * np.array(
[
truth["track_length"]
* np.cos(truth["azimuth"])
* np.sin(truth["zenith"]),
truth["track_length"]
* np.sin(truth["azimuth"])
* np.sin(truth["zenith"]),
truth["track_length"] * np.cos(truth["zenith"]),
]
)
end_pos = start_pos + travel_vec
stopped_xy = border.contains_point(
(end_pos[0], end_pos[1]), radius=-shrink_horizontally
)
stopped_z = (end_pos[2] > borders[1][0] + shrink_vertically) * (
end_pos[2] < borders[1][1] - shrink_vertically
)
return {
"x": end_pos[0],
"y": end_pos[1],
"z": end_pos[2],
"stopped": (stopped_xy * stopped_z),
}
def _get_primary_particle_interaction_type_and_elasticity(
self,
frame: "icetray.I3Frame",
sim_type: str,
padding_value: float = -1.0,
) -> Tuple[Any, int, float]:
"""Return primary particle, interaction type, and elasticity.
A case handler that does two things:
1) Catches issues related to determining the primary MC particle.
2) Error handles cases where interaction type and elasticity
doesn't exist
Args:
frame: Physics frame containing MC record.
sim_type: Simulation type.
padding_value: The value used for padding.
Returns
A tuple containing the MCInIcePrimary, if it exists; the primary
particle, encoded as 1 (charged current), 2 (neutral current),
or 0 (neither); and the elasticity in the range ]0,1[.
"""
if sim_type != "noise":
try:
MCInIcePrimary = frame["MCInIcePrimary"]
except KeyError:
MCInIcePrimary = frame[self._mctree][0]
if (
MCInIcePrimary.energy != MCInIcePrimary.energy
): # This is a nan check. Only happens for some muons
# where second item in MCTree is primary. Weird!
MCInIcePrimary = frame[self._mctree][1]
# For some strange reason the second entry is identical in
# all variables and has no nans (always muon)
else:
MCInIcePrimary = None
try:
interaction_type = frame["I3MCWeightDict"]["InteractionType"]
except KeyError:
interaction_type = padding_value
try:
elasticity = frame["I3GENIEResultDict"]["y"]
except KeyError:
elasticity = padding_value
return MCInIcePrimary, interaction_type, elasticity
def _get_primary_track_energy_and_inelasticity(
self,
frame: "icetray.I3Frame",
) -> Tuple[float, float, float]:
"""Get the total energy of tracks from primary, and inelasticity.
Args:
frame: Physics frame containing MC record.
Returns:
Tuple containing the energy of tracks from primary, and the
corresponding inelasticity.
"""
mc_tree = frame[self._mctree]
primary = mc_tree.primaries[0]
daughters = mc_tree.get_daughters(primary)
tracks = []
for daughter in daughters:
if (
str(daughter.shape) == "StartingTrack"
or str(daughter.shape) == "Dark"
):
tracks.append(daughter)
energy_total = primary.total_energy
energy_track = sum(track.total_energy for track in tracks)
energy_cascade = energy_total - energy_track
inelasticity = 1.0 - energy_track / energy_total
return energy_track, energy_cascade, inelasticity
# Utility methods
def _find_data_type(self, mc: bool, input_file: str) -> str:
"""Determine the data type.
Args:
mc: Whether `input_file` is Monte Carlo simulation.
input_file: Path to I3-file.
Returns:
The simulation/data type.
"""
# @TODO: Rewrite to automatically infer `mc` from `input_file`?
if not mc:
sim_type = "data"
elif "muon" in input_file:
sim_type = "muongun"
elif "corsika" in input_file:
sim_type = "corsika"
elif "genie" in input_file or "nu" in input_file.lower():
sim_type = "genie"
elif "noise" in input_file:
sim_type = "noise"
elif "L2" in input_file: # not robust
sim_type = "dbang"
else:
sim_type = "NuGen"
return sim_type