Chore: Async Job Processing Framework For Ruby
About
Chore is a pluggable, multi-backend job processing framework. It was built from the ground up to be extremely flexible. We hope that you find integrating and using Chore to be as pleasant as we do.
The full docs for Chore can always be found at https://tapjoy.github.io/chore.
Configuration
Chore can be integrated with any Ruby-based project by following these instructions:
Add chore-core to the Gemfile:
gem 'chore-core', '~> 4.0.0'
When using SQS, also add dalli to use for memcached-based deduplication:
gem 'dalli'
Create a Chorefile file in the root of the project directory. While Chore itself can be configured from this file, it's primarily used to direct the Chore binstub toward the root of the application so that it can locate all of the dependencies and required code.
--require=./<FILE_TO_LOAD>
Make sure that --require points to the main entry point for the application. If integrating with a Rails app, point it to the application directory and Chore will handle loading the correct files on its own.
When using SQS, ensure that AWS credentials exist in the environment (e.g. but not limited to the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables) and an AWS region is set (e.g. the AWS_REGION environment variable) so that Chore can authenticate with AWS.
By default, Chore will run over all queues it detects among the required files. If different behavior is desired, use one of the following flags:
# Note that only one of these options may be used, not both. Chore will quit
# if both options are specified.
--queues QUEUE1,QUEUE2...        # a list of queues to process
--except-queues QUEUE1,QUEUE2... # a list of queues _not_ to process
Chore has many more options, which can be viewed by executing
bundle exec chore --help
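Putting the steps above together, a minimal Chorefile might look like the following. This is only a sketch: the require target and queue names are placeholders, and it assumes that CLI flags such as --queues can also be placed in the Chorefile, as the --require example above suggests. For a Rails app, the require would point at the application directory instead.
--require=./lib/my_app.rb
--queues test_queue,another_queue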
Tips For Configuring Chore
For Rails, it can be necessary to add the jobs directory to the eager loading path, found in application.rb. A similar approach is likely needed for most apps that use jobs, unless the jobs are placed into a directory that is already eager loaded by the application. One example of this might be:
config.eager_load_paths << File.join(config.root, "app", "jobs")
However, due to the way eager_load_paths
works in Rails, this may only solve the issue in the production environment.
It can also be useful for other environments to have something like this in a config/initializers/chore.rb
file, although the job files can be loaded in just about any way.
if !Rails.env.production?
Dir["#{Rails.root}/app/jobs/**/*"].each do |file|
require file unless File.directory?(file)
end
end
Producing & Consuming Jobs
When it comes to configuring Chore, there are two main configurations: as a producer of messages, or as a consumer of messages. The consuming context may also produce messages if necessary, as it is running as its own isolated instance of the application.
For producers, all of the Chore configuration must be in an initializer.
For consumers, a Chorefile must be used. A Chorefile plus an initializer is also a good pattern.
Here's an example of how to configure Chore via an initializer:
Chore.configure do |c|
c.concurrency = 16
c.worker_strategy = Chore::Strategy::ForkedWorkerStrategy
c.max_attempts = 100
...
c.batch_size = 50
c.batch_timeout = 20
end
Because it is likely that the same application serves as the basis for both producing and consuming messages, and there
will already be a considerable amount of configuration in the producer, it makes sense to use Chorefile to simply
provide the require option and stick to the initializer for the rest of the configuration to keep things DRY.
However, like many aspects of Chore, it is ultimately up to the developer to decide which use case fits their needs best. Chore is happy to be configured in almost any way a developer desires.
Integration
This section assumes foreman
is being used to execute (or export the run commands of) the application, but it is not
strictly necessary.
Add an appropriate line to the Procfile:
jobs: bundle exec chore -c config/chore.config
If the queues do not exist, they must be created before the application can produce/consume Chore jobs:
require 'aws-sdk-sqs'
sqs = Aws::SQS::Client.new
sqs.create_queue(queue_name: "test_queue")
Finally, start the application as usual
bundle exec foreman start
Chore::Job
A Chore::Job is any class that includes Chore::Job and implements a perform(*args) instance method. Here is an example job class:
class TestJob
include Chore::Job
queue_options :name => 'test_queue'
def perform(args={})
Chore.logger.debug "My first async job"
end
end
This job declares that the name of the queue it uses is test_queue, set in the queue_options method.
Chore::Job & perform Signatures
The perform method signature can have explicit argument names, but in practice this makes changing the signature more
difficult later on. Once a Chore::Job
is in production and being used at a constant rate, it becomes problematic to
begin mixing versions of the job with non-matching signatures.
While this can be overcome with a number of techniques, such as versioning jobs/queues, it increases the complexity of making changes.
The simplest way to structure job signatures is to treat the arguments as a hash. This enables maintaining forwards and backwards compatibility between signature changes with the same job class.
However, Chore is ultimately agnostic in this regard and will allow explicit arguments in signatures as easily as using a simple hash; the choice is left to the developer.
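As a sketch of the hash-style signature (the class, queue name, and keys here are hypothetical), adding a new argument later does not break messages that were enqueued with the old payload:
class ResizeImageJob
  include Chore::Job
  queue_options :name => 'resize_image_queue'

  # Older producers enqueue {"image_id" => 123}; newer producers may also
  # include {"format" => "png"}. Both payloads are handled by the same job.
  def perform(args={})
    image_id = args['image_id']
    format   = args.fetch('format', 'jpg') # default preserves backwards compatibility
    Chore.logger.info "Resizing image #{image_id} to #{format}"
  end
end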
Chore::Job & Publishing Jobs
Now that there's a test job, publishing an instance of the job is as simple as:
TestJob.perform_async({"message"=>"YES, DO THAT THING."})
It's advisable to specify the Publisher Chore uses to send messages globally, so that it can easily be modified based on the environment. To do this, add a configuration block to an initializer:
Chore.configure do |c|
c.publisher = Some::Other::Publisher
end
It is worth noting that any option that can be set via config file or command-line args can also be set in a configure block.
If a global publisher is set, it can be overridden on a per-job basis by specifying the publisher in queue_options.
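For example, a per-job override might look like the following sketch; the job class is hypothetical and the :publisher option key is an assumption based on the description above, so confirm it against the Chore docs:
class ReportJob
  include Chore::Job
  # Overrides the globally configured publisher for this job only (assumed option key)
  queue_options :name => 'report_queue',
                :publisher => Some::Other::Publisher

  def perform(args={})
    # ...
  end
end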
Retry Backoff Strategy
Chore has basic support for delaying retries of a failed job using a step function. Currently the only queue that supports this functionality is SQS; all others will simply ignore the delay setting.
Setup
The :backoff option for a queue expects a lambda that takes a single UnitOfWork argument. The return value should be the number of seconds to delay the next attempt.
queue_options :name => 'nameOfQueue',
              :backoff => lambda { |work| work.current_attempt ** 2 } # Exponential backoff
Using The Backoff
If there is a :backoff
option supplied, any failures will delay the next attempt by the result of that lambda.
Notes On SQS & Delays
Read more details about SQS and Delays here
Hooks
A number of hooks, both global and per-job, exist in Chore for flexibility and convenience. Hooks should be named hook_name_identifier, where identifier is a descriptive string chosen by the developer.
Global Hooks
before_start
before_first_fork
before_fork
after_fork
around_fork
within_fork - behaves similarly to around_fork, except that it is called after the worker process has been forked. In contrast, around_fork is called by the parent process (chore-master)
before_shutdown
Filesystem Consumer/Publisher Hooks
on_fetch(job_file, job_json)
SQS Consumer Hooks
on_fetch(handle, body)
Per Job
before_publish
after_publish
before_perform(message)
after_perform(message)
on_rejected(message)
on_failure(message, error)
on_permanent_failure(queue_name, message, error)
All per-job hooks can also be global hooks.
Hooks can be added to a job class like so:
class TestJob
include Chore::Job
queue_options :name => 'test_queue'
def perform(args={})
# Do something cool
end
def before_perform_log(message)
Chore.logger.debug "About to do something cool with: #{message.inspect}"
end
end
Global hooks can also be registered like so:
Chore.add_hook :after_publish do
# Add handler code here
end
Signals
Signal handling can get complicated when there are multiple threads, process forks, and both signal handlers and application code making use of mutexes.
To simplify the complexities around this, Chore introduces some additional behaviors on top of Ruby's default Signal.trap implementation. This functionality is primarily inspired by sidekiq's signal handling @ https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/cli.rb.
In particular Chore handles signals in a separate thread, and does so sequentially instead of being interrupt-driven.
See Chore::Signal for more details on the differences between Ruby's Signal.trap and Chore's Chore::Signal.trap.
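For reference, registering a handler through Chore's wrapper looks much like Ruby's built-in call; this is a sketch that assumes Chore::Signal.trap accepts the same signal-name-plus-block form as Signal.trap:
Chore::Signal.trap('USR1') do
  # Handled sequentially in Chore's dedicated signal thread rather than
  # interrupting application code mid-execution.
  Chore.logger.info 'Received USR1'
end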
Chore will respond to the following signals:
INT, TERM, QUIT - Chore will begin shutting down, taking steps to safely terminate workers and not interrupt jobs in progress unless it believes they may be hung
USR1 - Re-opens logfiles, useful for handling log rotations
Timeouts
When using the forked worker strategy for processing jobs, inevitably there are cases in which child processes become stuck. This could result from deadlocks, hung network calls, tight loops, etc. When these jobs hang, they consume resources and can affect throughput.
To mitigate this, Chore has built-in monitoring of forked child processes. When a fork is created to process a batch of
work, that fork is assigned an expiration time -- if it doesn't complete by that time, the process is sent a KILL
signal.
Fork expiration times are determined from one of two places:
- The timeout associated with the queue. For SQS queues, this is the visibility timeout.
- The default queue timeout configured for Chore. For filesystem queues, this is the value used.
For example, if a worker is processing a batch of 5 jobs and each job's queue has a timeout of 60s, then the expiration time will be 5 minutes for the worker.
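As a rough sketch of that arithmetic (illustrative only; Chore performs this bookkeeping internally):
batch_size    = 5    # jobs handed to the forked worker
queue_timeout = 60   # seconds per job, e.g. the SQS visibility timeout
expiration    = batch_size * queue_timeout # => 300 seconds (5 minutes)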
To change the default queue timeout (when one can't be inferred), do the following:
Chore.configure do |c|
c.default_queue_timeout = 3600
end
A reasonable timeout would be based on the maximum amount of time any job in the system is expected to run. Keep in mind that the process running the job may get killed if the job is running for too long.
Plugins
Chore has several plugin gems available, which extend its core functionality:
New Relic - Integrating Chore with New Relic
Airbrake - Integrating Chore with Airbrake
Managing Chore Processes
Sample Upstart
There are lots of ways to create upstart scripts, so it's difficult to give a prescriptive example of the "right" way to do it. However, here are some ideas from how we run it in production at Tapjoy:
For security reasons, a specific user should be specified for the process to run as. Switch to this user at the beginning of the exec line:
su - $USER --command '...'
For the command that runs Chore itself, keep all of the necessary environment variables in an env file that Upstart can source on its exec line, to avoid mixing environment variable changes with changes to the upstart script itself:
source $PATHTOENVVARS ;
After that, ensure Chore is running under the right ruby version. Additionally, STDOUT and STDERR can be redirected to logger with an app name. This makes it easy to find information in syslog later on. Putting that all together looks like:
rvm use $RUBYVERSION do bundle exec chore -c Chorefile 2>&1 | logger -t $APPNAME
There are many other ways to manage the Upstart file, but these are a few of the ways we prefer to do it. Putting it all together, it looks something like:
exec su - special_user --command '\
source /the/path/to/env ;\
rvm use 2.4.1 do bundle exec chore -c Chorefile 2>&1 | logger -t chore-app ;'
Locating Processes
As Chore does not keep a PID file, and has both a master process and some number of worker processes, it may be difficult to isolate the exact PID for the master process.
To find Chore master processes via ps
, run the following:
ps aux | grep bin/chore
or
pgrep -f bin/chore
To find a list of only Chore worker processes:
ps aux | grep chore-worker
or
pgrep -f chore-worker
Copyright
Copyright (c) 2013 - 2020 Tapjoy. See LICENSE.txt for further details.