import csv
import itertools
import json
import logging
import os
import re
import shutil
import signal
import time
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np
import pandas as pd
import psutil
from tqdm import tqdm
from .helper_functions import RingBuffer, collect_unique_values
from .postprocessing import PostProcess
from .recipes import SimulationRecipe
from .rml import ObjectElement, ParamElement, RMLFile
from .runner import RayUIAPI, RayUIRunner
################################################################
[docs]
class SimulationParams:
"""Handles the setup and management of simulation parameters for RAY-UI simulations.
This class is responsible for organizing simulation parameters, both independent and dependent,
and generating the necessary parameter sets for conducting simulations.
Attributes:
rml (RMLFile or str): The RML file or the path to the RML file used as the template
for simulations.
params (list of dict): A list of dictionaries where each dictionary represents a
set of parameters to simulate. Each key in the dictionary
is a ParamElement, and its value is the parameter value(s)
to simulate.
"""
def __init__(self, rml=None, param_list=None, **kwargs) -> None:
"""Initializes the SimulationParams class with a RML file and a list
of parameter dictionaries.
Args:
rml (RMLFile or str, optional): The RML file or the path to the RML
file used as the template for simulations.
param_list (list of dict, optional): A list of dictionaries where each
dictionary represents a set of parameters
to simulate. Defaults to None.
"""
self._rml = self._initialize_rml(
rml, **kwargs
) # Initializes the RML file or RMLFile object
self.params = param_list or [] # List of dictionaries for parameters to simulate
self.ind_param_values = [] # Independent parameter values for simulations
self.ind_par = [] # Independent parameters
self.dep_param_dependency = [] # Dependencies between parameters
self.dep_value_dependency = [] # Values dependent on other parameters
self.dep_par = [] # Dependent parameters
def _initialize_rml(self, rml, **kwargs):
"""Initializes the RML file or RMLFile object based on the provided RML path or object.
Args:
rml (RMLFile or str): The RML file or the path to the RML file used as
the template for simulations.
Returns:
RMLFile: An initialized RMLFile object.
"""
if rml is None:
raise ValueError("An RML file or RMLFile object must be provided.")
elif isinstance(rml, RMLFile):
return rml
elif isinstance(rml, str):
return RMLFile(rml, **kwargs)
else:
raise ValueError(
f"The rml should be either a string point to n rml file or \
an instance of the RMLFile class. You passed a {type(rml)}"
)
@property
def rml(self):
return self._rml
@property
def params(self):
return self._params
@params.setter
def params(self, value):
self._validate_params(value)
self._params = value
self._extract_param()
def _validate_params(self, value):
"""Validates the input parameter list to ensure it is in the correct format.
Args:
value (list): The parameter list to be validated.
Raises:
TypeError: If the input value is not a list or if the elements of the
list are not dictionaries.
"""
if not isinstance(value, list):
raise TypeError("params must be a list")
for item in value:
if not isinstance(item, dict):
raise TypeError("Each element in params must be a dictionary")
self._validate_param_keys_values(item)
def _validate_param_keys_values(self, param):
"""Validates the keys and values of a parameter dictionary.
Args:
param (dict): The parameter dictionary to be validated.
Raises:
TypeError: If the keys are not instances of ParamElement or
if the values are not valid types.
"""
for key, value in param.items():
if not isinstance(key, ParamElement):
raise TypeError(f"Keys must be ParamElement instances, found {type(key)}")
self._validate_value_type(param, value, key)
def _validate_value_type(self, param, value, key):
if not isinstance(value, (list, float, int, str)) and not hasattr(value, "__iter__"):
raise TypeError(f"Invalid type for parameter {key}: {type(value)}")
if not isinstance(value, list):
param[key] = [value] if isinstance(value, (float, int, str)) else list(value)
def compute_skip_factors(self, skip_params):
result = [1] * len(skip_params)
for i in range(len(skip_params) - 2, -1, -1):
result[i] = skip_params[i + 1] * result[i + 1]
if result:
result[-1] = 0
return result
def _extract_param(self):
"""Extracts and organizes parameters from the input parameter list.
Returns:
tuple: A tuple containing the organized parameters and their values.
"""
self._reset_extraction_variables()
skip_params = []
self.count_dep_params = 0
for parameters_dict in self.params:
if len(parameters_dict.keys()) > 1:
skip_params.append(len(next(iter(parameters_dict.values()))))
self.skip_params = self.compute_skip_factors(skip_params)
for parameters_dict in self.params:
self._process_parameter_dict(parameters_dict)
def _reset_extraction_variables(self):
"""Reset or initialize variables for a new extraction."""
self.ind_param_values, self.ind_par = [], []
self.dep_param_dependency, self.dep_value_dependency, self.dep_par = {}, [], []
def _validate_dependency_length(self, parameters_dict, independent_key, dependent_key):
"""Ensure dependent parameters match the length of their independent counterparts."""
if len(parameters_dict[dependent_key]) != len(parameters_dict[independent_key]):
raise ValueError(f"Dependent parameter lengths do not match for {dependent_key}.")
def _get_dependency_indices(self):
"""Get indices of independent parameters that dependent parameters rely on."""
return [self.ind_par.index(dep) for dep in self.dep_param_dependency.values()]
def _check_if_enabled(self, param):
"""Check if a parameter is enabled
Args:
param (RML object): an parameter to simulate
Returns:
(bool): True if the parameter is enabled, False otherwise
"""
return param.enabled == "T"
def _enable_param(self, param):
"""Set :code:`enabled='T'` and :code:`auto='F'` in a beamline object
Args:
param (RML object): beamline object
"""
if not self._check_if_enabled(param):
param.enabled = "T"
try:
param.auto = "F"
except AttributeError:
pass
def _write_value_to_param(self, param, value):
"""Write a value to a parameter.
Additionally it makes sure that enable is T
and auto is F
Args:
param (RML object): beamline object
value (str,int,float): the value to set the beamline object to
"""
self._enable_param(param)
if not isinstance(value, str):
value = str(value)
param.cdata = value
def _calc_number_sim(self):
"""Calculates the total number of simulations based on the provided parameters.
Returns:
int: The total number of simulations.
"""
from functools import reduce
from operator import mul
sim_per_round = reduce(mul, (len(value_list) for _, value_list in self.ind_param_values), 1)
return sim_per_round
def _process_parameter_dict(self, parameters_dict):
"""Process each dictionary of parameters."""
keys = list(parameters_dict.keys())
items = list(parameters_dict.items())
# First key is always considered independent
independent_param = keys[0]
independent_values = items[0][1]
self.ind_par.append(independent_param)
self.ind_param_values.append((independent_param, independent_values))
# If no dependent params, nothing else to do
if len(keys) == 1:
return
# Register dependent param relationships
for dep_param in keys[1:]:
self.dep_param_dependency[dep_param] = independent_param
if dep_param not in self.dep_par:
self.dep_par.append(dep_param)
# Create list of ring buffer
num_values = len(independent_values)
for dep_param in keys[1:]:
self.dep_value_dependency.append(
RingBuffer(skip=self.skip_params[self.count_dep_params])
)
index = len(self.dep_value_dependency) - 1
for i in range(num_values):
dep_value = parameters_dict[dep_param][i]
self.dep_value_dependency[index].add(dep_value)
self.count_dep_params += 1
[docs]
def simulation_parameters_generator(self):
"""Generates a dictionary of parameters for each simulation
based on the input parameter list.
Yields:
dict: A dictionary of parameters for a single simulation.
"""
# Unpack independent parameters and their value lists
keys, value_lists = zip(*self.ind_param_values, strict=False)
# Generate all combinations of independent values
self._calc_number_sim()
# Loop through each simulation index
for values_combination in itertools.product(*value_lists):
# Start with independent parameters
simulation_params = dict(zip(keys, values_combination, strict=False))
if self.dep_par:
for value_index, dep_par in enumerate(self.dep_par):
simulation_params[dep_par] = self.dep_value_dependency[value_index].next()
yield simulation_params
################################################################
[docs]
class Simulate:
"""A class that takes care of performing the simulations with RAY-UI
Args:
rml (RMLFile/string, optional): string pointing to an rml file with
the beamline template, or an RMLFile
class object. Defaults to None.
hide (bool, optional): force hiding of GUI leftovers, xvfb needs
to be installed. Defaults to False.
ray_path (str, optional): the path to the RAY-UI installation folder.
If None, the program will look for RAY-UI in
the standard installation paths.
"""
def __init__(self, rml=None, hide=False, ray_path=None, **kwargs) -> None:
    """Initialize the class with an rml file.

    Args:
        rml (RMLFile/string, optional): string pointing to an rml file with
                                        the beamline template, or an RMLFile
                                        class object. Defaults to None.
        hide (bool, optional): force hiding of GUI leftovers, xvfb needs
                               to be installed. Defaults to False.
        ray_path (str, optional): the path to the RAY-UI installation folder.
                                  If None, the program will look for RAY-UI in
                                  the standard installation paths.
        **kwargs: accepted for backward compatibility; not used.

    Raises:
        Exception: If the rml file is not defined (None or an empty string).
    """
    # An empty file name used to end up as ``self._rml = None`` through a
    # second, redundant assignment; treat it as "not defined" instead.
    if rml is None or (isinstance(rml, str) and not rml):
        raise Exception("rml file must be defined")
    # Accept a ready RMLFile; otherwise treat the argument as the template file.
    self._rml = rml if isinstance(rml, RMLFile) else RMLFile(None, template=rml)

    self.path = None  # Path for simulation execution (setter falls back to cwd)
    self.prefix = "RAYPy_Simulation"  # Simulation prefix
    self._hide = hide  # Hide GUI leftovers
    self.analyze = True  # Enable RAY-UI analysis
    self._repeat = 1  # Number of simulation repeats
    self.raypyng_analysis = False  # Enable RAYPyNG analysis
    self.ray_path = ray_path  # RAY-UI installation path
    self.overwrite_rml = True  # Overwrite RML files
    self._sim_folder = None  # Simulation folder name
    self.batch_size = 50  # recomputed from the worker count in run()
    self._simulation_timeout = 60
    self._batch_number = None
    self._workers = 1  # number of worker processes; set by run()
    self._simulation_name = None  # Custom simulation name
    self._exports = []  # Files to export after simulation
    self._exports_list = []  # Processed list of (object name, export) pairs
    self._exported_obj_names_list = []  # Names of the objects to export
    self._undulator_table = None  # holder for undulator table pandas dataframe
    self._efficiency = None  # holder for efficiency pandas dataframe
    self.sp = None  # SimulationParams instance
    self.sim_list_path = []  # Paths to RML files
    self.sim_path = None  # Simulation directory path
    self.durations = []  # Durations of simulations
    self.remove_rawrays = False  # remove or not the rawrays files
    self.total_duration = None  # Total duration of all simulations
    self.completed_simulations = None  # Count of completed simulations
    # Module logger, available before run(); _init_logging() attaches the log file.
    self.logger = logging.getLogger(__name__)
    self._possible_exports = [
        "AnglePhiDistribution",  # possible exports when RAY-UI analysis is active
        "AnglePsiDistribution",
        "BeamPropertiesPlotSnapshot",
        "EnergyDistribution",
        "FootprintAbsorbedRays",
        "FootprintAllRays",
        "FootprintOutgoingRays",
        "FootprintPlotSnapshot",
        "FootprintWastedRays",
        "IntensityPlotSnapshot",
        "IntensityX",
        "IntensityYZ",
        "PathlengthDistribution",
        "RawRaysBeam",
        "RawRaysIncoming",
        "RawRaysOutgoing",
        "ScalarBeamProperties",
        "ScalarElementProperties",
    ]
    self._possible_exports_without_analysis = [
        "RawRaysIncoming",  # possible exports when RAY-UI analysis is not active
        "RawRaysOutgoing",
    ]
