roborock.devices.device_manager

Module for discovering Roborock devices.

  1"""Module for discovering Roborock devices."""
  2
  3import asyncio
  4import enum
  5import logging
  6from collections.abc import Callable, Mapping
  7from dataclasses import dataclass
  8from typing import Any
  9
 10import aiohttp
 11
 12from roborock.data import (
 13    HomeData,
 14    HomeDataDevice,
 15    HomeDataProduct,
 16    UserData,
 17)
 18from roborock.devices.device import DeviceReadyCallback, RoborockDevice
 19from roborock.diagnostics import Diagnostics, redact_device_data
 20from roborock.exceptions import RoborockException
 21from roborock.map.map_parser import MapParserConfig
 22from roborock.mqtt.roborock_session import create_lazy_mqtt_session
 23from roborock.mqtt.session import MqttSession, SessionUnauthorizedHook
 24from roborock.protocol import create_mqtt_params
 25from roborock.web_api import RoborockApiClient, UserWebApiClient
 26
 27from .cache import Cache, DeviceCache, NoCache
 28from .rpc.v1_channel import create_v1_channel
 29from .traits import Trait, a01, b01, v1
 30from .transport.channel import Channel
 31from .transport.mqtt_channel import create_mqtt_channel
 32
 33_LOGGER = logging.getLogger(__name__)
 34
 35__all__ = [
 36    "create_device_manager",
 37    "UserParams",
 38    "DeviceManager",
 39]
 40
 41
 42DeviceCreator = Callable[[HomeData, HomeDataDevice, HomeDataProduct], RoborockDevice]
 43
 44
 45class DeviceVersion(enum.StrEnum):
 46    """Enum for device versions."""
 47
 48    V1 = "1.0"
 49    A01 = "A01"
 50    B01 = "B01"
 51    UNKNOWN = "unknown"
 52
 53
 54class UnsupportedDeviceError(RoborockException):
 55    """Exception raised when a device is unsupported."""
 56
 57
 58class DeviceManager:
 59    """Central manager for Roborock device discovery and connections."""
 60
 61    def __init__(
 62        self,
 63        web_api: UserWebApiClient,
 64        device_creator: DeviceCreator,
 65        mqtt_session: MqttSession,
 66        cache: Cache,
 67        diagnostics: Diagnostics,
 68    ) -> None:
 69        """Initialize the DeviceManager with user data and optional cache storage.
 70
 71        This takes ownership of the MQTT session and will close it when the manager is closed.
 72        """
 73        self._web_api = web_api
 74        self._cache = cache
 75        self._device_creator = device_creator
 76        self._devices: dict[str, RoborockDevice] = {}
 77        self._mqtt_session = mqtt_session
 78        self._diagnostics = diagnostics
 79        self._home_data: HomeData | None = None
 80
 81    async def discover_devices(self, prefer_cache: bool = True) -> list[RoborockDevice]:
 82        """Discover all devices for the logged-in user."""
 83        self._diagnostics.increment("discover_devices")
 84        cache_data = await self._cache.get()
 85        if not cache_data.home_data or not prefer_cache:
 86            _LOGGER.debug("Fetching home data (prefer_cache=%s)", prefer_cache)
 87            self._diagnostics.increment("fetch_home_data")
 88            try:
 89                cache_data.home_data = await self._web_api.get_home_data()
 90            except RoborockException as ex:
 91                if not cache_data.home_data:
 92                    raise
 93                _LOGGER.debug("Failed to fetch home data, using cached data: %s", ex)
 94            await self._cache.set(cache_data)
 95        self._home_data = cache_data.home_data
 96
 97        device_products = self._home_data.device_products
 98        _LOGGER.debug("Discovered %d devices", len(device_products))
 99
100        # These are connected serially to avoid overwhelming the MQTT broker
101        new_devices = {}
102        start_tasks = []
103        supported_devices_counter = self._diagnostics.subkey("supported_devices")
104        unsupported_devices_counter = self._diagnostics.subkey("unsupported_devices")
105        for duid, (device, product) in device_products.items():
106            _LOGGER.debug("[%s] Discovered device %s %s", duid, product.summary_info(), device.summary_info())
107            if duid in self._devices:
108                continue
109            try:
110                new_device = self._device_creator(self._home_data, device, product)
111            except UnsupportedDeviceError:
112                _LOGGER.info("Skipping unsupported device %s %s", product.summary_info(), device.summary_info())
113                unsupported_devices_counter.increment(device.pv or "unknown")
114                continue
115            supported_devices_counter.increment(device.pv or "unknown")
116            start_tasks.append(new_device.start_connect())
117            new_devices[duid] = new_device
118
119        self._devices.update(new_devices)
120        await asyncio.gather(*start_tasks)
121        return list(self._devices.values())
122
123    async def get_device(self, duid: str) -> RoborockDevice | None:
124        """Get a specific device by DUID."""
125        return self._devices.get(duid)
126
127    async def get_devices(self) -> list[RoborockDevice]:
128        """Get all discovered devices."""
129        return list(self._devices.values())
130
131    async def close(self) -> None:
132        """Close all MQTT connections and clean up resources."""
133        tasks = [device.close() for device in self._devices.values()]
134        self._devices.clear()
135        tasks.append(self._mqtt_session.close())
136        await asyncio.gather(*tasks)
137
138    def diagnostic_data(self) -> Mapping[str, Any]:
139        """Return diagnostics information about the device manager."""
140        return {
141            "home_data": redact_device_data(self._home_data.as_dict()) if self._home_data else None,
142            "devices": [device.diagnostic_data() for device in self._devices.values()],
143            "diagnostics": self._diagnostics.as_dict(),
144        }
145
146
@dataclass
class UserParams:
    """Credentials and connection settings for a Roborock user session.

    Bundles the login username, the user data used for authentication, and an
    optional base URL for the Roborock API. Both `user_data` and `base_url`
    are obtained from `RoborockApiClient` during the login process.
    """

    username: str
    """Login username (the account email)."""

    user_data: UserData
    """User data holding the authentication information."""

    base_url: str | None = None
    """Base URL of the Roborock API, if already known.

    Supplying it speeds up connecting because the base URL does not need to be
    discovered again. When omitted, the API client attempts to discover it
    automatically, which may take multiple requests.
    """
169
170
def create_web_api_wrapper(
    user_params: UserParams,
    *,
    cache: Cache | None = None,
    session: aiohttp.ClientSession | None = None,
) -> UserWebApiClient:
    """Build a `UserWebApiClient` bound to the given user's credentials.

    Args:
        user_params: Username, user data and optional base URL for the API client.
        cache: Accepted for call compatibility; not used by this function.
        session: Optional aiohttp ClientSession handed to the API client.

    Returns:
        A web API client wrapping a new `RoborockApiClient` and the user's data.
    """
    # `base_url` is forwarded as given. When it is None the API client has to
    # discover it itself (see `UserParams.base_url`), which costs extra requests.
    api_client = RoborockApiClient(
        username=user_params.username,
        base_url=user_params.base_url,
        session=session,
    )
    return UserWebApiClient(api_client, user_params.user_data)
