roborock.devices.device_manager
Module for discovering Roborock devices.
1"""Module for discovering Roborock devices.""" 2 3import asyncio 4import enum 5import logging 6from collections.abc import Callable, Mapping 7from dataclasses import dataclass 8from typing import Any 9 10import aiohttp 11 12from roborock.data import ( 13 HomeData, 14 HomeDataDevice, 15 HomeDataProduct, 16 RoborockCategory, 17 UserData, 18) 19from roborock.devices.device import DeviceReadyCallback, RoborockDevice 20from roborock.diagnostics import Diagnostics, redact_device_data 21from roborock.exceptions import RoborockException 22from roborock.map.map_parser import MapParserConfig 23from roborock.mqtt.roborock_session import create_lazy_mqtt_session 24from roborock.mqtt.session import MqttSession, SessionUnauthorizedHook 25from roborock.protocol import create_mqtt_params 26from roborock.web_api import RoborockApiClient, UserWebApiClient 27 28from .cache import Cache, DeviceCache, NoCache 29from .rpc.v1_channel import create_v1_channel 30from .traits import Trait, a01, b01, v1 31from .transport.channel import Channel 32from .transport.mqtt_channel import create_mqtt_channel 33 34_LOGGER = logging.getLogger(__name__) 35 36__all__ = [ 37 "create_device_manager", 38 "UserParams", 39 "DeviceManager", 40] 41 42 43DeviceCreator = Callable[[HomeData, HomeDataDevice, HomeDataProduct], RoborockDevice] 44 45 46class DeviceVersion(enum.StrEnum): 47 """Enum for device versions.""" 48 49 V1 = "1.0" 50 A01 = "A01" 51 B01 = "B01" 52 UNKNOWN = "unknown" 53 54 55class UnsupportedDeviceError(RoborockException): 56 """Exception raised when a device is unsupported.""" 57 58 59class DeviceManager: 60 """Central manager for Roborock device discovery and connections.""" 61 62 def __init__( 63 self, 64 web_api: UserWebApiClient, 65 device_creator: DeviceCreator, 66 mqtt_session: MqttSession, 67 cache: Cache, 68 diagnostics: Diagnostics, 69 ) -> None: 70 """Initialize the DeviceManager with user data and optional cache storage. 71 72 This takes ownership of the MQTT session and will close it when the manager is closed. 73 """ 74 self._web_api = web_api 75 self._cache = cache 76 self._device_creator = device_creator 77 self._devices: dict[str, RoborockDevice] = {} 78 self._mqtt_session = mqtt_session 79 self._diagnostics = diagnostics 80 self._home_data: HomeData | None = None 81 82 async def discover_devices(self, prefer_cache: bool = True) -> list[RoborockDevice]: 83 """Discover all devices for the logged-in user.""" 84 self._diagnostics.increment("discover_devices") 85 cache_data = await self._cache.get() 86 if not cache_data.home_data or not prefer_cache: 87 _LOGGER.debug("Fetching home data (prefer_cache=%s)", prefer_cache) 88 self._diagnostics.increment("fetch_home_data") 89 try: 90 cache_data.home_data = await self._web_api.get_home_data() 91 except RoborockException as ex: 92 if not cache_data.home_data: 93 raise 94 _LOGGER.debug("Failed to fetch home data, using cached data: %s", ex) 95 await self._cache.set(cache_data) 96 self._home_data = cache_data.home_data 97 98 device_products = self._home_data.device_products 99 _LOGGER.debug("Discovered %d devices", len(device_products)) 100 101 # These are connected serially to avoid overwhelming the MQTT broker 102 new_devices = {} 103 start_tasks = [] 104 supported_devices_counter = self._diagnostics.subkey("supported_devices") 105 unsupported_devices_counter = self._diagnostics.subkey("unsupported_devices") 106 for duid, (device, product) in device_products.items(): 107 _LOGGER.debug("[%s] Discovered device %s %s", duid, product.summary_info(), device.summary_info()) 108 if duid in self._devices: 109 continue 110 try: 111 new_device = self._device_creator(self._home_data, device, product) 112 except UnsupportedDeviceError: 113 _LOGGER.info("Skipping unsupported device %s %s", product.summary_info(), device.summary_info()) 114 unsupported_devices_counter.increment(device.pv or "unknown") 115 continue 116 supported_devices_counter.increment(device.pv or "unknown") 117 start_tasks.append(new_device.start_connect()) 118 new_devices[duid] = new_device 119 120 self._devices.update(new_devices) 121 await asyncio.gather(*start_tasks) 122 return list(self._devices.values()) 123 124 async def get_device(self, duid: str) -> RoborockDevice | None: 125 """Get a specific device by DUID.""" 126 return self._devices.get(duid) 127 128 async def get_devices(self) -> list[RoborockDevice]: 129 """Get all discovered devices.""" 130 return list(self._devices.values()) 131 132 async def close(self) -> None: 133 """Close all MQTT connections and clean up resources.""" 134 tasks = [device.close() for device in self._devices.values()] 135 self._devices.clear() 136 tasks.append(self._mqtt_session.close()) 137 await asyncio.gather(*tasks) 138 139 def diagnostic_data(self) -> Mapping[str, Any]: 140 """Return diagnostics information about the device manager.""" 141 return { 142 "home_data": redact_device_data(self._home_data.as_dict()) if self._home_data else None, 143 "devices": [device.diagnostic_data() for device in self._devices.values()], 144 "diagnostics": self._diagnostics.as_dict(), 145 } 146 147 148@dataclass 149class UserParams: 150 """Parameters for creating a new session with Roborock devices. 151 152 These parameters include the username, user data for authentication, 153 and an optional base URL for the Roborock API. The `user_data` and `base_url` 154 parameters are obtained from `RoborockApiClient` during the login process. 155 """ 156 157 username: str 158 """The username (email) used for logging in.""" 159 160 user_data: UserData 161 """This is the user data containing authentication information.""" 162 163 base_url: str | None = None 164 """Optional base URL for the Roborock API. 165 166 This is used to speed up connection times by avoiding the need to 167 discover the API base URL each time. If not provided, the API client 168 will attempt to discover it automatically which may take multiple requests. 169 """ 170 171 172def create_web_api_wrapper( 173 user_params: UserParams, 174 *, 175 cache: Cache | None = None, 176 session: aiohttp.ClientSession | None = None, 177 unauthorized_hook: SessionUnauthorizedHook | None = None, 178) -> UserWebApiClient: 179 """Create a home data API wrapper from an existing API client.""" 180 181 # Note: This will auto discover the API base URL. This can be improved 182 # by caching this next to `UserData` if needed to avoid unnecessary API calls. 183 client = RoborockApiClient(username=user_params.username, base_url=user_params.base_url, session=session) 184 185 return UserWebApiClient(client, user_params.user_data, unauthorized_hook=unauthorized_hook) 186 187 188async def create_device_manager( 189 user_params: UserParams, 190 *, 191 cache: Cache | None = None, 192 map_parser_config: MapParserConfig | None = None, 193 session: aiohttp.ClientSession | None = None, 194 ready_callback: DeviceReadyCallback | None = None, 195 mqtt_session_unauthorized_hook: SessionUnauthorizedHook | None = None, 196 prefer_cache: bool = True, 197) -> DeviceManager: 198 """Convenience function to create and initialize a DeviceManager. 199 200 Args: 201 user_params: Parameters for creating the user session. 202 cache: Optional cache implementation to use for caching device data. 203 map_parser_config: Optional configuration for parsing maps. 204 session: Optional aiohttp ClientSession to use for HTTP requests. 205 ready_callback: Optional callback to be notified when a device is ready. 206 mqtt_session_unauthorized_hook: Optional hook for MQTT session unauthorized 207 events which may indicate rate limiting or revoked credentials. The 208 caller may use this to refresh authentication tokens as needed. 209 prefer_cache: Whether to prefer cached device data over always fetching it from the API. 210 211 Returns: 212 An initialized DeviceManager with discovered devices. 213 """ 214 if cache is None: 215 cache = NoCache() 216 217 web_api = create_web_api_wrapper( 218 user_params, session=session, cache=cache, unauthorized_hook=mqtt_session_unauthorized_hook 219 ) 220 user_data = user_params.user_data 221 222 diagnostics = Diagnostics() 223 224 mqtt_params = create_mqtt_params(user_data.rriot) 225 mqtt_params.diagnostics = diagnostics.subkey("mqtt_session") 226 mqtt_params.unauthorized_hook = mqtt_session_unauthorized_hook 227 mqtt_session = await create_lazy_mqtt_session(mqtt_params) 228 229 def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDataProduct) -> RoborockDevice: 230 channel: Channel 231 trait: Trait 232 device_cache: DeviceCache = DeviceCache(device.duid, cache) 233 match device.pv: 234 case DeviceVersion.V1: 235 if product.category != RoborockCategory.VACUUM: 236 raise UnsupportedDeviceError( 237 f"Device {device.name} has unsupported V1 category {product.category}: {product.model}" 238 ) 239 channel = create_v1_channel(user_data, mqtt_params, mqtt_session, device, device_cache) 240 trait = v1.create( 241 device.duid, 242 product, 243 home_data, 244 channel.rpc_channel, 245 channel.mqtt_rpc_channel, 246 channel.map_rpc_channel, 247 channel.add_dps_listener, 248 web_api, 249 device_cache=device_cache, 250 map_parser_config=map_parser_config, 251 region=user_data.region, 252 ) 253 case DeviceVersion.A01: 254 channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device) 255 trait = a01.create(product, channel) 256 case DeviceVersion.B01: 257 channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device) 258 model_part = product.model.split(".")[-1] 259 if "ss" in model_part: 260 trait = b01.q10.create(channel) 261 elif "sc" in model_part: 262 # Q7 devices start with 'sc' in their model naming. 263 trait = b01.q7.create(product, device, channel) 264 else: 265 raise UnsupportedDeviceError(f"Device {device.name} has unsupported B01 model: {product.model}") 266 case _: 267 raise UnsupportedDeviceError( 268 f"Device {device.name} has unsupported version {device.pv} {product.model}" 269 ) 270 271 dev = RoborockDevice(device, product, channel, trait) 272 if ready_callback: 273 dev.add_ready_callback(ready_callback) 274 return dev 275 276 manager = DeviceManager( 277 web_api, 278 device_creator, 279 mqtt_session=mqtt_session, 280 cache=cache, 281 diagnostics=diagnostics, 282 ) 283 await manager.discover_devices(prefer_cache) 284 return manager
189async def create_device_manager( 190 user_params: UserParams, 191 *, 192 cache: Cache | None = None, 193 map_parser_config: MapParserConfig | None = None, 194 session: aiohttp.ClientSession | None = None, 195 ready_callback: DeviceReadyCallback | None = None, 196 mqtt_session_unauthorized_hook: SessionUnauthorizedHook | None = None, 197 prefer_cache: bool = True, 198) -> DeviceManager: 199 """Convenience function to create and initialize a DeviceManager. 200 201 Args: 202 user_params: Parameters for creating the user session. 203 cache: Optional cache implementation to use for caching device data. 204 map_parser_config: Optional configuration for parsing maps. 205 session: Optional aiohttp ClientSession to use for HTTP requests. 206 ready_callback: Optional callback to be notified when a device is ready. 207 mqtt_session_unauthorized_hook: Optional hook for MQTT session unauthorized 208 events which may indicate rate limiting or revoked credentials. The 209 caller may use this to refresh authentication tokens as needed. 210 prefer_cache: Whether to prefer cached device data over always fetching it from the API. 211 212 Returns: 213 An initialized DeviceManager with discovered devices. 214 """ 215 if cache is None: 216 cache = NoCache() 217 218 web_api = create_web_api_wrapper( 219 user_params, session=session, cache=cache, unauthorized_hook=mqtt_session_unauthorized_hook 220 ) 221 user_data = user_params.user_data 222 223 diagnostics = Diagnostics() 224 225 mqtt_params = create_mqtt_params(user_data.rriot) 226 mqtt_params.diagnostics = diagnostics.subkey("mqtt_session") 227 mqtt_params.unauthorized_hook = mqtt_session_unauthorized_hook 228 mqtt_session = await create_lazy_mqtt_session(mqtt_params) 229 230 def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDataProduct) -> RoborockDevice: 231 channel: Channel 232 trait: Trait 233 device_cache: DeviceCache = DeviceCache(device.duid, cache) 234 match device.pv: 235 case DeviceVersion.V1: 236 if product.category != RoborockCategory.VACUUM: 237 raise UnsupportedDeviceError( 238 f"Device {device.name} has unsupported V1 category {product.category}: {product.model}" 239 ) 240 channel = create_v1_channel(user_data, mqtt_params, mqtt_session, device, device_cache) 241 trait = v1.create( 242 device.duid, 243 product, 244 home_data, 245 channel.rpc_channel, 246 channel.mqtt_rpc_channel, 247 channel.map_rpc_channel, 248 channel.add_dps_listener, 249 web_api, 250 device_cache=device_cache, 251 map_parser_config=map_parser_config, 252 region=user_data.region, 253 ) 254 case DeviceVersion.A01: 255 channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device) 256 trait = a01.create(product, channel) 257 case DeviceVersion.B01: 258 channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device) 259 model_part = product.model.split(".")[-1] 260 if "ss" in model_part: 261 trait = b01.q10.create(channel) 262 elif "sc" in model_part: 263 # Q7 devices start with 'sc' in their model naming. 264 trait = b01.q7.create(product, device, channel) 265 else: 266 raise UnsupportedDeviceError(f"Device {device.name} has unsupported B01 model: {product.model}") 267 case _: 268 raise UnsupportedDeviceError( 269 f"Device {device.name} has unsupported version {device.pv} {product.model}" 270 ) 271 272 dev = RoborockDevice(device, product, channel, trait) 273 if ready_callback: 274 dev.add_ready_callback(ready_callback) 275 return dev 276 277 manager = DeviceManager( 278 web_api, 279 device_creator, 280 mqtt_session=mqtt_session, 281 cache=cache, 282 diagnostics=diagnostics, 283 ) 284 await manager.discover_devices(prefer_cache) 285 return manager
Convenience function to create and initialize a DeviceManager.
Args: user_params: Parameters for creating the user session. cache: Optional cache implementation to use for caching device data. map_parser_config: Optional configuration for parsing maps. session: Optional aiohttp ClientSession to use for HTTP requests. ready_callback: Optional callback to be notified when a device is ready. mqtt_session_unauthorized_hook: Optional hook for MQTT session unauthorized events which may indicate rate limiting or revoked credentials. The caller may use this to refresh authentication tokens as needed. prefer_cache: Whether to prefer cached device data over always fetching it from the API.
Returns: An initialized DeviceManager with discovered devices.
149@dataclass 150class UserParams: 151 """Parameters for creating a new session with Roborock devices. 152 153 These parameters include the username, user data for authentication, 154 and an optional base URL for the Roborock API. The `user_data` and `base_url` 155 parameters are obtained from `RoborockApiClient` during the login process. 156 """ 157 158 username: str 159 """The username (email) used for logging in.""" 160 161 user_data: UserData 162 """This is the user data containing authentication information.""" 163 164 base_url: str | None = None 165 """Optional base URL for the Roborock API. 166 167 This is used to speed up connection times by avoiding the need to 168 discover the API base URL each time. If not provided, the API client 169 will attempt to discover it automatically which may take multiple requests. 170 """
Parameters for creating a new session with Roborock devices.
These parameters include the username, user data for authentication,
and an optional base URL for the Roborock API. The user_data and base_url
parameters are obtained from RoborockApiClient during the login process.
This is the user data containing authentication information.
60class DeviceManager: 61 """Central manager for Roborock device discovery and connections.""" 62 63 def __init__( 64 self, 65 web_api: UserWebApiClient, 66 device_creator: DeviceCreator, 67 mqtt_session: MqttSession, 68 cache: Cache, 69 diagnostics: Diagnostics, 70 ) -> None: 71 """Initialize the DeviceManager with user data and optional cache storage. 72 73 This takes ownership of the MQTT session and will close it when the manager is closed. 74 """ 75 self._web_api = web_api 76 self._cache = cache 77 self._device_creator = device_creator 78 self._devices: dict[str, RoborockDevice] = {} 79 self._mqtt_session = mqtt_session 80 self._diagnostics = diagnostics 81 self._home_data: HomeData | None = None 82 83 async def discover_devices(self, prefer_cache: bool = True) -> list[RoborockDevice]: 84 """Discover all devices for the logged-in user.""" 85 self._diagnostics.increment("discover_devices") 86 cache_data = await self._cache.get() 87 if not cache_data.home_data or not prefer_cache: 88 _LOGGER.debug("Fetching home data (prefer_cache=%s)", prefer_cache) 89 self._diagnostics.increment("fetch_home_data") 90 try: 91 cache_data.home_data = await self._web_api.get_home_data() 92 except RoborockException as ex: 93 if not cache_data.home_data: 94 raise 95 _LOGGER.debug("Failed to fetch home data, using cached data: %s", ex) 96 await self._cache.set(cache_data) 97 self._home_data = cache_data.home_data 98 99 device_products = self._home_data.device_products 100 _LOGGER.debug("Discovered %d devices", len(device_products)) 101 102 # These are connected serially to avoid overwhelming the MQTT broker 103 new_devices = {} 104 start_tasks = [] 105 supported_devices_counter = self._diagnostics.subkey("supported_devices") 106 unsupported_devices_counter = self._diagnostics.subkey("unsupported_devices") 107 for duid, (device, product) in device_products.items(): 108 _LOGGER.debug("[%s] Discovered device %s %s", duid, product.summary_info(), device.summary_info()) 109 if duid in self._devices: 110 continue 111 try: 112 new_device = self._device_creator(self._home_data, device, product) 113 except UnsupportedDeviceError: 114 _LOGGER.info("Skipping unsupported device %s %s", product.summary_info(), device.summary_info()) 115 unsupported_devices_counter.increment(device.pv or "unknown") 116 continue 117 supported_devices_counter.increment(device.pv or "unknown") 118 start_tasks.append(new_device.start_connect()) 119 new_devices[duid] = new_device 120 121 self._devices.update(new_devices) 122 await asyncio.gather(*start_tasks) 123 return list(self._devices.values()) 124 125 async def get_device(self, duid: str) -> RoborockDevice | None: 126 """Get a specific device by DUID.""" 127 return self._devices.get(duid) 128 129 async def get_devices(self) -> list[RoborockDevice]: 130 """Get all discovered devices.""" 131 return list(self._devices.values()) 132 133 async def close(self) -> None: 134 """Close all MQTT connections and clean up resources.""" 135 tasks = [device.close() for device in self._devices.values()] 136 self._devices.clear() 137 tasks.append(self._mqtt_session.close()) 138 await asyncio.gather(*tasks) 139 140 def diagnostic_data(self) -> Mapping[str, Any]: 141 """Return diagnostics information about the device manager.""" 142 return { 143 "home_data": redact_device_data(self._home_data.as_dict()) if self._home_data else None, 144 "devices": [device.diagnostic_data() for device in self._devices.values()], 145 "diagnostics": self._diagnostics.as_dict(), 146 }
Central manager for Roborock device discovery and connections.
63 def __init__( 64 self, 65 web_api: UserWebApiClient, 66 device_creator: DeviceCreator, 67 mqtt_session: MqttSession, 68 cache: Cache, 69 diagnostics: Diagnostics, 70 ) -> None: 71 """Initialize the DeviceManager with user data and optional cache storage. 72 73 This takes ownership of the MQTT session and will close it when the manager is closed. 74 """ 75 self._web_api = web_api 76 self._cache = cache 77 self._device_creator = device_creator 78 self._devices: dict[str, RoborockDevice] = {} 79 self._mqtt_session = mqtt_session 80 self._diagnostics = diagnostics 81 self._home_data: HomeData | None = None
Initialize the DeviceManager with user data and optional cache storage.
This takes ownership of the MQTT session and will close it when the manager is closed.
83 async def discover_devices(self, prefer_cache: bool = True) -> list[RoborockDevice]: 84 """Discover all devices for the logged-in user.""" 85 self._diagnostics.increment("discover_devices") 86 cache_data = await self._cache.get() 87 if not cache_data.home_data or not prefer_cache: 88 _LOGGER.debug("Fetching home data (prefer_cache=%s)", prefer_cache) 89 self._diagnostics.increment("fetch_home_data") 90 try: 91 cache_data.home_data = await self._web_api.get_home_data() 92 except RoborockException as ex: 93 if not cache_data.home_data: 94 raise 95 _LOGGER.debug("Failed to fetch home data, using cached data: %s", ex) 96 await self._cache.set(cache_data) 97 self._home_data = cache_data.home_data 98 99 device_products = self._home_data.device_products 100 _LOGGER.debug("Discovered %d devices", len(device_products)) 101 102 # These are connected serially to avoid overwhelming the MQTT broker 103 new_devices = {} 104 start_tasks = [] 105 supported_devices_counter = self._diagnostics.subkey("supported_devices") 106 unsupported_devices_counter = self._diagnostics.subkey("unsupported_devices") 107 for duid, (device, product) in device_products.items(): 108 _LOGGER.debug("[%s] Discovered device %s %s", duid, product.summary_info(), device.summary_info()) 109 if duid in self._devices: 110 continue 111 try: 112 new_device = self._device_creator(self._home_data, device, product) 113 except UnsupportedDeviceError: 114 _LOGGER.info("Skipping unsupported device %s %s", product.summary_info(), device.summary_info()) 115 unsupported_devices_counter.increment(device.pv or "unknown") 116 continue 117 supported_devices_counter.increment(device.pv or "unknown") 118 start_tasks.append(new_device.start_connect()) 119 new_devices[duid] = new_device 120 121 self._devices.update(new_devices) 122 await asyncio.gather(*start_tasks) 123 return list(self._devices.values())
Discover all devices for the logged-in user.
125 async def get_device(self, duid: str) -> RoborockDevice | None: 126 """Get a specific device by DUID.""" 127 return self._devices.get(duid)
Get a specific device by DUID.
129 async def get_devices(self) -> list[RoborockDevice]: 130 """Get all discovered devices.""" 131 return list(self._devices.values())
Get all discovered devices.
133 async def close(self) -> None: 134 """Close all MQTT connections and clean up resources.""" 135 tasks = [device.close() for device in self._devices.values()] 136 self._devices.clear() 137 tasks.append(self._mqtt_session.close()) 138 await asyncio.gather(*tasks)
Close all MQTT connections and clean up resources.
140 def diagnostic_data(self) -> Mapping[str, Any]: 141 """Return diagnostics information about the device manager.""" 142 return { 143 "home_data": redact_device_data(self._home_data.as_dict()) if self._home_data else None, 144 "devices": [device.diagnostic_data() for device in self._devices.values()], 145 "diagnostics": self._diagnostics.as_dict(), 146 }
Return diagnostics information about the device manager.