Skip to content

Commit ac6e9ae

Browse files
committed
Improvements in RabbitMQ, added 2 new modes
1 parent 7dd589f commit ac6e9ae

7 files changed

Lines changed: 149 additions & 56 deletions

File tree

Docker/.env.example

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ REDIS_URI=redis://redis:6379
4747
REDIS_PREFIX_KEY=evdocker
4848

4949
RABBITMQ_ENABLED=false
50-
RABBITMQ_GLOBAL_EVENT_QUEUE=false
50+
RABBITMQ_RABBITMQ_MODE=isolated
51+
RABBITMQ_EXCHANGE_NAME=evolution_exchange
5152
RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672
5253

5354
WEBSOCKET_ENABLED=false

Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ ENV REDIS_URI=redis://redis:6379
6262
ENV REDIS_PREFIX_KEY=evolution
6363

6464
ENV RABBITMQ_ENABLED=false
65-
ENV RABBITMQ_GLOBAL_EVENT_QUEUE=false
65+
ENV RABBITMQ_MODE=isolated
66+
ENV RABBITMQ_EXCHANGE_NAME=evolution_exchange
6667
ENV RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672
6768

6869
ENV WEBSOCKET_ENABLED=false

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
"yamljs": "^0.3.0"
9292
},
9393
"devDependencies": {
94+
"@types/amqplib": "^0.10.5",
9495
"@types/compression": "^1.7.2",
9596
"@types/cors": "^2.8.13",
9697
"@types/express": "^4.17.17",

src/config/env.config.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ export type Redis = {
7171

7272
export type Rabbitmq = {
7373
ENABLED: boolean;
74-
GLOBAL_EVENT_QUEUE: boolean;
74+
MODE: 'isolated' | 'global' | 'single';
75+
EXCHANGE_NAME: string; // available for global and single, isolated mode will use instance name as exchange
7576
URI: string;
7677
};
7778

@@ -283,7 +284,8 @@ export class ConfigService {
283284
},
284285
RABBITMQ: {
285286
ENABLED: process.env?.RABBITMQ_ENABLED === 'true',
286-
GLOBAL_EVENT_QUEUE: process.env?.RABBITMQ_GLOBAL_EVENT_QUEUE === 'true',
287+
MODE: (process.env?.RABBITMQ_MODE as Rabbitmq['MODE']) || 'single',
288+
EXCHANGE_NAME: process.env?.RABBITMQ_EXCHANGE_NAME || 'evolution_exchange',
287289
URI: process.env.RABBITMQ_URI || '',
288290
},
289291
SQS: {

src/dev-env.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ REDIS:
8383

8484
RABBITMQ:
8585
ENABLED: false
86-
GLOBAL_EVENT_QUEUE: false
86+
MODE: "global"
87+
EXCHANGE_NAME: "evolution_exchange"
8788
URI: "amqp://guest:guest@localhost:5672"
8889

8990
SQS:

src/libs/amqp.server.ts

Lines changed: 136 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,35 @@
1-
import * as amqp from 'amqplib/callback_api';
1+
import { Channel, connect } from 'amqplib/callback_api';
22

33
import { configService, HttpServer, Rabbitmq } from '../config/env.config';
44
import { Logger } from '../config/logger.config';
5+
import { Events } from '../whatsapp/types/wa.types';
56

67
const logger = new Logger('AMQP');
78

8-
let amqpChannel: amqp.Channel | null = null;
9+
const parseEvtName = (evt: string) => evt.replace(/_/g, '.').toLowerCase();
10+
11+
const globalQueues: { [key: string]: Events[] } = {
12+
contacts: [Events.CONTACTS_SET, Events.CONTACTS_UPDATE, Events.CONTACTS_UPSERT],
13+
messages: [
14+
Events.MESSAGES_DELETE,
15+
Events.MESSAGES_SET,
16+
Events.MESSAGES_UPDATE,
17+
Events.MESSAGES_UPSERT,
18+
Events.MESSAGING_HISTORY_SET,
19+
Events.SEND_MESSAGE,
20+
],
21+
chats: [Events.CHATS_DELETE, Events.CHATS_SET, Events.CHATS_UPDATE, Events.CHATS_UPSERT],
22+
groups: [Events.GROUPS_UPDATE, Events.GROUPS_UPSERT, Events.GROUP_PARTICIPANTS_UPDATE],
23+
others: [], // All other events not included in the above categories
24+
};
25+
26+
let amqpChannel: Channel | null = null;
927

1028
export const initAMQP = () => {
1129
return new Promise<void>((resolve, reject) => {
1230
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
13-
amqp.connect(rabbitConfig.URI, (error, connection) => {
31+
console.log(rabbitConfig);
32+
connect(rabbitConfig.URI, (error, connection) => {
1433
if (error) {
1534
reject(error);
1635
return;
@@ -22,12 +41,9 @@ export const initAMQP = () => {
2241
return;
2342
}
2443

25-
const exchangeName = 'evolution_exchange';
26-
27-
channel.assertExchange(exchangeName, 'topic', {
44+
channel.assertExchange(rabbitConfig.EXCHANGE_NAME, 'topic', {
2845
durable: true,
2946
autoDelete: false,
30-
assert: true,
3147
});
3248

3349
amqpChannel = channel;
@@ -39,69 +55,131 @@ export const initAMQP = () => {
3955
});
4056
};
4157

42-
export const getAMQP = (): amqp.Channel | null => {
58+
export const getAMQP = (): Channel | null => {
4359
return amqpChannel;
4460
};
4561

4662
export const initQueues = (instanceName: string, events: string[]) => {
4763
if (!instanceName || !events || !events.length) return;
4864
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
65+
const TWO_DAYS_IN_MS = 2 * 24 * 60 * 60 * 1000;
66+
const amqp = getAMQP();
4967

50-
const queues = events.map((event) => {
51-
return `${event.replace(/_/g, '.').toLowerCase()}`;
52-
});
68+
let exchangeName = rabbitConfig.EXCHANGE_NAME;
5369

54-
queues.forEach((event) => {
55-
const amqp = getAMQP();
56-
const exchangeName = instanceName ?? 'evolution_exchange';
70+
const receivedEvents = events.map(parseEvtName);
71+
if (rabbitConfig.MODE === 'isolated') {
72+
exchangeName = instanceName;
5773

74+
receivedEvents.forEach((event) => {
75+
amqp.assertExchange(exchangeName, 'topic', {
76+
durable: true,
77+
autoDelete: false,
78+
});
79+
80+
const queueName = event;
81+
amqp.assertQueue(queueName, {
82+
durable: true,
83+
autoDelete: false,
84+
messageTtl: TWO_DAYS_IN_MS,
85+
arguments: {
86+
'x-queue-type': 'quorum',
87+
},
88+
});
89+
90+
amqp.bindQueue(queueName, exchangeName, event);
91+
});
92+
} else if (rabbitConfig.MODE === 'single') {
5893
amqp.assertExchange(exchangeName, 'topic', {
5994
durable: true,
6095
autoDelete: false,
61-
assert: true,
6296
});
6397

64-
const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`;
65-
98+
const queueName = 'evolution';
6699
amqp.assertQueue(queueName, {
67100
durable: true,
68101
autoDelete: false,
102+
messageTtl: TWO_DAYS_IN_MS,
69103
arguments: {
70104
'x-queue-type': 'quorum',
71105
},
72106
});
73107

74-
amqp.bindQueue(queueName, exchangeName, event);
75-
});
108+
receivedEvents.forEach((event) => {
109+
amqp.bindQueue(queueName, exchangeName, event);
110+
});
111+
} else if (rabbitConfig.MODE === 'global') {
112+
const queues = Object.keys(globalQueues);
113+
114+
const addQueues = queues.filter((evt) => {
115+
if (evt === 'others') {
116+
return receivedEvents.some(
117+
(e) =>
118+
!Object.values(globalQueues)
119+
.flat()
120+
.includes(e as Events),
121+
);
122+
}
123+
return globalQueues[evt].some((e) => receivedEvents.includes(e));
124+
});
125+
126+
addQueues.forEach((event) => {
127+
amqp.assertExchange(exchangeName, 'topic', {
128+
durable: true,
129+
autoDelete: false,
130+
});
131+
132+
const queueName = event;
133+
amqp.assertQueue(queueName, {
134+
durable: true,
135+
autoDelete: false,
136+
messageTtl: TWO_DAYS_IN_MS,
137+
arguments: {
138+
'x-queue-type': 'quorum',
139+
},
140+
});
141+
142+
if (globalQueues[event].length === 0) {
143+
// Other events
144+
const otherEvents = Object.values(globalQueues).flat();
145+
for (const subEvent in Events) {
146+
const eventCode = Events[subEvent];
147+
if (otherEvents.includes(eventCode)) continue;
148+
if (!receivedEvents.includes(eventCode)) continue;
149+
amqp.bindQueue(queueName, exchangeName, eventCode);
150+
}
151+
} else {
152+
globalQueues[event].forEach((subEvent) => {
153+
amqp.bindQueue(queueName, exchangeName, subEvent);
154+
});
155+
}
156+
});
157+
} else {
158+
throw new Error('Invalid RabbitMQ mode');
159+
}
76160
};
77161

78162
export const removeQueues = (instanceName: string, events: string[]) => {
79163
if (!events || !events.length) return;
80164
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
165+
let exchangeName = rabbitConfig.EXCHANGE_NAME;
166+
const amqp = getAMQP();
167+
168+
const receivedEvents = events.map(parseEvtName);
169+
if (rabbitConfig.MODE === 'isolated') {
170+
exchangeName = instanceName;
171+
receivedEvents.forEach((event) => {
172+
amqp.assertExchange(exchangeName, 'topic', {
173+
durable: true,
174+
autoDelete: false,
175+
});
81176

82-
const channel = getAMQP();
83-
84-
const queues = events.map((event) => {
85-
return `${event.replace(/_/g, '.').toLowerCase()}`;
86-
});
87-
88-
const exchangeName = instanceName ?? 'evolution_exchange';
89-
90-
queues.forEach((event) => {
91-
const amqp = getAMQP();
177+
const queueName = event;
92178

93-
amqp.assertExchange(exchangeName, 'topic', {
94-
durable: true,
95-
autoDelete: false,
96-
assert: true,
179+
amqp.unbindQueue(queueName, exchangeName, event);
97180
});
98-
99-
const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`;
100-
101-
amqp.deleteQueue(queueName);
102-
});
103-
104-
channel.deleteExchange(exchangeName);
181+
amqp.deleteExchange(instanceName);
182+
}
105183
};
106184

107185
interface SendEventData {
@@ -113,30 +191,40 @@ interface SendEventData {
113191
}
114192

115193
export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendEventData) => {
116-
const exchangeName = instanceName ?? 'evolution_exchange';
194+
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
195+
let exchangeName = rabbitConfig.EXCHANGE_NAME;
196+
if (rabbitConfig.MODE === 'isolated') exchangeName = instanceName;
117197

118198
amqpChannel.assertExchange(exchangeName, 'topic', {
119199
durable: true,
120200
autoDelete: false,
121-
assert: true,
122201
});
123-
124-
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
125-
const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`;
126-
202+
let queueName = event;
203+
if (rabbitConfig.MODE === 'single') {
204+
queueName = 'evolution';
205+
} else if (rabbitConfig.MODE === 'global') {
206+
let eventName = '';
207+
Object.keys(globalQueues).forEach((key) => {
208+
if (globalQueues[key].includes(event as Events)) {
209+
eventName = key;
210+
}
211+
if (eventName === '' && key === 'others') {
212+
eventName = key;
213+
}
214+
});
215+
queueName = eventName;
216+
}
127217
amqpChannel.assertQueue(queueName, {
128218
durable: true,
129219
autoDelete: false,
130220
arguments: { 'x-queue-type': 'quorum' },
131221
});
132-
133222
amqpChannel.bindQueue(queueName, exchangeName, event);
134223

135224
const serverUrl = configService.get<HttpServer>('SERVER').URL;
136225
const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds
137226
const localISOTime = new Date(Date.now() - tzoffset).toISOString();
138227
const now = localISOTime;
139-
140228
const message = {
141229
event,
142230
instance: instanceName,
@@ -145,11 +233,9 @@ export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendE
145233
date_time: now,
146234
sender: wuid,
147235
};
148-
149236
if (apiKey) {
150237
message['apikey'] = apiKey;
151238
}
152-
153239
logger.log({
154240
queueName,
155241
exchangeName,

src/whatsapp/services/whatsapp.baileys.service.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,8 @@ export class BaileysStartupService extends WAStartupService {
11241124
5: 'PLAYED',
11251125
};
11261126
for await (const { key, update } of args) {
1127-
if (settings?.groups_ignore && key.remoteJid.includes('@g.us')) {
1127+
console.log(key);
1128+
if (settings?.groups_ignore && key.remoteJid?.includes('@g.us')) {
11281129
this.logger.verbose('group ignored');
11291130
return;
11301131
}

0 commit comments

Comments
 (0)