Class: Chore::Worker

Inherits:
Object
  • Object
Includes:
Util
Defined in:
lib/chore/worker.rb

Overview

Worker is one of the core classes in Chore. It is responsible for most of the logic involved in actually processing a job. A worker takes a batch of work and processes it until either the work is completed or the worker is told to stop. Worker is completely agnostic of the WorkerStrategy that it was called from.


Methods included from Util

#constantize, #procline

Constructor Details

#initialize(work = [], opts = {}) ⇒ Worker

Create a Worker. Give it an array of work (or a single item, which is wrapped in an array) and opts. Currently, the only option supported by Worker is :payload_handler, which contains helpers for decoding the item and finding the correct payload class. It defaults to Chore.config.payload_handler.



# File 'lib/chore/worker.rb', line 26

def initialize(work=[],opts={})
  @stopping = false
  @started_at = Time.now
  @work = work
  @work = [work] unless work.kind_of?(Array)
  self.options = {:payload_handler => Chore.config.payload_handler}.merge(opts)
end
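
The constructor's two normalization steps (wrapping a single item in an array, and merging caller options over a default handler) can be sketched in isolation. This is a minimal illustration rather than Chore's code: `SketchWorker` and `DEFAULT_HANDLER` are hypothetical names, and the symbol handlers stand in for real payload handler objects.

```ruby
# Hypothetical stand-in for the constructor logic shown above.
class SketchWorker
  # Assumption: stands in for Chore.config.payload_handler.
  DEFAULT_HANDLER = :default_handler

  attr_reader :work, :options

  def initialize(work = [], opts = {})
    # Wrap a single item so the rest of the class can always iterate.
    @work = work.kind_of?(Array) ? work : [work]
    # Caller-supplied options override the default payload handler.
    @options = { :payload_handler => DEFAULT_HANDLER }.merge(opts)
  end
end

single = SketchWorker.new(:one_item)
custom = SketchWorker.new([:a, :b], { :payload_handler => :custom_handler })
```

Here `single.work` is a one-element array, and `custom.options[:payload_handler]` is the caller's handler rather than the default.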

Instance Attribute Details

#options ⇒ Object

Returns the value of attribute options.



# File 'lib/chore/worker.rb', line 15

def options
  @options
end

#started_at ⇒ Object (readonly)

Returns the value of attribute started_at.



# File 'lib/chore/worker.rb', line 17

def started_at
  @started_at
end

#work ⇒ Object (readonly)

Returns the value of attribute work.



# File 'lib/chore/worker.rb', line 16

def work
  @work
end

Class Method Details

.start(work, opts = {}) ⇒ Object

:nodoc:



# File 'lib/chore/worker.rb', line 19

def self.start(work, opts={}) #:nodoc:
  self.new(work, opts).start
end

Instance Method Details

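#duplicate_work?(item) ⇒ Boolean

Whether the item duplicates work that has already been seen. If the job class defines a dedupe lambda and the consumer recognizes the resulting dedupe key, the message is completed (removed from the queue) without being run and without reporting an error. A nil or blank dedupe key skips the lookup and the item is treated as new.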

Returns:

  • (Boolean)


# File 'lib/chore/worker.rb', line 45

def duplicate_work?(item)
  # if we've got a duplicate, remove the message from the queue by not actually running and also not reporting any errors
  payload = options[:payload_handler].payload(item.decoded_message)

  # if we're hitting the custom dedupe key, we want to remove this message from the queue
  if item.klass.has_dedupe_lambda?
    dedupe_key = item.klass.dedupe_key(*payload)
    if dedupe_key.nil? || dedupe_key.strip.empty? # if the dedupe key is nil, don't continue with the rest of the dedupe lambda logic
      Chore.logger.info { "#{item.klass} dedupe key nil, skipping memcached lookup." }
      return false
    end

    if item.consumer.duplicate_message?(dedupe_key, item.klass, item.queue_timeout)
      Chore.logger.info { "Found and deleted duplicate job #{item.klass}"}
      item.consumer.complete(item.id, item.receipt_handle)
      return true
    end
  end

  return false
end
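
The branch order of the method above can be exercised without a queue. The following is a sketch under stated assumptions: `SketchConsumer` is a hypothetical in-memory consumer whose `duplicate_message?` and `complete` take simplified arguments (the real calls also pass the job class, queue timeout, and receipt handle), and `sketch_duplicate?` is an illustrative helper, not part of Chore.

```ruby
require 'set'

# Hypothetical in-memory consumer; a real consumer would consult an external store.
class SketchConsumer
  attr_reader :completed

  def initialize
    @seen = Set.new
    @completed = []
  end

  # Returns true if the key was seen before; records it otherwise.
  def duplicate_message?(key)
    !@seen.add?(key)
  end

  def complete(id)
    @completed << id
  end
end

# Same branch order as duplicate_work?: a blank key is never a duplicate;
# a known key completes the message and reports a duplicate.
def sketch_duplicate?(consumer, id, dedupe_key)
  return false if dedupe_key.nil? || dedupe_key.strip.empty?

  if consumer.duplicate_message?(dedupe_key)
    consumer.complete(id)
    return true
  end

  false
end

consumer = SketchConsumer.new
results = [
  sketch_duplicate?(consumer, 1, "user-42"), # first sighting
  sketch_duplicate?(consumer, 2, "user-42"), # repeat of the same key
  sketch_duplicate?(consumer, 3, nil),       # nil key skips the lookup
  sketch_duplicate?(consumer, 4, "  ")       # blank key skips the lookup
]
```

Only the second call reports a duplicate, and only message 2 is completed on the consumer.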

#expired? ⇒ Boolean

Whether this worker has existed for longer than it is allowed to, that is, whether the current time is past #expires_at.

Returns:

  • (Boolean)


# File 'lib/chore/worker.rb', line 35

def expired?
  Time.now > expires_at
end

#expires_at ⇒ Object

The time at which this worker expires: the time it was started plus the sum of the queue timeouts of all of its work items.



# File 'lib/chore/worker.rb', line 40

def expires_at
  total_timeout = @work.inject(0) {|sum, item| sum += item.queue_timeout}
  @started_at + total_timeout
end
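
As a worked example of the expiry arithmetic, the sketch below sums three timeouts and compares two sample instants against the result. `TimedItem` is a hypothetical stand-in for a unit of work, and the timeout values are made up for illustration.

```ruby
# Hypothetical stand-in for a work item; only queue_timeout matters here.
TimedItem = Struct.new(:queue_timeout)

started_at = Time.at(0)
work = [TimedItem.new(30), TimedItem.new(60), TimedItem.new(10)]

# Sum the per-item timeouts (in seconds) and add them to the start time.
total_timeout = work.inject(0) { |sum, item| sum + item.queue_timeout }
expires_at = started_at + total_timeout

# expired? is the comparison "now > expires_at", evaluated here at two sample instants.
expired_at_99s  = (started_at + 99) > expires_at
expired_at_101s = (started_at + 101) > expires_at
```

With timeouts of 30, 60, and 10 seconds the worker expires 100 seconds after it started, so it is not yet expired at 99 seconds and is expired at 101 seconds.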

#start ⇒ Object

The workhorse. Do the work, all of it. This blocks for as long as the work takes to perform, which is not bounded by this method. For each item, this will:

  • Decode the message.

  • Reify the message into an actual Job class.

  • Call Job#perform on the job.

  • If successful, call Consumer#complete (using the consumer in the UnitOfWork).

  • If unsuccessful, call the appropriate hooks based on the type of failure and reject the message via Consumer#reject.

  • If unsuccessful and the job has reached the maximum number of attempts, call the permanent failure hooks and Consumer#complete.

  • Log the results via Chore.logger.



# File 'lib/chore/worker.rb', line 77

def start
  @work.each do |item|
    return if @stopping
    begin
      item.decoded_message = options[:payload_handler].decode(item.message)
      item.klass = options[:payload_handler].payload_class(item.decoded_message)

      next if duplicate_work?(item)

      Chore.run_hooks_for(:worker_to_start, item)
      start_item(item)
    rescue => e
      Chore.logger.error { "Failed to run job for #{item.message} with error: #{e.message} #{e.backtrace * "\n"}" }
      if item.current_attempt >= Chore.config.max_attempts
        Chore.run_hooks_for(:on_permanent_failure,item.queue_name,item.message,e)
        item.consumer.complete(item.id, item.receipt_handle)
      else
        Chore.run_hooks_for(:on_failure,item.message,e)
        item.consumer.reject(item.id)
      end
    end
  end
end
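
The rescue branch above decides between a retry and a permanent failure. A minimal sketch of that decision follows, assuming hypothetical stand-ins throughout: `FailingItem`, `RecordingConsumer`, and `handle_failure` are illustrative names, `MAX_ATTEMPTS` stands in for Chore.config.max_attempts, and the `hooks` array records which hook would have been run instead of calling Chore.run_hooks_for.

```ruby
# Hypothetical stand-ins for a unit of work and its consumer.
FailingItem = Struct.new(:id, :current_attempt)

class RecordingConsumer
  attr_reader :completed, :rejected

  def initialize
    @completed = []
    @rejected = []
  end

  def complete(id)
    @completed << id
  end

  def reject(id)
    @rejected << id
  end
end

MAX_ATTEMPTS = 3 # assumption: stands in for Chore.config.max_attempts

# Same decision as the rescue branch: give up once the attempt limit is
# reached, otherwise reject the message so that it can be retried.
def handle_failure(item, consumer, hooks)
  if item.current_attempt >= MAX_ATTEMPTS
    hooks << [:on_permanent_failure, item.id]
    consumer.complete(item.id)
  else
    hooks << [:on_failure, item.id]
    consumer.reject(item.id)
  end
end

consumer = RecordingConsumer.new
hooks = []
[FailingItem.new(1, 1), FailingItem.new(2, 3)].each do |item|
  begin
    raise "simulated job failure"
  rescue StandardError
    handle_failure(item, consumer, hooks)
  end
end
```

Item 1 is on its first attempt and is rejected; item 2 has reached the limit, so it is completed after the permanent failure hook is recorded.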

#stop! ⇒ Object

Tell the worker to stop after it completes the current job.



# File 'lib/chore/worker.rb', line 102

def stop!
  @stopping = true
end
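
Because #start checks the stopping flag only at the top of each iteration, the job in progress always finishes before the worker exits. The sketch below illustrates that behavior with a hypothetical `StoppableWorker`; it calls `stop!` from inside the loop for simplicity, whereas in practice the call would more likely come from another thread or a signal handler.

```ruby
# Hypothetical stand-in that reproduces only the stop-flag behavior.
class StoppableWorker
  attr_reader :processed

  def initialize(work)
    @work = work
    @stopping = false
    @processed = []
  end

  def stop!
    @stopping = true
  end

  def start
    @work.each do |item|
      # The flag is consulted between items, never mid-job.
      return if @stopping

      @processed << item
      yield item if block_given?
    end
  end
end

worker = StoppableWorker.new([:a, :b, :c])
# Request a stop while :b is being processed; :b still completes, :c never starts.
worker.start { |item| worker.stop! if item == :b }
```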