worklog/crud.py
2026-01-31 17:49:36 +01:00

126 lines
3.0 KiB
Python

from datetime import date
from typing import List, Optional
from sqlalchemy import and_, desc, select
from sqlalchemy.orm import Session
from models import Project, WorkLogEntry
def create_project(db: Session, name: str, description: Optional[str]) -> Project:
    """Insert a new project and return the persisted instance.

    Args:
        db: Session used to stage and commit the insert.
        name: Value stored in ``Project.name``.
        description: Value stored in ``Project.description``; may be ``None``.

    Returns:
        The new ``Project`` after ``db.refresh`` has reloaded it.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised unchanged
            once the session has been rolled back.
    """
    project = Project(name=name, description=description)
    db.add(project)
    try:
        db.commit()
    except Exception:
        # A commit that fails while flushing leaves the session's transaction
        # unusable until rollback() is called; reset it so the caller can keep
        # using ``db`` after handling the error.
        db.rollback()
        raise
    db.refresh(project)
    return project
def get_project(db: Session, project_id: int) -> Optional[Project]:
    """Fetch one project by primary key.

    Args:
        db: Session used for the lookup.
        project_id: Identifier passed to ``db.get`` for ``Project``.

    Returns:
        The result of ``db.get(Project, project_id)``; per the annotation
        this is ``None`` when no such project exists.
    """
    found = db.get(Project, project_id)
    return found
def get_project_by_name(db: Session, name: str) -> Optional[Project]:
    """Fetch the first project whose name equals ``name``.

    Args:
        db: Session used to run the query.
        name: Value compared for equality against ``Project.name``.

    Returns:
        The first matching ``Project``, or ``None`` when nothing matches.
    """
    query = select(Project).where(Project.name == name)
    matches = db.execute(query).scalars()
    return matches.first()
def list_projects(db: Session, limit: int, offset: int) -> List[Project]:
    """Return one page of projects, highest ``id`` first.

    Args:
        db: Session used to run the query.
        limit: Value applied as the query's LIMIT.
        offset: Value applied as the query's OFFSET.

    Returns:
        All ``Project`` rows produced by the paged query.
    """
    by_id_desc = select(Project).order_by(desc(Project.id))
    page_query = by_id_desc.limit(limit).offset(offset)
    rows = db.execute(page_query).scalars()
    return rows.all()
def update_project(
    db: Session,
    project: Project,
    name: Optional[str],
    description: Optional[str],
) -> Project:
    """Apply a partial update to ``project`` and commit it.

    A field is only overwritten when its argument is not ``None``, so this
    function cannot be used to reset ``description`` back to ``None``.

    Args:
        db: Session used to commit the change and refresh the instance.
        project: The instance to modify in place.
        name: New name, or ``None`` to keep the current one.
        description: New description, or ``None`` to keep the current one.

    Returns:
        The same ``project`` object after ``db.refresh``.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised unchanged
            once the session has been rolled back.
    """
    if name is not None:
        project.name = name
    if description is not None:
        project.description = description
    try:
        db.commit()
    except Exception:
        # A commit that fails while flushing leaves the session's transaction
        # unusable until rollback() is called; reset it so the caller can keep
        # using ``db`` after handling the error.
        db.rollback()
        raise
    db.refresh(project)
    return project
def delete_project(db: Session, project: Project) -> None:
    """Delete ``project`` and commit the removal.

    Args:
        db: Session used to mark the instance for deletion and commit.
        project: The instance to delete.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised unchanged
            once the session has been rolled back.
    """
    db.delete(project)
    try:
        db.commit()
    except Exception:
        # A commit that fails while flushing leaves the session's transaction
        # unusable until rollback() is called; reset it so the caller can keep
        # using ``db`` after handling the error.
        db.rollback()
        raise
def create_work_log(
    db: Session,
    project_id: int,
    log_date: date,
    hours: float,
    description: str,
) -> WorkLogEntry:
    """Insert a new work-log entry and return the persisted instance.

    Args:
        db: Session used to stage and commit the insert.
        project_id: Value stored in ``WorkLogEntry.project_id``.
        log_date: Value stored in ``WorkLogEntry.date``.
        hours: Value stored in ``WorkLogEntry.hours``.
        description: Value stored in ``WorkLogEntry.description``.

    Returns:
        The new ``WorkLogEntry`` after ``db.refresh`` has reloaded it.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised unchanged
            once the session has been rolled back.
    """
    entry = WorkLogEntry(
        project_id=project_id,
        date=log_date,
        hours=hours,
        description=description,
    )
    db.add(entry)
    try:
        db.commit()
    except Exception:
        # A commit that fails while flushing leaves the session's transaction
        # unusable until rollback() is called; reset it so the caller can keep
        # using ``db`` after handling the error.
        db.rollback()
        raise
    db.refresh(entry)
    return entry
