Class: Chore::Queues::Filesystem::Consumer
- Extended by: FilesystemQueue
- Defined in: lib/chore/queues/filesystem/consumer.rb
Overview
This is the consuming side of the file system queue. This class consumes jobs created by FilesystemPublisher#publish. The root of the file system queue is configured in Chore.config.fs_queue_root. In there, a directory is created for each queue name. Each queue directory contains a directory called “new” and one called “inprogress”. FilesystemPublisher#publish creates new job files in the “new” directory. This consumer polls that directory for new jobs and moves them to “inprogress”; whenever a poll finds no jobs, it sleeps for Chore.config.consumer_sleep_interval seconds before polling again.
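Once complete, job files are deleted. Rejected jobs are not moved immediately (#reject is currently a no-op): they remain in “inprogress” until they exceed the queue timeout, at which point cleanup moves them back into “new” with an incremented attempt count, and they will be processed again. This may not be the desired behavior long term, and we may want to add configuration to this class to allow more creative failure handling and retrying.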
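The lifecycle described above can be sketched with plain Ruby file operations. This is a minimal, standalone illustration, not Chore's implementation: the temporary root stands in for Chore.config.fs_queue_root, and the queue name `my_queue`, the job id `abc123`, and the helper `queue_dirs` are hypothetical. It assumes files in “new” are named `<id>.<attempts>.job`, the form make_new_again produces.

```ruby
require "tmpdir"
require "fileutils"

# Hypothetical helper: creates <root>/<queue_name>/{new,inprogress}.
def queue_dirs(root, queue_name)
  new_dir = File.join(root, queue_name, "new")
  in_progress_dir = File.join(root, queue_name, "inprogress")
  FileUtils.mkdir_p([new_dir, in_progress_dir])
  [new_dir, in_progress_dir]
end

# Dir.mktmpdir returns the block's value and removes the directory afterwards.
lifecycle = Dir.mktmpdir do |root|
  new_dir, in_progress_dir = queue_dirs(root, "my_queue")

  # 1. A publisher writes a job file into "new" (0 previous attempts).
  File.write(File.join(new_dir, "abc123.0.job"), '{"class":"MyJob"}')
  after_publish = Dir.children(new_dir)

  # 2. The consumer claims it by moving it into "inprogress",
  #    appending the time the job was started.
  claimed = File.join(in_progress_dir, "abc123.0.#{Time.now.to_i}.job")
  FileUtils.mv(File.join(new_dir, "abc123.0.job"), claimed)
  after_claim = [Dir.children(new_dir).size, Dir.children(in_progress_dir).size]

  # 3. On completion the in-progress file is deleted.
  File.delete(claimed)
  after_complete = Dir.children(in_progress_dir)

  [after_publish, after_claim, after_complete]
end

p lifecycle # => [["abc123.0.job"], [0, 1], []]
```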
Constant Summary
- EXPIRATION_CHECK_INTERVAL = 1
  The minimum number of seconds to allow to pass between checks for expired jobs on the filesystem.
  Since queue times are measured on the order of seconds, 1 second is the smallest duration. It also prevents us from burning a lot of CPU looking at expired jobs when the consumer sleep interval is less than 1 second.
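The throttling this constant provides can be expressed as a small predicate. The sketch below mirrors the condition #consume checks before calling cleanup; `cleanup_due?` is a hypothetical name, and the times are fixed values chosen for illustration.

```ruby
EXPIRATION_CHECK_INTERVAL = 1 # seconds

# Hypothetical predicate mirroring the check in #consume: clean up on the
# first pass, then only once at least EXPIRATION_CHECK_INTERVAL whole seconds
# have elapsed since the last cleanup.
def cleanup_due?(last_cleaned_at, now = Time.now)
  last_cleaned_at.nil? || (now - last_cleaned_at).to_i >= EXPIRATION_CHECK_INTERVAL
end

t0 = Time.at(1_000)
p cleanup_due?(nil, t0)        # => true  (never cleaned)
p cleanup_due?(t0, t0 + 0.25)  # => false (0.25 s truncates to 0)
p cleanup_due?(t0, t0 + 1.5)   # => true  (1.5 s truncates to 1)
```

Because the elapsed time is truncated with to_i, a consumer whose sleep interval is below one second still runs cleanup at most about once per second.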
Constants included from FilesystemQueue
FilesystemQueue::CONFIG_DIR, FilesystemQueue::IN_PROGRESS_DIR, FilesystemQueue::NEW_JOB_DIR
Instance Attribute Summary
- #queue_timeout ⇒ Object (readonly)
  The amount of time that units of work can run before the queue considers them timed out.
Attributes inherited from Consumer
Class Method Summary
- .cleanup(expiration_time, new_dir, in_progress_dir) ⇒ Object
  Cleans up expired in-progress files by making them new again.
- .each_file(path, limit = nil) ⇒ Object
- .file_info(job_file) ⇒ Object
  Grabs the unique identifier for the job filename and the number of times it's been attempted (also based on the filename).
- .make_in_progress(job, new_dir, in_progress_dir, queue_timeout) ⇒ Object
  Moves job file to inprogress directory and returns the full path if the job was successfully locked by this consumer.
- .make_new_again(job, new_dir, in_progress_dir) ⇒ Object
  Moves job file to new directory and returns the full path.
Instance Method Summary
- #complete(message_id, receipt_handle = nil) ⇒ Object
  Deletes the given message from the filesystem queue.
- #consume ⇒ Object
- #initialize(queue_name, opts = {}) ⇒ Consumer (constructor)
  A new instance of Consumer.
- #reject(id) ⇒ Object
  Rejects the given message from the filesystem by id.
Methods included from FilesystemQueue
config_dir, config_value, in_progress_dir, new_dir, queue_dir, root_dir
Methods inherited from Consumer
#dupe_detector, #duplicate_message?, #provide_work, reset_connection!, #running?, #stop
Constructor Details
#initialize(queue_name, opts = {}) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/chore/queues/filesystem/consumer.rb', line 127
def initialize(queue_name, opts={})
  super(queue_name, opts)

  @in_progress_dir = self.class.in_progress_dir(queue_name)
  @new_dir = self.class.new_dir(queue_name)
  @queue_timeout = self.class.queue_timeout(queue_name)
end
Instance Attribute Details
#queue_timeout ⇒ Object (readonly)
The amount of time that units of work can run before the queue considers them timed out. For filesystem queues, this is the global default.
# File 'lib/chore/queues/filesystem/consumer.rb', line 125
def queue_timeout
  @queue_timeout
end
Class Method Details
.cleanup(expiration_time, new_dir, in_progress_dir) ⇒ Object
Cleans up expired in-progress files by making them new again.
# File 'lib/chore/queues/filesystem/consumer.rb', line 26
def cleanup(expiration_time, new_dir, in_progress_dir)
  each_file(in_progress_dir) do |job_file|
    id, previous_attempts, = file_info(job_file)
    next if > expiration_time

    begin
      make_new_again(job_file, new_dir, in_progress_dir)
    rescue Errno::ENOENT
      # File no longer exists; skip since it's been recovered by another
      # consumer
    rescue ArgumentError
      # Move operation was attempted at same time as another consumer;
      # skip since the other process succeeded where this one didn't
    end
  end
end
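The expiration test in cleanup compares each in-progress file's start timestamp with a cutoff; #consume passes `Time.now.to_i - @queue_timeout` as that cutoff. The sketch below applies the same rule to a list of filenames without touching the filesystem. It assumes in-progress files are named `<id>.<attempts>.<started_at>.job`, as written by make_in_progress; `started_at` and `expired_jobs` are hypothetical helpers, and the timestamps are made up.

```ruby
# Hypothetical helper: third dot-separated field of an in-progress filename.
def started_at(job_file)
  job_file.split(".")[2].to_i
end

# Returns the files cleanup would move back to "new": those NOT started
# strictly after the cutoff.
def expired_jobs(job_files, queue_timeout, now = Time.now.to_i)
  expiration_time = now - queue_timeout # same cutoff #consume computes
  job_files.reject { |f| started_at(f) > expiration_time }
end

files = ["a.0.1000.job", "b.1.1090.job", "c.0.1100.job"]
p expired_jobs(files, 60, 1150) # => ["a.0.1000.job", "b.1.1090.job"]
```

A file started exactly at the cutoff counts as expired, because cleanup only skips files whose timestamp is strictly greater than expiration_time.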
.each_file(path, limit = nil) ⇒ Object
# File 'lib/chore/queues/filesystem/consumer.rb', line 94
def each_file(path, limit = nil)
  count = 0

  Dir.foreach(path) do |file|
    next if file.start_with?('.')

    yield file

    count += 1
    break if limit && count >= limit
  end
end
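each_file yields directory entries that do not start with a dot (which also skips `.` and `..`) and stops once `limit` entries have been yielded. The standalone copy below exercises both behaviors in a temporary directory; the file names are made up, and the results are sorted because Dir.foreach does not guarantee any particular order.

```ruby
require "tmpdir"

# Standalone copy of the .each_file logic shown above.
def each_file(path, limit = nil)
  count = 0
  Dir.foreach(path) do |file|
    next if file.start_with?(".") # skips ".", ".." and dotfiles
    yield file
    count += 1
    break if limit && count >= limit
  end
end

seen = Dir.mktmpdir do |dir|
  %w[a.0.job b.0.job .hidden].each { |f| File.write(File.join(dir, f), "x") }
  all = []
  each_file(dir) { |f| all << f }
  limited = []
  each_file(dir, 1) { |f| limited << f }
  [all.sort, limited.size]
end

p seen # => [["a.0.job", "b.0.job"], 1]
```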
.file_info(job_file) ⇒ Object
Grabs the job's unique identifier, the number of times it's been attempted, and (for in-progress files) the timestamp at which it was started, all parsed from the job filename.
# File 'lib/chore/queues/filesystem/consumer.rb', line 109
def file_info(job_file)
  id, previous_attempts, , * = job_file.split('.')
  [id, previous_attempts.to_i, .to_i]
end
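file_info relies on the job filename encoding its metadata as dot-separated fields. The sketch below reproduces that parsing under a hypothetical name, `parse_job_filename`, and uses `started_at` as an illustrative name for the third field. It shows the two shapes a filename can take: an in-progress name carrying a start timestamp, and a name in “new”, where the third field is the literal `job` and String#to_i turns it into 0.

```ruby
# Hypothetical re-implementation of the .file_info parsing rule:
#   <id>.<attempts>[.<started_at>].job
def parse_job_filename(job_file)
  id, previous_attempts, started_at, * = job_file.split(".")
  [id, previous_attempts.to_i, started_at.to_i]
end

p parse_job_filename("abc123.2.1700000000.job") # => ["abc123", 2, 1700000000]
p parse_job_filename("abc123.2.job")            # => ["abc123", 2, 0]
```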
.make_in_progress(job, new_dir, in_progress_dir, queue_timeout) ⇒ Object
Moves job file to inprogress directory and returns the full path if the job was successfully locked by this consumer.
# File 'lib/chore/queues/filesystem/consumer.rb', line 45
def make_in_progress(job, new_dir, in_progress_dir, queue_timeout)
  basename, previous_attempts, * = file_info(job)

  from = File.join(new_dir, job)
  # Add a timestamp to mark when the job was started
  to = File.join(in_progress_dir, "#{basename}.#{previous_attempts}.#{Time.now.to_i}.job")

  # If the file is non-zero, this means it was successfully written to
  # by a publisher and we can attempt to move it to "in progress".
  #
  # There is a small window of time where the file can still be zero
  # bytes because the publisher hasn't finished writing to it yet.
  if !File.zero?(from)
    File.open(from, "r") do |f|
      # If the lock can't be obtained, that means it's been locked
      # by another consumer (or by the publisher of the file); don't
      # block, just skip it
      if f.flock(File::LOCK_EX | File::LOCK_NB)
        FileUtils.mv(from, to)
        to
      end
    end
  elsif (Time.now - File.ctime(from)) >= queue_timeout
    # The file is empty (zero bytes) and enough time has passed since
    # the file was written that we can safely assume it will never
    # get written to by the publisher.
    #
    # The scenario where this happens is when the publisher created
    # the file, but the process was killed before it had a chance to
    # actually write the data.
    File.delete(from)
    nil
  end
rescue Errno::ENOENT
  # File no longer exists; skip it since it's been picked up by
  # another consumer
end
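The claim step in make_in_progress rests on a non-blocking exclusive flock: whoever gets the lock moves the file, and everyone else gets `false` back immediately. The sketch below isolates that step under a hypothetical helper name, `claim`, and leaves out the timeout-based deletion of long-empty files. A second File handle in the same process stands in for another consumer holding the lock, which works because flock locks belong to the open file, not the process.

```ruby
require "tmpdir"
require "fileutils"

# Hypothetical helper modeled on .make_in_progress: returns the destination
# path if this caller locked and moved the file, otherwise nil.
def claim(from, to)
  return nil if File.zero?(from) # publisher may not have written it yet

  File.open(from, "r") do |f|
    # LOCK_NB makes flock return false instead of blocking when another
    # open file already holds the lock.
    if f.flock(File::LOCK_EX | File::LOCK_NB)
      FileUtils.mv(from, to)
      to
    end
  end
rescue Errno::ENOENT
  nil # someone else moved the file first
end

outcome = Dir.mktmpdir do |dir|
  empty = File.join(dir, "job0.0.job")
  File.write(empty, "")
  skipped_empty = claim(empty, File.join(dir, "job0.0.1.job"))

  src = File.join(dir, "job1.0.job")
  File.write(src, "{}")
  dst = File.join(dir, "job1.0.#{Time.now.to_i}.job")

  holder = File.open(src, "r")
  holder.flock(File::LOCK_EX) # simulate another consumer holding the lock
  locked_out = claim(src, dst)
  holder.flock(File::LOCK_UN)
  holder.close

  claimed = claim(src, dst)
  [skipped_empty, locked_out, claimed == dst, File.exist?(dst)]
end

p outcome # => [nil, nil, true, true]
```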
.make_new_again(job, new_dir, in_progress_dir) ⇒ Object
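Moves the job file back to the new directory, dropping its start timestamp and incrementing its attempt count in the filename, and returns the new full path.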
# File 'lib/chore/queues/filesystem/consumer.rb', line 84
def make_new_again(job, new_dir, in_progress_dir)
  basename, previous_attempts = file_info(job)

  from = File.join(in_progress_dir, job)
  to = File.join(new_dir, "#{basename}.#{previous_attempts + 1}.job")
  FileUtils.mv(from, to)

  to
end
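make_new_again drops the start timestamp and bumps the attempt count as it moves a file back to “new”. The standalone sketch below, using a hypothetical helper `requeue` and made-up filenames, shows the resulting rename.

```ruby
require "tmpdir"
require "fileutils"

# Hypothetical helper following the .make_new_again renaming rule:
#   inprogress/<id>.<n>.<started_at>.job  ->  new/<id>.<n+1>.job
def requeue(job, new_dir, in_progress_dir)
  basename, previous_attempts = job.split(".")
  to = File.join(new_dir, "#{basename}.#{previous_attempts.to_i + 1}.job")
  FileUtils.mv(File.join(in_progress_dir, job), to)
  to
end

names = Dir.mktmpdir do |root|
  new_dir = File.join(root, "new")
  in_progress_dir = File.join(root, "inprogress")
  FileUtils.mkdir_p([new_dir, in_progress_dir])
  File.write(File.join(in_progress_dir, "abc.1.1700000000.job"), "{}")
  requeue("abc.1.1700000000.job", new_dir, in_progress_dir)
  [Dir.children(new_dir), Dir.children(in_progress_dir)]
end

p names # => [["abc.2.job"], []]
```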
Instance Method Details
#complete(message_id, receipt_handle = nil) ⇒ Object
Deletes the given message from the filesystem queue. Since the filesystem is not a remote API, there is no notion of a “receipt handle”.
# File 'lib/chore/queues/filesystem/consumer.rb', line 169
def complete(message_id, receipt_handle = nil)
  Chore.logger.debug "Completing (deleting): #{message_id}"
  File.delete(File.join(@in_progress_dir, message_id))
rescue Errno::ENOENT
  # The job took too long to complete, was deemed expired, and moved
  # back into "new". Ignore.
end
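Because complete rescues Errno::ENOENT, completing a job whose file has already been moved back to “new” by cleanup is harmless. The sketch below shows that idempotent-delete pattern; `complete_job` is hypothetical and, unlike complete, returns true or false so the two outcomes can be told apart.

```ruby
require "tmpdir"

# Hypothetical helper: deletes the in-progress file, treating an
# already-missing file as a non-error.
def complete_job(in_progress_dir, message_id)
  File.delete(File.join(in_progress_dir, message_id))
  true
rescue Errno::ENOENT
  false
end

outcome = Dir.mktmpdir do |dir|
  File.write(File.join(dir, "abc.0.1700000000.job"), "{}")
  first = complete_job(dir, "abc.0.1700000000.job")
  second = complete_job(dir, "abc.0.1700000000.job") # file already gone
  [first, second]
end

p outcome # => [true, false]
```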
#consume ⇒ Object
# File 'lib/chore/queues/filesystem/consumer.rb', line 135
def consume
  Chore.logger.info "Starting consuming file system queue #{@queue_name} in #{self.class.queue_dir(queue_name)}"
  while running?
    begin
      # Move expired job files to new directory (so long as enough time has
      # passed since we last did this check)
      if !@last_cleaned_at || (Time.now - @last_cleaned_at).to_i >= EXPIRATION_CHECK_INTERVAL
        self.class.cleanup(Time.now.to_i - @queue_timeout, @new_dir, @in_progress_dir)
        @last_cleaned_at = Time.now
      end

      found_files = false
       do |*args|
        found_files = true
        yield(*args)
      end
    rescue => e
      Chore.logger.error { "#{self.class}#consume: #{e} #{e.backtrace * "\n"}" }
    ensure
      sleep(Chore.config.consumer_sleep_interval) unless found_files
    end
  end
end
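The loop in consume only sleeps when a pass found no jobs, so a busy queue is drained pass after pass without pausing, while an idle one is polled every consumer_sleep_interval seconds. The sketch below isolates that pattern; `poll`, `fetch_batch`, and `max_passes` are hypothetical stand-ins for the job-handling call and running?, and the batches are made up.

```ruby
# Hypothetical sketch of the consume loop: errors are logged rather than
# raised, and the sleep in `ensure` is skipped whenever a pass found work.
def poll(sleep_interval:, max_passes:, fetch_batch:)
  processed = []
  sleeps = 0
  max_passes.times do
    found = false
    begin
      fetch_batch.call.each do |job|
        found = true
        processed << job
      end
    rescue => e
      warn "poll: #{e}"
    ensure
      unless found
        sleeps += 1
        sleep(sleep_interval)
      end
    end
  end
  [processed, sleeps]
end

batches = [["a", "b"], [], ["c"]]
result = poll(sleep_interval: 0.01, max_passes: 3, fetch_batch: -> { batches.shift || [] })
p result # => [["a", "b", "c"], 1]
```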
#reject(id) ⇒ Object
Rejects the given message from the filesystem by id. Currently a no-op.
# File 'lib/chore/queues/filesystem/consumer.rb', line 160
def reject(id)

end