Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriptions redesigned #665

Closed
wants to merge 23 commits into from
Closed

Conversation

mpraglowski
Copy link
Member

No description provided.

Introducing 2 new classes, Subscription & GlobalSubscription.
When new subscription is added an instance of one of tis classes will
be a result of add method instead of unsubscribe proc - this is breaking
change.

Unsubscribe now could be handled by calling on returned subscription
object an unsubscribe method.

Also dispatcher is affected here. Instead of subscriber (object or class)
it will receive as a first argument an subscription object. If
subscriber is a class a subscription object is now responsible for
instantialize new subscriber object (that have been responsibility of a
dispatcher).
Remove thread specific subscriptions, replace with more generic
LocalSubscriptions & GlobalSubscription with concurent subscription
store. Allow to pass local & global store classes to Subscriptions.
…re & thread subscription (temporary subscriptions) stores only.
@joelvh
Copy link
Contributor

joelvh commented Aug 16, 2019

I like this direction. What issues are you addressing with this?

@mpraglowski
Copy link
Member Author

I've started with instrumentation of subscriptions. In current codebase it would be easy to just add instrumentation when you add subscription but unsubscribe is not that easy. Later I've have other ideas, so current implementation aims at:

  • allow to instrument subscribe & unsubscribe
  • define Subscription object to encapsulate all operations on subscription (no more lambda to unsubscribe, just find your subscription & call unsubscribe method)
  • easy access to information about subscriptions
  • allow to provide your own subscription store (will allow to implement persisted subscriptions)

@joelvh
Copy link
Contributor

joelvh commented Aug 16, 2019

Ok thanks!

@joelvh
Copy link
Contributor

joelvh commented Aug 16, 2019

We created a class-based DSL to make it easy to create event handlers for read models. We have an async option that builds a Sidekiq worker class with the given proc. (It uses a naming convention for the Sidekiq worker process to resolve the proper worker class.)

I wonder if this new object model might make it possible to enable some async convention as well? The main challenge is to identify the correct proc between the web and worker processes.

Thoughts?

Example DSL we use:

class UserReadModelDenormalizer < EventHandler
  on Events::UserRegistered, async: true do |event|
    Mailers::RegistrationConfirmation.deliver(event.data[:email])
  end
end

This would create a class such as UserReadModelDenormalizer::Worker_HandleUserRegistration that gets enqueued and picked up by Sidekiq.

@joelvh
Copy link
Contributor

joelvh commented Aug 16, 2019

I should note, to make sure naming is consistent, I think we actually explicitly specify a name for the worker currently using async: 'HandleUserRegistration' which creates UserReadModelDenormalizer::Worker_HandleUserRegistration

@joelvh
Copy link
Contributor

joelvh commented Aug 16, 2019

cc @paneq @pawelpacana

@joelvh
Copy link
Contributor

joelvh commented Aug 16, 2019

@paneq @pawelpacana I saw something about "contrib" flying by a little while ago. I have developed various DSLs to organize things in our projects and wonder if some of that might be good to go in contrib? It's mostly base classes with DSLs similar to what I commented with here, to define handlers and also commands and some other things. Would that be a fit for contrib?

@joelvh
Copy link
Contributor

joelvh commented Aug 16, 2019

One more thought - would RailsEventStore ever use things from "contrib", or is contrib a superset of Rails?

@joelvh
Copy link
Contributor

joelvh commented Aug 16, 2019

Ah, I was thinking of #641

@mpraglowski
Copy link
Member Author

@joelvh

I saw something about "contrib" flying by a little while ago.

Contrib is free for all to experiment - not released yet, don't know if it should be as a single gem or each idea as a separate gem under common namespace - to be decided yet. Best ideas could be incorporated into RES.

I have developed various DSLs to organize things in our projects and wonder if some of that might be good to go in contrib?

Sure! It will be good to have it there. Fee free to create new folder and gem inside contrib - similar to my transformations gem https://github.com/RailsEventStore/rails_event_store/tree/master/contrib/transformations

FrozenError has been introduced in Ruby 2.5

def unsubscribe
@store.delete(self)
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with #call aliased to #unsubscribe this would not be a breaking change, but #call is already taken...

#
# @param other [GlobalSubscription, Object] object to compare
#
# Event equality ignores metadata!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event... copy paste?

[
self.class,
event_types,
subscriber.object_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why object_id?


class GlobalStore
def initialize
@subscriptions = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it make more sense to have a Set here?

@@ -33,6 +33,42 @@ def all_for(event_type)
private
attr_reader :local, :global, :thread

class Store
def initialize
@subscriptions = Hash.new {|hsh, key| hsh[key] = [] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't intend to store same subscription multiple times, I'd rather see Set here

@store.all
def build_store(klass)
var = Concurrent::ThreadLocalVar.new(klass.new)
var.value = klass.new
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is var.value= call needed right after constructing ThreadLocalVar?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

v = ThreadLocalVar.new(14)
v.value #=> 14
v.value = 2
v.value #=> 2

@@ -1,17 +1,17 @@
module RubyEventStore
class GlobalSubscription
def initialize(subscriber, store)
def initialize(subscriber, store: nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't yet see the purpose, checking next commits.

@subscriptions = Hash.new {|hsh, key| hsh[key] = [] }
end

def add(subscription, type = nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end

def add(subscription, type = nil)
@subscriptions[type.to_s] << subscription
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why to_s? Aren't we expecting Event#type after all?

@@ -4,22 +4,18 @@ def initialize
@subscriptions = Hash.new {|hsh, key| hsh[key] = [] }
end

def add(subscription, type = nil)
def add(subscription, type = GLOBAL_SUBSCRIPTION)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

I'd rather call it ANY_EVENT

@@ -1,6 +1,8 @@
module RubyEventStore
class Subscription
def initialize(subscriber, event_types = [GLOBAL_SUBSCRIPTION], store: nil)
raise SubscriberNotExist, "subscriber must exists" unless subscriber
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🇬🇧

@@ -15,6 +17,10 @@ def unsubscribe
event_types.each{ |type| @store.delete(self, type) } if @store
end

def global?
event_types.include?(GLOBAL_SUBSCRIPTION)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it always event_types == [GLOBAL_SUBSCRIPTION] or do we allow a Subscription to be both global and for some particular type?


specify do
expect(subscription_store.all).to eq []
expect(subscription_store.all_for(FirstEvent)).to eq []
Copy link
Member

@mostlyobvious mostlyobvious Sep 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly ok but since we allow different event instances I think I'd prefer to pass #type or at least having these shared examples run on different Event implementation

def first_type
  FirstEvent.new.type
end

def initialize(subscriber, event_types = [GLOBAL_SUBSCRIPTION], store: nil)
raise SubscriberNotExist, "subscriber must exists" unless subscriber
raise SubscriberNotExist, 'subscriber must exists' unless subscriber
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🇬🇧

@@ -41,7 +65,7 @@ def inspect
def ==(other)
other.instance_of?(self.class) &&
other.event_types.eql?(event_types) &&
other.subscriber.eql?(subscriber)
other.subscriber.equal?(subscriber)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure we want to check if it is the same instance in memory?

@mostlyobvious
Copy link
Member

Please consider #135

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants