roborock.web_api

  1import base64
  2import hashlib
  3import hmac
  4import json
  5import logging
  6import math
  7import secrets
  8import string
  9import time
 10from collections.abc import Callable
 11from dataclasses import dataclass
 12
 13import aiohttp
 14from aiohttp import ContentTypeError, FormData
 15from pyrate_limiter import Duration, Limiter, Rate
 16
 17from roborock import HomeDataSchedule
 18from roborock.data import HomeData, HomeDataRoom, HomeDataScene, ProductResponse, RRiot, UserData
 19from roborock.exceptions import (
 20    RoborockAccountDoesNotExist,
 21    RoborockException,
 22    RoborockInvalidCode,
 23    RoborockInvalidCredentials,
 24    RoborockInvalidEmail,
 25    RoborockInvalidUserAgreement,
 26    RoborockMissingParameters,
 27    RoborockNoResponseFromBaseURL,
 28    RoborockNoUserAgreement,
 29    RoborockRateLimit,
 30    RoborockTooFrequentCodeRequests,
 31)
 32
 33_LOGGER = logging.getLogger(__name__)
 34BASE_URLS = [
 35    "https://usiot.roborock.com",
 36    "https://euiot.roborock.com",
 37    "https://cniot.roborock.com",
 38    "https://ruiot.roborock.com",
 39]
 40
 41
 42@dataclass
 43class IotLoginInfo:
 44    """Information about the login to the iot server."""
 45
 46    base_url: str
 47    country_code: str
 48    country: str
 49
 50
 51class RoborockApiClient:
 52    _LOGIN_RATES = [
 53        Rate(1, Duration.SECOND),
 54        Rate(3, Duration.MINUTE),
 55        Rate(10, Duration.HOUR),
 56        Rate(20, Duration.DAY),
 57    ]
 58    _HOME_DATA_RATES = [
 59        Rate(1, Duration.SECOND),
 60        Rate(3, Duration.MINUTE),
 61        Rate(5, Duration.HOUR),
 62        Rate(40, Duration.DAY),
 63    ]
 64
 65    _login_limiter = Limiter(_LOGIN_RATES)
 66    _home_data_limiter = Limiter(_HOME_DATA_RATES)
 67
 68    def __init__(
 69        self, username: str, base_url: str | None = None, session: aiohttp.ClientSession | None = None
 70    ) -> None:
 71        """Sample API Client."""
 72        self._username = username
 73        self._base_url = base_url
 74        self._device_identifier = secrets.token_urlsafe(16)
 75        self.session = session
 76        self._iot_login_info: IotLoginInfo | None = None
 77        self._base_urls = BASE_URLS if base_url is None else [base_url]
 78
 79    async def _get_iot_login_info(self) -> IotLoginInfo:
 80        if self._iot_login_info is None:
 81            for iot_url in self._base_urls:
 82                url_request = PreparedRequest(iot_url, self.session)
 83                response = await url_request.request(
 84                    "post",
 85                    "/api/v1/getUrlByEmail",
 86                    params={"email": self._username, "needtwostepauth": "false"},
 87                )
 88                if response is None:
 89                    continue
 90                response_code = response.get("code")
 91                if response_code != 200:
 92                    if response_code == 2003:
 93                        raise RoborockInvalidEmail("Your email was incorrectly formatted.")
 94                    elif response_code == 1001:
 95                        raise RoborockMissingParameters(
 96                            "You are missing parameters for this request, are you sure you entered your username?"
 97                        )
 98                    else:
 99                        raise RoborockException(f"{response.get('msg')} - response code: {response_code}")
