import csv
import os
import sys
import traceback
from pathlib import Path
from typing import Any, Type, Union

import numpy as np
import pandas as pd

from village.custom_classes.training_protocol_base import TrainingProtocolBase
from village.scripts.log import log
from village.scripts.time_utils import time_utils
from village.settings import settings
# Three-letter weekday abbreviations that convert_active accepts as parts of
# the legacy hyphen-separated day format (e.g. "Mon-Wed-Fri").
_WEEKDAYS = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
[docs]
def convert_active(value: str) -> str:
    """Normalise an 'active' schedule value for storage.

    Surrounding whitespace is ignored. The result is one of:

    * ``"ON"`` for an empty value or for ``ON`` / ``On`` / ``on``.
    * ``"OFF"`` for ``OFF`` / ``Off`` / ``off``.
    * The stripped value itself when it contains a ``|`` (new
      pipe-separated hour format).
    * The hyphen-joined day list when every hyphen-separated part is a
      known weekday abbreviation (legacy day format).
    * ``"OFF"`` for anything else.

    Args:
        value (str): The raw schedule value.

    Returns:
        str: The normalised schedule value.
    """
    cleaned = value.strip()
    if cleaned in ("", "ON", "On", "on"):
        return "ON"
    if cleaned in ("OFF", "Off", "off"):
        return "OFF"
    if "|" in cleaned:
        # New format: stored as-is, no further validation.
        return cleaned

    # Legacy format: every part must be a recognised weekday abbreviation.
    day_names = [part.strip() for part in cleaned.split("-")]
    for day_name in day_names:
        if day_name not in _WEEKDAYS:
            return "OFF"
    return "-".join(day_names)
[docs]
class Collection:
    """Manages a collection of data entries stored in a CSV file and a pandas DataFrame.

    The attributes below are assigned by ``create_data_collection`` rather
    than by ``__init__``. The CSV file uses ``;`` as the field separator.

    Attributes:
        name (str): Name of the collection.
        columns (list[str]): List of column names.
        types (list[Type]): List of column data types.
        dict (dict): Dictionary mapping columns to types.
        path (Path): Path to the CSV file.
        df (pd.DataFrame): The pandas DataFrame holding the data.
    """
[docs]
def __init__(self) -> None:
    """Initializes the Collection.

    No attributes are set here; ``create_data_collection`` assigns
    ``name``, ``columns``, ``types``, ``dict``, ``path`` and ``df``.
    """
    pass
[docs]
def create_data_collection(
    self, name: str, columns: list[str], types: list[Type]
) -> None:
    """Sets up the collection and loads its CSV file into ``self.df``.

    The CSV path is ``<SYSTEM_DIRECTORY>/<name>.csv`` (the suffix is only
    appended when ``name`` does not already end in ``.csv``). When ``name``
    is empty no file is touched and ``self.df`` stays an empty DataFrame.
    Otherwise a missing file is created with just the ``;``-separated header
    line, and the file is then read with the configured column dtypes.

    If reading fails the error is logged and the process exits.

    Args:
        name (str): Name of the collection (also the CSV base name).
        columns (list[str]): Column names, in file order.
        types (list[Type]): Column types, parallel to ``columns``.
    """
    self.name: str = name
    self.columns: list[str] = columns
    self.types: list[Type] = types
    self.dict = dict(zip(self.columns, self.types))

    if name.endswith(".csv"):
        filename = name
    else:
        filename = name + ".csv"
    self.path: Path = Path(settings.get("SYSTEM_DIRECTORY")) / filename
    self.df = pd.DataFrame()

    if name == "":
        return

    if not os.path.exists(self.path):
        # First use: create the file containing only the header row.
        with open(self.path, "w", encoding="utf-8") as file:
            header_line: str = ";".join(self.columns) + "\n"
            file.write(header_line)

    try:
        self.df = pd.read_csv(self.path, dtype=self.dict, sep=";")
    except Exception:
        log.error(
            "error reading from: " + str(self.path),
            exception=traceback.format_exc(),
        )
        # NOTE(review): sys.exit() without an argument exits with status 0,
        # so a supervisor cannot tell this failure from a clean shutdown.
        sys.exit()
