Skip to content

Commit d1552f9

Browse files
committed
remove project and task current concurrency tracking
1 parent 3f97125 commit d1552f9

9 files changed

Lines changed: 88 additions & 519 deletions

File tree

internal-packages/run-engine/src/run-queue/index.test.ts

Lines changed: 67 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -216,13 +216,6 @@ describe("RunQueue", () => {
216216
expect(queueConcurrency).toBe(0);
217217
const envConcurrency = await queue.currentConcurrencyOfEnvironment(authenticatedEnvDev);
218218
expect(envConcurrency).toBe(0);
219-
const projectConcurrency = await queue.currentConcurrencyOfProject(authenticatedEnvDev);
220-
expect(projectConcurrency).toBe(0);
221-
const taskConcurrency = await queue.currentConcurrencyOfTask(
222-
authenticatedEnvDev,
223-
messageDev.taskIdentifier
224-
);
225-
expect(taskConcurrency).toBe(0);
226219

227220
const dequeued = await queue.dequeueMessageFromMasterQueue(
228221
"test_12345",
@@ -243,13 +236,6 @@ describe("RunQueue", () => {
243236
expect(queueConcurrency2).toBe(1);
244237
const envConcurrency2 = await queue.currentConcurrencyOfEnvironment(authenticatedEnvDev);
245238
expect(envConcurrency2).toBe(1);
246-
const projectConcurrency2 = await queue.currentConcurrencyOfProject(authenticatedEnvDev);
247-
expect(projectConcurrency2).toBe(1);
248-
const taskConcurrency2 = await queue.currentConcurrencyOfTask(
249-
authenticatedEnvDev,
250-
messageDev.taskIdentifier
251-
);
252-
expect(taskConcurrency2).toBe(1);
253239

254240
//queue lengths
255241
const result3 = await queue.lengthOfQueue(authenticatedEnvDev, messageDev.queue);
@@ -337,13 +323,6 @@ describe("RunQueue", () => {
337323
expect(queueConcurrency).toBe(0);
338324
const envConcurrency = await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd);
339325
expect(envConcurrency).toBe(0);
340-
const projectConcurrency = await queue.currentConcurrencyOfProject(authenticatedEnvProd);
341-
expect(projectConcurrency).toBe(0);
342-
const taskConcurrency = await queue.currentConcurrencyOfTask(
343-
authenticatedEnvProd,
344-
messageProd.taskIdentifier
345-
);
346-
expect(taskConcurrency).toBe(0);
347326

348327
//dequeue
349328
const dequeued = await queue.dequeueMessageFromMasterQueue("test_12345", "main", 10);
@@ -361,13 +340,6 @@ describe("RunQueue", () => {
361340
expect(queueConcurrency2).toBe(1);
362341
const envConcurrency2 = await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd);
363342
expect(envConcurrency2).toBe(1);
364-
const projectConcurrency2 = await queue.currentConcurrencyOfProject(authenticatedEnvProd);
365-
expect(projectConcurrency2).toBe(1);
366-
const taskConcurrency2 = await queue.currentConcurrencyOfTask(
367-
authenticatedEnvProd,
368-
messageProd.taskIdentifier
369-
);
370-
expect(taskConcurrency2).toBe(1);
371343

372344
//queue length
373345
const length2 = await queue.lengthOfQueue(authenticatedEnvProd, messageProd.queue);
@@ -517,13 +489,6 @@ describe("RunQueue", () => {
517489
expect(queueConcurrency).toBe(0);
518490
const envConcurrency = await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd);
519491
expect(envConcurrency).toBe(0);
520-
const projectConcurrency = await queue.currentConcurrencyOfProject(authenticatedEnvProd);
521-
expect(projectConcurrency).toBe(0);
522-
const taskConcurrency = await queue.currentConcurrencyOfTask(
523-
authenticatedEnvProd,
524-
messageProd.taskIdentifier
525-
);
526-
expect(taskConcurrency).toBe(0);
527492

528493
//queue lengths
529494
const queueLength3 = await queue.lengthOfQueue(authenticatedEnvProd, messageProd.queue);
@@ -586,13 +551,6 @@ describe("RunQueue", () => {
586551
expect(queueConcurrency).toBe(0);
587552
const envConcurrency = await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd);
588553
expect(envConcurrency).toBe(0);
589-
const projectConcurrency = await queue.currentConcurrencyOfProject(authenticatedEnvProd);
590-
expect(projectConcurrency).toBe(0);
591-
const taskConcurrency = await queue.currentConcurrencyOfTask(
592-
authenticatedEnvProd,
593-
messageProd.taskIdentifier
594-
);
595-
expect(taskConcurrency).toBe(0);
596554

597555
//queue lengths
598556
const queueLength3 = await queue.lengthOfQueue(authenticatedEnvProd, messageProd.queue);
@@ -651,13 +609,6 @@ describe("RunQueue", () => {
651609
expect(queueConcurrency).toBe(1);
652610
const envConcurrency = await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd);
653611
expect(envConcurrency).toBe(1);
654-
const projectConcurrency = await queue.currentConcurrencyOfProject(authenticatedEnvProd);
655-
expect(projectConcurrency).toBe(1);
656-
const taskConcurrency = await queue.currentConcurrencyOfTask(
657-
authenticatedEnvProd,
658-
messageProd.taskIdentifier
659-
);
660-
expect(taskConcurrency).toBe(1);
661612

662613
await queue.nackMessage({
663614
orgId: messages[0].message.orgId,
@@ -675,13 +626,6 @@ describe("RunQueue", () => {
675626
expect(queueConcurrency2).toBe(0);
676627
const envConcurrency2 = await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd);
677628
expect(envConcurrency2).toBe(0);
678-
const projectConcurrency2 = await queue.currentConcurrencyOfProject(authenticatedEnvProd);
679-
expect(projectConcurrency2).toBe(0);
680-
const taskConcurrency2 = await queue.currentConcurrencyOfTask(
681-
authenticatedEnvProd,
682-
messageProd.taskIdentifier
683-
);
684-
expect(taskConcurrency2).toBe(0);
685629

686630
//queue lengths
687631
const queueLength = await queue.lengthOfQueue(authenticatedEnvProd, messageProd.queue);
@@ -704,127 +648,89 @@ describe("RunQueue", () => {
704648
}
705649
});
706650

707-
redisTest(
708-
"Releasing concurrency",
709-
{ timeout: 5_000 },
710-
async ({ redisContainer, redisOptions }) => {
711-
const queue = new RunQueue({
712-
...testOptions,
713-
queueSelectionStrategy: new FairQueueSelectionStrategy({
714-
redis: {
715-
keyPrefix: "runqueue:test:",
716-
host: redisContainer.getHost(),
717-
port: redisContainer.getPort(),
718-
},
719-
keys: testOptions.keys,
720-
}),
651+
redisTest("Releasing concurrency", async ({ redisContainer, redisOptions }) => {
652+
const queue = new RunQueue({
653+
...testOptions,
654+
queueSelectionStrategy: new FairQueueSelectionStrategy({
721655
redis: {
722656
keyPrefix: "runqueue:test:",
723657
host: redisContainer.getHost(),
724658
port: redisContainer.getPort(),
725659
},
660+
keys: testOptions.keys,
661+
}),
662+
redis: {
663+
keyPrefix: "runqueue:test:",
664+
host: redisContainer.getHost(),
665+
port: redisContainer.getPort(),
666+
},
667+
});
668+
669+
const redis = createRedisClient({ ...redisOptions, keyPrefix: "runqueue:test:" });
670+
671+
try {
672+
await queue.enqueueMessage({
673+
env: authenticatedEnvProd,
674+
message: messageProd,
675+
masterQueues: "main",
726676
});
727677

728-
const redis = createRedisClient({ ...redisOptions, keyPrefix: "runqueue:test:" });
678+
const messages = await queue.dequeueMessageFromMasterQueue("test_12345", "main", 10);
679+
expect(messages.length).toBe(1);
729680

730-
try {
731-
await queue.enqueueMessage({
732-
env: authenticatedEnvProd,
733-
message: messageProd,
734-
masterQueues: "main",
735-
});
681+
//check the message is gone
682+
const key = queue.keys.messageKey(messages[0].message.orgId, messages[0].messageId);
683+
const exists = await redis.exists(key);
684+
expect(exists).toBe(1);
736685

737-
const messages = await queue.dequeueMessageFromMasterQueue("test_12345", "main", 10);
738-
expect(messages.length).toBe(1);
686+
//concurrencies
687+
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
688+
1
689+
);
690+
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(1);
739691

740-
//check the message is gone
741-
const key = queue.keys.messageKey(messages[0].message.orgId, messages[0].messageId);
742-
const exists = await redis.exists(key);
743-
expect(exists).toBe(1);
692+
//release the concurrency
693+
await queue.releaseConcurrency(authenticatedEnvProd.organization.id, messages[0].messageId);
744694

745-
//concurrencies
746-
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
747-
1
748-
);
749-
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(1);
750-
expect(await queue.currentConcurrencyOfProject(authenticatedEnvProd)).toBe(1);
751-
expect(
752-
await queue.currentConcurrencyOfTask(authenticatedEnvProd, messageProd.taskIdentifier)
753-
).toBe(1);
754-
755-
//release the concurrency (not the queue)
756-
await queue.releaseConcurrency(
757-
authenticatedEnvProd.organization.id,
758-
messages[0].messageId,
759-
false
760-
);
695+
//concurrencies
696+
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
697+
0
698+
);
699+
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(0);
761700

762-
//concurrencies
763-
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
764-
1
765-
);
766-
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(0);
767-
expect(await queue.currentConcurrencyOfProject(authenticatedEnvProd)).toBe(0);
768-
expect(
769-
await queue.currentConcurrencyOfTask(authenticatedEnvProd, messageProd.taskIdentifier)
770-
).toBe(0);
771-
772-
//reacquire the concurrency
773-
await queue.reacquireConcurrency(
774-
authenticatedEnvProd.organization.id,
775-
messages[0].messageId
776-
);
701+
//reacquire the concurrency
702+
await queue.reacquireConcurrency(authenticatedEnvProd.organization.id, messages[0].messageId);
777703

778-
//check concurrencies are back to what they were before
779-
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
780-
1
781-
);
782-
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(1);
783-
expect(await queue.currentConcurrencyOfProject(authenticatedEnvProd)).toBe(1);
784-
expect(
785-
await queue.currentConcurrencyOfTask(authenticatedEnvProd, messageProd.taskIdentifier)
786-
).toBe(1);
787-
788-
//release the concurrency (with the queue this time)
789-
await queue.releaseConcurrency(
790-
authenticatedEnvProd.organization.id,
791-
messages[0].messageId,
792-
true
793-
);
704+
//check concurrencies are back to what they were before
705+
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
706+
1
707+
);
708+
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(1);
794709

795-
//concurrencies
796-
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
797-
0
798-
);
799-
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(0);
800-
expect(await queue.currentConcurrencyOfProject(authenticatedEnvProd)).toBe(0);
801-
expect(
802-
await queue.currentConcurrencyOfTask(authenticatedEnvProd, messageProd.taskIdentifier)
803-
).toBe(0);
804-
805-
//reacquire the concurrency
806-
await queue.reacquireConcurrency(
807-
authenticatedEnvProd.organization.id,
808-
messages[0].messageId
809-
);
710+
//release the concurrency (with the queue this time)
711+
await queue.releaseConcurrency(authenticatedEnvProd.organization.id, messages[0].messageId);
810712

811-
//check concurrencies are back to what they were before
812-
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
813-
1
814-
);
815-
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(1);
816-
expect(await queue.currentConcurrencyOfProject(authenticatedEnvProd)).toBe(1);
817-
expect(
818-
await queue.currentConcurrencyOfTask(authenticatedEnvProd, messageProd.taskIdentifier)
819-
).toBe(1);
820-
} finally {
821-
try {
822-
await queue.quit();
823-
await redis.quit();
824-
} catch (e) {}
825-
}
713+
//concurrencies
714+
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
715+
0
716+
);
717+
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(0);
718+
719+
//reacquire the concurrency
720+
await queue.reacquireConcurrency(authenticatedEnvProd.organization.id, messages[0].messageId);
721+
722+
//check concurrencies are back to what they were before
723+
expect(await queue.currentConcurrencyOfQueue(authenticatedEnvProd, messageProd.queue)).toBe(
724+
1
725+
);
726+
expect(await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd)).toBe(1);
727+
} finally {
728+
try {
729+
await queue.quit();
730+
await redis.quit();
731+
} catch (e) {}
826732
}
827-
);
733+
});
828734

829735
redisTest("Dead Letter Queue", async ({ redisContainer, redisOptions }) => {
830736
const queue = new RunQueue({
@@ -882,13 +788,6 @@ describe("RunQueue", () => {
882788
expect(queueConcurrency2).toBe(0);
883789
const envConcurrency2 = await queue.currentConcurrencyOfEnvironment(authenticatedEnvProd);
884790
expect(envConcurrency2).toBe(0);
885-
const projectConcurrency2 = await queue.currentConcurrencyOfProject(authenticatedEnvProd);
886-
expect(projectConcurrency2).toBe(0);
887-
const taskConcurrency2 = await queue.currentConcurrencyOfTask(
888-
authenticatedEnvProd,
889-
messageProd.taskIdentifier
890-
);
891-
expect(taskConcurrency2).toBe(0);
892791

893792
//check the message is still there
894793
const message = await queue.readMessage(messages[0].message.orgId, messages[0].messageId);

0 commit comments

Comments
 (0)