From df948a732e9b5ceb14ee235566d4bd411696f949 Mon Sep 17 00:00:00 2001 From: xkello Date: Sat, 11 Apr 2026 18:46:07 +0000 Subject: [PATCH 01/12] Change get users logic --- web-app/packages/lib/src/modules/user/store.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/web-app/packages/lib/src/modules/user/store.ts b/web-app/packages/lib/src/modules/user/store.ts index 83f2fb9b..34da4d98 100644 --- a/web-app/packages/lib/src/modules/user/store.ts +++ b/web-app/packages/lib/src/modules/user/store.ts @@ -498,15 +498,10 @@ export const useUserStore = defineStore('userModule', { */ async getAuthNotProjectUserSearch(params: UserSearchParams) { const projectStore = useProjectStore() - const access = projectStore.project.access - const projectUsers = [ - ...access.readers, - ...access.writers, - ...access.owners - ] + const projectUsers = projectStore.access.map((item) => item.id) const response = await UserApi.getAuthUserSearch(params) - if (access) { + if (projectUsers.length) { response.data = response.data.filter( (item) => !projectUsers.find((id) => id === item.id) ) From 71bb92f4a98a1007ffdb778cc3039ae97fea9192 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Tue, 14 Apr 2026 08:42:05 +0200 Subject: [PATCH 02/12] Replace lockfile with upload last_ping db column This is a refactoring that replaces the filesystem-based Toucher/lockfile mechanism with a database-based heartbeat approach for tracking active uploads. This eliminates the NFS-specific hack (os.access() + os.utime()) that was needed to bust NFS attribute caches in the old approach. The new approach is cleaner and more cloud-native. 
--- server/mergin/sync/models.py | 77 ++++++++++++++++--- server/mergin/sync/public_api.yaml | 2 +- server/mergin/sync/public_api_controller.py | 10 +-- .../mergin/sync/public_api_v2_controller.py | 3 +- server/mergin/sync/utils.py | 47 ----------- .../mergin/tests/test_project_controller.py | 9 ++- server/mergin/tests/test_public_api_v2.py | 1 - .../e3a7f2b1c94d_add_upload_last_ping.py | 29 +++++++ 8 files changed, 105 insertions(+), 73 deletions(-) create mode 100644 server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index f1c6cbfd..4d9d42f1 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -2,12 +2,14 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial from __future__ import annotations +from contextlib import contextmanager import json import logging import os +import threading import time import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from enum import Enum from typing import Optional, List, Dict, Set, Tuple from dataclasses import dataclass, asdict @@ -21,7 +23,7 @@ from sqlalchemy.types import String from sqlalchemy.ext.hybrid import hybrid_property from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError -from flask import current_app +from flask import Flask, current_app from .files import ( DeltaChangeMerged, @@ -44,7 +46,6 @@ LOG_BASE, Checkpoint, generate_checksum, - Toucher, get_chunk_location, get_project_path, is_supported_type, @@ -1805,6 +1806,8 @@ class Upload(db.Model): db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), nullable=True ) created = db.Column(db.DateTime, default=datetime.utcnow) + # last ping time to determine if upload is still active + last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) user = db.relationship("User") project = db.relationship( @@ -1827,17 +1830,67 @@ def 
__init__(self, project: Project, version: int, changes: dict, user_id: int): def upload_dir(self): return os.path.join(self.project.storage.project_dir, "tmp", self.id) - @property - def lockfile(self): - return os.path.join(self.upload_dir, "lockfile") - def is_active(self): - """Check if upload is still active because there was a ping (lockfile update) from underlying process""" - return os.path.exists(self.lockfile) and ( - time.time() - os.path.getmtime(self.lockfile) - < current_app.config["LOCKFILE_EXPIRATION"] + """Check if upload is still active because there was a ping from underlying process""" + return datetime.now(tz=timezone.utc) < self.last_ping.replace( + tzinfo=timezone.utc + ) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"]) + + def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int): + """ + Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type. + Uses a fresh engine connection to stay pool-efficient. + """ + # manual context push is required for background execution + with app.app_context(): + while not stop_event.is_set(): + try: + # db.engine.begin() is efficient and isolated, it immediately returns a connection to the pool + with db.engine.begin() as conn: + conn.execute( + db.text( + "UPDATE upload SET last_ping = NOW() WHERE id = :id" + ), + {"id": self.id}, + ) + except Exception as e: + logging.exception( + f"Upload heartbeat failed for ID {self.project_id} and version {self.version}: {e}" + ) + + # wait for x seconds, but wake up immediately if stop_event is set + stop_event.wait(timeout) + + @contextmanager + def heartbeat(self, timeout: int = 5): + """ + Context manager to be used inside a Flask route. 
+ + Example of usage: + ----------------- + with upload.heartbeat(interval): + do_something_slow + """ + # we need to pass a real Flask app object to the thread + app = current_app._get_current_object() + stop_event = threading.Event() + + bg = threading.Thread( + target=self._heartbeat_task, args=(app, stop_event, timeout), daemon=True ) + bg.start() + try: + yield + finally: + # signal the loop to stop + stop_event.set() + + # wait for the task to finish its last SQL call. + # in Gevent, this yields to other requests (non-blocking), while in Sync, this blocks the current thread for up to 2s + # this is to protect main thread / greenlet from zombie bg processes + bg.join(timeout=2) + def clear(self): """Clean up pending upload. Uploaded files and table records are removed, and another upload can start. @@ -1864,7 +1917,7 @@ def process_chunks( to_remove = [i.path for i in file_changes if i.change == PushChangeType.DELETE] current_files = [f for f in self.project.files if f.path not in to_remove] - with Toucher(self.lockfile, 5): + with self.heartbeat(5): for f in file_changes: if f.change == PushChangeType.DELETE: continue diff --git a/server/mergin/sync/public_api.yaml b/server/mergin/sync/public_api.yaml index 5227b562..157e8262 100644 --- a/server/mergin/sync/public_api.yaml +++ b/server/mergin/sync/public_api.yaml @@ -699,7 +699,7 @@ paths: - do integrity check comparing uploaded file sizes with what was expected - move uploaded files to new version dir and applying sync changes (e.g. 
geodiff apply_changeset) - bump up version in database - - remove artifacts (chunks, lockfile) by moving them to tmp directory" + - remove artifacts (chunks) by moving them to tmp directory" operationId: push_finish parameters: - name: transaction_id diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 8e2b0ea8..e27527cd 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -75,7 +75,6 @@ ) from .utils import ( generate_checksum, - Toucher, get_ip, get_user_agent, generate_location, @@ -849,11 +848,10 @@ def project_push(namespace, project_name): logging.error(f"Failed to create upload session: {str(err)}") abort(422, "Failed to create upload session. Please try later.") - # Create transaction folder and lockfile + # Create transaction folder os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() - # Update immediately without uploading of new/modified files and remove transaction/lockfile after successful commit + # Update immediately without uploading of new/modified files and remove transaction after successful commit if not (changes["added"] or changes["updated"]): next_version = version + 1 file_changes = files_changes_from_upload( @@ -920,7 +918,7 @@ def chunk_upload(transaction_id, chunk_id): abort(404) dest = os.path.join(upload_dir, "chunks", chunk_id) - with Toucher(upload.lockfile, 30): + with upload.heartbeat(30): try: # we could have used request.data here, but it could eventually cause OOM issue save_to_file(request.stream, dest, current_app.config["MAX_CHUNK_SIZE"]) @@ -945,7 +943,7 @@ def push_finish(transaction_id): - do integrity check comparing uploaded file sizes with what was expected - move uploaded files to new version dir and applying sync changes (e.g. 
geodiff apply_changeset) - bump up version in database - - remove artifacts (chunks, lockfile) by moving them to tmp directory + - remove artifacts (chunks) by moving them to tmp directory :param transaction_id: Transaction id. :type transaction_id: str diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 9a82a211..2fc7a855 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -327,9 +327,8 @@ def create_project_version(id): logging.error(f"Failed to create upload session: {str(err)}") return AnotherUploadRunning().response(409) - # Create transaction folder and lockfile + # Create transaction folder os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True) # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted diff --git a/server/mergin/sync/utils.py b/server/mergin/sync/utils.py index 9e89eb7c..48966457 100644 --- a/server/mergin/sync/utils.py +++ b/server/mergin/sync/utils.py @@ -57,53 +57,6 @@ def generate_checksum(file, chunk_size=4096): checksum.update(chunk) -class Toucher: - """ - Helper class to periodically update modification time of file during - execution of longer lasting task. 
- - Example of usage: - ----------------- - with Toucher(file, interval): - do_something_slow - - """ - - def __init__(self, lockfile, interval): - self.lockfile = lockfile - self.interval = interval - self.running = False - self.timer = None - - def __enter__(self): - self.acquire() - - def __exit__(self, type, value, tb): # pylint: disable=W0612,W0622 - self.release() - - def release(self): - self.running = False - if self.timer: - self.timer.cancel() - self.timer = None - - def acquire(self): - self.running = True - self.touch_lockfile() - - def touch_lockfile(self): - # do an NFS ACCESS procedure request to clear the attribute cache (for various pods to actually see the file) - # https://docs.aws.amazon.com/efs/latest/ug/troubleshooting-efs-general.html#custom-nfs-settings-write-delays - os.access(self.lockfile, os.W_OK) - with open(self.lockfile, "a"): - os.utime(self.lockfile, None) - - sleep(0) # to unblock greenlet - if self.running: - self.timer = Timer(self.interval, self.touch_lockfile) - self.timer.start() - - def is_qgis(path: str) -> bool: """ Check if file is a QGIS project file. 
diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 25e0e055..e5f61c18 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -1294,7 +1294,6 @@ def create_transaction(username, changes, version=1): db.session.commit() upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", upload.id) os.makedirs(upload_dir) - open(os.path.join(upload_dir, "lockfile"), "w").close() return upload, upload_dir @@ -1320,6 +1319,7 @@ def test_chunk_upload(client, app): resp = client.post(url, data=data, headers=headers) assert resp.status_code == 200 assert resp.json["checksum"] == checksum.hexdigest() + assert os.path.exists(os.path.join(upload_dir, "chunks", chunk_id)) # tests to send bigger chunk than allowed app.config["MAX_CHUNK_SIZE"] = 10 * CHUNK_SIZE @@ -1332,6 +1332,8 @@ def test_chunk_upload(client, app): failure = SyncFailuresHistory.query.filter_by(project_id=upload.project.id).first() assert failure.error_type == "chunk_upload" assert failure.error_details == "Too big chunk" + # residual after upload was removed + assert not os.path.exists(os.path.join(upload_dir, "chunks", chunk_id)) # tests with transaction with no uploads expected changes = _get_changes(test_project_dir) @@ -1342,9 +1344,8 @@ def test_chunk_upload(client, app): resp2 = client.post(url, data=data, headers=headers) assert resp2.status_code == 404 assert SyncFailuresHistory.query.count() == 1 - - # cleanup - shutil.rmtree(upload_dir) + # we do not have any chunks, so parent dir was removed as well + assert not os.path.exists(os.path.join(upload_dir)) def upload_chunks(upload_dir, changes, src_dir=test_project_dir): diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index 7a87b1d0..f3c91539 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -1035,7 +1035,6 @@ def 
test_create_version_failures(client): db.session.add(upload) db.session.commit() os.makedirs(upload.upload_dir) - open(upload.lockfile, "w").close() response = client.post(f"v2/projects/{project.id}/versions", json=data) assert response.status_code == 409 diff --git a/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py new file mode 100644 index 00000000..dd727e5a --- /dev/null +++ b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py @@ -0,0 +1,29 @@ +"""Add last_ping to upload + +Revision ID: e3a7f2b1c94d +Revises: 4b4648483770 +Create Date: 2026-04-14 00:00:00.000000 + +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "e3a7f2b1c94d" +down_revision = "4b4648483770" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("upload", sa.Column("last_ping", sa.DateTime(), nullable=True)) + # backfill existing rows before adding NOT NULL constraint + op.execute("UPDATE upload SET last_ping = NOW() WHERE last_ping IS NULL") + op.alter_column("upload", "last_ping", nullable=False) + + +def downgrade(): + # drop the column but required lockfiles will be missing - make sure all uploads are gone + op.drop_column("upload", "last_ping") From dde191f037e5b504b378c34cdd86ee921d83bb09 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Tue, 14 Apr 2026 13:11:20 +0200 Subject: [PATCH 03/12] Rework concurrent upload using upsert strategy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the old try/except IntegrityError + cleanup loop pattern with an atomic upsert in Upload.create_upload(). Decouple the upload directory name from the DB primary key via transaction_id. Upload directory is created at this stage, no need to take care for it later. 
With the upsert logic, the ObjectDeletedError scenario which happened because a concurrent request could delete a stale upload row mid-operation is eliminated: - During push_finish, the heartbeat context manager continuously updates last_ping, keeping the upload fresh throughout the operation - A concurrent request can only take over an upload whose last_ping has expired - Since heartbeat prevents expiry, no other request can claim the row while push_finish is running - The upload object therefore stays valid for the full lifetime of the request — ObjectDeletedError becomes impossible --- server/mergin/sync/models.py | 95 +++++++++++++- server/mergin/sync/permissions.py | 7 +- server/mergin/sync/public_api_controller.py | 119 +++++++---------- .../mergin/sync/public_api_v2_controller.py | 118 +++++++---------- server/mergin/sync/schemas.py | 4 +- server/mergin/tests/test_db_hooks.py | 3 +- .../mergin/tests/test_project_controller.py | 122 ++++++++++++------ server/mergin/tests/test_public_api_v2.py | 56 +------- .../f1d9e4a7b823_add_upload_transaction_id.py | 35 +++++ 9 files changed, 308 insertions(+), 251 deletions(-) create mode 100644 server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 4d9d42f1..08ff6217 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -19,7 +19,7 @@ from flask_login import current_user from pygeodiff import GeoDiff from sqlalchemy import text, null, desc, nullslast, tuple_ -from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM +from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, UUID, JSONB, ENUM, insert from sqlalchemy.types import String from sqlalchemy.ext.hybrid import hybrid_property from pygeodiff.geodifflib import GeoDiffLibError, GeoDiffLibConflictError @@ -1808,6 +1808,7 @@ class Upload(db.Model): created = db.Column(db.DateTime, default=datetime.utcnow) # last ping time to 
determine if upload is still active last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + transaction_id = db.Column(db.String, unique=True, nullable=False, index=True) user = db.relationship("User") project = db.relationship( @@ -1825,10 +1826,98 @@ def __init__(self, project: Project, version: int, changes: dict, user_id: int): self.version = version self.changes = ChangesSchema().dump(changes) self.user_id = user_id + self.transaction_id = str(uuid.uuid4()) + + @classmethod + def create_upload( + cls, project_id: str, version: int, changes: dict, user_id: int + ) -> Upload | None: + """Create upload session, it can either create a new record or handover an existing one but with new transaction id + Old transaction folder is removed and new one is created. + """ + now = datetime.now(timezone.utc) + expiration = current_app.config["LOCKFILE_EXPIRATION"] + new_tx_id = str(uuid.uuid4()) + + # CTE captures the existing row's transaction_id BEFORE the upsert (pre-statement snapshot) + # NULL in RETURNING means fresh INSERT, non-NULL means we took over a stale upload + existing_cte = ( + db.select(Upload.transaction_id) + .where( + Upload.project_id == project_id, + Upload.version == version, + ) + .cte("existing") + ) + + stmt = ( + insert(Upload) + .values( + id=str(uuid.uuid4()), + transaction_id=new_tx_id, + project_id=project_id, + version=version, + user_id=user_id, + last_ping=now, + changes=ChangesSchema().dump(changes), + ) + .add_cte(existing_cte) + ) + + upsert_stmt = stmt.on_conflict_do_update( + constraint="uq_upload_project_id", + set_={ + "transaction_id": new_tx_id, + "user_id": user_id, + "last_ping": now, + "changes": ChangesSchema().dump(changes), + }, + # ONLY update if the existing row is stale + where=(Upload.last_ping < (now - timedelta(seconds=expiration))), + ) + + upsert_stmt = upsert_stmt.returning( + Upload, + db.select(existing_cte.c.transaction_id) + .scalar_subquery() + .label("old_transaction_id"), + ) + + result 
= db.session.execute(upsert_stmt).fetchone() + db.session.commit() + + # if nothing returned, it means the WHERE clause failed (active upload) + if not result: + return + + upload = result.Upload + old_transaction_id = result.old_transaction_id + os.makedirs(upload.upload_dir) + + # old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload + if old_transaction_id: + upload.project.sync_failed( + "", "push_lost", "Push artefact removed by subsequent push", user_id + ) + if os.path.exists( + os.path.join( + upload.project.storage.project_dir, "tmp", old_transaction_id + ) + ): + move_to_tmp( + os.path.join( + upload.project.storage.project_dir, "tmp", old_transaction_id + ), + old_transaction_id, + ) + + return upload @property def upload_dir(self): - return os.path.join(self.project.storage.project_dir, "tmp", self.id) + return os.path.join( + self.project.storage.project_dir, "tmp", self.transaction_id + ) def is_active(self): """Check if upload is still active because there was a ping from underlying process""" @@ -1896,7 +1985,7 @@ def clear(self): Uploaded files and table records are removed, and another upload can start. 
""" try: - move_to_tmp(self.upload_dir, self.id) + move_to_tmp(self.upload_dir, self.transaction_id) db.session.delete(self) db.session.commit() except Exception: diff --git a/server/mergin/sync/permissions.py b/server/mergin/sync/permissions.py index e155020a..880c0e33 100644 --- a/server/mergin/sync/permissions.py +++ b/server/mergin/sync/permissions.py @@ -271,8 +271,8 @@ def check_project_permissions( return None -def get_upload(transaction_id): - upload = Upload.query.get_or_404(transaction_id) +def get_upload_or_fail(transaction_id: str) -> Upload: + upload = Upload.query.filter_by(transaction_id=transaction_id).first_or_404() # upload to 'removed' projects is forbidden if upload.project.removed_at: abort(404) @@ -280,8 +280,7 @@ def get_upload(transaction_id): if upload.user_id != current_user.id: abort(403, "You do not have permissions for ongoing upload") - upload_dir = os.path.join(upload.project.storage.project_dir, "tmp", transaction_id) - return upload, upload_dir + return upload def projects_query(permission, as_admin=True, public=True): diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index e27527cd..894101b3 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -25,7 +25,7 @@ from pygeodiff import GeoDiffLibError from flask_login import current_user from sqlalchemy import and_, desc, asc -from sqlalchemy.exc import IntegrityError +from sqlalchemy.exc import IntegrityError, SQLAlchemyError from gevent import sleep import base64 from werkzeug.exceptions import HTTPException, Conflict @@ -70,7 +70,7 @@ require_project, projects_query, ProjectPermissions, - get_upload, + get_upload_or_fail, require_project_by_uuid, ) from .utils import ( @@ -774,13 +774,6 @@ def project_push(namespace, project_name): if all(len(changes[key]) == 0 for key in changes.keys()): abort(400, "No changes") - # reject upload early if there is another one already running 
- pending_upload = Upload.query.filter_by( - project_id=project.id, version=version - ).first() - if pending_upload and pending_upload.is_active(): - abort(400, "Another process is running. Please try later.") - try: ChangesSchema().validate(changes) upload_changes = ChangesSchema().dump(changes) @@ -812,44 +805,20 @@ def project_push(namespace, project_name): if requested_storage > ws.storage: return StorageLimitHit(current_usage, ws.storage).response(422) - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) try: - # Creating upload transaction with different project's version is possible. - db.session.commit() + upload = Upload.create_upload( + project.id, version, upload_changes, current_user.id + ) + if not upload: + abort(400, "Another process is running. Please try later.") + logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" + f"Upload transaction {upload.transaction_id} created for project: {project.id}, version: {version}" ) - except IntegrityError: + except (IntegrityError, SQLAlchemyError) as err: db.session.rollback() - # check and clean dangling uploads or abort - for current_upload in project.uploads.all(): - if current_upload.is_active(): - abort(400, "Another process is running. Please try later.") - db.session.delete(current_upload) - db.session.commit() - # previous push attempt is definitely lost - project.sync_failed( - "", - "push_lost", - "Push artefact removed by subsequent push", - current_user.id, - ) - - # Try again after cleanup - db.session.add(upload) - try: - db.session.commit() - logging.info( - f"Upload transaction {upload.id} created for project: {project.id}, version: {version}" - ) - move_to_tmp(upload.upload_dir) - except IntegrityError as err: - logging.error(f"Failed to create upload session: {str(err)}") - abort(422, "Failed to create upload session. 
Please try later.") - - # Create transaction folder - os.makedirs(upload.upload_dir) + logging.exception(f"Failed to create upload: {str(err)}") + abort(422, "Failed to create upload session. Please try later.") # Update immediately without uploading of new/modified files and remove transaction after successful commit if not (changes["added"] or changes["updated"]): @@ -874,7 +843,7 @@ def project_push(namespace, project_name): db.session.commit() logging.info( f"A project version {ProjectVersion.to_v_name(next_version)} for project: {project.id} created. " - f"Transaction id: {upload.id}. No upload." + f"Transaction id: {upload.transaction_id}. No upload." ) project_version_created.send(pv) push_finished.send(pv) @@ -882,7 +851,7 @@ def project_push(namespace, project_name): except IntegrityError as err: db.session.rollback() logging.exception( - f"Failed to upload a new project version using transaction id: {upload.id}: {str(err)}" + f"Failed to upload a new project version using transaction id: {upload.transaction_id}: {str(err)}" ) abort(422, "Failed to upload a new project version. 
Please try later.") except gevent.timeout.Timeout: @@ -891,7 +860,7 @@ def project_push(namespace, project_name): finally: upload.clear() - return {"transaction": upload.id}, 200 + return {"transaction": upload.transaction_id}, 200 @auth_required @@ -908,7 +877,7 @@ def chunk_upload(transaction_id, chunk_id): :rtype: Dict """ - upload, upload_dir = get_upload(transaction_id) + upload = get_upload_or_fail(transaction_id) request.view_args["project"] = upload.project chunks = [] for file in upload.changes["added"] + upload.changes["updated"]: @@ -917,7 +886,7 @@ def chunk_upload(transaction_id, chunk_id): if chunk_id not in chunks: abort(404) - dest = os.path.join(upload_dir, "chunks", chunk_id) + dest = os.path.join(upload.upload_dir, "chunks", chunk_id) with upload.heartbeat(30): try: # we could have used request.data here, but it could eventually cause OOM issue @@ -950,7 +919,7 @@ def push_finish(transaction_id): :rtype: None """ - upload, upload_dir = get_upload(transaction_id) + upload = get_upload_or_fail(transaction_id) request.view_args["project"] = upload.project project = upload.project next_version = project.next_version() @@ -989,7 +958,7 @@ def push_finish(transaction_id): abort(422, f"Failed to create new version: {msg}") - files_dir = os.path.join(upload_dir, "files", v_next_version) + files_dir = os.path.join(upload.upload_dir, "files", v_next_version) target_dir = os.path.join(project.storage.project_dir, v_next_version) if os.path.exists(target_dir): pv = ProjectVersion.query.filter_by( @@ -1007,29 +976,31 @@ def push_finish(transaction_id): move_to_tmp(target_dir) try: - user_agent = get_user_agent(request) - device_id = get_device_id(request) - pv = ProjectVersion( - project, - next_version, - current_user.id, - file_changes, - get_ip(request), - user_agent, - device_id, - ) - db.session.add(pv) - db.session.add(project) - db.session.commit() + # let's keep upload alive until all work is done so no one else can claim it + with 
upload.heartbeat(5): + user_agent = get_user_agent(request) + device_id = get_device_id(request) + pv = ProjectVersion( + project, + next_version, + current_user.id, + file_changes, + get_ip(request), + user_agent, + device_id, + ) + db.session.add(pv) + db.session.add(project) + db.session.commit() - # let's move uploaded files where they are expected to be - os.renames(files_dir, version_dir) + # let's move uploaded files where they are expected to be + os.renames(files_dir, version_dir) - logging.info( - f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." - ) - project_version_created.send(pv) - push_finished.send(pv) + logging.info( + f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." + ) + project_version_created.send(pv) + push_finished.send(pv) except (psycopg2.Error, FileNotFoundError, IntegrityError) as err: db.session.rollback() logging.exception( @@ -1059,10 +1030,8 @@ def push_cancel(transaction_id): :rtype: None """ - upload, upload_dir = get_upload(transaction_id) - db.session.delete(upload) - db.session.commit() - move_to_tmp(upload_dir) + upload = get_upload_or_fail(transaction_id) + upload.clear() return NoContent, 200 diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 2fc7a855..64d8ce84 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -15,8 +15,7 @@ from flask import abort, jsonify, current_app from flask_login import current_user from marshmallow import ValidationError -from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm.exc import ObjectDeletedError +from sqlalchemy.exc import IntegrityError, SQLAlchemyError from .schemas_v2 import BatchErrorSchema, ProjectSchema as ProjectSchemaV2 from ..app import db @@ -296,87 +295,65 @@ def create_project_version(id): return NoContent, 204 try: - # 
while processing data, block other uploads - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) - # Creating blocking upload can fail, e.g. in case of racing condition - db.session.commit() - except IntegrityError: - db.session.rollback() - # check and clean dangling blocking uploads or abort - for current_upload in project.uploads.all(): - if current_upload.is_active(): - return AnotherUploadRunning().response(409) - db.session.delete(current_upload) - db.session.commit() - # previous push attempt is definitely lost - project.sync_failed( - "", - "push_lost", - "Push artefact removed by subsequent push", - current_user.id, - ) - - try: - # Try again after cleanup - upload = Upload(project, version, upload_changes, current_user.id) - db.session.add(upload) - db.session.commit() - move_to_tmp(upload.upload_dir) - except IntegrityError as err: - logging.error(f"Failed to create upload session: {str(err)}") + upload = Upload.create_upload( + project.id, version, upload_changes, current_user.id + ) + if not upload: return AnotherUploadRunning().response(409) + except (IntegrityError, SQLAlchemyError) as err: + db.session.rollback() + logging.exception(f"Failed to create upload: {str(err)}") + return UploadError().response(422) - # Create transaction folder - os.makedirs(upload.upload_dir) - + # this is the heavy work of processing upload data file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True) # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted if errors: upload.clear() return DataSyncError(failed_files=errors).response(422) - upload_deleted = False try: - pv = ProjectVersion( - project, - next_version, - current_user.id, - file_changes, - get_ip(request), - get_user_agent(request), - get_device_id(request), - ) - db.session.add(pv) - db.session.add(project) - db.session.commit() - - # let's move uploaded files where they are expected to be - if 
to_be_added_files or to_be_updated_files: - temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) - os.renames(temp_files_dir, version_dir) - - # remove used chunks - # get chunks from added and updated files - chunks_ids = [] - for file in to_be_added_files + to_be_updated_files: - file_chunks = file.get("chunks", []) - chunks_ids.extend(file_chunks) - remove_transaction_chunks.delay(chunks_ids) - - logging.info( - f"Push finished for project: {project.id}, project version: {v_next_version}." - ) - project_version_created.send(pv) - push_finished.send(pv) + # let's keep upload alive until all work is done so no one else can claim it + with upload.heartbeat(5): + pv = ProjectVersion( + project, + next_version, + current_user.id, + file_changes, + get_ip(request), + get_user_agent(request), + get_device_id(request), + ) + db.session.add(pv) + db.session.add(project) + db.session.commit() + + # let's move uploaded files where they are expected to be + if to_be_added_files or to_be_updated_files: + temp_files_dir = os.path.join( + upload.upload_dir, "files", v_next_version + ) + os.renames(temp_files_dir, version_dir) + + # remove used chunks + # get chunks from added and updated files + chunks_ids = [] + for file in to_be_added_files + to_be_updated_files: + file_chunks = file.get("chunks", []) + chunks_ids.extend(file_chunks) + remove_transaction_chunks.delay(chunks_ids) + + logging.info( + f"Push finished for project: {project.id}, project version: {v_next_version}." 
+ ) + project_version_created.send(pv) + push_finished.send(pv) except ( psycopg2.Error, FileNotFoundError, IntegrityError, - ObjectDeletedError, ) as err: db.session.rollback() - upload_deleted = isinstance(err, ObjectDeletedError) logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}: {str(err)}" ) @@ -400,9 +377,8 @@ def create_project_version(id): move_to_tmp(version_dir) raise finally: - # remove artifacts only if upload object is still valid - if not upload_deleted: - upload.clear() + # remove upload artifacts + upload.clear() result = ProjectSchemaV2().dump(project) result["files"] = ProjectFileSchema( diff --git a/server/mergin/sync/schemas.py b/server/mergin/sync/schemas.py index 4eecca0c..da18f7db 100644 --- a/server/mergin/sync/schemas.py +++ b/server/mergin/sync/schemas.py @@ -132,7 +132,7 @@ def _role(self, obj): return role.value def _uploads(self, obj): - return [u.id for u in obj.project.uploads.all()] + return [u.transaction_id for u in obj.project.uploads.all()] def _permissions(self, obj): return project_user_permissions(obj.project) @@ -180,7 +180,7 @@ def _role(self, obj): return role.value def _uploads(self, obj): - return [u.id for u in obj.uploads.all()] + return [u.transaction_id for u in obj.uploads.all()] class Meta: model = Project diff --git a/server/mergin/tests/test_db_hooks.py b/server/mergin/tests/test_db_hooks.py index 044294c5..27aadb5b 100644 --- a/server/mergin/tests/test_db_hooks.py +++ b/server/mergin/tests/test_db_hooks.py @@ -114,8 +114,7 @@ def test_remove_project(client, diff_project): # set up mergin_user = User.query.filter_by(username=DEFAULT_USER[0]).first() project_dir = Path(diff_project.storage.project_dir) - upload = Upload(diff_project, 10, [], mergin_user.id) - db.session.add(upload) + upload = Upload.create_upload(diff_project.id, 10, [], mergin_user.id) project_id = diff_project.id user = add_user("user", "user") access_request = AccessRequest(diff_project, 
user.id) diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index e5f61c18..71cd0fe2 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -404,7 +404,7 @@ def test_add_project(client, app, data, expected): "mergin", _get_changes_with_diff(test_project_dir) ) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # add TEMPLATES user and make him creator of test_project (to become template) @@ -508,7 +508,7 @@ def test_delete_project(client): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # try force delete for active project @@ -1120,7 +1120,7 @@ def test_push_to_new_project(client): assert resp.status_code == 200 upload_id = resp.json["transaction"] - upload = Upload.query.filter_by(id=upload_id).first() + upload = Upload.query.filter_by(transaction_id=upload_id).first() blacklisted_file = all( added["path"] != "test_dir/test4.txt" for added in upload.changes["added"] ) @@ -1211,6 +1211,52 @@ def test_push_integrity_error(client, app): assert failure.error_details == "No changes" +def test_stale_upload_takeover(client, app): + """Stale upload (last_ping expired) is atomically replaced by a new one. 
+ + Verifies that: + - the new upload gets a fresh transaction_id + - the old upload directory is cleaned up + - a push_lost failure is recorded for the abandoned upload + """ + project = Project.query.filter_by( + name=test_project, workspace_id=test_workspace_id + ).first() + user = User.query.filter_by(username="mergin").first() + changes = _get_changes(test_project_dir) + changes["added"] = changes["removed"] = [] + + # create initial upload and record its identity + upload = Upload.create_upload(project.id, 1, changes, user.id) + old_tx_id = upload.transaction_id + old_upload_dir = upload.upload_dir + assert os.path.exists(old_upload_dir) + + # backdate last_ping to make the upload appear stale + db.session.execute( + db.text( + "UPDATE upload SET last_ping = NOW() - :expiry * INTERVAL '1 second' WHERE id = :id" + ), + { + "id": upload.id, + "expiry": client.application.config["LOCKFILE_EXPIRATION"] + 1, + }, + ) + db.session.commit() + + # takeover — should succeed and replace the stale upload + new_upload = Upload.create_upload(project.id, 1, changes, user.id) + assert new_upload is not None + assert new_upload.transaction_id != old_tx_id + assert os.path.exists(new_upload.upload_dir) + # old directory was moved away + assert not os.path.exists(old_upload_dir) + # push_lost was recorded for the abandoned upload + failure = SyncFailuresHistory.query.filter_by(project_id=project.id).first() + assert failure.error_type == "push_lost" + assert failure.error_details == "Push artefact removed by subsequent push" + + def test_exceed_data_limit(client): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id @@ -1289,12 +1335,8 @@ def create_transaction(username, changes, version=1): project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id ).first() - upload = Upload(project, version, changes, user.id) - db.session.add(upload) - db.session.commit() - upload_dir = 
os.path.join(upload.project.storage.project_dir, "tmp", upload.id) - os.makedirs(upload_dir) - return upload, upload_dir + upload = Upload.create_upload(project.id, version, changes, user.id) + return upload, upload.upload_dir def remove_transaction(transaction_id): @@ -1310,7 +1352,7 @@ def test_chunk_upload(client, app): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) chunk_id = upload.changes["added"][0]["chunks"][0] - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(test_project_dir, "test_dir", "test4.txt"), "rb") as file: data = file.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -1367,7 +1409,9 @@ def test_push_finish(client): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) - resp = client.post(f"/v1/project/push/finish/{upload.id}", headers=json_headers) + resp = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", headers=json_headers + ) assert resp.status_code == 422 assert "corrupted_files" in resp.json["detail"].keys() assert not os.path.exists(os.path.join(upload_dir, "files", "test.txt")) @@ -1388,7 +1432,7 @@ def test_push_finish(client): chunks.append(chunk) resp2 = client.post( - f"/v1/project/push/finish/{upload.id}", + f"/v1/project/push/finish/{upload.transaction_id}", headers={**json_headers, "User-Agent": "Werkzeug"}, ) assert resp2.status_code == 200 @@ -1415,7 +1459,7 @@ def test_push_finish(client): db.session.commit() upload, upload_dir = create_transaction(user.username, changes) - url = "/v1/project/push/finish/{}".format(upload.id) + url = "/v1/project/push/finish/{}".format(upload.transaction_id) db.session.add(upload) db.session.commit() # still log in as mergin user @@ -1429,7 +1473,7 @@ def test_push_finish(client): def test_push_close(client): changes = _get_changes(test_project_dir) upload, 
upload_dir = create_transaction("mergin", changes) - url = "/v1/project/push/cancel/{}".format(upload.id) + url = "/v1/project/push/cancel/{}".format(upload.transaction_id) resp = client.post(url) assert resp.status_code == 200 @@ -1472,12 +1516,12 @@ def test_whole_push_process(client): assert resp.status_code == 200 assert "transaction" in resp.json.keys() - upload = Upload.query.get(resp.json["transaction"]) + upload = Upload.query.filter_by(transaction_id=resp.json["transaction"]).first() assert upload # assert we can get project info with active upload resp = client.get(f"/v1/project/{test_workspace_name}/{upload.project.name}") assert resp.status_code == 200 - assert upload.id in resp.json["uploads"] + assert upload.transaction_id in resp.json["uploads"] assert ( client.get( f"/v1/project/{test_workspace_name}/{upload.project.name}?version=v1" @@ -1488,7 +1532,7 @@ def test_whole_push_process(client): # push upload: upload file chunks for file in changes["added"]: for chunk_id in file["chunks"]: - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(test_dir, file["path"]), "rb") as f: data = f.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -1500,7 +1544,7 @@ def test_whole_push_process(client): assert resp.json["checksum"] == checksum.hexdigest() # push finish: call server to concatenate chunks and finish upload - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 200 project = Project.query.filter_by( name=test_project, workspace_id=test_workspace_id @@ -1528,7 +1572,7 @@ def test_push_diff_finish(client): changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = 
client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 # check there are not any changes between local modified file and server patched file (using geodiff) geodiff = GeoDiff() @@ -1552,7 +1596,7 @@ def test_push_diff_finish(client): upload, upload_dir = create_transaction("mergin", changes, 2) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert ( "GEODIFF ERROR: Nothing inserted (this should never happen)" @@ -1561,10 +1605,10 @@ def test_push_diff_finish(client): error = resp.json["detail"] # try again to make sure geodiff logs are related only to recent event - client.post("/v1/project/push/cancel/{}".format(upload.id)) + client.post("/v1/project/push/cancel/{}".format(upload.transaction_id)) upload, upload_dir = create_transaction("mergin", changes, 2) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert resp.json["detail"] == error @@ -1572,7 +1616,7 @@ def test_push_diff_finish(client): changes = _get_changes_with_diff_0_size(test_project_dir) upload, upload_dir = create_transaction("mergin", changes, 3) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 @@ -1598,7 +1642,7 @@ def test_push_no_diff_finish(client): } upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes, src_dir=working_dir) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert 
resp.status_code == 200 # check diff file was generated by server, and it is in file history latest_version = upload.project.get_latest_version() @@ -1638,7 +1682,7 @@ def test_push_no_diff_finish(client): } upload, upload_dir = create_transaction("mergin", changes, version=2) upload_chunks(upload_dir, upload.changes, src_dir=working_dir) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 latest_version = upload.project.get_latest_version() assert all( @@ -1710,7 +1754,7 @@ def test_clone_project(client, data, username, expected): "mergin", _get_changes_with_diff(test_project_dir) ) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 endpoint = "/v1/project/clone/{}/{}".format(test_workspace_name, test_project) @@ -1852,7 +1896,7 @@ def test_optimize_storage(app, client, diff_project): with open(os.path.join(upload_dir, "chunks", chunk), "wb") as out_file: out_file.write(in_file.read(CHUNK_SIZE)) - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 200 assert os.path.exists(optimize_v4) @@ -2214,16 +2258,16 @@ def test_inactive_project(client, diff_project): upload, upload_dir = create_transaction("mergin", _get_changes(test_project_dir)) chunk_id = upload.changes["added"][0]["chunks"][0] resp = client.post( - f"/v1/project/push/chunk/{upload.id}/{chunk_id}", + f"/v1/project/push/chunk/{upload.transaction_id}/{chunk_id}", data=data, headers={"Content-Type": "application/octet-stream"}, ) assert resp.status_code == 404 - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert 
resp.status_code == 404 - resp = client.post(f"/v1/project/push/cancel/{upload.id}") + resp = client.post(f"/v1/project/push/cancel/{upload.transaction_id}") assert resp.status_code == 404 # delete project again @@ -2324,7 +2368,7 @@ def test_project_version_integrity(client): "__init__", side_effect=IntegrityError("Project version already exists", None, None), ): - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 422 assert "Failed to create new version" in resp.json["detail"] failure = SyncFailuresHistory.query.filter_by( @@ -2383,7 +2427,7 @@ def _get_user_agent(): changes = _get_changes(test_project_dir) upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - resp = client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 @@ -2419,12 +2463,12 @@ def test_delete_diff_file(client): } upload, upload_dir = create_transaction("mergin", changes) upload_chunks(upload_dir, upload.changes) - client.post(f"/v1/project/push/finish/{upload.id}") + client.post(f"/v1/project/push/finish/{upload.transaction_id}") changes = _get_changes_with_diff(test_project_dir) upload, upload_dir = create_transaction("mergin", changes, version=2) upload_chunks(upload_dir, upload.changes) - client.post(f"/v1/project/push/finish/{upload.id}") + client.post(f"/v1/project/push/finish/{upload.transaction_id}") fh = FileHistory.query.filter_by( project_version_name=upload.project.latest_version, @@ -2570,12 +2614,12 @@ def test_supported_file_upload(client): headers=json_headers, ) assert resp.status_code == 200 - upload = Upload.query.get(resp.json["transaction"]) + upload = Upload.query.filter_by(transaction_id=resp.json["transaction"]).first() assert upload # Even chunks are correctly uploaded for file in 
changes["added"]: for chunk_id in file["chunks"]: - url = "/v1/project/push/chunk/{}/{}".format(upload.id, chunk_id) + url = "/v1/project/push/chunk/{}/{}".format(upload.transaction_id, chunk_id) with open(os.path.join(TMP_DIR, file["path"]), "rb") as f: data = f.read(CHUNK_SIZE) checksum = hashlib.sha1() @@ -2586,7 +2630,7 @@ def test_supported_file_upload(client): assert resp.status_code == 200 assert resp.json["checksum"] == checksum.hexdigest() # Unsupported file type is revealed when reconstructed from chunks - based on the mime type - and upload is refused - resp = client.post(f"/v1/project/push/finish/{upload.id}") + resp = client.post(f"/v1/project/push/finish/{upload.transaction_id}") assert resp.status_code == 400 assert ( resp.json["detail"] @@ -2619,8 +2663,8 @@ def test_locked_project(client, diff_project): assert resp.headers["Content-Type"] == "application/problem+json" assert resp.json["code"] == "ProjectLocked" # to play safe push finish is also blocked - upload, upload_dir = create_transaction("mergin", changes) - url = "/v1/project/push/finish/{}".format(upload.id) + upload, _ = create_transaction("mergin", changes) + url = "/v1/project/push/finish/{}".format(upload.transaction_id) resp = client.post(url, headers=json_headers) assert resp.status_code == 422 diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index f3c91539..56caa7ff 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -1031,11 +1031,7 @@ def test_create_version_failures(client): data = {"version": "v1", "changes": _get_changes_without_added(test_project_dir)} # somebody else is syncing - upload = Upload(project, 1, _get_changes(test_project_dir), 1) - db.session.add(upload) - db.session.commit() - os.makedirs(upload.upload_dir) - + upload = Upload.create_upload(project.id, 1, _get_changes(test_project_dir), 1) response = client.post(f"v2/projects/{project.id}/versions", json=data) 
assert response.status_code == 409 assert response.json["code"] == AnotherUploadRunning.code @@ -1072,16 +1068,6 @@ def test_create_version_failures(client): assert response.status_code == 422 assert response.json["code"] == UploadError.code - # try to finish the transaction which would fail on existing Upload integrity error, e.g. race conditions - with patch.object( - Upload, - "__init__", - side_effect=IntegrityError("Cannot insert upload", None, None), - ): - response = client.post(f"v2/projects/{project.id}/versions", json=data) - assert response.status_code == 409 - assert response.json["code"] == AnotherUploadRunning.code - # try to finish the transaction which would fail on unexpected integrity error # patch of ChangesSchema is just a workaround to trigger and error with patch.object( @@ -1093,46 +1079,6 @@ def test_create_version_failures(client): assert response.status_code == 409 -def test_create_version_object_deleted_error(client): - """Test that ObjectDeletedError during push returns 422 without secondary exception""" - project = Project.query.filter_by( - workspace_id=test_workspace_id, name=test_project - ).first() - - data = { - "version": "v1", - "changes": { - "added": [], - "removed": [ - file_info(test_project_dir, "base.gpkg"), - ], - "updated": [], - }, - } - - # Create a real ObjectDeletedError by using internal SQLAlchemy state - def raise_object_deleted(*args, **kwargs): - # Create a minimal state-like object that ObjectDeletedError can use - class FakeState: - class_ = Upload - - def obj(self): - return None - - raise ObjectDeletedError(FakeState()) - - with patch.object( - ProjectVersion, - "__init__", - side_effect=raise_object_deleted, - ): - response = client.post(f"v2/projects/{project.id}/versions", json=data) - - # Should return 422 UploadError, not 500 from secondary exception - assert response.status_code == 422 - assert response.json["code"] == UploadError.code - - def test_upload_chunk(client): """Test pushing a chunk to a 
project""" project = Project.query.filter_by( diff --git a/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py b/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py new file mode 100644 index 00000000..5e799a86 --- /dev/null +++ b/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py @@ -0,0 +1,35 @@ +"""Add transaction_id to upload + +Revision ID: f1d9e4a7b823 +Revises: e3a7f2b1c94d +Create Date: 2026-04-14 00:00:00.000000 + +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "f1d9e4a7b823" +down_revision = "e3a7f2b1c94d" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("upload", sa.Column("transaction_id", sa.String(), nullable=True)) + # backfill existing rows before adding NOT NULL constraint + op.execute( + "UPDATE upload SET transaction_id = id::text WHERE transaction_id IS NULL" + ) + op.alter_column("upload", "transaction_id", nullable=False) + op.create_index( + op.f("ix_upload_transaction_id"), "upload", ["transaction_id"], unique=True + ) + + +def downgrade(): + op.drop_index(op.f("ix_upload_transaction_id"), table_name="upload") + # column is dropped but there could be orphan transaction folders, make sure upload table is empty + op.drop_column("upload", "transaction_id") From 52ad8ef04803d7f37543114c61d65133f18c2d46 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Tue, 14 Apr 2026 14:36:53 +0200 Subject: [PATCH 04/12] Merge user_profile into user table We keep responses untouched, the only change happens on DB / model side to avoid excessive joins. 
--- server/mergin/auth/commands.py | 3 +- server/mergin/auth/controller.py | 15 ++-- server/mergin/auth/models.py | 43 ++++------ server/mergin/auth/schemas.py | 33 +++++--- server/mergin/commands.py | 3 +- server/mergin/sync/project_handler.py | 7 +- server/mergin/tests/test_auth.py | 3 +- .../mergin/tests/test_project_controller.py | 3 +- server/mergin/tests/utils.py | 3 +- ...f1a9b2c4d6_merge_user_profile_into_user.py | 82 +++++++++++++++++++ 10 files changed, 135 insertions(+), 60 deletions(-) create mode 100644 server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py diff --git a/server/mergin/auth/commands.py b/server/mergin/auth/commands.py index f9ac636e..af6ed6af 100644 --- a/server/mergin/auth/commands.py +++ b/server/mergin/auth/commands.py @@ -8,7 +8,7 @@ from sqlalchemy import or_, func from ..app import db -from .models import User, UserProfile +from .models import User from ..commands import normalize_input @@ -36,7 +36,6 @@ def create(username, password, is_admin, email): # pylint: disable=W0612 sys.exit(1) user = User(username=username, passwd=password, is_admin=is_admin, email=email) - user.profile = UserProfile() user.active = True db.session.add(user) db.session.commit() diff --git a/server/mergin/auth/controller.py b/server/mergin/auth/controller.py index a4d584c8..06859255 100644 --- a/server/mergin/auth/controller.py +++ b/server/mergin/auth/controller.py @@ -24,7 +24,7 @@ CANNOT_EDIT_PROFILE_MSG, ) from .bearer import encode_token -from .models import User, LoginHistory, UserProfile +from .models import User, LoginHistory from .schemas import UserSchema, UserSearchSchema, UserProfileSchema, UserInfoSchema from .forms import ( LoginForm, @@ -65,7 +65,7 @@ def user_profile(user, return_all=True): { "email": user.email, "storage_limit": data["storage"], # duplicate - we should remove it - "receive_notifications": user.profile.receive_notifications, + "receive_notifications": user.receive_notifications, "verified_email": 
user.verified_email, "tier": "free", "registration_date": user.registration_date, @@ -369,7 +369,6 @@ def update_user_profile(): # pylint: disable=W0613,W0612 return jsonify(form.errors), 400 current_user.verified_email = False - form.update_obj(current_user.profile) form.update_obj(current_user) db.session.add(current_user) db.session.commit() @@ -483,7 +482,7 @@ def get_paginated_users( :rtype: Dict[str: List[User], str: Integer] """ - users = User.query.join(UserProfile).filter( + users = User.query.filter( is_(User.username.ilike("deleted_%"), False) | is_(User.active, True) ) @@ -491,14 +490,16 @@ def get_paginated_users( users = users.filter( User.username.ilike(f"%{like}%") | User.email.ilike(f"%{like}%") - | UserProfile.first_name.ilike(f"%{like}%") - | UserProfile.last_name.ilike(f"%{like}%") + | User.first_name.ilike(f"%{like}%") + | User.last_name.ilike(f"%{like}%") ) if descending and order_by: users = users.order_by(desc(User.__table__.c[order_by])) elif not descending and order_by: users = users.order_by(asc(User.__table__.c[order_by])) + else: + users = users.order_by(asc(User.id)) paginate = users.paginate(page=page, per_page=per_page) result = paginate.items @@ -561,7 +562,7 @@ def create_user(): workspace_role=request.json["role"], ) - if user.profile.receive_notifications: + if user.receive_notifications: send_confirmation_email( current_app, user, diff --git a/server/mergin/auth/models.py b/server/mergin/auth/models.py index 470b934b..760ab740 100644 --- a/server/mergin/auth/models.py +++ b/server/mergin/auth/models.py @@ -19,12 +19,9 @@ class User(db.Model): id = db.Column(db.Integer, primary_key=True) - username = db.Column(db.String(80), info={"label": "Username"}) email = db.Column(db.String(120)) - passwd = db.Column(db.String(80), info={"label": "Password"}) # salted + hashed - active = db.Column(db.Boolean, default=True) is_admin = db.Column(db.Boolean) verified_email = db.Column(db.Boolean, default=False) @@ -35,8 +32,12 @@ class 
User(db.Model): info={"label": "Date of creation of user account"}, default=datetime.datetime.utcnow, ) - last_signed_in = db.Column(db.DateTime(), nullable=True) + receive_notifications = db.Column( + db.Boolean, default=True, nullable=False, index=True + ) + first_name = db.Column(db.String(256), nullable=True) + last_name = db.Column(db.String(256), nullable=True) __table_args__ = ( db.Index("ix_user_username", func.lower(username), unique=True), @@ -187,8 +188,8 @@ def anonymize(self): self.username = del_str self.email = None self.passwd = None - self.profile.first_name = None - self.profile.last_name = None + self.first_name = None + self.last_name = None db.session.commit() @classmethod @@ -240,11 +241,19 @@ def create( cls, username: str, email: str, password: str, notifications: bool = True ) -> User: user = cls(username.strip(), email.strip(), password, False) - user.profile = UserProfile(receive_notifications=notifications) + user.receive_notifications = notifications db.session.add(user) db.session.commit() return user + @property + def profile(self) -> "User": + """Compatibility shim: profile fields are now on User directly.""" + return self + + def name(self) -> Optional[str]: + return f'{self.first_name if self.first_name else ""} {self.last_name if self.last_name else ""}'.strip() + @property def can_edit_profile(self) -> bool: """Flag if we allow user to edit their email and name""" @@ -252,26 +261,6 @@ def can_edit_profile(self) -> bool: return self.passwd is not None and self.active -class UserProfile(db.Model): - user_id = db.Column( - db.Integer, db.ForeignKey("user.id", ondelete="CASCADE"), primary_key=True - ) - receive_notifications = db.Column(db.Boolean, default=True, index=True) - first_name = db.Column(db.String(256), nullable=True, info={"label": "First name"}) - last_name = db.Column(db.String(256), nullable=True, info={"label": "Last name"}) - - user = db.relationship( - "User", - uselist=False, - backref=db.backref( - "profile", 
single_parent=True, uselist=False, cascade="all,delete" - ), - ) - - def name(self) -> Optional[str]: - return f'{self.first_name if self.first_name else ""} {self.last_name if self.last_name else ""}'.strip() - - class LoginHistory(db.Model): id = db.Column(db.Integer, primary_key=True) timestamp = db.Column(db.DateTime(), default=datetime.datetime.utcnow, index=True) diff --git a/server/mergin/auth/schemas.py b/server/mergin/auth/schemas.py index 52ed01f6..0bb45c3e 100644 --- a/server/mergin/auth/schemas.py +++ b/server/mergin/auth/schemas.py @@ -5,7 +5,7 @@ from flask import current_app from marshmallow import fields -from .models import User, UserProfile +from .models import User from ..app import DateTimeWithZ, ma @@ -20,13 +20,13 @@ class UserProfileSchema(ma.SQLAlchemyAutoSchema): def get_storage(self, obj): # DEPRECATED functionality - kept for the backward-compatibility - ws = current_app.ws_handler.get_by_name(obj.user.username) + ws = current_app.ws_handler.get_by_name(obj.username) if ws: return ws.storage def get_disk_usage(self, obj): # DEPRECATED functionality - kept for the backward-compatibility - ws = current_app.ws_handler.get_by_name(obj.user.username) + ws = current_app.ws_handler.get_by_name(obj.username) if ws: return ws.disk_usage() @@ -34,21 +34,30 @@ def _has_project(self, obj): # DEPRECATED functionality - kept for the backward-compatibility from ..sync.models import ProjectUser, Project - ws = current_app.ws_handler.get_by_name(obj.user.username) + ws = current_app.ws_handler.get_by_name(obj.username) if ws: projects_count = ( Project.query.join(ProjectUser) - .filter(Project.creator_id == obj.user.id) + .filter(Project.creator_id == obj.id) .filter(Project.removed_at.is_(None)) .filter(Project.workspace_id == ws.id) - .filter(ProjectUser.user_id == obj.user.id) + .filter(ProjectUser.user_id == obj.id) .count() ) return projects_count > 0 return False class Meta: - model = UserProfile + model = User + fields = ( + 
"receive_notifications", + "first_name", + "last_name", + "name", + "storage", + "disk_usage", + "has_project", + ) load_instance = True @@ -81,7 +90,7 @@ class UserSearchSchema(ma.SQLAlchemyAutoSchema): name = fields.Method("_name", dump_only=True) def _name(self, obj): - return obj.profile.name() + return obj.name() class Meta: model = User @@ -97,11 +106,11 @@ class Meta: class UserInfoSchema(ma.SQLAlchemyAutoSchema): """User schema with full information""" - first_name = fields.String(attribute="profile.first_name") - last_name = fields.String(attribute="profile.last_name") - receive_notifications = fields.Boolean(attribute="profile.receive_notifications") + first_name = fields.String() + last_name = fields.String() + receive_notifications = fields.Boolean() registration_date = DateTimeWithZ(attribute="registration_date") - name = fields.Function(lambda obj: obj.profile.name()) + name = fields.Function(lambda obj: obj.name()) can_edit_profile = fields.Boolean(attribute="can_edit_profile") class Meta: diff --git a/server/mergin/commands.py b/server/mergin/commands.py index dbde7b7f..464890bf 100644 --- a/server/mergin/commands.py +++ b/server/mergin/commands.py @@ -202,7 +202,7 @@ def init_db(): ) def init(email: str, recreate: bool): """Initialize database if does not exist or -r is provided. Perform check of server configuration. 
Send statistics, respecting your setup.""" - from .auth.models import User, UserProfile + from .auth.models import User inspect_engine = inspect(db.engine) tables = inspect_engine.get_table_names() @@ -221,7 +221,6 @@ def init(email: str, recreate: bool): password_chars = string.ascii_letters + string.digits password = "".join(random.choice(password_chars) for i in range(12)) user = User(username=username, passwd=password, email=email, is_admin=True) - user.profile = UserProfile() user.active = True db.session.add(user) db.session.commit() diff --git a/server/mergin/sync/project_handler.py b/server/mergin/sync/project_handler.py index 7949dc20..e4d7e189 100644 --- a/server/mergin/sync/project_handler.py +++ b/server/mergin/sync/project_handler.py @@ -3,7 +3,7 @@ from .permissions import ProjectPermissions from sqlalchemy import or_, and_ from typing import List -from ..auth.models import User, UserProfile +from ..auth.models import User class ProjectHandler(AbstractProjectHandler): @@ -12,8 +12,7 @@ def get_push_permission(self, changes: dict): def get_email_receivers(self, project: Project) -> List[User]: return ( - User.query.join(UserProfile) - .outerjoin(ProjectUser, ProjectUser.user_id == User.id) + User.query.outerjoin(ProjectUser, ProjectUser.user_id == User.id) .filter( or_( and_( @@ -24,7 +23,7 @@ def get_email_receivers(self, project: Project) -> List[User]: ), User.active, User.verified_email, - UserProfile.receive_notifications, + User.receive_notifications, ) .all() ) diff --git a/server/mergin/tests/test_auth.py b/server/mergin/tests/test_auth.py index b130c109..ba7730c3 100644 --- a/server/mergin/tests/test_auth.py +++ b/server/mergin/tests/test_auth.py @@ -14,7 +14,7 @@ from ..auth.bearer import decode_token, encode_token from ..auth.forms import ResetPasswordForm from ..auth.app import generate_confirmation_token, confirm_token -from ..auth.models import User, UserProfile, LoginHistory +from ..auth.models import User, LoginHistory from ..auth.tasks 
import anonymize_removed_users from ..app import db from ..sync.models import Project, ProjectRole @@ -286,7 +286,6 @@ def test_change_password(client): email="user_test@mergin.com", ) user.active = True - user.profile = UserProfile() db.session.add(user) db.session.commit() diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 25e0e055..d1981391 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -41,7 +41,7 @@ from ..sync.files import files_changes_from_upload from ..sync.schemas import ProjectListSchema from ..sync.utils import Checkpoint, generate_checksum, is_versioned_file -from ..auth.models import User, UserProfile +from ..auth.models import User from . import ( test_project, @@ -666,7 +666,6 @@ def test_update_project(client): username="tester", passwd="tester", is_admin=False, email="tester@mergin.com" ) test_user.active = True - test_user.profile = UserProfile() db.session.add(test_user) db.session.commit() diff --git a/server/mergin/tests/utils.py b/server/mergin/tests/utils.py index 89ead403..57f67e80 100644 --- a/server/mergin/tests/utils.py +++ b/server/mergin/tests/utils.py @@ -15,7 +15,7 @@ from dateutil.tz import tzlocal from pygeodiff import GeoDiff -from ..auth.models import User, UserProfile +from ..auth.models import User from ..sync.utils import generate_location, generate_checksum from ..sync.models import ( Project, @@ -52,7 +52,6 @@ def add_user(username="random", password="random", is_admin=False) -> User: ) user.active = True user.verified_email = True - user.profile = UserProfile() db.session.add(user) db.session.commit() return user diff --git a/server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py b/server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py new file mode 100644 index 00000000..76c92b81 --- /dev/null +++ 
b/server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py @@ -0,0 +1,82 @@ +# Copyright (C) Lutra Consulting Limited +# +# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial + +"""Merge user_profile table into user table + +Revision ID: e3f1a9b2c4d6 +Revises: 4b4648483770 +Create Date: 2026-04-14 00:00:00.000000 +""" +from alembic import op +import sqlalchemy as sa + +revision = "e3f1a9b2c4d6" +down_revision = "4b4648483770" +branch_labels = None +depends_on = None + + +def upgrade(): + # Add profile columns to user table (nullable initially to allow data copy) + op.add_column( + "user", sa.Column("receive_notifications", sa.Boolean(), nullable=True) + ) + op.add_column("user", sa.Column("first_name", sa.String(256), nullable=True)) + op.add_column("user", sa.Column("last_name", sa.String(256), nullable=True)) + + # Copy data from user_profile + op.execute( + """ + UPDATE "user" u + SET + receive_notifications = up.receive_notifications, + first_name = up.first_name, + last_name = up.last_name + FROM user_profile up + WHERE up.user_id = u.id; + """ + ) + + # Fill in default for any users without a profile row (should not exist, but be safe) + op.execute( + 'UPDATE "user" SET receive_notifications = TRUE WHERE receive_notifications IS NULL;' + ) + + op.alter_column("user", "receive_notifications", nullable=False) + op.create_index("ix_user_receive_notifications", "user", ["receive_notifications"]) + op.drop_table("user_profile") + + +def downgrade(): + # Recreate user_profile table + op.create_table( + "user_profile", + sa.Column("user_id", sa.Integer(), nullable=False), + sa.Column("receive_notifications", sa.Boolean(), nullable=False), + sa.Column("first_name", sa.String(256), nullable=True), + sa.Column("last_name", sa.String(256), nullable=True), + sa.ForeignKeyConstraint(["user_id"], ["user.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("user_id"), + ) + + # Copy data back + op.execute( + """ + INSERT INTO 
user_profile (user_id, receive_notifications, first_name, last_name) + SELECT id, receive_notifications, first_name, last_name + FROM "user"; + """ + ) + + op.create_index( + "ix_user_profile_receive_notifications", + "user_profile", + ["receive_notifications"], + ) + + # Remove columns from user table + op.drop_index("ix_user_receive_notifications", table_name="user") + op.drop_column("user", "receive_notifications") + op.drop_column("user", "first_name") + op.drop_column("user", "last_name") From d9809a52f8817f551506db1b56019ddd4a3b4bcf Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Wed, 15 Apr 2026 08:16:04 +0200 Subject: [PATCH 05/12] Fix migration --- .../community/e3f1a9b2c4d6_merge_user_profile_into_user.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py b/server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py index 76c92b81..88898460 100644 --- a/server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py +++ b/server/migrations/community/e3f1a9b2c4d6_merge_user_profile_into_user.py @@ -40,7 +40,7 @@ def upgrade(): # Fill in default for any users without a profile row (should not exist, but be safe) op.execute( - 'UPDATE "user" SET receive_notifications = TRUE WHERE receive_notifications IS NULL;' + 'UPDATE "user" SET receive_notifications = FALSE WHERE receive_notifications IS NULL;' ) op.alter_column("user", "receive_notifications", nullable=False) @@ -53,7 +53,7 @@ def downgrade(): op.create_table( "user_profile", sa.Column("user_id", sa.Integer(), nullable=False), - sa.Column("receive_notifications", sa.Boolean(), nullable=False), + sa.Column("receive_notifications", sa.Boolean(), nullable=True), sa.Column("first_name", sa.String(256), nullable=True), sa.Column("last_name", sa.String(256), nullable=True), sa.ForeignKeyConstraint(["user_id"], ["user.id"], ondelete="CASCADE"), From b5378642b88719ffa60d48b5b2ee595ca4fb9729 Mon 
Sep 17 00:00:00 2001 From: Martin Varga Date: Thu, 16 Apr 2026 13:27:55 +0200 Subject: [PATCH 06/12] Update revision branch hash --- .../migrations/community/e3a7f2b1c94d_add_upload_last_ping.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py index dd727e5a..d53d4440 100644 --- a/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py +++ b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py @@ -1,7 +1,7 @@ """Add last_ping to upload Revision ID: e3a7f2b1c94d -Revises: 4b4648483770 +Revises: e3f1a9b2c4d6 Create Date: 2026-04-14 00:00:00.000000 """ @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. revision = "e3a7f2b1c94d" -down_revision = "4b4648483770" +down_revision = "e3f1a9b2c4d6" branch_labels = None depends_on = None From 257e16d7bf8c26e6adedb3bad21c02be10eb0556 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Fri, 17 Apr 2026 14:04:45 +0200 Subject: [PATCH 07/12] Concurrent upload fixes - guard against transaction folder errors - make last_ping tz naive - set transaction id to uuid type - remove redundant is_active method --- server/mergin/sync/models.py | 61 +++++++++++-------- server/mergin/sync/permissions.py | 2 + .../mergin/sync/public_api_v2_controller.py | 8 +-- .../mergin/tests/test_project_controller.py | 2 +- .../f1d9e4a7b823_add_upload_transaction_id.py | 9 +-- 5 files changed, 46 insertions(+), 36 deletions(-) diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 08ff6217..5f4aa967 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -1808,7 +1808,9 @@ class Upload(db.Model): created = db.Column(db.DateTime, default=datetime.utcnow) # last ping time to determine if upload is still active last_ping = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) - transaction_id = db.Column(db.String, 
unique=True, nullable=False, index=True) + transaction_id = db.Column( + UUID(as_uuid=True), unique=True, nullable=False, index=True + ) user = db.relationship("User") project = db.relationship( @@ -1835,7 +1837,7 @@ def create_upload( """Create upload session, it can either create a new record or handover an existing one but with new transaction id Old transaction folder is removed and new one is created. """ - now = datetime.now(timezone.utc) + now = datetime.now(timezone.utc).replace(tzinfo=None) expiration = current_app.config["LOCKFILE_EXPIRATION"] new_tx_id = str(uuid.uuid4()) @@ -1892,42 +1894,49 @@ def create_upload( upload = result.Upload old_transaction_id = result.old_transaction_id - os.makedirs(upload.upload_dir) - # old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload - if old_transaction_id: - upload.project.sync_failed( - "", "push_lost", "Push artefact removed by subsequent push", user_id - ) - if os.path.exists( - os.path.join( - upload.project.storage.project_dir, "tmp", old_transaction_id + try: + os.makedirs(upload.upload_dir) + + # old_transaction_id is NULL on fresh INSERT, set to old UUID when taking over a stale upload + if old_transaction_id: + upload.project.sync_failed( + "", "push_lost", "Push artefact removed by subsequent push", user_id ) - ): - move_to_tmp( + if os.path.exists( os.path.join( - upload.project.storage.project_dir, "tmp", old_transaction_id - ), - old_transaction_id, - ) + upload.project.storage.project_dir, + "tmp", + str(old_transaction_id), + ) + ): + move_to_tmp( + os.path.join( + upload.project.storage.project_dir, + "tmp", + str(old_transaction_id), + ), + str(old_transaction_id), + ) + except OSError as err: + # filesystem setup failed after the DB row was already committed. + # delete the row immediately so the next attempt isn't blocked until expiration. 
+ db.session.delete(upload) + db.session.commit() + logging.error(f"Failed to create upload directory: {err}") + return return upload @property def upload_dir(self): return os.path.join( - self.project.storage.project_dir, "tmp", self.transaction_id + self.project.storage.project_dir, "tmp", str(self.transaction_id) ) - def is_active(self): - """Check if upload is still active because there was a ping from underlying process""" - return datetime.now(tz=timezone.utc) < self.last_ping.replace( - tzinfo=timezone.utc - ) + timedelta(seconds=current_app.config["LOCKFILE_EXPIRATION"]) - def _heartbeat_task(self, app: Flask, stop_event: threading.Event, timeout: int): """ - Background task: Runs as a Thread (Sync) or Greenlet (Gevent) based on worker type. + Background task: Runs as a Thread, it is compatible with Sync (direct) or Gevent (monkey-patch) worker type. Uses a fresh engine connection to stay pool-efficient. """ # manual context push is required for background execution @@ -1985,7 +1994,7 @@ def clear(self): Uploaded files and table records are removed, and another upload can start. 
""" try: - move_to_tmp(self.upload_dir, self.transaction_id) + move_to_tmp(self.upload_dir, str(self.transaction_id)) db.session.delete(self) db.session.commit() except Exception: diff --git a/server/mergin/sync/permissions.py b/server/mergin/sync/permissions.py index 880c0e33..4fe3ad54 100644 --- a/server/mergin/sync/permissions.py +++ b/server/mergin/sync/permissions.py @@ -272,6 +272,8 @@ def check_project_permissions( def get_upload_or_fail(transaction_id: str) -> Upload: + if not is_valid_uuid(transaction_id): + abort(404) upload = Upload.query.filter_by(transaction_id=transaction_id).first_or_404() # upload to 'removed' projects is forbidden if upload.project.removed_at: diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 64d8ce84..0d34c9c3 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -240,11 +240,6 @@ def create_project_version(id): if pv and pv.name != version: return ProjectVersionExists(version, pv.name).response(409) - # reject push if there is another one already running - pending_upload = Upload.query.filter_by(project_id=project.id).first() - if pending_upload and pending_upload.is_active(): - return AnotherUploadRunning().response(409) - try: ChangesSchema().validate(changes) upload_changes = ChangesSchema().dump(changes) @@ -304,6 +299,9 @@ def create_project_version(id): db.session.rollback() logging.exception(f"Failed to create upload: {str(err)}") return UploadError().response(422) + except OSError as err: + logging.exception(f"Failed to create upload directory: {str(err)}") + return UploadError().response(422) # this is the heavy work of processing upload data file_changes, errors = upload.process_chunks(use_shared_chunk_dir=True) diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 7b14fbf3..07d8fe77 100644 --- 
a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -1520,7 +1520,7 @@ def test_whole_push_process(client): # assert we can get project info with active upload resp = client.get(f"/v1/project/{test_workspace_name}/{upload.project.name}") assert resp.status_code == 200 - assert upload.transaction_id in resp.json["uploads"] + assert str(upload.transaction_id) in resp.json["uploads"] assert ( client.get( f"/v1/project/{test_workspace_name}/{upload.project.name}?version=v1" diff --git a/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py b/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py index 5e799a86..d3797b8b 100644 --- a/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py +++ b/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py @@ -8,6 +8,7 @@ from alembic import op import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import UUID # revision identifiers, used by Alembic. 
@@ -18,11 +19,11 @@ def upgrade(): - op.add_column("upload", sa.Column("transaction_id", sa.String(), nullable=True)) - # backfill existing rows before adding NOT NULL constraint - op.execute( - "UPDATE upload SET transaction_id = id::text WHERE transaction_id IS NULL" + op.add_column( + "upload", sa.Column("transaction_id", UUID(as_uuid=True), nullable=True) ) + # backfill existing rows before adding NOT NULL constraint + op.execute("UPDATE upload SET transaction_id = id WHERE transaction_id IS NULL") op.alter_column("upload", "transaction_id", nullable=False) op.create_index( op.f("ix_upload_transaction_id"), "upload", ["transaction_id"], unique=True From 683eaf71e58ff240ef6ac8af1c791991390cdf36 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Fri, 17 Apr 2026 14:33:56 +0200 Subject: [PATCH 08/12] Fix: Swap file rename and DB project version commit to avoid orphaned versions in DB If os.rename failed while moving the uploaded data, we would end up with a broken project: a project version in the DB but no actual data. --- server/mergin/sync/public_api_controller.py | 24 +++++++++++++++---- .../mergin/sync/public_api_v2_controller.py | 20 ++++++++++++---- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 894101b3..8f142e71 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -991,26 +991,42 @@ def push_finish(transaction_id): ) db.session.add(pv) db.session.add(project) - db.session.commit() - # let's move uploaded files where they are expected to be - os.renames(files_dir, version_dir) + # move files before committing so a filesystem failure leaves the DB clean + if os.path.exists(files_dir): + os.renames(files_dir, version_dir) + + db.session.commit() logging.info( f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}."
) project_version_created.send(pv) push_finished.send(pv) - except (psycopg2.Error, FileNotFoundError, IntegrityError) as err: + except (psycopg2.Error, OSError, IntegrityError) as err: db.session.rollback() logging.exception( f"Failed to finish push for project: {project.id}, project version: {v_next_version}, " f"transaction id: {transaction_id}.: {str(err)}" ) + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) abort(422, "Failed to create new version: {}".format(str(err))) # catch exception during pg transaction so we can rollback and prevent PendingRollbackError during upload clean up except gevent.timeout.Timeout: db.session.rollback() + if ( + os.path.exists(version_dir) + and not ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count() + ): + move_to_tmp(version_dir) raise finally: # remove artifacts diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 0d34c9c3..ebd909ad 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -310,6 +310,15 @@ def create_project_version(id): upload.clear() return DataSyncError(failed_files=errors).response(422) + if os.path.exists(version_dir): + if ProjectVersion.query.filter_by( + project_id=project.id, name=next_version + ).count(): + return UploadError( + error=f"Version {v_next_version} already exists" + ).response(409) + move_to_tmp(version_dir) + try: # let's keep upload alive until all work is done so no one else can claim it with upload.heartbeat(5): @@ -324,17 +333,18 @@ def create_project_version(id): ) db.session.add(pv) db.session.add(project) - db.session.commit() - # let's move uploaded files where they are expected to be + # move files before committing so a filesystem failure leaves the DB clean if to_be_added_files or to_be_updated_files: 
temp_files_dir = os.path.join( upload.upload_dir, "files", v_next_version ) os.renames(temp_files_dir, version_dir) - # remove used chunks - # get chunks from added and updated files + db.session.commit() + + # remove used chunks only after commit — chunks belong to the now-committed version + if to_be_added_files or to_be_updated_files: chunks_ids = [] for file in to_be_added_files + to_be_updated_files: file_chunks = file.get("chunks", []) @@ -348,7 +358,7 @@ def create_project_version(id): push_finished.send(pv) except ( psycopg2.Error, - FileNotFoundError, + OSError, IntegrityError, ) as err: db.session.rollback() From 5a7129e7e90139474601c002ef98da0aab77441e Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Thu, 30 Apr 2026 07:50:05 +0200 Subject: [PATCH 09/12] Fix upload migrations and merge into single transaction --- .../e3a7f2b1c94d_add_upload_last_ping.py | 29 ------------------- ...23_update_upload_table_for_concurrency.py} | 19 ++++++++---- 2 files changed, 14 insertions(+), 34 deletions(-) delete mode 100644 server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py rename server/migrations/community/{f1d9e4a7b823_add_upload_transaction_id.py => f1d9e4a7b823_update_upload_table_for_concurrency.py} (60%) diff --git a/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py b/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py deleted file mode 100644 index d53d4440..00000000 --- a/server/migrations/community/e3a7f2b1c94d_add_upload_last_ping.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Add last_ping to upload - -Revision ID: e3a7f2b1c94d -Revises: e3f1a9b2c4d6 -Create Date: 2026-04-14 00:00:00.000000 - -""" - -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. 
-revision = "e3a7f2b1c94d" -down_revision = "e3f1a9b2c4d6" -branch_labels = None -depends_on = None - - -def upgrade(): - op.add_column("upload", sa.Column("last_ping", sa.DateTime(), nullable=True)) - # backfill existing rows before adding NOT NULL constraint - op.execute("UPDATE upload SET last_ping = NOW() WHERE last_ping IS NULL") - op.alter_column("upload", "last_ping", nullable=False) - - -def downgrade(): - # drop the column but required lockfiles will be missing - make sure all uploads are gone - op.drop_column("upload", "last_ping") diff --git a/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py b/server/migrations/community/f1d9e4a7b823_update_upload_table_for_concurrency.py similarity index 60% rename from server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py rename to server/migrations/community/f1d9e4a7b823_update_upload_table_for_concurrency.py index d3797b8b..c99f26df 100644 --- a/server/migrations/community/f1d9e4a7b823_add_upload_transaction_id.py +++ b/server/migrations/community/f1d9e4a7b823_update_upload_table_for_concurrency.py @@ -1,7 +1,7 @@ -"""Add transaction_id to upload +"""Add transaction_id and last_ping columns to upload Revision ID: f1d9e4a7b823 -Revises: e3a7f2b1c94d +Revises: e3f1a9b2c4d6 Create Date: 2026-04-14 00:00:00.000000 """ @@ -13,7 +13,7 @@ # revision identifiers, used by Alembic. 
revision = "f1d9e4a7b823" -down_revision = "e3a7f2b1c94d" +down_revision = "e3f1a9b2c4d6" branch_labels = None depends_on = None @@ -22,9 +22,17 @@ def upgrade(): op.add_column( "upload", sa.Column("transaction_id", UUID(as_uuid=True), nullable=True) ) + op.add_column("upload", sa.Column("last_ping", sa.DateTime(), nullable=True)) + # backfill existing rows before adding NOT NULL constraint - op.execute("UPDATE upload SET transaction_id = id WHERE transaction_id IS NULL") + op.execute( + "UPDATE upload SET transaction_id = id::uuid WHERE transaction_id IS NULL;" + ) + op.execute("UPDATE upload SET last_ping = NOW() WHERE last_ping IS NULL;") + op.alter_column("upload", "transaction_id", nullable=False) + op.alter_column("upload", "last_ping", nullable=False) + op.create_index( op.f("ix_upload_transaction_id"), "upload", ["transaction_id"], unique=True ) @@ -32,5 +40,6 @@ def upgrade(): def downgrade(): op.drop_index(op.f("ix_upload_transaction_id"), table_name="upload") - # column is dropped but there could be orphan transaction folders, make sure upload table is empty + # column is dropped but there could be orphan transaction folders and required lockfiles will be missing, make sure upload table is empty op.drop_column("upload", "transaction_id") + op.drop_column("upload", "last_ping") From 3c42ce9c0f3197b13907dcd75917178aac00dd2f Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Thu, 30 Apr 2026 10:55:29 +0200 Subject: [PATCH 10/12] fix failing test --- server/mergin/tests/test_project_controller.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py index 3f232b08..60c36ee2 100644 --- a/server/mergin/tests/test_project_controller.py +++ b/server/mergin/tests/test_project_controller.py @@ -1690,7 +1690,7 @@ def copy_file_failing_for_geodiff(src, dest): "mergin.sync.storages.disk.copy_file", side_effect=copy_file_failing_for_geodiff, ): - resp = 
client.post("/v1/project/push/finish/{}".format(upload.id)) + resp = client.post("/v1/project/push/finish/{}".format(upload.transaction_id)) assert resp.status_code == 200 latest_version = upload.project.get_latest_version() file_meta = latest_version.changes.filter( From 5c5cbdddb55248590d852001ed466f7b6733b981 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Tue, 5 May 2026 09:45:30 +0200 Subject: [PATCH 11/12] Handle rename error in case of full disk --- server/mergin/sync/storages/disk.py | 6 +++++- server/mergin/tests/test_disk_utils.py | 22 ++++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/server/mergin/sync/storages/disk.py b/server/mergin/sync/storages/disk.py index f4cb34fc..80715ed6 100644 --- a/server/mergin/sync/storages/disk.py +++ b/server/mergin/sync/storages/disk.py @@ -114,7 +114,11 @@ def move_to_tmp(src, dest=None): else: root = tempfile.gettempdir() temp_path = os.path.join(root, "delete-me-" + dest, os.path.basename(src)) - os.renames(src, temp_path) + try: + os.renames(src, temp_path) + except OSError as rename_err: + logging.error(f"Failed to move {src} to tmp: {rename_err}") + return None else: raise return temp_path diff --git a/server/mergin/tests/test_disk_utils.py b/server/mergin/tests/test_disk_utils.py index dd485078..c9abd66f 100644 --- a/server/mergin/tests/test_disk_utils.py +++ b/server/mergin/tests/test_disk_utils.py @@ -2,10 +2,13 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +import errno +import logging import os import tempfile import shutil import pytest +from unittest.mock import patch from ..sync.storages.disk import copy_file, copy_dir, move_to_tmp from ..sync.utils import generate_checksum from . 
import test_project_dir @@ -85,3 +88,22 @@ def test_failures(): os.path.join(test_project_dir, "not_found"), os.path.join(tempfile.gettempdir(), "new_dir"), ) + + +def test_move_to_tmp_full_disk_on_fallback(app, tmp_path, caplog): + """Fallback rename on cross-device error logs error and returns None when disk is full.""" + cross_device_err = OSError(errno.EXDEV, "Invalid cross-device link") + no_space_err = OSError(errno.ENOSPC, "No space left on device") + + src = tmp_path / "test_file.gpkg" + src.touch() + + with caplog.at_level(logging.ERROR), patch( + "mergin.sync.storages.disk.os.renames", + side_effect=[cross_device_err, no_space_err], + ): + result = move_to_tmp(str(src)) + + assert result is None + assert "Failed to move" in caplog.text + assert str(src) in caplog.text From d14b8fa5bb0fdcf7581a1facd89cb1ff425e9835 Mon Sep 17 00:00:00 2001 From: "marcel.kocisek" Date: Wed, 6 May 2026 14:37:17 +0200 Subject: [PATCH 12/12] Bump 2026.4.0 --- server/mergin/version.py | 2 +- server/setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/mergin/version.py b/server/mergin/version.py index 89b89699..2e64bc98 100644 --- a/server/mergin/version.py +++ b/server/mergin/version.py @@ -4,4 +4,4 @@ def get_version(): - return "2026.3.2" + return "2026.4.0" diff --git a/server/setup.py b/server/setup.py index c8c9dfb4..bc6aa257 100644 --- a/server/setup.py +++ b/server/setup.py @@ -6,7 +6,7 @@ setup( name="mergin", - version="2026.3.2", + version="2026.4.0", url="https://github.com/MerginMaps/mergin", license="AGPL-3.0-only", author="Lutra Consulting Limited",