added authorization

parent 14667cbd
from fastapi import APIRouter
from routes import router as user_route
from routes import user_router as user_route, auth_router as auth_route
router = APIRouter(prefix='/api', tags=['AUTH'])
router.include_router(user_route)
router.include_router(user_route)
router.include_router(auth_route)
import os
SECRET_KEY = os.getenv('SECRET_KEY')
ALGORITHM = os.getenv('ALGORITHM')
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES'))
REFRESH_TOKEN_EXPIRE_HOURS = int(os.getenv('REFRESH_TOKEN_EXPIRE_HOURS'))
from datetime import timedelta
import fastapi as fa
from fastapi.security import OAuth2PasswordRequestForm
from api.router import router
from db import init_db
import services
def setup():
......@@ -20,31 +16,6 @@ app = setup()
init_db(app)
@router.post("/token", response_model=services.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = fa.Depends()):
user = services.authenticate_user(services.fake_users_db, form_data.username, form_data.password)
if not user:
raise fa.HTTPException(
status_code=fa.status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=services.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = services.create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/users/me/", response_model=services.User)
async def read_users_me(current_user: services.User = fa.Depends(services.get_current_active_user)):
return current_user
if __name__ == '__main__':
import uvicorn
......
from uuid import UUID
import fastapi
from fastapi.security import OAuth2PasswordRequestForm
from schemas import (
UserAllOptionalSchema, CreateUserSchema, CreatedUserResponseSchema,
UpdateUserSchema
)
from ctrl import UserController
import services
router = fastapi.APIRouter(prefix='/users', tags=['AUTH'])
user_router = fastapi.APIRouter(prefix='/users')
auth_router = fastapi.APIRouter(prefix='/auth')
ctrl = UserController()
@router.get('')
@user_router.get('')
async def get_users(query: UserAllOptionalSchema = fastapi.Depends()):
return await ctrl.get_list(**query.model_dump(exclude_none=True))
@router.get('/{id}')
@user_router.get('/{id}')
async def get_users(id: UUID):
return await ctrl.get(id)
@router.post('')
@user_router.post('')
async def create_users(
body: CreateUserSchema
) -> CreatedUserResponseSchema:
return await ctrl.create(**body.model_dump(exclude_none=True))
@router.patch('/{id}')
@user_router.patch('/{id}')
async def update_users(
id: UUID, body: UpdateUserSchema,
) -> CreatedUserResponseSchema:
return await ctrl.update(id, **body.model_dump(exclude_none=True))
@router.delete('/{id}')
@user_router.delete('/{id}')
async def delete_users(
id: UUID,
):
return await ctrl.delete(id)
@auth_router.post('/login')
async def login(
form_data: OAuth2PasswordRequestForm = fastapi.Depends(),
):
return await services.auth_service.login(form_data)
@auth_router.get('/users/me')
async def read_user_me(
user: dict = fastapi.Depends(services.auth_service.get_current_user)
):
return user
@auth_router.post('/refresh')
async def refresh_token(
data: dict,
user: dict = fastapi.Depends(services.auth_service.get_current_user)
):
return services.token_service.refresh_token(data['refresh_token'])
from dataclasses import dataclass
from uuid import UUID
from datetime import datetime
from pydantic import BaseModel, EmailStr
......@@ -28,3 +30,15 @@ class UpdateUserSchema(BaseModel):
class UserAllOptionalSchema(BaseModel):
username: None | str = None
password: None | str = None
@dataclass
class TokenData:
sub: int
exp: datetime | None = None
@dataclass
class LoginRequestModel:
username: str
password: str
import os
from datetime import datetime, timedelta
import fastapi as fa
import fastapi.security as fs
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
import schemas
import const
from repository import UserRepository
SECRET_KEY = os.getenv('SECRET_KEY')
ALGORITHM = os.getenv('ALGORITHM')
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES'))
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"password": "$2a$12$ITTdvxKhvrBQt/zxKmP6K.F8vPnROkX7VWU7jQmcH1UspVdIguqXa",
"disabled": False,
},
'janedoe': {
"username": "janedoe",
"full_name": "Jane Doe",
"email": "janedoe@example.com",
"password": "$2a$12$4D442S0uyt74hPon/EBP9uZV3z2EUFZ0F9JgmtIK2DyBsdGNWHt1G"
},
}
oauth2_scheme = fs.OAuth2PasswordBearer(tokenUrl='/api/auth/login')
class Token(BaseModel):
access_token: str
token_type: str
class TokenService:
def create_token(self, data: schemas.TokenData, expires_delta: timedelta) -> str:
payload = {
'sub': str(data.sub),
'exp': datetime.utcnow() + expires_delta,
}
return jwt.encode(payload, const.SECRET_KEY, algorithm=const.ALGORITHM)
class TokenData(BaseModel):
username: str | None = None
def create_tokens(self, data: schemas.TokenData):
access = self.create_token(
data, expires_delta=timedelta(minutes=const.ACCESS_TOKEN_EXPIRE_MINUTES)
)
refresh = self.create_token(
data, expires_delta=timedelta(hours=const.REFRESH_TOKEN_EXPIRE_HOURS)
)
return {"access_token": access, "refresh_token": refresh}
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
def decode_token(self, token: str) -> schemas.TokenData:
credentials_exception = fa.HTTPException(
status_code=fa.status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
decoded_token_payload = jwt.decode(token, const.SECRET_KEY, algorithms=[const.ALGORITHM])
user_id = decoded_token_payload.get('sub')
if user_id is None:
raise credentials_exception
except JWTError:
raise credentials_exception
class UserInDB(User):
password: str
return schemas.TokenData(**decoded_token_payload)
def refresh_token(self, refresh_token: str):
token_data = self.decode_token(refresh_token)
access = self.create_token(token_data, timedelta(minutes=const.ACCESS_TOKEN_EXPIRE_MINUTES))
return {'access_token': access, 'refresh_token': refresh_token}
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
oauth2_scheme = fs.OAuth2PasswordBearer(tokenUrl='api/token')
token_service = TokenService()
app = fa.FastAPI()
class AuthService:
def __init__(self) -> None:
self.user_repo = UserRepository()
def verify_password(plain_password, hashed_password):
async def verify_password(self, plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
async def authenticate_user(self, username: str, password: str):
users = await self.user_repo.get_list(username=username)
def get_password(password: str) -> str:
return pwd_context.hash(password)
if users and self.verify_password(password, users[0].password):
return users[0]
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
async def login(self, data: schemas.LoginRequestModel):
user = await self.authenticate_user(username=data.username, password=data.password)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = fa.Depends(oauth2_scheme)):
credentials_exception = fa.HTTPException(
status_code=fa.status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
raise fa.HTTPException(
status_code=fa.status.HTTP_404_NOT_FOUND,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
return token_service.create_tokens(schemas.TokenData(sub=user.id))
async def get_current_user(self, token: str = fa.Security(oauth2_scheme)):
token_data = token_service.decode_token(token)
return await self.user_repo.get(id=token_data.sub)
async def get_current_active_user(current_user: User = fa.Depends(get_current_user)):
if current_user.disabled:
raise fa.HTTPException(status_code=400, detail="Inactive user")
return current_user
auth_service = AuthService()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment