Class: Chore::Consumer

Inherits:
Object
Defined in:
lib/chore/consumer.rb

Overview

Base class for a Chore Consumer. Provides the interface that a Chore::Consumer implementation should adhere to.
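
As an illustration of the interface, here is a minimal sketch of a subclass; it is not code from Chore itself. BaseConsumer is a stand-in that copies the method bodies listed on this page so the example runs without the gem. InMemoryConsumer, its :messages option, and the pending/completed readers are hypothetical names introduced only for this sketch.

```ruby
# Stand-in for Chore::Consumer; bodies copied from the listings on this page.
class BaseConsumer
  attr_reader :queue_name

  def initialize(queue_name, opts = {})
    @queue_name = queue_name
    @running = true
  end

  def consume(&handler)
    raise NotImplementedError
  end

  def reject(message_id)
    raise NotImplementedError
  end

  def complete(message_id, receipt_handle)
    raise NotImplementedError
  end

  def running?
    @running
  end
end

# Hypothetical consumer backed by an in-memory Hash of id => body.
class InMemoryConsumer < BaseConsumer
  attr_reader :pending, :completed

  def initialize(queue_name, opts = {})
    super
    @pending = opts.fetch(:messages, {}).dup
    @in_flight = {}
    @completed = []
  end

  # Yields (message_id, message_body) until the queue is empty
  # or the consumer is no longer running.
  def consume(&handler)
    while running?
      id, body = @pending.shift
      break if id.nil?
      @in_flight[id] = body
      handler.call(id, body)
    end
  end

  # Puts an in-flight message back on the queue.
  def reject(message_id)
    @pending[message_id] = @in_flight.delete(message_id)
  end

  # Marks an in-flight message as finished; receipt_handle is unused here.
  def complete(message_id, receipt_handle)
    @in_flight.delete(message_id)
    @completed << message_id
  end
end

consumer = InMemoryConsumer.new("demo", messages: { "a" => "first", "b" => "second" })
seen = []
consumer.consume { |id, body| seen << [id, body] }
consumer.complete("a", nil)
consumer.reject("b")
```

In this sketch the calling code decides after the fact whether each message is completed or rejected, using the message_id it received in the handler.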

Constructor Details

#initialize(queue_name, opts = {}) ⇒ Consumer

Returns a new instance of Consumer.

Parameters:

  • queue_name (String)

    Name of queue to be consumed from

  • opts (Hash) (defaults to: {})


# File 'lib/chore/consumer.rb', line 15

def initialize(queue_name, opts={})
  @queue_name = queue_name
  @running = true
end

Instance Attribute Details

#queue_name ⇒ Object

Returns the value of attribute queue_name.



# File 'lib/chore/consumer.rb', line 11

def queue_name
  @queue_name
end

Class Method Details

.reset_connection! ⇒ Object

Resets the underlying connection for all consumers of this class. Useful when the consumer is used across a fork. Consumers should override this method, although doing so is not required.



# File 'lib/chore/consumer.rb', line 22

def self.reset_connection!
end
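
One way a subclass might override this hook is sketched below. It assumes a connection memoized at class level; FakeConnection, ConnectedConsumer, and the .connection method are hypothetical and stand in for whatever client a real consumer would hold. BaseConsumer reproduces only the empty hook shown above.

```ruby
# Stand-in for Chore::Consumer; only the class-level hook is reproduced.
class BaseConsumer
  def self.reset_connection!
  end
end

# Hypothetical connection object that records the process that created it.
FakeConnection = Struct.new(:pid)

class ConnectedConsumer < BaseConsumer
  # One connection shared by all instances of this class, created lazily.
  def self.connection
    @connection ||= FakeConnection.new(Process.pid)
  end

  # Drops the memoized connection so the next call to .connection
  # builds a fresh one.
  def self.reset_connection!
    @connection = nil
  end
end

first = ConnectedConsumer.connection
ConnectedConsumer.reset_connection!
second = ConnectedConsumer.connection
```

Under this assumption, a forked child would call reset_connection! before consuming so that it does not share the parent's connection.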

Instance Method Details

#complete(message_id, receipt_handle) ⇒ Object

Complete should mark a message as finished.

Parameters:

  • message_id (String)

    Unique ID of the message

  • receipt_handle (Hash)

    Unique ID of the consuming transaction in non-filesystem implementations

Raises:

  • (NotImplementedError)


# File 'lib/chore/consumer.rb', line 46

def complete(message_id, receipt_handle)
  raise NotImplementedError
end

#consume(&handler) ⇒ Object

Consume takes a block with an arity of two. The two parameters are |message_id, message_body|, where message_id is any object the consumer needs in order to act on the message later (reject, complete, etc.).

Parameters:

  • &handler (Block)

    Message handler, used by the calling context (worker) to create and assign a UnitOfWork

Raises:

  • (NotImplementedError)


# File 'lib/chore/consumer.rb', line 30

def consume(&handler)
  raise NotImplementedError
end

#dupe_detector ⇒ DuplicateDetector

Instance of duplicate detection implementation class

Returns:

  • (DuplicateDetector)

# File 'lib/chore/consumer.rb', line 83

def dupe_detector
  @dupes ||= DuplicateDetector.new({:servers => Chore.config.dedupe_servers,
                                    :dupe_on_cache_failure => false})
end

#duplicate_message?(dedupe_key, klass, queue_timeout) ⇒ TrueClass, FalseClass

Determine whether or not we have already seen this message

Parameters:

  • dedupe_key (String)
  • klass (Class)
  • queue_timeout (Integer)

Returns:

  • (TrueClass, FalseClass)


# File 'lib/chore/consumer.rb', line 76

def duplicate_message?(dedupe_key, klass, queue_timeout)
  dupe_detector.found_duplicate?(:id=>dedupe_key, :queue=>klass.to_s, :visibility_timeout=>queue_timeout)
end
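
The call above can be exercised without a cache server by substituting a detector. The sketch below assumes a hypothetical InMemoryDetector that accepts the same hash keys (:id, :queue, :visibility_timeout). Its rule, that a key counts as a duplicate until the visibility timeout elapses, is an assumption made for illustration and is not taken from the real DuplicateDetector. DedupingConsumer and the injectable clock are likewise hypothetical.

```ruby
# Hypothetical in-memory stand-in for DuplicateDetector.
# The expiry rule is an assumption, not the real class's behavior.
class InMemoryDetector
  def initialize(clock = -> { Time.now.to_f })
    @seen = {}
    @clock = clock
  end

  def found_duplicate?(msg)
    key = [msg[:queue], msg[:id]]
    now = @clock.call
    expires_at = @seen[key]
    return true if expires_at && expires_at > now

    # First sighting (or the earlier one expired): remember it.
    @seen[key] = now + msg[:visibility_timeout]
    false
  end
end

# Hypothetical consumer that takes its detector as an argument.
class DedupingConsumer
  def initialize(detector)
    @dupes = detector
  end

  def dupe_detector
    @dupes
  end

  # Same call as the listing above.
  def duplicate_message?(dedupe_key, klass, queue_timeout)
    dupe_detector.found_duplicate?(:id => dedupe_key, :queue => klass.to_s, :visibility_timeout => queue_timeout)
  end
end

now = 0.0
consumer = DedupingConsumer.new(InMemoryDetector.new(-> { now }))
first  = consumer.duplicate_message?("msg-1", String, 30)
second = consumer.duplicate_message?("msg-1", String, 30)
now = 31.0 # move the fake clock past the 30-second timeout
third  = consumer.duplicate_message?("msg-1", String, 30)
```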

#provide_work(n) ⇒ Object

Returns up to n units of work.

Parameters:

  • n

Raises:

  • (NotImplementedError)


# File 'lib/chore/consumer.rb', line 65

def provide_work(n)
  raise NotImplementedError
end
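
This page does not specify the shape of the returned work. As a rough sketch only, the example below assumes an Array is returned and that items come from an in-memory buffer. BufferedConsumer and its constructor argument are hypothetical.

```ruby
# Hypothetical consumer that hands out buffered items in batches.
class BufferedConsumer
  def initialize(items)
    @buffer = items.dup
  end

  # Removes and returns at most n items; returns an empty Array
  # once the buffer is exhausted.
  def provide_work(n)
    @buffer.shift(n)
  end
end

consumer = BufferedConsumer.new(%w[a b c])
first  = consumer.provide_work(2)
second = consumer.provide_work(2)
third  = consumer.provide_work(2)
```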

#reject(message_id) ⇒ Object

Reject should put a message back on a queue to be processed again later. It takes a message_id as returned via consume.

Parameters:

  • message_id (String)

    Unique ID of the message

Raises:

  • (NotImplementedError)


# File 'lib/chore/consumer.rb', line 38

def reject(message_id)
  raise NotImplementedError
end

#running? ⇒ TrueClass, FalseClass

Returns true if the Consumer is currently running

Returns:

  • (TrueClass, FalseClass)


# File 'lib/chore/consumer.rb', line 58

def running?
  @running
end

#stop ⇒ Object

Perform any shutdown behavior and stop consuming messages



# File 'lib/chore/consumer.rb', line 51

def stop
  @running = false
end
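
To show how stop and running? might interact with a consume loop, here is a small sketch. LoopingConsumer and its counter-based messages are hypothetical. The constructor, stop, and running? bodies match the listings on this page, while the loop itself is only one possible way to honor the flag.

```ruby
class LoopingConsumer
  def initialize(queue_name, opts = {})
    @queue_name = queue_name
    @running = true
  end

  def stop
    @running = false
  end

  def running?
    @running
  end

  # Hypothetical loop: yields an increasing counter as the message id
  # until stop has been called.
  def consume
    n = 0
    while running?
      n += 1
      yield n, "message #{n}"
    end
  end
end

consumer = LoopingConsumer.new("demo")
delivered = []
consumer.consume do |id, _body|
  delivered << id
  consumer.stop if id == 3
end
```

Because the flag is checked once per iteration, a message that is already being handled when stop is called still finishes; no further messages are delivered afterwards.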