import os
import sys
import traceback
from pathlib import Path
from typing import Any, Type, Union
import numpy as np
import pandas as pd
from village.classes.abstract_classes import EventBase
from village.custom_classes.training_protocol_base import TrainingProtocolBase
from village.scripts.log import log
from village.scripts.time_utils import time_utils
from village.scripts.utils import get_x_value_interp
from village.settings import settings
[docs]
class Collection(EventBase):
"""Manages a collection of data entries stored in a CSV file and a pandas DataFrame.
Attributes:
name (str): Name of the collection.
columns (list[str]): List of column names.
types (list[Type]): List of column data types.
dict (dict): Dictionary mapping columns to types.
path (Path): Path to the CSV file.
df (pd.DataFrame): The pandas DataFrame holding the data.
"""
[docs]
def __init__(self, name: str, columns: list[str], types: list[Type]) -> None:
    """Initializes the Collection.

    Creates the backing CSV file (header row only) if it does not exist
    yet, then loads it into ``self.df``. On a read failure the error is
    logged and the process exits.

    Args:
        name (str): The name of the collection (and the file base name).
        columns (list[str]): The column names.
        types (list[Type]): The data types for each column.
    """
    self.name: str = name
    self.columns: list[str] = columns
    self.types: list[Type] = types
    # Column -> dtype mapping, also used as the dtype spec for read_csv.
    self.dict = dict(zip(self.columns, self.types))
    self.path: Path = Path(settings.get("SYSTEM_DIRECTORY")) / (name + ".csv")
    self.df = pd.DataFrame()
    if name != "":
        if not self.path.exists():
            # Write only the header; entries are appended later.
            header: str = ";".join(self.columns) + "\n"
            self.path.write_text(header, encoding="utf-8")
        try:
            self.df = pd.read_csv(self.path, dtype=self.dict, sep=";")
        except Exception:
            log.error(
                "error reading from: " + str(self.path),
                exception=traceback.format_exc(),
            )
            sys.exit()
[docs]
def add_entry(self, entry: list) -> None:
    """Appends one row to the in-memory DataFrame and to the CSV on disk.

    Args:
        entry (list): The list of values for the new row.
    """
    # NaN floats are serialized as empty strings; everything else via str().
    as_text: list[str] = []
    for item in entry:
        if isinstance(item, float) and np.isnan(item):
            as_text.append("")
        else:
            as_text.append(str(item))
    row_df = pd.DataFrame([as_text], columns=self.columns)
    row_df = self.convert_df_to_types(row_df)
    self.df = pd.concat([self.df, row_df], ignore_index=True)
    line: str = ";".join(as_text) + "\n"
    with open(self.path, "a", encoding="utf-8") as csv_file:
        csv_file.write(line)
    # May roll old rows out to an archive file once the table grows large.
    self.check_split_csv()
[docs]
@staticmethod
def convert_with_default(value, target_type: Any) -> Any:
"""Converts a value to a target type, using defaults for failures.
Args:
value: The value to convert.
target_type (Any): The target type.
Returns:
Any: The converted value or a default.
"""
try:
return target_type(value)
except (ValueError, TypeError):
if target_type == int or target_type == float:
return 0
elif target_type == bool:
return False
elif target_type == str:
return ""
else:
return value
[docs]
def convert_df_to_types(self, df: pd.DataFrame) -> pd.DataFrame:
    """Converts DataFrame columns to the collection's declared types.

    Columns are paired positionally with ``self.types``; values that fail
    conversion receive type-specific defaults (see ``convert_with_default``).

    Args:
        df (pd.DataFrame): The DataFrame to convert (modified in place).

    Returns:
        pd.DataFrame: The same DataFrame with converted columns.
    """
    # Renamed loop variable: the original shadowed the builtin `type`.
    for col, target_type in zip(df.columns, self.types):
        df[col] = df[col].apply(
            lambda value: self.convert_with_default(value, target_type)
        )
    return df
[docs]
def check_split_csv(self) -> None:
    """Archives the oldest rows to a dated CSV when the table grows too big.

    When more than 50000 rows are held, the first 40000 are moved to
    ``old_events/<name>_<timestamp>.csv`` and only the remainder is kept
    in memory and in the main CSV file.
    """
    max_rows = 50000
    rows_to_archive = 40000
    if len(self.df) <= max_rows:
        return
    archived: pd.DataFrame = self.df.head(rows_to_archive)
    stamp: str = time_utils.now_string_for_filename()
    archive_name: str = self.name + "_" + stamp + ".csv"
    archive_dir = Path(settings.get("SYSTEM_DIRECTORY"), "old_events")
    archive_path = Path(archive_dir, archive_name)
    archive_dir.mkdir(parents=True, exist_ok=True)
    archived.to_csv(archive_path, index=False, sep=";")
    remaining: pd.DataFrame = self.df.tail(len(self.df) - rows_to_archive)
    remaining.to_csv(self.path, index=False, sep=";")
    self.df = remaining
