Skip to content

Commit 8c855c6

Browse files
authored
Fix #23765: Duplicate Notifications in Multi-Replica Deployment (#24866)
* Fix #23765: Duplicate Notifications in Multi-Replica Deployment * Fix tests * Fix tests in EventSubscription * Fix tests in EventSubscription
1 parent 2d2cafa commit 8c855c6

4 files changed

Lines changed: 516 additions & 99 deletions

File tree

openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/changeEvent/AbstractEventConsumer.java

Lines changed: 78 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,47 @@ protected AbstractEventConsumer(DIContainer dependencies) {
7171
}
7272

7373
private void init(JobExecutionContext context) {
74-
EventSubscription sub =
75-
(EventSubscription) context.getJobDetail().getJobDataMap().get(ALERT_INFO_KEY);
7674
this.jobDetail = context.getJobDetail();
77-
this.eventSubscription = sub;
78-
EventSubscriptionOffset eventSubscriptionOffset = loadInitialOffset(context);
79-
this.offset = eventSubscriptionOffset.getCurrentOffset();
80-
this.startingOffset = eventSubscriptionOffset.getStartingOffset();
81-
this.alertMetrics = loadInitialMetrics();
82-
this.destinationMap = loadDestinationsMap(context);
83-
this.doInit(context);
75+
try {
76+
Object alertInfoValue = context.getJobDetail().getJobDataMap().get(ALERT_INFO_KEY);
77+
if (alertInfoValue == null) {
78+
LOG.error("ALERT_INFO_KEY not found in JobDataMap");
79+
return;
80+
}
81+
82+
if (alertInfoValue instanceof String subscriptionJson) {
83+
this.eventSubscription = JsonUtils.readValue(subscriptionJson, EventSubscription.class);
84+
if (this.eventSubscription == null) {
85+
LOG.error("Failed to deserialize EventSubscription from JSON: {}", subscriptionJson);
86+
return;
87+
}
88+
} else if (alertInfoValue instanceof EventSubscription subscription) {
89+
this.eventSubscription = subscription;
90+
} else {
91+
LOG.error(
92+
"Unexpected type for ALERT_INFO_KEY: {}. Expected String or EventSubscription.",
93+
alertInfoValue.getClass().getName());
94+
return;
95+
}
96+
97+
if (this.eventSubscription.getDestinations() == null
98+
|| this.eventSubscription.getDestinations().isEmpty()) {
99+
LOG.error(
100+
"EventSubscription {} has no destinations configured",
101+
this.eventSubscription.getName());
102+
return;
103+
}
104+
105+
EventSubscriptionOffset eventSubscriptionOffset = loadInitialOffset(context);
106+
this.offset = eventSubscriptionOffset.getCurrentOffset();
107+
this.startingOffset = eventSubscriptionOffset.getStartingOffset();
108+
this.alertMetrics = loadInitialMetrics();
109+
this.destinationMap = loadDestinationsMap(context);
110+
this.doInit(context);
111+
} catch (Exception e) {
112+
LOG.error("Failed to initialize EventConsumer from JobDataMap", e);
113+
this.eventSubscription = null;
114+
}
84115
}
85116

86117
protected void doInit(JobExecutionContext context) {
@@ -136,60 +167,55 @@ private void recordSuccessfulChangeEvent(UUID eventSubscriptionId, ChangeEvent e
136167
}
137168

138169
private EventSubscriptionOffset loadInitialOffset(JobExecutionContext context) {
139-
EventSubscriptionOffset jobStoredOffset =
140-
(EventSubscriptionOffset) jobDetail.getJobDataMap().get(ALERT_OFFSET_KEY);
141-
// If the Job Data Map has the latest offset, use it
142-
if (jobStoredOffset != null) {
143-
return jobStoredOffset;
144-
} else {
145-
EventSubscriptionOffset eventSubscriptionOffset =
146-
getStartingOffset(eventSubscription.getId());
147-
// Update the Job Data Map with the latest offset
148-
context.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, eventSubscriptionOffset);
149-
return eventSubscriptionOffset;
170+
Object offsetValue = jobDetail.getJobDataMap().get(ALERT_OFFSET_KEY);
171+
if (offsetValue != null) {
172+
EventSubscriptionOffset offset = null;
173+
if (offsetValue instanceof String offsetJson) {
174+
offset = JsonUtils.readValue(offsetJson, EventSubscriptionOffset.class);
175+
} else if (offsetValue instanceof EventSubscriptionOffset existingOffset) {
176+
offset = existingOffset;
177+
}
178+
if (offset != null) {
179+
return offset;
180+
}
181+
}
182+
183+
EventSubscriptionOffset dbOffset = getStartingOffset(eventSubscription.getId());
184+
if (dbOffset != null) {
185+
context.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, JsonUtils.pojoToJson(dbOffset));
186+
return dbOffset;
150187
}
188+
189+
LOG.warn("No offset found for subscription {}, using default", eventSubscription.getId());
190+
return getStartingOffset(eventSubscription.getId());
151191
}
152192

