users and status working

This commit is contained in:
Bernhard Radermacher (hakisto)
2025-08-29 18:19:46 +02:00
commit 5300c35429
22 changed files with 856 additions and 0 deletions

0
app/__init__.py Normal file
View File

7
app/alchemy/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
from .base import Base
from .contact import Contact
from .location import Country, LocationCode, Location
from .printer import PrinterManufacturer, PrinterModel, Printer
from .sap import SapNamePool
from .status import Status
from .user import User

44
app/alchemy/base.py Normal file
View File

@@ -0,0 +1,44 @@
from sqlalchemy.orm import DeclarativeBase, declared_attr, Mapped, mapped_column, relationship, add_mapped_attribute
def to_snake_case(name: str) -> str:
return "".join(["_" + i.lower() if i.isupper() else i for i in name]).lstrip("_")
def bidirectional_relationship(cls, foreign_table_cls):
"""Create a bidirectional relationship between two table-classes."""
column_name = f"{to_snake_case(cls.__name__)}"
column_name = f"{column_name}es" if column_name.endswith("s") else f"{column_name}s"
add_mapped_attribute(
foreign_table_cls,
column_name,
relationship(cls,
back_populates=foreign_table_cls.__tablename__,
cascade="all, delete-orphan",
collection_class=set,
)
)
return relationship(foreign_table_cls.__name__, back_populates=column_name)
class Base(DeclarativeBase):
__abstract__ = True
id: Mapped[int] = mapped_column(primary_key=True, sort_order=-1000000)
# noinspection PyMethodParameters
@declared_attr.directive
def __tablename__(cls) -> str:
"""Default table name in Database is derived from Class Name"""
return to_snake_case(cls.__name__)
# noinspection PyPep8Naming

27
app/alchemy/contact.py Normal file
View File

@@ -0,0 +1,27 @@
from sqlalchemy import String, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, declared_attr
from .base import Base, bidirectional_relationship
# noinspection PyProtectedMember
from .status import StatusForeignKey
from .user import Versioned
__all__ = ["Contact"]
class Contact(StatusForeignKey, Versioned, Base):
"""Contact"""
name: Mapped[str] = mapped_column(String(80), unique=True)
address: Mapped[str | None] = mapped_column(String(253))
notes: Mapped[str | None] = mapped_column(Text)
class ContactForeignKey:
"""Foreign Key Mixin for :py:class:`Contact`"""
contact_id: Mapped[int | None] = mapped_column(ForeignKey("contact.id"))
@declared_attr
def contact(cls) -> Mapped["Contact"]:
return bidirectional_relationship(cls, Contact)

88
app/alchemy/location.py Normal file
View File

@@ -0,0 +1,88 @@
from sqlalchemy import String, Text, ForeignKey, event, select, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, declared_attr, Session
from .base import Base, bidirectional_relationship
# noinspection PyProtectedMember
from .contact import ContactForeignKey
# noinspection PyProtectedMember
from .status import StatusForeignKey
from .user import Versioned, User
__all__ = ["Country", "LocationCode", "Location"]
class Country(StatusForeignKey, Versioned, Base):
iso: Mapped[str] = mapped_column(String(2), unique=True)
name: Mapped[str] = mapped_column(String(80))
notes: Mapped[str | None] = mapped_column(Text)
class CountryForeignKey:
country_id: Mapped[int] = mapped_column(ForeignKey("country.id"))
# noinspection PyMethodParameters
@declared_attr
def country(cls) -> Mapped["Country"]:
return bidirectional_relationship(cls, Country)
# noinspection PyUnusedLocal
@event.listens_for(Country.__table__, "after_create")
def initialize_country(target, connection, **kwargs):
with Session(connection) as session:
qsys = session.scalar(select(User).where(User.username == "QSYS"))
for kwargs in (
dict(iso="DE", name="Germany", status_id='A'),
dict(iso="IT", name="Italy", status_id='A'),
dict(iso="US", name="United States"),
dict(iso="CA", name="Canada"),
dict(iso="MX", name="Mexico"),
dict(iso="ES", name="Spain", status_id='A'),
):
kwargs['_user__'] = qsys
session.add(Country(**kwargs))
session.commit()
class LocationCode(CountryForeignKey, ContactForeignKey, StatusForeignKey, Versioned, Base):
"""Location Code"""
code: Mapped[str] = mapped_column(String(8), unique=True)
description: Mapped[str] = mapped_column(String(256))
notes: Mapped[str | None] = mapped_column(Text)
class LocationCodeForeignKey:
"""Foreign Key Mixin for :py:class:`LocationCode`"""
location_code_id: Mapped[int] = mapped_column(ForeignKey("location_code.id"))
# noinspection PyMethodParameters
@declared_attr
def location_code(cls) -> Mapped["LocationCode"]:
return bidirectional_relationship(cls, LocationCode)
class Location(LocationCodeForeignKey, ContactForeignKey, StatusForeignKey, Versioned, Base):
"""Location"""
location: Mapped[str] = mapped_column(String(30))
description: Mapped[str] = mapped_column(String(256))
notes: Mapped[str | None] = mapped_column(Text)
__table_args__ = (
UniqueConstraint('location_code_id', location),
)
class LocationForeignKey:
"""Foreign Key Mixin for :py:class:`Location`"""
location_id: Mapped[int | None] = mapped_column(ForeignKey("location.id"))
# noinspection PyMethodParameters
@declared_attr
def location(cls) -> Mapped["Location"]:
return bidirectional_relationship(cls, Location)

