Source code for village.scripts.rsync_to_hard_drive
import logging
import os
import signal
import subprocess
import threading
import time
import fire
from village.scripts.log import log
from village.scripts.time_utils import time_utils
from village.scripts.utils import setup_logging
[docs]
def run_rsync_local(
    source,
    destination,
    maximum_sync_time,
    cancel_event=None,
    stall_timeout=600,
) -> bool:
    """
    Run rsync to sync to a local destination (e.g., external HDD).

    rsync's stdout is streamed into the log line by line while its stderr is
    drained on a helper thread, so neither pipe can fill up and block rsync.
    A watchdog thread terminates rsync's whole process group when
    ``maximum_sync_time`` elapses, when ``cancel_event`` is set, or when rsync
    has printed nothing for ``stall_timeout`` seconds.

    Args:
        source (str): Local path to sync. A trailing "/" is appended so rsync
            copies the *contents* of ``source`` into ``destination``.
        destination (str): Local destination path (e.g., /media/pi/mydisk/backup/).
            Its parent directory is created if it does not exist.
        maximum_sync_time (int | None): Maximum sync time in seconds;
            ``None`` disables the limit.
        cancel_event (threading.Event | None): Event to signal cancellation.
            If it is already set on entry, rsync is not started at all.
        stall_timeout (float | None): Seconds without any rsync output after
            which the transfer is considered stuck and terminated; ``None``
            disables stall detection. Defaults to 600.

    Returns:
        bool: True if rsync exited with status 0, False otherwise (including
        timeouts, cancellation, and failure to start rsync).
    """
    if cancel_event is None:
        cancel_event = threading.Event()
    if cancel_event.is_set():
        logging.error("External cancel requested. rsync not started.")
        return False

    # Ensure source path ends with / so rsync copies the directory's contents.
    source = os.path.join(source, "")

    # Ensure the destination's parent directory exists. A bare relative name
    # such as "backup" has an empty dirname, and os.makedirs("") would raise.
    try:
        parent_dir = os.path.dirname(destination)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
    except Exception as e:
        logging.error(f"Failed to create destination directory: {e}")
        return False

    # Build rsync command for local copy
    rsync_cmd = [
        "rsync",
        "-avP",
        "--update",
        "--safe-links",
        "--timeout=30",
        "--exclude", "*.tmp",
        "--exclude", "CORRIDOR*",
        "--exclude", ".git/",
        "--exclude", "rsync_logs/",  # exclude logs
        "--exclude", "data_removal_logs/",  # exclude logs
        source,
        destination,
    ]
    logging.info(f"Starting local rsync with command: {' '.join(rsync_cmd)}")

    try:
        # start_new_session=True is the thread-safe replacement for
        # preexec_fn=os.setsid: rsync and its helper processes get their own
        # process group, so the whole group can be signalled at once.
        # errors="replace" keeps undecodable bytes (e.g. in file names) from
        # raising UnicodeDecodeError while the output is being read.
        process = subprocess.Popen(
            rsync_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            errors="replace",
            start_new_session=True,
        )
    except Exception as e:  # e.g. rsync binary not installed
        logging.error(f"An error occurred: {str(e)}")
        return False

    # Set once rsync has exited; tells the watchdog to stop looping.
    finished = threading.Event()
    # Time of the last stdout line; read by the watchdog for stall detection.
    last_progress = time_utils.get_time_monotonic()
    stderr_chunks = []

    def _signal_group(sig):
        """Send ``sig`` to rsync's process group, or to rsync alone as a fallback."""
        try:
            os.killpg(os.getpgid(process.pid), sig)
        except OSError:
            try:
                process.send_signal(sig)
            except OSError:
                pass

    def _stop(reason, level=logging.ERROR):
        """Log ``reason`` and terminate rsync, escalating to SIGKILL if needed."""
        if process.poll() is not None:
            return
        logging.log(level, reason)
        _signal_group(signal.SIGTERM)
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            _signal_group(signal.SIGKILL)

    def _watchdog():
        """Enforce the time limit, the stall limit and external cancellation."""
        started = time_utils.get_time_monotonic()
        while not finished.is_set():
            now = time_utils.get_time_monotonic()
            elapsed = now - started
            if maximum_sync_time is not None and elapsed >= maximum_sync_time:
                _stop(
                    f"Maximum sync time reached ({maximum_sync_time}s). "
                    "Terminating rsync process."
                )
                return
            if stall_timeout is not None and now - last_progress > stall_timeout:
                _stop("rsync seems stuck! Terminating...", logging.WARNING)
                return
            # Wake up at least once per second so that completion and stalls
            # are noticed promptly, without overshooting the time limit.
            wait_for = 1.0
            if maximum_sync_time is not None:
                wait_for = min(wait_for, maximum_sync_time - elapsed)
            if cancel_event.wait(timeout=wait_for):
                _stop("External cancel requested. Terminating rsync process.")
                return

    def _drain_stderr():
        """Collect stderr concurrently so a full stderr pipe cannot block rsync."""
        try:
            stderr_chunks.append(process.stderr.read())
        except (OSError, ValueError):
            pass

    watchdog = threading.Thread(target=_watchdog, daemon=True)
    stderr_reader = threading.Thread(target=_drain_stderr, daemon=True)
    watchdog.start()
    stderr_reader.start()

    try:
        # readline() returns "" only at EOF, i.e. once rsync (or the watchdog
        # killing it) has closed stdout.
        for line in iter(process.stdout.readline, ""):
            last_progress = time_utils.get_time_monotonic()
            text = line.strip()
            if text:
                logging.info(text)
        # The watchdog keeps running until `finished` is set, so the limits
        # still apply while waiting for rsync to exit.
        process.wait()
    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")
        return False
    finally:
        # Never leave rsync running behind us (also covers KeyboardInterrupt).
        if process.poll() is None:
            _signal_group(signal.SIGKILL)
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                pass
        finished.set()
        watchdog.join(timeout=10)
        stderr_reader.join(timeout=5)
        try:
            process.stdout.close()
            # Closing a pipe another thread is still reading could block.
            if not stderr_reader.is_alive():
                process.stderr.close()
        except Exception:
            pass

    stderr_output = "".join(stderr_chunks)
    if stderr_output:
        logging.error(stderr_output)

    if process.returncode == 0:
        logging.info("Rsync completed successfully")
        return True
    logging.error(f"Rsync failed with return code: {process.returncode}")
    return False
