Class: Chore::Strategy::Batcher
- Inherits:
-
Object
- Object
- Chore::Strategy::Batcher
- Defined in:
- lib/chore/strategies/consumer/batcher.rb
Overview
Handles holding jobs in memory until such time as the batch has become full, per the developers configured threshold, or enough time elapses that Chore determines to not wait any longer (20 seconds by default)
Instance Attribute Summary collapse
-
#batch ⇒ Object
Returns the value of attribute batch.
-
#callback ⇒ Object
Returns the value of attribute callback.
Instance Method Summary collapse
-
#add(item) ⇒ Object
Adds the
item
to the current batch. -
#execute(force = false) ⇒ Object
Calls for the batch to be executed.
-
#initialize(size) ⇒ Batcher
constructor
A new instance of Batcher.
-
#ready? ⇒ Boolean
Determines if the batch is ready to fire, by comparing it's size to the configured batch_size.
-
#schedule(batch_timeout) ⇒ Object
The main entry point of the Batcher,
schedule
begins a thread with the providedbatch_timeout
as the only argument. -
#stop ⇒ Object
Sets a flag which will begin shutting down the Batcher.
Constructor Details
#initialize(size) ⇒ Batcher
Returns a new instance of Batcher.
10 11 12 13 14 15 16 |
# File 'lib/chore/strategies/consumer/batcher.rb', line 10 def initialize(size) @size = size @batch = [] @mutex = Mutex.new @callback = nil @running = true end |
Instance Attribute Details
#batch ⇒ Object
Returns the value of attribute batch.
8 9 10 |
# File 'lib/chore/strategies/consumer/batcher.rb', line 8 def batch @batch end |
#callback ⇒ Object
Returns the value of attribute callback.
7 8 9 |
# File 'lib/chore/strategies/consumer/batcher.rb', line 7 def callback @callback end |
Instance Method Details
#add(item) ⇒ Object
Adds the item
to the current batch
45 46 47 48 |
# File 'lib/chore/strategies/consumer/batcher.rb', line 45 def add(item) @batch << item execute if ready? end |
#execute(force = false) ⇒ Object
Calls for the batch to be executed. If force
is set to true,
the batch will execute even if it is not full yet
51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/chore/strategies/consumer/batcher.rb', line 51 def execute(force = false) batch = nil @mutex.synchronize do if force || ready? batch = @batch.slice!(0...@size) end end if batch && !batch.empty? @callback.call(batch) end end |
#ready? ⇒ Boolean
Determines if the batch is ready to fire, by comparing it's size to the configured batch_size
65 66 67 |
# File 'lib/chore/strategies/consumer/batcher.rb', line 65 def ready? @batch.size >= @size end |
#schedule(batch_timeout) ⇒ Object
The main entry point of the Batcher, schedule
begins a thread
with the provided batch_timeout
as the only argument. While
the Batcher is running, it will attempt to check if either the batch is
full, or if the batch_timeout
has elapsed since the oldest
message was added. If either case is true, the items in the batch will be
executed.
Calling stop
will cause the thread to finish it's current
check, and exit
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/chore/strategies/consumer/batcher.rb', line 24 def schedule(batch_timeout) @thread = Thread.new(batch_timeout) do |timeout| Chore.logger.info "Batching thread starting with #{batch_timeout} second timeout" while @running do begin oldest_item = @batch.first = oldest_item && oldest_item.created_at Chore.logger.debug "Oldest message in batch: #{}, size: #{@batch.size}" if && Time.now > ( + timeout) Chore.logger.debug "Batching timeout reached (#{ + timeout}), current size: #{@batch.size}" self.execute(true) end sleep(1) rescue => e Chore.logger.error "Batcher#schedule raised an exception: #{e.inspect}" end end end end |
#stop ⇒ Object
Sets a flag which will begin shutting down the Batcher
70 71 72 |
# File 'lib/chore/strategies/consumer/batcher.rb', line 70 def stop @running = false end |