From 03ca456e9a2e014baa9cda778e808dd40d58c469 Mon Sep 17 00:00:00 2001 From: Fabien Date: Wed, 20 May 2026 13:51:24 +0200 Subject: [PATCH 1/8] Deduplicate transactions before committing to the DB Don't let prisma handle the duplicates on its own but filter out the duplicate before running the commit which does more and is expensive. --- services/chronikService.ts | 4 +- services/transactionService.ts | 64 ++++++++++++++++++++++++++ tests/unittests/chronikService.test.ts | 1 + 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/services/chronikService.ts b/services/chronikService.ts index b3e96938..305f2664 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -7,6 +7,7 @@ import prisma from 'prisma-local/clientInstance' import { TransactionWithAddressAndPrices, createManyTransactions, + filterRowsNeedingCreateMany, deleteTransactions, fetchUnconfirmedTransactions, markTransactionsOrphaned, @@ -877,7 +878,8 @@ export class ChronikBlockchainClient { runTriggers: boolean ): Promise { const rows = commitTuples.map(p => p.row) - const createdTxs = await createManyTransactions(rows) + const rowsToUpsert = await filterRowsNeedingCreateMany(rows) + const createdTxs = await createManyTransactions(rowsToUpsert) console.log(`${this.CHRONIK_MSG_PREFIX} committed — created=${createdTxs.length}/${commitTuples.length}`) const createdForProd = createdTxs.filter(t => productionAddressesIds.includes(t.addressId)) diff --git a/services/transactionService.ts b/services/transactionService.ts index 859d7d08..1030b78c 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -565,6 +565,70 @@ export async function connectAllTransactionsToPrices (): Promise { console.log('[PRICES] Finished connecting txs to prices.') } +interface ExistingTxSnapshot { + confirmed: boolean + timestamp: number + orphaned: boolean +} + +const txSnapshotKey = (hash: string, addressId: string): string => + `${hash}:${addressId}` + +const rowNeedsUpsert = 
( + row: Prisma.TransactionUncheckedCreateInput, + existing: ExistingTxSnapshot +): boolean => { + const confirmed = row.confirmed ?? false + const timestamp = row.timestamp + const orphaned = row.orphaned ?? false + return ( + existing.confirmed !== confirmed || + existing.timestamp !== timestamp || + existing.orphaned !== orphaned + ) +} + +/** + * Cheap dedupe before createManyTransactions: returns only new rows or rows + * whose confirmed, timestamp, or orphaned may have changed. + */ +export async function filterRowsNeedingCreateMany ( + transactionsData: Prisma.TransactionUncheckedCreateInput[] +): Promise { + if (transactionsData.length === 0) { + return [] + } + + const existingTxs = await prisma.transaction.findMany({ + where: { + OR: transactionsData.map(tx => ({ + hash: tx.hash, + addressId: tx.addressId + })) + }, + select: { + hash: true, + addressId: true, + confirmed: true, + timestamp: true, + orphaned: true + } + }) + + const existingMap = new Map() + for (const tx of existingTxs) { + existingMap.set(txSnapshotKey(tx.hash, tx.addressId), tx) + } + + return transactionsData.filter(row => { + const existing = existingMap.get(txSnapshotKey(row.hash, row.addressId)) + if (existing == null) { + return true + } + return rowNeedsUpsert(row, existing) + }) +} + export async function createManyTransactions ( transactionsData: Prisma.TransactionUncheckedCreateInput[] ): Promise { diff --git a/tests/unittests/chronikService.test.ts b/tests/unittests/chronikService.test.ts index a8d9440f..f6d5a677 100644 --- a/tests/unittests/chronikService.test.ts +++ b/tests/unittests/chronikService.test.ts @@ -61,6 +61,7 @@ jest.mock('../../services/addressService', () => ({ jest.mock('../../services/transactionService', () => ({ createManyTransactions: jest.fn(), + filterRowsNeedingCreateMany: jest.fn(async (rows: unknown[]) => rows), deleteTransactions: jest.fn(), fetchUnconfirmedTransactions: jest.fn(), upsertTransaction: jest.fn(), From 
3a2ba27bbe219b6c3e65d19607a9a2ffd9d93501 Mon Sep 17 00:00:00 2001 From: Fabien Date: Fri, 22 May 2026 12:12:08 +0200 Subject: [PATCH 2/8] Prepare for state-machine address syncing We aim to fix a producer/consumer issue. It happens that we produce more txs to commit to the db than what can be committed to disk, leading to memory explosion. We are going to use a state machine to properly pause the tx producers while the queue of txs to commit is draining. This commit introduces the steps only, no change in behavior yet. --- services/chronikService.ts | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/services/chronikService.ts b/services/chronikService.ts index 305f2664..074f5474 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -113,7 +113,11 @@ export function getNullDataScriptData (outputScript: string): OpReturnData | nul } interface ChronikTxWithAddress { tx: Tx, address: Address } + +type FetchBatchPhase = 'tx-drain' | 'drain-complete' | 'addresses-synced' + interface FetchedTxsBatch { + phase: FetchBatchPhase chronikTxs: ChronikTxWithAddress[] addressesSynced: string[] } @@ -443,19 +447,31 @@ export class ChronikBlockchainClient { // are not enough transactions to fill the batch. 
while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE) - yield { chronikTxs: chronikTxsSlice, addressesSynced: [] } + yield { + phase: 'tx-drain', + chronikTxs: chronikTxsSlice, + addressesSynced: [] + } } // If no active workers, yield any remaining transactions (even if < batch size) if (activeWorkers.size === 0 && chronikTxs.length > 0) { const remaining = chronikTxs.splice(0) - yield { chronikTxs: remaining, addressesSynced: [] } + yield { + phase: 'tx-drain', + chronikTxs: remaining, + addressesSynced: [] + } } // Yield completed addresses if any if (completedAddresses.length > 0) { const completed = completedAddresses.splice(0) - yield { chronikTxs: [], addressesSynced: completed } + yield { + phase: 'addresses-synced', + chronikTxs: [], + addressesSynced: completed + } } // If no active workers and no more transactions, break @@ -479,7 +495,11 @@ export class ChronikBlockchainClient { if (chronikTxs.length > 0) { const remaining = chronikTxs chronikTxs = [] - yield { chronikTxs: remaining, addressesSynced: [] } + yield { + phase: 'tx-drain', + chronikTxs: remaining, + addressesSynced: [] + } } } @@ -969,12 +989,20 @@ export class ChronikBlockchainClient { console.log(`${pfx} will fetch batches of ${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY} addresses from chronik`) for await (const batch of this.fetchLatestTxsForAddresses(addresses)) { - if (batch.addressesSynced.length > 0) { + if (batch.phase === 'addresses-synced') { // marcador de slice => desmarca syncing await setSyncingBatch(batch.addressesSynced, false) continue } + if (batch.phase === 'drain-complete') { + continue + } + + if (batch.chronikTxs.length === 0) { + continue + } + const involvedAddrIds = new Set(batch.chronikTxs.map(({ address }) => address.id)) try { From 7dd3e529b79017140999df54ef6547fa2bfbacc8 Mon Sep 17 00:00:00 2001 From: Fabien Date: Fri, 22 May 2026 12:17:48 +0200 Subject: [PATCH 3/8] Implement the producer pause mechanism 
It's not used yet as nothing can set producersPaused to true, so no change in behavior. --- services/chronikService.ts | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/services/chronikService.ts b/services/chronikService.ts index 074f5474..396b60bd 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -343,6 +343,29 @@ export class ChronikBlockchainClient { let chronikTxs: ChronikTxWithAddress[] = [] const completedAddresses: string[] = [] + let producersPaused = false + const producerPauseWaiters: Array<() => void> = [] + + const waitWhilePaused = async (): Promise => { + // producersPaused is cleared by resumeProducers during drain cycles + // eslint-disable-next-line no-unmodified-loop-condition + while (producersPaused) { + await new Promise(resolve => { + producerPauseWaiters.push(resolve) + }) + } + } + + // Used when a drain cycle completes + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const resumeProducers = (): void => { + producersPaused = false + const waiters = producerPauseWaiters.splice(0) + for (const resolve of waiters) { + resolve() + } + } + // Worker pool: maintain exactly INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY active workers const activeWorkers = new Set>() let nextAddressIndex = 0 @@ -359,6 +382,8 @@ export class ChronikBlockchainClient { try { while (!hasReachedStoppingCondition) { + await waitWhilePaused() + const pageIndex = nextBurstBasePageIndex let pageTxs: Tx[] = [] From b450316401a4f57d2884d751acb13a637967c999 Mon Sep 17 00:00:00 2001 From: Fabien Date: Fri, 22 May 2026 13:26:12 +0200 Subject: [PATCH 4/8] Use the state machine Chronik workers fill up the buffer until it has >= 200 txs, then trigger the db commit worker to run the drain cycle. This pauses the producers, blocking them before the next chronik page is read, until the queue is fully drained. Then it resumes, and so on until all addresses are completely synced. 
This avoids the buffer to be filled faster than it's drained and the app heap usage to grow unbounded. --- services/chronikService.ts | 59 +++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/services/chronikService.ts b/services/chronikService.ts index 396b60bd..8853ff84 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -340,7 +340,7 @@ export class ChronikBlockchainClient { `(addressConcurrency=${INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY}, pageConcurrency=1).` ) - let chronikTxs: ChronikTxWithAddress[] = [] + const chronikTxs: ChronikTxWithAddress[] = [] const completedAddresses: string[] = [] let producersPaused = false @@ -356,8 +356,6 @@ export class ChronikBlockchainClient { } } - // Used when a drain cycle completes - // eslint-disable-next-line @typescript-eslint/no-unused-vars const resumeProducers = (): void => { producersPaused = false const waiters = producerPauseWaiters.splice(0) @@ -366,6 +364,38 @@ export class ChronikBlockchainClient { } } + async function * runDrainCycle (): AsyncGenerator { + producersPaused = true + + while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { + const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE) + yield { + phase: 'tx-drain', + chronikTxs: chronikTxsSlice, + addressesSynced: [] + } + } + + if (chronikTxs.length > 0) { + const remaining = chronikTxs.splice(0) + yield { + phase: 'tx-drain', + chronikTxs: remaining, + addressesSynced: [] + } + } + + yield { + phase: 'drain-complete', + chronikTxs: [], + addressesSynced: [] + } + + // Consumer processes drain-complete before this runs (for await next()) + producersPaused = false + resumeProducers() + } + // Worker pool: maintain exactly INITIAL_ADDRESS_SYNC_FETCH_CONCURRENTLY active workers const activeWorkers = new Set>() let nextAddressIndex = 0 @@ -468,15 +498,9 @@ export class ChronikBlockchainClient { // Poll and yield batches while workers are active while (activeWorkers.size > 0 || 
chronikTxs.length > 0) { - // Yield batches if buffer is large enough. Make sure to drain until there - // are not enough transactions to fill the batch. - while (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { - const chronikTxsSlice = chronikTxs.splice(0, TX_EMIT_BATCH_SIZE) - yield { - phase: 'tx-drain', - chronikTxs: chronikTxsSlice, - addressesSynced: [] - } + if (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { + yield * runDrainCycle() + continue } // If no active workers, yield any remaining transactions (even if < batch size) @@ -489,8 +513,8 @@ export class ChronikBlockchainClient { } } - // Yield completed addresses if any - if (completedAddresses.length > 0) { + // Yield completed addresses if any (not during an active drain pause) + if (completedAddresses.length > 0 && !producersPaused) { const completed = completedAddresses.splice(0) yield { phase: 'addresses-synced', @@ -517,9 +541,10 @@ export class ChronikBlockchainClient { } // Final TX flush after all addresses processed - if (chronikTxs.length > 0) { - const remaining = chronikTxs - chronikTxs = [] + if (chronikTxs.length >= TX_EMIT_BATCH_SIZE) { + yield * runDrainCycle() + } else if (chronikTxs.length > 0) { + const remaining = chronikTxs.splice(0) yield { phase: 'tx-drain', chronikTxs: remaining, From 9d902baf103cbbf3d0279c32685325d5e79713e4 Mon Sep 17 00:00:00 2001 From: Fabien Date: Fri, 22 May 2026 13:46:47 +0200 Subject: [PATCH 5/8] Commit to db even if the batch is below the threshold We start the drain task after 200 txs but it's very likely that some txs are already in the db and shouldn't be committed. Instead of waiting for the full batch of 200 txs, commit immediately so we can release memory asap. 
--- services/chronikService.ts | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/services/chronikService.ts b/services/chronikService.ts index 8853ff84..a568a4e9 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -1046,6 +1046,15 @@ export class ChronikBlockchainClient { } if (batch.phase === 'drain-complete') { + if (toCommit.length > 0) { + const remainder = toCommit.splice(0) + await this.commitTransactionsBatch( + remainder, + productionAddressesIds, + runTriggers + ) + remainder.length = 0 + } continue } From 278a5b28a5cb02d883065e7b38fd2522d58fd1f5 Mon Sep 17 00:00:00 2001 From: Fabien Date: Wed, 20 May 2026 17:48:29 +0200 Subject: [PATCH 6/8] Simplify the syncing logic The createManyTransactions path does a lot of things: - insert the txs into the DB - sync the prices - update the cache - run the triggers All that work allocates a ton of memory, and a good chunk of it is not needed during sync: - syncing the price is done anyway after the tx fetching during the syncing process, so it can be avoided during the initial phase - cache update only works if there is a price associated, so better skip it as well during the tx fetching. Similarly, building and caching the payment information per Paybutton is useless at this point. This commit implements a differentiated strategy during sync that skips price update + cache update, and instead only focuses on fetching the transactions. Price update happens at the next stage, and the cache will eventually be rebuilt anyway when the user needs it. To enforce this, the cache is cleared after the syncing has completed. This significantly reduces the memory usage during syncing at the cost of an inaccurate UI view during that time. On my machine, syncing from a 3-month-old prod db with > 12k addresses resulted in OOM (32GB allocated) after ~1300 eCash addresses. After this commit, fetching all txs peaked at ~2.6GB (price fetching needs more). 
--- jobs/workers.ts | 34 +++- redis/paymentCache.ts | 11 ++ services/chronikService.ts | 64 ++++++-- services/transactionService.ts | 207 +++++++++++++++---------- tests/unittests/chronikService.test.ts | 4 + 5 files changed, 229 insertions(+), 91 deletions(-) diff --git a/jobs/workers.ts b/jobs/workers.ts index 99899820..ad8f6262 100644 --- a/jobs/workers.ts +++ b/jobs/workers.ts @@ -1,12 +1,43 @@ import { Worker } from 'bullmq' import { redisBullMQ } from 'redis/clientInstance' import { DEFAULT_WORKER_LOCK_DURATION } from 'constants/index' +import { clearBalanceCache } from 'redis/balanceCache' +import { clearDashboardCache } from 'redis/dashboardCache' +import { clearPaymentCacheForAddress } from 'redis/paymentCache' +import { fetchAllAddresses } from 'services/addressService' +import { cleanupExpiredClientPayments } from 'services/clientPaymentService' import { multiBlockchainClient } from 'services/chronikService' import { connectAllTransactionsToPrices } from 'services/transactionService' -import { cleanupExpiredClientPayments } from 'services/clientPaymentService' +import { fetchAllUsers } from 'services/userService' import * as priceService from 'services/priceService' +const ADDRESS_INVALIDATION_BATCH_SIZE = 100 + +/** + * Drop Redis payment-week, balance, and dashboard caches after bulk blockchain sync. + * Rebuild is lazy on the next API request. 
+ */ +async function invalidateCachesAfterBlockchainSync (): Promise { + console.log('[CACHE]: Invalidating caches after blockchain sync...') + const addresses = await fetchAllAddresses() + for (let i = 0; i < addresses.length; i += ADDRESS_INVALIDATION_BATCH_SIZE) { + const batch = addresses.slice(i, i + ADDRESS_INVALIDATION_BATCH_SIZE) + await Promise.all( + batch.map(async (a) => { + await clearPaymentCacheForAddress(a.address) + await clearBalanceCache(a.address) + }) + ) + } + const users = await fetchAllUsers() + await Promise.all(users.map(async (u) => await clearDashboardCache(u.id))) + console.log( + `[CACHE]: Invalidated payment/balance caches for ${addresses.length} addresses ` + + `and dashboard caches for ${users.length} users.` + ) +} + export const syncCurrentPricesWorker = async (queueName: string): Promise => { const worker = new Worker( queueName, @@ -39,6 +70,7 @@ export const syncBlockchainAndPricesWorker = async (queueName: string, onComplet await priceService.syncPastDaysNewerPrices() await multiBlockchainClient.syncMissedTransactions() await connectAllTransactionsToPrices() + await invalidateCachesAfterBlockchainSync() }, { connection: redisBullMQ, diff --git a/redis/paymentCache.ts b/redis/paymentCache.ts index ac6de862..18e12872 100755 --- a/redis/paymentCache.ts +++ b/redis/paymentCache.ts @@ -280,6 +280,17 @@ export const clearRecentAddressCache = async (addressString: string, timestamps: ) } +/** Remove all week-grouped payment keys for an address (forces rebuild from DB). 
*/ +export const clearPaymentCacheForAddress = async (addressString: string): Promise => { + const weekKeys = await getCachedWeekKeysForAddress(addressString) + if (weekKeys.length === 0) { + return + } + await Promise.all( + weekKeys.map(async (key) => await redis.del(key, () => {})) + ) +} + export const initPaymentCache = async (address: Address): Promise => { const cachedKeys = await getCachedWeekKeysForAddress(address.address) if (cachedKeys.length === 0) { diff --git a/services/chronikService.ts b/services/chronikService.ts index a568a4e9..5167c752 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -7,7 +7,9 @@ import prisma from 'prisma-local/clientInstance' import { TransactionWithAddressAndPrices, createManyTransactions, + createManyTransactionsForSync, filterRowsNeedingCreateMany, + SyncPersistedTransaction, deleteTransactions, fetchUnconfirmedTransactions, markTransactionsOrphaned, @@ -853,6 +855,40 @@ export class ChronikBlockchainClient { } } + private broadcastIncomingTxFromSyncRow ( + addressString: string, + chronikTx: Tx, + createdTx: SyncPersistedTransaction + ): BroadcastTxData { + const broadcastTxData: BroadcastTxData = {} as BroadcastTxData + broadcastTxData.address = addressString + broadcastTxData.messageType = 'NewTx' + const inputAddresses = this.getSortedInputAddresses(chronikTx) + const outputAddresses = this.getSortedOutputAddresses(chronikTx) + const stubTx = { + hash: createdTx.hash, + amount: createdTx.amount, + confirmed: createdTx.confirmed, + opReturn: '', + timestamp: createdTx.timestamp, + address: { address: addressString }, + prices: [], + inputs: [] + } as unknown as TransactionWithAddressAndPrices + const newSimplifiedTransaction = getSimplifiedTrasaction( + stubTx, + inputAddresses, + outputAddresses + ) + broadcastTxData.txs = [newSimplifiedTransaction] + try { + this.wsEndpoint.emit(SOCKET_MESSAGES.TXS_BROADCAST, broadcastTxData) + } catch (err: any) { + 
console.error(RESPONSE_MESSAGES.COULD_NOT_BROADCAST_TX_TO_WS_SERVER_500.message, err.stack) + } + return broadcastTxData + } + private broadcastIncomingTx (addressString: string, chronikTx: Tx, createdTx: TransactionWithAddressAndPrices): BroadcastTxData { const broadcastTxData: BroadcastTxData = {} as BroadcastTxData broadcastTxData.address = addressString @@ -949,30 +985,36 @@ export class ChronikBlockchainClient { ): Promise { const rows = commitTuples.map(p => p.row) const rowsToUpsert = await filterRowsNeedingCreateMany(rows) - const createdTxs = await createManyTransactions(rowsToUpsert) - console.log(`${this.CHRONIK_MSG_PREFIX} committed — created=${createdTxs.length}/${commitTuples.length}`) + const syncResult = await createManyTransactionsForSync(rowsToUpsert) + console.log( + `${this.CHRONIK_MSG_PREFIX} committed — created=${syncResult.insertedCount}/` + + `${commitTuples.length}` + ) - const createdForProd = createdTxs.filter(t => productionAddressesIds.includes(t.addressId)) + const createdForProd = syncResult.inserted.filter(t => + productionAddressesIds.includes(t.addressId) + ) if (createdForProd.length > 0) { - await appendTxsToFile(createdForProd as unknown as Prisma.TransactionCreateManyInput[]) + await appendTxsToFile(createdForProd) } - if (createdTxs.length > 0) { + if (runTriggers && syncResult.inserted.length > 0) { const triggerBatch: BroadcastTxData[] = [] - for (const createdTx of createdTxs) { + for (const createdTx of syncResult.inserted) { const tuple = commitTuples.find(t => t.row.hash === createdTx.hash) if (tuple == null) { continue } - const bd = this.broadcastIncomingTx(createdTx.address.address, tuple.raw, createdTx) + const bd = this.broadcastIncomingTxFromSyncRow( + tuple.addressString, + tuple.raw, + createdTx + ) triggerBatch.push(bd) } - if (runTriggers && triggerBatch.length > 0) { + if (triggerBatch.length > 0) { await executeTriggersBatch(triggerBatch, this.networkId) } - - // Release memory - createdTxs.length = 0 
triggerBatch.length = 0 } diff --git a/services/transactionService.ts b/services/transactionService.ts index 1030b78c..6b6a8589 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -588,6 +588,26 @@ const rowNeedsUpsert = ( ) } +/** Minimal row returned from bulk sync persist (no prices / paybuttons / cache). */ +export interface SyncPersistedTransaction { + id: string + hash: string + addressId: string + amount: Prisma.Decimal + timestamp: number + confirmed: boolean +} + +export interface CreateManyTransactionsSyncResult { + insertedCount: number + inserted: SyncPersistedTransaction[] +} + +interface PersistManyTransactionRowsResult { + inserted: SyncPersistedTransaction[] + updatedCount: number +} + /** * Cheap dedupe before createManyTransactions: returns only new rows or rows * whose confirmed, timestamp, or orphaned may have changed. @@ -629,14 +649,12 @@ export async function filterRowsNeedingCreateMany ( }) } -export async function createManyTransactions ( +/** + * Insert or update transactions and inputs only (no prices, cache, or heavy includes). + */ +async function persistManyTransactionRows ( transactionsData: Prisma.TransactionUncheckedCreateInput[] -): Promise { - if (transactionsData.length === 0) { - return [] - } - - // Extract flat transaction data and separate inputs/outputs +): Promise { const flatTxData = transactionsData.map(tx => ({ hash: tx.hash, amount: tx.amount, @@ -657,17 +675,16 @@ export async function createManyTransactions ( } }) - const insertedTransactions: TransactionWithNetwork[] = [] - const updatedTransactions: TransactionWithNetwork[] = [] + const inserted: SyncPersistedTransaction[] = [] + let updatedCount = 0 await prisma.$transaction( - async (prisma) => { - // 1. 
Query existing transactions in one go - const existingTxs = await prisma.transaction.findMany({ + async (tx) => { + const existingTxs = await tx.transaction.findMany({ where: { - OR: flatTxData.map(tx => ({ - hash: tx.hash, - addressId: tx.addressId + OR: flatTxData.map(row => ({ + hash: row.hash, + addressId: row.addressId })) }, select: { @@ -680,13 +697,11 @@ export async function createManyTransactions ( } }) - // Create a map for quick lookup const existingMap = new Map() - for (const tx of existingTxs) { - existingMap.set(`${tx.hash}:${tx.addressId}`, tx) + for (const row of existingTxs) { + existingMap.set(`${row.hash}:${row.addressId}`, row) } - // 2. Split into new and existing transactions const newTxs: typeof flatTxData = [] const newTxsInputs: typeof txInputs = [] const toUpdate: Array<{ @@ -697,119 +712,111 @@ export async function createManyTransactions ( }> = [] for (let i = 0; i < flatTxData.length; i++) { - const tx = flatTxData[i] - const key = `${tx.hash}:${tx.addressId}` + const row = flatTxData[i] + const key = `${row.hash}:${row.addressId}` const existing = existingMap.get(key) if (existing != null) { - // Check if confirmed, timestamp, or orphaned changed. These are the - // only fields that can be changed after the transaction is created. - // This is to avoid updating the transaction with the same data. 
- const confirmedChanged = existing.confirmed !== tx.confirmed - const timestampChanged = existing.timestamp !== tx.timestamp - const orphanedChanged = existing.orphaned !== tx.orphaned + const confirmedChanged = existing.confirmed !== row.confirmed + const timestampChanged = existing.timestamp !== row.timestamp + const orphanedChanged = existing.orphaned !== row.orphaned if (confirmedChanged || timestampChanged || orphanedChanged) { toUpdate.push({ id: existing.id, - confirmed: tx.confirmed, - timestamp: tx.timestamp, - orphaned: tx.orphaned + confirmed: row.confirmed, + timestamp: row.timestamp, + orphaned: row.orphaned }) } } else { - newTxs.push(tx) + newTxs.push(row) newTxsInputs.push(txInputs[i]) } } - // 3. Create new transactions using createMany (bulk operation) if (newTxs.length > 0) { - // Create all transactions at once (without inputs/outputs) - await prisma.transaction.createMany({ + await tx.transaction.createMany({ data: newTxs, skipDuplicates: true }) - // Query back the created transactions to get their IDs - const createdTxs = await prisma.transaction.findMany({ + const createdTxs = await tx.transaction.findMany({ where: { - OR: newTxs.map(tx => ({ - hash: tx.hash, - addressId: tx.addressId + OR: newTxs.map(row => ({ + hash: row.hash, + addressId: row.addressId })) }, select: { id: true, hash: true, addressId: true, - address: { - select: { - networkId: true - } - } + amount: true, + timestamp: true, + confirmed: true } }) - // Create a map to match transactions with their inputs const txMap = new Map() for (let i = 0; i < newTxs.length; i++) { - const tx = newTxs[i] - const created = createdTxs.find(ct => ct.hash === tx.hash && ct.addressId === tx.addressId) + const row = newTxs[i] + const created = createdTxs.find( + ct => ct.hash === row.hash && ct.addressId === row.addressId + ) if (created != null) { - txMap.set(`${tx.hash}:${tx.addressId}`, { - tx: created as any, + txMap.set(`${row.hash}:${row.addressId}`, { + tx: created, inputs: 
newTxsInputs[i].inputs }) } } - // Create all inputs at once - const allInputs: Array<{ transactionId: string, address: string, index: number, amount: Prisma.Decimal }> = [] - for (const [, { tx, inputs }] of txMap) { + const allInputs: Array<{ + transactionId: string + address: string + index: number + amount: Prisma.Decimal + }> = [] + for (const [, { tx: createdTx, inputs }] of txMap) { for (const input of inputs) { allInputs.push({ - transactionId: tx.id, + transactionId: createdTx.id, address: input.address, index: input.index, - amount: input.amount instanceof Prisma.Decimal ? input.amount : new Prisma.Decimal(input.amount as string | number) + amount: input.amount instanceof Prisma.Decimal + ? input.amount + : new Prisma.Decimal(input.amount as string | number) }) } } if (allInputs.length > 0) { - await prisma.transactionInput.createMany({ + await tx.transactionInput.createMany({ data: allInputs, skipDuplicates: true }) } - // Fetch the full transactions with includes for return value - const fullTxs = await prisma.transaction.findMany({ - where: { - id: { in: createdTxs.map(t => t.id) } - }, - include: includeNetwork - }) - - insertedTransactions.push(...fullTxs) + for (const createdTx of createdTxs) { + inserted.push(createdTx) + } } - // 4. 
Update existing transactions that changed if (toUpdate.length > 0) { - const updatePromises = toUpdate.map(async update => - await prisma.transaction.update({ - where: { id: update.id }, - data: { - confirmed: update.confirmed, - timestamp: update.timestamp, - orphaned: update.orphaned - }, - include: includeNetwork - }) + await Promise.all( + toUpdate.map(async update => + await tx.transaction.update({ + where: { id: update.id }, + data: { + confirmed: update.confirmed, + timestamp: update.timestamp, + orphaned: update.orphaned + } + }) + ) ) - const updated = await Promise.all(updatePromises) - updatedTransactions.push(...updated) + updatedCount = toUpdate.length } }, { @@ -817,9 +824,51 @@ export async function createManyTransactions ( } ) - // 5. Connect prices only for newly created transactions + return { inserted, updatedCount } +} + +/** + * Bulk sync path: persist txs + inputs only. Prices, Redis cache, and paybutton + * graphs are deferred to connectAllTransactionsToPrices after the sync job. 
+ */ +export async function createManyTransactionsForSync ( + transactionsData: Prisma.TransactionUncheckedCreateInput[] +): Promise { + if (transactionsData.length === 0) { + return { insertedCount: 0, inserted: [] } + } + + const { inserted } = await persistManyTransactionRows(transactionsData) + return { + insertedCount: inserted.length, + inserted + } +} + +export async function createManyTransactions ( + transactionsData: Prisma.TransactionUncheckedCreateInput[] +): Promise { + if (transactionsData.length === 0) { + return [] + } + + const { inserted } = await persistManyTransactionRows(transactionsData) + + if (inserted.length === 0) { + return [] + } + + const insertedTransactions = await prisma.transaction.findMany({ + where: { + id: { in: inserted.map(t => t.id) } + }, + include: includeNetwork + }) + await connectTransactionsListToPrices(insertedTransactions) - const txsWithPaybuttonsAndPrices = await fetchTransactionsWithPaybuttonsAndPricesForIdList(insertedTransactions.map((tx) => tx.id)) + const txsWithPaybuttonsAndPrices = await fetchTransactionsWithPaybuttonsAndPricesForIdList( + inserted.map((tx) => tx.id) + ) void CacheSet.txsCreation(txsWithPaybuttonsAndPrices) diff --git a/tests/unittests/chronikService.test.ts b/tests/unittests/chronikService.test.ts index f6d5a677..b6e989bf 100644 --- a/tests/unittests/chronikService.test.ts +++ b/tests/unittests/chronikService.test.ts @@ -61,6 +61,10 @@ jest.mock('../../services/addressService', () => ({ jest.mock('../../services/transactionService', () => ({ createManyTransactions: jest.fn(), + createManyTransactionsForSync: jest.fn(async () => ({ + insertedCount: 0, + inserted: [] + })), filterRowsNeedingCreateMany: jest.fn(async (rows: unknown[]) => rows), deleteTransactions: jest.fn(), fetchUnconfirmedTransactions: jest.fn(), From 62fda1ba36ef68ee5db591886de989eb77f22d22 Mon Sep 17 00:00:00 2001 From: Fabien Date: Fri, 22 May 2026 18:48:17 +0200 Subject: [PATCH 7/8] Fixes related to rabbit feedback Don't 
fail on cache clear issue, reconnect price for updated txs as well, avoid duplicate triggers if several addresses link a single tx, keep op_return, use scanStream for redis cleanup. --- jobs/workers.ts | 56 ++++++++++++++++++++++++++++++---- redis/clientInstance.ts | 10 ++++-- redis/paymentCache.ts | 14 ++++++++- services/chronikService.ts | 13 +++++--- services/transactionService.ts | 45 +++++++++++++++++---------- 5 files changed, 108 insertions(+), 30 deletions(-) diff --git a/jobs/workers.ts b/jobs/workers.ts index ad8f6262..25ec1a71 100644 --- a/jobs/workers.ts +++ b/jobs/workers.ts @@ -14,6 +14,29 @@ import * as priceService from 'services/priceService' const ADDRESS_INVALIDATION_BATCH_SIZE = 100 +interface AddressCacheInvalidationResult { + paymentFailures: number + balanceFailures: number +} + +const invalidateAddressCaches = async ( + address: string +): Promise => { + let paymentFailures = 0 + let balanceFailures = 0 + try { + await clearPaymentCacheForAddress(address) + } catch { + paymentFailures = 1 + } + try { + await clearBalanceCache(address) + } catch { + balanceFailures = 1 + } + return { paymentFailures, balanceFailures } +} + /** * Drop Redis payment-week, balance, and dashboard caches after bulk blockchain sync. * Rebuild is lazy on the next API request. 
@@ -21,17 +44,38 @@ const ADDRESS_INVALIDATION_BATCH_SIZE = 100 async function invalidateCachesAfterBlockchainSync (): Promise { console.log('[CACHE]: Invalidating caches after blockchain sync...') const addresses = await fetchAllAddresses() + let paymentCacheFailures = 0 + let balanceCacheFailures = 0 for (let i = 0; i < addresses.length; i += ADDRESS_INVALIDATION_BATCH_SIZE) { const batch = addresses.slice(i, i + ADDRESS_INVALIDATION_BATCH_SIZE) - await Promise.all( - batch.map(async (a) => { - await clearPaymentCacheForAddress(a.address) - await clearBalanceCache(a.address) - }) + const results = await Promise.all( + batch.map(async (a) => await invalidateAddressCaches(a.address)) ) + for (const r of results) { + paymentCacheFailures += r.paymentFailures + balanceCacheFailures += r.balanceFailures + } } const users = await fetchAllUsers() - await Promise.all(users.map(async (u) => await clearDashboardCache(u.id))) + let dashboardCacheFailures = 0 + await Promise.all( + users.map(async (u) => { + try { + await clearDashboardCache(u.id) + } catch { + dashboardCacheFailures += 1 + } + }) + ) + const totalFailures = + paymentCacheFailures + balanceCacheFailures + dashboardCacheFailures + if (totalFailures > 0) { + console.warn( + `[CACHE]: Cache invalidation completed with ${totalFailures} failure(s) ` + + `(payment: ${paymentCacheFailures}, balance: ${balanceCacheFailures}, ` + + `dashboard: ${dashboardCacheFailures}). 
DB sync already succeeded.` + ) + } console.log( `[CACHE]: Invalidated payment/balance caches for ${addresses.length} addresses ` + `and dashboard caches for ${users.length} users.` diff --git a/redis/clientInstance.ts b/redis/clientInstance.ts index 9f472a31..148f11e8 100755 --- a/redis/clientInstance.ts +++ b/redis/clientInstance.ts @@ -18,11 +18,15 @@ class RedisMocked { } scanStream (opt: ScanStreamOptions): ScanStream { - return new ScanStream({ + const stream = new ScanStream({ ...opt, - command: '', - redis: {} + command: 'scan', + redis: this as unknown as IORedis }) + process.nextTick(() => { + stream.emit('end') + }) + return stream } pipeline (commands?: unknown[][]): any { diff --git a/redis/paymentCache.ts b/redis/paymentCache.ts index 18e12872..ad18d9f6 100755 --- a/redis/paymentCache.ts +++ b/redis/paymentCache.ts @@ -39,7 +39,19 @@ export const getPaymentList = async (userId: string): Promise => { } const getCachedWeekKeysForAddress = async (addressString: string): Promise => { - return await redis.keys(`${addressString}:payments:*`) + const pattern = `${addressString}:payments:*` + const keys: string[] = [] + const stream = redis.scanStream({ + match: pattern, + count: 100 + }) + return await new Promise((resolve, reject) => { + stream.on('data', (batch: string[]) => { + keys.push(...batch) + }) + stream.on('end', () => resolve(keys)) + stream.on('error', reject) + }) } export const getCachedWeekKeysForUser = async (userId: string): Promise => { diff --git a/services/chronikService.ts b/services/chronikService.ts index 5167c752..6f50b92d 100644 --- a/services/chronikService.ts +++ b/services/chronikService.ts @@ -858,7 +858,8 @@ export class ChronikBlockchainClient { private broadcastIncomingTxFromSyncRow ( addressString: string, chronikTx: Tx, - createdTx: SyncPersistedTransaction + createdTx: SyncPersistedTransaction, + opReturn: string ): BroadcastTxData { const broadcastTxData: BroadcastTxData = {} as BroadcastTxData broadcastTxData.address = 
addressString @@ -869,7 +870,7 @@ export class ChronikBlockchainClient { hash: createdTx.hash, amount: createdTx.amount, confirmed: createdTx.confirmed, - opReturn: '', + opReturn, timestamp: createdTx.timestamp, address: { address: addressString }, prices: [], @@ -1001,14 +1002,18 @@ export class ChronikBlockchainClient { if (runTriggers && syncResult.inserted.length > 0) { const triggerBatch: BroadcastTxData[] = [] for (const createdTx of syncResult.inserted) { - const tuple = commitTuples.find(t => t.row.hash === createdTx.hash) + const tuple = commitTuples.find( + t => t.row.hash === createdTx.hash && t.row.addressId === createdTx.addressId + ) if (tuple == null) { continue } + const opReturn = typeof tuple.row.opReturn === 'string' ? tuple.row.opReturn : '' const bd = this.broadcastIncomingTxFromSyncRow( tuple.addressString, tuple.raw, - createdTx + createdTx, + opReturn ) triggerBatch.push(bd) } diff --git a/services/transactionService.ts b/services/transactionService.ts index 6b6a8589..7978e5b4 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -605,9 +605,19 @@ export interface CreateManyTransactionsSyncResult { interface PersistManyTransactionRowsResult { inserted: SyncPersistedTransaction[] + updated: SyncPersistedTransaction[] updatedCount: number } +const syncPersistedTxSelect = { + id: true, + hash: true, + addressId: true, + amount: true, + timestamp: true, + confirmed: true +} as const + /** * Cheap dedupe before createManyTransactions: returns only new rows or rows * whose confirmed, timestamp, or orphaned may have changed. 
@@ -676,6 +686,7 @@ async function persistManyTransactionRows ( }) const inserted: SyncPersistedTransaction[] = [] + const updated: SyncPersistedTransaction[] = [] let updatedCount = 0 await prisma.$transaction( @@ -748,14 +759,7 @@ async function persistManyTransactionRows ( addressId: row.addressId })) }, - select: { - id: true, - hash: true, - addressId: true, - amount: true, - timestamp: true, - confirmed: true - } + select: syncPersistedTxSelect }) const txMap = new Map() @@ -816,7 +820,12 @@ async function persistManyTransactionRows ( }) ) ) - updatedCount = toUpdate.length + const updatedTxs = await tx.transaction.findMany({ + where: { id: { in: toUpdate.map(u => u.id) } }, + select: syncPersistedTxSelect + }) + updated.push(...updatedTxs) + updatedCount = updatedTxs.length } }, { @@ -824,7 +833,7 @@ async function persistManyTransactionRows ( } ) - return { inserted, updatedCount } + return { inserted, updated, updatedCount } } /** @@ -852,22 +861,26 @@ export async function createManyTransactions ( return [] } - const { inserted } = await persistManyTransactionRows(transactionsData) + const { inserted, updated } = await persistManyTransactionRows(transactionsData) + const persistedIds = [ + ...inserted.map(t => t.id), + ...updated.map(t => t.id) + ] - if (inserted.length === 0) { + if (persistedIds.length === 0) { return [] } - const insertedTransactions = await prisma.transaction.findMany({ + const persistedTransactions = await prisma.transaction.findMany({ where: { - id: { in: inserted.map(t => t.id) } + id: { in: persistedIds } }, include: includeNetwork }) - await connectTransactionsListToPrices(insertedTransactions) + await connectTransactionsListToPrices(persistedTransactions) const txsWithPaybuttonsAndPrices = await fetchTransactionsWithPaybuttonsAndPricesForIdList( - inserted.map((tx) => tx.id) + persistedIds ) void CacheSet.txsCreation(txsWithPaybuttonsAndPrices) From a5c2b2ce0bb0bd2f9d5b18d2fed06b3f30e2a074 Mon Sep 17 00:00:00 2001 From: Fabien 
Date: Mon, 25 May 2026 12:08:15 +0200 Subject: [PATCH 8/8] Delete prices in chunks Now that price linkage is deferred, there can be a lot of prices to insert and delete. The insertion is done by chunks but the deletion was not, leading to this error: ``` Invalid `prisma.pricesOnTransactions.deleteMany()` invocation: Error occurred during query execution: ConnectorError(ConnectorError { user_facing_error: None, kind: QueryError(Server(MysqlError { code: 1390, message: "Prepared statement contains too many placeholders", state: "HY000" })), transient: false }) ``` --- constants/index.ts | 2 +- services/transactionService.ts | 25 +++++++++++++++++++------ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/constants/index.ts b/constants/index.ts index d835c275..c3386fc5 100644 --- a/constants/index.ts +++ b/constants/index.ts @@ -288,7 +288,7 @@ export const TRIGGER_EMAIL_CONCURRENCY = 100 export const TRIGGER_LOG_BATCH_SIZE = 200 export const PRICES_CONNECTION_BATCH_SIZE = 1_000 -// interactive $transaction timeout in ms (for the single delete + several createMany of prices) +// interactive $transaction timeout in ms (for batched delete + createMany of prices) export const PRICES_CONNECTION_TIMEOUT = 30_000 export const CLIENT_PAYMENT_EXPIRATION_TIME = (7) * (24 * 60 * 60 * 1000) // (number of days) * (24 * 60 * 60 * 1000) diff --git a/services/transactionService.ts b/services/transactionService.ts index 7978e5b4..221905d2 100644 --- a/services/transactionService.ts +++ b/services/transactionService.ts @@ -428,6 +428,24 @@ function buildPriceTxConnectionInput (tx: Transaction, allPrices: AllPrices): Pr ] } +async function deletePriceTxConnectionsInChunks ( + client: Prisma.TransactionClient, + transactionIds: string[] +): Promise { + let pricesUnlinkedCount = 0 + console.log( + `[PRICES] Disconnecting existing price links for ${transactionIds.length} txs...` + ) + for (let i = 0; i < transactionIds.length; i += PRICES_CONNECTION_BATCH_SIZE) { + const slice 
= transactionIds.slice(i, i + PRICES_CONNECTION_BATCH_SIZE) + const result = await client.pricesOnTransactions.deleteMany({ + where: { transactionId: { in: slice } } + }) + pricesUnlinkedCount += result.count + } + console.log(`[PRICES] Disconnected ${pricesUnlinkedCount} price links.`) +} + async function createPriceTxConnectionInChunks ( client: Prisma.TransactionClient | typeof prisma, rows: Prisma.PricesOnTransactionsCreateManyInput[] @@ -538,12 +556,7 @@ export async function connectTransactionsListToPrices ( await prisma.$transaction( async (tx) => { - console.log( - `[PRICES] Disconnecting existing price links for ${txList.length} txs...` - ) - await tx.pricesOnTransactions.deleteMany({ - where: { transactionId: { in: txList.map((t) => t.id) } } - }) + await deletePriceTxConnectionsInChunks(tx, txList.map((t) => t.id)) await createPriceTxConnectionInChunks(tx, rows) }, { timeout: PRICES_CONNECTION_TIMEOUT }