from datetime import date from typing import List, Optional from sqlalchemy import and_, desc, select from sqlalchemy.orm import Session from models import Project, WorkLogEntry def create_project(db: Session, name: str, description: Optional[str]) -> Project: project = Project(name=name, description=description) db.add(project) db.commit() db.refresh(project) return project def get_project(db: Session, project_id: int) -> Optional[Project]: return db.get(Project, project_id) def get_project_by_name(db: Session, name: str) -> Optional[Project]: stmt = select(Project).where(Project.name == name) return db.execute(stmt).scalars().first() def list_projects(db: Session, limit: int, offset: int) -> List[Project]: stmt = select(Project).order_by(desc(Project.id)).limit(limit).offset(offset) return db.execute(stmt).scalars().all() def update_project( db: Session, project: Project, name: Optional[str], description: Optional[str], ) -> Project: if name is not None: project.name = name if description is not None: project.description = description db.commit() db.refresh(project) return project def delete_project(db: Session, project: Project) -> None: db.delete(project) db.commit() def create_work_log( db: Session, project_id: int, log_date: date, hours: float, description: str, ) -> WorkLogEntry: entry = WorkLogEntry( project_id=project_id, date=log_date, hours=hours, description=description, ) db.add(entry) db.commit() db.refresh(entry) return entry def get_work_log(db: Session, log_id: int) -> Optional[WorkLogEntry]: return db.get(WorkLogEntry, log_id) def list_work_logs(db: Session, limit: int, offset: int) -> List[WorkLogEntry]: stmt = ( select(WorkLogEntry) .order_by(desc(WorkLogEntry.date), desc(WorkLogEntry.id)) .limit(limit) .offset(offset) ) return db.execute(stmt).scalars().all() def list_work_logs_by_range( db: Session, start_date: date, end_date: date, limit: int, offset: int, ) -> List[WorkLogEntry]: stmt = ( select(WorkLogEntry) .where(and_(WorkLogEntry.date >= start_date, WorkLogEntry.date <= end_date)) .order_by(desc(WorkLogEntry.date), desc(WorkLogEntry.id)) .limit(limit) .offset(offset) ) return db.execute(stmt).scalars().all() def update_work_log( db: Session, entry: WorkLogEntry, project_id: Optional[int], log_date: Optional[date], hours: Optional[float], description: Optional[str], ) -> WorkLogEntry: if project_id is not None: entry.project_id = project_id if log_date is not None: entry.date = log_date if hours is not None: entry.hours = hours if description is not None: entry.description = description db.commit() db.refresh(entry) return entry def delete_work_log(db: Session, entry: WorkLogEntry) -> None: db.delete(entry) db.commit()