-
Notifications
You must be signed in to change notification settings - Fork 0
/
mailbox.rb
219 lines (171 loc) · 6.18 KB
/
mailbox.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# Author:: Joel Friedman and Patrick Farley
#
# This module is used to simplify concurrency
# in your application. JVM threads and JRetlang are
# used to provide Actor model style asynchronous
# message passing via method calls. Named channel based
# message passing is also supported via +register_channel+ and
# the <tt>:channel</tt> parameter on +mailslot+.
require 'rubygems'
require 'java'
require 'jretlang'
require File.dirname(__FILE__) + '/synchronized'
require File.dirname(__FILE__) + '/daemon_thread_factory'
module Mailbox
include Synchronized
# Register your jretlang channel as a named channel.
#
# Every +mailslot+ of this object's class that was declared with
# <tt>:channel => channel_name</tt> is subscribed to +channel+. Replyable
# mailslots are subscribed through the single-reply path, all others through
# a plain fiber subscription.
#
# channel_name:: the name used in the +mailslot+ declarations
# channel::      the channel object to subscribe those mailslots to
def register_channel(channel_name, channel)
  # The registry is only created once a channel mailslot has been declared,
  # so a class without any reads as nil here; treat that as "nothing to do".
  registry = self.class.__channel_registry__ || {}
  registry.select { |_slot, options| options[:channel] == channel_name }.each do |slot, options|
    if options[:replyable]
      __subscribe_with_single_reply__(channel, slot)
    else
      __subscribe__(channel, slot)
    end
  end
end
# Names the method on this object that receives Mailbox's trace messages
# (the "enqueued ..." / "dequeued ..." strings emitted around each
# fiber-based +mailslot+ call).
#
# method_name:: symbol of a method taking a single message argument
def verbose_output_to(method_name)
  @__verbose_target__ = method_name
end
# Disposes the fiber that backs this object's mailbox and returns whatever
# the fiber's own +dispose+ returns.
def dispose
  backing_fiber = __fiber__
  backing_fiber.dispose
end
class << self
  # Used to tell +Mailbox+ that all +mailslot+
  # methods should be run on the calling thread.
  #
  # <b>*** Intended for synchronous unit testing of concurrent apps***</b>
  attr_reader :synchronous, :raise_exceptions_immediately

  # Turns synchronous mode on or off.
  #
  # Leaving synchronous mode with any falsy value (+false+ or +nil+) also
  # clears +raise_exceptions_immediately+, because that flag may only be set
  # while synchronous mode is on.
  def synchronous=(value)
    @synchronous = value
    @raise_exceptions_immediately = false unless value
  end

  # When set, an exception raised inside a +mailslot+ is re-raised instead of
  # being handed to the slot's <tt>:exception</tt> handler.
  #
  # Raises Exception when asked to enable the flag outside synchronous mode.
  def raise_exceptions_immediately=(value)
    if value && !Mailbox.synchronous
      # Exception (not a StandardError subclass) is kept for backward compatibility.
      raise Exception, 'cannot set raise_exceptions_immediately when not in synchronous mode!'
    end
    @raise_exceptions_immediately = value
  end
end
# Current value of this object's queue counter: it is incremented when a
# fiber-based +mailslot+ call is enqueued and decremented when the fiber
# picks the call up.
def __queue_depth__
  counter = __queue_counter__
  counter.get
end
# Name given to this object's dedicated mailbox thread, built from the class
# name and object id. Computed once and cached.
def __thread_name__
  @__thread_name__ ||= [self.class.name, self.object_id, 'Mailbox'].join(' ')
end
private
# Include hook: gives the including class (+base+) the class-level DSL
# defined in Mailbox::ClassMethods (+mailslot+, +mailbox_thread_pool_size+, ...).
def self.included(base)
  base.extend Mailbox::ClassMethods
end
# Subscribes +method+ to +channel+ on this object's fiber: every message
# published on the channel is forwarded to +method+ with the same arguments.
def __subscribe__(channel, method)
  channel.subscribe_on_fiber(__fiber__) { |*payload| send(method, *payload) }