[docs]
def main(
    source: str,
    destination: str,
    maximum_sync_time: int = 1800,
    cancel_event: threading.Event | None = None,
) -> None:
    """Sync data to a local disk using rsync, logging to a dedicated file.

    ``setup_logging`` (with the "rsync_logs" subdirectory) supplies the log
    file path and a handler. The handler is detached from the root logger and
    closed when the sync ends, even if the sync raises.

    Args:
        source (str): Source directory path.
        destination (str): Local destination path (e.g. a directory on a
            mounted external disk).
        maximum_sync_time (int): Maximum sync time duration in seconds.
            Defaults to 1800.
        cancel_event (threading.Event | None): Event to signal cancellation.
            Defaults to None.
    """
    log_file, file_handler = setup_logging(logs_subdirectory="rsync_logs")
    try:
        logging.info(f"Logging to file: {log_file}")
        logging.info(f"Starting local sync from {source} to {destination}")
        success = run_rsync_local(source, destination, maximum_sync_time, cancel_event)
        if success:
            logging.info(f"Sync completed successfully. Log file: {log_file}")
            log.info("Sync completed successfully")
        else:
            logging.error(f"Sync failed. Check log file for details: {log_file}")
            log.error(f"Sync failed. Check log file for details: {log_file}")
    finally:
        # Detach only the handler this call installed. logging.shutdown() is
        # intentionally NOT called here: it flushes and closes every handler in
        # the process, which breaks logging for an application that calls
        # main() in-process. At interpreter exit the logging module already
        # runs it via atexit.
        logging.getLogger().removeHandler(file_handler)
        file_handler.close()
if __name__ == "__main__":
    # Expose main() as a command-line interface: Fire maps CLI arguments
    # onto main's parameters (source, destination, maximum_sync_time, ...).
    fire.Fire(component=main)