Skip to content

Authenticator

pyfamilysafety.authenticator.Authenticator

Authenticator(client_session=None)

The base authenticator class.

init the class.

Source code in pyfamilysafety/authenticator/__init__.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(
        self,
        client_session: aiohttp.ClientSession = None
    ) -> None:
    """init the class."""
    _LOGGER.debug(">> Init authenticator.")
    self.expires: datetime = None
    self.refresh_token: str = None
    self._access_token: str = None
    self.user_id: str = None
    self._ppft: str = None
    self._login_lock: asyncio.Lock = asyncio.Lock()
    if client_session is None:
        client_session = aiohttp.ClientSession()
    self.client_session: aiohttp.ClientSession = client_session

expires instance-attribute

expires = None

refresh_token instance-attribute

refresh_token = None

user_id instance-attribute

user_id = None

client_session instance-attribute

client_session = client_session

access_token property

access_token

Returns the access token.

access_token_expired property

access_token_expired

Check if the access token has expired.

create async classmethod

create(token, use_refresh_token=False, client_session=None)

Creates and starts a Microsoft auth session without retaining the username and password.

Source code in pyfamilysafety/authenticator/__init__.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@classmethod
async def create(
    cls,
    token: str,
    use_refresh_token: bool=False,
    client_session: aiohttp.ClientSession | None = None) -> 'Authenticator':
    """Creates and starts a Microsoft auth session without retaining the username and password."""
    auth = cls(client_session=client_session)
    if use_refresh_token:
        auth.refresh_token = token
        await auth.perform_refresh()
        return auth
    else:
        redir_parsed = auth._parse_response_token(token)
        await auth.perform_login(redir_parsed["code"])
        return auth

perform_login async

perform_login(auth_code)

Performs login from the username and password.

Source code in pyfamilysafety/authenticator/__init__.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
async def perform_login(self, auth_code):
    """Performs login from the username and password."""
    if self._login_lock.locked():
        while self._login_lock.locked():
            await asyncio.sleep(2)
        return
    async with self._login_lock:
        _LOGGER.debug(">> Performing authenticator login")
        form = aiohttp.FormData()
        form.add_field("client_id", CLIENT_ID)
        form.add_field("code", auth_code)
        form.add_field("grant_type", "authorization_code")
        form.add_field("redirect_uri", REDIRECT_URL)
        form.add_field("scope", SCOPE)
        tokens = await self._request_handler(
            method="POST",
            url=TOKEN_ENDPOINT,
            data=form
        )
        _LOGGER.debug(">> Token request response %s", tokens["status"])
        if tokens["status"] == 200:
            self._access_token = tokens["json"]["access_token"]
            self.expires = datetime.now() + timedelta(seconds=tokens["json"]["expires_in"])
            self.refresh_token = tokens["json"]["refresh_token"]
            self.user_id = tokens["json"]["user_id"]
        else:
            raise Unauthorized()

perform_refresh async

perform_refresh()

Refresh the token.

Source code in pyfamilysafety/authenticator/__init__.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
async def perform_refresh(self):
    """Refresh the token."""
    if self._login_lock.locked():
        while self._login_lock.locked():
            await asyncio.sleep(2)
        return
    async with self._login_lock:
        _LOGGER.debug(">> Performing authenticator refresh")
        form = aiohttp.FormData()
        form.add_field("client_id", CLIENT_ID)
        form.add_field("refresh_token", self.refresh_token)
        form.add_field("grant_type", "refresh_token")
        form.add_field("scope", SCOPE)
        tokens = await self._request_handler(
            method="POST",
            url=TOKEN_ENDPOINT,
            data=form
        )
        _LOGGER.debug(">> Token request response %s", tokens["status"])
        _LOGGER.debug(">> Token response value %s", tokens)
        if tokens["status"] == 200:
            self._access_token = tokens["json"]["access_token"]
            self.expires = datetime.now() + timedelta(seconds=tokens["json"]["expires_in"])
            self.refresh_token = tokens["json"]["refresh_token"]
            self.user_id = tokens["json"]["user_id"]