Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriptions redesigned #665

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
be31df8
Redefine subscriptions - breaking changes
mpraglowski Aug 9, 2019
39bc7ad
Refactor subscriptions
mpraglowski Aug 9, 2019
8ff0e36
Subscription storage is optional
mpraglowski Aug 9, 2019
972be0a
All dispatchers & schedulers receive now subscription object instead …
mpraglowski Aug 10, 2019
b693e49
Single subscriptions store is enough
mpraglowski Aug 16, 2019
33f34cb
Subscription global or assigned to specific type - it's just an imple…
mpraglowski Aug 16, 2019
2c9e3aa
Minor improvement & fixes
mpraglowski Aug 16, 2019
554cdbe
Docs & lint/specs for subscriptions store
mpraglowski Aug 16, 2019
4f76f27
Docs & specs for Subscription object
mpraglowski Aug 16, 2019
e18ceb3
No more local & global subscriptions, there are main subscription sto…
mpraglowski Aug 16, 2019
c8dedab
Do not need that anymore
mpraglowski Aug 16, 2019
29e62b4
Simplify subscriptions store API
mpraglowski Aug 16, 2019
acc8184
Must be excluded from mutation tests as test where it could be trigge…
mpraglowski Aug 16, 2019
2bd6c6d
Kill mutant
mpraglowski Aug 16, 2019
9aefcdf
Typo fix
mpraglowski Aug 16, 2019
636cafb
It does not make sense to require subscription store to be chainable
mpraglowski Aug 16, 2019
ff945e7
Fix subscriptions store lint namespace
mpraglowski Aug 16, 2019
cec639c
Add instrumented subscriptions store
mpraglowski Aug 16, 2019
b82767b
Use store factory for thread store instead of just a class
mpraglowski Aug 16, 2019
570590f
Again, keep Ruby 2.4 compatibility for now
mpraglowski Sep 6, 2019
7140093
Kill mutations
mpraglowski Sep 6, 2019
25dd989
Add frozen_string_literal: true in all changed files
mpraglowski Sep 6, 2019
4ba25e3
Document subscriptions provider class
mpraglowski Sep 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ruby_event_store/lib/ruby_event_store.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

require 'ruby_event_store/dispatcher'
require 'ruby_event_store/subscription'
require 'ruby_event_store/global_subscription'
require 'ruby_event_store/subscriptions'
require 'ruby_event_store/broker'
require 'ruby_event_store/in_memory_repository'
Expand Down
2 changes: 1 addition & 1 deletion ruby_event_store/lib/ruby_event_store/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def call
unsubs += add_thread_subscribers
@block.call
ensure
unsubs.each(&:call) if unsubs
unsubs.each(&:unsubscribe) if unsubs
end

private
Expand Down
5 changes: 2 additions & 3 deletions ruby_event_store/lib/ruby_event_store/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

module RubyEventStore
class Dispatcher
def call(subscriber, event, _)
subscriber = subscriber.new if Class === subscriber
subscriber.call(event)
def call(subscription, event, _)
subscription.call(event)
end

def verify(subscriber)
Expand Down
59 changes: 59 additions & 0 deletions ruby_event_store/lib/ruby_event_store/global_subscription.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
module RubyEventStore
class GlobalSubscription
def initialize(subscriber, store)
@subscriber = subscriber
@store = store
@store.add(self)
end

def call(event)
(Class === subscriber ? subscriber.new : subscriber).call(event)
end

def unsubscribe
@store.delete(self)
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with #call aliased to #unsubscribe this would not be a breaking change, but #call is already taken...


def inspect
<<~EOS.strip
#<#{self.class}:0x#{__id__.to_s(16)}>
- subscriber: #{subscriber.inspect}
EOS
end

# Two subscriptions are equal if:
# * they are of the same class
# * have identical subscriber (verified with eql? method)
#
# @param other [GlobalSubscription, Object] object to compare
#
# Event equality ignores metadata!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event... copy paste?

# @return [TrueClass, FalseClass]
def ==(other)
other.instance_of?(self.class) &&
other.subscriber.eql?(subscriber)
end

# @private
BIG_VALUE = 0b101000101111010111101101010011000101000100000000011011011100110

