Source code for graphnet.deployment.deployer

"""Contains the graphnet deployment module."""
import random
from abc import abstractmethod, ABC
import multiprocessing
from typing import TYPE_CHECKING, List, Union, Sequence, Any
import time

from graphnet.utilities.imports import has_torch_package
from .deployment_module import DeploymentModule
from graphnet.utilities.logging import Logger

if has_torch_package or TYPE_CHECKING:
    import torch


[docs] class Deployer(ABC, Logger): """A generic baseclass for applying `DeploymentModules` to analysis files. Modules are applied in the order that they appear in `modules`. """ @abstractmethod def _process_files( self, settings: Any, ) -> None: """Process a single file. If n_workers > 1, this function is run in parallel n_worker times. Each worker will loop over an allocated set of files. """ raise NotImplementedError @abstractmethod def _prepare_settings( self, input_files: List[str], output_folder: str ) -> List[Any]: """Produce a list of inputs for each worker. This function must produce and return a list of arguments to each worker. """ raise NotImplementedError def __init__( self, modules: Union[DeploymentModule, Sequence[DeploymentModule]], n_workers: int = 1, ) -> None: """Initialize `Deployer`. Will apply `DeploymentModules` to files in the order in which they appear in `modules`. Each module is run independently. Args: modules: List of `DeploymentModules`. Order of appearence in the list determines order of deployment. n_workers: Number of workers. The deployer will divide the number of input files across workers. Defaults to 1. """ super().__init__(name=__name__, class_name=self.__class__.__name__) # This makes sure that one worker cannot access more # than 1 core's worth of compute. if torch.get_num_interop_threads() > 1: torch.set_num_interop_threads(1) if torch.get_num_threads() > 1: torch.set_num_threads(1) # Check if isinstance(modules, list): self._modules = modules else: self._modules = [modules] # Member Variables self._n_workers = n_workers def _launch_jobs(self, settings: List[Any]) -> None: """Will launch jobs in parallel if n_workers > 1, else run on main.""" if self._n_workers > 1: processes = [] for i in range(self._n_workers): processes.append( multiprocessing.Process( target=self._process_files, args=[settings[i]], # type: ignore ) ) for process in processes: process.start() for process in processes: process.join() else: self._process_files(settings[0])
[docs] def run( self, input_files: Union[List[str], str], output_folder: str, ) -> None: """Apply `modules` to input files. Args: input_files: Path(s) to i3 file(s) that you wish to apply the graphnet modules to. output_folder: The output folder to which the i3 files are written. """ start_time = time.time() if isinstance(input_files, list): random.shuffle(input_files) else: input_files = [input_files] settings = self._prepare_settings( input_files=input_files, output_folder=output_folder ) assert len(settings) == self._n_workers self.info( f"""processing {len(input_files)} files \n using {self._n_workers} workers""" ) self._launch_jobs(settings) self.info( f"""Processing {len(input_files)} files was completed in \n {time.time() - start_time} seconds using {self._n_workers} cores.""" )