Files
vpsx-fast/app/routers/contact.py

118 lines
3.2 KiB
Python
Raw Normal View History

2025-08-30 10:06:47 +02:00
from typing import Annotated
from fastapi import APIRouter, Query, Depends
from sqlmodel import SQLModel, Field
from sqlmodel import select
import alchemy
import utils
from dependencies import get_session
from utils import update_item, create_item
from .status_model import Status
from .user import ACTIVE_USER
# Annotation applied to the `contact_id` path parameter of the routes in this
# module, built once by utils.make_primary_annotation for the 'Contact' entity.
# NOTE(review): presumably an Annotated[int, Path(...)] alias -- confirm in utils.
PRIMARY_ANNOTATION = utils.make_primary_annotation('Contact')
# ----------------------------------------------------------------
# Models
class ContactBase(SQLModel):
    """Fields shared by the contact schemas in this module."""

    # At most 80 characters; flagged as unique.
    name: str = Field(unique=True, max_length=80)
    # At most 253 characters.
    address: str = Field(max_length=253)
    # May be None; no default is declared at this level.
    notes: str | None
class Contact(ContactBase):
    """Contact schema extended with an optional primary-key id."""

    # Defaults to None when no id is supplied.
    id: int | None = Field(primary_key=True, default=None)
class ContactCreate(ContactBase):
    """Request body for creating a contact."""

    # Re-declaring a field replaces the definition inherited from ContactBase,
    # so the 253-character limit must be repeated here; otherwise over-long
    # values pass request validation and only fail later in the handler.
    # NOTE(review): None is accepted here, but Contact inherits `address: str`
    # from ContactBase, so validating a payload without an address into
    # Contact looks like it would fail -- confirm which side is intended.
    address: str | None = Field(default=None, max_length=253)
    # Optional free-text notes; omitted notes become None.
    notes: str | None = None
class ContactPublic(ContactBase):
    """Contact as returned by the routes in this module."""

    # Unlike Contact.id, the id is mandatory in responses.
    id: int
    # Status value as defined by Status in .status_model.
    status: Status
class ContactUpdate(ContactBase):
    """Request body for partially updating a contact; every field is optional."""

    # Each re-declaration replaces the field inherited from ContactBase, so
    # the length limits are repeated here to keep PATCH payloads under the
    # same constraints as the base schema.
    name: str | None = Field(default=None, max_length=80)
    address: str | None = Field(default=None, max_length=253)
    notes: str | None = None
# ----------------------------------------------------------------
# Routes
# Router collecting the contact endpoints; all routes registered on it are
# served under the "/contact" prefix and carry the "contact" tag.
router = APIRouter(prefix="/contact", tags=["contact"])
@router.get("/", response_model=list[ContactPublic])
async def get_contacts(
    offset: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=0, le=100)] = 100,
    session=Depends(get_session),
):
    """Get list of all contacts (paginated with `offset` and `limit`)"""
    # `ge=0` rejects negative values during request validation. Without it a
    # negative offset/limit is handed straight to the database's OFFSET/LIMIT
    # clause, which some backends reject and others treat as "no limit",
    # bypassing the 100-row cap.
    # NOTE(review): the statement has no ORDER BY, so the content of a page
    # depends on the database's default row order.
    statement = select(alchemy.Contact).offset(offset).limit(limit)
    return session.exec(statement).all()
@router.get(
    "/{contact_id}",
    response_model=ContactPublic,
    responses={404: {"description": "Not found"}},
)
async def get_contact(
    contact_id: PRIMARY_ANNOTATION,
    session=Depends(get_session),
):
    """Get a single contact by its id"""
    # The lookup (and whatever it does for an unknown id) is delegated to
    # utils.get_single_record.
    return utils.get_single_record(session, alchemy.Contact, contact_id)
@router.post("/", response_model=ContactPublic)
async def create_contact(
    contact: ContactCreate,
    current_user: ACTIVE_USER,
    session=Depends(get_session),
):
    """Create a new contact"""
    # Re-validate the request body against the Contact schema before handing
    # it to the shared create helper.
    # NOTE(review): ContactCreate allows address=None while Contact inherits
    # `address: str`, so this validation may raise for a missing address.
    validated = Contact.model_validate(contact)
    return create_item(
        session=session,
        cls=alchemy.Contact,
        current_user=current_user,
        data=validated,
    )
# The 404 response is declared for parity with get_contact, which documents it
# for the same utils.get_single_record lookup.
@router.patch(
    "/{contact_id}",
    response_model=ContactPublic,
    responses={404: {"description": "Not found"}},
)
async def update_contact(
    contact_id: PRIMARY_ANNOTATION,
    contact: ContactUpdate,
    current_user: ACTIVE_USER,
    session=Depends(get_session),
):
    """Update an existing contact"""
    # Fetch the stored record first, then let the shared update helper apply
    # the request body to it on behalf of the current user.
    item = utils.get_single_record(session, alchemy.Contact, contact_id)
    return update_item(
        session=session,
        current_user=current_user,
        item=item,
        data=contact,
    )
# The 404 response is declared for parity with get_contact, which documents it
# for the same utils.get_single_record lookup.
@router.put(
    "/{contact_id}/activate",
    response_model=ContactPublic,
    responses={404: {"description": "Not found"}},
)
async def activate_contact(
    contact_id: PRIMARY_ANNOTATION,
    current_user: ACTIVE_USER,
    session=Depends(get_session),
):
    """Activate a contact (set its status to 'A')"""
    item = utils.get_single_record(session, alchemy.Contact, contact_id)
    return utils.set_item_status(
        session=session,
        current_user=current_user,
        item=item,
        status='A',
    )
# The 404 response is declared for parity with get_contact, which documents it
# for the same utils.get_single_record lookup.
@router.put(
    "/{contact_id}/deactivate",
    response_model=ContactPublic,
    responses={404: {"description": "Not found"}},
)
async def deactivate_contact(
    contact_id: PRIMARY_ANNOTATION,
    current_user: ACTIVE_USER,
    session=Depends(get_session),
):
    """Deactivate a contact (set its status to 'I')"""
    item = utils.get_single_record(session, alchemy.Contact, contact_id)
    return utils.set_item_status(
        session=session,
        current_user=current_user,
        item=item,
        status='I',
    )