100                country_code = response["data"]["countrycode"]
101                country = response["data"]["country"]
102                if country_code is not None or country is not None:
103                    self._iot_login_info = IotLoginInfo(
104                        base_url=response["data"]["url"],
105                        country=country,
106                        country_code=country_code,
107                    )
108                    _LOGGER.debug("Country determined to be %s and code is %s", country, country_code)
109                    return self._iot_login_info
110            raise RoborockNoResponseFromBaseURL(
111                "No account was found for any base url we tried. Either your email is incorrect or we do not have a"
112                " record of the roborock server your device is on."
113            )
114        return self._iot_login_info
115
116    @property
117    async def base_url(self):
118        if self._base_url is not None:
119            return self._base_url
120        return (await self._get_iot_login_info()).base_url
121
122    @property
123    async def country(self):
124        return (await self._get_iot_login_info()).country
125
126    @property
127    async def country_code(self):
128        return (await self._get_iot_login_info()).country_code
129
130    def _get_header_client_id(self):
131        md5 = hashlib.md5()
132        md5.update(self._username.encode())
133        md5.update(self._device_identifier.encode())
134        return base64.b64encode(md5.digest()).decode()
135
136    async def nc_prepare(self, user_data: UserData, timezone: str) -> dict:
137        """This gets a few critical parameters for adding a device to your account."""
138        if (
139            user_data.rriot is None
140            or user_data.rriot.r is None
141            or user_data.rriot.u is None
142            or user_data.rriot.r.a is None
143        ):
144            raise RoborockException("Your userdata is missing critical attributes.")
145        base_url = user_data.rriot.r.a
146        prepare_request = PreparedRequest(base_url, self.session)
147        hid = await self._get_home_id(user_data)
148
149        data = FormData()
150        data.add_field("hid", hid)
151        data.add_field("tzid", timezone)
152
153        prepare_response = await prepare_request.request(
154            "post",
155            "/nc/prepare",
156            headers={
157                "Authorization": _get_hawk_authentication(
158                    user_data.rriot, "/nc/prepare", {"hid": hid, "tzid": timezone}
159                ),
160            },
161            data=data,
162        )
163
164        if prepare_response is None:
165            raise RoborockException("prepare_response is None")
166        if not prepare_response.get("success"):
167            raise RoborockException(f"{prepare_response.get('msg')} - response code: {prepare_response.get('code')}")
168
169        return prepare_response["result"]
170
171    async def add_device(self, user_data: UserData, s: str, t: str) -> dict:
172        """This will add a new device to your account
173        it is recommended to only use this during a pairing cycle with a device.
174        Please see here: https://github.com/Python-roborock/Roborockmitmproxy/blob/main/handshake_protocol.md
175        """
176        if (
177            user_data.rriot is None
178            or user_data.rriot.r is None
179            or user_data.rriot.u is None
180            or user_data.rriot.r.a is None
181        ):
182            raise RoborockException("Your userdata is missing critical attributes.")
183        base_url = user_data.rriot.r.a
184        add_device_request = PreparedRequest(base_url, self.session)
185
186        add_device_response = await add_device_request.request(
187            "GET",
188            "/user/devices/newadd",
189            headers={
190                "Authorization": _get_hawk_authentication(
191                    user_data.rriot, "/user/devices/newadd", params={"s": s, "t": t}
192                ),
193            },
194            params={"s": s, "t": t},
195        )
196
197        if add_device_response is None:
198            raise RoborockException("add_device is None")
199        if not add_device_response.get("success"):
200            raise RoborockException(
201                f"{add_device_response.get('msg')} - response code: {add_device_response.get('code')}"
202            )
203
204        return add_device_response["result"]
205
206    async def request_code(self) -> None:
207        if not await self._login_limiter.try_acquire_async("login", blocking=True, timeout=1):
208            raise RoborockRateLimit("Reached maximum requests for login. Please try again later.")
209        base_url = await self.base_url
210        header_clientid = self._get_header_client_id()
211        code_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
212
213        code_response = await code_request.request(
214            "post",
215            "/api/v1/sendEmailCode",
216            params={
217                "username": self._username,
218                "type": "auth",
219            },
220        )
221        if code_response is None:
222            raise RoborockException("Failed to get a response from send email code")
223        response_code = code_response.get("code")
224        if response_code != 200:
225            _LOGGER.info("Request code failed for %s with the following context: %s", self._username, code_response)
226            if response_code == 2008:
227                raise RoborockAccountDoesNotExist("Account does not exist - check your login and try again.")
228            elif response_code == 9002:
229                raise RoborockTooFrequentCodeRequests("You have attempted to request too many codes. Try again later")
230            else:
231                raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")
232
233    async def request_code_v4(self) -> None:
234        """Request a code using the v4 endpoint."""
235        if await self.country_code is None or await self.country is None:
236            _LOGGER.info("No country code or country found, trying old version of request code.")
237            return await self.request_code()
238        if not await self._login_limiter.try_acquire_async("login", blocking=True, timeout=1):
239            raise RoborockRateLimit("Reached maximum requests for login. Please try again later.")
240        base_url = await self.base_url
241        header_clientid = self._get_header_client_id()
242        code_request = PreparedRequest(
243            base_url,
244            self.session,
245            {
246                "header_clientid": header_clientid,
247                "Content-Type": "application/x-www-form-urlencoded",
248                "header_clientlang": "en",
249            },
250        )
251
252        code_response = await code_request.request(
253            "post",
254            "/api/v4/email/code/send",
255            data={"email": self._username, "type": "login", "platform": ""},
256        )
257        if code_response is None:
258            raise RoborockException("Failed to get a response from send email code")
259        response_code = code_response.get("code")
260        if response_code != 200:
261            _LOGGER.info("Request code failed for %s with the following context: %s", self._username, code_response)
262            if response_code == 2008:
263                raise RoborockAccountDoesNotExist("Account does not exist - check your login and try again.")
264            elif response_code == 9002:
265                raise RoborockTooFrequentCodeRequests("You have attempted to request too many codes. Try again later")
266            elif response_code == 3030 and len(self._base_urls) > 1:
267                self._base_urls = self._base_urls[1:]
268                self._iot_login_info = None
269                return await self.request_code_v4()
270            else:
271                raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")
272
273    async def _sign_key_v3(self, s: str) -> str:
274        """Sign a randomly generated string."""
275        base_url = await self.base_url
276        header_clientid = self._get_header_client_id()
277        code_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
278
279        code_response = await code_request.request(
280            "post",
281            "/api/v3/key/sign",
282            params={"s": s},
283        )
284
285        if not code_response or "data" not in code_response or "k" not in code_response["data"]:
286            raise RoborockException("Failed to get a response from sign key")
287        response_code = code_response.get("code")
288
289        if response_code != 200:
290            _LOGGER.info("Request code failed for %s with the following context: %s", self._username, code_response)
291            raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")
292
293        return code_response["data"]["k"]
294
295    async def code_login_v4(
296        self, code: int | str, country: str | None = None, country_code: int | None = None
297    ) -> UserData:
298        """
299        Login via code authentication.
300        :param code: The code from the email.
301        :param country: The two-character representation of the country, i.e. "US"
302        :param country_code: the country phone number code i.e. 1 for US.
303        """
304        base_url = await self.base_url
305        if country is None:
306            country = await self.country
307        if country_code is None:
308            country_code = await self.country_code
309        if country_code is None or country is None:
310            _LOGGER.info("No country code or country found, trying old version of code login.")
311            return await self.code_login(code)
312        header_clientid = self._get_header_client_id()
313        x_mercy_ks = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(16))
314        x_mercy_k = await self._sign_key_v3(x_mercy_ks)
315        login_request = PreparedRequest(
316            base_url,
317            self.session,
318            {
319                "header_clientid": header_clientid,
320                "x-mercy-ks": x_mercy_ks,
321                "x-mercy-k": x_mercy_k,
322                "Content-Type": "application/x-www-form-urlencoded",
323                "header_clientlang": "en",
324                "header_appversion": "4.54.02",
325                "header_phonesystem": "iOS",
326                "header_phonemodel": "iPhone16,1",
327            },
328        )
329        login_response = await login_request.request(
330            "post",
331            "/api/v4/auth/email/login/code",
332            data={
333                "country": country,
334                "countryCode": country_code,
335                "email": self._username,
336                "code": code,
337                # Major and minor version are the user agreement version, we will need to see if this needs to be
338                # dynamic https://usiot.roborock.com/api/v3/app/agreement/latest?country=US
339                "majorVersion": 14,
340                "minorVersion": 0,
341            },
342        )
343        if login_response is None:
344            raise RoborockException("Login request response is None")
345        response_code = login_response.get("code")
346        if response_code != 200:
347            _LOGGER.info("Login failed for %s with the following context: %s", self._username, login_response)
348            if response_code == 2018:
349                raise RoborockInvalidCode("Invalid code - check your code and try again.")
350            if response_code == 3009:
351                raise RoborockNoUserAgreement("You must accept the user agreement in the Roborock app to continue.")
352            if response_code == 3006:
353                raise RoborockInvalidUserAgreement(
354                    "User agreement must be accepted again - or you are attempting to use the Mi Home app account."
355                )
356            if response_code == 3039:
357                raise RoborockAccountDoesNotExist(
358                    "This account does not exist - please ensure that you selected the right region and email."
359                )
360            raise RoborockException(f"{login_response.get('msg')} - response code: {response_code}")
361        user_data = login_response.get("data")
362        if not isinstance(user_data, dict):
363            raise RoborockException("Got unexpected data type for user_data")
364        return UserData.from_dict(user_data)
365
366    async def pass_login(self, password: str) -> UserData:
367        if not await self._login_limiter.try_acquire_async("login", blocking=True, timeout=1):
368            raise RoborockRateLimit("Reached maximum requests for login. Please try again later.")
369        base_url = await self.base_url
370        header_clientid = self._get_header_client_id()
371
372        login_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
373        login_response = await login_request.request(
374            "post",
375            "/api/v1/login",
376            params={
377                "username": self._username,
378                "password": password,
379                "needtwostepauth": "false",
380            },
381        )
382        if login_response is None:
383            raise RoborockException("Login response is none")
384        if login_response.get("code") != 200:
385            _LOGGER.info("Login failed for %s with the following context: %s", self._username, login_response)
386            raise RoborockException(f"{login_response.get('msg')} - response code: {login_response.get('code')}")
387        user_data = login_response.get("data")
388        if not isinstance(user_data, dict):
389            raise RoborockException("Got unexpected data type for user_data")
390        return UserData.from_dict(user_data)
391
392    async def pass_login_v3(self, password: str) -> UserData:
393        """Seemingly it follows the format below, but password is encrypted in some manner.
394        # login_response = await login_request.request(
395        #     "post",
396        #     "/api/v3/auth/email/login",
397        #     params={
398        #         "email": self._username,
399        #         "password": password,
400        #         "twoStep": 1,
401        #         "version": 0
402        #     },
403        # )
404        """
405        raise NotImplementedError("Pass_login_v3 has not yet been implemented")
406
407    async def code_login(self, code: int | str) -> UserData:
408        base_url = await self.base_url
409        header_clientid = self._get_header_client_id()
410
411        login_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
412        login_response = await login_request.request(
413            "post",
414            "/api/v1/loginWithCode",
415            params={
416                "username": self._username,
417                "verifycode": code,
418                "verifycodetype": "AUTH_EMAIL_CODE",
419            },
420        )
421        if login_response is None:
422            raise RoborockException("Login request response is None")
423        response_code = login_response.get("code")
424        if response_code != 200:
425            _LOGGER.info("Login failed for %s with the following context: %s", self._username, login_response)
426            if response_code == 2018:
427                raise RoborockInvalidCode("Invalid code - check your code and try again.")
428            if response_code == 3009:
429                raise RoborockNoUserAgreement("You must accept the user agreement in the Roborock app to continue.")
430            if response_code == 3006:
431                raise RoborockInvalidUserAgreement(
432                    "User agreement must be accepted again - or you are attempting to use the Mi Home app account."
433                )
434            raise RoborockException(f"{login_response.get('msg')} - response code: {response_code}")
435        user_data = login_response.get("data")
436        if not isinstance(user_data, dict):
437            raise RoborockException("Got unexpected data type for user_data")
438        return UserData.from_dict(user_data)
439
440    async def _get_home_id(self, user_data: UserData):
441        base_url = await self.base_url
442        header_clientid = self._get_header_client_id()
443        home_id_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
444        home_id_response = await home_id_request.request(
445            "get",
446            "/api/v1/getHomeDetail",
447            headers={"Authorization": user_data.token},
448        )
449        if home_id_response is None:
450            raise RoborockException("home_id_response is None")
451        if home_id_response.get("code") != 200:
452            _LOGGER.info("Get Home Id failed with the following context: %s", home_id_response)
453            if home_id_response.get("code") == 2010:
454                raise RoborockInvalidCredentials(
455                    f"Invalid credentials ({home_id_response.get('msg')}) - check your login and try again."
456                )
457            raise RoborockException(f"{home_id_response.get('msg')} - response code: {home_id_response.get('code')}")
458
459        return home_id_response["data"]["rrHomeId"]
460
461    async def get_home_data(self, user_data: UserData) -> HomeData:
462        if not self._home_data_limiter.try_acquire("home_data", blocking=False):
463            raise RoborockRateLimit("Reached maximum requests for home data. Please try again later.")
464        rriot = user_data.rriot
465        if rriot is None:
466            raise RoborockException("rriot is none")
467        home_id = await self._get_home_id(user_data)
468        if rriot.r.a is None:
469            raise RoborockException("Missing field 'a' in rriot reference")
470        home_request = PreparedRequest(
471            rriot.r.a,
472            self.session,
473            {
474                "Authorization": _get_hawk_authentication(rriot, f"/user/homes/{str(home_id)}"),
475            },
476        )
477        home_response = await home_request.request("get", "/user/homes/" + str(home_id))
478        if not home_response.get("success"):
479            raise RoborockException(home_response)
480        home_data = home_response.get("result")
481        if isinstance(home_data, dict):
482            return HomeData.from_dict(home_data)
483        else:
484            raise RoborockException("home_response result was an unexpected type")
485
486    async def get_home_data_v2(self, user_data: UserData) -> HomeData:
487        """This is the same as get_home_data, but uses a different endpoint and includes non-robotic vacuums."""
488        if not self._home_data_limiter.try_acquire("home_data", blocking=False):
489            raise RoborockRateLimit("Reached maximum requests for home data. Please try again later.")
490        rriot = user_data.rriot
491        if rriot is None:
492            raise RoborockException("rriot is none")
493        home_id = await self._get_home_id(user_data)
494        if rriot.r.a is None:
495            raise RoborockException("Missing field 'a' in rriot reference")
496        home_request = PreparedRequest(
497            rriot.r.a,
498            self.session,
499            {
500                "Authorization": _get_hawk_authentication(rriot, "/v2/user/homes/" + str(home_id)),
501            },
502        )
503        home_response = await home_request.request("get", "/v2/user/homes/" + str(home_id))
504        if not home_response.get("success"):
505            raise RoborockException(home_response)
506        home_data = home_response.get("result")
507        if isinstance(home_data, dict):
508            return HomeData.from_dict(home_data)
509        else:
510            raise RoborockException("home_response result was an unexpected type")
511
512    async def get_home_data_v3(self, user_data: UserData) -> HomeData:
513        """This is the same as get_home_data, but uses a different endpoint and includes non-robotic vacuums."""
514        if not self._home_data_limiter.try_acquire("home_data", blocking=False):
515            raise RoborockRateLimit("Reached maximum requests for home data. Please try again later.")
516        rriot = user_data.rriot
517        home_id = await self._get_home_id(user_data)
518        if rriot.r.a is None:
519            raise RoborockException("Missing field 'a' in rriot reference")
520        home_request = PreparedRequest(
521            rriot.r.a,
522            self.session,
523            {
524                "Authorization": _get_hawk_authentication(rriot, "/v3/user/homes/" + str(home_id)),
525            },
526        )
527        home_response = await home_request.request("get", "/v3/user/homes/" + str(home_id))
528        if not home_response.get("success"):
529            raise RoborockException(home_response)
530        home_data = home_response.get("result")
531        if isinstance(home_data, dict):
532            return HomeData.from_dict(home_data)
533        raise RoborockException(f"home_response result was an unexpected type: {home_data}")
534
535    async def get_rooms(self, user_data: UserData, home_id: int | None = None) -> list[HomeDataRoom]:
536        rriot = user_data.rriot
537        if rriot is None:
538            raise RoborockException("rriot is none")
539        if home_id is None:
540            home_id = await self._get_home_id(user_data)
541        if rriot.r.a is None:
542            raise RoborockException("Missing field 'a' in rriot reference")
543        room_request = PreparedRequest(
544            rriot.r.a,
545            self.session,
546            {
547                "Authorization": _get_hawk_authentication(rriot, f"/user/homes/{home_id}/rooms"),
548            },
549        )
550        room_response = await room_request.request("get", f"/user/homes/{home_id}/rooms")
551        if not room_response.get("success"):
552            raise RoborockException(room_response)
553        rooms = room_response.get("result")
554        if isinstance(rooms, list):
555            output_list = []
556            for room in rooms:
557                output_list.append(HomeDataRoom.from_dict(room))
558            return output_list
559        else:
560            raise RoborockException("home_response result was an unexpected type")
561
562    async def get_shared_device_rooms(self, user_data: UserData, device_id: str) -> list[HomeDataRoom]:
563        """Fetch room names for a shared (received) device."""
564        rriot = user_data.rriot
565        if rriot is None:
566            raise RoborockException("rriot is none")
567        if rriot.r.a is None:
568            raise RoborockException("Missing field 'a' in rriot reference")
569        path = f"/user/deviceshare/query/{device_id}/rooms"
570        room_request = PreparedRequest(
571            rriot.r.a,
572            self.session,
573            {"Authorization": _get_hawk_authentication(rriot, path)},
574        )
575        room_response = await room_request.request("get", path)
576        if not room_response.get("success"):
577            raise RoborockException(room_response)
578        rooms = room_response.get("result")
579        if isinstance(rooms, list):
580            output_list = []
581            for room in rooms:
582                normalized_room = room
583                if isinstance(room, dict) and "id" not in room and "roomId" in room:
584                    normalized_room = {**room, "id": room["roomId"]}
585                output_list.append(HomeDataRoom.from_dict(normalized_room))
586            return output_list
587        raise RoborockException("get_shared_device_rooms result was an unexpected type")
588
589    async def get_scenes(self, user_data: UserData, device_id: str) -> list[HomeDataScene]:
590        rriot = user_data.rriot
591        if rriot is None:
592            raise RoborockException("rriot is none")
593        if rriot.r.a is None:
594            raise RoborockException("Missing field 'a' in rriot reference")
595        scenes_request = PreparedRequest(
596            rriot.r.a,
597            self.session,
598            {
599                "Authorization": _get_hawk_authentication(rriot, f"/user/scene/device/{str(device_id)}"),
600            },
601        )
602        scenes_response = await scenes_request.request("get", f"/user/scene/device/{str(device_id)}")
603        if not scenes_response.get("success"):
604            raise RoborockException(scenes_response)
605        scenes = scenes_response.get("result")
606        if isinstance(scenes, list):
607            return [HomeDataScene.from_dict(scene) for scene in scenes]
608        else:
609            raise RoborockException("scene_response result was an unexpected type")
610
611    async def execute_scene(self, user_data: UserData, scene_id: int) -> None:
612        rriot = user_data.rriot
613        if rriot is None:
614            raise RoborockException("rriot is none")
615        if rriot.r.a is None:
616            raise RoborockException("Missing field 'a' in rriot reference")
617        execute_scene_request = PreparedRequest(
618            rriot.r.a,
619            self.session,
620            {
621                "Authorization": _get_hawk_authentication(rriot, f"/user/scene/{str(scene_id)}/execute"),
622            },
623        )
624        execute_scene_response = await execute_scene_request.request("POST", f"/user/scene/{str(scene_id)}/execute")
625        if not execute_scene_response.get("success"):
626            raise RoborockException(execute_scene_response)
627
628    async def get_schedules(self, user_data: UserData, device_id: str) -> list[HomeDataSchedule]:
629        rriot = user_data.rriot
630        if rriot is None:
631            raise RoborockException("rriot is none")
632        if rriot.r.a is None:
633            raise RoborockException("Missing field 'a' in rriot reference")
634        schedules_request = PreparedRequest(
635            rriot.r.a,
636            self.session,
637            {
638                "Authorization": _get_hawk_authentication(rriot, f"/user/devices/{device_id}/jobs"),
639            },
640        )
641        schedules_response = await schedules_request.request("get", f"/user/devices/{str(device_id)}/jobs")
642        if not schedules_response.get("success"):
643            raise RoborockException(schedules_response)
644        schedules = schedules_response.get("result")
645        if isinstance(schedules, list):
646            return [HomeDataSchedule.from_dict(schedule) for schedule in schedules]
647        else:
648            raise RoborockException(f"schedule_response result was an unexpected type: {schedules}")
649
650    async def create_job(self, user_data: UserData, device_id: str, job: dict) -> dict:
651        """Create a /jobs entry (schedule or one-time room clean) on a B01 device.
652
653        Body-bearing writes must sign the request body in the Hawk payload slot and send those same
654        compact bytes via ``data=``; ``json=`` would re-serialize with spaces and break the MAC.
655        """
656        rriot = user_data.rriot
657        if rriot is None:
658            raise RoborockException("rriot is none")
659        if rriot.r.a is None:
660            raise RoborockException("Missing field 'a' in rriot reference")
661        path = f"/user/devices/{device_id}/jobs"
662        job_request = PreparedRequest(
663            rriot.r.a,
664            self.session,
665            {
666                "Authorization": _get_hawk_authentication(rriot, path, body=job),
667                "Content-Type": "application/json",
668            },
669        )
670        response = await job_request.request("post", path, data=_compact_json(job).encode())
671        if not response.get("success"):
672            raise RoborockException(response)
673        return response
674
675    async def get_products(self, user_data: UserData) -> ProductResponse:
676        """Gets all products and their schemas, good for determining status codes and model numbers."""
677        base_url = await self.base_url
678        header_clientid = self._get_header_client_id()
679        product_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
680        product_response = await product_request.request(
681            "get",
682            "/api/v4/product",
683            headers={"Authorization": user_data.token},
684        )
685        if product_response is None:
686            raise RoborockException("home_id_response is None")
687        if product_response.get("code") != 200:
688            raise RoborockException(f"{product_response.get('msg')} - response code: {product_response.get('code')}")
689        result = product_response.get("data")
690        if isinstance(result, dict):
691            return ProductResponse.from_dict(result)
692        raise RoborockException("product result was an unexpected type")
693
694    async def download_code(self, user_data: UserData, product_id: int):
695        base_url = await self.base_url
696        header_clientid = self._get_header_client_id()
697        product_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
698        request = {"apilevel": 99999, "productids": [product_id], "type": 2}
699        response = await product_request.request(
700            "post",
701            "/api/v1/appplugin",
702            json=request,
703            headers={"Authorization": user_data.token, "Content-Type": "application/json"},
704        )
705        return response["data"][0]["url"]
706
707    async def download_category_code(self, user_data: UserData):
708        base_url = await self.base_url
709        header_clientid = self._get_header_client_id()
710        product_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
711        response = await product_request.request(
712            "get",
713            "api/v1/plugins?apiLevel=99999&type=2",
714            headers={
715                "Authorization": user_data.token,
716            },
717        )
718        return {r["category"]: r["url"] for r in response["data"]["categoryPluginList"]}
719
720
721class PreparedRequest:
722    def __init__(
723        self, base_url: str, session: aiohttp.ClientSession | None = None, base_headers: dict | None = None
724    ) -> None:
725        self.base_url = base_url
726        self.base_headers = base_headers or {}
727        self.session = session
728
729    async def request(self, method: str, url: str, params=None, data=None, headers=None, json=None) -> dict:
730        _url = "/".join(s.strip("/") for s in [self.base_url, url])
731        _headers = {**self.base_headers, **(headers or {})}
732        close_session = self.session is None
733        session = self.session if self.session is not None else aiohttp.ClientSession()
734        try:
735            async with session.request(method, _url, params=params, data=data, headers=_headers, json=json) as resp:
736                return await resp.json()
737        except ContentTypeError as err:
738            """If we get an error, lets log everything for debugging."""
739            try:
740                resp_json = await resp.json(content_type=None)
741                _LOGGER.info("Resp: %s", resp_json)
742            except ContentTypeError as err_2:
743                _LOGGER.info(err_2)
744            resp_raw = await resp.read()
745            _LOGGER.info("Resp raw: %s", resp_raw)
746            # Still raise the err so that it's clear it failed.
747            raise err
748        except (aiohttp.ClientError, TimeoutError, OSError) as err:
749            raise RoborockException(f"Network error contacting {_url}: {err}") from err
750        finally:
751            if close_session:
752                await session.close()
753
754
755def _process_extra_hawk_values(values: dict | None) -> str:
756    if values is None:
757        return ""
758    else:
759        sorted_keys = sorted(values.keys())
760        result = []
761        for key in sorted_keys:
762            value = values.get(key)
763            result.append(f"{key}={value}")
764        return hashlib.md5("&".join(result).encode()).hexdigest()
765
766
767def _compact_json(body: dict) -> str:
768    """Serialize a JSON body to the exact compact bytes that are both signed and sent."""
769    return json.dumps(body, separators=(",", ":"))
770
771
772def _get_hawk_authentication(
773    rriot: RRiot,
774    url: str,
775    formdata: dict | None = None,
776    params: dict | None = None,
777    body: dict | None = None,
778) -> str:
779    timestamp = math.floor(time.time())
780    nonce = secrets.token_urlsafe(6)
781    params_str = _process_extra_hawk_values(params)
782    if body is not None:
783        payload_str = hashlib.md5(_compact_json(body).encode()).hexdigest()
784    else:
785        payload_str = _process_extra_hawk_values(formdata)
786
787    prestr = ":".join(
788        [
789            rriot.u,
790            rriot.s,
791            nonce,
792            str(timestamp),
793            hashlib.md5(url.encode()).hexdigest(),
794            params_str,
795            payload_str,
796        ]
797    )
798    mac = base64.b64encode(hmac.new(rriot.h.encode(), prestr.encode(), hashlib.sha256).digest()).decode()
799    return f'Hawk id="{rriot.u}",s="{rriot.s}",ts="{timestamp}",nonce="{nonce}",mac="{mac}"'
800
801
802class UserWebApiClient:
803    """Wrapper around RoborockApiClient to provide information for a specific user.
804
805    This binds a RoborockApiClient to a specific user context with the
806    provided UserData. This allows for easier access to user-specific data,
807    to avoid needing to pass UserData around and mock out the web API.
808    """
809
810    def __init__(
811        self, web_api: RoborockApiClient, user_data: UserData, unauthorized_hook: Callable[[], None] | None = None
812    ) -> None:
813        """Initialize the wrapper with the API client and user data."""
814        self._web_api = web_api
815        self._user_data = user_data
816        self._unauthorized_hook = unauthorized_hook
817
818    async def get_home_data(self) -> HomeData:
819        """Fetch home data using the API client."""
820        try:
821            return await self._web_api.get_home_data_v3(self._user_data)
822        except RoborockInvalidCredentials:
823            if self._unauthorized_hook:
824                self._unauthorized_hook()
825            raise
826
827    async def get_routines(self, device_id: str) -> list[HomeDataScene]:
828        """Fetch routines (scenes) for a specific device."""
829        try:
830            return await self._web_api.get_scenes(self._user_data, device_id)
831        except RoborockInvalidCredentials:
832            if self._unauthorized_hook:
833                self._unauthorized_hook()
834            raise
835
836    async def get_rooms(self) -> list[HomeDataRoom]:
837        """Fetch rooms using the API client."""
838        try:
839            return await self._web_api.get_rooms(self._user_data)
840        except RoborockInvalidCredentials:
841            if self._unauthorized_hook:
842                self._unauthorized_hook()
843            raise
844
845    async def get_shared_device_rooms(self, device_id: str) -> list[HomeDataRoom]:
846        """Fetch shared-device rooms using the API client."""
847        return await self._web_api.get_shared_device_rooms(self._user_data, device_id)
848
849    async def execute_routine(self, scene_id: int) -> None:
850        """Execute a specific routine (scene) by its ID."""
851        try:
852            await self._web_api.execute_scene(self._user_data, scene_id)
853        except RoborockInvalidCredentials:
854            if self._unauthorized_hook:
855                self._unauthorized_hook()
856            raise
BASE_URLS = ['https://usiot.roborock.com', 'https://euiot.roborock.com', 'https://cniot.roborock.com', 'https://ruiot.roborock.com']
@dataclass
class IotLoginInfo:
43@dataclass
44class IotLoginInfo:
45    """Information about the login to the iot server."""
46
47    base_url: str
48    country_code: str
49    country: str

