Skip to content

Commit 1f3858b

Browse files
committed
feat: add global SQS configuration
1 parent 9cdb897 commit 1f3858b

3 files changed

Lines changed: 213 additions & 68 deletions

File tree

.env.example

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
SERVER_NAME=evolution
12
SERVER_TYPE=http
23
SERVER_PORT=8080
34
# Server URL - Set your application url
@@ -96,6 +97,35 @@ SQS_SECRET_ACCESS_KEY=
9697
SQS_ACCOUNT_ID=
9798
SQS_REGION=
9899

100+
SQS_GLOBAL_ENABLED=false
101+
SQS_GLOBAL_APPLICATION_STARTUP=false
102+
SQS_GLOBAL_CALL=false
103+
SQS_GLOBAL_CHATS_DELETE=false
104+
SQS_GLOBAL_CHATS_SET=false
105+
SQS_GLOBAL_CHATS_UPDATE=false
106+
SQS_GLOBAL_CHATS_UPSERT=false
107+
SQS_GLOBAL_CONNECTION_UPDATE=false
108+
SQS_GLOBAL_CONTACTS_SET=false
109+
SQS_GLOBAL_CONTACTS_UPDATE=false
110+
SQS_GLOBAL_CONTACTS_UPSERT=false
111+
SQS_GLOBAL_GROUP_PARTICIPANTS_UPDATE=false
112+
SQS_GLOBAL_GROUP_UPDATE=false
113+
SQS_GLOBAL_GROUPS_UPSERT=false
114+
SQS_GLOBAL_LABELS_ASSOCIATION=false
115+
SQS_GLOBAL_LABELS_EDIT=false
116+
SQS_GLOBAL_LOGOUT_INSTANCE=false
117+
SQS_GLOBAL_MESSAGES_DELETE=false
118+
SQS_GLOBAL_MESSAGES_EDITED=false
119+
SQS_GLOBAL_MESSAGES_SET=false
120+
SQS_GLOBAL_MESSAGES_UPDATE=false
121+
SQS_GLOBAL_MESSAGES_UPSERT=false
122+
SQS_GLOBAL_PRESENCE_UPDATE=false
123+
SQS_GLOBAL_QRCODE_UPDATED=false
124+
SQS_GLOBAL_REMOVE_INSTANCE=false
125+
SQS_GLOBAL_SEND_MESSAGE=false
126+
SQS_GLOBAL_TYPEBOT_CHANGE_STATUS=false
127+
SQS_GLOBAL_TYPEBOT_START=false
128+
99129
# Websocket - Environment variables
100130
WEBSOCKET_ENABLED=false
101131
WEBSOCKET_GLOBAL_EVENTS=false

src/api/integrations/event/sqs/sqs.controller.ts

Lines changed: 117 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import { PrismaRepository } from '@api/repository/repository.service';
22
import { WAMonitoringService } from '@api/services/monitor.service';
33
import { CreateQueueCommand, DeleteQueueCommand, ListQueuesCommand, SQS } from '@aws-sdk/client-sqs';
4-
import { configService, Log, Sqs } from '@config/env.config';
4+
import { configService, Log, HttpServer, Sqs, S3 } from '@config/env.config';
55
import { Logger } from '@config/logger.config';
6+
import * as s3Service from '@api/integrations/storage/s3/libs/minio.server';
7+
import { join } from 'path';
68

79
import { EmitData, EventController, EventControllerInterface } from '../event.controller';
810
import { EventDto } from '../event.dto';
@@ -20,7 +22,7 @@ export class SqsController extends EventController implements EventControllerInt
2022
return;
2123
}
2224

23-
new Promise<void>((resolve) => {
25+
new Promise<void>(async (resolve) => {
2426
const awsConfig = configService.get<Sqs>('SQS');
2527

2628
this.sqs = new SQS({
@@ -34,6 +36,12 @@ export class SqsController extends EventController implements EventControllerInt
3436

3537
this.logger.info('SQS initialized');
3638

39+
const sqsConfig = configService.get<Sqs>('SQS');
40+
if (this.sqs && sqsConfig.GLOBAL_ENABLED) {
41+
const sqsEvents = Object.keys(sqsConfig.EVENTS).filter(e => sqsConfig.EVENTS[e]);
42+
await this.saveQueues(sqsConfig.GLOBAL_PREFIX_NAME, sqsEvents, true);
43+
}
44+
3745
resolve();
3846
});
3947
}
@@ -47,7 +55,7 @@ export class SqsController extends EventController implements EventControllerInt
4755
}
4856

4957
override async set(instanceName: string, data: EventDto): Promise<any> {
50-
if (!this.status) {
58+
if (!this.status || configService.get<Sqs>('SQS').GLOBAL_ENABLED) {
5159
return;
5260
}
5361

@@ -75,6 +83,7 @@ export class SqsController extends EventController implements EventControllerInt
7583
instanceId: this.monitor.waInstances[instanceName].instanceId,
7684
},
7785
};
86+
7887
console.log('*** payload: ', payload);
7988
return this.prisma[this.name].upsert(payload);
8089
}
@@ -98,66 +107,104 @@ export class SqsController extends EventController implements EventControllerInt
98107
return;
99108
}
100109

