roborock.devices.device_manager
Module for discovering Roborock devices.
1"""Module for discovering Roborock devices.""" 2 3import asyncio 4import enum 5import logging 6from collections.abc import Callable, Mapping 7from dataclasses import dataclass 8from typing import Any 9 10import aiohttp 11 12from roborock.data import ( 13 HomeData, 14 HomeDataDevice, 15 HomeDataProduct, 16 UserData, 17) 18from roborock.devices.device import DeviceReadyCallback, RoborockDevice 19from roborock.diagnostics import Diagnostics, redact_device_data 20from roborock.exceptions import RoborockException 21from roborock.map.map_parser import MapParserConfig 22from roborock.mqtt.roborock_session import create_lazy_mqtt_session 23from roborock.mqtt.session import MqttSession, SessionUnauthorizedHook 24from roborock.protocol import create_mqtt_params 25from roborock.web_api import RoborockApiClient, UserWebApiClient 26 27from .cache import Cache, DeviceCache, NoCache 28from .rpc.v1_channel import create_v1_channel 29from .traits import Trait, a01, b01, v1 30from .transport.channel import Channel 31from .transport.mqtt_channel import create_mqtt_channel 32 33_LOGGER = logging.getLogger(__name__) 34 35__all__ = [ 36 "create_device_manager", 37 "UserParams", 38 "DeviceManager", 39] 40 41 42DeviceCreator = Callable[[HomeData, HomeDataDevice, HomeDataProduct], RoborockDevice] 43 44 45class DeviceVersion(enum.StrEnum): 46 """Enum for device versions.""" 47 48 V1 = "1.0" 49 A01 = "A01" 50 B01 = "B01" 51 UNKNOWN = "unknown" 52 53 54class UnsupportedDeviceError(RoborockException): 55 """Exception raised when a device is unsupported.""" 56 57 58class DeviceManager: 59 """Central manager for Roborock device discovery and connections.""" 60 61 def __init__( 62 self, 63 web_api: UserWebApiClient, 64 device_creator: DeviceCreator, 65 mqtt_session: MqttSession, 66 cache: Cache, 67 diagnostics: Diagnostics, 68 ) -> None: 69 """Initialize the DeviceManager with user data and optional cache storage. 
70 71 This takes ownership of the MQTT session and will close it when the manager is closed. 72 """ 73 self._web_api = web_api 74 self._cache = cache 75 self._device_creator = device_creator 76 self._devices: dict[str, RoborockDevice] = {} 77 self._mqtt_session = mqtt_session 78 self._diagnostics = diagnostics 79 self._home_data: HomeData | None = None 80 81 async def discover_devices(self, prefer_cache: bool = True) -> list[RoborockDevice]: 82 """Discover all devices for the logged-in user.""" 83 self._diagnostics.increment("discover_devices") 84 cache_data = await self._cache.get() 85 if not cache_data.home_data or not prefer_cache: 86 _LOGGER.debug("Fetching home data (prefer_cache=%s)", prefer_cache) 87 self._diagnostics.increment("fetch_home_data") 88 try: 89 cache_data.home_data = await self._web_api.get_home_data() 90 except RoborockException as ex: 91 if not cache_data.home_data: 92 raise 93 _LOGGER.debug("Failed to fetch home data, using cached data: %s", ex) 94 await self._cache.set(cache_data) 95 self._home_data = cache_data.home_data 96 97 device_products = self._home_data.device_products 98 _LOGGER.debug("Discovered %d devices", len(device_products)) 99 100 # These are connected serially to avoid overwhelming the MQTT broker 101 new_devices = {} 102 start_tasks = [] 103 supported_devices_counter = self._diagnostics.subkey("supported_devices") 104 unsupported_devices_counter = self._diagnostics.subkey("unsupported_devices") 105 for duid, (device, product) in device_products.items(): 106 _LOGGER.debug("[%s] Discovered device %s %s", duid, product.summary_info(), device.summary_info()) 107 if duid in self._devices: 108 continue 109 try: 110 new_device = self._device_creator(self._home_data, device, product) 111 except UnsupportedDeviceError: 112 _LOGGER.info("Skipping unsupported device %s %s", product.summary_info(), device.summary_info()) 113 unsupported_devices_counter.increment(device.pv or "unknown") 114 continue 115 
supported_devices_counter.increment(device.pv or "unknown") 116 start_tasks.append(new_device.start_connect()) 117 new_devices[duid] = new_device 118 119 self._devices.update(new_devices) 120 await asyncio.gather(*start_tasks) 121 return list(self._devices.values()) 122 123 async def get_device(self, duid: str) -> RoborockDevice | None: 124 """Get a specific device by DUID.""" 125 return self._devices.get(duid) 126 127 async def get_devices(self) -> list[RoborockDevice]: 128 """Get all discovered devices.""" 129 return list(self._devices.values()) 130 131 async def close(self) -> None: 132 """Close all MQTT connections and clean up resources.""" 133 tasks = [device.close() for device in self._devices.values()] 134 self._devices.clear() 135 tasks.append(self._mqtt_session.close()) 136 await asyncio.gather(*tasks) 137 138 def diagnostic_data(self) -> Mapping[str, Any]: 139 """Return diagnostics information about the device manager.""" 140 return { 141 "home_data": redact_device_data(self._home_data.as_dict()) if self._home_data else None, 142 "devices": [device.diagnostic_data() for device in self._devices.values()], 143 "diagnostics": self._diagnostics.as_dict(), 144 } 145 146 147@dataclass 148class UserParams: 149 """Parameters for creating a new session with Roborock devices. 150 151 These parameters include the username, user data for authentication, 152 and an optional base URL for the Roborock API. The `user_data` and `base_url` 153 parameters are obtained from `RoborockApiClient` during the login process. 154 """ 155 156 username: str 157 """The username (email) used for logging in.""" 158 159 user_data: UserData 160 """This is the user data containing authentication information.""" 161 162 base_url: str | None = None 163 """Optional base URL for the Roborock API. 164 165 This is used to speed up connection times by avoiding the need to 166 discover the API base URL each time. 
If not provided, the API client 167 will attempt to discover it automatically which may take multiple requests. 168 """ 169 170 171def create_web_api_wrapper( 172 user_params: UserParams, 173 *, 174 cache: Cache | None = None, 175 session: aiohttp.ClientSession | None = None, 176) -> UserWebApiClient: 177 """Create a home data API wrapper from an existing API client.""" 178 179 # Note: This will auto discover the API base URL. This can be improved 180 # by caching this next to `UserData` if needed to avoid unnecessary API calls. 181 client = RoborockApiClient(username=user_params.username, base_url=user_params.base_url, session=session) 182 183 return UserWebApiClient(client, user_params.user_data) 184 185 186async def create_device_manager( 187 user_params: UserParams, 188 *, 189 cache: Cache | None = None, 190 map_parser_config: MapParserConfig | None = None, 191 session: aiohttp.ClientSession | None = None, 192 ready_callback: DeviceReadyCallback | None = None, 193 mqtt_session_unauthorized_hook: SessionUnauthorizedHook | None = None, 194 prefer_cache: bool = True, 195) -> DeviceManager: 196 """Convenience function to create and initialize a DeviceManager. 197 198 Args: 199 user_params: Parameters for creating the user session. 200 cache: Optional cache implementation to use for caching device data. 201 map_parser_config: Optional configuration for parsing maps. 202 session: Optional aiohttp ClientSession to use for HTTP requests. 203 ready_callback: Optional callback to be notified when a device is ready. 204 mqtt_session_unauthorized_hook: Optional hook for MQTT session unauthorized 205 events which may indicate rate limiting or revoked credentials. The 206 caller may use this to refresh authentication tokens as needed. 207 prefer_cache: Whether to prefer cached device data over always fetching it from the API. 208 209 Returns: 210 An initialized DeviceManager with discovered devices. 
211 """ 212 if cache is None: 213 cache = NoCache() 214 215 web_api = create_web_api_wrapper(user_params, session=session, cache=cache) 216 user_data = user_params.user_data 217 218 diagnostics = Diagnostics() 219 220 mqtt_params = create_mqtt_params(user_data.rriot) 221 mqtt_params.diagnostics = diagnostics.subkey("mqtt_session") 222 mqtt_params.unauthorized_hook = mqtt_session_unauthorized_hook 223 mqtt_session = await create_lazy_mqtt_session(mqtt_params) 224 225 def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDataProduct) -> RoborockDevice: 226 channel: Channel 227 trait: Trait 228 device_cache: DeviceCache = DeviceCache(device.duid, cache) 229 match device.pv: 230 case DeviceVersion.V1: 231 channel = create_v1_channel(user_data, mqtt_params, mqtt_session, device, device_cache) 232 trait = v1.create( 233 device.duid, 234 product, 235 home_data, 236 channel.rpc_channel, 237 channel.mqtt_rpc_channel, 238 channel.map_rpc_channel, 239 web_api, 240 device_cache=device_cache, 241 map_parser_config=map_parser_config, 242 ) 243 case DeviceVersion.A01: 244 channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device) 245 trait = a01.create(product, channel) 246 case DeviceVersion.B01: 247 channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device) 248 model_part = product.model.split(".")[-1] 249 if "ss" in model_part: 250 trait = b01.q10.create(channel) 251 elif "sc" in model_part: 252 # Q7 devices start with 'sc' in their model naming. 
253 trait = b01.q7.create(channel) 254 else: 255 raise UnsupportedDeviceError(f"Device {device.name} has unsupported B01 model: {product.model}") 256 case _: 257 raise UnsupportedDeviceError( 258 f"Device {device.name} has unsupported version {device.pv} {product.model}" 259 ) 260 261 dev = RoborockDevice(device, product, channel, trait) 262 if ready_callback: 263 dev.add_ready_callback(ready_callback) 264 return dev 265 266 manager = DeviceManager(web_api, device_creator, mqtt_session=mqtt_session, cache=cache, diagnostics=diagnostics) 267 await manager.discover_devices(prefer_cache) 268 return manager
async def create_device_manager(
    user_params: UserParams,
    *,
    cache: Cache | None = None,
    map_parser_config: MapParserConfig | None = None,
    session: aiohttp.ClientSession | None = None,
    ready_callback: DeviceReadyCallback | None = None,
    mqtt_session_unauthorized_hook: SessionUnauthorizedHook | None = None,
    prefer_cache: bool = True,
) -> DeviceManager:
    """Build a DeviceManager, run device discovery once, and return it.

    Args:
        user_params: Parameters for creating the user session.
        cache: Cache implementation for device data; a `NoCache` is used when omitted.
        map_parser_config: Optional configuration for parsing maps.
        session: Optional aiohttp ClientSession to use for HTTP requests.
        ready_callback: Optional callback registered on every created device.
        mqtt_session_unauthorized_hook: Optional hook for MQTT session unauthorized
            events which may indicate rate limiting or revoked credentials. The
            caller may use this to refresh authentication tokens as needed.
        prefer_cache: Whether to prefer cached device data over always fetching it from the API.

    Returns:
        An initialized DeviceManager with discovered devices.
    """
    active_cache: Cache = NoCache() if cache is None else cache
    user_data = user_params.user_data
    web_api = create_web_api_wrapper(user_params, session=session, cache=active_cache)

    diagnostics = Diagnostics()

    # One lazily-connected MQTT session is shared by every device channel.
    mqtt_params = create_mqtt_params(user_data.rriot)
    mqtt_params.diagnostics = diagnostics.subkey("mqtt_session")
    mqtt_params.unauthorized_hook = mqtt_session_unauthorized_hook
    mqtt_session = await create_lazy_mqtt_session(mqtt_params)

    def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDataProduct) -> RoborockDevice:
        """Pick the channel and trait implementation from the device's protocol version."""
        channel: Channel
        trait: Trait
        per_device_cache: DeviceCache = DeviceCache(device.duid, active_cache)
        protocol_version = device.pv

        if protocol_version == DeviceVersion.V1:
            channel = create_v1_channel(user_data, mqtt_params, mqtt_session, device, per_device_cache)
            trait = v1.create(
                device.duid,
                product,
                home_data,
                channel.rpc_channel,
                channel.mqtt_rpc_channel,
                channel.map_rpc_channel,
                web_api,
                device_cache=per_device_cache,
                map_parser_config=map_parser_config,
            )
        elif protocol_version == DeviceVersion.A01:
            channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device)
            trait = a01.create(product, channel)
        elif protocol_version == DeviceVersion.B01:
            channel = create_mqtt_channel(user_data, mqtt_params, mqtt_session, device)
            # The text after the last "." of the model selects the B01 family.
            model_suffix = product.model.rpartition(".")[2]
            if "ss" in model_suffix:
                trait = b01.q10.create(channel)
            elif "sc" in model_suffix:
                # Q7 devices start with 'sc' in their model naming.
                trait = b01.q7.create(channel)
            else:
                raise UnsupportedDeviceError(f"Device {device.name} has unsupported B01 model: {product.model}")
        else:
            raise UnsupportedDeviceError(
                f"Device {device.name} has unsupported version {device.pv} {product.model}"
            )

        created = RoborockDevice(device, product, channel, trait)
        if ready_callback:
            created.add_ready_callback(ready_callback)
        return created

    manager = DeviceManager(
        web_api,
        device_creator,
        mqtt_session=mqtt_session,
        cache=active_cache,
        diagnostics=diagnostics,
    )
    await manager.discover_devices(prefer_cache)
    return manager