# Generates a Fixnum hash value for this object. This function
# have the property that a.eql?(b) implies a.hash == b.hash.
#
# The hash value is used along with eql? by the Hash class to
# determine if two objects reference the same hash key.
#
# This hash is based on
# * class
# * subscriber object_id
def hash
# We don't use metadata because == does not use metadata
[
self.class,
subscriber.object_id,
].hash ^ BIG_VALUE
end

attr_reader :subscriber
end
end
47 changes: 23 additions & 24 deletions ruby_event_store/lib/ruby_event_store/spec/subscriptions_lint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ def call(event)
subscriptions.add_subscription(handler, [Test1DomainEvent, Test3DomainEvent])
subscriptions.add_subscription(another_handler, [Test2DomainEvent])
subscriptions.add_global_subscription(global_handler)

expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler, global_handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([another_handler, global_handler])
expect(subscriptions.all_for('Test3DomainEvent')).to eq([handler, global_handler])
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler, global_handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([another_handler, global_handler])
expect(subscriptions.all_for('Test3DomainEvent').map(&:subscriber)).to eq([handler, global_handler])
end

it 'returns subscribed thread handlers' do
Expand All @@ -40,9 +39,9 @@ def call(event)
subscriptions.add_thread_subscription(another_handler, [Test2DomainEvent])
subscriptions.add_thread_global_subscription(global_handler)

