Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions src/datastream/datastream-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { RulesEngineClient } from '../rules-engine';
import { Logger } from '../logger';
import { LazyEmitter } from './emitter';
import { partialCompany, partialUser, deepCopyCompany as deepCopyCompanyFn } from './merge';
import * as serializers from '../serialization';

// Import cache providers from the cache module
import type { CacheProvider } from '../cache/types';
Expand Down Expand Up @@ -705,7 +706,12 @@ export class DataStreamClient extends LazyEmitter {
return;
}
} else {
company = message.data as Schematic.RulesengineCompany;
try {
company = serializers.RulesengineCompany.parseOrThrow(message.data);
} catch (error) {
this.logger.warn(`Failed to deserialize company payload: ${error}`);
return;
}
}

if (!company) {
Expand Down Expand Up @@ -768,7 +774,12 @@ export class DataStreamClient extends LazyEmitter {
return;
}
} else {
user = message.data as Schematic.RulesengineUser;
try {
user = serializers.RulesengineUser.parseOrThrow(message.data);
} catch (error) {
this.logger.warn(`Failed to deserialize user payload: ${error}`);
return;
}
}

if (!user) {
Expand Down Expand Up @@ -808,13 +819,28 @@ export class DataStreamClient extends LazyEmitter {
* handleFlagsMessage processes bulk flags messages
*/
private async handleFlagsMessage(message: DataStreamResp): Promise<void> {
const flags = message.data as Schematic.RulesengineFlag[];
if (!Array.isArray(flags)) {
const rawFlags = message.data as unknown[];

if (!Array.isArray(rawFlags)) {
this.logger.warn('Expected flags array in bulk flags message');
return;
}

const flags: Schematic.RulesengineFlag[] = [];
let parseFailureCount = 0;
let firstFailure: unknown = undefined;
for (const raw of rawFlags) {
try {
flags.push(serializers.RulesengineFlag.parseOrThrow(raw));
} catch (error) {
parseFailureCount++;
if (firstFailure === undefined) firstFailure = error;
}
}
if (parseFailureCount > 0) {
this.logger.warn(`Failed to deserialize ${parseFailureCount} flag(s) in bulk message: ${String(firstFailure)}`);
}

const results = await Promise.allSettled(
flags
.filter((flag) => flag?.key)
Expand Down Expand Up @@ -854,7 +880,13 @@ export class DataStreamClient extends LazyEmitter {
* handleFlagMessage processes single flag messages
*/
private async handleFlagMessage(message: DataStreamResp): Promise<void> {
const flag = message.data as Schematic.RulesengineFlag;
let flag: Schematic.RulesengineFlag;
try {
flag = serializers.RulesengineFlag.parseOrThrow(message.data);
} catch (error) {
this.logger.warn(`Failed to deserialize flag payload: ${error}`);
return;
}

if (!flag?.key) {
return;
Expand Down
83 changes: 38 additions & 45 deletions src/datastream/merge.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,16 @@
import type * as Schematic from "../api/types";

/**
* Helper to read a property that may be in camelCase or snake_case form.
* Wire data from WebSocket uses snake_case; Fern-generated types use camelCase.
*/
function getProp(obj: Record<string, unknown>, camel: string, snake: string): unknown {
return obj[camel] ?? obj[snake];
}

/**
* Creates a complete deep copy of a Company object.
*/
export function deepCopyCompany(c: Schematic.RulesengineCompany): Schematic.RulesengineCompany {
return JSON.parse(JSON.stringify(c));
}

/**
* Creates a complete deep copy of a User object.
*/
export function deepCopyUser(u: Schematic.RulesengineUser): Schematic.RulesengineUser {
return JSON.parse(JSON.stringify(u));
}

/**
* Merges a partial update into an existing Company.
* Deep-copies the existing company, then applies only the fields
* present in the partial object.
*
* Wire format uses snake_case keys. The existing company from cache
* may have either camelCase or snake_case keys depending on how it
* was stored.
*/
// Partial updates arrive as raw wire payloads (snake_case keys) and are merged
// into an existing camelCase-canonicalized entity. Each case writes the
// corresponding camelCase field so the cached entity stays in a single shape.
export function partialCompany(
existing: Schematic.RulesengineCompany,
partial: Record<string, unknown>,
Expand All @@ -40,42 +20,54 @@ export function partialCompany(
for (const key of Object.keys(partial)) {
switch (key) {
case "id":
merged.id = partial[key];
break;
case "account_id":
merged.accountId = partial[key];
break;
case "environment_id":
merged[key] = partial[key];
merged.environmentId = partial[key];
break;
case "base_plan_id":
merged[key] = partial[key] ?? null;
merged.basePlanId = partial[key] ?? null;
break;
case "billing_product_ids":
merged.billingProductIds = partial[key];
break;
case "plan_ids":
merged.planIds = partial[key];
break;
case "plan_version_ids":
merged.planVersionIds = partial[key];
break;
case "entitlements":
merged.entitlements = partial[key];
break;
case "rules":
merged.rules = partial[key];
break;
case "traits":
merged.traits = partial[key];
break;
case "subscription":
merged[key] = partial[key];
merged.subscription = partial[key];
break;
case "keys": {
const existingKeys = (getProp(merged, "keys", "keys") ?? {}) as Record<string, string>;
const existingKeys = (merged.keys ?? {}) as Record<string, string>;
const incomingKeys = partial[key] as Record<string, string>;
merged[key] = { ...existingKeys, ...incomingKeys };
merged.keys = { ...existingKeys, ...incomingKeys };
break;
}
case "credit_balances": {
const existingCB = (getProp(merged, "creditBalances", "credit_balances") ?? {}) as Record<
string,
number
>;
const existingCB = (merged.creditBalances ?? {}) as Record<string, number>;
const incomingCB = partial[key] as Record<string, number>;
merged[key] = { ...existingCB, ...incomingCB };
merged.creditBalances = { ...existingCB, ...incomingCB };
break;
}
case "metrics": {
const existingMetrics = ((getProp(merged, "metrics", "metrics") as unknown[]) ??
[]) as Schematic.RulesengineCompanyMetric[];
const existingMetrics = (merged.metrics ?? []) as Schematic.RulesengineCompanyMetric[];
const incomingMetrics = partial[key] as Schematic.RulesengineCompanyMetric[];
merged[key] = upsertMetrics(existingMetrics, incomingMetrics);
merged.metrics = upsertMetrics(existingMetrics, incomingMetrics);
break;
}
// Ignore unknown keys silently
Expand All @@ -85,11 +77,6 @@ export function partialCompany(
return merged as unknown as Schematic.RulesengineCompany;
}

/**
* Merges a partial update into an existing User.
* Deep-copies the existing user, then applies only the fields
* present in the partial object.
*/
export function partialUser(
existing: Schematic.RulesengineUser,
partial: Record<string, unknown>,
Expand All @@ -99,19 +86,25 @@ export function partialUser(
for (const key of Object.keys(partial)) {
switch (key) {
case "id":
merged.id = partial[key];
break;
case "account_id":
merged.accountId = partial[key];
break;
case "environment_id":
merged[key] = partial[key];
merged.environmentId = partial[key];
break;
case "keys": {
const existingKeys = (getProp(merged, "keys", "keys") ?? {}) as Record<string, string>;
const existingKeys = (merged.keys ?? {}) as Record<string, string>;
const incomingKeys = partial[key] as Record<string, string>;
merged[key] = { ...existingKeys, ...incomingKeys };
merged.keys = { ...existingKeys, ...incomingKeys };
break;
}
case "traits":
merged.traits = partial[key];
break;
case "rules":
merged[key] = partial[key];
merged.rules = partial[key];
break;
// Ignore unknown keys silently
}
Expand Down
Loading
Loading