import logging
from mxcubecore.BaseHardwareObjects import HardwareObject
__credits__ = ["EMBL Hamburg"]
__category__ = "General"
[docs]class EMBLPPUControl(HardwareObject):
"""
Allows to restart processes on PPU
"""
def __init__(self, name):
super().__init__(name)
self.all_status = None
self.status_result = None
self.restart_result = None
self.error_state = None
self.is_error = False
self.file_transfer_in_error = False
self.cmd_all_status = None
self.cmd_furka_restart = None
self.cmd_all_restart = None
self.chan_all_status = None
self.chan_file_info = None
self.chan_all_restart = None
self.msg = ""
self.status_running = None
self.restart_running = None
self.execution_state = None
[docs] def init(self):
self.all_status = ""
self.status_result = ""
self.restart_result = ""
self.execution_state = self.get_property("executionState")
self.error_state = self.get_property("errorState")
self.chan_all_status = self.get_channel_object("chanAllStatus")
self.cmd_all_status = self.get_command_object("cmdAllStatus")
self.cmd_all_restart = self.get_command_object("cmdAllRestart")
self.cmd_furka_restart = self.get_command_object("cmdFurkaRestart")
self.cmd_furka_restart("")
self.get_status()
self.chan_file_info = self.get_channel_object("chanFileInfo", optional=True)
if self.chan_file_info is not None:
self.chan_file_info.connect_signal("update", self.file_info_changed)
self.connect(self.chan_all_status, "update", self.all_status_changed)
self.chan_all_restart = self.get_channel_object("chanAllRestart")
self.connect(self.chan_all_restart, "update", self.all_restart_changed)
[docs] def all_status_changed(self, status):
"""
Updates status
:param status: str
:return:
"""
if self.status_running and not status:
self.all_status = self.cmd_all_status.get() # status
self.update_status()
self.status_running = status
[docs] def all_restart_changed(self, status):
"""
Updates status after all process restart
:param status:
:return:
"""
if self.restart_running and not status:
self.restart_result = self.cmd_all_restart.get() # status
self.restart_running = status
[docs] def file_info_changed(self, values):
"""
Updated information about transfered frames
values is a list of 3 values, where the last one indicates the number
of droped frames
:param values:
:return:
"""
values = list(values)
# if len(values) == 2:
# value = values[1]
# else:
# value = values[0]
self.file_transfer_in_error = values[2] > 0
self.emit("fileTranferStatusChanged", (values))
self.is_error = (
self.all_status.startswith(self.error_state) or self.file_transfer_in_error
)
if self.file_transfer_in_error:
self.emit("ppuStatusChanged", self.is_error, "File tansfer in error")
[docs] def get_status(self):
"""
Returns status
:return: boolean, str
"""
self.cmd_all_status("")
return self.is_error, self.all_status
[docs] def update_status(self):
"""
Update status method
:return:
"""
self.is_error = (
self.all_status.startswith(self.error_state) or self.file_transfer_in_error
)
if self.all_status.startswith(self.error_state):
msg_list = self.all_status.split("\n")
logging.getLogger("GUI").error("PPU control is in Error state!")
if len(msg_list) > 1:
for msg_line in msg_list:
if msg_line:
msg = "PPU control: %s" % msg_line
logging.getLogger("GUI").error(msg)
else:
msg = "PPUControl: %s" % self.all_status
logging.getLogger("HWR").debug(msg)
self.msg = (
"Restart result:\n\n%s\n\n" % self.restart_result
+ "All status result:\n\n%s\n" % self.all_status
)
self.emit("ppuStatusChanged", self.is_error, self.msg)
return self.is_error, self.all_status
[docs] def restart_all(self):
"""
Restarts all processes
:return:
"""
self.emit("ppuStatusChanged", False, "Restarting.... ")
self.cmd_all_restart("")
self.get_status()
[docs] def re_emit_values(self):
"""
Reemits signals
:return:
"""
self.emit("ppuStatusChanged", self.is_error, self.msg)