expect(subscriptions.all_for('Test1DomainEvent')).to eq([global_handler, handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([global_handler, another_handler])
expect(subscriptions.all_for('Test3DomainEvent')).to eq([global_handler, handler])
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([global_handler, handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([global_handler, another_handler])
expect(subscriptions.all_for('Test3DomainEvent').map(&:subscriber)).to eq([global_handler, handler])
end

it 'returns lambda as an output of global subscribe methods' do
Expand All @@ -60,43 +59,43 @@ def call(event)
it 'revokes global subscription' do
handler = TestHandler.new

revoke = subscriptions.add_global_subscription(handler)
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke.()
subscription = subscriptions.add_global_subscription(handler)
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([handler])
subscription.unsubscribe
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
end

it 'revokes subscription' do
handler = TestHandler.new

revoke = subscriptions.add_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke.()
subscription = subscriptions.add_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([handler])
subscription.unsubscribe
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
end

it 'revokes thread global subscription' do
handler = TestHandler.new

revoke = subscriptions.add_thread_global_subscription(handler)
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke.()
subscription = subscriptions.add_thread_global_subscription(handler)
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([handler])
subscription.unsubscribe
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
end

it 'revokes thread subscription' do
handler = TestHandler.new

revoke = subscriptions.add_thread_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([handler])
revoke.()
subscription = subscriptions.add_thread_subscription(handler, [Test1DomainEvent, Test2DomainEvent])
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler])
expect(subscriptions.all_for('Test2DomainEvent').map(&:subscriber)).to eq([handler])
subscription.unsubscribe
expect(subscriptions.all_for('Test1DomainEvent')).to eq([])
expect(subscriptions.all_for('Test2DomainEvent')).to eq([])
end
Expand All @@ -106,6 +105,6 @@ def call(event)
subscriptions.add_subscription(handler, ["Test1DomainEvent"])
subscriptions.add_thread_subscription(handler, ["Test1DomainEvent"])

expect(subscriptions.all_for('Test1DomainEvent')).to eq([handler, handler])
expect(subscriptions.all_for('Test1DomainEvent').map(&:subscriber)).to eq([handler, handler])
end
end
65 changes: 65 additions & 0 deletions ruby_event_store/lib/ruby_event_store/subscription.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
module RubyEventStore
class Subscription
def initialize(subscriber, event_types, store)
@subscriber = subscriber
@store = store
@event_types = event_types
event_types.each{ |type| @store.add(type, self) }
end

def call(event)
(Class === subscriber ? subscriber.new : subscriber).call(event)
end

def unsubscribe
event_types.each{ |type| @store.delete(type, self) }
end

def inspect
<<~EOS.strip
#<#{self.class}:0x#{__id__.to_s(16)}>
- event types: #{event_types.inspect}
- subscriber: #{subscriber.inspect}
EOS
end

# Two subscriptions are equal if:
# * they are of the same class
# * have identical event types
# * have identical subscriber (verified with eql? method)
#
# @param other [Subscription, Object] object to compare
#
# Event equality ignores metadata!
# @return [TrueClass, FalseClass]
def ==(other)
other.instance_of?(self.class) &&
other.event_types.eql?(event_types) &&
other.subscriber.eql?(subscriber)
end

# @private
BIG_VALUE = 0b11010000100100100101110000000010011110000110101011010001001110

# Generates a Fixnum hash value for this object. This function
# have the property that a.eql?(b) implies a.hash == b.hash.
#
# The hash value is used along with eql? by the Hash class to
# determine if two objects reference the same hash key.
#
# This hash is based on
# * class
# * event types
# * subscriber objecy id
def hash
# We don't use metadata because == does not use metadata
[
self.class,
event_types,
subscriber.object_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why object_id?

].hash ^ BIG_VALUE
end

attr_reader :subscriber, :event_types
end
end
70 changes: 51 additions & 19 deletions ruby_event_store/lib/ruby_event_store/subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class Subscriptions
def initialize
@local = LocalSubscriptions.new
@global = GlobalSubscriptions.new
@thread = ThreadSubscriptions.new
@thread = ThreadSubscriptions.new
end

def add_subscription(subscriber, event_types)
Expand All @@ -33,6 +33,42 @@ def all_for(event_type)
private
attr_reader :local, :global, :thread

class Store
def initialize
@subscriptions = Hash.new {|hsh, key| hsh[key] = [] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't intend to store same subscription multiple times, I'd rather see Set here

end

def add(type, subscription)
@subscriptions[type.to_s] << subscription
end

def delete(type, subscription)
@subscriptions.fetch(type.to_s).delete(subscription)
end

def all_for(event_type)
@subscriptions[event_type]
end
end

class GlobalStore
def initialize
@subscriptions = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it make more sense to have a Set here?

end

def add(subscription)
@subscriptions << subscription
end

def delete(subscription)
@subscriptions.delete(subscription)
end

def all
@subscriptions
end
end

class ThreadSubscriptions
def initialize
@local = ThreadLocalSubscriptions.new
Expand All @@ -47,63 +83,59 @@ def all_for(event_type)

class LocalSubscriptions
def initialize
@subscriptions = Hash.new {|hsh, key| hsh[key] = [] }
@store = Store.new
end

def add(subscription, event_types)
event_types.each{ |type| @subscriptions[type.to_s] << subscription }
->() {event_types.each{ |type| @subscriptions.fetch(type.to_s).delete(subscription) } }
Subscription.new(subscription, event_types, @store)
end

def all_for(event_type)
@subscriptions[event_type]
@store.all_for(event_type)
end
end

class GlobalSubscriptions
def initialize
@subscriptions = []
@store = GlobalStore.new
end

def add(subscription)
@subscriptions << subscription
->() { @subscriptions.delete(subscription) }
GlobalSubscription.new(subscription, @store)
end

def all_for(_event_type)
@subscriptions
@store.all
end
end

class ThreadLocalSubscriptions
def initialize
@subscriptions = Concurrent::ThreadLocalVar.new do
Hash.new {|hsh, key| hsh[key] = [] }
end
@store = Concurrent::ThreadLocalVar.new(Store.new)
@store.value = Store.new
end

def add(subscription, event_types)
event_types.each{ |type| @subscriptions.value[type.to_s] << subscription }
->() {event_types.each{ |type| @subscriptions.value.fetch(type.to_s).delete(subscription) } }
Subscription.new(subscription, event_types, @store.value)
end

def all_for(event_type)
@subscriptions.value[event_type]
@store.value.all_for(event_type)
end
end

class ThreadGlobalSubscriptions
def initialize
@subscriptions = Concurrent::ThreadLocalVar.new([])
@store = Concurrent::ThreadLocalVar.new(GlobalStore.new)
@store.value = GlobalStore.new
end

def add(subscription)
@subscriptions.value += [subscription]
->() { @subscriptions.value -= [subscription] }
GlobalSubscription.new(subscription, @store.value)
end

def all_for(_event_type)
@subscriptions.value
@store.value.all
end
end
end
Expand Down
Loading