[docs]
def add_entry(self, entry: list) -> None:
    """Adds a new entry to the collection.

    The values are stringified (float NaN becomes an empty string), appended
    to the in-memory DataFrame after type conversion, and appended as one
    row to the CSV file. Finally the file size check is run.

    The row is written with ``csv.writer`` so that a value containing the
    ``;`` separator, a double quote or a line break is quoted instead of
    silently producing a malformed row. Values without such characters are
    written exactly as before.

    Args:
        entry (list): The list of values for the new row.
    """
    entry_str = [
        "" if isinstance(e, float) and np.isnan(e) else str(e) for e in entry
    ]
    new_row = pd.DataFrame([entry_str], columns=self.columns)
    new_row = self.convert_df_to_types(new_row)
    self.df = pd.concat([self.df, new_row], ignore_index=True)

    # newline="" lets the csv module control line endings itself; the
    # explicit "\n" terminator keeps the row ending the code always wrote.
    with open(self.path, "a", encoding="utf-8", newline="") as file:
        writer = csv.writer(file, delimiter=";", lineterminator="\n")
        writer.writerow(entry_str)

    self.check_split_csv()
[docs]
@staticmethod
def convert_with_default(value, target_type: Any) -> Any:
"""Converts a value to a target type, using defaults for failures.
Args:
value: The value to convert.
target_type (Any): The target type.
Returns:
Any: The converted value or a default.
"""
try:
return target_type(value)
except (ValueError, TypeError):
if target_type is int or target_type is float:
return 0
elif target_type is bool:
return False
elif target_type is str:
return ""
else:
return value
[docs]
def convert_df_to_types(self, df: pd.DataFrame) -> pd.DataFrame:
    """Converts DataFrame columns to the collection's configured types.

    Columns are paired with ``self.types`` by position, and each cell is
    passed through ``convert_with_default``. The DataFrame is modified in
    place and the same object is returned.

    Args:
        df (pd.DataFrame): The DataFrame to convert.

    Returns:
        pd.DataFrame: The same DataFrame, with converted columns.
    """
    for column_name, column_type in zip(df.columns, self.types):
        converted = df[column_name].apply(
            lambda cell, target=column_type: self.convert_with_default(cell, target)
        )
        df[column_name] = converted
    return df
[docs]
def check_split_csv(self) -> None:
    """Checks if the CSV file is too large and splits it if necessary.

    Nothing happens while the DataFrame has at most 50000 rows. Above that,
    the first 40000 rows are written to
    ``<SYSTEM_DIRECTORY>/old_events/<name>_<timestamp>.csv`` and the
    remaining rows replace both the main CSV file and ``self.df``.
    """
    row_limit = 50000
    archived_rows = 40000
    total_rows = len(self.df)
    if total_rows <= row_limit:
        return

    oldest: pd.DataFrame = self.df.head(archived_rows)
    stamp: str = time_utils.now_string_for_filename()
    archive_name: str = self.name + "_" + stamp + ".csv"
    archive_dir = Path(settings.get("SYSTEM_DIRECTORY"), "old_events")
    archive_path = Path(archive_dir, archive_name)
    archive_dir.mkdir(parents=True, exist_ok=True)
    oldest.to_csv(archive_path, index=False, sep=";")

    # Keep only the rows that were not archived.
    remaining: pd.DataFrame = self.df.tail(total_rows - archived_rows)
    remaining.to_csv(self.path, index=False, sep=";")
    self.df = remaining
[docs]
def get_last_entry(self, column: str, value: str) -> Union[pd.Series, None]:
    """Gets the last entry matching a specific value in a column.

    The column is compared as strings, so ``value`` must be the string form
    of the wanted cell.

    Args:
        column (str): The column to search.
        value (str): The value to match.

    Returns:
        Union[pd.Series, None]: The last matching row, or None.
    """
    is_match = self.df[column].astype(str) == value
    matches: pd.DataFrame = self.df[is_match]
    return None if matches.empty else matches.iloc[-1]