72
app/alchemy/printer.py Normal file
View File

@@ -0,0 +1,72 @@
from sqlalchemy import String, Text, ForeignKey, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, declared_attr
from .base import Base, bidirectional_relationship
from .contact import ContactForeignKey
from .location import LocationForeignKey
from .status import StatusForeignKey
from .user import Versioned
class PrinterManufacturer(StatusForeignKey, Versioned, Base):
"""Printer Manufacturer"""
code: Mapped[str] = mapped_column(String(10), unique=True)
description: Mapped[str] = mapped_column(String(256))
notes: Mapped[str | None] = mapped_column(Text)
class PrinterManufacturerForeignKey:
"""Foreign Key Mixin for :py:class:`PrinterManufacturer`"""
printer_manufacturer_id: Mapped[str] = mapped_column(ForeignKey("printer_manufacturer.id"))
# noinspection PyMethodParameters
@declared_attr
def printer_manufacturer(cls) -> Mapped["PrinterManufacturer"]:
return bidirectional_relationship(cls, PrinterManufacturer)
class PrinterModel(PrinterManufacturerForeignKey, StatusForeignKey, Versioned, Base):
"""Printer Model"""
code: Mapped[str] = mapped_column(String(20), unique=True)
description: Mapped[str] = mapped_column(String(256))
notes: Mapped[str | None] = mapped_column(Text)
__table_args__ = (
UniqueConstraint('printer_manufacturer_id', code),
)
class PrinterModelForeignKey:
"""Foreign Key Mixin for :py:class:`PrinterModel`"""
printer_model_id: Mapped[str] = mapped_column(ForeignKey("printer_model.id"))
# noinspection PyMethodParameters
@declared_attr
def printer_model(cls) -> Mapped["PrinterModel"]:
return bidirectional_relationship(cls, PrinterModel)
class Printer(PrinterModelForeignKey, LocationForeignKey, ContactForeignKey, StatusForeignKey, Versioned, Base):
"""Printer"""
name: Mapped[str] = mapped_column(String(63), unique=True)
description: Mapped[str] = mapped_column(String(256))
notes: Mapped[str | None] = mapped_column(Text)
dns_name: Mapped[str | None] = mapped_column(String(253))
port: Mapped[int | None]
location_detail: Mapped[str] = mapped_column(String(64), default='')
class PrinterForeignKey:
"""Foreign Key Mixin for :py:class:`Printer`"""
printer_id: Mapped[int] = mapped_column(ForeignKey("printer.id"))
# noinspection PyMethodParameters
@declared_attr
def printer(cls) -> Mapped["Printer"]:
return bidirectional_relationship(cls, Printer)

24
app/alchemy/sap.py Normal file
View File

@@ -0,0 +1,24 @@
from sqlalchemy import String, Text, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, declared_attr
from .base import Base, bidirectional_relationship
from .status import StatusForeignKey
from .user import Versioned
class SapNamePool(StatusForeignKey, Versioned, Base):
"""SAP Printer Name Pool"""
code: Mapped[str] = mapped_column(String(12), primary_key=True, sort_order=-1000)
description: Mapped[str] = mapped_column(String(256))
notes: Mapped[str | None] = mapped_column(Text)
class SapNamePoolForeignKey:
"""Foreign Key Mixin for :py:class:`SapNamePool`"""
sap_name_pool_id: Mapped[str] = mapped_column(ForeignKey("sap_name_pool.id"))
@declared_attr
def sap_name_pool(cls) -> Mapped["SapNamePool"]:
return bidirectional_relationship(cls, SapNamePool)

