Class: Chore::Queues::Filesystem::Publisher
- Includes:
- FilesystemQueue
- Defined in:
- lib/chore/queues/filesystem/publisher.rb
Overview
Publisher for writing jobs to the local filesystem. Useful for testing in offline environments, or when the queuing implementation is irrelevant to the task at hand, such as local development of new jobs.
Constant Summary
Constants included from FilesystemQueue
FilesystemQueue::CONFIG_DIR, FilesystemQueue::IN_PROGRESS_DIR, FilesystemQueue::NEW_JOB_DIR
Constants inherited from Publisher
Instance Attribute Summary
Attributes inherited from Publisher
Instance Method Summary
- #filename(queue_name, job_name) ⇒ Object
  Creates a unique filename for a job in a queue, based on the queue name, job name, process ID, and timestamp.
- #publish(queue_name, job) ⇒ Object
  The use of a mutex and file locking should make this both thread-safe and safe for multiple processes using the same queue directory simultaneously.
Methods included from FilesystemQueue
#config_dir, #config_value, #in_progress_dir, #new_dir, #queue_dir, #queue_timeout, #root_dir
Methods inherited from Publisher
#initialize, publish, reset_connection!
Constructor Details
This class inherits a constructor from Chore::Publisher
Instance Method Details
#filename(queue_name, job_name) ⇒ Object
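Creates a unique filename for a job in a queue, based on the queue name, job name, process ID, and a microsecond-resolution timestamp. The trailing counter (previous_attempts) is always 0 for a newly published job.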
# File 'lib/chore/queues/filesystem/publisher.rb', line 33

def filename(queue_name, job_name)
  now = Time.now.strftime "%Y%m%d-%H%M%S-%6N"
  previous_attempts = 0
  pid = Process.pid
  File.join(new_dir(queue_name), "#{queue_name}-#{job_name}-#{pid}-#{now}.#{previous_attempts}.job")
end
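To make the naming scheme concrete, here is a minimal standalone sketch that mirrors the format used by #filename. The helper name sketch_job_filename, its keyword arguments, and the example directory are hypothetical and not part of Chore; the dir argument stands in for new_dir(queue_name), which is not reproduced here.

```ruby
# Hypothetical helper (not part of Chore) that mirrors the filename format above.
# `dir` stands in for new_dir(queue_name); `now` and `pid` are injectable so the
# result is reproducible.
def sketch_job_filename(dir, queue_name, job_name, now: Time.now, pid: Process.pid)
  timestamp = now.strftime("%Y%m%d-%H%M%S-%6N") # date, time, microseconds
  previous_attempts = 0                          # new jobs start with zero attempts
  File.join(dir, "#{queue_name}-#{job_name}-#{pid}-#{timestamp}.#{previous_attempts}.job")
end

path = sketch_job_filename("/tmp/queues/emails/new", "emails", "SendWelcome",
                           now: Time.new(2024, 1, 2, 3, 4, 5), pid: 4242)
puts path
# prints /tmp/queues/emails/new/emails-SendWelcome-4242-20240102-030405-000000.0.job
```

Because the process ID and a microsecond timestamp are both part of the name, two publishers would have to share a PID and publish the same job class to the same queue within the same microsecond to collide.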
#publish(queue_name, job) ⇒ Object
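The use of a mutex and file locking should make this both thread-safe and safe for multiple processes using the same queue directory simultaneously. The listing below shows the file-locking half: a non-blocking exclusive flock combined with a check that the file is still empty, retried until a write succeeds.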
# File 'lib/chore/queues/filesystem/publisher.rb', line 15

def publish(queue_name,job)
  # First try encoding the job to avoid writing empty job files if this fails
  encoded_job = encode_job(job)

  published = false
  while !published
    # keep trying to get a file with nothing in it meaning we just created it
    # as opposed to us getting someone else's file that hasn't been processed yet.
    File.open(filename(queue_name, job[:class].to_s), "a") do |f|
      if f.flock(File::LOCK_EX | File::LOCK_NB) && f.size == 0
        f.write(encoded_job)
        published = true
      end
    end
  end
end
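The lock-then-check-empty step can be tried in isolation. The following is a minimal sketch, assuming only Ruby's standard library; the helper name write_if_fresh and the temporary file are hypothetical, and the retry loop from #publish is deliberately left out so the outcome of each attempt is visible.

```ruby
require "tmpdir"
require "fileutils"

# Hypothetical helper (not part of Chore): write `payload` only if we can take
# an exclusive lock without blocking AND the file is still empty.
def write_if_fresh(path, payload)
  File.open(path, "a") do |f|
    # With LOCK_NB, flock returns false instead of waiting when the lock is held elsewhere.
    if f.flock(File::LOCK_EX | File::LOCK_NB) && f.size == 0
      f.write(payload)
      return true
    end
  end
  false
end

dir = Dir.mktmpdir
path = File.join(dir, "example.job")

first  = write_if_fresh(path, "payload-1") # file was just created, so it is empty
second = write_if_fresh(path, "payload-2") # file already has content, so it is skipped
contents = File.read(path)
FileUtils.remove_entry(dir)

puts first    # prints true
puts second   # prints false
puts contents # prints payload-1
```

In #publish a failed attempt simply loops again, and because filename embeds a fresh microsecond timestamp, the next attempt targets a different path rather than appending to a file that already holds another job.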