// baileysMessage.processor.ts — from a fork of EvolutionAPI/evolution-api.
import { Logger } from '@config/logger.config';
import { BaileysEventMap, MessageUpsertType, proto } from 'baileys';
import { catchError, concatMap, defer, delay, EMPTY, from, retryWhen, Subject, Subscription, take, tap } from 'rxjs';
/** Payload type of the Baileys `messages.upsert` event, looked up from `BaileysEventMap`. */
type MessageUpsertPayload = BaileysEventMap['messages.upsert'];
/** Argument object accepted by `BaileysMessageProcessor.mount`. */
interface MountProps {
  /**
   * Handler that receives a `messages.upsert` payload together with a
   * settings value and returns a promise that settles when it is done.
   */
  onMessageReceive: (payload: MessageUpsertPayload, settings: any) => Promise<void>;
}
/**
 * Queues Baileys `messages.upsert` batches and hands them to a handler one at
 * a time, in arrival order.
 *
 * Batches are pushed with {@link processMessage} and consumed by the pipeline
 * created in {@link mount}. A batch whose handler rejects is retried a bounded
 * number of times; if it still fails, the failure is logged and the pipeline
 * moves on to the next batch instead of shutting down.
 */
export class BaileysMessageProcessor {
  /** Number of extra handler invocations allowed after the first one fails. */
  private static readonly MAX_RETRIES = 3;
  /** Pause before each retry, in milliseconds. */
  private static readonly RETRY_DELAY_MS = 1000;

  private processorLogs = new Logger('BaileysMessageProcessor');
  private subscription?: Subscription;
  protected messageSubject = new Subject<{
    messages: proto.IWebMessageInfo[];
    type: MessageUpsertType;
    requestId?: string;
    settings: any;
  }>();

  /** Extracts a printable message from any thrown value, not only `Error` instances. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }

  /**
   * Subscribes the processing pipeline to the internal queue.
   *
   * Calling `mount` again replaces the previous pipeline rather than adding a
   * second one.
   *
   * @param onMessageReceive Handler invoked once per attempt for each batch.
   *   Because a failed batch is retried, the handler may be called more than
   *   once with the same payload.
   */
  mount({ onMessageReceive }: MountProps) {
    // Without this, a second mount() would leave the old pipeline subscribed
    // and every batch would be handled twice.
    this.subscription?.unsubscribe();

    this.subscription = this.messageSubject
      .pipe(
        tap(({ messages }) => {
          this.processorLogs.log(`🚀 [BaileysMessageProcessor] Processing batch of ${messages.length} messages`);
          // NOTE(review): this writes message text to the logs — confirm that is
          // acceptable for the deployment's privacy requirements.
          messages.forEach((msg, index) => {
            this.processorLogs.log(`📱 [BaileysMessageProcessor] Message ${index + 1}: ${msg.key?.remoteJid} - ${msg.message?.conversation || msg.message?.extendedTextMessage?.text || 'NO_TEXT'}`);
          });
        }),
        // concatMap keeps batches strictly sequential: the next batch starts
        // only after the current one has succeeded or been given up on.
        concatMap(({ messages, type, requestId, settings }) =>
          // defer() calls the handler anew on every (re)subscription. Wrapping
          // an already-created promise would just replay the same rejection.
          defer(() => from(onMessageReceive({ messages, type, requestId }, settings))).pipe(
            retryWhen((errors) =>
              errors.pipe(
                concatMap((error, attempt) => {
                  if (attempt >= BaileysMessageProcessor.MAX_RETRIES) {
                    // Retries exhausted: surface the error so it is logged below
                    // instead of letting the batch finish silently.
                    throw error;
                  }
                  this.processorLogs.warn(
                    `Retrying message batch due to error: ${BaileysMessageProcessor.describeError(error)}`,
                  );
                  return from([attempt]).pipe(delay(BaileysMessageProcessor.RETRY_DELAY_MS));
                }),
              ),
            ),
            // Handled per batch, inside concatMap, so a failed batch does not
            // complete the outer stream and stop all later processing.
            catchError((error) => {
              this.processorLogs.error(`Error processing message batch: ${error}`);
              return EMPTY;
            }),
          ),
        ),
      )
      .subscribe({
        error: (error) => {
          this.processorLogs.error(`Message stream error: ${error}`);
        },
      });
  }

  /**
   * Enqueues a batch for processing by the mounted pipeline.
   *
   * @param payload The `messages.upsert` payload to process.
   * @param settings Value forwarded unchanged to the mounted handler.
   */
  processMessage(payload: MessageUpsertPayload, settings: any) {
    const { messages, type, requestId } = payload;
    if (!this.subscription || this.subscription.closed) {
      // A Subject does not buffer: with no active subscriber the batch is lost.
      this.processorLogs.warn(
        `Message batch of ${messages.length} messages received while no processor is mounted; it will be dropped`,
      );
    }
    this.messageSubject.next({ messages, type, requestId, settings });
  }

  /** Stops the pipeline and completes the queue; later batches are not processed. */
  onDestroy() {
    this.subscription?.unsubscribe();
    this.subscription = undefined;
    this.messageSubject.complete();
  }
}