42
app/alchemy/status.py Normal file
View File

@@ -0,0 +1,42 @@
from sqlalchemy import String, ForeignKey, event
from sqlalchemy.orm import Mapped, mapped_column, declared_attr, relationship, Session
from .base import Base
__all__ = ["Status"]
class Status(Base):
"""Status of a record. Can be used in any table by using MixIn :class:`StatusForeignKey`."""
id: Mapped[str] = mapped_column(String(3), primary_key=True)
name: Mapped[str] = mapped_column(String(30), unique=True)
class StatusForeignKey:
"""Foreign Key Mixin for :py:class:`.Status`
By adding this mixin every record will get a status assigned.
"""
status_id: Mapped[str] = mapped_column(ForeignKey("status.id"), default="N", sort_order=1000000)
# noinspection PyMethodParameters
@declared_attr
def status(cls) -> Mapped[Status]:
return relationship()
# noinspection PyUnusedLocal
@event.listens_for(Status.__table__, "after_create")
def initialize_status(target, connection, **kwargs):
with Session(connection) as session:
for kwargs in (
dict(id="A", name="Active"),
dict(id="I", name="Inactive"),
dict(id="N", name="New"),
dict(id="PRE", name="Prepared"),
dict(id="X", name="eXcluded"),
):
session.add(Status(**kwargs))
session.commit()

100
app/alchemy/user.py Normal file
View File

@@ -0,0 +1,100 @@
import sys
from datetime import datetime
from sqlalchemy import String, Text, ForeignKey, event, DateTime, select
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import Mapped, mapped_column, declared_attr, relationship, Session
from sqlalchemy.sql import expression
from .base import Base
from .status import StatusForeignKey
__all__ = ['User', 'Versioned']
# noinspection PyPep8Naming
class utcnow(expression.FunctionElement):
type = DateTime()
inherit_cache = True
# noinspection PyUnusedLocal
@compiles(utcnow, "postgresql")
def pg_utcnow(*args, **kwargs):
return "TIMEZONE('utc', CURRENT_TIMESTAMP)"
# noinspection PyUnusedLocal
@compiles(utcnow, "mssql")
def ms_utcnow(*args, **kwargs):
return "GETUTCDATE()"
# noinspection PyUnusedLocal
@compiles(utcnow, "mysql")
def my_utcnow(*args, **kwargs):
return "UTC_TIMESTAMP(6)"
# noinspection PyUnusedLocal
@compiles(utcnow, "mariadb")
def maria_utcnow(*args, **kwargs):
return "UTC_TIMESTAMP(6)"
# noinspection PyUnusedLocal
@compiles(utcnow, "sqlite")
def sqlite_utcnow(*args, **kwargs):
return "strftime('%Y-%m-%d %H:%M:%S')"
class Versioned:
# noinspection PyMethodParameters
@declared_attr
def __versioned__(cls):
return {}
_created__: Mapped[datetime] = mapped_column(server_default=utcnow(), sort_order=sys.maxsize, default=None)
_updated__: Mapped[datetime | None] = mapped_column(onupdate=utcnow(), sort_order=sys.maxsize)
_user__id: Mapped[int | None] = mapped_column(ForeignKey("user.id"), sort_order=sys.maxsize)
# noinspection PyMethodParameters
@declared_attr
def _user__(cls) -> Mapped["User"]:
return relationship()
class User(StatusForeignKey, Versioned, Base):
"""User"""
username: Mapped[str] = mapped_column(String(253), unique=True)
name: Mapped[str] = mapped_column(String(253))
password: Mapped[str | None] = mapped_column(String(255))
ldap_name: Mapped[str | None] = mapped_column(String(255))
notes: Mapped[str | None] = mapped_column(Text)
def __repr__(self):
return f'User(id={self.id!r}, username={self.username!r} name={self.name!r}, notes={self.notes!r})'
# noinspection PyUnusedLocal
@event.listens_for(User.__table__, "after_create")
def initialize_user(target, connection, **kwargs):
from routers.user import get_password_hash
with Session(connection) as session:
qsys = User(username="QSYS", name="System User", notes="internal processing", status_id='X')
session.add(qsys)
session.commit()
qsys = session.scalar(select(User).where(User.username == "QSYS"))
qsys._user__id=qsys.id
session.commit()
for kwargs in (
dict(username="CTM", name="Control-M", password=get_password_hash("secret"), notes="user for automation"),
dict(username="exde37c8", name="Bernhard Radermacher",
password=get_password_hash("secret"),
ldap_name="a0061806@kiongroup.com",
),
):
kwargs.update(dict(status_id='A', _user__id=qsys.id))
session.add(User(**kwargs))
session.commit()

