Module: Chore

Extended by:
Util
Defined in:
lib/chore.rb,
lib/chore/cli.rb,
lib/chore/job.rb,
lib/chore/util.rb,
lib/chore/hooks.rb,
lib/chore/signal.rb,
lib/chore/worker.rb,
lib/chore/fetcher.rb,
lib/chore/manager.rb,
lib/chore/railtie.rb,
lib/chore/version.rb,
lib/chore/consumer.rb,
lib/chore/publisher.rb,
lib/chore/queues/sqs.rb,
lib/chore/unit_of_work.rb,
lib/chore/configuration.rb,
lib/chore/duplicate_detector.rb,
lib/chore/queues/sqs/consumer.rb,
lib/chore/queues/sqs/publisher.rb,
lib/chore/encoders/json_encoder.rb,
lib/chore/queues/filesystem/consumer.rb,
lib/chore/queues/filesystem/publisher.rb,
lib/chore/strategies/consumer/batcher.rb,
lib/chore/strategies/worker/helpers/ipc.rb,
lib/chore/strategies/worker/helpers/worker_info.rb,
lib/chore/strategies/worker/helpers/worker_killer.rb,
lib/chore/strategies/worker/forked_worker_strategy.rb,
lib/chore/strategies/worker/helpers/worker_manager.rb,
lib/chore/strategies/worker/single_worker_strategy.rb,
lib/chore/strategies/worker/helpers/preforked_worker.rb,
lib/chore/strategies/worker/helpers/work_distributor.rb,
lib/chore/strategies/worker/preforked_worker_strategy.rb,
lib/chore/strategies/consumer/single_consumer_strategy.rb,
lib/chore/strategies/consumer/threaded_consumer_strategy.rb,
lib/chore/strategies/consumer/throttled_consumer_strategy.rb

Overview

:nodoc:

Defined Under Namespace

Modules: Encoder, FilesystemQueue, Hooks, Job, Queues, Strategy, Util, Version Classes: CLI, Configuration, Consumer, DuplicateDetector, Fetcher, Manager, Publisher, Railtie, Signal, TerribleMistake, TimeoutError, UnitOfWork, Worker

Constant Summary collapse

VERSION =

:nodoc:

Chore::Version::STRING
DEFAULT_OPTIONS =

The default configuration options for Chore.

{
  :require               => "./",
  :num_workers           => 4,
  :threads_per_queue     => 1,
  :worker_strategy       => Strategy::ForkedWorkerStrategy,
  :consumer              => Queues::SQS::Consumer,
  :fetcher               => Fetcher,
  :consumer_strategy     => Strategy::ThreadedConsumerStrategy,
  :batch_size            => 50,
  :batch_timeout         => 20,
  :log_level             => Logger::WARN,
  :log_path              => STDOUT,
  :default_queue_timeout => (12 * 60 * 60), # 12 hours
  :shutdown_timeout      => (2 * 60),
  :max_attempts          => 1.0 / 0.0, # Infinity
  :dupe_on_cache_failure => false,
  :queue_polling_size    => 10,
  :payload_handler       => Chore::Job,
  :master_procline       => "chore-master-#{Chore::VERSION}",
  :worker_procline       => "chore-worker-#{Chore::VERSION}",
  :consumer_sleep_interval => 1
}

Class Attribute Summary collapse

Class Method Summary collapse

Methods included from Util

constantize, procline

Class Attribute Details

.loggerObject

Access Chore's logger in a memoized fashion. Will create an instance of the logger if one doesn't already exist.



57
58
59
# File 'lib/chore.rb', line 57

def logger
  @logger
end

Class Method Details

.add_hook(name, &blk) ⇒ Object

Add a global hook for name. Will run &blk when the hook is executed. Global hooks are any hooks that don't have access to an instance of a job. See the docs on Hooks for a full list of global hooks.

Examples

Chore.add_hook_for(:after_fork) do
  SomeDB.reset_connection!
end


109
110
111
112
# File 'lib/chore.rb', line 109

def self.add_hook(name,&blk)
  @@hooks ||= {}
  (@@hooks[name.to_sym] ||= []) << blk
end

.clear_hooks!Object

:nodoc:



120
121
122
# File 'lib/chore.rb', line 120

def self.clear_hooks! #:nodoc:
  @@hooks = {}
end

.configObject

Return the current Chore configuration as specified by configure. You can chain config options off of this to get access to current config data.

Examples

puts Chore.config.num_workers


224
225
226
# File 'lib/chore.rb', line 224

def self.config
  @config ||= self.configure
end

.configure(opts = {}) {|@config| ... } ⇒ Object

Configure global chore options. Takes a hash for opts. This includes things like the current Worker Strategy (:worker_strategy), the default Consumer (:consumer), and the default Consumer Strategy(+:consumer_strategy). It's safe to call multiple times (will merge the new config, into the old) This is used by the command line parsing code to setup Chore. If a block is given, configure will yield the config object, so you can set options directly.

Examples

Chore.configure({:worker_strategy => Chore::ForkedWorkerStrategy})

Chore.configure do |c|
  c.consumer = Chore::Queues::SQS::Consumer
  c.batch_size = 50
  c.batch_timeout = 20
end

Yields:



214
215
216
217
218
# File 'lib/chore.rb', line 214

def self.configure(opts={})
  @config = (@config ? @config.merge_hash(opts) : Chore::Configuration.new(DEFAULT_OPTIONS.merge(opts)))
  yield @config if block_given?
  @config
end

.configuring=(value) ⇒ Object

Setter for chore to indicate that it's in the middle of configuring itself



234
235
236
# File 'lib/chore.rb', line 234

def self.configuring=(value)
  @configuring = value
end

.configuring?Boolean

Helper flag for rails/web app chore initializers to use so that chore does not re-load itself during requirement loading

Returns:

  • (Boolean)


229
230
231
# File 'lib/chore.rb', line 229

def self.configuring?
  @configuring ||= false
end

.hooks_for(name) ⇒ Object

A helper to get a list of all the hooks for a given name



115
116
117
118
# File 'lib/chore.rb', line 115

def self.hooks_for(name)
  @@hooks ||= {}
  @@hooks[name.to_sym] || []
end

.log_level_to_symObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/chore.rb', line 66

def self.log_level_to_sym
  return self.config[:log_level] if self.config[:log_level].is_a?(Symbol)
  case self.config[:log_level]
  when 0
    :debug
  when 1
    :info
  when 2
    :warn
  when 3
    :error
  when 4
    :fatal
  else
    :unknown
  end
end

.prefixed_queue_namesArray<String>

List of queue_names as configured via Chore::Job including their prefix, if set.

Returns:

  • (Array<String>)


241
242
243
# File 'lib/chore.rb', line 241

def self.prefixed_queue_names
  Chore::Job.job_classes.collect {|klass| c = constantize(klass); c.prefixed_queue_name}
end

.reopen_logsObject

Reopens any open files. This will match any logfile that was opened by Chore, Rails, or any other library.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/chore.rb', line 86

def self.reopen_logs
  # Find any open file in the process
  files = []
  ObjectSpace.each_object(File) {|file| files << file unless file.closed?}

  files.each do |file|
    begin
      file.reopen(file.path, 'a+')
      file.sync = true
    rescue
      # Can't reopen -- ignore / skip the file
    end
  end
end

.run_hooks_for(name, *args, &block) ⇒ Object

Run the global hooks associated with a particular name passing all args to the registered block.

Before / After hooks

If this is invoked for before / after hooks (i.e. no block is passed), then the hooks will be invoked in the order in which they're defined.

For example:

add_hook(:before_fork) {|worker| puts 1 }
add_hook(:before_fork) {|worker| puts 2 }
add_hook(:before_fork) {|worker| puts 3 }

run_hooks_for(:before_fork, worker)

# ...will produce the following output
=> 1
=> 2
=> 3

Around hooks

If this is invoked for around hooks (i.e. a block is passed), then the hooks will be invoked in the order in which they're defined, with the passed block being invoked last after the hooks yield.

For example:

add_hook(:around_fork) {|worker, &block| puts 'before 1'; block.call; puts 'after 1'}
add_hook(:around_fork) {|worker, &block| puts 'before 2'; block.call; puts 'after 2'}
add_hook(:around_fork) {|worker, &block| puts 'before 3'; block.call; puts 'after 3'}

run_hooks_for(:around_fork, worker) { puts 'block' }

# ...will produce the following output
=> before 1
=> before 2
=> before 3
=> block
=> after 3
=> after 2
=> after 1

You can imagine the callback order to be U shaped where logic prior to yielding is called in the order it's defined and logic after yielding is called in reverse order. At the bottom of the U is when the block passed into run_hooks_for gets invoked.



171
172
173
174
175
176
177
178
# File 'lib/chore.rb', line 171

def self.run_hooks_for(name,*args,&block)
  if block
    run_around_hooks_for(name, args, &block)
  else
    hooks = self.hooks_for(name)
    hooks.each {|h| h.call(*args, &block)} unless hooks.nil? || hooks.empty?
  end
end