diff --git a/Dockerfile b/Dockerfile
index 91be63c..4a79774 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.11-slim-buster
+FROM python:3.11-slim-bookworm
 
 # LABEL instead of MAINTAINER (fixes deprecation warning)
 LABEL maintainer="Martin Dobias "
diff --git a/attempt_workspace_poc_plan.md b/attempt_workspace_poc_plan.md
new file mode 100644
index 0000000..f7a4ef4
--- /dev/null
+++ b/attempt_workspace_poc_plan.md
@@ -0,0 +1,241 @@
+# Attempt Workspace POC - Bare Bones Plan
+
+## Core Flow (COPY Mode)
+
+**Every run in COPY mode uses an attempt workspace:**
+
+1. Get current server version
+2. Create attempt workspace from baseline (hardlink clone)
+3. Break hardlinks for non-media files (.gpkg, .mergin, etc.)
+4. Run sync in attempt workspace:
+   - Pull latest (if needed)
+   - Upload media to S3
+   - Update references in .gpkg
+5. Push from attempt workspace
+6. If push fails (version conflict):
+   - Delete attempt workspace
+   - Baseline remains clean (unchanged)
+   - Get latest server version (will be newer)
+   - Create new attempt workspace with new version
+   - Retry from step 4
+7. If push succeeds:
+   - Promote attempt to baseline (rename)
+   - Baseline is now updated
+
+## Key Simplifications
+
+- No logging beyond basic print statements
+- No tests
+- No lock mechanism (assume single instance)
+- No backup rotation (only a single `_old` copy of the previous baseline is kept)
+- No cleanup of old attempts (manual cleanup)
+- No retry limits (can loop, but user can interrupt)
+- No error classification (assume all push failures are version conflicts)
+- No backoff (immediate retry)
+
+## Implementation Steps
+
+### Step 1: Modify `media_sync_push()` to handle push failures
+
+**Location:** `media_sync.py`, function `media_sync_push()`
+
+**Changes:**
+- When push fails, check if it's a version conflict
+- If yes, raise a special exception: `VersionConflictError`
+- Otherwise, raise normal `MediaSyncError`
+
+### Step 2: Create attempt workspace functions
+
+**New function:** `create_attempt_workspace(baseline_path, server_version)`
+- Create path: `_attempt_v{server_version}`
+- Hardlink clone entire directory
+- Break hardlinks for non-media files
+- Return attempt_path
+
+**New function:** `break_hardlinks_for_non_media(attempt_path)`
+- Walk directory
+- For each file:
+  - If extension NOT in `config.allowed_extensions` → break hardlink (delete + copy)
+  - If file is `.gpkg` (from config.references) → break hardlink
+  - If in `.mergin/` directory → break hardlink (copy entire directory)
+- Media files remain hardlinked (see the sketch below)
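+
+A minimal sketch of these two functions, assuming the baseline and the attempt live on the
+same filesystem. `MEDIA_EXTENSIONS` stands in for `config.allowed_extensions`, and the
+break helper takes the baseline path as an extra argument so it can copy from it; the actual
+implementation in `media_sync.py` below walks the tree manually and rewrites non-media
+files in place instead:
+
+```python
+import os
+import shutil
+
+MEDIA_EXTENSIONS = {"jpg", "jpeg", "png", "mp4"}  # stand-in for config.allowed_extensions
+
+
+def create_attempt_workspace(baseline_path, server_version):
+    attempt_path = f"{baseline_path}_attempt_v{server_version}"
+    # Hardlink clone of everything except .mergin, which needs a real copy
+    shutil.copytree(
+        baseline_path,
+        attempt_path,
+        copy_function=os.link,
+        ignore=shutil.ignore_patterns(".mergin"),
+    )
+    shutil.copytree(
+        os.path.join(baseline_path, ".mergin"),
+        os.path.join(attempt_path, ".mergin"),
+    )
+    break_hardlinks_for_non_media(baseline_path, attempt_path)
+    return attempt_path
+
+
+def break_hardlinks_for_non_media(baseline_path, attempt_path):
+    for root, dirs, files in os.walk(attempt_path):
+        dirs[:] = [d for d in dirs if d != ".mergin"]  # .mergin is already a real copy
+        for name in files:
+            ext = os.path.splitext(name)[1].lstrip(".").lower()
+            if ext in MEDIA_EXTENSIONS:
+                continue  # media files stay hardlinked
+            dst = os.path.join(root, name)
+            src = os.path.join(baseline_path, os.path.relpath(dst, attempt_path))
+            os.remove(dst)          # drop the hardlink from the attempt
+            shutil.copy2(src, dst)  # independent copy (covers .gpkg and friends)
+```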
+
+### Step 3: Modify main flow to always use attempt workspace (COPY mode)
+
+**Location:** `media_sync.py`, function `main()`
+
+**New flow for COPY mode:**
+1. Check if `config.operation_mode == "copy"`
+2. If COPY mode:
+   - Get current server version
+   - Create attempt workspace from baseline
+   - Run sync in attempt workspace (with retry loop)
+   - If push succeeds → promote attempt to baseline
+3. If MOVE mode:
+   - Run normal sync in baseline (unchanged behavior)
+
+**New function:** `sync_with_attempt_workspace(mc, driver, baseline_path)`
+- Retry loop:
+  - Get current server version
+  - Create attempt workspace with that version
+  - Run sync in attempt workspace:
+    - `mc_pull(mc, workspace_path=attempt_path)`
+    - Get files to sync
+    - `media_sync_push(mc, driver, files, workspace_path=attempt_path)`
+  - If push fails (VersionConflictError):
+    - Delete attempt workspace
+    - Get new server version (will be newer)
+    - Continue loop (retry)
+  - If push succeeds:
+    - Promote attempt to baseline
+    - Break loop
+
+**New function:** `promote_attempt_to_baseline(baseline_path, attempt_path)`
+- Rename attempt to baseline: `os.rename(attempt_path, baseline_path)` (move the existing baseline aside first, since `os.rename()` cannot overwrite a non-empty directory)
+
+### Step 4: Modify functions to accept workspace_path parameter
+
+**Functions to modify:**
+- `mc_pull(mc, workspace_path=None)` - use workspace_path if provided, else config
+- `media_sync_push(mc, driver, files, workspace_path=None)` - use workspace_path if provided
+- `_update_references(files, workspace_path=None)` - use workspace_path for gpkg path
+- `_check_pending_changes(workspace_path=None)` - use workspace_path if provided
+
+### Step 5: Error handling
+
+**New exception:** `VersionConflictError(MediaSyncError)`
+- Raised when push fails due to version conflict
+
+**In `media_sync_push()`:**
+- Catch `ClientError` on push
+- Check error message for version conflict indicators
+- If version conflict → raise `VersionConflictError`
+- Otherwise → raise `MediaSyncError`
+
+## Minimal Code Structure
+
+### New Functions (minimal signatures)
+
+```python
+# In media_sync.py
+
+class VersionConflictError(MediaSyncError):
+    pass
+
+def create_attempt_workspace(baseline_path, server_version):
+    """Create attempt workspace with hardlinks, break non-media hardlinks"""
+    pass
+
+def break_hardlinks_for_non_media(attempt_path):
+    """Break hardlinks for .gpkg, .mergin, and other non-media files"""
+    pass
+
+def sync_with_attempt_workspace(mc, driver, baseline_path):
+    """Run sync in attempt workspace, retry on version conflict"""
+    pass
+
+def promote_attempt_to_baseline(baseline_path, attempt_path):
+    """Rename attempt workspace to become baseline"""
+    pass
+```
+
+### Modified Functions
+
+```python
+def mc_pull(mc, workspace_path=None):
+    """Pull with optional workspace_path parameter"""
+    # Use workspace_path if provided, else config.project_working_dir
+    pass
+
+def media_sync_push(mc, driver, files, workspace_path=None):
+    """Push with optional workspace_path, raise VersionConflictError on conflict"""
+    # Use workspace_path if provided
+    # On push failure, check if version conflict, raise VersionConflictError
+    pass
+
+def _update_references(files, workspace_path=None):
+    """Update references with optional workspace_path"""
+    # Use workspace_path for gpkg path construction
+    pass
+
+def main():
+    """Main entry point with attempt workspace for COPY mode"""
+    if config.operation_mode == "copy":
+        # Always use attempt workspace in COPY mode
+        sync_with_attempt_workspace(mc, driver, config.project_working_dir)
+    else:
+        # MOVE mode: normal sync in baseline (unchanged)
+        files_to_sync = mc_pull(mc)
+        if files_to_sync:
+            media_sync_push(mc, driver, files_to_sync)
+```
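+
+One detail the promote step has to handle: `os.rename()` cannot overwrite an existing
+non-empty directory, so the current baseline is moved aside first. A minimal sketch of what
+the implementation in `media_sync.py` below does (keeping a single `_old` backup):
+
+```python
+import os
+import shutil
+
+
+def promote_attempt_to_baseline(baseline_path, attempt_path):
+    backup_path = baseline_path + "_old"
+    if os.path.exists(backup_path):
+        shutil.rmtree(backup_path)            # keep only one backup
+    if os.path.exists(baseline_path):
+        os.rename(baseline_path, backup_path)
+    os.rename(attempt_path, baseline_path)    # attempt becomes the new baseline
+```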
+
+## Retry Loop Logic
+
+```
+def sync_with_attempt_workspace(mc, driver, baseline_path):
+    while True:
+        # Get current server version (may be newer on retry)
+        server_version = get_server_version(mc, baseline_path)
+
+        # Create attempt workspace from baseline
+        attempt_path = create_attempt_workspace(baseline_path, server_version)
+
+        try:
+            # Sync in attempt workspace
+            mc_pull(mc, workspace_path=attempt_path)
+            files = get_files_to_sync(attempt_path)
+            media_sync_push(mc, driver, files, workspace_path=attempt_path)
+
+            # Success! Promote attempt to baseline
+            promote_attempt_to_baseline(baseline_path, attempt_path)
+            break
+
+        except VersionConflictError:
+            # Push failed - cleanup attempt
+            shutil.rmtree(attempt_path)
+            # Baseline is still clean (unchanged)
+            # Loop continues: get new server version, create new attempt, retry
+            continue
+```
+
+## File Changes Summary
+
+**`media_sync.py`:**
+- Add `VersionConflictError` exception class
+- Add `create_attempt_workspace()` function
+- Add `break_hardlinks_for_non_media()` function
+- Add `sync_with_attempt_workspace()` function (contains retry loop)
+- Add `promote_attempt_to_baseline()` function
+- Add `_get_server_version()` helper function
+- Modify `mc_pull()` to accept `workspace_path` parameter
+- Modify `media_sync_push()` to accept `workspace_path` and raise `VersionConflictError`
+- Modify `_update_references()` to accept `workspace_path` parameter
+- Modify `main()` to call `sync_with_attempt_workspace()` in COPY mode
+
+**Apart from `media_sync_daemon.py`, which only switches its sync cycle to `sync_with_attempt_workspace()` in COPY mode, no other files need changes for the POC.**
+
+## Hardlink Operations
+
+**Create hardlink clone:**
+- Use `os.link()` for files
+- Use `shutil.copytree()` with `copy_function=os.link` for directories
+- Handle `OSError` if hardlinks not supported (raise informative error)
+
+**Break hardlinks:**
+- For each non-media file:
+  - `os.remove(file_path)` (removes hardlink)
+  - `shutil.copy2(original_path, file_path)` (creates independent copy)
+- For `.mergin/` directory:
+  - `shutil.rmtree(attempt_mergin)`
+  - `shutil.copytree(baseline_mergin, attempt_mergin)`
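+
+A quick sanity check after creating an attempt workspace (the file names below are
+hypothetical): media files should still share an inode with the baseline, while database
+files must be independent copies so edits in the attempt cannot leak back into the baseline.
+
+```python
+import os
+
+baseline = "mergin_project"              # hypothetical baseline directory
+attempt = "mergin_project_attempt_v42"   # hypothetical attempt directory
+
+# A media file should still be hardlinked to the baseline copy
+assert os.path.samefile(
+    os.path.join(baseline, "photos/img_001.jpg"),
+    os.path.join(attempt, "photos/img_001.jpg"),
+)
+
+# A .gpkg must be an independent copy
+assert not os.path.samefile(
+    os.path.join(baseline, "survey.gpkg"),
+    os.path.join(attempt, "survey.gpkg"),
+)
+```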
+
+## Key Points
+
+1. **COPY mode always uses attempt workspace**: Every run creates attempt from baseline
+2. **Baseline stays clean**: Original workspace never gets modified during attempt
+3. **Retry with latest version**: Each retry gets the newest server version
+4. **Simple cleanup**: Delete attempt on failure, baseline remains unchanged
+5. **Atomic promotion**: Single rename operation when push succeeds
+6. **No state persistence**: Everything derived from current server state
+7. **MOVE mode unchanged**: MOVE mode continues to work in baseline (no attempt workspace)
diff --git a/media_sync.py b/media_sync.py
index f3a6ec8..8e6afbb 100644
--- a/media_sync.py
+++ b/media_sync.py
@@ -8,6 +8,8 @@
 import os
 import sqlite3
+import shutil
+import time
 
 from mergin import MerginClient, MerginProject, LoginError, ClientError
 from version import __version__
@@ -19,17 +21,28 @@ class MediaSyncError(Exception):
     pass
 
 
+class VersionConflictError(MediaSyncError):
+    pass
+
+
 def _quote_identifier(identifier):
     """Quote identifiers"""
     return '"' + identifier + '"'
 
 
-def _get_project_version():
+def _get_project_version(workspace_path=None):
     """Returns the current version of the project"""
-    mp = MerginProject(config.project_working_dir)
+    path = workspace_path if workspace_path else config.project_working_dir
+    mp = MerginProject(path)
     return mp.version()
 
 
+def _get_server_version(mc, project_full_name):
+    """Get current server version without modifying local state"""
+    projects = mc.get_projects_by_names([project_full_name])
+    return projects[project_full_name]["version"]
+
+
 def _check_has_working_dir():
     if not os.path.exists(config.project_working_dir):
         raise MediaSyncError(
@@ -44,9 +57,10 @@ def _check_has_working_dir():
         )
 
 
-def _check_pending_changes():
+def _check_pending_changes(workspace_path=None):
     """Check working directory was not modified manually - this is probably uncommitted change from last attempt"""
-    mp = MerginProject(config.project_working_dir)
+    path = workspace_path if workspace_path else config.project_working_dir
+    mp = MerginProject(path)
     status_push = mp.get_push_changes()
     if status_push["added"] or status_push["updated"] or status_push["removed"]:
         raise MediaSyncError(
@@ -109,15 +123,17 @@ def mc_download(mc):
     return files_to_upload
 
 
-def mc_pull(mc):
+def mc_pull(mc, workspace_path=None):
     """Pull latest version to synchronize with local dir
 
     :param mc: mergin client instance
+    :param workspace_path: Optional workspace path, defaults to config.project_working_dir
    :return: list(dict) list of project files metadata
     """
+    path = workspace_path if workspace_path else config.project_working_dir
     print("Pulling from mergin server ...")
-    _check_pending_changes()
+    _check_pending_changes(path)
 
-    mp = MerginProject(config.project_working_dir)
+    mp = MerginProject(path)
     local_version = mp.version()
 
     try:
@@ -128,7 +144,7 @@ def mc_pull(mc):
             # this could be e.g. DNS error
         raise MediaSyncError("Mergin client error: " + str(e))
 
-    _check_pending_changes()
+    _check_pending_changes(path)
 
     if server_version == local_version:
         print("No changes on Mergin.")
@@ -136,19 +152,20 @@ def mc_pull(mc):
     try:
         status_pull = mp.get_pull_changes(project_info["files"])
-        mc.pull_project(config.project_working_dir)
+        mc.pull_project(path)
     except ClientError as e:
         raise MediaSyncError("Mergin client error on pull: " + str(e))
 
-    print("Pulled new version from Mergin: " + _get_project_version())
+    print("Pulled new version from Mergin: " + _get_project_version(path))
 
     files_to_upload = _get_media_sync_files(
         status_pull["added"] + status_pull["updated"]
     )
     return files_to_upload
 
 
-def _update_references(files):
+def _update_references(files, workspace_path=None):
     """Update references to media files in reference table"""
+    path = workspace_path if workspace_path else config.project_working_dir
     for ref in config.references:
         reference_config = [
             ref.file,
@@ -162,7 +179,7 @@ def _update_references(files):
         print("Updating references ...")
         try:
             gpkg_conn = sqlite3.connect(
-                os.path.join(config.project_working_dir, ref.file)
+                os.path.join(path, ref.file)
             )
             gpkg_conn.enable_load_extension(True)
             gpkg_cur = gpkg_conn.cursor()
@@ -188,16 +205,24 @@ def _update_references(files):
         raise MediaSyncError("SQLITE error: " + str(e))
 
 
-def media_sync_push(mc, driver, files):
+def media_sync_push(mc, driver, files, workspace_path=None):
     if not files:
         return
+    path = workspace_path if workspace_path else config.project_working_dir
     print("Synchronizing files with external drive...")
-    _check_has_working_dir()
+    if not os.path.exists(path):
+        raise MediaSyncError(
+            "The project working directory does not exist: " + path
+        )
+    if not os.path.exists(os.path.join(path, ".mergin")):
+        raise MediaSyncError(
+            "The project working directory does not seem to contain Mergin project: " + path
+        )
     migrated_files = {}
     # TODO make async and parallel for better performance
     for file in files:
-        src = os.path.join(config.project_working_dir, file["path"])
+        src = os.path.join(path, file["path"])
         if not os.path.exists(src):
             print("Missing local file: " + str(file["path"]))
             continue
@@ -212,34 +237,197 @@
                 migrated_files)
             continue
 
     # update reference table (if applicable)
-    _update_references(migrated_files)
+    _update_references(migrated_files, workspace_path=path)
 
     # remove from local dir if move mode
     if config.operation_mode == "move":
         for file in migrated_files.keys():
-            src = os.path.join(config.project_working_dir, file)
+            src = os.path.join(path, file)
             os.remove(src)
 
     # push changes to mergin back (with changed references and removed files) if applicable
     try:
-        mp = MerginProject(config.project_working_dir)
+        mp = MerginProject(path)
         status_push = mp.get_push_changes()
         if status_push["added"]:
             raise MediaSyncError(
                 "There are changes to be added - it should never happen"
             )
         if status_push["updated"] or status_push["removed"]:
-            mc.push_project(config.project_working_dir)
-            version = _get_project_version()
+            mc.push_project(path)
+            version = _get_project_version(path)
             print("Pushed new version to Mergin: " + version)
-    except (ClientError, MediaSyncError) as e:
-        # this could be either because of some temporal error (network, server lock)
-        # or permanent one that needs to be resolved by user
+    except ClientError as e:
+        # Check if it's a version conflict
+        error_msg = str(e).lower()
+        if any(keyword in error_msg for keyword in ["version", "conflict", "newer version", "update required"]):
+            raise VersionConflictError("Version conflict on push: " + str(e))
         raise MediaSyncError("Mergin client error on push: " + str(e))
+    except MediaSyncError:
+        raise
 
     print("Sync finished")
 
 
+def create_attempt_workspace(baseline_path, server_version):
+    """Create attempt workspace with hardlinks, break non-media hardlinks"""
+    attempt_path = f"{baseline_path}_attempt_v{server_version}"
+
+    # Remove existing attempt if it exists
+    if os.path.exists(attempt_path):
+        print(f"Removing existing attempt workspace: {attempt_path}")
+        shutil.rmtree(attempt_path)
+
+    print(f"Creating attempt workspace: {attempt_path}")
+
+    # Create hardlink clone
+    def link_file(src, dst):
+        try:
+            os.link(src, dst)
+        except OSError as e:
+            if e.errno == 18:  # EXDEV - cross-device link
+                raise MediaSyncError(
+                    "Hardlinks not supported (cross-filesystem). "
+                    "Ensure baseline and attempt are on same filesystem."
+                )
+            raise
+
+    # Copy directory structure with hardlinks
+    def copy_with_hardlinks(src, dst):
+        os.makedirs(dst, exist_ok=True)
+        for item in os.listdir(src):
+            src_path = os.path.join(src, item)
+            dst_path = os.path.join(dst, item)
+
+            if os.path.isdir(src_path):
+                if item == ".mergin":
+                    # .mergin must be fully copied, not hardlinked
+                    shutil.copytree(src_path, dst_path)
+                else:
+                    copy_with_hardlinks(src_path, dst_path)
+            else:
+                try:
+                    link_file(src_path, dst_path)
+                except OSError as e:
+                    if e.errno == 2:  # ENOENT - parent directory doesn't exist
+                        os.makedirs(os.path.dirname(dst_path), exist_ok=True)
+                        link_file(src_path, dst_path)
+                    else:
+                        raise
+
+    copy_with_hardlinks(baseline_path, attempt_path)
+
+    # Break hardlinks for non-media files
+    break_hardlinks_for_non_media(attempt_path)
+
+    return attempt_path
+
+
+def break_hardlinks_for_non_media(attempt_path):
+    """Break hardlinks for .gpkg, .mergin, and other non-media files"""
+    print("Breaking hardlinks for non-media files...")
+
+    allowed_extensions = [ext.lower() for ext in config.allowed_extensions]
+    gpkg_files = [ref.file for ref in config.references if ref.file]
+
+    for root, dirs, files in os.walk(attempt_path):
+        # Skip .mergin directory (already copied, not hardlinked)
+        if ".mergin" in root.split(os.sep):
+            continue
+
+        for file in files:
+            file_path = os.path.join(root, file)
+            rel_path = os.path.relpath(file_path, attempt_path)
+
+            # Get file extension
+            _, ext = os.path.splitext(file)
+            ext = ext.lstrip(".").lower()
+
+            # Check if this is a non-media file that needs hardlink breaking
+            is_media = ext in allowed_extensions
+            is_gpkg = file in gpkg_files or file.endswith(".gpkg")
+
+            # Break hardlink if: not a media file OR is a gpkg file
+            if not is_media or is_gpkg:
+                # Check if file has hardlinks
+                try:
+                    stat_info = os.stat(file_path)
+                    if stat_info.st_nlink > 1:
+                        # Break hardlink: read, delete, write
+                        print(f"Breaking hardlink: {rel_path}")
+                        with open(file_path, "rb") as f:
+                            content = f.read()
+                        os.remove(file_path)
+                        with open(file_path, "wb") as f:
+                            f.write(content)
+                except OSError as e:
+                    print(f"Warning: Could not break hardlink for {rel_path}: {e}")
+
+
+def promote_attempt_to_baseline(baseline_path, attempt_path):
+    """Rename attempt workspace to become baseline"""
+    print("Promoting attempt workspace to baseline...")
+    backup_path = baseline_path + "_old"
+
+    # If a previous backup exists, remove it (POC: keep only one backup)
+    if os.path.exists(backup_path):
+        print(f"Removing existing backup directory: {backup_path}")
+        shutil.rmtree(backup_path)
+
+    if os.path.exists(baseline_path):
+        os.rename(baseline_path, backup_path)
+    os.rename(attempt_path, baseline_path)
+    print("Promotion complete")
+
+
+def sync_with_attempt_workspace(mc, driver, baseline_path):
+    """Run sync in attempt workspace, retry on version conflict"""
+    while True:
+        # Get current server version
+        mp = MerginProject(baseline_path)
+        project_full_name = mp.project_full_name()
+        server_version = _get_server_version(mc, project_full_name)
+        print(f"Attempting sync against server version: {server_version}")
+
+        # Create attempt workspace
+        attempt_path = create_attempt_workspace(baseline_path, server_version)
+
+        try:
+            # Sync in attempt workspace
+            files_to_sync = mc_pull(mc, workspace_path=attempt_path)
+
+            if not files_to_sync:
+                print("No files to sync")
+                # Still need to promote (attempt is up to date)
+                promote_attempt_to_baseline(baseline_path, attempt_path)
+                break
+
+            # Optional test hook: uncomment to simulate a server-side update
+            # landing between pull and push.
+            # time.sleep(30)
+
+            media_sync_push(mc, driver, files_to_sync, workspace_path=attempt_path)
+
+            # Success! Promote attempt to baseline
+            promote_attempt_to_baseline(baseline_path, attempt_path)
+            break
+
+        except VersionConflictError:
+            # Push failed - cleanup attempt
+            print("Version conflict detected, cleaning up attempt workspace...")
+            if os.path.exists(attempt_path):
+                shutil.rmtree(attempt_path)
+            # Baseline is still clean (unchanged)
+            # Loop continues: get new server version, create new attempt, retry
+            continue
+        except Exception as e:
+            # Other error - cleanup attempt and re-raise
+            print(f"Error during sync: {e}")
+            if os.path.exists(attempt_path):
+                shutil.rmtree(attempt_path)
+            raise
+
+
 def main():
     print(f"== Starting Mergin Media Sync version {__version__} ==")
     try:
@@ -257,18 +445,24 @@
     try:
         print("Logging in to Mergin...")
         mc = create_mergin_client()
-        # initialize or pull changes to sync with latest project version
-        if os.path.exists(config.project_working_dir):
-            files_to_sync = mc_pull(mc)
-        else:
+
+        # Initialize if needed
+        if not os.path.exists(config.project_working_dir):
             files_to_sync = mc_download(mc)
-
-        if not files_to_sync:
-            print("No files to sync")
+            if files_to_sync:
+                media_sync_push(mc, driver, files_to_sync)
+            print("== Media sync done! ==")
             return
-
-        # sync media files with external driver
-        media_sync_push(mc, driver, files_to_sync)
+
+        # COPY mode: always use attempt workspace
+        if config.operation_mode == "copy":
+            sync_with_attempt_workspace(mc, driver, config.project_working_dir)
+        else:
+            # MOVE mode: normal sync in baseline (unchanged behavior)
+            files_to_sync = mc_pull(mc)
+            if files_to_sync:
+                media_sync_push(mc, driver, files_to_sync)
+        print("== Media sync done! ==")
==") except MediaSyncError as err: print("Error: " + str(err)) diff --git a/media_sync_daemon.py b/media_sync_daemon.py index 816cbfb..5b9c8d3 100644 --- a/media_sync_daemon.py +++ b/media_sync_daemon.py @@ -7,10 +7,14 @@ """ import argparse -import sys import datetime +import gc +import logging import os +import sys import time + +from config import config, validate_config, ConfigError, update_config_path from drivers import DriverError, create_driver from media_sync import ( create_mergin_client, @@ -18,11 +22,40 @@ media_sync_push, mc_pull, MediaSyncError, + sync_with_attempt_workspace, ) -from config import config, validate_config, ConfigError, update_config_path from version import __version__ +def setup_logger(): + logger = logging.getLogger("media-sync") + logger.setLevel(logging.INFO) + handler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + + +def run_sync_cycle(mc, driver, logger): + try: + # In COPY mode, use attempt workspace flow to avoid contaminating baseline + if config.operation_mode == "copy": + logger.info("Running COPY-mode sync in attempt workspace...") + sync_with_attempt_workspace(mc, driver, config.project_working_dir) + logger.info("Sync complete (COPY mode, attempt workspace).") + else: + # MOVE mode: keep existing baseline workflow + logger.info("Pulling changes from Mergin maps server...") + files_to_sync = mc_pull(mc) + + media_sync_push(mc, driver, files_to_sync) + logger.info("Sync complete.") + + except MediaSyncError as e: + logger.error(f"Media sync error: {e}") + + def main(): parser = argparse.ArgumentParser( prog="media_sync_daemon.py", @@ -34,68 +67,69 @@ def main(): "config_file", nargs="?", default="config.yaml", - help="Path to file with configuration. Default value is config.yaml in current working directory.", + help="Path to file with configuration. Default is ./config.yaml", ) args = parser.parse_args() - - print(f"== Starting Mergin Media Sync daemon version {__version__} ==") + logger = setup_logger() + logger.info(f"== Starting Mergin Media Sync daemon v{__version__} ==") try: update_config_path(args.config_file) - except IOError as e: - print("Error:" + str(e)) + validate_config(config) + except (IOError, ConfigError) as e: + logger.error(f"Configuration error: {e}") sys.exit(1) sleep_time = config.as_int("daemon.sleep_time") - try: - validate_config(config) - except ConfigError as e: - print("Error: " + str(e)) - return - try: driver = create_driver(config) except DriverError as e: - print("Error: " + str(e)) - return + logger.error(f"Driver error: {e}") + sys.exit(1) - print("Logging in to Mergin...") + logger.info("Logging in to Mergin maps server...") try: mc = create_mergin_client() - - # initialize or pull changes to sync with latest project version if not os.path.exists(config.project_working_dir): + logger.info("Project directory not found. 
             files_to_sync = mc_download(mc)
             media_sync_push(mc, driver, files_to_sync)
     except MediaSyncError as e:
-        print("Error: " + str(e))
-        return
+        logger.error(f"Initial sync error: {e}")
+        sys.exit(1)
 
-    # keep running until killed by ctrl+c:
-    # - sleep N seconds
-    # - pull
-    # - push
+    logger.info("Entering sync loop...")
     while True:
-        print(datetime.datetime.now())
-        try:
-            files_to_sync = mc_pull(mc)
-            media_sync_push(mc, driver, files_to_sync)
+        logger.info(f"Heartbeat: {datetime.datetime.utcnow().isoformat()} UTC")
+        run_sync_cycle(mc, driver, logger)
 
-            # check mergin client token expiration
-            delta = mc._auth_session["expire"] - datetime.datetime.now(
-                datetime.timezone.utc
+        # Check token expiry
+        try:
+            delta = mc._auth_session["expire"] - datetime.datetime.now(datetime.timezone.utc)
+        except (AttributeError, KeyError, TypeError) as e:
+            logger.warning(
+                f"Error checking Mergin token expiration (skipping refresh this cycle): {e}"
             )
+        else:
             if delta.total_seconds() < 3600:
-                mc = create_mergin_client()
-
-        except MediaSyncError as e:
-            print("Error: " + str(e))
-
-        print("Going to sleep")
+                logger.info("Refreshing Mergin Maps server auth token...")
+                try:
+                    mc = create_mergin_client()
+                except MediaSyncError as e:
+                    # MediaSyncError already wraps LoginError/ClientError from MerginClient
+                    logger.warning(
+                        f"Failed to refresh Mergin Maps server auth token "
+                        f"(will retry next cycle): {e}"
+                    )
+                else:
+                    logger.info("Mergin Maps server auth token refreshed successfully.")
+
+        logger.info(f"Sleeping for {sleep_time} seconds...")
         time.sleep(sleep_time)
 
+
 if __name__ == "__main__":
     main()