184
185
186async def create_device_manager(
187    user_params: UserParams,
188    *,
189    cache: Cache | None = None,
190    map_parser_config: MapParserConfig | None = None,
191    session: aiohttp.ClientSession | None = None,
192    ready_callback: DeviceReadyCallback | None = None,
193    mqtt_session_unauthorized_hook: SessionUnauthorizedHook | None = None,
194    prefer_cache: bool = True,
195) -> DeviceManager:
196    """Convenience function to create and initialize a DeviceManager.
197
198    Args:
199        user_params: Parameters for creating the user session.
200        cache: Optional cache implementation to use for caching device data.
201        map_parser_config: Optional configuration for parsing maps.
202        session: Optional aiohttp ClientSession to use for HTTP requests.
203        ready_callback: Optional callback to be notified when a device is ready.
204        mqtt_session_unauthorized_hook: Optional hook for MQTT session unauthorized
205          events which may indicate rate limiting or revoked credentials. The
206          caller may use this to refresh authentication tokens as needed.
207        prefer_cache: Whether to prefer cached device data over always fetching it from the API.
208
209    Returns:
210        An initialized DeviceManager with discovered devices.
211    """
212    if cache is None:
213        cache = NoCache()
214
215    web_api = create_web_api_wrapper(user_params, session=session, cache=cache)
216    user_data = user_params.user_data
217
218    diagnostics = Diagnostics()
219
220    mqtt_params = create_mqtt_params(user_data.rriot)
221    mqtt_params.diagnostics = diagnostics.subkey("mqtt_session")
222    mqtt_params.unauthorized_hook = mqtt_session_unauthorized_hook
223    mqtt_session = await create_lazy_mqtt_session(mqtt_params)
224
225    def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDataProduct) -> RoborockDevice:
226        channel: Channel
227        trait: Trait
228        device_cache: DeviceCache = DeviceCache(device.duid, cache)
229        match device.pv:
230            case DeviceVersion.V1:
231                channel = create_v1_channel(user_data, mqtt_params, mqtt_session, device, device_cache)
232                trait = v1.create(
233                    device.duid,
234                    product,
235                    home_data,
236                    channel.rpc_channel,
237                    channel.mqtt_rpc_channel,
238                    channel.map_rpc_channel,
239                    web_api,
240                    device_cache=device_cache,
241                    map_parser_config=map_parser_config,
242                )
243            case DeviceVersion.A01:
244                channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device)
245                trait = a01.create(product, channel)
246            case DeviceVersion.B01:
247                channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device)
248                model_part = product.model.split(".")[-1]
249                if "ss" in model_part:
250                    trait = b01.q10.create(channel)
251                elif "sc" in model_part:
252                    # Q7 devices start with 'sc' in their model naming.
253                    trait = b01.q7.create(channel)
254                else:
255                    raise UnsupportedDeviceError(f"Device {device.name} has unsupported B01 model: {product.model}")
256            case _:
257                raise UnsupportedDeviceError(
258                    f"Device {device.name} has unsupported version {device.pv} {product.model}"
259                )
260
261        dev = RoborockDevice(device, product, channel, trait)
262        if ready_callback:
263            dev.add_ready_callback(ready_callback)
264        return dev
265
266    manager = DeviceManager(web_api, device_creator, mqtt_session=mqtt_session, cache=cache, diagnostics=diagnostics)
267    await manager.discover_devices(prefer_cache)
268    return manager
async def create_device_manager( user_params: UserParams, *, cache: roborock.devices.cache.Cache | None = None, map_parser_config: roborock.map.MapParserConfig | None = None, session: aiohttp.client.ClientSession | None = None, ready_callback: Callable[[roborock.devices.device.RoborockDevice], None] | None = None, mqtt_session_unauthorized_hook: Callable[[], None] | None = None, prefer_cache: bool = True) -> DeviceManager:
187async def create_device_manager(
188    user_params: UserParams,
189    *,
190    cache: Cache | None = None,
191    map_parser_config: MapParserConfig | None = None,
192    session: aiohttp.ClientSession | None = None,
193    ready_callback: DeviceReadyCallback | None = None,
194    mqtt_session_unauthorized_hook: SessionUnauthorizedHook | None = None,
195    prefer_cache: bool = True,
196) -> DeviceManager:
197    """Convenience function to create and initialize a DeviceManager.
198
199    Args:
200        user_params: Parameters for creating the user session.
201        cache: Optional cache implementation to use for caching device data.
202        map_parser_config: Optional configuration for parsing maps.
203        session: Optional aiohttp ClientSession to use for HTTP requests.
204        ready_callback: Optional callback to be notified when a device is ready.
205        mqtt_session_unauthorized_hook: Optional hook for MQTT session unauthorized
206          events which may indicate rate limiting or revoked credentials. The
207          caller may use this to refresh authentication tokens as needed.
208        prefer_cache: Whether to prefer cached device data over always fetching it from the API.
209
210    Returns:
211        An initialized DeviceManager with discovered devices.
212    """
213    if cache is None:
214        cache = NoCache()
215
216    web_api = create_web_api_wrapper(user_params, session=session, cache=cache)
217    user_data = user_params.user_data
218
219    diagnostics = Diagnostics()
220
221    mqtt_params = create_mqtt_params(user_data.rriot)
222    mqtt_params.diagnostics = diagnostics.subkey("mqtt_session")
223    mqtt_params.unauthorized_hook = mqtt_session_unauthorized_hook
224    mqtt_session = await create_lazy_mqtt_session(mqtt_params)
225
226    def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDataProduct) -> RoborockDevice:
227        channel: Channel
228        trait: Trait
229        device_cache: DeviceCache = DeviceCache(device.duid, cache)
230        match device.pv:
231            case DeviceVersion.V1:
232                channel = create_v1_channel(user_data, mqtt_params, mqtt_session, device, device_cache)
233                trait = v1.create(
234                    device.duid,
235                    product,
236                    home_data,
237                    channel.rpc_channel,
238                    channel.mqtt_rpc_channel,
239                    channel.map_rpc_channel,
240                    web_api,
241                    device_cache=device_cache,
242                    map_parser_config=map_parser_config,
243                )
244            case DeviceVersion.A01:
245                channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device)
246                trait = a01.create(product, channel)
247            case DeviceVersion.B01:
248                channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device)
249                model_part = product.model.split(".")[-1]
250                if "ss" in model_part:
251                    trait = b01.q10.create(channel)
252                elif "sc" in model_part:
253                    # Q7 devices start with 'sc' in their model naming.
254                    trait = b01.q7.create(channel)
255                else:
256                    raise UnsupportedDeviceError(f"Device {device.name} has unsupported B01 model: {product.model}")
257            case _:
258                raise UnsupportedDeviceError(
259                    f"Device {device.name} has unsupported version {device.pv} {product.model}"
260                )
261
262        dev = RoborockDevice(device, product, channel, trait)
263        if ready_callback:
264            dev.add_ready_callback(ready_callback)
265        return dev
266
267    manager = DeviceManager(web_api, device_creator, mqtt_session=mqtt_session, cache=cache, diagnostics=diagnostics)
268    await manager.discover_devices(prefer_cache)
269    return manager

Convenience function to create and initialize a DeviceManager.

Args: user_params: Parameters for creating the user session. cache: Optional cache implementation to use for caching device data. map_parser_config: Optional configuration for parsing maps. session: Optional aiohttp ClientSession to use for HTTP requests. ready_callback: Optional callback to be notified when a device is ready. mqtt_session_unauthorized_hook: Optional hook for MQTT session unauthorized events which may indicate rate limiting or revoked credentials. The caller may use this to refresh authentication tokens as needed. prefer_cache: Whether to prefer cached device data over always fetching it from the API.

Returns: An initialized DeviceManager with discovered devices.

@dataclass
class UserParams:
@dataclass
class UserParams:
    """Credentials and connection settings for a Roborock user session.

    Bundles the login username, the user data used for authentication, and an
    optional base URL for the Roborock API. Both `user_data` and `base_url`
    are obtained from `RoborockApiClient` during the login process.
    """

    username: str
    """Login username (the account email)."""

    user_data: UserData
    """User data holding the authentication information."""

    base_url: str | None = None
    """Base URL of the Roborock API, if already known.

    Supplying it speeds up connecting because the base URL does not need to be
    discovered again. When omitted, the API client attempts to discover it
    automatically, which may take multiple requests.
    """

Parameters for creating a new session with Roborock devices.

These parameters include the username, user data for authentication, and an optional base URL for the Roborock API. The user_data and base_url parameters are obtained from RoborockApiClient during the login process.

UserParams( username: str, user_data: roborock.data.containers.UserData, base_url: str | None = None)
username: str

The username (email) used for logging in.

user_data: roborock.data.containers.UserData

This is the user data containing authentication information.

base_url: str | None = None

Optional base URL for the Roborock API.

This is used to speed up connection times by avoiding the need to discover the API base URL each time. If not provided, the API client will attempt to discover it automatically which may take multiple requests.

class DeviceManager:
 59class DeviceManager:
 60    """Central manager for Roborock device discovery and connections."""
 61
 62    def __init__(
 63        self,
 64        web_api: UserWebApiClient,
 65        device_creator: DeviceCreator,
 66        mqtt_session: MqttSession,
 67        cache: Cache,
 68        diagnostics: Diagnostics,
 69    ) -> None:
 70        """Initialize the DeviceManager with user data and optional cache storage.
 71
 72        This takes ownership of the MQTT session and will close it when the manager is closed.
 73        """
 74        self._web_api = web_api
 75        self._cache = cache
 76        self._device_creator = device_creator
 77        self._devices: dict[str, RoborockDevice] = {}
 78        self._mqtt_session = mqtt_session
 79        self._diagnostics = diagnostics
 80        self._home_data: HomeData | None = None
 81
 82    async def discover_devices(self, prefer_cache: bool = True) -> list[RoborockDevice]:
 83        """Discover all devices for the logged-in user."""
 84        self._diagnostics.increment("discover_devices")
 85        cache_data = await self._cache.get()
 86        if not cache_data.home_data or not prefer_cache:
 87            _LOGGER.debug("Fetching home data (prefer_cache=%s)", prefer_cache)
 88            self._diagnostics.increment("fetch_home_data")
 89            try:
 90                cache_data.home_data = await self._web_api.get_home_data()
 91            except RoborockException as ex:
 92                if not cache_data.home_data:
 93                    raise
 94                _LOGGER.debug("Failed to fetch home data, using cached data: %s", ex)
 95            await self._cache.set(cache_data)
 96        self._home_data = cache_data.home_data
 97
 98        device_products = self._home_data.device_products
 99        _LOGGER.debug("Discovered %d devices", len(device_products))
100
101        # These are connected serially to avoid overwhelming the MQTT broker
102        new_devices = {}
103        start_tasks = []
104        supported_devices_counter = self._diagnostics.subkey("supported_devices")
105        unsupported_devices_counter = self._diagnostics.subkey("unsupported_devices")
106        for duid, (device, product) in device_products.items():
107            _LOGGER.debug("[%s] Discovered device %s %s", duid, product.summary_info(), device.summary_info())
108            if duid in self._devices:
109                continue
110            try:
111                new_device = self._device_creator(self._home_data, device, product)
112            except UnsupportedDeviceError:
113                _LOGGER.info("Skipping unsupported device %s %s", product.summary_info(), device.summary_info())
114                unsupported_devices_counter.increment(device.pv or "unknown")
115                continue
116            supported_devices_counter.increment(device.pv or "unknown")
117            start_tasks.append(new_device.start_connect())
118            new_devices[duid] = new_device
119
120        self._devices.update(new_devices)
121        await asyncio.gather(*start_tasks)
122        return list(self._devices.values())
123
124    async def get_device(self, duid: str) -> RoborockDevice | None:
125        """Get a specific device by DUID."""
126        return self._devices.get(duid)
127
128    async def get_devices(self) -> list[RoborockDevice]:
129        """Get all discovered devices."""
130        return list(self._devices.values())
131
132    async def close(self) -> None:
133        """Close all MQTT connections and clean up resources."""
134        tasks = [device.close() for device in self._devices.values()]
135        self._devices.clear()
136        tasks.append(self._mqtt_session.close())
137        await asyncio.gather(*tasks)
138
139    def diagnostic_data(self) -> Mapping[str, Any]:
140        """Return diagnostics information about the device manager."""
141        return {
142            "home_data": redact_device_data(self._home_data.as_dict()) if self._home_data else None,
143            "devices": [device.diagnostic_data() for device in self._devices.values()],
144            "diagnostics": self._diagnostics.as_dict(),
145        }

Central manager for Roborock device discovery and connections.