@property
def possible_exports(self):
"""A list of the files that can be exported by RAY-UI
Returns:
list: list of the names of the possible exports for RAY-UI
"""
return self._possible_exports
@property
def possible_exports_without_analysis(self):
"""A list of the files that can be exported by RAY-UI when the
analysis option is turned off
Returns:
list: list of the names of the possible exports for RAY-UI when analysis is off
"""
return self._possible_exports_without_analysis
@property
def rml(self):
"""RMLFile object instantiated in init"""
return self._rml
@property
def simulation_name(self):
"""A string to append to the folder where the simulations will be executed."""
return self._simulation_name
@simulation_name.setter
def simulation_name(self, value):
self._simulation_name = value
self._sim_folder = self.prefix + "_" + self._simulation_name
@property
def analyze(self):
"""Turn on or off the RAY-UI analysis of the results.
The analysis of the results takes time, so turn it on only if needed
Returns:
bool: True: analysis on, False: analysis off
"""
return self._analyze
@analyze.setter
def analyze(self, value):
if not isinstance(value, bool):
raise ValueError("Only bool are allowed")
self._analyze = value
@property
def raypyng_analysis(self):
"""Turn on or off the RAYPyNG analysis of the results.
Returns:
bool: True: analysis on, False: analysis off
"""
return self._raypyng_analysis
@raypyng_analysis.setter
def raypyng_analysis(self, value):
if not isinstance(value, bool):
raise ValueError("Only bool are allowed")
self._raypyng_analysis = value
@property
def repeat(self):
"""The simulations can be repeated an arbitrary number of times
If the statitcs are not good enough using 2 millions of rays is suggested
to repeat them instead of increasing the number of rays
Returns:
int: the number of repetition of the simulations, by default is 1
"""
return self._repeat
@repeat.setter
def repeat(self, value):
if not isinstance(value, int):
raise ValueError("Only int are allowed")
self._repeat = value
@property
def path(self):
"""The path where to execute the simlations
Returns:
string: by default the path is the current path from which
the program is executed
"""
return self._path
@path.setter
def path(self, value):
if value is None:
value = os.getcwd()
if not isinstance(value, str):
raise ValueError("Only str are allowed")
if not os.path.exists(value):
raise ValueError("The path does not exist")
self._path = value
@property
def prefix(self):
return self._prefix
@prefix.setter
def prefix(self, value):
if not isinstance(value, str):
raise ValueError("Only str are allowed")
self._prefix = value
@property
def efficiency(self):
"""The parameters to scan, as a list of dictionaries.
For each dictionary the keys are the parameters elements
of the beamline, and the values are the
values to be assigned.
"""
return self._efficiency
@efficiency.setter
def efficiency(self, value):
self._validate_efficiency(value)
self._efficiency = value
def _validate_efficiency(self, value):
# Check if the value is an instance of pandas DataFrame
if not isinstance(value, pd.DataFrame):
raise TypeError("The efficiency must be a pandas DataFrame.")
# Check that the column names are "Efficiency" and 'Energy[eV]'
required_columns = {"Efficiency", "Energy[eV]"}
if not required_columns.issubset(value.columns):
raise ValueError(
f"The DataFrame must contain the following \
columns: {required_columns}"
)
@property
def undulator_table(self):
"""The undulator table, as a pandas DataFrame."""
return self._undulator_table
@undulator_table.setter
def undulator_table(self, value):
self._validate_undulator_table(value)
self._undulator_table = value
def _validate_undulator_table(self, value):
# check that source is not Dipole or Undulator File
for oe in self._rml.beamline.children():
if hasattr(oe, "numberRays"):
if oe["type"] == "Dipole" or oe["type"] == "Undulator File":
raise Exception(
f'The undulator table can not be used with source type \
"Dipole" and "Undulator File", the source type in the rml \
file is {oe["type"]}'
)
# Check if the value is an instance of pandas DataFrame
if not isinstance(value, pd.DataFrame):
raise TypeError("The undulator_table must be a pandas DataFrame.")
# Check if the number of columns is even
num_columns = value.shape[1] # shape[1] returns the number of columns
if num_columns % 2 != 0:
raise ValueError("The DataFrame must have an even number of columns.")
# Validate column names according to specified patterns
for i in range(0, num_columns, 2): # Iterate over even indices representing odd columns
expected_energy_name = f"Energy{2 * (i // 2 + 1) - 1}[eV]"
expected_harmonic_name = f"Photons{2 * (i // 2 + 1) - 1}"
if value.columns[i + 1] != expected_harmonic_name:
raise ValueError(
f"Expected column {i} to be named '{expected_harmonic_name}',\
but got '{value.columns[i]}'."
)
if value.columns[i] != expected_energy_name:
raise ValueError(
f"Expected column {i + 1} to be named '{expected_energy_name}', \
but got '{value.columns[i + 1]}'."
)
@property
def exports(self):
"""Get the list of files to export after the simulation is complete."""
return self._exports
@exports.setter
def exports(self, value):
"""
Validates and sets the exports list for simulation results.
Args:
value (list): A list of dictionaries specifying the exports configuration.
Raises:
TypeError: If the input is not a list or the contents of the list are not as expected.
"""
self._validate_export_list(value)
self._exports = value
self._exports_list = self._generate_exports_list(value)
self._exported_obj_names_list = self._generate_exported_obj_names_list(value)
self._exported_file_type = collect_unique_values(self._exports)
def _validate_export_list(self, export_list):
"""
Validates that the provided export list is properly formatted.
Args:
export_list (list): The exports list to validate.
Raises:
TypeError: If the export list is not a list or contains non-dictionary items.
"""
if not isinstance(export_list, list):
raise TypeError("The exports must be a list.")
for export_dict in export_list:
self._validate_export_dict(export_dict)
def _validate_export_dict(self, export_dict):
"""
Validates that each dictionary in the export list is correctly structured.
Args:
export_dict (dict): A dictionary representing an export configuration.
Raises:
TypeError: If the export configuration is not a dictionary or has
incorrect key/value types.
"""
if not isinstance(export_dict, dict):
raise TypeError("Each export configuration must be a dictionary.")
for object_element, export_files in export_dict.items():
self._validate_export_entry(object_element, export_files)
def _validate_export_entry(self, object_element, export_files):
    """Check one ``object_element -> export_files`` entry of an export dictionary.

    Args:
        object_element (ObjectElement): The object element to export from.
        export_files (list of str): The export names for that element.

    Raises:
        TypeError: If the key is not an ObjectElement or a file name is not a str.
        ValueError: If ``export_files`` is not a list, or a name cannot be exported.
    """
    if not isinstance(object_element, ObjectElement):
        raise TypeError("Keys of the export dictionary must be instances of ObjectElement.")
    if not isinstance(export_files, list):
        raise ValueError("The exported files should be written as a list")
    non_strings = [name for name in export_files if not isinstance(name, str)]
    if non_strings:
        raise TypeError("Export files must be specified as a list of strings.")
    self._validate_export_files_existence(export_files)
