Class: Chore::Consumer
- Inherits:
-
Object
- Object
- Chore::Consumer
- Defined in:
- lib/chore/consumer.rb
Overview
Base class for a Chore Consumer. Provides the interface that a Chore::Consumer implementation should adhere to.
Direct Known Subclasses
Instance Attribute Summary collapse
-
#queue_name ⇒ Object
Returns the value of attribute queue_name.
Class Method Summary collapse
-
.reset_connection! ⇒ Object
Causes the underlying connection for all consumers of this class to be reset.
Instance Method Summary collapse
-
#complete(message_id, receipt_handle) ⇒ Object
Complete should mark a message as finished.
-
#consume(&handler) ⇒ Object
Consume takes a block with an arity of two.
-
#dupe_detector ⇒ DuplicateDetector
Instance of duplicate detection implementation class.
-
#duplicate_message?(dedupe_key, klass, queue_timeout) ⇒ TrueClass, FalseClass
Determine whether or not we have already seen this message.
-
#initialize(queue_name, opts = {}) ⇒ Consumer
constructor
A new instance of Consumer.
-
#provide_work(n) ⇒ Object
Returns up to n work.
-
#reject(message_id) ⇒ Object
Reject should put a message back on a queue to be processed again later.
-
#running? ⇒ TrueClass, FalseClass
Returns true if the Consumer is currently running.
-
#stop ⇒ Object
Perform any shutdown behavior and stop consuming messages.
Constructor Details
#initialize(queue_name, opts = {}) ⇒ Consumer
Returns a new instance of Consumer.
15 16 17 18 |
# File 'lib/chore/consumer.rb', line 15 def initialize(queue_name, opts={}) @queue_name = queue_name @running = true end |
Instance Attribute Details
#queue_name ⇒ Object
Returns the value of attribute queue_name.
11 12 13 |
# File 'lib/chore/consumer.rb', line 11 def queue_name @queue_name end |
Class Method Details
.reset_connection! ⇒ Object
Causes the underlying connection for all consumers of this class to be reset. Useful for the case where the consumer is being used across a fork. Should be overriden in consumers (but is not required).
22 23 |
# File 'lib/chore/consumer.rb', line 22 def self.reset_connection! end |
Instance Method Details
#complete(message_id, receipt_handle) ⇒ Object
Complete should mark a message as finished.
46 47 48 |
# File 'lib/chore/consumer.rb', line 46 def complete(, receipt_handle) raise NotImplementedError end |
#consume(&handler) ⇒ Object
Consume takes a block with an arity of two. The two params are |message_id,message_body| where message_id is any object that the consumer will need to be able to act on a message later (reject, complete, etc)
30 31 32 |
# File 'lib/chore/consumer.rb', line 30 def consume(&handler) raise NotImplementedError end |
#dupe_detector ⇒ DuplicateDetector
Instance of duplicate detection implementation class
83 84 85 86 |
# File 'lib/chore/consumer.rb', line 83 def dupe_detector @dupes ||= DuplicateDetector.new({:servers => Chore.config.dedupe_servers, :dupe_on_cache_failure => false}) end |
#duplicate_message?(dedupe_key, klass, queue_timeout) ⇒ TrueClass, FalseClass
Determine whether or not we have already seen this message
76 77 78 |
# File 'lib/chore/consumer.rb', line 76 def (dedupe_key, klass, queue_timeout) dupe_detector.found_duplicate?(:id=>dedupe_key, :queue=>klass.to_s, :visibility_timeout=>queue_timeout) end |
#provide_work(n) ⇒ Object
Returns up to n work
65 66 67 |
# File 'lib/chore/consumer.rb', line 65 def provide_work(n) raise NotImplementedError end |
#reject(message_id) ⇒ Object
Reject should put a message back on a queue to be processed again later. It takes a message_id as returned via consume.
38 39 40 |
# File 'lib/chore/consumer.rb', line 38 def reject() raise NotImplementedError end |
#running? ⇒ TrueClass, FalseClass
Returns true if the Consumer is currently running
58 59 60 |
# File 'lib/chore/consumer.rb', line 58 def running? @running end |
#stop ⇒ Object
Perform any shutdown behavior and stop consuming messages
51 52 53 |
# File 'lib/chore/consumer.rb', line 51 def stop @running = false end |