-
Notifications
You must be signed in to change notification settings - Fork 0
/
auth.py
115 lines (85 loc) · 3.29 KB
/
auth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import hashlib
from datetime import datetime, timedelta
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from pydantic import BaseModel
from tortoise.models import Model
from tortoise import fields
from config import *
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
auth_route = APIRouter()
class User(Model):
id = fields.IntField(pk=True, source_field='idUser')
name = fields.CharField(source_field='nama', max_length=150)
username = fields.CharField(source_field='usernameApp', max_length=32)
password = fields.CharField(source_field='passwordApp', max_length=32)
group = fields.IntField(source_field='idGroup')
active = fields.BooleanField(default=True, source_field='isAktif')
class Meta:
table = 'app_user'
# pydantic
class Credential(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
# utils
def password_hash(password: str):
hashed = hashlib.md5(password.encode())
return hashed.hexdigest()
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = expires_delta
else:
expire = timedelta(minutes=15)
to_encode.update({"exp": datetime.utcnow() + expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def validate_user(credential: Credential):
return await User.filter(username=credential.username,
password=password_hash(credential.password),
active=True).get_or_none()
async def get_user(username: str):
return await User.filter(username=username, active=True)\
.prefetch_related('faculty')\
.values('id', 'username', 'fullName')
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"}
)
async def get_current_user(token: str = Depends(oauth2_scheme)):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = await get_user(username=username)
if user is None:
raise credentials_exception
return user[0]
@auth_route.post('/login', tags=['auth'])
async def auth_login(credential: OAuth2PasswordRequestForm = Depends()):
user = await validate_user(Credential(username=credential.username, password=credential.password))
if user is not None:
access_token = create_access_token(
data={"sub": user.username}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
return Token(
access_token=access_token, token_type="bearer"
)
else:
raise credentials_exception
@auth_route.get('/me', tags=['auth'])
async def auth_me(current_user: User = Depends(get_current_user)):
return current_user
@auth_route.post('/logout', tags=['auth'])
async def auth_me(current_user: User = Depends(get_current_user)):
return {
'ok': True
}