def _validate_export_files_existence(self, export_files):
"""
Validates that the specified export files are eligible for export based on current settings.
Args:
export_files (list): A list of filenames to be exported.
Raises:
ValueError: If any of the specified files cannot be exported based
on the current configuration.
"""
possible_exports = (
self.possible_exports if self.analyze else self.possible_exports_without_analysis
)
for file in export_files:
if file not in possible_exports:
raise ValueError(f"Cannot export {file}. Check spelling or analysis settings.")
def _generate_exports_list(self, export_list):
"""
Generates a comprehensive list of exports based on the provided export configurations.
Args:
export_list (list): The validated list of export configurations.
Returns:
list: A list of tuples, each containing the name of an object element
and a filename to export.
"""
exports_list = []
for export_dict in export_list:
for object_element, export_files in export_dict.items():
if isinstance(export_files, str):
export_files = [export_files] # Ensure it's a list
for file in export_files:
exports_list.append((object_element.attributes().original()["name"], file))
return exports_list
def _generate_exported_obj_names_list(self, export_list):
"""
Generates a list with the names of the objects that will be exported.
Args:
export_list (list): The validated list of export configurations.
Returns:
list: A list of str representing the names of the objects to export.
"""
self._exported_obj_names_list = []
for export_dict in export_list:
for object_element, export_files in export_dict.items():
if isinstance(export_files, str):
export_files = [export_files] # Ensure it's a list
for _file in export_files:
self._exported_obj_names_list.append(
object_element.attributes().original()["name"]
)
return self._exported_obj_names_list
@property
def params(self):
"""The parameters to scan, as a list of dictionaries.
For each dictionary the keys are the parameters elements of the beamline,
and the values are the values to be assigned.
"""
if hasattr(self.sp, "params"):
value = self.sp.params
else:
value = []
return value
@params.setter
def params(self, value):
if not isinstance(value, SimulationParams):
self.sp = SimulationParams(self.rml)
self.sp.params = value
else:
self.sp = value
def _save_parameters_to_file(self, dir):
"""Save user input parameters to file.
It takes the values from the SimulationParams class
Args:
dir (str): the folder where to save the parameters
"""
for i, p in enumerate(self.sp.ind_par):
filename = str(p.get_full_path().removeprefix("lab.beamline."))
filename = "input_param_" + filename.replace(".", "_")
filename += ".dat"
filepath = os.path.join(dir, filename)
with open(filepath, "w") as f:
values = self.sp.ind_param_values[i]
for item in values[1]:
f.write(f"{item}\n")
for i, p in enumerate(self.sp.dep_par):
filename = str(p.get_full_path().removeprefix("lab.beamline."))
filename = "input_param_" + filename.replace(".", "_")
filename += ".dat"
filepath = os.path.join(dir, filename)
with open(filepath, "w") as f:
dependency = self.sp.dep_value_dependency[i]
for values in dependency._data:
f.write(f"{values}\n")
[docs]
def rml_list(self, recipe=None, overwrite_rml=True):
    """Create the folder structure and RML files needed for the simulations.

    One RML file is written per simulation and per round. The recap files
    ``looper.csv``/``looper.txt`` are filled from the first round, and the
    scanned values are saved with ``_save_parameters_to_file``. This is
    useful for checking or adjusting the setup before simulating.

    Args:
        recipe (SimulationRecipe, optional): Recipe to use for setting
            up the simulation. Defaults to None.
        overwrite_rml (bool, optional): If True, existing RML files are
            overwritten and the recap files are recreated. Defaults to True.
    """
    self.overwrite_rml = overwrite_rml
    self._setup_simulation_environment(recipe)
    self._initialize_simulation_directory()

    if overwrite_rml:
        # Start the recap files from scratch for this run.
        for recap_name in ("looper.csv", "looper.txt"):
            recap_path = os.path.join(self.sim_path, recap_name)
            if os.path.exists(recap_path):
                os.remove(recap_path)

    for round_number in range(self.repeat):
        parameter_sets = self.sp.simulation_parameters_generator()
        for sim_number, params in enumerate(parameter_sets):
            self._generate_rml_file(sim_number, round_number, params)
            if round_number == 0:
                # The recap lists each parameter set once, not once per round.
                self._update_simulation_recap_files(params, sim_number)

    self._save_parameters_to_file(self.sim_path)
