-
Notifications
You must be signed in to change notification settings - Fork 121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Subscriptions redesigned #665
Changes from 2 commits
be31df8
39bc7ad
8ff0e36
972be0a
b693e49
33f34cb
2c9e3aa
554cdbe
4f76f27
e18ceb3
c8dedab
29e62b4
acc8184
2bd6c6d
9aefcdf
636cafb
ff945e7
cec639c
b82767b
570590f
7140093
25dd989
4ba25e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
module RubyEventStore | ||
class GlobalSubscription | ||
def initialize(subscriber, store) | ||
@subscriber = subscriber | ||
@store = store | ||
@store.add(self) | ||
end | ||
|
||
def call(event) | ||
(Class === subscriber ? subscriber.new : subscriber).call(event) | ||
end | ||
|
||
def unsubscribe | ||
@store.delete(self) | ||
end | ||
|
||
def inspect | ||
<<~EOS.strip | ||
#<#{self.class}:0x#{__id__.to_s(16)}> | ||
- subscriber: #{subscriber.inspect} | ||
EOS | ||
end | ||
|
||
# Two subscriptions are equal if: | ||
# * they are of the same class | ||
# * have identical subscriber (verified with eql? method) | ||
# | ||
# @param other [GlobalSubscription, Object] object to compare | ||
# | ||
# Event equality ignores metadata! | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Event... copy paste? |
||
# @return [TrueClass, FalseClass] | ||
def ==(other) | ||
other.instance_of?(self.class) && | ||
other.subscriber.eql?(subscriber) | ||
end | ||
|
||
# @private | ||
BIG_VALUE = 0b101000101111010111101101010011000101000100000000011011011100110 | ||
|
||
# Generates a Fixnum hash value for this object. This function | ||
# have the property that a.eql?(b) implies a.hash == b.hash. | ||
# | ||
# The hash value is used along with eql? by the Hash class to | ||
# determine if two objects reference the same hash key. | ||
# | ||
# This hash is based on | ||
# * class | ||
# * subscriber object_id | ||
def hash | ||
# We don't use metadata because == does not use metadata | ||
[ | ||
self.class, | ||
subscriber.object_id, | ||
].hash ^ BIG_VALUE | ||
end | ||
|
||
attr_reader :subscriber | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
module RubyEventStore | ||
class Subscription | ||
def initialize(subscriber, event_types, store) | ||
@subscriber = subscriber | ||
@store = store | ||
@event_types = event_types | ||
event_types.each{ |type| @store.add(type, self) } | ||
end | ||
|
||
def call(event) | ||
(Class === subscriber ? subscriber.new : subscriber).call(event) | ||
end | ||
|
||
def unsubscribe | ||
event_types.each{ |type| @store.delete(type, self) } | ||
end | ||
|
||
def inspect | ||
<<~EOS.strip | ||
#<#{self.class}:0x#{__id__.to_s(16)}> | ||
- event types: #{event_types.inspect} | ||
- subscriber: #{subscriber.inspect} | ||
EOS | ||
end | ||
|
||
# Two subscriptions are equal if: | ||
# * they are of the same class | ||
# * have identical event types | ||
# * have identical subscriber (verified with eql? method) | ||
# | ||
# @param other [Subscription, Object] object to compare | ||
# | ||
# Event equality ignores metadata! | ||
# @return [TrueClass, FalseClass] | ||
def ==(other) | ||
other.instance_of?(self.class) && | ||
other.event_types.eql?(event_types) && | ||
other.subscriber.eql?(subscriber) | ||
end | ||
|
||
# @private | ||
BIG_VALUE = 0b11010000100100100101110000000010011110000110101011010001001110 | ||
|
||
# Generates a Fixnum hash value for this object. This function | ||
# have the property that a.eql?(b) implies a.hash == b.hash. | ||
# | ||
# The hash value is used along with eql? by the Hash class to | ||
# determine if two objects reference the same hash key. | ||
# | ||
# This hash is based on | ||
# * class | ||
# * event types | ||
# * subscriber objecy id | ||
def hash | ||
# We don't use metadata because == does not use metadata | ||
[ | ||
self.class, | ||
event_types, | ||
subscriber.object_id | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why |
||
].hash ^ BIG_VALUE | ||
end | ||
|
||
attr_reader :subscriber, :event_types | ||
end | ||
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,10 +4,10 @@ | |
|
||
module RubyEventStore | ||
class Subscriptions | ||
def initialize | ||
@local = LocalSubscriptions.new | ||
@global = GlobalSubscriptions.new | ||
@thread = ThreadSubscriptions.new | ||
def initialize(local_store: Store, global_store: GlobalStore) | ||
@local = LocalSubscriptions.new(local_store.new) | ||
@global = GlobalSubscriptions.new(global_store.new) | ||
@thread = ThreadSubscriptions.new(local_store, global_store) | ||
end | ||
|
||
def add_subscription(subscriber, event_types) | ||
|
@@ -33,77 +33,95 @@ def all_for(event_type) | |
private | ||
attr_reader :local, :global, :thread | ||
|
||
class ThreadSubscriptions | ||
class Store | ||
def initialize | ||
@local = ThreadLocalSubscriptions.new | ||
@global = ThreadGlobalSubscriptions.new | ||
end | ||
attr_reader :local, :global | ||
|
||
def all_for(event_type) | ||
[global, local].map{|r| r.all_for(event_type)}.reduce(&:+) | ||
@subscriptions = Hash.new {|hsh, key| hsh[key] = [] } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't intend to store same subscription multiple times, I'd rather see |
||
end | ||
end | ||
|
||
class LocalSubscriptions | ||
def initialize | ||
@subscriptions = Hash.new {|hsh, key| hsh[key] = [] } | ||
def add(type, subscription) | ||
@subscriptions[type.to_s] << subscription | ||
end | ||
|
||
def add(subscription, event_types) | ||
event_types.each{ |type| @subscriptions[type.to_s] << subscription } | ||
->() {event_types.each{ |type| @subscriptions.fetch(type.to_s).delete(subscription) } } | ||
def delete(type, subscription) | ||
@subscriptions.fetch(type.to_s).delete(subscription) | ||
end | ||
|
||
def all_for(event_type) | ||
@subscriptions[event_type] | ||
end | ||
|
||
def value | ||
self | ||
end | ||
end | ||
|
||
class GlobalSubscriptions | ||
class GlobalStore | ||
def initialize | ||
@subscriptions = [] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't it make more sense to have a |
||
end | ||
|
||
def add(subscription) | ||
@subscriptions << subscription | ||
->() { @subscriptions.delete(subscription) } | ||
end | ||
|
||
def all_for(_event_type) | ||
def delete(subscription) | ||
@subscriptions.delete(subscription) | ||
end | ||
|
||
def all | ||
@subscriptions | ||
end | ||
|
||
def value | ||
self | ||
end | ||
end | ||
|
||
class ThreadLocalSubscriptions | ||
def initialize | ||
@subscriptions = Concurrent::ThreadLocalVar.new do | ||
Hash.new {|hsh, key| hsh[key] = [] } | ||
end | ||
class ThreadSubscriptions | ||
def initialize(local_store, global_store) | ||
@local = LocalSubscriptions.new(build_store(local_store)) | ||
@global = GlobalSubscriptions.new(build_store(global_store)) | ||
end | ||
attr_reader :local, :global | ||
|
||
def all_for(event_type) | ||
[global, local].map{|r| r.all_for(event_type)}.reduce(&:+) | ||
end | ||
|
||
private | ||
|
||
def build_store(klass) | ||
var = Concurrent::ThreadLocalVar.new(klass.new) | ||
var.value = klass.new | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
var | ||
end | ||
end | ||
|
||
class LocalSubscriptions | ||
def initialize(store) | ||
@store = store | ||
end | ||
|
||
def add(subscription, event_types) | ||
event_types.each{ |type| @subscriptions.value[type.to_s] << subscription } | ||
->() {event_types.each{ |type| @subscriptions.value.fetch(type.to_s).delete(subscription) } } | ||
Subscription.new(subscription, event_types, @store.value) | ||
end | ||
|
||
def all_for(event_type) | ||
@subscriptions.value[event_type] | ||
@store.value.all_for(event_type) | ||
end | ||
end | ||
|
||
class ThreadGlobalSubscriptions | ||
def initialize | ||
@subscriptions = Concurrent::ThreadLocalVar.new([]) | ||
class GlobalSubscriptions | ||
def initialize(store) | ||
@store = store | ||
end | ||
|
||
def add(subscription) | ||
@subscriptions.value += [subscription] | ||
->() { @subscriptions.value -= [subscription] } | ||
GlobalSubscription.new(subscription, @store.value) | ||
end | ||
|
||
def all_for(_event_type) | ||
@subscriptions.value | ||
@store.value.all | ||
end | ||
end | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with
#call
aliased to#unsubscribe
this would not be a breaking change, but#call
is already taken...