roborock.devices.traits.b01.q10
Traits for Q10 B01 devices.
1"""Traits for Q10 B01 devices.""" 2 3import asyncio 4import logging 5from typing import Any 6 7from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP 8from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses 9from roborock.devices.traits import Trait 10from roborock.devices.transport.mqtt_channel import MqttChannel 11 12from .command import CommandTrait 13from .status import StatusTrait 14from .vacuum import VacuumTrait 15 16__all__ = [ 17 "Q10PropertiesApi", 18] 19 20_LOGGER = logging.getLogger(__name__) 21 22 23class Q10PropertiesApi(Trait): 24 """API for interacting with B01 devices.""" 25 26 command: CommandTrait 27 """Trait for sending commands to Q10 devices.""" 28 29 status: StatusTrait 30 """Trait for managing the status of Q10 devices.""" 31 32 vacuum: VacuumTrait 33 """Trait for sending vacuum related commands to Q10 devices.""" 34 35 def __init__(self, channel: MqttChannel) -> None: 36 """Initialize the B01Props API.""" 37 self._channel = channel 38 self.command = CommandTrait(channel) 39 self.vacuum = VacuumTrait(self.command) 40 self.status = StatusTrait() 41 self._subscribe_task: asyncio.Task[None] | None = None 42 43 async def start(self) -> None: 44 """Start any necessary subscriptions for the trait.""" 45 self._subscribe_task = asyncio.create_task(self._subscribe_loop()) 46 47 async def close(self) -> None: 48 """Close any resources held by the trait.""" 49 if self._subscribe_task is not None: 50 self._subscribe_task.cancel() 51 try: 52 await self._subscribe_task 53 except asyncio.CancelledError: 54 pass # ignore cancellation errors 55 self._subscribe_task = None 56 57 async def refresh(self) -> None: 58 """Refresh all traits.""" 59 # Sending the REQUEST_DPS will cause the device to send all DPS values 60 # to the device. Updates will be received by the subscribe loop below. 61 await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) 62 63 async def _subscribe_loop(self) -> None: 64 """Persistent loop to listen for status updates.""" 65 async for decoded_dps in stream_decoded_responses(self._channel): 66 _LOGGER.debug("Received Q10 status update: %s", decoded_dps) 67 68 # Notify all traits about a new message and each trait will 69 # only update what fields that it is responsible for. 70 # More traits can be added here below. 71 self.status.update_from_dps(decoded_dps) 72 73 74def create(channel: MqttChannel) -> Q10PropertiesApi: 75 """Create traits for B01 devices.""" 76 return Q10PropertiesApi(channel)
24class Q10PropertiesApi(Trait): 25 """API for interacting with B01 devices.""" 26 27 command: CommandTrait 28 """Trait for sending commands to Q10 devices.""" 29 30 status: StatusTrait 31 """Trait for managing the status of Q10 devices.""" 32 33 vacuum: VacuumTrait 34 """Trait for sending vacuum related commands to Q10 devices.""" 35 36 def __init__(self, channel: MqttChannel) -> None: 37 """Initialize the B01Props API.""" 38 self._channel = channel 39 self.command = CommandTrait(channel) 40 self.vacuum = VacuumTrait(self.command) 41 self.status = StatusTrait() 42 self._subscribe_task: asyncio.Task[None] | None = None 43 44 async def start(self) -> None: 45 """Start any necessary subscriptions for the trait.""" 46 self._subscribe_task = asyncio.create_task(self._subscribe_loop()) 47 48 async def close(self) -> None: 49 """Close any resources held by the trait.""" 50 if self._subscribe_task is not None: 51 self._subscribe_task.cancel() 52 try: 53 await self._subscribe_task 54 except asyncio.CancelledError: 55 pass # ignore cancellation errors 56 self._subscribe_task = None 57 58 async def refresh(self) -> None: 59 """Refresh all traits.""" 60 # Sending the REQUEST_DPS will cause the device to send all DPS values 61 # to the device. Updates will be received by the subscribe loop below. 62 await self.command.send(B01_Q10_DP.REQUEST_DPS, params={}) 63 64 async def _subscribe_loop(self) -> None: 65 """Persistent loop to listen for status updates.""" 66 async for decoded_dps in stream_decoded_responses(self._channel): 67 _LOGGER.debug("Received Q10 status update: %s", decoded_dps) 68 69 # Notify all traits about a new message and each trait will 70 # only update what fields that it is responsible for. 71 # More traits can be added here below. 72 self.status.update_from_dps(decoded_dps)
API for interacting with B01 devices.
Q10PropertiesApi(channel: roborock.devices.transport.mqtt_channel.MqttChannel)
36 def __init__(self, channel: MqttChannel) -> None: 37 """Initialize the B01Props API.""" 38 self._channel = channel 39 self.command = CommandTrait(channel) 40 self.vacuum = VacuumTrait(self.command) 41 self.status = StatusTrait() 42 self._subscribe_task: asyncio.Task[None] | None = None
Initialize the B01Props API.
command: roborock.devices.traits.b01.q10.command.CommandTrait
Trait for sending commands to Q10 devices.
status: roborock.devices.traits.b01.q10.status.StatusTrait
Trait for managing the status of Q10 devices.
vacuum: roborock.devices.traits.b01.q10.vacuum.VacuumTrait
Trait for sending vacuum related commands to Q10 devices.
async def
start(self) -> None:
44 async def start(self) -> None: 45 """Start any necessary subscriptions for the trait.""" 46 self._subscribe_task = asyncio.create_task(self._subscribe_loop())
Start any necessary subscriptions for the trait.
async def
close(self) -> None:
48 async def close(self) -> None: 49 """Close any resources held by the trait.""" 50 if self._subscribe_task is not None: 51 self._subscribe_task.cancel() 52 try: 53 await self._subscribe_task 54 except asyncio.CancelledError: 55 pass # ignore cancellation errors 56 self._subscribe_task = None
Close any resources held by the trait.
async def
refresh(self) -> None:
58 async def refresh(self) -> None: 59 """Refresh all traits.""" 60 # Sending the REQUEST_DPS will cause the device to send all DPS values 61 # to the device. Updates will be received by the subscribe loop below. 62 await self.command.send(B01_Q10_DP.REQUEST_DPS, params={})
Refresh all traits.