def _initialize_simulation_directory(self):
"""Initializes the directory structure for simulations."""
self.sim_list_path = []
self.sim_path = os.path.join(self.path, f"{self.prefix}_{self.simulation_name}")
if not os.path.exists(self.sim_path):
os.makedirs(self.sim_path)
for round_n in range(self.repeat):
round_folder_path = os.path.join(self.sim_path, "round_" + str(round_n))
if not os.path.exists(round_folder_path):
os.makedirs(round_folder_path)
def _generate_rml_file(self, sim_number, round_n, param_set):
"""
Generates an RML file for a given simulation setup.
Args:
sim_folder (str): The folder where the RML file should be saved.
sim_number (int): The simulation number within the current round.
param_set (dict): The parameter set for the current simulation.
Returns:
str: The path to the generated RML file.
"""
round_folder = "round_" + str(round_n)
rml_path = os.path.join(
self._sim_folder, round_folder, f"{sim_number}_{self.simulation_name}.rml"
)
for param, value in param_set.items():
self.sp._write_value_to_param(param, value)
if self.overwrite_rml or not os.path.exists(rml_path):
self.rml.write(rml_path)
self.sim_list_path.append(rml_path)
return rml_path
def _update_simulation_recap_files(self, params, simulation_number):
"""Updates or creates recap CSV and TXT files summarizing the simulations.
Args:
params (list): The parameters for the current simulation batch.
simulation_number (int): The number of the simulation being processed.
"""
# Paths for the recap files in the main simulation directory
recap_csv_path = os.path.join(self.sim_path, "looper.csv")
recap_txt_path = os.path.join(self.sim_path, "looper.txt")
# Check if the files exist to determine if headers need to be written
csv_file_exists = os.path.exists(recap_csv_path)
txt_file_exists = os.path.exists(recap_txt_path)
# Prepare data for CSV and TXT files
header = ["Simulation Number"] + [
f"{param._parent['name']}.{param['id']}" for param in params
]
row = [str(simulation_number)] + [param for param in params.values()]
# Update CSV file
with open(recap_csv_path, "a", newline="") as csvfile:
writer = csv.writer(csvfile)
if not csv_file_exists:
writer.writerow(header)
writer.writerow(row)
# Prepare and update TXT file with nice formatting
with open(recap_txt_path, "a") as txtfile:
if not txt_file_exists:
# Write header with nice formatting
txtfile.write(" ".join(header) + "\n")
# Determine the maximum width for each column
column_widths = [
max(len(str(simulation_number)), max(len(h), max(len(str(r)) for r in row)))
for h, r in zip(header, row, strict=False)
]
# Format row with aligned columns
formatted_row = " ".join(
str(r).ljust(w) for r, w in zip(row, column_widths, strict=False)
)
# Write the formatted row to the TXT file
txtfile.write(formatted_row + "\n")
def _is_simulation_missing(self, sim_index, repeat):
"""
Checks if a simulation is missing based on the existence of its export files.
Args:
simulation_index (int): The index of the simulation in the simulation list.
Returns:
bool: True if the simulation is missing any export files, False otherwise.
"""
round_folder = "round_" + str(repeat)
folder = os.path.join(self.sim_path, round_folder)
for export_config in self._exports_list:
if self.raypyng_analysis:
export_file = os.path.join(
folder, f"{sim_index}_{export_config[0]}_analyzed_rays_{export_config[1]}.dat"
)
else:
export_file = os.path.join(
folder, f"{sim_index}_{export_config[0]}-{export_config[1]}.csv"
)
if not os.path.exists(export_file):
return True # Missing at least one export file
return False
def _missing_simulations_for_round(self, round_number):
"""Return missing simulation indices for a round using a single directory scan."""
round_folder = os.path.join(self.sim_path, "round_" + str(round_number))
expected_ids = set(range(self.sp._calc_number_sim()))
if self.raypyng_analysis:
expected_suffixes = [
f"_{export_config[0]}_analyzed_rays_{export_config[1]}.dat"
for export_config in self._exports_list
]
else:
expected_suffixes = [
f"_{export_config[0]}-{export_config[1]}.csv" for export_config in self._exports_list
]
found_per_export = {suffix: set() for suffix in expected_suffixes}
self.logger.info(f"Scanning round {round_number} outputs in {round_folder}")
with os.scandir(round_folder) as entries:
for entry in entries:
if not entry.is_file():
continue
filename = entry.name
for suffix in expected_suffixes:
if not filename.endswith(suffix):
continue
sim_index = filename[: -len(suffix)].split("_", 1)[0]
if sim_index.isdigit():
sim_index = int(sim_index)
if sim_index in expected_ids:
found_per_export[suffix].add(sim_index)
break
completed_ids = expected_ids.copy()
for suffix, found_ids in found_per_export.items():
self.logger.info(
f"Round {round_number}: found {len(found_ids)}/{len(expected_ids)} files for {suffix}"
)
completed_ids &= found_ids
return sorted(expected_ids - completed_ids)
def _make_exports_list(self, sim_number, round_n):
exports_list = []
path = os.path.join(self.sim_path, "round_" + str(round_n))
for d in self.exports:
for exp_oe in d.keys():
for exp in d[exp_oe]:
temp_exp_list = []
temp_exp_list.append(exp_oe["name"])
temp_exp_list.append(exp)
temp_exp_list.append(path)
temp_exp_list.append(str(sim_number) + "_")
exports_list.append(temp_exp_list)
return exports_list
def _format_eta(self, seconds):
"""Format seconds into days, hours, and minutes."""
days, seconds = divmod(int(seconds), 86400)
hours, seconds = divmod(int(seconds), 3600)
minutes, seconds = divmod(int(seconds), 60)
if days > 0:
return f"{days} day(s), {int(hours):02d}h:{int(minutes):02d}min"
else:
return f"{int(hours):02d}h:{int(minutes):02d}min"
def _initialize_progress_bar(self, total_simulations, description="Simulations Completed"):
    """Create the tqdm progress bar for ``total_simulations`` simulations.

    Args:
        total_simulations (int): the bar's total.
        description (str, optional): label shown before the percentage.

    Returns:
        tqdm: the progress bar.
    """
    # NOTE(review): the trailing "]" has no matching "[" in this format;
    # confirm whether the postfix set by the caller is expected to supply it.
    return tqdm(
        total=total_simulations,
        bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} {postfix}]",
        desc=description,
    )
