Source code for raypyng.runner

###############################################################################
# RayUI process handling and RayUI command API

# from sre_constants import SUCCESS
import atexit
import os
import select
import signal
import subprocess
import time

import psutil

from . import config
from .errors import RayPyError, RayPyRunnerError, TimeoutError

# import sys # used for some debugging


###############################################################################
[docs] class RayUIRunner: """RayUIRunner class implements all logic to start a RayUI process, load and rml file, trace and export. Args: ray_path (str, optional): the path to the RAY-UI installation folder. Defaults to config.ray_path, that will look for the ray_path in the standard installation folders. ray_binary (_type_, optional): the binary file of RAY-UI. Defaults to "rayui.sh". background (bool, optional): activate background mode. Defaults to True. hide (bool, optional): Hide the RAY-UI graphical instances. Available only if xvfb is installed. Defaults to False. """ def __init__( self, ray_path=config.ray_path, ray_binary=config.ray_binary, background=True, hide=False ) -> None: """ Args: ray_path (str, optional): the path to the RAY-UI installation folder. Defaults to config.ray_path, that will look for the ray_path in the standard installation folders. ray_binary (_type_, optional): the binary file of RAY-UI. Defaults to "rayui.sh". background (bool, optional): activate background mode. Defaults to True. hide (bool, optional): Hide the RAY-UI graphical instances. Available only if xvfb is installed. Defaults to False. Raises: Exception: _description_ """ if ray_path is None: ray_path = self.__detect_ray_path() if ray_path is None: raise Exception("ray_path must be defined for now!") self._path = ray_path self._binary = ray_binary full_path = os.path.join(self._path, self._binary) if not os.path.isfile(full_path): raise FileNotFoundError(f"{full_path} does not exist.") elif not os.access(full_path, os.X_OK): raise PermissionError( f"{full_path} exists but is NOT executable.\n" f"To make it executable, run:\n\n" f' chmod +x "{full_path}"\n' ) self._options = "-b" if background else "" self._process = None self._verbose = False if hide: self._hide = ["xvfb-run", "--auto-servernum", "--server-num=3000"] else: self._hide = [] # internal configuration parameters self._auto_flush = True # flush on write calls self._stdout_buffer = bytearray()
[docs] def run(self): """Open one instance of RAY-UI using subprocess Raises: RayPyRunnerError: if the RAY-UI executable is not found raise an error """ if not self.isrunning: fullpath = os.path.join(self._path, self._binary) if not os.path.isfile(fullpath): raise RayPyRunnerError("Ray executable {0} is not found".format(fullpath)) cmd = [*self._hide, fullpath] if self._options: cmd.append(self._options) env = dict(os.environ) # TODO:: rethink a bit about this line self._process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, start_new_session=True, ) os.set_blocking(self._process.stdout.fileno(), False) self._stdout_buffer.clear() atexit.register(self.kill) return self
@property def isrunning(self): """Check weather a process is running and rerutn a boolean Returns: bool: returns True if the process is running, otherwise False """ if self._process is None: return False else: return self._process.poll() is None
[docs] def kill(self): """kill a RAY-UI process""" if self._process is None: return if self.isrunning: pid = self._process.pid try: os.killpg(pid, signal.SIGTERM) self._process.wait(timeout=5) except (ProcessLookupError, subprocess.TimeoutExpired): try: process = psutil.Process(pid) for proc in process.children(recursive=True): proc.kill() process.kill() except psutil.NoSuchProcess: pass try: os.killpg(pid, signal.SIGKILL) except ProcessLookupError: pass self._close_pipes() self._process = None
@property def pid(self): """Get process id of the RayUI process Returns: int: PID of the process if it running, None otherwise """ if self.isrunning: return self._process.pid else: return None def _write(self, instr: str, endline="\n"): """Write command to RayUI interface Args: instr (str): _description_ endline (str, optional): _description_. Defaults to endline character. Raises: RayPyRunnerError: _description_ """ if self.isrunning: payload = bytes(instr + endline, "utf8") self._process.stdin.write(payload) if self._auto_flush: self._process.stdin.flush() else: raise RayPyRunnerError("RayUI process is not started") def _readline(self) -> str: """Read a line from the stdout of the process and convert to a string Returns: str: line read from the input """ return self._readline_with_timeout(timeout=None) def _readline_with_timeout(self, timeout=None) -> str: """Read one line from stdout, optionally timing out if no new line arrives.""" if not self.isrunning: return None deadline = None if timeout is None else time.monotonic() + timeout stdout_fd = self._process.stdout.fileno() while True: newline_index = self._stdout_buffer.find(b"\n") if newline_index != -1: line = self._stdout_buffer[:newline_index] del self._stdout_buffer[: newline_index + 1] decoded_line = line.decode("utf8", errors="replace").rstrip("\r") if self._verbose: print(decoded_line) return decoded_line if not self.isrunning: if self._stdout_buffer: line = self._stdout_buffer.decode("utf8", errors="replace").rstrip("\r") self._stdout_buffer.clear() if self._verbose: print(line) return line return None wait_time = None if deadline is not None: wait_time = max(0.0, deadline - time.monotonic()) if wait_time == 0.0: return None readable, _, _ = select.select([stdout_fd], [], [], wait_time) if not readable: return None chunk = os.read(stdout_fd, 4096) if not chunk: if self._stdout_buffer: line = self._stdout_buffer.decode("utf8", errors="replace").rstrip("\r") self._stdout_buffer.clear() if self._verbose: print(line) return line return None self._stdout_buffer.extend(chunk) def _close_pipes(self): for stream_name in ("stdin", "stdout", "stderr"): stream = getattr(self._process, stream_name, None) if stream is not None: try: stream.close() except Exception: pass self._stdout_buffer.clear() def __detect_ray_path(self) -> str: """Internal function to autodetect installation path of RAY-UI Raises: RayPyRunnerError: is case no RAY-UI installations can be detected Returns: str: string with the detected RAY-UI installation path """ basepaths = ("~", "~/Applications", "/opt", "/Applications") installpaths = ("RAY-UI-development", "RAY-UI", "Ray-UI") pathlist = [ os.path.expanduser(p) for p in [os.path.join(x, y) for x in basepaths for y in installpaths] ] for ray_path in pathlist: if os.path.isdir(ray_path): return ray_path raise RayPyRunnerError("Can not detect rayui installation path! Please provide it manually")
###############################################################################
[docs] class RayUIAPI: """RayUIAPI class implements (hopefully all) command interface of the RAY-UI""" def __init__(self, runner: RayUIRunner = None) -> None: """Optional Reference to an existing RayUIRunner Args: runner (RayUIRunner, optional): reference to existing runner. If set to None a new runner instance will be automaticlly created. Defaults to None. """ if runner is None: runner = RayUIRunner().run() self._runner = runner # if rayui does not send anything to stdio this delay # will be used before next attempt to read self._read_wait_delay = 0.01 self._quit_timeout = 300 # default timeout for commands like quit self._simulation_done = False
[docs] def quit(self): """quit RAY-UI if it is running""" if self._runner.isrunning: self._runner._write("quit") try: self._runner._process.wait(self._quit_timeout) except subprocess.TimeoutExpired: raise TimeoutError("Timeout while trying to quit") from None
[docs] def load(self, rml_path, **kwargs): """Load an rml file Args: rml_path (str): path to the rml file """ self._simulation_done = False return self._cmd_io("load", rml_path, **kwargs)
[docs] def save(self, rml_path, **kwargs): """Save an rml file Args: rml_path (path): path to save the rml file """ return self._cmd_io("save", rml_path, **kwargs)
[docs] def trace(self, analyze=True, **kwargs): """Trace an rml file (must have been loaded before). Args: analyze (bool, optional): If True RAY-UI will perform analysis of the rays. Defaults to True. """ return self._cmd_io("trace", None if analyze else "noanalyze", **kwargs)
[docs] def export(self, objects: str, parameters: str, export_path: str, data_prefix: str, **kwargs): """Export simulation results from RAY-UI. Args: objects (str): string with objects list, e.g. "Dipole,DetectorAtFocus" parameters (str): stromg with parameters to export, e.g. "ScalarBeamProperties,ScalarElementProperties" export_path (str): path where to save the data data_prefix (str): prefix for the putput files """ payload = '"' + objects + '"' + " " + parameters + " " + export_path + " " + data_prefix return self._cmd_io("export", payload, **kwargs)
def _cmd_io(self, cmd: str, payload: str = None, /, cbNewLine=None): """The _cmd_io is an internal method which helps to execute a RAY-UI command. All commands are run in the following way: 1. A command is sent to RAY-UI 2. If Ray-UI acknowleges the command it prints it back 3. Some extra output can happen (depending on the command) 4. RAY-UI writes info "success" or "failed" Args: cmd (str): string with command (e.g. "load" or "trace") payload (str, optional): possible payload for the command (e.g. rml file path for the load command). Defaults to None. Raises: RayPyError: in case of an unsupported reply Returns: bool: True on success, False on RAY-UI side error """ if payload is None: payload = "" if cmd == "load": self._simulation_done = False cmdstr = cmd + " " + payload self._runner._write(cmdstr) status = self._wait_for_cmd_io(cmd, cbdataread=cbNewLine) if status == "success": if cmd == "trace": self._simulation_done = True return True elif status == "failed": return False elif ( status == "ed failed" ): # specical workaround case for the "loaded failed" reply to laod command return False else: raise RayPyError("Got unsupported reply from ray while waiting for command IO") def _wait_for_cmd_io(self, cmd, timeout=None, cbdataread=None): timecnt = 0.0 line = "" # we shall see cmd twice - once as ACK for the execution # and once as status return cmd_seen = False while True: poll_timeout = self._read_wait_delay if timeout is not None else None line = self._runner._readline_with_timeout(timeout=poll_timeout) if line is None: if timeout is None: continue timecnt += self._read_wait_delay if timecnt > timeout: raise TimeoutError("timeout while waiting ray command io") continue if line.startswith(cmd): if cmd_seen: break else: cmd_seen = True else: if cbdataread is not None: cbdataread(line) return line.lstrip(cmd).strip()