end
# Subscribes +method+ to a request/reply style +channel+ on this object's
# fiber. For each incoming message, +method+ is invoked without arguments and
# its return value is sent back through the message's +reply+.
def __subscribe_with_single_reply__(channel, method)
  channel.subscribe(__fiber__) do |request|
    answer = send(method)
    request.reply(answer)
  end
end
# Builds (but does not start) the fiber used in synchronous mode: a
# ThreadFiber driven by a SynchronousDisposingExecutor.
# NOTE(review): the trailing +true+ is presumably ThreadFiber's daemon flag -
# confirm against the JRetlang API.
def __synchronous_fiber__
  fiber_name = "#{self.class.name} #{self.object_id} Mailbox synchronous"
  JRL::Fibers::ThreadFiber.new(JRL::SynchronousDisposingExecutor.new, fiber_name, true)
end
# Builds (but does not start) the fiber used in normal, asynchronous mode.
# If the class configured a fiber factory (see +mailbox_thread_pool_size+),
# the fiber comes from that factory; otherwise the object gets its own
# ThreadFiber named after +__thread_name__+.
def __create_fiber__
  factory = self.class.__fiber_factory__
  if factory
    factory.create
  else
    JRL::Fibers::ThreadFiber.new(JRL::RunnableExecutorImpl.new, __thread_name__, true)
  end
end
# Creates the fiber for this object, starts it and returns it. The
# synchronous fiber is chosen only when +Mailbox.synchronous+ is exactly
# +true+; any other value yields the regular fiber.
def __started_fiber__
  fiber = if Mailbox.synchronous == true
            __synchronous_fiber__
          else
            __create_fiber__
          end
  fiber.start
  fiber
end
# Lazily creates and caches the per-object AtomicInteger behind
# +__queue_depth__+; wrapped mailslot methods increment it on enqueue and
# decrement it when the fiber dequeues the call.
# NOTE(review): +synchronized+ comes from the Synchronized mixin (not shown
# here); presumably it makes the method defined right after it run under a
# lock so that only one counter is ever created - confirm.
synchronized
def __queue_counter__
@__queue_counter__ ||= java.util.concurrent.atomic.AtomicInteger.new
end
# Returns this object's fiber, creating and starting it via
# +__started_fiber__+ on first use and caching it in @fiber afterwards.
# NOTE(review): +synchronized+ comes from the Synchronized mixin (not shown
# here); presumably it guards the lazy creation so that concurrent first
# calls do not start two fibers - confirm.
synchronized
def __fiber__
@fiber ||= __started_fiber__
end
module ClassMethods
include Synchronized::ClassMethods
# Used within +Mailbox+ module.
# Maps the name of each channel-based +mailslot+ method to a hash holding its
# <tt>:channel</tt> name and <tt>:replyable</tt> flag. The hash is only
# created when the first channel mailslot is set up, so the reader returns
# +nil+ before that.
attr_accessor :__channel_registry__
# Notifies Mailbox that the next method added
# will be a +mailslot+. Supported options:
#
# <tt>:channel</tt>::   name of a channel; the next method becomes a
#                       subscriber on that channel and is made private
#                       to discourage direct invocation.
# <tt>:replyable</tt>:: when truthy, a caller of a fiber-based mailslot waits
#                       for the method to run and receives its return value;
#                       a channel mailslot replies to each request message.
# <tt>:timeout</tt>::   maximum time in seconds a replyable caller waits
#                       (kept internally in milliseconds); omitted means
#                       wait without limit.
# <tt>:exception</tt>:: symbol of a method that handles any exception
#                       raised in the +mailslot+; it is passed the
#                       exception that was raised.
def mailslot(params={})
  seconds = params[:timeout]

  @next_channel_name = params[:channel]
  @replyable = params[:replyable]
  # -1 is the "no timeout" marker understood by the generated wrapper
  @timeout = if seconds.nil?
               -1
             else
               seconds * 1000
             end
  @exception = params[:exception]
  @mailslot = true
end
# Makes instances of this class obtain their fibers from a pool-backed
# factory instead of one dedicated thread each. The pool is a fixed thread
# pool of +count+ threads created through DaemonThreadFactory.
def mailbox_thread_pool_size(count)
  pool = JRL::Concurrent::Executors.new_fixed_thread_pool(count, DaemonThreadFactory.new)
  @__fiber_factory__ = JRL::Fibers::PoolFiberFactory.new(pool)
