Source code for serenity_sdk.client.auth0_device_auth

import json
import time
from http.client import HTTPSConnection
from urllib.parse import urlencode

from serenity_sdk.client.base_auth import AuthToken, BaseAuthCredential
from serenity_sdk.config import ConnectionConfig


class Auth0DeviceAuth(BaseAuthCredential):
    """
    Class to authenticate with the Serenity API using the Auth0 Device Code Flow.

    This class is for internal use and normally you should not need to instantiate it.
    """

    # Seconds slept before the first poll and between subsequent polls of the token endpoint.
    _POLL_INTERVAL_SECONDS = 5
    # Number of polls of the token endpoint after which a still-pending authorization times out.
    _MAX_POLLS = 18

    def __init__(self, config: ConnectionConfig):
        """
        :param config: The loaded connection configuration params to use for the authentication process
        """
        self.access_token = AuthToken()
        self._config = config

    @staticmethod
    def _post_form(conn: HTTPSConnection, path: str, fields: dict) -> dict:
        """
        POST ``fields`` as a form-encoded body to ``path`` and return the decoded JSON response.

        :param conn: the connection to send the request on
        :param path: the request path, e.g. ``/oauth/token``
        :param fields: form field names mapped to their values; values are percent-encoded here
        :return: the response body parsed as JSON
        """
        headers = {'content-type': "application/x-www-form-urlencoded"}
        # urlencode() escapes reserved characters so a value containing '&', '=' or
        # spaces cannot corrupt the form body.
        conn.request("POST", path, urlencode(fields), headers)
        res = conn.getresponse()
        # The response must be read in full before the connection can be reused.
        data = res.read()
        return json.loads(data.decode("utf-8"))

    def get_token(self) -> AuthToken:
        """
        Gets an access token from the Auth0 API. This method will block until either the user
        has completed the authentication process or a timeout has occurred.

        The verification URL is printed for the user to visit, then the token endpoint is
        polled every ``_POLL_INTERVAL_SECONDS`` seconds, at most ``_MAX_POLLS`` times.

        :return: this instance's ``access_token`` object, updated in place with the new
                 token and its expiry (seconds since the epoch)
        :raises ValueError: if the ``expires_in`` property of the device code response is not
                            an integer, if authorization is still pending once the polling
                            budget is used up, or if the token endpoint reports any other error
        :raises KeyError: if an expected property is missing from a response
        """
        interval = self._POLL_INTERVAL_SECONDS
        max_poll = self._MAX_POLLS

        conn = HTTPSConnection(f"{self._config.domain}")
        try:
            # Step 1: request a device code plus the URL the user must visit.
            code_request_res = self._post_form(conn, "/oauth/device/code", {
                "client_id": self._config.user_client_id,
                "audience": self._config.user_audience,
            })
            device_code = code_request_res["device_code"]

            # If expires_in property is not a number, raise an error
            if not isinstance(code_request_res["expires_in"], int):
                raise ValueError(f"expires_in property is not a number: {code_request_res['expires_in']}")

            print(f"Please visit {code_request_res['verification_uri_complete']} to complete the authentication process...")  # noqa: E501
            time.sleep(interval)

            # Step 2: poll the token endpoint until the user approves, an error
            # is reported, or the polling budget is exhausted.
            token_fields = {
                "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
                "device_code": device_code,
                "client_id": self._config.user_client_id,
            }
            poll_count = 0
            while True:
                poll_count += 1
                token_res = self._post_form(conn, "/oauth/token", token_fields)

                if "error" not in token_res:
                    break
                if token_res["error"] != "authorization_pending":
                    raise ValueError(f"Error in response: {token_res['error']}")

                # Still pending: report the time remaining, or give up.
                seconds_left = (max_poll - poll_count) * interval
                if seconds_left <= 0:
                    print("Error: Timeout waiting for authorization. Feel free to try again...")
                    raise ValueError("Timeout waiting for authorization")
                print(f"Waiting for authorization... \n({seconds_left} seconds left)")
                time.sleep(interval)
        finally:
            # Always release the socket, whether the flow succeeded or raised.
            conn.close()

        print("Access Confirmed!")
        expires_on = int(time.time()) + int(token_res["expires_in"])
        self.access_token.token = token_res["access_token"]

        expires_on_display = time.strftime('%Y-%m-%d %H:%M:%S %Z', time.localtime(expires_on))
        current_time = time.strftime('%Y-%m-%d %H:%M:%S %Z', time.localtime(time.time()))
        print(f"- Auth code expires: {expires_on_display} (Current time: {current_time})")

        self.access_token.expires_on = expires_on

        # The same AuthToken instance held by this object is returned, not a copy.
        return self.access_token