Ruby - Job scheduling lock with Redis

Introduction

In a Rails app, you sometimes need to run a long-running job in the background. Such jobs often come with a special requirement: avoiding duplicates.

Problem 1

You must delete a user and all related data in CleanupDataJob. The deletion takes a long time, and to avoid problems such as database locking, only one user should be deleted at a time.

Problem 2

You have ProductSalesJob, which fetches a product's sales data and calculates some numbers. What if 100 of these jobs were enqueued for the same product? The queue would hold 100 jobs doing exactly the same thing and would quickly pile up. So you need to avoid duplicate jobs that do the same work for a single product.

To prevent more than one such job from executing at a time, you need a locking mechanism.

Redis-based locking

This solution creates a Redis lock for each scheduled job. Depending on the uniqueness constraint, any job that matches the signature of a job already holding the lock is rejected.

The main idea of the Redis lock can be described as:

  1. Try to Acquire Lock:

    • Success → Proceed to Critical Section → Release Lock.
    • Failure → Log Message → Re-enqueue Job (or skip it, as in problem 2).
  2. Critical Section:

    • Perform job logic.
  3. Release Lock:

    • Always release the lock, even on failure or exception, using ensure.
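The three steps above can be sketched as a small helper. This is a minimal sketch, not the article's implementation: with_lock, FakeLockStore, and ttl_seconds are hypothetical names, and the in-memory store only stands in for the two Redis calls the pattern needs, so the example runs without a Redis server.

```ruby
# In-memory stand-in for the two Redis calls the lock needs (hypothetical,
# for illustration only). With the redis gem, Redis#set(key, value, nx: true,
# ex: seconds) and Redis#del(key) play the same roles.
class FakeLockStore
  def initialize
    @data = {}
  end

  # Mimics SET key value NX: stores the value only if the key is absent.
  # The ex: option is accepted but ignored, because nothing expires in this fake.
  def set(key, value, nx: false, ex: nil)
    return false if nx && @data.key?(key)

    @data[key] = value
    true
  end

  def del(key)
    @data.delete(key) ? 1 : 0
  end
end

# Runs the block only if the lock is free. Returns true if the block ran,
# false if another holder already has the lock.
def with_lock(store, key, ttl_seconds:)
  # 1. Try to acquire the lock
  return false unless store.set(key, "locked", nx: true, ex: ttl_seconds)

  begin
    yield # 2. Critical section
    true
  ensure
    store.del(key) # 3. Always release, even if the block raises
  end
end

store = FakeLockStore.new
results = []

outer = with_lock(store, "demo_lock", ttl_seconds: 300) do
  # While the lock is held, a second attempt is rejected.
  results << with_lock(store, "demo_lock", ttl_seconds: 300) { results << :inner_ran }
end

# After the outer block finishes, the lock is free again.
again = with_lock(store, "demo_lock", ttl_seconds: 300) { results << :second_ran }
```

The helper only reports whether the block ran. What to do on failure (re-enqueue or skip) is left to the caller, which is where the two solutions below differ.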

The full implementation for problem 1 is here:

Problem 1: Solution
In this case, if another job is holding the lock, we re-enqueue the job to run again in 2 minutes.

require 'redis'

class CleanupDataJob < ApplicationJob
  queue_as :default

  # Redis lock configurations
  WAIT_FOR_NEXT_CALLING = 2.minutes
  LOCK_KEY_EXPIRATION = 5.minutes # Prevent stale locks
  REDIS_LOCK_KEY = "cleanup_data_job_lock"

  def perform(user_id)
    redis = Redis.new

    # Try to acquire the lock
    if acquire_lock(redis)
      begin
        # Perform the job logic
        Rails.logger.info "Executing #{self.class.name} with user: #{user_id}"
        cleanup_data user_id
      ensure
        # Release the lock
        release_lock(redis)
      end
    else
      Rails.logger.info "Job already running: #{self.class.name} with user: #{user_id}"
      self.class.set(wait: WAIT_FOR_NEXT_CALLING).perform_later(user_id)
    end
  end

  private

  def cleanup_data(user_id)
    # Simulate a long-running task
    sleep(30)
    Rails.logger.info "Data cleanup completed."
  end

  def acquire_lock(redis)
    # Set the lock key with NX (set only if it does not exist) and an expiration.
    # EX takes whole seconds, so convert the ActiveSupport::Duration explicitly.
    redis.set(REDIS_LOCK_KEY, "locked", nx: true, ex: LOCK_KEY_EXPIRATION.to_i)
  end

  def release_lock(redis)
    redis.del(REDIS_LOCK_KEY)
  end
end
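One caveat with the job above: if cleanup runs longer than the lock's expiration, the key expires, another job can acquire the lock, and the first job's unconditional del then removes a lock it no longer owns. A common way to guard against this is to store a unique token as the lock value and delete the key only if it still holds that token. The sketch below assumes a client that responds like redis-rb (set with nx:/ex:, and eval with keys:/argv:). FakeRedis and expire_now are hypothetical stand-ins so the example runs without a server, and the fake's eval hard-codes the script's behavior instead of running Lua.

```ruby
require 'securerandom'

# Lua script: delete the key only if it still holds our token.
# Redis runs a script atomically, so the check and the delete cannot
# interleave with another client's SET.
RELEASE_SCRIPT = <<~LUA
  if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
  else
    return 0
  end
LUA

# Returns the token on success, nil if the lock is already held.
def acquire_lock(redis, key, ttl_seconds)
  token = SecureRandom.uuid
  redis.set(key, token, nx: true, ex: ttl_seconds) ? token : nil
end

# Returns true only if this token still owned the lock and it was deleted.
def release_lock(redis, key, token)
  redis.eval(RELEASE_SCRIPT, keys: [key], argv: [token]) == 1
end

# Hypothetical in-memory stand-in so the example runs without a Redis server.
# Its eval does not run Lua; it hard-codes what RELEASE_SCRIPT does.
class FakeRedis
  def initialize
    @data = {}
  end

  def set(key, value, nx: false, ex: nil)
    return false if nx && @data.key?(key)

    @data[key] = value
    true
  end

  def eval(_script, keys:, argv:)
    return 0 unless @data[keys.first] == argv.first

    @data.delete(keys.first)
    1
  end

  # Simulates the TTL running out.
  def expire_now(key)
    @data.delete(key)
  end
end

redis = FakeRedis.new
key = "cleanup_data_job_lock"

token_a = acquire_lock(redis, key, 300)           # job A takes the lock
redis.expire_now(key)                             # job A overruns the TTL
token_b = acquire_lock(redis, key, 300)           # job B takes the lock
released_by_a = release_lock(redis, key, token_a) # false: A no longer owns it
released_by_b = release_lock(redis, key, token_b) # true: B releases its own lock
```

Choosing an expiration comfortably longer than the slowest expected run remains the first line of defense. The token check only limits the damage when that estimate turns out to be wrong.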

Problem 2: Solution
This problem is similar to problem 1. The difference is that uniqueness is enforced per argument, not across all instances of the job. To do that, we make the argument part of the lock key: instead of product_sales_job_lock, we use product_sales_job_lock#{product_id}. In addition, we do not retry the job if another job with the same product_id is already processing.
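As a small illustration of per-argument keys, the helper below builds one lock key per job name and argument list. It is a sketch only: lock_key_for is a hypothetical name, and the ":" separator is a common Redis naming convention, not the format used in the code that follows. A separator starts to matter once a job takes more than one argument, because plain concatenation would map the arguments (1, 23) and (12, 3) to the same key.

```ruby
# Hypothetical helper: one lock key per job name and argument list.
def lock_key_for(job_name, *args)
  ([job_name, "lock"] + args.map(&:to_s)).join(":")
end

single = lock_key_for("product_sales_job", 42)
# => "product_sales_job:lock:42"

# With a separator, different argument lists never collapse into one key.
first  = lock_key_for("report_job", 1, 23)  # => "report_job:lock:1:23"
second = lock_key_for("report_job", 12, 3)  # => "report_job:lock:12:3"
```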

require 'redis'

class ProductSalesJob < ApplicationJob
  queue_as :default

  LOCK_KEY_EXPIRATION = 5.minutes # Prevent stale locks

  def perform(product_id)
    redis = Redis.new
    lock_key = "product_sales_job_lock#{product_id}"

    # Try to acquire the lock
    if acquire_lock(redis, lock_key)
      begin
        # Perform the job logic
        Rails.logger.info "Executing #{self.class.name} with product: #{product_id}"

        fetch_product_sales product_id
      ensure
        # Release the lock
        release_lock(redis, lock_key)
      end
    else
      Rails.logger.info "Job already running: #{self.class.name} with product: #{product_id}"
    end
  end

  private

  def fetch_product_sales(product_id)
    # Simulate processing logic
    sleep(15)
    ...
  end

  ...
end
@lmhung
Ruby programmer. From Vietnam.