Information about the login to the iot server.

IotLoginInfo(base_url: str, country_code: str, country: str)
base_url: str
country_code: str
country: str
class RoborockApiClient:
 52class RoborockApiClient:
 53    _LOGIN_RATES = [
 54        Rate(1, Duration.SECOND),
 55        Rate(3, Duration.MINUTE),
 56        Rate(10, Duration.HOUR),
 57        Rate(20, Duration.DAY),
 58    ]
 59    _HOME_DATA_RATES = [
 60        Rate(1, Duration.SECOND),
 61        Rate(3, Duration.MINUTE),
 62        Rate(5, Duration.HOUR),
 63        Rate(40, Duration.DAY),
 64    ]
 65
 66    _login_limiter = Limiter(_LOGIN_RATES)
 67    _home_data_limiter = Limiter(_HOME_DATA_RATES)
 68
 69    def __init__(
 70        self, username: str, base_url: str | None = None, session: aiohttp.ClientSession | None = None
 71    ) -> None:
 72        """Sample API Client."""
 73        self._username = username
 74        self._base_url = base_url
 75        self._device_identifier = secrets.token_urlsafe(16)
 76        self.session = session
 77        self._iot_login_info: IotLoginInfo | None = None
 78        self._base_urls = BASE_URLS if base_url is None else [base_url]
 79
 80    async def _get_iot_login_info(self) -> IotLoginInfo:
 81        if self._iot_login_info is None:
 82            for iot_url in self._base_urls:
 83                url_request = PreparedRequest(iot_url, self.session)
 84                response = await url_request.request(
 85                    "post",
 86                    "/api/v1/getUrlByEmail",
 87                    params={"email": self._username, "needtwostepauth": "false"},
 88                )
 89                if response is None:
 90                    continue
 91                response_code = response.get("code")
 92                if response_code != 200:
 93                    if response_code == 2003:
 94                        raise RoborockInvalidEmail("Your email was incorrectly formatted.")
 95                    elif response_code == 1001:
 96                        raise RoborockMissingParameters(
 97                            "You are missing parameters for this request, are you sure you entered your username?"
 98                        )
 99                    else:
100                        raise RoborockException(f"{response.get('msg')} - response code: {response_code}")
101                country_code = response["data"]["countrycode"]
102                country = response["data"]["country"]
103                if country_code is not None or country is not None:
104                    self._iot_login_info = IotLoginInfo(
105                        base_url=response["data"]["url"],
106                        country=country,
107                        country_code=country_code,
108                    )
109                    _LOGGER.debug("Country determined to be %s and code is %s", country, country_code)
110                    return self._iot_login_info
111            raise RoborockNoResponseFromBaseURL(
112                "No account was found for any base url we tried. Either your email is incorrect or we do not have a"
113                " record of the roborock server your device is on."
114            )
115        return self._iot_login_info
116
117    @property
118    async def base_url(self):
119        if self._base_url is not None:
120            return self._base_url
121        return (await self._get_iot_login_info()).base_url
122
123    @property
124    async def country(self):
125        return (await self._get_iot_login_info()).country
126
127    @property
128    async def country_code(self):
129        return (await self._get_iot_login_info()).country_code
130
131    def _get_header_client_id(self):
132        md5 = hashlib.md5()
133        md5.update(self._username.encode())
134        md5.update(self._device_identifier.encode())
135        return base64.b64encode(md5.digest()).decode()
136
137    async def nc_prepare(self, user_data: UserData, timezone: str) -> dict:
138        """This gets a few critical parameters for adding a device to your account."""
139        if (
140            user_data.rriot is None
141            or user_data.rriot.r is None
142            or user_data.rriot.u is None
143            or user_data.rriot.r.a is None
144        ):
145            raise RoborockException("Your userdata is missing critical attributes.")
146        base_url = user_data.rriot.r.a
147        prepare_request = PreparedRequest(base_url, self.session)
148        hid = await self._get_home_id(user_data)
149
150        data = FormData()
151        data.add_field("hid", hid)
152        data.add_field("tzid", timezone)
153
154        prepare_response = await prepare_request.request(
155            "post",
156            "/nc/prepare",
157            headers={
158                "Authorization": _get_hawk_authentication(
159                    user_data.rriot, "/nc/prepare", {"hid": hid, "tzid": timezone}
160                ),
161            },
162            data=data,
163        )
164
165        if prepare_response is None:
166            raise RoborockException("prepare_response is None")
167        if not prepare_response.get("success"):
168            raise RoborockException(f"{prepare_response.get('msg')} - response code: {prepare_response.get('code')}")
169
170        return prepare_response["result"]
171
172    async def add_device(self, user_data: UserData, s: str, t: str) -> dict:
173        """This will add a new device to your account
174        it is recommended to only use this during a pairing cycle with a device.
175        Please see here: https://github.com/Python-roborock/Roborockmitmproxy/blob/main/handshake_protocol.md
176        """
177        if (
178            user_data.rriot is None
179            or user_data.rriot.r is None
180            or user_data.rriot.u is None
181            or user_data.rriot.r.a is None
182        ):
183            raise RoborockException("Your userdata is missing critical attributes.")
184        base_url = user_data.rriot.r.a
185        add_device_request = PreparedRequest(base_url, self.session)
186
187        add_device_response = await add_device_request.request(
188            "GET",
189            "/user/devices/newadd",
190            headers={
191                "Authorization": _get_hawk_authentication(
192                    user_data.rriot, "/user/devices/newadd", params={"s": s, "t": t}
193                ),
194            },
195            params={"s": s, "t": t},
196        )
197
198        if add_device_response is None:
199            raise RoborockException("add_device is None")
200        if not add_device_response.get("success"):
201            raise RoborockException(
202                f"{add_device_response.get('msg')} - response code: {add_device_response.get('code')}"
203            )
204
205        return add_device_response["result"]
206
207    async def request_code(self) -> None:
208        if not await self._login_limiter.try_acquire_async("login", blocking=True, timeout=1):
209            raise RoborockRateLimit("Reached maximum requests for login. Please try again later.")
210        base_url = await self.base_url
211        header_clientid = self._get_header_client_id()
212        code_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
213
214        code_response = await code_request.request(
215            "post",
216            "/api/v1/sendEmailCode",
217            params={
218                "username": self._username,
219                "type": "auth",
220            },
221        )
222        if code_response is None:
223            raise RoborockException("Failed to get a response from send email code")
224        response_code = code_response.get("code")
225        if response_code != 200:
226            _LOGGER.info("Request code failed for %s with the following context: %s", self._username, code_response)
227            if response_code == 2008:
228                raise RoborockAccountDoesNotExist("Account does not exist - check your login and try again.")
229            elif response_code == 9002:
230                raise RoborockTooFrequentCodeRequests("You have attempted to request too many codes. Try again later")
231            else:
232                raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")
233
234    async def request_code_v4(self) -> None:
235        """Request a code using the v4 endpoint."""
236        if await self.country_code is None or await self.country is None:
237            _LOGGER.info("No country code or country found, trying old version of request code.")
238            return await self.request_code()
239        if not await self._login_limiter.try_acquire_async("login", blocking=True, timeout=1):
240            raise RoborockRateLimit("Reached maximum requests for login. Please try again later.")
241        base_url = await self.base_url
242        header_clientid = self._get_header_client_id()
243        code_request = PreparedRequest(
244            base_url,
245            self.session,
246            {
247                "header_clientid": header_clientid,
248                "Content-Type": "application/x-www-form-urlencoded",
249                "header_clientlang": "en",
250            },
251        )
252
253        code_response = await code_request.request(
254            "post",
255            "/api/v4/email/code/send",
256            data={"email": self._username, "type": "login", "platform": ""},
257        )
258        if code_response is None:
259            raise RoborockException("Failed to get a response from send email code")
260        response_code = code_response.get("code")
261        if response_code != 200:
262            _LOGGER.info("Request code failed for %s with the following context: %s", self._username, code_response)
263            if response_code == 2008:
264                raise RoborockAccountDoesNotExist("Account does not exist - check your login and try again.")
265            elif response_code == 9002:
266                raise RoborockTooFrequentCodeRequests("You have attempted to request too many codes. Try again later")
267            elif response_code == 3030 and len(self._base_urls) > 1:
268                self._base_urls = self._base_urls[1:]
269                self._iot_login_info = None
270                return await self.request_code_v4()
271            else:
272                raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")
273
274    async def _sign_key_v3(self, s: str) -> str:
275        """Sign a randomly generated string."""
276        base_url = await self.base_url
277        header_clientid = self._get_header_client_id()
278        code_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
279
280        code_response = await code_request.request(
281            "post",
282            "/api/v3/key/sign",
283            params={"s": s},
284        )
285
286        if not code_response or "data" not in code_response or "k" not in code_response["data"]:
287            raise RoborockException("Failed to get a response from sign key")
288        response_code = code_response.get("code")
289
290        if response_code != 200:
291            _LOGGER.info("Request code failed for %s with the following context: %s", self._username, code_response)
292            raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")
293
294        return code_response["data"]["k"]
295
296    async def code_login_v4(
297        self, code: int | str, country: str | None = None, country_code: int | None = None
298    ) -> UserData:
299        """
300        Login via code authentication.
301        :param code: The code from the email.
302        :param country: The two-character representation of the country, i.e. "US"
303        :param country_code: the country phone number code i.e. 1 for US.
304        """
305        base_url = await self.base_url
306        if country is None:
307            country = await self.country
308        if country_code is None:
309            country_code = await self.country_code
310        if country_code is None or country is None:
311            _LOGGER.info("No country code or country found, trying old version of code login.")
312            return await self.code_login(code)
313        header_clientid = self._get_header_client_id()
314        x_mercy_ks = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(16))
315        x_mercy_k = await self._sign_key_v3(x_mercy_ks)
316        login_request = PreparedRequest(
317            base_url,
318            self.session,
319            {
320                "header_clientid": header_clientid,
321                "x-mercy-ks": x_mercy_ks,
322                "x-mercy-k": x_mercy_k,
323                "Content-Type": "application/x-www-form-urlencoded",
324                "header_clientlang": "en",
325                "header_appversion": "4.54.02",
326                "header_phonesystem": "iOS",
327                "header_phonemodel": "iPhone16,1",
328            },
329        )
330        login_response = await login_request.request(
331            "post",
332            "/api/v4/auth/email/login/code",
333            data={
334                "country": country,
335                "countryCode": country_code,
336                "email": self._username,
337                "code": code,
338                # Major and minor version are the user agreement version, we will need to see if this needs to be
339                # dynamic https://usiot.roborock.com/api/v3/app/agreement/latest?country=US
340                "majorVersion": 14,
341                "minorVersion": 0,
342            },
343        )
344        if login_response is None:
345            raise RoborockException("Login request response is None")
346        response_code = login_response.get("code")
347        if response_code != 200:
348            _LOGGER.info("Login failed for %s with the following context: %s", self._username, login_response)
349            if response_code == 2018:
350                raise RoborockInvalidCode("Invalid code - check your code and try again.")
351            if response_code == 3009:
352                raise RoborockNoUserAgreement("You must accept the user agreement in the Roborock app to continue.")
353            if response_code == 3006:
354                raise RoborockInvalidUserAgreement(
355                    "User agreement must be accepted again - or you are attempting to use the Mi Home app account."
356                )
357            if response_code == 3039:
358                raise RoborockAccountDoesNotExist(
359                    "This account does not exist - please ensure that you selected the right region and email."
360                )
361            raise RoborockException(f"{login_response.get('msg')} - response code: {response_code}")
362        user_data = login_response.get("data")
363        if not isinstance(user_data, dict):
364            raise RoborockException("Got unexpected data type for user_data")
365        return UserData.from_dict(user_data)
366
367    async def pass_login(self, password: str) -> UserData:
368        if not await self._login_limiter.try_acquire_async("login", blocking=True, timeout=1):
369            raise RoborockRateLimit("Reached maximum requests for login. Please try again later.")
370        base_url = await self.base_url
371        header_clientid = self._get_header_client_id()
372
373        login_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
374        login_response = await login_request.request(
375            "post",
376            "/api/v1/login",
377            params={
378                "username": self._username,
379                "password": password,
380                "needtwostepauth": "false",
381            },
382        )
383        if login_response is None:
384            raise RoborockException("Login response is none")
385        if login_response.get("code") != 200:
386            _LOGGER.info("Login failed for %s with the following context: %s", self._username, login_response)
387            raise RoborockException(f"{login_response.get('msg')} - response code: {login_response.get('code')}")
388        user_data = login_response.get("data")
389        if not isinstance(user_data, dict):
390            raise RoborockException("Got unexpected data type for user_data")
391        return UserData.from_dict(user_data)
392
393    async def pass_login_v3(self, password: str) -> UserData:
394        """Seemingly it follows the format below, but password is encrypted in some manner.
395        # login_response = await login_request.request(
396        #     "post",
397        #     "/api/v3/auth/email/login",
398        #     params={
399        #         "email": self._username,
400        #         "password": password,
401        #         "twoStep": 1,
402        #         "version": 0
403        #     },
404        # )
405        """
406        raise NotImplementedError("Pass_login_v3 has not yet been implemented")
407
408    async def code_login(self, code: int | str) -> UserData:
409        base_url = await self.base_url
410        header_clientid = self._get_header_client_id()
411
412        login_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
413        login_response = await login_request.request(
414            "post",
415            "/api/v1/loginWithCode",
416            params={
417                "username": self._username,
418                "verifycode": code,
419                "verifycodetype": "AUTH_EMAIL_CODE",
420            },
421        )
422        if login_response is None:
423            raise RoborockException("Login request response is None")
424        response_code = login_response.get("code")
425        if response_code != 200:
426            _LOGGER.info("Login failed for %s with the following context: %s", self._username, login_response)
427            if response_code == 2018:
428                raise RoborockInvalidCode("Invalid code - check your code and try again.")
429            if response_code == 3009:
430                raise RoborockNoUserAgreement("You must accept the user agreement in the Roborock app to continue.")
431            if response_code == 3006:
432                raise RoborockInvalidUserAgreement(
433                    "User agreement must be accepted again - or you are attempting to use the Mi Home app account."
434                )
435            raise RoborockException(f"{login_response.get('msg')} - response code: {response_code}")
436        user_data = login_response.get("data")
437        if not isinstance(user_data, dict):
438            raise RoborockException("Got unexpected data type for user_data")
439        return UserData.from_dict(user_data)
440
441    async def _get_home_id(self, user_data: UserData):
442        base_url = await self.base_url
443        header_clientid = self._get_header_client_id()
444        home_id_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
445        home_id_response = await home_id_request.request(
446            "get",
447            "/api/v1/getHomeDetail",
448            headers={"Authorization": user_data.token},
449        )
450        if home_id_response is None:
451            raise RoborockException("home_id_response is None")
452        if home_id_response.get("code") != 200:
453            _LOGGER.info("Get Home Id failed with the following context: %s", home_id_response)
454            if home_id_response.get("code") == 2010:
455                raise RoborockInvalidCredentials(
456                    f"Invalid credentials ({home_id_response.get('msg')}) - check your login and try again."
457                )
458            raise RoborockException(f"{home_id_response.get('msg')} - response code: {home_id_response.get('code')}")
459
460        return home_id_response["data"]["rrHomeId"]
461
462    async def get_home_data(self, user_data: UserData) -> HomeData:
463        if not self._home_data_limiter.try_acquire("home_data", blocking=False):
464            raise RoborockRateLimit("Reached maximum requests for home data. Please try again later.")
465        rriot = user_data.rriot
466        if rriot is None:
467            raise RoborockException("rriot is none")
468        home_id = await self._get_home_id(user_data)
469        if rriot.r.a is None:
470            raise RoborockException("Missing field 'a' in rriot reference")
471        home_request = PreparedRequest(
472            rriot.r.a,
473            self.session,
474            {
475                "Authorization": _get_hawk_authentication(rriot, f"/user/homes/{str(home_id)}"),
476            },
477        )
478        home_response = await home_request.request("get", "/user/homes/" + str(home_id))
479        if not home_response.get("success"):
480            raise RoborockException(home_response)
481        home_data = home_response.get("result")
482        if isinstance(home_data, dict):
483            return HomeData.from_dict(home_data)
484        else:
485            raise RoborockException("home_response result was an unexpected type")
486
487    async def get_home_data_v2(self, user_data: UserData) -> HomeData:
488        """This is the same as get_home_data, but uses a different endpoint and includes non-robotic vacuums."""
489        if not self._home_data_limiter.try_acquire("home_data", blocking=False):
490            raise RoborockRateLimit("Reached maximum requests for home data. Please try again later.")
491        rriot = user_data.rriot
492        if rriot is None:
493            raise RoborockException("rriot is none")
494        home_id = await self._get_home_id(user_data)
495        if rriot.r.a is None:
496            raise RoborockException("Missing field 'a' in rriot reference")
497        home_request = PreparedRequest(
498            rriot.r.a,
499            self.session,
500            {
501                "Authorization": _get_hawk_authentication(rriot, "/v2/user/homes/" + str(home_id)),
502            },
503        )
504        home_response = await home_request.request("get", "/v2/user/homes/" + str(home_id))
505        if not home_response.get("success"):
506            raise RoborockException(home_response)
507        home_data = home_response.get("result")
508        if isinstance(home_data, dict):
509            return HomeData.from_dict(home_data)
510        else:
511            raise RoborockException("home_response result was an unexpected type")
512
513    async def get_home_data_v3(self, user_data: UserData) -> HomeData:
514        """This is the same as get_home_data, but uses a different endpoint and includes non-robotic vacuums."""
515        if not self._home_data_limiter.try_acquire("home_data", blocking=False):
516            raise RoborockRateLimit("Reached maximum requests for home data. Please try again later.")
517        rriot = user_data.rriot
518        home_id = await self._get_home_id(user_data)
519        if rriot.r.a is None:
520            raise RoborockException("Missing field 'a' in rriot reference")
521        home_request = PreparedRequest(
522            rriot.r.a,
523            self.session,
524            {
525                "Authorization": _get_hawk_authentication(rriot, "/v3/user/homes/" + str(home_id)),
526            },
527        )
528        home_response = await home_request.request("get", "/v3/user/homes/" + str(home_id))
529        if not home_response.get("success"):
530            raise RoborockException(home_response)
531        home_data = home_response.get("result")
532        if isinstance(home_data, dict):
533            return HomeData.from_dict(home_data)
534        raise RoborockException(f"home_response result was an unexpected type: {home_data}")
535
536    async def get_rooms(self, user_data: UserData, home_id: int | None = None) -> list[HomeDataRoom]:
537        rriot = user_data.rriot
538        if rriot is None:
539            raise RoborockException("rriot is none")
540        if home_id is None:
541            home_id = await self._get_home_id(user_data)
542        if rriot.r.a is None:
543            raise RoborockException("Missing field 'a' in rriot reference")
544        room_request = PreparedRequest(
545            rriot.r.a,
546            self.session,
547            {
548                "Authorization": _get_hawk_authentication(rriot, f"/user/homes/{home_id}/rooms"),
549            },
550        )
551        room_response = await room_request.request("get", f"/user/homes/{home_id}/rooms")
552        if not room_response.get("success"):
553            raise RoborockException(room_response)
554        rooms = room_response.get("result")
555        if isinstance(rooms, list):
556            output_list = []
557            for room in rooms:
558                output_list.append(HomeDataRoom.from_dict(room))
559            return output_list
560        else:
561            raise RoborockException("home_response result was an unexpected type")
562
563    async def get_shared_device_rooms(self, user_data: UserData, device_id: str) -> list[HomeDataRoom]:
564        """Fetch room names for a shared (received) device."""
565        rriot = user_data.rriot
566        if rriot is None:
567            raise RoborockException("rriot is none")
568        if rriot.r.a is None:
569            raise RoborockException("Missing field 'a' in rriot reference")
570        path = f"/user/deviceshare/query/{device_id}/rooms"
571        room_request = PreparedRequest(
572            rriot.r.a,
573            self.session,
574            {"Authorization": _get_hawk_authentication(rriot, path)},
575        )
576        room_response = await room_request.request("get", path)
577        if not room_response.get("success"):
578            raise RoborockException(room_response)
579        rooms = room_response.get("result")
580        if isinstance(rooms, list):
581            output_list = []
582            for room in rooms:
583                normalized_room = room
584                if isinstance(room, dict) and "id" not in room and "roomId" in room:
585                    normalized_room = {**room, "id": room["roomId"]}
586                output_list.append(HomeDataRoom.from_dict(normalized_room))
587            return output_list
588        raise RoborockException("get_shared_device_rooms result was an unexpected type")
589
590    async def get_scenes(self, user_data: UserData, device_id: str) -> list[HomeDataScene]:
591        rriot = user_data.rriot
592        if rriot is None:
593            raise RoborockException("rriot is none")
594        if rriot.r.a is None:
595            raise RoborockException("Missing field 'a' in rriot reference")
596        scenes_request = PreparedRequest(
597            rriot.r.a,
598            self.session,
599            {
600                "Authorization": _get_hawk_authentication(rriot, f"/user/scene/device/{str(device_id)}"),
601            },
602        )
603        scenes_response = await scenes_request.request("get", f"/user/scene/device/{str(device_id)}")
604        if not scenes_response.get("success"):
605            raise RoborockException(scenes_response)
606        scenes = scenes_response.get("result")
607        if isinstance(scenes, list):
608            return [HomeDataScene.from_dict(scene) for scene in scenes]
609        else:
610            raise RoborockException("scene_response result was an unexpected type")
611
612    async def execute_scene(self, user_data: UserData, scene_id: int) -> None:
613        rriot = user_data.rriot
614        if rriot is None:
615            raise RoborockException("rriot is none")
616        if rriot.r.a is None:
617            raise RoborockException("Missing field 'a' in rriot reference")
618        execute_scene_request = PreparedRequest(
619            rriot.r.a,
620            self.session,
621            {
622                "Authorization": _get_hawk_authentication(rriot, f"/user/scene/{str(scene_id)}/execute"),
623            },
624        )
625        execute_scene_response = await execute_scene_request.request("POST", f"/user/scene/{str(scene_id)}/execute")
626        if not execute_scene_response.get("success"):
627            raise RoborockException(execute_scene_response)
628
629    async def get_schedules(self, user_data: UserData, device_id: str) -> list[HomeDataSchedule]:
630        rriot = user_data.rriot
631        if rriot is None:
632            raise RoborockException("rriot is none")
633        if rriot.r.a is None:
634            raise RoborockException("Missing field 'a' in rriot reference")
635        schedules_request = PreparedRequest(
636            rriot.r.a,
637            self.session,
638            {
639                "Authorization": _get_hawk_authentication(rriot, f"/user/devices/{device_id}/jobs"),
640            },
641        )
642        schedules_response = await schedules_request.request("get", f"/user/devices/{str(device_id)}/jobs")
643        if not schedules_response.get("success"):
644            raise RoborockException(schedules_response)
645        schedules = schedules_response.get("result")
646        if isinstance(schedules, list):
647            return [HomeDataSchedule.from_dict(schedule) for schedule in schedules]
648        else:
649            raise RoborockException(f"schedule_response result was an unexpected type: {schedules}")
650
651    async def create_job(self, user_data: UserData, device_id: str, job: dict) -> dict:
652        """Create a /jobs entry (schedule or one-time room clean) on a B01 device.
653
654        Body-bearing writes must sign the request body in the Hawk payload slot and send those same
655        compact bytes via ``data=``; ``json=`` would re-serialize with spaces and break the MAC.
656        """
657        rriot = user_data.rriot
658        if rriot is None:
659            raise RoborockException("rriot is none")
660        if rriot.r.a is None:
661            raise RoborockException("Missing field 'a' in rriot reference")
662        path = f"/user/devices/{device_id}/jobs"
663        job_request = PreparedRequest(
664            rriot.r.a,
665            self.session,
666            {
667                "Authorization": _get_hawk_authentication(rriot, path, body=job),
668                "Content-Type": "application/json",
669            },
670        )
671        response = await job_request.request("post", path, data=_compact_json(job).encode())
672        if not response.get("success"):
673            raise RoborockException(response)
674        return response
675
676    async def get_products(self, user_data: UserData) -> ProductResponse:
677        """Gets all products and their schemas, good for determining status codes and model numbers."""
678        base_url = await self.base_url
679        header_clientid = self._get_header_client_id()
680        product_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
681        product_response = await product_request.request(
682            "get",
683            "/api/v4/product",
684            headers={"Authorization": user_data.token},
685        )
686        if product_response is None:
687            raise RoborockException("home_id_response is None")
688        if product_response.get("code") != 200:
689            raise RoborockException(f"{product_response.get('msg')} - response code: {product_response.get('code')}")
690        result = product_response.get("data")
691        if isinstance(result, dict):
692            return ProductResponse.from_dict(result)
693        raise RoborockException("product result was an unexpected type")
694
695    async def download_code(self, user_data: UserData, product_id: int):
696        base_url = await self.base_url
697        header_clientid = self._get_header_client_id()
698        product_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
699        request = {"apilevel": 99999, "productids": [product_id], "type": 2}
700        response = await product_request.request(
701            "post",
702            "/api/v1/appplugin",
703            json=request,
704            headers={"Authorization": user_data.token, "Content-Type": "application/json"},
705        )
706        return response["data"][0]["url"]
707
708    async def download_category_code(self, user_data: UserData):
709        base_url = await self.base_url
710        header_clientid = self._get_header_client_id()
711        product_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
712        response = await product_request.request(
713            "get",
714            "api/v1/plugins?apiLevel=99999&type=2",
715            headers={
716                "Authorization": user_data.token,
717            },
718        )
719        return {r["category"]: r["url"] for r in response["data"]["categoryPluginList"]}
RoborockApiClient( username: str, base_url: str | None = None, session: aiohttp.client.ClientSession | None = None)
69    def __init__(
70        self, username: str, base_url: str | None = None, session: aiohttp.ClientSession | None = None
71    ) -> None:
72        """Sample API Client."""
73        self._username = username
74        self._base_url = base_url
75        self._device_identifier = secrets.token_urlsafe(16)
76        self.session = session
77        self._iot_login_info: IotLoginInfo | None = None
78        self._base_urls = BASE_URLS if base_url is None else [base_url]