end
# Fiber factory configured through +mailbox_thread_pool_size+, or +nil+ when
# none was configured (instances then create their own thread fiber).
# The <tt>||= nil</tt> form initializes the ivar on first read, presumably to
# avoid Ruby's uninitialized-instance-variable warning.
def __fiber_factory__
@__fiber_factory__ ||= nil
end
private
# Ruby hook that fires after every method definition. When the definition
# was preceded by a +mailslot+ declaration, the pending flag is consumed and
# the new method is wired up either as a fiber-based mailslot (no channel
# given) or as a channel subscriber. In every case the hook is passed on to
# the next +method_added+ in the ancestor chain.
def method_added(method_name, &block)
  if __mailslot__ == true
    # Clear the flag first: setting up the slot defines further methods and
    # re-enters this hook.
    @mailslot = false
    if @next_channel_name.nil?
      __setup_on_fiber__(method_name, @replyable, @timeout)
    else
      __setup_on_channel__(method_name, @replyable)
    end
  end
  super
end
# Turns the freshly defined +method_name+ into a fiber-based mailslot.
#
# The original implementation is kept under the alias
# <tt>__method_name__</tt> and +method_name+ is redefined as a wrapper that
# enqueues a call to that alias on the object's fiber.
#
# method_name:: name of the method that was just defined
# replyable::   when truthy the wrapper blocks until the body has run and
#               returns its result; otherwise it returns without waiting
# timeout::     maximum wait in milliseconds for a replyable call, -1 to wait
#               without limit; on expiry the wrapper raises Exception
def __setup_on_fiber__(method_name, replyable, timeout)
  # Re-entrancy guard. No ancestor defines __setup_on_fiber__, so there is
  # nothing to hand over to; simply skip.
  return if __is_adding_mailbox_to_method__

  original_name = :"__#{method_name}__"
  alias_method original_name, method_name
  @is_adding_mailbox_to_method = true
  # Capture this slot's handler and clear it so it cannot leak into the next slot.
  exception_method, @exception = @exception, nil

  define_method method_name do |*args|
    self.send(@__verbose_target__, "enqueued #{method_name}") if defined? @__verbose_target__
    __queue_counter__.get_and_increment

    result = nil
    latch = JRL::Concurrent::CountDownLatch.new(1) if replyable

    __fiber__.execute do
      begin
        self.send(@__verbose_target__, "dequeued #{method_name}") if defined? @__verbose_target__
        __queue_counter__.get_and_decrement
        result = self.send(original_name, *args)
      rescue Exception => ex
        # Without a handler (or when tests ask for it) the error propagates.
        raise if exception_method.nil? || Mailbox.raise_exceptions_immediately
        self.send(:"#{exception_method}", ex)
      ensure
        # Always release a waiting caller, even when the body failed.
        latch.count_down if replyable
      end
    end

    if replyable
      answered = if timeout == -1
                   latch.await
                   true
                 else
                   latch.await(timeout, JRL::Concurrent::TimeUnit::MILLISECONDS)
                 end
      raise Exception, "#{method_name} message timeout after #{timeout/1000} seconds" unless answered
    end

    result
  end

  @replyable = false
  @is_adding_mailbox_to_method = false
end
# Records +method_name+ as a channel-based mailslot: the method is made
# private, its channel name and replyable flag are stored in the channel
# registry (created on first use), and the pending +mailslot+ channel state
# is cleared.
def __setup_on_channel__(method_name, replyable)
  private method_name

  slot_options = { :channel => @next_channel_name, :replyable => replyable }
  @__channel_registry__ ||= {}
  __channel_registry__.store(method_name, slot_options)

  @replyable = @next_channel_name = nil
end
# Whether a +mailslot+ declaration is pending, i.e. the next method added
# should be turned into a mailslot. Set by +mailslot+, cleared by
# +method_added+; reads as +false+ until first set.
def __mailslot__
@mailslot ||= false
end
# Whether +__setup_on_fiber__+ is currently in the middle of redefining a
# mailslot method (it sets the flag before +define_method+ and resets it
# afterwards); reads as +false+ until first set.
def __is_adding_mailbox_to_method__
@is_adding_mailbox_to_method ||= false
end
end
end