Class: Chore::Manager

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/chore/manager.rb

Overview

Manages the interactions between fetching messages (Consumer Strategy), and working over them (Worker Strategy)

Instance Method Summary collapse

Methods included from Util

#constantize, #procline

Constructor Details

#initializeManager

Returns a new instance of Manager.



10
11
12
13
14
15
16
17
18
# File 'lib/chore/manager.rb', line 10

def initialize()
  Chore.logger.info "Booting Chore #{Chore::VERSION}"
  Chore.logger.debug { Chore.config.inspect }
  procline("#{Chore.config.master_procline}:Started:#{Time.now}")
  @started_at = nil
  @worker_strategy = Chore.config.worker_strategy.new(self)
  @fetcher = Chore.config.fetcher.new(self)
  @stopping = false
end

Instance Method Details

#assign(work) ⇒ Object

Take in an amount of work (either an Array of, or a single UnitOfWork), and pass it down for the worker strategy to process. This method is blocking. It will continue to attempt to assign the work via the worker strategy, until it accepts it. It is up to the strategy to determine what cases it is allowed to accept work. The blocking semantic of this method is to prevent the Fetcher from getting messages off of the queue faster than they can be consumed.



45
46
47
48
49
50
51
# File 'lib/chore/manager.rb', line 45

def assign(work)
  Chore.logger.debug { "Manager#assign: No. of UnitsOfWork: #{work.length})" }
  work.each do | item |
    Chore.run_hooks_for(:before_send_to_worker, item)
  end
  @worker_strategy.assign(work) unless @stopping
end

#fetch_work(n) ⇒ Object

returns up to n from the throttled consumer queue



54
55
56
# File 'lib/chore/manager.rb', line 54

def fetch_work(n)
  @fetcher.provide_work(n)
end

#return_work(work_units) ⇒ Object

gives work back to the fetcher in case it couldn't be assigned



59
60
61
# File 'lib/chore/manager.rb', line 59

def return_work(work_units)
  @fetcher.return_work(work_units)
end

#shutdown!Object

Shut down the Manager, the Worker Strategy, and the Fetcher. This calls the :before_shutdown hook.



29
30
31
32
33
34
35
36
37
38
# File 'lib/chore/manager.rb', line 29

def shutdown!
  unless @stopping
    Chore.logger.info "Manager shutting down started"
    @stopping = true
    Chore.run_hooks_for(:before_shutdown)
    @fetcher.stop!
    @worker_strategy.stop!
    Chore.logger.info "Manager shutting down completed"
  end
end

#startObject

Start the Manager. This calls both the #start method of the configured Worker Strategy, as well as Fetcher#start.



21
22
23
24
25
26
# File 'lib/chore/manager.rb', line 21

def start
  Chore.run_hooks_for(:before_start)
  @started_at = Time.now
  @worker_strategy.start
  @fetcher.start
end