Class: Chore::DuplicateDetector
- Inherits:
-
Object
- Object
- Chore::DuplicateDetector
- Defined in:
- lib/chore/duplicate_detector.rb
Overview
:nodoc:
Instance Method Summary collapse
-
#found_duplicate?(msg_data) ⇒ Boolean
Checks the message against the configured dedupe server to see if the message is unique or not Unique messages will return false Duplicated messages will return true.
-
#initialize(opts = {}) ⇒ DuplicateDetector
constructor
A new instance of DuplicateDetector.
-
#queue_timeout(msg_data) ⇒ Object
Retrieves the timeout for the given queue.
Constructor Details
#initialize(opts = {}) ⇒ DuplicateDetector
Returns a new instance of DuplicateDetector.
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/chore/duplicate_detector.rb', line 4 def initialize(opts={}) # Make it optional. Only required when we use it. begin require 'dalli' rescue LoadError => e Chore.logger.error "Unable to load dalli gem. It is required if duplicate \ detection is enabled. Install it with 'gem install dalli'." raise e end = { :auto_eject_hosts => false, :cache_lookups => false, :tcp_nodelay => true, :socket_max_failures => 5, :socket_timeout => 2 } @timeouts = {} @dupe_on_cache_failure = opts.fetch(:dupe_on_cache_failure) { false } @timeout = opts.fetch(:timeout) { 0 } @servers = opts.fetch(:servers) { nil } @memcached_client = opts.fetch(:memcached_client) { Dalli::Client.new(@servers, ) } end |
Instance Method Details
#found_duplicate?(msg_data) ⇒ Boolean
Checks the message against the configured dedupe server to see if the message is unique or not Unique messages will return false Duplicated messages will return true
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/chore/duplicate_detector.rb', line 32 def found_duplicate?(msg_data) return false unless msg_data && msg_data[:queue] timeout = self.queue_timeout(msg_data) begin !@memcached_client.add(msg_data[:id], "1",timeout) rescue StandardError => e if @dupe_on_cache_failure Chore.logger.error "Error accessing duplicate cache server. Assuming message is a duplicate. #{e}\n#{e.backtrace * "\n"}" true else Chore.logger.error "Error accessing duplicate cache server. Assuming message is not a duplicate. #{e}\n#{e.backtrace * "\n"}" false end end end |
#queue_timeout(msg_data) ⇒ Object
Retrieves the timeout for the given queue. The timeout is the window of time in seconds that we would consider the message to be non-unique, before we consider it dead in the water After that timeout, we would consider the next copy of the message received to be unique, and process it.
51 52 53 |
# File 'lib/chore/duplicate_detector.rb', line 51 def queue_timeout(msg_data) @timeouts[msg_data[:queue]] ||= msg_data[:visibility_timeout] || @timeout end |