def _print_simulations_info(self):
total_simulations = self.sp._calc_number_sim() * self.repeat
# Prepare data for printing
data = [
["RML File ", os.path.basename(self._rml._template)],
["Simulation Name", self._sim_folder],
["Independent Parameters", len(self.sp.ind_par)],
["Dependent Parameters", len(self.sp.dep_par)],
["Rounds of Simulations", self._repeat],
["Total Number of Simulations", total_simulations],
]
# Determine column widths by the longest item in each column
col_widths = [max(len(str(item)) for item in col) for col in zip(*data, strict=False)]
print()
print("Simulation Info")
# Print the data rows
print("-" * (sum(col_widths) + 3)) # for the separator and spaces
for row in data[0:]:
print(f"{row[0]:<{col_widths[0]}} | {row[1]:>{col_widths[1]}}")
print("-" * (sum(col_widths) + 3)) # for the separator and spaces
print()
def _init_logging(self):
"""Initializes logging for the simulation."""
log_filename = os.path.join(self.sim_path, "simulation.log")
logging.basicConfig(
filename=log_filename,
filemode="a",
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
)
self.logger = logging.getLogger(__name__)
self.logger.info(f"Simulation started, using {self._workers} workers")
def _resolve_multiprocessing_workers(self, multiprocessing):
if isinstance(multiprocessing, str):
mode = multiprocessing.lower()
if mode not in {"auto", "max"}:
raise ValueError(
"The 'multiprocessing' argument must be an integer greater than 0, "
"or one of: 'auto', 'max'."
)
cpu_count = os.cpu_count() or 1
available_ram_gb = max(1, int(psutil.virtual_memory().available / (1024**3)))
if mode == "auto":
return max(1, min(cpu_count, available_ram_gb - 2))
return max(1, min(cpu_count, available_ram_gb))
if not isinstance(multiprocessing, int) or multiprocessing < 1:
raise ValueError(
"The 'multiprocessing' argument must be an integer greater than 0, "
"or one of: 'auto', 'max'."
)
return multiprocessing
[docs]
def run(
    self,
    recipe=None,
    multiprocessing=1,
    force=False,
    overwrite_rml=True,
    force_exit=False,
    remove_rawrays=False,
    remove_round_folders=False,
):
    """
    Execute simulations with optional recipe, multiprocessing, and file management options.

    This method orchestrates the setup and execution of simulations, managing multiprocessing,
    file generation, and progress tracking. When ``raypyng_analysis`` is enabled, the
    exported results are cleaned up / collected with ``PostProcess.cleanup`` once all
    simulations have finished. If, in addition, ``analyze`` is False, the recap CSV files
    and ``raypyng_analysis_metadata.json`` are written.

    Args:
        recipe (SimulationRecipe, optional): Recipe for simulation setup.
                                             Defaults to None.
        multiprocessing (int or str, optional): Number of processes for parallel execution,
                                                or 'auto'/'max' to derive it from available
                                                CPUs and RAM. Defaults to 1.
        force (bool, optional): Force re-execution of simulations.
                                Defaults to False.
        overwrite_rml (bool, optional): Overwrite existing RML files. Defaults to True.
        force_exit (bool, optional): emergency fallback that calls os._exit when the
                                     simulations are complete. Defaults to False.
        remove_rawrays (bool, optional): removes RawRaysIncoming and RawRaysOutgoing files,
                                         if present. Requires ``raypyng_analysis``.
        remove_round_folders (bool, optional): remove the round folders after the simulations
                                               are done.

    Raises:
        ValueError: If ``multiprocessing`` is invalid, or if ``remove_rawrays`` is
            requested while ``raypyng_analysis`` is disabled.

    Any exception (including KeyboardInterrupt) raised while the simulations run is
    logged and re-raised after child processes have been cleaned up.
    """
    multiprocessing = self._resolve_multiprocessing_workers(multiprocessing)
    if remove_rawrays and not self.raypyng_analysis:
        # ValueError subclasses Exception, so callers that caught the previous bare
        # Exception keep working.
        raise ValueError(
            "Setting remove_rawrays to True is allowed only if raypyng_analysis is set to True"
        )
    if remove_rawrays:
        self.remove_rawrays = remove_rawrays
    # Early sanity check (original intent: "test that we can run RAY-UI"): build a
    # runner for ray_path and kill it straight away.
    runner = RayUIRunner(ray_path=self.ray_path, hide=True)
    runner.kill()
    self._batch_number = 0
    self._workers = multiprocessing
    # Simulations are dispatched and awaited in batches of five per worker.
    self.batch_size = int(self._workers) * 5
    self._prepare_simulation_environment(recipe, overwrite_rml)
    self._init_logging()
    total_simulations = self.sp._calc_number_sim() * self.repeat
    self.simulations_checked = False
    self._executor_has_unfinished_futures = False
    pbar = self._initialize_progress_bar(total_simulations, description="Simulations Completed")
    try:
        self._execute_simulations(multiprocessing, force, total_simulations, pbar)
        self.logger.info("Simulation completed successfully.")
    except KeyboardInterrupt:
        self.logger.error("Simulation Interrupted.", exc_info=True)
        self.cleanup_child_processes()
        raise
    except Exception:
        self.logger.error("Simulation failed.", exc_info=True)
        self.cleanup_child_processes()
        raise
    finally:
        pbar.close()

    # Post-processing only runs when the simulations finished without raising.
    if self.raypyng_analysis:
        self.logger.info("Starting cleanup")
        pp = PostProcess()
        pp.cleanup(
            self.sim_path,
            self.repeat,
            self._exported_obj_names_list,
            exported_file_type=self._exported_file_type,
        )
        self.logger.info("Done with the cleanup")
    if self.analyze is False and self.raypyng_analysis is True:
        self.logger.info("Create Pandas Recap Files")
        self._create_results_dataframe(self._exported_file_type)
        self._write_analysis_metadata_file()
    if remove_round_folders:
        self._remove_round_folders()
    self.logger.info("End of the Simulations")
    if force_exit:
        self.cleanup_child_processes()
        os._exit(0)
[docs]
def cleanup_child_processes(self):
    """Clean up all child processes initiated by this process
    and any specific Xvfb processes.

    Best-effort and never raises for processes that have already exited, refuse
    access, or ignore the polite request. This matters because it is called from
    ``except`` handlers, where a crash here would hide the original error.

    1. Every descendant of the current process gets SIGTERM (``terminate``). All
       of them together get up to 3 s to exit; survivors are killed.
    2. Every ``Xvfb`` process on the machine whose first argument is a display
       ``:N`` with ``N > 3000`` is terminated (killed if still alive after 3 s).

    NOTE(review): step 2 is not restricted to descendants of this process, so it
    also affects such Xvfb servers started elsewhere by the same user.
    """
    current_process = psutil.Process()
    children = current_process.children(recursive=True)
    if children:
        print("Terminating child processes...")
    for child in children:
        try:
            child.terminate()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    # Wait for all children at once instead of 3 s each; wait_procs copes with
    # processes that are already gone.
    _, still_alive = psutil.wait_procs(children, timeout=3)
    for child in still_alive:
        try:
            child.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

    # Now target specific Xvfb processes with display numbers higher than 3000
    for proc in psutil.process_iter(["pid", "name", "cmdline"]):
        # process_iter stores None for attributes it was not allowed to read.
        name = proc.info.get("name") or ""
        cmdline = proc.info.get("cmdline") or []
        if "Xvfb" not in name or len(cmdline) < 2:
            continue
        display_part = cmdline[1]  # The part of the cmdline where ':XXXX' is expected
        if not (display_part.startswith(":") and display_part[1:].isdigit()):
            continue
        if int(display_part[1:]) <= 3000:
            continue
        try:
            proc.terminate()  # SIGTERM on POSIX, with a PID-reuse check
            try:
                proc.wait(timeout=3)
            except psutil.TimeoutExpired:
                proc.kill()  # Force kill if not terminated after timeout
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
def _remove_round_folders(self):
    """Delete the ``round_<n>`` sub-folders of ``sim_path`` for ``n`` in ``range(self._repeat)``.

    Folders that do not exist are skipped; existing ones are removed recursively.
    """
    round_dirs = (os.path.join(self.sim_path, f"round_{n}") for n in range(self._repeat))
    for round_dir in round_dirs:
        if not os.path.exists(round_dir):
            continue
        shutil.rmtree(round_dir)
