Source code for mxcubecore.HardwareObjects.GenericDiffractometer

#
#  Project: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""
GenericDiffractometer
"""

import copy
import enum
import json
import logging
import math
import os
import time
from typing import (
    Dict,
    List,
    Tuple,
    Union,
)

import gevent
import gevent.event
import numpy
from pydantic.v1 import (
    BaseModel,
    Field,
    ValidationError,
)

from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.HardwareObjects import sample_centring
from mxcubecore.model import queue_model_objects

try:
    unicode
except Exception:
    # A quick fix for python3
    unicode = str


__credits__ = ["MXCuBE collaboration"]

__version__ = "2.2."
__status__ = "Draft"


[docs]class DiffractometerState: """ Enumeration of diffractometer states """ Created = 0 Initializing = 1 On = 2 Off = 3 Closed = 4 Open = 5 Ready = 6 Busy = 7 Moving = 8 Standby = 9 Running = 10 Started = 11 Stopped = 12 Paused = 13 Remote = 14 Reset = 15 Closing = 16 Disable = 17 Waiting = 18 Positioned = 19 Starting = 20 Loading = 21 Unknown = 22 Alarm = 23 Fault = 24 Invalid = 25 Offline = 26 STATE_DESC = { Created: "Created", Initializing: "Initializing", On: "On", Off: "Off", Closed: "Closed", Open: "Open", Ready: "Ready", Busy: "Busy", Moving: "Moving", Standby: "Standby", Running: "Running", Started: "Started", Stopped: "Stopped", Paused: "Paused", Remote: "Remote", Reset: "Reset", Closing: "Closing", Disable: "Disable", Waiting: "Waiting", Positioned: " Positioned", Starting: "Starting", Loading: "Loading", Unknown: "Unknown", Alarm: "Alarm", Fault: "Fault", Invalid: "Invalid", Offline: "Offline", } @staticmethod def tostring(state): return DiffractometerState.STATE_DESC.get(state, "Unknown")
[docs]class PhaseEnum(str, enum.Enum): centring = "Centring" data_collection = "DataCollection" beam_location = "BeamLocation" transfer = "Transfer" unknown = "Unknown"
class PhaseModel(BaseModel): value: PhaseEnum = PhaseEnum.unknown
[docs]class HeadTypeEnum(str, enum.Enum): no_kappa = "NO_KAPPA" mini_kappa = "MINI_KAPPA" chip = "CHIP" plate = "PLATE"
[docs]class HolderTypeEnum(str, enum.Enum): known_geometry = "KNOWN_GEOMETRY" free_geometry = "FREE_GEOMETRY"
[docs]class BlockShapeEnum(str, enum.Enum): rectangular = "RECTANGULAR" elliptical = "ELLIPTICAL"
class SampleHolderSectionModel(BaseModel): section_offset: Tuple[int, int] = Field( [0, 0], description="Block offset in grid layout system coordinates x, y" ) block_size: Tuple[float, float] = Field( [15, 15], description="Block size horizontal, vertical in mm" ) block_spacing: Tuple[float, float] = Field( [15, 15], description="Spacing between blocks horizontal, vertical in mm" ) block_shape: BlockShapeEnum = BlockShapeEnum.rectangular number_of_rows: int = Field(6, description="Numer of rows") number_of_collumns: int = Field(6, description="Numer of collumns") row_labels: List[str] = Field([], description="Row lables") column_lables: List[str] = Field([], description="Collumn lables") targets_per_block: Tuple[int, int] = Field( [20, 20], description="Targets per block dim1 and dim2" ) class CalibrationData(BaseModel): top_left: Tuple[float, float, float] = Field( [0, 0, 0], description="Top left corner motor position" ) top_right: Tuple[float, float, float] = Field( [0, 0, 0], description="Top right corner motor position" ) bottom_left: Tuple[float, float, float] = Field( [0, 0, 0], description="Bottom left corner motor position" ) class ChipLayout(BaseModel): head_type: HeadTypeEnum = HeadTypeEnum.chip holder_type: HolderTypeEnum = HolderTypeEnum.known_geometry holder_brand: str = Field("", description="Brand/make of sample holder") holder_size: Tuple[float, float] = Field( [0, 0], description="Size of sample holder in mm horizontal and vertical" ) sections: List[SampleHolderSectionModel] calibration_data: CalibrationData class GonioHeadConfiguration(BaseModel): current: str = Field("", description="Selected chip layout") available: Dict[str, ChipLayout]
[docs]class GenericDiffractometer(HardwareObject): """ Abstract base class for diffractometers """ CENTRING_MOTORS_NAME = [ "phi", "phiz", "phiy", "sampx", "sampy", "kappa", "kappa_phi", "beam_x", "beam_y", ] STATE_CHANGED_EVENT = "stateChanged" STATUS_CHANGED_EVENT = "statusChanged" MOTOR_POSITION_CHANGED_EVENT = "motorPositionsChanged" MOTOR_STATUS_CHANGED_EVENT = "motorStatusChanged" HEAD_TYPE_MINIKAPPA = "MiniKappa" HEAD_TYPE_SMARTMAGNET = "SmartMagnet" HEAD_TYPE_PLATE = "Plate" HEAD_TYPE_PERMANENT = "Permanent" CENTRING_METHOD_MANUAL = "Manual 3-click" CENTRING_METHOD_AUTO = "Computer automatic" CENTRING_METHOD_MOVE_TO_BEAM = "Move to beam" # TODO NBNB FIX THIS CONFUSION!!! MANUAL3CLICK_MODE = CENTRING_METHOD_MANUAL C3D_MODE = CENTRING_METHOD_AUTO PHASE_TRANSFER = "Transfer" PHASE_CENTRING = "Centring" PHASE_COLLECTION = "DataCollection" PHASE_BEAM = "BeamLocation" PHASE_UNKNOWN = "Unknown" def __init__(self, name): HardwareObject.__init__(self, name) # Hardware objects ---------------------------------------------------- self.motor_hwobj_dict = {} self.centring_motors_list = None self.front_light_switch = None self.back_light_switch = None self.aperture = None self.beamstop = None self.cryo = None self.capillary = None self.use_sc = False # Channels and commands ----------------------------------------------- self.channel_dict = {} self.used_channels_list = [] self.command_dict = {} self.used_commands_list = [] # flag for using sample_centring hwobj or not self.use_sample_centring = None # time to delay for state polling for controllers # not updating state inmediately after cmd started self.delay_state_polling = None self.delay_state_polling = ( None # time to delay for state polling for controllers ) # not updating state inmediately after cmd started # Internal values ----------------------------------------------------- self.ready_event = None self.head_type = GenericDiffractometer.HEAD_TYPE_MINIKAPPA self.phase_list = [] self.grid_direction = None self.reversing_rotation = None self.beam_position = None self.zoom_centre = None self.pixels_per_mm_x = None self.pixels_per_mm_y = None self.current_state = None self.current_phase = None self.transfer_mode = None self.current_centring_procedure = None self.current_centring_method = None self.current_motor_positions = {} self.current_motor_states = {} self.fast_shutter_is_open = None self.centring_status = {"valid": False} self.centring_time = 0 self.user_confirms_centring = None self.user_clicked_event = None self.automatic_centring_try_count = 1 self.omega_reference_par = None self.move_to_motors_positions_task = None self.move_to_motors_positions_procedure = None self.centring_methods = { GenericDiffractometer.CENTRING_METHOD_MANUAL: self.start_manual_centring, GenericDiffractometer.CENTRING_METHOD_AUTO: self.start_automatic_centring, GenericDiffractometer.CENTRING_METHOD_MOVE_TO_BEAM: self.start_move_to_beam, } self.connect(self, "equipmentReady", self.equipment_ready) self.connect(self, "equipmentNotReady", self.equipment_not_ready) # HACK self.get_motor_positions = self.get_positions
[docs] def init(self): super().init() # Internal values ----------------------------------------------------- self.ready_event = gevent.event.Event() self.user_clicked_event = gevent.event.AsyncResult() self.user_confirms_centring = True self.beamstop = self.get_object_by_role("beamstop") self.aperture = self.get_object_by_role("aperture") self.capillary = self.get_object_by_role("capillary") self.cryo = self.get_object_by_role("cryo") # Hardware objects ---------------------------------------------------- # if HWR.beamline.sample_view.camera is not None: # self.image_height = HWR.beamline.sample_view.camera.get_height() # self.image_width = HWR.beamline.sample_view.camera.get_width() # else: # logging.getLogger("HWR").debug( # "Diffractometer: " + "Camera hwobj is not defined" # ) if HWR.beamline.beam is not None: self.beam_position = HWR.beamline.beam.get_beam_position_on_screen() self.connect( HWR.beamline.beam, "beamPosChanged", self.beam_position_changed ) else: self.beam_position = [0, 0] logging.getLogger("HWR").warning( "Diffractometer: " + "BeamInfo hwobj is not defined" ) self.front_light_switch = self.get_object_by_role("frontlightswitch") self.back_light_switch = self.get_object_by_role("backlightswitch") # Channels ----------------------------------------------------------- ss0 = self.get_property("used_channels") if ss0: try: self.used_channels_list = eval(ss0) except Exception: pass # used the default value for channel_name in self.used_channels_list: self.channel_dict[channel_name] = self.get_channel_object(channel_name) if self.channel_dict[channel_name] is None: continue if channel_name == "TransferMode": self.connect( self.channel_dict["TransferMode"], "update", self.transfer_mode_changed, ) elif channel_name == "CurrentPhase": self.connect( self.channel_dict["CurrentPhase"], "update", self.current_phase_changed, ) elif channel_name == "HeadType": self.connect( self.channel_dict["HeadType"], "update", self.head_type_changed ) elif channel_name == "State": self.connect(self.channel_dict["State"], "update", self.state_changed) # Commands ----------------------------------------------------------- try: self.used_commands_list = eval(self.get_property("used_commands", "[]")) except Exception: pass # used the default value for command_name in self.used_commands_list: self.command_dict[command_name] = self.get_command_object(command_name) # Centring motors ---------------------------------------------------- try: self.centring_motors_list = eval(self.get_property("centring_motors")) except Exception: self.centring_motors_list = GenericDiffractometer.CENTRING_MOTORS_NAME queue_model_objects.CentredPosition.set_diffractometer_motor_names( *self.centring_motors_list ) for motor_name in self.centring_motors_list: # NBNB TODO refactor configuration, and set properties directly (see below) temp_motor_hwobj = self.get_object_by_role(motor_name) if temp_motor_hwobj is not None: #logging.getLogger("HWR").debug( #"Diffractometer: Adding " #+ "%s motor to centring motors" % motor_name #) self.motor_hwobj_dict[motor_name] = temp_motor_hwobj self.connect(temp_motor_hwobj, "stateChanged", self.motor_state_changed) self.connect( temp_motor_hwobj, "valueChanged", self.centring_motor_moved ) if motor_name == "phi": self.connect( temp_motor_hwobj, "valueChanged", self.emit_diffractometer_moved, ) elif motor_name == "zoom": self.connect( temp_motor_hwobj, "predefinedPositionChanged", self.zoom_motor_predefined_position_changed, ) self.connect( temp_motor_hwobj, "stateChanged", self.zoom_motor_state_changed ) else: logging.getLogger("HWR").warning( "Diffractometer: Motor " + "%s listed in the centring motor list, but not initalized" % motor_name ) # sample changer ----------------------------------------------------- if HWR.beamline.sample_changer is None: logging.getLogger("HWR").warning( "Diffractometer: Sample Changer is not defined" ) else: # By default use sample changer if it's defined and transfer_mode # is set to SAMPLE_CHANGER. # if not defined, set use_sc to True if self.transfer_mode is None or self.transfer_mode == "SAMPLE_CHANGER": self.use_sc = True try: self.use_sample_centring = self.get_property("sample_centring") if self.use_sample_centring: self.centring_phi = sample_centring.CentringMotor( self.motor_hwobj_dict["phi"], direction=-1 ) self.centring_phiz = sample_centring.CentringMotor( self.motor_hwobj_dict["phiz"] ) self.centring_phiy = sample_centring.CentringMotor( self.motor_hwobj_dict["phiy"], direction=-1 ) self.centring_sampx = sample_centring.CentringMotor( self.motor_hwobj_dict["sampx"] ) self.centring_sampy = sample_centring.CentringMotor( self.motor_hwobj_dict["sampy"] ) except Exception: pass # used the default value try: self.delay_state_polling = self.get_property("delay_state_polling") except Exception: pass # Other parameters --------------------------------------------------- try: self.zoom_centre = eval(self.get_property("zoom_centre")) except Exception: self.zoom_centre = {"x": 0, "y": 0} logging.getLogger("HWR").warning( "Diffractometer: " + "zoom centre not configured" ) self.reversing_rotation = self.get_property("reversing_rotation") try: # grid_direction describes how a grid is collected # 'fast' is collection direction and 'slow' describes # move to the next collection line self.grid_direction = eval(self.get_property("grid_direction")) except Exception: self.grid_direction = {"fast": (0, 1), "slow": (1, 0), "omega_ref": 0} logging.getLogger("HWR").warning( "Diffractometer: Grid " + "direction is not defined. Using default." ) try: self.phase_list = eval(self.get_property("phase_list")) except Exception: self.phase_list = [ GenericDiffractometer.PHASE_TRANSFER, GenericDiffractometer.PHASE_CENTRING, GenericDiffractometer.PHASE_COLLECTION, GenericDiffractometer.PHASE_BEAM, ]
# to make it compatibile def __getattr__(self, attr): if attr.startswith("__"): raise AttributeError(attr) else: if attr == "currentCentringProcedure": return self.current_centring_procedure if attr == "centringStatus": return self.centring_status if attr == "imageClicked": return self.image_clicked if attr == "cancelCentringMethod": return self.cancel_centring_method if attr == "pixelsPerMmY": return self.pixels_per_mm_x if attr == "pixelsPerMmZ": return self.pixels_per_mm_y return HardwareObject.__getattr__(self, attr) def get_motors(self): return self.motor_hwobj_dict # Contained Objects # NBNB Temp[orary hack - should be cleaned up together with configuration @property def omega(self): """omega motor object Returns: AbstractActuator """ return self.motor_hwobj_dict.get("phi") @property def kappa(self): """kappa motor object Returns: AbstractActuator """ return self.motor_hwobj_dict.get("kappa") @property def kappa_phi(self): """kappa_phi motor object Returns: AbstractActuator """ return self.motor_hwobj_dict.get("kappa_phi") @property def centring_x(self): """centring_x motor object Returns: AbstractActuator """ return self.motor_hwobj_dict.get("sampx") @property def centring_y(self): """centring_y motor object Returns: AbstractActuator """ return self.motor_hwobj_dict.get("sampy") @property def alignment_x(self): """alignment_x motor object (also used as graphics.focus) Returns: AbstractActuator """ return self.motor_hwobj_dict.get("focus") @property def alignment_y(self): """alignment_y motor object Returns: AbstractActuator """ return self.motor_hwobj_dict.get("phiy") @property def alignment_z(self): """alignment_z motor object Returns: AbstractActuator """ return self.motor_hwobj_dict.get("phiz") @property def zoom(self): """zoom motor object NBNB HACK TODO - ocnfigure this in graphics object (which now calls this property) Returns: AbstractActuator """ return self.motor_hwobj_dict.get("zoom")
[docs] def is_ready(self): """ Detects if device is ready """ return self.current_state == DiffractometerState.tostring( DiffractometerState.Ready )
def wait_device_not_ready(self, timeout=5): with gevent.Timeout(timeout, Exception("Timeout waiting for device not ready")): while self.is_ready(): time.sleep(0.01)
[docs] def wait_device_ready(self, timeout=30): """Waits when diffractometer status is ready: :param timeout: timeout in second :type timeout: int """ with gevent.Timeout(timeout, Exception("Timeout waiting for device ready")): while not self.is_ready(): time.sleep(0.01)
wait_ready = wait_device_ready
[docs] def execute_server_task(self, method, timeout=30, *args): """Method is used to execute commands and wait till diffractometer is in ready state :param method: method to be executed :type method: instance :param timeout: timeout in seconds :type timeout: seconds """ # self.ready_event.clear() self.current_state = DiffractometerState.tostring(DiffractometerState.Busy) method(*args) time.sleep(5) # gevent.sleep(2) self.wait_device_ready(timeout) self.ready_event.set()
[docs] def in_plate_mode(self): """Returns True if diffractometer in plate mod :returns: boolean """ return self.head_type == GenericDiffractometer.HEAD_TYPE_PLATE
[docs] def get_head_type(self): """Returns head type :returns: string """ return self.head_type
[docs] def use_sample_changer(self): """Returns True if sample changer is in use :returns: boolean """ return self.use_sc
[docs] def set_use_sc(self, flag): """Sets use_sc flag, that indicates if sample changer is used :param flag: use sample changer flag :type flag: boolean """ if flag: # check both transfer_mode and sample_Changer if HWR.beamline.sample_changer is None: logging.getLogger("HWR").error( "Diffractometer: Sample " + "Changer is not available" ) return False if ( self.transfer_mode is None or self.channel_dict["TransferMode"].get_value() == "SAMPLE_CHANGER" ): # if transferMode is not defined, ignore the checkup self.use_sc = True else: logging.getLogger("HWR").error( "Diffractometer: Set the " + "diffractometer TransferMode to SAMPLE_CHANGER first!!" ) return False else: self.use_sc = False return True
[docs] def transfer_mode_changed(self, transfer_mode): """ Descript. : """ logging.getLogger("HWR").info( "current_transfer_mode is set to %s" % transfer_mode ) self.transfer_mode = transfer_mode if transfer_mode != "SAMPLE_CHANGER": self.use_sc = False self.emit("minidiffTransferModeChanged", (transfer_mode,))
def get_transfer_mode(self): """ """ return self.transfer_mode
[docs] def get_current_phase(self): """ Descript. : """ return self.current_phase
[docs] def get_grid_direction(self): """ Descript. : """ return self.grid_direction
[docs] def get_available_centring_methods(self): """ Descript. : """ return self.centring_methods.keys()
[docs] def get_current_centring_method(self): """ Descript. : """ return self.current_centring_method
def is_reversing_rotation(self): """ """ return self.reversing_rotation is True
[docs] def beam_position_changed(self, value): """ Descript. : """ self.beam_position = list(value)
# def get_motor_positions(self): # return # TODO rename to get_motor_positions
[docs] def get_positions(self): """ Descript. : """ self.current_motor_positions["beam_x"] = ( self.beam_position[0] - self.zoom_centre["x"] ) / self.pixels_per_mm_y self.current_motor_positions["beam_y"] = ( self.beam_position[1] - self.zoom_centre["y"] ) / self.pixels_per_mm_x return self.current_motor_positions
[docs] def get_motors(self): """Get motor_name:Motor dictionary""" return self.motor_hwobj_dict.copy()
# def get_omega_position(self): # """ # Descript. : # """ # return self.current_positions_dict.get("phi")
[docs] def get_snapshot(self): """ Get snapshot from sample view Returns: bytes: A bytes object of the current camera image. """ if HWR.beamline.sample_view: return HWR.beamline.sample_view.get_snapshot()
def save_snapshot(self, filename): """ """ if HWR.beamline.sample_view: return HWR.beamline.sample_view.save_snapshot(filename)
[docs] def get_pixels_per_mm(self): """ Returns tuple with pixels_per_mm_x and pixels_per_mm_y :returns: list with two floats """ return (self.pixels_per_mm_x, self.pixels_per_mm_y)
[docs] def get_phase_list(self): """ Returns list of available phases :returns: list with str """ return self.phase_list
def start_centring_method(self, method, sample_info=None, wait=False): """ """ if self.current_centring_method is not None: logging.getLogger("HWR").error( "Diffractometer: already in centring method %s" % self.current_centring_method ) return curr_time = time.strftime("%Y-%m-%d %H:%M:%S") self.centring_status = { "valid": False, "startTime": curr_time, "angleLimit": None, } self.emit_centring_started(method) try: centring_method = self.centring_methods[method] except KeyError as diag: logging.getLogger("HWR").error( "Diffractometer: unknown centring method (%s)" % str(diag) ) self.emit_centring_failed() else: try: centring_method(sample_info, wait_result=wait) except Exception: logging.getLogger("HWR").exception( "Diffractometer: problem while centring" ) self.emit_centring_failed() def cancel_centring_method(self, reject=False): """ """ if self.current_centring_procedure is not None: try: self.current_centring_procedure.kill() except Exception: logging.getLogger("HWR").exception( "Diffractometer: problem aborting the centring method" ) try: # TODO... do we need this at all? # fun = self.cancel_centring_methods[self.current_centring_method] pass except KeyError: self.emit_centring_failed() else: try: fun() except Exception: self.emit_centring_failed() else: self.emit_centring_failed() self.emit_progress_message("") if reject: self.reject_centring() def start_manual_centring(self, sample_info=None, wait_result=None): """ """ self.emit_progress_message("Manual 3 click centring...") if self.use_sample_centring: self.current_centring_procedure = sample_centring.start( { "phi": self.centring_phi, "phiy": self.centring_phiy, "sampx": self.centring_sampx, "sampy": self.centring_sampy, "phiz": self.centring_phiz, }, self.pixels_per_mm_x, self.pixels_per_mm_y, self.beam_position[0], self.beam_position[1], ) else: self.current_centring_procedure = gevent.spawn(self.manual_centring) self.current_centring_procedure.link(self.centring_done) def start_automatic_centring( self, sample_info=None, loop_only=False, wait_result=None ): """ """ self.emit_progress_message("Automatic centring...") while self.automatic_centring_try_count > 0: if self.use_sample_centring: self.current_centring_procedure = sample_centring.start_auto( HWR.beamline.sample_view, { "phi": self.centring_phi, "phiy": self.centring_phiy, "sampx": self.centring_sampx, "sampy": self.centring_sampy, "phiz": self.centring_phiz, }, self.pixels_per_mm_x, self.pixels_per_mm_y, self.beam_position[0], self.beam_position[1], msg_cb=self.emit_progress_message, new_point_cb=lambda point: self.emit( "newAutomaticCentringPoint", (point,) ), ) else: self.current_centring_procedure = gevent.spawn(self.automatic_centring) self.current_centring_procedure.link(self.centring_done) if wait_result: self.ready_event.wait() self.ready_event.clear() self.automatic_centring_try_count -= 1 self.automatic_centring_try_count = 1
[docs] def start_move_to_beam( self, coord_x=None, coord_y=None, omega=None, wait_result=None ): """ Descript. : """ try: self.emit_progress_message("Move to beam...") self.centring_time = time.time() curr_time = time.strftime("%Y-%m-%d %H:%M:%S") self.centring_status = { "valid": True, "startTime": curr_time, "endTime": curr_time, } if coord_x is None and coord_y is None: coord_x = self.beam_position[0] coord_y = self.beam_position[1] motors = self.get_centred_point_from_coord( coord_x, coord_y, return_by_names=True ) if omega is not None: motors["phi"] = omega self.centring_status["motors"] = motors self.centring_status["valid"] = True self.centring_status["angleLimit"] = True self.emit_progress_message("") self.accept_centring() self.current_centring_method = None self.current_centring_procedure = None except Exception: logging.exception("Diffractometer: Could not complete 2D centring")
[docs] def centring_done(self, centring_procedure): """ Descript. : """ try: motor_pos = centring_procedure.get() if isinstance(motor_pos, gevent.GreenletExit): raise motor_pos except Exception: logging.exception("Could not complete centring") self.emit_centring_failed() else: self.emit_progress_message("Moving sample to centred position...") self.emit_centring_moving() try: logging.getLogger("HWR").debug( "Centring finished. Moving motors to position %s" % str(motor_pos) ) self.move_to_motors_positions(motor_pos, wait=True) except Exception: logging.exception("Could not move to centred position") self.emit_centring_failed() else: # if 3 click centring move -180. well. dont, in principle the calculated # centred positions include omega to initial position pass # if not self.in_plate_mode(): # logging.getLogger("HWR").debug("Centring finished. Moving omega back to initial position") # self.motor_hwobj_dict['phi'].set_value_relative(-180, timeout=None) # logging.getLogger("HWR").debug(" Moving omega done") if ( self.current_centring_method == GenericDiffractometer.CENTRING_METHOD_AUTO ): self.emit("newAutomaticCentringPoint", motor_pos) self.ready_event.set() self.centring_time = time.time() self.emit_centring_successful() self.emit_progress_message("")
def manual_centring(self): """ """ raise NotImplementedError def automatic_centring(self): """ """ raise NotImplementedError def centring_motor_moved(self, pos): """ """ if time.time() - self.centring_time > 1.0: self.invalidate_centring() self.emit_diffractometer_moved() def invalidate_centring(self): """ """ if self.current_centring_procedure is None and self.centring_status["valid"]: self.centring_status = {"valid": False} self.emit_progress_message("") self.emit("centringInvalid", ()) def emit_diffractometer_moved(self, *args): """ """ self.emit("diffractometerMoved", ()) def motor_positions_to_screen(self, centred_positions_dict): """ """ if self.use_sample_centring: self.update_zoom_calibration() if None in (self.pixels_per_mm_x, self.pixels_per_mm_y): return 0, 0 phi_angle = math.radians( self.centring_phi.direction * self.centring_phi.get_value() ) sampx = self.centring_sampx.direction * ( centred_positions_dict["sampx"] - self.centring_sampx.get_value() ) sampy = self.centring_sampy.direction * ( centred_positions_dict["sampy"] - self.centring_sampy.get_value() ) phiy = self.centring_phiy.direction * ( centred_positions_dict["phiy"] - self.centring_phiy.get_value() ) phiz = self.centring_phiz.direction * ( centred_positions_dict["phiz"] - self.centring_phiz.get_value() ) rot_matrix = numpy.matrix( [ math.cos(phi_angle), -math.sin(phi_angle), math.sin(phi_angle), math.cos(phi_angle), ] ) rot_matrix.shape = (2, 2) inv_rot_matrix = numpy.array(rot_matrix.I) dx, dy = ( numpy.dot(numpy.array([sampx, sampy]), inv_rot_matrix) * self.pixels_per_mm_x ) x = (phiy * self.pixels_per_mm_x) + self.beam_position[0] y = dy + (phiz * self.pixels_per_mm_y) + self.beam_position[1] return x, y else: raise NotImplementedError def move_to_centred_position(self, centred_position): """ """ self.move_motors(centred_position) def move_to_motors_positions(self, motors_positions, wait=False): """ """ self.emit_progress_message("Moving to motors positions...") self.move_to_motors_positions_procedure = gevent.spawn( self.move_motors, motors_positions ) self.move_to_motors_positions_procedure.link(self.move_motors_done) if wait: self.wait_device_ready(10)
[docs] def move_motors(self, motor_positions, timeout=15): """ Moves diffractometer motors to the requested positions :param motors_dict: dictionary with motor names or hwobj and target values. :type motors_dict: dict """ if not isinstance(motor_positions, dict): motor_positions = motor_positions.as_dict() self.wait_device_ready(timeout) for motor in motor_positions.keys(): position = motor_positions[motor] self.log.debug(f"moving motor {motor} to position {position}") """ if isinstance(motor, (str, unicode)): logging.getLogger("HWR").debug(" Moving %s to %s" % (motor, position)) else: logging.getLogger("HWR").debug( " Moving %s to %s" % (str(motor.name()), position) ) """ if isinstance(motor, (str, unicode)): motor_role = motor motor = self.motor_hwobj_dict.get(motor_role) # del motor_positions[motor_role] if None in (motor, position): continue # motor_positions[motor] = position motor.set_value(position) self.wait_device_ready(timeout) if self.delay_state_polling is not None and self.delay_state_polling > 0: # delay polling for state in the # case of controller not reporting MOVING inmediately after cmd gevent.sleep(self.delay_state_polling) self.wait_device_ready(timeout)
[docs] def move_motors_done(self, move_motors_procedure): """ Descript. : """ self.move_to_motors_positions_procedure = None self.emit_progress_message("")
[docs] def move_to_beam(self, x, y, omega=None): """ Descript. : function to create a centring point based on all motors positions. """ try: pos = self.get_centred_point_from_coord(x, y, return_by_names=False) if omega is not None: pos["phiMotor"] = omega self.move_to_motors_positions(pos) except Exception: logging.getLogger("HWR").exception( "Diffractometer: could not center to beam, aborting" )
[docs] def image_clicked(self, x, y, xi=None, yi=None): """ Descript. : """ if self.use_sample_centring: sample_centring.user_click(x, y) else: self.user_clicked_event.set((x, y))
[docs] def accept_centring(self): """ Descript. : Arg. " fully_centred_point. True if 3 click centring else False """ self.centring_status["valid"] = True self.centring_status["accepted"] = True centring_status = self.get_centring_status() if "motors" not in centring_status: centring_status["motors"] = self.get_positions() self.emit("centringAccepted", (True, centring_status)) self.emit("fsmConditionChanged", "centering_position_accepted", True)
[docs] def reject_centring(self): """ Descript. : """ if self.current_centring_procedure: self.current_centring_procedure.kill() self.centring_status = {"valid": False} self.emit_progress_message("") self.emit("centringAccepted", (False, self.get_centring_status())) self.emit("fsmConditionChanged", "centering_position_accepted", False)
[docs] def emit_centring_started(self, method): """ Descript. : """ self.current_centring_method = method self.emit("centringStarted", (method, False))
[docs] def emit_centring_moving(self): """ Descript. : """ self.emit("centringMoving", ())
[docs] def emit_centring_failed(self): """ Descript. : """ self.centring_status = {"valid": False} method = self.current_centring_method self.current_centring_method = None self.current_centring_procedure = None self.emit("centringFailed", (method, self.get_centring_status()))
[docs] def emit_centring_successful(self): """ Descript. : """ if self.current_centring_procedure is not None: curr_time = time.strftime("%Y-%m-%d %H:%M:%S") self.centring_status["endTime"] = curr_time motor_pos = self.current_centring_procedure.get() self.centring_status["motors"] = self.convert_from_obj_to_name(motor_pos) self.centring_status["method"] = self.current_centring_method self.centring_status["valid"] = True method = self.current_centring_method self.emit("centringSuccessful", (method, self.get_centring_status())) self.current_centring_method = None self.current_centring_procedure = None else: logging.getLogger("HWR").debug( "Diffractometer: Trying to emit " + "centringSuccessful outside of a centring" )
[docs] def emit_progress_message(self, msg=None): """ Descript. : """ self.emit("progressMessage", (msg,))
[docs] def get_centring_status(self): """ Descript. : """ return copy.deepcopy(self.centring_status)
def get_centred_point_from_coord(self, x, y, return_by_names=None): """ """ raise NotImplementedError
[docs] def get_point_between_two_points( self, point_one, point_two, frame_num, frame_total ): """ Method returns a centring point between two centring points It is used to get a position on a helical line based on frame number and total frame number """ new_point = {} point_one = point_one.as_dict() point_two = point_two.as_dict() for motor in point_one.keys(): new_motor_pos = ( frame_num / float(frame_total) * abs(point_one[motor] - point_two[motor]) + point_one[motor] ) new_motor_pos += 0.5 * (point_two[motor] - point_one[motor]) / frame_total new_point[motor] = new_motor_pos return new_point
def convert_from_obj_to_name(self, motor_pos): """ """ motors = {} for motor_role in self.centring_motors_list: motor_obj = self.get_object_by_role(motor_role) try: motors[motor_role] = motor_pos[motor_obj] except KeyError: if motor_obj: motors[motor_role] = motor_obj.get_value() motors["beam_x"] = ( self.beam_position[0] - self.zoom_centre["x"] ) / self.pixels_per_mm_y motors["beam_y"] = ( self.beam_position[1] - self.zoom_centre["y"] ) / self.pixels_per_mm_x return motors
[docs] def visual_align(self, point_1, point_2): """ Descript. : """ return
[docs] def move_omega_relative(self, relative_angle): """ Descript. : """ return
[docs] def get_osc_limits(self): """Returns osc limits""" return
def get_osc_max_speed(self): return
[docs] def get_scan_limits(self, speed=None, num_images=None, exp_time=None): """ Gets scan limits. Necessary for example in the plate mode where osc range is limited """ return
[docs] def set_phase(self, phase, timeout=None): """Sets diffractometer to selected phase By default available phase is Centring, BeamLocation, DataCollection, Transfer :param phase: phase :type phase: string :param timeout: timeout in sec :type timeout: int """ if timeout: self.ready_event.clear() set_phase_task = gevent.spawn( self.execute_server_task, self.command_dict["startSetPhase"], timeout, phase, ) self.ready_event.wait() self.ready_event.clear() else: self.command_dict["startSetPhase"](phase)
def update_zoom_calibration(self): """ """ self.pixels_per_mm_x = 1.0 / self.channel_dict["CoaxCamScaleX"].get_value() self.pixels_per_mm_y = 1.0 / self.channel_dict["CoaxCamScaleY"].get_value() self.emit("pixelsPerMmChanged", ((self.pixels_per_mm_x, self.pixels_per_mm_y))) def zoom_motor_state_changed(self, state): """ """ self.emit("zoomMotorStateChanged", (state,)) self.emit("minidiffStateChanged", (state,)) def zoom_motor_predefined_position_changed(self, position_name, offset): """ """ self.update_zoom_calibration() self.emit("zoomMotorPredefinedPositionChanged", (position_name, offset)) def equipment_ready(self): """ """ self.emit("minidiffReady", ()) def equipment_not_ready(self): """ """ self.emit("minidiffNotReady", ()) """ def state_changed(self, state): logging.getLogger("HWR").debug("State changed %s" % str(state)) self.current_state = state self.emit("minidiffStateChanged", (self.current_state)) """
[docs] def motor_state_changed(self, state): """ Descript. : """ self.emit("minidiffStateChanged", (state,))
[docs] def current_phase_changed(self, current_phase): """ Descript. : """ self.current_phase = current_phase if current_phase != GenericDiffractometer.PHASE_UNKNOWN: logging.getLogger("GUI").info( "Diffractometer: Current phase changed to %s" % current_phase ) self.emit("minidiffPhaseChanged", (current_phase,))
[docs] def sample_is_loaded_changed(self, sample_is_loaded): """ Descript. : """ self.sample_is_loaded = sample_is_loaded # logging.getLogger("HWR").info("sample is loaded changed %s" % sample_is_loaded) self.emit("minidiffSampleIsLoadedChanged", (sample_is_loaded,))
[docs] def head_type_changed(self, head_type): """ Descript. : """ self.head_type = head_type # logging.getLogger("HWR").info("new head type is %s" % head_type) self.emit("minidiffHeadTypeChanged", (head_type,)) if "SampleIsLoaded" not in str(self.used_channels_list): return try: self.disconnect( self.channel_dict["SampleIsLoaded"], "update", self.sample_is_loaded_changed, ) except Exception: pass if ( head_type == GenericDiffractometer.HEAD_TYPE_MINIKAPPA or head_type == GenericDiffractometer.HEAD_TYPE_SMARTMAGNET ): self.connect( self.channel_dict["SampleIsLoaded"], "update", self.sample_is_loaded_changed, ) else: logging.getLogger("HWR").info( "Diffractometer: SmartMagnet " + "is not available, only works for Minikappa and SmartMagnet head" )
def move_kappa_and_phi(self, kappa, kappa_phi): return
[docs] def close_kappa(self): """ Descript. : """ return
def get_osc_dynamic_limits(self): return (-10000, 10000) def get_osc_max_speed(self): """ """ return None # raise NotImplementedError def zoom_in(self): return def zoom_out(self): return def save_centring_positions(self): pass
[docs] def force_emit_signals(self): for motor_hwobj in self.motor_hwobj_dict.values(): motor_hwobj.force_emit_signals()
def get_head_configuration(self) -> Union[GonioHeadConfiguration, None]: chip_def_fpath = self.get_property("chip_definition_file", "") chip_def_fpath = HWR.get_hardware_repository().find_in_repository( chip_def_fpath ) data = None if os.path.isfile(chip_def_fpath): with open(chip_def_fpath, "r") as _f: chip_def = json.load(_f) try: data = GonioHeadConfiguration(**chip_def) except ValidationError: logging.getLogger("HWR").exception( "Validation error in %s" % chip_def_fpath ) return data def set_head_configuration(self, str_data: str) -> None: data = json.loads(str_data) chip_def_fpath = self.get_property("chip_definition_file", "") chip_def_fpath = HWR.get_hardware_repository().find_in_repository( chip_def_fpath ) if os.path.isfile(chip_def_fpath): with open(chip_def_fpath, "w+") as _f: try: GonioHeadConfiguration(**data) except ValidationError: logging.getLogger("HWR").exception( "Validation error in %s" % chip_def_fpath ) else: _f.write(json.dumps(data, indent=4)) def set_chip_layout(self, layout_name: str) -> bool: data = self.get_head_configuration().dict() data["current"] = layout_name self.set_head_configuration(json.dumps(data)) return True