33from json import loads
44from typing import Any
55from urllib .parse import parse_qsl
6+ from enum import Enum
67
78from django .utils .crypto import constant_time_compare , get_random_string
89from django .utils .translation import gettext as _
1617SESSION_KEY_OAUTH_PKCE = "authentik/sources/oauth/pkce"
1718
1819
20+ class AuthScheme (Enum ):
21+ BASIC_AUTH = "basic_auth"
22+ POST_BODY = "post_body"
23+
24+
1925class OAuth2Client (BaseOAuthClient ):
2026 """OAuth2 Client"""
2127
2228 _default_headers = {
2329 "Accept" : "application/json" ,
2430 }
2531
32+ _source_auth_scheme : AuthScheme = AuthScheme .BASIC_AUTH
33+
2634 def get_request_arg (self , key : str , default : Any | None = None ) -> Any :
2735 """Depending on request type, get data from post or get"""
2836 if self .request .method == "POST" :
@@ -72,6 +80,12 @@ def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
7280 "code" : code ,
7381 "grant_type" : "authorization_code" ,
7482 }
83+ basic_auth = None
84+ if self ._source_auth_scheme == AuthScheme .BASIC_AUTH :
85+ basic_auth = (self .get_client_id (), self .get_client_secret ())
86+ if self ._source_auth_scheme == AuthScheme .POST_BODY :
87+ args ["client_id" ] = self .get_client_id ()
88+ args ["client_secret" ] = self .get_client_secret ()
7589 if SESSION_KEY_OAUTH_PKCE in self .request .session :
7690 args ["code_verifier" ] = self .request .session [SESSION_KEY_OAUTH_PKCE ]
7791 try :
@@ -81,7 +95,7 @@ def get_access_token(self, **request_kwargs) -> dict[str, Any] | None:
8195 response = self .do_request (
8296 "post" ,
8397 access_token_url ,
84- auth = ( self . get_client_id (), self . get_client_secret ()) ,
98+ auth = basic_auth ,
8599 data = args ,
86100 headers = self ._default_headers ,
87101 ** request_kwargs ,
0 commit comments