def _create_results_dataframe(self, exported_file_type):
    """Prepend the ``looper.csv`` columns to every exported result CSV, in place.

    For each exported object name and each entry of ``exported_file_type``, the file
    ``<sim_path>/<export>_<type>.csv`` is read, ``#`` characters are removed from its
    column names, the looper table is concatenated column-wise in front of it,
    ``Unnamed...`` columns are dropped, and the result overwrites the same file.
    The file is written with pandas' default index column.

    Args:
        exported_file_type (iterable of str): File-type suffixes to process.
    """
    looper_table = pd.read_csv(os.path.join(self.sim_path, "looper.csv"))
    for export_name in self._exported_obj_names_list:
        for suffix in exported_file_type:
            csv_path = os.path.join(self.sim_path, f"{export_name}_{suffix}.csv")
            # comment=None: the header row begins with '#', which must be read as a
            # normal header, not skipped as a comment line.
            results = pd.read_csv(csv_path, comment=None, header=0, index_col=False)
            results.columns = [name.replace("#", "").strip() for name in results.columns]
            merged = pd.concat([looper_table, results], axis=1)
            keep_mask = ~merged.columns.str.contains("^Unnamed")
            merged.loc[:, keep_mask].to_csv(csv_path)
def _unit_for_output_column(self, column_name):
    """Return the unit string recorded in the metadata for an output column.

    Lookup order:
      1. pandas placeholder columns (``"Unnamed: ..."``) -> ``None``;
      2. known raypyng analysis column names (some map explicitly to ``None``);
      3. dotted parameter columns -> unit of the part after the last ``.``
         (``None`` if that parameter id is not listed);
      4. anything else -> ``None``.

    Args:
        column_name (str): Column header as written in the output CSV.

    Returns:
        str or None: The unit, or ``None`` when no unit is known/applicable.
    """
    if column_name.startswith("Unnamed:"):
        return None

    analysis_units = {
        "Simulation Number": "index",
        "SourcePhotonFlux": "photons/s",
        "SourceBandwidth": "eV",
        "NumberRaysSurvived": "count",
        "PercentageRaysSurvived": "%",
        "PhotonEnergy": "eV",
        "Bandwidth": "eV",
        "HorizontalFocusFWHM": "mm",
        "VerticalFocusFWHM": "mm",
        "HorizontalDivergenceFWHM": "deg",
        "VerticalDivergenceFWHM": "deg",
        "HorizontalCenter": "mm",
        "VerticalCenter": "mm",
        "PhotonFlux": "photons/s",
        "EnergyPerMilPerBw": None,
        "FluxPerMilPerBwPerc": None,
        "FluxPerMilPerBwAbs": None,
        "AXUVCurrentAmp": "A",
        "GaAsPCurrentAmp": "A",
    }
    try:
        return analysis_units[column_name]
    except KeyError:
        pass

    if "." not in column_name:
        return None

    parameter_units = {
        "photonEnergy": "eV",
        "numberRays": "count",
        "totalHeight": "mm",
        "cFactor": None,
    }
    _, _, param_id = column_name.rpartition(".")
    return parameter_units.get(param_id)
def _write_analysis_metadata_file(self):
    """Write ``raypyng_analysis_metadata.json`` listing output columns and their units.

    The header of the first existing ``<export>_<type>.csv`` in ``sim_path`` (exports in
    order, file types in order within each export) is used as the column list. If
    ``SourcePhotonFlux`` is present, everything before it is dropped. Each column's unit
    comes from ``self._unit_for_output_column``. If no such CSV exists, nothing is written.
    """
    candidate_paths = (
        os.path.join(self.sim_path, f"{export}_{in_out}.csv")
        for export in self._exported_obj_names_list
        for in_out in self._exported_file_type
    )
    first_existing = next((path for path in candidate_paths if os.path.exists(path)), None)
    if first_existing is None:
        return

    columns = list(pd.read_csv(first_existing, nrows=0).columns)
    anchor_column = "SourcePhotonFlux"
    if anchor_column in columns:
        columns = columns[columns.index(anchor_column) :]

    column_entries = []
    for column_name in columns:
        column_entries.append(
            {"name": column_name, "unit": self._unit_for_output_column(column_name)}
        )
    metadata = {
        "applies_to": "all raypyng analysis output files in this folder",
        "columns": column_entries,
    }
    target = os.path.join(self.sim_path, "raypyng_analysis_metadata.json")
    with open(target, "w", encoding="utf-8") as handle:
        json.dump(metadata, handle, indent=2)
def _remove_recap_files(self):
    """Delete the looper recap files (``looper.csv`` and ``looper.txt``) from ``sim_path``.

    Files that do not exist are ignored.
    """
    recap_paths = [os.path.join(self.sim_path, name) for name in ("looper.csv", "looper.txt")]
    for existing_path in filter(os.path.exists, recap_paths):
        os.remove(existing_path)
def _prepare_simulation_environment(self, recipe, overwrite_rml):
    """
    Get everything ready before the first simulation is dispatched.

    Stores ``overwrite_rml``, applies the optional recipe, initialises the simulation
    directory, saves the parameters into ``sim_path``, deletes stale looper recap
    files and prints the run summary -- in exactly that order.

    Args:
        recipe (SimulationRecipe, optional): Recipe for simulation setup. Defaults to None.
        overwrite_rml (bool, optional): Overwrite existing RML files. Defaults to True.
    """
    self.overwrite_rml = overwrite_rml
    self._setup_simulation_environment(recipe)
    self._initialize_simulation_directory()
    # sim_path is only read after the directory initialisation (which may set it).
    target_path = self.sim_path
    self._save_parameters_to_file(target_path)
    for finishing_step in (self._remove_recap_files, self._print_simulations_info):
        finishing_step()
def _execute_simulations(
    self, multiprocessing, force, total_simulations, pbar, update_reacap_files=True
):
    """
    Executes the simulations in batches with multiprocessing support.

    Iterates over every round and every parameter set. Simulations that are missing
    (or all of them when ``force``) are queued. The others only advance the progress
    bar. Every ``self.batch_size`` iterations, or at the very end, the queued batch is
    handed to ``_wait_for_simulation_batch``. After the last iteration,
    ``_final_check_on_simulations_and_shutdown`` decides whether a single extra pass
    over the still-missing simulations is needed. That pass is run recursively once
    the executor has been shut down, and its progress bar is closed afterwards.

    Args:
        multiprocessing (int): Number of processes for parallel execution.
        force (bool): Force re-execution of simulations.
        total_simulations (int): Total number of simulations to be executed.
        pbar (tqdm): Progress bar object for tracking simulation progress.
        update_reacap_files (bool): If True, recap files are updated for every
            parameter set during round 0. The re-run pass disables this.
    """
    simulations_durations = []  # Track durations of all simulations for average calculation
    executor = None
    rerun_missing = False
    rerun_pbar = None
    try:
        executor = ProcessPoolExecutor(max_workers=multiprocessing)
        simulation_params_batch = []
        batch_length = 0
        remaining_simulations = total_simulations
        for round_number in range(self.repeat):
            self.logger.info(f"Start round {round_number}")
            for sim_number, params in enumerate(self.sp.simulation_parameters_generator()):
                if round_number == 0 and update_reacap_files is True:
                    self._update_simulation_recap_files(params, sim_number)
                if self._is_simulation_missing(sim_number, round_number) or force:
                    self._prepare_and_submit_simulation(
                        params,
                        sim_number,
                        round_number,
                        simulation_params_batch,
                        executor,
                        force,
                    )
                else:
                    pbar.update(1)  # If not missing or forced, update progress bar directly
                batch_length += 1
                remaining_simulations -= 1
                if batch_length == self.batch_size or remaining_simulations == 0:
                    self._wait_for_simulation_batch(
                        simulations_durations, simulation_params_batch, executor, pbar
                    )
                    self.logger.info(f"Waiting For batch, {self.batch_size} simulations to go")
                    batch_length = 0
                    if remaining_simulations == 0:
                        self.logger.info(
                            f"Remaning Simulations {remaining_simulations}, "
                            "stop the simulations loop"
                        )
                        rerun_missing, rerun_pbar = self._final_check_on_simulations_and_shutdown(
                            pbar
                        )
                        break
    except Exception as e:
        traceback.print_exc()
        self.logger.info(f"Error in _execute simulations: {e}")
        raise
    finally:
        if executor is not None:
            # Waiting on a pool that still holds hung/cancelled work could block forever,
            # so in those cases shut down without waiting and cancel pending futures.
            shutdown_wait = not rerun_missing and not self._executor_has_unfinished_futures
            force_cancel = rerun_missing or self._executor_has_unfinished_futures
            executor.shutdown(wait=shutdown_wait, cancel_futures=force_cancel)
            self.logger.info(
                "Executor shutdown completed%s.",
                " without waiting" if not shutdown_wait else "",
            )
            if force_cancel:
                self.cleanup_child_processes()
        if rerun_missing:
            try:
                self._execute_simulations(
                    self._workers,
                    False,
                    total_simulations,
                    rerun_pbar,
                    update_reacap_files=False,
                )
            finally:
                # The re-run bar was created by the final check and nobody else owns
                # it: the caller only closes the original bar.
                rerun_pbar.close()
