Thursday, October 29, 2015

Adding Custom Metadata to Domain Events with Prooph Event Store

When you record events with the Prooph EventStore, only the payload (the event data recorded by your aggregate roots) is stored. If you use the EventStore together with the Prooph ServiceBus and enable the TransactionManager from the EventStoreBusBridge, some additional metadata is recorded as well: the "causation_id" and the "causation_name" (the id and the name of the command that triggered the event).

In a real-world application you might want to record additional metadata, such as who issued the command, their IP address, their user agent, and so on. This is especially useful when you want to know which commands "John Doe" sent to the system. The simplest way to achieve this is to use an EventStore plugin. Here is an example for Zend Framework 2 that uses the Zend\Authentication component to get the issuer and Zend\Http\PhpEnvironment\RemoteAddress to get the client's IP address.


namespace My\App\EventStore;

use ArrayIterator;
use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Plugin\Plugin;
use Prooph\EventStore\Stream\Stream;
use Zend\Authentication\AuthenticationServiceInterface;
use Zend\Http\PhpEnvironment\RemoteAddress;

/**
 * Class EventEnricher
 *
 * @package My\App\EventStore
 */
final class EventEnricher implements Plugin
{
    /**
     * @var AuthenticationServiceInterface
     */
    private $authenticationService;

    /**
     * @param AuthenticationServiceInterface $authenticationService
     */
    public function __construct(AuthenticationServiceInterface $authenticationService)
    {
        $this->authenticationService = $authenticationService;
    }

    /**
     * @param EventStore $eventStore
     */
    public function setUp(EventStore $eventStore)
    {
        $eventStore->getActionEventEmitter()->attachListener('create.pre', [$this, 'onEventStoreCreateStream'], -1000);
        $eventStore->getActionEventEmitter()->attachListener('appendTo.pre', [$this, 'onEventStoreAppendToStream'], -1000);
    }

    /**
     * This method takes domain events as argument which are going to be added to the event stream and
     * adds the issued_by (user id), ip_address and user_agent as metadata to each event.
     *
     * @param Iterator $recordedEvents
     * @return Iterator
     */
    private function handleRecordedEvents(Iterator $recordedEvents)
    {
        if ($this->authenticationService->hasIdentity()) {
            $issuer = $this->authenticationService->getIdentity()->getId();
        } else {
            $issuer = 'guest';
        }

        $clientIp = $this->findClientIp();
        $userAgent = isset($_SERVER['HTTP_USER_AGENT']) ? $_SERVER['HTTP_USER_AGENT'] : '';

        $enrichedRecordedEvents = [];

        foreach ($recordedEvents as $recordedEvent) {
            // Messages are immutable, so each call returns a modified copy
            $recordedEvent = $recordedEvent->withAddedMetadata('issued_by', $issuer);
            $recordedEvent = $recordedEvent->withAddedMetadata('ip_address', $clientIp);
            $recordedEvent = $recordedEvent->withAddedMetadata('user_agent', $userAgent);

            $enrichedRecordedEvents[] = $recordedEvent;
        }

        return new ArrayIterator($enrichedRecordedEvents);
    }

    /**
     * Add event metadata on event store createStream
     *
     * @param ActionEvent $createEvent
     */
    public function onEventStoreCreateStream(ActionEvent $createEvent)
    {
        $stream = $createEvent->getParam('stream');

        if (! $stream instanceof Stream) {
            return;
        }

        $streamEvents = $stream->streamEvents();
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $createEvent->setParam('stream', new Stream($stream->streamName(), $streamEvents));
    }

    /**
     * Add event metadata on event store appendToStream
     *
     * @param ActionEvent $appendToStreamEvent
     */
    public function onEventStoreAppendToStream(ActionEvent $appendToStreamEvent)
    {
        $streamEvents = $appendToStreamEvent->getParam('streamEvents');
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $appendToStreamEvent->setParam('streamEvents', $streamEvents);
    }

    /**
     * Find client ip if client was behind proxy
     *
     * @return string
     */
    private function findClientIp()
    {
        $clientIp = new RemoteAddress();
        $ip = $clientIp->getIpAddress();

        return $ip;
    }
}

This will add "issued_by", "ip_address" and "user_agent" to the metadata of all recorded events. Now you still need to enable the plugin. If you use the provided container-factories, you simply add the plugin there:


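return [
    'event_store' => [
        'plugins' => [
            // Service name of the plugin in your container
            // (assumed here to be the class name of the enricher shown above)
            \My\App\EventStore\EventEnricher::class,
        ],
    ],
];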
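
To make the effect of the enrichment more tangible, here is a minimal, self-contained sketch that runs without Prooph. SketchEvent, enrich() and eventsIssuedBy() are hypothetical stand-ins invented for this illustration; only the idea of an immutable withAddedMetadata() call is taken from the plugin above. It mirrors the enrichment loop and then answers the question of which events "John Doe" caused.

```php
<?php

// Hypothetical stand-in for a recorded domain event; this is NOT a Prooph class.
final class SketchEvent
{
    private $name;
    private $metadata;

    public function __construct($name, array $metadata = [])
    {
        $this->name = $name;
        $this->metadata = $metadata;
    }

    // Returns a modified copy, so the original event stays untouched.
    public function withAddedMetadata($key, $value)
    {
        $copy = clone $this;
        $copy->metadata[$key] = $value;

        return $copy;
    }

    public function name()
    {
        return $this->name;
    }

    public function metadata()
    {
        return $this->metadata;
    }
}

// Mirrors the enrichment loop of the plugin for a fixed issuer.
function enrich(Iterator $events, $issuer)
{
    $enriched = [];

    foreach ($events as $event) {
        $enriched[] = $event->withAddedMetadata('issued_by', $issuer);
    }

    return new ArrayIterator($enriched);
}

// Selects the events whose metadata names the given issuer.
function eventsIssuedBy(Iterator $events, $issuer)
{
    $matches = [];

    foreach ($events as $event) {
        $metadata = $event->metadata();

        if (isset($metadata['issued_by']) && $metadata['issued_by'] === $issuer) {
            $matches[] = $event;
        }
    }

    return $matches;
}

$original = new SketchEvent('UserRegistered');
$byJohn = enrich(new ArrayIterator([$original]), 'john-doe');
$byGuest = enrich(new ArrayIterator([new SketchEvent('CartFilled')]), 'guest');

$all = new ArrayIterator(array_merge(iterator_to_array($byJohn), iterator_to_array($byGuest)));
$johnsEvents = eventsIssuedBy($all, 'john-doe');
```

Because the events are copied rather than modified, the listener has to hand the enriched iterator back to the event store, which is what the setParam() calls in the plugin do.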

Keep in mind that the domain model is unaware of the domain event's metadata. Therefore, if your domain model needs to know the issuer, for example, you should write the enricher as a CommandBus plugin that manipulates the command. This way you can get the issuer from the command in your command handler within the domain. An example:


namespace My\App\EventStore;

use ArrayIterator;
use My\App\Commanding\Command;
use Iterator;
use Prooph\Common\Event\ActionEvent;
use Prooph\Common\Event\ActionEventEmitter;
use Prooph\Common\Event\ActionEventListenerAggregate;
use Prooph\Common\Event\DetachAggregateHandlers;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Stream\Stream;
use Prooph\ServiceBus\CommandBus;
use Zend\Authentication\AuthenticationServiceInterface;
use Zend\Http\PhpEnvironment\RemoteAddress;

/**
 * Class CommandAndEventEnricher
 *
 * @package My\App\EventStore
 */
final class CommandAndEventEnricher implements ActionEventListenerAggregate
{
    use DetachAggregateHandlers;

    /**
     * @var EventStore
     */
    private $eventStore;

    /**
     * @var AuthenticationServiceInterface
     */
    private $authenticationService;

    /**
     * @param EventStore $eventStore
     * @param AuthenticationServiceInterface $authenticationService
     */
    public function __construct(EventStore $eventStore, AuthenticationServiceInterface $authenticationService)
    {
        $this->eventStore = $eventStore;
        $this->authenticationService = $authenticationService;
        // The event store listeners are attached here, because this class is registered as a CommandBus plugin only
        $this->eventStore->getActionEventEmitter()->attachListener('create.pre', [$this, 'onEventStoreCreateStream'], -1000);
        $this->eventStore->getActionEventEmitter()->attachListener('appendTo.pre', [$this, 'onEventStoreAppendToStream'], -1000);
    }

    /**
     * @param ActionEventEmitter $emitter
     */
    public function attach(ActionEventEmitter $emitter)
    {
        //Attach with a low priority, so that a potential message translator has done its job already
        $this->trackHandler($emitter->attachListener(CommandBus::EVENT_INITIALIZE, [$this, 'onInitialize'], -1000));
    }

    /**
     * Add the issuer id to the command
     *
     * @param ActionEvent $actionEvent
     */
    public function onInitialize(ActionEvent $actionEvent)
    {
        $command = $actionEvent->getParam(CommandBus::EVENT_PARAM_MESSAGE);

        if (! $command instanceof Command) {
            return;
        }

        if ($this->authenticationService->hasIdentity()) {
            $issuer = $this->authenticationService->getIdentity()->getId();
        } else {
            $issuer = 'guest';
        }

        $command = $command->withIssuedBy($issuer);

        $actionEvent->setParam(CommandBus::EVENT_PARAM_MESSAGE, $command);
    }

    /**
     * This method takes domain events as argument which are going to be added to the event stream and
     * adds the issued_by (user id), ip_address and user_agent as metadata to each event.
     *
     * @param Iterator $recordedEvents
     * @return Iterator
     */
    private function handleRecordedEvents(Iterator $recordedEvents)
    {
        if ($this->authenticationService->hasIdentity()) {
            $issuer = $this->authenticationService->getIdentity()->getId();
        } else {
            $issuer = 'guest';
        }

        $clientIp = $this->findClientIp();
        $userAgent = isset($_SERVER['HTTP_USER_AGENT']) ? $_SERVER['HTTP_USER_AGENT'] : '';

        $enrichedRecordedEvents = [];

        foreach ($recordedEvents as $recordedEvent) {
            $recordedEvent = $recordedEvent->withAddedMetadata('issued_by', $issuer);
            $recordedEvent = $recordedEvent->withAddedMetadata('ip_address', $clientIp);
            $recordedEvent = $recordedEvent->withAddedMetadata('user_agent', $userAgent);

            $enrichedRecordedEvents[] = $recordedEvent;
        }

        return new ArrayIterator($enrichedRecordedEvents);
    }

    /**
     * Add event metadata on event store createStream
     *
     * @param ActionEvent $createEvent
     */
    public function onEventStoreCreateStream(ActionEvent $createEvent)
    {
        $stream = $createEvent->getParam('stream');

        if (! $stream instanceof Stream) {
            return;
        }

        $streamEvents = $stream->streamEvents();
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $createEvent->setParam('stream', new Stream($stream->streamName(), $streamEvents));
    }

    /**
     * Add event metadata on event store appendToStream
     *
     * @param ActionEvent $appendToStreamEvent
     */
    public function onEventStoreAppendToStream(ActionEvent $appendToStreamEvent)
    {
        $streamEvents = $appendToStreamEvent->getParam('streamEvents');
        $streamEvents = $this->handleRecordedEvents($streamEvents);

        $appendToStreamEvent->setParam('streamEvents', $streamEvents);
    }

    /**
     * Find client ip if client was behind proxy
     *
     * @return string
     */
    private function findClientIp()
    {
        $clientIp = new RemoteAddress();
        $ip = $clientIp->getIpAddress();

        return $ip;
    }
}

In this case you need to add the enricher as a CommandBus plugin instead of an EventStore plugin, of course:


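return [
    'service_bus' => [
        'command_bus' => [
            'plugins' => [
                // Service name of the plugin in your container
                // (assumed here to be the class name of the enricher shown above)
                \My\App\EventStore\CommandAndEventEnricher::class,
            ],
        ],
    ],
];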

You may have noticed that we need a special command class that knows the issuer. That is easy to achieve:


namespace My\App\Commanding;

use Assert\Assertion;
use Prooph\Common\Messaging\Command as ProophCommand;
use Prooph\Common\Messaging\PayloadConstructable;
use Prooph\Common\Messaging\PayloadTrait;

/**
 * Class Command
 *
 * @package My\App\Commanding
 */
abstract class Command extends ProophCommand implements PayloadConstructable
{
    use PayloadTrait;

    /**
     * Returns a new instance of the message with the given issuer
     *
     * @param string $issuer
     * @return Command
     */
    public function withIssuedBy($issuer)
    {
        Assertion::string($issuer);

        $messageData = $this->toArray();

        // Store the issuer in the metadata, because issuedBy() reads it from there
        $messageData['metadata']['issued_by'] = $issuer;

        return static::fromArray($messageData);
    }

    /**
     * @return string
     */
    public function issuedBy()
    {
        return $this->metadata['issued_by'];
    }
}

Now you only need to extend your commands from this base class.

Note: If you work with the Prooph EventStore, you don't need to extend the base classes from Prooph, like Command. You're free to implement them yourself according to your needs.
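
As a rough illustration of such a hand-written base class, here is a minimal sketch that does not depend on Prooph at all. SketchCommand, RegisterUser and handleRegisterUser() are hypothetical names invented for this example, and falling back to 'guest' when no issuer is set is an assumption of the sketch, not something the Prooph classes do. It shows the same immutable withIssuedBy() idea and how a command handler could read the issuer.

```php
<?php

// Hypothetical hand-written base class; it does NOT extend any Prooph class.
abstract class SketchCommand
{
    private $payload;
    private $metadata;

    final public function __construct(array $payload, array $metadata = [])
    {
        $this->payload = $payload;
        $this->metadata = $metadata;
    }

    // Returns a new instance of the same concrete command with the issuer set.
    public function withIssuedBy($issuer)
    {
        $metadata = $this->metadata;
        $metadata['issued_by'] = $issuer;

        return new static($this->payload, $metadata);
    }

    // Assumption of this sketch: commands without an issuer count as 'guest'.
    public function issuedBy()
    {
        return isset($this->metadata['issued_by']) ? $this->metadata['issued_by'] : 'guest';
    }

    public function payload()
    {
        return $this->payload;
    }
}

// A concrete command only adds accessors for its payload.
final class RegisterUser extends SketchCommand
{
    public function email()
    {
        $payload = $this->payload();

        return $payload['email'];
    }
}

// Hypothetical command handler that uses the issuer inside the domain.
function handleRegisterUser(RegisterUser $command)
{
    return sprintf('%s registered by %s', $command->email(), $command->issuedBy());
}

$command = new RegisterUser(['email' => 'jane@example.com']);
$enriched = $command->withIssuedBy('john-doe');
$result = handleRegisterUser($enriched);
```

If you go this route, your own classes still have to satisfy whatever message interface the bus and the plugins you use expect.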