1
+ from datetime import datetime , timedelta , timezone
2
+ import os
3
+ from typing import Annotated
4
+
5
+
6
+ from fastapi import APIRouter , Depends , HTTPException , status
7
+ from fastapi .security import OAuth2PasswordBearer , OAuth2PasswordRequestForm
8
+ from jose import JWTError , jwt
9
+ from passlib .context import CryptContext
10
+ from pydantic import BaseModel
11
+
12
+ # to get a string like this run:
13
+ # openssl rand -hex 32
14
+
15
+ # Fetch environmental variables
16
+ SECRET_KEY = os .getenv ("SPECTACLES_SECRET_KEY" )
17
+ ALGORITHM = os .getenv ("SPECTACLES_ALGORITHM" )
18
+
19
+ if not SECRET_KEY or not ALGORITHM :
20
+ raise ValueError ("SPECTACLES_SECRET_KEY and SPECTACLES_ALGORITHM must be set as environmental variables" )
21
+
22
+ ACCESS_TOKEN_EXPIRE_MINUTES = 90
23
+
24
+
25
+ clients_db = {
26
+ "first_client" : {"disabled" : False , "client_id" : "first_client" , "client_secret_hashed" : "$2b$12$Yqwzj50q0.5brgJAYwOIEO1l10tdgStMZEB41HwRMFzU/h5wuDsh." }
27
+ }
28
+
29
+ class Token (BaseModel ):
30
+ access_token : str
31
+ token_type : str
32
+
33
+
34
+ class TokenData (BaseModel ):
35
+ client_id : str | None = None
36
+
37
+ class Client (BaseModel ):
38
+ """Client without the hashed secret, more suitable to view"""
39
+ client_id : str
40
+ disabled : bool | None = None
41
+
42
+
43
+ class ClientInDB (Client ):
44
+ """Client with hashed secret"""
45
+ client_secret_hashed : str
46
+
47
+
48
+ secret_context = CryptContext (schemes = ["bcrypt" ], deprecated = "auto" )
49
+
50
+ oauth2_scheme = OAuth2PasswordBearer (tokenUrl = "token" )
51
+
52
+ router = APIRouter ()
53
+
54
+
55
+ def verify_client_secret (plain_secret , hashed_secret ):
56
+ return secret_context .verify (plain_secret , hashed_secret )
57
+
58
+
59
+ def get_secret_hash (secret ):
60
+ return secret_context .hash (secret )
61
+
62
+ def get_client (db , client_id : str ):
63
+ if client_id in db :
64
+ client_dict = db [client_id ]
65
+ return ClientInDB (** client_dict )
66
+
67
+ def authenticate_client (clients_db , client_id : str , client_secret : str ):
68
+ client : ClientInDB = get_client (clients_db , client_id )
69
+ if not client :
70
+ return False
71
+ if not verify_client_secret (client_secret , client .client_secret_hashed ):
72
+ return False
73
+ return client
74
+
75
+
76
+ def create_access_token (data : dict , expires_delta : timedelta | None = None ):
77
+ to_encode = data .copy ()
78
+ if expires_delta :
79
+ expire = datetime .now (timezone .utc ) + expires_delta
80
+ else :
81
+ expire = datetime .now (timezone .utc ) + timedelta (minutes = 15 )
82
+ to_encode .update ({"exp" : expire })
83
+ encoded_jwt = jwt .encode (to_encode , SECRET_KEY , algorithm = ALGORITHM )
84
+ return encoded_jwt
85
+
86
+
87
+ async def get_current_client (token : Annotated [str , Depends (oauth2_scheme )]):
88
+ credentials_exception = HTTPException (
89
+ status_code = status .HTTP_401_UNAUTHORIZED ,
90
+ detail = "Could not validate credentials" ,
91
+ headers = {"WWW-Authenticate" : "Bearer" },
92
+ )
93
+ try :
94
+ payload = jwt .decode (token , SECRET_KEY , algorithms = [ALGORITHM ])
95
+ client_id : str = payload .get ("sub" )
96
+ if client_id is None :
97
+ raise credentials_exception
98
+ token_data = TokenData (client_id = client_id )
99
+ except JWTError :
100
+ raise credentials_exception
101
+
102
+ # If we get this far, the token is valid
103
+ client = get_client (clients_db , client_id = token_data .client_id )
104
+ if client is None :
105
+ raise credentials_exception
106
+ return client
107
+
108
+
109
+ async def get_current_active_client (
110
+ current_client : Annotated [Client , Depends (get_current_client )]
111
+ ):
112
+ if current_client .disabled :
113
+ raise HTTPException (status_code = 400 , detail = "Inactive user" )
114
+ return current_client
115
+
116
+
117
+ @router .post ("/token" )
118
+ async def login_for_access_token (
119
+ form_data : Annotated [OAuth2PasswordRequestForm , Depends ()]
120
+ ) -> Token :
121
+ client = authenticate_client (clients_db , form_data .username , form_data .password )
122
+ if not client :
123
+ raise HTTPException (
124
+ status_code = status .HTTP_401_UNAUTHORIZED ,
125
+ detail = "Incorrect username or password" ,
126
+ headers = {"WWW-Authenticate" : "Bearer" },
127
+ )
128
+ access_token_expires = timedelta (minutes = ACCESS_TOKEN_EXPIRE_MINUTES )
129
+ access_token = create_access_token (
130
+ data = {"sub" : client .client_id }, expires_delta = access_token_expires
131
+ )
132
+ return Token (access_token = access_token , token_type = "bearer" )
133
+
134
+
135
+ @router .get ("/users/me/" , response_model = Client )
136
+ async def read_users_me (
137
+ current_client : Annotated [Client , Depends (get_current_active_client )]
138
+ ):
139
+ return current_client
140
+
141
+
142
+ @router .get ("/users/me/items/" )
143
+ async def read_own_items (
144
+ current_client : Annotated [Client , Depends (get_current_active_client )]
145
+ ):
146
+ return [{"item_id" : "Foo" , "owner" : current_client .client_id }]
0 commit comments