def _final_check_on_simulations_and_shutdown(self, old_pbar):
    """Decide whether one more pass is needed for simulations that are still missing.

    Despite its name, this method does not shut anything down. It queries
    ``_missing_simulations_for_round`` for every round and logs what it finds.

    Args:
        old_pbar: Progress bar of the pass that has just finished.

    Returns:
        tuple: ``(True, new_pbar)`` if at least one simulation is missing and no re-check
        has been granted yet. In that case ``old_pbar`` is closed, a fresh
        "Checking Simulations" bar is created and ``simulations_checked`` is set.
        Otherwise ``(False, old_pbar)``.
    """
    self.logger.info(
        "Checking that all simulations are completed before stopping the ProcessPoolExecutor"
    )
    n_missing = 0
    for rnd in range(self.repeat):
        absent = self._missing_simulations_for_round(rnd)
        self.logger.info(f"Round {rnd}: final check found {len(absent)} missing simulations")
        for sim_number in absent:
            self.logger.info(f"This simulation is missing: round {rnd}, number {sim_number}")
            n_missing += 1

    # Only a single re-check pass is ever granted.
    if n_missing == 0 or self.simulations_checked is not False:
        return False, old_pbar

    self.logger.info("Finish missing simulations")
    grand_total = self.sp._calc_number_sim() * self.repeat
    old_pbar.close()
    fresh_pbar = self._initialize_progress_bar(grand_total, description="Checking Simulations")
    self.simulations_checked = True
    return True, fresh_pbar
def _prepare_and_submit_simulation(
    self, params, sim_number, round_number, simulation_params_batch, executor, force
):
    """
    Generate the RML file for one simulation and queue it in the current batch.

    Nothing is submitted here; ``executor`` and ``force`` are accepted for interface
    compatibility but not used. The queued entry is ``(runner_args, exports)``, the
    tuple shape that ``run_rml_func`` unpacks.

    Args:
        params (dict): Parameters for the current simulation.
        sim_number (int): Simulation number within the current round.
        round_number (int): Current round of simulations.
        simulation_params_batch (list): Batch the new entry is appended to (mutated).
        executor (ProcessPoolExecutor): Unused.
        force (bool): Unused.
    """
    rml_path = self._generate_rml_file(sim_number, round_number, params)
    exports = self._make_exports_list(sim_number, round_number)
    runner_args = (
        rml_path,
        self._hide,
        self.analyze,
        self.raypyng_analysis,
        self.ray_path,
        self.remove_rawrays,
        self.undulator_table,
        self.efficiency,
    )
    simulation_params_batch.append((runner_args, exports))
    self.logger.info(f"Prepared sim number: {sim_number}: {rml_path}")
def _wait_for_simulation_batch(
    self, simulations_durations, simulation_params_batch, executor, pbar
):
    """
    Waits for a batch of simulations to complete and updates the progress bar.

    Every entry of the batch is submitted to ``executor`` as ``run_rml_func``. The
    futures are awaited with ``as_completed`` and ``self._simulation_timeout``. After each
    completion, the timeout is re-estimated as
    ``mean(duration) * batch_size / workers * 1.2``.

    If waiting fails (a worker raised, or the timeout expired), the remaining futures
    are cancelled and the executor is flagged via ``_executor_has_unfinished_futures``.
    The method then polls ``_is_simulation_missing`` every 5 s, for at most a quarter of
    the current timeout in total. Finally, the progress bar is advanced once for every
    simulation of the batch that was not reported through a future.

    The batch list is cleared before returning.

    Args:
        simulations_durations (list): List to track durations of completed simulations (mutated).
        simulation_params_batch (list): Batch of simulation parameters to submit (cleared).
        executor (ProcessPoolExecutor): Executor for multiprocessing.
        pbar (tqdm): Progress bar object for tracking simulation progress.
    """
    futures = {
        executor.submit(run_rml_func, sim_params): sim_params
        for sim_params in simulation_params_batch
    }
    completed_sim = 0
    # Count only what was actually submitted. A batch can be shorter than batch_size:
    # the last batch, or iterations whose results already existed, which the caller
    # has already counted on the progress bar. Using batch_size over-advanced the bar.
    remaining_simulations = len(simulation_params_batch)
    self.logger.info(
        f"Waiting for batch number: {self._batch_number}, timeout: {self._simulation_timeout}s"
    )
    self._batch_number += 1
    try:
        for future in as_completed(futures, timeout=self._simulation_timeout):
            sim_duration, rml_filename = future.result()
            simulations_durations.append(sim_duration)
            self._simulation_timeout = (
                np.mean(simulations_durations) * self.batch_size / self._workers * 1.2
            )
            self._update_progress_bar(simulations_durations, pbar)
            completed_sim += 1
            remaining_simulations -= 1
            self.logger.info(
                f"Completed: {completed_sim}, "
                f"remaining: {remaining_simulations}, {rml_filename}"
            )
    except Exception as e:
        self.logger.info(f"Exception during simulation: {e}")
        unfinished_futures = [future for future in futures if not future.done()]
        if unfinished_futures:
            self._executor_has_unfinished_futures = True
            self.logger.info(
                f"Marking executor as having {len(unfinished_futures)} unfinished futures"
            )
        for future in unfinished_futures:
            future.cancel()
        # Defined up front, so the checks below cannot hit an unbound name if computing
        # the wait budget fails (e.g. the timeout is None).
        wait_time = 0
        try:
            wait_time = self._simulation_timeout / 4
            for sim in simulation_params_batch:
                _, exp_list = sim
                exp = exp_list[0]
                # Assumes exp[-2] contains "round_<n>" and exp[-1] starts with the
                # simulation number (derived from the regexes; confirm with
                # _make_exports_list).
                round_n = int(re.findall(r"(?<=round_)\d+", exp[-2])[0])
                sim_n = int(re.findall(r"\d+", exp[-1])[0])
                sim_file = sim[0][0]
                while self._is_simulation_missing(sim_n, round_n) and wait_time > 0:
                    time.sleep(5)
                    wait_time -= 5
                    self.logger.info(f"Waiting for file {sim_file}, wait_time {wait_time}")
        except Exception as e:
            self.logger.info(f"Exception checking simulations: {e}")
        if wait_time >= 0:
            self.logger.info(
                f"Found all simulations of the batch,"
                f"futures missed {remaining_simulations} simulations"
            )
        else:
            self.logger.info(
                f"Found most simulations of the batch,"
                f"futures missed at least {remaining_simulations} simulations"
            )
        # _update_progress_bar needs at least one duration to average.
        if len(simulations_durations) == 0:
            simulations_durations.append(self._simulation_timeout)
        self.logger.info("Updating progress bar")
        for _i in range(remaining_simulations):
            try:
                self._update_progress_bar(simulations_durations, pbar)
            except Exception as e:
                self.logger.info(f"Exception updating progress bar: {e}")
    simulation_params_batch.clear()  # Reset batch for next set of simulations
    self.logger.info("Batch Completed")
