Class: Chore::Strategy::ForkedWorkerStrategy
- Inherits:
-
Object
- Object
- Chore::Strategy::ForkedWorkerStrategy
- Includes:
- Util
- Defined in:
- lib/chore/strategies/worker/forked_worker_strategy.rb
Overview
:nodoc:
Instance Attribute Summary collapse
-
#workers ⇒ Object
Returns the value of attribute workers.
Instance Method Summary collapse
-
#assign(work) ⇒ Object
Take a UnitOfWork (or an Array of UnitOfWork) and assign it to a Worker.
-
#initialize(manager, opts = {}) ⇒ ForkedWorkerStrategy
constructor
A new instance of ForkedWorkerStrategy.
-
#start ⇒ Object
Start up the worker strategy.
-
#stop! ⇒ Object
Stop the workers.
Methods included from Util
Constructor Details
#initialize(manager, opts = {}) ⇒ ForkedWorkerStrategy
Returns a new instance of ForkedWorkerStrategy.
9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/chore/strategies/worker/forked_worker_strategy.rb', line 9 def initialize(manager, opts={}) @options = opts @manager = manager @stopped = false @workers = {} @queue = Queue.new Chore.config.num_workers.times { @queue << :worker } trap_master_signals monitor_workers Chore.run_hooks_for(:before_first_fork) end |
Instance Attribute Details
#workers ⇒ Object
Returns the value of attribute workers.
7 8 9 |
# File 'lib/chore/strategies/worker/forked_worker_strategy.rb', line 7 def workers @workers end |
Instance Method Details
#assign(work) ⇒ Object
Take a UnitOfWork (or an Array of UnitOfWork) and assign it to a Worker. We
only assign work if there are workers_available?
.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/chore/strategies/worker/forked_worker_strategy.rb', line 58 def assign(work) return unless acquire_worker begin w = Worker.new(work, @options) Chore.run_hooks_for(:before_fork,w) pid = nil Chore.run_hooks_for(:around_fork,w) do pid = fork do work.each do | item | Chore.run_hooks_for(:fetched_off_internal_q, item) end after_fork(w) Chore.run_hooks_for(:within_fork,w) do Chore.run_hooks_for(:after_fork,w) begin Chore.logger.info("Started worker:#{Time.now}") w.start Chore.logger.info("Finished worker:#{Time.now}") ensure Chore.run_hooks_for(:before_fork_shutdown) exit!(true) end end #within_fork end #around_fork end Chore.logger.debug { "Forked worker #{pid}"} workers[pid] = w rescue => ex Chore.logger.error { "Failed to fork worker: #{ex.} #{ex.backtrace * "\n"}"} release_worker end end |
#start ⇒ Object
Start up the worker strategy. In this particular case, what we're doing is starting up a WorkerListener, so we can talk to the children.
25 26 27 |
# File 'lib/chore/strategies/worker/forked_worker_strategy.rb', line 25 def start Chore.logger.debug "Starting up worker strategy: #{self.class.name}" end |
#stop! ⇒ Object
Stop the workers. The particulars of the implementation here are that we
send a QUIT signal to each child, wait one minute for it to finish the last
job it was working on. If it times out, then we send KILL. In an ideal
world this means that stop!
is non-destructive in that it
allow each worker to complete it's current job before dying.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/chore/strategies/worker/forked_worker_strategy.rb', line 34 def stop! return if @stopped @stopped = true Chore.logger.info { "Manager #{Process.pid} stopping" } # Instead of using Process.waitall (which is a blocking operation that can # cause the master process to hang), use a Unicorn style non-blocking # shutdown process. limit = Time.now + Chore.config.shutdown_timeout until workers.empty? || Time.now > limit signal_children("QUIT") sleep(0.1) reap_terminated_workers! end if !workers.empty? Chore.logger.error "Timed out waiting for children to terminate. Terminating with prejudice." signal_children("KILL") end end |