Class: Chore::Strategy::Batcher

Inherits:
Object
  • Object
show all
Defined in:
lib/chore/strategies/consumer/batcher.rb

Overview

Handles holding jobs in memory until such time as the batch has become full, per the developers configured threshold, or enough time elapses that Chore determines to not wait any longer (20 seconds by default)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size) ⇒ Batcher

Returns a new instance of Batcher.



10
11
12
13
14
15
16
# File 'lib/chore/strategies/consumer/batcher.rb', line 10

def initialize(size)
  @size = size
  @batch = []
  @mutex = Mutex.new
  @callback = nil
  @running = true
end

Instance Attribute Details

#batchObject

Returns the value of attribute batch.



8
9
10
# File 'lib/chore/strategies/consumer/batcher.rb', line 8

def batch
  @batch
end

#callbackObject

Returns the value of attribute callback.



7
8
9
# File 'lib/chore/strategies/consumer/batcher.rb', line 7

def callback
  @callback
end

Instance Method Details

#add(item) ⇒ Object

Adds the item to the current batch



45
46
47
48
# File 'lib/chore/strategies/consumer/batcher.rb', line 45

def add(item)
  @batch << item
  execute if ready?
end

#execute(force = false) ⇒ Object

Calls for the batch to be executed. If force is set to true, the batch will execute even if it is not full yet



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/chore/strategies/consumer/batcher.rb', line 51

def execute(force = false)
  batch = nil
  @mutex.synchronize do
    if force || ready?
      batch = @batch.slice!(0...@size)
    end
  end

  if batch && !batch.empty?
    @callback.call(batch)
  end
end

#ready?Boolean

Determines if the batch is ready to fire, by comparing it's size to the configured batch_size

Returns:

  • (Boolean)


65
66
67
# File 'lib/chore/strategies/consumer/batcher.rb', line 65

def ready?
  @batch.size >= @size
end

#schedule(batch_timeout) ⇒ Object

The main entry point of the Batcher, schedule begins a thread with the provided batch_timeout as the only argument. While the Batcher is running, it will attempt to check if either the batch is full, or if the batch_timeout has elapsed since the oldest message was added. If either case is true, the items in the batch will be executed.

Calling stop will cause the thread to finish it's current check, and exit



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/chore/strategies/consumer/batcher.rb', line 24

def schedule(batch_timeout)
  @thread = Thread.new(batch_timeout) do |timeout|
    Chore.logger.info "Batching thread starting with #{batch_timeout} second timeout"
    while @running do
      begin
        oldest_item = @batch.first
        timestamp = oldest_item && oldest_item.created_at
        Chore.logger.debug "Oldest message in batch: #{timestamp}, size: #{@batch.size}"
        if timestamp && Time.now > (timestamp + timeout)
          Chore.logger.debug "Batching timeout reached (#{timestamp + timeout}), current size: #{@batch.size}"
          self.execute(true)
        end
        sleep(1)
      rescue => e
        Chore.logger.error "Batcher#schedule raised an exception: #{e.inspect}"
      end
    end
  end
end

#stopObject

Sets a flag which will begin shutting down the Batcher



70
71
72
# File 'lib/chore/strategies/consumer/batcher.rb', line 70

def stop
  @running = false
end