def _update_progress_bar(self, simulations_durations, pbar):
    """
    Advance ``pbar`` by one and refresh its postfix with ETA and timing figures.

    The ETA is the average duration times the number of steps still open on the bar
    (``total - n``, measured before this step), divided by the number of workers,
    formatted by ``self._format_eta``.

    Args:
        simulations_durations (list): Durations (s) of completed simulations; must be
            non-empty, otherwise ZeroDivisionError is raised.
        pbar (tqdm): Progress bar object for tracking simulation progress.
    """
    n_done = len(simulations_durations)
    mean_duration = sum(simulations_durations) / n_done
    latest = simulations_durations[-1]
    still_open = pbar.total - pbar.n
    eta_text = self._format_eta(mean_duration * still_open / self._workers)
    postfix = f"ETA: {eta_text}, Last: {latest:.2f}s, Avg: {mean_duration:.2f}s/it"
    pbar.set_postfix_str(postfix, refresh=True)
    pbar.update(1)
def _setup_simulation_environment(self, recipe):
    """
    Apply ``recipe`` (if any) to this simulation.

    A falsy ``recipe`` leaves everything unchanged. Otherwise ``params``, ``exports`` and
    ``simulation_name`` are taken from the recipe, each computed by calling the
    corresponding recipe method with this object.

    Args:
        recipe (SimulationRecipe or None): Recipe to apply for the simulation setup.

    Raises:
        TypeError: If the recipe is truthy but not a SimulationRecipe instance.
    """
    if not recipe:
        return
    if not isinstance(recipe, SimulationRecipe):
        raise TypeError("Unsupported type of the recipe!")
    print("recipe")
    self.params = recipe.params(self)
    self.exports = recipe.exports(self)
    self.simulation_name = recipe.simulation_name(self)
[docs]
def reflectivity(self, value):
    """
    Switch the reflectivity of all the optical elements in the beamline on or off.

    Every child of ``self.rml.beamline`` that has a ``reflectivityType`` parameter gets
    its ``cdata`` set to ``"1"`` (on) or ``"0"`` (off); other children are left alone.

    Args:
        value (bool): If :code:`True` the reflectivity is switched on,
                      if :code:`False` the reflectivity is switched off.
    """
    flag = "1" if value else "0"
    elements = (oe for oe in self.rml.beamline.children() if hasattr(oe, "reflectivityType"))
    for element in elements:
        element.reflectivityType.cdata = flag
[docs]
def slope_errors(self, slope_errors):
    """
    Switch the slope errors of all the optical elements in the beamline on or off.

    Every child of ``self.rml.beamline`` that has a ``slopeError`` parameter gets its
    ``cdata`` overwritten; other children are left alone.

    Args:
        slope_errors (bool): If truthy the slope errors are switched on, otherwise off.
            Note the encoding written to the RML: on -> ``"0"``, off -> ``"1"``
            (presumably the index of RAY-UI's yes/no choice -- confirm against RAY-UI).
    """
    encoded = {True: "0", False: "1"}[bool(slope_errors)]
    for element in filter(lambda oe: hasattr(oe, "slopeError"), self.rml.beamline.children()):
        element.slopeError.cdata = encoded
[docs]
def alignment_errors(self, value):
    """
    Switch the alignment errors of all the optical elements in the beamline on or off.

    Every child of ``self.rml.beamline`` that has an ``alignmentError`` parameter gets
    its ``cdata`` overwritten; other children are left alone.

    Args:
        value (bool): If `True`, the alignment errors are switched on.
                      If `False`, the alignment errors are switched off.
                      Encoding written to the RML: on -> ``"0"``, off -> ``"1"``
                      (presumably RAY-UI's yes/no choice index -- confirm).

    Returns:
        None
    """
    flag = "1" if not value else "0"
    for element in self.rml.beamline.children():
        if not hasattr(element, "alignmentError"):
            continue
        element.alignmentError.cdata = flag
def run_rml_func(parameters):
    """
    Executes a simulation for a given RML file and handles exporting of results.

    Starts a RAY-UI runner, loads and traces the RML file, saves it, and calls
    ``RayUIAPI.export`` once per entry of ``exports``. When ``raypyng_analysis`` is set,
    each export is also post-processed with ``PostProcess.postprocess_RawRays``.

    Errors are reported as printed warnings instead of being raised; this covers
    running/exporting/post-processing, quitting the API, and killing the runner. As a
    result, a failed simulation still returns normally.

    Args:
        parameters (tuple): ``(runner_args, exports)``. ``runner_args`` is
            ``(rml_filename, hide, analyze, raypyng_analysis, ray_path, remove_rawrays,
            undulator_table, efficiency)``. ``exports`` is an iterable of argument tuples
            for ``RayUIAPI.export`` that is iterated twice, so it should be a sequence.
            Each tuple's second item is used as the post-processing ``suffix``.

    Returns:
        tuple: ``(simulation_duration, rml_filename)``. The duration is the elapsed time
        in seconds, measured with a monotonic clock, of the work done after the
        parameters were unpacked.
    """
    # perf_counter is monotonic: unlike time.time() it cannot jump with clock changes,
    # so the measured durations (used for ETAs and timeouts) stay meaningful.
    st = time.perf_counter()
    (
        rml_filename,
        hide,
        analyze,
        raypyng_analysis,
        ray_path,
        remove_rawrays,
        undulator_table,
        efficiency,
    ), exports = parameters
    runner = RayUIRunner(ray_path=ray_path, hide=hide)
    api = RayUIAPI(runner)
    pp = PostProcess()
    try:
        runner.run()
        api.load(rml_filename)
        api.trace(analyze=analyze)
        api.save(rml_filename)
        for export_params in exports:
            api.export(*export_params)
        if raypyng_analysis:
            for export_params in exports:
                pp.postprocess_RawRays(
                    *export_params,
                    rml_filename,
                    suffix=export_params[1],
                    remove_rawrays=remove_rawrays,
                    undulator_table=undulator_table,
                    efficiency=efficiency,
                )
    except Exception as e:
        print(f"WARNING! Got exception while processing {rml_filename}, the error was: {e}")
    finally:
        # Ensure resources are cleaned up properly; each step is best-effort so one
        # failure does not skip the next or replace the (already reported) result.
        try:
            api.quit()
        except Exception as e:
            print(
                f"WARNING! Got exception while quitting API for {rml_filename}, the error was: {e}"
            )
        try:
            runner.kill()
        except Exception as e:
            print(
                f"WARNING! Got exception while killing RAY-UI for {rml_filename}, "
                f"the error was: {e}"
            )
    simulation_duration = time.perf_counter() - st
    return simulation_duration, rml_filename