This commit is contained in:
Bernhard Radermacher (hakisto)
2025-08-31 07:53:17 +02:00
parent 5614ecbba6
commit bd510016de
12 changed files with 404 additions and 74 deletions

View File

@@ -12,7 +12,7 @@ __all__ = ["Contact"]
class Contact(StatusForeignKey, Versioned, Base): class Contact(StatusForeignKey, Versioned, Base):
"""Contact""" """Contact"""
name: Mapped[str] = mapped_column(String(80), unique=True) code: Mapped[str] = mapped_column(String(80), unique=True)
address: Mapped[str | None] = mapped_column(String(253)) address: Mapped[str | None] = mapped_column(String(253))
notes: Mapped[str | None] = mapped_column(Text) notes: Mapped[str | None] = mapped_column(Text)

View File

@@ -13,7 +13,7 @@ __all__ = ["Country", "LocationCode", "Location"]
class Country(StatusForeignKey, Versioned, Base): class Country(StatusForeignKey, Versioned, Base):
iso: Mapped[str] = mapped_column(String(2), unique=True) code: Mapped[str] = mapped_column(String(2), unique=True)
name: Mapped[str] = mapped_column(String(80)) name: Mapped[str] = mapped_column(String(80))
notes: Mapped[str | None] = mapped_column(Text) notes: Mapped[str | None] = mapped_column(Text)
@@ -32,14 +32,14 @@ class CountryForeignKey:
@event.listens_for(Country.__table__, "after_create") @event.listens_for(Country.__table__, "after_create")
def initialize_country(target, connection, **kwargs): def initialize_country(target, connection, **kwargs):
with Session(connection) as session: with Session(connection) as session:
qsys = session.scalar(select(User).where(User.username == "QSYS")) qsys = session.scalar(select(User).where(User.code == "QSYS"))
for kwargs in ( for kwargs in (
dict(iso="DE", name="Germany", status_id='A'), dict(code="DE", name="Germany", status_id='A'),
dict(iso="IT", name="Italy", status_id='A'), dict(code="IT", name="Italy", status_id='A'),
dict(iso="US", name="United States"), dict(code="US", name="United States"),
dict(iso="CA", name="Canada"), dict(code="CA", name="Canada"),
dict(iso="MX", name="Mexico"), dict(code="MX", name="Mexico"),
dict(iso="ES", name="Spain", status_id='A'), dict(code="ES", name="Spain", status_id='A'),
): ):
kwargs['_user__'] = qsys kwargs['_user__'] = qsys
session.add(Country(**kwargs)) session.add(Country(**kwargs))
@@ -68,12 +68,12 @@ class LocationCodeForeignKey:
class Location(LocationCodeForeignKey, ContactForeignKey, StatusForeignKey, Versioned, Base): class Location(LocationCodeForeignKey, ContactForeignKey, StatusForeignKey, Versioned, Base):
"""Location""" """Location"""
location: Mapped[str] = mapped_column(String(30)) code: Mapped[str] = mapped_column(String(30))
description: Mapped[str] = mapped_column(String(256)) description: Mapped[str] = mapped_column(String(256))
notes: Mapped[str | None] = mapped_column(Text) notes: Mapped[str | None] = mapped_column(Text)
__table_args__ = ( __table_args__ = (
UniqueConstraint('location_code_id', location), UniqueConstraint('location_code_id', code),
) )

View File

@@ -53,7 +53,7 @@ class PrinterModelForeignKey:
class Printer(PrinterModelForeignKey, LocationForeignKey, ContactForeignKey, StatusForeignKey, Versioned, Base): class Printer(PrinterModelForeignKey, LocationForeignKey, ContactForeignKey, StatusForeignKey, Versioned, Base):
"""Printer""" """Printer"""
name: Mapped[str] = mapped_column(String(63), unique=True) code: Mapped[str] = mapped_column(String(63), unique=True)
description: Mapped[str] = mapped_column(String(256)) description: Mapped[str] = mapped_column(String(256))
notes: Mapped[str | None] = mapped_column(Text) notes: Mapped[str | None] = mapped_column(Text)
dns_name: Mapped[str | None] = mapped_column(String(253)) dns_name: Mapped[str | None] = mapped_column(String(253))

View File