def get_work_log(db: Session, log_id: int) -> Optional[WorkLogEntry]:
    """Fetch one work-log entry by primary key.

    Args:
        db: Session used for the lookup.
        log_id: Identifier passed to ``db.get`` for ``WorkLogEntry``.

    Returns:
        The result of ``db.get(WorkLogEntry, log_id)``; per the annotation
        this is ``None`` when no such entry exists.
    """
    found = db.get(WorkLogEntry, log_id)
    return found
def list_work_logs(db: Session, limit: int, offset: int) -> List[WorkLogEntry]:
    """Return one page of work-log entries, latest date first.

    Entries that share a date are ordered by descending ``id``.

    Args:
        db: Session used to run the query.
        limit: Value applied as the query's LIMIT.
        offset: Value applied as the query's OFFSET.

    Returns:
        All ``WorkLogEntry`` rows produced by the paged query.
    """
    ordering = (desc(WorkLogEntry.date), desc(WorkLogEntry.id))
    page_query = select(WorkLogEntry).order_by(*ordering).limit(limit).offset(offset)
    rows = db.execute(page_query).scalars()
    return rows.all()
def list_work_logs_by_range(
    db: Session,
    start_date: date,
    end_date: date,
    limit: int,
    offset: int,
) -> List[WorkLogEntry]:
    """Return one page of work-log entries dated within a closed interval.

    Both bounds are inclusive. Results are ordered latest date first, with
    entries on the same date ordered by descending ``id``.

    Args:
        db: Session used to run the query.
        start_date: Earliest ``WorkLogEntry.date`` to include.
        end_date: Latest ``WorkLogEntry.date`` to include.
        limit: Value applied as the query's LIMIT.
        offset: Value applied as the query's OFFSET.

    Returns:
        All ``WorkLogEntry`` rows produced by the filtered, paged query.
    """
    within_range = and_(
        WorkLogEntry.date >= start_date,
        WorkLogEntry.date <= end_date,
    )
    ordering = (desc(WorkLogEntry.date), desc(WorkLogEntry.id))
    filtered = select(WorkLogEntry).where(within_range).order_by(*ordering)
    page_query = filtered.limit(limit).offset(offset)
    rows = db.execute(page_query).scalars()
    return rows.all()
def update_work_log(
    db: Session,
    entry: WorkLogEntry,
    project_id: Optional[int],
    log_date: Optional[date],
    hours: Optional[float],
    description: Optional[str],
) -> WorkLogEntry:
    """Apply a partial update to ``entry`` and commit it.

    A field is only overwritten when its argument is not ``None``; falsy
    values such as ``0`` hours or an empty description are still applied.

    Args:
        db: Session used to commit the change and refresh the instance.
        entry: The instance to modify in place.
        project_id: New ``project_id``, or ``None`` to keep the current one.
        log_date: New ``date``, or ``None`` to keep the current one.
        hours: New ``hours``, or ``None`` to keep the current value.
        description: New description, or ``None`` to keep the current one.

    Returns:
        The same ``entry`` object after ``db.refresh``.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised unchanged
            once the session has been rolled back.
    """
    if project_id is not None:
        entry.project_id = project_id
    if log_date is not None:
        entry.date = log_date
    if hours is not None:
        entry.hours = hours
    if description is not None:
        entry.description = description
    try:
        db.commit()
    except Exception:
        # A commit that fails while flushing leaves the session's transaction
        # unusable until rollback() is called; reset it so the caller can keep
        # using ``db`` after handling the error.
        db.rollback()
        raise
    db.refresh(entry)
    return entry
def delete_work_log(db: Session, entry: WorkLogEntry) -> None:
    """Delete ``entry`` and commit the removal.

    Args:
        db: Session used to mark the instance for deletion and commit.
        entry: The instance to delete.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised unchanged
            once the session has been rolled back.
    """
    db.delete(entry)
    try:
        db.commit()
    except Exception:
        # A commit that fails while flushing leaves the session's transaction
        # unusable until rollback() is called; reset it so the caller can keep
        # using ``db`` after handling the error.
        db.rollback()
        raise