Class: Chore::Queues::Filesystem::Publisher

Inherits:
Publisher
  • Object
show all
Includes:
FilesystemQueue
Defined in:
lib/chore/queues/filesystem/publisher.rb

Overview

Publisher for writing jobs to the local filesystem. Useful for testing in offline environments or when queuing implementations are irrelevent to the task at hand, such as local development of new jobs.

Constant Summary

Constants included from FilesystemQueue

FilesystemQueue::CONFIG_DIR, FilesystemQueue::IN_PROGRESS_DIR, FilesystemQueue::NEW_JOB_DIR

Constants inherited from Publisher

Publisher::DEFAULT_OPTIONS

Instance Attribute Summary

Attributes inherited from Publisher

#options

Instance Method Summary collapse

Methods included from FilesystemQueue

#config_dir, #config_value, #in_progress_dir, #new_dir, #queue_dir, #queue_timeout, #root_dir

Methods inherited from Publisher

#initialize, publish, reset_connection!

Constructor Details

This class inherits a constructor from Chore::Publisher

Instance Method Details

#filename(queue_name, job_name) ⇒ Object

create a unique filename for a job in a queue based on queue name, job name and date



33
34
35
36
37
38
# File 'lib/chore/queues/filesystem/publisher.rb', line 33

def filename(queue_name, job_name)
  now = Time.now.strftime "%Y%m%d-%H%M%S-%6N"
  previous_attempts = 0
  pid = Process.pid
  File.join(new_dir(queue_name), "#{queue_name}-#{job_name}-#{pid}-#{now}.#{previous_attempts}.job")
end

#publish(queue_name, job) ⇒ Object

use of mutex and file locking should make this both threadsafe and safe for multiple processes to use the same queue directory simultaneously.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/chore/queues/filesystem/publisher.rb', line 15

def publish(queue_name,job)
  # First try encoding the job to avoid writing empty job files if this fails
  encoded_job = encode_job(job)

  published = false
  while !published
    # keep trying to get a file with nothing in it meaning we just created it
    # as opposed to us getting someone else's file that hasn't been processed yet.
    File.open(filename(queue_name, job[:class].to_s), "a") do |f|
      if f.flock(File::LOCK_EX | File::LOCK_NB) && f.size == 0
        f.write(encoded_job)
        published = true
      end
    end
  end
end