Convenience function to create and initialize a DeviceManager.
Args: user_params: Parameters for creating the user session. cache: Optional cache implementation to use for caching device data. map_parser_config: Optional configuration for parsing maps. session: Optional aiohttp ClientSession to use for HTTP requests. ready_callback: Optional callback to be notified when a device is ready. mqtt_session_unauthorized_hook: Optional hook for MQTT session unauthorized events which may indicate rate limiting or revoked credentials. The caller may use this to refresh authentication tokens as needed. prefer_cache: Whether to prefer cached device data over always fetching it from the API.
Returns: An initialized DeviceManager with discovered devices.
@dataclass
class UserParams:
    """Inputs needed to open a new session with Roborock devices.

    Bundles the account username, the user data used for authentication, and
    an optional base URL for the Roborock API. Both `user_data` and `base_url`
    are obtained from `RoborockApiClient` during the login process.
    """

    username: str
    """Account username (email) used for logging in."""

    user_data: UserData
    """User data carrying the authentication information."""

    base_url: str | None = None
    """Base URL of the Roborock API, if already known.

    Supplying it speeds up connecting because the base URL does not have to be
    discovered each time. When omitted, the API client attempts to discover it
    automatically, which may take multiple requests.
    """
Parameters for creating a new session with Roborock devices.
These parameters include the username, user data for authentication,
and an optional base URL for the Roborock API. The user_data and base_url
parameters are obtained from RoborockApiClient during the login process.
This is the user data containing authentication information.
class DeviceManager:
    """Central manager for Roborock device discovery and connections."""

    def __init__(
        self,
        web_api: UserWebApiClient,
        device_creator: DeviceCreator,
        mqtt_session: MqttSession,
        cache: Cache,
        diagnostics: Diagnostics,
    ) -> None:
        """Initialize the DeviceManager.

        This takes ownership of the MQTT session and will close it when the manager is closed.

        Args:
            web_api: Client used to fetch home data.
            device_creator: Factory called once per newly discovered device.
            mqtt_session: MQTT session that is closed by `close()`.
            cache: Cache that home data is read from and written to.
            diagnostics: Counter store updated during discovery.
        """
        self._web_api = web_api
        self._cache = cache
        self._device_creator = device_creator
        self._devices: dict[str, RoborockDevice] = {}
        self._mqtt_session = mqtt_session
        self._diagnostics = diagnostics
        self._home_data: HomeData | None = None

    async def discover_devices(self, prefer_cache: bool = True) -> list[RoborockDevice]:
        """Discover all devices for the logged-in user.

        Devices that are already known to the manager are left untouched, and
        devices the creator rejects with `UnsupportedDeviceError` are logged,
        counted and skipped.

        Args:
            prefer_cache: When True, cached home data is used if present and the
                web API is only called when the cache has none. When False, the
                web API is always called, falling back to cached home data if
                the request fails.

        Returns:
            Every device known to the manager, including those found by
            earlier calls.

        Raises:
            RoborockException: If fetching home data fails and there is no
                cached home data to fall back to.
        """
        self._diagnostics.increment("discover_devices")
        cache_data = await self._cache.get()
        if not cache_data.home_data or not prefer_cache:
            _LOGGER.debug("Fetching home data (prefer_cache=%s)", prefer_cache)
            self._diagnostics.increment("fetch_home_data")
            try:
                cache_data.home_data = await self._web_api.get_home_data()
            except RoborockException as ex:
                if not cache_data.home_data:
                    raise
                _LOGGER.debug("Failed to fetch home data, using cached data: %s", ex)
            else:
                # Persist only when fresh data was fetched; on failure the cache
                # already holds exactly what we are about to use.
                await self._cache.set(cache_data)
        home_data = cache_data.home_data
        self._home_data = home_data

        device_products = home_data.device_products
        _LOGGER.debug("Discovered %d devices", len(device_products))

        # Devices are created one at a time here; their start_connect()
        # awaitables are collected and awaited together by asyncio.gather()
        # below, i.e. concurrently rather than one after another.
        new_devices: dict[str, RoborockDevice] = {}
        start_tasks = []
        supported_devices_counter = self._diagnostics.subkey("supported_devices")
        unsupported_devices_counter = self._diagnostics.subkey("unsupported_devices")
        for duid, (device, product) in device_products.items():
            _LOGGER.debug("[%s] Discovered device %s %s", duid, product.summary_info(), device.summary_info())
            if duid in self._devices:
                continue
            try:
                new_device = self._device_creator(home_data, device, product)
            except UnsupportedDeviceError:
                _LOGGER.info("Skipping unsupported device %s %s", product.summary_info(), device.summary_info())
                unsupported_devices_counter.increment(device.pv or "unknown")
                continue
            supported_devices_counter.increment(device.pv or "unknown")
            start_tasks.append(new_device.start_connect())
            new_devices[duid] = new_device

        self._devices.update(new_devices)
        await asyncio.gather(*start_tasks)
        return list(self._devices.values())

    async def get_device(self, duid: str) -> RoborockDevice | None:
        """Get a specific device by DUID, or None if it is not known."""
        return self._devices.get(duid)

    async def get_devices(self) -> list[RoborockDevice]:
        """Get all discovered devices."""
        return list(self._devices.values())

    async def close(self) -> None:
        """Close all devices and the MQTT session, and forget the devices."""
        tasks = [device.close() for device in self._devices.values()]
        self._devices.clear()
        tasks.append(self._mqtt_session.close())
        await asyncio.gather(*tasks)

    def diagnostic_data(self) -> Mapping[str, Any]:
        """Return diagnostics information about the device manager.

        The result has the keys "home_data" (redacted, or None before the first
        discovery), "devices" and "diagnostics".
        """
        return {
            "home_data": redact_device_data(self._home_data.as_dict()) if self._home_data else None,
            "devices": [device.diagnostic_data() for device in self._devices.values()],
            "diagnostics": self._diagnostics.as_dict(),
        }
