added auth service

parents
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
media/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
data/
.idea
FROM python:3.10
RUN mkdir /app
WORKDIR /app
ENV PYTHONUNBUFFERED 1
COPY ./requirements.txt /app/requirements.txt
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
EXPOSE 8000
COPY . /app
from fastapi import APIRouter
router = APIRouter(prefix='/api', tags=['AUTH'])
import os
db_url = os.getenv('DATABASE_URL')
from fastapi import FastAPI
from tortoise.contrib.fastapi import register_tortoise
from const import db_url
TORTOISE_ORM = {
'connections': {'default': db_url},
'apps': {
'models': {
'models': ['code.models', 'aerich.models'],
'default_connection': 'default',
},
},
}
def init_db(app: FastAPI) -> None:
register_tortoise(
app,
db_url=db_url,
modules={"models": ["models"]},
generate_schemas=False,
add_exception_handlers=True,
)
from datetime import timedelta
import fastapi as fa
from fastapi.security import OAuth2PasswordRequestForm
from api.router import router
from db import init_db
import services
def setup():
app = fa.FastAPI()
app.include_router(router)
return app
app = setup()
init_db(app)
@router.post("/token", response_model=services.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = fa.Depends()):
user = services.authenticate_user(services.fake_users_db, form_data.username, form_data.password)
if not user:
raise fa.HTTPException(
status_code=fa.status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=services.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = services.create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/users/me/", response_model=services.User)
async def read_users_me(current_user: services.User = fa.Depends(services.get_current_active_user)):
return current_user
if __name__ == '__main__':
import uvicorn
uvicorn.run('main:app', host='0.0.0.0', port=8000, reload=True)
from tortoise import fields, models
class User(models.Model):
username = fields.CharField(max_length=100, unique=True)
fullname = fields.CharField(max_length=100)
password = fields.CharField(max_length=255)
def __str__(self):
return self.fullname
class Meta:
table = 'users'
import os
from datetime import datetime, timedelta
import fastapi as fa
import fastapi.security as fs
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
SECRET_KEY = os.getenv('SECRET_KEY')
ALGORITHM = os.getenv('ALGORITHM')
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES'))
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"password": "$2a$12$ITTdvxKhvrBQt/zxKmP6K.F8vPnROkX7VWU7jQmcH1UspVdIguqXa",
"disabled": False,
},
'janedoe': {
"username": "janedoe",
"full_name": "Jane Doe",
"email": "janedoe@example.com",
"password": "$2a$12$4D442S0uyt74hPon/EBP9uZV3z2EUFZ0F9JgmtIK2DyBsdGNWHt1G"
},
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
password: str
pwd_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
oauth2_scheme = fs.OAuth2PasswordBearer(tokenUrl='api/token')
app = fa.FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password(password: str) -> str:
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = fa.Depends(oauth2_scheme)):
credentials_exception = fa.HTTPException(
status_code=fa.status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = fa.Depends(get_current_user)):
if current_user.disabled:
raise fa.HTTPException(status_code=400, detail="Inactive user")
return current_user
version: '3'
services:
core:
build:
context: .
dockerfile: Dockerfile
image: auth
restart: always
ports:
- "8012:8000"
networks:
- tasktrekernetwork
volumes:
- .:/app
env_file:
- .env
command: bash -c "python3 code/main.py"
networks:
tasktrekernetwork:
driver: bridge
aiosqlite==0.17.0
annotated-types==0.6.0
anyio==3.7.1
async-timeout==4.0.3
asyncpg==0.29.0
bcrypt==4.1.1
certifi==2023.11.17
cffi==1.16.0
click==8.1.7
cryptography==41.0.7
ecdsa==0.18.0
exceptiongroup==1.2.0
fastapi==0.104.1
h11==0.14.0
httpcore==1.0.2
httpx==0.25.2
idna==3.6
iso8601==1.1.0
passlib==1.7.4
pyasn1==0.5.1
pycparser==2.21
pydantic==2.5.2
pydantic_core==2.14.5
pypika-tortoise==0.1.6
python-jose==3.3.0
python-multipart==0.0.6
pytz==2023.3.post1
rsa==4.9
six==1.16.0
sniffio==1.3.0
starlette==0.27.0
tortoise-orm==0.20.0
typing_extensions==4.8.0
uvicorn==0.24.0.post1
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment