Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/sync/streaming/SSEHandler/NotificationKeeper.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ITelemetryTracker } from '../../../trackers/types';
import { CONNECTION_ESTABLISHED, DISABLED, ENABLED, OCCUPANCY_PRI, OCCUPANCY_SEC, PAUSED, STREAMING_STATUS } from '../../../utils/constants';
import { StreamingEventType } from '../../submitters/types';
import { ControlType, PUSH_SUBSYSTEM_UP, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN } from '../constants';
import { ControlType, PUSH_SUBSYSTEM_UP, PUSH_NON_RETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN } from '../constants';
import { IPushEventEmitter } from '../types';

const CONTROL_CHANNEL_REGEXS = [/control_pri$/, /control_sec$/];
Expand Down Expand Up @@ -84,7 +84,7 @@ export function notificationKeeperFactory(pushEmitter: IPushEventEmitter, teleme
c.cTime = timestamp;
if (controlType === ControlType.STREAMING_DISABLED) {
telemetryTracker.streamingEvent(STREAMING_STATUS, DISABLED);
pushEmitter.emit(PUSH_NONRETRYABLE_ERROR);
pushEmitter.emit(PUSH_NON_RETRYABLE_ERROR);
} else if (hasPublishers) {
if (controlType === ControlType.STREAMING_PAUSED && hasResumed) {
telemetryTracker.streamingEvent(STREAMING_STATUS, PAUSED);
Expand Down
8 changes: 4 additions & 4 deletions src/sync/streaming/SSEHandler/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// @ts-nocheck
import { SSEHandlerFactory } from '..';
import { PUSH_SUBSYSTEM_UP, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, PUSH_RETRYABLE_ERROR, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RB_SEGMENT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, ControlType } from '../../constants';
import { PUSH_SUBSYSTEM_UP, PUSH_NON_RETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, PUSH_RETRYABLE_ERROR, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RB_SEGMENT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, ControlType } from '../../constants';
import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock';

// update messages
Expand Down Expand Up @@ -123,7 +123,7 @@ test('`handlerMessage` for CONTROL notifications (NotificationKeeper)', () => {
expect(pushEmitter.emit).toHaveBeenLastCalledWith(ControlType.STREAMING_RESET); // must handle streaming reset

sseHandler.handleMessage(controlStreamingDisabledSec); // testing STREAMING_DISABLED with second region
expect(pushEmitter.emit).toHaveBeenLastCalledWith(PUSH_NONRETRYABLE_ERROR); // must emit PUSH_NONRETRYABLE_ERROR if received a STREAMING_DISABLED control message
expect(pushEmitter.emit).toHaveBeenLastCalledWith(PUSH_NON_RETRYABLE_ERROR); // must emit PUSH_NON_RETRYABLE_ERROR if received a STREAMING_DISABLED control message

const sseHandler2 = SSEHandlerFactory(loggerMock, pushEmitter, telemetryTracker);
sseHandler2.handleOpen();
Expand All @@ -132,7 +132,7 @@ test('`handlerMessage` for CONTROL notifications (NotificationKeeper)', () => {
expect(pushEmitter.emit).toHaveBeenLastCalledWith(PUSH_SUBSYSTEM_DOWN);

sseHandler2.handleMessage(controlStreamingDisabled);
expect(pushEmitter.emit).toHaveBeenLastCalledWith(PUSH_NONRETRYABLE_ERROR); // must emit PUSH_NONRETRYABLE_ERROR if received a STREAMING_DISABLED control message, even if streaming is off
expect(pushEmitter.emit).toHaveBeenLastCalledWith(PUSH_NON_RETRYABLE_ERROR); // must emit PUSH_NON_RETRYABLE_ERROR if received a STREAMING_DISABLED control message, even if streaming is off

});

Expand Down Expand Up @@ -213,7 +213,7 @@ test('handleError', () => {

const ably4XXNonRecoverableError = { data: '{"message":"Token expired","code":42910,"statusCode":429}' };
sseHandler.handleError(ably4XXNonRecoverableError);
expect(pushEmitter.emit).toHaveBeenLastCalledWith(PUSH_NONRETRYABLE_ERROR); // An Ably non-recoverable error must emit PUSH_NONRETRYABLE_ERROR
expect(pushEmitter.emit).toHaveBeenLastCalledWith(PUSH_NON_RETRYABLE_ERROR); // An Ably non-recoverable error must emit PUSH_NON_RETRYABLE_ERROR
expect(telemetryTracker.streamingEvent).toHaveBeenLastCalledWith(ABLY_ERROR, 42910);

const ably5XXError = { data: '{"message":"...","code":50000,"statusCode":500}' };
Expand Down
4 changes: 2 additions & 2 deletions src/sync/streaming/SSEHandler/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { errorParser, messageParser } from './NotificationParser';
import { notificationKeeperFactory } from './NotificationKeeper';
import { PUSH_RETRYABLE_ERROR, PUSH_NONRETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, RB_SEGMENT_UPDATE } from '../constants';
import { PUSH_RETRYABLE_ERROR, PUSH_NON_RETRYABLE_ERROR, OCCUPANCY, CONTROL, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, RB_SEGMENT_UPDATE } from '../constants';
import { IPushEventEmitter } from '../types';
import { ISseEventHandler } from '../SSEClient/types';
import { INotificationError, INotificationMessage } from './types';
Expand Down Expand Up @@ -56,7 +56,7 @@ export function SSEHandlerFactory(log: ILogger, pushEmitter: IPushEventEmitter,
if (isRetryableError(errorWithParsedData)) {
pushEmitter.emit(PUSH_RETRYABLE_ERROR);
} else {
pushEmitter.emit(PUSH_NONRETRYABLE_ERROR);
pushEmitter.emit(PUSH_NON_RETRYABLE_ERROR);
}
},

Expand Down
4 changes: 2 additions & 2 deletions src/sync/streaming/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export const SECONDS_BEFORE_EXPIRATION = 600;
* emitted on SSE and Authenticate non-recoverable errors, STREAMING_DISABLED control notification and authentication with pushEnabled false
* triggers `handleNonRetryableError` call
*/
export const PUSH_NONRETRYABLE_ERROR = 'PUSH_NONRETRYABLE_ERROR';
export const PUSH_NON_RETRYABLE_ERROR = 'PUSH_NON_RETRYABLE_ERROR';
/**
* emitted on SSE and Authenticate recoverable errors
* triggers `handleRetryableError` call
Expand All @@ -19,7 +19,7 @@ export const PUSH_RETRYABLE_ERROR = 'PUSH_RETRYABLE_ERROR';
export const PUSH_SUBSYSTEM_UP = 'PUSH_SUBSYSTEM_UP';

/**
* emitted on STREAMING_PAUSED control notification, OCCUPANCY equal to 0, PUSH_NONRETRYABLE_ERROR and PUSH_RETRYABLE_ERROR events.
* emitted on STREAMING_PAUSED control notification, OCCUPANCY equal to 0, PUSH_NON_RETRYABLE_ERROR and PUSH_RETRYABLE_ERROR events.
* triggers `startPolling` and `stopWorkers` calls
*/
export const PUSH_SUBSYSTEM_DOWN = 'PUSH_SUBSYSTEM_DOWN';
Expand Down
14 changes: 7 additions & 7 deletions src/sync/streaming/pushManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { authenticateFactory, hashUserKey } from './AuthClient';
import { forOwn } from '../../utils/lang';
import { SSEClient } from './SSEClient';
import { checkIfServerSide, getMatching } from '../../utils/key';
import { MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SECONDS_BEFORE_EXPIRATION, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RB_SEGMENT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, ControlType } from './constants';
import { MEMBERSHIPS_MS_UPDATE, MEMBERSHIPS_LS_UPDATE, PUSH_NON_RETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SECONDS_BEFORE_EXPIRATION, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, RB_SEGMENT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, ControlType } from './constants';
import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT, STREAMING_PARSING_MEMBERSHIPS_UPDATE } from '../../logger/constants';
import { IMembershipMSUpdateData, IMembershipLSUpdateData, KeyList, UpdateStrategy } from './SSEHandler/types';
import { getDelay, isInBitmap, parseBitmap, parseCompressedData } from './parseUtils';
Expand Down Expand Up @@ -67,10 +67,10 @@ export function pushManagerFactory(
// [Only for client-side] variable to flag that a new client was added. It is needed to reconnect streaming.
let connectForNewClient = false;

// flag that indicates if `stop/disconnectPush` was called, either by the SyncManager, when the client is destroyed, or due to a PUSH_NONRETRYABLE_ERROR error.
// flag that indicates if `stop/disconnectPush` was called, either by the SyncManager, when the client is destroyed, or due to a PUSH_NON_RETRYABLE_ERROR error.
// It is used to halt the `connectPush` process if it was in progress.
let disconnected: boolean | undefined;
// flag that indicates a PUSH_NONRETRYABLE_ERROR, condition with which starting pushManager again is ignored.
// flag that indicates a PUSH_NON_RETRYABLE_ERROR, condition with which starting pushManager again is ignored.
// true if STREAMING_DISABLED control event, or 'pushEnabled: false', or non-recoverable SSE or Auth errors.
let disabled: boolean | undefined; // `disabled` implies `disconnected === true`

Expand Down Expand Up @@ -117,11 +117,11 @@ export function pushManagerFactory(
function (authData) {
if (disconnected) return;

// 'pushEnabled: false' is handled as a PUSH_NONRETRYABLE_ERROR instead of PUSH_SUBSYSTEM_DOWN, in order to
// 'pushEnabled: false' is handled as a PUSH_NON_RETRYABLE_ERROR instead of PUSH_SUBSYSTEM_DOWN, in order to
// close the sseClient in case the org has been bloqued while the instance was connected to streaming
if (!authData.pushEnabled) {
log.info(STREAMING_DISABLED);
pushEmitter.emit(PUSH_NONRETRYABLE_ERROR);
pushEmitter.emit(PUSH_NON_RETRYABLE_ERROR);
return;
}

Expand All @@ -140,7 +140,7 @@ export function pushManagerFactory(
// Handle 4XX HTTP errors: 401 (invalid SDK Key) or 400 (using incorrect SDK Key, i.e., client-side SDK Key on server-side)
if (error.statusCode >= 400 && error.statusCode < 500) {
telemetryTracker.streamingEvent(AUTH_REJECTION);
pushEmitter.emit(PUSH_NONRETRYABLE_ERROR);
pushEmitter.emit(PUSH_NON_RETRYABLE_ERROR);
return;
}

Expand Down Expand Up @@ -183,7 +183,7 @@ export function pushManagerFactory(

/** Fallback to polling without retry due to: STREAMING_DISABLED control event, or 'pushEnabled: false', or non-recoverable SSE and Authentication errors */

pushEmitter.on(PUSH_NONRETRYABLE_ERROR, function handleNonRetryableError() {
pushEmitter.on(PUSH_NON_RETRYABLE_ERROR, function handleNonRetryableError() {
disabled = true;
// Note: `stopWorkers` is been called twice, but it is not harmful
disconnectPush();
Expand Down
4 changes: 2 additions & 2 deletions src/sync/streaming/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { ControlType } from './constants';
// Internal SDK events, subscribed by SyncManager and PushManager
export type PUSH_SUBSYSTEM_UP = 'PUSH_SUBSYSTEM_UP'
export type PUSH_SUBSYSTEM_DOWN = 'PUSH_SUBSYSTEM_DOWN'
export type PUSH_NONRETRYABLE_ERROR = 'PUSH_NONRETRYABLE_ERROR'
export type PUSH_NON_RETRYABLE_ERROR = 'PUSH_NON_RETRYABLE_ERROR'
export type PUSH_RETRYABLE_ERROR = 'PUSH_RETRYABLE_ERROR'

// Update-type push notifications, handled by NotificationProcessor
Expand All @@ -22,7 +22,7 @@ export type RB_SEGMENT_UPDATE = 'RB_SEGMENT_UPDATE';
export type CONTROL = 'CONTROL';
export type OCCUPANCY = 'OCCUPANCY';

export type IPushEvent = PUSH_SUBSYSTEM_UP | PUSH_SUBSYSTEM_DOWN | PUSH_NONRETRYABLE_ERROR | PUSH_RETRYABLE_ERROR | MEMBERSHIPS_MS_UPDATE | MEMBERSHIPS_LS_UPDATE | SEGMENT_UPDATE | SPLIT_UPDATE | SPLIT_KILL | RB_SEGMENT_UPDATE | ControlType.STREAMING_RESET
export type IPushEvent = PUSH_SUBSYSTEM_UP | PUSH_SUBSYSTEM_DOWN | PUSH_NON_RETRYABLE_ERROR | PUSH_RETRYABLE_ERROR | MEMBERSHIPS_MS_UPDATE | MEMBERSHIPS_LS_UPDATE | SEGMENT_UPDATE | SPLIT_UPDATE | SPLIT_KILL | RB_SEGMENT_UPDATE | ControlType.STREAMING_RESET

type IParsedData<T extends IPushEvent> =
T extends MEMBERSHIPS_MS_UPDATE ? IMembershipMSUpdateData :
Expand Down
20 changes: 16 additions & 4 deletions types/splitio.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -495,15 +495,15 @@ declare namespace SplitIO {
}

/**
* Metadata for the update event emitted when the SDK cache is updated with new data for flags or segments.
* Metadata for the update event emitted when the SDK cache is updated with new data for flags, configs, or segments.
*/
type SdkUpdateMetadata = {
/**
* The type of update event.
*/
type: SdkUpdateMetadataType;
/**
* The names of the flags or segments that were updated.
* The names of the flags or configs that were updated. Empty array if the update is of type 'SEGMENTS_UPDATE'.
*/
names: string[];
}
Expand Down Expand Up @@ -2316,11 +2316,23 @@ declare namespace SplitIO {
*/
authorizationKey: string;
/**
* Configs definitions refresh rate for polling, in seconds.
* Polling rate for configs and segments refresh, in seconds. Minimum value: 5.
*
* @defaultValue `60`
*/
configsRefreshRate?: number;
pollingRate?: number;
/**
* Push rate for events and impressions, in seconds. Minimum value: 60.
*
* @defaultValue `60`
*/
pushRate?: number;
/**
* Maximum queue size for events and impressions. When the queue reaches this size, a flush is triggered. Minimum value: 1000.
*
* @defaultValue `10000`
*/
queueSize?: number;
/**
* Logging level.
*
Expand Down