Sample API Client.

session
base_url
117    @property
118    async def base_url(self):
119        if self._base_url is not None:
120            return self._base_url
121        return (await self._get_iot_login_info()).base_url
country
123    @property
124    async def country(self):
125        return (await self._get_iot_login_info()).country
country_code
127    @property
128    async def country_code(self):
129        return (await self._get_iot_login_info()).country_code
async def nc_prepare( self, user_data: roborock.data.containers.UserData, timezone: str) -> dict:
137    async def nc_prepare(self, user_data: UserData, timezone: str) -> dict:
138        """This gets a few critical parameters for adding a device to your account."""
139        if (
140            user_data.rriot is None
141            or user_data.rriot.r is None
142            or user_data.rriot.u is None
143            or user_data.rriot.r.a is None
144        ):
145            raise RoborockException("Your userdata is missing critical attributes.")
146        base_url = user_data.rriot.r.a
147        prepare_request = PreparedRequest(base_url, self.session)
148        hid = await self._get_home_id(user_data)
149
150        data = FormData()
151        data.add_field("hid", hid)
152        data.add_field("tzid", timezone)
153
154        prepare_response = await prepare_request.request(
155            "post",
156            "/nc/prepare",
157            headers={
158                "Authorization": _get_hawk_authentication(
159                    user_data.rriot, "/nc/prepare", {"hid": hid, "tzid": timezone}
160                ),
161            },
162            data=data,
163        )
164
165        if prepare_response is None:
166            raise RoborockException("prepare_response is None")
167        if not prepare_response.get("success"):
168            raise RoborockException(f"{prepare_response.get('msg')} - response code: {prepare_response.get('code')}")
169
170        return prepare_response["result"]

