Support bulk enqueue with differing class and options #420

Open · wants to merge 3 commits into master
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -45,7 +45,7 @@ jobs:
run: |
sudo apt-get -yqq install libpq-dev postgresql-client
createdb que-test
gem install bundler
gem install bundler --version '~> 2.4.22'
bundle install --jobs 4 --retry 3
USE_RAILS=true bundle exec rake test
bundle exec rake test
2 changes: 1 addition & 1 deletion Dockerfile
@@ -5,7 +5,7 @@ RUN apt-get update \
&& apt-get install -y libpq-dev \
&& rm -rf /var/lib/apt/lists/*

ENV RUBY_BUNDLER_VERSION 2.3.7
ENV RUBY_BUNDLER_VERSION 2.4.22
RUN gem install bundler -v $RUBY_BUNDLER_VERSION

ENV BUNDLE_PATH /usr/local/bundle
4 changes: 1 addition & 3 deletions docs/README.md
@@ -849,13 +849,11 @@ Que.bulk_enqueue do
end
```

The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query.
The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query. `job_options` passed to `.bulk_enqueue` act as defaults for every job in the block; `job_options` passed to an individual `.enqueue` call take priority over those block-level defaults.

Limitations:

- ActiveJob is not supported
- All jobs must use the same job class
- All jobs must use the same `job_options` (`job_options` must be provided to `.bulk_enqueue` instead of `.enqueue`)
- The `que_attrs` of a job instance returned from `.enqueue` is empty (`{}`)
- The notify trigger is not run by default, so jobs will only be picked up by a worker upon its next poll
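
A minimal usage sketch of what this PR enables (the job classes, IDs, and priorities below are hypothetical): block-level `job_options` act as defaults, `job_options` passed to an individual `.enqueue` call override them, and jobs in the same block may use different classes.

```ruby
Que.bulk_enqueue(job_options: { priority: 20 }) do
  ProcessCreditCard.enqueue(123)                          # priority 20 (block default)
  SendReceipt.enqueue(456, job_options: { priority: 5 })  # priority 5 (per-call override)
end
```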

153 changes: 78 additions & 75 deletions lib/que/job.rb
@@ -29,21 +29,18 @@ class Job

SQL[:bulk_insert_jobs] =
%{
WITH args_and_kwargs as (
SELECT * from json_to_recordset(coalesce($5, '[{"args":[],"kwargs":{}}]')::json) as x(args jsonb, kwargs jsonb)
)
INSERT INTO public.que_jobs
(queue, priority, run_at, job_class, args, kwargs, data, job_schema_version)
SELECT
coalesce($1, 'default')::text,
coalesce($2, 100)::smallint,
coalesce($3, now())::timestamptz,
$4::text,
args_and_kwargs.args,
args_and_kwargs.kwargs,
coalesce($6, '{}')::jsonb,
coalesce(queue, 'default')::text,
coalesce(priority, 100)::smallint,
coalesce(run_at, now())::timestamptz,
job_class::text,
coalesce(args, '[]')::jsonb,
coalesce(kwargs, '{}')::jsonb,
coalesce(data, '{}')::jsonb,
#{Que.job_schema_version}
FROM args_and_kwargs
FROM json_populate_recordset(null::que_jobs, $1)
RETURNING *
}
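
A sketch (not part of the diff) of the payload the reworked `:bulk_insert_jobs` query receives as its single `$1` parameter; the job class names are hypothetical. Each hash becomes one `que_jobs` row via `json_populate_recordset`, and any omitted key arrives as NULL and falls back to the COALESCE defaults above.

```ruby
payload = Que.serialize_json([
  { job_class: "ProcessCreditCard", args: [123], kwargs: {} },
  { job_class: "SendReceipt", priority: 10, args: [], kwargs: { user_id: 456 } },
])
# A single statement inserts both rows and returns them via RETURNING *.
Que.execute(:bulk_insert_jobs, [payload])
```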

@@ -82,6 +79,9 @@ def enqueue(*args)

job_options = kwargs.delete(:job_options) || {}

job_class = job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job")

if job_options[:tags]
if job_options[:tags].length > MAXIMUM_TAGS_COUNT
raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})"
@@ -94,28 +94,40 @@
end
end

attrs = {
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
args: args,
kwargs: kwargs,
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
job_class: \
job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
}

if Thread.current[:que_jobs_to_bulk_insert]
# Don't resolve class settings during `.enqueue`; resolve them during
# `._bulk_enqueue_insert` instead, so that options passed to `.bulk_enqueue`
# can still override them.
attrs = {
queue: job_options[:queue],
priority: job_options[:priority],
run_at: job_options[:run_at],
job_class: job_class == 'Que::Job' ? nil : job_class,
args: args,
kwargs: kwargs,
data: job_options[:tags] && { tags: job_options[:tags] },
klass: self,
}

if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
raise Que::Error, "Que.bulk_enqueue does not support ActiveJob."
end

raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {}

Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs
new({})
elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
return new({})
end

attrs = {
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
job_class: job_class,
args: args,
kwargs: kwargs,
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
}

if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
attrs.merge!(
args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
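
To illustrate the deferred-resolution comment in this hunk: because class-level settings are only resolved inside `._bulk_enqueue_insert`, an option passed to `.bulk_enqueue` can still override a job class's own default. A sketch with a hypothetical job class:

```ruby
class NightlyReport < Que::Job
  self.priority = 50 # class-level default
end

Que.bulk_enqueue(job_options: { priority: 10 }) do
  NightlyReport.enqueue # inserted with priority 10, not the class default of 50
end
```
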
@@ -144,16 +156,13 @@ def bulk_enqueue(job_options: {}, notify: false)
jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]
job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
return [] if jobs_attrs.empty?
raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one?
args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) }
klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class])
klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify)
_bulk_enqueue_insert(jobs_attrs, job_options: job_options, notify: notify)
ensure
Thread.current[:que_jobs_to_bulk_insert] = nil
end

def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) }
def _bulk_enqueue_insert(jobs_attrs, job_options: {}, notify: false)
raise 'Unexpected bulk args format' if !jobs_attrs.is_a?(Array) || !jobs_attrs.all? { |a| a.is_a?(Hash) }

if job_options[:tags]
if job_options[:tags].length > MAXIMUM_TAGS_COUNT
@@ -167,49 +176,43 @@ def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
end
end

args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs|
args_and_kwargs.merge(
args: args_and_kwargs.fetch(:args, []),
kwargs: args_and_kwargs.fetch(:kwargs, {}),
)
end

attrs = {
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
args_and_kwargs_array: args_and_kwargs_array,
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
job_class: \
job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
}

if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array)))
args_and_kwargs_array.map do |args_and_kwargs|
_run_attrs(
attrs.merge(
args: args_and_kwargs.fetch(:args),
kwargs: args_and_kwargs.fetch(:kwargs),
),
jobs_attrs = jobs_attrs.map do |attrs|
klass = attrs[:klass] || self

attrs = {
queue: attrs[:queue] || job_options[:queue] || klass.resolve_que_setting(:queue) || Que.default_queue,
priority: attrs[:priority] || job_options[:priority] || klass.resolve_que_setting(:priority),
run_at: attrs[:run_at] || job_options[:run_at] || klass.resolve_que_setting(:run_at),
job_class: attrs[:job_class] || job_options[:job_class] || klass.name,
args: attrs[:args] || [],
kwargs: attrs[:kwargs] || {},
data: attrs[:data] || (job_options[:tags] ? { tags: job_options[:tags] } : {}),
klass: klass
}

if attrs[:run_at].nil? && klass.resolve_que_setting(:run_synchronously)
klass._run_attrs(
attrs.reject { |k| k == :klass }.merge(
args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
data: Que.deserialize_json(Que.serialize_json(attrs[:data])),
)
)
nil
else
attrs
end
else
attrs.merge!(
args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]),
data: Que.serialize_json(attrs[:data]),
)
values_array =
Que.transaction do
Que.execute('SET LOCAL que.skip_notify TO true') unless notify
Que.execute(
:bulk_insert_jobs,
attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data),
)
end
values_array.map(&method(:new))
end
end.compact

values_array =
Que.transaction do
Que.execute('SET LOCAL que.skip_notify TO true') unless notify
Que.execute(
:bulk_insert_jobs,
[Que.serialize_json(jobs_attrs.map { |attrs| attrs.reject { |k| k == :klass } })]
)
end
values_array.zip(jobs_attrs).map { |values, attrs| attrs.fetch(:klass).new(values) }
end

def run(*args)
@@ -237,7 +240,7 @@ def resolve_que_setting(setting, *args)
end
end

private
protected

def _run_attrs(attrs)
attrs[:error_count] = 0