Skip to content

Commit a788c04

Browse files
committed
feat(sync): enhance pull and push operations with improved error handling
1 parent 46bf439 commit a788c04

7 files changed

Lines changed: 117 additions & 34 deletions

File tree

src/storage/repo/graphql-client.ts

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,7 @@ export class GraphQLClient {
5555

5656
// Process in batches to avoid GraphQL query complexity limits
5757
let apiCalls = 0;
58+
const totalBatches = Math.ceil(paths.length / GRAPHQL_BATCH_SIZE);
5859
for (let i = 0; i < paths.length; i += GRAPHQL_BATCH_SIZE) {
5960
const batch = paths.slice(i, i + GRAPHQL_BATCH_SIZE);
6061
const batchResult = await this.fetchFilesViaGraphQL(batch, branch, fetchBlob);
@@ -63,6 +64,11 @@ export class GraphQLClient {
6364
for (const [path, content] of Object.entries(batchResult)) {
6465
result[path] = content;
6566
}
67+
68+
// Log progress every 10 batches for large fetches
69+
if (totalBatches > 10 && apiCalls % 10 === 0) {
70+
logProgress(`GraphQL fetch progress: ${String(apiCalls)}/${String(totalBatches)} batches`);
71+
}
6672
}
6773

6874
const duration = Date.now() - startTime;

src/sync/engine/helpers.ts

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -38,17 +38,22 @@ export function buildLocalChecksums(
3838
}
3939

4040
/**
41-
* Build pull options, conditionally including localChecksums.
41+
* Build pull options, conditionally including localChecksums and localRemoteShas.
4242
*/
4343
export function buildPullOptions(
44-
base: Omit<PullOptions, 'localChecksums'>,
45-
localData?: CategoryData[]
44+
base: Omit<PullOptions, 'localChecksums' | 'localRemoteShas'>,
45+
localData?: CategoryData[],
46+
localState?: LocalSyncState | null
4647
): PullOptions {
4748
const localChecksums = buildLocalChecksums(localData);
49+
const opts: PullOptions = { ...base };
4850
if (localChecksums) {
49-
return { ...base, localChecksums };
51+
opts.localChecksums = localChecksums;
52+
}
53+
if (localState?.remoteShas) {
54+
opts.localRemoteShas = localState.remoteShas;
5055
}
51-
return base;
56+
return opts;
5257
}
5358

5459
/** Build push options with conditional remoteManifest */
@@ -58,6 +63,8 @@ export interface PushOptsBase {
5863
localState: LocalSyncState | null;
5964
passphrase: PassphraseOption;
6065
existingFiles: string[];
66+
/** Remote file SHAs for incremental push (filename -> git blob SHA) */
67+
remoteShas?: Record<string, string>;
6168
}
6269

6370
/** Execute push and return files for storage */

src/sync/engine/operations.ts

Lines changed: 18 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -44,14 +44,22 @@ export async function executePushOperation(
4444
data: CategoryData[],
4545
remote?: Manifest
4646
): Promise<{ result: SyncResult; newState: LocalSyncState }> {
47-
const existing = (await ctx.backend.listFiles()).map((f) => f.filename);
47+
const remoteFiles = await ctx.backend.listFiles();
48+
const existing = remoteFiles.map((f) => f.filename);
49+
50+
// Build SHA map for comparison (filename -> git blob SHA)
51+
const remoteShas: Record<string, string> = {};
52+
for (const f of remoteFiles) {
53+
if (f.sha) remoteShas[f.filename] = f.sha;
54+
}
4855

4956
const opts = {
5057
localData: data,
5158
config: ctx.config,
5259
localState: ctx.localState,
5360
passphrase: buildCryptoOptions(ctx.passphrase, ctx.oldPassphrase),
5461
existingFiles: existing,
62+
remoteShas,
5563
};
5664
const { files, manifest, changedCategories } = executePush(opts, remote);
5765
const fileCount = Object.keys(files).length;
@@ -67,7 +75,7 @@ export async function executePushOperation(
6775

6876
/** Execute a pull operation and return updated local state */
6977
export async function executePullOperation(
70-
ctx: OperationContext,
78+
ctx: PushContext,
7179
remote?: Manifest,
7280
data?: CategoryData[]
7381
): Promise<{ result: SyncResult; newState: LocalSyncState | null }> {
@@ -79,12 +87,18 @@ export async function executePullOperation(
7987
passphrase: buildCryptoOptions(ctx.passphrase, ctx.oldPassphrase),
8088
backend: ctx.backend,
8189
},
82-
data
90+
data,
91+
ctx.localState
8392
);
84-
const { pulledData, changedCategories, tombstonedItems } = await pullCategories(opts);
93+
const { pulledData, changedCategories, tombstonedItems, remoteShas } = await pullCategories(opts);
8594
syncLog(`[SYNC] Pull: ${String(changedCategories.length)} categories`);
8695
const mergedData = mergeDataForState(data, pulledData);
8796
const newState = buildLocalState(remote, mergedData, ctx.getStorageId(), ctx.config.machineId);
97+
// Merge remote SHAs: keep existing ones and add/update from pull
98+
newState.remoteShas = {
99+
...(ctx.localState?.remoteShas ?? {}),
100+
...remoteShas,
101+
};
88102
const tombstoneIds = extractTombstoneIds(tombstonedItems);
89103
return {
90104
result: buildPullResult({ changedCategories, pulledData, tombstonedItems: tombstoneIds }),

src/sync/operations/pull.ts

Lines changed: 52 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,8 @@ export interface PullOptions {
2929
backend: StorageBackend;
3030
/** Map of category → item ID → checksum for existing local items */
3131
localChecksums?: Record<SyncCategory, Record<string, string>>;
32+
/** Map of category → filename → git SHA for incremental pull */
33+
localRemoteShas?: Partial<Record<SyncCategory, Record<string, string>>>;
3234
}
3335

3436
/**
@@ -39,6 +41,8 @@ export interface ExtendedPullResult extends PullResult {
3941
downloadedItems: Record<SyncCategory, string[]>;
4042
/** Items that should be deleted locally (tombstoned remotely) */
4143
tombstonedItems: Record<SyncCategory, Record<string, Tombstone>>;
44+
/** Remote SHAs for all synced items (for incremental pull) */
45+
remoteShas: Partial<Record<SyncCategory, Record<string, string>>>;
4246
}
4347

4448
/** Result accumulator for pull operation */
@@ -47,6 +51,7 @@ interface PullAccumulator {
4751
changedCategories: SyncCategory[];
4852
downloadedItems: Record<SyncCategory, string[]>;
4953
tombstonedItems: Record<SyncCategory, Record<string, Tombstone>>;
54+
remoteShas: Partial<Record<SyncCategory, Record<string, string>>>;
5055
}
5156

5257
/** Info about a file to fetch */
@@ -63,6 +68,7 @@ function createPullAccumulator(): PullAccumulator {
6368
changedCategories: [],
6469
downloadedItems: {} as Record<SyncCategory, string[]>,
6570
tombstonedItems: {} as Record<SyncCategory, Record<string, Tombstone>>,
71+
remoteShas: {},
6672
};
6773
}
6874

@@ -71,7 +77,8 @@ function recordItemPull(
7177
acc: PullAccumulator,
7278
cat: SyncCategory,
7379
data: ItemCategoryData,
74-
tombstones: Record<string, Tombstone>
80+
tombstones: Record<string, Tombstone>,
81+
shas: Record<string, string>
7582
): void {
7683
if (Object.keys(data.items).length > 0) {
7784
acc.pulledData.push(data);
@@ -81,24 +88,36 @@ function recordItemPull(
8188
if (Object.keys(tombstones).length > 0) {
8289
acc.tombstonedItems[cat] = tombstones;
8390
}
91+
// Always record SHAs (merge with existing for incremental)
92+
acc.remoteShas[cat] = { ...(acc.remoteShas[cat] ?? {}), ...shas };
8493
}
8594

86-
/** Build list of files to fetch, excluding tombstoned items */
95+
/** Build list of files to fetch, excluding tombstoned and unchanged items */
8796
function buildFilesToFetch(
8897
categoryFiles: StorageFile[],
89-
categoryTombstones: Record<string, Tombstone>
98+
categoryTombstones: Record<string, Tombstone>,
99+
localShas?: Record<string, string>
90100
): FetchInfo[] {
91101
const filesToFetch: FetchInfo[] = [];
102+
let skipped = 0;
92103
for (const file of categoryFiles) {
93104
const itemId = getItemIdFromFilename(file.filename);
94105
if (!itemId) continue;
95106
if (itemId in categoryTombstones) continue;
107+
// Skip if local has same SHA (unchanged)
108+
if (localShas && file.sha && localShas[itemId] === file.sha) {
109+
skipped++;
110+
continue;
111+
}
96112
filesToFetch.push({
97113
itemId,
98114
filename: file.filename,
99115
checksum: file.sha ?? '',
100116
});
101117
}
118+
if (skipped > 0) {
119+
syncLog(`[PULL] Skipped ${String(skipped)} unchanged files`);
120+
}
102121
return filesToFetch;
103122
}
104123

@@ -127,8 +146,27 @@ function processDownloadedFiles(
127146
return { items, checksums };
128147
}
129148

149+
/** Build SHA map from fetched files */
150+
function buildShaMap(filesToFetch: FetchInfo[]): Record<string, string> {
151+
const shas: Record<string, string> = {};
152+
for (const f of filesToFetch) {
153+
if (f.checksum) shas[f.itemId] = f.checksum;
154+
}
155+
return shas;
156+
}
157+
158+
/** Load tombstones for a category */
159+
async function loadCategoryTombstones(
160+
backend: StorageBackend,
161+
cat: SyncCategory
162+
): Promise<Record<string, Tombstone>> {
163+
const content = await backend.getFile(TOMBSTONES_FILENAME);
164+
const file = parseTombstonesFile(content);
165+
return getCategoryTombstones(file, cat);
166+
}
167+
130168
export async function pullCategories(options: PullOptions): Promise<ExtendedPullResult> {
131-
const { manifest, enabledCategories, backend, localChecksums } = options;
169+
const { manifest, enabledCategories, backend, localRemoteShas } = options;
132170
const acc = createPullAccumulator();
133171

134172
for (const [category, info] of Object.entries(manifest.categories)) {
@@ -138,7 +176,7 @@ export async function pullCategories(options: PullOptions): Promise<ExtendedPull
138176
continue;
139177
}
140178
syncLog(`[PULL] Processing ${cat} (type: ${info.type})`);
141-
await pullTreeIndexedCategory(cat, info, localChecksums?.[cat] ?? {}, backend, acc);
179+
await pullTreeIndexedCategory(cat, info, localRemoteShas?.[cat], backend, acc);
142180
}
143181

144182
return acc;
@@ -150,7 +188,7 @@ export async function pullCategories(options: PullOptions): Promise<ExtendedPull
150188
async function pullTreeIndexedCategory(
151189
cat: SyncCategory,
152190
info: TreeIndexedCategoryInfo,
153-
_localChecksums: Record<string, string>,
191+
localShas: Record<string, string> | undefined,
154192
backend: StorageBackend,
155193
acc: PullAccumulator
156194
): Promise<void> {
@@ -160,33 +198,28 @@ async function pullTreeIndexedCategory(
160198
const allFiles = await backend.listFiles();
161199
const categoryFiles = allFiles.filter((f) => f.filename.startsWith(info.pathPrefix));
162200
syncLog(`[PULL] ${cat}: found ${String(categoryFiles.length)} files in tree`);
163-
164201
if (categoryFiles.length === 0) return;
165202

166-
// Load tombstones from separate file
167-
const tombstonesContent = await backend.getFile(TOMBSTONES_FILENAME);
168-
const tombstonesFile = parseTombstonesFile(tombstonesContent);
169-
const categoryTombstones = getCategoryTombstones(tombstonesFile, cat);
203+
// Load tombstones and build file list
204+
const categoryTombstones = await loadCategoryTombstones(backend, cat);
170205
syncLog(`[PULL] ${cat}: ${String(Object.keys(categoryTombstones).length)} tombstones`);
171-
172-
// Build list of files to download (excluding tombstoned items)
173-
const filesToFetch = buildFilesToFetch(categoryFiles, categoryTombstones);
206+
const filesToFetch = buildFilesToFetch(categoryFiles, categoryTombstones, localShas);
207+
syncLog(`[PULL] ${cat}: ${String(filesToFetch.length)} files to fetch`);
174208

175209
if (filesToFetch.length === 0) {
176-
if (Object.keys(categoryTombstones).length > 0) {
177-
acc.tombstonedItems[cat] = categoryTombstones;
178-
}
210+
if (Object.keys(categoryTombstones).length > 0) acc.tombstonedItems[cat] = categoryTombstones;
179211
return;
180212
}
181213

182214
// Bulk download and process files
215+
syncLog(`[PULL] ${cat}: starting bulk download...`);
183216
const contents = await backend.getFiles(filesToFetch.map((f) => f.filename));
184217
const { items, checksums } = processDownloadedFiles(filesToFetch, contents);
185218
syncLog(
186219
`[PULL] ${cat}: downloaded ${String(Object.keys(items).length)}/${String(filesToFetch.length)} items`
187220
);
188221

189-
// Record results
222+
// Record results with SHAs for incremental sync
190223
const data: ItemCategoryData = { category: cat, type: 'items', items, checksums };
191-
recordItemPull(acc, cat, data, categoryTombstones);
224+
recordItemPull(acc, cat, data, categoryTombstones, buildShaMap(filesToFetch));
192225
}

src/sync/operations/push.ts

Lines changed: 25 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
/** Push Operation - Pushes local data to remote storage. */
22

3-
import { packItem, buildItemInfo, diffItems } from '../item-packer.js';
3+
import { packItem, buildItemInfo, diffItems, getItemFilename } from '../item-packer.js';
44
import {
55
processTombstonesForPush,
66
removeItemsById,
@@ -45,11 +45,17 @@ export function preparePushData(opts: PreparePushOptions): PreparePushResult {
4545
const newFiles = new Set<string>();
4646
const changedCategories: SyncCategory[] = [];
4747

48+
// Get previously synced checksums to detect what actually changed
49+
const lastSyncedChecksums = localState?.itemChecksums;
50+
4851
for (const catData of localData) {
4952
if (!config.sync[catData.category]) continue;
50-
const filenames = packCategoryData(catData, remoteManifest, ctx);
53+
const prevChecksums = lastSyncedChecksums?.[catData.category];
54+
const filenames = packCategoryData(catData, remoteManifest, ctx, prevChecksums);
5155
for (const f of filenames) newFiles.add(f);
52-
changedCategories.push(catData.category);
56+
if (filenames.length > 0) {
57+
changedCategories.push(catData.category);
58+
}
5359
}
5460

5561
markOrphanedFiles(ctx.files, existingFiles, newFiles);
@@ -61,10 +67,23 @@ export function preparePushData(opts: PreparePushOptions): PreparePushResult {
6167
function packCategoryData(
6268
catData: CategoryData,
6369
_remoteManifest: Manifest | undefined,
64-
ctx: PushContext
70+
ctx: PushContext,
71+
prevChecksums?: Record<string, string>
6572
): string[] {
66-
// All categories now use tree-indexed sync, no remote item tracking needed
67-
return packItemCategoryData(catData, {}, ctx, {});
73+
// Use previous checksums to build pseudo ItemInfo for diff comparison
74+
const remoteItems: Record<string, ItemInfo> = {};
75+
if (prevChecksums) {
76+
for (const [itemId, checksum] of Object.entries(prevChecksums)) {
77+
remoteItems[itemId] = {
78+
filename: getItemFilename(catData.category, itemId),
79+
checksum,
80+
size: 0,
81+
lastModified: '',
82+
lastModifiedBy: '',
83+
};
84+
}
85+
}
86+
return packItemCategoryData(catData, remoteItems, ctx, {});
6887
}
6988

7089
/** Result of processing items for push */

src/sync/operations/types.ts

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -71,6 +71,8 @@ export interface PreparePushOptions {
7171
passphrase: PassphraseOption;
7272
existingFiles?: string[];
7373
remoteManifest?: Manifest;
74+
/** Remote file SHAs for incremental push (filename -> git blob SHA) */
75+
remoteShas?: Record<string, string>;
7476
}
7577

7678
export { type SyncResult, type Manifest, type SyncCategory, type LocalSyncState };

src/types/sync.ts

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,8 @@ export interface LocalSyncState {
1818
baseVersions: Partial<Record<SyncCategory, string>>; // JSON stringified data
1919
// Per-item tracking for deletion detection (sessions, messages)
2020
itemChecksums?: Partial<Record<SyncCategory, Record<string, string>>>;
21+
// Remote git blob SHAs for incremental pull (skip unchanged files)
22+
remoteShas?: Partial<Record<SyncCategory, Record<string, string>>>;
2123
}
2224

2325
// ============================================================================

0 commit comments

Comments (0)