Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: command bus #644

Draft
wants to merge 4 commits into
base: 3.6.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/Attribute/Handle.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;

#[Attribute(Attribute::TARGET_METHOD | Attribute::IS_REPEATABLE)]
final class Handle
{
/** @param class-string|null $commandClass */
public function __construct(
public readonly string|null $commandClass = null,
) {
}
}
18 changes: 18 additions & 0 deletions src/Attribute/HandledBy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Attribute;

use Attribute;
use Patchlevel\EventSourcing\Aggregate\AggregateRoot;

#[Attribute(Attribute::TARGET_CLASS)]
final class HandledBy
{
/** @param class-string<AggregateRoot> $aggregateClass */

Check failure on line 13 in src/Attribute/HandledBy.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Deptrac (locked, 8.3, ubuntu-latest)

Patchlevel\EventSourcing\Attribute\HandledBy must not depend on Patchlevel\EventSourcing\Aggregate\AggregateRoot (Attribute on Aggregate)
public function __construct(
public readonly string $aggregateClass,
) {
}
}
88 changes: 88 additions & 0 deletions src/CommandBus/AggregateHandlerProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\CommandBus;

use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Attribute\Handle;
use Patchlevel\EventSourcing\Attribute\HandledBy;
use Patchlevel\EventSourcing\CommandBus\Handler\DefaultHandlerFactory;
use ReflectionClass;
use ReflectionMethod;

