From 6043a4e754ec3577c07c99c3c2b7fd1f73259181 Mon Sep 17 00:00:00 2001 From: David Badura Date: Sun, 29 Mar 2026 14:35:41 +0200 Subject: [PATCH 1/2] add stateful subscriber feature --- composer.json | 2 +- composer.lock | 2 +- phpstan-baseline.neon | 12 ++ .../DoctrineStatefulSubscriberStore.php | 85 ++++++++++++ .../StatefulSubscriber/StatefulSubscriber.php | 38 ++++++ .../StatefulSubscriberStore.php | 12 ++ .../ProfileInlineStatefulSubscriber.php | 29 ++++ .../Subscription/SubscriptionTest.php | 129 ++++++++++++++++++ 8 files changed, 307 insertions(+), 2 deletions(-) create mode 100644 src/Subscription/StatefulSubscriber/DoctrineStatefulSubscriberStore.php create mode 100644 src/Subscription/StatefulSubscriber/StatefulSubscriber.php create mode 100644 src/Subscription/StatefulSubscriber/StatefulSubscriberStore.php create mode 100644 tests/Integration/Subscription/Subscriber/ProfileInlineStatefulSubscriber.php diff --git a/composer.json b/composer.json index 2aeb6883..fb8bfb58 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ "php": "~8.2.0 || ~8.3.0 || ~8.4.0 || ~8.5.0", "doctrine/dbal": "^4.4.0", "doctrine/migrations": "^3.3.2", - "patchlevel/hydrator": "^1.8.0", + "patchlevel/hydrator": "^1.19.0", "patchlevel/worker": "^1.4.0", "psr/cache": "^2.0.0 || ^3.0.0", "psr/clock": "^1.0", diff --git a/composer.lock b/composer.lock index be435ed0..809b0b78 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "040555186133771a1ea827cd0aade8d4", + "content-hash": "cdaa799831fe19272dcf093cbd130298", "packages": [ { "name": "brick/math", diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 81ec8d23..2c549bb1 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -222,6 +222,18 @@ parameters: count: 1 path: src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php + - + message: '#^Parameter \#1 \$json of function json_decode expects string, mixed given\.$#' + identifier: argument.type + count: 1 + path: src/Subscription/StatefulSubscriber/DoctrineStatefulSubscriberStore.php + + - + message: '#^Parameter \#2 \$data of method Patchlevel\\Hydrator\\HydratorWithContext\:\:hydrate\(\) expects array\, mixed given\.$#' + identifier: argument.type + count: 1 + path: src/Subscription/StatefulSubscriber/DoctrineStatefulSubscriberStore.php + - message: '#^Parameter \#3 \$errorContext of class Patchlevel\\EventSourcing\\Subscription\\SubscriptionError constructor expects list\\}\>\}\>\|null, mixed given\.$#' identifier: argument.type diff --git a/src/Subscription/StatefulSubscriber/DoctrineStatefulSubscriberStore.php b/src/Subscription/StatefulSubscriber/DoctrineStatefulSubscriberStore.php new file mode 100644 index 00000000..6d015055 --- /dev/null +++ b/src/Subscription/StatefulSubscriber/DoctrineStatefulSubscriberStore.php @@ -0,0 +1,85 @@ +subscriberId($subscriber); + $data = $this->hydrator->extract($subscriber); + + $this->connection->insert( + $this->tableName, + [ + 'id' => $subscriberId, + 'state' => json_encode($data), + ], + ); + } + + public function load(StatefulSubscriber $subscriber): void + { + $subscriberId = $this->subscriberId($subscriber); + + $data = $this->connection->fetchAssociative( + 'SELECT * FROM ' . $this->tableName . ' WHERE id = ?', + [$subscriberId], + ); + + if (!$data) { + return; + } + + $this->hydrator->hydrate( + $subscriber::class, + json_decode($data['state'], true), + [HydratorWithContext::OBJECT_TO_POPULATE => $subscriber], + ); + } + + public function configureSchema(Schema $schema, Connection $connection): void + { + if (!DoctrineHelper::sameDatabase($this->connection, $connection)) { + return; + } + + $table = $schema->createTable($this->tableName); + + $table->addColumn('id', Types::STRING) + ->setLength(255) + ->setNotnull(true); + $table->addColumn('state', Types::JSON) + ->setNotnull(true); + + $table->setPrimaryKey(['id']); + } + + public function subscriberId(StatefulSubscriber $projection): string + { + return $this->subscriberMetadataFactory->metadata($projection::class)->id; + } +} diff --git a/src/Subscription/StatefulSubscriber/StatefulSubscriber.php b/src/Subscription/StatefulSubscriber/StatefulSubscriber.php new file mode 100644 index 00000000..d27f36d5 --- /dev/null +++ b/src/Subscription/StatefulSubscriber/StatefulSubscriber.php @@ -0,0 +1,38 @@ +store->load($this); + } + + public function beginBatch(): void + { + // do nothing + } + + public function commitBatch(): void + { + $this->store->store($this); + } + + public function rollbackBatch(): void + { + $this->store->load($this); + } + + public function forceCommit(): bool + { + return false; + } +} diff --git a/src/Subscription/StatefulSubscriber/StatefulSubscriberStore.php b/src/Subscription/StatefulSubscriber/StatefulSubscriberStore.php new file mode 100644 index 00000000..0dcf1d06 --- /dev/null +++ b/src/Subscription/StatefulSubscriber/StatefulSubscriberStore.php @@ -0,0 +1,12 @@ + */ + public array $profiles = []; + + #[Subscribe(ProfileCreated::class)] + public function handleProfileCreated(ProfileCreated $profileCreated): void + { + $this->profiles[$profileCreated->profileId->toString()] = $profileCreated->name; + } + + public function findById(ProfileId $id): string|null + { + return $this->profiles[$id->toString()] ?? null; + } +} diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 4e1684b2..a32f7658 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -33,6 +33,7 @@ use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; use Patchlevel\EventSourcing\Subscription\RunMode; +use Patchlevel\EventSourcing\Subscription\StatefulSubscriber\DoctrineStatefulSubscriberStore; use Patchlevel\EventSourcing\Subscription\Status; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\LookupResolver; @@ -43,6 +44,7 @@ use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ErrorProducerWithSelfRecoverySubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\LookupSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\MigrateAggregateToStreamStoreSubscriber; +use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileInlineStatefulSubscriber; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileNewProjection; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProcessor; use Patchlevel\EventSourcing\Tests\Integration\Subscription\Subscriber\ProfileProjection; @@ -53,6 +55,7 @@ use function gc_collect_cycles; use function iterator_to_array; +use function json_decode; use function sprintf; #[CoversNothing] @@ -1632,6 +1635,132 @@ class { self::assertEquals(RunMode::FromNow, $subscriptions[0]->runMode()); } + public function testStatefulSubscriber(): void + { + $store = new DoctrineDbalStore( + $this->connection, + DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), + ); + + $clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00')); + + $subscriptionStore = new DoctrineSubscriptionStore( + $this->connection, + $clock, + ); + + $manager = new DefaultRepositoryManager( + new AggregateRootRegistry(['profile' => Profile::class]), + $store, + ); + + $repository = $manager->get(Profile::class); + + $stateStore = new DoctrineStatefulSubscriberStore($this->connection); + + $schemaDirector = new DoctrineSchemaDirector( + $this->connection, + new ChainDoctrineSchemaConfigurator([ + $store, + $subscriptionStore, + $stateStore, + ]), + ); + + $schemaDirector->create(); + + $subscriberRepository = new MetadataSubscriberAccessorRepository([ + new ProfileInlineStatefulSubscriber( + $stateStore, + ), + ]); + + $engine = new DefaultSubscriptionEngine( + new EventFilteredStoreMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), + $subscriptionStore, + $subscriberRepository, + ); + + self::assertEquals( + [ + new Subscription( + 'profile_inline', + 'projector', + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $result = $engine->setup(); + + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'profile_inline', + 'projector', + RunMode::FromBeginning, + Status::Active, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $profileId = ProfileId::fromString('019d3991-e575-73b2-a18c-7da3fd7f5d70'); + $profile = Profile::create($profileId, 'John'); + $repository->save($profile); + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); + + self::assertEquals( + [ + new Subscription( + 'profile_inline', + 'projector', + RunMode::FromBeginning, + Status::Active, + 1, + lastSavedAt: new DateTimeImmutable('2021-01-01T00:00:00'), + ), + ], + $engine->subscriptions(), + ); + + $result = $this->connection->fetchAssociative( + 'SELECT * FROM stateful_subscriber_state WHERE id = ?', + ['profile_inline'], + ); + + self::assertIsArray($result); + self::assertArrayHasKey('id', $result); + self::assertEquals('profile_inline', $result['id']); + + self::assertArrayHasKey('state', $result); + self::assertIsString($result['state']); + + self::assertEquals( + ['profiles' => ['019d3991-e575-73b2-a18c-7da3fd7f5d70' => 'John']], + json_decode($result['state'], true), + ); + + $projection = new ProfileInlineStatefulSubscriber( + $stateStore, + ); + + self::assertEquals(['019d3991-e575-73b2-a18c-7da3fd7f5d70' => 'John'], $projection->profiles); + } + /** @param list $subscriptions */ private static function findSubscription(array $subscriptions, string $id): Subscription { From d16bf78e36271da33a7a332a845fabedcdb20a6e Mon Sep 17 00:00:00 2001 From: David Badura Date: Thu, 16 Apr 2026 11:25:47 +0200 Subject: [PATCH 2/2] add lazy create helper method --- .../StatefulSubscriber/StatefulSubscriber.php | 17 +++++++++++++++++ .../Subscription/SubscriptionTest.php | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/Subscription/StatefulSubscriber/StatefulSubscriber.php b/src/Subscription/StatefulSubscriber/StatefulSubscriber.php index d27f36d5..f5a187fe 100644 --- a/src/Subscription/StatefulSubscriber/StatefulSubscriber.php +++ b/src/Subscription/StatefulSubscriber/StatefulSubscriber.php @@ -4,8 +4,12 @@ namespace Patchlevel\EventSourcing\Subscription\StatefulSubscriber; +use LogicException; use Patchlevel\EventSourcing\Subscription\Subscriber\BatchableSubscriber; use Patchlevel\Hydrator\Attribute\Ignore; +use ReflectionClass; + +use const PHP_VERSION_ID; abstract class StatefulSubscriber implements BatchableSubscriber { @@ -35,4 +39,17 @@ public function forceCommit(): bool { return false; } + + public static function createLazy(StatefulSubscriberStore $store): static + { + if (PHP_VERSION_ID < 80400) { + throw new LogicException('Lazy subscriber is only supported on PHP 8.4 or higher'); + } + + $reflection = new ReflectionClass(static::class); + + return $reflection->newLazyGhost(static function ($object) use ($store): void { + $object->__construct($store); + }); + } } diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index a32f7658..1fe3ffdc 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -1670,7 +1670,7 @@ public function testStatefulSubscriber(): void $schemaDirector->create(); $subscriberRepository = new MetadataSubscriberAccessorRepository([ - new ProfileInlineStatefulSubscriber( + ProfileInlineStatefulSubscriber::createLazy( $stateStore, ), ]);