Skip to content

Commit 240a77d

Browse files
authored
Update monitor.service.ts
Fix monitor
1 parent 1dc5bb8 commit 240a77d

1 file changed

Lines changed: 303 additions & 0 deletions

File tree

src/whatsapp/services/monitor.service.ts

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,78 @@
1+
import { execSync } from 'child_process';
2+
import EventEmitter2 from 'eventemitter2';
3+
import { opendirSync, readdirSync, rmSync } from 'fs';
4+
import { Db } from 'mongodb';
5+
import { join } from 'path';
6+
7+
import { Auth, ConfigService, Database, DelInstance, HttpServer, Redis } from '../../config/env.config';
8+
import { Logger } from '../../config/logger.config';
9+
import { INSTANCE_DIR, STORE_DIR } from '../../config/path.config';
10+
import { dbserver } from '../../libs/db.connect';
11+
import { RedisCache } from '../../libs/redis.client';
12+
import {
13+
AuthModel,
14+
ChatwootModel,
15+
ContactModel,
16+
MessageModel,
17+
MessageUpModel,
18+
SettingsModel,
19+
WebhookModel,
20+
} from '../models';
21+
import { RepositoryBroker } from '../repository/repository.manager';
22+
import { WAStartupService } from './whatsapp.service';
23+
24+
export class WAMonitoringService {
25+
constructor(
26+
private readonly eventEmitter: EventEmitter2,
27+
private readonly configService: ConfigService,
28+
private readonly repository: RepositoryBroker,
29+
private readonly cache: RedisCache,
30+
) {
31+
this.logger.verbose('instance created');
32+
33+
this.removeInstance();
34+
this.noConnection();
35+
this.delInstanceFiles();
36+
37+
Object.assign(this.db, configService.get<Database>('DATABASE'));
38+
Object.assign(this.redis, configService.get<Redis>('REDIS'));
39+
40+
this.dbInstance = this.db.ENABLED
41+
? this.repository.dbServer?.db(this.db.CONNECTION.DB_PREFIX_NAME + '-instances')
42+
: undefined;
43+
}
44+
45+
private readonly db: Partial<Database> = {};
46+
private readonly redis: Partial<Redis> = {};
47+
48+
private dbInstance: Db;
49+
50+
private dbStore = dbserver;
51+
52+
private readonly logger = new Logger(WAMonitoringService.name);
53+
public readonly waInstances: Record<string, WAStartupService> = {};
54+
55+
public delInstanceTime(instance: string) {
56+
const time = this.configService.get<DelInstance>('DEL_INSTANCE');
57+
if (typeof time === 'number' && time > 0) {
58+
this.logger.verbose(`Instance "${instance}" don't have connection, will be removed in ${time} minutes`);
59+
60+
setTimeout(async () => {
61+
if (this.waInstances[instance]?.connectionStatus?.state !== 'open') {
62+
if (this.waInstances[instance]?.connectionStatus?.state === 'connecting') {
63+
await this.waInstances[instance]?.client?.logout('Log out instance: ' + instance);
64+
this.waInstances[instance]?.client?.ws?.close();
65+
this.waInstances[instance]?.client?.end(undefined);
66+
delete this.waInstances[instance];
67+
} else {
68+
delete this.waInstances[instance];
69+
this.eventEmitter.emit('remove.instance', instance, 'inner');
70+
}
71+
}
72+
}, 1000 * 60 * time);
73+
}
74+
}
75+
176
public async instanceInfo(instanceName?: string) {
277
this.logger.verbose('get instance info');
378

@@ -52,3 +127,231 @@
52127

53128
return instances;
54129
}
130+
131+
private delInstanceFiles() {
132+
this.logger.verbose('cron to delete instance files started');
133+
setInterval(async () => {
134+
if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) {
135+
const collections = await this.dbInstance.collections();
136+
collections.forEach(async (collection) => {
137+
const name = collection.namespace.replace(/^[\w-]+./, '');
138+
await this.dbInstance.collection(name).deleteMany({
139+
$or: [{ _id: { $regex: /^app.state.*/ } }, { _id: { $regex: /^session-.*/ } }],
140+
});
141+
this.logger.verbose('instance files deleted: ' + name);
142+
});
143+
} else if (!this.redis.ENABLED) {
144+
const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' });
145+
for await (const dirent of dir) {
146+
if (dirent.isDirectory()) {
147+
const files = readdirSync(join(INSTANCE_DIR, dirent.name), {
148+
encoding: 'utf-8',
149+
});
150+
files.forEach(async (file) => {
151+
if (file.match(/^app.state.*/) || file.match(/^session-.*/)) {
152+
rmSync(join(INSTANCE_DIR, dirent.name, file), {
153+
recursive: true,
154+
force: true,
155+
});
156+
}
157+
});
158+
this.logger.verbose('instance files deleted: ' + dirent.name);
159+
}
160+
}
161+
}
162+
}, 3600 * 1000 * 2);
163+
}
164+
165+
public async cleaningUp(instanceName: string) {
166+
this.logger.verbose('cleaning up instance: ' + instanceName);
167+
if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) {
168+
this.logger.verbose('cleaning up instance in database: ' + instanceName);
169+
await this.repository.dbServer.connect();
170+
const collections: any[] = await this.dbInstance.collections();
171+
if (collections.length > 0) {
172+
await this.dbInstance.dropCollection(instanceName);
173+
}
174+
return;
175+
}
176+
177+
if (this.redis.ENABLED) {
178+
this.logger.verbose('cleaning up instance in redis: ' + instanceName);
179+
this.cache.reference = instanceName;
180+
await this.cache.delAll();
181+
return;
182+
}
183+
184+
this.logger.verbose('cleaning up instance in files: ' + instanceName);
185+
rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true });
186+
}
187+
188+
public async cleaningStoreFiles(instanceName: string) {
189+
if (!this.db.ENABLED) {
190+
this.logger.verbose('cleaning store files instance: ' + instanceName);
191+
rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true });
192+
193+
execSync(`rm -rf ${join(STORE_DIR, 'chats', instanceName)}`);
194+
execSync(`rm -rf ${join(STORE_DIR, 'contacts', instanceName)}`);
195+
execSync(`rm -rf ${join(STORE_DIR, 'message-up', instanceName)}`);
196+
execSync(`rm -rf ${join(STORE_DIR, 'messages', instanceName)}`);
197+
198+
execSync(`rm -rf ${join(STORE_DIR, 'auth', 'apikey', instanceName + '.json')}`);
199+
execSync(`rm -rf ${join(STORE_DIR, 'webhook', instanceName + '.json')}`);
200+
execSync(`rm -rf ${join(STORE_DIR, 'chatwoot', instanceName + '*')}`);
201+
execSync(`rm -rf ${join(STORE_DIR, 'chamaai', instanceName + '*')}`);
202+
execSync(`rm -rf ${join(STORE_DIR, 'proxy', instanceName + '*')}`);
203+
execSync(`rm -rf ${join(STORE_DIR, 'rabbitmq', instanceName + '*')}`);
204+
execSync(`rm -rf ${join(STORE_DIR, 'typebot', instanceName + '*')}`);
205+
execSync(`rm -rf ${join(STORE_DIR, 'websocket', instanceName + '*')}`);
206+
execSync(`rm -rf ${join(STORE_DIR, 'settings', instanceName + '*')}`);
207+
208+
return;
209+
}
210+
211+
this.logger.verbose('cleaning store database instance: ' + instanceName);
212+
213+
await AuthModel.deleteMany({ owner: instanceName });
214+
await ContactModel.deleteMany({ owner: instanceName });
215+
await MessageModel.deleteMany({ owner: instanceName });
216+
await MessageUpModel.deleteMany({ owner: instanceName });
217+
await AuthModel.deleteMany({ _id: instanceName });
218+
await WebhookModel.deleteMany({ _id: instanceName });
219+
await ChatwootModel.deleteMany({ _id: instanceName });
220+
await SettingsModel.deleteMany({ _id: instanceName });
221+
222+
return;
223+
}
224+
225+
public async loadInstance() {
226+
this.logger.verbose('Loading instances');
227+
228+
try {
229+
if (this.redis.ENABLED) {
230+
await this.loadInstancesFromRedis();
231+
} else if (this.db.ENABLED && this.db.SAVE_DATA.INSTANCE) {
232+
await this.loadInstancesFromDatabase();
233+
} else {
234+
await this.loadInstancesFromFiles();
235+
}
236+
} catch (error) {
237+
this.logger.error(error);
238+
}
239+
}
240+
241+
private async setInstance(name: string) {
242+
const instance = new WAStartupService(this.configService, this.eventEmitter, this.repository, this.cache);
243+
instance.instanceName = name;
244+
this.logger.verbose('Instance loaded: ' + name);
245+
246+
await instance.connectToWhatsapp();
247+
this.logger.verbose('connectToWhatsapp: ' + name);
248+
249+
this.waInstances[name] = instance;
250+
}
251+
252+
private async loadInstancesFromRedis() {
253+
this.logger.verbose('Redis enabled');
254+
await this.cache.connect(this.redis as Redis);
255+
const keys = await this.cache.instanceKeys();
256+
257+
if (keys?.length > 0) {
258+
this.logger.verbose('Reading instance keys and setting instances');
259+
await Promise.all(keys.map((k) => this.setInstance(k.split(':')[1])));
260+
} else {
261+
this.logger.verbose('No instance keys found');
262+
}
263+
}
264+
265+
private async loadInstancesFromDatabase() {
266+
this.logger.verbose('Database enabled');
267+
await this.repository.dbServer.connect();
268+
const collections: any[] = await this.dbInstance.collections();
269+
270+
if (collections.length > 0) {
271+
this.logger.verbose('Reading collections and setting instances');
272+
await Promise.all(collections.map((coll) => this.setInstance(coll.namespace.replace(/^[\w-]+\./, ''))));
273+
} else {
274+
this.logger.verbose('No collections found');
275+
}
276+
}
277+
278+
private async loadInstancesFromFiles() {
279+
this.logger.verbose('Store in files enabled');
280+
const dir = opendirSync(INSTANCE_DIR, { encoding: 'utf-8' });
281+
const instanceDirs = [];
282+
283+
for await (const dirent of dir) {
284+
if (dirent.isDirectory()) {
285+
instanceDirs.push(dirent.name);
286+
} else {
287+
this.logger.verbose('No instance files found');
288+
}
289+
}
290+
291+
await Promise.all(
292+
instanceDirs.map(async (instanceName) => {
293+
this.logger.verbose('Reading instance files and setting instances: ' + instanceName);
294+
const files = readdirSync(join(INSTANCE_DIR, instanceName), { encoding: 'utf-8' });
295+
296+
if (files.length === 0) {
297+
rmSync(join(INSTANCE_DIR, instanceName), { recursive: true, force: true });
298+
} else {
299+
await this.setInstance(instanceName);
300+
}
301+
}),
302+
);
303+
}
304+
305+
private removeInstance() {
306+
this.eventEmitter.on('remove.instance', async (instanceName: string) => {
307+
this.logger.verbose('remove instance: ' + instanceName);
308+
try {
309+
this.logger.verbose('instance: ' + instanceName + ' - removing from memory');
310+
this.waInstances[instanceName] = undefined;
311+
} catch (error) {
312+
this.logger.error(error);
313+
}
314+
315+
try {
316+
this.logger.verbose('request cleaning up instance: ' + instanceName);
317+
this.cleaningUp(instanceName);
318+
this.cleaningStoreFiles(instanceName);
319+
} finally {
320+
this.logger.warn(`Instance "${instanceName}" - REMOVED`);
321+
}
322+
});
323+
this.eventEmitter.on('logout.instance', async (instanceName: string) => {
324+
this.logger.verbose('logout instance: ' + instanceName);
325+
try {
326+
this.logger.verbose('request cleaning up instance: ' + instanceName);
327+
this.cleaningUp(instanceName);
328+
} finally {
329+
this.logger.warn(`Instance "${instanceName}" - LOGOUT`);
330+
}
331+
});
332+
}
333+
334+
private noConnection() {
335+
this.logger.verbose('checking instances without connection');
336+
this.eventEmitter.on('no.connection', async (instanceName) => {
337+
try {
338+
this.logger.verbose('logging out instance: ' + instanceName);
339+
await this.waInstances[instanceName]?.client?.logout('Log out instance: ' + instanceName);
340+
341+
this.logger.verbose('close connection instance: ' + instanceName);
342+
this.waInstances[instanceName]?.client?.ws?.close();
343+
344+
this.waInstances[instanceName].instance.qrcode = { count: 0 };
345+
this.waInstances[instanceName].stateConnection.state = 'close';
346+
} catch (error) {
347+
this.logger.error({
348+
localError: 'noConnection',
349+
warn: 'Error deleting instance from memory.',
350+
error,
351+
});
352+
} finally {
353+
this.logger.warn(`Instance "${instanceName}" - NOT CONNECTION`);
354+
}
355+
});
356+
}
357+
}

0 commit comments

Comments
 (0)