Skip to content

Commit b83c059

Browse files
committed
feat(sync): implement per-item sync with tombstone support
- Introduced per-item category data structure for granular sync of sessions and messages. - Added tombstone management to track deleted items across machines. - Implemented sharding for large item collections to optimize manifest size. - Enhanced push and pull operations to handle item-level changes and tombstone propagation. - Updated manifest structure to support sharded categories and tombstone metadata. - Refactored packing and unpacking logic for items, including checksum calculations. - Added helper functions for managing tombstones, including creation, expiration checks, and merging. - Updated sync configuration to include tombstone grace period settings.
1 parent 60e10bf commit b83c059

22 files changed

Lines changed: 1794 additions & 355 deletions

src/data/category-loader.ts

Lines changed: 130 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,20 @@
22
* Category Loader
33
*
44
* Load data for sync categories.
5+
* Supports both blob-based (config, state, etc.) and per-item (sessions, messages) loading.
56
*/
67

8+
import { readFile, readdir, stat } from 'node:fs/promises';
9+
import { join, basename } from 'node:path';
710
import type { SyncCategory, PathConfig, SyncConfig } from '../types/index.js';
11+
import type { LocalSyncState } from '../types/sync.js';
12+
import type { Tombstone } from '../types/manifest.js';
13+
import { shouldUseItemSync } from '../types/manifest.js';
814
import { getCategoryPaths } from '../types/paths.js';
9-
import type { CategoryData } from '../sync/operations/types.js';
15+
import type { CategoryData, BlobCategoryData, ItemCategoryData } from '../sync/operations/types.js';
1016
import { loadSinglePath, getPathKey } from './directory-loader.js';
17+
import { calculateChecksum } from '../sync/item-packer.js';
18+
import { createTombstone, detectLocalDeletions } from '../sync/tombstone.js';
1119

