Source code for village.classes.collection

import os
import sys
import traceback
from pathlib import Path
from typing import Any, Type, Union

import numpy as np
import pandas as pd

from village.custom_classes.training_protocol_base import TrainingProtocolBase
from village.scripts.log import log
from village.scripts.time_utils import time_utils
from village.settings import settings

_WEEKDAYS = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]


[docs] def convert_active(value: str) -> str: """Normalise an 'active' schedule value for storage. Accepts the new pipe-separated hour format, legacy hyphen-separated day format, ON/OFF variants, and empty strings. """ value = value.strip() if not value: return "ON" if value in ("ON", "On", "on"): return "ON" if value in ("OFF", "Off", "off"): return "OFF" if "|" in value: return value days = [day.strip() for day in value.split("-")] if all(day in _WEEKDAYS for day in days): return "-".join(days) return "OFF"
[docs] class Collection: """Manages a collection of data entries stored in a CSV file and a pandas DataFrame. Attributes: name (str): Name of the collection. columns (list[str]): List of column names. types (list[Type]): List of column data types. dict (dict): Dictionary mapping columns to types. path (Path): Path to the CSV file. df (pd.DataFrame): The pandas DataFrame holding the data. """
[docs] def __init__(self) -> None: """Initializes the Collection.""" pass
[docs] def create_data_collection( self, name: str, columns: list[str], types: list[Type] ) -> None: self.name: str = name self.columns: list[str] = columns self.types: list[Type] = types self.dict = {col: t for col, t in zip(self.columns, self.types)} filename = name if name.endswith(".csv") else name + ".csv" self.path: Path = Path(settings.get("SYSTEM_DIRECTORY")) / filename self.df = pd.DataFrame() if name != "": if not os.path.exists(self.path): with open(self.path, "w", encoding="utf-8") as file: columns_str: str = ";".join(self.columns) + "\n" file.write(columns_str) try: self.df = pd.read_csv(self.path, dtype=self.dict, sep=";") except Exception: log.error( "error reading from: " + str(self.path), exception=traceback.format_exc(), ) sys.exit()
[docs] def add_entry(self, entry: list) -> None: """Adds a new entry to the collection. Args: entry (list): The list of values for the new row. """ entry_str = [ "" if isinstance(e, float) and np.isnan(e) else str(e) for e in entry ] new_row = pd.DataFrame([entry_str], columns=self.columns) new_row = self.convert_df_to_types(new_row) self.df = pd.concat([self.df, new_row], ignore_index=True) columns_str: str = ";".join(entry_str) + "\n" with open(self.path, "a", encoding="utf-8") as file: file.write(columns_str) self.check_split_csv()
[docs] @staticmethod def convert_with_default(value, target_type: Any) -> Any: """Converts a value to a target type, using defaults for failures. Args: value: The value to convert. target_type (Any): The target type. Returns: Any: The converted value or a default. """ try: return target_type(value) except (ValueError, TypeError): if target_type is int or target_type is float: return 0 elif target_type is bool: return False elif target_type is str: return "" else: return value
[docs] def convert_df_to_types(self, df: pd.DataFrame) -> pd.DataFrame: """Converts DataFrame columns to the specified types. Args: df (pd.DataFrame): The DataFrame to convert. Returns: pd.DataFrame: The converted DataFrame. """ for col, type in zip(df.columns, self.types): df[col] = df[col].apply(lambda x: self.convert_with_default(x, type)) return df
[docs] def check_split_csv(self) -> None: """Checks if the CSV file is too large and splits it if necessary.""" max_size = 50000 file_size = 40000 if len(self.df) > max_size: first_rows: pd.DataFrame = self.df.head(file_size) date_str: str = time_utils.now_string_for_filename() new_filename: str = self.name + "_" + date_str + ".csv" directory = Path(settings.get("SYSTEM_DIRECTORY"), "old_events") new_path = Path(directory, new_filename) directory.mkdir(parents=True, exist_ok=True) first_rows.to_csv(new_path, index=False, sep=";") last: pd.DataFrame = self.df.tail(len(self.df) - file_size) last.to_csv(self.path, index=False, sep=";") self.df = last
[docs] def get_last_entry(self, column: str, value: str) -> Union[pd.Series, None]: """Gets the last entry matching a specific value in a column. Args: column (str): The column to search. value (str): The value to match. Returns: Union[pd.Series, None]: The last matching row, or None. """ column_df: pd.DataFrame = self.df[self.df[column].astype(str) == value] if not column_df.empty: return column_df.iloc[-1] return None
[docs] def get_last_entry_name(self, column: str, value: str) -> str | None: """Gets the 'name' field of the last entry matching a condition. Args: column (str): The column to search. value (str): The value to match. Returns: str | None: The name, or None. """ column_df: pd.DataFrame = self.df[self.df[column].astype(str) == value] name = None if not column_df.empty: row = column_df.iloc[-1] if row is not None: try: name = row["name"] except Exception: pass return name
[docs] def get_first_entry(self, column: str, value: str) -> Union[pd.Series, None]: """Gets the first entry matching a specific value in a column. Args: column (str): The column to search. value (str): The value to match. Returns: Union[pd.Series, None]: The first matching row, or None. """ column_df: pd.DataFrame = self.df[self.df[column].astype(str) == value] if not column_df.empty: return column_df.iloc[0] return None
[docs] def change_last_entry(self, column: str, value: Any) -> None: """Updates a value in the last entry of the DataFrame and saves. Args: column (str): The column to update. value (Any): The new value. """ self.df.loc[self.df.index[-1], column] = value self.save_from_df()
[docs] def save_from_df( self, training: TrainingProtocolBase = TrainingProtocolBase() ) -> None: """Saves values from the current DataFrame to the CSV file, processing formatting. Args: training (TrainingProtocolBase): Protocol for formatting specific fields. """ new_df = self.df_from_df(self.df, training) new_df.to_csv(self.path, index=False, sep=";") self.df = new_df
[docs] def df_from_df( self, df: pd.DataFrame, training: TrainingProtocolBase ) -> pd.DataFrame: """Processes a DataFrame for saving (formatting dates, enums, etc). Args: df (pd.DataFrame): The input DataFrame. training (TrainingProtocolBase): The training protocol for custom formatting. Returns: pd.DataFrame: The processed DataFrame. """ new_df = self.convert_df_to_types(df) if "next_session_time" in new_df.columns: new_df["next_session_time"] = pd.to_datetime( new_df["next_session_time"], format="mixed", errors="coerce" ) new_df["next_session_time"] = new_df["next_session_time"].fillna( time_utils.now().replace(microsecond=0) ) for col in new_df.columns: if pd.api.types.is_datetime64_any_dtype(new_df[col]): new_df[col] = new_df[col].dt.strftime("%Y-%m-%d %H:%M:%S") if "active" in new_df.columns: new_df["active"] = new_df["active"].apply(convert_active) if "next_settings" in new_df.columns: new_df["next_settings"] = new_df["next_settings"].apply( training.get_jsonstring_from_jsonstring ) return new_df