From ca0900c51a5ad1257b65bb26fa74468d60342000 Mon Sep 17 00:00:00 2001 From: Siva Date: Mon, 11 May 2026 23:18:04 +0530 Subject: [PATCH 1/3] feat: harden add-node workflow for data safety - Add `PeerCatchupResource` to block add-node progress until the new node's replication slot catches up via `spock.progress.remote_lsn`, preventing data loss during failover after node addition - Advance replication origin alongside slot during add-node to ensure consistent origin tracking on the subscriber - Treat disabled/down subscriptions as transient in `wait_for_sync_event` to avoid false failures mid-add-node - Bump Postgres image tags to Spock 5.0.8 - Add E2E tests covering add-node replication data safety --- e2e/add_node_data_safety_test.go | 116 ++++++++++++++++++ ...wo_nodes_to_three_nodes_with_populate.json | 28 ++++- .../database/operations/populate_nodes.go | 15 +++ .../operations/populate_nodes_test.go | 44 +++++++ .../database/peer_catchup_resource.go | 112 +++++++++++++++++ .../replication_origin_advance_resource.go | 99 +++++++++++++++ ...lication_slot_advance_from_cts_resource.go | 13 +- server/internal/database/resources.go | 2 + .../database/wait_for_sync_event_resource.go | 13 +- server/internal/orchestrator/swarm/images.go | 6 +- server/internal/postgres/create_db.go | 46 +++++++ 11 files changed, 484 insertions(+), 10 deletions(-) create mode 100644 e2e/add_node_data_safety_test.go create mode 100644 server/internal/database/peer_catchup_resource.go create mode 100644 server/internal/database/replication_origin_advance_resource.go diff --git a/e2e/add_node_data_safety_test.go b/e2e/add_node_data_safety_test.go new file mode 100644 index 00000000..b74cb341 --- /dev/null +++ b/e2e/add_node_data_safety_test.go @@ -0,0 +1,116 @@ +//go:build e2e_test + +package e2e + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/jackc/pgx/v5" + controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" + "github.com/stretchr/testify/assert" 
+ "github.com/stretchr/testify/require" +) + +// TestAddNodeOriginAdvanced verifies that after adding a node the replication +// origin on the new subscriber has been advanced past 0/0. A zeroed origin +// causes the apply worker to start from the beginning of WAL, producing +// duplicate-key errors or silently overwriting rows. +// +// Covers: Change 1 — EnsureReplicationOriginExists + AdvanceReplicationOrigin +// wired into ReplicationSlotAdvanceFromCTSResource. +// Ref: zodan.sql:2071-2073, 2183-2185; spock PR #397. +func TestAddNodeOriginAdvanced(t *testing.T) { + t.Parallel() + + const ( + username = "admin" + password = "password" + dbName = "origin_adv_db" + ) + + ctx, cancel := context.WithTimeout(t.Context(), 7*time.Minute) + defer cancel() + + hostIDs := fixture.HostIDs() + db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ + Spec: &controlplane.DatabaseSpec{ + DatabaseName: dbName, + Port: pointerTo(0), + PatroniPort: pointerTo(0), + DatabaseUsers: []*controlplane.DatabaseUserSpec{{ + Username: username, + Password: pointerTo(password), + DbOwner: pointerTo(true), + Attributes: []string{"LOGIN", "SUPERUSER"}, + }}, + Nodes: []*controlplane.DatabaseNodeSpec{ + {Name: "n1", HostIds: []controlplane.Identifier{controlplane.Identifier(hostIDs[0])}}, + {Name: "n2", HostIds: []controlplane.Identifier{controlplane.Identifier(hostIDs[1])}}, + }, + }, + }) + + // Write rows on n2 so its WAL position is meaningfully ahead of the slot's + // consistent_point. This gives the origin advancement a non-trivial LSN. 
+ n2Opts := ConnectionOptions{ + Matcher: And(WithNode("n2"), WithRole("primary")), + Username: username, + Password: password, + } + db.WithConnection(ctx, n2Opts, t, func(conn *pgx.Conn) { + _, err := conn.Exec(ctx, `CREATE TABLE origin_probe (id INT PRIMARY KEY, v TEXT)`) + require.NoError(t, err) + + for i := 1; i <= 100; i++ { + _, err = conn.Exec(ctx, `INSERT INTO origin_probe VALUES ($1, $2)`, i, fmt.Sprintf("r%d", i)) + require.NoError(t, err) + } + }) + + // Add n3 with n1 as source. + db.Spec.Nodes = append(db.Spec.Nodes, &controlplane.DatabaseNodeSpec{ + Name: "n3", + HostIds: []controlplane.Identifier{controlplane.Identifier(hostIDs[2])}, + SourceNode: pointerTo("n1"), + }) + require.NoError(t, db.Update(ctx, UpdateOptions{Spec: db.Spec})) + + // The replication slot spk_{dbName}_n2_sub_n2_n3 lives on n2. + // The origin with the same name lives on n3 (subscriber side). + slotName := e2eReplicationSlotName(dbName, "n2", "n3") + + n3Opts := ConnectionOptions{ + Matcher: And(WithNode("n3"), WithRole("primary")), + Username: username, + Password: password, + } + db.WithConnection(ctx, n3Opts, t, func(conn *pgx.Conn) { + // Query progress; COALESCE returns '0/0' when the origin is absent or + // has never been advanced, so a single assert covers both failure modes. + var lsn string + err := conn.QueryRow(ctx, ` + SELECT COALESCE( + (SELECT pg_replication_origin_progress($1, false)::text + FROM pg_replication_origin WHERE roname = $1), + '0/0' + )`, slotName, + ).Scan(&lsn) + require.NoError(t, err) + + assert.NotEqual(t, "0/0", lsn, + "replication origin %q on n3 should be advanced past 0/0 (got %s); "+ + "a zeroed origin risks the apply worker replaying historical WAL", + slotName, lsn) + }) +} + +// e2eReplicationSlotName mirrors postgres.ReplicationSlotName without +// importing the server package from the e2e test binary. 
+// Format: spk_{database}_{provider}_sub_{provider}_{subscriber} +func e2eReplicationSlotName(databaseName, providerNode, subscriberNode string) string { + return fmt.Sprintf("spk_%s_%s_sub_%s_%s", + databaseName, providerNode, providerNode, subscriberNode) +} diff --git a/server/internal/database/operations/golden_test/TestUpdateDatabase/two_nodes_to_three_nodes_with_populate.json b/server/internal/database/operations/golden_test/TestUpdateDatabase/two_nodes_to_three_nodes_with_populate.json index f51bec04..869f0e4a 100644 --- a/server/internal/database/operations/golden_test/TestUpdateDatabase/two_nodes_to_three_nodes_with_populate.json +++ b/server/internal/database/operations/golden_test/TestUpdateDatabase/two_nodes_to_three_nodes_with_populate.json @@ -97,6 +97,12 @@ } ], [ + { + "type": "create", + "resource_id": "database.peer_catchup::n1:n2:test", + "reason": "does_not_exist", + "diff": null + }, { "type": "create", "resource_id": "database.wait_for_sync_event::n2:n1:test", @@ -143,6 +149,14 @@ "reason": "does_not_exist", "diff": null } + ], + [ + { + "type": "create", + "resource_id": "database.replication_origin_advance::n2:n3:test", + "reason": "does_not_exist", + "diff": null + } ] ], [ @@ -211,7 +225,7 @@ [ { "type": "delete", - "resource_id": "database.replication_slot_advance_from_cts::n2:n3:test", + "resource_id": "database.replication_origin_advance::n2:n3:test", "diff": null }, { @@ -226,6 +240,13 @@ "resource_id": "database.dump_roles::n1", "diff": null }, + { + "type": "delete", + "resource_id": "database.replication_slot_advance_from_cts::n2:n3:test", + "diff": null + } + ], + [ { "type": "delete", "resource_id": "database.lag_tracker_commit_ts::n2:n3:test", @@ -247,6 +268,11 @@ } ], [ + { + "type": "delete", + "resource_id": "database.peer_catchup::n1:n2:test", + "diff": null + }, { "type": "delete", "resource_id": "database.wait_for_sync_event::n2:n1:test", diff --git a/server/internal/database/operations/populate_nodes.go b/server/internal/database/operations/populate_nodes.go 
index dba4f867..9aa84eef 100644 --- a/server/internal/database/operations/populate_nodes.go +++ b/server/internal/database/operations/populate_nodes.go @@ -38,6 +38,7 @@ func PopulateNode(node *NodeResources, existingNodeNames []string) (*resource.St peerWaitForSync = append( peerWaitForSync, database.WaitForSyncEventResourceIdentifier(peer, node.SourceNode, dbName), + database.PeerCatchupResourceIdentifier(node.SourceNode, peer, dbName), ) } @@ -125,6 +126,13 @@ func addPeerResources( ProviderNode: peerNode, SubscriberNode: sourceNode, }, + // Belt-and-suspenders: also wait using remote_lsn, which + // tracks actual commit application rather than WAL receipt. + &database.PeerCatchupResource{ + DatabaseName: dbName, + SourceNode: sourceNode, + PeerNode: peerNode, + }, // After the new node has caught up to the source node, we advance the // replication slots we created earlier. &database.LagTrackerCommitTimestampResource{ @@ -144,6 +152,13 @@ func addPeerResources( ProviderNode: peerNode, SubscriberNode: newNode, }, + // Origin advance runs on the subscriber's host; must be separate from + // slot advance which runs on the provider's host. 
+ &database.ReplicationOriginAdvanceResource{ + DatabaseName: dbName, + ProviderNode: peerNode, + SubscriberNode: newNode, + }, ) } diff --git a/server/internal/database/operations/populate_nodes_test.go b/server/internal/database/operations/populate_nodes_test.go index 6230a24f..7e41ee9d 100644 --- a/server/internal/database/operations/populate_nodes_test.go +++ b/server/internal/database/operations/populate_nodes_test.go @@ -114,6 +114,11 @@ func TestPopulateNode(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n1", }, + &database.PeerCatchupResource{ + DatabaseName: "test", + SourceNode: "n1", + PeerNode: "n2", + }, &database.ReplicationSlotResource{ DatabaseName: "test", ProviderNode: "n1", @@ -127,6 +132,7 @@ func TestPopulateNode(t *testing.T) { SyncData: true, ExtraDependencies: []resource.Identifier{ database.WaitForSyncEventResourceIdentifier("n2", "n1", "test"), + database.PeerCatchupResourceIdentifier("n1", "n2", "test"), }, }, &database.SyncEventResource{ @@ -152,6 +158,11 @@ func TestPopulateNode(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n3", }, + &database.ReplicationOriginAdvanceResource{ + DatabaseName: "test", + ProviderNode: "n2", + SubscriberNode: "n3", + }, }, nil, ), @@ -304,6 +315,11 @@ func TestPopulateNodes(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n1", }, + &database.PeerCatchupResource{ + DatabaseName: "test", + SourceNode: "n1", + PeerNode: "n2", + }, &database.ReplicationSlotResource{ DatabaseName: "test", ProviderNode: "n1", @@ -317,6 +333,7 @@ func TestPopulateNodes(t *testing.T) { SyncData: true, ExtraDependencies: []resource.Identifier{ database.WaitForSyncEventResourceIdentifier("n2", "n1", "test"), + database.PeerCatchupResourceIdentifier("n1", "n2", "test"), }, }, &database.SyncEventResource{ @@ -342,6 +359,11 @@ func TestPopulateNodes(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n3", }, + &database.ReplicationOriginAdvanceResource{ + DatabaseName: "test", + ProviderNode: "n2", + SubscriberNode: 
"n3", + }, }, nil, ), @@ -446,11 +468,21 @@ func TestPopulateNodes(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n1", }, + &database.PeerCatchupResource{ + DatabaseName: "test", + SourceNode: "n1", + PeerNode: "n2", + }, &database.WaitForSyncEventResource{ DatabaseName: "test", ProviderNode: "n3", SubscriberNode: "n1", }, + &database.PeerCatchupResource{ + DatabaseName: "test", + SourceNode: "n1", + PeerNode: "n3", + }, &database.ReplicationSlotResource{ DatabaseName: "test", ProviderNode: "n1", @@ -464,7 +496,9 @@ func TestPopulateNodes(t *testing.T) { SyncData: true, ExtraDependencies: []resource.Identifier{ database.WaitForSyncEventResourceIdentifier("n2", "n1", "test"), + database.PeerCatchupResourceIdentifier("n1", "n2", "test"), database.WaitForSyncEventResourceIdentifier("n3", "n1", "test"), + database.PeerCatchupResourceIdentifier("n1", "n3", "test"), }, }, &database.SyncEventResource{ @@ -498,11 +532,21 @@ func TestPopulateNodes(t *testing.T) { ProviderNode: "n2", SubscriberNode: "n4", }, + &database.ReplicationOriginAdvanceResource{ + DatabaseName: "test", + ProviderNode: "n2", + SubscriberNode: "n4", + }, &database.ReplicationSlotAdvanceFromCTSResource{ DatabaseName: "test", ProviderNode: "n3", SubscriberNode: "n4", }, + &database.ReplicationOriginAdvanceResource{ + DatabaseName: "test", + ProviderNode: "n3", + SubscriberNode: "n4", + }, }, nil, ), diff --git a/server/internal/database/peer_catchup_resource.go b/server/internal/database/peer_catchup_resource.go new file mode 100644 index 00000000..11d5666e --- /dev/null +++ b/server/internal/database/peer_catchup_resource.go @@ -0,0 +1,112 @@ +package database + +import ( + "context" + "fmt" + "time" + + "github.com/pgEdge/control-plane/server/internal/postgres" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +var _ resource.Resource = (*PeerCatchupResource)(nil) + +const ResourceTypePeerCatchup resource.Type = "database.peer_catchup" + +func 
PeerCatchupResourceIdentifier(sourceNode, peerNode, databaseName string) resource.Identifier { + return resource.Identifier{ + Type: ResourceTypePeerCatchup, + ID: fmt.Sprintf("%s:%s:%s", sourceNode, peerNode, databaseName), + } +} + +// PeerCatchupResource waits until the source node's apply progress from the +// peer node has reached the peer's sync event LSN. This ensures the COPY +// snapshot (Phase 5 source→new subscription) includes all peer writes up to +// the slot creation point, preventing data loss on add-node. +// +// Uses spock.progress.remote_lsn (apply progress at last committed +// transaction) rather than received_lsn, which can advance on keepalive +// messages before commits have been applied. +// +// Ref: zodan.sql lines 1455–1523, spock PR #392 +type PeerCatchupResource struct { + DatabaseName string `json:"database_name"` + SourceNode string `json:"source_node"` // node where we check progress + PeerNode string `json:"peer_node"` // peer whose commits must be applied +} + +func (r *PeerCatchupResource) ResourceVersion() string { return "1" } +func (r *PeerCatchupResource) DiffIgnore() []string { return nil } + +func (r *PeerCatchupResource) Executor() resource.Executor { + return resource.PrimaryExecutor(r.SourceNode) +} + +func (r *PeerCatchupResource) Identifier() resource.Identifier { + return PeerCatchupResourceIdentifier(r.SourceNode, r.PeerNode, r.DatabaseName) +} + +func (r *PeerCatchupResource) Dependencies() []resource.Identifier { + return []resource.Identifier{ + SyncEventResourceIdentifier(r.PeerNode, r.SourceNode, r.DatabaseName), + } +} + +func (r *PeerCatchupResource) TypeDependencies() []resource.Type { + return nil +} + +func (r *PeerCatchupResource) Refresh(ctx context.Context, rc *resource.Context) error { + syncEvent, err := resource.FromContext[*SyncEventResource]( + rc, + SyncEventResourceIdentifier(r.PeerNode, r.SourceNode, r.DatabaseName), + ) + if err != nil { + return fmt.Errorf("failed to get sync event for peer %q: 
%w", r.PeerNode, err) + } + if syncEvent.SyncEventLsn == "" { + return resource.ErrNotFound + } + + source, err := GetPrimaryInstance(ctx, rc, r.SourceNode) + if err != nil { + return fmt.Errorf("failed to get source instance for node %q: %w", r.SourceNode, err) + } + conn, err := source.Connection(ctx, rc, r.DatabaseName) + if err != nil { + return fmt.Errorf("failed to connect to source node %q: %w", r.SourceNode, err) + } + defer conn.Close(ctx) + + const pollInterval = 500 * time.Millisecond + + for { + if ctx.Err() != nil { + return ctx.Err() + } + + reached, err := postgres.SpockProgressReachedLSN(r.PeerNode, syncEvent.SyncEventLsn). + Scalar(ctx, conn) + if err != nil { + return fmt.Errorf("failed to query spock progress for peer %q: %w", r.PeerNode, err) + } + if reached { + return nil + } + + time.Sleep(pollInterval) + } +} + +func (r *PeerCatchupResource) Create(ctx context.Context, rc *resource.Context) error { + return r.Refresh(ctx, rc) +} + +func (r *PeerCatchupResource) Update(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (r *PeerCatchupResource) Delete(ctx context.Context, rc *resource.Context) error { + return nil +} diff --git a/server/internal/database/replication_origin_advance_resource.go b/server/internal/database/replication_origin_advance_resource.go new file mode 100644 index 00000000..9b688c23 --- /dev/null +++ b/server/internal/database/replication_origin_advance_resource.go @@ -0,0 +1,99 @@ +package database + +import ( + "context" + "fmt" + + "github.com/pgEdge/control-plane/server/internal/postgres" + "github.com/pgEdge/control-plane/server/internal/resource" +) + +var _ resource.Resource = (*ReplicationOriginAdvanceResource)(nil) + +const ResourceTypeReplicationOriginAdvance resource.Type = "database.replication_origin_advance" + +func ReplicationOriginAdvanceResourceIdentifier(providerNode, subscriberNode, databaseName string) resource.Identifier { + return resource.Identifier{ + Type: 
ResourceTypeReplicationOriginAdvance, + ID: fmt.Sprintf("%s:%s:%s", providerNode, subscriberNode, databaseName), + } +} + +// ReplicationOriginAdvanceResource advances the replication origin on the +// subscriber to the LSN that ReplicationSlotAdvanceFromCTSResource recorded +// after advancing the provider-side slot. Both must be updated together to +// prevent the apply worker from replaying historical WAL from 0/0. +// +// Runs on the subscriber's host (cross-host connections are not allowed, so +// this must be separate from ReplicationSlotAdvanceFromCTSResource which runs +// on the provider's host). +type ReplicationOriginAdvanceResource struct { + DatabaseName string `json:"database_name"` + ProviderNode string `json:"provider_node"` + SubscriberNode string `json:"subscriber_node"` +} + +func (r *ReplicationOriginAdvanceResource) ResourceVersion() string { return "1" } +func (r *ReplicationOriginAdvanceResource) DiffIgnore() []string { return nil } + +func (r *ReplicationOriginAdvanceResource) Executor() resource.Executor { + return resource.PrimaryExecutor(r.SubscriberNode) +} + +func (r *ReplicationOriginAdvanceResource) Identifier() resource.Identifier { + return ReplicationOriginAdvanceResourceIdentifier(r.ProviderNode, r.SubscriberNode, r.DatabaseName) +} + +func (r *ReplicationOriginAdvanceResource) Dependencies() []resource.Identifier { + return []resource.Identifier{ + ReplicationSlotAdvanceFromCTSResourceIdentifier(r.ProviderNode, r.SubscriberNode, r.DatabaseName), + } +} + +func (r *ReplicationOriginAdvanceResource) TypeDependencies() []resource.Type { return nil } + +func (r *ReplicationOriginAdvanceResource) Refresh(ctx context.Context, rc *resource.Context) error { + return nil +} + +func (r *ReplicationOriginAdvanceResource) Create(ctx context.Context, rc *resource.Context) error { + slotAdvance, err := resource.FromContext[*ReplicationSlotAdvanceFromCTSResource]( + rc, + ReplicationSlotAdvanceFromCTSResourceIdentifier(r.ProviderNode, 
r.SubscriberNode, r.DatabaseName), + ) + if err != nil { + return fmt.Errorf("failed to get slot advance resource: %w", err) + } + if slotAdvance.AdvancedToLSN == "" { + // Slot advance was skipped (slot active or no commit timestamp) — nothing to do. + return nil + } + + subscriber, err := GetPrimaryInstance(ctx, rc, r.SubscriberNode) + if err != nil { + return fmt.Errorf("failed to get subscriber instance for node %q: %w", r.SubscriberNode, err) + } + conn, err := subscriber.Connection(ctx, rc, r.DatabaseName) + if err != nil { + return fmt.Errorf("failed to connect to subscriber %q: %w", r.SubscriberNode, err) + } + defer conn.Close(ctx) + + slotName := postgres.ReplicationSlotName(r.DatabaseName, r.ProviderNode, r.SubscriberNode) + + if err := postgres.EnsureReplicationOriginExists(slotName).Exec(ctx, conn); err != nil { + return fmt.Errorf("failed to ensure replication origin on subscriber %q: %w", r.SubscriberNode, err) + } + if err := postgres.AdvanceReplicationOrigin(slotName, slotAdvance.AdvancedToLSN).Exec(ctx, conn); err != nil { + return fmt.Errorf("failed to advance replication origin on subscriber %q: %w", r.SubscriberNode, err) + } + return nil +} + +func (r *ReplicationOriginAdvanceResource) Update(ctx context.Context, rc *resource.Context) error { + return r.Create(ctx, rc) +} + +func (r *ReplicationOriginAdvanceResource) Delete(ctx context.Context, rc *resource.Context) error { + return nil +} diff --git a/server/internal/database/replication_slot_advance_from_cts_resource.go b/server/internal/database/replication_slot_advance_from_cts_resource.go index 3b0cc238..665f415d 100644 --- a/server/internal/database/replication_slot_advance_from_cts_resource.go +++ b/server/internal/database/replication_slot_advance_from_cts_resource.go @@ -25,16 +25,22 @@ func ReplicationSlotAdvanceFromCTSResourceIdentifier(providerNode, subscriberNod // ReplicationSlotAdvanceFromCTSResource advances the replication slot on the provider // to the LSN derived from the 
commit timestamp captured in lag_tracker. +// AdvancedToLSN is written as output after a successful advance so that +// ReplicationOriginAdvanceResource (running on the subscriber) can read it. type ReplicationSlotAdvanceFromCTSResource struct { DatabaseName string `json:"database_name"` ProviderNode string `json:"provider_node"` // slot lives here SubscriberNode string `json:"subscriber_node"` // target/receiver node + + // Output: LSN the slot was advanced to (empty if advance was skipped). + AdvancedToLSN string `json:"advanced_to_lsn,omitempty"` } func (r *ReplicationSlotAdvanceFromCTSResource) ResourceVersion() string { return "1" } -// No diff-ignore fields needed; this always executes idempotently when asked. -func (r *ReplicationSlotAdvanceFromCTSResource) DiffIgnore() []string { return nil } +func (r *ReplicationSlotAdvanceFromCTSResource) DiffIgnore() []string { + return []string{"advanced_to_lsn"} +} // Execute on the provider node (the slot exists there). func (r *ReplicationSlotAdvanceFromCTSResource) Executor() resource.Executor { @@ -142,6 +148,9 @@ func (r *ReplicationSlotAdvanceFromCTSResource) Create(ctx context.Context, rc * return fmt.Errorf("failed to advance replication slot: %w", err) } + // Record the LSN so ReplicationOriginAdvanceResource (running on the + // subscriber's host) can advance the origin to the same position. 
+ r.AdvancedToLSN = targetLSN return nil } diff --git a/server/internal/database/resources.go b/server/internal/database/resources.go index 13f4665a..998b1190 100644 --- a/server/internal/database/resources.go +++ b/server/internal/database/resources.go @@ -12,6 +12,8 @@ func RegisterResourceTypes(registry *resource.Registry) { resource.RegisterResourceType[*ReplicationSlotResource](registry, ResourceTypeReplicationSlot) resource.RegisterResourceType[*LagTrackerCommitTimestampResource](registry, ResourceTypeLagTrackerCommitTS) resource.RegisterResourceType[*ReplicationSlotAdvanceFromCTSResource](registry, ResourceTypeReplicationSlotAdvanceFromCTS) + resource.RegisterResourceType[*ReplicationOriginAdvanceResource](registry, ResourceTypeReplicationOriginAdvance) + resource.RegisterResourceType[*PeerCatchupResource](registry, ResourceTypePeerCatchup) resource.RegisterResourceType[*SwitchoverResource](registry, ResourceTypeSwitchover) resource.RegisterResourceType[*PostgresDatabaseResource](registry, ResourceTypePostgresDatabase) resource.RegisterResourceType[*DumpRolesResource](registry, ResourceTypeDumpRoles) diff --git a/server/internal/database/wait_for_sync_event_resource.go b/server/internal/database/wait_for_sync_event_resource.go index 3189658e..b8117f94 100644 --- a/server/internal/database/wait_for_sync_event_resource.go +++ b/server/internal/database/wait_for_sync_event_resource.go @@ -85,10 +85,11 @@ func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Con return ctx.Err() } - // Check subscription health first — fail early if broken. - // Only statuses where the spock worker is running can make - // progress. The others ("disabled", "down") mean sync will - // never complete. + // Check subscription health. "disabled" and "down" are transient + // during add-node: the apply worker may not have started yet. + // Keep polling until the context deadline rather than failing fast. 
+ // Only statuses where the worker is confirmed broken warrant an + // immediate error. status, err := postgres.GetSubscriptionStatus(r.ProviderNode, r.SubscriberNode). Scalar(ctx, subscriberConn) if err != nil { @@ -100,6 +101,10 @@ func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Con switch status { case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown: // Worker is running — continue waiting + case postgres.SubStatusDisabled, postgres.SubStatusDown: + // Worker not yet started; transient — keep polling + time.Sleep(pollInterval) + continue default: return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s", status, r.ProviderNode, r.SubscriberNode) diff --git a/server/internal/orchestrator/swarm/images.go b/server/internal/orchestrator/swarm/images.go index a474c252..a206b0b1 100644 --- a/server/internal/orchestrator/swarm/images.go +++ b/server/internal/orchestrator/swarm/images.go @@ -36,7 +36,7 @@ func NewVersions(cfg config.Config) *Versions { }) versions.addImage(ds.MustPgEdgeVersion("16.13", "5"), &Images{ - PgEdgeImage: imageTag(cfg, "16.13-spock5.0.7-standard-1"), + PgEdgeImage: imageTag(cfg, "16.13-spock5.0.8-standard-1"), }) // pg17 @@ -50,7 +50,7 @@ func NewVersions(cfg config.Config) *Versions { PgEdgeImage: imageTag(cfg, "17.8-spock5.0.5-standard-1"), }) versions.addImage(ds.MustPgEdgeVersion("17.9", "5"), &Images{ - PgEdgeImage: imageTag(cfg, "17.9-spock5.0.7-standard-1"), + PgEdgeImage: imageTag(cfg, "17.9-spock5.0.8-standard-1"), }) // pg18 @@ -64,7 +64,7 @@ func NewVersions(cfg config.Config) *Versions { PgEdgeImage: imageTag(cfg, "18.2-spock5.0.5-standard-1"), }) versions.addImage(ds.MustPgEdgeVersion("18.3", "5"), &Images{ - PgEdgeImage: imageTag(cfg, "18.3-spock5.0.7-standard-1"), + PgEdgeImage: imageTag(cfg, "18.3-spock5.0.8-standard-1"), }) versions.defaultVersion = ds.MustPgEdgeVersion("18.3", "5") diff --git 
a/server/internal/postgres/create_db.go b/server/internal/postgres/create_db.go index 90d2e1d5..cd6e0107 100644 --- a/server/internal/postgres/create_db.go +++ b/server/internal/postgres/create_db.go @@ -402,6 +402,52 @@ func AdvanceReplicationSlotToLSN(databaseName, providerNode, subscriberNode stri } } +func EnsureReplicationOriginExists(slotName string) ConditionalStatement { + return ConditionalStatement{ + If: Query[bool]{ + SQL: "SELECT NOT EXISTS (SELECT 1 FROM pg_replication_origin WHERE roname = @slot_name);", + Args: pgx.NamedArgs{"slot_name": slotName}, + }, + Then: Statement{ + SQL: "SELECT pg_replication_origin_create(@slot_name);", + Args: pgx.NamedArgs{"slot_name": slotName}, + }, + } +} + +func AdvanceReplicationOrigin(slotName, lsn string) Statement { + return Statement{ + SQL: "SELECT pg_replication_origin_advance(@slot_name, @lsn::pg_lsn);", + Args: pgx.NamedArgs{ + "slot_name": slotName, + "lsn": lsn, + }, + } +} + +// SpockProgressReachedLSN reports whether the local node's apply progress +// from the named peer has reached targetLSN. Uses remote_lsn (the LSN of the +// last applied commit in Spock 5.x) rather than received_lsn, which can +// advance on keepalive messages before any commits have been applied. 
+func SpockProgressReachedLSN(peerNodeName, targetLSN string) Query[bool] { + return Query[bool]{ + SQL: ` + SELECT COALESCE( + (SELECT p.remote_lsn >= @target_lsn::pg_lsn + FROM spock.progress p + JOIN spock.node n ON n.node_id = p.remote_node_id + WHERE p.node_id = (SELECT node_id FROM spock.node_info()) + AND n.node_name = @peer_node_name), + false + ) + `, + Args: pgx.NamedArgs{ + "peer_node_name": peerNodeName, + "target_lsn": targetLSN, + }, + } +} + // GetSubscriptionStatus returns the current status of a specific subscription func GetSubscriptionStatus(providerNode, subscriberNode string) Query[string] { return Query[string]{ From 1ee14f7507405454afc339792de6bd09c47a5a31 Mon Sep 17 00:00:00 2001 From: Siva Date: Thu, 14 May 2026 18:26:50 +0530 Subject: [PATCH 2/3] addressing AI review comments --- .../database/replication_slot_advance_from_cts_resource.go | 2 ++ server/internal/database/wait_for_sync_event_resource.go | 6 +++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/server/internal/database/replication_slot_advance_from_cts_resource.go b/server/internal/database/replication_slot_advance_from_cts_resource.go index 665f415d..4b3fb710 100644 --- a/server/internal/database/replication_slot_advance_from_cts_resource.go +++ b/server/internal/database/replication_slot_advance_from_cts_resource.go @@ -68,6 +68,8 @@ func (r *ReplicationSlotAdvanceFromCTSResource) Refresh(ctx context.Context, rc } func (r *ReplicationSlotAdvanceFromCTSResource) Create(ctx context.Context, rc *resource.Context) error { + r.AdvancedToLSN = "" + // Fetch commit timestamp from lag tracker resource lagTracker, err := resource.FromContext[*LagTrackerCommitTimestampResource]( rc, diff --git a/server/internal/database/wait_for_sync_event_resource.go b/server/internal/database/wait_for_sync_event_resource.go index b8117f94..45e97098 100644 --- a/server/internal/database/wait_for_sync_event_resource.go +++ b/server/internal/database/wait_for_sync_event_resource.go @@ 
-103,7 +103,11 @@ func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Con // Worker is running — continue waiting case postgres.SubStatusDisabled, postgres.SubStatusDown: // Worker not yet started; transient — keep polling - time.Sleep(pollInterval) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(pollInterval): + } continue default: return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s", From 24c56a68c9e58458fba265eb08703f9bae4c05d9 Mon Sep 17 00:00:00 2001 From: Siva Date: Mon, 18 May 2026 16:16:41 +0530 Subject: [PATCH 3/3] updating spock 5.0.8 --- e2e/add_node_data_safety_test.go | 1 - e2e/minor_version_upgrade_test.go | 4 ++-- .../internal/database/peer_catchup_resource.go | 2 -- server/internal/orchestrator/swarm/images.go | 18 +++++++++++++----- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/e2e/add_node_data_safety_test.go b/e2e/add_node_data_safety_test.go index b74cb341..df56b4ba 100644 --- a/e2e/add_node_data_safety_test.go +++ b/e2e/add_node_data_safety_test.go @@ -21,7 +21,6 @@ import ( // // Covers: Change 1 — EnsureReplicationOriginExists + AdvanceReplicationOrigin // wired into ReplicationSlotAdvanceFromCTSResource. -// Ref: zodan.sql:2071-2073, 2183-2185; spock PR #397. 
func TestAddNodeOriginAdvanced(t *testing.T) { t.Parallel() diff --git a/e2e/minor_version_upgrade_test.go b/e2e/minor_version_upgrade_test.go index 74360fac..dfbd8e57 100644 --- a/e2e/minor_version_upgrade_test.go +++ b/e2e/minor_version_upgrade_test.go @@ -25,8 +25,8 @@ func TestMinorVersionUpgrade(t *testing.T) { username := "admin" password := "password" - fromVersion := "18.2" - toVersion := "18.3" + fromVersion := "18.3" + toVersion := "18.4" ctx, cancel := context.WithTimeout(t.Context(), 5*time.Minute) defer cancel() diff --git a/server/internal/database/peer_catchup_resource.go b/server/internal/database/peer_catchup_resource.go index 11d5666e..597b0e7f 100644 --- a/server/internal/database/peer_catchup_resource.go +++ b/server/internal/database/peer_catchup_resource.go @@ -28,8 +28,6 @@ func PeerCatchupResourceIdentifier(sourceNode, peerNode, databaseName string) re // Uses spock.progress.remote_lsn (apply progress at last committed // transaction) rather than received_lsn, which can advance on keepalive // messages before commits have been applied. 
-// -// Ref: zodan.sql lines 1455–1523, spock PR #392 type PeerCatchupResource struct { DatabaseName string `json:"database_name"` SourceNode string `json:"source_node"` // node where we check progress diff --git a/server/internal/orchestrator/swarm/images.go b/server/internal/orchestrator/swarm/images.go index a206b0b1..9fcf20a5 100644 --- a/server/internal/orchestrator/swarm/images.go +++ b/server/internal/orchestrator/swarm/images.go @@ -34,9 +34,11 @@ func NewVersions(cfg config.Config) *Versions { versions.addImage(ds.MustPgEdgeVersion("16.12", "5"), &Images{ PgEdgeImage: imageTag(cfg, "16.12-spock5.0.5-standard-1"), }) - versions.addImage(ds.MustPgEdgeVersion("16.13", "5"), &Images{ - PgEdgeImage: imageTag(cfg, "16.13-spock5.0.8-standard-1"), + PgEdgeImage: imageTag(cfg, "16.13-spock5.0.6-standard-2"), + }) + versions.addImage(ds.MustPgEdgeVersion("16.14", "5"), &Images{ + PgEdgeImage: imageTag(cfg, "16.14-spock5.0.8-standard-1"), }) // pg17 @@ -50,7 +52,10 @@ func NewVersions(cfg config.Config) *Versions { PgEdgeImage: imageTag(cfg, "17.8-spock5.0.5-standard-1"), }) versions.addImage(ds.MustPgEdgeVersion("17.9", "5"), &Images{ - PgEdgeImage: imageTag(cfg, "17.9-spock5.0.8-standard-1"), + PgEdgeImage: imageTag(cfg, "17.9-spock5.0.6-standard-2"), + }) + versions.addImage(ds.MustPgEdgeVersion("17.10", "5"), &Images{ + PgEdgeImage: imageTag(cfg, "17.10-spock5.0.8-standard-1"), }) // pg18 @@ -64,10 +69,13 @@ func NewVersions(cfg config.Config) *Versions { PgEdgeImage: imageTag(cfg, "18.2-spock5.0.5-standard-1"), }) versions.addImage(ds.MustPgEdgeVersion("18.3", "5"), &Images{ - PgEdgeImage: imageTag(cfg, "18.3-spock5.0.8-standard-1"), + PgEdgeImage: imageTag(cfg, "18.3-spock5.0.6-standard-2"), + }) + versions.addImage(ds.MustPgEdgeVersion("18.4", "5"), &Images{ + PgEdgeImage: imageTag(cfg, "18.4-spock5.0.8-standard-1"), }) - versions.defaultVersion = ds.MustPgEdgeVersion("18.3", "5") + versions.defaultVersion = ds.MustPgEdgeVersion("18.4", "5") return versions }