This gets a few critical parameters for adding a device to your account.

async def add_device( self, user_data: roborock.data.containers.UserData, s: str, t: str) -> dict:
172    async def add_device(self, user_data: UserData, s: str, t: str) -> dict:
173        """This will add a new device to your account
174        it is recommended to only use this during a pairing cycle with a device.
175        Please see here: https://github.com/Python-roborock/Roborockmitmproxy/blob/main/handshake_protocol.md
176        """
177        if (
178            user_data.rriot is None
179            or user_data.rriot.r is None
180            or user_data.rriot.u is None
181            or user_data.rriot.r.a is None
182        ):
183            raise RoborockException("Your userdata is missing critical attributes.")
184        base_url = user_data.rriot.r.a
185        add_device_request = PreparedRequest(base_url, self.session)
186
187        add_device_response = await add_device_request.request(
188            "GET",
189            "/user/devices/newadd",
190            headers={
191                "Authorization": _get_hawk_authentication(
192                    user_data.rriot, "/user/devices/newadd", params={"s": s, "t": t}
193                ),
194            },
195            params={"s": s, "t": t},
196        )
197
198        if add_device_response is None:
199            raise RoborockException("add_device is None")
200        if not add_device_response.get("success"):
201            raise RoborockException(
202                f"{add_device_response.get('msg')} - response code: {add_device_response.get('code')}"
203            )
204
205        return add_device_response["result"]

This will add a new device to your account it is recommended to only use this during a pairing cycle with a device. Please see here: https://github.com/Python-roborock/Roborockmitmproxy/blob/main/handshake_protocol.md

async def request_code(self) -> None:
207    async def request_code(self) -> None:
208        if not await self._login_limiter.try_acquire_async("login", blocking=True, timeout=1):
209            raise RoborockRateLimit("Reached maximum requests for login. Please try again later.")
210        base_url = await self.base_url
211        header_clientid = self._get_header_client_id()
212        code_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
213
214        code_response = await code_request.request(
215            "post",
216            "/api/v1/sendEmailCode",
217            params={
218                "username": self._username,
219                "type": "auth",
220            },
221        )
222        if code_response is None:
223            raise RoborockException("Failed to get a response from send email code")
224        response_code = code_response.get("code")
225        if response_code != 200:
226            _LOGGER.info("Request code failed for %s with the following context: %s", self._username, code_response)
227            if response_code == 2008:
228                raise RoborockAccountDoesNotExist("Account does not exist - check your login and try again.")
229            elif response_code == 9002:
230                raise RoborockTooFrequentCodeRequests("You have attempted to request too many codes. Try again later")
231            else:
232                raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")
async def request_code_v4(self) -> None:
234    async def request_code_v4(self) -> None:
235        """Request a code using the v4 endpoint."""
236        if await self.country_code is None or await self.country is None:
237            _LOGGER.info("No country code or country found, trying old version of request code.")
238            return await self.request_code()
239        if not await self._login_limiter.try_acquire_async("login", blocking=True, timeout=1):
240            raise RoborockRateLimit("Reached maximum requests for login. Please try again later.")
241        base_url = await self.base_url
242        header_clientid = self._get_header_client_id()
243        code_request = PreparedRequest(
244            base_url,
245            self.session,
246            {
247                "header_clientid": header_clientid,
248                "Content-Type": "application/x-www-form-urlencoded",
249                "header_clientlang": "en",
250            },
251        )
252
253        code_response = await code_request.request(
254            "post",
255            "/api/v4/email/code/send",
256            data={"email": self._username, "type": "login", "platform": ""},
257        )
258        if code_response is None:
259            raise RoborockException("Failed to get a response from send email code")
260        response_code = code_response.get("code")
261        if response_code != 200:
262            _LOGGER.info("Request code failed for %s with the following context: %s", self._username, code_response)
263            if response_code == 2008:
264                raise RoborockAccountDoesNotExist("Account does not exist - check your login and try again.")
265            elif response_code == 9002:
266                raise RoborockTooFrequentCodeRequests("You have attempted to request too many codes. Try again later")
267            elif response_code == 3030 and len(self._base_urls) > 1:
268                self._base_urls = self._base_urls[1:]
269                self._iot_login_info = None
270                return await self.request_code_v4()
271            else:
272                raise RoborockException(f"{code_response.get('msg')} - response code: {code_response.get('code')}")

Request a code using the v4 endpoint.

async def code_login_v4( self, code: int | str, country: str | None = None, country_code: int | None = None) -> roborock.data.containers.UserData:
296    async def code_login_v4(
297        self, code: int | str, country: str | None = None, country_code: int | None = None
298    ) -> UserData:
299        """
300        Login via code authentication.
301        :param code: The code from the email.
302        :param country: The two-character representation of the country, i.e. "US"
303        :param country_code: the country phone number code i.e. 1 for US.
304        """
305        base_url = await self.base_url
306        if country is None:
307            country = await self.country
308        if country_code is None:
309            country_code = await self.country_code
310        if country_code is None or country is None:
311            _LOGGER.info("No country code or country found, trying old version of code login.")
312            return await self.code_login(code)
313        header_clientid = self._get_header_client_id()
314        x_mercy_ks = "".join(secrets.choice(string.ascii_letters + string.digits) for _ in range(16))
315        x_mercy_k = await self._sign_key_v3(x_mercy_ks)
316        login_request = PreparedRequest(
317            base_url,
318            self.session,
319            {
320                "header_clientid": header_clientid,
321                "x-mercy-ks": x_mercy_ks,
322                "x-mercy-k": x_mercy_k,
323                "Content-Type": "application/x-www-form-urlencoded",
324                "header_clientlang": "en",
325                "header_appversion": "4.54.02",
326                "header_phonesystem": "iOS",
327                "header_phonemodel": "iPhone16,1",
328            },
329        )
330        login_response = await login_request.request(
331            "post",
332            "/api/v4/auth/email/login/code",
333            data={
334                "country": country,
335                "countryCode": country_code,
336                "email": self._username,
337                "code": code,
338                # Major and minor version are the user agreement version, we will need to see if this needs to be
339                # dynamic https://usiot.roborock.com/api/v3/app/agreement/latest?country=US
340                "majorVersion": 14,
341                "minorVersion": 0,
342            },
343        )
344        if login_response is None:
345            raise RoborockException("Login request response is None")
346        response_code = login_response.get("code")
347        if response_code != 200:
348            _LOGGER.info("Login failed for %s with the following context: %s", self._username, login_response)
349            if response_code == 2018:
350                raise RoborockInvalidCode("Invalid code - check your code and try again.")
351            if response_code == 3009:
352                raise RoborockNoUserAgreement("You must accept the user agreement in the Roborock app to continue.")
353            if response_code == 3006:
354                raise RoborockInvalidUserAgreement(
355                    "User agreement must be accepted again - or you are attempting to use the Mi Home app account."
356                )
357            if response_code == 3039:
358                raise RoborockAccountDoesNotExist(
359                    "This account does not exist - please ensure that you selected the right region and email."
360                )
361            raise RoborockException(f"{login_response.get('msg')} - response code: {response_code}")
362        user_data = login_response.get("data")
363        if not isinstance(user_data, dict):
364            raise RoborockException("Got unexpected data type for user_data")
365        return UserData.from_dict(user_data)

Login via code authentication.

Parameters
  • code: The code from the email.
  • country: The two-character representation of the country, i.e. "US"
  • country_code: the country phone number code i.e. 1 for US.
