Class: Chore::Worker
Overview
Worker is one of the core classes in Chore. It is responsible for most
of the logic involved in actually processing a job. A given worker takes
a batch of work and processes it until either the worker is told to stop
or the work is completed. Worker is completely agnostic to the
WorkerStrategy that called it.
Instance Attribute Summary
- #options ⇒ Object
  Returns the value of attribute options.
- #started_at ⇒ Object (readonly)
  Returns the value of attribute started_at.
- #work ⇒ Object (readonly)
  Returns the value of attribute work.
Class Method Summary
- .start(work, opts = {}) ⇒ Object
  Creates a Worker for the given work and options, then starts it.
Instance Method Summary
- #duplicate_work?(item) ⇒ Boolean
- #expired? ⇒ Boolean
  Whether this worker has existed for longer than it's allowed to.
- #expires_at ⇒ Object
  The time at which this worker expires.
- #initialize(work = [], opts = {}) ⇒ Worker (constructor)
  Create a Worker.
- #start ⇒ Object
  The workhorse.
- #stop! ⇒ Object
  Tell the worker to stop after it completes the current job.
Methods included from Util
Constructor Details
#initialize(work = [], opts = {}) ⇒ Worker
Create a Worker. Give it an array of work (or a single item) and opts. Currently, the only option supported by Worker is :payload_handler, which contains helpers for decoding the item and finding the correct payload class. It defaults to Chore.config.payload_handler.
    # File 'lib/chore/worker.rb', line 26
    def initialize(work=[],opts={})
      @stopping = false
      @started_at = Time.now
      @work = work
      @work = [work] unless work.kind_of?(Array)
      self.options = {:payload_handler => Chore.config.payload_handler}.merge(opts)
    end
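The normalization performed by the constructor can be illustrated without loading Chore. The following is a minimal sketch, not the library's code: SketchWorker and DEFAULT_HANDLER are hypothetical names, and DEFAULT_HANDLER merely stands in for Chore.config.payload_handler.

```ruby
# Hypothetical stand-in for Chore.config.payload_handler.
DEFAULT_HANDLER = :default_handler

# Minimal sketch of the constructor's behavior; not the Chore class itself.
class SketchWorker
  attr_accessor :options
  attr_reader :work, :started_at

  def initialize(work = [], opts = {})
    @stopping = false
    @started_at = Time.now
    # A single item is wrapped so that @work is always an Array.
    @work = work.kind_of?(Array) ? work : [work]
    # Caller-supplied options override the default payload handler.
    self.options = { :payload_handler => DEFAULT_HANDLER }.merge(opts)
  end
end

single = SketchWorker.new(:job_a)
custom = SketchWorker.new([:job_a, :job_b], { :payload_handler => :custom_handler })
```

In this sketch, single.work is a one-element array, and custom.options carries the caller's handler instead of the default.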
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.
    # File 'lib/chore/worker.rb', line 15
    def options
      @options
    end
#started_at ⇒ Object (readonly)
Returns the value of attribute started_at.
    # File 'lib/chore/worker.rb', line 17
    def started_at
      @started_at
    end
#work ⇒ Object (readonly)
Returns the value of attribute work.
    # File 'lib/chore/worker.rb', line 16
    def work
      @work
    end
Class Method Details
.start(work, opts = {}) ⇒ Object
Convenience method that creates a Worker for the given work and options and immediately calls #start on it.
    # File 'lib/chore/worker.rb', line 19
    def self.start(work, opts={}) #:nodoc:
      self.new(work, opts).start
    end
Instance Method Details
#duplicate_work?(item) ⇒ Boolean
Whether the item duplicates a job that has already been seen. Only job classes that define a dedupe lambda are checked. A duplicate is completed on its consumer, which removes the message from the queue without running it or reporting an error.
    # File 'lib/chore/worker.rb', line 45
    def duplicate_work?(item)
      # if we've got a duplicate, remove the message from the queue by not actually running and also not reporting any errors
      payload = options[:payload_handler].payload(item.)

      # if we're hitting the custom dedupe key, we want to remove this message from the queue
      if item.klass.has_dedupe_lambda?
        dedupe_key = item.klass.dedupe_key(*payload)
        if dedupe_key.nil? || dedupe_key.strip.empty? # if the dedupe key is nil, don't continue with the rest of the dedupe lambda logic
          Chore.logger.info { "#{item.klass} dedupe key nil, skipping memcached lookup." }
          return false
        end

        if item.consumer.(dedupe_key, item.klass, item.queue_timeout)
          Chore.logger.info { "Found and deleted duplicate job #{item.klass}"}
          item.consumer.complete(item.id, item.receipt_handle)
          return true
        end
      end

      return false
    end
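The key handling above (blank keys skip the lookup, repeated keys count as duplicates) can be sketched in isolation. This is an illustration under stated assumptions rather than Chore's mechanism: SketchDedupe is a hypothetical class, and an in-memory Set replaces the consumer-side lookup that the listing delegates to.

```ruby
require "set"

# Hypothetical in-memory deduper; the listing above asks the consumer instead.
class SketchDedupe
  def initialize
    @seen = Set.new
  end

  # Returns true only when a usable key has been seen before.
  def duplicate?(key)
    # A nil or blank key is never treated as a duplicate.
    return false if key.nil? || key.strip.empty?
    # Set#add? returns nil when the key is already present.
    @seen.add?(key).nil?
  end
end

dedupe = SketchDedupe.new
dedupe.duplicate?("order-42")  # first sighting
dedupe.duplicate?("order-42")  # second sighting of the same key
```

The first call records the key and reports no duplicate; the second call reports one. A blank key is never recorded, so it can never match later.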
#expired? ⇒ Boolean
Whether this worker has existed for longer than it's allowed to.
    # File 'lib/chore/worker.rb', line 35
    def expired?
      Time.now > expires_at
    end
#expires_at ⇒ Object
The time at which this worker expires: started_at plus the sum of queue_timeout over every item of work.
    # File 'lib/chore/worker.rb', line 40
    def expires_at
      total_timeout = @work.inject(0) {|sum, item| sum + item.queue_timeout}
      @started_at + total_timeout
    end
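The expiry arithmetic is easy to check by hand. The sketch below reproduces it with fixed timestamps; SketchItem, sketch_expires_at, and sketch_expired? are hypothetical names, and the timeouts are assumed to be in seconds.

```ruby
# Hypothetical stand-in for a unit of work; only queue_timeout matters here.
SketchItem = Struct.new(:queue_timeout)

# The deadline is the start time plus the sum of the per-item timeouts.
def sketch_expires_at(started_at, work)
  started_at + work.sum(&:queue_timeout)
end

# A worker counts as expired only once the deadline has strictly passed.
def sketch_expired?(started_at, work, now = Time.now)
  now > sketch_expires_at(started_at, work)
end

started = Time.at(0)
work = [SketchItem.new(30), SketchItem.new(45)]
deadline = sketch_expires_at(started, work)  # 75 seconds after the start
```

Because the comparison is strict, a worker checked at exactly the deadline is not yet expired.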
#start ⇒ Object
The workhorse. Do the work, all of it. This blocks for an unspecified amount of time that depends on the work to be performed. This will:
- Decode each message.
- Reify the messages into actual Job classes.
- Call Job#perform on each job.
- If successful, it will call Consumer#complete (using the consumer in the UnitOfWork).
- If unsuccessful, it will call the appropriate hooks based on the type of failure.
- If unsuccessful and the job has reached the maximum number of attempts, it will call the permanent failure hooks and Consumer#complete.
- Log the results via Chore.logger.
    # File 'lib/chore/worker.rb', line 77
    def start
      @work.each do |item|
        return if @stopping
        begin
          item. = options[:payload_handler].decode(item.)
          item.klass = options[:payload_handler].payload_class(item.)

          next if duplicate_work?(item)

          Chore.run_hooks_for(:worker_to_start, item)
          start_item(item)
        rescue => e
          Chore.logger.error { "Failed to run job for #{item.} with error: #{e.} #{e.backtrace * "\n"}" }
          if item.current_attempt >= Chore.config.max_attempts
            Chore.run_hooks_for(:on_permanent_failure,item.queue_name,item.,e)
            item.consumer.complete(item.id, item.receipt_handle)
          else
            Chore.run_hooks_for(:on_failure,item.,e)
            item.consumer.reject(item.id)
          end
        end
      end
    end
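The failure branch in the listing above comes down to one comparison against the maximum number of attempts. The following sketch isolates that decision; SketchUnit and sketch_process are hypothetical names, hooks and logging are omitted, and the returned symbols only label which consumer call the real worker would make.

```ruby
# Hypothetical stand-in for a unit of work.
SketchUnit = Struct.new(:id, :current_attempt)

# Runs the given block for each item and records the outcome per item.
def sketch_process(items, max_attempts)
  items.map do |item|
    begin
      yield item
      [item.id, :completed]            # success: the message is completed
    rescue StandardError
      if item.current_attempt >= max_attempts
        [item.id, :permanent_failure]  # out of attempts: completed anyway
      else
        [item.id, :rejected]           # attempts remain: handed back for retry
      end
    end
  end
end

units = [SketchUnit.new(1, 1), SketchUnit.new(2, 1), SketchUnit.new(3, 5)]
outcomes = sketch_process(units, 5) { |unit| raise "boom" unless unit.id == 1 }
```

With a limit of 5 attempts, the first unit completes, the second is rejected for a later retry, and the third is treated as a permanent failure because it is already on its fifth attempt.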
#stop! ⇒ Object
Tell the worker to stop after it completes the current job.
    # File 'lib/chore/worker.rb', line 102
    def stop!
      @stopping = true
    end
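Stopping is cooperative: the flag is only checked between items, so the job in progress always finishes. A minimal sketch of that behavior, using a hypothetical SketchStoppable class in place of the real worker:

```ruby
# Hypothetical worker that only demonstrates the stop flag.
class SketchStoppable
  attr_reader :processed

  def initialize(work)
    @work = work
    @stopping = false
    @processed = []
  end

  def start
    @work.each do |item|
      # The flag is consulted before each item, never in the middle of one.
      return if @stopping
      @processed << item
      yield self, item if block_given?
    end
  end

  def stop!
    @stopping = true
  end
end

worker = SketchStoppable.new([1, 2, 3])
worker.start { |w, item| w.stop! if item == 2 }
```

Here stop! is requested while item 2 is being handled, so items 1 and 2 are processed and item 3 is never started.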