Source code for village.devices.rfid
import threading
from collections import deque
from datetime import datetime
from typing import Deque
import serial
from village.classes.enums import Active
from village.manager import manager
from village.scripts.time_utils import time_utils
from village.settings import settings
[docs]
class Rfid:
"""Handles RFID reader communication via serial port.
Attributes:
port (str): Serial port path.
baudrate (int): Serial baudrate.
multiple (bool): Whether more than one distinct tag ID was detected within the last `time_detections` seconds.
time_detections (float): Time window in seconds to check for duplicate IDs.
id (str): The most recently read RFID tag ID.
id_history (Deque[tuple[str, datetime]]): History of read IDs and timestamps.
reading (bool): Flag to control reading loop (not used in loop, running is used).
s (serial.Serial): The serial connection.
thread (threading.Thread): Background thread for reading serial data.
running (bool): Flag to control the background thread.
"""
[docs]
def __init__(self, port="/dev/ttyAMA0", baudrate=9600) -> None:
    """Opens the serial port and launches the background reader thread.

    Args:
        port (str): Serial port path. Defaults to "/dev/ttyAMA0".
        baudrate (int): Serial baudrate. Defaults to 9600.
    """
    # Connection parameters.
    self.port = port
    self.baudrate = baudrate

    # Detection state shared with the reader thread.
    self.multiple = False
    self.time_detections = settings.get("TIME_BETWEEN_DETECTIONS")
    self.id = ""
    self.id_history: Deque[tuple[str, datetime]] = deque()
    self.reading = True

    # Short read timeout so the reader loop re-checks `running` regularly.
    self.s = serial.Serial(self.port, self.baudrate, timeout=0.1)

    # `running` must be True before the thread starts, otherwise the loop
    # in `read_serial` would exit immediately.
    self.running = True
    self.thread = threading.Thread(target=self.read_serial, daemon=True)
    self.thread.start()
[docs]
def read_serial(self) -> None:
"""Reads data from the serial port in a background loop."""
while self.running:
try:
line = self.s.readline().decode("utf-8").strip()
line = "".join(char for char in line if char.isprintable())
if len(line) < 8:
continue
self.id = line[-10:]
self.id_history.append((self.id, time_utils.now()))
self.clean_old_ids()
self.update_multiple()
except UnicodeDecodeError:
pass
[docs]
def clean_old_ids(self) -> None:
    """Drops history entries whose age exceeds `time_detections` seconds.

    Entries are appended in chronological order, so trimming stops at the
    first entry that is still inside the time window.
    """
    now = time_utils.now()
    history = self.id_history
    while history and (now - history[0][1]).total_seconds() > self.time_detections:
        history.popleft()
[docs]
def update_multiple(self) -> None:
"""Updates the `multiple` flag if more than one unique ID is in history."""
unique_ids = set(id for id, _ in self.id_history)
self.multiple = len(unique_ids) > 1
[docs]
def stop(self) -> None:
"""Stops the serial reading thread."""
self.running = False
self.s.close()
self.thread.join()
[docs]
def get_id(self) -> tuple[str, bool]:
    """Returns the latest tag ID and multiple flag, then clears both.

    The stored `id` and `multiple` values are reset on every call,
    whether or not the reader is enabled.

    Returns:
        tuple[str, bool]: The ID and whether multiple tags were detected
            when `manager.rfid_reader` is `Active.ON`; otherwise
            ("", False).
    """
    reader_enabled = manager.rfid_reader == Active.ON
    result = (self.id, self.multiple) if reader_enabled else ("", False)

    # Consume the reading so the same detection is not reported twice.
    self.id = ""
    self.multiple = False
    return result
# Module-level reader instance. Constructing it opens the default serial port
# and starts the background reader thread as a side effect of importing this
# module.
rfid = Rfid()