async def pass_login(self, password: str) -> roborock.data.containers.UserData:
367    async def pass_login(self, password: str) -> UserData:
368        if not await self._login_limiter.try_acquire_async("login", blocking=True, timeout=1):
369            raise RoborockRateLimit("Reached maximum requests for login. Please try again later.")
370        base_url = await self.base_url
371        header_clientid = self._get_header_client_id()
372
373        login_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
374        login_response = await login_request.request(
375            "post",
376            "/api/v1/login",
377            params={
378                "username": self._username,
379                "password": password,
380                "needtwostepauth": "false",
381            },
382        )
383        if login_response is None:
384            raise RoborockException("Login response is none")
385        if login_response.get("code") != 200:
386            _LOGGER.info("Login failed for %s with the following context: %s", self._username, login_response)
387            raise RoborockException(f"{login_response.get('msg')} - response code: {login_response.get('code')}")
388        user_data = login_response.get("data")
389        if not isinstance(user_data, dict):
390            raise RoborockException("Got unexpected data type for user_data")
391        return UserData.from_dict(user_data)
async def pass_login_v3(self, password: str) -> roborock.data.containers.UserData:
393    async def pass_login_v3(self, password: str) -> UserData:
394        """Seemingly it follows the format below, but password is encrypted in some manner.
395        # login_response = await login_request.request(
396        #     "post",
397        #     "/api/v3/auth/email/login",
398        #     params={
399        #         "email": self._username,
400        #         "password": password,
401        #         "twoStep": 1,
402        #         "version": 0
403        #     },
404        # )
405        """
406        raise NotImplementedError("Pass_login_v3 has not yet been implemented")

Seemingly it follows the format below, but password is encrypted in some manner.

login_response = await login_request.request(

"post",

"/api/v3/auth/email/login",

params={

"email": self._username,

"password": password,

"twoStep": 1,

"version": 0

},

)

async def code_login(self, code: int | str) -> roborock.data.containers.UserData:
408    async def code_login(self, code: int | str) -> UserData:
409        base_url = await self.base_url
410        header_clientid = self._get_header_client_id()
411
412        login_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
413        login_response = await login_request.request(
414            "post",
415            "/api/v1/loginWithCode",
416            params={
417                "username": self._username,
418                "verifycode": code,
419                "verifycodetype": "AUTH_EMAIL_CODE",
420            },
421        )
422        if login_response is None:
423            raise RoborockException("Login request response is None")
424        response_code = login_response.get("code")
425        if response_code != 200:
426            _LOGGER.info("Login failed for %s with the following context: %s", self._username, login_response)
427            if response_code == 2018:
428                raise RoborockInvalidCode("Invalid code - check your code and try again.")
429            if response_code == 3009:
430                raise RoborockNoUserAgreement("You must accept the user agreement in the Roborock app to continue.")
431            if response_code == 3006:
432                raise RoborockInvalidUserAgreement(
433                    "User agreement must be accepted again - or you are attempting to use the Mi Home app account."
434                )
435            raise RoborockException(f"{login_response.get('msg')} - response code: {response_code}")
436        user_data = login_response.get("data")
437        if not isinstance(user_data, dict):
438            raise RoborockException("Got unexpected data type for user_data")
439        return UserData.from_dict(user_data)
async def get_home_data( self, user_data: roborock.data.containers.UserData) -> roborock.data.containers.HomeData:
462    async def get_home_data(self, user_data: UserData) -> HomeData:
463        if not self._home_data_limiter.try_acquire("home_data", blocking=False):
464            raise RoborockRateLimit("Reached maximum requests for home data. Please try again later.")
465        rriot = user_data.rriot
466        if rriot is None:
467            raise RoborockException("rriot is none")
468        home_id = await self._get_home_id(user_data)
469        if rriot.r.a is None:
470            raise RoborockException("Missing field 'a' in rriot reference")
471        home_request = PreparedRequest(
472            rriot.r.a,
473            self.session,
474            {
475                "Authorization": _get_hawk_authentication(rriot, f"/user/homes/{str(home_id)}"),
476            },
477        )
478        home_response = await home_request.request("get", "/user/homes/" + str(home_id))
479        if not home_response.get("success"):
480            raise RoborockException(home_response)
481        home_data = home_response.get("result")
482        if isinstance(home_data, dict):
483            return HomeData.from_dict(home_data)
484        else:
485            raise RoborockException("home_response result was an unexpected type")
async def get_home_data_v2( self, user_data: roborock.data.containers.UserData) -> roborock.data.containers.HomeData:
487    async def get_home_data_v2(self, user_data: UserData) -> HomeData:
488        """This is the same as get_home_data, but uses a different endpoint and includes non-robotic vacuums."""
489        if not self._home_data_limiter.try_acquire("home_data", blocking=False):
490            raise RoborockRateLimit("Reached maximum requests for home data. Please try again later.")
491        rriot = user_data.rriot
492        if rriot is None:
493            raise RoborockException("rriot is none")
494        home_id = await self._get_home_id(user_data)
495        if rriot.r.a is None:
496            raise RoborockException("Missing field 'a' in rriot reference")
497        home_request = PreparedRequest(
498            rriot.r.a,
499            self.session,
500            {
501                "Authorization": _get_hawk_authentication(rriot, "/v2/user/homes/" + str(home_id)),
502            },
503        )
504        home_response = await home_request.request("get", "/v2/user/homes/" + str(home_id))
505        if not home_response.get("success"):
506            raise RoborockException(home_response)
507        home_data = home_response.get("result")
508        if isinstance(home_data, dict):
509            return HomeData.from_dict(home_data)
510        else:
511            raise RoborockException("home_response result was an unexpected type")

This is the same as get_home_data, but uses a different endpoint and includes non-robotic vacuums.

async def get_home_data_v3( self, user_data: roborock.data.containers.UserData) -> roborock.data.containers.HomeData:
513    async def get_home_data_v3(self, user_data: UserData) -> HomeData:
514        """This is the same as get_home_data, but uses a different endpoint and includes non-robotic vacuums."""
515        if not self._home_data_limiter.try_acquire("home_data", blocking=False):
516            raise RoborockRateLimit("Reached maximum requests for home data. Please try again later.")
517        rriot = user_data.rriot
518        home_id = await self._get_home_id(user_data)
519        if rriot.r.a is None:
520            raise RoborockException("Missing field 'a' in rriot reference")
521        home_request = PreparedRequest(
522            rriot.r.a,
523            self.session,
524            {
525                "Authorization": _get_hawk_authentication(rriot, "/v3/user/homes/" + str(home_id)),
526            },
527        )
528        home_response = await home_request.request("get", "/v3/user/homes/" + str(home_id))
529        if not home_response.get("success"):
530            raise RoborockException(home_response)
531        home_data = home_response.get("result")
532        if isinstance(home_data, dict):
533            return HomeData.from_dict(home_data)
534        raise RoborockException(f"home_response result was an unexpected type: {home_data}")

This is the same as get_home_data, but uses a different endpoint and includes non-robotic vacuums.

async def get_rooms( self, user_data: roborock.data.containers.UserData, home_id: int | None = None) -> list[roborock.data.containers.HomeDataRoom]:
536    async def get_rooms(self, user_data: UserData, home_id: int | None = None) -> list[HomeDataRoom]:
537        rriot = user_data.rriot
538        if rriot is None:
539            raise RoborockException("rriot is none")
540        if home_id is None:
541            home_id = await self._get_home_id(user_data)
542        if rriot.r.a is None:
543            raise RoborockException("Missing field 'a' in rriot reference")
544        room_request = PreparedRequest(
545            rriot.r.a,
546            self.session,
547            {
548                "Authorization": _get_hawk_authentication(rriot, f"/user/homes/{home_id}/rooms"),
549            },
550        )
551        room_response = await room_request.request("get", f"/user/homes/{home_id}/rooms")
552        if not room_response.get("success"):
553            raise RoborockException(room_response)
554        rooms = room_response.get("result")
555        if isinstance(rooms, list):
556            output_list = []
557            for room in rooms:
558                output_list.append(HomeDataRoom.from_dict(room))
559            return output_list
560        else:
561            raise RoborockException("home_response result was an unexpected type")
async def get_shared_device_rooms( self, user_data: roborock.data.containers.UserData, device_id: str) -> list[roborock.data.containers.HomeDataRoom]:
563    async def get_shared_device_rooms(self, user_data: UserData, device_id: str) -> list[HomeDataRoom]:
564        """Fetch room names for a shared (received) device."""
565        rriot = user_data.rriot
566        if rriot is None:
567            raise RoborockException("rriot is none")
568        if rriot.r.a is None:
569            raise RoborockException("Missing field 'a' in rriot reference")
570        path = f"/user/deviceshare/query/{device_id}/rooms"
571        room_request = PreparedRequest(
572            rriot.r.a,
573            self.session,
574            {"Authorization": _get_hawk_authentication(rriot, path)},
575        )
576        room_response = await room_request.request("get", path)
577        if not room_response.get("success"):
578            raise RoborockException(room_response)
579        rooms = room_response.get("result")
580        if isinstance(rooms, list):
581            output_list = []
582            for room in rooms:
583                normalized_room = room
584                if isinstance(room, dict) and "id" not in room and "roomId" in room:
585                    normalized_room = {**room, "id": room["roomId"]}
586                output_list.append(HomeDataRoom.from_dict(normalized_room))
587            return output_list
588        raise RoborockException("get_shared_device_rooms result was an unexpected type")

Fetch room names for a shared (received) device.

async def get_scenes( self, user_data: roborock.data.containers.UserData, device_id: str) -> list[roborock.data.containers.HomeDataScene]:
590    async def get_scenes(self, user_data: UserData, device_id: str) -> list[HomeDataScene]:
591        rriot = user_data.rriot
592        if rriot is None:
593            raise RoborockException("rriot is none")
594        if rriot.r.a is None:
595            raise RoborockException("Missing field 'a' in rriot reference")
596        scenes_request = PreparedRequest(
597            rriot.r.a,
598            self.session,
599            {
600                "Authorization": _get_hawk_authentication(rriot, f"/user/scene/device/{str(device_id)}"),
601            },
602        )
603        scenes_response = await scenes_request.request("get", f"/user/scene/device/{str(device_id)}")
604        if not scenes_response.get("success"):
605            raise RoborockException(scenes_response)
606        scenes = scenes_response.get("result")
607        if isinstance(scenes, list):
608            return [HomeDataScene.from_dict(scene) for scene in scenes]
609        else:
610            raise RoborockException("scene_response result was an unexpected type")
async def execute_scene( self, user_data: roborock.data.containers.UserData, scene_id: int) -> None:
612    async def execute_scene(self, user_data: UserData, scene_id: int) -> None:
613        rriot = user_data.rriot
614        if rriot is None:
615            raise RoborockException("rriot is none")
616        if rriot.r.a is None:
617            raise RoborockException("Missing field 'a' in rriot reference")
618        execute_scene_request = PreparedRequest(
619            rriot.r.a,
620            self.session,
621            {
622                "Authorization": _get_hawk_authentication(rriot, f"/user/scene/{str(scene_id)}/execute"),
623            },
624        )
625        execute_scene_response = await execute_scene_request.request("POST", f"/user/scene/{str(scene_id)}/execute")
626        if not execute_scene_response.get("success"):
627            raise RoborockException(execute_scene_response)
async def get_schedules( self, user_data: roborock.data.containers.UserData, device_id: str) -> list[roborock.data.containers.HomeDataSchedule]:
629    async def get_schedules(self, user_data: UserData, device_id: str) -> list[HomeDataSchedule]:
630        rriot = user_data.rriot
631        if rriot is None:
632            raise RoborockException("rriot is none")
633        if rriot.r.a is None:
634            raise RoborockException("Missing field 'a' in rriot reference")
635        schedules_request = PreparedRequest(
636            rriot.r.a,
637            self.session,
638            {
639                "Authorization": _get_hawk_authentication(rriot, f"/user/devices/{device_id}/jobs"),
640            },
641        )
642        schedules_response = await schedules_request.request("get", f"/user/devices/{str(device_id)}/jobs")
643        if not schedules_response.get("success"):
644            raise RoborockException(schedules_response)
645        schedules = schedules_response.get("result")
646        if isinstance(schedules, list):
647            return [HomeDataSchedule.from_dict(schedule) for schedule in schedules]
648        else:
649            raise RoborockException(f"schedule_response result was an unexpected type: {schedules}")
async def create_job( self, user_data: roborock.data.containers.UserData, device_id: str, job: dict) -> dict:
651    async def create_job(self, user_data: UserData, device_id: str, job: dict) -> dict:
652        """Create a /jobs entry (schedule or one-time room clean) on a B01 device.
653
654        Body-bearing writes must sign the request body in the Hawk payload slot and send those same
655        compact bytes via ``data=``; ``json=`` would re-serialize with spaces and break the MAC.
656        """
657        rriot = user_data.rriot
658        if rriot is None:
659            raise RoborockException("rriot is none")
660        if rriot.r.a is None:
661            raise RoborockException("Missing field 'a' in rriot reference")
662        path = f"/user/devices/{device_id}/jobs"
663        job_request = PreparedRequest(
664            rriot.r.a,
665            self.session,
666            {
667                "Authorization": _get_hawk_authentication(rriot, path, body=job),
668                "Content-Type": "application/json",
669            },
670        )
671        response = await job_request.request("post", path, data=_compact_json(job).encode())
672        if not response.get("success"):
673            raise RoborockException(response)
674        return response

