Source code for graphnet.data.extractors.icecube.i3truthextractor

"""I3Extractor class(es) for extracting truth-level information."""

import numpy as np
import matplotlib.path as mpath
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from .i3extractor import I3Extractor
from .utilities.frames import (
    frame_is_montecarlo,
    frame_is_noise,
)
from graphnet.utilities.imports import has_icecube_package

if has_icecube_package() or TYPE_CHECKING:
    from icecube import (
        dataclasses,
        icetray,
        phys_services,
    )  # pyright: reportMissingImports=false


[docs] class I3TruthExtractor(I3Extractor): """Class for extracting truth-level information.""" def __init__( self, name: str = "truth", borders: Optional[List[np.ndarray]] = None, mctree: Optional[str] = "I3MCTree", ): """Construct I3TruthExtractor. Args: name: Name of the `I3Extractor` instance. borders: Array of boundaries of the detector volume as ((x,y),z)- coordinates, for identifying, e.g., particles starting and stopping within the detector. Defaults to hard-coded boundary coordinates. mctree: Str of which MCTree to use for truth values. """ # Base class constructor super().__init__(name) if borders is None: border_xy = np.array( [ (-256.1400146484375, -521.0800170898438), (-132.8000030517578, -501.45001220703125), (-9.13000011444092, -481.739990234375), (114.38999938964844, -461.989990234375), (237.77999877929688, -442.4200134277344), (361.0, -422.8299865722656), (405.8299865722656, -306.3800048828125), (443.6000061035156, -194.16000366210938), (500.42999267578125, -58.45000076293945), (544.0700073242188, 55.88999938964844), (576.3699951171875, 170.9199981689453), (505.2699890136719, 257.8800048828125), (429.760009765625, 351.0199890136719), (338.44000244140625, 463.7200012207031), (224.5800018310547, 432.3500061035156), (101.04000091552734, 412.7900085449219), (22.11000061035156, 509.5), (-101.05999755859375, 490.2200012207031), (-224.08999633789062, 470.8599853515625), (-347.8800048828125, 451.5199890136719), (-392.3800048828125, 334.239990234375), (-437.0400085449219, 217.8000030517578), (-481.6000061035156, 101.38999938964844), (-526.6300048828125, -15.60000038146973), (-570.9000244140625, -125.13999938964844), (-492.42999267578125, -230.16000366210938), (-413.4599914550781, -327.2699890136719), (-334.79998779296875, -424.5), ] ) border_z = np.array([-512.82, 524.56]) self._borders = [border_xy, border_z] else: self._borders = borders self._mctree = mctree def __call__( self, frame: "icetray.I3Frame", padding_value: Any = -1 ) -> Dict[str, Any]: """Extract truth-level information.""" is_mc = frame_is_montecarlo(frame, self._mctree) is_noise = frame_is_noise(frame, self._mctree) sim_type = self._find_data_type(is_mc, self._i3_file) output = { "energy": padding_value, "position_x": padding_value, "position_y": padding_value, "position_z": padding_value, "azimuth": padding_value, "zenith": padding_value, "pid": padding_value, "event_time": frame["I3EventHeader"].start_time.utc_daq_time, "sim_type": sim_type, "interaction_type": padding_value, "elasticity": padding_value, "RunID": frame["I3EventHeader"].run_id, "SubrunID": frame["I3EventHeader"].sub_run_id, "EventID": frame["I3EventHeader"].event_id, "SubEventID": frame["I3EventHeader"].sub_event_id, "dbang_decay_length": padding_value, "track_length": padding_value, "stopped_muon": padding_value, "energy_track": padding_value, "energy_cascade": padding_value, "inelasticity": padding_value, "DeepCoreFilter_13": padding_value, "CascadeFilter_13": padding_value, "MuonFilter_13": padding_value, "OnlineL2Filter_17": padding_value, "L3_oscNext_bool": padding_value, "L4_oscNext_bool": padding_value, "L5_oscNext_bool": padding_value, "L6_oscNext_bool": padding_value, "L7_oscNext_bool": padding_value, } # Only InIceSplit P frames contain ML appropriate I3RecoPulseSeriesMap etc. # At low levels i3files contain several other P frame splits (e.g NullSplit), # we remove those here. if frame["I3EventHeader"].sub_event_stream not in [ "InIceSplit", "Final", ]: return output if "FilterMask" in frame: if "DeepCoreFilter_13" in frame["FilterMask"]: output["DeepCoreFilter_13"] = int( bool(frame["FilterMask"]["DeepCoreFilter_13"]) ) if "CascadeFilter_13" in frame["FilterMask"]: output["CascadeFilter_13"] = int( bool(frame["FilterMask"]["CascadeFilter_13"]) ) if "MuonFilter_13" in frame["FilterMask"]: output["MuonFilter_13"] = int( bool(frame["FilterMask"]["MuonFilter_13"]) ) if "OnlineL2Filter_17" in frame["FilterMask"]: output["OnlineL2Filter_17"] = int( bool(frame["FilterMask"]["OnlineL2Filter_17"]) ) elif "DeepCoreFilter_13" in frame: output["DeepCoreFilter_13"] = int(bool(frame["DeepCoreFilter_13"])) if "L3_oscNext_bool" in frame: output["L3_oscNext_bool"] = int(bool(frame["L3_oscNext_bool"])) if "L4_oscNext_bool" in frame: output["L4_oscNext_bool"] = int(bool(frame["L4_oscNext_bool"])) if "L5_oscNext_bool" in frame: output["L5_oscNext_bool"] = int(bool(frame["L5_oscNext_bool"])) if "L6_oscNext_bool" in frame: output["L6_oscNext_bool"] = int(bool(frame["L6_oscNext_bool"])) if "L7_oscNext_bool" in frame: output["L7_oscNext_bool"] = int(bool(frame["L7_oscNext_bool"])) if is_mc and (not is_noise): ( MCInIcePrimary, interaction_type, elasticity, ) = self._get_primary_particle_interaction_type_and_elasticity( frame, sim_type ) try: ( energy_track, energy_cascade, inelasticity, ) = self._get_primary_track_energy_and_inelasticity(frame) except RuntimeError: # track energy fails on northeren tracks with ""Hadrons" has no mass implemented. Cannot get total energy." energy_track, energy_cascade, inelasticity = ( padding_value, padding_value, padding_value, ) output.update( { "energy": MCInIcePrimary.energy, "position_x": MCInIcePrimary.pos.x, "position_y": MCInIcePrimary.pos.y, "position_z": MCInIcePrimary.pos.z, "azimuth": MCInIcePrimary.dir.azimuth, "zenith": MCInIcePrimary.dir.zenith, "pid": MCInIcePrimary.pdg_encoding, "interaction_type": interaction_type, "elasticity": elasticity, "dbang_decay_length": self._extract_dbang_decay_length( frame, padding_value ), "energy_track": energy_track, "energy_cascade": energy_cascade, "inelasticity": inelasticity, } ) if abs(output["pid"]) == 13: output.update( { "track_length": MCInIcePrimary.length, } ) muon_final = self._muon_stopped(output, self._borders) output.update( { "position_x": muon_final[ "x" ], # position_xyz has no meaning for muons. These will now be updated to muon final position, given track length/azimuth/zenith "position_y": muon_final["y"], "position_z": muon_final["z"], "stopped_muon": muon_final["stopped"], } ) return output def _extract_dbang_decay_length( self, frame: "icetray.I3Frame", padding_value: float = -1 ) -> float: mctree = frame[self._mctree] try: p_true = mctree.primaries[0] p_daughters = mctree.get_daughters(p_true) if len(p_daughters) == 2: for p_daughter in p_daughters: if p_daughter.type == dataclasses.I3Particle.Hadrons: casc_0_true = p_daughter else: hnl_true = p_daughter hnl_daughters = mctree.get_daughters(hnl_true) else: decay_length = padding_value hnl_daughters = [] if len(hnl_daughters) > 0: for count_hnl_daughters, hnl_daughter in enumerate( hnl_daughters ): if not count_hnl_daughters: casc_1_true = hnl_daughter else: assert casc_1_true.pos == hnl_daughter.pos casc_1_true.energy = ( casc_1_true.energy + hnl_daughter.energy ) decay_length = ( phys_services.I3Calculator.distance( casc_0_true, casc_1_true ) / icetray.I3Units.m ) else: decay_length = padding_value return decay_length except: # noqa: E722 return padding_value def _muon_stopped( self, truth: Dict[str, Any], borders: List[np.ndarray], shrink_horizontally: float = 100.0, shrink_vertically: float = 100.0, ) -> Dict[str, Any]: """Calculate whether a simulated muon within the detector volume. IMPORTANT: The final position of the muon is saved in truth extractor/ databases as position_x, position_y and position_z. This is analogouos to the neutrinos whose interaction vertex is saved under the same name. Args: truth: Dictionary of already extracted truth-level information. borders: The first entry are the (x,y) coordinates, the second entry is the z-axis min/max depths. See I3TruthExtractor constructor for hard-code example. shrink_horizontally: Shrink (x,y)-plane further with exclusion zone. Defaults to 100 meters. shrink_vertically: Further shrink detector depth with exclusion height. Defaults to 100 meters. Returns: Dictionary containing the (x,y,z)-coordinates of final the muon position as well as a boolean indicating whether the muon stopped within the chosen fiducial volume. """ # @TODO: Remove hard-coded border coords and replace with GCD file # contents using string no's border = mpath.Path(borders[0]) start_pos = np.array( [truth["position_x"], truth["position_y"], truth["position_z"]] ) travel_vec = -1 * np.array( [ truth["track_length"] * np.cos(truth["azimuth"]) * np.sin(truth["zenith"]), truth["track_length"] * np.sin(truth["azimuth"]) * np.sin(truth["zenith"]), truth["track_length"] * np.cos(truth["zenith"]), ] ) end_pos = start_pos + travel_vec stopped_xy = border.contains_point( (end_pos[0], end_pos[1]), radius=-shrink_horizontally ) stopped_z = (end_pos[2] > borders[1][0] + shrink_vertically) * ( end_pos[2] < borders[1][1] - shrink_vertically ) return { "x": end_pos[0], "y": end_pos[1], "z": end_pos[2], "stopped": (stopped_xy * stopped_z), } def _get_primary_particle_interaction_type_and_elasticity( self, frame: "icetray.I3Frame", sim_type: str, padding_value: float = -1.0, ) -> Tuple[Any, int, float]: """Return primary particle, interaction type, and elasticity. A case handler that does two things: 1) Catches issues related to determining the primary MC particle. 2) Error handles cases where interaction type and elasticity doesn't exist Args: frame: Physics frame containing MC record. sim_type: Simulation type. padding_value: The value used for padding. Returns A tuple containing the MCInIcePrimary, if it exists; the primary particle, encoded as 1 (charged current), 2 (neutral current), or 0 (neither); and the elasticity in the range ]0,1[. """ if sim_type != "noise": try: MCInIcePrimary = frame["MCInIcePrimary"] except KeyError: MCInIcePrimary = frame[self._mctree][0] if ( MCInIcePrimary.energy != MCInIcePrimary.energy ): # This is a nan check. Only happens for some muons where second item in MCTree is primary. Weird! MCInIcePrimary = frame[self._mctree][ 1 ] # For some strange reason the second entry is identical in all variables and has no nans (always muon) else: MCInIcePrimary = None try: interaction_type = frame["I3MCWeightDict"]["InteractionType"] except KeyError: interaction_type = padding_value try: elasticity = frame["I3GENIEResultDict"]["y"] except KeyError: elasticity = padding_value return MCInIcePrimary, interaction_type, elasticity def _get_primary_track_energy_and_inelasticity( self, frame: "icetray.I3Frame", ) -> Tuple[float, float, float]: """Get the total energy of tracks from primary, and inelasticity. Args: frame: Physics frame containing MC record. Returns: Tuple containing the energy of tracks from primary, and the corresponding inelasticity. """ mc_tree = frame[self._mctree] primary = mc_tree.primaries[0] daughters = mc_tree.get_daughters(primary) tracks = [] for daughter in daughters: if ( str(daughter.shape) == "StartingTrack" or str(daughter.shape) == "Dark" ): tracks.append(daughter) energy_total = primary.total_energy energy_track = sum(track.total_energy for track in tracks) energy_cascade = energy_total - energy_track inelasticity = 1.0 - energy_track / energy_total return energy_track, energy_cascade, inelasticity # Utility methods def _find_data_type(self, mc: bool, input_file: str) -> str: """Determine the data type. Args: mc: Whether `input_file` is Monte Carlo simulation. input_file: Path to I3-file. Returns: The simulation/data type. """ # @TODO: Rewrite to automatically infer `mc` from `input_file`? if not mc: sim_type = "data" elif "muon" in input_file: sim_type = "muongun" elif "corsika" in input_file: sim_type = "corsika" elif "genie" in input_file or "nu" in input_file.lower(): sim_type = "genie" elif "noise" in input_file: sim_type = "noise" elif "L2" in input_file: # not robust sim_type = "dbang" else: sim_type = "NuGen" return sim_type