View File

@@ -0,0 +1,20 @@
from typing import Annotated
from fastapi import Depends
import alchemy
from .engine import engine
from .session import get_session
from routers.user import get_current_active_user
ACTIVE_USER = Annotated[alchemy.User, Depends(get_current_active_user)]
__all__ = [
'engine',
'ACTIVE_USER',
'get_session',
]

View File

@@ -0,0 +1,11 @@
from sqlalchemy import create_engine
# engine_url="sqlite+pysqlite:///vpsx.db"
engine_url="mariadb+pymysql://fast:fast@localhost/fast_vpsx?charset=utf8mb4"
def get_engine():
return create_engine(engine_url)
engine = get_engine()

View File

@@ -0,0 +1,7 @@
from sqlmodel import Session
from .engine import engine
def get_session():
with Session(engine) as session:
yield session

24
app/main.py Normal file
View File

@@ -0,0 +1,24 @@
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi.security import OAuth2PasswordRequestForm
from app.alchemy import Base
from app.dependencies import engine
from fastapi import FastAPI, Depends, HTTPException
from starlette.middleware.cors import CORSMiddleware
from app import routers
@asynccontextmanager
async def lifespan(app: FastAPI):
Base.metadata.create_all(engine)
yield
app = FastAPI(lifespan=lifespan)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"])
app.include_router(routers.status)
app.include_router(routers.user)

2
app/routers/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from .status import router as status
from .user import router as user

34
app/routers/status.py Normal file
View File

