# Source code for mxcubecore.HardwareObjects.ISARAMaint

from typing import Optional
from tango import DeviceProxy, DevFailed
from mxcubecore.BaseHardwareObjects import HardwareObject

"""
The sample changer maintenance hardware object for ISARA2 robot.

Provides commands for powering the robot on and off. Populates the 'equipment' tab
with a handful of other commands that are useful while operating the sample changer.

This hardware object assumes that ISARA2 sample changer is exposed
via tango device server implementation as implemented here:
https://gitlab.com/MaxIV/isara/dev-maxiv-isara-ns-09

Example XML configuration for this hardware object:

  <object class="ISARAMaint">
     <tangoname>b312-e/ctl/sm-01</tangoname>
     <polling>400</polling>
  </object>

  tangoname - the sample changer's tango device to use
  polling   - optional, sets the polling period of device attributes, in milliseconds
              (defaults to 1000 when omitted)
"""

# polling value, in milliseconds, used when the XML configuration
# of the hardware object does not specify a 'polling' property
DEFAULT_POLLING = 1000


def _reword_isara_error(err_message: str) -> str:
    """
    Reword some of the ISARA error messages to make them easier to understand for the user.
    """
    msg = err_message.lower()
    if msg == "remote mode requested":
        return "it is not in remote mode"

    if msg == "doors must be closed":
        return "hutch is not searched"

    # let's use error as-is
    return msg


def _get_isara_command_error(ex: DevFailed) -> Optional[str]:
    """
    Extract the ISARA command error message, if any, from a tango exception.

    Returns the reworded, human-readable error message from ISARA, or None
    when the exception does not encode an ISARA command error.
    """

    # An ISARA command error is signalled with a DevError whose 'reason' is
    # 'ISARACommandError' and whose 'desc' holds the error message from ISARA.
    # Tango wraps such DevError objects inside the DevFailed exception, where
    # they are available as the exception's arguments.
    #
    # Lazily scan the arguments, so that the scan stops at the first match.
    isara_errors = (err for err in ex.args if err.reason == "ISARACommandError")
    first_match = next(isara_errors, None)

    if first_match is None:
        # this is not an ISARA command error exception
        return None

    return _reword_isara_error(first_match.desc)


class ISARAMaint(HardwareObject):
    """
    Maintenance hardware object for the ISARA sample changer.

    Polls the 'Powered', 'PositionName' and 'Message' attributes of the
    sample changer's tango device, derives from them which maintenance
    commands are currently enabled, and emits 'globalStateChanged' with the
    commands state and the latest message each time the state is recomputed.
    """

    # maintenance command name -> name of the tango device command it invokes;
    # 'PowerOn' and 'PowerOff' are not listed, they go through _toggle_power()
    _DEVICE_COMMANDS = {
        "openLid": "OpenLid",
        "closeLid": "CloseLid",
        "home": "Home",
        "dry": "Dry",
        "soak": "Soak",
        "clearMemory": "ClearMemory",
        "reset": "Reset",
        "back": "Back",
        "abort": "Abort",
    }

    # the 'positions' commands, named after the position they move to
    _POSITION_COMMANDS = ("home", "dry", "soak")

    def __init__(self, name):
        super().__init__(name)

        # enabled (True) / disabled (False) flag for each maintenance command
        self._commands_state = {
            "PowerOn": False,
            "PowerOff": False,
            "openLid": True,
            "closeLid": True,
            "home": True,
            "dry": True,
            "soak": True,
            "clearMemory": True,
            "reset": True,
            "back": True,
            "abort": True,
        }

        # latest values of the polled device attributes,
        # None until the first update of the attribute arrives
        self._powered = None
        self._position_name = None
        self._message = None

    def init(self):
        """
        Connect to the tango device and start polling its attributes.
        """
        self.isara_dev = DeviceProxy(self.tangoname)

        polling = self._get_polling()
        pollers = (
            ("Powered", self._powered_updated),
            ("PositionName", self._position_name_updated),
            ("Message", self._message_updated),
        )
        for attr_name, callback in pollers:
            self._poll_attribute(attr_name, polling, callback)

    def _get_polling(self):
        """
        get polling frequency to use for device attribute poller
        """
        # 'polling' is only available as an attribute when it is specified
        # in the XML file, otherwise fall back to the default polling value
        return getattr(self, "polling", DEFAULT_POLLING)

    def _poll_attribute(self, attr_name: str, polling: int, callback):
        """
        Add a polled tango channel for the specified device attribute
        and invoke the callback on the channel's 'update' signal.
        """
        channel_config = {
            "type": "tango",
            "name": f"_chn{attr_name}",
            "tangoname": self.tangoname,
            "polling": polling,
        }
        channel = self.add_channel(channel_config, attr_name)
        channel.connect_signal("update", callback)

    def _update_state(self):
        """
        Recompute which commands are enabled and emit the new global state.

        Does nothing until a value for each polled attribute is known.
        """
        polled_values = (self._powered, self._position_name, self._message)
        if any(value is None for value in polled_values):
            # some of the values are still unknown,
            # wait until we get all values
            return

        #
        # update 'power' commands
        #
        self._commands_state["PowerOn"] = not self._powered
        self._commands_state["PowerOff"] = self._powered

        #
        # update 'positions' commands
        #

        # when powered off or running a trajectory, all position commands
        # are disabled
        can_move = bool(self._powered) and self._position_name != "undefined"

        for position in self._POSITION_COMMANDS:
            # can't move to same position
            self._commands_state[position] = (
                can_move and position != self._position_name
            )

        self._emit_global_state_changed()

    def _powered_updated(self, powered):
        """Callback for updates of the 'Powered' device attribute."""
        self._powered = powered
        self._update_state()

    def _position_name_updated(self, position_name):
        """Callback for updates of the 'PositionName' device attribute."""
        # position names are stored lower-cased, which is the form
        # _update_state() compares them against
        self._position_name = position_name.lower()
        self._update_state()

    def _message_updated(self, message):
        """Callback for updates of the 'Message' device attribute."""
        self._message = message
        self._update_state()

    def _emit_global_state_changed(self):
        """Emit 'globalStateChanged' with the commands state and message."""
        self.emit(
            "globalStateChanged",
            (self._commands_state, self._message),
        )

    def _toggle_power(self, power_on: bool):
        """
        issue powerOn or powerOff commands, catching ISARA command error exception

        Raises RuntimeError, with a user-readable message, when ISARA
        rejects the command. Any other DevFailed exception is passed on.
        """
        command = self.isara_dev.PowerOn if power_on else self.isara_dev.PowerOff

        try:
            command()
        except DevFailed as ex:
            isara_err = _get_isara_command_error(ex)

            if isara_err is None:
                # this is not an ISARA command error, pass on the exception
                # with its original traceback
                raise

            state = "on" if power_on else "off"
            # chain explicitly, so the tango error is reported as the cause
            raise RuntimeError(
                f"Can't power {state} sample changer, {isara_err}."
            ) from ex

    def send_command(self, cmd_name, _args=None):
        """
        Execute the maintenance command with the specified name.

        cmd_name - one of the command names listed by get_cmd_info()
        _args    - not used

        Raises Exception if the command name is not recognized.
        """
        if cmd_name == "PowerOn":
            self._toggle_power(True)
        elif cmd_name == "PowerOff":
            self._toggle_power(False)
        elif cmd_name in self._DEVICE_COMMANDS:
            # look up the tango command on the device proxy and invoke it
            device_command = getattr(self.isara_dev, self._DEVICE_COMMANDS[cmd_name])
            device_command()
        else:
            raise Exception(f"ISARA MAINT: unexpected command '{cmd_name}'")

    def get_global_state(self):
        """
        Returns a tuple of: a placeholder state dictionary, the dictionary
        of enabled/disabled flags of the commands and the latest message.
        """
        return {"glob_state": "dummy"}, self._commands_state, self._message

    def get_cmd_info(self):
        """
        Returns the description of available commands, as a list of
        [group name, list of command descriptions] entries. Each command
        description starts with the command name accepted by send_command().
        """
        return [
            [
                "Power",
                [
                    ["PowerOn", "PowerOn", "Switch Power On"],
                    ["PowerOff", "PowerOff", "Switch Power Off"],
                ],
            ],
            [
                "Lid",
                [
                    ["openLid", "Open Lid", "Open Lid"],
                    ["closeLid", "Close Lid", "Close Lid"],
                ],
            ],
            [
                "Positions",
                [
                    ["home", "Home", "Actions", "Home (trajectory)"],
                    ["dry", "Dry", "Actions", "Dry (trajectory)"],
                    ["soak", "Soak", "Actions", "Soak (trajectory)"],
                ],
            ],
            [
                "Recovery",
                [
                    [
                        "clearMemory",
                        "Clear Memory",
                        "Clear Info in Robot Memory "
                        " (includes info about sample on Diffr)",
                    ],
                    [
                        "reset",
                        "Reset Fault",
                        "Acknowledge security fault",
                    ],
                    [
                        "back",
                        "Back",
                        "Return sample back to Dewar",
                    ],
                ],
            ],
            ["Abort", [["abort", "Abort", "Abort running trajectory"]]],
        ]