Source code for mxcubeweb.core.adapter.data_publisher_adapter

import logging

from mxcubecore.BaseHardwareObjects import HardwareObjectState

from mxcubeweb.core.adapter.adapter_base import AdapterBase


[docs]class DataPublisherAdapter(AdapterBase): ATTRIBUTES = ["current_data", "all_data", "current"] def __init__(self, ho, *args): """ Args: (object): Hardware object. """ super(DataPublisherAdapter, self).__init__(ho, *args) self._all_data_list = [] self._current_data_list = [] self._current_info = {} try: ho.connect("data", self._new_data_handler) ho.connect("start", self._start_handler) ho.connect("end", self._end_handler) except Exception: msg = "Could not initialize DataPublisherAdapter" logging.getLogger("MX3.HWR").exception(msg) else: self._available = True def _new_data_handler(self, data): self._current_data_list.append(data["data"]) self.emit_ho_attribute_changed( "current_data", [data["data"]], operation="UPDATE" ) def _start_handler(self, data): self._current_info = data self.emit_ho_changed() def _end_handler(self, data): self._all_data_list.append(data) self.emit_ho_changed()
[docs] def state(self): return HardwareObjectState.READY.name
def current_data(self) -> list: return self._current_data_list def current(self) -> dict: return self._current_info def all_data(self) -> list: return self._all_data_list