Central manager for Roborock device discovery and connections.
def __init__(
    self,
    web_api: UserWebApiClient,
    device_creator: DeviceCreator,
    mqtt_session: MqttSession,
    cache: Cache,
    diagnostics: Diagnostics,
) -> None:
    """Store the manager's collaborators and start with no known devices.

    This takes ownership of the MQTT session and will close it when the manager is closed.
    """
    # Collaborators handed in by the caller.
    self._web_api = web_api
    self._device_creator = device_creator
    self._mqtt_session = mqtt_session
    self._cache = cache
    self._diagnostics = diagnostics

    # State filled in by discovery: devices keyed by DUID, and the home data
    # most recently used.
    self._devices: dict[str, RoborockDevice] = {}
    self._home_data: HomeData | None = None
Initialize the DeviceManager with its web API client, device creator, MQTT session, cache, and diagnostics.
This takes ownership of the MQTT session and will close it when the manager is closed.
async def discover_devices(self, prefer_cache: bool = True) -> list[RoborockDevice]:
    """Discover all devices for the logged-in user.

    Devices that are already known to the manager are left untouched, and
    devices the creator rejects with `UnsupportedDeviceError` are logged,
    counted and skipped.

    Args:
        prefer_cache: When True, cached home data is used if present and the
            web API is only called when the cache has none. When False, the
            web API is always called, falling back to cached home data if
            the request fails.

    Returns:
        Every device known to the manager, including those found by
        earlier calls.

    Raises:
        RoborockException: If fetching home data fails and there is no
            cached home data to fall back to.
    """
    self._diagnostics.increment("discover_devices")
    cache_data = await self._cache.get()
    if not cache_data.home_data or not prefer_cache:
        _LOGGER.debug("Fetching home data (prefer_cache=%s)", prefer_cache)
        self._diagnostics.increment("fetch_home_data")
        try:
            cache_data.home_data = await self._web_api.get_home_data()
        except RoborockException as ex:
            if not cache_data.home_data:
                raise
            _LOGGER.debug("Failed to fetch home data, using cached data: %s", ex)
        else:
            # Persist only when fresh data was fetched; on failure the cache
            # already holds exactly what we are about to use.
            await self._cache.set(cache_data)
    home_data = cache_data.home_data
    self._home_data = home_data

    device_products = home_data.device_products
    _LOGGER.debug("Discovered %d devices", len(device_products))

    # Devices are created one at a time here; their start_connect()
    # awaitables are collected and awaited together by asyncio.gather()
    # below, i.e. concurrently rather than one after another.
    new_devices: dict[str, RoborockDevice] = {}
    start_tasks = []
    supported_devices_counter = self._diagnostics.subkey("supported_devices")
    unsupported_devices_counter = self._diagnostics.subkey("unsupported_devices")
    for duid, (device, product) in device_products.items():
        _LOGGER.debug("[%s] Discovered device %s %s", duid, product.summary_info(), device.summary_info())
        if duid in self._devices:
            continue
        try:
            new_device = self._device_creator(home_data, device, product)
        except UnsupportedDeviceError:
            _LOGGER.info("Skipping unsupported device %s %s", product.summary_info(), device.summary_info())
            unsupported_devices_counter.increment(device.pv or "unknown")
            continue
        supported_devices_counter.increment(device.pv or "unknown")
        start_tasks.append(new_device.start_connect())
        new_devices[duid] = new_device

    self._devices.update(new_devices)
    await asyncio.gather(*start_tasks)
    return list(self._devices.values())