DeviceManager( web_api: roborock.web_api.UserWebApiClient, device_creator: Callable[[roborock.data.containers.HomeData, roborock.data.containers.HomeDataDevice, roborock.data.containers.HomeDataProduct], roborock.devices.device.RoborockDevice], mqtt_session: roborock.mqtt.session.MqttSession, cache: roborock.devices.cache.Cache, diagnostics: roborock.diagnostics.Diagnostics)
62    def __init__(
63        self,
64        web_api: UserWebApiClient,
65        device_creator: DeviceCreator,
66        mqtt_session: MqttSession,
67        cache: Cache,
68        diagnostics: Diagnostics,
69    ) -> None:
70        """Initialize the DeviceManager with user data and optional cache storage.
71
72        This takes ownership of the MQTT session and will close it when the manager is closed.
73        """
74        self._web_api = web_api
75        self._cache = cache
76        self._device_creator = device_creator
77        self._devices: dict[str, RoborockDevice] = {}
78        self._mqtt_session = mqtt_session
79        self._diagnostics = diagnostics
80        self._home_data: HomeData | None = None

Initialize the DeviceManager with user data and optional cache storage.

This takes ownership of the MQTT session and will close it when the manager is closed.

async def discover_devices( self, prefer_cache: bool = True) -> list[roborock.devices.device.RoborockDevice]:
 82    async def discover_devices(self, prefer_cache: bool = True) -> list[RoborockDevice]:
 83        """Discover all devices for the logged-in user."""
 84        self._diagnostics.increment("discover_devices")
 85        cache_data = await self._cache.get()
 86        if not cache_data.home_data or not prefer_cache:
 87            _LOGGER.debug("Fetching home data (prefer_cache=%s)", prefer_cache)
 88            self._diagnostics.increment("fetch_home_data")
 89            try:
 90                cache_data.home_data = await self._web_api.get_home_data()
 91            except RoborockException as ex:
 92                if not cache_data.home_data:
 93                    raise
 94                _LOGGER.debug("Failed to fetch home data, using cached data: %s", ex)
 95            await self._cache.set(cache_data)
 96        self._home_data = cache_data.home_data
 97
 98        device_products = self._home_data.device_products
 99        _LOGGER.debug("Discovered %d devices", len(device_products))
100
101        # These are connected serially to avoid overwhelming the MQTT broker
102        new_devices = {}
103        start_tasks = []
104        supported_devices_counter = self._diagnostics.subkey("supported_devices")
105        unsupported_devices_counter = self._diagnostics.subkey("unsupported_devices")
106        for duid, (device, product) in device_products.items():
107            _LOGGER.debug("[%s] Discovered device %s %s", duid, product.summary_info(), device.summary_info())
108            if duid in self._devices:
109                continue
110            try:
111                new_device = self._device_creator(self._home_data, device, product)
112            except UnsupportedDeviceError:
113                _LOGGER.info("Skipping unsupported device %s %s", product.summary_info(), device.summary_info())
114                unsupported_devices_counter.increment(device.pv or "unknown")
115                continue
116            supported_devices_counter.increment(device.pv or "unknown")
117            start_tasks.append(new_device.start_connect())
118            new_devices[duid] = new_device
119
120        self._devices.update(new_devices)
121        await asyncio.gather(*start_tasks)
122        return list(self._devices.values())

Discover all devices for the logged-in user.

async def get_device(self, duid: str) -> roborock.devices.device.RoborockDevice | None:
124    async def get_device(self, duid: str) -> RoborockDevice | None:
125        """Get a specific device by DUID."""
126        return self._devices.get(duid)

Get a specific device by DUID.

async def get_devices(self) -> list[roborock.devices.device.RoborockDevice]:
128    async def get_devices(self) -> list[RoborockDevice]:
129        """Get all discovered devices."""
130        return list(self._devices.values())

Get all discovered devices.

async def close(self) -> None:
132    async def close(self) -> None:
133        """Close all MQTT connections and clean up resources."""
134        tasks = [device.close() for device in self._devices.values()]
135        self._devices.clear()
136        tasks.append(self._mqtt_session.close())
137        await asyncio.gather(*tasks)

Close all MQTT connections and clean up resources.

def diagnostic_data(self) -> Mapping[str, typing.Any]:
139    def diagnostic_data(self) -> Mapping[str, Any]:
140        """Return diagnostics information about the device manager."""
141        return {
142            "home_data": redact_device_data(self._home_data.as_dict()) if self._home_data else None,
143            "devices": [device.diagnostic_data() for device in self._devices.values()],
144            "diagnostics": self._diagnostics.as_dict(),
145        }

Return diagnostics information about the device manager.