Skip to content

auth

pixel_client.auth

KeyCloakAuth

Bases: httpx.Auth

Auth class for KeyCloak authentication

Source code in src/pixel_client/auth.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class KeyCloakAuth(httpx.Auth):
    """Auth class for KeyCloak authentication.

    Fetches an access token from the realm's OpenID Connect token endpoint
    using the password grant, caches it on the instance, and sends it as a
    ``Bearer`` token on every request. A 401 response triggers one token
    refresh followed by one retry of the same request.
    """

    def __init__(
        self,
        keycloak_server_url: str,
        realm: str,
        client_id: str,
        username: str,
        password: str,
    ) -> None:
        """Initialize the KeyCloakAuth class.

        Args:
            keycloak_server_url: Base URL of the KeyCloak server. Trailing
                slashes are ignored.
            realm: Name of the realm that issues the token.
            client_id: Client id sent with the token request.
            username: User name used for the password grant.
            password: Password used for the password grant.
        """
        # Strip trailing slashes so "https://host/" does not produce
        # "https://host//realms/...", which is a different URL path.
        base_url = keycloak_server_url.rstrip("/")
        self.token_url = f"{base_url}/realms/{realm}/protocol/openid-connect/token"
        self.client_id = client_id
        self.username = username
        self.password = password
        self.realm = realm
        # Cached access token; None means "fetch on next access".
        self._token = None

    def auth_flow(self, request):
        """Flow for the authentication.

        Sends the request with the current token. If the response status is
        401, the cached token is cleared, a new one is fetched and the request
        is sent once more.

        Note:
            See [httpx documentation](https://www.python-httpx.org/advanced/authentication) on custom authentication schemes
        """
        request.headers["Authorization"] = f"Bearer {self.token}"
        response = yield request
        if response.status_code == 401:
            logger.info("Token expired, retrieving new token")
            self._token = None  # Clear the token so the property fetches a new one
            request.headers["Authorization"] = f"Bearer {self.token}"
            yield request

    @property
    def token(self):
        """Return the cached access token, retrieving it first if unset."""
        if self._token is None:
            self._token = self._retrieve_token()
        return self._token

    def _retrieve_token(self):
        """Request a new access token from ``self.token_url``.

        Returns:
            The ``access_token`` field of the JSON response.

        Raises:
            httpx.HTTPStatusError: If the token endpoint answers with an
                error status (raised by ``raise_for_status``).
            KeyError: If the JSON response has no ``access_token`` field.
        """
        # A short-lived client is used so no connection outlives the call.
        with httpx.Client() as client:
            logger.info("Retrieving token from %s", self.token_url)
            headers = {"Content-Type": "application/x-www-form-urlencoded"}
            response = client.post(
                self.token_url,
                data={
                    "client_id": self.client_id,
                    "username": self.username,
                    "password": self.password,
                    "scope": "openid email profile",
                    "grant_type": "password",
                },
                headers=headers,
            )
            response.raise_for_status()
            return response.json()["access_token"]

token_url instance-attribute

token_url = f"{keycloak_server_url}/realms/{realm}/protocol/openid-connect/token"

client_id instance-attribute

client_id = client_id

username instance-attribute

username = username

password instance-attribute

password = password

realm instance-attribute

realm = realm

token property

token

__init__

__init__(
    keycloak_server_url: str,
    realm: str,
    client_id: str,
    username: str,
    password: str,
)
Source code in src/pixel_client/auth.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def __init__(
    self,
    keycloak_server_url: str,
    realm: str,
    client_id: str,
    username: str,
    password: str,
) -> None:
    """Initialize the KeyCloakAuth class.

    Args:
        keycloak_server_url: Base URL of the KeyCloak server. Trailing
            slashes are ignored.
        realm: Name of the realm that issues the token.
        client_id: Client id sent with the token request.
        username: User name used for the password grant.
        password: Password used for the password grant.
    """
    # Strip trailing slashes so "https://host/" does not produce
    # "https://host//realms/...", which is a different URL path.
    base_url = keycloak_server_url.rstrip("/")
    self.token_url = f"{base_url}/realms/{realm}/protocol/openid-connect/token"
    self.client_id = client_id
    self.username = username
    self.password = password
    self.realm = realm
    # Cached access token; None means no token has been stored yet.
    self._token = None

auth_flow

auth_flow(request)

Flow for the authentication.

Note

See the httpx documentation on custom authentication schemes.

Source code in src/pixel_client/auth.py
28
29
30
31
32
33
34
35
36
37
38
39
40
def auth_flow(self, request):
    """Attach a bearer token to the request, retrying once on a 401.

    The request is first sent with the current token. If the response status
    is 401, the cached token is discarded, a fresh one is read from
    ``self.token``, and the same request is sent a second time.

    Note:
        See [httpx documentation](https://www.python-httpx.org/advanced/authentication) on custom authentication schemes
    """
    request.headers["Authorization"] = f"Bearer {self.token}"
    first_response = yield request
    if first_response.status_code != 401:
        # Anything other than 401 ends the flow after a single attempt.
        return

    logger.info("Token expired, retrieving new token")
    # Dropping the cached value makes the next self.token read fetch anew.
    self._token = None
    request.headers["Authorization"] = f"Bearer {self.token}"
    yield request