@@ -68,29 +68,29 @@ class Versioned:
class User(StatusForeignKey, Versioned, Base): class User(StatusForeignKey, Versioned, Base):
"""User""" """User"""
username: Mapped[str] = mapped_column(String(253), unique=True) code: Mapped[str] = mapped_column(String(253), unique=True)
name: Mapped[str] = mapped_column(String(253)) name: Mapped[str] = mapped_column(String(253))
password: Mapped[str | None] = mapped_column(String(255)) password: Mapped[str | None] = mapped_column(String(255))
ldap_name: Mapped[str | None] = mapped_column(String(255)) ldap_name: Mapped[str | None] = mapped_column(String(255))
notes: Mapped[str | None] = mapped_column(Text) notes: Mapped[str | None] = mapped_column(Text)
def __repr__(self): def __repr__(self):
return f'User(id={self.id!r}, username={self.username!r} name={self.name!r}, notes={self.notes!r})' return f'User(id={self.id!r}, code={self.code!r} name={self.name!r}, notes={self.notes!r})'
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@event.listens_for(User.__table__, "after_create") @event.listens_for(User.__table__, "after_create")
def initialize_user(target, connection, **kwargs): def initialize_user(target, connection, **kwargs):
from routers.user import get_password_hash from routers.user import get_password_hash
with Session(connection) as session: with Session(connection) as session:
qsys = User(username="QSYS", name="System User", notes="internal processing", status_id='X') qsys = User(code="QSYS", name="System User", notes="internal processing", status_id='X')
session.add(qsys) session.add(qsys)
session.commit() session.commit()
qsys = session.scalar(select(User).where(User.username == "QSYS")) qsys = session.scalar(select(User).where(User.code == "QSYS"))
qsys._user__id=qsys.id qsys._user__id=qsys.id
session.commit() session.commit()
for kwargs in ( for kwargs in (
dict(username="CTM", name="Control-M", password=get_password_hash("secret"), notes="user for automation"), dict(code="CTM", name="Control-M", password=get_password_hash("secret"), notes="user for automation"),
dict(username="exde37c8", name="Bernhard Radermacher", dict(code="exde37c8", name="Bernhard Radermacher",
password=get_password_hash("secret"), password=get_password_hash("secret"),
ldap_name="a0061806@kiongroup.com", ldap_name="a0061806@kiongroup.com",
), ),

View File

@@ -1,6 +1,8 @@
import inspect
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Annotated from typing import Annotated
import fastapi
from fastapi.security import OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordRequestForm
from app.alchemy import Base from app.alchemy import Base
@@ -19,8 +21,11 @@ app = FastAPI(lifespan=lifespan)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"]) app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"])
app.include_router(routers.contact) # app.include_router(routers.contact)
# app.include_router(routers.country)
app.include_router(routers.status) # app.include_router(routers.status)
app.include_router(routers.user) # app.include_router(routers.user)
for i in inspect.getmembers(routers):
if isinstance(i[1], fastapi.routing.APIRouter):
app.include_router(i[1])

View File

@@ -1,3 +1,5 @@
from .contact import router as contact from .contact import router as contact
from .country import router as country
from .location_code import router as location_code
from .status import router as status from .status import router as status
from .user import router as user from .user import router as user

View File