final class AggregateHandlerProvider implements HandlerProvider
{
public function __construct(
private readonly DefaultHandlerFactory $handlerFactory,
) {
}

/**
* @param class-string $commandClass
*
* @throws HandlerNotFound
*/
public function handlerForCommand(string $commandClass): HandlerDescriptor
{
$aggregateClass = $this->aggregateClass($commandClass);

$reflectionClass = new ReflectionClass($aggregateClass);

foreach ($reflectionClass->getMethods() as $method) {
$attributes = $method->getAttributes(Handle::class);

if ($attributes === []) {
continue;
}

$handleClass = $this->handleClass($attributes[0]->newInstance(), $method);

if ($handleClass !== $commandClass) {
continue;
}

if ($method->isStatic()) {
return new HandlerDescriptor($this->handlerFactory->createHandler($aggregateClass, $method->getName()));
}

return new HandlerDescriptor($this->handlerFactory->updateHandler($aggregateClass, $method->getName()));
}

throw new HandlerNotFound($commandClass);
}

/**
* @param class-string $commandClass
*
* @return class-string<AggregateRoot>
*/
private function aggregateClass(string $commandClass): string
{
$reflectionClass = new ReflectionClass($commandClass);
$attributes = $reflectionClass->getAttributes(HandledBy::class);

if ($attributes === []) {
throw new HandlerNotFound($commandClass);
}

$handledBy = $attributes[0]->newInstance();

return $handledBy->aggregateClass;
}

private function handleClass(Handle $handle, ReflectionMethod $reflectionMethod): string|null

Check failure on line 74 in src/CommandBus/AggregateHandlerProvider.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedInferredReturnType

src/CommandBus/AggregateHandlerProvider.php:74:87: MixedInferredReturnType: Could not verify return type 'null|string' for Patchlevel\EventSourcing\CommandBus\AggregateHandlerProvider::handleClass (see https://psalm.dev/047)
{
if ($handle->commandClass !== null) {
return $handle->commandClass;
}

$parameters = $reflectionMethod->getParameters();

if ($parameters === []) {
return null;
}

return $parameters[0]->getType()->getName();

Check failure on line 86 in src/CommandBus/AggregateHandlerProvider.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedReturnStatement

src/CommandBus/AggregateHandlerProvider.php:86:16: MixedReturnStatement: Could not infer a return type (see https://psalm.dev/138)

Check failure on line 86 in src/CommandBus/AggregateHandlerProvider.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

PossiblyNullReference

src/CommandBus/AggregateHandlerProvider.php:86:43: PossiblyNullReference: Cannot call method getName on possibly null value (see https://psalm.dev/083)

Check failure on line 86 in src/CommandBus/AggregateHandlerProvider.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

UndefinedMethod

src/CommandBus/AggregateHandlerProvider.php:86:43: UndefinedMethod: Method ReflectionType::getName does not exist (see https://psalm.dev/022)

Check failure on line 86 in src/CommandBus/AggregateHandlerProvider.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Cannot call method getName() on ReflectionType|null.
}
}
10 changes: 10 additions & 0 deletions src/CommandBus/CommandBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\CommandBus;

interface CommandBus
{
public function dispatch(object $command): void;
}
71 changes: 71 additions & 0 deletions src/CommandBus/DefaultCommandBus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\CommandBus;

use Patchlevel\EventSourcing\CommandBus\Handler\DefaultHandlerFactory;
use Patchlevel\EventSourcing\Repository\RepositoryManager;
use Psr\Log\LoggerInterface;

use function array_shift;
use function sprintf;

final class DefaultCommandBus implements CommandBus
{
/** @var array<object> */
private array $queue;
private bool $processing;

public function __construct(
private readonly HandlerProvider $handlerProvider,
private readonly LoggerInterface|null $logger = null,
) {
$this->queue = [];
$this->processing = false;
}

public function dispatch(object $command): void
{
$this->logger?->debug(sprintf(
'CommandBus: Add message "%s" to queue.',
$command::class,
));

$this->queue[] = $command;

if ($this->processing) {
$this->logger?->debug('CommandBus: Is already processing, dont start new processing.');

return;
}

try {
$this->processing = true;

$this->logger?->debug('CommandBus: Start processing queue.');

while ($command = array_shift($this->queue)) {
$handler = $this->handlerProvider->handlerForCommand($command::class);

($handler->callable())($command);
}
} finally {
$this->processing = false;

$this->logger?->debug('CommandBus: Finished processing queue.');
}
}

public static function createDefault(
RepositoryManager $repositoryManager,
LoggerInterface|null $logger = null,
): self {
return new self(
new AggregateHandlerProvider(
new DefaultHandlerFactory($repositoryManager),
),
$logger,
);
}
}
18 changes: 18 additions & 0 deletions src/CommandBus/Handler/AggregateIdNotFound.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\CommandBus\Handler;

use RuntimeException;

use function sprintf;

final class AggregateIdNotFound extends RuntimeException
{
/** @param class-string $commandClass */
public function __construct(string $commandClass)
{
parent::__construct(sprintf('Missing `Id` Attribute in command %s', $commandClass));
}
}
28 changes: 28 additions & 0 deletions src/CommandBus/Handler/CreateAggregateHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\CommandBus\Handler;

use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Repository\RepositoryManager;

final class CreateAggregateHandler
{
/** @param class-string<AggregateRoot> $aggregateClass */
public function __construct(
private readonly RepositoryManager $repositoryManager,
private readonly string $aggregateClass,
private readonly string $methodName,
) {
}

public function __invoke(object $command): void
{
$repository = $this->repositoryManager->get($this->aggregateClass);

$aggregate = $this->aggregateClass::{$this->methodName}($command);

Check failure on line 24 in src/CommandBus/Handler/CreateAggregateHandler.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedAssignment

src/CommandBus/Handler/CreateAggregateHandler.php:24:9: MixedAssignment: Unable to determine the type that $aggregate is being assigned to (see https://psalm.dev/032)

$repository->save($aggregate);

Check failure on line 26 in src/CommandBus/Handler/CreateAggregateHandler.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedArgument

src/CommandBus/Handler/CreateAggregateHandler.php:26:27: MixedArgument: Argument 1 of Patchlevel\EventSourcing\Repository\Repository::save cannot be mixed, expecting Patchlevel\EventSourcing\Aggregate\AggregateRoot (see https://psalm.dev/030)
}
}
33 changes: 33 additions & 0 deletions src/CommandBus/Handler/DefaultHandlerFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\CommandBus\Handler;

use Patchlevel\EventSourcing\Repository\RepositoryManager;

final class DefaultHandlerFactory implements HandlerFactory
{
public function __construct(
private readonly RepositoryManager $repositoryManager,
) {
}

public function createHandler(string $aggregateClass, string $method): callable
{
return new CreateAggregateHandler(
$this->repositoryManager,
$aggregateClass,
$method,
);
}

public function updateHandler(string $aggregateClass, string $method): callable
{
return new UpdateAggregateHandler(
$this->repositoryManager,
$aggregateClass,
$method,
);
}
}
24 changes: 24 additions & 0 deletions src/CommandBus/Handler/HandlerFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\CommandBus\Handler;

use Patchlevel\EventSourcing\Aggregate\AggregateRoot;

interface HandlerFactory
{
/**
* @param class-string<AggregateRoot> $aggregateClass
*
* @return callable(object): void
*/
public function createHandler(string $aggregateClass, string $method): callable;

/**
* @param class-string<AggregateRoot> $aggregateClass
*
* @return callable(object): void
*/
public function updateHandler(string $aggregateClass, string $method): callable;
}
51 changes: 51 additions & 0 deletions src/CommandBus/Handler/UpdateAggregateHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\CommandBus\Handler;

use Patchlevel\EventSourcing\Aggregate\AggregateRoot;
use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\Attribute\Id;
use Patchlevel\EventSourcing\Repository\RepositoryManager;
use ReflectionClass;

final class UpdateAggregateHandler
{
/** @param class-string<AggregateRoot> $aggregateClass */
public function __construct(
private readonly RepositoryManager $repositoryManager,
private readonly string $aggregateClass,
private readonly string $methodName,
) {
}

public function __invoke(object $command): void
{
$aggregateRootId = $this->aggregateRootId($command);
$repository = $this->repositoryManager->get($this->aggregateClass);

$aggregate = $repository->load($aggregateRootId);

$aggregate->{$this->methodName}($command);

$repository->save($aggregate);
}

private function aggregateRootId(object $command): AggregateRootId

Check failure on line 35 in src/CommandBus/Handler/UpdateAggregateHandler.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedInferredReturnType

src/CommandBus/Handler/UpdateAggregateHandler.php:35:56: MixedInferredReturnType: Could not verify return type 'Patchlevel\EventSourcing\Aggregate\AggregateRootId' for Patchlevel\EventSourcing\CommandBus\Handler\UpdateAggregateHandler::aggregateRootId (see https://psalm.dev/047)
{
$reflectionClass = new ReflectionClass($command);

foreach ($reflectionClass->getProperties() as $property) {
$attributes = $property->getAttributes(Id::class);

if ($attributes === []) {
continue;
}

return $property->getValue($command);

Check failure on line 46 in src/CommandBus/Handler/UpdateAggregateHandler.php

View workflow job for this annotation

GitHub Actions / Static Analysis by Psalm (locked, 8.3, ubuntu-latest)

MixedReturnStatement

src/CommandBus/Handler/UpdateAggregateHandler.php:46:20: MixedReturnStatement: Could not infer a return type (see https://psalm.dev/138)

Check failure on line 46 in src/CommandBus/Handler/UpdateAggregateHandler.php

View workflow job for this annotation

GitHub Actions / Static Analysis by PHPStan (locked, 8.3, ubuntu-latest)

Method Patchlevel\EventSourcing\CommandBus\Handler\UpdateAggregateHandler::aggregateRootId() should return Patchlevel\EventSourcing\Aggregate\AggregateRootId but returns mixed.
}

throw new AggregateIdNotFound($command::class);
}
}
Loading
Loading