roborock.devices.traits.b01.q10

Traits for Q10 B01 devices.

 1"""Traits for Q10 B01 devices."""
 2
 3import asyncio
 4import logging
 5from typing import Any
 6
 7from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
 8from roborock.devices.rpc.b01_q10_channel import stream_decoded_responses
 9from roborock.devices.traits import Trait
10from roborock.devices.transport.mqtt_channel import MqttChannel
11
12from .command import CommandTrait
13from .status import StatusTrait
14from .vacuum import VacuumTrait
15
16__all__ = [
17    "Q10PropertiesApi",
18]
19
20_LOGGER = logging.getLogger(__name__)
21
22
class Q10PropertiesApi(Trait):
    """API for interacting with B01 devices.

    Groups the command, status and vacuum traits around a single channel and
    runs a background task that forwards decoded updates to the status trait.
    """

    command: CommandTrait
    """Trait for sending commands to Q10 devices."""

    status: StatusTrait
    """Trait for managing the status of Q10 devices."""

    vacuum: VacuumTrait
    """Trait for sending vacuum related commands to Q10 devices."""

    def __init__(self, channel: MqttChannel) -> None:
        """Initialize the B01Props API.

        Args:
            channel: Channel handed to the command trait and used by the
                subscribe loop to stream decoded responses.
        """
        self._channel = channel
        self.command = CommandTrait(channel)
        self.vacuum = VacuumTrait(self.command)
        self.status = StatusTrait()
        self._subscribe_task: asyncio.Task[None] | None = None

    async def start(self) -> None:
        """Start any necessary subscriptions for the trait.

        Calling this while the subscribe loop is already running is a no-op,
        so a repeated call cannot orphan a running task that ``close()``
        would then be unable to cancel. A loop that has already finished is
        replaced by a fresh one.
        """
        running = self._subscribe_task
        if running is not None and not running.done():
            return
        self._subscribe_task = asyncio.create_task(self._subscribe_loop())

    async def close(self) -> None:
        """Close any resources held by the trait.

        Cancels the subscribe loop (if any) and waits for it to finish.
        Safe to call when ``start()`` was never called.
        """
        task = self._subscribe_task
        if task is None:
            return
        # Drop the reference before awaiting so the trait is left in a clean
        # state even if the task finishes with an unexpected exception.
        self._subscribe_task = None
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # ignore cancellation errors

    async def refresh(self) -> None:
        """Refresh all traits."""
        # Sending the REQUEST_DPS will cause the device to report all of its
        # DPS values. Updates will be received by the subscribe loop below.
        await self.command.send(B01_Q10_DP.REQUEST_DPS, params={})

    async def _subscribe_loop(self) -> None:
        """Persistent loop to listen for status updates."""
        async for decoded_dps in stream_decoded_responses(self._channel):
            _LOGGER.debug("Received Q10 status update: %s", decoded_dps)

            # Notify all traits about a new message; each trait only updates
            # the fields that it is responsible for.
            # More traits can be added here below.
            self.status.update_from_dps(decoded_dps)
72
73
def create(channel: MqttChannel) -> Q10PropertiesApi:
    """Build the Q10 properties API for the given channel."""
    api = Q10PropertiesApi(channel)
    return api
class Q10PropertiesApi(roborock.devices.traits.Trait):
24class Q10PropertiesApi(Trait):
25    """API for interacting with B01 devices."""
26
27    command: CommandTrait
28    """Trait for sending commands to Q10 devices."""
29
30    status: StatusTrait
31    """Trait for managing the status of Q10 devices."""
32
33    vacuum: VacuumTrait
34    """Trait for sending vacuum related commands to Q10 devices."""
35
36    def __init__(self, channel: MqttChannel) -> None:
37        """Initialize the B01Props API."""
38        self._channel = channel
39        self.command = CommandTrait(channel)
40        self.vacuum = VacuumTrait(self.command)
41        self.status = StatusTrait()
42        self._subscribe_task: asyncio.Task[None] | None = None
43
44    async def start(self) -> None:
45        """Start any necessary subscriptions for the trait."""
46        self._subscribe_task = asyncio.create_task(self._subscribe_loop())
47
48    async def close(self) -> None:
49        """Close any resources held by the trait."""
50        if self._subscribe_task is not None:
51            self._subscribe_task.cancel()
52            try:
53                await self._subscribe_task
54            except asyncio.CancelledError:
55                pass  # ignore cancellation errors
56            self._subscribe_task = None
57
58    async def refresh(self) -> None:
59        """Refresh all traits."""
60        # Sending the REQUEST_DPS will cause the device to report all of its
61        # DPS values. Updates will be received by the subscribe loop below.
62        await self.command.send(B01_Q10_DP.REQUEST_DPS, params={})
63
64    async def _subscribe_loop(self) -> None:
65        """Persistent loop to listen for status updates."""
66        async for decoded_dps in stream_decoded_responses(self._channel):
67            _LOGGER.debug("Received Q10 status update: %s", decoded_dps)
68
69            # Notify all traits about a new message and each trait will
70            # only update the fields that it is responsible for.
71            # More traits can be added here below.
72            self.status.update_from_dps(decoded_dps)

API for interacting with B01 devices.

Q10PropertiesApi(channel: roborock.devices.transport.mqtt_channel.MqttChannel)
36    def __init__(self, channel: MqttChannel) -> None:
37        """Initialize the B01Props API."""
38        self._channel = channel
39        self.command = CommandTrait(channel)
40        self.vacuum = VacuumTrait(self.command)
41        self.status = StatusTrait()
42        self._subscribe_task: asyncio.Task[None] | None = None

Initialize the B01Props API.

command: roborock.devices.traits.b01.q10.command.CommandTrait

Trait for sending commands to Q10 devices.

status: roborock.devices.traits.b01.q10.status.StatusTrait

Trait for managing the status of Q10 devices.

vacuum: roborock.devices.traits.b01.q10.vacuum.VacuumTrait

Trait for sending vacuum related commands to Q10 devices.

async def start(self) -> None:
44    async def start(self) -> None:
45        """Start any necessary subscriptions for the trait."""
46        self._subscribe_task = asyncio.create_task(self._subscribe_loop())

Start any necessary subscriptions for the trait.

async def close(self) -> None:
48    async def close(self) -> None:
49        """Close any resources held by the trait."""
50        if self._subscribe_task is not None:
51            self._subscribe_task.cancel()
52            try:
53                await self._subscribe_task
54            except asyncio.CancelledError:
55                pass  # ignore cancellation errors
56            self._subscribe_task = None

Close any resources held by the trait.

async def refresh(self) -> None:
58    async def refresh(self) -> None:
59        """Refresh all traits."""
60        # Sending the REQUEST_DPS will cause the device to report all of its
61        # DPS values. Updates will be received by the subscribe loop below.
62        await self.command.send(B01_Q10_DP.REQUEST_DPS, params={})

Refresh all traits.