@@ -1,3 +1,4 @@
import sys
from typing import Annotated from typing import Annotated
from fastapi import APIRouter, Query, Depends from fastapi import APIRouter, Query, Depends
@@ -18,7 +19,7 @@ PRIMARY_ANNOTATION = utils.make_primary_annotation('Contact')
# Models # Models
class ContactBase(SQLModel): class ContactBase(SQLModel):
name: str = Field(max_length=80, unique=True) code: str = Field(max_length=80, unique=True)
address: str = Field(max_length=253) address: str = Field(max_length=253)
notes: str | None notes: str | None
@@ -38,7 +39,7 @@ class ContactPublic(ContactBase):
class ContactUpdate(ContactBase): class ContactUpdate(ContactBase):
name: str | None = None code: str | None = None
address: str | None = None address: str | None = None
notes: str | None = None notes: str | None = None
@@ -52,9 +53,11 @@ router = APIRouter(prefix="/contact", tags=["contact"])
@router.get("/", response_model=list[ContactPublic]) @router.get("/", response_model=list[ContactPublic])
async def get_contacts( async def get_contacts(
offset: int = 0, offset: int = 0,
limit: Annotated[int, Query(le=100)] = 100, limit: Annotated[int, Query] = 100,
session=Depends(get_session)): session=Depends(get_session)):
"""Get list of all contacts""" """Get list of all contacts"""
if limit < 1:
limit = sys.maxsize
return session.exec(select(alchemy.Contact).offset(offset).limit(limit)).all() return session.exec(select(alchemy.Contact).offset(offset).limit(limit)).all()
@@ -64,11 +67,11 @@ async def get_contacts(
async def get_contact( async def get_contact(
contact_id: PRIMARY_ANNOTATION, contact_id: PRIMARY_ANNOTATION,
session=Depends(get_session)): session=Depends(get_session)):
result = utils.get_single_record(session, alchemy.Contact, contact_id) return utils.get_single_record(session, alchemy.Contact, contact_id)
return result
@router.post("/", response_model=ContactPublic) @router.post("/",
response_model=ContactPublic)
async def create_contact( async def create_contact(
contact: ContactCreate, contact: ContactCreate,
current_user: ACTIVE_USER, current_user: ACTIVE_USER,
@@ -80,7 +83,9 @@ async def create_contact(
data=Contact.model_validate(contact)) data=Contact.model_validate(contact))
@router.patch("/{contact_id}", response_model=ContactPublic) @router.patch("/{contact_id}",
response_model=ContactPublic,
responses={404: {"description": "Not found"}})
async def update_contact( async def update_contact(
contact_id: PRIMARY_ANNOTATION, contact_id: PRIMARY_ANNOTATION,
contact: ContactUpdate, contact: ContactUpdate,
@@ -93,7 +98,9 @@ async def update_contact(
data=contact) data=contact)
@router.put("/{contact_id}/activate", response_model=ContactPublic) @router.put("/{contact_id}/activate",
response_model=ContactPublic,
responses={404: {"description": "Not found"}})
async def activate_contact( async def activate_contact(
contact_id: PRIMARY_ANNOTATION, contact_id: PRIMARY_ANNOTATION,
current_user: ACTIVE_USER, current_user: ACTIVE_USER,
@@ -105,7 +112,9 @@ async def activate_contact(
status='A') status='A')
@router.put("/{contact_id}/deactivate", response_model=ContactPublic) @router.put("/{contact_id}/deactivate",
response_model=ContactPublic,
responses={404: {"description": "Not found"}})
async def deactivate_contact( async def deactivate_contact(
contact_id: PRIMARY_ANNOTATION, contact_id: PRIMARY_ANNOTATION,
current_user: ACTIVE_USER, current_user: ACTIVE_USER,

125
app/routers/country.py Normal file
View File

@@ -0,0 +1,125 @@
import sys
from typing import Annotated
from fastapi import APIRouter, Query, Depends
from sqlmodel import SQLModel, Field
from sqlmodel import select
import alchemy
import utils
from dependencies import get_session
from utils import update_item, create_item
from .status_model import Status
from .user import ACTIVE_USER
PRIMARY_ANNOTATION = utils.make_primary_annotation('Country')
# ----------------------------------------------------------------
# Models
class CountryBase(SQLModel):
code: str = Field(max_length=2, unique=True)
name: str = Field(max_length=80)
notes: str | None
class Country(CountryBase):
id: int | None = Field(default=None, primary_key=True)
class CountryCreate(CountryBase):
notes: str | None = None
class CountryPublic(CountryBase):
id: int
status: Status
class CountryUpdate(CountryBase):
code: str | None = None
name: str | None = None
notes: str | None = None
# ----------------------------------------------------------------
# Routes
router = APIRouter(prefix="/country", tags=["country"])
@router.get("/", response_model=list[CountryPublic])
async def get_countries(
offset: int = 0,
limit: Annotated[int, Query] = 100,
session=Depends(get_session)):
"""Get list of all countries"""
if limit < 1:
limit = sys.maxsize
return session.exec(select(alchemy.Country).offset(offset).limit(limit)).all()
@router.get("/{country_id}",
response_model=CountryPublic,
responses={404: {"description": "Not found"}})
async def get_country(
country_id: PRIMARY_ANNOTATION,
session=Depends(get_session)):
return utils.get_single_record(session, alchemy.Country, country_id)
@router.post("/",
response_model=CountryPublic)
async def create_country(
country: CountryCreate,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return create_item(
session=session,
cls=alchemy.Country,
current_user=current_user,
data=Country.model_validate(country))
@router.patch("/{country_id}",
response_model=CountryPublic,
responses={404: {"description": "Not found"}})
async def update_country(
country_id: PRIMARY_ANNOTATION,
country: CountryUpdate,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return update_item(
session=session,
current_user=current_user,
item=utils.get_single_record(session, alchemy.Country, country_id),
data=country)
@router.put("/{country_id}/activate",
response_model=CountryPublic,
responses={404: {"description": "Not found"}})
async def activate_country(
country_id: PRIMARY_ANNOTATION,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return utils.set_item_status(
session=session,
current_user=current_user,
item=utils.get_single_record(session, alchemy.Country, country_id),
status='A')
@router.put("/{country_id}/deactivate",
response_model=CountryPublic,
responses={404: {"description": "Not found"}})
async def deactivate_country(
country_id: PRIMARY_ANNOTATION,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return utils.set_item_status(
session=session,
current_user=current_user,
item=utils.get_single_record(session, alchemy.Country, country_id),
status='I')

View File

@@ -0,0 +1,134 @@
import sys
from typing import Annotated
from fastapi import APIRouter, Query, Depends
from sqlmodel import SQLModel, Field
from sqlmodel import select
import alchemy
import utils
from dependencies import get_session
from utils import update_item, create_item
from .status_model import Status
from .user import ACTIVE_USER
from .contact import Contact
from .country import Country
PRIMARY_ANNOTATION = utils.make_primary_annotation('Location Code')
# ----------------------------------------------------------------
# Models
class LocationCodeBase(SQLModel):
code: str = Field(max_length=8, unique=True)
description: str = Field(max_length=256)
notes: str | None
class LocationCode(LocationCodeBase):
id: int | None = Field(default=None, primary_key=True)
country: Country
contact: Contact | None = None
class LocationCodeCreate(LocationCodeBase):
country_id: int
contact_id : int | None = None
notes: str | None = None
class LocationCodePublic(LocationCode):
status: Status
class LocationCodeUpdate(LocationCodeBase):
code: str | None = None
description: str | None = None
country_id: int | None = None
contact_id: int | None = None
notes: str | None = None
# ----------------------------------------------------------------
# Routes
router = APIRouter(prefix="/location_code", tags=["location_code"])
@router.get("/", response_model=list[LocationCodePublic])
async def get_location_codes(
offset: int = 0,
limit: Annotated[int, Query] = 100,
session=Depends(get_session)):
"""Get list of all location codes"""
if limit < 1:
limit = sys.maxsize
return session.exec(select(alchemy.LocationCode).offset(offset).limit(limit)).all()
@router.get("/{location_code_id}",
response_model=LocationCodePublic,
responses={404: {"description": "Not found"}})
async def get_location_code(
location_code_id: PRIMARY_ANNOTATION,
session=Depends(get_session)):
return utils.get_single_record(session, alchemy.LocationCode, location_code_id)
@router.post("/",
response_model=LocationCodePublic)
async def create_location_code(
location_code: LocationCodeCreate,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return create_item(
session=session,
cls=alchemy.LocationCode,
current_user=current_user,
data=LocationCodeCreate.model_validate(location_code))
@router.patch("/{location_code_id}",
response_model=LocationCodePublic,
responses={404: {"description": "Not found"}})
async def update_location_code(
location_code_id: PRIMARY_ANNOTATION,
location_code: LocationCodeUpdate,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return update_item(
session=session,
current_user=current_user,
item=utils.get_single_record(session, alchemy.LocationCode, location_code_id),
data=location_code)
@router.put("/{location_code_id}/activate",
response_model=LocationCodePublic,
responses={404: {"description": "Not found"}})
async def activate_location_code(
locationCode_id: PRIMARY_ANNOTATION,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return utils.set_item_status(
session=session,
current_user=current_user,
item=utils.get_single_record(session, alchemy.LocationCode, locationCode_id),
status='A')
@router.put("/{location_code_id}/deactivate",
response_model=LocationCodePublic,
responses={404: {"description": "Not found"}})
async def deactivate_location_code(
location_code_id: PRIMARY_ANNOTATION,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return utils.set_item_status(
session=session,
current_user=current_user,
item=utils.get_single_record(session, alchemy.LocationCode, location_code_id),
status='I')

View File

@@ -1,13 +1,12 @@
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import select, Session from sqlmodel import select, Session
import alchemy
from dependencies import get_session from dependencies import get_session
from routers.status_model import Status from routers.status_model import Status
router = APIRouter( router = APIRouter(prefix="/status", tags=["status"])
prefix="/status",
tags=["status"],
)
def _get_status(status: str, session: Session): def _get_status(status: str, session: Session):
result = session.get(Status, status) result = session.get(Status, status)
@@ -18,14 +17,17 @@ def _get_status(status: str, session: Session):
return result return result
@router.get("/", response_model=list[Status]) @router.get("/",
response_model=list[Status])
async def get_statuses( async def get_statuses(
session=Depends(get_session)): session=Depends(get_session)):
"""Get list of all statuses""" """Get list of all statuses"""
return session.exec(select(Status)).all() return session.exec(select(alchemy.Status)).all()
@router.get("/{status}", responses={404: {"description": "Not found"}}) @router.get("/{status}",
response_model=Status,
responses={404: {"description": "Not found"}})
async def get_status( async def get_status(
status: str, status: str,
session=Depends(get_session)): session=Depends(get_session)):

View File

@@ -1,5 +1,6 @@
import datetime import datetime
import logging import logging
import sys
from typing import Annotated from typing import Annotated
import jwt import jwt
@@ -13,6 +14,7 @@ from pydantic import BaseModel
from sqlmodel import SQLModel, Field, Session, select from sqlmodel import SQLModel, Field, Session, select
import alchemy import alchemy
import utils
from dependencies import get_session from dependencies import get_session
from routers.status_model import Status from routers.status_model import Status
@@ -51,7 +53,7 @@ class TokenData(BaseModel):
class UserBase(SQLModel): class UserBase(SQLModel):
username: str = Field(max_length=253, unique=True) code: str = Field(max_length=253, unique=True)
name: str = Field(max_length=253) name: str = Field(max_length=253)
password: str | None = Field(max_length=255) password: str | None = Field(max_length=255)
ldap_name: str | None = Field(max_length=255) ldap_name: str | None = Field(max_length=255)
@@ -74,7 +76,7 @@ class UserPublic(UserBase):
class UserUpdate(UserBase): class UserUpdate(UserBase):
username: str | None = None code: str | None = None
name: str | None = None name: str | None = None
password: str | None = None password: str | None = None
ldap_name: str | None = None ldap_name: str | None = None
@@ -93,10 +95,10 @@ def _authenticate_user(
username: str, username: str,
password: str, password: str,
session: sqlalchemy.orm.Session) -> alchemy.User | None: session: sqlalchemy.orm.Session) -> alchemy.User | None:
user = session.scalar(select(alchemy.User).where(alchemy.User.username == username)) user = session.scalar(select(alchemy.User).where(alchemy.User.code == username))
if user is None: if user is None:
return None return None
if user.username == 'QSYS' and password == 'joshua5': if user.code == 'QSYS' and password == 'joshua5':
return user return user
if user.password is not None: if user.password is not None:
if not _verify_password(password, user.password): if not _verify_password(password, user.password):
@@ -122,10 +124,7 @@ def _create_access_token(
def _get_user( def _get_user(
user_id: str, user_id: str,
session: Session) -> alchemy.User: session: Session) -> alchemy.User:
result = session.get(alchemy.User, user_id) if user_id.isnumeric() else session.scalar( result = utils.get_single_record(session, alchemy.User, user_id)
select(alchemy.User).where(alchemy.User.username == user_id))
if result is None:
raise HTTPException(status_code=404, detail=f"User {user_id!r} not found")
result.password = None if result.password is None else '********' result.password = None if result.password is None else '********'
return result return result
@@ -139,7 +138,7 @@ def _process_login(
raise HTTPException(status_code=400, raise HTTPException(status_code=400,
detail="Incorrect username or password", detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"}) headers={"WWW-Authenticate": "Bearer"})
access_token = _create_access_token(data={"sub": user.username}) access_token = _create_access_token(data={"sub": user.code})
return Token(access_token=access_token, token_type="bearer") return Token(access_token=access_token, token_type="bearer")
@@ -166,7 +165,7 @@ async def _get_current_user(
if username is None: if username is None:
raise CREDENTIALS_EXCEPTION raise CREDENTIALS_EXCEPTION
user = session.scalar(select(alchemy.User).where(alchemy.User.username == username)) user = utils.get_single_record(session, alchemy.User, username)
if user is None: if user is None:
raise CREDENTIALS_EXCEPTION raise CREDENTIALS_EXCEPTION
@@ -189,19 +188,20 @@ router = APIRouter(prefix="/user", tags=["user"])
@router.post("/login") @router.post("/login")
async def login_user( async def login_user(
credentials: Credentials, credentials: Credentials,
session=Depends(get_session), session=Depends(get_session)) -> Token:
) -> Token:
return _process_login(credentials.username, credentials.password, session) return _process_login(credentials.username, credentials.password, session)
@router.get("/", response_model=list[UserPublic]) @router.get("/",
response_model=list[UserPublic])
async def get_users( async def get_users(
current_user: Annotated[alchemy.User, Depends(get_current_active_user)], current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
offset: int = 0, offset: int = 0,
limit: Annotated[int, Query(le=100)] = 100, limit: Annotated[int, Query] = 100,
session=Depends(get_session), session=Depends(get_session)):
):
"""Get list of users""" """Get list of users"""
if limit < 1:
limit = sys.maxsize
result = session.exec(select(alchemy.User).where(alchemy.User.status_id != 'X').offset(offset).limit(limit)).all() result = session.exec(select(alchemy.User).where(alchemy.User.status_id != 'X').offset(offset).limit(limit)).all()
for item in result: for item in result:
if item.password is not None: if item.password is not None:
@@ -209,7 +209,9 @@ async def get_users(
return result return result
@router.get("/{user_id}", response_model=UserPublic) @router.get("/{user_id}",
response_model=UserPublic,
responses={404: {"description": "Not found"}})
async def get_user( async def get_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')], user_id: Annotated[str, Path(description='User, either id (int) or name')],
current_user: Annotated[alchemy.User, Depends(get_current_active_user)], current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
@@ -218,12 +220,17 @@ async def get_user(
return _get_user(user_id, session) return _get_user(user_id, session)
@router.post("/", response_model=UserPublic) @router.post("/",
response_model=UserPublic)
async def create_user( async def create_user(
user: UserCreate, user: UserCreate,
current_user: Annotated[alchemy.User, Depends(get_current_active_user)], current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
session=Depends(get_session)): session=Depends(get_session)):
model_user = User.model_validate(user) model_user = User.model_validate(user)
if session.scalar(select(alchemy.User).where(alchemy.User.code == model_user.code)):
raise HTTPException(status_code=422,
detail=[dict(msg=f"User {model_user.code} already exists",
type="IntegrityError")])
db_user = alchemy.User(**model_user.model_dump()) db_user = alchemy.User(**model_user.model_dump())
if db_user.password is not None: if db_user.password is not None:
db_user.password = get_password_hash(db_user.password) db_user.password = get_password_hash(db_user.password)
@@ -234,7 +241,9 @@ async def create_user(
return db_user return db_user
@router.patch("/{user_id}", response_model=UserPublic) @router.patch("/{user_id}",
response_model=UserPublic,
responses={404: {"description": "Not found"}})
async def update_user( async def update_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')], user_id: Annotated[str, Path(description='User, either id (int) or name')],
user: UserUpdate, user: UserUpdate,
@@ -242,17 +251,29 @@ async def update_user(
session=Depends(get_session)): session=Depends(get_session)):
db_user = _get_user(user_id, session) db_user = _get_user(user_id, session)
user_data = user.model_dump(exclude_unset=True) user_data = user.model_dump(exclude_unset=True)
if ('code' in user_data and
session.scalar(select(alchemy.User).where(alchemy.User.code == user_data['code'])) != db_user):
raise HTTPException(status_code=422,
detail=[dict(msg=f"User {user_data['code']} already exists",
type="IntegrityError")])
if 'password' in user_data: if 'password' in user_data:
user_data['password'] = get_password_hash(user_data['password']) user_data['password'] = get_password_hash(user_data['password'])
for item in user_data: for item in user_data:
setattr(db_user, item, user_data[item]) setattr(db_user, item, user_data[item])
db_user._user__id = current_user.id db_user._user__id = current_user.id
session.commit() try:
session.commit()
except Exception as exc:
raise HTTPException(status_code=422,
detail=[dict(msg=', '.join(exc.args),
type="Database Error")])
session.refresh(db_user) session.refresh(db_user)
return db_user return db_user
@router.put("/{user_id}/activate", response_model=UserPublic) @router.put("/{user_id}/activate",
response_model=UserPublic,
responses={404: {"description": "Not found"}})
async def activate_user( async def activate_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')], user_id: Annotated[str, Path(description='User, either id (int) or name')],
current_user: Annotated[alchemy.User, Depends(get_current_active_user)], current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
@@ -260,12 +281,19 @@ async def activate_user(
db_user = _get_user(user_id, session) db_user = _get_user(user_id, session)
db_user.status_id = 'A' db_user.status_id = 'A'
db_user._user__id = current_user.id db_user._user__id = current_user.id
session.commit() try:
session.commit()
except Exception as exc:
raise HTTPException(status_code=422,
detail=[dict(msg=', '.join(exc.args),
type="Database Error")])
session.refresh(db_user) session.refresh(db_user)
return db_user return db_user
@router.put("/{user_id}/deactivate", response_model=UserPublic) @router.put("/{user_id}/deactivate",
response_model=UserPublic,
responses={404: {"description": "Not found"}})
async def deactivate_user( async def deactivate_user(
user_id: Annotated[str, Path(description='User, either id (int) or name')], user_id: Annotated[str, Path(description='User, either id (int) or name')],
current_user: Annotated[alchemy.User, Depends(get_current_active_user)], current_user: Annotated[alchemy.User, Depends(get_current_active_user)],
@@ -273,7 +301,12 @@ async def deactivate_user(
db_user = _get_user(user_id, session) db_user = _get_user(user_id, session)
db_user.status_id = 'I' db_user.status_id = 'I'
db_user._user__id = current_user.id db_user._user__id = current_user.id
session.commit() try:
session.commit()
except Exception as exc:
raise HTTPException(status_code=422,
detail=[dict(msg=', '.join(exc.args),
type="Database Error")])
session.refresh(db_user) session.refresh(db_user)
return db_user return db_user

View File

@@ -6,45 +6,65 @@ from sqlalchemy import select
def make_primary_annotation(name=str): def make_primary_annotation(name=str):
return Annotated[str, Path(description=f'{name}, either id (int) or name')] return Annotated[str, Path(description=f'{name}, either id (int) or code')]
def get_single_record(session, cls, primary: str): def get_single_record(session, cls, code: str):
print(session) result = session.get(cls, code)
print(cls)
print(primary)
result = session.get(cls, primary)
print(result)
if result is None: if result is None:
result = session.scalar(select(cls).where(cls.name == primary)) result = session.scalar(select(cls).where(cls.code == code))
if result is None: if result is None:
raise HTTPException( raise HTTPException(
status_code=404, status_code=404,
detail=f"{cls.__class__.__name__} {primary!r} not found.") detail=f"{cls.__class__.__name__} {code!r} not found.")
return result return result
def set_item_status(session, current_user, item, status: str): def set_item_status(session, current_user, item, status: str):
item.status_id = status item.status_id = status
item._user__id = current_user.id item._user__id = current_user.id
session.commit() try:
session.commit()
except Exception as exc:
raise HTTPException(status_code=422,
detail=[dict(msg=', '.join(exc.args),
type="Database Error")])
session.refresh(item) session.refresh(item)
return item return item
def update_item(session, current_user, item, data): def update_item(session, current_user, item, data):
for k, v in data.model_dump(exclude_unset=True).items(): for k, v in data.model_dump(exclude_unset=True).items():
if (k == 'code' and
session.scalar(select(item.__class__).where(item.__class__.code == v)) != item):
raise HTTPException(status_code=422,
detail=[dict(msg=f"{item.__class__.__name__} {k!r} already exists",
type="Integrity Error")])
setattr(item, k, v) setattr(item, k, v)
item._user__id = current_user.id item._user__id = current_user.id
session.commit() try:
session.commit()
except Exception as exc:
raise HTTPException(status_code=422,
detail=[dict(msg=', '.join(exc.args),
type="Database Error")])
session.refresh(item) session.refresh(item)
return item return item
def create_item(session, cls, current_user, data): def create_item(session, cls, current_user, data):
item = cls(**data.model_dump()) item = cls(**data.model_dump())
if session.scalar(select(cls).where(cls.code == item.code)):
raise HTTPException(status_code=422,
detail=[dict(msg=f"{cls.__class__.__name__} {item.code} already exists",
type="Integrity Error")])
item._user__id = current_user.id item._user__id = current_user.id
session.add(item) session.add(item)
session.commit() try:
session.commit()
except Exception as exc:
raise HTTPException(status_code=422,
detail=[dict(msg=', '.join(exc.args),
type="Database Error")])
session.refresh(item) session.refresh(item)
return item return item