Class: Chore::Strategy::ThreadedConsumerStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/chore/strategies/consumer/threaded_consumer_strategy.rb

Overview

:nodoc

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(fetcher) ⇒ ThreadedConsumerStrategy

Returns a new instance of ThreadedConsumerStrategy.



11
12
13
14
15
16
17
# File 'lib/chore/strategies/consumer/threaded_consumer_strategy.rb', line 11

def initialize(fetcher)
  @fetcher = fetcher
  @batcher = Batcher.new(Chore.config.batch_size)
  @batcher.callback = lambda { |batch| @fetcher.manager.assign(batch) }
  @batcher.schedule(Chore.config.batch_timeout)
  @running = true
end

Instance Attribute Details

#batcherObject

Returns the value of attribute batcher.



5
6
7
# File 'lib/chore/strategies/consumer/threaded_consumer_strategy.rb', line 5

def batcher
  @batcher
end

Instance Method Details

#fetchObject

Begins fetching from queues by spinning up the configured :threads_per_queue: count of threads for each queue you're consuming from. Once all threads are spun up and running, the threads are then joined.



22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/chore/strategies/consumer/threaded_consumer_strategy.rb', line 22

def fetch
  Chore.logger.debug "Starting up consumer strategy: #{self.class.name}"
  threads = []
  Chore.config.queues.each do |queue|
    Chore.config.threads_per_queue.times do
      if running?
        threads << start_consumer_thread(queue)
      end
    end
  end

  threads.each(&:join)
end

#running?Boolean

Returns whether or not the ThreadedConsumerStrategy is running or not

Returns:

  • (Boolean)


48
49
50
# File 'lib/chore/strategies/consumer/threaded_consumer_strategy.rb', line 48

def running?
  @running
end

#stop!Object

If the ThreadedConsumerStrategy is currently running stop! will begin signalling it to stop It will stop the batcher from forking more work, as well as set a flag which will disable it's own consuming threads once they finish with their current work.



39
40
41
42
43
44
45
# File 'lib/chore/strategies/consumer/threaded_consumer_strategy.rb', line 39

def stop!
  if running?
    Chore.logger.info "Shutting down fetcher: #{self.class.name.to_s}"
    @batcher.stop
    @running = false
  end
end