diff --git a/composer.json b/composer.json index 2aeb6883..b03ad562 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ "php": "~8.2.0 || ~8.3.0 || ~8.4.0 || ~8.5.0", "doctrine/dbal": "^4.4.0", "doctrine/migrations": "^3.3.2", - "patchlevel/hydrator": "^1.8.0", + "patchlevel/hydrator": "^1.23.0", "patchlevel/worker": "^1.4.0", "psr/cache": "^2.0.0 || ^3.0.0", "psr/clock": "^1.0", diff --git a/composer.lock b/composer.lock index c6f10c7f..0de823c1 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "040555186133771a1ea827cd0aade8d4", + "content-hash": "eca5f39c7de5df0fe77e710d800b5975", "packages": [ { "name": "brick/math", diff --git a/docs/pages/getting_started.md b/docs/pages/getting_started.md index 1e7fb6c9..8e6669cb 100644 --- a/docs/pages/getting_started.md +++ b/docs/pages/getting_started.md @@ -271,18 +271,9 @@ After we have defined everything, we still have to plug the whole thing together ```php use Doctrine\DBAL\DriverManager; use Doctrine\DBAL\Tools\DsnParser; -use Patchlevel\EventSourcing\Metadata\AggregateRoot\AttributeAggregateRootRegistryFactory; -use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; -use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; -use Patchlevel\EventSourcing\Store\DoctrineDbalStore; -use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; -use Patchlevel\EventSourcing\Subscription\Repository\RunSubscriptionEngineRepositoryManager; -use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; -use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; - -$connection = DriverManager::getConnection( - (new DsnParser())->parse('pdo-pgsql://user:secret@localhost/app'), -); +use Patchlevel\EventSourcing\Container\Configuration; +use Patchlevel\EventSourcing\Container\Factory; +use Patchlevel\EventSourcing\Repository\RepositoryManager; $projectionConnection = DriverManager::getConnection( (new DsnParser())->parse('pdo-pgsql://user:secret@localhost/projection'), @@ -291,38 +282,18 @@ $projectionConnection = DriverManager::getConnection( /* your own mailer */ $mailer; -$serializer = DefaultEventSerializer::createFromPaths(['src/Domain/Hotel/Event']); -$aggregateRegistry = (new AttributeAggregateRootRegistryFactory())->create(['src/Domain/Hotel']); - -$eventStore = new DoctrineDbalStore( - $connection, - $serializer, -); - -$hotelProjector = new HotelProjector($projectionConnection); - -$subscriberRepository = new MetadataSubscriberAccessorRepository([ - $hotelProjector, - new SendCheckInEmailProcessor($mailer), -]); - -$subscriptionStore = new DoctrineSubscriptionStore($connection); - -$engine = new DefaultSubscriptionEngine( - $eventStore, - $subscriptionStore, - $subscriberRepository, -); - -$repositoryManager = new RunSubscriptionEngineRepositoryManager( - new DefaultRepositoryManager( - $aggregateRegistry, - $eventStore, - ), - $engine, -); - -$hotelRepository = $repositoryManager->get(Hotel::class); +$configuration = Configuration::createWithConnectionUrl('pdo-pgsql://user:secret@localhost/app') + ->withDefaultSettings( + ['src/Domain/Hotel'], + ['src/Domain/Hotel/Event'], + ) + ->withSubscribers([ + new HotelProjector($projectionConnection), + new SendCheckInEmailProcessor($mailer) + ]); +$container = Factory::create($configuration); + +$hotelRepository = $container->get(RepositoryManager::class)->get(Hotel::class); ``` !!! note diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 3764a738..d9a24e07 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -6,6 +6,102 @@ parameters: count: 1 path: src/Console/DoctrineHelper.php + - + message: '#^Anonymous function should return Doctrine\\DBAL\\Connection but returns mixed\.$#' + identifier: return.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Anonymous function should return Patchlevel\\EventSourcing\\EventBus\\EventBus but returns mixed\.$#' + identifier: return.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Anonymous function should return Patchlevel\\EventSourcing\\Store\\Store but returns mixed\.$#' + identifier: return.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Anonymous function should return Psr\\Log\\LoggerInterface but returns mixed\.$#' + identifier: return.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Parameter \#1 \$connection of class Patchlevel\\EventSourcing\\Console\\Command\\DatabaseCreateCommand constructor expects Doctrine\\DBAL\\Connection, object given\.$#' + identifier: argument.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Parameter \#1 \$connection of class Patchlevel\\EventSourcing\\Console\\Command\\DatabaseDropCommand constructor expects Doctrine\\DBAL\\Connection, object given\.$#' + identifier: argument.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Parameter \#1 \$connection of class Patchlevel\\EventSourcing\\Cryptography\\ExtensionDoctrineCipherKeyStore constructor expects Doctrine\\DBAL\\Connection, object given\.$#' + identifier: argument.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Parameter \#1 \$connection of class Patchlevel\\EventSourcing\\Schema\\DoctrineSchemaDirector constructor expects Doctrine\\DBAL\\Connection, object given\.$#' + identifier: argument.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Parameter \#1 \$connection of class Patchlevel\\EventSourcing\\Store\\StreamDoctrineDbalStore constructor expects Doctrine\\DBAL\\Connection, object given\.$#' + identifier: argument.type + count: 2 + path: src/Container/Factory.php + + - + message: '#^Parameter \#1 \$connection of class Patchlevel\\EventSourcing\\Subscription\\Store\\DoctrineSubscriptionStore constructor expects Doctrine\\DBAL\\Connection, object given\.$#' + identifier: argument.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Parameter \#1 \$strategies of class Patchlevel\\EventSourcing\\Subscription\\RetryStrategy\\RetryStrategyRepository constructor expects array\, array\ given\.$#' + identifier: argument.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Parameter \#2 \$baseDelay of class Patchlevel\\EventSourcing\\Subscription\\RetryStrategy\\ClockBasedRetryStrategy constructor expects int, mixed given\.$#' + identifier: argument.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Parameter \#2 \$newStore of class Patchlevel\\EventSourcing\\Console\\Command\\StoreMigrateCommand constructor expects Patchlevel\\EventSourcing\\Store\\Store, object given\.$#' + identifier: argument.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Parameter \#3 \$delayFactor of class Patchlevel\\EventSourcing\\Subscription\\RetryStrategy\\ClockBasedRetryStrategy constructor expects float, mixed given\.$#' + identifier: argument.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Parameter \#4 \$maxAttempts of class Patchlevel\\EventSourcing\\Subscription\\RetryStrategy\\ClockBasedRetryStrategy constructor expects int\<1, max\>, mixed given\.$#' + identifier: argument.type + count: 1 + path: src/Container/Factory.php + + - + message: '#^Strict comparison using \!\=\= between ''custom'' and ''custom'' will always evaluate to false\.$#' + identifier: notIdentical.alwaysFalse + count: 1 + path: src/Container/Factory.php + - message: '#^Parameter \#1 \$key of class Patchlevel\\Hydrator\\Cryptography\\Cipher\\CipherKey constructor expects non\-empty\-string, string given\.$#' identifier: argument.type @@ -246,18 +342,6 @@ parameters: count: 1 path: tests/Integration/BasicImplementation/BasicIntegrationTest.php - - - message: '#^Property Patchlevel\\EventSourcing\\Tests\\Integration\\BasicImplementation\\Profile\:\:\$id is never read, only written\.$#' - identifier: property.onlyWritten - count: 1 - path: tests/Integration/BasicImplementation/Profile.php - - - - message: '#^Property Patchlevel\\EventSourcing\\Tests\\Integration\\BasicImplementation\\ProfileWithCommands\:\:\$id is never read, only written\.$#' - identifier: property.onlyWritten - count: 1 - path: tests/Integration/BasicImplementation/ProfileWithCommands.php - - message: '#^Cannot access offset ''name'' on array\\|false\.$#' identifier: offsetAccess.nonOffsetAccessible @@ -372,6 +456,120 @@ parameters: count: 5 path: tests/Unit/CommandBus/InstantRetryCommandBusTest.php + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\DebugCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\DebugCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\ShowAggregateCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\ShowAggregateCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\ShowCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\ShowCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\SubscriptionBootCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\SubscriptionBootCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\SubscriptionPauseCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\SubscriptionPauseCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\SubscriptionReactivateCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\SubscriptionReactivateCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\SubscriptionRemoveCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\SubscriptionRemoveCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\SubscriptionRunCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\SubscriptionRunCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\SubscriptionSetupCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\SubscriptionSetupCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\SubscriptionStatusCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\SubscriptionStatusCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\SubscriptionTeardownCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\SubscriptionTeardownCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Console\\\\Command\\\\WatchCommand'' and Patchlevel\\EventSourcing\\Console\\Command\\WatchCommand will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Metadata\\\\AggregateRoot\\\\AggregateRootRegistry'' and Patchlevel\\EventSourcing\\Metadata\\AggregateRoot\\AggregateRootRegistry will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Metadata\\\\Event\\\\EventRegistry'' and Patchlevel\\EventSourcing\\Metadata\\Event\\EventRegistry will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Metadata\\\\Message\\\\MessageHeaderRegistry'' and Patchlevel\\EventSourcing\\Metadata\\Message\\MessageHeaderRegistry will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Repository\\\\MessageDecorator\\\\SplitStreamDecorator'' and Patchlevel\\EventSourcing\\Repository\\MessageDecorator\\SplitStreamDecorator will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Serializer\\\\DefaultEventSerializer'' and Patchlevel\\EventSourcing\\Serializer\\DefaultEventSerializer will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Store\\\\StreamDoctrineDbalStore'' and Patchlevel\\EventSourcing\\Store\\StreamDoctrineDbalStore will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 8 + path: tests/Unit/Container/FactoryTest.php + + - + message: '#^Call to static method PHPUnit\\Framework\\Assert\:\:assertInstanceOf\(\) with ''Patchlevel\\\\EventSourcing\\\\Subscription\\\\RetryStrategy\\\\RetryStrategyRepository'' and Patchlevel\\EventSourcing\\Subscription\\RetryStrategy\\RetryStrategyRepository will always evaluate to true\.$#' + identifier: staticMethod.alreadyNarrowedType + count: 1 + path: tests/Unit/Container/FactoryTest.php + - message: '#^Parameter \#1 \$data of static method Patchlevel\\EventSourcing\\Tests\\Unit\\Fixture\\Message\:\:fromArray\(\) expects array\{id\: string, text\: string, createdAt\: string\}, array\ given\.$#' identifier: argument.type diff --git a/src/Container/Configuration.php b/src/Container/Configuration.php new file mode 100644 index 00000000..7c357787 --- /dev/null +++ b/src/Container/Configuration.php @@ -0,0 +1,632 @@ + + * @phpstan-type StoreOptions array{ + * table_name?: string, + * aggregate_id_type?: 'string'|'uuid', + * locking?: bool, + * lock_id?: int, + * lock_timeout?: int, + * kernel_reset?: bool + * } + * @phpstan-type SnapshotAdapterMap array + * @phpstan-type SnapshotStoreDefinition array{type: 'psr6'|'psr16'|'custom', service: string} + * @phpstan-type SnapshotStoreDefinitions array + * @phpstan-type RetryStrategyType 'clock_based'|'no_retry'|'custom' + * @phpstan-type RetryStrategyDefinition array{ + * type: RetryStrategyType, + * service?: string, + * options?: array + * } + * @phpstan-type RetryStrategyDefinitions array + * @phpstan-type StoreMigrationConfig array{ + * enabled: bool, + * type: 'dbal_aggregate'|'dbal_stream'|'in_memory'|'custom', + * service: string|null, + * options: StoreOptions, + * translators: list + * } + */ +final class Configuration +{ + public const STORE_DBAL_STREAM = 'dbal_stream'; + public const STORE_IN_MEMORY = 'in_memory'; + public const STORE_CUSTOM = 'custom'; + + public const EVENT_BUS_DEFAULT = 'default'; + public const EVENT_BUS_PSR14 = 'psr14'; + public const EVENT_BUS_CUSTOM = 'custom'; + + public const SUBSCRIPTION_STORE_IN_MEMORY = 'in_memory'; + public const SUBSCRIPTION_STORE_DBAL = 'dbal'; + public const SUBSCRIPTION_STORE_STATIC_IN_MEMORY = 'static_in_memory'; + public const SUBSCRIPTION_STORE_CUSTOM = 'custom'; + + public const SUBSCRIPTION_RETRY_CLOCK_BASED = 'clock_based'; + public const SUBSCRIPTION_RETRY_NO_RETRY = 'no_retry'; + public const SUBSCRIPTION_RETRY_CUSTOM = 'custom'; + + /** @var list */ + public array $aggregates = []; + + /** @var list */ + public array $events = []; + + /** @var list */ + public array $headers = []; + + /** @var list */ + public array $listeners = []; + + /** @var ServiceMap */ + public array $services = []; + /** @var array */ + public array $parameters = []; + + /** @var StoreOptions */ + public array $storeOptions = []; + + /** @var SnapshotAdapterMap */ + public array $snapshotAdapters = []; + + /** @var list */ + public array $upcasters = []; + + /** @var list */ + public array $messageDecorators = []; + + /** @var list */ + public array $hydratorExtensions = []; + + /** @var list */ + public array $commandHandlerProviders = []; + + /** @var list */ + public array $queryHandlerProviders = []; + + /** @var list */ + public array $subscribers = []; + + /** @var list */ + public array $subscriptionArgumentResolvers = []; + + /** @var list */ + public array $subscriptionCleanupTaskHandlers = []; + + /** @var RetryStrategyDefinitions */ + public array $subscriptionRetryStrategyDefinitions = []; + + /** @var list */ + public array $storeMigrationTranslators = []; + public string $storeType = self::STORE_IN_MEMORY; + public bool $readOnlyStore = false; + public bool $eventBusEnabled = false; + public string $eventBusType = self::EVENT_BUS_DEFAULT; + public string|null $connectionService = null; + public string|null $storeService = null; + public string|null $eventBusService = null; + public string|null $clockService = null; + public DateTimeImmutable|null $frozenClock = null; + public string|null $loggerService = null; + public bool $hydratorDefaultLazy = false; + public bool $hydratorLifecycleEnabled = false; + public bool $hydratorStackCryptographyEnabled = false; + /** @var non-empty-string */ + public string $hydratorStackCryptographyAlgorithm = 'aes128'; + public bool $hydratorStackCryptographyLegacyMetadataMapping = true; + public string|null $hydratorStackCryptographyCipherKeyStoreService = null; + public bool $commandBusInstantRetry = false; + + /** @var positive-int */ + public int $commandBusInstantRetryDefaultMaxRetries = 3; + /** @var list> */ + public array $commandBusInstantRetryDefaultExceptions = [AggregateOutdated::class]; + public string $subscriptionStoreType = self::SUBSCRIPTION_STORE_IN_MEMORY; + public string|null $subscriptionStoreService = null; + public string $subscriptionStoreTableName = 'subscriptions'; + public string $subscriptionDefaultRetryStrategy = 'default'; + public bool $subscriptionThrowOnError = false; + public bool $subscriptionCatchUp = false; + public int|null $subscriptionCatchUpLimit = null; + /** @var list|null */ + public array|null $subscriptionRunAfterAggregateSaveIds = null; + /** @var list|null */ + public array|null $subscriptionRunAfterAggregateSaveGroups = null; + /** @var positive-int|null */ + public int|null $subscriptionRunAfterAggregateSaveLimit = null; + public bool $subscriptionGapDetection = false; + /** @var list */ + public array $subscriptionGapDetectionRetriesInMs = [0, 5, 50, 500]; + public DateInterval|null $subscriptionGapDetectionWindow = null; + public bool $migrationEnabled = false; + public string $migrationNamespace = 'EventSourcingMigrations'; + public string $migrationPath = 'migrations'; + public bool $storeMigrationEnabled = false; + public string $storeMigrationType = self::STORE_IN_MEMORY; + public string|null $storeMigrationService = null; + /** @var StoreOptions */ + public array $storeMigrationOptions = []; + + /** @var array */ + public array $guesser = []; + public string|null $connectionUrl = null; + public bool $dedicatedProjectionConnection = false; + public bool $subscriptionRunAfterAggregateSaveEnabled = false; + + public static function createWithConnectionUrl(string $connectionUrl): self + { + return (new self()) + ->withConnectionUrl($connectionUrl) + ->withStreamStore() + ->withSubscriptionDBALStore(); + } + + public static function createWithConnectionService(object $connectionService): self + { + return (new self()) + ->withService('connection.service', $connectionService) + ->withConnectionService('connection.service') + ->withStreamStore() + ->withSubscriptionDBALStore(); + } + + /** + * @param array $aggregatePaths + * @param array $eventPaths + */ + public function withDefaultSettings( + array $aggregatePaths = [], + array $eventPaths = [], + ): self { + return clone $this + ->withAggregates(...$aggregatePaths) + ->withEvents(...$eventPaths) + ->withHydratorStackCryptography() + ->withLazyHydrator() + ->withHydratorLifecycle() + ->withCommandBusInstantRetry() + ->withSubscriptionGapDetection(); + } + + public function withService(string $id, object $service): self + { + $newConfiguration = clone $this; + $newConfiguration->services[$id] = $service; + + return $newConfiguration; + } + + public function withParameter(string $id, mixed $parameter): self + { + $newConfiguration = clone $this; + $newConfiguration->parameters[$id] = $parameter; + + return $newConfiguration; + } + + public function withAggregates(string ...$aggregates): self + { + $newConfiguration = clone $this; + $newConfiguration->aggregates = array_values($aggregates); + + return $newConfiguration; + } + + public function withEvents(string ...$events): self + { + $newConfiguration = clone $this; + $newConfiguration->events = array_values($events); + + return $newConfiguration; + } + + public function withHeaders(string ...$headers): self + { + $newConfiguration = clone $this; + $newConfiguration->headers = array_values($headers); + + return $newConfiguration; + } + + /** @param StoreOptions $options */ + public function withStreamStore(array $options = [], bool $readOnly = false): self + { + $newConfiguration = clone $this; + $newConfiguration->storeType = self::STORE_DBAL_STREAM; + $newConfiguration->storeOptions = $options; + $newConfiguration->readOnlyStore = $readOnly; + + return $newConfiguration; + } + + /** @param StoreOptions $options */ + public function withInMemoryStore(array $options = []): self + { + $newConfiguration = clone $this; + $newConfiguration->storeType = self::STORE_IN_MEMORY; + $newConfiguration->storeOptions = $options; + + return $newConfiguration; + } + + public function withCustomStore(string $store): self + { + $newConfiguration = clone $this; + $newConfiguration->storeType = self::STORE_CUSTOM; + $newConfiguration->storeService = $store; + + return $newConfiguration; + } + + public function withConnectionUrl(string $url): self + { + $newConfiguration = clone $this; + $newConfiguration->connectionUrl = $url; + + return $newConfiguration; + } + + public function withConnectionService(string|null $connectionService): self + { + $newConfiguration = clone $this; + $newConfiguration->connectionService = $connectionService; + $newConfiguration->connectionUrl = null; + $newConfiguration->dedicatedProjectionConnection = false; + + return $newConfiguration; + } + + public function withDedicatedProjectionConnection(): self + { + $newConfiguration = clone $this; + $newConfiguration->dedicatedProjectionConnection = true; + + return $newConfiguration; + } + + public function withEventBus(string $type = self::EVENT_BUS_DEFAULT): self + { + $newConfiguration = clone $this; + $newConfiguration->eventBusEnabled = true; + $newConfiguration->eventBusType = $type; + + return $newConfiguration; + } + + public function withEventBusService(string|null $eventBusService): self + { + $newConfiguration = clone $this; + $newConfiguration->eventBusEnabled = true; + $newConfiguration->eventBusService = $eventBusService; + + return $newConfiguration; + } + + public function withClockService(string|null $clockService): self + { + $newConfiguration = clone $this; + $newConfiguration->clockService = $clockService; + + return $newConfiguration; + } + + public function withFrozenClock(DateTimeImmutable $frozenClock): self + { + $newConfiguration = clone $this; + $newConfiguration->frozenClock = $frozenClock; + + return $newConfiguration; + } + + public function withLoggerService(string|null $loggerService): self + { + $newConfiguration = clone $this; + $newConfiguration->loggerService = $loggerService; + + return $newConfiguration; + } + + /** @param list $upcasters */ + public function withUpcasters(array $upcasters): self + { + $newConfiguration = clone $this; + $newConfiguration->upcasters = $upcasters; + + return $newConfiguration; + } + + /** @param list $messageDecorators */ + public function withMessageDecorators(array $messageDecorators): self + { + $newConfiguration = clone $this; + $newConfiguration->messageDecorators = $messageDecorators; + + return $newConfiguration; + } + + /** @param list $hydratorExtensions */ + public function withHydratorExtensions(array $hydratorExtensions): self + { + $newConfiguration = clone $this; + $newConfiguration->hydratorExtensions = $hydratorExtensions; + + return $newConfiguration; + } + + /** @param SnapshotAdapterMap $snapshotAdapters */ + public function withSnapshotAdapters(array $snapshotAdapters): self + { + $newConfiguration = clone $this; + $newConfiguration->snapshotAdapters = $snapshotAdapters; + + return $newConfiguration; + } + + public function withLazyHydrator(): self + { + $newConfiguration = clone $this; + $newConfiguration->hydratorDefaultLazy = true; + + return $newConfiguration; + } + + public function withHydratorLifecycle(): self + { + $newConfiguration = clone $this; + $newConfiguration->hydratorLifecycleEnabled = true; + + return $newConfiguration; + } + + public function withHydratorStackCryptography( + bool $enabled = true, + string $algorithm = 'aes128', + bool $legacyMetadataMapping = true, + string|null $cipherKeyStoreService = null, + ): self { + $newConfiguration = clone $this; + $newConfiguration->hydratorStackCryptographyEnabled = $enabled; + $newConfiguration->hydratorStackCryptographyAlgorithm = $algorithm === '' ? 'aes128' : $algorithm; + $newConfiguration->hydratorStackCryptographyLegacyMetadataMapping = $legacyMetadataMapping; + $newConfiguration->hydratorStackCryptographyCipherKeyStoreService = $cipherKeyStoreService; + + return $newConfiguration; + } + + /** @param list $commandHandlerProviders */ + public function withCommandHandlerProviders(array $commandHandlerProviders): self + { + $newConfiguration = clone $this; + $newConfiguration->commandHandlerProviders = $commandHandlerProviders; + + return $newConfiguration; + } + + /** + * @param positive-int $defaultMaxRetries + * @param list> $defaultExceptions + */ + public function withCommandBusInstantRetry( + int $defaultMaxRetries = 3, + array $defaultExceptions = [AggregateOutdated::class], + ): self { + $newConfiguration = clone $this; + $newConfiguration->commandBusInstantRetry = true; + $newConfiguration->commandBusInstantRetryDefaultMaxRetries = $defaultMaxRetries; + $newConfiguration->commandBusInstantRetryDefaultExceptions = $defaultExceptions; + + return $newConfiguration; + } + + /** @param list $queryHandlerProviders */ + public function withQueryHandlerProviders(array $queryHandlerProviders): self + { + $newConfiguration = clone $this; + $newConfiguration->queryHandlerProviders = $queryHandlerProviders; + + return $newConfiguration; + } + + /** @param list $subscribers */ + public function withSubscribers(array $subscribers): self + { + $newConfiguration = clone $this; + $newConfiguration->subscribers = $subscribers; + + return $newConfiguration; + } + + /** @param list $subscriptionArgumentResolvers */ + public function withSubscriptionArgumentResolvers(array $subscriptionArgumentResolvers): self + { + $newConfiguration = clone $this; + $newConfiguration->subscriptionArgumentResolvers = $subscriptionArgumentResolvers; + + return $newConfiguration; + } + + /** @param list $subscriptionCleanupTaskHandlers */ + public function withSubscriptionCleanupTaskHandlers(array $subscriptionCleanupTaskHandlers): self + { + $newConfiguration = clone $this; + $newConfiguration->subscriptionCleanupTaskHandlers = $subscriptionCleanupTaskHandlers; + + return $newConfiguration; + } + + /** @param positive-int $maxAttempts */ + public function withSubscriptionRetryDefaults( + int $baseDelay = 5, + float $delayFactor = 2.0, + int $maxAttempts = 5, + ): self { + return (clone $this)->withSubscriptionRetryStrategyDefinitions([ + 'default' => [ + 'type' => 'clock_based', + 'options' => [ + 'base_delay' => $baseDelay, + 'delay_factor' => $delayFactor, + 'max_attempts' => $maxAttempts, + ], + ], + ]); + } + + /** @param RetryStrategyDefinitions $definitions */ + public function withSubscriptionRetryStrategyDefinitions( + array $definitions, + string $defaultRetryStrategy = 'default', + ): self { + $newConfiguration = clone $this; + $newConfiguration->subscriptionRetryStrategyDefinitions = $definitions; + $newConfiguration->subscriptionDefaultRetryStrategy = $defaultRetryStrategy; + + return $newConfiguration; + } + + public function withSubscriptionDBALStore(string $tableName = 'subscriptions'): self + { + $newConfiguration = clone $this; + $newConfiguration->subscriptionStoreType = self::SUBSCRIPTION_STORE_DBAL; + $newConfiguration->subscriptionStoreService = null; + $newConfiguration->subscriptionStoreTableName = $tableName; + + return $newConfiguration; + } + + public function withSubscriptionInMemoryStore(string $tableName = 'subscriptions'): self + { + $newConfiguration = clone $this; + $newConfiguration->subscriptionStoreType = self::SUBSCRIPTION_STORE_IN_MEMORY; + $newConfiguration->subscriptionStoreService = null; + $newConfiguration->subscriptionStoreTableName = $tableName; + + return $newConfiguration; + } + + public function withSubscriptionStaticInMemoryStore(string $tableName = 'subscriptions'): self + { + $newConfiguration = clone $this; + $newConfiguration->subscriptionStoreType = self::SUBSCRIPTION_STORE_STATIC_IN_MEMORY; + $newConfiguration->subscriptionStoreService = null; + $newConfiguration->subscriptionStoreTableName = $tableName; + + return $newConfiguration; + } + + public function withSubscriptionCustomStore(string $storeService): self + { + $newConfiguration = clone $this; + $newConfiguration->subscriptionStoreType = self::SUBSCRIPTION_STORE_CUSTOM; + $newConfiguration->subscriptionStoreService = $storeService; + + return $newConfiguration; + } + + public function withSubscriptionEngineThrowOnError(): self + { + $newConfiguration = clone $this; + $newConfiguration->subscriptionThrowOnError = true; + + return $newConfiguration; + } + + public function withSubscriptionEngineCatchUp(int|null $catchUpLimit = null): self + { + $newConfiguration = clone $this; + $newConfiguration->subscriptionCatchUp = true; + $newConfiguration->subscriptionCatchUpLimit = $catchUpLimit; + + return $newConfiguration; + } + + /** + * @param list|null $ids + * @param list|null $groups + * @param positive-int|null $limit + */ + public function withRunSubscriptionsAfterAggregateSave( + array|null $ids = null, + array|null $groups = null, + int|null $limit = null, + ): self { + $newConfiguration = clone $this; + $newConfiguration->subscriptionRunAfterAggregateSaveEnabled = true; + $newConfiguration->subscriptionRunAfterAggregateSaveIds = $ids; + $newConfiguration->subscriptionRunAfterAggregateSaveGroups = $groups; + $newConfiguration->subscriptionRunAfterAggregateSaveLimit = $limit; + + return $newConfiguration; + } + + /** @param list $retriesInMs */ + public function withSubscriptionGapDetection( + array $retriesInMs = [0, 5, 50, 500], + DateInterval|null $detectionWindow = null, + ): self { + $newConfiguration = clone $this; + $newConfiguration->subscriptionGapDetection = true; + $newConfiguration->subscriptionGapDetectionRetriesInMs = $retriesInMs; + $newConfiguration->subscriptionGapDetectionWindow = $detectionWindow; + + return $newConfiguration; + } + + public function withMigration( + bool $enabled = true, + string $namespace = 'EventSourcingMigrations', + string $path = 'migrations', + ): self { + $newConfiguration = clone $this; + $newConfiguration->migrationEnabled = $enabled; + $newConfiguration->migrationNamespace = $namespace; + $newConfiguration->migrationPath = $path; + + return $newConfiguration; + } + + /** + * @param StoreOptions $options + * @param list $translators + */ + public function withStoreMigration( + bool $enabled = true, + string $type = self::STORE_IN_MEMORY, + string|null $service = null, + array $options = [], + array $translators = [], + ): self { + $newConfiguration = clone $this; + $newConfiguration->storeMigrationEnabled = $enabled; + $newConfiguration->storeMigrationType = $type; + $newConfiguration->storeMigrationService = $service; + /** @var StoreOptions $migrationOptions */ + $migrationOptions = $options; + $newConfiguration->storeMigrationOptions = $migrationOptions; + $newConfiguration->storeMigrationTranslators = $translators; + + return $newConfiguration; + } +} diff --git a/src/Container/Container.php b/src/Container/Container.php new file mode 100644 index 00000000..af899ad6 --- /dev/null +++ b/src/Container/Container.php @@ -0,0 +1,120 @@ + + * @phpstan-type FactoryMap array + * @phpstan-type AliasMap array + */ +final class Container implements ContainerInterface +{ + /** @var array */ + private array $building = []; + + /** + * @param ServiceMap $services + * @param FactoryMap $factories + * @param AliasMap $aliases + */ + public function __construct( + private array $services = [], + private array $factories = [], + private array $aliases = [], + ) { + } + + /** + * @param string|class-string $id + * + * @return ($id is class-string ? T : object) + * + * @template T of object + */ + public function get(string $id): mixed + { + $id = $this->resolveAlias($id); + + if (array_key_exists($id, $this->services)) { + return $this->services[$id]; + } + + if (!array_key_exists($id, $this->factories)) { + throw new ServiceNotFound($id); + } + + if (array_key_exists($id, $this->building)) { + throw new ServiceCreationFailed(sprintf('Circular service reference detected for "%s".', $id)); + } + + try { + $this->building[$id] = true; + $this->services[$id] = ($this->factories[$id])($this); + } catch (ServiceNotFound | ServiceCreationFailed $exception) { + throw $exception; + } catch (Throwable $exception) { + throw new ServiceCreationFailed( + sprintf('Could not create service "%s": %s', $id, $exception->getMessage()), + 0, + $exception, + ); + } finally { + unset($this->building[$id]); + } + + return $this->services[$id]; + } + + public function has(string $id): bool + { + $id = $this->resolveAlias($id); + + return array_key_exists($id, $this->services) || array_key_exists($id, $this->factories); + } + + private function resolveAlias(string $id): string + { + $resolved = $id; + + while (array_key_exists($resolved, $this->aliases)) { + $resolved = $this->aliases[$resolved]; + } + + return $resolved; + } + + public function bind(string $id, object|callable $service): void + { + if (is_callable($service)) { + $this->factories[$id] = $service; + + return; + } + + $this->services[$id] = $service; + } + + public function alias(string $id, string $alias): void + { + $this->aliases[$id] = $alias; + } + + public function decorate(string $id, string $alias, callable $service): void + { + $innerAlias = $this->resolveAlias($id); + $this->bind( + $alias, + static fn (ContainerInterface $container) => $service($container, $container->get($innerAlias)), + ); + $this->alias($id, $alias); + } +} diff --git a/src/Container/Factory.php b/src/Container/Factory.php new file mode 100644 index 00000000..bc4af2a5 --- /dev/null +++ b/src/Container/Factory.php @@ -0,0 +1,1123 @@ + + * @phpstan-type AliasMap array + * @phpstan-type FactoryMap array + */ +final class Factory +{ + public const CONNECTION_ID = 'event_sourcing.dbal_connection'; + public const PUBLIC_CONNECTION_ID = 'event_sourcing.dbal_public_connection'; + + public const NEW_STORE_ID = 'event_sourcing.store.new_store'; + + public static function create(Configuration $configuration): Container + { + $container = new Container(); + + foreach ($configuration->services as $id => $service) { + $container->bind($id, $service); + } + + foreach ($configuration->parameters as $id => $parameter) { + $container->bind($id, static fn () => $parameter); + } + + self::configureClock($configuration, $container); + self::configureConnection($configuration, $container); + self::configureHydrator($configuration, $container); + self::configureUpcaster($configuration, $container); + self::configureSerializer($configuration, $container); + self::configureMessageDecorator($configuration, $container); + self::configureCommandBus($configuration, $container); + self::configureEventBus($configuration, $container); + self::configureQueryBus($configuration, $container); + self::configureLogger($configuration, $container); + self::configureStore($configuration, $container); + self::configureSnapshots($configuration, $container); + self::configureAggregates($configuration, $container); + self::configureCommands($configuration, $container); + self::configureSchema($configuration, $container); + self::configureMessageLoader($configuration, $container); + self::configureSubscription($configuration, $container); + // self::configureMigration($configuration, $container); @todo + self::configureStoreMigration($configuration, $container); + + return $container; + } + + private static function configureClock(Configuration $configuration, Container $container): void + { + $container->bind(SystemClock::class, new SystemClock()); + $container->alias(ClockInterface::class, SystemClock::class); + + if ($configuration->clockService !== null) { + $container->alias(ClockInterface::class, 'event_sourcing.clock'); + + return; + } + + if ($configuration->frozenClock === null) { + return; + } + + $container->bind(FrozenClock::class, new FrozenClock($configuration->frozenClock)); + $container->alias(ClockInterface::class, FrozenClock::class); + } + + private static function configureHydrator(Configuration $configuration, Container $container): void + { + if ($configuration->hydratorStackCryptographyEnabled) { + $container->bind( + ExtensionDoctrineCipherKeyStore::class, + new ExtensionDoctrineCipherKeyStore($container->get(self::CONNECTION_ID)), + ); + $container->alias(CipherKeyStore::class, ExtensionDoctrineCipherKeyStore::class); + + $container->bind( + BaseCryptographer::class, + static function (Container $container) use ($configuration): BaseCryptographer { + return BaseCryptographer::createWithOpenssl( + $container->get(CipherKeyStore::class), + $configuration->hydratorStackCryptographyAlgorithm, + ); + }, + ); + $container->alias(Cryptographer::class, BaseCryptographer::class); + } + + $container->bind( + StackHydrator::class, + static function (Container $container) use ($configuration): StackHydrator { + $builder = new StackHydratorBuilder(); + + if ($configuration->hydratorStackCryptographyEnabled) { + $builder->useExtension(new CryptographyExtension( + $container->get(Cryptographer::class), + $container->has(PayloadCryptographer::class) ? $container->get(PayloadCryptographer::class) : null, + $configuration->hydratorStackCryptographyLegacyMetadataMapping, + )); + } + + foreach ($configuration->hydratorExtensions as $extension) { + $builder->useExtension($extension); + } + + $builder->useExtension(new CoreExtension()); + + if ($configuration->hydratorLifecycleEnabled) { + $builder->useExtension(new LifecycleExtension()); + } + + $builder->enableDefaultLazy($configuration->hydratorDefaultLazy); + + return $builder->build(); + }, + ); + $container->alias(Hydrator::class, StackHydrator::class); + } + + private static function configureUpcaster(Configuration $configuration, Container $container): void + { + $container->bind( + UpcasterChain::class, + static fn (): UpcasterChain => new UpcasterChain($configuration->upcasters), + ); + $container->alias(Upcaster::class, UpcasterChain::class); + } + + private static function configureSerializer(Configuration $configuration, Container $container): void + { + $container->bind( + EventRegistry::class, + (new AttributeEventRegistryFactory())->create($configuration->events), + ); + + $container->bind(AttributeEventMetadataFactory::class, new AttributeEventMetadataFactory()); + $container->alias(EventMetadataFactory::class, AttributeEventMetadataFactory::class); + + $container->bind(JsonEncoder::class, new JsonEncoder()); + $container->alias(Encoder::class, JsonEncoder::class); + + $container->bind( + DefaultEventSerializer::class, + static fn (Container $container): DefaultEventSerializer => new DefaultEventSerializer( + $container->get(EventRegistry::class), + $container->get(Hydrator::class), + $container->get(Encoder::class), + $container->get(Upcaster::class), + ), + ); + $container->alias(EventSerializer::class, DefaultEventSerializer::class); + + $container->bind(AttributeMessageHeaderRegistryFactory::class, new AttributeMessageHeaderRegistryFactory()); + $container->alias(MessageHeaderRegistryFactory::class, AttributeMessageHeaderRegistryFactory::class); + + $container->bind( + MessageHeaderRegistry::class, + static function (Container $container) use ($configuration): MessageHeaderRegistry { + return $container->get(MessageHeaderRegistryFactory::class)->create($configuration->headers); + }, + ); + + $container->bind( + DefaultHeadersSerializer::class, + static function (Container $container): DefaultHeadersSerializer { + return new DefaultHeadersSerializer( + $container->get(MessageHeaderRegistry::class), + $container->get(Hydrator::class), + $container->get(Encoder::class), + ); + }, + ); + $container->alias(HeadersSerializer::class, DefaultHeadersSerializer::class); + } + + private static function configureMessageDecorator(Configuration $configuration, Container $container): void + { + $container->bind( + SplitStreamDecorator::class, + static function (Container $container): SplitStreamDecorator { + return new SplitStreamDecorator($container->get(EventMetadataFactory::class)); + }, + ); + + $container->bind( + ChainMessageDecorator::class, + static function (Container $container) use ($configuration): ChainMessageDecorator { + return new ChainMessageDecorator([ + $container->get(SplitStreamDecorator::class), + ...$configuration->messageDecorators, + ]); + }, + ); + $container->alias(MessageDecorator::class, ChainMessageDecorator::class); + } + + private static function configureCommandBus(Configuration $configuration, Container $container): void + { + $container->bind( + ChainHandlerProvider::class, + static function (Container $container) use ($configuration): ChainHandlerProvider { + $aggregateHandlerProvider = new AggregateHandlerProvider( + $container->get(AggregateRootRegistry::class), + $container->get(RepositoryManager::class), + $container, + ); + + return new ChainHandlerProvider([ + $aggregateHandlerProvider, + ...$configuration->commandHandlerProviders, + ]); + }, + ); + $container->alias(HandlerProvider::class, ChainHandlerProvider::class); + + $container->bind( + SyncCommandBus::class, + static fn (Container $container, + ): SyncCommandBus => new SyncCommandBus($container->get(HandlerProvider::class)), + ); + $container->alias(CommandBus::class, SyncCommandBus::class); + + if (!$configuration->commandBusInstantRetry) { + return; + } + + $container->decorate( + CommandBus::class, + InstantRetryCommandBus::class, + static function (Container $container, CommandBus $inner) use ($configuration): InstantRetryCommandBus { + return new InstantRetryCommandBus( + $inner, + $configuration->commandBusInstantRetryDefaultMaxRetries, + $configuration->commandBusInstantRetryDefaultExceptions, + ); + }, + ); + } + + private static function configureEventBus(Configuration $configuration, Container $container): void + { + if ($configuration->eventBusEnabled === false) { + return; + } + + if ($configuration->eventBusType === Configuration::EVENT_BUS_DEFAULT) { + $container->bind( + AttributeListenerProvider::class, + new AttributeListenerProvider($configuration->listeners), + ); + $container->alias(ListenerProvider::class, AttributeListenerProvider::class); + + $container->bind( + DefaultConsumer::class, + static function (Container $container): DefaultConsumer { + return new DefaultConsumer( + $container->get(ListenerProvider::class), + $container->has(LoggerInterface::class) ? $container->get(LoggerInterface::class) : null, + ); + }, + ); + $container->alias(Consumer::class, DefaultConsumer::class); + + $container->bind( + DefaultEventBus::class, + static function (Container $container): DefaultEventBus { + return new DefaultEventBus( + $container->get(Consumer::class), + $container->has(LoggerInterface::class) ? $container->get(LoggerInterface::class) : null, + ); + }, + ); + $container->alias(EventBus::class, DefaultEventBus::class); + + return; + } + + if ($configuration->eventBusType === Configuration::EVENT_BUS_PSR14) { + $container->bind( + Psr14EventBus::class, + static function (Container $container): Psr14EventBus { + return new Psr14EventBus($container->get(\Psr\EventDispatcher\EventDispatcherInterface::class)); + }, + ); + $container->alias(EventBus::class, Psr14EventBus::class); + + return; + } + + if ($configuration->eventBusType === Configuration::EVENT_BUS_CUSTOM) { + if ($configuration->eventBusService === null) { + throw new InvalidArgumentException('Custom event bus type requires an event bus service id.'); + } + + $container->bind( + $configuration->eventBusService, + static function (Container $container) use ($configuration): EventBus { + return $container->get($configuration->eventBusService); + }, + ); + $container->alias(EventBus::class, $configuration->eventBusService); + + return; + } + + throw new InvalidArgumentException(sprintf('Unknown event bus type "%s".', $configuration->eventBusType)); + } + + private static function configureQueryBus(Configuration $configuration, Container $container): void + { + $container->bind( + \Patchlevel\EventSourcing\QueryBus\ChainHandlerProvider::class, + static function (Container $container) use ($configuration, + ): \Patchlevel\EventSourcing\QueryBus\ChainHandlerProvider { + $serviceHandlerProvider = new ServiceHandlerProvider($configuration->subscribers); + + return new \Patchlevel\EventSourcing\QueryBus\ChainHandlerProvider([ + $serviceHandlerProvider, + ...$configuration->queryHandlerProviders, + ]); + }, + ); + $container->alias( + \Patchlevel\EventSourcing\QueryBus\HandlerProvider::class, + \Patchlevel\EventSourcing\QueryBus\ChainHandlerProvider::class, + ); + + $container->bind( + SyncQueryBus::class, + static fn (Container $container, + ) => new SyncQueryBus($container->get(\Patchlevel\EventSourcing\QueryBus\HandlerProvider::class)), + ); + $container->alias(QueryBus::class, SyncQueryBus::class); + } + + private static function configureConnection(Configuration $configuration, Container $container): void + { + if ($configuration->connectionUrl !== null) { + $factory = new class { + /** + * Mapping was taken from Doctrine Bundle. + * + * @see https://github.com/doctrine/DoctrineBundle/blob/7564fa72ab4a87316660347ccd226cefc8fb0ea9/src/ConnectionFactory.php#L35 + */ + private const DEFAULT_SCHEME_MAP = [ + 'db2' => 'ibm_db2', + 'mssql' => 'pdo_sqlsrv', + 'mysql' => 'pdo_mysql', + 'mysql2' => 'pdo_mysql', // Amazon RDS, for some weird reason + 'postgres' => 'pdo_pgsql', + 'postgresql' => 'pdo_pgsql', + 'pgsql' => 'pdo_pgsql', + 'sqlite' => 'pdo_sqlite', + 'sqlite3' => 'pdo_sqlite', + ]; + + public static function createConnection(string $url): Connection + { + return DriverManager::getConnection((new DsnParser(self::DEFAULT_SCHEME_MAP))->parse($url)); + } + }; + + $container->bind( + self::CONNECTION_ID, + static fn (): Connection => $factory::createConnection($configuration->connectionUrl), + ); + + if ($configuration->dedicatedProjectionConnection) { + $container->bind( + self::PUBLIC_CONNECTION_ID, + static fn (): Connection => $factory::createConnection($configuration->connectionUrl), + ); + + $container->alias(Connection::class, self::PUBLIC_CONNECTION_ID); + } + + return; + } + + if ($configuration->connectionService !== null) { + if ($configuration->dedicatedProjectionConnection) { + throw new InvalidArgumentException('Providing dedicated connection is only possible with url'); + } + + $container->bind( + self::CONNECTION_ID, + static function (Container $container) use ($configuration): Connection { + return $container->get($configuration->connectionService); + }, + ); + $container->alias(Connection::class, self::CONNECTION_ID); + + return; + } + + throw new InvalidArgumentException('Connection service or url is required'); + } + + private static function configureLogger(Configuration $configuration, Container $container): void + { + if ($configuration->loggerService === null) { + return; + } + + $container->bind( + LoggerInterface::class, + static fn (Container $container): LoggerInterface => $container->get($configuration->loggerService), + ); + } + + private static function configureStore(Configuration $configuration, Container $container): void + { + if ($configuration->storeType === $configuration::STORE_CUSTOM) { + if ($configuration->storeService === null) { + throw new InvalidArgumentException('Custom store type requires a store service id.'); + } + + $container->alias(Store::class, $configuration->storeService); + + return; + } + + if ($configuration->storeType === $configuration::STORE_IN_MEMORY) { + $container->bind( + InMemoryStore::class, + static function (Container $container): InMemoryStore { + return new InMemoryStore( + [], + $container->get(EventRegistry::class), + $container->get(ClockInterface::class), + ); + }, + ); + $container->alias(Store::class, InMemoryStore::class); + + return; + } + + if ($configuration->storeType === $configuration::STORE_DBAL_STREAM) { + $container->bind( + StreamDoctrineDbalStore::class, + static function (Container $container) use ($configuration): StreamDoctrineDbalStore { + return new StreamDoctrineDbalStore( + $container->get(self::CONNECTION_ID), + $container->get(EventSerializer::class), + $container->get(HeadersSerializer::class), + $container->get(ClockInterface::class), + $configuration->storeOptions, + ); + }, + ); + $container->alias(Store::class, StreamDoctrineDbalStore::class); + + if ($configuration->readOnlyStore) { + $container->decorate( + Store::class, + StreamReadOnlyStore::class, + static function (Container $container, StreamStore $inner) { + return new StreamReadOnlyStore( + $inner, + $container->has(LoggerInterface::class) ? $container->get(LoggerInterface::class) : null, + ); + }, + ); + } + + return; + } + + throw new InvalidArgumentException(sprintf('Unknown store type "%s".', $configuration->storeType)); + } + + private static function configureSnapshots(Configuration $configuration, Container $container): void + { + if ($configuration->snapshotAdapters === []) { + return; + } + + $container->bind(AdapterRepository::class, new ArrayAdapterRepository($configuration->snapshotAdapters)); + + $container->bind( + DefaultSnapshotStore::class, + static fn (Container $container): DefaultSnapshotStore => new DefaultSnapshotStore( + $container->get(AdapterRepository::class), + $container->get(Hydrator::class), + $container->get(AggregateRootMetadataFactory::class), + ), + ); + $container->alias(SnapshotStore::class, DefaultSnapshotStore::class); + } + + private static function configureAggregates(Configuration $configuration, Container $container): void + { + $container->bind( + AggregateRootMetadataAwareMetadataFactory::class, + new AggregateRootMetadataAwareMetadataFactory(), + ); + $container->alias(AggregateRootMetadataFactory::class, AggregateRootMetadataAwareMetadataFactory::class); + + $container->bind( + AggregateRootRegistry::class, + (new AttributeAggregateRootRegistryFactory())->create($configuration->aggregates), + ); + + $container->bind( + DefaultRepositoryManager::class, + static function (Container $container): DefaultRepositoryManager { + return new DefaultRepositoryManager( + $container->get(AggregateRootRegistry::class), + $container->get(Store::class), + $container->has(EventBus::class) ? $container->get(EventBus::class) : null, + $container->has(SnapshotStore::class) ? $container->get(SnapshotStore::class) : null, + $container->get(MessageDecorator::class), + $container->get(ClockInterface::class), + $container->get(AggregateRootMetadataFactory::class), + $container->has(LoggerInterface::class) ? $container->get(LoggerInterface::class) : null, + ); + }, + ); + $container->alias(RepositoryManager::class, DefaultRepositoryManager::class); + } + + private static function configureCommands(Configuration $configuration, Container $container): void + { + $container->bind( + ShowCommand::class, + static function (Container $container) { + return new ShowCommand( + $container->get(Store::class), + $container->get(EventSerializer::class), + $container->get(HeadersSerializer::class), + ); + }, + ); + $container->bind( + ShowAggregateCommand::class, + static function (Container $container) { + return new ShowAggregateCommand( + $container->get(Store::class), + $container->get(EventSerializer::class), + $container->get(HeadersSerializer::class), + $container->get(AggregateRootRegistry::class), + $container->get(AggregateRootMetadataFactory::class), + ); + }, + ); + $container->bind( + WatchCommand::class, + static function (Container $container) { + return new WatchCommand( + $container->get(Store::class), + $container->get(EventSerializer::class), + $container->get(HeadersSerializer::class), + ); + }, + ); + $container->bind( + DebugCommand::class, + static function (Container $container) { + return new DebugCommand( + $container->get(AggregateRootRegistry::class), + $container->get(EventRegistry::class), + $container->get(SubscriberAccessorRepository::class), + ); + }, + ); + $container->bind( + SubscriptionSetupCommand::class, + static function (Container $container) { + return new SubscriptionSetupCommand( + $container->get(SubscriptionEngine::class), + ); + }, + ); + $container->bind( + SubscriptionBootCommand::class, + static function (Container $container) { + return new SubscriptionBootCommand( + $container->get(SubscriptionEngine::class), + $container->has(EventDispatcherInterface::class) ? $container->get(EventDispatcherInterface::class) : null, + ); + }, + ); + $container->bind( + SubscriptionRunCommand::class, + static function (Container $container) { + return new SubscriptionRunCommand( + $container->get(SubscriptionEngine::class), + $container->get(Store::class), + $container->has(EventDispatcherInterface::class) ? $container->get(EventDispatcherInterface::class) : null, + ); + }, + ); + $container->bind( + SubscriptionTeardownCommand::class, + static function (Container $container) { + return new SubscriptionTeardownCommand( + $container->get(SubscriptionEngine::class), + ); + }, + ); + $container->bind( + SubscriptionRemoveCommand::class, + static function (Container $container) { + return new SubscriptionRemoveCommand( + $container->get(SubscriptionEngine::class), + ); + }, + ); + $container->bind( + SubscriptionStatusCommand::class, + static function (Container $container) { + return new SubscriptionStatusCommand( + $container->get(SubscriptionEngine::class), + ); + }, + ); + $container->bind( + SubscriptionPauseCommand::class, + static function (Container $container) { + return new SubscriptionPauseCommand( + $container->get(SubscriptionEngine::class), + ); + }, + ); + $container->bind( + SubscriptionReactivateCommand::class, + static function (Container $container) { + return new SubscriptionReactivateCommand( + $container->get(SubscriptionEngine::class), + ); + }, + ); + } + + private static function configureSchema(Configuration $configuration, Container $container): void + { + $container->bind( + ChainDoctrineSchemaConfigurator::class, + static function (Container $container): ChainDoctrineSchemaConfigurator { + $services = []; + + if ($container->has(Store::class)) { + $services[] = $container->get(Store::class); + } + + if ($container->has(SubscriptionStore::class)) { + $services[] = $container->get(SubscriptionStore::class); + } + + if ($container->has(CipherKeyStore::class)) { + $services[] = $container->get(CipherKeyStore::class); + } + + return new ChainDoctrineSchemaConfigurator(array_filter( + $services, + static fn ($service) => $service instanceof DoctrineSchemaConfigurator, + )); + }, + ); + $container->alias(DoctrineSchemaConfigurator::class, ChainDoctrineSchemaConfigurator::class); + + $container->bind( + DoctrineSchemaDirector::class, + static fn (Container $container): DoctrineSchemaDirector => new DoctrineSchemaDirector( + $container->get(self::CONNECTION_ID), + $container->get(DoctrineSchemaConfigurator::class), + ), + ); + $container->alias(DoctrineSchemaProvider::class, DoctrineSchemaDirector::class); + $container->alias(SchemaDirector::class, DoctrineSchemaDirector::class); + + $container->bind(DoctrineHelper::class, new DoctrineHelper()); + + $container->bind( + DatabaseCreateCommand::class, + static fn (Container $container): DatabaseCreateCommand => new DatabaseCreateCommand( + $container->get(self::CONNECTION_ID), + $container->get(DoctrineHelper::class), + ), + ); + + $container->bind( + DatabaseDropCommand::class, + static fn (Container $container): DatabaseDropCommand => new DatabaseDropCommand( + $container->get(self::CONNECTION_ID), + $container->get(DoctrineHelper::class), + ), + ); + + $container->bind( + SchemaCreateCommand::class, + static fn (Container $container): SchemaCreateCommand => new SchemaCreateCommand( + $container->get(SchemaDirector::class), + ), + ); + + $container->bind( + SchemaUpdateCommand::class, + static fn (Container $container): SchemaUpdateCommand => new SchemaUpdateCommand( + $container->get(SchemaDirector::class), + ), + ); + + $container->bind( + SchemaDropCommand::class, + static fn (Container $container): SchemaDropCommand => new SchemaDropCommand( + $container->get(SchemaDirector::class), + ), + ); + } + + private static function configureMessageLoader(Configuration $configuration, Container $container): void + { + $container->bind( + StoreMessageLoader::class, + static function (Container $container): StoreMessageLoader { + return new StoreMessageLoader($container->get(Store::class)); + }, + ); + $container->alias(MessageLoader::class, StoreMessageLoader::class); + + if ($configuration->subscriptionGapDetection === false) { + return; + } + + $container->bind( + GapResolverStoreMessageLoader::class, + static function (Container $container) use ($configuration): GapResolverStoreMessageLoader { + return new GapResolverStoreMessageLoader( + $container->get(Store::class), + $container->get(ClockInterface::class), + $configuration->subscriptionGapDetectionRetriesInMs, + $configuration->subscriptionGapDetectionWindow, + ); + }, + ); + $container->alias(MessageLoader::class, GapResolverStoreMessageLoader::class); + } + + private static function configureSubscription(Configuration $configuration, Container $container): void + { + $container->bind(AttributeSubscriberMetadataFactory::class, new AttributeSubscriberMetadataFactory()); + $container->alias(SubscriberMetadataFactory::class, AttributeSubscriberMetadataFactory::class); + + /** @var array $strategies */ + $strategies = []; + foreach ($configuration->subscriptionRetryStrategyDefinitions as $name => $retryStrategyDefinition) { + if ($retryStrategyDefinition['type'] === Configuration::SUBSCRIPTION_RETRY_CLOCK_BASED) { + if (!array_key_exists('options', $retryStrategyDefinition)) { + throw new InvalidArgumentException(sprintf( + 'Missing options for subscription retry strategy "%s".', + $name, + )); + } + + $strategies[$name] = new ClockBasedRetryStrategy( + $container->get(ClockInterface::class), + $retryStrategyDefinition['options']['base_delay'], + $retryStrategyDefinition['options']['delay_factor'], + $retryStrategyDefinition['options']['max_attempts'], + ); + + continue; + } + + if ($retryStrategyDefinition['type'] === Configuration::SUBSCRIPTION_RETRY_NO_RETRY) { + $strategies[$name] = new NoRetryStrategy(); + continue; + } + + if ($retryStrategyDefinition['type'] !== Configuration::SUBSCRIPTION_RETRY_CUSTOM) { + continue; + } + + $service = $retryStrategyDefinition['service'] ?? null; + if ($service === null) { + throw new InvalidArgumentException(sprintf('Custom retry strategy "%s" requires a service id.', $name)); + } + + $strategies[$name] = $container->get($service); + } + + if ($strategies !== []) { + $container->bind( + RetryStrategyRepository::class, + new RetryStrategyRepository($strategies, $configuration->subscriptionDefaultRetryStrategy), + ); + } + + if ($configuration->subscriptionStoreType === Configuration::SUBSCRIPTION_STORE_IN_MEMORY) { + $container->bind( + InMemorySubscriptionStore::class, + static function (Container $container): SubscriptionStore { + return new InMemorySubscriptionStore( + [], + $container->get(ClockInterface::class), + ); + }, + ); + $container->alias(SubscriptionStore::class, InMemorySubscriptionStore::class); + } + + if ($configuration->subscriptionStoreType === Configuration::SUBSCRIPTION_STORE_STATIC_IN_MEMORY) { + $factory = new class { + private static InMemorySubscriptionStore|null $store = null; + + public static function create(): InMemorySubscriptionStore + { + if (self::$store === null) { + self::$store = new InMemorySubscriptionStore(); + } + + return self::$store; + } + }; + + $container->bind(InMemorySubscriptionStore::class, static fn () => $factory::create()); + $container->alias(SubscriptionStore::class, InMemorySubscriptionStore::class); + } + + if ($configuration->subscriptionStoreType === Configuration::SUBSCRIPTION_STORE_DBAL) { + $container->bind( + DoctrineSubscriptionStore::class, + static function (Container $container) use ($configuration): SubscriptionStore { + return new DoctrineSubscriptionStore( + $container->get(self::CONNECTION_ID), + $container->get(ClockInterface::class), + $configuration->subscriptionStoreTableName, + ); + }, + ); + $container->alias(SubscriptionStore::class, DoctrineSubscriptionStore::class); + } + + if ($configuration->subscriptionStoreType === Configuration::SUBSCRIPTION_STORE_CUSTOM) { + if ($configuration->subscriptionStoreService === null) { + throw new InvalidArgumentException('Custom subscription store type requires a subscription store service id.'); + } + + $container->alias(SubscriptionStore::class, $configuration->subscriptionStoreService); + } + + $container->bind( + LookupResolver::class, + static function (Container $container): LookupResolver { + return new LookupResolver( + $container->get(Store::class), + $container->get(EventRegistry::class), + ); + }, + ); + + $container->bind( + MetadataSubscriberAccessorRepository::class, + static function (Container $container) use ($configuration): MetadataSubscriberAccessorRepository { + return new MetadataSubscriberAccessorRepository( + $configuration->subscribers, + $container->get(SubscriberMetadataFactory::class), + [ + $container->get(LookupResolver::class), + ...$configuration->subscriptionArgumentResolvers, + ], + ); + }, + ); + $container->alias(SubscriberAccessorRepository::class, MetadataSubscriberAccessorRepository::class); + + $container->bind( + DefaultCleaner::class, + new DefaultCleaner($configuration->subscriptionCleanupTaskHandlers), + ); + $container->alias(Cleaner::class, DefaultCleaner::class); + + $container->bind( + DefaultSubscriptionEngine::class, + static function (Container $container): DefaultSubscriptionEngine { + return new DefaultSubscriptionEngine( + $container->get(MessageLoader::class), + $container->get(SubscriptionStore::class), + $container->get(SubscriberAccessorRepository::class), + $container->has(RetryStrategyRepository::class) ? $container->get(RetryStrategyRepository::class) : null, + $container->has(LoggerInterface::class) ? $container->get(LoggerInterface::class) : null, + $container->get(Cleaner::class), + ); + }, + ); + $container->alias(SubscriptionEngine::class, DefaultSubscriptionEngine::class); + + if ($configuration->subscriptionThrowOnError) { + $container->decorate( + SubscriptionEngine::class, + ThrowOnErrorSubscriptionEngine::class, + static function ( + Container $container, + SubscriptionEngine $inner, + ): ThrowOnErrorSubscriptionEngine { + return new ThrowOnErrorSubscriptionEngine($inner); + }, + ); + } + + if ($configuration->subscriptionCatchUp) { + $container->decorate( + SubscriptionEngine::class, + CatchUpSubscriptionEngine::class, + static function (Container $container, SubscriptionEngine $inner) use ($configuration, + ): CatchUpSubscriptionEngine { + return new CatchUpSubscriptionEngine( + $inner, + $configuration->subscriptionCatchUpLimit, + ); + }, + ); + } + + if (!$configuration->subscriptionRunAfterAggregateSaveEnabled) { + return; + } + + $container->decorate( + RepositoryManager::class, + RunSubscriptionEngineRepositoryManager::class, + static function (Container $container, RepositoryManager $inner) use ($configuration, + ): RunSubscriptionEngineRepositoryManager { + return new RunSubscriptionEngineRepositoryManager( + $inner, + $container->get(SubscriptionEngine::class), + $configuration->subscriptionRunAfterAggregateSaveIds, + $configuration->subscriptionRunAfterAggregateSaveGroups, + $configuration->subscriptionRunAfterAggregateSaveLimit, + ); + }, + ); + } + + private static function configureStoreMigration(Configuration $configuration, Container $container): void + { + if ($configuration->storeMigrationEnabled === false) { + return; + } + + $container->bind( + StoreMigrateCommand::class, + static function (Container $container) use ($configuration): StoreMigrateCommand { + return new StoreMigrateCommand( + $container->get(Store::class), + $container->get(self::NEW_STORE_ID), + $configuration->storeMigrationTranslators, + ); + }, + ); + + if ($configuration->storeMigrationType === Configuration::STORE_IN_MEMORY) { + $container->bind( + self::NEW_STORE_ID, + static function (Container $container): InMemoryStore { + return new InMemoryStore( + [], + $container->get(EventRegistry::class), + $container->get(ClockInterface::class), + ); + }, + ); + + return; + } + + if ($configuration->storeMigrationType === Configuration::STORE_DBAL_STREAM) { + $container->bind( + self::NEW_STORE_ID, + static function (Container $container) use ($configuration): StreamDoctrineDbalStore { + return new StreamDoctrineDbalStore( + $container->get(self::CONNECTION_ID), + $container->get(EventSerializer::class), + $container->get(HeadersSerializer::class), + $container->get(ClockInterface::class), + $configuration->storeMigrationOptions, + ); + }, + ); + } + + if ($configuration->storeMigrationType === Configuration::STORE_CUSTOM) { + if ($configuration->storeMigrationService === null) { + throw new InvalidArgumentException('Custom store migration type requires a service id.'); + } + + $container->bind( + self::NEW_STORE_ID, + static fn (Container $container): Store => $container->get($configuration->storeMigrationService), + ); + } + + throw new InvalidArgumentException(sprintf('Unknown store type "%s"', $configuration->storeMigrationType)); + } +} diff --git a/src/Container/ServiceCreationFailed.php b/src/Container/ServiceCreationFailed.php new file mode 100644 index 00000000..ab8d50aa --- /dev/null +++ b/src/Container/ServiceCreationFailed.php @@ -0,0 +1,12 @@ +connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - DefaultHeadersSerializer::createFromPaths([ - __DIR__ . '/Header', - ]), - ); - $profileProjector = new ProfileProjector($this->connection); - $engine = new DefaultSubscriptionEngine( - $store, - new InMemorySubscriptionStore(), - new MetadataSubscriberAccessorRepository([ - $profileProjector, - new SendEmailProcessor(), - ]), - ); + $configuration = $this->getConfiguration() + ->withSubscribers([$profileProjector, new SendEmailProcessor()]) + ->withMessageDecorators([new FooMessageDecorator()]); + $container = Factory::create($configuration); - $manager = new RunSubscriptionEngineRepositoryManager( - new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), - $store, - null, - null, - new FooMessageDecorator(), - ), - $engine, - ); + $manager = $container->get(RepositoryManager::class); + $engine = $container->get(SubscriptionEngine::class); $repository = $manager->get(Profile::class); - $schemaDirector = new DoctrineSchemaDirector( - $this->connection, - $store, - ); - + $schemaDirector = $container->get(SchemaDirector::class); $schemaDirector->create(); + $engine->setup(skipBooting: true); $profileId = ProfileId::generate(); @@ -129,43 +98,19 @@ public function testSuccessful(): void public function testSnapshot(): void { - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - DefaultHeadersSerializer::createFromPaths([ - __DIR__ . '/Header', - ]), - ); - $profileProjection = new ProfileProjector($this->connection); + $configuration = $this->getConfiguration() + ->withSubscribers([$profileProjection, new SendEmailProcessor()]) + ->withMessageDecorators([new FooMessageDecorator()]) + ->withSnapshotAdapters(['default' => new InMemorySnapshotAdapter()]); + $container = Factory::create($configuration); - $engine = new DefaultSubscriptionEngine( - $store, - new InMemorySubscriptionStore(), - new MetadataSubscriberAccessorRepository([ - $profileProjection, - new SendEmailProcessor(), - ]), - ); - - $manager = new RunSubscriptionEngineRepositoryManager( - new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), - $store, - null, - new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), - new FooMessageDecorator(), - ), - $engine, - ); + $manager = $container->get(RepositoryManager::class); + $engine = $container->get(SubscriptionEngine::class); + $schemaDirector = $container->get(SchemaDirector::class); $repository = $manager->get(Profile::class); - $schemaDirector = new DoctrineSchemaDirector( - $this->connection, - $store, - ); - $schemaDirector->create(); $engine->setup(skipBooting: true); @@ -195,29 +140,14 @@ public function testSnapshot(): void public function testTempProjection(): void { - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - DefaultHeadersSerializer::createFromPaths([ - __DIR__ . '/Header', - ]), - ); - - $manager = new DefaultRepositoryManager( - new AggregateRootRegistry(['profile' => Profile::class]), - $store, - null, - new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), - new FooMessageDecorator(), - ); + $configuration = $this->getConfiguration(); + $container = Factory::create($configuration); + $manager = $container->get(RepositoryManager::class); + $store = $container->get(Store::class); $repository = $manager->get(Profile::class); - $schemaDirector = new DoctrineSchemaDirector( - $this->connection, - $store, - ); - + $schemaDirector = $container->get(SchemaDirector::class); $schemaDirector->create(); $profileId = ProfileId::generate(); @@ -242,8 +172,7 @@ public function testTempProjection(): void ->reduce( new Pipe( $store->load(new Criteria( - new AggregateIdCriterion($profileId->toString()), - new AggregateNameCriterion('profile'), + new StreamCriterion(sprintf('profile-%s', $profileId->toString())), )), new UntilEventTranslator(new DateTimeImmutable()), ), @@ -254,54 +183,17 @@ public function testTempProjection(): void public function testCommandBus(): void { - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - DefaultHeadersSerializer::createFromPaths([ - __DIR__ . '/Header', - ]), - ); - - $aggregateRootRegistry = new AggregateRootRegistry(['profile_with_commands' => ProfileWithCommands::class]); - - $manager = new DefaultRepositoryManager( - new AggregateRootRegistry(['profile_with_commands' => ProfileWithCommands::class]), - $store, - null, - new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), - new FooMessageDecorator(), - ); - $profileProjection = new ProfileProjector($this->connection); + $configuration = $this->getConfiguration() + ->withSubscribers([$profileProjection, new SendEmailProcessor()]) + ->withParameter('env', 'test'); + $container = Factory::create($configuration); - $engine = new DefaultSubscriptionEngine( - $store, - new InMemorySubscriptionStore(), - new MetadataSubscriberAccessorRepository([ - $profileProjection, - new SendEmailProcessor(), - ]), - ); - - $manager = new RunSubscriptionEngineRepositoryManager( - $manager, - $engine, - ); - - $commandBus = SyncCommandBus::createForAggregateHandlers( - $aggregateRootRegistry, - $manager, - new ServiceLocator([ - ClockInterface::class => new SystemClock(), - 'env' => 'test', - ]), - ); - - $schemaDirector = new DoctrineSchemaDirector( - $this->connection, - $store, - ); + $manager = $container->get(RepositoryManager::class); + $engine = $container->get(SubscriptionEngine::class); + $commandBus = $container->get(CommandBus::class); + $schemaDirector = $container->get(SchemaDirector::class); $schemaDirector->create(); $engine->setup(skipBooting: true); @@ -332,55 +224,16 @@ public function testCommandBus(): void public function testQueryBus(): void { - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - DefaultHeadersSerializer::createFromPaths([ - __DIR__ . '/Header', - ]), - ); - - $aggregateRootRegistry = new AggregateRootRegistry(['profile_with_commands' => ProfileWithCommands::class]); - - $manager = new DefaultRepositoryManager( - new AggregateRootRegistry(['profile_with_commands' => ProfileWithCommands::class]), - $store, - null, - new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), - new FooMessageDecorator(), - ); - $profileProjection = new ProfileProjector($this->connection); + $configuration = $this->getConfiguration() + ->withSubscribers([$profileProjection]) + ->withParameter('env', 'test'); + $container = Factory::create($configuration); - $engine = new DefaultSubscriptionEngine( - $store, - new InMemorySubscriptionStore(), - new MetadataSubscriberAccessorRepository([ - $profileProjection, - new SendEmailProcessor(), - ]), - ); - - $manager = new RunSubscriptionEngineRepositoryManager( - $manager, - $engine, - ); - - $commandBus = SyncCommandBus::createForAggregateHandlers( - $aggregateRootRegistry, - $manager, - new ServiceLocator([ - ClockInterface::class => new SystemClock(), - 'env' => 'test', - ]), - ); - - $queryBus = new SyncQueryBus(new ServiceHandlerProvider([$profileProjection])); - - $schemaDirector = new DoctrineSchemaDirector( - $this->connection, - $store, - ); + $engine = $container->get(SubscriptionEngine::class); + $commandBus = $container->get(CommandBus::class); + $queryBus = $container->get(QueryBus::class); + $schemaDirector = $container->get(SchemaDirector::class); $schemaDirector->create(); $engine->setup(skipBooting: true); @@ -397,33 +250,12 @@ public function testQueryBus(): void public function testAggregateInitialization(): void { - $store = new DoctrineDbalStore( - $this->connection, - DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']), - DefaultHeadersSerializer::createFromPaths([ - __DIR__ . '/Header', - ]), - ); + $configuration = $this->getConfiguration(); + $container = Factory::create($configuration); - $aggregateRootRegistry = new AggregateRootRegistry(['stock' => Stock::class]); - - $manager = new DefaultRepositoryManager( - $aggregateRootRegistry, - $store, - null, - new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), - new FooMessageDecorator(), - ); - - $commandBus = SyncCommandBus::createForAggregateHandlers( - $aggregateRootRegistry, - $manager, - ); - - $schemaDirector = new DoctrineSchemaDirector( - $this->connection, - $store, - ); + $manager = $container->get(RepositoryManager::class); + $commandBus = $container->get(CommandBus::class); + $schemaDirector = $container->get(SchemaDirector::class); $schemaDirector->create(); @@ -440,4 +272,17 @@ public function testAggregateInitialization(): void self::assertSame(3, $stock->playhead()); self::assertSame(2, $stock->stockFor($productId)); } + + public function getConfiguration(): Configuration + { + return Configuration::createWithConnectionService($this->connection) + ->withDefaultSettings( + [__DIR__], + [__DIR__ . '/Events'], + ) + ->withHeaders(__DIR__ . '/Header') + ->withSubscriptionInMemoryStore() + ->withSubscriptionEngineThrowOnError() + ->withRunSubscriptionsAfterAggregateSave(); + } } diff --git a/tests/Integration/BasicImplementation/Events/NameChanged.php b/tests/Integration/BasicImplementation/Events/NameChanged.php index 32bc6f22..132d78a3 100644 --- a/tests/Integration/BasicImplementation/Events/NameChanged.php +++ b/tests/Integration/BasicImplementation/Events/NameChanged.php @@ -5,11 +5,13 @@ namespace Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events; use Patchlevel\EventSourcing\Attribute\Event; +use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\ProfileId; #[Event('profile.name_changed')] final class NameChanged { public function __construct( + public ProfileId $profileId, public string $name, ) { } diff --git a/tests/Integration/BasicImplementation/Profile.php b/tests/Integration/BasicImplementation/Profile.php index ae28159a..a4c5dde7 100644 --- a/tests/Integration/BasicImplementation/Profile.php +++ b/tests/Integration/BasicImplementation/Profile.php @@ -30,7 +30,7 @@ public static function create(ProfileId $id, string $name): self public function changeName(string $name): void { - $this->recordThat(new NameChanged($name)); + $this->recordThat(new NameChanged($this->id, $name)); } #[Apply(ProfileCreated::class)] diff --git a/tests/Integration/BasicImplementation/ProfileWithCommands.php b/tests/Integration/BasicImplementation/ProfileWithCommands.php index bf05247b..31a2d4e2 100644 --- a/tests/Integration/BasicImplementation/ProfileWithCommands.php +++ b/tests/Integration/BasicImplementation/ProfileWithCommands.php @@ -45,7 +45,7 @@ public function changeName( #[Inject('env')] string $env, ): void { - $this->recordThat(new NameChanged($command->name)); + $this->recordThat(new NameChanged($this->id, $command->name)); } #[Apply] diff --git a/tests/Integration/BasicImplementation/Projection/ProfileProjector.php b/tests/Integration/BasicImplementation/Projection/ProfileProjector.php index f83ca17f..164d9d1a 100644 --- a/tests/Integration/BasicImplementation/Projection/ProfileProjector.php +++ b/tests/Integration/BasicImplementation/Projection/ProfileProjector.php @@ -13,7 +13,6 @@ use Patchlevel\EventSourcing\Attribute\Teardown; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\NameChanged; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Events\ProfileCreated; -use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\ProfileId; use Patchlevel\EventSourcing\Tests\Integration\BasicImplementation\Query\QueryProfileName; #[Projector('profile-1')] @@ -54,12 +53,12 @@ public function handleProfileCreated(ProfileCreated $profileCreated): void } #[Subscribe(NameChanged::class)] - public function handleNameChanged(NameChanged $nameChanged, ProfileId $profileId): void + public function handleNameChanged(NameChanged $nameChanged): void { $this->connection->executeStatement( 'UPDATE projection_profile SET name = :name WHERE id = :id;', [ - 'id' => $profileId->toString(), + 'id' => $nameChanged->profileId->toString(), 'name' => $nameChanged->name, ], ); diff --git a/tests/Unit/Container/FactoryTest.php b/tests/Unit/Container/FactoryTest.php new file mode 100644 index 00000000..3c17f5d8 --- /dev/null +++ b/tests/Unit/Container/FactoryTest.php @@ -0,0 +1,520 @@ +get(Store::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(DefaultRepositoryManager::class, $container->get(RepositoryManager::class)); + self::assertInstanceOf(AttributeEventMetadataFactory::class, $container->get(EventMetadataFactory::class)); + self::assertInstanceOf(JsonEncoder::class, $container->get(Encoder::class)); + self::assertInstanceOf(AttributeMessageHeaderRegistryFactory::class, $container->get(MessageHeaderRegistryFactory::class)); + self::assertInstanceOf(AggregateRootMetadataAwareMetadataFactory::class, $container->get(AggregateRootMetadataFactory::class)); + self::assertInstanceOf(AttributeSubscriberMetadataFactory::class, $container->get(SubscriberMetadataFactory::class)); + self::assertInstanceOf(Connection::class, $container->get(Factory::CONNECTION_ID)); + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(EventRegistry::class, $container->get(EventRegistry::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(MessageHeaderRegistry::class, $container->get(MessageHeaderRegistry::class)); + self::assertInstanceOf(DefaultHeadersSerializer::class, $container->get(HeadersSerializer::class)); + self::assertInstanceOf(StackHydrator::class, $container->get(Hydrator::class)); + self::assertInstanceOf(SystemClock::class, $container->get(ClockInterface::class)); + self::assertInstanceOf(AggregateRootRegistry::class, $container->get(AggregateRootRegistry::class)); + self::assertInstanceOf(ShowCommand::class, $container->get(ShowCommand::class)); + self::assertInstanceOf(ShowAggregateCommand::class, $container->get(ShowAggregateCommand::class)); + self::assertInstanceOf(WatchCommand::class, $container->get(WatchCommand::class)); + self::assertInstanceOf(DebugCommand::class, $container->get(DebugCommand::class)); + self::assertInstanceOf(UpcasterChain::class, $container->get(Upcaster::class)); + self::assertInstanceOf(ChainMessageDecorator::class, $container->get(MessageDecorator::class)); + self::assertInstanceOf(SplitStreamDecorator::class, $container->get(SplitStreamDecorator::class)); + self::assertInstanceOf(SyncCommandBus::class, $container->get(CommandBus::class)); + self::assertInstanceOf(SyncQueryBus::class, $container->get(QueryBus::class)); + self::assertInstanceOf(StoreMessageLoader::class, $container->get(MessageLoader::class)); + self::assertFalse($container->has(ListenerProvider::class)); + self::assertFalse($container->has(Consumer::class)); + self::assertFalse($container->has(EventBus::class)); + self::assertFalse($container->has(SnapshotStore::class)); + self::assertFalse($container->has(RetryStrategyRepository::class)); + self::assertInstanceOf(DoctrineSubscriptionStore::class, $container->get(SubscriptionStore::class)); + self::assertInstanceOf(MetadataSubscriberAccessorRepository::class, $container->get(SubscriberAccessorRepository::class)); + self::assertInstanceOf(DefaultSubscriptionEngine::class, $container->get(SubscriptionEngine::class)); + self::assertInstanceOf(SubscriptionSetupCommand::class, $container->get(SubscriptionSetupCommand::class)); + self::assertInstanceOf(SubscriptionBootCommand::class, $container->get(SubscriptionBootCommand::class)); + self::assertInstanceOf(SubscriptionRunCommand::class, $container->get(SubscriptionRunCommand::class)); + self::assertInstanceOf(SubscriptionTeardownCommand::class, $container->get(SubscriptionTeardownCommand::class)); + self::assertInstanceOf(SubscriptionRemoveCommand::class, $container->get(SubscriptionRemoveCommand::class)); + self::assertInstanceOf(SubscriptionStatusCommand::class, $container->get(SubscriptionStatusCommand::class)); + self::assertInstanceOf(SubscriptionPauseCommand::class, $container->get(SubscriptionPauseCommand::class)); + self::assertInstanceOf(SubscriptionReactivateCommand::class, $container->get(SubscriptionReactivateCommand::class)); + self::assertFalse($container->has(CipherKeyFactory::class)); + self::assertFalse($container->has(CipherKeyStore::class)); + self::assertFalse($container->has(Cryptographer::class)); + } + + public function testCreateConnectionServiceContainer(): void + { + $connection = DriverManager::getConnection((new DsnParser())->parse('sqlite3:///:memory:')); + $configuration = Configuration::createWithConnectionService($connection); + $container = Factory::create($configuration); + + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(DefaultRepositoryManager::class, $container->get(RepositoryManager::class)); + self::assertInstanceOf(AttributeEventMetadataFactory::class, $container->get(EventMetadataFactory::class)); + self::assertInstanceOf(JsonEncoder::class, $container->get(Encoder::class)); + self::assertInstanceOf(AttributeMessageHeaderRegistryFactory::class, $container->get(MessageHeaderRegistryFactory::class)); + self::assertInstanceOf(AggregateRootMetadataAwareMetadataFactory::class, $container->get(AggregateRootMetadataFactory::class)); + self::assertInstanceOf(AttributeSubscriberMetadataFactory::class, $container->get(SubscriberMetadataFactory::class)); + self::assertInstanceOf(Connection::class, $container->get(Factory::CONNECTION_ID)); + self::assertSame($connection, $container->get(Factory::CONNECTION_ID)); + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(EventRegistry::class, $container->get(EventRegistry::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(MessageHeaderRegistry::class, $container->get(MessageHeaderRegistry::class)); + self::assertInstanceOf(DefaultHeadersSerializer::class, $container->get(HeadersSerializer::class)); + self::assertInstanceOf(StackHydrator::class, $container->get(Hydrator::class)); + self::assertInstanceOf(SystemClock::class, $container->get(ClockInterface::class)); + self::assertInstanceOf(AggregateRootRegistry::class, $container->get(AggregateRootRegistry::class)); + self::assertInstanceOf(ShowCommand::class, $container->get(ShowCommand::class)); + self::assertInstanceOf(ShowAggregateCommand::class, $container->get(ShowAggregateCommand::class)); + self::assertInstanceOf(WatchCommand::class, $container->get(WatchCommand::class)); + self::assertInstanceOf(DebugCommand::class, $container->get(DebugCommand::class)); + self::assertInstanceOf(UpcasterChain::class, $container->get(Upcaster::class)); + self::assertInstanceOf(ChainMessageDecorator::class, $container->get(MessageDecorator::class)); + self::assertInstanceOf(SplitStreamDecorator::class, $container->get(SplitStreamDecorator::class)); + self::assertInstanceOf(SyncCommandBus::class, $container->get(CommandBus::class)); + self::assertInstanceOf(SyncQueryBus::class, $container->get(QueryBus::class)); + self::assertInstanceOf(StoreMessageLoader::class, $container->get(MessageLoader::class)); + self::assertFalse($container->has(ListenerProvider::class)); + self::assertFalse($container->has(Consumer::class)); + self::assertFalse($container->has(EventBus::class)); + self::assertFalse($container->has(SnapshotStore::class)); + self::assertFalse($container->has(RetryStrategyRepository::class)); + self::assertInstanceOf(DoctrineSubscriptionStore::class, $container->get(SubscriptionStore::class)); + self::assertInstanceOf(MetadataSubscriberAccessorRepository::class, $container->get(SubscriberAccessorRepository::class)); + self::assertInstanceOf(DefaultSubscriptionEngine::class, $container->get(SubscriptionEngine::class)); + self::assertInstanceOf(SubscriptionSetupCommand::class, $container->get(SubscriptionSetupCommand::class)); + self::assertInstanceOf(SubscriptionBootCommand::class, $container->get(SubscriptionBootCommand::class)); + self::assertInstanceOf(SubscriptionRunCommand::class, $container->get(SubscriptionRunCommand::class)); + self::assertInstanceOf(SubscriptionTeardownCommand::class, $container->get(SubscriptionTeardownCommand::class)); + self::assertInstanceOf(SubscriptionRemoveCommand::class, $container->get(SubscriptionRemoveCommand::class)); + self::assertInstanceOf(SubscriptionStatusCommand::class, $container->get(SubscriptionStatusCommand::class)); + self::assertInstanceOf(SubscriptionPauseCommand::class, $container->get(SubscriptionPauseCommand::class)); + self::assertInstanceOf(SubscriptionReactivateCommand::class, $container->get(SubscriptionReactivateCommand::class)); + self::assertFalse($container->has(CipherKeyFactory::class)); + self::assertFalse($container->has(CipherKeyStore::class)); + self::assertFalse($container->has(Cryptographer::class)); + } + + public function testCreateWithDefaultSettings(): void + { + $configuration = Configuration::createWithConnectionUrl('sqlite3:///:memory:') + ->withDefaultSettings(); + $container = Factory::create($configuration); + + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(DefaultRepositoryManager::class, $container->get(RepositoryManager::class)); + self::assertInstanceOf(AttributeEventMetadataFactory::class, $container->get(EventMetadataFactory::class)); + self::assertInstanceOf(JsonEncoder::class, $container->get(Encoder::class)); + self::assertInstanceOf(AttributeMessageHeaderRegistryFactory::class, $container->get(MessageHeaderRegistryFactory::class)); + self::assertInstanceOf(AggregateRootMetadataAwareMetadataFactory::class, $container->get(AggregateRootMetadataFactory::class)); + self::assertInstanceOf(AttributeSubscriberMetadataFactory::class, $container->get(SubscriberMetadataFactory::class)); + self::assertInstanceOf(Connection::class, $container->get(Factory::CONNECTION_ID)); + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(EventRegistry::class, $container->get(EventRegistry::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(MessageHeaderRegistry::class, $container->get(MessageHeaderRegistry::class)); + self::assertInstanceOf(DefaultHeadersSerializer::class, $container->get(HeadersSerializer::class)); + self::assertInstanceOf(StackHydrator::class, $container->get(Hydrator::class)); + self::assertInstanceOf(SystemClock::class, $container->get(ClockInterface::class)); + self::assertInstanceOf(AggregateRootRegistry::class, $container->get(AggregateRootRegistry::class)); + self::assertInstanceOf(ShowCommand::class, $container->get(ShowCommand::class)); + self::assertInstanceOf(ShowAggregateCommand::class, $container->get(ShowAggregateCommand::class)); + self::assertInstanceOf(WatchCommand::class, $container->get(WatchCommand::class)); + self::assertInstanceOf(DebugCommand::class, $container->get(DebugCommand::class)); + self::assertInstanceOf(UpcasterChain::class, $container->get(Upcaster::class)); + self::assertInstanceOf(ChainMessageDecorator::class, $container->get(MessageDecorator::class)); + self::assertInstanceOf(SplitStreamDecorator::class, $container->get(SplitStreamDecorator::class)); + self::assertInstanceOf(InstantRetryCommandBus::class, $container->get(CommandBus::class)); + self::assertInstanceOf(SyncQueryBus::class, $container->get(QueryBus::class)); + self::assertInstanceOf(GapResolverStoreMessageLoader::class, $container->get(MessageLoader::class)); + self::assertFalse($container->has(ListenerProvider::class)); + self::assertFalse($container->has(Consumer::class)); + self::assertFalse($container->has(EventBus::class)); + self::assertFalse($container->has(SnapshotStore::class)); + self::assertFalse($container->has(RetryStrategyRepository::class)); + self::assertInstanceOf(DoctrineSubscriptionStore::class, $container->get(SubscriptionStore::class)); + self::assertInstanceOf(MetadataSubscriberAccessorRepository::class, $container->get(SubscriberAccessorRepository::class)); + self::assertInstanceOf(DefaultSubscriptionEngine::class, $container->get(SubscriptionEngine::class)); + self::assertInstanceOf(SubscriptionSetupCommand::class, $container->get(SubscriptionSetupCommand::class)); + self::assertInstanceOf(SubscriptionBootCommand::class, $container->get(SubscriptionBootCommand::class)); + self::assertInstanceOf(SubscriptionRunCommand::class, $container->get(SubscriptionRunCommand::class)); + self::assertInstanceOf(SubscriptionTeardownCommand::class, $container->get(SubscriptionTeardownCommand::class)); + self::assertInstanceOf(SubscriptionRemoveCommand::class, $container->get(SubscriptionRemoveCommand::class)); + self::assertInstanceOf(SubscriptionStatusCommand::class, $container->get(SubscriptionStatusCommand::class)); + self::assertInstanceOf(SubscriptionPauseCommand::class, $container->get(SubscriptionPauseCommand::class)); + self::assertInstanceOf(SubscriptionReactivateCommand::class, $container->get(SubscriptionReactivateCommand::class)); + self::assertInstanceOf(ExtensionDoctrineCipherKeyStore::class, $container->get(CipherKeyStore::class)); + self::assertInstanceOf(BaseCryptographer::class, $container->get(Cryptographer::class)); + } + + public function testCreateWithDefaultSettingsAndThrowOnError(): void + { + $configuration = Configuration::createWithConnectionUrl('sqlite3:///:memory:') + ->withDefaultSettings() + ->withSubscriptionEngineThrowOnError(); + $container = Factory::create($configuration); + + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(DefaultRepositoryManager::class, $container->get(RepositoryManager::class)); + self::assertInstanceOf(AttributeEventMetadataFactory::class, $container->get(EventMetadataFactory::class)); + self::assertInstanceOf(JsonEncoder::class, $container->get(Encoder::class)); + self::assertInstanceOf(AttributeMessageHeaderRegistryFactory::class, $container->get(MessageHeaderRegistryFactory::class)); + self::assertInstanceOf(AggregateRootMetadataAwareMetadataFactory::class, $container->get(AggregateRootMetadataFactory::class)); + self::assertInstanceOf(AttributeSubscriberMetadataFactory::class, $container->get(SubscriberMetadataFactory::class)); + self::assertInstanceOf(Connection::class, $container->get(Factory::CONNECTION_ID)); + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(EventRegistry::class, $container->get(EventRegistry::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(MessageHeaderRegistry::class, $container->get(MessageHeaderRegistry::class)); + self::assertInstanceOf(DefaultHeadersSerializer::class, $container->get(HeadersSerializer::class)); + self::assertInstanceOf(StackHydrator::class, $container->get(Hydrator::class)); + self::assertInstanceOf(SystemClock::class, $container->get(ClockInterface::class)); + self::assertInstanceOf(AggregateRootRegistry::class, $container->get(AggregateRootRegistry::class)); + self::assertInstanceOf(ShowCommand::class, $container->get(ShowCommand::class)); + self::assertInstanceOf(ShowAggregateCommand::class, $container->get(ShowAggregateCommand::class)); + self::assertInstanceOf(WatchCommand::class, $container->get(WatchCommand::class)); + self::assertInstanceOf(DebugCommand::class, $container->get(DebugCommand::class)); + self::assertInstanceOf(UpcasterChain::class, $container->get(Upcaster::class)); + self::assertInstanceOf(ChainMessageDecorator::class, $container->get(MessageDecorator::class)); + self::assertInstanceOf(SplitStreamDecorator::class, $container->get(SplitStreamDecorator::class)); + self::assertInstanceOf(InstantRetryCommandBus::class, $container->get(CommandBus::class)); + self::assertInstanceOf(SyncQueryBus::class, $container->get(QueryBus::class)); + self::assertInstanceOf(GapResolverStoreMessageLoader::class, $container->get(MessageLoader::class)); + self::assertFalse($container->has(ListenerProvider::class)); + self::assertFalse($container->has(Consumer::class)); + self::assertFalse($container->has(EventBus::class)); + self::assertFalse($container->has(SnapshotStore::class)); + self::assertFalse($container->has(RetryStrategyRepository::class)); + self::assertInstanceOf(DoctrineSubscriptionStore::class, $container->get(SubscriptionStore::class)); + self::assertInstanceOf(MetadataSubscriberAccessorRepository::class, $container->get(SubscriberAccessorRepository::class)); + self::assertInstanceOf(ThrowOnErrorSubscriptionEngine::class, $container->get(SubscriptionEngine::class)); + self::assertInstanceOf(SubscriptionSetupCommand::class, $container->get(SubscriptionSetupCommand::class)); + self::assertInstanceOf(SubscriptionBootCommand::class, $container->get(SubscriptionBootCommand::class)); + self::assertInstanceOf(SubscriptionRunCommand::class, $container->get(SubscriptionRunCommand::class)); + self::assertInstanceOf(SubscriptionTeardownCommand::class, $container->get(SubscriptionTeardownCommand::class)); + self::assertInstanceOf(SubscriptionRemoveCommand::class, $container->get(SubscriptionRemoveCommand::class)); + self::assertInstanceOf(SubscriptionStatusCommand::class, $container->get(SubscriptionStatusCommand::class)); + self::assertInstanceOf(SubscriptionPauseCommand::class, $container->get(SubscriptionPauseCommand::class)); + self::assertInstanceOf(SubscriptionReactivateCommand::class, $container->get(SubscriptionReactivateCommand::class)); + self::assertInstanceOf(ExtensionDoctrineCipherKeyStore::class, $container->get(CipherKeyStore::class)); + self::assertInstanceOf(BaseCryptographer::class, $container->get(Cryptographer::class)); + } + + public function testCreateWithDefaultSettingsAndCatchUpAndThrowOnError(): void + { + $configuration = Configuration::createWithConnectionUrl('sqlite3:///:memory:') + ->withDefaultSettings() + ->withSubscriptionEngineCatchUp() + ->withSubscriptionEngineThrowOnError(); + $container = Factory::create($configuration); + + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(DefaultRepositoryManager::class, $container->get(RepositoryManager::class)); + self::assertInstanceOf(AttributeEventMetadataFactory::class, $container->get(EventMetadataFactory::class)); + self::assertInstanceOf(JsonEncoder::class, $container->get(Encoder::class)); + self::assertInstanceOf(AttributeMessageHeaderRegistryFactory::class, $container->get(MessageHeaderRegistryFactory::class)); + self::assertInstanceOf(AggregateRootMetadataAwareMetadataFactory::class, $container->get(AggregateRootMetadataFactory::class)); + self::assertInstanceOf(AttributeSubscriberMetadataFactory::class, $container->get(SubscriberMetadataFactory::class)); + self::assertInstanceOf(Connection::class, $container->get(Factory::CONNECTION_ID)); + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(EventRegistry::class, $container->get(EventRegistry::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(MessageHeaderRegistry::class, $container->get(MessageHeaderRegistry::class)); + self::assertInstanceOf(DefaultHeadersSerializer::class, $container->get(HeadersSerializer::class)); + self::assertInstanceOf(StackHydrator::class, $container->get(Hydrator::class)); + self::assertInstanceOf(SystemClock::class, $container->get(ClockInterface::class)); + self::assertInstanceOf(AggregateRootRegistry::class, $container->get(AggregateRootRegistry::class)); + self::assertInstanceOf(ShowCommand::class, $container->get(ShowCommand::class)); + self::assertInstanceOf(ShowAggregateCommand::class, $container->get(ShowAggregateCommand::class)); + self::assertInstanceOf(WatchCommand::class, $container->get(WatchCommand::class)); + self::assertInstanceOf(DebugCommand::class, $container->get(DebugCommand::class)); + self::assertInstanceOf(UpcasterChain::class, $container->get(Upcaster::class)); + self::assertInstanceOf(ChainMessageDecorator::class, $container->get(MessageDecorator::class)); + self::assertInstanceOf(SplitStreamDecorator::class, $container->get(SplitStreamDecorator::class)); + self::assertInstanceOf(InstantRetryCommandBus::class, $container->get(CommandBus::class)); + self::assertInstanceOf(SyncQueryBus::class, $container->get(QueryBus::class)); + self::assertInstanceOf(GapResolverStoreMessageLoader::class, $container->get(MessageLoader::class)); + self::assertFalse($container->has(ListenerProvider::class)); + self::assertFalse($container->has(Consumer::class)); + self::assertFalse($container->has(EventBus::class)); + self::assertFalse($container->has(SnapshotStore::class)); + self::assertFalse($container->has(RetryStrategyRepository::class)); + self::assertInstanceOf(DoctrineSubscriptionStore::class, $container->get(SubscriptionStore::class)); + self::assertInstanceOf(MetadataSubscriberAccessorRepository::class, $container->get(SubscriberAccessorRepository::class)); + self::assertInstanceOf(CatchUpSubscriptionEngine::class, $container->get(SubscriptionEngine::class)); + self::assertInstanceOf(SubscriptionSetupCommand::class, $container->get(SubscriptionSetupCommand::class)); + self::assertInstanceOf(SubscriptionBootCommand::class, $container->get(SubscriptionBootCommand::class)); + self::assertInstanceOf(SubscriptionRunCommand::class, $container->get(SubscriptionRunCommand::class)); + self::assertInstanceOf(SubscriptionTeardownCommand::class, $container->get(SubscriptionTeardownCommand::class)); + self::assertInstanceOf(SubscriptionRemoveCommand::class, $container->get(SubscriptionRemoveCommand::class)); + self::assertInstanceOf(SubscriptionStatusCommand::class, $container->get(SubscriptionStatusCommand::class)); + self::assertInstanceOf(SubscriptionPauseCommand::class, $container->get(SubscriptionPauseCommand::class)); + self::assertInstanceOf(SubscriptionReactivateCommand::class, $container->get(SubscriptionReactivateCommand::class)); + self::assertInstanceOf(ExtensionDoctrineCipherKeyStore::class, $container->get(CipherKeyStore::class)); + self::assertInstanceOf(BaseCryptographer::class, $container->get(Cryptographer::class)); + } + + public function testCreateWithDefaultSettingsAndEventBus(): void + { + $configuration = Configuration::createWithConnectionUrl('sqlite3:///:memory:') + ->withDefaultSettings() + ->withEventBus(); + $container = Factory::create($configuration); + + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(DefaultRepositoryManager::class, $container->get(RepositoryManager::class)); + self::assertInstanceOf(AttributeEventMetadataFactory::class, $container->get(EventMetadataFactory::class)); + self::assertInstanceOf(JsonEncoder::class, $container->get(Encoder::class)); + self::assertInstanceOf(AttributeMessageHeaderRegistryFactory::class, $container->get(MessageHeaderRegistryFactory::class)); + self::assertInstanceOf(AggregateRootMetadataAwareMetadataFactory::class, $container->get(AggregateRootMetadataFactory::class)); + self::assertInstanceOf(AttributeSubscriberMetadataFactory::class, $container->get(SubscriberMetadataFactory::class)); + self::assertInstanceOf(Connection::class, $container->get(Factory::CONNECTION_ID)); + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(EventRegistry::class, $container->get(EventRegistry::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(MessageHeaderRegistry::class, $container->get(MessageHeaderRegistry::class)); + self::assertInstanceOf(DefaultHeadersSerializer::class, $container->get(HeadersSerializer::class)); + self::assertInstanceOf(StackHydrator::class, $container->get(Hydrator::class)); + self::assertInstanceOf(SystemClock::class, $container->get(ClockInterface::class)); + self::assertInstanceOf(AggregateRootRegistry::class, $container->get(AggregateRootRegistry::class)); + self::assertInstanceOf(ShowCommand::class, $container->get(ShowCommand::class)); + self::assertInstanceOf(ShowAggregateCommand::class, $container->get(ShowAggregateCommand::class)); + self::assertInstanceOf(WatchCommand::class, $container->get(WatchCommand::class)); + self::assertInstanceOf(DebugCommand::class, $container->get(DebugCommand::class)); + self::assertInstanceOf(UpcasterChain::class, $container->get(Upcaster::class)); + self::assertInstanceOf(ChainMessageDecorator::class, $container->get(MessageDecorator::class)); + self::assertInstanceOf(SplitStreamDecorator::class, $container->get(SplitStreamDecorator::class)); + self::assertInstanceOf(InstantRetryCommandBus::class, $container->get(CommandBus::class)); + self::assertInstanceOf(SyncQueryBus::class, $container->get(QueryBus::class)); + self::assertInstanceOf(GapResolverStoreMessageLoader::class, $container->get(MessageLoader::class)); + self::assertInstanceOf(AttributeListenerProvider::class, $container->get(ListenerProvider::class)); + self::assertInstanceOf(DefaultConsumer::class, $container->get(Consumer::class)); + self::assertInstanceOf(DefaultEventBus::class, $container->get(EventBus::class)); + self::assertFalse($container->has(SnapshotStore::class)); + self::assertFalse($container->has(RetryStrategyRepository::class)); + self::assertInstanceOf(DoctrineSubscriptionStore::class, $container->get(SubscriptionStore::class)); + self::assertInstanceOf(MetadataSubscriberAccessorRepository::class, $container->get(SubscriberAccessorRepository::class)); + self::assertInstanceOf(DefaultSubscriptionEngine::class, $container->get(SubscriptionEngine::class)); + self::assertInstanceOf(SubscriptionSetupCommand::class, $container->get(SubscriptionSetupCommand::class)); + self::assertInstanceOf(SubscriptionBootCommand::class, $container->get(SubscriptionBootCommand::class)); + self::assertInstanceOf(SubscriptionRunCommand::class, $container->get(SubscriptionRunCommand::class)); + self::assertInstanceOf(SubscriptionTeardownCommand::class, $container->get(SubscriptionTeardownCommand::class)); + self::assertInstanceOf(SubscriptionRemoveCommand::class, $container->get(SubscriptionRemoveCommand::class)); + self::assertInstanceOf(SubscriptionStatusCommand::class, $container->get(SubscriptionStatusCommand::class)); + self::assertInstanceOf(SubscriptionPauseCommand::class, $container->get(SubscriptionPauseCommand::class)); + self::assertInstanceOf(SubscriptionReactivateCommand::class, $container->get(SubscriptionReactivateCommand::class)); + self::assertInstanceOf(ExtensionDoctrineCipherKeyStore::class, $container->get(CipherKeyStore::class)); + self::assertInstanceOf(BaseCryptographer::class, $container->get(Cryptographer::class)); + } + + public function testCreateWithDefaultSettingsAndSnapshots(): void + { + $configuration = Configuration::createWithConnectionUrl('sqlite3:///:memory:') + ->withDefaultSettings() + ->withSnapshotAdapters(['default' => new InMemorySnapshotAdapter()]); + $container = Factory::create($configuration); + + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(DefaultRepositoryManager::class, $container->get(RepositoryManager::class)); + self::assertInstanceOf(AttributeEventMetadataFactory::class, $container->get(EventMetadataFactory::class)); + self::assertInstanceOf(JsonEncoder::class, $container->get(Encoder::class)); + self::assertInstanceOf(AttributeMessageHeaderRegistryFactory::class, $container->get(MessageHeaderRegistryFactory::class)); + self::assertInstanceOf(AggregateRootMetadataAwareMetadataFactory::class, $container->get(AggregateRootMetadataFactory::class)); + self::assertInstanceOf(AttributeSubscriberMetadataFactory::class, $container->get(SubscriberMetadataFactory::class)); + self::assertInstanceOf(Connection::class, $container->get(Factory::CONNECTION_ID)); + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(EventRegistry::class, $container->get(EventRegistry::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(MessageHeaderRegistry::class, $container->get(MessageHeaderRegistry::class)); + self::assertInstanceOf(DefaultHeadersSerializer::class, $container->get(HeadersSerializer::class)); + self::assertInstanceOf(StackHydrator::class, $container->get(Hydrator::class)); + self::assertInstanceOf(SystemClock::class, $container->get(ClockInterface::class)); + self::assertInstanceOf(AggregateRootRegistry::class, $container->get(AggregateRootRegistry::class)); + self::assertInstanceOf(ShowCommand::class, $container->get(ShowCommand::class)); + self::assertInstanceOf(ShowAggregateCommand::class, $container->get(ShowAggregateCommand::class)); + self::assertInstanceOf(WatchCommand::class, $container->get(WatchCommand::class)); + self::assertInstanceOf(DebugCommand::class, $container->get(DebugCommand::class)); + self::assertInstanceOf(UpcasterChain::class, $container->get(Upcaster::class)); + self::assertInstanceOf(ChainMessageDecorator::class, $container->get(MessageDecorator::class)); + self::assertInstanceOf(SplitStreamDecorator::class, $container->get(SplitStreamDecorator::class)); + self::assertInstanceOf(InstantRetryCommandBus::class, $container->get(CommandBus::class)); + self::assertInstanceOf(SyncQueryBus::class, $container->get(QueryBus::class)); + self::assertInstanceOf(GapResolverStoreMessageLoader::class, $container->get(MessageLoader::class)); + self::assertFalse($container->has(ListenerProvider::class)); + self::assertFalse($container->has(Consumer::class)); + self::assertFalse($container->has(EventBus::class)); + self::assertInstanceOf(DefaultSnapshotStore::class, $container->get(SnapshotStore::class)); + self::assertFalse($container->has(RetryStrategyRepository::class)); + self::assertInstanceOf(DoctrineSubscriptionStore::class, $container->get(SubscriptionStore::class)); + self::assertInstanceOf(MetadataSubscriberAccessorRepository::class, $container->get(SubscriberAccessorRepository::class)); + self::assertInstanceOf(DefaultSubscriptionEngine::class, $container->get(SubscriptionEngine::class)); + self::assertInstanceOf(SubscriptionSetupCommand::class, $container->get(SubscriptionSetupCommand::class)); + self::assertInstanceOf(SubscriptionBootCommand::class, $container->get(SubscriptionBootCommand::class)); + self::assertInstanceOf(SubscriptionRunCommand::class, $container->get(SubscriptionRunCommand::class)); + self::assertInstanceOf(SubscriptionTeardownCommand::class, $container->get(SubscriptionTeardownCommand::class)); + self::assertInstanceOf(SubscriptionRemoveCommand::class, $container->get(SubscriptionRemoveCommand::class)); + self::assertInstanceOf(SubscriptionStatusCommand::class, $container->get(SubscriptionStatusCommand::class)); + self::assertInstanceOf(SubscriptionPauseCommand::class, $container->get(SubscriptionPauseCommand::class)); + self::assertInstanceOf(SubscriptionReactivateCommand::class, $container->get(SubscriptionReactivateCommand::class)); + self::assertInstanceOf(ExtensionDoctrineCipherKeyStore::class, $container->get(CipherKeyStore::class)); + self::assertInstanceOf(BaseCryptographer::class, $container->get(Cryptographer::class)); + } + + public function testCreateWithDefaultSettingsAndRetryStrategy(): void + { + $configuration = Configuration::createWithConnectionUrl('sqlite3:///:memory:') + ->withDefaultSettings() + ->withSubscriptionRetryDefaults(); + $container = Factory::create($configuration); + + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(DefaultRepositoryManager::class, $container->get(RepositoryManager::class)); + self::assertInstanceOf(AttributeEventMetadataFactory::class, $container->get(EventMetadataFactory::class)); + self::assertInstanceOf(JsonEncoder::class, $container->get(Encoder::class)); + self::assertInstanceOf(AttributeMessageHeaderRegistryFactory::class, $container->get(MessageHeaderRegistryFactory::class)); + self::assertInstanceOf(AggregateRootMetadataAwareMetadataFactory::class, $container->get(AggregateRootMetadataFactory::class)); + self::assertInstanceOf(AttributeSubscriberMetadataFactory::class, $container->get(SubscriberMetadataFactory::class)); + self::assertInstanceOf(Connection::class, $container->get(Factory::CONNECTION_ID)); + self::assertInstanceOf(StreamDoctrineDbalStore::class, $container->get(Store::class)); + self::assertInstanceOf(EventRegistry::class, $container->get(EventRegistry::class)); + self::assertInstanceOf(DefaultEventSerializer::class, $container->get(EventSerializer::class)); + self::assertInstanceOf(MessageHeaderRegistry::class, $container->get(MessageHeaderRegistry::class)); + self::assertInstanceOf(DefaultHeadersSerializer::class, $container->get(HeadersSerializer::class)); + self::assertInstanceOf(StackHydrator::class, $container->get(Hydrator::class)); + self::assertInstanceOf(SystemClock::class, $container->get(ClockInterface::class)); + self::assertInstanceOf(AggregateRootRegistry::class, $container->get(AggregateRootRegistry::class)); + self::assertInstanceOf(ShowCommand::class, $container->get(ShowCommand::class)); + self::assertInstanceOf(ShowAggregateCommand::class, $container->get(ShowAggregateCommand::class)); + self::assertInstanceOf(WatchCommand::class, $container->get(WatchCommand::class)); + self::assertInstanceOf(DebugCommand::class, $container->get(DebugCommand::class)); + self::assertInstanceOf(UpcasterChain::class, $container->get(Upcaster::class)); + self::assertInstanceOf(ChainMessageDecorator::class, $container->get(MessageDecorator::class)); + self::assertInstanceOf(SplitStreamDecorator::class, $container->get(SplitStreamDecorator::class)); + self::assertInstanceOf(InstantRetryCommandBus::class, $container->get(CommandBus::class)); + self::assertInstanceOf(SyncQueryBus::class, $container->get(QueryBus::class)); + self::assertInstanceOf(GapResolverStoreMessageLoader::class, $container->get(MessageLoader::class)); + self::assertFalse($container->has(ListenerProvider::class)); + self::assertFalse($container->has(Consumer::class)); + self::assertFalse($container->has(EventBus::class)); + self::assertFalse($container->has(SnapshotStore::class)); + self::assertInstanceOf(RetryStrategyRepository::class, $container->get(RetryStrategyRepository::class)); + self::assertInstanceOf(DoctrineSubscriptionStore::class, $container->get(SubscriptionStore::class)); + self::assertInstanceOf(MetadataSubscriberAccessorRepository::class, $container->get(SubscriberAccessorRepository::class)); + self::assertInstanceOf(DefaultSubscriptionEngine::class, $container->get(SubscriptionEngine::class)); + self::assertInstanceOf(SubscriptionSetupCommand::class, $container->get(SubscriptionSetupCommand::class)); + self::assertInstanceOf(SubscriptionBootCommand::class, $container->get(SubscriptionBootCommand::class)); + self::assertInstanceOf(SubscriptionRunCommand::class, $container->get(SubscriptionRunCommand::class)); + self::assertInstanceOf(SubscriptionTeardownCommand::class, $container->get(SubscriptionTeardownCommand::class)); + self::assertInstanceOf(SubscriptionRemoveCommand::class, $container->get(SubscriptionRemoveCommand::class)); + self::assertInstanceOf(SubscriptionStatusCommand::class, $container->get(SubscriptionStatusCommand::class)); + self::assertInstanceOf(SubscriptionPauseCommand::class, $container->get(SubscriptionPauseCommand::class)); + self::assertInstanceOf(SubscriptionReactivateCommand::class, $container->get(SubscriptionReactivateCommand::class)); + self::assertInstanceOf(ExtensionDoctrineCipherKeyStore::class, $container->get(CipherKeyStore::class)); + self::assertInstanceOf(BaseCryptographer::class, $container->get(Cryptographer::class)); + } +} diff --git a/tests/Unit/Container/Fixture/Discovery/Decorator/ResolvedDependencyDecorator.php b/tests/Unit/Container/Fixture/Discovery/Decorator/ResolvedDependencyDecorator.php new file mode 100644 index 00000000..d97e732a --- /dev/null +++ b/tests/Unit/Container/Fixture/Discovery/Decorator/ResolvedDependencyDecorator.php @@ -0,0 +1,23 @@ +withHeader(new DiscoveryHeader($this->dependency->tag)); + } +} diff --git a/tests/Unit/Container/Fixture/Discovery/Support/DiscoveryHeader.php b/tests/Unit/Container/Fixture/Discovery/Support/DiscoveryHeader.php new file mode 100644 index 00000000..7cda3288 --- /dev/null +++ b/tests/Unit/Container/Fixture/Discovery/Support/DiscoveryHeader.php @@ -0,0 +1,13 @@ +replaceEventName($upcast->eventName . '_' . $this->dependency->tag); + } +} diff --git a/tests/Unit/Container/Fixture/Discovery/UpcasterMissing/MissingDependencyUpcaster.php b/tests/Unit/Container/Fixture/Discovery/UpcasterMissing/MissingDependencyUpcaster.php new file mode 100644 index 00000000..97d74a52 --- /dev/null +++ b/tests/Unit/Container/Fixture/Discovery/UpcasterMissing/MissingDependencyUpcaster.php @@ -0,0 +1,22 @@ +replacePayloadByKey('missing_dependency', $this->dependency::class); + } +}