Class: Chore::Strategy::ThreadedConsumerStrategy
- Inherits:
-
Object
- Object
- Chore::Strategy::ThreadedConsumerStrategy
- Defined in:
- lib/chore/strategies/consumer/threaded_consumer_strategy.rb
Overview
:nodoc
Instance Attribute Summary collapse
-
#batcher ⇒ Object
Returns the value of attribute batcher.
Instance Method Summary collapse
-
#fetch ⇒ Object
Begins fetching from queues by spinning up the configured
:threads_per_queue:
count of threads for each queue you're consuming from. -
#initialize(fetcher) ⇒ ThreadedConsumerStrategy
constructor
A new instance of ThreadedConsumerStrategy.
-
#running? ⇒ Boolean
Returns whether or not the ThreadedConsumerStrategy is running or not.
-
#stop! ⇒ Object
If the ThreadedConsumerStrategy is currently running
stop!
will begin signalling it to stop It will stop the batcher from forking more work, as well as set a flag which will disable it's own consuming threads once they finish with their current work.
Constructor Details
#initialize(fetcher) ⇒ ThreadedConsumerStrategy
Returns a new instance of ThreadedConsumerStrategy.
11 12 13 14 15 16 17 |
# File 'lib/chore/strategies/consumer/threaded_consumer_strategy.rb', line 11 def initialize(fetcher) @fetcher = fetcher @batcher = Batcher.new(Chore.config.batch_size) @batcher.callback = lambda { |batch| @fetcher.manager.assign(batch) } @batcher.schedule(Chore.config.batch_timeout) @running = true end |
Instance Attribute Details
#batcher ⇒ Object
Returns the value of attribute batcher.
5 6 7 |
# File 'lib/chore/strategies/consumer/threaded_consumer_strategy.rb', line 5 def batcher @batcher end |
Instance Method Details
#fetch ⇒ Object
Begins fetching from queues by spinning up the configured
:threads_per_queue:
count of threads for each queue you're
consuming from. Once all threads are spun up and running, the threads are
then joined.
22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/chore/strategies/consumer/threaded_consumer_strategy.rb', line 22 def fetch Chore.logger.debug "Starting up consumer strategy: #{self.class.name}" threads = [] Chore.config.queues.each do |queue| Chore.config.threads_per_queue.times do if running? threads << start_consumer_thread(queue) end end end threads.each(&:join) end |
#running? ⇒ Boolean
Returns whether or not the ThreadedConsumerStrategy is running or not
48 49 50 |
# File 'lib/chore/strategies/consumer/threaded_consumer_strategy.rb', line 48 def running? @running end |
#stop! ⇒ Object
If the ThreadedConsumerStrategy is currently running stop!
will begin signalling it to stop It will stop the batcher from forking more
work, as well as set a flag which will disable it's own consuming
threads once they finish with their current work.
39 40 41 42 43 44 45 |
# File 'lib/chore/strategies/consumer/threaded_consumer_strategy.rb', line 39 def stop! if running? Chore.logger.info "Shutting down fetcher: #{self.class.name.to_s}" @batcher.stop @running = false end end |