Discover all devices for the logged-in user.
async def get_device(self, duid: str) -> RoborockDevice | None:
    """Look up one discovered device by its DUID; None when it is unknown."""
    try:
        return self._devices[duid]
    except KeyError:
        return None
Get a specific device by DUID.
async def get_devices(self) -> list[RoborockDevice]:
    """Return a new list holding every device discovered so far."""
    return [*self._devices.values()]
Get all discovered devices.
async def close(self) -> None:
    """Close every known device and the MQTT session, then forget the devices."""
    # Build the device close awaitables before emptying the registry.
    device_closers = [known.close() for known in self._devices.values()]
    self._devices.clear()
    await asyncio.gather(*device_closers, self._mqtt_session.close())
Close all MQTT connections and clean up resources.
def diagnostic_data(self) -> Mapping[str, Any]:
    """Collect diagnostics for the manager, its devices and its counters.

    "home_data" is the redacted home data, or None when no home data is set.
    """
    if self._home_data:
        home_data = redact_device_data(self._home_data.as_dict())
    else:
        home_data = None
    per_device = [known.diagnostic_data() for known in self._devices.values()]
    counters = self._diagnostics.as_dict()
    return {
        "home_data": home_data,
        "devices": per_device,
        "diagnostics": counters,
    }
Return diagnostics information about the device manager.