Class: Chore::Strategy::ForkedWorkerStrategy

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/chore/strategies/worker/forked_worker_strategy.rb

Overview

:nodoc:

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#constantize, #procline

Constructor Details

#initialize(manager, opts = {}) ⇒ ForkedWorkerStrategy

Returns a new instance of ForkedWorkerStrategy.



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/chore/strategies/worker/forked_worker_strategy.rb', line 9

def initialize(manager, opts={})
  @options = opts
  @manager = manager
  @stopped = false
  @workers = {}
  @queue = Queue.new
  Chore.config.num_workers.times { @queue << :worker }

  trap_master_signals
  monitor_workers

  Chore.run_hooks_for(:before_first_fork)
end

Instance Attribute Details

#workersObject

Returns the value of attribute workers.



7
8
9
# File 'lib/chore/strategies/worker/forked_worker_strategy.rb', line 7

def workers
  @workers
end

Instance Method Details

#assign(work) ⇒ Object

Take a UnitOfWork (or an Array of UnitOfWork) and assign it to a Worker. We only assign work if there are workers_available?.



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/chore/strategies/worker/forked_worker_strategy.rb', line 58

def assign(work)
  return unless acquire_worker

  begin
    w = Worker.new(work, @options)
    Chore.run_hooks_for(:before_fork,w)
    pid = nil
    Chore.run_hooks_for(:around_fork,w) do
      pid = fork do
        work.each do | item |
          Chore.run_hooks_for(:fetched_off_internal_q, item)
        end
        after_fork(w)
        Chore.run_hooks_for(:within_fork,w) do
          Chore.run_hooks_for(:after_fork,w)
          begin
            Chore.logger.info("Started worker:#{Time.now}")
            w.start
            Chore.logger.info("Finished worker:#{Time.now}")
          ensure
            Chore.run_hooks_for(:before_fork_shutdown)
            exit!(true)
          end
        end #within_fork
      end #around_fork
    end

    Chore.logger.debug { "Forked worker #{pid}"}
    workers[pid] = w
  rescue => ex
    Chore.logger.error { "Failed to fork worker: #{ex.message} #{ex.backtrace * "\n"}"}
    release_worker
  end
end

#startObject

Start up the worker strategy. In this particular case, what we're doing is starting up a WorkerListener, so we can talk to the children.



25
26
27
# File 'lib/chore/strategies/worker/forked_worker_strategy.rb', line 25

def start
  Chore.logger.debug "Starting up worker strategy: #{self.class.name}"
end

#stop!Object

Stop the workers. The particulars of the implementation here are that we send a QUIT signal to each child, wait one minute for it to finish the last job it was working on. If it times out, then we send KILL. In an ideal world this means that stop! is non-destructive in that it allow each worker to complete it's current job before dying.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/chore/strategies/worker/forked_worker_strategy.rb', line 34

def stop!
  return if @stopped

  @stopped = true
  Chore.logger.info { "Manager #{Process.pid} stopping" }

  # Instead of using Process.waitall (which is a blocking operation that can
  # cause the master process to hang), use a Unicorn style non-blocking
  # shutdown process.
  limit = Time.now + Chore.config.shutdown_timeout
  until workers.empty? || Time.now > limit
    signal_children("QUIT")
    sleep(0.1)
    reap_terminated_workers!
  end

  if !workers.empty?
    Chore.logger.error "Timed out waiting for children to terminate. Terminating with prejudice."
    signal_children("KILL")
  end
end