153193
private Map<UUID, Destination<ChangeEvent>> loadDestinationsMap(JobExecutionContext context) {
154-
Map<UUID, Destination<ChangeEvent>> dMap =
155-
(Map<UUID, Destination<ChangeEvent>>)
156-
context.getJobDetail().getJobDataMap().get(DESTINATION_MAP_KEY);
157-
if (dMap == null) {
158-
dMap = new HashMap<>();
159-
for (SubscriptionDestination subscriptionDest : eventSubscription.getDestinations()) {
160-
dMap.put(
161-
subscriptionDest.getId(), AlertFactory.getAlert(eventSubscription, subscriptionDest));
162-
}
163-
context.getJobDetail().getJobDataMap().put(DESTINATION_MAP_KEY, dMap);
194+
Map<UUID, Destination<ChangeEvent>> dMap = new HashMap<>();
195+
for (SubscriptionDestination subscriptionDest : eventSubscription.getDestinations()) {
196+
dMap.put(
197+
subscriptionDest.getId(), AlertFactory.getAlert(eventSubscription, subscriptionDest));
164198
}
165199
return dMap;
166200
}
167201

168202
private AlertMetrics loadInitialMetrics() {
169-
AlertMetrics metrics = (AlertMetrics) jobDetail.getJobDataMap().get(METRICS_EXTENSION);
170-
if (metrics != null) {
171-
return metrics;
172-
} else {
173-
String json =
174-
Entity.getCollectionDAO()
175-
.eventSubscriptionDAO()
176-
.getSubscriberExtension(eventSubscription.getId().toString(), METRICS_EXTENSION);
177-
if (json != null) {
178-
return JsonUtils.readValue(json, AlertMetrics.class);
179-
}
180-
// Update the Job Data Map with the latest offset
181-
return new AlertMetrics().withTotalEvents(0).withFailedEvents(0).withSuccessEvents(0);
203+
String json =
204+
Entity.getCollectionDAO()
205+
.eventSubscriptionDAO()
206+
.getSubscriberExtension(eventSubscription.getId().toString(), METRICS_EXTENSION);
207+
if (json != null) {
208+
return JsonUtils.readValue(json, AlertMetrics.class);
182209
}
210+
return new AlertMetrics().withTotalEvents(0).withFailedEvents(0).withSuccessEvents(0);
183211
}
184212

185213
@Override
186214
public void publishEvents(Map<ChangeEvent, Set<UUID>> events) {
187-
// If no events return
188215
if (events.isEmpty()) {
189216
return;
190217
}
191218

192-
// Filter the Change Events based on Alert Trigger Config
193219
Map<ChangeEvent, Set<UUID>> filteredEvents = getFilteredEvents(eventSubscription, events);
194220

195221
for (var eventWithReceivers : filteredEvents.entrySet()) {
@@ -208,7 +234,6 @@ public void publishEvents(Map<ChangeEvent, Set<UUID>> events) {
208234
@Override
209235
public void commit(JobExecutionContext jobExecutionContext) {
210236
long currentTime = System.currentTimeMillis();
211-
// Upsert Offset
212237
EventSubscriptionOffset eventSubscriptionOffset =
213238
new EventSubscriptionOffset()
214239
.withCurrentOffset(offset)
@@ -228,7 +253,8 @@ public void commit(JobExecutionContext jobExecutionContext) {
228253
.getJobDataMap()
229254
.put(ALERT_OFFSET_KEY, eventSubscriptionOffset);
230255

231-
// Upsert Metrics
256+
jobExecutionContext.getJobDetail().getJobDataMap().put(ALERT_INFO_KEY, eventSubscription);
257+
232258
AlertMetrics metrics =
233259
new AlertMetrics()
234260
.withTotalEvents(alertMetrics.getTotalEvents())
@@ -243,11 +269,6 @@ public void commit(JobExecutionContext jobExecutionContext) {
243269
METRICS_EXTENSION,
244270
"alertMetrics",
245271
JsonUtils.pojoToJson(metrics));
246-
247-
jobExecutionContext.getJobDetail().getJobDataMap().put(METRICS_EXTENSION, alertMetrics);
248-
249-
// Populate the Destination map
250-
jobExecutionContext.getJobDetail().getJobDataMap().put(DESTINATION_MAP_KEY, destinationMap);
251272
}
252273

253274
@Override
@@ -269,16 +290,17 @@ public ResultList<ChangeEvent> pollEvents(long offset, long batchSize) {
269290

270291
@Override
271292
public void execute(JobExecutionContext jobExecutionContext) {
272-
// Must Have , Before Execute the Init, Quartz Requires a Non-Arg Constructor
273293
this.init(jobExecutionContext);
294+
if (this.eventSubscription == null) {
295+
LOG.error("Skipping job execution - EventSubscription could not be loaded");
296+
return;
297+
}
274298
long batchSize = 0;
275299
Map<ChangeEvent, Set<UUID>> eventsWithReceivers = new HashMap<>();
276300
try {
277-
// Poll Events from Change Event Table
278301
ResultList<ChangeEvent> batch = pollEvents(offset, eventSubscription.getBatchSize());
279302
batchSize = batch.getPaging().getTotal();
280303
eventsWithReceivers.putAll(createEventsWithReceivers(batch.getData()));
281-
// Publish Events
282304
if (!eventsWithReceivers.isEmpty()) {
283305
alertMetrics.withTotalEvents(alertMetrics.getTotalEvents() + eventsWithReceivers.size());
284306
publishEvents(eventsWithReceivers);
@@ -293,15 +315,14 @@ public void execute(JobExecutionContext jobExecutionContext) {
293315

294316
} finally {
295317
if (!eventsWithReceivers.isEmpty()) {
296-
// Commit the Offset
297318
offset += batchSize;
298319
commit(jobExecutionContext);
299320
}
300321
}
301322
}
302323

303324
public EventSubscription getEventSubscription() {
304-
return (EventSubscription) jobDetail.getJobDataMap().get(ALERT_INFO_KEY);
325+
return eventSubscription;
305326
}
306327

307328
private Map<ChangeEvent, Set<UUID>> createEventsWithReceivers(List<ChangeEvent> events) {

0 commit comments

Comments
 (0)