Class: Chore::Manager
Overview
Manages the interactions between fetching messages (Consumer Strategy) and working over them (Worker Strategy).
Instance Method Summary
- #assign(work) ⇒ Object
  Take in an amount of work (either an Array of, or a single UnitOfWork), and pass it down for the worker strategy to process.
- #fetch_work(n) ⇒ Object
  Returns up to n units of work from the throttled consumer queue.
- #initialize ⇒ Manager (constructor)
  Returns a new instance of Manager.
- #return_work(work_units) ⇒ Object
  Gives work back to the fetcher in case it couldn't be assigned.
- #shutdown! ⇒ Object
  Shut down the Manager, the Worker Strategy, and the Fetcher.
- #start ⇒ Object
  Start the Manager.
Methods included from Util
Constructor Details
#initialize ⇒ Manager
Returns a new instance of Manager.
    # File 'lib/chore/manager.rb', line 10

    def initialize()
      Chore.logger.info "Booting Chore #{Chore::VERSION}"
      Chore.logger.debug { Chore.config.inspect }
      procline("#{Chore.config.master_procline}:Started:#{Time.now}")
      @started_at = nil
      @worker_strategy = Chore.config.worker_strategy.new(self)
      @fetcher = Chore.config.fetcher.new(self)
      @stopping = false
    end
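The constructor hands `self` to both the worker strategy and the fetcher, so each collaborator can call back into the Manager. The following is a minimal sketch of that wiring only. `SketchFetcher`, `SketchWorkerStrategy`, and `WiringSketch` are hypothetical stand-ins, not Chore classes, and the classes are passed in directly here rather than read from `Chore.config`.

```ruby
# Hypothetical stand-ins; none of these classes are part of Chore.
class SketchFetcher
  attr_reader :manager

  def initialize(manager)
    @manager = manager
  end
end

class SketchWorkerStrategy
  attr_reader :manager

  def initialize(manager)
    @manager = manager
  end
end

# Mirrors the constructor's wiring: both collaborators receive the manager.
class WiringSketch
  attr_reader :fetcher, :worker_strategy

  def initialize(fetcher_class, strategy_class)
    @worker_strategy = strategy_class.new(self)
    @fetcher = fetcher_class.new(self)
    @stopping = false
  end
end

manager = WiringSketch.new(SketchFetcher, SketchWorkerStrategy)
```

Because both objects hold a reference to the same manager, a strategy could in principle ask it for more work and a fetcher could hand work to it, which is the role the instance methods below play.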
Instance Method Details
#assign(work) ⇒ Object
Take in an amount of work (either an Array of, or a single UnitOfWork), and pass it down for the worker strategy to process.
This method is blocking. It keeps attempting to assign the work via the worker strategy until the strategy accepts it. It is up to the strategy to determine in which cases it is allowed to accept work. The blocking semantics of this method prevent the Fetcher from getting messages off of the queue faster than they can be consumed.
    # File 'lib/chore/manager.rb', line 45

    def assign(work)
      Chore.logger.debug { "Manager#assign: No. of UnitsOfWork: #{work.length})" }
      work.each do | item |
        Chore.run_hooks_for(:before_send_to_worker, item)
      end
      @worker_strategy.assign(work) unless @stopping
    end
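The blocking behaviour described above lives in the worker strategy, not in `Manager#assign` itself. One way a strategy could provide that back-pressure is a bounded queue, sketched below. `BoundedStrategy` and its `take` method are hypothetical and are not one of Chore's shipped strategies. The sketch relies only on Ruby's standard `SizedQueue`, whose `push` blocks while the queue is full.

```ruby
require 'thread'

# Hypothetical strategy: accepts work only while a slot is free.
class BoundedStrategy
  def initialize(slots)
    @queue = SizedQueue.new(slots)
  end

  # Blocks the caller whenever the queue is full, which throttles the producer.
  def assign(work)
    items = work.is_a?(Array) ? work : [work]
    items.each { |item| @queue.push(item) }
  end

  # Called by a worker to pick up the next item.
  def take
    @queue.pop
  end
end

strategy = BoundedStrategy.new(1)

# The producer can only get one item ahead of the consumer.
producer = Thread.new { strategy.assign([:a, :b, :c]) }
processed = Array.new(3) { strategy.take }
producer.join
```

With a single slot, the producing thread cannot run ahead of the consumer by more than one item, which is the same effect the Manager relies on to keep the Fetcher from outpacing the workers.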
#fetch_work(n) ⇒ Object
Returns up to n units of work from the throttled consumer queue.
    # File 'lib/chore/manager.rb', line 54

    def fetch_work(n)
      @fetcher.provide_work(n)
    end
#return_work(work_units) ⇒ Object
Gives work back to the fetcher in case it couldn't be assigned.
    # File 'lib/chore/manager.rb', line 59

    def return_work(work_units)
      @fetcher.return_work(work_units)
    end
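`#fetch_work` and `#return_work` both delegate to the fetcher, via `provide_work` and `return_work` respectively. The round trip can be sketched with an in-memory stand-in. `InMemoryFetcher` is hypothetical, and putting returned work back at the front of the queue is an assumption of this sketch, not documented Chore behaviour.

```ruby
# Hypothetical fetcher backed by a plain Array; not part of Chore.
class InMemoryFetcher
  def initialize(items)
    @items = items.dup
  end

  # Hands out up to n items, fewer if the queue is running low.
  def provide_work(n)
    @items.shift(n)
  end

  # Takes back items that could not be assigned (front of the queue: an assumption).
  def return_work(work_units)
    @items.unshift(*work_units)
  end
end

fetcher = InMemoryFetcher.new([:a, :b, :c])

batch = fetcher.provide_work(2)       # ask for two items
fetcher.return_work(batch.drop(1))    # hand the second one back
remaining = fetcher.provide_work(5)   # ask for more than is available
```

Asking for more items than remain simply yields what is left, which matches the "up to n" wording of `#fetch_work`.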
#shutdown! ⇒ Object
Shut down the Manager, the Worker Strategy, and the Fetcher. This calls the :before_shutdown hook.
    # File 'lib/chore/manager.rb', line 29

    def shutdown!
      unless @stopping
        Chore.logger.info "Manager shutting down started"
        @stopping = true
        Chore.run_hooks_for(:before_shutdown)
        @fetcher.stop!
        @worker_strategy.stop!
        Chore.logger.info "Manager shutting down completed"
      end
    end
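The `@stopping` guard means a second call to `shutdown!` is a no-op, so the hook and the two `stop!` calls run at most once. A minimal sketch of that guard follows. `ShutdownSketch` is hypothetical and records symbols in an array in place of running the real hook and stopping real collaborators.

```ruby
# Hypothetical stand-in that records what a shutdown would do, in order.
class ShutdownSketch
  attr_reader :events

  def initialize
    @stopping = false
    @events = []
  end

  def shutdown!
    return if @stopping          # later calls do nothing

    @stopping = true
    @events << :before_shutdown  # stands in for the hook
    @events << :fetcher_stopped  # fetcher first, so no new work arrives
    @events << :worker_strategy_stopped
  end
end

sketch = ShutdownSketch.new
sketch.shutdown!
sketch.shutdown! # e.g. a second signal arriving during shutdown
```

The ordering follows the listing above: the fetcher is stopped before the worker strategy.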
#start ⇒ Object
Start the Manager. This runs the :before_start hook, then calls the #start method of the configured Worker Strategy, followed by Fetcher#start.
    # File 'lib/chore/manager.rb', line 21

    def start
      Chore.run_hooks_for(:before_start)
      @started_at = Time.now
      @worker_strategy.start
      @fetcher.start
    end