Source code for libpurecoollink.dyson_device

"""Base Dyson devices."""

# pylint: disable=too-many-public-methods,too-many-instance-attributes

from queue import Queue
import logging
import json
import abc
import time

from .utils import printable_fields
from .utils import decrypt_password

_LOGGER = logging.getLogger(__name__)

MQTT_RETURN_CODES = {
    0: "Connection successful",
    1: "Connection refused - incorrect protocol version",
    2: "Connection refused - invalid client identifier",
    3: "Connection refused - server unavailable",
    4: "Connection refused - bad username or password",
    5: "Connection refused - not authorised"
}

DEFAULT_PORT = 1883


[docs]class NetworkDevice: """Network device.""" def __init__(self, name, address, port): """Create a new network device. :param name: Device name :param address: Device address :param port: Device port """ self._name = name self._address = address self._port = port @property def name(self): """Device name.""" return self._name @property def address(self): """Device address.""" return self._address @property def port(self): """Device port.""" return self._port def __repr__(self): """Return a String representation.""" fields = [("name", self.name), ("address", self.address), ("port", str(self.port))] return 'NetworkDevice(' + ",".join(printable_fields(fields)) + ')'
class DysonDevice: """Abstract Dyson device.""" @staticmethod def on_connect(client, userdata, flags, return_code): # pylint: disable=unused-argument """Set function callback when connected.""" if return_code == 0: _LOGGER.debug("Connected with result code: %s", return_code) client.subscribe(userdata.status_topic) userdata.connection_callback(True) else: _LOGGER.error("Connection error: %s", MQTT_RETURN_CODES[return_code]) userdata.connection_callback(False) def __init__(self, json_body): """Create a new Dyson device. :param json_body: JSON message returned by the HTTPS API """ self._active = json_body['Active'] self._serial = json_body['Serial'] self._name = json_body['Name'] self._version = json_body['Version'] self._credentials = decrypt_password(json_body['LocalCredentials']) self._auto_update = json_body['AutoUpdate'] self._new_version_available = json_body['NewVersionAvailable'] self._product_type = json_body['ProductType'] self._network_device = None self._connected = False self._mqtt = None self._callback_message = [] self._device_available = False self._current_state = None self._state_data_available = Queue() self._search_device_queue = Queue() self._connection_queue = Queue() def connection_callback(self, connected): """Set function called when device is connected.""" self._connection_queue.put_nowait(connected) @abc.abstractmethod def connect(self, device_ip, device_port=DEFAULT_PORT): """Connect to the device using ip address. :param device_ip: Device IP address :param device_port: Device Port (default: 1883) :return: True if connected, else False """ return @property @abc.abstractmethod def status_topic(self): """MQTT status topic.""" return @property def command_topic(self): """MQTT command topic.""" return "{0}/{1}/command".format(self._product_type, self._serial) def request_current_state(self): """Request new state message.""" if self._connected: payload = { "msg": "REQUEST-CURRENT-STATE", "time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } self._mqtt.publish(self.command_topic, json.dumps(payload)) else: _LOGGER.warning( "Unable to send commands because device %s is not connected", self.serial) @property def state(self): """Device state.""" return self._current_state @state.setter def state(self, value): """Set current state.""" self._current_state = value @property def active(self): """Active status.""" return self._active @property def serial(self): """Device serial.""" return self._serial @property def name(self): """Device name.""" return self._name @property def version(self): """Device version.""" return self._version @property def credentials(self): """Device encrypted credentials.""" return self._credentials @property def auto_update(self): """Auto update configuration.""" return self._auto_update @property def new_version_available(self): """Return if new version available.""" return self._new_version_available @property def product_type(self): """Product type.""" return self._product_type @property def network_device(self): """Network device.""" return self._network_device def _add_network_device(self, network_device): """Add network device. :param network_device: Network device """ self._search_device_queue.put_nowait(network_device) @property def callback_message(self): """Return callback functions when message are received.""" return self._callback_message def add_message_listener(self, callback_message): """Add message listener.""" self._callback_message.append(callback_message) def remove_message_listener(self, callback_message): """Remove a message listener.""" if callback_message in self._callback_message: self.callback_message.remove(callback_message) def clear_message_listener(self): """Clear all message listener.""" self.callback_message.clear() @property def device_available(self): """Return True if device is fully available, else false.""" return self._device_available def state_data_available(self): """Call when first state data are available. Internal method.""" _LOGGER.debug("State data available for device %s", self._serial) self._state_data_available.put_nowait(True) def _fields(self): """Return list of field tuples.""" fields = [("serial", self.serial), ("active", str(self.active)), ("name", self.name), ("version", self.version), ("auto_update", str(self.auto_update)), ("new_version_available", str(self.new_version_available)), ("product_type", self.product_type), ("network_device", str(self.network_device))] return fields