Class: Chore::Queues::SQS::Consumer
- Defined in:
- lib/chore/queues/sqs/consumer.rb
Overview
SQS Consumer for Chore. Requests messages from SQS and passes them to be worked on. Also controls deleting completed messages within SQS.
Constant Summary collapse
- @@reset_at =
Initialize the reset at on class load
Time.now
Instance Attribute Summary
Attributes inherited from Consumer
Class Method Summary collapse
-
.reset_connection! ⇒ Object
Resets the API client connection and provides @@reset_at so we know when the last time that was done.
Instance Method Summary collapse
-
#complete(message_id, receipt_handle) ⇒ Object
Deletes the given message from the SQS queue.
-
#consume(&handler) ⇒ Array<Aws::SQS::Message>
Begins requesting messages from SQS, which will invoke the &handler over each message.
-
#delay(item, backoff_calc) ⇒ Object
Delays retry of a job by
backoff_calc
seconds. -
#initialize(queue_name, opts = {}) ⇒ Consumer
constructor
A new instance of Consumer.
-
#reject(message_id) ⇒ Object
Unimplemented.
Methods inherited from Consumer
#dupe_detector, #duplicate_message?, #provide_work, #running?, #stop
Constructor Details
#initialize(queue_name, opts = {}) ⇒ Consumer
Returns a new instance of Consumer.
19 20 21 22 |
# File 'lib/chore/queues/sqs/consumer.rb', line 19 def initialize(queue_name, opts={}) super(queue_name, opts) raise Chore::TerribleMistake, "Cannot specify a queue polling size greater than 10" if sqs_polling_amount > 10 end |
Class Method Details
.reset_connection! ⇒ Object
Resets the API client connection and provides @@reset_at so we know when the last time that was done
25 26 27 |
# File 'lib/chore/queues/sqs/consumer.rb', line 25 def self.reset_connection! @@reset_at = Time.now end |
Instance Method Details
#complete(message_id, receipt_handle) ⇒ Object
Deletes the given message from the SQS queue
60 61 62 63 |
# File 'lib/chore/queues/sqs/consumer.rb', line 60 def complete(, receipt_handle) Chore.logger.debug "Completing (deleting): #{}" queue.(entries: [{ id: , receipt_handle: receipt_handle }]) end |
#consume(&handler) ⇒ Array<Aws::SQS::Message>
Begins requesting messages from SQS, which will invoke the &handler over each message
34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/chore/queues/sqs/consumer.rb', line 34 def consume(&handler) while running? begin = (&handler) sleep (Chore.config.consumer_sleep_interval) if .empty? rescue Aws::SQS::Errors::NonExistentQueue => e Chore.logger.error "You specified a queue '#{queue_name}' that does not exist. You must create the queue before starting Chore. Shutting down..." raise Chore::TerribleMistake rescue => e Chore.logger.error { "SQSConsumer#Consume: #{e.inspect} #{e.backtrace * "\n"}" } end end end |
#delay(item, backoff_calc) ⇒ Object
Delays retry of a job by backoff_calc
seconds.
69 70 71 72 73 74 75 76 77 78 |
# File 'lib/chore/queues/sqs/consumer.rb', line 69 def delay(item, backoff_calc) delay = backoff_calc.call(item) Chore.logger.debug "Delaying #{item.id} by #{delay} seconds" queue.(entries: [ { id: item.id, receipt_handle: item.receipt_handle, visibility_timeout: delay }, ]) return delay end |
#reject(message_id) ⇒ Object
Unimplemented. Rejects the given message from SQS.
53 54 |
# File 'lib/chore/queues/sqs/consumer.rb', line 53 def reject() end |