Skip to content

Commit

Permalink
make okta client credentials scope mandatory
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin-b committed Jun 17, 2024
1 parent ab77590 commit 9ef81c6
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Type information is now provided following [PEP 561](https://www.python.org/dev/peps/pep-0561/).
- Remove deprecation warnings due to usage of `utcnow` and `utcfromtimestamp`.
- `requests_auth.OktaClientCredentials` `scope` parameter is now mandatory and does not default to `openid` anymore.
- `requests_auth.OktaClientCredentials` will now display a more user-friendly error message in case Okta instance is not provided.
- Tokens cache `DEBUG` logs will not display tokens anymore.

### Removed
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ import requests
from requests_auth import OktaClientCredentials


okta = OktaClientCredentials(instance='testserver.okta-emea.com', client_id='54239d18-c68c-4c47-8bdd-ce71ea1d50cd', client_secret="secret")
okta = OktaClientCredentials(instance='testserver.okta-emea.com', client_id='54239d18-c68c-4c47-8bdd-ce71ea1d50cd', client_secret="secret", scope=["scope1", "scope2"])
requests.get('https://www.example.com', auth=okta)
```

Expand All @@ -369,11 +369,11 @@ requests.get('https://www.example.com', auth=okta)
| `instance` | Okta instance (like "testserver.okta-emea.com"). | Mandatory | |
| `client_id` | Okta Application Identifier (formatted as an Universal Unique Identifier). | Mandatory | |
| `client_secret` | Resource owner password. | Mandatory | |
| `scope` | Scope parameter sent in query. Can also be a list of scopes. | Mandatory | |
| `authorization_server` | Okta authorization server. | Optional | 'default' |
| `timeout` | Maximum amount of seconds to wait for a token to be received once requested. | Optional | 60 |
| `header_name` | Name of the header field used to send token. | Optional | Authorization |
| `header_value` | Format used to send the token value. "{token}" must be present as it will be replaced by the actual token. | Optional | Bearer {token} |
| `scope` | Scope parameter sent in query. Can also be a list of scopes. | Optional | openid |
| `token_field_name` | Field name containing the token. | Optional | access_token |
| `early_expiry` | Number of seconds before actual token expiry where token will be considered as expired. Used to ensure token will not expire between the time of retrieval and the time the request reaches the actual server. Set it to 0 to deactivate this feature and use the same token until actual expiry. | Optional | 30.0 |
| `session` | `requests.Session` instance that will be used to request the token. Use it to provide a custom proxying rule for instance. | Optional | |
Expand Down
21 changes: 16 additions & 5 deletions requests_auth/_oauth2/client_credentials.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from hashlib import sha512
from typing import Union, Iterable

import requests
import requests.auth
Expand Down Expand Up @@ -98,11 +99,20 @@ class OktaClientCredentials(OAuth2ClientCredentials):
Describes an Okta (OAuth 2) client credentials (also called application) flow requests authentication.
"""

def __init__(self, instance: str, client_id: str, client_secret: str, **kwargs):
def __init__(
self,
instance: str,
client_id: str,
client_secret: str,
*,
scope: Union[str, Iterable[str]],
**kwargs,
):
"""
:param instance: Okta instance (like "testserver.okta-emea.com")
:param client_id: Okta Application Identifier (formatted as an Universal Unique Identifier)
:param client_secret: Resource owner password.
:param scope: Scope parameter sent to token URL as body. Can also be a list of scopes.
:param authorization_server: Okta authorization server
default by default.
:param timeout: Maximum amount of seconds to wait for a token to be received once requested.
Expand All @@ -112,8 +122,6 @@ def __init__(self, instance: str, client_id: str, client_secret: str, **kwargs):
:param header_value: Format used to send the token value.
"{token}" must be present as it will be replaced by the actual token.
Token will be sent as "Bearer {token}" by default.
:param scope: Scope parameter sent to token URL as body. Can also be a list of scopes.
Request 'openid' by default.
:param token_field_name: Field name containing the token. access_token by default.
:param early_expiry: Number of seconds before actual token expiry where token will be considered as expired.
Default to 30 seconds to ensure token will not expire between the time of retrieval and the time the request
Expand All @@ -122,13 +130,16 @@ def __init__(self, instance: str, client_id: str, client_secret: str, **kwargs):
Use it to provide a custom proxying rule for instance.
:param kwargs: all additional authorization parameters that should be put as query parameter in the token URL.
"""
if not scope:
raise Exception("scope is mandatory.")
if not instance:
raise Exception("Okta instance is mandatory.")
authorization_server = kwargs.pop("authorization_server", None) or "default"
scopes = kwargs.pop("scope", "openid")
kwargs["scope"] = " ".join(scopes) if isinstance(scopes, list) else scopes
OAuth2ClientCredentials.__init__(
self,
f"https://{instance}/oauth2/{authorization_server}/v1/token",
client_id=client_id,
client_secret=client_secret,
scope=scope,
**kwargs,
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
import requests
from responses import RequestsMock
from responses.matchers import header_matcher
from responses.matchers import header_matcher, urlencoded_params_matcher

import requests_auth
from requests_auth.testing import token_cache # noqa: F401
Expand All @@ -12,7 +13,11 @@ def test_okta_client_credentials_flow_uses_provided_session(
session = requests.Session()
session.headers.update({"x-test": "Test value"})
auth = requests_auth.OktaClientCredentials(
"test_okta", client_id="test_user", client_secret="test_pwd", session=session
"test_okta",
client_id="test_user",
client_secret="test_pwd",
scope="dummy",
session=session,
)
responses.post(
"https://test_okta/oauth2/default/v1/token",
Expand All @@ -25,6 +30,9 @@ def test_okta_client_credentials_flow_uses_provided_session(
},
match=[
header_matcher({"x-test": "Test value"}),
urlencoded_params_matcher(
{"grant_type": "client_credentials", "scope": "dummy"}
),
],
)
responses.get(
Expand All @@ -39,7 +47,7 @@ def test_okta_client_credentials_flow_token_is_sent_in_authorization_header_by_d
token_cache, responses: RequestsMock
):
auth = requests_auth.OktaClientCredentials(
"test_okta", client_id="test_user", client_secret="test_pwd"
"test_okta", client_id="test_user", client_secret="test_pwd", scope="dummy"
)
responses.post(
"https://test_okta/oauth2/default/v1/token",
Expand All @@ -50,6 +58,11 @@ def test_okta_client_credentials_flow_token_is_sent_in_authorization_header_by_d
"refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",
"example_parameter": "example_value",
},
match=[
urlencoded_params_matcher(
{"grant_type": "client_credentials", "scope": "dummy"}
),
],
)
responses.get(
"http://authorized_only",
Expand All @@ -63,11 +76,11 @@ def test_okta_client_credentials_flow_token_is_expired_after_30_seconds_by_defau
token_cache, responses: RequestsMock
):
auth = requests_auth.OktaClientCredentials(
"test_okta", client_id="test_user", client_secret="test_pwd"
"test_okta", client_id="test_user", client_secret="test_pwd", scope="dummy"
)
# Add a token that expires in 29 seconds, so should be considered as expired when issuing the request
token_cache._add_token(
key="f0d25aa4e496c6615328e776bb981dabe53fa77768a0a58eaf6d54215c598d80e57ffc7926fd96ec6a6a872942cb684a473e36233b593fb760d3eb6dc22ae550",
key="7830dd38bb95d4ac6273bd1a208c3db2097ac2715c6d3fb646ef3ccd48877109dd4cba292cef535559747cf6c4f497bf0804994dfb1c31bb293d2774889c2cfb",
token="2YotnFZFEjr1zCsicMWpAA",
expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29),
)
Expand All @@ -81,6 +94,11 @@ def test_okta_client_credentials_flow_token_is_expired_after_30_seconds_by_defau
"refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",
"example_parameter": "example_value",
},
match=[
urlencoded_params_matcher(
{"grant_type": "client_credentials", "scope": "dummy"}
),
],
)
responses.get(
"http://authorized_only",
Expand All @@ -97,11 +115,12 @@ def test_okta_client_credentials_flow_token_custom_expiry(
"test_okta",
client_id="test_user",
client_secret="test_pwd",
scope="dummy",
early_expiry=28,
)
# Add a token that expires in 29 seconds, so should be considered as not expired when issuing the request
token_cache._add_token(
key="f0d25aa4e496c6615328e776bb981dabe53fa77768a0a58eaf6d54215c598d80e57ffc7926fd96ec6a6a872942cb684a473e36233b593fb760d3eb6dc22ae550",
key="7830dd38bb95d4ac6273bd1a208c3db2097ac2715c6d3fb646ef3ccd48877109dd4cba292cef535559747cf6c4f497bf0804994dfb1c31bb293d2774889c2cfb",
token="2YotnFZFEjr1zCsicMWpAA",
expiry=requests_auth._oauth2.tokens._to_expiry(expires_in=29),
)
Expand All @@ -115,7 +134,7 @@ def test_okta_client_credentials_flow_token_custom_expiry(

def test_expires_in_sent_as_str(token_cache, responses: RequestsMock):
auth = requests_auth.OktaClientCredentials(
"test_okta", client_id="test_user", client_secret="test_pwd"
"test_okta", client_id="test_user", client_secret="test_pwd", scope="dummy"
)
responses.post(
"https://test_okta/oauth2/default/v1/token",
Expand All @@ -126,10 +145,53 @@ def test_expires_in_sent_as_str(token_cache, responses: RequestsMock):
"refresh_token": "tGzv3JOkF0XG5Qx2TlKWIA",
"example_parameter": "example_value",
},
match=[
urlencoded_params_matcher(
{"grant_type": "client_credentials", "scope": "dummy"}
),
],
)
responses.get(
"http://authorized_only",
match=[header_matcher({"Authorization": "Bearer 2YotnFZFEjr1zCsicMWpAA"})],
)

requests.get("http://authorized_only", auth=auth)


def test_scope_is_mandatory():
with pytest.raises(Exception) as exception_info:
requests_auth.OktaClientCredentials(
"test_url", "test_user", "test_pwd", scope=""
)
assert str(exception_info.value) == "scope is mandatory."


def test_instance_is_mandatory():
with pytest.raises(Exception) as exception_info:
requests_auth.OktaClientCredentials("", "test_user", "test_pwd", scope="dummy")
assert str(exception_info.value) == "Okta instance is mandatory."


def test_client_id_is_mandatory():
with pytest.raises(Exception) as exception_info:
requests_auth.OktaClientCredentials("test_url", "", "test_pwd", scope="dummy")
assert str(exception_info.value) == "client_id is mandatory."


def test_client_secret_is_mandatory():
with pytest.raises(Exception) as exception_info:
requests_auth.OktaClientCredentials("test_url", "test_user", "", scope="dummy")
assert str(exception_info.value) == "client_secret is mandatory."


def test_header_value_must_contains_token():
with pytest.raises(Exception) as exception_info:
requests_auth.OktaClientCredentials(
"test_url",
"test_user",
"test_pwd",
scope="dummy",
header_value="Bearer token",
)
assert str(exception_info.value) == "header_value parameter must contains {token}."

0 comments on commit 9ef81c6

Please sign in to comment.