Files
vpsx-fast/app/routers/contact.py
Bernhard Radermacher 292e296f01 wip
2025-09-01 11:36:22 +00:00

103 lines
3.0 KiB
Python

import sys
from typing import Annotated
from fastapi import APIRouter, Query, Depends
from sqlmodel import SQLModel, Field
from sqlmodel import select
import alchemy
import utils
from dependencies import get_session
from utils import update_item, create_item
from .user import ACTIVE_USER
from .models import Contact, ContactCreate, ContactUpdate
PRIMARY_ANNOTATION = utils.make_primary_annotation('Contact')
# ----------------------------------------------------------------
# Routes
router = APIRouter(prefix="/contact", tags=["contact"])
@router.get("/",
response_model=list[Contact])
async def get_contacts(
offset: int = 0,
limit: Annotated[int, Query] = 100,
session=Depends(get_session)):
"""Get list of all Contacts"""
return session.exec(
select(
alchemy.Contact
).offset(offset).limit(limit or sys.maxsize)).all()
# noinspection PyTypeHints
@router.get("/{contact}",
response_model=Contact,
responses={404: {"description": "Not found"}})
async def get_contact(
contact: PRIMARY_ANNOTATION,
session=Depends(get_session)):
return utils.get_single_record(session, alchemy.Contact, 'Contact', contact)
# noinspection PyTypeHints
@router.post("/",
response_model=Contact)
async def create_contact(
data: ContactCreate,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return create_item(
session=session,
cls=alchemy.Contact,
name='Contact',
current_user=current_user,
data=data)
@router.patch("/{contact}",
response_model=Contact,
responses={404: {"description": "Not found"}})
async def update_contact(
contact: PRIMARY_ANNOTATION,
data: ContactUpdate,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return update_item(
session=session,
current_user=current_user,
item=utils.get_single_record(session, alchemy.Contact, 'Contact', contact),
data=data)
@router.put("/{contact}/activate",
response_model=Contact,
responses={404: {"description": "Not found"}})
async def activate_contact(
contact: PRIMARY_ANNOTATION,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return utils.set_item_status(
session=session,
current_user=current_user,
item=utils.get_single_record(session, alchemy.Contact, 'Contact', contact),
status='A')
@router.put("/{contact}/deactivate",
response_model=Contact,
responses={404: {"description": "Not found"}})
async def deactivate_contact(
contact: PRIMARY_ANNOTATION,
current_user: ACTIVE_USER,
session=Depends(get_session)):
return utils.set_item_status(
session=session,
current_user=current_user,
item=utils.get_single_record(session, alchemy.Contact, 'Contact', contact),
status='I')