Create a /jobs entry (schedule or one-time room clean) on a B01 device.

Body-bearing writes must sign the request body in the Hawk payload slot and send those same compact bytes via data=; json= would re-serialize with spaces and break the MAC.

async def get_products( self, user_data: roborock.data.containers.UserData) -> roborock.data.containers.ProductResponse:
676    async def get_products(self, user_data: UserData) -> ProductResponse:
677        """Gets all products and their schemas, good for determining status codes and model numbers."""
678        base_url = await self.base_url
679        header_clientid = self._get_header_client_id()
680        product_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
681        product_response = await product_request.request(
682            "get",
683            "/api/v4/product",
684            headers={"Authorization": user_data.token},
685        )
686        if product_response is None:
687            raise RoborockException("home_id_response is None")
688        if product_response.get("code") != 200:
689            raise RoborockException(f"{product_response.get('msg')} - response code: {product_response.get('code')}")
690        result = product_response.get("data")
691        if isinstance(result, dict):
692            return ProductResponse.from_dict(result)
693        raise RoborockException("product result was an unexpected type")

Gets all products and their schemas, good for determining status codes and model numbers.

async def download_code(self, user_data: roborock.data.containers.UserData, product_id: int):
695    async def download_code(self, user_data: UserData, product_id: int):
696        base_url = await self.base_url
697        header_clientid = self._get_header_client_id()
698        product_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
699        request = {"apilevel": 99999, "productids": [product_id], "type": 2}
700        response = await product_request.request(
701            "post",
702            "/api/v1/appplugin",
703            json=request,
704            headers={"Authorization": user_data.token, "Content-Type": "application/json"},
705        )
706        return response["data"][0]["url"]
async def download_category_code(self, user_data: roborock.data.containers.UserData):
708    async def download_category_code(self, user_data: UserData):
709        base_url = await self.base_url
710        header_clientid = self._get_header_client_id()
711        product_request = PreparedRequest(base_url, self.session, {"header_clientid": header_clientid})
712        response = await product_request.request(
713            "get",
714            "api/v1/plugins?apiLevel=99999&type=2",
715            headers={
716                "Authorization": user_data.token,
717            },
718        )
719        return {r["category"]: r["url"] for r in response["data"]["categoryPluginList"]}
class PreparedRequest:
722class PreparedRequest:
723    def __init__(
724        self, base_url: str, session: aiohttp.ClientSession | None = None, base_headers: dict | None = None
725    ) -> None:
726        self.base_url = base_url
727        self.base_headers = base_headers or {}
728        self.session = session
729
730    async def request(self, method: str, url: str, params=None, data=None, headers=None, json=None) -> dict:
731        _url = "/".join(s.strip("/") for s in [self.base_url, url])
732        _headers = {**self.base_headers, **(headers or {})}
733        close_session = self.session is None
734        session = self.session if self.session is not None else aiohttp.ClientSession()
735        try:
736            async with session.request(method, _url, params=params, data=data, headers=_headers, json=json) as resp:
737                return await resp.json()
738        except ContentTypeError as err:
739            """If we get an error, lets log everything for debugging."""
740            try:
741                resp_json = await resp.json(content_type=None)
742                _LOGGER.info("Resp: %s", resp_json)
743            except ContentTypeError as err_2:
744                _LOGGER.info(err_2)
745            resp_raw = await resp.read()
746            _LOGGER.info("Resp raw: %s", resp_raw)
747            # Still raise the err so that it's clear it failed.
748            raise err
749        except (aiohttp.ClientError, TimeoutError, OSError) as err:
750            raise RoborockException(f"Network error contacting {_url}: {err}") from err
751        finally:
752            if close_session:
753                await session.close()
PreparedRequest( base_url: str, session: aiohttp.client.ClientSession | None = None, base_headers: dict | None = None)
723    def __init__(
724        self, base_url: str, session: aiohttp.ClientSession | None = None, base_headers: dict | None = None
725    ) -> None:
726        self.base_url = base_url
727        self.base_headers = base_headers or {}
728        self.session = session
base_url
base_headers
session
async def request( self, method: str, url: str, params=None, data=None, headers=None, json=None) -> dict:
730    async def request(self, method: str, url: str, params=None, data=None, headers=None, json=None) -> dict:
731        _url = "/".join(s.strip("/") for s in [self.base_url, url])
732        _headers = {**self.base_headers, **(headers or {})}
733        close_session = self.session is None
734        session = self.session if self.session is not None else aiohttp.ClientSession()
735        try:
736            async with session.request(method, _url, params=params, data=data, headers=_headers, json=json) as resp:
737                return await resp.json()
738        except ContentTypeError as err:
739            """If we get an error, lets log everything for debugging."""
740            try:
741                resp_json = await resp.json(content_type=None)
742                _LOGGER.info("Resp: %s", resp_json)
743            except ContentTypeError as err_2:
744                _LOGGER.info(err_2)
745            resp_raw = await resp.read()
746            _LOGGER.info("Resp raw: %s", resp_raw)
747            # Still raise the err so that it's clear it failed.
748            raise err
749        except (aiohttp.ClientError, TimeoutError, OSError) as err:
750            raise RoborockException(f"Network error contacting {_url}: {err}") from err
751        finally:
752            if close_session:
753                await session.close()
class UserWebApiClient:
803class UserWebApiClient:
804    """Wrapper around RoborockApiClient to provide information for a specific user.
805
806    This binds a RoborockApiClient to a specific user context with the
807    provided UserData. This allows for easier access to user-specific data,
808    to avoid needing to pass UserData around and mock out the web API.
809    """
810
811    def __init__(
812        self, web_api: RoborockApiClient, user_data: UserData, unauthorized_hook: Callable[[], None] | None = None
813    ) -> None:
814        """Initialize the wrapper with the API client and user data."""
815        self._web_api = web_api
816        self._user_data = user_data
817        self._unauthorized_hook = unauthorized_hook
818
819    async def get_home_data(self) -> HomeData:
820        """Fetch home data using the API client."""
821        try:
822            return await self._web_api.get_home_data_v3(self._user_data)
823        except RoborockInvalidCredentials:
824            if self._unauthorized_hook:
825                self._unauthorized_hook()
826            raise
827
828    async def get_routines(self, device_id: str) -> list[HomeDataScene]:
829        """Fetch routines (scenes) for a specific device."""
830        try:
831            return await self._web_api.get_scenes(self._user_data, device_id)
832        except RoborockInvalidCredentials:
833            if self._unauthorized_hook:
834                self._unauthorized_hook()
835            raise
836
837    async def get_rooms(self) -> list[HomeDataRoom]:
838        """Fetch rooms using the API client."""
839        try:
840            return await self._web_api.get_rooms(self._user_data)
841        except RoborockInvalidCredentials:
842            if self._unauthorized_hook:
843                self._unauthorized_hook()
844            raise
845
846    async def get_shared_device_rooms(self, device_id: str) -> list[HomeDataRoom]:
847        """Fetch shared-device rooms using the API client."""
848        return await self._web_api.get_shared_device_rooms(self._user_data, device_id)
849
850    async def execute_routine(self, scene_id: int) -> None:
851        """Execute a specific routine (scene) by its ID."""
852        try:
853            await self._web_api.execute_scene(self._user_data, scene_id)
854        except RoborockInvalidCredentials:
855            if self._unauthorized_hook:
856                self._unauthorized_hook()
857            raise

Wrapper around RoborockApiClient to provide information for a specific user.

This binds a RoborockApiClient to a specific user context with the provided UserData. This allows for easier access to user-specific data, to avoid needing to pass UserData around and mock out the web API.

UserWebApiClient( web_api: RoborockApiClient, user_data: roborock.data.containers.UserData, unauthorized_hook: Callable[[], None] | None = None)
811    def __init__(
812        self, web_api: RoborockApiClient, user_data: UserData, unauthorized_hook: Callable[[], None] | None = None
813    ) -> None:
814        """Initialize the wrapper with the API client and user data."""
815        self._web_api = web_api
816        self._user_data = user_data
817        self._unauthorized_hook = unauthorized_hook

Initialize the wrapper with the API client and user data.

async def get_home_data(self) -> roborock.data.containers.HomeData:
819    async def get_home_data(self) -> HomeData:
820        """Fetch home data using the API client."""
821        try:
822            return await self._web_api.get_home_data_v3(self._user_data)
823        except RoborockInvalidCredentials:
824            if self._unauthorized_hook:
825                self._unauthorized_hook()
826            raise

Fetch home data using the API client.

async def get_routines(self, device_id: str) -> list[roborock.data.containers.HomeDataScene]:
828    async def get_routines(self, device_id: str) -> list[HomeDataScene]:
829        """Fetch routines (scenes) for a specific device."""
830        try:
831            return await self._web_api.get_scenes(self._user_data, device_id)
832        except RoborockInvalidCredentials:
833            if self._unauthorized_hook:
834                self._unauthorized_hook()
835            raise

Fetch routines (scenes) for a specific device.

async def get_rooms(self) -> list[roborock.data.containers.HomeDataRoom]:
837    async def get_rooms(self) -> list[HomeDataRoom]:
838        """Fetch rooms using the API client."""
839        try:
840            return await self._web_api.get_rooms(self._user_data)
841        except RoborockInvalidCredentials:
842            if self._unauthorized_hook:
843                self._unauthorized_hook()
844            raise

Fetch rooms using the API client.

async def get_shared_device_rooms(self, device_id: str) -> list[roborock.data.containers.HomeDataRoom]:
846    async def get_shared_device_rooms(self, device_id: str) -> list[HomeDataRoom]:
847        """Fetch shared-device rooms using the API client."""
848        return await self._web_api.get_shared_device_rooms(self._user_data, device_id)

Fetch shared-device rooms using the API client.

async def execute_routine(self, scene_id: int) -> None:
850    async def execute_routine(self, scene_id: int) -> None:
851        """Execute a specific routine (scene) by its ID."""
852        try:
853            await self._web_api.execute_scene(self._user_data, scene_id)
854        except RoborockInvalidCredentials:
855            if self._unauthorized_hook:
856                self._unauthorized_hook()
857            raise

Execute a specific routine (scene) by its ID.