1220
export interface LoadedData {
1321
categories: CategoryData[];
@@ -20,12 +28,22 @@ export interface LoadError {
2028
error: Error;
2129
}
2230

31+
export interface LoadOptions {
32+
/** Previous local state for deletion detection */
33+
localState?: LocalSyncState | null;
34+
/** Machine ID for tombstone creation */
35+
machineId?: string;
36+
/** Tombstone grace period in days (default: 30) */
37+
tombstoneGraceDays?: number;
38+
}
39+
2340
/**
2441
* Load all data for enabled categories.
2542
*/
2643
export async function loadLocalData(
2744
pathConfig: PathConfig,
28-
enabledCategories: SyncConfig['sync']
45+
enabledCategories: SyncConfig['sync'],
46+
options: LoadOptions = {}
2947
): Promise<LoadedData> {
3048
const categoryPaths = getCategoryPaths(pathConfig);
3149
const categories: CategoryData[] = [];
@@ -35,7 +53,7 @@ export async function loadLocalData(
3553
if (!enabledCategories[category as SyncCategory]) continue;
3654

3755
try {
38-
const data = await loadCategoryData(category as SyncCategory, paths);
56+
const data = await loadCategoryData(category as SyncCategory, paths, options);
3957
if (data) categories.push(data);
4058
} catch (error) {
4159
const firstPath = paths[0];
@@ -54,11 +72,26 @@ export async function loadLocalData(
5472

5573
/**
5674
* Load data for a single category.
75+
* Routes to blob or per-item loading based on category.
5776
*/
5877
async function loadCategoryData(
5978
category: SyncCategory,
60-
paths: string[]
79+
paths: string[],
80+
options: LoadOptions
6181
): Promise<CategoryData | null> {
82+
if (shouldUseItemSync(category)) {
83+
return loadItemCategoryData(category, paths, options);
84+
}
85+
return loadBlobCategoryData(category, paths);
86+
}
87+
88+
/**
89+
* Load blob-based category data (legacy approach).
90+
*/
91+
async function loadBlobCategoryData(
92+
category: SyncCategory,
93+
paths: string[]
94+
): Promise<BlobCategoryData | null> {
6295
const categoryData: Record<string, unknown> = {};
6396
let hasData = false;
6497

@@ -73,5 +106,97 @@ async function loadCategoryData(
73106
if (!hasData) return null;
74107

75108
const isJsonl = category === 'state';
76-
return { category, data: JSON.stringify(categoryData), isJsonl };
109+
return { category, type: 'blob', data: JSON.stringify(categoryData), isJsonl };
110+
}
111+
112+
/**
113+
* Load per-item category data (sessions, messages).
114+
* Each file is loaded as a separate item with its own checksum.
115+
* Also detects locally deleted items and creates tombstones for them.
116+
*/
117+
async function loadItemCategoryData(
118+
category: SyncCategory,
119+
paths: string[],
120+
options: LoadOptions
121+
): Promise<ItemCategoryData | null> {
122+
const items: Record<string, string> = {};
123+
const checksums: Record<string, string> = {};
124+
125+
for (const basePath of paths) {
126+
await loadItemsFromPath(basePath, '', items, checksums);
127+
}
128+
129+
// Detect locally deleted items by comparing with previous state
130+
const tombstones: Record<string, Tombstone> = {};
131+
const { localState, machineId, tombstoneGraceDays } = options;
132+
const prevChecksums = localState?.itemChecksums;
133+
const prevCategoryChecksums = prevChecksums ? prevChecksums[category] : undefined;
134+
if (prevCategoryChecksums && machineId) {
135+
const previousItemIds = new Set(Object.keys(prevCategoryChecksums));
136+
const currentItemIds = new Set(Object.keys(items));
137+
const deletedIds = detectLocalDeletions(currentItemIds, previousItemIds);
138+
139+
for (const itemId of deletedIds) {
140+
tombstones[itemId] = createTombstone(itemId, machineId, tombstoneGraceDays);
141+
}
142+
}
143+
144+
// Return null only if no items AND no tombstones
145+
if (Object.keys(items).length === 0 && Object.keys(tombstones).length === 0) {
146+
return null;
147+
}
148+
149+
const result: ItemCategoryData = { category, type: 'items', items, checksums };
150+
if (Object.keys(tombstones).length > 0) {
151+
result.tombstones = tombstones;
152+
}
153+
return result;
154+
}
155+
156+
/**
157+
* Recursively load items from a path.
158+
* @param basePath - The base path being scanned
159+
* @param relativePath - Current relative path from basePath
160+
* @param items - Map to populate with item content
161+
* @param checksums - Map to populate with item checksums
162+
*/
163+
async function loadItemsFromPath(
164+
basePath: string,
165+
relativePath: string,
166+
items: Record<string, string>,
167+
checksums: Record<string, string>
168+
): Promise<void> {
169+
const fullPath = relativePath ? join(basePath, relativePath) : basePath;
170+
171+
try {
172+
const stats = await stat(fullPath);
173+
174+
if (stats.isFile()) {
175+
// Load file as an item
176+
const content = await readFile(fullPath, 'utf-8');
177+
const itemId = getItemId(basePath, relativePath);
178+
items[itemId] = content;
179+
checksums[itemId] = calculateChecksum(content);
180+
} else if (stats.isDirectory()) {
181+
// Recurse into directory
182+
const entries = await readdir(fullPath, { withFileTypes: true });
183+
for (const entry of entries) {
184+
if (entry.name.startsWith('.')) continue; // Skip hidden files
185+
const childRelPath = relativePath ? join(relativePath, entry.name) : entry.name;
186+
await loadItemsFromPath(basePath, childRelPath, items, checksums);
187+
}
188+
}
189+
} catch {
190+
// Path doesn't exist - skip
191+
}
192+
}
193+
194+
/**
195+
* Generate a unique item ID from base path and relative path.
196+
* Example: basePath=/data/storage/session, relativePath=abc123.json → session/abc123.json
197+
*/
198+
function getItemId(basePath: string, relativePath: string): string {
199+
const baseDir = basename(basePath);
200+
if (!relativePath) return baseDir;
201+
return `${baseDir}/${relativePath}`;
77202
}

src/data/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
export { loadLocalData } from './category-loader.js';
88
export type { LoadedData, LoadError } from './category-loader.js';
99

10-
export { writeLocalData } from './writer.js';
10+
export { writeLocalData, deleteTombstonedItems } from './writer.js';
1111

1212
export {
1313
loadConfig,

src/data/writer.ts

Lines changed: 107 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,43 +2,103 @@
22
* Data Writer
33
*
44
* Writes synced data back to local filesystem.
5+
* Supports both blob-based overwrite and per-item merge writes.
56
*/
67

7-
import { mkdir, writeFile } from 'node:fs/promises';
8-
import { join } from 'node:path';
9-
import type { PathConfig } from '../types/index.js';
8+
import { mkdir, writeFile, unlink } from 'node:fs/promises';
9+
import { join, dirname } from 'node:path';
10+
import type { PathConfig, SyncCategory } from '../types/index.js';
1011
import { getCategoryPaths } from '../types/paths.js';
11-
import type { CategoryData } from '../sync/operations/types.js';
12+
import type { CategoryData, ItemCategoryData } from '../sync/operations/types.js';
13+
import { isBlobCategoryData, isItemCategoryData } from '../sync/operations/types.js';
1214

1315
/**
1416
* Write synced data back to local filesystem.
17+
* - Blob categories: overwrites local data
18+
* - Item categories: merges new items (does NOT overwrite existing)
1519
*/
1620
export async function writeLocalData(
1721
pathConfig: PathConfig,
1822
categories: CategoryData[]
1923
): Promise<void> {
2024
const categoryPaths = getCategoryPaths(pathConfig);
2125

22-
for (const { category, data } of categories) {
23-
const paths = categoryPaths[category];
26+
for (const catData of categories) {
27+
const paths = categoryPaths[catData.category];
2428
if (paths.length === 0) continue;
2529

26-
const parsed = JSON.parse(data) as Record<string, unknown>;
27-
await writeCategoryData(paths, parsed);
30+
if (isItemCategoryData(catData)) {
31+
// Per-item merge write
32+
await writeItemCategoryData(catData.category, paths, catData);
33+
} else if (isBlobCategoryData(catData)) {
34+
// Blob overwrite
35+
const parsed = JSON.parse(catData.data) as Record<string, unknown>;
36+
await writeBlobCategoryData(paths, parsed);
37+
}
2838
}
2939
}
3040

3141
/**
32-
* Write category data to filesystem.
42+
* Write blob-based category data to filesystem (overwrites).
3343
*/
34-
async function writeCategoryData(paths: string[], data: Record<string, unknown>): Promise<void> {
44+
async function writeBlobCategoryData(
45+
paths: string[],
46+
data: Record<string, unknown>
47+
): Promise<void> {
3548
for (const [key, value] of Object.entries(data)) {
3649
const targetPath = findTargetPath(paths, key);
3750
if (targetPath === undefined) continue;
3851
await writeEntry(targetPath, value);
3952
}
4053
}
4154

55+
/**
56+
* Write per-item category data to filesystem (merges).
57+
* Only writes new items that were pulled from remote.
58+
*/
59+
async function writeItemCategoryData(
60+
_category: SyncCategory,
61+
paths: string[],
62+
catData: ItemCategoryData
63+
): Promise<void> {
64+
// Find the base directory for this category
65+
const basePath = paths[0];
66+
if (!basePath) return;
67+
68+
// Each item ID is like "session/abc123.json" or "message/xyz/msg.json"
69+
// We need to write to the appropriate location
70+
for (const [itemId, content] of Object.entries(catData.items)) {
71+
const targetPath = resolveItemPath(basePath, itemId);
72+
await writeItemFile(targetPath, content);
73+
}
74+
}
75+
76+
/**
77+
* Resolve the full filesystem path for an item.
78+
* Item IDs are like "session/abc123.json" → basePath + abc123.json
79+
* Or "message/session_id/msg.json" → basePath + session_id/msg.json
80+
*/
81+
function resolveItemPath(basePath: string, itemId: string): string {
82+
// Item ID format: "basedir/relative/path.json"
83+
// We need to strip the first segment (which matches basePath's basename)
84+
const parts = itemId.split('/');
85+
parts.shift(); // Remove the first segment (e.g., "session" or "message")
86+
const relativePath = parts.join('/');
87+
return join(basePath, relativePath);
88+
}
89+
90+
/**
91+
* Write a single item file.
92+
*/
93+
async function writeItemFile(filePath: string, content: string): Promise<void> {
94+
try {
95+
await ensureParentDir(filePath);
96+
await writeFile(filePath, content, 'utf-8');
97+
} catch (error) {
98+
console.error(`Failed to write item ${filePath}:`, error);
99+
}
100+
}
101+
42102
/**
43103
* Find target path for a key.
44104
*/
@@ -104,10 +164,44 @@ function serializeContent(filePath: string, value: unknown): string {
104164
* Ensure parent directory exists.
105165
*/
106166
async function ensureParentDir(filePath: string): Promise<void> {
107-
const parts = filePath.split('/');
108-
parts.pop();
109-
const parentDir = parts.join('/');
167+
const parentDir = dirname(filePath);
110168
if (parentDir) {
111169
await mkdir(parentDir, { recursive: true });
112170
}
113171
}
172+
173+
/**
174+
* Delete tombstoned items from local filesystem.
175+
* Called after pull when remote has tombstones for items that should be deleted locally.
176+
*/
177+
export async function deleteTombstonedItems(
178+
pathConfig: PathConfig,
179+
tombstonedItems: Partial<Record<SyncCategory, string[]>>
180+
): Promise<void> {
181+
const categoryPaths = getCategoryPaths(pathConfig);
182+
183+
for (const [category, itemIds] of Object.entries(tombstonedItems)) {
184+
const paths = categoryPaths[category as SyncCategory];
185+
const basePath = paths[0];
186+
if (!basePath) continue;
187+
188+
for (const itemId of itemIds) {
189+
const targetPath = resolveItemPath(basePath, itemId);
190+
await deleteItemFile(targetPath);
191+
}
192+
}
193+
}
194+
195+
/**
196+
* Delete a single item file (silently ignores if not found).
197+
*/
198+
async function deleteItemFile(filePath: string): Promise<void> {
199+
try {
200+
await unlink(filePath);
201+
} catch (error) {
202+
// ENOENT = file doesn't exist, which is fine
203+
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
204+
console.error(`Failed to delete item ${filePath}:`, error);
205+
}
206+
}
207+
}

0 commit comments

Comments
 (0)