Source code for village.devices.sound_device
import os
import queue
import threading
import traceback
from typing import Any
import numpy as np
import sounddevice as sd
from scipy.io import wavfile
from village.classes.abstract_classes import SoundDeviceBase
from village.classes.enums import Active
from village.scripts.error_queue import error_queue
from village.scripts.log import log
from village.settings import settings
[docs]
def get_sound_devices() -> list[str]:
"""Retrieves a list of available sound device names.
Returns:
list[str]: A list of device names.
"""
devices = sd.query_devices()
devices_str = [d["name"] for d in devices]
return devices_str
[docs]
class SoundDevice(SoundDeviceBase):
"""Handles audio playback using sounddevice.
Attributes:
samplerate (int): Audio sample rate.
channels (int): Number of audio channels.
latency (str): Latency setting for sounddevice.
index (int): Index of the used sound device.
error (str): Error message.
stream (sd.OutputStream): The active audio stream (internal).
sound (np.ndarray): The current sound buffer.
command_queue (queue.Queue): Queue for processing audio commands in a thread.
thread (threading.Thread): Background thread for audio processing.
thread_running (bool): Flag to control the background thread.
"""
[docs]
def __init__(self) -> None:
"""Initializes the SoundDevice with settings."""
self.samplerate = int(settings.get("SAMPLERATE"))
self.channels = 2
self.latency = "high"
devices = get_sound_devices()
device = settings.get("SOUND_DEVICE")
self.index = devices.index(device) if device in devices else 0
self.error = ""
sd.default.device = device
sd.default.samplerate = self.samplerate
sd.default.channels = self.channels
sd.default.latency = self.latency
self.stream = None
self.sound: np.ndarray[np.float32] = np.empty(0, dtype=np.float32)
self.command_queue: queue.Queue[tuple] = queue.Queue()
self.thread = threading.Thread(target=self._audio_worker, daemon=True)
self.thread_running = True
self.thread.start()
[docs]
def load(self, left: Any, right: Any) -> None:
"""Loads sound data into the playback queue.
Args:
left (Any): Left channel data (array-like).
right (Any): Right channel data (array-like).
Raises:
ValueError: If inputs are invalid or lengths differ.
"""
if left is None and right is not None:
left = np.zeros(len(right))
elif right is None and left is not None:
right = np.zeros(len(left))
elif left is None and right is None:
raise ValueError("Sound error: Both vectors left and right are None.")
if len(left) != len(right):
raise ValueError(
"Sound error: Left and right vectors must have same length."
)
new_sound = self.create_sound_vec(left, right)
self.command_queue.put(("load", new_sound))
[docs]
def load_wav(self, file: str) -> None:
"""Loads a WAV file into the playback queue.
Args:
file (str): Filename of the WAV file in the media directory.
Raises:
FileNotFoundError: If the file does not exist.
ValueError: If sample rate mismatches or channel count is unsupported.
"""
media_directory = settings.get("MEDIA_DIRECTORY")
path = os.path.join(media_directory, file)
if not os.path.exists(path):
raise FileNotFoundError(f"File '{path}' does not exist.")
samplerate, data = wavfile.read(path)
if samplerate != self.samplerate:
raise ValueError(
f"Expected samplerate {self.samplerate}, but got {samplerate}."
)
# Normalize to float32 in range [-1.0, 1.0] if needed
if data.dtype != np.float32:
if np.issubdtype(data.dtype, np.integer):
max_val = np.iinfo(data.dtype).max
data = data.astype(np.float32) / max_val
else:
data = data.astype(np.float32)
if data.ndim == 1:
left = right = data
elif data.shape[1] == 2:
left, right = data[:, 0], data[:, 1]
else:
raise ValueError("Unsupported number of channels in WAV file.")
self.load(left, right)
[docs]
def play(self) -> None:
"""Triggers playback of the loaded sound."""
self.command_queue.put(("play", None))
[docs]
def stop(self) -> None:
"""Stops playback."""
self.command_queue.put(("stop", None))
def _audio_worker(self) -> None:
"""Worker function for the audio thread to handle stream operations."""
current_sound = np.empty(0, dtype=np.float32)
stream = sd.OutputStream(dtype="float32")
try:
while self.thread_running:
try:
command, data = self.command_queue.get(timeout=1.0)
if command == "load":
try:
stream.close()
except Exception:
pass
current_sound = data
stream = sd.OutputStream(dtype="float32")
stream.start()
elif command == "play":
if current_sound.size != 0:
stream.write(current_sound)
elif command == "stop":
try:
stream.stop()
except Exception:
pass
elif command == "shutdown":
break
except queue.Empty:
continue
except Exception:
try:
print("exception")
error_queue.put_nowait(("sound", traceback.format_exc()))
except queue.Full:
print("error queue full, cannot log audio error")
pass
finally:
if stream is not None:
stream.close()
[docs]
def shutdown(self) -> None:
"""Shuts down the audio thread and stream."""
if self.thread_running:
self.thread_running = False
self.command_queue.put(("shutdown", None))
self.thread.join(timeout=2.0)
def __del__(self) -> None:
"""Destructor to ensure shutdown."""
self.shutdown()
[docs]
@staticmethod
def create_sound_vec(left: np.ndarray, right: np.ndarray) -> np.ndarray[np.float32]:
"""Interleaves left and right channels into a stereo array.
Args:
left (np.ndarray): Left channel data.
right (np.ndarray): Right channel data.
Returns:
np.ndarray[np.float32]: Interleaved stereo data.
"""
sound = np.array([left, right])
return np.ascontiguousarray(sound.T, dtype=np.float32)
[docs]
def get_sound_device() -> SoundDeviceBase:
"""Factory function to initialize the SoundDevice.
Returns:
SoundDeviceBase: An initialized SoundDevice or base class if disabled/failed.
"""
if settings.get("USE_SOUNDCARD") == Active.OFF:
return SoundDeviceBase()
else:
try:
sound_device = SoundDevice()
log.info("Sound device successfully initialized")
return sound_device
except Exception:
log.error(
"Could not initialize sound device", exception=traceback.format_exc()
)
return SoundDeviceBase()
sound_device = get_sound_device()