diff --git a/spine/data/optical.py b/spine/data/optical.py index 0515b5524..125a24f8a 100644 --- a/spine/data/optical.py +++ b/spine/data/optical.py @@ -20,6 +20,8 @@ class Flash(PosDataBase): ---------- id : int Index of the flash in the list + interaction_id : int + Index of the interaction in the list volume_id : int Index of the optical volume in which the flahs was recorded time : float @@ -48,6 +50,7 @@ class Flash(PosDataBase): Units in which the position coordinates are expressed """ id: int = -1 + interaction_id: int = -1 volume_id: int = -1 frame: int = -1 in_beam_frame: bool = False @@ -108,3 +111,51 @@ def from_larcv(cls, flash): time_abs=flash.absTime(), time_width=flash.timeWidth(), total_pe=flash.TotalPE(), pe_per_ch=pe_per_ch, center=center, width=width) + + @classmethod + def from_hypothesis(cls, flash, interaction_id, id, volume_id=None,negative_id=True): + """Builds and returns a Flash object from a flashmatch::Flash_t object. + From the hypothesis flash. + + Parameters + ---------- + flash : flashmatch::Flash_t + Flash object + interaction_id : int + Interaction ID to make the flash + id : int + ID of the flash + volume_id : int, optional + Volume ID to use for the flash, set manually if provided + negative_id : bool, default True + If `True`, use the negative of the id as the flash ID. This is used to identify which hypothesis is matched. If it's positive, it's matched. + + Returns + ------- + Flash + Flash object + """ + # Get the number of PEs per optical channel + pe_per_ch = np.array(flash.pe_v, dtype=np.float32) + + # Get the center and width of the flash + center = np.array([flash.x, flash.y, flash.z]) + width = np.array([flash.x_err, flash.y_err, flash.z_err]) + + #Get the volume ID + if volume_id is None: + volume_id = -1 + for attr in ('tpc', 'volume_id'): + if hasattr(flash, attr): + volume_id = getattr(flash, attr)() + + # Create the Flash object + if negative_id: + id = -(id+1) #+1 to avoid negative zero + return cls(id=id, interaction_id=interaction_id, volume_id=volume_id, + time=flash.time, + time_width=flash.time_width, + total_pe=flash.TotalPE(), pe_per_ch=pe_per_ch, + center=center, width=width) + def __str__(self): + return f"Flash(id={self.id}, interaction_id={self.interaction_id}, volume_id={self.volume_id}, time={self.time}, time_width={self.time_width}, total_pe={self.total_pe}, center={self.center}, width={self.width})" diff --git a/spine/post/optical/__init__.py b/spine/post/optical/__init__.py index 5818b57a2..84c31e58a 100644 --- a/spine/post/optical/__init__.py +++ b/spine/post/optical/__init__.py @@ -1 +1,2 @@ from .flash_matching import * +from .fill_hypothesis import * \ No newline at end of file diff --git a/spine/post/optical/fill_hypothesis.py b/spine/post/optical/fill_hypothesis.py new file mode 100644 index 000000000..60dd638de --- /dev/null +++ b/spine/post/optical/fill_hypothesis.py @@ -0,0 +1,167 @@ +""" +Post-processor in charge of filling the hypothesis into the data product. +""" + +from spine.post.base import PostBase +from spine.utils.geo import Geometry +from spine.data.out.base import OutBase +import numpy as np + +from .hypothesis import Hypothesis + +__all__ = ['FillFlashHypothesisProcessor'] + +class FillFlashHypothesisProcessor(PostBase): + """Fills the hypothesis into the data product.""" + + # Name of the post-processor (as specified in the configuration) + name = 'fill_hypothesis' + + # Alternative allowed names of the post-processor + aliases = ('fill_hypo',) + + def __init__(self, flash_key, volume, ref_volume_id=None, detector=None, parent_path=None, + geometry_file=None, run_mode='reco', truth_point_mode='points', + truth_dep_mode='depositions', hypothesis_key='flash_hypos', **kwargs): + """Initialize the fill hypothesis processor. + + Parameters + ---------- + flash_key : str + Flash data product name. In most cases, this is unambiguous, unless + there are multiple types of segregated optical detectors + volume : str + Physical volume corresponding to each flash ('module' or 'tpc') + ref_volume_id : str, optional + If specified, the flash matching expects all interactions/flashes + to live into a specific optical volume. Must shift everything. + detector : str, optional + Detector to get the geometry from + geometry_file : str, optional + Path to a `.yaml` geometry file to load the geometry from + parent_path : str, optional + Path to the parent directory of the main analysis configuration. + This allows for the use of relative paths in the post-processors. + hypothesis_key : str, default 'flash_hypo' + Key to use for the hypothesis data product + """ + # Initialize the parent class + super().__init__( + 'interaction', run_mode, truth_point_mode, truth_dep_mode, + parent_path=parent_path) + + # Initialize the hypothesis key + self.flash_key = flash_key + self.hypothesis_key = hypothesis_key + + # Initialize the detector geometry + self.geo = Geometry(detector, geometry_file) + + # Get the volume within which each flash is confined + assert volume in ('tpc', 'module'), ( + "The `volume` must be one of 'tpc' or 'module'.") + self.volume = volume + self.ref_volume_id = ref_volume_id + + # Initialize the hypothesis algorithm + self.hypothesis = Hypothesis(detector=detector, parent_path=self.parent_path, **kwargs) + + #Assert that we have flashes and interactions + self.update_keys({self.flash_key: True}) + + def match_hypothesis(self, hypothesis_v, flash_info_v): + """Match the hypothesis to the flash. The hypothesis has the interaction ID, + whereas the interaction has the flash ID. So we will match the hypothesis interaction ID to + the interaction that's matched to the flash, then set the flash ID to the hypothesis ID. + + Parameters + ---------- + hypothesis_v : list + List of hypothesis objects + flash_info_v : list + List of tuples of interaction ID, flash IDs, and flash volumes + + Returns + ------- + None + Modifies the hypothesis objects in place + """ + + #Make a dictionary of the flash IDs and volumes + int_id_dict = {ii[0]: (ii[1],ii[2]) for ii in flash_info_v} + + #Modify the hypothesis objects + for hypo in hypothesis_v: + #If the interaction ID is in the dictionary, and the hypothesis volume matches the flash volume, set the flash IDs + if hypo.interaction_id in int_id_dict and int_id_dict[hypo.interaction_id][1] == hypo.volume_id: + hypo.id = int_id_dict[hypo.interaction_id][0] + + + def process(self, data): + """Fills the hypothesis into the data product. + + Parameters + ---------- + data : dict + Data product to fill the hypothesis into + """ + + volume_ids = np.asarray([f.volume_id for f in data[self.flash_key]]) + + #Loop over optical volumes, make the hypotheses in each + for k in self.interaction_keys: + # Fetch interactions, nothing to do if there are not any + interactions = data[k] + if not len(interactions): + continue + + # Make sure the interaction coordinates are expressed in cm + self.check_units(interactions[0]) + + # Loop over the optical volumes + id_offset = 0 + hypothesis_v = [] + for volume_id in np.unique(volume_ids): + # Crop interactions to only include depositions in the optical volume + interactions_v = [] + flash_info_v = [] + for inter in interactions: + # Fetch the points in the current optical volume + sources = self.get_sources(inter) + if self.volume == 'module': + index = self.geo.get_volume_index(sources, volume_id) + + elif self.volume == 'tpc': + num_cpm = self.geo.tpc.num_chambers_per_module + module_id, tpc_id = volume_id//num_cpm, volume_id%num_cpm + index = self.geo.get_volume_index(sources, module_id, tpc_id) + + # If there are no points in this volume, proceed + if len(index) == 0: + continue + + # Fetch points and depositions + points = self.get_points(inter)[index] + depositions = self.get_depositions(inter)[index] + if self.ref_volume_id is not None: + # If the reference volume is specified, shift positions + points = self.geo.translate( + points, volume_id, self.ref_volume_id) + + # Create an interaction which holds positions/depositions + inter_v = OutBase( + id=inter.id, points=points, depositions=depositions) + interactions_v.append(inter_v) + for fid,fvol in zip(inter.flash_ids,inter.flash_volume_ids): + if fvol == volume_id: + flash_info_v.append((inter.id,fid,fvol)) #needed for matching + # Make the hypothesis + _hypo_v = self.hypothesis.make_hypothesis_list(interactions_v, id_offset, volume_id) + hypothesis_v.extend(_hypo_v) + id_offset += len(_hypo_v) #increment the offset for the next volume + + # Match the matched hypothesis to the flash if provided in interactions_v + self.match_hypothesis(hypothesis_v, flash_info_v) + + # Fill the hypothesis into the data product + return {self.hypothesis_key: hypothesis_v} \ No newline at end of file diff --git a/spine/post/optical/hypothesis.py b/spine/post/optical/hypothesis.py new file mode 100644 index 000000000..b75ea7305 --- /dev/null +++ b/spine/post/optical/hypothesis.py @@ -0,0 +1,154 @@ +"""Module for storing flash hypotheses into flash objects.""" + +#TODO: Make base class between likelihood and this + +import os +import sys +import numpy as np +import re + +# Import the base class and Flash data structure +from .opt0_interface import OpT0Interface +from spine.data.optical import Flash + + +class Hypothesis(OpT0Interface): + """ + Interface class between flash hypothesis generation and OpT0Finder. + + Inherits common initialization and QCluster creation from OpT0Interface. + Uses an OpT0Finder hypothesis algorithm (e.g., SemiAnalyticalModel, + PhotonLibHypothesis) to generate predicted flash PEs from TPC interactions. + """ + + def __init__(self, cfg, detector, parent_path=None, scaling=1., alpha=0.21, + recombination_mip=0.6, legacy=False): + """Initialize the flash hypothesis algorithm. + + Parameters + ---------- + cfg : str + Flash matching configuration file path + detector : str, optional + Detector to get the geometry from + parent_path : str, optional + Path to the parent configuration file (allows for relative paths) + scaling : Union[float, str], default 1. + Global scaling factor for the depositions (can be an expression) + alpha : float, default 0.21 + Number of excitons (Ar*) divided by number of electron-ion pairs (e-,Ar+) + recombination_mip : float, default 0.6 + Recombination factor for MIP-like particles in LAr + legacy : bool, default False + Use the legacy OpT0Finder function(s). TODO: remove when dropping legacy + """ + # Call the parent class initializer for common setup + super().__init__(cfg, detector, parent_path, scaling, alpha, + recombination_mip, legacy) + + # Initialize hypothesis-specific attributes + self.hypothesis_v = None + + def _initialize_algorithm(self, cfg_params): + """ + Initialize the specific Hypothesis algorithm based on configuration. + + Parameters + ---------- + cfg_params : flashmatch::PSet + The loaded OpT0Finder configuration parameters. + """ + from flashmatch import flashmatch + # Get FlashMatchManager configuration section to find the HypothesisAlgo name + # Assuming the relevant parameters are under 'FlashMatchManager' PSet + # Adjust 'FlashMatchManager' if your config structure is different + manager_params = cfg_params.get['flashmatch::FMParams']('FlashMatchManager') + + # Parse the configuration dump to find the HypothesisAlgo value + config_dump = manager_params.dump() + match = re.search(r'HypothesisAlgo\s*:\s*"([^"]+)"', config_dump) + if match: + algo_name = match.group(1) + else: + # Fallback: Check if the hypothesis algo config exists directly under top level + # This depends on how the .cfg file is structured + found_algo = False + for name in ['SemiAnalyticalModel', 'PhotonLibHypothesis']: # Add other known hypothesis algos + if cfg_params.contains_pset(name): + algo_name = name + found_algo = True + break + if not found_algo: + raise ValueError(f"Could not find HypothesisAlgo parameter within " + f"'FlashMatchManager' PSet in configuration: {config_dump}") + + + print(f'HypothesisAlgo: {algo_name}') + + # Create the hypothesis algorithm based on the extracted name + # Ensure the factory name matches the class name used in OpT0Finder registration + try: + self.hypothesis = flashmatch.FlashHypothesisFactory.get().create( + algo_name, algo_name) # Factory name and instance name often match + except Exception as e: + raise ValueError(f"Failed to create hypothesis algorithm '{algo_name}'. " + f"Is it registered correctly in OpT0Finder? Error: {e}") + + # Configure the hypothesis algorithm using its own PSet + try: + algo_pset = cfg_params.get['flashmatch::FMParams'](algo_name) + self.hypothesis.Configure(algo_pset) + except Exception as e: + raise ValueError(f"Failed to configure hypothesis algorithm '{algo_name}' " + f"using PSet '{algo_name}'. Error: {e}") + + def make_hypothesis_list(self, interactions, id_offset=0, volume_id=None): + """ + Runs the hypothesis algorithm on a list of interactions to create + a list of spine Flash objects representing the predicted light. + + Parameters + ---------- + interactions : List[Union[Interaction, TruthInteraction]] + List of TPC interactions + id_offset : int, default 0 + Offset to add to the flash ID + volume_id : int, optional + Volume ID to use for the hypothesis + + Returns + ------- + List[Flash] + List of generated spine Flash objects. + """ + # Make the QCluster_t objects using the base class method + # Store them in self.qcluster_v as the base class expects + self.qcluster_v = self.make_qcluster_list(interactions) + + # Map original interaction index to qcluster object for easy lookup + qcluster_map = {qc.idx: qc for qc in self.qcluster_v} + + # Initialize the list of generated spine Flash objects + self.hypothesis_v = [] + + # Run the hypothesis algorithm for each interaction that produced a valid qcluster + for i, inter in enumerate(interactions): + # Find the corresponding QCluster_t object using the original index + qcluster = qcluster_map.get(inter.id) # Assuming inter.id is the original index used in make_qcluster_list + + # Skip if no valid qcluster was created for this interaction + if qcluster is None: + continue + + # Run the hypothesis algorithm + flash_hypothesis_fm = self.hypothesis.GetEstimate(qcluster) # flashmatch::Flash_t + + # Create a new spine Flash object from the hypothesis result + # Pass the original interaction ID (inter.id) + flash = Flash.from_hypothesis(flash_hypothesis_fm, inter.id, i + id_offset, volume_id) + + # Append the generated spine Flash object + self.hypothesis_v.append(flash) + + return self.hypothesis_v + diff --git a/spine/post/optical/likelihood.py b/spine/post/optical/likelihood.py index 3a6ba672c..0ceabfeb8 100644 --- a/spine/post/optical/likelihood.py +++ b/spine/post/optical/likelihood.py @@ -2,19 +2,23 @@ import os import sys - import numpy as np +# Import the base class +from .opt0_interface import OpT0Interface + -class LikelihoodFlashMatcher: - """Interface class between full chain outputs and OpT0Finder +class LikelihoodFlashMatcher(OpT0Interface): + """ + Interface class between full chain outputs and OpT0Finder for likelihood matching. + Inherits common initialization and QCluster creation from OpT0Interface. See https://github.com/drinkingkazu/OpT0Finder for more details about it. """ def __init__(self, cfg, detector, parent_path=None, reflash_merging_window=None, scaling=1., alpha=0.21, - recombination_mip=0.65, legacy=False): + recombination_mip=0.6, legacy=False): """Initialize the likelihood-based flash matching algorithm. Parameters @@ -31,96 +35,32 @@ def __init__(self, cfg, detector, parent_path=None, Global scaling factor for the depositions (can be an expression) alpha : float, default 0.21 Number of excitons (Ar*) divided by number of electron-ion pairs (e-,Ar+) - recombination_mip : float, default 0.65 + recombination_mip : float, default 0.6 Recombination factor for MIP-like particles in LAr legacy : bool, default False Use the legacy OpT0Finder function(s). TODO: remove when dropping legacy """ - # Initialize the flash manager (OpT0Finder wrapper) - self.initialize_backend(cfg, detector, parent_path) + super().__init__(cfg, detector, parent_path, scaling, alpha, + recombination_mip, legacy) - # Get the external parameters self.reflash_merging_window = reflash_merging_window - self.scaling = scaling - if isinstance(self.scaling, str): - self.scaling = eval(self.scaling) - self.alpha = alpha - if isinstance(self.alpha, str): - self.alpha = eval(self.alpha) - self.recombination_mip = recombination_mip - if isinstance(self.recombination_mip, str): - self.recombination_mip = eval(self.recombination_mip) - self.legacy = legacy - - # Initialize flash matching attributes + self.matches = None - self.qcluster_v = None self.flash_v = None - def initialize_backend(self, cfg, detector, parent_path): - """Initialize OpT0Finder (backend). - - Expects that the environment variable `FMATCH_BASEDIR` is set. - You can either set it by hand (to the path where one can find - OpT0Finder) or you can source `OpT0Finder/configure.sh` if you - are running code from a command line. + def _initialize_algorithm(self, cfg_params): + """ + Initialize the FlashMatchManager for likelihood matching. Parameters ---------- - cfg: str - Path to config for OpT0Finder - detector : str, optional - Detector to get the geometry from - parent_path : str, optional - Path to the parent configuration file (allows for relative paths) + cfg_params : flashmatch::PSet + The loaded OpT0Finder configuration parameters. """ - # Add OpT0finder python interface to the python path - basedir = os.getenv('FMATCH_BASEDIR') - assert basedir is not None, ( - "You need to source OpT0Finder's configure.sh or set the " - "FMATCH_BASEDIR environment variable before running flash " - "matching.") - sys.path.append(os.path.join(basedir, 'python')) - - # Add the OpT0Finder library to the dynamic link loader - lib_path = os.path.join(basedir, 'build/lib') - os.environ['LD_LIBRARY_PATH'] = '{}:{}'.format( - lib_path, os.environ['LD_LIBRARY_PATH']) - - # Add the OpT0Finder data directory if it is not yet set - if 'FMATCH_DATADIR' not in os.environ: - os.environ['FMATCH_DATADIR'] = os.path.join(basedir, 'dat') - - # Load up the detector specifications - if detector is None: - det_cfg = os.path.join(basedir, 'dat/detector_specs.cfg') - else: - det_cfg = os.path.join(basedir, f'dat/detector_specs_{detector}.cfg') - - if not os.path.isfile(det_cfg): - raise FileNotFoundError( - f"Cannot file detector specification file: {det_cfg}.") - from flashmatch import flashmatch - flashmatch.DetectorSpecs.GetME(det_cfg) - - # Fetch and initialize the OpT0Finder configuration - if parent_path is not None and not os.path.isfile(cfg): - cfg = os.path.join(parent_path, cfg) - if not os.path.isfile(cfg): - raise FileNotFoundError( - f"Cannot find flash-matcher config: {cfg}") - - cfg = flashmatch.CreateFMParamsFromFile(cfg) - # Initialize The OpT0Finder flash match manager self.mgr = flashmatch.FlashMatchManager() - self.mgr.Configure(cfg) - - # Get the light path algorithm to produce QCluster_t objects - self.light_path = flashmatch.CustomAlgoFactory.get().create( - 'LightPath', 'ToyMCLightPath') - self.light_path.Configure(cfg.get['flashmatch::FMParams']('LightPath')) + self.mgr.Configure(cfg_params) def get_matches(self, interactions, flashes): """Find TPC interactions compatible with optical flashes. @@ -159,56 +99,6 @@ def get_matches(self, interactions, flashes): return result - def make_qcluster_list(self, interactions): - """Converts a list of SPINE interaction into a list of OpT0Finder - flashmatch.QCluster_t objects. - - Parameters - ---------- - interactions : List[Union[Interaction, TruthInteraction]] - List of TPC interactions - - Returns - ------- - List[QCluster_t] - List of OpT0Finder flashmatch::QCluster_t objects - """ - # Loop over the interacions - from flashmatch import flashmatch - qcluster_v = [] - for idx, inter in enumerate(interactions): - # Produce a mask to remove negative value points (can happen) - valid_mask = np.where(inter.depositions > 0.)[0] - - # Skip interactions with less than 2 points - if len(valid_mask) < 2: - continue - - # Initialize qcluster - qcluster = flashmatch.QCluster_t() - qcluster.idx = idx - qcluster.time = 0 - - # Get the point coordinates - points = inter.points[valid_mask] - - # Get the depositions - depositions = inter.depositions[valid_mask] - - # Fill the trajectory - pytraj = np.hstack([points, depositions[:, None]]) - traj = flashmatch.as_geoalgo_trajectory(pytraj) - if self.legacy: - qcluster += self.light_path.MakeQCluster(traj, self.scaling) - else: - qcluster += self.light_path.MakeQCluster( - traj, self.scaling, self.alpha, self.recombination_mip) - - # Append - qcluster_v.append(qcluster) - - return qcluster_v - def make_flash_list(self, flashes): """Creates a list of flashmatch.Flash_t from the local class. @@ -341,7 +231,7 @@ def get_flash(self, idx, array=False): if array: return flashmatch.as_ndarray(flash) else: return flash - raise Exception('Flash {idx} does not exist in self.flash_v') + raise Exception(f'Flash {idx} does not exist in self.flash_v') def get_match(self, idx): @@ -387,7 +277,7 @@ def get_matched_flash(self, idx): flash_id = m.flash_id if flash_id is None: return None if flash_id > len(self.flash_v): - raise Exception('Flash {flash_id} does not exist in self.flash_v') + raise Exception(f'Flash {flash_id} does not exist in self.flash_v') return self.flash_v[flash_id] diff --git a/spine/post/optical/opt0_interface.py b/spine/post/optical/opt0_interface.py new file mode 100644 index 000000000..eae40e692 --- /dev/null +++ b/spine/post/optical/opt0_interface.py @@ -0,0 +1,189 @@ +"""Base module for interfacing with OpT0Finder algorithms.""" + +import os +import sys +import numpy as np +import re +from abc import ABC, abstractmethod + +class OpT0Interface(ABC): + """ + Abstract base class for OpT0Finder interfaces (Likelihood and Hypothesis). + + Handles common initialization logic, environment setup, configuration loading, + and QCluster creation. + """ + + def __init__(self, cfg, detector, parent_path=None, scaling=1., alpha=0.21, + recombination_mip=0.6, legacy=False): + """ + Initialize common attributes and the OpT0Finder backend. + + Parameters + ---------- + cfg : str + Flash matching configuration file path + detector : str, optional + Detector to get the geometry from + parent_path : str, optional + Path to the parent configuration file (allows for relative paths) + scaling : Union[float, str], default 1. + Global scaling factor for the depositions (can be an expression) + alpha : float, default 0.21 + Number of excitons (Ar*) divided by number of electron-ion pairs (e-,Ar+) + recombination_mip : float, default 0.6 + Recombination factor for MIP-like particles in LAr + legacy : bool, default False + Use the legacy OpT0Finder function(s). TODO: remove when dropping legacy + """ + # Store external parameters + self.scaling = scaling + if isinstance(self.scaling, str): + self.scaling = eval(self.scaling) + self.alpha = alpha + if isinstance(self.alpha, str): + self.alpha = eval(self.alpha) + self.recombination_mip = recombination_mip + if isinstance(self.recombination_mip, str): + self.recombination_mip = eval(self.recombination_mip) + self.legacy = legacy + + # Initialize the flash manager (OpT0Finder wrapper) + self.initialize_backend(cfg, detector, parent_path) + + # Initialize common attributes potentially used by subclasses + self.qcluster_v = None + + def initialize_backend(self, cfg, detector, parent_path): + """ + Initialize the common OpT0Finder backend components. + + Sets up environment variables, loads detector specs, loads configuration, + and initializes the LightPath algorithm. Calls the abstract method + `_initialize_algorithm` for subclass-specific initialization. + + Parameters + ---------- + cfg : str + Path to config for OpT0Finder + detector : str, optional + Detector to get the geometry from + parent_path : str, optional + Path to the parent configuration file (allows for relative paths) + """ + # Add OpT0finder python interface to the python path + basedir = os.getenv('FMATCH_BASEDIR') + assert basedir is not None, ( + "You need to source OpT0Finder's configure.sh or set the " + "FMATCH_BASEDIR environment variable before running flash " + "matching.") + sys.path.append(os.path.join(basedir, 'python')) + + # Add the OpT0Finder library to the dynamic link loader + lib_path = os.path.join(basedir, 'build/lib') + # Avoid prepending if already present to prevent excessive path length + if lib_path not in os.environ.get('LD_LIBRARY_PATH', ''): + os.environ['LD_LIBRARY_PATH'] = '{}:{}'.format( + lib_path, os.environ.get('LD_LIBRARY_PATH', '')) + + # Add the OpT0Finder data directory if it is not yet set + if 'FMATCH_DATADIR' not in os.environ: + os.environ['FMATCH_DATADIR'] = os.path.join(basedir, 'dat') + + # Load up the detector specifications + if detector is None: + det_cfg = os.path.join(basedir, 'dat/detector_specs.cfg') + else: + det_cfg = os.path.join(basedir, f'dat/detector_specs_{detector}.cfg') + + if not os.path.isfile(det_cfg): + raise FileNotFoundError( + f"Cannot file detector specification file: {det_cfg}.") + + from flashmatch import flashmatch + flashmatch.DetectorSpecs.GetME(det_cfg) + + # Fetch and initialize the OpT0Finder configuration + if parent_path is not None and not os.path.isabs(cfg): + cfg = os.path.join(parent_path, cfg) + if not os.path.isfile(cfg): + raise FileNotFoundError( + f"Cannot find flash-matcher config: {cfg}") + + cfg_params = flashmatch.CreateFMParamsFromFile(cfg) + + # Get the light path algorithm to produce QCluster_t objects + self.light_path = flashmatch.CustomAlgoFactory.get().create( + 'LightPath', 'ToyMCLightPath') + self.light_path.Configure(cfg_params.get['flashmatch::FMParams']('LightPath')) + + # Initialize the specific algorithm (FlashMatchManager or Hypothesis) + self._initialize_algorithm(cfg_params) + + + @abstractmethod + def _initialize_algorithm(self, cfg_params): + """ + Abstract method for initializing the specific OpT0Finder algorithm. + + Subclasses must implement this to initialize their specific backend + (e.g., FlashMatchManager for likelihood, HypothesisAlgo for hypothesis). + + Parameters + ---------- + cfg_params : flashmatch::PSet + The loaded OpT0Finder configuration parameters. + """ + pass + + def make_qcluster_list(self, interactions): + """ + Converts a list of SPINE interaction into a list of OpT0Finder + flashmatch.QCluster_t objects. + + Parameters + ---------- + interactions : List[Union[Interaction, TruthInteraction]] + List of TPC interactions + + Returns + ------- + List[QCluster_t] + List of OpT0Finder flashmatch::QCluster_t objects + """ + # Loop over the interacions + from flashmatch import flashmatch + qcluster_v = [] + for idx, inter in enumerate(interactions): + # Produce a mask to remove negative value points (can happen) + valid_mask = np.where(inter.depositions > 0.)[0] + + # Skip interactions with less than 2 points + if len(valid_mask) < 2: + continue + + # Initialize qcluster + qcluster = flashmatch.QCluster_t() + qcluster.idx = idx + qcluster.time = 0 # Assume t=0 for hypothesis/likelihood generation + + # Get the point coordinates + points = inter.points[valid_mask] + + # Get the depositions + depositions = inter.depositions[valid_mask] + + # Fill the trajectory + pytraj = np.hstack([points, depositions[:, None]]) + traj = flashmatch.as_geoalgo_trajectory(pytraj) + if self.legacy: + qcluster += self.light_path.MakeQCluster(traj, self.scaling) + else: + qcluster += self.light_path.MakeQCluster( + traj, self.scaling, self.alpha, self.recombination_mip) + + # Append + qcluster_v.append(qcluster) + + return qcluster_v + diff --git a/spine/vis/ellipsoid.py b/spine/vis/ellipsoid.py index 424101e59..29e9171a9 100644 --- a/spine/vis/ellipsoid.py +++ b/spine/vis/ellipsoid.py @@ -9,7 +9,7 @@ def ellipsoid_trace(points=None, centroid=None, covmat=None, contour=0.5, num_samples=10, color=None, intensity=None, hovertext=None, - showscale=False, **kwargs): + showscale=False, size_scale=1, **kwargs): """Converts a cloud of points or a covariance matrix into a 3D ellipsoid. This function uses the centroid and the covariance matrix of a cloud of @@ -39,6 +39,8 @@ def ellipsoid_trace(points=None, centroid=None, covmat=None, contour=0.5, Text associated with the box showscale : bool, default False If True, show the colorscale of the :class:`plotly.graph_objs.Mesh3d` + size_scale : float, optional, default 1 + Scale factor for the size of the ellipsoid **kwargs : dict, optional Additional parameters to pass to the underlying :class:`plotly.graph_objs.Mesh3d` object @@ -78,8 +80,7 @@ def ellipsoid_trace(points=None, centroid=None, covmat=None, contour=0.5, "The `contour` parameter should be a probability.") radius = np.sqrt(2*gammaincinv(1.5, contour)) - ell_points = centroid + radius*np.dot(unit_points, rotmat) - + ell_points = centroid + size_scale*radius*np.dot(unit_points, rotmat) # Convert the color provided to a set of intensities, if needed if color is not None and not isinstance(color, str): assert intensity is None, ( @@ -105,7 +106,7 @@ def ellipsoid_trace(points=None, centroid=None, covmat=None, contour=0.5, def ellipsoid_traces(centroids, covmat, color=None, hovertext=None, cmin=None, cmax=None, shared_legend=True, legendgroup=None, - showlegend=True, name=None, **kwargs): + showlegend=True, name=None, size_scale=1, **kwargs): """Function which produces a list of plotly traces of ellipsoids given a list of centroids and one covariance matrix in x, y and z. @@ -131,6 +132,8 @@ def ellipsoid_traces(centroids, covmat, color=None, hovertext=None, cmin=None, Whether to show legends on not name : str, optional Name of the trace(s) + size_scale : List[float], optional + List of scale factors for the size of the ellipsoids **kwargs : dict, optional List of additional arguments to pass to the underlying list of :class:`plotly.graph_objs.Mesh3D` @@ -181,10 +184,15 @@ def ellipsoid_traces(centroids, covmat, color=None, hovertext=None, cmin=None, else: name_i = f'{name} {i}' + if size_scale is not None: + size_scale_i = size_scale[i] + else: + size_scale_i = 1 + # Append list of traces traces.append(ellipsoid_trace( centroid=centroid, covmat=covmat, contour=None, color=col, hovertext=hov, cmin=cmin, cmax=cmax, legendgroup=legendgroup, - showlegend=showlegend, name=name_i, **kwargs)) + showlegend=showlegend, name=name_i, size_scale=size_scale_i, **kwargs)) return traces diff --git a/spine/vis/geo.py b/spine/vis/geo.py index b1475ed67..633f16697 100644 --- a/spine/vis/geo.py +++ b/spine/vis/geo.py @@ -89,7 +89,7 @@ def tpc_traces(self, meta=None, draw_faces=False, shared_legend=True, def optical_traces(self, meta=None, shared_legend=True, legendgroup=None, name='Optical', color='rgba(0,0,255,0.25)', cmin=None, - cmax=None, zero_supress=False, volume_id=None, **kwargs): + cmax=None, zero_supress=False, volume_id=None,size=1, offset=[0,0,0], **kwargs): """Function which produces a list of traces which represent the optical detectors in a 3D event display. @@ -115,6 +115,10 @@ def optical_traces(self, meta=None, shared_legend=True, legendgroup=None, volume_id : int, optional Specifies which optical volume to represent. If not specified, all the optical volumes are drawn + size : Union[int, np.ndarray], optional + Size of the optical detectors. Default is 1 (no scaling) + offest : List[float], optional + Offset of the optical detectors [x,y,z] **kwargs : dict, optional List of additional arguments to pass to spine.vis.ellipsoid.ellipsoid_traces or spine.vis.box.box_traces @@ -134,7 +138,7 @@ def optical_traces(self, meta=None, shared_legend=True, legendgroup=None, else: positions = self.geo.optical.positions[volume_id] half_dimensions = self.geo.optical.dimensions/2 - + positions += offset*np.sign(positions) # If there is more than one detector shape, fetch shape IDs shape_ids = None if self.geo.optical.shape_ids is not None: @@ -174,21 +178,30 @@ def optical_traces(self, meta=None, shared_legend=True, legendgroup=None, if shape_ids is None: pos = positions col = color + sz = size else: index = np.where(np.asarray(shape_ids) == i)[0] pos = positions[index] + #Set the color of the optical detectors if color is not None and not np.isscalar(color): col = color[index] else: col = color + #Set the size of the optical detectors + if not np.isscalar(size): + sz = size[index] + else: + sz = size + + # If zero-supression is requested, only draw the optical detectors # which record a non-zero signal if zero_supress and color is not None and not np.isscalar(color): index = np.where(np.asarray(col) != 0)[0] pos = pos[index] col = col[index] - + sz = sz[index] # Determine wheter to show legends or not showlegend = not shared_legend or i == 0 @@ -197,7 +210,6 @@ def optical_traces(self, meta=None, shared_legend=True, legendgroup=None, if shape == 'box': # Convert the positions/dimensions to box lower/upper bounds lower, upper = pos - hd, pos + hd - # Build boxes traces += box_traces( lower, upper, shared_legend=shared_legend, name=name, @@ -207,11 +219,10 @@ def optical_traces(self, meta=None, shared_legend=True, legendgroup=None, else: # Convert the optical detector dimensions to a covariance matrix covmat = np.diag(hd**2) - # Build ellipsoids traces += ellipsoid_traces( pos, covmat, shared_legend=shared_legend, name=name, - color=col, cmin=cmin, cmax=cmax, + color=col, cmin=cmin, cmax=cmax, size_scale=sz, legendgroup=legendgroup, showlegend=showlegend, **kwargs) return traces diff --git a/spine/vis/layout.py b/spine/vis/layout.py index f6069094c..b19224016 100644 --- a/spine/vis/layout.py +++ b/spine/vis/layout.py @@ -102,6 +102,10 @@ def layout3d(ranges=None, meta=None, detector=None, titles=None, ranges[:, 0] -= lengths*0.1 ranges[:, 1] += lengths*0.1 + # Add extra padding to the x-axis + ranges[0, 0] -= lengths[0]*0.2 + ranges[0, 1] += lengths[0]*0.2 + # If pixel coordinates are requested, use meta to make the conversion if detector_coords is False: assert meta is not None, ( diff --git a/spine/vis/out.py b/spine/vis/out.py index bdb276a7f..e3dbd7aa4 100644 --- a/spine/vis/out.py +++ b/spine/vis/out.py @@ -156,11 +156,11 @@ def get_index(self, obj): return obj.index else: return getattr(obj, self.truth_index_mode) - def get(self, obj_type, attr=None, color_attr=None, draw_raw=False, - draw_end_points=False, draw_directions=False, draw_vertices=False, - draw_flashes=False, synchronize=False, titles=None, - split_traces=False, matched_flash_only=True): + draw_end_points=False, draw_directions=False, draw_vertices=False, draw_flashes=False, + synchronize=False, titles=None, split_traces=False, + matched_flash_only=True, draw_flash_hypotheses=False, hypo_interaction_id=None, + use_size_for_flash=False): """Draw the requested object type with the requested mode. Parameters @@ -190,6 +190,12 @@ def get(self, obj_type, attr=None, color_attr=None, draw_raw=False, If `True`, one trace is produced for each object matched_flash_only : bool, default True If `True`, only flashes matched to interactions are drawn + draw_flash_hypotheses : bool, default False + If `True`, draw the flash hypotheses for the given interaction ID + hypo_interaction_id : int, optional + Interaction ID of the hypothesis to draw + use_size_for_flash : bool, default False + If `True`, use the size of the flash to draw the amount of PE for the flash Returns ------- @@ -241,6 +247,12 @@ def get(self, obj_type, attr=None, color_attr=None, draw_raw=False, "Must provide interactions to draw their vertices.") traces[prefix] += self._vertex_trace(obj_name) + # Flash configuration + if draw_flash_hypotheses and draw_flashes: + opacity = 0.5 + else: + opacity = 1. + # Fetch the flashes, if requested if draw_flashes: assert 'flashes' in self.data, ( @@ -249,7 +261,15 @@ def get(self, obj_type, attr=None, color_attr=None, draw_raw=False, obj_name = f'{prefix}_interactions' assert obj_name in self.data, ( "Must provide interactions to draw matched flashes.") - traces[prefix] += self._flash_trace(obj_name, matched_flash_only) + traces[prefix] += self._flash_trace(obj_name, matched_flash_only, use_size_for_flash, opacity=opacity) + + # Fetch the flash hypotheses, if requested + if draw_flash_hypotheses: + assert 'flash_hypos' in self.data, ( + "Must provide the `flash_hypos` objects to draw them.") + for prefix in self.prefixes: + obj_name = f'flash_hypos' + traces[prefix] += self._hypothesis_trace(obj_name, hypo_interaction_id, use_size_for_flash, opacity=opacity,offset=[40,0,0]) # Add the TPC traces, if available if self.geo_drawer is not None: @@ -631,8 +651,7 @@ def _direction_trace(self, obj_name, color='black', **kwargs): return scatter_arrows( points, dirs, hovertext=np.array(hovertext), name=name, color=color, **kwargs) - - def _flash_trace(self, obj_name, matched_only, **kwargs): + def _flash_trace(self, obj_name, matched_only, use_size, opacity, **kwargs): """Draw the cumlative PEs of flashes that have been matched to interactions specified by `obj_name`. @@ -642,6 +661,10 @@ def _flash_trace(self, obj_name, matched_only, **kwargs): Name of the object to draw matched_only : bool If `True`, only flashes matched to interactions are drawn + use_size : bool + If `True`, scale the size of the flashes by the number of PEs. Otherwise, the size is fixed and the color is scaled by the number of PEs. + opacity : float + Opacity of the flashes **kwargs : dict, optional List of additional arguments to pass to :func:`optical_traces` @@ -667,17 +690,91 @@ def _flash_trace(self, obj_name, matched_only, **kwargs): flash_ids = np.arange(len(self.data['flashes'])) # Sum values from each flash to build a a global color scale - color = np.zeros(self.geo_drawer.geo.optical.num_detectors) + size = np.zeros(self.geo_drawer.geo.optical.num_detectors) + color = size.copy() opt_det_ids = self.geo_drawer.geo.optical.det_ids for flash_id in flash_ids: flash = self.data['flashes'][flash_id] index = self.geo_drawer.geo.optical.volume_index(flash.volume_id) pe_per_ch = flash.pe_per_ch + #Currently, the time of the flash is the same for the entire flash, so it's not a useful attribute + #time = flash.time + if opt_det_ids is not None: + pe_per_ch = np.bincount(opt_det_ids, weights=pe_per_ch) + if use_size: + size[index] += pe_per_ch + color[index] += pe_per_ch + else: + size[index] = 1 + color[index] += pe_per_ch + + color = np.where(size == 0, 0, color) + + if use_size: + #Normalize the size to be between 0.5 and 2 + size = (size - np.min(size))/(np.max(size) - np.min(size)) + size = size*(2 - 0.5) + 0.5 + # Return the set of optical detectors with a color scale + return self.geo_drawer.optical_traces( + meta=self.meta, color=color, size=size, zero_supress=True, + colorscale='Inferno', name=name, opacity=opacity, **kwargs) + + def _hypothesis_trace(self, obj_name, interaction_id, use_size, opacity, **kwargs): + """Draw the hypothesis object. + + Parameters + ---------- + obj_name : str + Name of the object to draw + interaction_id : int + Interaction ID of the hypothesis to draw + opacity : float + Opacity of the hypothesis + **kwargs : dict, optional + List of additional arguments to pass to :func:`optical_traces` + + Returns + ------- + list + List of optical detector traces + """ + # If there was no geometry provided by the user, nothing to do here + assert self.geo_drawer is not None, ( + "Cannot draw optical detectors without geometry information.") + assert interaction_id in [hypo.interaction_id for hypo in self.data['flash_hypos']], ( + f"Interaction ID {interaction_id} not found in flash hypotheses. Available IDs: {np.unique([int(hypo.interaction_id) for hypo in self.data['flash_hypos']])}.") + + # Define the name of the trace + name = ' '.join(obj_name.split('_')).capitalize()[:-1] + ' hypothesis' + + # Find the list of flash IDs to draw that match the interaction ID + hypo_flashes = [hypo for hypo in self.data['flash_hypos'] if hypo.interaction_id == interaction_id] + + # Sum values from each flash to build a a global color scale + size = np.zeros(self.geo_drawer.geo.optical.num_detectors) + color = size.copy() + opt_det_ids = self.geo_drawer.geo.optical.det_ids + + for hypo in hypo_flashes: + index = self.geo_drawer.geo.optical.volume_index(hypo.volume_id) + pe_per_ch = hypo.pe_per_ch + #Currently, the time of the flash is the same for the entire flash, so it's not a useful attribute + #time = hypo.time if opt_det_ids is not None: pe_per_ch = np.bincount(opt_det_ids, weights=pe_per_ch) - color[index] += pe_per_ch + if use_size: + size[index] += pe_per_ch + color[index] += pe_per_ch + else: + size[index] = 1. + color[index] += pe_per_ch + if use_size: + #Normalize the size to be between 0.5 and 2 + size = (size - np.min(size))/(np.max(size) - np.min(size)) + size = size*(2 - 0.5) + 0.5 # Return the set of optical detectors with a color scale return self.geo_drawer.optical_traces( - meta=self.meta, color=color, zero_supress=True, - colorscale='Inferno', name=name) + meta=self.meta, color=color, size=size, zero_supress=True, + colorscale='Inferno', name=name, opacity=opacity, **kwargs) +