
Sidekiq Divide & Conquer

On a project I was working on, we had to implement a divide & conquer algorithm on top of our background job processor, which in this case was Sidekiq. Doing this with Sidekiq can be quite challenging, since all the workers are independent and none of them triggers a callback once it's done.

I am going to use merge sort as the example here, since it is quite a simple application of divide & conquer. I won't explain in detail how it works, but it basically consists of recursively splitting an array in two parts and then merging the sorted parts back together.
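For reference, here is a minimal plain-Ruby sketch of merge sort. SplitsArray and MergesArrays below are hypothetical local stand-ins for the services used in this post (the real ones may split or merge differently); splitting at the midpoint is an assumption.

```ruby
# Hypothetical local stand-in for the SplitsArray service.
module SplitsArray
  # Splits an array at its midpoint; the left half gets floor(n / 2) elements.
  def self.split(array)
    mid = array.length / 2
    [array[0...mid], array[mid..-1]]
  end
end

# Hypothetical local stand-in for the MergesArrays service.
module MergesArrays
  # Merges two already-sorted arrays into one sorted array.
  def self.merge(left, right)
    result = []
    i = j = 0
    while i < left.length && j < right.length
      if left[i] <= right[j]
        result << left[i]
        i += 1
      else
        result << right[j]
        j += 1
      end
    end
    # One side is exhausted; append whatever remains of the other.
    result.concat(left[i..-1]).concat(right[j..-1])
  end
end

# Recursive merge sort: split, sort each half, merge the sorted halves.
def merge_sort(array)
  return array if array.length <= 1

  left, right = SplitsArray.split(array)
  MergesArrays.merge(merge_sort(left), merge_sort(right))
end

p merge_sort([5, 3, 8, 1, 9, 2]) # prints [1, 2, 3, 5, 8, 9]
```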

A very simple way of getting merge sort to work on Sidekiq is to have one single worker that does all the work. Note that I'm using two abstract service classes here, SplitsArray and MergesArrays. Imagine that things can go wrong inside these services: they are not controlled by us and may live somewhere on the internet, so they can fail or be slow.


class MergeSortWorker
  include Sidekiq::Worker

  def sort(array)
    return array if array.count <= 1

    left, right = SplitsArray.split(array)
    MergesArrays.merge sort(left), sort(right)
  end

  def perform(array)
    SortedArray.create array: sort(array)
  end
end

Now think about this implementation. Let's say SplitsArray is a very flaky service that fails once every few tries: do we want Sidekiq to retry the whole worker from scratch? We could end up retrying it over and over again and maybe never see the result. That is exactly the problem we had, and we needed each step to be retried independently. On top of that, if we can parallelize the process, we gain performance too.
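A quick back-of-the-envelope calculation shows why whole-job retries hurt. Assuming (hypothetically) that each SplitsArray call fails independently with probability p, merge sort over n elements makes n - 1 split calls, so a full run succeeds only with probability (1 - p)^(n - 1). For n = 1024 and p = 0.01 that is roughly 3.4 × 10^-5, or about 29,000 expected full attempts, while retrying each step on its own needs only about 1023 / 0.99 ≈ 1033 split calls in expectation. The helper names below are illustrative.

```ruby
# Assumption: every split call fails independently with probability p, and
# merge sort over n elements performs n - 1 split calls.

# Probability that one full run of the single-worker sort succeeds.
def whole_run_success_probability(n, p)
  (1 - p)**(n - 1)
end

# Expected number of full attempts until one succeeds (geometric distribution).
def expected_attempts(n, p)
  1.0 / whole_run_success_probability(n, p)
end

# With per-step retries, each split is retried on its own, so the expected
# number of split calls grows only linearly: (n - 1) / (1 - p).
def expected_split_calls_per_step(n, p)
  (n - 1) / (1 - p)
end

[16, 256, 1024].each do |n|
  printf("n=%-5d whole-run success=%.6f expected attempts=%.1f per-step calls=%.1f\n",
         n, whole_run_success_probability(n, 0.01),
         expected_attempts(n, 0.01), expected_split_calls_per_step(n, 0.01))
end
```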

So here is the initial idea: what if we had one worker to split arrays and another worker to merge them? That could work, right? Here is a pseudo-implementation of that:


class MergeSortWorker
  include Sidekiq::Worker

  def perform(array)
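    # Broken on purpose: perform_async only enqueues the job and returns its
    # job ID (a String), never the sorted array.
    array = SortWorker.perform_async array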
    SortedArray.create array: array
  end
end

class SortWorker
  include Sidekiq::Worker

  def perform(array)
    return array if array.count <= 1

    left, right = SplitsArray.split(array)

    left_sorted = SortWorker.perform_async left
    right_sorted = SortWorker.perform_async right

    MergeWorker.perform_async left_sorted, right_sorted
  end
end

class MergeWorker
  include Sidekiq::Worker

  def perform(left, right)
    MergesArrays.merge left, right
  end
end

And this is a visual representation of what it looks like when executing.

There is only one problem: workers are asynchronous and have no return values (perform_async just enqueues the job and returns its job ID), so our code can't possibly work like that. We have to store some kind of state to know when a worker is done, so we can merge the arrays back together. As of now, there is no tool to do that out of the box with Sidekiq (I couldn't find anything for Resque either).
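To make the return-value problem concrete, here is a toy in-memory queue (hypothetical, not Sidekiq's implementation). Enqueuing only stores the job and hands back an id, the same way perform_async hands back a job ID; the only way for the job to pass a result on is to write it into shared state, here a plain Hash standing in for the database.

```ruby
require "securerandom"

# Toy in-memory queue (hypothetical). Enqueuing records the job and returns
# an id; the job itself runs later, when drain is called.
class ToyQueue
  Job = Struct.new(:jid, :block)

  def initialize
    @jobs = []
  end

  def perform_async(&block)
    jid = SecureRandom.hex(12)
    @jobs << Job.new(jid, block)
    jid
  end

  # Runs every pending job, the way a separate worker process would.
  def drain
    @jobs.shift.block.call until @jobs.empty?
  end
end

RESULTS = {} # shared state: the only place a job can "return" data to
queue = ToyQueue.new

value = queue.perform_async { RESULTS[:sorted] = [3, 1, 2].sort }
puts value.class              # String: a job id, not the sorted array
puts RESULTS[:sorted].inspect # nil: the job has not run yet
queue.drain
puts RESULTS[:sorted].inspect # [1, 2, 3]
```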

To solve this problem you can track the state of the jobs manually. That's essentially what we did in our case; you could end up with something like this:


class MergeSortWorker
  include Sidekiq::Worker

  def perform(array)
    SortWorker.perform_async TempArray.create(array: array).id
  end
end

class SortWorker
  include Sidekiq::Worker

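  def finish(temp_array)
    temp_array.sorted = true
    temp_array.save
    # The root has no parent when the input has <= 1 element; enqueuing a merge
    # for the root itself lets MergeWorker#finish store it as the final result.
    MergeWorker.perform_async((temp_array.parent || temp_array).id)
  end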

  def perform(array_id)
    temp_array = TempArray.find array_id
    array = temp_array.array

    finish(temp_array) and return if array.count <= 1

    left, right = SplitsArray.split(array)
    temp_left = temp_array.children.create(array: left, sorted: false)
    temp_right = temp_array.children.create(array: right, sorted: false)

    SortWorker.perform_async temp_left.id
    SortWorker.perform_async temp_right.id
  end
end

class MergeWorker
  include Sidekiq::Worker

  def finish(parent)
    if parent.parent
      MergeWorker.perform_async parent.parent.id
    else
      s = Redis::Semaphore.new(:creating_sorted_array, connection: "localhost")
      s.lock do
        parent.reload rescue return
        SortedArray.create array: parent.array
        parent.destroy
      end
    end
  end

  def perform(parent_id)
    parent = TempArray.includes(:parent).where(id: parent_id).first
    return if parent.nil?

    finish(parent) and return if parent.sorted?

    if parent.children.present? && parent.children.all?(&:sorted?)
      left = parent.children.first.array
      right = parent.children.last.array
      parent.children.destroy_all

      parent.array = MergesArrays.merge left, right
      parent.sorted = true
      parent.save

      finish parent
    end
  end
end

The complexity of our workers grew considerably, but the sort is now resilient to failures and parallelizable: a failing step is retried on its own instead of restarting the whole sort. That solves our previous problems and gets us closer to the graphical representation above. Note that we added the TempArray model to the equation; that is where we save our intermediate states.
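To convince ourselves that the state-tracking idea works regardless of the order in which workers run, here is a single-process Ruby simulation. It is a sketch under stated assumptions: Node plays the role of TempArray, an in-memory job list plays the role of the Sidekiq queue, jobs are picked in random order to mimic independent workers, and the midpoint split and merge are local stand-ins for SplitsArray and MergesArrays. Because it runs one job at a time, it checks the ordering logic but not the concurrency issues.

```ruby
# Hypothetical simulation: Node stands in for the TempArray model and @jobs
# for the Sidekiq queue. Jobs run one at a time, in random order.
Node = Struct.new(:id, :array, :parent_id, :child_ids, :sorted)

class StatefulMergeSort
  def initialize(seed)
    @rng = Random.new(seed)
    @nodes = {}  # "database" of intermediate states, keyed by id
    @jobs = []   # pending [kind, node_id] pairs
    @next_id = 0
    @result = nil
  end

  def run(array)
    enqueue(:sort, create_node(array, nil).id)
    until @jobs.empty?
      kind, id = @jobs.delete_at(@rng.rand(@jobs.length)) # pick any pending job
      kind == :sort ? sort_job(id) : merge_job(id)
    end
    @result
  end

  private

  def create_node(array, parent_id)
    @next_id += 1
    @nodes[@next_id] = Node.new(@next_id, array, parent_id, [], false)
  end

  def enqueue(kind, id)
    @jobs << [kind, id]
  end

  # Mirrors SortWorker: leaves are marked sorted, larger arrays are split.
  def sort_job(id)
    node = @nodes[id]
    if node.array.length <= 1
      node.sorted = true
      enqueue(:merge, node.parent_id || node.id) # a root leaf is its own result
      return
    end
    mid = node.array.length / 2
    [node.array[0...mid], node.array[mid..-1]].each do |half|
      child = create_node(half, node.id)
      node.child_ids << child.id
      enqueue(:sort, child.id)
    end
  end

  # Mirrors MergeWorker: merge only when both children are sorted.
  def merge_job(id)
    node = @nodes[id]
    return if node.nil? # already finished and cleaned up
    return finish(node) if node.sorted
    children = node.child_ids.map { |cid| @nodes[cid] }
    return unless children.length == 2 && children.all?(&:sorted) # not ready
    node.array = merge(children[0].array, children[1].array)
    node.child_ids.each { |cid| @nodes.delete(cid) }
    node.child_ids.clear
    node.sorted = true
    finish(node)
  end

  def finish(node)
    if node.parent_id
      enqueue(:merge, node.parent_id)
    else
      @result = node.array # root: keep the final sorted array
      @nodes.delete(node.id)
    end
  end

  def merge(left, right)
    out = []
    i = j = 0
    while i < left.length && j < right.length
      if left[i] <= right[j]
        out << left[i]; i += 1
      else
        out << right[j]; j += 1
      end
    end
    out + left[i..-1] + right[j..-1]
  end
end

p StatefulMergeSort.new(42).run([38, 27, 43, 3, 9, 82, 10])
```

Different seeds exercise different job orders, and each run should produce the same sorted array. As in the MergeWorker above, a merge job for an already-sorted node is propagated again, so some duplicate merge jobs simply find nothing to do.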

Knowledge about synchronization and thread safety is now spread all over the place, and we need the redis-semaphore gem to enforce it. Each worker now tries its step without any certainty of success; it eventually succeeds, though. We are enqueuing superfluous workers and then giving up on them once we figure out the arrays “are not ready”, and it has become much more difficult to understand what the code is doing.
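One way to remove the “not ready yet” guesswork, shown here as an alternative rather than what we used, is to give each parent a counter of pending children: every child decrements it when done, and only the child that brings it to zero enqueues the merge. The sketch below uses a Ruby Mutex and threads to stand in for concurrent workers; in a real deployment the counter would presumably live in Redis, whose DECR command is atomic. PendingCounter is a hypothetical name.

```ruby
# Hypothetical per-parent counter of children that are still being sorted.
class PendingCounter
  def initialize(count)
    @count = count
    @lock = Mutex.new
  end

  # Returns true for exactly one caller: the one that brings the count to zero.
  def child_done!
    @lock.synchronize do
      @count -= 1
      @count.zero?
    end
  end
end

# Eight "children" finish concurrently; only one of them may trigger the merge.
counter = PendingCounter.new(8)
outcomes = Array.new(8) { Thread.new { counter.child_done! } }.map(&:value)
puts outcomes.count(true) # 1
```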

The responsibilities of each worker are a bit blurry right now because of the nature of the solution. It is definitely not ideal, but sometimes, when you need a robust and efficient solution, you have to make some sacrifices.

I still think this should be easier. I'm sure the example code I wrote here could be refactored to look nicer, but still, this shouldn't be such a pain to achieve. That is why I'm starting my own gem to solve this problem. More on that next week.

The author of Sidekiq has pointed out to me that Sidekiq Pro is capable of creating batch jobs and keeping track of them. That is true: you can keep track of the batches using the pub/sub part of its notifications feature.
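Conceptually, a batch is a group of jobs plus a callback that fires once all of them have finished, which is exactly the missing piece for the merge step. The toy below illustrates only that idea in plain Ruby with threads; ToyBatch and its methods are hypothetical and are not Sidekiq Pro's Batch API, whose documentation describes the real interface.

```ruby
# Toy batch (hypothetical): collects jobs, runs them concurrently, and fires
# the completion callback exactly once, after the last job finishes.
class ToyBatch
  def initialize(&on_complete)
    @on_complete = on_complete
    @jobs = []
  end

  def add(&job)
    @jobs << job
  end

  # Counts every job up front (so an early finisher cannot fire the callback
  # too soon), then runs each job on its own thread.
  def run
    return @on_complete.call if @jobs.empty?

    pending = @jobs.length
    lock = Mutex.new
    threads = @jobs.map do |job|
      Thread.new do
        job.call
        last = lock.synchronize { (pending -= 1).zero? }
        @on_complete.call if last
      end
    end
    threads.each(&:join)
  end
end

# Sort two halves as separate jobs; the callback is where the merge would go.
results = Array.new(2)
batch = ToyBatch.new { puts "both halves sorted: #{results.inspect}" }
batch.add { results[0] = [5, 1, 4].sort }
batch.add { results[1] = [3, 2].sort }
batch.run
```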

All the files I used for this are available in this gist: https://gist.github.com/luan/5610299