Eventsimple implements a simple deterministic event driven system using ActiveRecord and ActiveJob.
Use Eventsimple to:
- Add Event Sourcing to your ActiveRecord models.
- Implement Pub/Sub.
- Implement a transactional outbox.
- Store audit logs of changes to your ActiveRecord objects.
Eventsimple uses standard Rails features like Single Table Inheritance and Optimistic Locking. Async workflows are handled using ActiveJob.
Typical events in Eventsimple are ActiveRecord models that look like this:
<UserComponent::Events::Created
id: 1,
aggregate_id: 'user-123',
type: "Created",
data: {
name: "John doe",
email: "[email protected]",
},
created_at: 2022-01-01T00:00:00.000000,
updated_at: 2022-01-01T00:00:00.000000,
>
<UserComponent::Events::Deleted
id: 1,
aggregate_id: 'user-123',
type: "Deleted",
created_at: 2022-01-01T00:30:00.000000,
updated_at: 2022-01-01T00:30:00.000000,
>
Add the following line to your Gemfile and run bundle install
:
gem 'eventsimple'
The eventsimple UI allows you to view and navigate event history. Add the following line to your routes.rb:
mount Eventsimple::Engine => '/eventsimple'
Setup an initializer in config/initializers/eventsimple.rb
:
Eventsimple.configure do |config|
# Optional: Register your dispatch classes here.
# Dispatch classes are used to register reactors to events.
# Reactors are used to implement side effects.
# See the Reactors section below for more details.
config.dispatchers = []
# Optional: Entity updates use optimistic locking to enforce sequential updates.
# Set the max number of times to retry on concurrency failures.
# Defaults to 2
config.max_concurrency_retries = 2
# Optional: the metadata column is used to store optional metadata associated with the event.
# The default implemention enforces a typed constraint on the metadata column
# with the following two properties: `actor_id` and `reason`
# Use a custom metadata class to override this behaviour.
# Defaults to `Eventsimple::Metadata`
config.metadata_klass = 'Eventsimple::Metadata'
# Optional: When using an ActiveJob adapter that writes to a different data store like redis,
# it is possible that the reactor is executed before the transaction persisting the event is committed. This can result in noisy errors when using processors like Sidekiq.
# Enable this option to retry the reactor inline if the event is not found.
# Defaults to false.
config.retry_reactor_on_record_not_found = true
end
If using Sidekiq
as a backend to ActiveJob
for async reactors, please add this setting to
config/application.rb
:
config.active_job.queue_adapter = :sidekiq
The jobs are pushed into a queue named eventsimple
, so please add it to your
sidekiq.yml
as follows:
:queues:
- [default, 10]
- [eventsimple, 10]
Generate a migration and add Eventsimple
to an existing ActiveRecord model.
bundle exec rails generate eventsimple:event User
This will result in the following changes:
# ActiveRecord Classes
class User < ApplicationRecord
extend Eventsimple::Entity
event_driven_by UserEvent, aggregate_id: :id
end
class UserEvent < ApplicationRecord
extend Eventsimple::Event
drives_events_for User, events_namespace: 'UserComponent::Events', aggregate_id: :id
end
# Change aggregate_id to the column that represents the unique primary key for your model.
# Data migration
create_table :user_events do |t|
# Change this to string if your aggregates primary key is a string type
t.bigint :aggregate_id, null: false, index: true
t.string :idempotency_key, null: true
t.string :type, null: false
t.json :data, null: false, default: {}
t.json :metadata, null: false, default: {}
t.timestamps
t.index :idempotency_key, unique: true
t.index :created_at
end
add_column :users, :lock_version, :integer
Adding lock_version to the model enables optimistic locking and protects against concurrent updates to stale versions of the model. Eventsimple will automatically retry on concurrency failures.
events_namespace
is an optional argument pointing to the directory where your events classes are defined. If you do not specify this argument, Eventsimple will store the full namespace of the event classes in the STI column.
Column | Description |
---|---|
aggregate_id | Stores the primary key of the entity. |
idempotency_key | Optional value which can be used to write events that have uniqueness constraints. |
type | Used by rails to implement Single Table inheritance. Stores the event class name. |
data | Stores the event payload |
metadata | Stores optional metadata associated with the event |
An example event:
module UserComponent
module Events
class Created < UserEvent
# Optional: Rails by default will use JSON serialization for the data attribute. Use Eventsimple::DataType to serialize/deserialize the data attribute using the Message subclass below which uses dry-struct.
attribute :data, Eventsimple::DataType.new(self)
class Message < Eventsimple::Message
attribute :canonical_id, DryTypes::Strict::String
attribute :email, DryTypes::Strict::String
end
# Optional: Context specific validations that can be extended onto the model on event creation.
validates_with UserForm
# Optional: Implement state machine checks to determine if the event is allowed to be written.
# Will raise Eventsimple::InvalidTransition on failure.
def can_apply?(user)
user.new_record?
end
# Optional: Update the state of your model based on data in the event payload.
def apply(user)
user.canonical_id = data.canonical_id
user.email = data.email
end
end
end
end
Write an event:
user = User.new
UserComponent::Events::Created.create(
user: user,
data: { canonical_id: 'user-123', email: '[email protected]' },
metadata: { actor_id: 'user-123' } # optional metadata
)
if user.errors.any?
# render user errors
else
# render success
end
The Eventsimple::Message class is a subclass of Dry::Struct. Some common options you can use are:
class Message < Eventsimple::Message
# attribute key is required and can not be nil
attribute :canonical_id, DryTypes::Strict::String
# attribute key is required but can be nil
attribute :required_key, DryTypes::Strict::String.optional
# attribute key is not required and can also be nil
attribute? :optional_key, DryTypes::Strict::String.optional
# use default value if attribute key is missing or if value is nil
# Note this is not the typical behaviour for dry-struct and is a customization in the Eventsimple::Message class.
attribute :default_key, DryTypes::Strict::String.default('default')
end
Callback to events can be defined as reactors in the dispatcher class.
Reactors may be async
or sync
, depending on the usecase.
Sync reactors are executed within the context of the event transaction block. They should only contain business logic that make additional database writes.
This is because executing writes to other data stores, e.g API call or writes to kafka/sqs, will result in the transaction being non-deterministic.
Async reactors are executed via ActiveJob. Eventsimple implements checks to enforce reliable eventually consistent behaviour.
Use Async reactors to kick off async workflows or writes to external data sources as a side effect of model updates.
Reactor example:
# Register your dispatch classes in config/initializers/eventsimple.rb.
Eventsimple.configure do |config|
config.dispatchers = %w[
UserComponent::Dispatcher
]
end
# Register reactors in the dispatcher class.
class UserComponent::Dispatcher < Eventsimple::EventDispatcher
# one to one
on UserComponent::Events::Created,
async: UserComponent::Reactors::Created::SendNotification
# or many to many
on [
UserComponent::Events::Locked,
UserComponent::Events::Unlocked
], sync: [
UserComponent::Reactors::Locking::UpdateLockCounter,
UserComponent::Reactors::Locking::UpdateLockMetrics
]
end
# Reactor classes accept the event as the only argument in the constructor
# and must define a `call` method
module UserComponent::Reactors::Created < Eventsimple::Reactor
class SendNotification
def call(event)
user = event.aggregate
# do something
end
end
end
For many use cases, async reactors are sufficient to handle workflows like making an API call or publishing to a message broker. However as reactors use ActiveJob, order is not guaranteed. For use cases requiring order, eventsimple provides an simple ordered outbox implementation.
The current implementation leverages a single advisory lock to guarantee write order. This will impact write throughput on the model. On a db.rg6.large Aurora instance for example, write throughput to the table is ~300 events per second.
For an explaination of why an advisory lock is required: https://github.com/pawelpacana/account-basics
Generate migration to setup the outbox cursor table. This table is used to track cursor positions.
bundle exec rails g eventsimple:outbox:install
Create a consummer and processor class for the outbox.
require 'eventsimple/outbox/consumer'
module UserComponent
class Consumer
extend Eventsimple::Outbox::Consumer
identitfier 'UserComponent::Consumer'
consumes_event UserEvent
processor EventProcessor, concurrency: 5
end
end
module UserComponent
class EventProcessor
def call(event)
Rails.logger.info("PROCESSING EVENT: #{event.id}")
end
end
end
Create a rake task to run the consumer
namespace :consumers do
desc 'Starts the user event outbox consumer'
task :user_events do
UserComponent::Consumer.start
end
end
To set the cursor position to the latest event:
Eventsimple::Outbox::Cursor.set('UserComponent::Consumer', UserEvent.last.id)
Some convenience methods are provided to help with common use cases.
#enable_writes!
Write access on entities is disabled by default outside of writes via events. Use this method to enable writes on an entity.
user = User.find_by(canonical_id: 'user-123')
user.enable_writes! do
user.reproject
user.save!
end
If you are using FactoryBot, you can add the following in your rails_helper.rb to enable writes on the entity:
FactoryBot.define do
after(:build) { |model| model.enable_writes! if model.class.ancestors.include?(Eventsimple::Entity::InstanceMethods) }
end
#reproject(at: nil)
Reproject an entity from events (rebuilds in memory but does not persist the entity).
module UserComponent
module Events
class Created < UserEvent
# ...
def apply(user)
user.email = data.email
# Changes the projection to start tracking a sign up timestamp.
user.signed_up_at = self.created_at
end
end
end
end
user = User.find_by(canonical_id: 'user-123')
user.reproject
user.changes # => { sign_up_at: [nil, "2022-01-01 00:00:00 UTC"] }
user.save!
Or reproject the model to inspect what it looked like at a particular point in time.
user = User.find_by(canonical_id: 'user-123')
user.reproject(at: 1.day.ago)
user.changes
#projection_matches_events?
Verify that a reprojection of the model matches it's current state.
user = User.find_by(canonical_id: 'user-123')
user.update(name: 'something_else')
user.projection_matches_events? => false
.ignored_for_projection
Skip properties on a model that are not managed by the event driven system. This will prevent a reset of the value in case of a reprojection. Useful if the model that is being event driven has some properties that are managed through other mechanics.
id
and lock_version
columns are always ignored by default.
class User
self.ignored_for_projection = %i[last_sign_in_at]
end
You can add conditional validations to the model as usual. For example to verify an email:
class User
EMAIL_REGEX = /\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i
validates :email, presence: true, format: {
with: EMAIL_REGEX
}, if: :email_changed?
validate :allowed_emails, if: :email_changed?
def allowed_emails
return if EmailBlacklist.allowed?(email)
errors.add(:email, :invalid, value: email)
end
end
However, conditional validations tend to become more complex over time. An alternative approach can be to validate at the point when a handle is being updated.
Consider extending the model with a mixin, to apply the validation only when the email is actually being set.
module UpdateEmailForm
def self.extended(base)
base.class_eval do
EMAIL_REGEX = /\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i
validates :email, presence: true, format: {
with: EMAIL_REGEX
}
validate :allowed_emails, if: :email_changed?
def allowed_emails
return if EmailBlacklist.allowed?(email)
errors.add(:email, :invalid, value: email)
end
end
end
user = User.find_by(canonical_id: 'user-123').extend(UpdateEmailForm)
UserComponent::Events::EmailUpdated.create(user: user, data: { email: 'email' })
You can configure mixins in the event class itself, so that they are applied automatically at the point of event creating. The following example will extend the user with UpdateEmailForm on user create:
class UserComponent::Events::Created < UserEvent
...
validates_with UpdateEmailForm
...
end
New attributes should always be added as being either optional or required with a default value.
class UserComponent::Events::Created < Eventsimple::Message
attribute :new_attribute_1, DryTypes::Strict::String.default('default')
attribute? :new_attribute_2, DryTypes::Strict::String.optional
end
This guarantees compatibility with older events which do not contain this attribute. Old events will be loaded with the attribute being either nil or the new default.
To ensure old models are also in a consistent state, a data migration may be required to update the new attribute to the new default.
# migration file
add_column :users, :new_attribute_1, :string, default: 'new_default'
User.where(new_attribute_1: nil).find_in_batches do |batch|
batch.update_all(new_attribute_1: 'new_default')
end
Simply remove the attribute in code and any usage references. Any persisted data in old events will be ignored going forward, so a data migration is not explicitly needed.
However if this is something that is required, we can follow up code removal with a data migration like:
UserEvent.where(type: 'MyEventName').in_batches do |batch|
batch.update_all("data = data::jsonb - 'old_attribute_1' - 'old_attribute_2'")
end
- If an event and any properties it sets are no longer required, we can delete the Event, any code references and the model columns.
- The persisted events will be ignored going forward, so a data migration is not explicitly needed.
However if this is something that is required, we can follow up code removal with a data migration like:
# Remove all code references and then run the following migration:
UserEvent.where(type: 'MyEventName').in_batches do |batch|
batch.delete_all
end
The InvalidTransition error is raised when the can_apply?
method of an Event returns false
. In many cases this indicates a bug in the code, but in some cases it is expected behaviour.
An example scenario for not wanting to raise the error is when the can_apply?
method is primarily defending against redundant events from being written, perhaps when consuming messages from a message broker.
You can mute these errors by calling rescue_invalid_transition
on the event class. This will cause the event to be ignored and the model to remain unchanged. Optionally, you can pass a block to handle the error.
module FooComponent
module Events
class BarToTrue < FooEvent
rescue_invalid_transition do |error|
logger.info("Receive invalid transition error", error)
end
def can_apply?(foo)
!foo.bar
end
def apply(foo)
foo.bar = true
foo
end
end
end
end
Special credits to kickstarter and Eventide Project for much of the inspiration for this gem.