[docs]
def get_last_entry_name(self, column: str, value: str) -> str | None:
"""Gets the 'name' field of the last entry matching a condition.
Args:
column (str): The column to search.
value (str): The value to match.
Returns:
str | None: The name, or None.
"""
column_df: pd.DataFrame = self.df[self.df[column].astype(str) == value]
name = None
if not column_df.empty:
row = column_df.iloc[-1]
if row is not None:
try:
name = row["name"]
except Exception:
pass
return name
[docs]
def get_first_entry(self, column: str, value: str) -> Union[pd.Series, None]:
    """Gets the first entry matching a specific value in a column.

    The column is compared as strings, so ``value`` must be the string form
    of the wanted cell.

    Args:
        column (str): The column to search.
        value (str): The value to match.

    Returns:
        Union[pd.Series, None]: The first matching row, or None.
    """
    as_text = self.df[column].astype(str)
    hits: pd.DataFrame = self.df[as_text == value]
    if hits.empty:
        return None
    return hits.iloc[0]
[docs]
def change_last_entry(self, column: str, value: Any) -> None:
    """Updates a value in the last entry of the DataFrame and saves.

    The last row is the one at the final index label of ``self.df``; the
    whole collection is then written back through ``save_from_df``.

    Args:
        column (str): The column to update.
        value (Any): The new value.
    """
    last_label = self.df.index[-1]
    self.df.loc[last_label, column] = value
    self.save_from_df()
[docs]
def save_from_df(
    self, training: TrainingProtocolBase = TrainingProtocolBase()
) -> None:
    """Saves values from the current DataFrame to the CSV file,
    processing formatting.

    ``self.df`` is run through ``df_from_df``; the result is written to
    ``self.path`` with ``;`` as separator and without the index, and then
    becomes the new ``self.df``.

    Args:
        training (TrainingProtocolBase): Protocol for formatting specific fields.
    """
    # NOTE(review): the default TrainingProtocolBase() is built once, when
    # this function is defined, and that instance is shared by every call
    # that omits the argument.
    formatted = self.df_from_df(self.df, training)
    formatted.to_csv(self.path, sep=";", index=False)
    self.df = formatted
[docs]
def df_from_df(
    self, df: pd.DataFrame, training: TrainingProtocolBase
) -> pd.DataFrame:
    """Processes a DataFrame for saving (formatting dates, enums, etc).

    Steps, in order:

    1. Convert every column to its configured type (this modifies ``df``
       in place; the returned object is the same DataFrame).
    2. Parse ``next_session_time`` as datetimes; values that cannot be
       parsed are replaced by the current time without microseconds.
    3. Render every datetime column as ``YYYY-MM-DD HH:MM:SS`` text.
    4. Normalise ``active`` values with ``convert_active``.
    5. Pass ``next_settings`` values through
       ``training.get_jsonstring_from_jsonstring``.

    Steps 2, 4 and 5 only run when the corresponding column exists.

    Args:
        df (pd.DataFrame): The input DataFrame.
        training (TrainingProtocolBase): The training protocol for
            custom formatting.

    Returns:
        pd.DataFrame: The processed DataFrame.
    """
    result = self.convert_df_to_types(df)

    if "next_session_time" in result.columns:
        parsed_times = pd.to_datetime(
            result["next_session_time"], format="mixed", errors="coerce"
        )
        result["next_session_time"] = parsed_times
        fallback_time = time_utils.now().replace(microsecond=0)
        result["next_session_time"] = result["next_session_time"].fillna(
            fallback_time
        )

    for column_name in result.columns:
        column = result[column_name]
        if pd.api.types.is_datetime64_any_dtype(column):
            result[column_name] = column.dt.strftime("%Y-%m-%d %H:%M:%S")

    if "active" in result.columns:
        result["active"] = result["active"].apply(convert_active)

    if "next_settings" in result.columns:
        to_json_string = training.get_jsonstring_from_jsonstring
        result["next_settings"] = result["next_settings"].apply(to_json_string)

    return result