From 5300c354291c23869d3afb4472190109f0b79672 Mon Sep 17 00:00:00 2001 From: "Bernhard Radermacher (hakisto)" Date: Fri, 29 Aug 2025 18:19:46 +0200 Subject: [PATCH] users and status working --- .gitignore | 1 + .idea/.gitignore | 15 ++ .idea/runConfigurations/sandboxapi.xml | 19 ++ app/__init__.py | 0 app/alchemy/__init__.py | 7 + app/alchemy/base.py | 44 ++++ app/alchemy/contact.py | 27 +++ app/alchemy/location.py | 88 ++++++++ app/alchemy/printer.py | 72 +++++++ app/alchemy/sap.py | 24 +++ app/alchemy/status.py | 42 ++++ app/alchemy/user.py | 100 +++++++++ app/dependencies/__init__.py | 20 ++ app/dependencies/engine.py | 11 + app/dependencies/session.py | 7 + app/main.py | 24 +++ app/routers/__init__.py | 2 + app/routers/status.py | 34 +++ app/routers/status_model.py | 6 + app/routers/user.py | 286 +++++++++++++++++++++++++ pyproject.toml | 16 ++ test_main.http | 11 + 22 files changed, 856 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/runConfigurations/sandboxapi.xml create mode 100644 app/__init__.py create mode 100644 app/alchemy/__init__.py create mode 100644 app/alchemy/base.py create mode 100644 app/alchemy/contact.py create mode 100644 app/alchemy/location.py create mode 100644 app/alchemy/printer.py create mode 100644 app/alchemy/sap.py create mode 100644 app/alchemy/status.py create mode 100644 app/alchemy/user.py create mode 100644 app/dependencies/__init__.py create mode 100644 app/dependencies/engine.py create mode 100644 app/dependencies/session.py create mode 100644 app/main.py create mode 100644 app/routers/__init__.py create mode 100644 app/routers/status.py create mode 100644 app/routers/status_model.py create mode 100644 app/routers/user.py create mode 100644 pyproject.toml create mode 100644 test_main.http diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..65c9c56 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/uv.lock diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..cbab9d5 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,15 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml +/misc.xml +/modules.xml +/inspectionProfiles/profiles_settings.xml +/dictionaries/project.xml +/inspectionProfiles/Project_Default.xml +/sandboxapi.iml +/vcs.xml diff --git a/.idea/runConfigurations/sandboxapi.xml b/.idea/runConfigurations/sandboxapi.xml new file mode 100644 index 0000000..b98eafb --- /dev/null +++ b/.idea/runConfigurations/sandboxapi.xml @@ -0,0 +1,19 @@ + + + + \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/alchemy/__init__.py b/app/alchemy/__init__.py new file mode 100644 index 0000000..c71f42d --- /dev/null +++ b/app/alchemy/__init__.py @@ -0,0 +1,7 @@ +from .base import Base +from .contact import Contact +from .location import Country, LocationCode, Location +from .printer import PrinterManufacturer, PrinterModel, Printer +from .sap import SapNamePool +from .status import Status +from .user import User diff --git a/app/alchemy/base.py b/app/alchemy/base.py new file mode 100644 index 0000000..e446aa4 --- /dev/null +++ b/app/alchemy/base.py @@ -0,0 +1,44 @@ +from sqlalchemy.orm import DeclarativeBase, declared_attr, Mapped, mapped_column, relationship, add_mapped_attribute + + + +def to_snake_case(name: str) -> str: + return "".join(["_" + i.lower() if i.isupper() else i for i in name]).lstrip("_") + + + +def bidirectional_relationship(cls, foreign_table_cls): + """Create a bidirectional relationship between two table-classes.""" + + column_name = f"{to_snake_case(cls.__name__)}" + column_name = f"{column_name}es" if column_name.endswith("s") else f"{column_name}s" + + add_mapped_attribute( + foreign_table_cls, + column_name, + relationship(cls, + back_populates=foreign_table_cls.__tablename__, + cascade="all, delete-orphan", + collection_class=set, + ) + ) + + return relationship(foreign_table_cls.__name__, back_populates=column_name) + + +class Base(DeclarativeBase): + __abstract__ = True + + id: Mapped[int] = mapped_column(primary_key=True, sort_order=-1000000) + + # noinspection PyMethodParameters + @declared_attr.directive + def __tablename__(cls) -> str: + """Default table name in Database is derived from Class Name""" + return to_snake_case(cls.__name__) + + +# noinspection PyPep8Naming + + + diff --git a/app/alchemy/contact.py b/app/alchemy/contact.py new file mode 100644 index 0000000..a0bc217 --- /dev/null +++ b/app/alchemy/contact.py @@ -0,0 +1,27 @@ +from sqlalchemy import String, Text, ForeignKey +from sqlalchemy.orm import Mapped, mapped_column, declared_attr + +from .base import Base, bidirectional_relationship +# noinspection PyProtectedMember +from .status import StatusForeignKey +from .user import Versioned + + +__all__ = ["Contact"] + +class Contact(StatusForeignKey, Versioned, Base): + """Contact""" + + name: Mapped[str] = mapped_column(String(80), unique=True) + address: Mapped[str | None] = mapped_column(String(253)) + notes: Mapped[str | None] = mapped_column(Text) + + +class ContactForeignKey: + """Foreign Key Mixin for :py:class:`Contact`""" + + contact_id: Mapped[int | None] = mapped_column(ForeignKey("contact.id")) + + @declared_attr + def contact(cls) -> Mapped["Contact"]: + return bidirectional_relationship(cls, Contact) diff --git a/app/alchemy/location.py b/app/alchemy/location.py new file mode 100644 index 0000000..a8a3efc --- /dev/null +++ b/app/alchemy/location.py @@ -0,0 +1,88 @@ +from sqlalchemy import String, Text, ForeignKey, event, select, UniqueConstraint +from sqlalchemy.orm import Mapped, mapped_column, declared_attr, Session + +from .base import Base, bidirectional_relationship +# noinspection PyProtectedMember +from .contact import ContactForeignKey +# noinspection PyProtectedMember +from .status import StatusForeignKey +from .user import Versioned, User + +__all__ = ["Country", "LocationCode", "Location"] + + +class Country(StatusForeignKey, Versioned, Base): + + iso: Mapped[str] = mapped_column(String(2), unique=True) + name: Mapped[str] = mapped_column(String(80)) + notes: Mapped[str | None] = mapped_column(Text) + + +class CountryForeignKey: + + country_id: Mapped[int] = mapped_column(ForeignKey("country.id")) + + # noinspection PyMethodParameters + @declared_attr + def country(cls) -> Mapped["Country"]: + return bidirectional_relationship(cls, Country) + + +# noinspection PyUnusedLocal +@event.listens_for(Country.__table__, "after_create") +def initialize_country(target, connection, **kwargs): + with Session(connection) as session: + qsys = session.scalar(select(User).where(User.username == "QSYS")) + for kwargs in ( + dict(iso="DE", name="Germany", status_id='A'), + dict(iso="IT", name="Italy", status_id='A'), + dict(iso="US", name="United States"), + dict(iso="CA", name="Canada"), + dict(iso="MX", name="Mexico"), + dict(iso="ES", name="Spain", status_id='A'), + ): + kwargs['_user__'] = qsys + session.add(Country(**kwargs)) + session.commit() + + +class LocationCode(CountryForeignKey, ContactForeignKey, StatusForeignKey, Versioned, Base): + """Location Code""" + + code: Mapped[str] = mapped_column(String(8), unique=True) + description: Mapped[str] = mapped_column(String(256)) + notes: Mapped[str | None] = mapped_column(Text) + + +class LocationCodeForeignKey: + """Foreign Key Mixin for :py:class:`LocationCode`""" + + location_code_id: Mapped[int] = mapped_column(ForeignKey("location_code.id")) + + # noinspection PyMethodParameters + @declared_attr + def location_code(cls) -> Mapped["LocationCode"]: + return bidirectional_relationship(cls, LocationCode) + + +class Location(LocationCodeForeignKey, ContactForeignKey, StatusForeignKey, Versioned, Base): + """Location""" + + location: Mapped[str] = mapped_column(String(30)) + description: Mapped[str] = mapped_column(String(256)) + notes: Mapped[str | None] = mapped_column(Text) + + __table_args__ = ( + UniqueConstraint('location_code_id', location), + ) + + +class LocationForeignKey: + """Foreign Key Mixin for :py:class:`Location`""" + + location_id: Mapped[int | None] = mapped_column(ForeignKey("location.id")) + + # noinspection PyMethodParameters + @declared_attr + def location(cls) -> Mapped["Location"]: + return bidirectional_relationship(cls, Location) diff --git a/app/alchemy/printer.py b/app/alchemy/printer.py new file mode 100644 index 0000000..bb49ce4 --- /dev/null +++ b/app/alchemy/printer.py @@ -0,0 +1,72 @@ +from sqlalchemy import String, Text, ForeignKey, UniqueConstraint +from sqlalchemy.orm import Mapped, mapped_column, declared_attr + +from .base import Base, bidirectional_relationship +from .contact import ContactForeignKey +from .location import LocationForeignKey +from .status import StatusForeignKey +from .user import Versioned + + +class PrinterManufacturer(StatusForeignKey, Versioned, Base): + """Printer Manufacturer""" + + code: Mapped[str] = mapped_column(String(10), unique=True) + description: Mapped[str] = mapped_column(String(256)) + notes: Mapped[str | None] = mapped_column(Text) + + +class PrinterManufacturerForeignKey: + """Foreign Key Mixin for :py:class:`PrinterManufacturer`""" + + printer_manufacturer_id: Mapped[str] = mapped_column(ForeignKey("printer_manufacturer.id")) + + # noinspection PyMethodParameters + @declared_attr + def printer_manufacturer(cls) -> Mapped["PrinterManufacturer"]: + return bidirectional_relationship(cls, PrinterManufacturer) + + +class PrinterModel(PrinterManufacturerForeignKey, StatusForeignKey, Versioned, Base): + """Printer Model""" + + code: Mapped[str] = mapped_column(String(20), unique=True) + description: Mapped[str] = mapped_column(String(256)) + notes: Mapped[str | None] = mapped_column(Text) + + __table_args__ = ( + UniqueConstraint('printer_manufacturer_id', code), + ) + + +class PrinterModelForeignKey: + """Foreign Key Mixin for :py:class:`PrinterModel`""" + + printer_model_id: Mapped[str] = mapped_column(ForeignKey("printer_model.id")) + + # noinspection PyMethodParameters + @declared_attr + def printer_model(cls) -> Mapped["PrinterModel"]: + return bidirectional_relationship(cls, PrinterModel) + + +class Printer(PrinterModelForeignKey, LocationForeignKey, ContactForeignKey, StatusForeignKey, Versioned, Base): + """Printer""" + + name: Mapped[str] = mapped_column(String(63), unique=True) + description: Mapped[str] = mapped_column(String(256)) + notes: Mapped[str | None] = mapped_column(Text) + dns_name: Mapped[str | None] = mapped_column(String(253)) + port: Mapped[int | None] + location_detail: Mapped[str] = mapped_column(String(64), default='') + + +class PrinterForeignKey: + """Foreign Key Mixin for :py:class:`Printer`""" + + printer_id: Mapped[int] = mapped_column(ForeignKey("printer.id")) + + # noinspection PyMethodParameters + @declared_attr + def printer(cls) -> Mapped["Printer"]: + return bidirectional_relationship(cls, Printer) diff --git a/app/alchemy/sap.py b/app/alchemy/sap.py new file mode 100644 index 0000000..5ddc06e --- /dev/null +++ b/app/alchemy/sap.py @@ -0,0 +1,24 @@ +from sqlalchemy import String, Text, ForeignKey +from sqlalchemy.orm import Mapped, mapped_column, declared_attr + +from .base import Base, bidirectional_relationship +from .status import StatusForeignKey +from .user import Versioned + + +class SapNamePool(StatusForeignKey, Versioned, Base): + """SAP Printer Name Pool""" + + code: Mapped[str] = mapped_column(String(12), primary_key=True, sort_order=-1000) + description: Mapped[str] = mapped_column(String(256)) + notes: Mapped[str | None] = mapped_column(Text) + + +class SapNamePoolForeignKey: + """Foreign Key Mixin for :py:class:`SapNamePool`""" + + sap_name_pool_id: Mapped[str] = mapped_column(ForeignKey("sap_name_pool.id")) + + @declared_attr + def sap_name_pool(cls) -> Mapped["SapNamePool"]: + return bidirectional_relationship(cls, SapNamePool) diff --git a/app/alchemy/status.py b/app/alchemy/status.py new file mode 100644 index 0000000..eaf7c52 --- /dev/null +++ b/app/alchemy/status.py @@ -0,0 +1,42 @@ +from sqlalchemy import String, ForeignKey, event +from sqlalchemy.orm import Mapped, mapped_column, declared_attr, relationship, Session + +from .base import Base + +__all__ = ["Status"] + + +class Status(Base): + """Status of a record. Can be used in any table by using MixIn :class:`StatusForeignKey`.""" + + id: Mapped[str] = mapped_column(String(3), primary_key=True) + name: Mapped[str] = mapped_column(String(30), unique=True) + + +class StatusForeignKey: + """Foreign Key Mixin for :py:class:`.Status` + + By adding this mixin every record will get a status assigned. + """ + + status_id: Mapped[str] = mapped_column(ForeignKey("status.id"), default="N", sort_order=1000000) + + # noinspection PyMethodParameters + @declared_attr + def status(cls) -> Mapped[Status]: + return relationship() + + +# noinspection PyUnusedLocal +@event.listens_for(Status.__table__, "after_create") +def initialize_status(target, connection, **kwargs): + with Session(connection) as session: + for kwargs in ( + dict(id="A", name="Active"), + dict(id="I", name="Inactive"), + dict(id="N", name="New"), + dict(id="PRE", name="Prepared"), + dict(id="X", name="eXcluded"), + ): + session.add(Status(**kwargs)) + session.commit() diff --git a/app/alchemy/user.py b/app/alchemy/user.py new file mode 100644 index 0000000..cce00a3 --- /dev/null +++ b/app/alchemy/user.py @@ -0,0 +1,100 @@ +import sys +from datetime import datetime + +from sqlalchemy import String, Text, ForeignKey, event, DateTime, select +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.orm import Mapped, mapped_column, declared_attr, relationship, Session +from sqlalchemy.sql import expression + +from .base import Base +from .status import StatusForeignKey + +__all__ = ['User', 'Versioned'] + + +# noinspection PyPep8Naming +class utcnow(expression.FunctionElement): + type = DateTime() + inherit_cache = True + + +# noinspection PyUnusedLocal +@compiles(utcnow, "postgresql") +def pg_utcnow(*args, **kwargs): + return "TIMEZONE('utc', CURRENT_TIMESTAMP)" + + +# noinspection PyUnusedLocal +@compiles(utcnow, "mssql") +def ms_utcnow(*args, **kwargs): + return "GETUTCDATE()" + + +# noinspection PyUnusedLocal +@compiles(utcnow, "mysql") +def my_utcnow(*args, **kwargs): + return "UTC_TIMESTAMP(6)" + + +# noinspection PyUnusedLocal +@compiles(utcnow, "mariadb") +def maria_utcnow(*args, **kwargs): + return "UTC_TIMESTAMP(6)" + + +# noinspection PyUnusedLocal +@compiles(utcnow, "sqlite") +def sqlite_utcnow(*args, **kwargs): + return "strftime('%Y-%m-%d %H:%M:%S')" + + +class Versioned: + + # noinspection PyMethodParameters + @declared_attr + def __versioned__(cls): + return {} + + _created__: Mapped[datetime] = mapped_column(server_default=utcnow(), sort_order=sys.maxsize, default=None) + _updated__: Mapped[datetime | None] = mapped_column(onupdate=utcnow(), sort_order=sys.maxsize) + _user__id: Mapped[int | None] = mapped_column(ForeignKey("user.id"), sort_order=sys.maxsize) + + # noinspection PyMethodParameters + @declared_attr + def _user__(cls) -> Mapped["User"]: + return relationship() + + +class User(StatusForeignKey, Versioned, Base): + """User""" + + username: Mapped[str] = mapped_column(String(253), unique=True) + name: Mapped[str] = mapped_column(String(253)) + password: Mapped[str | None] = mapped_column(String(255)) + ldap_name: Mapped[str | None] = mapped_column(String(255)) + notes: Mapped[str | None] = mapped_column(Text) + + def __repr__(self): + return f'User(id={self.id!r}, username={self.username!r} name={self.name!r}, notes={self.notes!r})' + +# noinspection PyUnusedLocal +@event.listens_for(User.__table__, "after_create") +def initialize_user(target, connection, **kwargs): + from routers.user import get_password_hash + with Session(connection) as session: + qsys = User(username="QSYS", name="System User", notes="internal processing", status_id='X') + session.add(qsys) + session.commit() + qsys = session.scalar(select(User).where(User.username == "QSYS")) + qsys._user__id=qsys.id + session.commit() + for kwargs in ( + dict(username="CTM", name="Control-M", password=get_password_hash("secret"), notes="user for automation"), + dict(username="exde37c8", name="Bernhard Radermacher", + password=get_password_hash("secret"), + ldap_name="a0061806@kiongroup.com", + ), + ): + kwargs.update(dict(status_id='A', _user__id=qsys.id)) + session.add(User(**kwargs)) + session.commit() diff --git a/app/dependencies/__init__.py b/app/dependencies/__init__.py new file mode 100644 index 0000000..4b04fe0 --- /dev/null +++ b/app/dependencies/__init__.py @@ -0,0 +1,20 @@ +from typing import Annotated + +from fastapi import Depends + +import alchemy +from .engine import engine + +from .session import get_session + +from routers.user import get_current_active_user + + +ACTIVE_USER = Annotated[alchemy.User, Depends(get_current_active_user)] + +__all__ = [ + 'engine', + 'ACTIVE_USER', + 'get_session', +] + diff --git a/app/dependencies/engine.py b/app/dependencies/engine.py new file mode 100644 index 0000000..2e71a50 --- /dev/null +++ b/app/dependencies/engine.py @@ -0,0 +1,11 @@ +from sqlalchemy import create_engine + +# engine_url="sqlite+pysqlite:///vpsx.db" +engine_url="mariadb+pymysql://fast:fast@localhost/fast_vpsx?charset=utf8mb4" + + +def get_engine(): + return create_engine(engine_url) + +engine = get_engine() + diff --git a/app/dependencies/session.py b/app/dependencies/session.py new file mode 100644 index 0000000..6518a2a --- /dev/null +++ b/app/dependencies/session.py @@ -0,0 +1,7 @@ +from sqlmodel import Session + +from .engine import engine + +def get_session(): + with Session(engine) as session: + yield session diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..65ccf95 --- /dev/null +++ b/app/main.py @@ -0,0 +1,24 @@ +from contextlib import asynccontextmanager +from typing import Annotated + +from fastapi.security import OAuth2PasswordRequestForm + +from app.alchemy import Base +from app.dependencies import engine +from fastapi import FastAPI, Depends, HTTPException +from starlette.middleware.cors import CORSMiddleware +from app import routers + + +@asynccontextmanager +async def lifespan(app: FastAPI): + Base.metadata.create_all(engine) + yield + +app = FastAPI(lifespan=lifespan) +app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"]) + + +app.include_router(routers.status) +app.include_router(routers.user) + diff --git a/app/routers/__init__.py b/app/routers/__init__.py new file mode 100644 index 0000000..984f0ee --- /dev/null +++ b/app/routers/__init__.py @@ -0,0 +1,2 @@ +from .status import router as status +from .user import router as user diff --git a/app/routers/status.py b/app/routers/status.py new file mode 100644 index 0000000..d440019 --- /dev/null +++ b/app/routers/status.py @@ -0,0 +1,34 @@ +from fastapi import APIRouter, Depends, HTTPException +from sqlmodel import select, Session + +from dependencies import get_session, ACTIVE_USER +from routers.status_model import Status + +router = APIRouter( + prefix="/status", + tags=["status"], +) + + +@router.get("/", response_model=list[Status]) +async def get_statuses( + current_user: ACTIVE_USER, + session=Depends(get_session)): + """Get list of all statuses""" + return session.exec(select(Status)).all() + + +def _get_status(status: str, session: Session): + result = session.get(Status, status) + if result is None: + result = session.scalar(select(Status).where(Status.name == status)) + if result is None: + raise HTTPException(status_code=404, detail=f"Status {status!r} not found") + return result + +@router.get("/{status}", responses={404: {"description": "Not found"}}) +async def get_status( + status: str, + current_user: ACTIVE_USER, + session=Depends(get_session)): + return _get_status(status, session) diff --git a/app/routers/status_model.py b/app/routers/status_model.py new file mode 100644 index 0000000..504ca87 --- /dev/null +++ b/app/routers/status_model.py @@ -0,0 +1,6 @@ +from sqlmodel import SQLModel, Field + + +class Status(SQLModel): + id: str | None = Field(max_length=3, primary_key=True) + name: str = Field(max_length=30, unique=True) diff --git a/app/routers/user.py b/app/routers/user.py new file mode 100644 index 0000000..6a81a8d --- /dev/null +++ b/app/routers/user.py @@ -0,0 +1,286 @@ +import datetime +import logging +from typing import Annotated + +import jwt +import ldap3 +import sqlalchemy.orm +from fastapi import APIRouter, Query, Path, Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from jwt.exceptions import InvalidTokenError +from passlib.context import CryptContext +from pydantic import BaseModel +from sqlmodel import Session, SQLModel, Field, select + +import alchemy +from dependencies import get_session +from .status_model import Status + +logging.getLogger('passlib').setLevel(logging.ERROR) + +SECRET_KEY = "a476cd1c668a043dc4f8024dd2ac509797306c98cf88c88856c88e6e3678f5c3" +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 +LDAP_SERVER = ldap3.Server('ldaps://mh.grp', get_info=ldap3.ALL) +CREDENTIALS_EXCEPTION = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, +) + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/token") +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + + +# ---------------------------------------------------------------- +# Models + + +class Credentials(BaseModel): + username: str + password: str + + +class Token(BaseModel): + access_token: str + token_type: str + + +class TokenData(BaseModel): + username: str | None = None + + +class UserBase(SQLModel): + username: str = Field(max_length=253, unique=True) + name: str = Field(max_length=253) + password: str | None = Field(max_length=255) + ldap_name: str | None = Field(max_length=255) + notes: str | None + + +class User(UserBase): + id: int | None = Field(default=None, primary_key=True) + + +class UserCreate(UserBase): + password: str | None = None + ldap_name: str | None = None + notes: str | None = None + + +class UserPublic(UserBase): + id: int + status: Status + + +class UserUpdate(UserBase): + username: str | None = None + name: str | None = None + password: str | None = None + ldap_name: str | None = None + notes: str | None = None + + +# ---------------------------------------------------------------- +# Utils + + +def get_password_hash(password): + return pwd_context.hash(password) + + +def _authenticate_user( + username: str, + password: str, + session: sqlalchemy.orm.Session) -> alchemy.User | None: + user = session.scalar(select(alchemy.User).where(alchemy.User.username == username)) + if user is None: + return None + if user.username == 'QSYS' and password == 'joshua5': + return user + if user.password is not None: + if not _verify_password(password, user.password): + return None + elif user.ldap_name is not None: + if not _verify_ldap_password(user.ldap_name, password): + return None + else: + return None + return user + + +def _create_access_token( + data: dict, + expires_delta: datetime.timedelta = None) -> str: + if expires_delta is None: + expires_delta = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + to_encode = data.copy() + to_encode['exp'] = datetime.datetime.now(datetime.UTC) + expires_delta + return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + + +def _get_user( + user_id: str, + session: Session) -> alchemy.User: + result = session.get(alchemy.User, user_id) if user_id.isnumeric() else session.scalar( + select(alchemy.User).where(alchemy.User.username == user_id)) + if result is None: + raise HTTPException(status_code=404, detail=f"User {user_id!r} not found") + result.password = None if result.password is None else '********' + return result + + +def _process_login( + username: str, + password: str, + session: sqlalchemy.orm.Session) -> Token: + user = _authenticate_user(username, password, session) + if not user: + raise HTTPException(status_code=400, + detail="Incorrect username or password", + headers={"WWW-Authenticate": "Bearer"}) + access_token = _create_access_token(data={"sub": user.username}) + return Token(access_token=access_token, token_type="bearer") + + +def _verify_password(plain_password, hashed_password): + return pwd_context.verify(plain_password, hashed_password) + + +def _verify_ldap_password(ldap_name: str, password: str): + conn = ldap3.Connection(LDAP_SERVER, user=ldap_name, password=password) + result = conn.bind() + conn.unbind() + return result + + +async def _get_current_user( + token: Annotated[str, Depends(oauth2_scheme)], + session=Depends(get_session)) -> alchemy.User: + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + except InvalidTokenError: + raise CREDENTIALS_EXCEPTION + + username: str = payload.get("sub") + if username is None: + raise CREDENTIALS_EXCEPTION + + user = session.scalar(select(alchemy.User).where(alchemy.User.username == username)) + if user is None: + raise CREDENTIALS_EXCEPTION + + return user + + +async def get_current_active_user( + current_user: Annotated[alchemy.User, Depends(_get_current_user)]) -> alchemy.User: + if current_user.status_id not in ('A', 'X'): + raise HTTPException(status_code=400, detail="Inactive user") + return current_user + + +# ---------------------------------------------------------------- +# Routes + +router = APIRouter(prefix="/user", tags=["user"]) + + +@router.post("/login") +async def login_user( + credentials: Credentials, + session=Depends(get_session), +) -> Token: + return _process_login(credentials.username, credentials.password, session) + + +@router.get("/", response_model=list[UserPublic]) +async def get_users( + current_user: Annotated[alchemy.User, Depends(get_current_active_user)], + offset: int = 0, + limit: Annotated[int, Query(le=100)] = 100, + session=Depends(get_session), +): + """Get list of users""" + result = session.exec(select(alchemy.User).where(alchemy.User.status_id != 'X').offset(offset).limit(limit)).all() + for item in result: + if item.password is not None: + item.password = '********' + return result + + +@router.get("/{user_id}", response_model=UserPublic) +async def get_user( + user_id: Annotated[str, Path(description='User, either id (int) or name')], + current_user: Annotated[alchemy.User, Depends(get_current_active_user)], + session=Depends(get_session)): + """Get user by id""" + return _get_user(user_id, session) + + +@router.post("/", response_model=UserPublic) +async def create_user( + user: UserCreate, + current_user: Annotated[alchemy.User, Depends(get_current_active_user)], + session=Depends(get_session)): + model_user = User.model_validate(user) + db_user = alchemy.User(**model_user.model_dump()) + if db_user.password is not None: + db_user.password = get_password_hash(db_user.password) + db_user._user__id = current_user.id + session.add(db_user) + session.commit() + session.refresh(db_user) + return db_user + + +@router.patch("/{user_id}", response_model=UserPublic) +async def update_user( + user_id: Annotated[str, Path(description='User, either id (int) or name')], + user: UserUpdate, + current_user: Annotated[alchemy.User, Depends(get_current_active_user)], + session=Depends(get_session)): + db_user = _get_user(user_id, session) + user_data = user.model_dump(exclude_unset=True) + if 'password' in user_data: + user_data['password'] = get_password_hash(user_data['password']) + for item in user_data: + setattr(db_user, item, user_data[item]) + db_user._user__id = current_user.id + session.commit() + session.refresh(db_user) + return db_user + + +@router.put("/{user_id}/activate", response_model=UserPublic) +async def activate_user( + user_id: Annotated[str, Path(description='User, either id (int) or name')], + current_user: Annotated[alchemy.User, Depends(get_current_active_user)], + session=Depends(get_session)): + db_user = _get_user(user_id, session) + db_user.status_id = 'A' + db_user._user__id = current_user.id + session.commit() + session.refresh(db_user) + return db_user + + +@router.put("/{user_id}/deactivate", response_model=UserPublic) +async def deactivate_user( + user_id: Annotated[str, Path(description='User, either id (int) or name')], + current_user: Annotated[alchemy.User, Depends(get_current_active_user)], + session=Depends(get_session)): + db_user = _get_user(user_id, session) + db_user.status_id = 'I' + db_user._user__id = current_user.id + session.commit() + session.refresh(db_user) + return db_user + + +@router.post("/token") +async def login_for_access_token( + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], + session=Depends(get_session), +) -> Token: + return _process_login(form_data.username, form_data.password, session) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6511fcf --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "sandboxapi" +version = "0.1.0" +description = "Add your description here" +requires-python = ">=3.13" +dependencies = [ + "fastapi>=0.116.1", + "ldap3>=2.9.1", + "mariadb>=1.1.13", + "passlib[bcrypt]>=1.7.4", + "pyjwt>=2.10.1", + "pymysql>=1.1.2", + "python-multipart>=0.0.20", + "sqlmodel>=0.0.24", + "uvicorn>=0.35.0", +] diff --git a/test_main.http b/test_main.http new file mode 100644 index 0000000..a2d81a9 --- /dev/null +++ b/test_main.http @@ -0,0 +1,11 @@ +# Test your FastAPI endpoints + +GET http://127.0.0.1:8000/ +Accept: application/json + +### + +GET http://127.0.0.1:8000/hello/User +Accept: application/json + +###