From f567ec986965eaa170e64775f763f1f0de9a17a2 Mon Sep 17 00:00:00 2001 From: Ido Shamun <1993245+idoshamun@users.noreply.github.com> Date: Sun, 24 May 2026 10:06:28 +0300 Subject: [PATCH] fix: use canonical highlight history for bragi evaluation --- __tests__/cron/channelHighlights.ts | 327 ++++++-- src/common/channelHighlight/canonical.ts | 414 ++++++++++ src/common/channelHighlight/channels.ts | 52 ++ src/common/channelHighlight/constants.ts | 8 + src/common/channelHighlight/decisions.ts | 22 +- src/common/channelHighlight/evaluate.ts | 17 +- src/common/channelHighlight/generate.ts | 787 ++----------------- src/common/channelHighlight/legacyFanout.ts | 150 ++++ src/common/channelHighlight/publish.ts | 80 +- src/common/channelHighlight/queries.ts | 199 +---- src/common/channelHighlight/runs.ts | 136 ++++ src/common/channelHighlight/stories.ts | 37 +- src/common/channelHighlight/storyFamilies.ts | 47 ++ src/common/channelHighlight/types.ts | 2 +- src/common/channelHighlight/utils.ts | 18 + src/cron/channelHighlights.ts | 12 +- 16 files changed, 1214 insertions(+), 1094 deletions(-) create mode 100644 src/common/channelHighlight/canonical.ts create mode 100644 src/common/channelHighlight/channels.ts create mode 100644 src/common/channelHighlight/constants.ts create mode 100644 src/common/channelHighlight/legacyFanout.ts create mode 100644 src/common/channelHighlight/runs.ts create mode 100644 src/common/channelHighlight/storyFamilies.ts create mode 100644 src/common/channelHighlight/utils.ts diff --git a/__tests__/cron/channelHighlights.ts b/__tests__/cron/channelHighlights.ts index 454f2480ec..f7a8872d86 100644 --- a/__tests__/cron/channelHighlights.ts +++ b/__tests__/cron/channelHighlights.ts @@ -226,11 +226,7 @@ describe('channel highlight generation cron', () => { }); expect(runs).toMatchObject([ { - channel: 'backend', - status: 'completed', - }, - { - channel: 'vibes', + channel: 'global', status: 'completed', }, ]); @@ -241,6 +237,66 @@ describe('channel highlight generation cron', () => { } }); + it('should generate canonical highlights without legacy definitions', async () => { + const now = new Date('2026-03-03T10:00:00.000Z'); + await saveArticle({ + id: 'defless-1', + title: 'Definitionless story', + createdAt: new Date('2026-03-03T09:45:00.000Z'), + channels: ['backend', 'vibes'], + }); + + const evaluatorSpy = jest + .spyOn(evaluator, 'evaluateHighlights') + .mockResolvedValue({ + items: [ + { + postId: 'defless-1', + headline: 'Definitionless headline', + significanceLabel: 'major', + reason: 'test', + }, + ], + }); + + await runChannelHighlights({ con, now }); + + expect(evaluatorSpy).toHaveBeenCalledTimes(1); + expect(evaluatorSpy.mock.calls[0][0]).toMatchObject({ + maxItems: 20, + }); + expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ + expect.objectContaining({ + postId: 'defless-1', + title: 'Definitionless story', + }), + ]); + + const canonicalHighlights = await con + .getRepository(HighlightsCanonical) + .find(); + expect(canonicalHighlights).toEqual([ + expect.objectContaining({ + postId: 'defless-1', + channels: ['backend', 'vibes'], + headline: 'Definitionless headline', + significance: PostHighlightSignificance.Major, + reason: 'test', + }), + ]); + + const liveHighlights = await con.getRepository(PostHighlight).find(); + expect(liveHighlights).toEqual([]); + + const runs = await con.getRepository(ChannelHighlightRun).find(); + expect(runs).toMatchObject([ + { + channel: 'global', + status: 'completed', + }, + ]); + }); + it('should keep live highlights unchanged in shadow mode and store a comparison run', async () => { const now = new Date('2026-03-03T10:00:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ @@ -269,7 +325,7 @@ describe('channel highlight generation cron', () => { }); const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockResolvedValue({ items: [ { @@ -284,24 +340,17 @@ describe('channel highlight generation cron', () => { await runChannelHighlights({ con, now }); expect(evaluatorSpy).toHaveBeenCalledTimes(1); - expect(evaluatorSpy.mock.calls[0][0].targetAudience).toBe( - 'Developers following vibes', + expect(evaluatorSpy.mock.calls[0][0].maxItems).toBe(20); + expect(evaluatorSpy.mock.calls[0][0].currentHighlights).toEqual([]); + expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + postId: 'fresh-1', + title: 'Fresh story', + relatedItemsCount: 1, + }), + ]), ); - expect(evaluatorSpy.mock.calls[0][0].maxItems).toBe(3); - expect(evaluatorSpy.mock.calls[0][0].currentHighlights).toEqual([ - expect.objectContaining({ - postId: 'live-1', - headline: 'Live headline', - summary: 'Live story summary', - }), - ]); - expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ - expect.objectContaining({ - postId: 'fresh-1', - title: 'Fresh story', - relatedItemsCount: 1, - }), - ]); const liveHighlights = await con.getRepository(PostHighlight).find({ where: { channel: 'vibes', retiredAt: IsNull() }, @@ -318,21 +367,94 @@ describe('channel highlight generation cron', () => { .find({ order: { highlightedAt: 'DESC' }, }); - expect(canonicalHighlights).toEqual([]); + expect(canonicalHighlights).toEqual([ + expect.objectContaining({ + postId: 'fresh-1', + channels: ['vibes'], + headline: 'Fresh headline', + significance: PostHighlightSignificance.Breaking, + reason: 'test', + }), + ]); const run = await con.getRepository(ChannelHighlightRun).findOneByOrFail({ - channel: 'vibes', + channel: 'global', }); expect(run.status).toBe('completed'); expect(run.comparison).toMatchObject({ wouldPublish: true, - published: false, - baselineCount: 1, - internalCount: 2, + published: true, + baselineCount: 0, + internalCount: 1, addedPostIds: ['fresh-1'], }); }); + it('should send global canonical highlight history to the evaluator', async () => { + const now = new Date('2026-03-03T10:00:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ + channel: 'vibes', + mode: 'shadow', + candidateHorizonHours: 72, + maxItems: 3, + }); + await saveArticle({ + id: 'legacy-only', + title: 'Legacy only story', + summary: 'Legacy only summary', + createdAt: new Date('2026-03-03T08:00:00.000Z'), + }); + await saveArticle({ + id: 'canonical-1', + title: 'Canonical story', + summary: 'Canonical summary', + createdAt: new Date('2026-03-03T08:30:00.000Z'), + }); + await saveArticle({ + id: 'fresh-1', + title: 'Fresh story', + createdAt: new Date('2026-03-03T09:20:00.000Z'), + }); + await con.getRepository(PostHighlight).save({ + channel: 'vibes', + postId: 'legacy-only', + highlightedAt: new Date('2026-03-03T09:00:00.000Z'), + headline: 'Legacy only headline', + }); + await con.getRepository(HighlightsCanonical).save({ + postId: 'canonical-1', + channels: ['backend'], + highlightedAt: new Date('2026-03-03T09:10:00.000Z'), + headline: 'Canonical headline', + significance: PostHighlightSignificance.Major, + reason: 'canonical history', + }); + + const evaluatorSpy = jest + .spyOn(evaluator, 'evaluateHighlights') + .mockResolvedValue({ items: [] }); + + await runChannelHighlights({ con, now }); + + expect(evaluatorSpy).toHaveBeenCalledTimes(1); + expect(evaluatorSpy.mock.calls[0][0].currentHighlights).toEqual([ + expect.objectContaining({ + postId: 'canonical-1', + headline: 'Canonical headline', + summary: 'Canonical summary', + reason: 'canonical history', + }), + ]); + expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + postId: 'fresh-1', + title: 'Fresh story', + }), + ]), + ); + }); + it('should publish admitted highlights in publish mode and trim FIFO by maxItems', async () => { const now = new Date('2026-03-03T11:00:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ @@ -385,7 +507,7 @@ describe('channel highlight generation cron', () => { }, ]); - jest.spyOn(evaluator, 'evaluateChannelHighlights').mockResolvedValue({ + jest.spyOn(evaluator, 'evaluateHighlights').mockResolvedValue({ items: [ { postId: 'fresh-1', @@ -475,7 +597,7 @@ describe('channel highlight generation cron', () => { }); const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockResolvedValue({ items: [ { @@ -491,10 +613,7 @@ describe('channel highlight generation cron', () => { expect(evaluatorSpy).toHaveBeenCalledTimes(1); expect(evaluatorSpy.mock.calls[0][0]).toMatchObject({ - channel: 'global', - targetAudience: - 'software engineers and engineering leaders who want to stay current on meaningful developments that affect how modern software is built, shipped, operated, and grown', - maxItems: 5, + maxItems: 20, }); expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ expect.objectContaining({ @@ -527,7 +646,7 @@ describe('channel highlight generation cron', () => { expect(canonicalHighlights).toEqual([ expect.objectContaining({ postId: 'global-fresh', - channels: ['backend', 'vibes'], + channels: ['backend', 'disabled', 'vibes'], headline: 'Global headline', significance: PostHighlightSignificance.Major, }), @@ -538,12 +657,7 @@ describe('channel highlight generation cron', () => { }); expect(runs).toMatchObject([ { - channel: 'backend', - status: 'completed', - comparison: expect.objectContaining({ published: true }), - }, - { - channel: 'vibes', + channel: 'global', status: 'completed', comparison: expect.objectContaining({ published: true }), }, @@ -581,16 +695,25 @@ describe('channel highlight generation cron', () => { significance: PostHighlightSignificance.Major, reason: 'existing', }); - await con.getRepository(HighlightsCanonical).save({ - postId: 'child-upgrade', - channels: ['vibes'], - highlightedAt: new Date('2026-03-03T11:00:00.000Z'), - headline: 'Original child headline', - significance: PostHighlightSignificance.Major, - reason: 'existing', - }); + const originalCanonicalUpdatedAt = new Date('2026-03-03T10:30:00.000Z'); + const originalCanonical = await con + .getRepository(HighlightsCanonical) + .save({ + postId: 'child-upgrade', + channels: ['vibes'], + highlightedAt: new Date('2026-03-03T11:00:00.000Z'), + headline: 'Original child headline', + significance: PostHighlightSignificance.Major, + reason: 'existing', + }); + await con.getRepository(HighlightsCanonical).update( + { id: originalCanonical.id }, + { + updatedAt: originalCanonicalUpdatedAt, + }, + ); - const evaluatorSpy = jest.spyOn(evaluator, 'evaluateChannelHighlights'); + const evaluatorSpy = jest.spyOn(evaluator, 'evaluateHighlights'); await runChannelHighlights({ con, now }); @@ -627,6 +750,9 @@ describe('channel highlight generation cron', () => { reason: 'existing', }), ]); + expect(canonicalHighlights[0].updatedAt.getTime()).toBeGreaterThan( + originalCanonicalUpdatedAt.getTime(), + ); }); it('should exclude retired highlights from candidates and keep them retired', async () => { @@ -658,7 +784,7 @@ describe('channel highlight generation cron', () => { }); const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockResolvedValue({ items: [ { @@ -673,12 +799,14 @@ describe('channel highlight generation cron', () => { await runChannelHighlights({ con, now }); expect(evaluatorSpy).toHaveBeenCalledTimes(1); - expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ - expect.objectContaining({ - postId: 'fresh-1', - title: 'Fresh candidate', - }), - ]); + expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + postId: 'fresh-1', + title: 'Fresh candidate', + }), + ]), + ); const liveHighlights = await con.getRepository(PostHighlight).find({ where: { channel: 'vibes', retiredAt: IsNull() }, @@ -697,7 +825,7 @@ describe('channel highlight generation cron', () => { expect(retiredHighlights[0].retiredAt).toBeInstanceOf(Date); }); - it('should send recent retired highlights to the evaluator while excluding resurfaced stories', async () => { + it('should send recent canonical history to the evaluator while excluding resurfaced stories', async () => { const now = new Date('2026-03-03T12:00:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ channel: 'vibes', @@ -745,9 +873,16 @@ describe('channel highlight generation cron', () => { significance: PostHighlightSignificance.Notable, retiredAt: new Date('2026-03-03T11:40:00.000Z'), }); + await con.getRepository(HighlightsCanonical).save({ + postId: 'retired-child', + channels: ['vibes'], + highlightedAt: new Date('2026-03-03T11:30:00.000Z'), + headline: 'Retired child headline', + significance: PostHighlightSignificance.Notable, + }); const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockResolvedValue({ items: [] }); await runChannelHighlights({ con, now }); @@ -789,6 +924,14 @@ describe('channel highlight generation cron', () => { significance: PostHighlightSignificance.Major, reason: 'previous run', }); + await con.getRepository(HighlightsCanonical).save({ + postId: 'underlying-1', + channels: ['vibes'], + highlightedAt: new Date('2026-03-03T11:30:00.000Z'), + headline: 'Original headline', + significance: PostHighlightSignificance.Major, + reason: 'previous run', + }); // A user shares the article between runs. await saveShare({ id: 'share-1', @@ -798,7 +941,7 @@ describe('channel highlight generation cron', () => { }); jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockResolvedValue({ items: [] }); await runChannelHighlights({ con, now }); @@ -840,9 +983,17 @@ describe('channel highlight generation cron', () => { significance: PostHighlightSignificance.Major, reason: 'pre-fix run', }); + await con.getRepository(HighlightsCanonical).save({ + postId: 'share-2', + channels: ['vibes'], + highlightedAt: new Date('2026-03-03T11:30:00.000Z'), + headline: 'Legacy share headline', + significance: PostHighlightSignificance.Major, + reason: 'pre-fix run', + }); jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockResolvedValue({ items: [] }); await runChannelHighlights({ con, now }); @@ -876,9 +1027,7 @@ describe('channel highlight generation cron', () => { title: 'CVE collection', createdAt: new Date('2026-02-15T10:00:00.000Z'), }); - // Old child highlighted >14 days ago (outside eval window) and retired. - // It is in retiredHighlightPostIds (kept forever) but not in any - // share-fallback / canonicalization path. + // Existing canonical story is a child that now belongs to the collection. await saveArticle({ id: 'old-child', title: 'Original CVE writeup', @@ -898,6 +1047,14 @@ describe('channel highlight generation cron', () => { reason: 'first run', retiredAt: new Date('2026-02-15T20:00:00.000Z'), }); + await con.getRepository(HighlightsCanonical).save({ + postId: 'old-child', + channels: ['vibes'], + highlightedAt: new Date('2026-03-15T11:30:00.000Z'), + headline: 'Original CVE headline', + significance: PostHighlightSignificance.Major, + reason: 'first run', + }); // Brand-new child article. Its relation to the collection is created // *after* fetchRelations runs (simulated below) — exactly the race // observed in production for collection NB1YEYCZ6 on 2026-05-06. @@ -908,7 +1065,7 @@ describe('channel highlight generation cron', () => { }); const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockImplementation(async () => { await con.getRepository(PostRelation).save({ postId: 'collection-cve', @@ -936,11 +1093,17 @@ describe('channel highlight generation cron', () => { expect.objectContaining({ postId: 'new-child' }), ]); - // The admit-time guard re-fetches relations and drops the duplicate. + // The admit-time guard re-fetches relations and drops the duplicate, + // leaving the canonical collection projected into legacy. const liveHighlights = await con.getRepository(PostHighlight).find({ where: { channel: 'vibes', retiredAt: IsNull() }, }); - expect(liveHighlights).toHaveLength(0); + expect(liveHighlights).toEqual([ + expect.objectContaining({ + postId: 'collection-cve', + headline: 'Original CVE headline', + }), + ]); const allHighlightsForNewChild = await con .getRepository(PostHighlight) @@ -1002,7 +1165,7 @@ describe('channel highlight generation cron', () => { }); const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockResolvedValue({ items: [ { @@ -1042,7 +1205,7 @@ describe('channel highlight generation cron', () => { expect(retiredDigestHighlight.retiredAt).toBeInstanceOf(Date); }); - it('should remove highlights that aged past the configured horizon', async () => { + it('should remove legacy highlights that aged past the projection horizon', async () => { const now = new Date('2026-03-03T12:00:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ channel: 'vibes', @@ -1053,7 +1216,7 @@ describe('channel highlight generation cron', () => { await saveArticle({ id: 'expired-live', title: 'Expired live story', - createdAt: new Date('2026-03-01T10:00:00.000Z'), + createdAt: new Date('2026-01-01T10:00:00.000Z'), }); await con.getRepository(PostHighlight).save({ channel: 'vibes', @@ -1062,7 +1225,7 @@ describe('channel highlight generation cron', () => { headline: 'Expired headline', }); - const evaluatorSpy = jest.spyOn(evaluator, 'evaluateChannelHighlights'); + const evaluatorSpy = jest.spyOn(evaluator, 'evaluateHighlights'); await runChannelHighlights({ con, now }); @@ -1101,7 +1264,7 @@ describe('channel highlight generation cron', () => { }); const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockResolvedValue({ items: [ { @@ -1171,7 +1334,7 @@ describe('channel highlight generation cron', () => { }); const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockImplementation(async ({ newCandidates }) => ({ items: [ { @@ -1239,7 +1402,7 @@ describe('channel highlight generation cron', () => { private: true, }); - const evaluatorSpy = jest.spyOn(evaluator, 'evaluateChannelHighlights'); + const evaluatorSpy = jest.spyOn(evaluator, 'evaluateHighlights'); await runChannelHighlights({ con, now }); @@ -1258,7 +1421,17 @@ describe('channel highlight generation cron', () => { mode: 'shadow', candidateHorizonHours: 72, maxItems: 3, - lastFetchedAt: new Date('2026-03-03T12:20:00.000Z'), + }); + await con.getRepository(ChannelHighlightRun).save({ + channel: 'global', + scheduledAt: new Date('2026-03-03T12:20:00.000Z'), + status: 'completed', + baselineSnapshot: [], + inputSummary: {}, + internalSnapshot: [], + comparison: {}, + metrics: {}, + completedAt: new Date('2026-03-03T12:21:00.000Z'), }); await saveArticle({ id: 'stats-only-1', @@ -1274,7 +1447,7 @@ describe('channel highlight generation cron', () => { }); const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockResolvedValue({ items: [ { @@ -1337,7 +1510,7 @@ describe('channel highlight generation cron', () => { }); const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') + .spyOn(evaluator, 'evaluateHighlights') .mockResolvedValue({ items: [] }); await runChannelHighlights({ con, now }); diff --git a/src/common/channelHighlight/canonical.ts b/src/common/channelHighlight/canonical.ts new file mode 100644 index 0000000000..46ad1b3eed --- /dev/null +++ b/src/common/channelHighlight/canonical.ts @@ -0,0 +1,414 @@ +import { type DataSource, type EntityManager } from 'typeorm'; +import { HighlightsCanonical } from '../../entity/HighlightsCanonical'; +import { UNKNOWN_SOURCE } from '../../entity/Source'; +import { getChannelDigestSourceIds } from '../channelDigest/definitions'; +import { + DEFAULT_CANDIDATE_HORIZON_HOURS, + DEFAULT_MAX_ITEMS, +} from './constants'; +import { compareSnapshots } from './decisions'; +import { evaluateHighlights } from './evaluate'; +import { + createHighlightChannelResolver, + type HighlightChannelResolver, +} from './channels'; +import { + toHighlightItemFromCanonical, + upsertCanonicalHighlights, +} from './publish'; +import { + fetchCollectionMembership, + fetchEvaluationHistoryCanonicalHighlights, + fetchIncrementalPosts, + fetchPostsByIds, + fetchPublicShareFallbackPostIds, + fetchRelations, + getFetchStart, + getHorizonStart, + mergePosts, +} from './queries'; +import { + applyPublicShareFallbackToCandidates, + applyPublicShareFallbackToHighlights, + buildCandidates, + canonicalizeCurrentHighlights, + toHighlightItem, +} from './stories'; +import type { PostRelation } from '../../entity/posts/PostRelation'; +import type { HighlightCandidate, HighlightItem, HighlightPost } from './types'; +import { dedupeHighlightsByPostId } from './utils'; + +export type GenerationConfig = { + horizonStart: Date; + fetchStart: Date; + maxItems: number; +}; + +export type CanonicalInput = { + excludedSourceIds: string[]; + canonicalHistoryRows: HighlightsCanonical[]; + incrementalPosts: HighlightPost[]; + relationPosts: HighlightPost[]; + availablePosts: HighlightPost[]; + relations: PostRelation[]; + inaccessiblePostIds: Set; + fallbackPostIds: Map; +}; + +export type CanonicalHighlights = { + history: HighlightItem[]; + newCandidates: HighlightCandidate[]; + admitted: HighlightItem[]; + snapshot: HighlightItem[]; + comparison: ReturnType; + itemsToUpsert: HighlightItem[]; + channelsByPostId: Map>; +}; + +export const getGenerationConfig = ({ + now, + lastFetchedAt, +}: { + now: Date; + lastFetchedAt: Date | null; +}): GenerationConfig => { + const horizonStart = getHorizonStart({ + now, + definition: { + candidateHorizonHours: DEFAULT_CANDIDATE_HORIZON_HOURS, + }, + }); + const fetchStart = getFetchStart({ + now, + definition: { + candidateHorizonHours: DEFAULT_CANDIDATE_HORIZON_HOURS, + lastFetchedAt, + }, + }); + + return { + horizonStart, + fetchStart, + maxItems: DEFAULT_MAX_ITEMS, + }; +}; + +export const loadCanonicalInput = async ({ + con, + config, + now, +}: { + con: DataSource; + config: GenerationConfig; + now: Date; +}): Promise => { + const [excludedSourceIds, canonicalHistoryRows] = await Promise.all([ + getChannelDigestSourceIds({ + con, + }), + fetchEvaluationHistoryCanonicalHighlights({ + con, + now, + }), + ]); + const evaluationHistoryPostIds = [ + ...new Set(canonicalHistoryRows.map((item) => item.postId)), + ]; + const [incrementalPosts, evaluationHistoryPosts] = await Promise.all([ + fetchIncrementalPosts({ + con, + fetchStart: config.fetchStart, + horizonStart: config.horizonStart, + excludedSourceIds, + }), + fetchPostsByIds({ + con, + ids: evaluationHistoryPostIds, + excludedSourceIds, + }), + ]); + const basePosts = mergePosts([incrementalPosts, evaluationHistoryPosts]); + const sharedUnderlyingIds = [ + ...new Set( + basePosts + .map((post) => post.sharedPostId) + .filter((id): id is string => !!id), + ), + ]; + const [relations, sharedUnderlyingPosts] = await Promise.all([ + fetchRelations({ + con, + postIds: basePosts.map((post) => post.id), + }), + fetchPostsByIds({ + con, + ids: sharedUnderlyingIds, + excludedSourceIds, + }), + ]); + const relationPosts = await fetchPostsByIds({ + con, + ids: [ + ...new Set( + relations.flatMap((relation) => [ + relation.postId, + relation.relatedPostId, + ]), + ), + ], + excludedSourceIds, + }); + const availablePosts = mergePosts([ + basePosts, + relationPosts, + sharedUnderlyingPosts, + ]); + const inaccessiblePostIds = new Set( + availablePosts + .filter((post) => post.sourceId === UNKNOWN_SOURCE) + .map((post) => post.id), + ); + const fallbackPostIds = await fetchPublicShareFallbackPostIds({ + con, + sharedPostIds: [...new Set(availablePosts.map((post) => post.id))], + excludedSourceIds, + }); + + return { + excludedSourceIds, + canonicalHistoryRows, + incrementalPosts, + relationPosts, + availablePosts, + relations, + inaccessiblePostIds, + fallbackPostIds, + }; +}; + +const toCanonicalChannels = ({ + items, + channelResolver, +}: { + items: HighlightItem[]; + channelResolver: HighlightChannelResolver; +}): Map> => + new Map( + items.map((item) => [item.postId, new Set(channelResolver(item.postId))]), + ); + +const selectNewCandidates = ({ + input, + config, + canonicalHistoryPostIds, + channelResolver, +}: { + input: CanonicalInput; + config: GenerationConfig; + canonicalHistoryPostIds: Set; + channelResolver: HighlightChannelResolver; +}): HighlightCandidate[] => + applyPublicShareFallbackToCandidates({ + candidates: buildCandidates({ + posts: input.availablePosts, + relations: input.relations, + horizonStart: config.horizonStart, + }), + inaccessiblePostIds: input.inaccessiblePostIds, + fallbackPostIds: input.fallbackPostIds, + }).filter((candidate) => { + if (canonicalHistoryPostIds.has(candidate.postId)) { + return false; + } + + return channelResolver(candidate.postId).length > 0; + }); + +const evaluateNewHighlights = async ({ + config, + canonicalHistory, + newCandidates, + now, +}: { + config: GenerationConfig; + canonicalHistory: HighlightItem[]; + newCandidates: HighlightCandidate[]; + now: Date; +}): Promise => { + if (!newCandidates.length) { + return []; + } + + const result = await evaluateHighlights({ + maxItems: config.maxItems, + currentHighlights: canonicalHistory, + newCandidates, + }); + + return result.items.map((item) => ({ + postId: item.postId, + headline: item.headline, + summary: null, + highlightedAt: now, + significanceLabel: item.significanceLabel, + reason: item.reason, + })); +}; + +export const generateCanonicalHighlights = async ({ + con, + input, + config, + now, +}: { + con: DataSource; + input: CanonicalInput; + config: GenerationConfig; + now: Date; +}): Promise => { + const channelResolver = createHighlightChannelResolver({ + posts: input.availablePosts, + relations: input.relations, + fallbackPostIds: input.fallbackPostIds, + }); + const canonicalHistory = dedupeHighlightsByPostId( + applyPublicShareFallbackToHighlights({ + highlights: canonicalizeCurrentHighlights({ + highlights: input.canonicalHistoryRows.map(toHighlightItem), + relations: input.relations, + posts: input.availablePosts, + inaccessiblePostIds: input.inaccessiblePostIds, + }), + inaccessiblePostIds: input.inaccessiblePostIds, + fallbackPostIds: input.fallbackPostIds, + }), + ); + const canonicalHistoryPostIds = new Set( + canonicalHistory.map((item) => item.postId), + ); + const originalHistoryPostIds = new Set( + input.canonicalHistoryRows.map((highlight) => highlight.postId), + ); + const maintenanceItems = canonicalHistory.filter( + (item) => !originalHistoryPostIds.has(item.postId), + ); + const newCandidates = selectNewCandidates({ + input, + config, + canonicalHistoryPostIds, + channelResolver, + }); + const evaluatedHighlights = await evaluateNewHighlights({ + config, + canonicalHistory, + newCandidates, + now, + }); + const admitted = await dropAdmissionsRacingCollections({ + con, + admitted: evaluatedHighlights, + fallbackPostIds: input.fallbackPostIds, + currentHighlightPostIds: canonicalHistoryPostIds, + }); + const snapshot = dedupeHighlightsByPostId([...canonicalHistory, ...admitted]); + const comparison = compareSnapshots({ + baseline: canonicalHistory, + internal: snapshot, + }); + const itemsToUpsert = dedupeHighlightsByPostId([ + ...admitted, + ...maintenanceItems, + ]); + + return { + history: canonicalHistory, + newCandidates, + admitted, + snapshot, + comparison, + itemsToUpsert, + channelsByPostId: toCanonicalChannels({ + items: itemsToUpsert, + channelResolver, + }), + }; +}; + +export const saveCanonicalHighlights = ({ + manager, + canonical, + relations, +}: { + manager: EntityManager; + canonical: CanonicalHighlights; + relations: PostRelation[]; +}): Promise => { + if (!canonical.itemsToUpsert.length) { + return Promise.resolve([]); + } + + return upsertCanonicalHighlights({ + manager, + items: canonical.itemsToUpsert, + channelsByPostId: canonical.channelsByPostId, + relations, + }); +}; + +export const toCanonicalHighlightsForFanout = ({ + canonical, + savedCanonicalHighlights, +}: { + canonical: CanonicalHighlights; + savedCanonicalHighlights: HighlightsCanonical[]; +}): HighlightItem[] => + dedupeHighlightsByPostId([ + ...savedCanonicalHighlights.map(toHighlightItemFromCanonical), + ...canonical.history, + ]); + +const dropAdmissionsRacingCollections = async ({ + con, + admitted, + fallbackPostIds, + currentHighlightPostIds, +}: { + con: DataSource; + admitted: HighlightItem[]; + fallbackPostIds: Map; + currentHighlightPostIds: Set; +}): Promise => { + if (!admitted.length) return admitted; + + const shareToUnderlying = new Map( + [...fallbackPostIds].map(([underlying, share]) => [share, underlying]), + ); + const underlyingId = (postId: string) => + shareToUnderlying.get(postId) ?? postId; + + const { collectionByChild, childrenByCollection } = + await fetchCollectionMembership({ + con, + postIds: [...new Set(admitted.map((item) => underlyingId(item.postId)))], + }); + if (!collectionByChild.size && !childrenByCollection.size) return admitted; + + const isCovered = (postId: string): boolean => { + const aliased = fallbackPostIds.get(postId) ?? postId; + return ( + currentHighlightPostIds.has(postId) || + currentHighlightPostIds.has(aliased) + ); + }; + + return admitted.filter((item) => { + const postId = underlyingId(item.postId); + const collectionId = collectionByChild.get(postId) || postId; + if (!childrenByCollection.has(collectionId)) return true; + const members = [ + collectionId, + ...(childrenByCollection.get(collectionId) ?? []), + ]; + return !members.some(isCovered); + }); +}; diff --git a/src/common/channelHighlight/channels.ts b/src/common/channelHighlight/channels.ts new file mode 100644 index 0000000000..9158dbc3b2 --- /dev/null +++ b/src/common/channelHighlight/channels.ts @@ -0,0 +1,52 @@ +import type { HighlightPost } from './types'; +import { buildStoryFamilies } from './storyFamilies'; + +type HighlightChannelResolverInput = { + posts: HighlightPost[]; + relations: { postId: string; relatedPostId: string }[]; + fallbackPostIds: Map; +}; + +export type HighlightChannelResolver = (postId: string) => string[]; + +const getPostChannels = (post: HighlightPost | undefined): string[] => { + const contentMeta = post?.contentMeta as { channels?: unknown } | undefined; + const channels = contentMeta?.channels; + if (!Array.isArray(channels)) { + return []; + } + + return [ + ...new Set( + channels.filter( + (channel): channel is string => typeof channel === 'string', + ), + ), + ].sort(); +}; + +export const createHighlightChannelResolver = ({ + posts, + relations, + fallbackPostIds, +}: HighlightChannelResolverInput): HighlightChannelResolver => { + const postsById = new Map(posts.map((post) => [post.id, post])); + const shareToUnderlying = new Map( + [...fallbackPostIds].map(([underlying, share]) => [share, underlying]), + ); + const storyFamilies = buildStoryFamilies({ relations }); + + return (postId) => { + const underlyingPostId = shareToUnderlying.get(postId) || postId; + const storyPostIds = storyFamilies.getFamilyPostIds(underlyingPostId); + const channels = new Set(); + + for (const storyPostId of storyPostIds) { + for (const channel of getPostChannels(postsById.get(storyPostId))) { + channels.add(channel); + } + } + + return [...channels].sort(); + }; +}; diff --git a/src/common/channelHighlight/constants.ts b/src/common/channelHighlight/constants.ts new file mode 100644 index 0000000000..592d98ff12 --- /dev/null +++ b/src/common/channelHighlight/constants.ts @@ -0,0 +1,8 @@ +export const GLOBAL_HIGHLIGHT_CHANNEL = 'global'; + +export const DEFAULT_CANDIDATE_HORIZON_HOURS = 168; + +export const DEFAULT_MAX_ITEMS = 20; + +export const GLOBAL_TARGET_AUDIENCE = + 'software engineers and engineering leaders who want to stay current on meaningful developments that affect how modern software is built, shipped, operated, and grown'; diff --git a/src/common/channelHighlight/decisions.ts b/src/common/channelHighlight/decisions.ts index 73e8f1a286..66510a25d2 100644 --- a/src/common/channelHighlight/decisions.ts +++ b/src/common/channelHighlight/decisions.ts @@ -28,24 +28,20 @@ export const compareSnapshots = ({ const changed = baseline.map(toItemSignature).join('||') !== internal.map(toItemSignature).join('||'); + const addedPostIds = [...internalByPostId.keys()].filter( + (postId) => !baselineByPostId.has(postId), + ); + const removedPostIds = [...baselineByPostId.keys()].filter( + (postId) => !internalByPostId.has(postId), + ); return { changed, baselineCount: baseline.length, internalCount: internal.length, overlapCount: overlap.length, - addedPostIds: [...internalByPostId.keys()].filter( - (postId) => !baselineByPostId.has(postId), - ), - removedPostIds: [...baselineByPostId.keys()].filter( - (postId) => !internalByPostId.has(postId), - ), - churnCount: - [...internalByPostId.keys()].filter( - (postId) => !baselineByPostId.has(postId), - ).length + - [...baselineByPostId.keys()].filter( - (postId) => !internalByPostId.has(postId), - ).length, + addedPostIds, + removedPostIds, + churnCount: addedPostIds.length + removedPostIds.length, }; }; diff --git a/src/common/channelHighlight/evaluate.ts b/src/common/channelHighlight/evaluate.ts index 2ac5a05064..acd98d2f5a 100644 --- a/src/common/channelHighlight/evaluate.ts +++ b/src/common/channelHighlight/evaluate.ts @@ -6,11 +6,10 @@ import { EvaluateChannelHighlightsRequest as BragiEvaluateChannelHighlightsRequest, } from '@dailydotdev/schema'; import { getBragiClient } from '../../integrations/bragi/clients'; +import { GLOBAL_HIGHLIGHT_CHANNEL, GLOBAL_TARGET_AUDIENCE } from './constants'; import type { HighlightCandidate, HighlightItem } from './types'; -export type EvaluateChannelHighlightsRequest = { - channel: string; - targetAudience: string; +export type EvaluateHighlightsRequest = { maxItems: number; currentHighlights: HighlightItem[]; newCandidates: HighlightCandidate[]; @@ -23,7 +22,7 @@ export type EvaluatedHighlightItem = { reason: string; }; -export type EvaluateChannelHighlightsResponse = { +export type EvaluateHighlightsResponse = { items: EvaluatedHighlightItem[]; }; @@ -95,13 +94,11 @@ const toCandidate = ( relatedItemsCount: candidate.relatedItemsCount, }); -export const evaluateChannelHighlights = async ({ - channel, - targetAudience, +export const evaluateHighlights = async ({ maxItems, currentHighlights, newCandidates, -}: EvaluateChannelHighlightsRequest): Promise => { +}: EvaluateHighlightsRequest): Promise => { if (!newCandidates.length) { return { items: [], @@ -110,8 +107,8 @@ export const evaluateChannelHighlights = async ({ const bragiClient = getBragiClient(); const request = new BragiEvaluateChannelHighlightsRequest({ - channel, - targetAudience, + channel: GLOBAL_HIGHLIGHT_CHANNEL, + targetAudience: GLOBAL_TARGET_AUDIENCE, maxItems, currentHighlights: currentHighlights.map(toCurrentHighlight), newCandidates: newCandidates.map(toCandidate), diff --git a/src/common/channelHighlight/generate.ts b/src/common/channelHighlight/generate.ts index 5da393179d..281e17d778 100644 --- a/src/common/channelHighlight/generate.ts +++ b/src/common/channelHighlight/generate.ts @@ -1,748 +1,101 @@ -import { In, type DataSource } from 'typeorm'; +import type { DataSource } from 'typeorm'; import { logger as baseLogger } from '../../logger'; -import { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; +import type { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; import { ChannelHighlightRun } from '../../entity/ChannelHighlightRun'; -import { UNKNOWN_SOURCE } from '../../entity/Source'; -import { getChannelDigestSourceIds } from '../channelDigest/definitions'; -import { compareSnapshots } from './decisions'; -import { evaluateChannelHighlights } from './evaluate'; -import { publishHighlightsForChannel } from './publish'; import { - fetchCollectionMembership, - fetchCurrentHighlightsForChannels, - fetchEvaluationHistoryHighlightsForChannels, - fetchGlobalIncrementalPosts, - fetchPostsByIds, - fetchPublicShareFallbackPostIds, - fetchRetiredHighlightPostIdsForChannels, - fetchRelations, - getEvaluationHistoryStart, - getFetchStart, - getHorizonStart, - mergePosts, -} from './queries'; + getGenerationConfig, + generateCanonicalHighlights, + loadCanonicalInput, + saveCanonicalHighlights, + toCanonicalHighlightsForFanout, +} from './canonical'; +import { syncLegacyHighlightsFromCanonical } from './legacyFanout'; import { - applyPublicShareFallbackToCandidates, - applyPublicShareFallbackToHighlights, - buildCandidates, - canonicalizeCurrentHighlights, - toHighlightItem, - toStoredSnapshotItem, -} from './stories'; -import type { - GenerateChannelHighlightResult, - HighlightItem, - HighlightPost, -} from './types'; - -type EvaluationConfig = { - channel: string; - targetAudience: string; - maxItems: number; -}; - -const trimHighlights = ({ - items, - maxItems, -}: { - items: HighlightItem[]; - maxItems: number; -}): HighlightItem[] => - [...items] - .sort( - (left, right) => - right.highlightedAt.getTime() - left.highlightedAt.getTime(), - ) - .slice(0, maxItems); - -const groupHighlightsByChannel = ( - highlights: T[], -): Map => { - const grouped = new Map(); - - for (const highlight of highlights) { - const items = grouped.get(highlight.channel) || []; - items.push(highlight); - grouped.set(highlight.channel, items); - } - - return grouped; -}; - -const dedupeHighlightsByPostId = (items: HighlightItem[]): HighlightItem[] => { - const deduped = new Map(); - - for (const item of [...items].sort( - (left, right) => - right.highlightedAt.getTime() - left.highlightedAt.getTime(), - )) { - if (!deduped.has(item.postId)) { - deduped.set(item.postId, item); - } - } - - return [...deduped.values()]; -}; - -const getEvaluationConfig = ( - definitions: ChannelHighlightDefinition[], -): EvaluationConfig => { - if (definitions.length === 1) { - const definition = definitions[0]; - - return { - channel: definition.channel, - targetAudience: - definition.targetAudience.trim() || - `daily.dev readers following ${definition.channel}`, - maxItems: definition.maxItems, - }; - } - - return { - channel: 'global', - targetAudience: - 'software engineers and engineering leaders who want to stay current on meaningful developments that affect how modern software is built, shipped, operated, and grown', - maxItems: definitions.reduce( - (total, definition) => total + definition.maxItems, - 0, - ), - }; -}; - -const getHighlightChannels = ({ - postId, - posts, - relations, - fallbackPostIds, - enabledChannels, -}: { - postId: string; - posts: HighlightPost[]; - relations: { postId: string; relatedPostId: string }[]; - fallbackPostIds: Map; - enabledChannels: Set; -}): string[] => { - const postsById = new Map(posts.map((post) => [post.id, post])); - const shareToUnderlying = new Map( - [...fallbackPostIds].map(([underlying, share]) => [share, underlying]), - ); - const childrenByCollection = new Map(); - const collectionByChild = new Map(); - - for (const relation of relations) { - const children = childrenByCollection.get(relation.postId) || []; - children.push(relation.relatedPostId); - childrenByCollection.set(relation.postId, children); - collectionByChild.set(relation.relatedPostId, relation.postId); - } - - const underlyingPostId = shareToUnderlying.get(postId) || postId; - const collectionId = collectionByChild.get(underlyingPostId); - const storyPostIds = [ - ...(collectionId ? [collectionId] : []), - underlyingPostId, - ...(childrenByCollection.get(collectionId || underlyingPostId) || []), - ]; - const channels = new Set(); - - for (const storyPostId of storyPostIds) { - const contentMeta = postsById.get(storyPostId)?.contentMeta as - | { channels?: unknown } - | undefined; - const postChannels = contentMeta?.channels; - if (!Array.isArray(postChannels)) { - continue; - } - - for (const channel of postChannels) { - if (typeof channel === 'string' && enabledChannels.has(channel)) { - channels.add(channel); - } - } - } - - return [...channels]; -}; - -export const generateChannelHighlights = async ({ + completeGlobalRun, + createGlobalRun, + failGlobalRun, + fetchPreviousGlobalRun, +} from './runs'; +import type { GenerateHighlightsResult } from './types'; + +export const generateHighlights = async ({ con, - definitions, + definitions = [], now = new Date(), }: { con: DataSource; - definitions: ChannelHighlightDefinition[]; + definitions?: ChannelHighlightDefinition[]; now?: Date; -}): Promise => { - if (!definitions.length) { - return { - runs: [], - published: false, - }; - } - - const channels = definitions.map((definition) => definition.channel); - const definitionsByChannel = new Map( - definitions.map((definition) => [definition.channel, definition]), - ); +}): Promise => { const runRepo = con.getRepository(ChannelHighlightRun); - const runs = await runRepo.save( - definitions.map((definition) => - runRepo.create({ - channel: definition.channel, - scheduledAt: now, - status: 'processing', - baselineSnapshot: [], - inputSummary: {}, - internalSnapshot: [], - comparison: {}, - metrics: {}, - }), - ), - ); - const runByChannel = new Map(runs.map((run) => [run.channel, run])); + const previousRun = await fetchPreviousGlobalRun({ + runRepo, + now, + }); + const run = await createGlobalRun({ + runRepo, + now, + }); try { - const [ - currentHighlights, - retiredHighlightPostIdsByChannel, - excludedSourceIds, - evaluationHistoryHighlights, - ] = await Promise.all([ - fetchCurrentHighlightsForChannels({ - con, - channels, - }), - fetchRetiredHighlightPostIdsForChannels({ - con, - channels, - }), - getChannelDigestSourceIds({ - con, - }), - fetchEvaluationHistoryHighlightsForChannels({ - con, - channels, - now, - }), - ]); - const maxCandidateHorizonHours = Math.max( - ...definitions.map((definition) => definition.candidateHorizonHours), - ); - const horizonStart = getHorizonStart({ + const config = getGenerationConfig({ now, - definition: { - candidateHorizonHours: maxCandidateHorizonHours, - }, + lastFetchedAt: previousRun?.scheduledAt || null, }); - const fetchStart = definitions - .map((definition) => - getFetchStart({ - now, - definition, - }), - ) - .sort((left, right) => left.getTime() - right.getTime())[0]; - const currentHighlightsByChannel = - groupHighlightsByChannel(currentHighlights); - const evaluationHistoryHighlightsByChannel = groupHighlightsByChannel( - evaluationHistoryHighlights, - ); - const baselineHighlightsByChannel = new Map( - channels.map((channel) => [ - channel, - (currentHighlightsByChannel.get(channel) || []).map(toHighlightItem), - ]), - ); - const activeHighlightsByChannel = new Map( - definitions.map((definition) => { - const channelBaseline = - baselineHighlightsByChannel.get(definition.channel) || []; - const channelHorizonStart = getHorizonStart({ - now, - definition, - }); - - return [ - definition.channel, - channelBaseline.filter( - (item) => item.highlightedAt >= channelHorizonStart, - ), - ]; - }), - ); - const highlightedPostIds = [ - ...new Set( - [...activeHighlightsByChannel.values()].flatMap((items) => - items.map((item) => item.postId), - ), - ), - ]; - const evaluationHistoryPostIds = [ - ...new Set(evaluationHistoryHighlights.map((item) => item.postId)), - ]; - const [incrementalPosts, highlightedPosts, evaluationHistoryPosts] = - await Promise.all([ - fetchGlobalIncrementalPosts({ - con, - fetchStart, - horizonStart, - excludedSourceIds, - }), - fetchPostsByIds({ - con, - ids: highlightedPostIds, - excludedSourceIds, - }), - fetchPostsByIds({ - con, - ids: evaluationHistoryPostIds, - excludedSourceIds, - }), - ]); - const basePosts = mergePosts([ - incrementalPosts, - highlightedPosts, - evaluationHistoryPosts, - ]); - const sharedUnderlyingIds = [ - ...new Set( - basePosts - .map((post) => post.sharedPostId) - .filter((id): id is string => !!id), - ), - ]; - const [relations, sharedUnderlyingPosts] = await Promise.all([ - fetchRelations({ - con, - postIds: basePosts.map((post) => post.id), - }), - fetchPostsByIds({ - con, - ids: sharedUnderlyingIds, - excludedSourceIds, - }), - ]); - const relationPosts = await fetchPostsByIds({ + const input = await loadCanonicalInput({ con, - ids: [ - ...new Set( - relations.flatMap((relation) => [ - relation.postId, - relation.relatedPostId, - ]), - ), - ], - excludedSourceIds, + config, + now, }); - const availablePosts = mergePosts([ - basePosts, - relationPosts, - sharedUnderlyingPosts, - ]); - const inaccessiblePostIds = new Set( - availablePosts - .filter((post) => post.sourceId === UNKNOWN_SOURCE) - .map((post) => post.id), - ); - const retiredHighlightPostIds = [ - ...new Set( - [...retiredHighlightPostIdsByChannel.values()].flatMap((postIds) => [ - ...postIds, - ]), - ), - ]; - const fallbackPostIds = await fetchPublicShareFallbackPostIds({ + const canonical = await generateCanonicalHighlights({ con, - sharedPostIds: [ - ...new Set([ - ...availablePosts.map((post) => post.id), - ...retiredHighlightPostIds, - ]), - ], - excludedSourceIds, - }); - const liveHighlightsByChannel = new Map( - channels.map((channel) => [ - channel, - applyPublicShareFallbackToHighlights({ - highlights: canonicalizeCurrentHighlights({ - highlights: activeHighlightsByChannel.get(channel) || [], - relations, - posts: availablePosts, - inaccessiblePostIds, - }), - inaccessiblePostIds, - fallbackPostIds, - }), - ]), - ); - const evaluationHighlightsByChannel = new Map( - channels.map((channel) => [ - channel, - applyPublicShareFallbackToHighlights({ - highlights: canonicalizeCurrentHighlights({ - highlights: ( - evaluationHistoryHighlightsByChannel.get(channel) || [] - ).map(toHighlightItem), - relations, - posts: availablePosts, - inaccessiblePostIds, - }), - inaccessiblePostIds, - fallbackPostIds, - }), - ]), - ); - const retiredEvaluationHighlightsByChannel = new Map( - channels.map((channel) => [ - channel, - applyPublicShareFallbackToHighlights({ - highlights: canonicalizeCurrentHighlights({ - highlights: ( - evaluationHistoryHighlightsByChannel.get(channel) || [] - ) - .filter((item) => !!item.retiredAt) - .map(toHighlightItem), - relations, - posts: availablePosts, - inaccessiblePostIds, - }), - inaccessiblePostIds, - fallbackPostIds, - }), - ]), - ); - const currentHighlightPostIdsByChannel = new Map( - channels.map((channel) => [ - channel, - new Set( - (liveHighlightsByChannel.get(channel) || []).map( - (item) => item.postId, - ), - ), - ]), - ); - const sharedByShareId = new Map(); - for (const post of availablePosts) { - if (post.sharedPostId) sharedByShareId.set(post.id, post.sharedPostId); - } - const retiredHighlightPostIdSetByChannel = new Map( - channels.map((channel) => { - const postIds = - retiredHighlightPostIdsByChannel.get(channel) || new Set(); - - return [ - channel, - new Set( - [...postIds] - .flatMap((postId) => [ - postId, - fallbackPostIds.get(postId), - sharedByShareId.get(postId), - ]) - .filter((id): id is string => !!id), - ), - ]; - }), - ); - const retiredEvaluationPostIdSetByChannel = new Map( - channels.map((channel) => [ - channel, - new Set( - (retiredEvaluationHighlightsByChannel.get(channel) || []).map( - (item) => item.postId, - ), - ), - ]), - ); - const enabledChannels = new Set(channels); - const getPublishableChannels = ({ - postId, - itemChannels, - }: { - postId: string; - itemChannels: string[]; - }): string[] => - itemChannels.filter( - (channel) => - !currentHighlightPostIdsByChannel.get(channel)?.has(postId) && - !retiredHighlightPostIdSetByChannel.get(channel)?.has(postId) && - !retiredEvaluationPostIdSetByChannel.get(channel)?.has(postId), - ); - const liveHighlightItems = dedupeHighlightsByPostId( - [...liveHighlightsByChannel.values()].flat(), - ); - const liveHighlightPostIds = new Set( - liveHighlightItems.map((item) => item.postId), - ); - - const candidates = applyPublicShareFallbackToCandidates({ - candidates: buildCandidates({ - posts: availablePosts, - relations, - horizonStart, - }), - inaccessiblePostIds, - fallbackPostIds, + input, + config, + now, }); - const candidateChannelsByPostId = new Map(); - const newCandidates = candidates.filter((candidate) => { - const candidateChannels = getHighlightChannels({ - postId: candidate.postId, - posts: availablePosts, - relations, - fallbackPostIds, - enabledChannels, - }).filter((channel) => { - const definition = definitionsByChannel.get(channel); - return ( - !!definition && - candidate.lastActivityAt >= - getHorizonStart({ - now, - definition, - }) - ); + await con.transaction(async (manager) => { + const savedCanonicalHighlights = await saveCanonicalHighlights({ + manager, + canonical, + relations: input.relations, }); - const publishableChannels = liveHighlightPostIds.has(candidate.postId) - ? [] - : getPublishableChannels({ - postId: candidate.postId, - itemChannels: candidateChannels, - }); - - if (!publishableChannels.length) { - return false; - } - - candidateChannelsByPostId.set(candidate.postId, publishableChannels); - return true; - }); - const evaluationConfig = getEvaluationConfig(definitions); - const evaluationHighlights = dedupeHighlightsByPostId( - [...evaluationHighlightsByChannel.values()].flat(), - ); - const evaluatedHighlights = - newCandidates.length === 0 - ? [] - : ( - await evaluateChannelHighlights({ - channel: evaluationConfig.channel, - targetAudience: evaluationConfig.targetAudience, - maxItems: evaluationConfig.maxItems, - currentHighlights: evaluationHighlights, - newCandidates, - }) - ).items.map((item) => ({ - postId: item.postId, - headline: item.headline, - summary: null, - highlightedAt: now, - significanceLabel: item.significanceLabel, - reason: item.reason, - })); - const admittedHighlightsByChannel = new Map(); - - for (const channel of channels) { - const channelAdmissions = evaluatedHighlights.filter((item) => - candidateChannelsByPostId.get(item.postId)?.includes(channel), - ); - const admittedHighlights = await dropAdmissionsRacingCollections({ - con, - admitted: channelAdmissions, - fallbackPostIds, - currentHighlightPostIds: - currentHighlightPostIdsByChannel.get(channel) || new Set(), - retiredHighlightPostIds: - retiredHighlightPostIdSetByChannel.get(channel) || new Set(), + const legacyFanout = await syncLegacyHighlightsFromCanonical({ + manager, + definitions, + canonicalHighlights: toCanonicalHighlightsForFanout({ + canonical, + savedCanonicalHighlights, + }), + posts: input.availablePosts, + relations: input.relations, + fallbackPostIds: input.fallbackPostIds, + now, + }); + await completeGlobalRun({ + manager, + run, + config, + input, + canonical, + legacyFanout, + now, }); - admittedHighlightsByChannel.set(channel, admittedHighlights); - } - - let published = false; - await con.transaction(async (manager) => { - for (const definition of definitions) { - const baselineHighlights = - baselineHighlightsByChannel.get(definition.channel) || []; - const liveHighlights = - liveHighlightsByChannel.get(definition.channel) || []; - const admittedHighlights = - admittedHighlightsByChannel.get(definition.channel) || []; - const internalHighlights = trimHighlights({ - items: [...liveHighlights, ...admittedHighlights], - maxItems: definition.maxItems, - }); - const comparison = compareSnapshots({ - baseline: baselineHighlights, - internal: internalHighlights, - }); - const shouldPublish = - definition.mode === 'publish' && comparison.changed; - const run = runByChannel.get(definition.channel); - - await manager.getRepository(ChannelHighlightDefinition).update( - { channel: definition.channel }, - { - lastFetchedAt: now, - }, - ); - - if (shouldPublish) { - await publishHighlightsForChannel({ - manager, - channel: definition.channel, - items: internalHighlights, - relations, - }); - published = true; - } - - if (run) { - await manager.getRepository(ChannelHighlightRun).update( - { id: run.id }, - { - status: 'completed', - completedAt: new Date(), - baselineSnapshot: baselineHighlights.map(toStoredSnapshotItem), - inputSummary: { - fetchStart: fetchStart.toISOString(), - horizonStart: getHorizonStart({ - now, - definition, - }).toISOString(), - evaluationHistoryStart: getEvaluationHistoryStart({ - now, - }).toISOString(), - excludedSourceIds, - currentHighlightPostIds: liveHighlights.map( - (item) => item.postId, - ), - evaluationHighlightPostIds: ( - evaluationHighlightsByChannel.get(definition.channel) || [] - ).map((item) => item.postId), - retiredEvaluationHighlightPostIds: ( - retiredEvaluationHighlightsByChannel.get( - definition.channel, - ) || [] - ).map((item) => item.postId), - retiredHighlightPostIds: [ - ...(retiredHighlightPostIdsByChannel.get( - definition.channel, - ) || []), - ], - candidatePostIds: newCandidates - .filter((candidate) => - candidateChannelsByPostId - .get(candidate.postId) - ?.includes(definition.channel), - ) - .map((candidate) => candidate.postId), - }, - internalSnapshot: internalHighlights.map(toStoredSnapshotItem), - comparison: { - ...comparison, - wouldPublish: comparison.changed, - published: shouldPublish, - }, - metrics: { - fetchedPosts: incrementalPosts.length + highlightedPosts.length, - relationPosts: relationPosts.length, - currentHighlights: baselineHighlights.length, - activeHighlights: - activeHighlightsByChannel.get(definition.channel)?.length || - 0, - canonicalizedHighlights: liveHighlights.length, - evaluationHighlights: - evaluationHighlightsByChannel.get(definition.channel) - ?.length || 0, - retiredEvaluationHighlights: - retiredEvaluationHighlightsByChannel.get(definition.channel) - ?.length || 0, - newCandidates: newCandidates.filter((candidate) => - candidateChannelsByPostId - .get(candidate.postId) - ?.includes(definition.channel), - ).length, - admittedHighlights: admittedHighlights.length, - }, - }, - ); - } - } }); return { runs: await runRepo.findBy({ - id: In(runs.map((run) => run.id)), + id: run.id, }), - published, + published: canonical.admitted.length > 0, }; } catch (err) { - baseLogger.error({ err, channels }, 'Failed channel highlight run'); - await runRepo.update( - { - id: In(runs.map((run) => run.id)), - }, - { - status: 'failed', - completedAt: new Date(), - error: { - message: err instanceof Error ? err.message : 'Unknown error', - }, - }, - ); + baseLogger.error({ err }, 'Failed highlight run'); + await failGlobalRun({ + runRepo, + run, + err, + }); throw err; } }; - -const dropAdmissionsRacingCollections = async ({ - con, - admitted, - fallbackPostIds, - currentHighlightPostIds, - retiredHighlightPostIds, -}: { - con: DataSource; - admitted: HighlightItem[]; - fallbackPostIds: Map; - currentHighlightPostIds: Set; - retiredHighlightPostIds: Set; -}): Promise => { - if (!admitted.length) return admitted; - - const shareToUnderlying = new Map( - [...fallbackPostIds].map(([underlying, share]) => [share, underlying]), - ); - const underlyingId = (postId: string) => - shareToUnderlying.get(postId) ?? postId; - - const { collectionByChild, childrenByCollection } = - await fetchCollectionMembership({ - con, - postIds: [...new Set(admitted.map((item) => underlyingId(item.postId)))], - }); - if (!collectionByChild.size && !childrenByCollection.size) return admitted; - - const isCovered = (postId: string): boolean => { - const aliased = fallbackPostIds.get(postId) ?? postId; - return ( - currentHighlightPostIds.has(postId) || - retiredHighlightPostIds.has(postId) || - currentHighlightPostIds.has(aliased) || - retiredHighlightPostIds.has(aliased) - ); - }; - - return admitted.filter((item) => { - const postId = underlyingId(item.postId); - const collectionId = collectionByChild.get(postId) || postId; - if (!childrenByCollection.has(collectionId)) return true; - const members = [ - collectionId, - ...(childrenByCollection.get(collectionId) ?? []), - ]; - return !members.some(isCovered); - }); -}; diff --git a/src/common/channelHighlight/legacyFanout.ts b/src/common/channelHighlight/legacyFanout.ts new file mode 100644 index 0000000000..fae26dd4e8 --- /dev/null +++ b/src/common/channelHighlight/legacyFanout.ts @@ -0,0 +1,150 @@ +import { In, type EntityManager } from 'typeorm'; +import { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; +import { replaceLegacyHighlightsForChannel } from './publish'; +import { + createHighlightChannelResolver, + type HighlightChannelResolver, +} from './channels'; +import type { HighlightItem, HighlightPost } from './types'; + +export type LegacyFanout = { + publishChannels: Set; + states: { + definition: ChannelHighlightDefinition; + highlights: HighlightItem[]; + }[]; +}; + +const trimHighlights = ({ + items, + maxItems, +}: { + items: HighlightItem[]; + maxItems: number; +}): HighlightItem[] => + [...items] + .sort( + (left, right) => + right.highlightedAt.getTime() - left.highlightedAt.getTime(), + ) + .slice(0, maxItems); + +const groupHighlightsForFanout = ({ + highlights, + channelResolver, + publishChannels, +}: { + highlights: HighlightItem[]; + channelResolver: HighlightChannelResolver; + publishChannels: Set; +}): Map => { + const highlightsByChannel = new Map(); + + for (const highlight of highlights) { + for (const channel of channelResolver(highlight.postId)) { + if (!publishChannels.has(channel)) { + continue; + } + + const items = highlightsByChannel.get(channel) || []; + items.push(highlight); + highlightsByChannel.set(channel, items); + } + } + + return highlightsByChannel; +}; + +const buildLegacyFanout = ({ + definitions, + canonicalHighlights, + posts, + relations, + fallbackPostIds, +}: { + definitions: ChannelHighlightDefinition[]; + canonicalHighlights: HighlightItem[]; + posts: HighlightPost[]; + relations: { postId: string; relatedPostId: string }[]; + fallbackPostIds: Map; +}): LegacyFanout => { + const publishChannels = new Set( + definitions + .filter((definition) => definition.mode === 'publish') + .map((definition) => definition.channel), + ); + const channelResolver = createHighlightChannelResolver({ + posts, + relations, + fallbackPostIds, + }); + const highlightsByChannel = groupHighlightsForFanout({ + highlights: canonicalHighlights, + channelResolver, + publishChannels, + }); + const states = definitions + .filter((definition) => definition.mode === 'publish') + .map((definition) => ({ + definition, + highlights: trimHighlights({ + items: highlightsByChannel.get(definition.channel) || [], + maxItems: definition.maxItems, + }), + })); + + return { + publishChannels, + states, + }; +}; + +export const syncLegacyHighlightsFromCanonical = async ({ + manager, + definitions, + canonicalHighlights, + posts, + relations, + fallbackPostIds, + now, +}: { + manager: EntityManager; + definitions: ChannelHighlightDefinition[]; + canonicalHighlights: HighlightItem[]; + posts: HighlightPost[]; + relations: { postId: string; relatedPostId: string }[]; + fallbackPostIds: Map; + now: Date; +}): Promise => { + const legacyFanout = buildLegacyFanout({ + definitions, + canonicalHighlights, + posts, + relations, + fallbackPostIds, + }); + + for (const state of legacyFanout.states) { + await replaceLegacyHighlightsForChannel({ + manager, + channel: state.definition.channel, + items: state.highlights, + }); + } + + const activeDefinitionChannels = definitions + .filter((definition) => definition.mode !== 'disabled') + .map((definition) => definition.channel); + if (activeDefinitionChannels.length) { + await manager.getRepository(ChannelHighlightDefinition).update( + { + channel: In(activeDefinitionChannels), + }, + { + lastFetchedAt: now, + }, + ); + } + + return legacyFanout; +}; diff --git a/src/common/channelHighlight/publish.ts b/src/common/channelHighlight/publish.ts index beacc0e4dc..f888d43e78 100644 --- a/src/common/channelHighlight/publish.ts +++ b/src/common/channelHighlight/publish.ts @@ -7,6 +7,7 @@ import { } from '../../entity/PostHighlight'; import type { PostRelation } from '../../entity/posts/PostRelation'; import type { HighlightItem } from './types'; +import { buildStoryFamilies } from './storyFamilies'; type HighlightRelation = Pick; @@ -30,7 +31,7 @@ const normalizeHighlightItems = ({ return [...dedupedItems.values()]; }; -const replaceLegacyHighlightsForChannel = async ({ +export const replaceLegacyHighlightsForChannel = async ({ manager, channel, items, @@ -93,15 +94,15 @@ const replaceLegacyHighlightsForChannel = async ({ ); }; -const upsertCanonicalHighlights = async ({ +export const upsertCanonicalHighlights = async ({ manager, - channel, items, + channelsByPostId, relations, }: { manager: EntityManager; - channel: string; items: HighlightItem[]; + channelsByPostId: Map>; relations: HighlightRelation[]; }): Promise => { const repo = manager.getRepository(HighlightsCanonical); @@ -114,23 +115,12 @@ const upsertCanonicalHighlights = async ({ return []; } - const childrenByCollection = new Map(); - const collectionByChild = new Map(); - for (const relation of relations) { - const children = childrenByCollection.get(relation.postId) || []; - children.push(relation.relatedPostId); - childrenByCollection.set(relation.postId, children); - collectionByChild.set(relation.relatedPostId, relation.postId); - } + const storyFamilies = buildStoryFamilies({ relations }); const familyPostIdsByPostId = new Map( - nextItems.map((item) => { - const collectionId = collectionByChild.get(item.postId) || item.postId; - - return [ - item.postId, - [collectionId, ...(childrenByCollection.get(collectionId) || [])], - ]; - }), + nextItems.map((item) => [ + item.postId, + storyFamilies.getFamilyPostIds(item.postId), + ]), ); const lookupPostIds = [ ...new Set([...familyPostIdsByPostId.values()].flat()), @@ -152,7 +142,12 @@ const upsertCanonicalHighlights = async ({ return repo.create({ id: current?.id, postId: item.postId, - channels: [...new Set([...(current?.channels || []), channel])].sort(), + channels: [ + ...new Set([ + ...(current?.channels || []), + ...(channelsByPostId.get(item.postId) || []), + ]), + ].sort(), highlightedAt: item.highlightedAt, headline: item.headline, significance: toPostHighlightSignificance(item.significanceLabel), @@ -162,36 +157,13 @@ const upsertCanonicalHighlights = async ({ ); }; -export const publishHighlightsForChannel = async ({ - manager, - channel, - items, - relations = [], -}: { - manager: EntityManager; - channel: string; - items: HighlightItem[]; - relations?: HighlightRelation[]; -}): Promise => { - const canonicalHighlights = await upsertCanonicalHighlights({ - manager, - channel, - items, - relations, - }); - - await replaceLegacyHighlightsForChannel({ - manager, - channel, - items: canonicalHighlights.map((highlight) => ({ - postId: highlight.postId, - highlightedAt: highlight.highlightedAt, - headline: highlight.headline, - summary: null, - significanceLabel: toPostHighlightSignificanceLabel( - highlight.significance, - ), - reason: highlight.reason, - })), - }); -}; +export const toHighlightItemFromCanonical = ( + highlight: HighlightsCanonical, +): HighlightItem => ({ + postId: highlight.postId, + highlightedAt: highlight.highlightedAt, + headline: highlight.headline, + summary: null, + significanceLabel: toPostHighlightSignificanceLabel(highlight.significance), + reason: highlight.reason, +}); diff --git a/src/common/channelHighlight/queries.ts b/src/common/channelHighlight/queries.ts index 709f6000f5..1288990505 100644 --- a/src/common/channelHighlight/queries.ts +++ b/src/common/channelHighlight/queries.ts @@ -1,13 +1,6 @@ -import { - Brackets, - In, - IsNull, - MoreThanOrEqual, - Not, - type DataSource, -} from 'typeorm'; +import { Brackets, In, Not, type DataSource } from 'typeorm'; import { ONE_HOUR_IN_SECONDS, ONE_WEEK_IN_SECONDS } from '../constants'; -import { PostHighlight } from '../../entity/PostHighlight'; +import { HighlightsCanonical } from '../../entity/HighlightsCanonical'; import { Post } from '../../entity/posts/Post'; import { PostRelation, @@ -68,149 +61,26 @@ export const getFetchStart = ({ return overlapStart > horizonStart ? overlapStart : horizonStart; }; -export const fetchCurrentHighlights = async ({ - con, - channel, -}: { - con: DataSource; - channel: string; -}): Promise => - con.getRepository(PostHighlight).find({ - where: { - channel, - retiredAt: IsNull(), - }, - order: { - highlightedAt: 'DESC', - }, - }); - -export const fetchCurrentHighlightsForChannels = async ({ - con, - channels, -}: { - con: DataSource; - channels: string[]; -}): Promise => { - if (!channels.length) { - return []; - } - - return con.getRepository(PostHighlight).find({ - where: { - channel: In(channels), - retiredAt: IsNull(), - }, - order: { - highlightedAt: 'DESC', - }, - }); -}; - export const getEvaluationHistoryStart = ({ now }: { now: Date }): Date => new Date(now.getTime() - HIGHLIGHT_EVALUATION_HISTORY_SECONDS * 1000); -export const fetchEvaluationHistoryHighlights = async ({ - con, - channel, - now, -}: { - con: DataSource; - channel: string; - now: Date; -}): Promise => - con.getRepository(PostHighlight).find({ - where: { - channel, - highlightedAt: MoreThanOrEqual( - getEvaluationHistoryStart({ - now, - }), - ), - }, - order: { - highlightedAt: 'DESC', - }, - }); - -export const fetchEvaluationHistoryHighlightsForChannels = async ({ +export const fetchEvaluationHistoryCanonicalHighlights = async ({ con, - channels, now, }: { con: DataSource; - channels: string[]; now: Date; -}): Promise => { - if (!channels.length) { - return []; - } - - return con.getRepository(PostHighlight).find({ - where: { - channel: In(channels), - highlightedAt: MoreThanOrEqual( - getEvaluationHistoryStart({ - now, - }), - ), - }, - order: { - highlightedAt: 'DESC', - }, - }); -}; - -export const fetchRetiredHighlightPostIds = async ({ - con, - channel, -}: { - con: DataSource; - channel: string; -}): Promise => { - const highlights = await con.getRepository(PostHighlight).find({ - select: { - postId: true, - }, - where: { - channel, - retiredAt: Not(IsNull()), - }, - }); - - return highlights.map((highlight) => highlight.postId); -}; - -export const fetchRetiredHighlightPostIdsForChannels = async ({ - con, - channels, -}: { - con: DataSource; - channels: string[]; -}): Promise>> => { - if (!channels.length) { - return new Map(); - } - - const highlights = await con.getRepository(PostHighlight).find({ - select: { - channel: true, - postId: true, - }, - where: { - channel: In(channels), - retiredAt: Not(IsNull()), - }, - }); - - const retiredByChannel = new Map>(); - for (const highlight of highlights) { - const postIds = retiredByChannel.get(highlight.channel) || new Set(); - postIds.add(highlight.postId); - retiredByChannel.set(highlight.channel, postIds); - } - - return retiredByChannel; +}): Promise => { + return con + .getRepository(HighlightsCanonical) + .createQueryBuilder('highlight') + .where('highlight.highlightedAt >= :historyStart', { + historyStart: getEvaluationHistoryStart({ + now, + }), + }) + .orderBy('highlight.highlightedAt', 'DESC') + .getMany(); }; export const fetchPostsByIds = async ({ @@ -240,47 +110,6 @@ export const fetchPostsByIds = async ({ }; export const fetchIncrementalPosts = async ({ - con, - channel, - fetchStart, - horizonStart, - excludedSourceIds = [], -}: { - con: DataSource; - channel: string; - fetchStart: Date; - horizonStart: Date; - excludedSourceIds?: string[]; -}): Promise => - con - .getRepository(Post) - .createQueryBuilder('post') - .where('post.createdAt >= :horizonStart', { horizonStart }) - .andWhere('post.visible = true') - .andWhere('post.deleted = false') - .andWhere('post.banned = false') - .andWhere('post.showOnFeed = true') - .andWhere('post.sharedPostId IS NULL') - .andWhere(`(post."contentMeta"->'channels') ? :channel`, { channel }) - .andWhere(`NOT (post."contentCuration" && :rejectedCurations)`, { - rejectedCurations: REJECTED_CONTENT_CURATIONS, - }) - .andWhere( - excludedSourceIds.length - ? 'post."sourceId" NOT IN (:...excludedSourceIds)' - : '1=1', - { excludedSourceIds }, - ) - .andWhere( - new Brackets((builder) => { - builder - .where('post.createdAt >= :fetchStart', { fetchStart }) - .orWhere('post.metadataChangedAt >= :fetchStart', { fetchStart }); - }), - ) - .getMany() as unknown as Promise; - -export const fetchGlobalIncrementalPosts = async ({ con, fetchStart, horizonStart, diff --git a/src/common/channelHighlight/runs.ts b/src/common/channelHighlight/runs.ts new file mode 100644 index 0000000000..cef97631c8 --- /dev/null +++ b/src/common/channelHighlight/runs.ts @@ -0,0 +1,136 @@ +import { + IsNull, + LessThan, + Not, + type EntityManager, + type Repository, +} from 'typeorm'; +import { ChannelHighlightRun } from '../../entity/ChannelHighlightRun'; +import { GLOBAL_HIGHLIGHT_CHANNEL } from './constants'; +import type { + CanonicalHighlights, + CanonicalInput, + GenerationConfig, +} from './canonical'; +import type { LegacyFanout } from './legacyFanout'; +import { getEvaluationHistoryStart } from './queries'; +import { toStoredSnapshotItem } from './stories'; + +export const fetchPreviousGlobalRun = ({ + runRepo, + now, +}: { + runRepo: Repository; + now: Date; +}): Promise => + runRepo.findOne({ + where: { + channel: GLOBAL_HIGHLIGHT_CHANNEL, + status: 'completed', + completedAt: Not(IsNull()), + scheduledAt: LessThan(now), + }, + order: { + scheduledAt: 'DESC', + }, + }); + +export const createGlobalRun = ({ + runRepo, + now, +}: { + runRepo: Repository; + now: Date; +}): Promise => + runRepo.save( + runRepo.create({ + channel: GLOBAL_HIGHLIGHT_CHANNEL, + scheduledAt: now, + status: 'processing', + baselineSnapshot: [], + inputSummary: {}, + internalSnapshot: [], + comparison: {}, + metrics: {}, + }), + ); + +export const completeGlobalRun = async ({ + manager, + run, + config, + input, + canonical, + legacyFanout, + now, +}: { + manager: EntityManager; + run: ChannelHighlightRun; + config: GenerationConfig; + input: CanonicalInput; + canonical: CanonicalHighlights; + legacyFanout: LegacyFanout; + now: Date; +}): Promise => { + await manager.getRepository(ChannelHighlightRun).update( + { id: run.id }, + { + status: 'completed', + completedAt: new Date(), + baselineSnapshot: canonical.history.map(toStoredSnapshotItem), + inputSummary: { + fetchStart: config.fetchStart.toISOString(), + horizonStart: config.horizonStart.toISOString(), + evaluationHistoryStart: getEvaluationHistoryStart({ + now, + }).toISOString(), + excludedSourceIds: input.excludedSourceIds, + candidatePostIds: canonical.newCandidates.map( + (candidate) => candidate.postId, + ), + admittedPostIds: canonical.admitted.map((item) => item.postId), + projectedChannels: [...legacyFanout.publishChannels], + }, + internalSnapshot: canonical.snapshot.map(toStoredSnapshotItem), + comparison: { + ...canonical.comparison, + wouldPublish: canonical.comparison.changed, + published: canonical.admitted.length > 0, + }, + metrics: { + fetchedPosts: input.incrementalPosts.length, + relationPosts: input.relationPosts.length, + evaluationHighlights: canonical.history.length, + newCandidates: canonical.newCandidates.length, + admittedHighlights: canonical.admitted.length, + projectedChannels: legacyFanout.publishChannels.size, + projectedHighlights: legacyFanout.states.reduce( + (total, state) => total + state.highlights.length, + 0, + ), + }, + }, + ); +}; + +export const failGlobalRun = ({ + runRepo, + run, + err, +}: { + runRepo: Repository; + run: ChannelHighlightRun; + err: unknown; +}): Promise => + runRepo.update( + { + id: run.id, + }, + { + status: 'failed', + completedAt: new Date(), + error: { + message: err instanceof Error ? err.message : 'Unknown error', + }, + }, + ); diff --git a/src/common/channelHighlight/stories.ts b/src/common/channelHighlight/stories.ts index e83428b1e4..2780496683 100644 --- a/src/common/channelHighlight/stories.ts +++ b/src/common/channelHighlight/stories.ts @@ -1,6 +1,7 @@ import type { PostContentQuality } from '../../entity/posts/Post'; import { toPostHighlightSignificanceLabel } from '../../entity/PostHighlight'; import type { PostRelation } from '../../entity/posts/PostRelation'; +import { buildStoryFamilies } from './storyFamilies'; import type { CurrentHighlight, HighlightCandidate, @@ -36,26 +37,6 @@ export const toQualitySummary = ( : null, }); -const buildCollectionByChildId = ( - relations: PostRelation[], - availablePostIds: Set, -): Map => { - const collectionByChildId = new Map(); - - for (const relation of relations) { - if ( - !availablePostIds.has(relation.postId) || - !availablePostIds.has(relation.relatedPostId) - ) { - continue; - } - - collectionByChildId.set(relation.relatedPostId, relation.postId); - } - - return collectionByChildId; -}; - type EngagementPost = { upvotes: number; comments: number; @@ -168,13 +149,13 @@ export const buildCandidates = ({ horizonStart: Date; }): HighlightCandidate[] => { const availablePostIds = new Set(posts.map((post) => post.id)); - const collectionByChildId = buildCollectionByChildId( + const storyFamilies = buildStoryFamilies({ relations, - availablePostIds, - ); + postIds: availablePostIds, + }); const groupedPosts = groupPostsByCanonicalPostId({ posts, - collectionByChildId, + collectionByChildId: storyFamilies.collectionByChild, }); return [...groupedPosts.entries()] @@ -220,10 +201,10 @@ export const canonicalizeCurrentHighlights = ({ inaccessiblePostIds: Set; }): HighlightItem[] => { const postsById = new Map(posts.map((post) => [post.id, post])); - const collectionByChildId = buildCollectionByChildId( + const storyFamilies = buildStoryFamilies({ relations, - new Set(postsById.keys()), - ); + postIds: new Set(postsById.keys()), + }); const sharedByShareId = new Map(); for (const post of posts) { if (post.sharedPostId) sharedByShareId.set(post.id, post.sharedPostId); @@ -243,7 +224,7 @@ export const canonicalizeCurrentHighlights = ({ for (const highlight of highlights) { const downgradedPostId = downgradeShareToUnderlying(highlight.postId); const canonicalPostId = - collectionByChildId.get(downgradedPostId) || downgradedPostId; + storyFamilies.collectionByChild.get(downgradedPostId) || downgradedPostId; const canonicalPost = postsById.get(canonicalPostId); if (!canonicalPost) { diff --git a/src/common/channelHighlight/storyFamilies.ts b/src/common/channelHighlight/storyFamilies.ts new file mode 100644 index 0000000000..9ed48348b2 --- /dev/null +++ b/src/common/channelHighlight/storyFamilies.ts @@ -0,0 +1,47 @@ +type StoryRelation = { + postId: string; + relatedPostId: string; +}; + +export type StoryFamilies = { + collectionByChild: Map; + childrenByCollection: Map; + getFamilyPostIds: (postId: string) => string[]; +}; + +export const buildStoryFamilies = ({ + relations, + postIds, +}: { + relations: StoryRelation[]; + postIds?: Set; +}): StoryFamilies => { + const collectionByChild = new Map(); + const childrenByCollection = new Map(); + + for (const relation of relations) { + if ( + postIds && + (!postIds.has(relation.postId) || !postIds.has(relation.relatedPostId)) + ) { + continue; + } + + const children = childrenByCollection.get(relation.postId) || []; + children.push(relation.relatedPostId); + childrenByCollection.set(relation.postId, children); + collectionByChild.set(relation.relatedPostId, relation.postId); + } + + const getCollectionId = (postId: string): string => + collectionByChild.get(postId) || postId; + + return { + collectionByChild, + childrenByCollection, + getFamilyPostIds: (postId) => { + const collectionId = getCollectionId(postId); + return [collectionId, ...(childrenByCollection.get(collectionId) || [])]; + }, + }; +}; diff --git a/src/common/channelHighlight/types.ts b/src/common/channelHighlight/types.ts index e0465dd4ac..d8d08cc4cd 100644 --- a/src/common/channelHighlight/types.ts +++ b/src/common/channelHighlight/types.ts @@ -69,7 +69,7 @@ export type HighlightItem = { reason: string | null; }; -export type GenerateChannelHighlightResult = { +export type GenerateHighlightsResult = { runs: ChannelHighlightRun[]; published: boolean; }; diff --git a/src/common/channelHighlight/utils.ts b/src/common/channelHighlight/utils.ts new file mode 100644 index 0000000000..a51043895b --- /dev/null +++ b/src/common/channelHighlight/utils.ts @@ -0,0 +1,18 @@ +import type { HighlightItem } from './types'; + +export const dedupeHighlightsByPostId = ( + items: HighlightItem[], +): HighlightItem[] => { + const deduped = new Map(); + + for (const item of [...items].sort( + (left, right) => + right.highlightedAt.getTime() - left.highlightedAt.getTime(), + )) { + if (!deduped.has(item.postId)) { + deduped.set(item.postId, item); + } + } + + return [...deduped.values()]; +}; diff --git a/src/cron/channelHighlights.ts b/src/cron/channelHighlights.ts index 3aa566836c..69f6f978c6 100644 --- a/src/cron/channelHighlights.ts +++ b/src/cron/channelHighlights.ts @@ -1,5 +1,5 @@ import { getChannelHighlightDefinitions } from '../common/channelHighlight/definitions'; -import { generateChannelHighlights } from '../common/channelHighlight/generate'; +import { generateHighlights } from '../common/channelHighlight/generate'; import { Cron } from './cron'; import type { DataSource } from 'typeorm'; @@ -13,16 +13,10 @@ export const runChannelHighlights = async ({ const definitions = await getChannelHighlightDefinitions({ con, }); - const enabledDefinitions = definitions.filter( - (definition) => definition.mode !== 'disabled', - ); - if (!enabledDefinitions.length) { - return; - } - await generateChannelHighlights({ + await generateHighlights({ con, - definitions: enabledDefinitions, + definitions, now, }); };