Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 66 additions & 20 deletions core/src/subscriber/external/goldsky.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,34 @@ import {
} from '../base';
import { logger } from '../../utils/logger';

// ----------------------------------------------------------------------------
// GraphQL response types
// ----------------------------------------------------------------------------

interface GoldskyOrderFilledEvent {
id: string;
timestamp: string;
maker: string;
taker: string;
makerAssetId: string;
takerAssetId: string;
makerAmountFilled: string;
takerAmountFilled: string;
}

interface GoldskyTransfer {
id: string;
blockTimestamp: string;
from: string;
to: string;
value: string;
}

interface GoldskyGraphQlResponse {
data?: Record<string, unknown>;
errors?: ReadonlyArray<{ message: string }>;
}

// ----------------------------------------------------------------------------
// GoldSky Config
// ----------------------------------------------------------------------------
Expand All @@ -18,7 +46,7 @@ import { logger } from '../../utils/logger';
export interface GoldSkyGraphQlQuery {
url: string;
query: string;
variables?: Record<string, any>;
variables?: Record<string, string | number | boolean>;
}

/**
Expand Down Expand Up @@ -135,8 +163,10 @@ export const POLYMARKET_DEFAULT_SUBSCRIPTION: GoldSkySubscriptionBuilder = async
]);

const seen = new Set<string>();
const trades: any[] = [];
for (const row of [...(makerData?.orderFilledEvents as any[] ?? []), ...(takerData?.orderFilledEvents as any[] ?? [])]) {
const trades: GoldskyOrderFilledEvent[] = [];
const makerEvents = (makerData?.orderFilledEvents ?? []) as GoldskyOrderFilledEvent[];
const takerEvents = (takerData?.orderFilledEvents ?? []) as GoldskyOrderFilledEvent[];
for (const row of [...makerEvents, ...takerEvents]) {
if (!seen.has(row.id)) {
seen.add(row.id);
trades.push(row);
Expand Down Expand Up @@ -186,11 +216,12 @@ export const LIMITLESS_DEFAULT_SUBSCRIPTION: GoldSkySubscriptionBuilder = async
*/
export const buildPolymarketTradesActivity: SubscribedActivityBuilder = (data, address, types): SubscribedResult | null => {
if (!types.includes('trades')) return null;
const filled = (data as any)?.orderFilledEvents;
const record = data as Record<string, unknown> | null;
const filled = record?.orderFilledEvents;
if (!Array.isArray(filled) || filled.length === 0) return null;

const addr = address.toLowerCase();
const trades: Trade[] = filled.map((f: any): Trade => {
const trades: Trade[] = (filled as GoldskyOrderFilledEvent[]).map((f): Trade => {
const isMaker = f.maker?.toLowerCase() === addr;
const currAssetId = BigInt(isMaker ? f.makerAssetId : f.takerAssetId);
const isBuying = currAssetId === 0n;
Expand Down Expand Up @@ -245,13 +276,14 @@ export const buildPolymarketActivity: SubscribedActivityBuilder = (data, address
*/
export const buildLimitlessBalanceActivity: SubscribedActivityBuilder = (data, address, types, lastActivity): SubscribedResult | null => {
if (!types.includes('balances')) return null;
const transfers = (data as any)?.transfers;
const record = data as Record<string, unknown> | null;
const transfers = record?.transfers;
if (!Array.isArray(transfers) || transfers.length === 0) return null;

const prev = lastActivity?.balances?.find(b => b.currency === 'USDC');
if (!prev) return null;

const t = transfers[0];
const t = transfers[0] as GoldskyTransfer;
const addr = address.toLowerCase();
const isIncoming = t.to?.toLowerCase() === addr;
const delta = parseFloat(t.value) / 1e6;
Expand Down Expand Up @@ -285,6 +317,7 @@ export class GoldSkySubscriber implements BaseSubscriber {
private pollTimers = new Map<string, ReturnType<typeof setInterval>>();
private callbacks = new Map<string, (data: unknown) => void>();
private addressQueryTypes = new Map<string, SubscriptionOption[]>();
private querying = new Set<string>();
private closed = false;

constructor(config: GoldSkyConfig) {
Expand All @@ -304,7 +337,11 @@ export class GoldSkySubscriber implements BaseSubscriber {
this.pollTimers.delete(address);
}

const timer = setInterval(() => this.query(address), this.pollMs);
const timer = setInterval(() => {
this.query(address).catch((err: unknown) => {
logger.error(`GoldSkySubscriber: query failed for ${address}`, { error: String(err) });
});
}, this.pollMs);
this.pollTimers.set(address, timer);
}

Expand All @@ -318,6 +355,7 @@ export class GoldSkySubscriber implements BaseSubscriber {
this.abortControllers.delete(address);
this.callbacks.delete(address);
this.addressQueryTypes.delete(address);
this.querying.delete(address);
}

close(): void {
Expand All @@ -332,15 +370,22 @@ export class GoldSkySubscriber implements BaseSubscriber {
const types = this.addressQueryTypes.get(address);
if (!callback || !types) return;

this.abortControllers.get(address)?.abort();
const controller = new AbortController();
this.abortControllers.set(address, controller);
if (this.querying.has(address)) return;
this.querying.add(address);

const goldSkyFetch: GoldSkyFetch = (q) => this.runQuery(q, controller.signal);
const data = await this.config.buildSubscription(address, types, goldSkyFetch, this.config.baseUrl);
if (!data) return;
try {
this.abortControllers.get(address)?.abort();
const controller = new AbortController();
this.abortControllers.set(address, controller);

callback(data);
const goldSkyFetch: GoldSkyFetch = (q) => this.runQuery(q, controller.signal);
const data = await this.config.buildSubscription(address, types, goldSkyFetch, this.config.baseUrl);
if (!data) return;

callback(data);
} finally {
this.querying.delete(address);
}
}

private async runQuery(q: GoldSkyGraphQlQuery, signal: AbortSignal): Promise<Record<string, unknown> | null> {
Expand All @@ -366,14 +411,15 @@ export class GoldSkySubscriber implements BaseSubscriber {
logger.warn(`GoldSkySubscriber: HTTP ${res.status} from ${q.url}`);
return null;
}
const json = await res.json() as any;
if (json?.errors) {
const json: GoldskyGraphQlResponse = await res.json();
if (json.errors) {
logger.warn(`GoldSkySubscriber: GraphQL errors from ${q.url}`, { errors: JSON.stringify(json.errors) });
return null;
}
return json?.data ?? null;
} catch (err: any) {
if (err?.name !== 'AbortError' && err?.name !== 'TimeoutError') {
return json.data ?? null;
} catch (err: unknown) {
const name = err instanceof Error ? err.name : undefined;
if (name !== 'AbortError' && name !== 'TimeoutError') {
logger.warn(`GoldSkySubscriber: Fetch failed for ${q.url}`, { error: String(err) });
}
return null;
Expand Down
Loading