[docs]
def get_last_entry(self, column: str, value: str) -> Union[pd.Series, None]:
    """Gets the last entry matching a specific value in a column.

    Args:
        column (str): The column to search.
        value (str): The value to match (cells are str-cast before comparing).

    Returns:
        Union[pd.Series, None]: The last matching row, or None.
    """
    matches: pd.DataFrame = self.df[self.df[column].astype(str) == value]
    if matches.empty:
        return None
    return matches.iloc[-1]
[docs]
def get_last_entry_name(self, column: str, value: str) -> str | None:
"""Gets the 'name' field of the last entry matching a condition.
Args:
column (str): The column to search.
value (str): The value to match.
Returns:
str | None: The name, or None.
"""
column_df: pd.DataFrame = self.df[self.df[column].astype(str) == value]
name = None
if not column_df.empty:
row = column_df.iloc[-1]
if row is not None:
try:
name = row["name"]
except Exception:
pass
return name
[docs]
def get_first_entry(self, column: str, value: str) -> Union[pd.Series, None]:
    """Gets the first entry matching a specific value in a column.

    Args:
        column (str): The column to search.
        value (str): The value to match (cells are str-cast before comparing).

    Returns:
        Union[pd.Series, None]: The first matching row, or None.
    """
    matches: pd.DataFrame = self.df[self.df[column].astype(str) == value]
    return matches.iloc[0] if not matches.empty else None
[docs]
def change_last_entry(self, column: str, value: Any) -> None:
    """Updates a value in the last entry of the DataFrame and saves.

    Args:
        column (str): The column to update.
        value (Any): The new value.
    """
    # Guard: an empty collection has no last row; the original raised
    # IndexError on `self.df.index[-1]` in that case.
    if self.df.empty:
        return
    self.df.loc[self.df.index[-1], column] = value
    self.save_from_df()
[docs]
def log(self, date: str, type: str, subject: str, description: str) -> None:
    """Logs an event if the collection schema matches standard logging fields.

    Args:
        date (str): Date string.
        type (str): Event type.
        subject (str): Subject name.
        description (str): Description.
    """
    expected_schema = ["date", "type", "subject", "description"]
    if self.columns != expected_schema:
        # Silently ignore collections that are not the event log.
        return
    self.add_entry([date, type, subject, description])
[docs]
def log_temp(self, date: str, temperature: float, humidity: float) -> None:
    """Logs temperature if the collection schema matches temperature fields.

    Args:
        date (str): Date string.
        temperature (float): Temperature value.
        humidity (float): Humidity value.
    """
    expected_schema = ["date", "temperature", "humidity"]
    if self.columns != expected_schema:
        # Silently ignore collections that are not the temperature log.
        return
    self.add_entry([date, temperature, humidity])
[docs]
def get_last_water_df(self) -> pd.DataFrame:
    """Returns the latest water calibration entries for each port.

    Returns:
        pd.DataFrame: Rows carrying the highest calibration_number per
        port_number; rows with calibration_number == -1 are excluded.
    """
    valid = self.df[self.df["calibration_number"] != -1].copy()
    latest = valid.groupby(["port_number"])["calibration_number"].transform("max")
    return valid[valid["calibration_number"] == latest]
[docs]
def get_last_sound_df(self) -> pd.DataFrame:
    """Returns the latest sound calibration entries per speaker/sound pair.

    Returns:
        pd.DataFrame: Rows carrying the highest calibration_number per
        (speaker, sound_name); rows with calibration_number == -1 are
        excluded.
    """
    valid = self.df[self.df["calibration_number"] != -1].copy()
    latest = valid.groupby(["speaker", "sound_name"])[
        "calibration_number"
    ].transform("max")
    return valid[valid["calibration_number"] == latest]
