Class: Chore::Queues::SQS::Consumer

Inherits:
Consumer
  • Object
show all
Defined in:
lib/chore/queues/sqs/consumer.rb

Overview

SQS Consumer for Chore. Requests messages from SQS and passes them to be worked on. Also controls deleting completed messages within SQS.

Constant Summary collapse

@@reset_at =

Initialize the reset at on class load

Time.now

Instance Attribute Summary

Attributes inherited from Consumer

#queue_name

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Consumer

#dupe_detector, #duplicate_message?, #provide_work, #running?, #stop

Constructor Details

#initialize(queue_name, opts = {}) ⇒ Consumer

Returns a new instance of Consumer.

Parameters:

  • queue_name (String)

    Name of SQS queue

  • opts (Hash) (defaults to: {})

    Options

Raises:



19
20
21
22
# File 'lib/chore/queues/sqs/consumer.rb', line 19

def initialize(queue_name, opts={})
  super(queue_name, opts)
  raise Chore::TerribleMistake, "Cannot specify a queue polling size greater than 10" if sqs_polling_amount > 10
end

Class Method Details

.reset_connection!Object

Resets the API client connection and provides @@reset_at so we know when the last time that was done



25
26
27
# File 'lib/chore/queues/sqs/consumer.rb', line 25

def self.reset_connection!
  @@reset_at = Time.now
end

Instance Method Details

#complete(message_id, receipt_handle) ⇒ Object

Deletes the given message from the SQS queue

Parameters:

  • message_id (String)

    Unique ID of the SQS message

  • receipt_handle (Hash)

    Receipt handle (unique per consume request) of the SQS message



60
61
62
63
# File 'lib/chore/queues/sqs/consumer.rb', line 60

def complete(message_id, receipt_handle)
  Chore.logger.debug "Completing (deleting): #{message_id}"
  queue.delete_messages(entries: [{ id: message_id, receipt_handle: receipt_handle }])
end

#consume(&handler) ⇒ Array<Aws::SQS::Message>

Begins requesting messages from SQS, which will invoke the &handler over each message

Parameters:

  • &handler (Block)

    Message handler, used by the calling context (worker) to create & assigns a UnitOfWork

Returns:

  • (Array<Aws::SQS::Message>)


34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/chore/queues/sqs/consumer.rb', line 34

def consume(&handler)
  while running?
    begin
      messages = handle_messages(&handler)
      sleep (Chore.config.consumer_sleep_interval) if messages.empty?
    rescue Aws::SQS::Errors::NonExistentQueue => e
      Chore.logger.error "You specified a queue '#{queue_name}' that does not exist. You must create the queue before starting Chore. Shutting down..."
      raise Chore::TerribleMistake
    rescue => e
      Chore.logger.error { "SQSConsumer#Consume: #{e.inspect} #{e.backtrace * "\n"}" }
    end
  end
end

#delay(item, backoff_calc) ⇒ Object

Delays retry of a job by backoff_calc seconds.

Parameters:

  • item (UnitOfWork)

    Item to be delayed

  • backoff_calc (Block)

    Code that determines the backoff.



69
70
71
72
73
74
75
76
77
78
# File 'lib/chore/queues/sqs/consumer.rb', line 69

def delay(item, backoff_calc)
  delay = backoff_calc.call(item)
  Chore.logger.debug "Delaying #{item.id} by #{delay} seconds"

  queue.change_message_visibility_batch(entries: [
    { id: item.id, receipt_handle: item.receipt_handle, visibility_timeout: delay },
  ])

  return delay
end

#reject(message_id) ⇒ Object

Unimplemented. Rejects the given message from SQS.

Parameters:

  • message_id (String)

    Unique ID of the SQS message

Returns:

  • nil



53
54
# File 'lib/chore/queues/sqs/consumer.rb', line 53

def reject(message_id)
end