Class: Chore::Queues::Filesystem::Consumer

Inherits:
Consumer
  • Object
Extended by:
FilesystemQueue
Defined in:
lib/chore/queues/filesystem/consumer.rb

Overview

This is the consuming side of the file system queue. This class consumes jobs created by FilesystemPublisher#publish. The root of the file system queue is configured in Chore.config.fs_queue_root. Inside it, a directory is created for each queue name. Each queue directory contains a directory called “new” and one called “inprogress”. FilesystemPublisher#publish creates new job files in the “new” directory. This consumer polls that directory for new jobs and moves each job it claims into “inprogress”. Whenever a poll finds no work, it sleeps for Chore.config.consumer_sleep_interval seconds before polling again.

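Once complete, job files are deleted. Rejected jobs are not moved right away (#reject is currently a no-op). Instead they stay in “inprogress” until they exceed the queue timeout, at which point they are moved back into “new” and will be processed again. This may not be the desired behavior long term, and we may want to add configuration to this class to allow more creative failure handling and retrying.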
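The directory layout from the overview can be reproduced with the standard library alone. This is a minimal sketch, not Chore's code: queue_layout is a hypothetical helper, the temporary directory stands in for Chore.config.fs_queue_root, and the job filename abc123.0.job assumes the id.attempts.job naming implied by file_info and make_new_again.

```ruby
require "fileutils"
require "tmpdir"

# Hypothetical helper: builds (and creates) the per-queue directories
# described in the overview. Chore's real paths come from
# FilesystemQueue's new_dir and in_progress_dir, not shown here.
def queue_layout(fs_queue_root, queue_name)
  queue_dir = File.join(fs_queue_root, queue_name)
  dirs = {
    new: File.join(queue_dir, "new"),
    in_progress: File.join(queue_dir, "inprogress")
  }
  dirs.each_value { |dir| FileUtils.mkdir_p(dir) }
  dirs
end

Dir.mktmpdir do |root|
  dirs = queue_layout(root, "my_queue")
  # A publisher drops job files into "new"; the consumer later moves
  # the ones it claims into "inprogress".
  File.write(File.join(dirs[:new], "abc123.0.job"), '{"class":"MyJob"}')
  p Dir.children(dirs[:new])
end
```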

Constant Summary

EXPIRATION_CHECK_INTERVAL = 1

The minimum number of seconds to allow to pass between checks for expired jobs on the filesystem.

Since queue times are measured in whole seconds, 1 second is the smallest useful duration. It also prevents the consumer from burning a lot of CPU scanning for expired jobs when the consumer sleep interval is less than 1 second.
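The throttle that EXPIRATION_CHECK_INTERVAL controls in #consume boils down to "run at most once per interval". A minimal sketch with an injectable clock so the behavior can be checked without sleeping; the Throttle class and its now: keyword are hypothetical.

```ruby
# Hypothetical helper mirroring the check in #consume:
#   if !@last_cleaned_at || (Time.now - @last_cleaned_at).to_i >= EXPIRATION_CHECK_INTERVAL
class Throttle
  def initialize(interval)
    @interval = interval
    @last_run_at = nil
  end

  # Runs the block only if it has never run, or if at least `interval`
  # whole seconds have passed since it last ran. Returns true if it ran.
  def call(now: Time.now)
    return false if @last_run_at && (now - @last_run_at).to_i < @interval

    yield
    @last_run_at = now
    true
  end
end

throttle = Throttle.new(1)
t0 = Time.at(1_700_000_000)
throttle.call(now: t0) { puts "cleanup" }       # runs: first call
throttle.call(now: t0 + 0.5) { puts "cleanup" } # skipped: only 0.5s elapsed
throttle.call(now: t0 + 1.2) { puts "cleanup" } # runs: a full second has passed
```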

Constants included from FilesystemQueue

FilesystemQueue::CONFIG_DIR, FilesystemQueue::IN_PROGRESS_DIR, FilesystemQueue::NEW_JOB_DIR

Instance Attribute Summary

Attributes inherited from Consumer

#queue_name

Class Method Summary

Instance Method Summary

Methods included from FilesystemQueue

config_dir, config_value, in_progress_dir, new_dir, queue_dir, root_dir

Methods inherited from Consumer

#dupe_detector, #duplicate_message?, #provide_work, reset_connection!, #running?, #stop

Constructor Details

#initialize(queue_name, opts = {}) ⇒ Consumer

Returns a new instance of Consumer.



# File 'lib/chore/queues/filesystem/consumer.rb', line 127

def initialize(queue_name, opts={})
  super(queue_name, opts)

  @in_progress_dir = self.class.in_progress_dir(queue_name)
  @new_dir = self.class.new_dir(queue_name)
  @queue_timeout = self.class.queue_timeout(queue_name)
end

Instance Attribute Details

#queue_timeout ⇒ Object (readonly)

The number of seconds a unit of work can run before the queue considers it timed out. For filesystem queues, this is the global default.



# File 'lib/chore/queues/filesystem/consumer.rb', line 125

def queue_timeout
  @queue_timeout
end

Class Method Details

.cleanup(expiration_time, new_dir, in_progress_dir) ⇒ Object

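Cleans up expired in-progress files by making them new again. expiration_time is a Unix timestamp in seconds: every file in in_progress_dir whose embedded start timestamp is at or before it is moved back to new_dir with make_new_again. Files that another consumer has already moved are skipped.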



# File 'lib/chore/queues/filesystem/consumer.rb', line 26

def cleanup(expiration_time, new_dir, in_progress_dir)
  each_file(in_progress_dir) do |job_file|
    id, previous_attempts, timestamp = file_info(job_file)
    next if timestamp > expiration_time

    begin
      make_new_again(job_file, new_dir, in_progress_dir)
    rescue Errno::ENOENT
      # File no longer exists; skip since it's been recovered by another
      # consumer
    rescue ArgumentError
      # Move operation was attempted at same time as another consumer;
      # skip since the other process succeeded where this one didn't
    end
  end
end
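#consume calls cleanup with Time.now.to_i - @queue_timeout as the cutoff, so a job expires once it has been in progress for at least queue_timeout seconds. The selection step can be sketched as a pure function over filenames. expired_jobs is a hypothetical helper, and the id.attempts.started_at.job format is inferred from make_in_progress.

```ruby
# Hypothetical helper: returns the in-progress filenames that cleanup
# would move back to "new", given the same cutoff #consume passes in.
def expired_jobs(in_progress_files, now:, queue_timeout:)
  expiration_time = now - queue_timeout
  in_progress_files.reject do |name|
    next true if name.start_with?(".") # each_file skips dotfiles
    _id, _attempts, started_at = name.split(".")
    # Drop files cleanup would skip (`next if timestamp > expiration_time`).
    started_at.to_i > expiration_time
  end
end

# Cutoff: 1_700_000_300 - 120 = 1_700_000_180
files = ["a.0.1700000000.job", "b.2.1700000290.job", ".DS_Store"]
p expired_jobs(files, now: 1_700_000_300, queue_timeout: 120) # => ["a.0.1700000000.job"]
```

A file stamped exactly at the cutoff counts as expired, matching the strict > comparison in cleanup.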

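.each_file(path, limit = nil) ⇒ Object

Yields the name of each entry in path, skipping names that start with a dot (which covers “.” and “..” as well as hidden files), and stops after limit names have been yielded when a limit is given.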


# File 'lib/chore/queues/filesystem/consumer.rb', line 94

def each_file(path, limit = nil)
  count = 0

  Dir.foreach(path) do |file|
    next if file.start_with?('.')

    yield file

    count += 1
    break if limit && count >= limit
  end
end
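A usage sketch of the same iteration pattern. The function below is a standalone copy of each_file so it can run outside Chore. Note that Dir.foreach returns entries in whatever order the filesystem provides, so a limit selects an arbitrary subset of jobs rather than the oldest ones.

```ruby
require "tmpdir"

# Standalone copy of .each_file for illustration.
def each_file(path, limit = nil)
  count = 0
  Dir.foreach(path) do |file|
    next if file.start_with?(".") # skips ".", "..", and hidden files
    yield file
    count += 1
    break if limit && count >= limit
  end
end

Dir.mktmpdir do |dir|
  %w[a.0.job b.0.job c.0.job .hidden].each { |f| File.write(File.join(dir, f), "x") }
  batch = []
  each_file(dir, 2) { |f| batch << f }
  puts batch.length # prints 2
end
```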

.file_info(job_file) ⇒ Object

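Parses a job filename into the job's unique identifier, the number of times it has previously been attempted, and the Unix timestamp at which it was started (present only on in-progress files). All three come from the dot-separated filename; the attempt count and timestamp are converted with to_i, so a filename without a timestamp yields 0.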



# File 'lib/chore/queues/filesystem/consumer.rb', line 109

def file_info(job_file)
  id, previous_attempts, timestamp, * = job_file.split('.')
  [id, previous_attempts.to_i, timestamp.to_i]
end
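For example, files in “inprogress” carry a start timestamp while files in “new” do not. For the latter, the third dot-separated field is the literal job extension, which to_i turns into 0. A standalone copy of the method, applied to a made-up job ID:

```ruby
# Standalone copy of .file_info for illustration.
def file_info(job_file)
  id, previous_attempts, timestamp, * = job_file.split(".")
  [id, previous_attempts.to_i, timestamp.to_i]
end

p file_info("abc123.2.1700000000.job") # => ["abc123", 2, 1700000000]
p file_info("abc123.0.job")            # => ["abc123", 0, 0]
```

Because the name is split on every dot, this parsing assumes job IDs never contain a dot themselves.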

.make_in_progress(job, new_dir, in_progress_dir, queue_timeout) ⇒ Object

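Moves the job file into the in-progress directory, renaming it to record the start time, and returns the full new path if this consumer successfully locked the job. Otherwise it returns nil: the file may be locked by another process, still being written, or already gone. A zero-byte file older than queue_timeout is assumed to have been abandoned by its publisher and is deleted.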



# File 'lib/chore/queues/filesystem/consumer.rb', line 45

def make_in_progress(job, new_dir, in_progress_dir, queue_timeout)
  basename, previous_attempts, * = file_info(job)

  from = File.join(new_dir, job)
  # Add a timestamp to mark when the job was started
  to = File.join(in_progress_dir, "#{basename}.#{previous_attempts}.#{Time.now.to_i}.job")

  # If the file is non-zero, this means it was successfully written to
  # by a publisher and we can attempt to move it to "in progress".
  #
  # There is a small window of time where the file is zero bytes
  # because the publisher hasn't finished writing to it yet.
  if !File.zero?(from)
    File.open(from, "r") do |f|
      # If the lock can't be obtained, the file has been locked by
      # another consumer (or by the publisher of the file), so don't
      # block; skip it.
      if f.flock(File::LOCK_EX | File::LOCK_NB)
        FileUtils.mv(from, to)
        to
      end
    end
  elsif (Time.now - File.ctime(from)) >= queue_timeout
    # The file is empty (zero bytes) and enough time has passed since
    # the file was written that we can safely assume it will never
    # get written to by the publisher.
    #
    # The scenario where this happens is when the publisher created
    # the file, but the process was killed before it had a chance to
    # actually write the data.
    File.delete(from)
    nil
  end
rescue Errno::ENOENT
  # File no longer exists; skip it since it's been picked up by
  # another consumer
end
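The non-blocking lock is what lets several consumers poll the same “new” directory without double-processing a job. A minimal sketch of just that step, assuming a POSIX system where flock locks belong to an open file description, so a second File.open in the same process contends for the lock just as another process would. try_claim is a hypothetical helper; unlike make_in_progress it keeps the filename unchanged and skips the zero-byte handling.

```ruby
require "fileutils"
require "tmpdir"

# Hypothetical helper: claim `name` from new_dir by taking a non-blocking
# exclusive lock and moving it into in_progress_dir. Returns the new path,
# or nil if someone else holds the lock or the file has already gone.
def try_claim(name, new_dir, in_progress_dir)
  from = File.join(new_dir, name)
  to = File.join(in_progress_dir, name)
  File.open(from, "r") do |f|
    # With LOCK_NB, flock returns false instead of blocking when the
    # lock is held through another open file description.
    next nil unless f.flock(File::LOCK_EX | File::LOCK_NB)
    FileUtils.mv(from, to)
    to
  end
rescue Errno::ENOENT
  nil # another consumer already moved it
end

Dir.mktmpdir do |root|
  new_dir = File.join(root, "new")
  in_progress_dir = File.join(root, "inprogress")
  FileUtils.mkdir_p([new_dir, in_progress_dir])
  File.write(File.join(new_dir, "abc.0.job"), "payload")

  File.open(File.join(new_dir, "abc.0.job"), "r") do |other|
    other.flock(File::LOCK_EX) # simulate a competing consumer
    p try_claim("abc.0.job", new_dir, in_progress_dir) # => nil
  end
  # The competing handle is closed, so the lock is free again.
  p try_claim("abc.0.job", new_dir, in_progress_dir)
end
```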

.make_new_again(job, new_dir, in_progress_dir) ⇒ Object

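Moves the job file back into the new directory, incrementing its attempt count and dropping the in-progress timestamp from its name, and returns the full new path.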



# File 'lib/chore/queues/filesystem/consumer.rb', line 84

def make_new_again(job, new_dir, in_progress_dir)
  basename, previous_attempts = file_info(job)

  from = File.join(in_progress_dir, job)
  to = File.join(new_dir, "#{basename}.#{previous_attempts + 1}.job")
  FileUtils.mv(from, to)

  to
end
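In other words, each return trip to “new” drops the start timestamp and increments the attempt counter, so the counter records how many times the job has been sent back. The name transformation on its own, using a hypothetical retry_name helper:

```ruby
# Hypothetical helper reproducing the filename change in .make_new_again.
def retry_name(in_progress_name)
  basename, previous_attempts = in_progress_name.split(".")
  "#{basename}.#{previous_attempts.to_i + 1}.job"
end

p retry_name("abc123.0.1700000000.job") # => "abc123.1.job"
p retry_name("abc123.1.1700000500.job") # => "abc123.2.job"
```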

Instance Method Details

#complete(message_id, receipt_handle = nil) ⇒ Object

Deletes the given message from the filesystem queue. Since the filesystem is not a remote API, there is no notion of a “receipt handle”.

Parameters:

  • message_id (String)

    Unique ID of the message

  • receipt_handle (Hash) (defaults to: nil)

    Receipt handle of the message. Always nil for the filesystem consumer



# File 'lib/chore/queues/filesystem/consumer.rb', line 169

def complete(message_id, receipt_handle = nil)
  Chore.logger.debug "Completing (deleting): #{message_id}"
  File.delete(File.join(@in_progress_dir, message_id))
rescue Errno::ENOENT
  # The job took too long to complete, was deemed expired, and moved
  # back into "new".  Ignore.
end
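Because Errno::ENOENT is swallowed, completing a job never raises when the file is already gone. A duplicate call, or a call for a job that expired and was moved back to “new”, is silently ignored (and in the expired case the job will run again). A standalone sketch of the same pattern; delete_if_present is a hypothetical name.

```ruby
require "tmpdir"

# Hypothetical helper mirroring #complete: delete the in-progress file,
# treating "already gone" as a non-error.
def delete_if_present(path)
  File.delete(path)
  true
rescue Errno::ENOENT
  false # expired and moved back to "new", or already deleted
end

Dir.mktmpdir do |dir|
  path = File.join(dir, "abc123.0.1700000000.job")
  File.write(path, "payload")
  p delete_if_present(path) # => true
  p delete_if_present(path) # => false, and no exception
end
```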

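#consume ⇒ Object

Polls the queue until the consumer is stopped. On each pass it first moves expired in-progress jobs back to “new” (at most once every EXPIRATION_CHECK_INTERVAL seconds), then yields the messages found by handle_messages. Errors are logged rather than raised, and the loop sleeps for Chore.config.consumer_sleep_interval seconds after any pass that found no work.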


# File 'lib/chore/queues/filesystem/consumer.rb', line 135

def consume
  Chore.logger.info "Starting consuming file system queue #{@queue_name} in #{self.class.queue_dir(queue_name)}"
  while running?
    begin
      # Move expired job files to new directory (so long as enough time has
      # passed since we last did this check)
      if !@last_cleaned_at || (Time.now - @last_cleaned_at).to_i >= EXPIRATION_CHECK_INTERVAL
        self.class.cleanup(Time.now.to_i - @queue_timeout, @new_dir, @in_progress_dir)
        @last_cleaned_at = Time.now
      end

      found_files = false
      handle_messages do |*args|
        found_files = true
        yield(*args)
      end
    rescue => e
      Chore.logger.error { "#{self.class}#consume: #{e} #{e.backtrace * "\n"}" }
    ensure
      sleep(Chore.config.consumer_sleep_interval) unless found_files
    end
  end
end
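The loop's key property is that it sleeps only after a pass that found no work, so a busy queue is drained without pauses. A minimal sketch of that control flow with the message source and the sleep injected so it runs deterministically; poll_loop, fetch_batch, sleeper, and the fixed iteration count are hypothetical stand-ins for handle_messages, sleep, Chore.config.consumer_sleep_interval, and running?.

```ruby
# Hypothetical reduction of #consume: process whatever fetch_batch returns,
# and sleep only when a pass found nothing (or failed before finding work).
def poll_loop(iterations:, fetch_batch:, sleeper:, interval:)
  iterations.times do
    found_files = false
    begin
      fetch_batch.call.each do |job|
        found_files = true
        yield job
      end
    rescue => e
      warn "poll_loop: #{e}" # #consume logs the error and keeps going
    ensure
      sleeper.call(interval) unless found_files
    end
  end
end

batches = [["a.0.job", "b.0.job"], [], ["c.0.job"], []]
sleeps = []
processed = []
poll_loop(iterations: 4,
          fetch_batch: -> { batches.shift || [] },
          sleeper: ->(secs) { sleeps << secs },
          interval: 1) { |job| processed << job }
p processed   # => ["a.0.job", "b.0.job", "c.0.job"]
p sleeps.size # => 2
```

One deliberate difference from the listing: found_files is reset before the begin block. In #consume it is first assigned after the cleanup call, and because a while loop does not open a new scope, an exception raised by cleanup would leave the previous pass's value in place and could skip the sleep.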

#reject(id) ⇒ Object

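Rejects the given message from the filesystem by id. Currently a no-op: the job file stays in “inprogress” until it expires, at which point cleanup moves it back to “new” to be retried.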



# File 'lib/chore/queues/filesystem/consumer.rb', line 160

def reject(id)

end