Ruby - Job scheduling lock with Redis
Introduction
In a Rails app, you sometimes need to execute a long-running job in the background. Along the way, you may run into special requirements around avoiding duplicate jobs.
Problem 1
You must delete a user and all related data in a CleanupDataJob. The job takes a long time, and to avoid problems such as database locking, you need to delete only one user at a time.
Problem 2
You have a ProductSalesJob that fetches a product's sales data and calculates some numbers. What if you had 100 of these jobs? Your queue would hold 100 jobs doing the exact same thing and quickly pile up. So you need to avoid duplicate jobs that do the same work for a single product.
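To make the pile-up concrete, here is one hypothetical way those duplicates appear (the controller and route are illustrative, not part of the original): every incoming sales event enqueues its own job for the same product.

class SalesWebhooksController < ApplicationController
  # Hypothetical endpoint: each sales event for a product enqueues a job,
  # so a burst of 100 events for product 42 enqueues 100 identical jobs.
  def create
    ProductSalesJob.perform_later(params[:product_id])
    head :ok
  end
end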
To avoid more than one job executing at a time, you'll need a locking mechanism.
Redis-based locking
This solution creates a Redis lock for each scheduled job. Depending on the uniqueness constraint, it rejects any job that matches the same signature.
The main idea of the Redis lock can be described as:
- Try to acquire the lock:
  - Success → proceed to the critical section → release the lock.
  - Failure → log a message → re-enqueue the job.
- Critical section:
  - Perform the job logic.
- Release the lock:
  - Always release the lock, even on failure or exception, using ensure.
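Everything above rests on a single Redis primitive: SET with the NX and EX options. A minimal sketch with the redis-rb gem (key name and TTL are illustrative):

require 'redis'

redis = Redis.new
# NX: set only if the key does not already exist; EX: expire after 300 seconds.
# With nx: true, redis-rb returns true when the key was set (lock acquired)
# and false when the key already exists (someone else holds the lock).
acquired = redis.set("my_job_lock", "locked", nx: true, ex: 300)

Because the existence check and the write happen in one atomic command, two jobs can never both see "not locked" and proceed.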
The full implementation for Problem 1 is below.
Problem 1: Solution
In this case, if another job is holding the lock, we re-enqueue the job to try again after 2 minutes.
require 'redis'

class CleanupDataJob < ApplicationJob
  queue_as :default

  # Redis lock configuration
  WAIT_FOR_NEXT_CALLING = 2.minutes
  LOCK_KEY_EXPIRATION = 5.minutes # Prevent stale locks
  REDIS_LOCK_KEY = "cleanup_data_job_lock"

  def perform(user_id)
    redis = Redis.new

    # Try to acquire the lock
    if acquire_lock(redis)
      begin
        # Perform the job logic
        Rails.logger.info "Executing #{self.class.name} with user: #{user_id}"
        cleanup_data(user_id)
      ensure
        # Release the lock
        release_lock(redis)
      end
    else
      Rails.logger.info "Job already running: #{self.class.name} with user: #{user_id}"
      self.class.set(wait: WAIT_FOR_NEXT_CALLING).perform_later(user_id)
    end
  end

  private

  def cleanup_data(user_id)
    # Simulate a long-running task
    sleep(30)
    Rails.logger.info "Data cleanup completed."
  end

  def acquire_lock(redis)
    # Set the lock key with NX (set if not exists) and expiration
    redis.set(REDIS_LOCK_KEY, "locked", nx: true, ex: LOCK_KEY_EXPIRATION)
  end

  def release_lock(redis)
    redis.del(REDIS_LOCK_KEY)
  end
end
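One caveat with the unconditional redis.del in release_lock: if a job ever runs longer than LOCK_KEY_EXPIRATION, the key expires, a second job acquires the lock, and the first job's ensure block then deletes the second job's lock. A common hardening, sketched here as an assumption rather than part of the solution above, is to store a random token on acquire and release atomically only when the token still matches (drop-in replacements for the two private methods above):

require 'securerandom'

RELEASE_SCRIPT = <<~LUA
  if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
  else
    return 0
  end
LUA

def acquire_lock(redis)
  # Each job instance uses its own token as the lock value.
  @lock_token = SecureRandom.uuid
  redis.set(REDIS_LOCK_KEY, @lock_token, nx: true, ex: LOCK_KEY_EXPIRATION)
end

def release_lock(redis)
  # Compare-and-delete: removes the key only if it still holds our token.
  redis.eval(RELEASE_SCRIPT, keys: [REDIS_LOCK_KEY], argv: [@lock_token])
end

This is the single-instance locking pattern described in the Redis documentation.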
Problem 2: Solution
This problem is quite similar to Problem 1. The difference is that we must keep the job unique per argument, not across all instances of the job. To do that, we make the argument part of the lock key: instead of product_sales_job_lock, we use product_sales_job_lock#{product_id}. Besides, we do not retry the job if another job with the same product_id is already processing.
require 'redis'

class ProductSalesJob < ApplicationJob
  queue_as :default

  LOCK_KEY_EXPIRATION = 5.minutes # Prevent stale locks

  def perform(product_id)
    redis = Redis.new
    lock_key = "product_sales_job_lock#{product_id}"

    # Try to acquire the lock
    if acquire_lock(redis, lock_key)
      begin
        # Perform the job logic
        Rails.logger.info "Executing #{self.class.name} with product: #{product_id}"
        fetch_product_sales(product_id)
      ensure
        # Release the lock
        release_lock(redis, lock_key)
      end
    else
      Rails.logger.info "Job already running: #{self.class.name} with product: #{product_id}"
    end
  end

  private

  def fetch_product_sales(product_id)
    # Simulate processing logic
    sleep(15)
    # ...
  end

  # Same helpers as in Problem 1, parameterized by lock_key
  def acquire_lock(redis, lock_key)
    # Set the lock key with NX (set if not exists) and expiration
    redis.set(lock_key, "locked", nx: true, ex: LOCK_KEY_EXPIRATION)
  end

  def release_lock(redis, lock_key)
    redis.del(lock_key)
  end
end
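To see the deduplication, imagine enqueuing the same product several times in quick succession (product id 42 is illustrative):

3.times { ProductSalesJob.perform_later(42) }
# If the jobs overlap, one acquires product_sales_job_lock42 and does the
# work; the others log "Job already running..." and finish without retrying.

Note that the jobs dedupe only when they actually overlap; sequential runs each acquire and release the lock in turn.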
For a Redis-free alternative, I will use database advisory locks via https://github.com/ClosureTree/with_advisory_lock .
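As a teaser, and assuming the gem's block API (it adds with_advisory_lock to ActiveRecord models and holds the lock for the duration of the block), Problem 1 might look like this:

class CleanupDataJob < ApplicationJob
  queue_as :default

  def perform(user_id)
    # timeout_seconds: 0 tries once and returns false without running
    # the block if another process already holds the lock.
    User.with_advisory_lock("cleanup_data_job", timeout_seconds: 0) do
      cleanup_data(user_id)
    end
  end
end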