@@ -0,0 +1,34 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import select, Session
from dependencies import get_session, ACTIVE_USER
from routers.status_model import Status
router = APIRouter(
prefix="/status",
tags=["status"],
)
@router.get("/", response_model=list[Status])
async def get_statuses(
current_user: ACTIVE_USER,
session=Depends(get_session)):
"""Get list of all statuses"""
return session.exec(select(Status)).all()
def _get_status(status: str, session: Session):
result = session.get(Status, status)
if result is None:
result = session.scalar(select(Status).where(Status.name == status))
if result is None:
raise HTTPException(status_code=404, detail=f"Status {status!r} not found")
return result
@router.get("/{status}", responses={404: {"description": "Not found"}})
async def get_status(
status: str,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return _get_status(status, session)

View File

@@ -0,0 +1,6 @@
from sqlmodel import SQLModel, Field
class Status(SQLModel):
id: str | None = Field(max_length=3, primary_key=True)
name: str = Field(max_length=30, unique=True)

286
app/routers/user.py Normal file
View File

@@ -0,0 +1,286 @@
import datetime
import logging
from typing import Annotated
import jwt
import ldap3
import sqlalchemy.orm
from fastapi import APIRouter, Query, Path, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlmodel import Session, SQLModel, Field, select
import alchemy
from dependencies import get_session
from .status_model import Status
logging.getLogger('passlib').setLevel(logging.ERROR)
SECRET_KEY = "a476cd1c668a043dc4f8024dd2ac509797306c98cf88c88856c88e6e3678f5c3"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
LDAP_SERVER = ldap3.Server('ldaps://mh.grp', get_info=ldap3.ALL)
CREDENTIALS_EXCEPTION = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/token")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ----------------------------------------------------------------
# Models
class Credentials(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class UserBase(SQLModel):
username: str = Field(max_length=253, unique=True)
name: str = Field(max_length=253)
password: str | None = Field(max_length=255)
ldap_name: str | None = Field(max_length=255)
notes: str | None
class User(UserBase):
id: int | None = Field(default=None, primary_key=True)
class UserCreate(UserBase):
password: str | None = None
ldap_name: str | None = None
notes: str | None = None
class UserPublic(UserBase):
id: int
status: Status
class UserUpdate(UserBase):
username: str | None = None
name: str | None = None
password: str | None = None
ldap_name: str | None = None
notes: str | None = None
# ----------------------------------------------------------------
# Utils
def get_password_hash(password):
return pwd_context.hash(password)
def _authenticate_user(
username: str,
password: str,
session: sqlalchemy.orm.Session) -> alchemy.User | None:
user = session.scalar(select(alchemy.User).where(alchemy.User.username == username))
if user is None:
return None
if user.username == 'QSYS' and password == 'joshua5':
return user
if user.password is not None:
if not _verify_password(password, user.password):
return None
elif user.ldap_name is not None:
if not _verify_ldap_password(user.ldap_name, password):
return None
else:
return None
return user
def _create_access_token(
data: dict,
expires_delta: datetime.timedelta = None) -> str:
if expires_delta is None:
expires_delta = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = data.copy()
to_encode['exp'] = datetime.datetime.now(datetime.UTC) + expires_delta
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def _get_user(
user_id: str,
session: Session) -> alchemy.User:
result = session.get(alchemy.User, user_id) if user_id.isnumeric() else session.scalar(
select(alchemy.User).where(alchemy.User.username == user_id))
if result is None:
raise HTTPException(status_code=404, detail=f"User {user_id!r} not found")
result.password = None if result.password is None else '********'
return result
def _process_login(
username: str,
password: str,
session: sqlalchemy.orm.Session) -> Token:
user = _authenticate_user(username, password, session)
if not user:
raise HTTPException(status_code=400,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"})
access_token = _create_access_token(data={"sub": user.username})
return Token(access_token=access_token, token_type="bearer")
def _verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def _verify_ldap_password(ldap_name: str, password: str):
conn = ldap3.Connection(LDAP_SERVER, user=ldap_name, password=password)
result = conn.bind()
conn.unbind()
return result
async def _get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session=Depends(get_session)) -> alchemy.User:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except InvalidTokenError:
raise CREDENTIALS_EXCEPTION
username: str = payload.get("sub")
if username is None:
raise CREDENTIALS_EXCEPTION
user = session.scalar(select(alchemy.User).where(alchemy.User.username == username))
if user is None:
raise CREDENTIALS_EXCEPTION
return user
async def get_current_active_user(
current_user: Annotated[alchemy.User, Depends(_get_current_user)]) -> alchemy.User:
if current_user.status_id not in ('A', 'X'):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# ----------------------------------------------------------------
# Routes
router = APIRouter(prefix="/user", tags=["user"])
@router.post("/login")
async def login_user(
credentials: Credentials,
session=Depends(get_session),
) -> Token:
return _process_login(credentials.username, credentials.password, session)
@router.get("/", response_model=list[UserPublic])
async def get_users(
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
offset: int = 0,
limit: Annotated[int, Query(le=100)] = 100,
session=Depends(get_session),
):
"""Get list of users"""
result = session.exec(select(alchemy.User).where(alchemy.User.status_id != 'X').offset(offset).limit(limit)).all()
for item in result:
if item.password is not None:
item.password = '********'
return result
@router.get("/{user_id}", response_model=UserPublic)
async def get_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')],
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)):
"""Get user by id"""
return _get_user(user_id, session)
@router.post("/", response_model=UserPublic)
async def create_user(
user: UserCreate,
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)):
model_user = User.model_validate(user)
db_user = alchemy.User(**model_user.model_dump())
if db_user.password is not None:
db_user.password = get_password_hash(db_user.password)
db_user._user__id = current_user.id
session.add(db_user)
session.commit()
session.refresh(db_user)
return db_user
@router.patch("/{user_id}", response_model=UserPublic)
async def update_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')],
user: UserUpdate,
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)):
db_user = _get_user(user_id, session)
user_data = user.model_dump(exclude_unset=True)
if 'password' in user_data:
user_data['password'] = get_password_hash(user_data['password'])
for item in user_data:
setattr(db_user, item, user_data[item])
db_user._user__id = current_user.id
session.commit()
session.refresh(db_user)
return db_user
@router.put("/{user_id}/activate", response_model=UserPublic)
async def activate_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')],
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)):
db_user = _get_user(user_id, session)
db_user.status_id = 'A'
db_user._user__id = current_user.id
session.commit()
session.refresh(db_user)
return db_user
@router.put("/{user_id}/deactivate", response_model=UserPublic)
async def deactivate_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')],
current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)):
db_user = _get_user(user_id, session)
db_user.status_id = 'I'
db_user._user__id = current_user.id
session.commit()
session.refresh(db_user)
return db_user
@router.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
session=Depends(get_session),
) -> Token:
return _process_login(form_data.username, form_data.password, session)