Skip to content

Commit 7dd589f

Browse files
committed
Make rabbitMQ per events
1 parent 6b93005 commit 7dd589f

6 files changed

Lines changed: 73 additions & 39 deletions

File tree

Docker/.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ REDIS_URI=redis://redis:6379
4747
REDIS_PREFIX_KEY=evdocker
4848

4949
RABBITMQ_ENABLED=false
50+
RABBITMQ_GLOBAL_EVENT_QUEUE=false
5051
RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672
5152

5253
WEBSOCKET_ENABLED=false

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ ENV REDIS_URI=redis://redis:6379
6262
ENV REDIS_PREFIX_KEY=evolution
6363

6464
ENV RABBITMQ_ENABLED=false
65+
ENV RABBITMQ_GLOBAL_EVENT_QUEUE=false
6566
ENV RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672
6667

6768
ENV WEBSOCKET_ENABLED=false

src/config/env.config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export type Redis = {
7171

7272
export type Rabbitmq = {
7373
ENABLED: boolean;
74+
GLOBAL_EVENT_QUEUE: boolean;
7475
URI: string;
7576
};
7677

@@ -282,6 +283,7 @@ export class ConfigService {
282283
},
283284
RABBITMQ: {
284285
ENABLED: process.env?.RABBITMQ_ENABLED === 'true',
286+
GLOBAL_EVENT_QUEUE: process.env?.RABBITMQ_GLOBAL_EVENT_QUEUE === 'true',
285287
URI: process.env.RABBITMQ_URI || '',
286288
},
287289
SQS: {

src/dev-env.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ REDIS:
8383

8484
RABBITMQ:
8585
ENABLED: false
86+
GLOBAL_EVENT_QUEUE: false
8687
URI: "amqp://guest:guest@localhost:5672"
8788

8889
SQS:

src/libs/amqp.server.ts

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as amqp from 'amqplib/callback_api';
22

3-
import { configService, Rabbitmq } from '../config/env.config';
3+
import { configService, HttpServer, Rabbitmq } from '../config/env.config';
44
import { Logger } from '../config/logger.config';
55

66
const logger = new Logger('AMQP');
@@ -9,8 +9,8 @@ let amqpChannel: amqp.Channel | null = null;
99

1010
export const initAMQP = () => {
1111
return new Promise<void>((resolve, reject) => {
12-
const uri = configService.get<Rabbitmq>('RABBITMQ').URI;
13-
amqp.connect(uri, (error, connection) => {
12+
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
13+
amqp.connect(rabbitConfig.URI, (error, connection) => {
1414
if (error) {
1515
reject(error);
1616
return;
@@ -45,6 +45,7 @@ export const getAMQP = (): amqp.Channel | null => {
4545

4646
export const initQueues = (instanceName: string, events: string[]) => {
4747
if (!instanceName || !events || !events.length) return;
48+
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
4849

4950
const queues = events.map((event) => {
5051
return `${event.replace(/_/g, '.').toLowerCase()}`;
@@ -60,7 +61,7 @@ export const initQueues = (instanceName: string, events: string[]) => {
6061
assert: true,
6162
});
6263

63-
const queueName = `${instanceName}.${event}`;
64+
const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`;
6465

6566
amqp.assertQueue(queueName, {
6667
durable: true,
@@ -76,6 +77,7 @@ export const initQueues = (instanceName: string, events: string[]) => {
7677

7778
export const removeQueues = (instanceName: string, events: string[]) => {
7879
if (!events || !events.length) return;
80+
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
7981

8082
const channel = getAMQP();
8183

@@ -94,10 +96,64 @@ export const removeQueues = (instanceName: string, events: string[]) => {
9496
assert: true,
9597
});
9698

97-
const queueName = `${instanceName}.${event}`;
99+
const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`;
98100

99101
amqp.deleteQueue(queueName);
100102
});
101103

102104
channel.deleteExchange(exchangeName);
103105
};
106+
107+
interface SendEventData {
108+
instanceName: string;
109+
wuid: string;
110+
event: string;
111+
apiKey?: string;
112+
data: any;
113+
}
114+
115+
export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendEventData) => {
116+
const exchangeName = instanceName ?? 'evolution_exchange';
117+
118+
amqpChannel.assertExchange(exchangeName, 'topic', {
119+
durable: true,
120+
autoDelete: false,
121+
assert: true,
122+
});
123+
124+
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
125+
const queueName = rabbitConfig.GLOBAL_EVENT_QUEUE ? event : `${instanceName}.${event}`;
126+
127+
amqpChannel.assertQueue(queueName, {
128+
durable: true,
129+
autoDelete: false,
130+
arguments: { 'x-queue-type': 'quorum' },
131+
});
132+
133+
amqpChannel.bindQueue(queueName, exchangeName, event);
134+
135+
const serverUrl = configService.get<HttpServer>('SERVER').URL;
136+
const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds
137+
const localISOTime = new Date(Date.now() - tzoffset).toISOString();
138+
const now = localISOTime;
139+
140+
const message = {
141+
event,
142+
instance: instanceName,
143+
data,
144+
server_url: serverUrl,
145+
date_time: now,
146+
sender: wuid,
147+
};
148+
149+
if (apiKey) {
150+
message['apikey'] = apiKey;
151+
}
152+
153+
logger.log({
154+
queueName,
155+
exchangeName,
156+
event,
157+
});
158+
amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
159+
};

src/whatsapp/services/whatsapp.service.ts

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
import { Logger } from '../../config/logger.config';
2121
import { ROOT_DIR } from '../../config/path.config';
2222
import { NotFoundException } from '../../exceptions';
23-
import { getAMQP, removeQueues } from '../../libs/amqp.server';
23+
import { getAMQP, removeQueues, sendEventData } from '../../libs/amqp.server';
2424
import { getIO } from '../../libs/socket.server';
2525
import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server';
2626
import { ChamaaiRaw, IntegrationRaw, ProxyRaw, RabbitmqRaw, SettingsRaw, SqsRaw, TypebotRaw } from '../models';
@@ -685,40 +685,13 @@ export class WAStartupService {
685685

686686
if (amqp) {
687687
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
688-
const exchangeName = this.instanceName ?? 'evolution_exchange';
689-
690-
amqp.assertExchange(exchangeName, 'topic', {
691-
durable: true,
692-
autoDelete: false,
693-
assert: true,
694-
});
695-
696-
const queueName = `${this.instanceName}.${event}`;
697-
698-
amqp.assertQueue(queueName, {
699-
durable: true,
700-
autoDelete: false,
701-
arguments: {
702-
'x-queue-type': 'quorum',
703-
},
704-
});
705-
706-
amqp.bindQueue(queueName, exchangeName, event);
707-
708-
const message = {
709-
event,
710-
instance: this.instance.name,
688+
sendEventData({
711689
data,
712-
server_url: serverUrl,
713-
date_time: now,
714-
sender: this.wuid,
715-
};
716-
717-
if (expose && instanceApikey) {
718-
message['apikey'] = instanceApikey;
719-
}
720-
721-
amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
690+
event,
691+
instanceName: this.instanceName,
692+
wuid: this.wuid,
693+
apiKey: expose && instanceApikey ? instanceApikey : undefined,
694+
});
722695

723696
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
724697
const logData = {

0 commit comments

Comments
 (0)