"""Source code for village.classes.collection."""

import os
import sys
import traceback
from pathlib import Path
from typing import Any, Type, Union

import numpy as np
import pandas as pd

from village.classes.abstract_classes import EventBase
from village.custom_classes.training_protocol_base import TrainingProtocolBase
from village.scripts.log import log
from village.scripts.time_utils import time_utils
from village.scripts.utils import get_x_value_interp
from village.settings import settings


class Collection(EventBase):
    """Manages a collection of data entries stored in a CSV file and a pandas DataFrame.

    The CSV file (semicolon-separated) and the in-memory DataFrame are kept
    in sync: rows added through the API are appended to both.

    Attributes:
        name (str): Name of the collection (also the CSV base file name).
        columns (list[str]): List of column names.
        types (list[Type]): List of column data types, aligned with columns.
        dict (dict): Dictionary mapping columns to types.
        path (Path): Path to the backing CSV file.
        df (pd.DataFrame): The pandas DataFrame holding the data.
    """
[docs] def __init__(self, name: str, columns: list[str], types: list[Type]) -> None: """Initializes the Collection. Args: name (str): The name of the collection (and the file base name). columns (list[str]): The column names. types (list[Type]): The data types for each column. """ self.name: str = name self.columns: list[str] = columns self.types: list[Type] = types self.dict = {col: t for col, t in zip(self.columns, self.types)} self.path: Path = Path(settings.get("SYSTEM_DIRECTORY")) / (name + ".csv") self.df = pd.DataFrame() if name != "": if not os.path.exists(self.path): with open(self.path, "w", encoding="utf-8") as file: columns_str: str = ";".join(self.columns) + "\n" file.write(columns_str) try: self.df = pd.read_csv(self.path, dtype=self.dict, sep=";") except Exception: log.error( "error reading from: " + str(self.path), exception=traceback.format_exc(), ) sys.exit()
[docs] def add_entry(self, entry: list) -> None: """Adds a new entry to the collection. Args: entry (list): The list of values for the new row. """ entry_str = [ "" if isinstance(e, float) and np.isnan(e) else str(e) for e in entry ] new_row = pd.DataFrame([entry_str], columns=self.columns) new_row = self.convert_df_to_types(new_row) self.df = pd.concat([self.df, new_row], ignore_index=True) columns_str: str = ";".join(entry_str) + "\n" with open(self.path, "a", encoding="utf-8") as file: file.write(columns_str) self.check_split_csv()
[docs] @staticmethod def convert_with_default(value, target_type: Any) -> Any: """Converts a value to a target type, using defaults for failures. Args: value: The value to convert. target_type (Any): The target type. Returns: Any: The converted value or a default. """ try: return target_type(value) except (ValueError, TypeError): if target_type == int or target_type == float: return 0 elif target_type == bool: return False elif target_type == str: return "" else: return value
[docs] def convert_df_to_types(self, df: pd.DataFrame) -> pd.DataFrame: """Converts DataFrame columns to the specified types. Args: df (pd.DataFrame): The DataFrame to convert. Returns: pd.DataFrame: The converted DataFrame. """ for col, type in zip(df.columns, self.types): df[col] = df[col].apply(lambda x: self.convert_with_default(x, type)) return df
[docs] def check_split_csv(self) -> None: """Checks if the CSV file is too large and splits it if necessary.""" max_size = 50000 file_size = 40000 if len(self.df) > max_size: first_rows: pd.DataFrame = self.df.head(file_size) date_str: str = time_utils.now_string_for_filename() new_filename: str = self.name + "_" + date_str + ".csv" directory = Path(settings.get("SYSTEM_DIRECTORY"), "old_events") new_path = Path(directory, new_filename) directory.mkdir(parents=True, exist_ok=True) first_rows.to_csv(new_path, index=False, sep=";") last: pd.DataFrame = self.df.tail(len(self.df) - file_size) last.to_csv(self.path, index=False, sep=";") self.df = last
[docs] def get_last_entry(self, column: str, value: str) -> Union[pd.Series, None]: """Gets the last entry matching a specific value in a column. Args: column (str): The column to search. value (str): The value to match. Returns: Union[pd.Series, None]: The last matching row, or None. """ column_df: pd.DataFrame = self.df[self.df[column].astype(str) == value] if not column_df.empty: return column_df.iloc[-1] return None
[docs] def get_last_entry_name(self, column: str, value: str) -> str | None: """Gets the 'name' field of the last entry matching a condition. Args: column (str): The column to search. value (str): The value to match. Returns: str | None: The name, or None. """ column_df: pd.DataFrame = self.df[self.df[column].astype(str) == value] name = None if not column_df.empty: row = column_df.iloc[-1] if row is not None: try: name = row["name"] except Exception: pass return name
[docs] def get_first_entry(self, column: str, value: str) -> Union[pd.Series, None]: """Gets the first entry matching a specific value in a column. Args: column (str): The column to search. value (str): The value to match. Returns: Union[pd.Series, None]: The first matching row, or None. """ column_df: pd.DataFrame = self.df[self.df[column].astype(str) == value] if not column_df.empty: return column_df.iloc[0] return None
[docs] def change_last_entry(self, column: str, value: Any) -> None: """Updates a value in the last entry of the DataFrame and saves. Args: column (str): The column to update. value (Any): The new value. """ self.df.loc[self.df.index[-1], column] = value self.save_from_df()
[docs] def log(self, date: str, type: str, subject: str, description: str) -> None: """Logs an event if the collection schema matches standard logging fields. Args: date (str): Date string. type (str): Event type. subject (str): Subject name. description (str): Description. """ if self.columns == ["date", "type", "subject", "description"]: entry = [date, type, subject, description] self.add_entry(entry)
[docs] def log_temp(self, date: str, temperature: float, humidity: float) -> None: """Logs temperature if the collection schema matches temperature fields. Args: date (str): Date string. temperature (float): Temperature value. humidity (float): Humidity value. """ if self.columns == ["date", "temperature", "humidity"]: entry = [date, temperature, humidity] self.add_entry(entry)
[docs] def get_last_water_df(self) -> pd.DataFrame: """Returns the latest water calibration entries for each port. Returns: pd.DataFrame: Filtered DataFrame with max calibration numbers per port. """ df = self.df[self.df["calibration_number"] != -1].copy() max_values = df.groupby(["port_number"])["calibration_number"].transform("max") df = df[df["calibration_number"] == max_values] return df
[docs] def get_last_sound_df(self) -> pd.DataFrame: """Returns the latest sound calibration entries for each speaker/sound. Returns: pd.DataFrame: Filtered DataFrame with max calibration numbers per sound. """ df = self.df[self.df["calibration_number"] != -1].copy() max_values = df.groupby(["speaker", "sound_name"])[ "calibration_number" ].transform("max") df = df[df["calibration_number"] == max_values] return df
[docs] def get_valve_time(self, port: int, volume: float) -> float: """Calculates valve open time for a given volume based on calibration. Args: port (int): The port number. volume (float): The target volume in ul. Returns: float: The time in seconds. Raises: ValueError: If calibration data is invalid or missing. """ try: calibration_df = self.df[self.df["port_number"] == port] max_calibration = calibration_df["calibration_number"].max() calibration_df = calibration_df[ calibration_df["calibration_number"] == max_calibration ] x = calibration_df["time(s)"].values y = calibration_df["water_delivered(ul)"].values val = get_x_value_interp(x, y, volume) if val is None: raise Exception else: return val except Exception: text = f""" \n\n\t--> WATER CALIBRATION PROBLEM !!!!!!\n It is not possible to provide a valid time value for a water delivery of {volume} ul for the port {port}.\n 1. Make sure you have calibrated the valves/pumps you are using.\n 2. Make sure the water you want to give is within calibration range.\n 3. Ultimately, check water_calibration.csv in 'data'.\n """ raise ValueError(text)
[docs] def get_sound_gain(self, speaker: int, dB: float, sound_name: str) -> float: """Calculates sound gain for a target dB based on calibration. Args: speaker (int): The speaker ID. dB (float): The target decibels. sound_name (str): The name of the sound. Returns: float: The gain factor. Raises: ValueError: If calibration data is invalid or missing. """ try: if dB == 0: return 0.0 calibration_df = self.df[self.df["speaker"] == speaker] calibration_df = calibration_df[calibration_df["sound_name"] == sound_name] max_calibration = calibration_df["calibration_number"].max() calibration_df = calibration_df[ calibration_df["calibration_number"] == max_calibration ] x = calibration_df["gain"].values y = calibration_df["dB_obtained"].values val = get_x_value_interp(x, y, dB) if val is None: raise Exception else: return val except Exception: text = f""" \n\n\t--> SOUND CALIBRATION PROBLEM !!!!!!\n It is not possible to provide a valid gain value for a target dB of {dB} for the speaker {speaker} and sound {sound_name}.\n 1. Make sure you have calibrated the sound you are using.\n 2. Make sure the dB you want to obtain is within calibration range.\n 3. Ultimately, check sound_calibration.csv in 'data'.\n """ raise ValueError(text)
[docs] def save_from_df( self, training: TrainingProtocolBase = TrainingProtocolBase() ) -> None: """Saves values from the current DataFrame to the CSV file, processing formatting. Args: training (TrainingProtocolBase): Protocol for formatting specific fields. """ new_df = self.df_from_df(self.df, training) new_df.to_csv(self.path, index=False, sep=";") self.df = new_df
[docs] def df_from_df( self, df: pd.DataFrame, training: TrainingProtocolBase ) -> pd.DataFrame: """Processes a DataFrame for saving (formatting dates, enums, etc). Args: df (pd.DataFrame): The input DataFrame. training (TrainingProtocolBase): The training protocol for custom formatting. Returns: pd.DataFrame: The processed DataFrame. """ new_df = self.convert_df_to_types(df) if "next_session_time" in new_df.columns: new_df["next_session_time"] = pd.to_datetime( new_df["next_session_time"], format="%Y-%m-%d %H:%M:%S", errors="coerce" ) new_df["next_session_time"] = new_df["next_session_time"].fillna( time_utils.now() ) for col in new_df.columns: if new_df[col].dtype == "datetime64[ns]": new_df[col] = new_df[col].dt.strftime("%Y-%m-%d %H:%M:%S") if "active" in new_df.columns: weekdays = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] def convert_active(value) -> str: value = value.strip() if value in ("ON", "On", "on"): return "ON" else: days = [day.strip() for day in value.split("-")] if all(day in weekdays for day in days): return "-".join(days) else: return "OFF" new_df["active"] = new_df["active"].apply(convert_active) if "next_settings" in new_df.columns: new_df["next_settings"] = new_df["next_settings"].apply( training.get_jsonstring_from_jsonstring ) return new_df