186 lines
6.0 KiB
Python
186 lines
6.0 KiB
Python
import calendar
|
|
from datetime import date, timedelta
|
|
from typing import Generator, List
|
|
|
|
from fastapi import Depends, FastAPI, HTTPException, Query, status
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.orm import Session
|
|
|
|
import crud
|
|
from db import SessionLocal, engine
|
|
from models import Base
|
|
from schemas import (
|
|
ProjectCreate,
|
|
ProjectRead,
|
|
ProjectUpdate,
|
|
WorkLogCreate,
|
|
WorkLogRead,
|
|
WorkLogUpdate,
|
|
)
|
|
|
|
app = FastAPI(title="Work Log API")
|
|
|
|
|
|
def get_db() -> Generator[Session, None, None]:
|
|
db = SessionLocal()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
@app.on_event("startup")
|
|
def on_startup() -> None:
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
|
|
@app.post("/projects", response_model=ProjectRead, status_code=status.HTTP_201_CREATED)
|
|
def create_project(payload: ProjectCreate, db: Session = Depends(get_db)) -> ProjectRead:
|
|
existing = crud.get_project_by_name(db, payload.name)
|
|
if existing:
|
|
raise HTTPException(status_code=409, detail="project name already exists")
|
|
try:
|
|
return crud.create_project(db, payload.name, payload.description)
|
|
except IntegrityError:
|
|
raise HTTPException(status_code=409, detail="project name already exists")
|
|
|
|
|
|
@app.get("/projects", response_model=List[ProjectRead])
|
|
def list_projects(
|
|
limit: int = Query(50, ge=1, le=200),
|
|
offset: int = Query(0, ge=0),
|
|
db: Session = Depends(get_db),
|
|
) -> List[ProjectRead]:
|
|
return crud.list_projects(db, limit, offset)
|
|
|
|
|
|
@app.get("/projects/{project_id}", response_model=ProjectRead)
|
|
def get_project(project_id: int, db: Session = Depends(get_db)) -> ProjectRead:
|
|
project = crud.get_project(db, project_id)
|
|
if not project:
|
|
raise HTTPException(status_code=404, detail="project not found")
|
|
return project
|
|
|
|
|
|
@app.put("/projects/{project_id}", response_model=ProjectRead)
|
|
def update_project(
|
|
project_id: int,
|
|
payload: ProjectUpdate,
|
|
db: Session = Depends(get_db),
|
|
) -> ProjectRead:
|
|
project = crud.get_project(db, project_id)
|
|
if not project:
|
|
raise HTTPException(status_code=404, detail="project not found")
|
|
if payload.name:
|
|
existing = crud.get_project_by_name(db, payload.name)
|
|
if existing and existing.id != project_id:
|
|
raise HTTPException(status_code=409, detail="project name already exists")
|
|
return crud.update_project(db, project, payload.name, payload.description)
|
|
|
|
|
|
@app.delete("/projects/{project_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
def delete_project(project_id: int, db: Session = Depends(get_db)) -> None:
|
|
project = crud.get_project(db, project_id)
|
|
if not project:
|
|
raise HTTPException(status_code=404, detail="project not found")
|
|
crud.delete_project(db, project)
|
|
return None
|
|
|
|
|
|
@app.post("/work-logs", response_model=WorkLogRead, status_code=status.HTTP_201_CREATED)
|
|
def create_work_log(payload: WorkLogCreate, db: Session = Depends(get_db)) -> WorkLogRead:
|
|
project = crud.get_project(db, payload.project_id)
|
|
if not project:
|
|
raise HTTPException(status_code=404, detail="project not found")
|
|
return crud.create_work_log(
|
|
db,
|
|
payload.project_id,
|
|
payload.date,
|
|
payload.hours,
|
|
payload.description,
|
|
)
|
|
|
|
|
|
@app.get("/work-logs", response_model=List[WorkLogRead])
|
|
def list_work_logs(
|
|
limit: int = Query(50, ge=1, le=200),
|
|
offset: int = Query(0, ge=0),
|
|
db: Session = Depends(get_db),
|
|
) -> List[WorkLogRead]:
|
|
return crud.list_work_logs(db, limit, offset)
|
|
|
|
|
|
@app.get("/work-logs/{log_id}", response_model=WorkLogRead)
|
|
def get_work_log(log_id: int, db: Session = Depends(get_db)) -> WorkLogRead:
|
|
entry = crud.get_work_log(db, log_id)
|
|
if not entry:
|
|
raise HTTPException(status_code=404, detail="work log not found")
|
|
return entry
|
|
|
|
|
|
@app.put("/work-logs/{log_id}", response_model=WorkLogRead)
|
|
def update_work_log(
|
|
log_id: int,
|
|
payload: WorkLogUpdate,
|
|
db: Session = Depends(get_db),
|
|
) -> WorkLogRead:
|
|
entry = crud.get_work_log(db, log_id)
|
|
if not entry:
|
|
raise HTTPException(status_code=404, detail="work log not found")
|
|
if payload.project_id is not None:
|
|
project = crud.get_project(db, payload.project_id)
|
|
if not project:
|
|
raise HTTPException(status_code=404, detail="project not found")
|
|
return crud.update_work_log(
|
|
db,
|
|
entry,
|
|
payload.project_id,
|
|
payload.date,
|
|
payload.hours,
|
|
payload.description,
|
|
)
|
|
|
|
|
|
@app.delete("/work-logs/{log_id}", status_code=status.HTTP_204_NO_CONTENT)
|
|
def delete_work_log(log_id: int, db: Session = Depends(get_db)) -> None:
|
|
entry = crud.get_work_log(db, log_id)
|
|
if not entry:
|
|
raise HTTPException(status_code=404, detail="work log not found")
|
|
crud.delete_work_log(db, entry)
|
|
return None
|
|
|
|
|
|
@app.get("/work-logs/day", response_model=List[WorkLogRead])
|
|
def work_logs_for_day(
|
|
date_param: date = Query(..., alias="date"),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
offset: int = Query(0, ge=0),
|
|
db: Session = Depends(get_db),
|
|
) -> List[WorkLogRead]:
|
|
return crud.list_work_logs_by_range(db, date_param, date_param, limit, offset)
|
|
|
|
|
|
@app.get("/work-logs/week", response_model=List[WorkLogRead])
|
|
def work_logs_for_week(
|
|
date_param: date = Query(..., alias="date"),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
offset: int = Query(0, ge=0),
|
|
db: Session = Depends(get_db),
|
|
) -> List[WorkLogRead]:
|
|
start = date_param - timedelta(days=date_param.weekday())
|
|
end = start + timedelta(days=6)
|
|
return crud.list_work_logs_by_range(db, start, end, limit, offset)
|
|
|
|
|
|
@app.get("/work-logs/month", response_model=List[WorkLogRead])
|
|
def work_logs_for_month(
|
|
date_param: date = Query(..., alias="date"),
|
|
limit: int = Query(50, ge=1, le=200),
|
|
offset: int = Query(0, ge=0),
|
|
db: Session = Depends(get_db),
|
|
) -> List[WorkLogRead]:
|
|
last_day = calendar.monthrange(date_param.year, date_param.month)[1]
|
|
start = date_param.replace(day=1)
|
|
end = date_param.replace(day=last_day)
|
|
return crud.list_work_logs_by_range(db, start, end, limit, offset)
|