Class: Chore::Strategy::ThrottledConsumerStrategy

Inherits:
Object
Defined in:
lib/chore/strategies/consumer/throttled_consumer_strategy.rb

Overview

:nodoc:

Constructor Details

#initialize(fetcher) ⇒ ThrottledConsumerStrategy

Returns a new instance of ThrottledConsumerStrategy.



# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 4

def initialize(fetcher)
  @fetcher = fetcher
  @queue = SizedQueue.new(Chore.config.num_workers)
  @return_queue = Queue.new
  @max_queue_size = Chore.config.num_workers
  @consumers_per_queue = Chore.config.threads_per_queue
  @running = true
  @consumers = []
end

Instance Method Details

#fetch ⇒ Object

Begins fetching from queues by spinning up the configured :threads_per_queue count of threads for each queue being consumed. Once all threads are running, they are joined, so the call blocks until the consumer threads exit.



# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 19

def fetch
  Chore.logger.info "TCS: Starting up: #{self.class.name}"
  threads = []
  Chore.config.queues.each do |consume_queue|
    Chore.logger.info "TCS: Starting #{@consumers_per_queue} threads for Queue #{consume_queue}"
    @consumers_per_queue.times do
      next unless running?
      threads << consume(consume_queue)
    end
  end
  threads.each(&:join)
end
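
The spin-up-then-join pattern described above can be sketched in isolation. This is a minimal illustration, not Chore code: the queue names, the thread count, and the `started` queue are hypothetical stand-ins, and it assumes that `consume` returns a Thread, which the `threads.each(&:join)` call suggests.

```ruby
require 'thread'

queue_names = %w[emails reports] # hypothetical queue names
threads_per_queue = 2            # stands in for Chore.config.threads_per_queue
started = Queue.new              # records which threads actually ran

# Start threads_per_queue threads for every queue and collect them all.
threads = queue_names.flat_map do |name|
  Array.new(threads_per_queue) do |i|
    Thread.new do
      # A real consumer would poll its queue here until told to stop.
      started.push("#{name}-#{i}")
    end
  end
end

# Block the caller until every thread has finished.
threads.each(&:join)
```

Joining only after every thread has been started lets all consumers run concurrently; joining inside the loop would serialize them.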

#provide_work(no_free_workers) ⇒ Object

Returns up to no_free_workers work units, draining the return queue before the consumer queue.



# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 55

def provide_work(no_free_workers)
  work_units = []
  free_workers = [no_free_workers, @queue.size + @return_queue.size].min
  while free_workers > 0
    # Drain from the return queue first, then the consumer thread queue
    queue = @return_queue.empty? ? @queue : @return_queue
    work_units << queue.pop
    free_workers -= 1
  end
  work_units
end
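
To make the draining order concrete, here is a standalone sketch of the same logic using plain Ruby queues. The `drain` helper and the work-unit strings are hypothetical and exist only for illustration; they are not part of Chore.

```ruby
require 'thread'

# Hypothetical helper mirroring the loop in provide_work.
def drain(free_workers, return_queue, queue)
  work_units = []
  # Cap the count at what is currently buffered so pop never blocks.
  count = [free_workers, queue.size + return_queue.size].min
  count.times do
    # Returned work is handed out before work that was never attempted.
    source = return_queue.empty? ? queue : return_queue
    work_units << source.pop
  end
  work_units
end

queue = SizedQueue.new(3)
return_queue = Queue.new
%w[new-1 new-2 new-3].each { |w| queue.push(w) }
return_queue.push("returned-1")

first_batch = drain(2, return_queue, queue)  # returned work comes first
second_batch = drain(5, return_queue, queue) # capped at the 2 items left
```

Asking for five units when only two remain returns two, which is why callers should treat the argument as an upper bound.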

#return_work(work_units) ⇒ Object

Gives work back to the queue in case it couldn't be assigned.

This will go into a separate queue so that it will be prioritized over other work that hasn't been attempted yet. It also avoids a deadlock where @queue is full and the master is waiting to return work that it couldn't assign.



# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 73

def return_work(work_units)
  work_units.each do |work|
    @return_queue.push(work)
  end
end
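
The deadlock concern can be seen with plain Ruby queues. The sketch below is an illustration under assumed sizes, not Chore code: a full SizedQueue refuses further pushes (a blocking push would wait indefinitely if nothing pops), while an unbounded Queue always accepts returned work.

```ruby
require 'thread'

bounded = SizedQueue.new(2) # stands in for @queue at capacity
bounded.push(:a)
bounded.push(:b)

# A blocking push would hang here, so use the non-blocking form,
# which raises ThreadError when the queue is full.
would_block = begin
  bounded.push(:c, true)
  false
rescue ThreadError
  true
end

returned = Queue.new # stands in for @return_queue; it has no size limit
[:c, :d, :e].each { |work| returned.push(work) }
```

Because the thread returning work is also the one that pops from the bounded queue, pushing back into that same queue could leave it waiting on itself; the unbounded side queue avoids that.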

#running? ⇒ Boolean

Returns whether the ThrottledConsumerStrategy is running.

Returns:

  • (Boolean)


# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 50

def running?
  @running
end

#stop! ⇒ Object

If the ThrottledConsumerStrategy is currently running, stop! begins signalling it to stop. It stops the batcher from forking more work and sets a flag that disables its own consuming threads once they finish their current work. It also clears both internal queues and stops each consumer.



# File 'lib/chore/strategies/consumer/throttled_consumer_strategy.rb', line 36

def stop!
  if running?
    Chore.logger.info "TCS: Shutting down fetcher: #{self.class.name}"
    @running = false
    @consumers.each do |consumer|
      Chore.logger.info "TCS: Stopping consumer: #{consumer.object_id}"
      @queue.clear
      @return_queue.clear
      consumer.stop
    end
  end
end
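
The flag-based shutdown described above can be sketched as follows. `SketchConsumer` is a hypothetical class written for this illustration; the real consumer's `stop` may behave differently. The point is only that the loop checks a flag between units of work, so a stop request takes effect after the current unit finishes.

```ruby
# Hypothetical consumer that loops until stop is called.
class SketchConsumer
  attr_reader :iterations

  def initialize
    @running = true
    @iterations = 0
  end

  def consume
    while @running
      @iterations += 1 # stands in for handling one unit of work
      sleep 0.01
    end
  end

  # Only flips the flag; the loop exits at its next check.
  def stop
    @running = false
  end
end

consumer = SketchConsumer.new
thread = Thread.new { consumer.consume }
sleep 0.05
consumer.stop
# join returns nil on timeout, so a non-nil result means a clean exit.
finished = !thread.join(1).nil?
```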