[docs]
def get_valve_time(self, port: int, volume: float) -> float:
    """Calculates valve open time for a given volume based on calibration.

    Uses the most recent calibration for the port and interpolates the
    opening time that yields the requested volume.

    Args:
        port (int): The port number.
        volume (float): The target volume in ul.

    Returns:
        float: The time in seconds.

    Raises:
        ValueError: If calibration data is invalid or missing.
    """
    try:
        port_df = self.df[self.df["port_number"] == port]
        latest = port_df["calibration_number"].max()
        port_df = port_df[port_df["calibration_number"] == latest]
        times = port_df["time(s)"].values
        volumes = port_df["water_delivered(ul)"].values
        result = get_x_value_interp(times, volumes, volume)
        if result is None:
            # Force the except path so any failure mode yields the same
            # user-facing error message below.
            raise Exception
        return result
    except Exception:
        text = f"""
\n\n\t--> WATER CALIBRATION PROBLEM !!!!!!\n
It is not possible to provide a valid time value
for a water delivery of {volume} ul for the port {port}.\n
1. Make sure you have calibrated the valves/pumps you are using.\n
2. Make sure the water you want to give is within calibration range.\n
3. Ultimately, check water_calibration.csv in 'data'.\n
"""
        raise ValueError(text)
[docs]
def get_sound_gain(self, speaker: int, dB: float, sound_name: str) -> float:
    """Calculates sound gain for a target dB based on calibration.

    Uses the most recent calibration for the (speaker, sound_name) pair
    and interpolates the gain that yields the requested dB level.

    Args:
        speaker (int): The speaker ID.
        dB (float): The target decibels.
        sound_name (str): The name of the sound.

    Returns:
        float: The gain factor (0.0 when 0 dB is requested).

    Raises:
        ValueError: If calibration data is invalid or missing.
    """
    try:
        if dB == 0:
            return 0.0
        cal_df = self.df[self.df["speaker"] == speaker]
        cal_df = cal_df[cal_df["sound_name"] == sound_name]
        latest = cal_df["calibration_number"].max()
        cal_df = cal_df[cal_df["calibration_number"] == latest]
        gains = cal_df["gain"].values
        decibels = cal_df["dB_obtained"].values
        result = get_x_value_interp(gains, decibels, dB)
        if result is None:
            # Force the except path so any failure mode yields the same
            # user-facing error message below.
            raise Exception
        return result
    except Exception:
        text = f"""
\n\n\t--> SOUND CALIBRATION PROBLEM !!!!!!\n
It is not possible to provide a valid gain value
for a target dB of {dB} for the speaker {speaker} and sound {sound_name}.\n
1. Make sure you have calibrated the sound you are using.\n
2. Make sure the dB you want to obtain is within calibration range.\n
3. Ultimately, check sound_calibration.csv in 'data'.\n
"""
        raise ValueError(text)
[docs]
def save_from_df(
    self, training: TrainingProtocolBase | None = None
) -> None:
    """Saves values from the current DataFrame to the CSV file.

    Args:
        training (TrainingProtocolBase | None): Protocol used to format
            specific fields. A fresh default instance is created when
            None is passed (the previous default of a shared
            ``TrainingProtocolBase()`` instance was a mutable default
            argument, evaluated once at definition time).
    """
    if training is None:
        training = TrainingProtocolBase()
    new_df = self.df_from_df(self.df, training)
    new_df.to_csv(self.path, index=False, sep=";")
    self.df = new_df
[docs]
def df_from_df(
    self, df: pd.DataFrame, training: TrainingProtocolBase
) -> pd.DataFrame:
    """Processes a DataFrame for saving (formatting dates, enums, etc).

    Args:
        df (pd.DataFrame): The input DataFrame.
        training (TrainingProtocolBase): Protocol providing the JSON
            normalization applied to the 'next_settings' column.

    Returns:
        pd.DataFrame: The processed DataFrame.
    """
    new_df = self.convert_df_to_types(df)
    if "next_session_time" in new_df.columns:
        # Unparseable timestamps become NaT and are replaced with "now".
        parsed = pd.to_datetime(
            new_df["next_session_time"], format="%Y-%m-%d %H:%M:%S", errors="coerce"
        )
        new_df["next_session_time"] = parsed.fillna(time_utils.now())
    # Serialize every datetime column back to the canonical string format.
    for column in new_df.columns:
        if new_df[column].dtype == "datetime64[ns]":
            new_df[column] = new_df[column].dt.strftime("%Y-%m-%d %H:%M:%S")
    if "active" in new_df.columns:
        weekdays = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]

        def normalize_active(raw) -> str:
            # "ON" in any accepted casing stays ON; otherwise keep a
            # valid dash-separated day list, else fall back to OFF.
            cleaned = raw.strip()
            if cleaned in ("ON", "On", "on"):
                return "ON"
            days = [day.strip() for day in cleaned.split("-")]
            if all(day in weekdays for day in days):
                return "-".join(days)
            return "OFF"

        new_df["active"] = new_df["active"].apply(normalize_active)
    if "next_settings" in new_df.columns:
        new_df["next_settings"] = new_df["next_settings"].apply(
            training.get_jsonstring_from_jsonstring
        )
    return new_df