Skip to content

Commit bb96907

Browse files
committed
Fix: Fetch remote shards before push to enable proper merging
CRITICAL BUG FIX: - Push was overwriting remote data because getRemoteItemData only handled type='items' but we switched to type='sharded' - This caused machines to completely overwrite each other's data instead of merging Solution: - Added fetchResolvedShards() to load shard data before push - Pass resolved shards through the push pipeline - Now push properly merges local items with remote items Refactored sync-engine.ts into operations.ts to stay under 200 lines. This implements CRDT-style additive merging where both machines' items are preserved instead of one overwriting the other.
1 parent d020028 commit bb96907

6 files changed

Lines changed: 240 additions & 75 deletions

File tree

eslint.config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export default tseslint.config(
2525
rules: {
2626
// ═══════════════════════════════════════════════════════════════════════
2727
// COMPLEXITY RULES - Keep code readable for humans and LLMs
28+
// ⚠️ DO NOT MODIFY THESE LIMITS - Refactor code instead of changing rules
2829
// ═══════════════════════════════════════════════════════════════════════
2930

3031
// Max 200 lines per file - keeps files small and LLM-friendly

src/sync/engine/helpers.ts

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@ import type {
1111
SyncConfig,
1212
Tombstone,
1313
} from '../../types/index.js';
14-
import type { CategoryData, PassphraseOption } from '../operations/types.js';
14+
import { isShardedRef } from '../../types/index.js';
15+
import type { CategoryData, PassphraseOption, ResolvedShard } from '../operations/types.js';
1516
import { isItemCategoryData } from '../operations/types.js';
1617
import type { PullOptions } from '../operations/pull.js';
1718
import type { PreparePushResult } from '../operations/push.js';
1819
import { preparePushData } from '../operations/push.js';
20+
import type { StorageBackend } from '../../storage/index.js';
21+
import { fetchCategoryShard } from './manifest.js';
22+
import { syncLog } from './logger.js';
1923

2024
/**
2125
* Build checksums map from local item category data for merge-based pull.
@@ -63,8 +67,19 @@ export interface PushOptsBase {
6367
}
6468

6569
/** Execute push and return files for storage */
66-
export function executePush(opts: PushOptsBase, remoteManifest?: Manifest): PreparePushResult {
67-
return preparePushData(remoteManifest ? { ...opts, remoteManifest } : opts);
70+
export function executePush(
71+
opts: PushOptsBase,
72+
remoteManifest?: Manifest,
73+
resolvedShards?: Record<SyncCategory, ResolvedShard>
74+
): PreparePushResult {
75+
const pushOpts = { ...opts } as Parameters<typeof preparePushData>[0];
76+
if (remoteManifest) {
77+
pushOpts.remoteManifest = remoteManifest;
78+
}
79+
if (resolvedShards) {
80+
pushOpts.resolvedShards = resolvedShards;
81+
}
82+
return preparePushData(pushOpts);
6883
}
6984

7085
/** Convert push files to storage format */
@@ -105,3 +120,38 @@ export function extractTombstoneIds(
105120
}
106121
return result;
107122
}
123+
124+
/**
125+
* Fetch resolved shard data for sharded categories.
126+
* This ensures we merge with existing remote data instead of overwriting.
127+
*/
128+
export async function fetchResolvedShards(
129+
backend: StorageBackend,
130+
remote?: Manifest
131+
): Promise<Record<SyncCategory, ResolvedShard> | undefined> {
132+
if (!remote) return undefined;
133+
134+
const shardedCategories: SyncCategory[] = ['sessions', 'messages'];
135+
const resolved: Record<SyncCategory, ResolvedShard> = {} as Record<SyncCategory, ResolvedShard>;
136+
let hasAny = false;
137+
138+
for (const category of shardedCategories) {
139+
const info = remote.categories[category];
140+
if (info && isShardedRef(info)) {
141+
syncLog(`[PUSH] Fetching shard for ${category}: ${info.shardFile}`);
142+
const shard = await fetchCategoryShard(backend, info.shardFile);
143+
if (shard) {
144+
resolved[category] = {
145+
items: shard.items,
146+
tombstones: shard.tombstones,
147+
};
148+
hasAny = true;
149+
syncLog(
150+
`[PUSH] Loaded ${String(Object.keys(shard.items).length)} remote ${category} for merge`
151+
);
152+
}
153+
}
154+
}
155+
156+
return hasAny ? resolved : undefined;
157+
}

src/sync/engine/operations.ts

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/**
2+
* Sync Engine Operations
3+
*
4+
* Push, pull, and conflict resolution operations for the sync engine.
5+
*/
6+
7+
import type { StorageBackend } from '../../storage/index.js';
8+
import type { Manifest, SyncResult, LocalSyncState, SyncConfig } from '../../types/index.js';
9+
import type { CategoryData } from '../operations/types.js';
10+
import { pullCategories } from '../operations/pull.js';
11+
import { mergeAllCategories } from '../operations/merge-operation.js';
12+
import { syncLog } from './logger.js';
13+
import { MANIFEST_FILENAME } from './types.js';
14+
import { buildLocalState, getStorageFilesMap, mergeDataForState } from './state.js';
15+
import {
16+
buildPushResult,
17+
buildPullResult,
18+
buildConflictResult,
19+
buildErrorResult,
20+
} from './result.js';
21+
import {
22+
buildPullOptions,
23+
executePush,
24+
toStorageFiles,
25+
buildCryptoOptions,
26+
extractTombstoneIds,
27+
fetchResolvedShards,
28+
} from './helpers.js';
29+
30+
export interface OperationContext {
31+
backend: StorageBackend;
32+
config: SyncConfig;
33+
passphrase: string | undefined;
34+
oldPassphrase: string | undefined;
35+
getStorageId: () => string;
36+
}
37+
38+
export interface PushContext extends OperationContext {
39+
localState: LocalSyncState | null;
40+
}
41+
42+
/** Execute a push operation and return updated local state */
43+
export async function executePushOperation(
44+
ctx: PushContext,
45+
data: CategoryData[],
46+
remote?: Manifest
47+
): Promise<{ result: SyncResult; newState: LocalSyncState }> {
48+
const existing = (await ctx.backend.listFiles()).map((f) => f.filename);
49+
50+
// Pre-fetch shards for sharded categories to enable proper merging
51+
const resolvedShards = await fetchResolvedShards(ctx.backend, remote);
52+
53+
const opts = {
54+
localData: data,
55+
config: ctx.config,
56+
localState: ctx.localState,
57+
passphrase: buildCryptoOptions(ctx.passphrase, ctx.oldPassphrase),
58+
existingFiles: existing,
59+
};
60+
const { files, manifest, changedCategories } = executePush(opts, remote, resolvedShards);
61+
const fileCount = Object.keys(files).length;
62+
syncLog(`[SYNC] Push: ${String(fileCount)} files, ${String(existing.length)} existing`);
63+
await ctx.backend.updateFiles(
64+
toStorageFiles(files, MANIFEST_FILENAME, JSON.stringify(manifest, null, 2))
65+
);
66+
const newState = buildLocalState(manifest, data, ctx.getStorageId(), ctx.config.machineId);
67+
return { result: buildPushResult(changedCategories), newState };
68+
}
69+
70+
/** Execute a pull operation and return updated local state */
71+
export async function executePullOperation(
72+
ctx: OperationContext,
73+
remote?: Manifest,
74+
data?: CategoryData[]
75+
): Promise<{ result: SyncResult; newState: LocalSyncState | null }> {
76+
if (!remote) return { result: buildErrorResult('No remote data found'), newState: null };
77+
const sf = await getStorageFilesMap(ctx.backend);
78+
const opts = buildPullOptions(
79+
{
80+
manifest: remote,
81+
storageFiles: sf,
82+
enabledCategories: ctx.config.sync,
83+
passphrase: buildCryptoOptions(ctx.passphrase, ctx.oldPassphrase),
84+
backend: ctx.backend,
85+
},
86+
data
87+
);
88+
const { pulledData, changedCategories, tombstonedItems } = await pullCategories(opts);
89+
syncLog(`[SYNC] Pull: ${String(changedCategories.length)} categories`);
90+
const mergedData = mergeDataForState(data, pulledData);
91+
const newState = buildLocalState(remote, mergedData, ctx.getStorageId(), ctx.config.machineId);
92+
const tombstoneIds = extractTombstoneIds(tombstonedItems);
93+
return {
94+
result: buildPullResult({ changedCategories, pulledData, tombstonedItems: tombstoneIds }),
95+
newState,
96+
};
97+
}
98+
99+
export interface ConflictContext extends PushContext {
100+
pushFn: (data: CategoryData[], retry: number, manifest: Manifest) => Promise<SyncResult>;
101+
}
102+
103+
/** Handle conflict by merging and pushing */
104+
export async function executeConflictOperation(
105+
ctx: ConflictContext,
106+
data: CategoryData[],
107+
manifest: Manifest,
108+
retry: number
109+
): Promise<SyncResult> {
110+
const sf = await getStorageFilesMap(ctx.backend);
111+
const mergeCtx = {
112+
remoteManifest: manifest,
113+
storageFiles: sf,
114+
localState: ctx.localState,
115+
passphrase: buildCryptoOptions(ctx.passphrase, ctx.oldPassphrase),
116+
machineId: ctx.config.machineId,
117+
backend: ctx.backend,
118+
};
119+
const { mergedData, conflicts } = await mergeAllCategories(data, mergeCtx);
120+
return buildConflictResult(await ctx.pushFn(mergedData, retry, manifest), conflicts);
121+
}

src/sync/engine/sync-engine.ts

Lines changed: 31 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import type { StorageBackend } from '../../storage/index.js';
22
import { RepoConflictError } from '../../storage/index.js';
33
import { syncLog } from './logger.js';
4-
import { pullCategories } from '../operations/pull.js';
5-
import { mergeAllCategories } from '../operations/merge-operation.js';
64
import {
75
createEmptyManifest,
86
type Manifest,
@@ -11,32 +9,18 @@ import {
119
} from '../../types/index.js';
1210
import type { CategoryData } from '../operations/types.js';
1311
import type { SyncEngineOptions } from './types.js';
14-
import { MANIFEST_FILENAME } from './types.js';
1512
import { fetchManifest } from './manifest.js';
16-
import {
17-
buildLocalState,
18-
isLockedByOther,
19-
getStorageFilesMap,
20-
mergeDataForState,
21-
} from './state.js';
22-
import {
23-
buildPushResult,
24-
buildPullResult,
25-
buildConflictResult,
26-
buildErrorResult,
27-
buildSkippedResult,
28-
handleSyncError,
29-
} from './result.js';
13+
import { isLockedByOther } from './state.js';
14+
import { buildErrorResult, buildSkippedResult, handleSyncError } from './result.js';
3015
import { checkMaxRetries, calculateBackoff, sleep } from './retry.js';
3116
import { acquireLock, releaseLock, getLockHolder } from '../local-lock.js';
32-
import {
33-
buildPullOptions,
34-
executePush,
35-
toStorageFiles,
36-
buildCryptoOptions,
37-
extractTombstoneIds,
38-
} from './helpers.js';
3917
import { determineAction, executeRoute } from './routing.js';
18+
import {
19+
executePushOperation,
20+
executePullOperation,
21+
executeConflictOperation,
22+
type PushContext,
23+
} from './operations.js';
4024

4125
export { type CategoryData };
4226

@@ -109,10 +93,22 @@ export class SyncEngine {
10993
private hasStorageConfigured(): boolean {
11094
return Boolean(this.config.repoOwner && this.config.repoName);
11195
}
96+
11297
private getStorageId(): string {
11398
return `${this.config.repoOwner ?? ''}/${this.config.repoName ?? ''}`;
11499
}
115100

101+
private getOperationContext(): PushContext {
102+
return {
103+
backend: this.backend,
104+
config: this.config,
105+
passphrase: this.passphrase,
106+
oldPassphrase: this.oldPassphrase,
107+
getStorageId: () => this.getStorageId(),
108+
localState: this.localState,
109+
};
110+
}
111+
116112
private async syncWithRetry(data: CategoryData[], retry: number): Promise<SyncResult> {
117113
if (!this.hasStorageConfigured()) return buildErrorResult('No storage configured');
118114
try {
@@ -149,61 +145,29 @@ export class SyncEngine {
149145
}
150146

151147
private async performPush(data: CategoryData[], remote?: Manifest): Promise<SyncResult> {
152-
const existing = (await this.backend.listFiles()).map((f) => f.filename);
153-
const opts = {
154-
localData: data,
155-
config: this.config,
156-
localState: this.localState,
157-
passphrase: buildCryptoOptions(this.passphrase, this.oldPassphrase),
158-
existingFiles: existing,
159-
};
160-
const { files, manifest, changedCategories } = executePush(opts, remote);
161-
const fileCount = Object.keys(files).length;
162-
syncLog(`[SYNC] Push: ${String(fileCount)} files, ${String(existing.length)} existing`);
163-
await this.backend.updateFiles(
164-
toStorageFiles(files, MANIFEST_FILENAME, JSON.stringify(manifest, null, 2))
165-
);
166-
this.localState = buildLocalState(manifest, data, this.getStorageId(), this.config.machineId);
167-
return buildPushResult(changedCategories);
148+
const ctx = this.getOperationContext();
149+
const { result, newState } = await executePushOperation(ctx, data, remote);
150+
this.localState = newState;
151+
return result;
168152
}
169153

170154
private async performPull(remote?: Manifest, data?: CategoryData[]): Promise<SyncResult> {
171155
const m = remote ?? (await fetchManifest(this.backend));
172-
if (!m) return buildErrorResult('No remote data found');
173-
const sf = await getStorageFilesMap(this.backend);
174-
const opts = buildPullOptions(
175-
{
176-
manifest: m,
177-
storageFiles: sf,
178-
enabledCategories: this.config.sync,
179-
passphrase: buildCryptoOptions(this.passphrase, this.oldPassphrase),
180-
backend: this.backend,
181-
},
182-
data
183-
);
184-
const { pulledData, changedCategories, tombstonedItems } = await pullCategories(opts);
185-
syncLog(`[SYNC] Pull: ${String(changedCategories.length)} categories`);
186-
const mergedData = mergeDataForState(data, pulledData);
187-
this.localState = buildLocalState(m, mergedData, this.getStorageId(), this.config.machineId);
188-
const tombstoneIds = extractTombstoneIds(tombstonedItems);
189-
return buildPullResult({ changedCategories, pulledData, tombstonedItems: tombstoneIds });
156+
const ctx = this.getOperationContext();
157+
const { result, newState } = await executePullOperation(ctx, m ?? undefined, data);
158+
if (newState) this.localState = newState;
159+
return result;
190160
}
191161

192162
private async handleConflict(
193163
data: CategoryData[],
194164
m: Manifest,
195165
retry: number
196166
): Promise<SyncResult> {
197-
const sf = await getStorageFilesMap(this.backend);
198167
const ctx = {
199-
remoteManifest: m,
200-
storageFiles: sf,
201-
localState: this.localState,
202-
passphrase: buildCryptoOptions(this.passphrase, this.oldPassphrase),
203-
machineId: this.config.machineId,
204-
backend: this.backend,
168+
...this.getOperationContext(),
169+
pushFn: (d: CategoryData[], r: number, man: Manifest) => this.push(d, r, man),
205170
};
206-
const { mergedData, conflicts } = await mergeAllCategories(data, ctx);
207-
return buildConflictResult(await this.push(mergedData, retry, m), conflicts);
171+
return executeConflictOperation(ctx, data, m, retry);
208172
}
209173
}

0 commit comments

Comments
 (0)