Class: Chore::Strategy::ThrottledConsumerStrategy
- Inherits: Object
- Defined in: lib/chore/strategies/consumer/throttled_consumer_strategy.rb
Overview
:nodoc:
Instance Method Summary
- #fetch ⇒ Object
  Begins fetching from queues by spinning up the configured :threads_per_queue: count of threads for each queue you're consuming from.
- #initialize(fetcher) ⇒ ThrottledConsumerStrategy (constructor)
  A new instance of ThrottledConsumerStrategy.
- #provide_work(no_free_workers) ⇒ Object
  Returns up to no_free_workers work objects.
- #return_work(work_units) ⇒ Object
  Gives work back to the queue in case it couldn't be assigned.
- #running? ⇒ Boolean
  Returns whether the ThrottledConsumerStrategy is running.
- #stop! ⇒ Object
  If the ThrottledConsumerStrategy is currently running, stop! will begin signalling it to stop.
Constructor Details
#initialize(fetcher) ⇒ ThrottledConsumerStrategy
Returns a new instance of ThrottledConsumerStrategy.
# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 4
def initialize(fetcher)
  @fetcher = fetcher
  @queue = SizedQueue.new(Chore.config.num_workers)
  @return_queue = Queue.new
  @max_queue_size = Chore.config.num_workers
  @consumers_per_queue = Chore.config.threads_per_queue
  @running = true
  @consumers = []
end
Instance Method Details
#fetch ⇒ Object
Begins fetching from queues by spinning up the configured :threads_per_queue: count of threads for each queue you're consuming from. Once all threads are spun up and running, the threads are then joined.
# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 19
def fetch
  Chore.logger.info "TCS: Starting up: #{self.class.name}"
  threads = []
  Chore.config.queues.each do |consume_queue|
    Chore.logger.info "TCS: Starting #{@consumers_per_queue} threads for Queue #{consume_queue}"
    @consumers_per_queue.times do
      next unless running?
      threads << consume(consume_queue)
    end
  end
  threads.each(&:join)
end
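The spin-up-then-join pattern that #fetch describes can be sketched in isolation. This is a minimal illustration, not Chore's implementation: `queues`, `threads_per_queue`, and `started` are hypothetical stand-ins for the configured values, and the thread body only records its name where a real consumer would loop over messages.

```ruby
# Hypothetical stand-ins for Chore.config.queues and
# Chore.config.threads_per_queue (assumed values, for illustration only).
queues = %w[emails reports]
threads_per_queue = 2
started = Queue.new # thread-safe record of which threads ran

threads = []
queues.each do |queue_name|
  threads_per_queue.times do |i|
    threads << Thread.new do
      # A real consumer would loop here, pulling messages from queue_name.
      started << "#{queue_name}-#{i}"
    end
  end
end

# Block the caller until every consumer thread has finished.
threads.each(&:join)

# Collect the recorded names into a sorted array for inspection.
started_names = Array.new(started.size) { started.pop }.sort
```

Because the caller joins every thread, a method written this way does not return until all consumer threads exit, which is why a separate stop signal is needed to end it.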
#provide_work(no_free_workers) ⇒ Object
Returns up to no_free_workers work objects.
# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 55
def provide_work(no_free_workers)
  work_units = []
  free_workers = [no_free_workers, @queue.size + @return_queue.size].min
  while free_workers > 0
    # Drain from the return queue first, then the consumer thread queue
    queue = @return_queue.empty? ? @queue : @return_queue
    work_units << queue.pop
    free_workers -= 1
  end
  work_units
end
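To make the draining order concrete, here is a small self-contained sketch of the same idea. `WorkBuffer` and `add_fresh` are hypothetical names introduced for illustration; only the two-queue layout and the "returned work first" rule come from the listing above.

```ruby
# Simplified, hypothetical stand-in for the strategy's two internal queues.
class WorkBuffer
  def initialize(capacity)
    @queue = SizedQueue.new(capacity) # fresh work from consumer threads
    @return_queue = Queue.new         # work handed back after a failed assignment
  end

  def add_fresh(item)
    @queue.push(item)
  end

  def return_work(items)
    items.each { |item| @return_queue.push(item) }
  end

  # Hands out at most `free_workers` items, returned work first.
  def provide_work(free_workers)
    count = [free_workers, @queue.size + @return_queue.size].min
    Array.new(count) do
      source = @return_queue.empty? ? @queue : @return_queue
      source.pop
    end
  end
end

buffer = WorkBuffer.new(4)
buffer.add_fresh(:a)
buffer.add_fresh(:b)
buffer.return_work([:r1])

first_batch = buffer.provide_work(2)  # returned work comes out before fresh work
second_batch = buffer.provide_work(5) # capped by what is actually available
```

Capping the count at the combined queue size means the caller never asks for more items than were present when the count was computed, so with a single caller the `pop` calls do not block.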
#return_work(work_units) ⇒ Object
Gives work back to the queue in case it couldn't be assigned.
The returned work goes into a separate, unbounded queue (@return_queue) so that it is prioritized over other work that hasn't been attempted yet. This also avoids a deadlock in which @queue is full and the master is blocked trying to return work that it couldn't assign.
# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 73
def return_work(work_units)
  work_units.each do |work|
    @return_queue.push(work)
  end
end
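The deadlock concern can be illustrated with Ruby's standard queue classes alone. In this sketch (variable names and job strings are made up for illustration), a SizedQueue at capacity refuses a non-blocking push, which is the situation in which a normal, blocking push would wait indefinitely if nobody pops; an unbounded Queue accepts the same item immediately.

```ruby
capacity = 2
bounded = SizedQueue.new(capacity)
capacity.times { |i| bounded.push("job-#{i}") }

# A non-blocking push (second argument true) raises ThreadError when the
# queue is full. A blocking push would instead wait until someone pops.
bounded_full = begin
  bounded.push("returned-job", true)
  false
rescue ThreadError
  true
end

# An unbounded Queue has no capacity limit, so returned work is always accepted.
unbounded = Queue.new
unbounded.push("returned-job")
```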
#running? ⇒ Boolean
Returns whether the ThrottledConsumerStrategy is running.
# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 50
def running?
  @running
end
#stop! ⇒ Object
If the ThrottledConsumerStrategy is currently running, stop! will begin signalling it to stop. It will stop the batcher from forking more work, as well as set a flag which will disable its own consuming threads once they finish with their current work. It also clears both internal queues and stops each registered consumer.
# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 36
def stop!
  if running?
    Chore.logger.info "TCS: Shutting down fetcher: #{self.class.name}"
    @running = false
    @consumers.each do |consumer|
      Chore.logger.info "TCS: Stopping consumer: #{consumer.object_id}"
      @queue.clear
      @return_queue.clear
      consumer.stop
    end
  end
end
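The flag-based shutdown described for stop! is a cooperative pattern: the flag is flipped from outside, and each worker thread notices it the next time it checks. A minimal sketch follows, assuming a hypothetical `StoppableLoop` class that is not part of Chore; the `sleep` stands in for one unit of work.

```ruby
# Hypothetical class illustrating a cooperative stop flag.
class StoppableLoop
  attr_reader :iterations

  def initialize
    @running = true
    @iterations = 0
    @first_pass = Queue.new # lets the caller wait until the loop has run once
  end

  def running?
    @running
  end

  # Only flips the flag; the thread exits after its current pass.
  def stop!
    @running = false
  end

  def start
    Thread.new do
      while running?
        @iterations += 1
        @first_pass << true if @iterations == 1
        sleep 0.01 # stand-in for one unit of work
      end
    end
  end

  def wait_for_first_pass
    @first_pass.pop
  end
end

worker = StoppableLoop.new
thread = worker.start
worker.wait_for_first_pass
worker.stop!
thread.join(1) # wait up to one second for the loop to notice the flag
```

The thread is never killed; it finishes its current pass and then leaves the loop, which mirrors the "once they finish with their current work" wording above.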