101-
const instanceSqs = await this.get(instanceName);
102-
const sqsLocal = instanceSqs?.events;
103-
const we = event.replace(/[.-]/gm, '_').toUpperCase();
104-
105-
if (instanceSqs?.enabled) {
106-
if (this.sqs) {
107-
if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) {
108-
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
109-
const queueName = `${instanceName}_${eventFormatted}.fifo`;
110-
const sqsConfig = configService.get<Sqs>('SQS');
111-
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;
112-
113-
const message = {
114-
event,
115-
instance: instanceName,
116-
data,
117-
server_url: serverUrl,
118-
date_time: dateTime,
119-
sender,
120-
apikey: apiKey,
121-
};
122-
123-
const params = {
124-
MessageBody: JSON.stringify(message),
125-
MessageGroupId: 'evolution',
126-
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
127-
QueueUrl: sqsUrl,
128-
};
129-
130-
this.sqs.sendMessage(params, (err) => {
131-
if (err) {
132-
this.logger.error({
133-
local: `${origin}.sendData-SQS`,
134-
message: err?.message,
135-
hostName: err?.hostname,
136-
code: err?.code,
137-
stack: err?.stack,
138-
name: err?.name,
139-
url: queueName,
140-
server_url: serverUrl,
141-
});
142-
} else {
143-
if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
144-
const logData = {
145-
local: `${origin}.sendData-SQS`,
146-
...message,
147-
};
148-
149-
this.logger.log(logData);
150-
}
151-
}
110+
if (this.sqs) {
111+
const sqsConfig = configService.get<Sqs>('SQS');
112+
113+
const we = event.replace(/[.-]/gm, '_').toUpperCase();
114+
115+
let sqsEvents = [];
116+
if (sqsConfig.GLOBAL_ENABLED) {
117+
sqsEvents = Object.keys(sqsConfig.EVENTS).filter(e => sqsConfig.EVENTS[e]);
118+
} else {
119+
const instanceSqs = await this.get(instanceName);
120+
if (instanceSqs?.enabled && Array.isArray(instanceSqs?.events)) {
121+
sqsEvents = instanceSqs?.events;
122+
}
123+
}
124+
125+
if (Array.isArray(sqsEvents) && sqsEvents.includes(we)) {
126+
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
127+
const prefixName = sqsConfig.GLOBAL_ENABLED ? sqsConfig.GLOBAL_PREFIX_NAME : instanceName;
128+
const queueName = `${prefixName}_${eventFormatted}.fifo`;
129+
130+
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;
131+
132+
const message = {
133+
event,
134+
instance: instanceName,
135+
dataType: 'json',
136+
data,
137+
server: configService.get<HttpServer>('SERVER').NAME,
138+
server_url: serverUrl,
139+
date_time: dateTime,
140+
sender,
141+
apikey: apiKey,
142+
};
143+
144+
const jsonStr = JSON.stringify(message);
145+
const size = Buffer.byteLength(jsonStr, 'utf8');
146+
if (size > sqsConfig.MAX_PAYLOAD_SIZE) {
147+
if (!configService.get<S3>('S3').ENABLE) {
148+
this.logger.error(`${instanceName} - ${eventFormatted} - SQS ignored: payload (${size} bytes) exceeds SQS size limit (${sqsConfig.MAX_PAYLOAD_SIZE} bytes) and S3 storage is not enabled.`);
149+
return;
150+
}
151+
152+
const buffer = Buffer.from(jsonStr, 'utf8');
153+
const fileName = `${instanceName}_${eventFormatted}_${Date.now()}.json`;
154+
const fullName = join(
155+
'messages',
156+
fileName
157+
);
158+
159+
await s3Service.uploadFile(fullName, buffer, size, {
160+
'Content-Type': 'application/json',
161+
'Cache-Control': 'no-store'
152162
});
163+
164+
const fileUrl = await s3Service.getObjectUrl(fullName);
165+
166+
message.data = { fileUrl };
167+
message.dataType = 's3';
153168
}
169+
170+
const params = {
171+
MessageBody: JSON.stringify(message),
172+
MessageGroupId: 'evolution',
173+
QueueUrl: sqsUrl,
174+
};
175+
176+
this.sqs.sendMessage(params, (err) => {
177+
if (err) {
178+
this.logger.error({
179+
local: `${origin}.sendData-SQS`,
180+
params: JSON.stringify(message),
181+
sqsUrl: sqsUrl,
182+
message: err?.message,
183+
hostName: err?.hostname,
184+
code: err?.code,
185+
stack: err?.stack,
186+
name: err?.name,
187+
url: queueName,
188+
server_url: serverUrl,
189+
});
190+
} else {
191+
if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
192+
const logData = {
193+
local: `${origin}.sendData-SQS`,
194+
...message,
195+
};
196+
197+
this.logger.log(logData);
198+
}
199+
}
200+
});
154201
}
155202
}
156203
}
157204

158-
private async saveQueues(instanceName: string, events: string[], enable: boolean) {
205+
private async saveQueues(prefixName: string, events: string[], enable: boolean) {
159206
if (enable) {
160-
const eventsFinded = await this.listQueuesByInstance(instanceName);
207+
const eventsFinded = await this.listQueues(prefixName);
161208
console.log('eventsFinded', eventsFinded);
162209

163210
for (const event of events) {
@@ -168,13 +215,13 @@ export class SqsController extends EventController implements EventControllerInt
168215
continue;
169216
}
170217

171-
const queueName = `${instanceName}_${normalizedEvent}.fifo`;
172-
218+
const queueName = `${prefixName}_${normalizedEvent}.fifo`;
173219
try {
174220
const createCommand = new CreateQueueCommand({
175221
QueueName: queueName,
176222
Attributes: {
177223
FifoQueue: 'true',
224+
ContentBasedDeduplication: 'true'
178225
},
179226
});
180227
const data = await this.sqs.send(createCommand);
@@ -186,12 +233,14 @@ export class SqsController extends EventController implements EventControllerInt
186233
}
187234
}
188235

189-
private async listQueuesByInstance(instanceName: string) {
236+
private async listQueues(prefixName: string) {
190237
let existingQueues: string[] = [];
238+
191239
try {
192-
const listCommand = new ListQueuesCommand({
193-
QueueNamePrefix: `${instanceName}_`,
240+
let listCommand = new ListQueuesCommand({
241+
QueueNamePrefix: `${prefixName}_`,
194242
});
243+
195244
const listData = await this.sqs.send(listCommand);
196245
if (listData.QueueUrls && listData.QueueUrls.length > 0) {
197246
// Extrai o nome da fila a partir da URL
@@ -201,32 +250,32 @@ export class SqsController extends EventController implements EventControllerInt
201250
});
202251
}
203252
} catch (error: any) {
204-
this.logger.error(`Erro ao listar filas para a instância ${instanceName}: ${error.message}`);
253+
this.logger.error(`Erro ao listar filas para ${prefixName}: ${error.message}`);
205254
return;
206255
}
207256

208257
// Mapeia os eventos já existentes nas filas: remove o prefixo e o sufixo ".fifo"
209258
return existingQueues
210259
.map((queueName) => {
211260
// Espera-se que o nome seja `${instanceName}_${event}.fifo`
212-
if (queueName.startsWith(`${instanceName}_`) && queueName.endsWith('.fifo')) {
213-
return queueName.substring(instanceName.length + 1, queueName.length - 5).toLowerCase();
261+
if (queueName.startsWith(`${prefixName}_`) && queueName.endsWith('.fifo')) {
262+
return queueName.substring(prefixName.length + 1, queueName.length - 5).toLowerCase();
214263
}
215264
return '';
216265
})
217266
.filter((event) => event !== '');
218267
}
219268

220269
// Para uma futura feature de exclusão forçada das queues
221-
private async removeQueuesByInstance(instanceName: string) {
270+
private async removeQueuesByInstance(prefixName: string) {
222271
try {
223272
const listCommand = new ListQueuesCommand({
224-
QueueNamePrefix: `${instanceName}_`,
273+
QueueNamePrefix: `${prefixName}_`,
225274
});
226275
const listData = await this.sqs.send(listCommand);
227276

228277
if (!listData.QueueUrls || listData.QueueUrls.length === 0) {
229-
this.logger.info(`No queues found for instance ${instanceName}`);
278+
this.logger.info(`No queues found for ${prefixName}`);
230279
return;
231280
}
232281

@@ -240,7 +289,7 @@ export class SqsController extends EventController implements EventControllerInt
240289
}
241290
}
242291
} catch (err: any) {
243-
this.logger.error(`Error listing queues for instance ${instanceName}: ${err.message}`);
292+
this.logger.error(`Error listing queues for ${prefixName}: ${err.message}`);
244293
}
245294
}
246295
}

0 commit comments

Comments
 (0)