"""village.scripts.rsync_to_hard_drive: sync a local directory to a local destination (e.g. an external hard drive) using rsync."""

import logging
import os
import signal
import subprocess
import threading
import time

import fire

from village.scripts.log import log
from village.scripts.time_utils import time_utils
from village.scripts.utils import setup_logging


def _kill_process_group(process, sig):
    """Best-effort delivery of ``sig`` to ``process``'s process group.

    rsync is started with ``preexec_fn=os.setsid``, so it leads its own
    process group and signalling the group also reaches any helper processes
    it forked. If the group cannot be signalled, fall back to signalling the
    direct child only.
    """
    if process.returncode is not None:
        # Already reaped: its pid may have been reused by an unrelated
        # process, so signalling "its" group could hit the wrong target.
        return
    try:
        os.killpg(os.getpgid(process.pid), sig)
    except OSError:
        try:
            if sig == signal.SIGKILL:
                process.kill()
            else:
                process.terminate()
        except OSError:
            pass  # Process is already gone; nothing left to signal.


def _stop_process(process, grace_period=5.0):
    """SIGTERM rsync's process group, escalating to SIGKILL after ``grace_period`` s.

    Waits on (and therefore reaps) the process so ``returncode`` is populated
    afterwards, unless it survives SIGKILL for another grace period.
    """
    _kill_process_group(process, signal.SIGTERM)
    try:
        process.wait(timeout=grace_period)
    except subprocess.TimeoutExpired:
        _kill_process_group(process, signal.SIGKILL)
        try:
            process.wait(timeout=grace_period)
        except subprocess.TimeoutExpired:
            logging.error("rsync did not exit after SIGKILL.")


def _drain_stream(stream, sink):
    """Append every line read from ``stream`` to the list ``sink`` until EOF.

    Intended to run on a background thread; stops quietly if the stream is
    closed underneath it.
    """
    try:
        for line in stream:
            sink.append(line)
    except (OSError, ValueError):
        pass


def run_rsync_local(source, destination, maximum_sync_time, cancel_event=None) -> bool:
    """Run rsync to sync to a local destination (e.g., external HDD).

    ``source`` is given a trailing slash so rsync copies its *contents* into
    ``destination``. Each line rsync writes to stdout is logged at INFO level;
    whatever it writes to stderr is logged at ERROR level once it has exited.

    The rsync process group is stopped (SIGTERM, escalating to SIGKILL) when
    ``cancel_event`` is set or ``maximum_sync_time`` seconds elapse, and also
    if rsync's stdout reaches EOF but the process has produced no output and
    not exited for more than 600 s.

    Args:
        source (str): Local path to sync.
        destination (str): Local destination path (e.g., /media/pi/mydisk/backup/).
        maximum_sync_time (int): Maximum sync time in seconds.
        cancel_event (threading.Event | None): Event to signal cancellation.

    Returns:
        bool: True if rsync exited with status 0; False otherwise (including
        cancellation, timeout, failure to start rsync, or failure to create
        the destination's parent directory).
    """
    if cancel_event is None:
        cancel_event = threading.Event()

    # A trailing slash makes rsync copy the directory's contents rather than
    # the directory itself.
    source = os.path.join(source, "")

    # Ensure the destination's parent directory exists. os.path.dirname() is
    # "" for a bare relative name such as "backup" and os.makedirs("") raises,
    # so only create a parent when there is one.
    parent_dir = os.path.dirname(destination)
    if parent_dir:
        try:
            os.makedirs(parent_dir, exist_ok=True)
        except Exception as e:
            logging.error(f"Failed to create destination directory: {e}")
            return False

    # Build rsync command for local copy
    rsync_cmd = [
        "rsync",
        "-avP",
        "--update",
        "--safe-links",
        "--timeout=30",
        "--exclude",
        "*.tmp",
        "--exclude",
        "CORRIDOR*",
        "--exclude",
        ".git/",
        "--exclude",
        "rsync_logs/",  # exclude logs
        "--exclude",
        "data_removal_logs/",  # exclude logs
        source,
        destination,
    ]

    logging.info(f"Starting local rsync with command: {' '.join(rsync_cmd)}")

    process = None
    # Set when this function returns so the watchdog thread exits promptly
    # instead of lingering for up to maximum_sync_time seconds.
    finished = threading.Event()
    try:
        process = subprocess.Popen(
            rsync_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            # Undecodable bytes (e.g. non-UTF-8 file names) must not abort
            # reading the pipes.
            errors="replace",
            # New session: rsync leads its own process group, so the whole
            # group can be signalled on cancel/timeout.
            preexec_fn=os.setsid,
        )

        # Drain stderr concurrently. If it were read only after stdout hits
        # EOF, a full stderr pipe would block rsync indefinitely.
        stderr_lines = []
        stderr_thread = threading.Thread(
            target=_drain_stream, args=(process.stderr, stderr_lines), daemon=True
        )
        stderr_thread.start()

        last_progress_time = time_utils.get_time_monotonic()
        process_running = True

        def _terminate(reason: str) -> None:
            """Log ``reason`` and stop rsync (runs on the watchdog thread)."""
            nonlocal process_running
            logging.error(reason)
            # Killing the group closes rsync's pipe ends, so the readers in
            # this function see EOF and return on their own.
            _stop_process(process)
            process_running = False

        def _watchdog() -> None:
            """Stop rsync on external cancel or once maximum_sync_time elapses."""
            # time.monotonic() mirrors the real-time timeout of Event.wait().
            deadline = (
                None
                if maximum_sync_time is None
                else time.monotonic() + maximum_sync_time
            )
            reason = None
            while not finished.is_set():
                if cancel_event.is_set():
                    reason = "External cancel requested. "
                    break
                if deadline is None:
                    wait_for = 1.0
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        reason = f"Maximum sync time reached ({maximum_sync_time}s). "
                        break
                    wait_for = min(remaining, 1.0)
                # Wakes immediately on cancel; otherwise re-checks `finished`.
                cancel_event.wait(timeout=wait_for)
            if reason is not None and process_running and process.poll() is None:
                _terminate(reason + "Terminating rsync process.")

        sync_time_thread = threading.Thread(target=_watchdog, daemon=True)
        sync_time_thread.start()

        while process_running:
            try:
                line = process.stdout.readline()
            except (OSError, ValueError):
                line = ""
            if line:
                logging.info(line.strip())
                last_progress_time = time_utils.get_time_monotonic()
                # readline() already blocks until data arrives; sleeping here
                # would throttle verbose rsync output.
                continue
            # Empty read: stdout is at EOF. Only now is all output consumed.
            if process.poll() is not None:
                logging.info("rsync completed.")
                break
            if time_utils.get_time_monotonic() - last_progress_time > 600:
                logging.warning("rsync seems stuck! Terminating...")
                _stop_process(process, grace_period=2)
                process_running = False
                break
            time.sleep(0.1)

        # Make sure rsync is gone (and reaped) before reading its status.
        if process.poll() is None:
            _stop_process(process)

        stderr_thread.join(timeout=5)
        stderr_output = "".join(stderr_lines)
        if stderr_output:
            logging.error(stderr_output)

        if process.returncode == 0:
            logging.info("Rsync completed successfully")
            return True
        if process.returncode is not None:
            logging.error(f"Rsync failed with return code: {process.returncode}")
        return False

    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
        if process is not None and process.poll() is None:
            _kill_process_group(process, signal.SIGKILL)
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                pass
        return False
    finally:
        finished.set()
        if process is not None:
            for stream in (process.stdout, process.stderr):
                try:
                    if stream:
                        stream.close()
                except Exception:
                    pass  # Best-effort cleanup; nothing useful to do on failure.
def main(
    source: str,
    destination: str,
    maximum_sync_time: int = 1800,
    cancel_event: threading.Event | None = None,
) -> None:
    """Sync ``source`` to a local destination (e.g. an external disk) with rsync.

    Attaches the per-run file handler returned by
    ``setup_logging(logs_subdirectory="rsync_logs")``, runs
    ``run_rsync_local``, reports the outcome via both ``logging`` and the
    project ``log`` object, and always detaches and closes the file handler
    afterwards.

    Args:
        source (str): Source directory path.
        destination (str): Local destination path (e.g. a directory on a
            mounted external drive).
        maximum_sync_time (int): Maximum sync duration in seconds.
            Defaults to 1800.
        cancel_event (threading.Event | None): Event that, when set, cancels
            the sync. Defaults to None.
    """
    log_file, file_handler = setup_logging(logs_subdirectory="rsync_logs")
    try:
        logging.info(f"Logging to file: {log_file}")
        logging.info(f"Starting local sync from {source} to {destination}")

        success = run_rsync_local(source, destination, maximum_sync_time, cancel_event)

        if success:
            logging.info(f"Sync completed successfully. Log file: {log_file}")
            log.info("Sync completed successfully")
        else:
            logging.error(f"Sync failed. Check log file for details: {log_file}")
            log.error(f"Sync failed. Check log file for details: {log_file}")
    finally:
        # Detach the per-run file handler even if something above raised, so
        # repeated in-process calls do not accumulate handlers.
        logging.getLogger().removeHandler(file_handler)
        file_handler.close()
        # NOTE(review): logging.shutdown() flushes and closes every handler
        # registered in the process, not only this one; reconsider if main()
        # is called from a long-running host (e.g. with cancel_event).
        logging.shutdown()
if __name__ == "__main__":
    # Command-line entry point: python-fire maps CLI arguments onto main()'s
    # parameters.
    fire.Fire(main)