On a project I was working on, we had to implement a divide & conquer algorithm on top of our background job processor, which in this case was Sidekiq. Implementing this with Sidekiq can be quite challenging, since all the workers are independent and they do not trigger any callback once they are done.
I am going to use merge sort as the example here, since it is a fairly simple application of divide & conquer. I am not going to explain in detail how it works, but it basically consists of splitting an array into two halves, sorting each half, and then merging them back together in sorted order, recursively.
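For reference, outside of Sidekiq, a minimal plain-Ruby sketch of the algorithm could look something like this (no service classes yet, just the recursion itself):

  def merge_sort(array)
    return array if array.count <= 1
    mid = array.count / 2
    left  = merge_sort(array[0...mid])
    right = merge_sort(array[mid..-1])
    merge(left, right)
  end

  def merge(left, right)
    merged = []
    # repeatedly take the smaller head element until one side runs out
    until left.empty? || right.empty?
      merged << (left.first <= right.first ? left.shift : right.shift)
    end
    merged + left + right
  end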
A very simple way of getting merge sort to work on Sidekiq is having one single worker that does all the work. Note that I am using two abstract service classes here, SplitsArray and MergesArrays. Imagine that things can go wrong inside these services: since they are not controlled by us and may live somewhere on the internet, they can fail or be slow.
class MergeSortWorker
  include Sidekiq::Worker

  def sort(array)
    return array if array.count <= 1
    left, right = SplitsArray.split(array)
    MergesArrays.merge sort(left), sort(right)
  end

  def perform(array)
    SortedArray.create array: sort(array)
  end
end
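Enqueuing it is a single call; Sidekiq serializes the arguments as JSON, so a plain array of numbers works fine (SortedArray is assumed to be a model that persists the result):

  MergeSortWorker.perform_async [38, 27, 43, 3, 9, 82, 10]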
Now think about this implementation. Let's say SplitsArray is a very flaky service that fails once every few tries: do we want Sidekiq to retry the whole worker from scratch? We could end up retrying it over and over again and maybe never see a result. That is exactly the problem we had, and we needed each step to be independent. Not to mention that if we can parallelize the process to gain performance, that is another win.
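To make that concrete, here is a hypothetical stand-in for such a flaky service (the failure rate is made up for illustration):

  class SplitsArray
    def self.split(array)
      # pretend the remote service fails roughly once every five calls
      raise "SplitsArray is unavailable, try again" if rand < 0.2
      mid = array.count / 2
      [array[0...mid], array[mid..-1]]
    end
  end

A single exception raised anywhere inside MergeSortWorker#perform sends the whole job back to Sidekiq's retry queue, all the already-sorted work included.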
So here is the initial idea: what if we have one worker to split and sort the arrays, and another worker to merge them? That could work, right? Here is a pseudo-implementation of that:
class MergeSortWorker
  include Sidekiq::Worker

  def perform(array)
    array = SortWorker.perform_async array
    SortedArray.create array: array
  end
end

class SortWorker
  include Sidekiq::Worker

  def perform(array)
    return array if array.count <= 1
    left, right = SplitsArray.split(array)
    left_sorted = SortWorker.perform_async left
    right_sorted = SortWorker.perform_async right
    MergeWorker.perform_async left_sorted, right_sorted
  end
end

class MergeWorker
  include Sidekiq::Worker

  def perform(left, right)
    MergesArrays.merge left, right
  end
end
And this is a visual representation of what it looks like when executing.
There is only one problem: workers are asynchronous and they do not have return values, so our code cannot possibly work like that. We have to store some kind of state to know when a worker is done, so we can merge the arrays back together. As of now there is no tool to do that out of the box with Sidekiq (I couldn't find anything for Resque either).
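This is easy to see from a console: perform_async only pushes the job onto the queue and returns a job ID, never the result of perform:

  SortWorker.perform_async [3, 1, 2]
  # => a random job ID string such as "b4a577edbccf1d805744efa9", not the sorted array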
To solve this problem you can manually track the state of the job. That is essentially what we did in our case, and we ended up with something like this:
class MergeSortWorker
  include Sidekiq::Worker

  def perform(array)
    # persist the initial state and kick off the recursive sort
    SortWorker.perform_async TempArray.create(array: array).id
  end
end

class SortWorker
  include Sidekiq::Worker

  def finish(temp_array)
    # mark this piece as sorted and ask its parent to try merging
    temp_array.sorted = true
    temp_array.save
    MergeWorker.perform_async temp_array.parent.id
  end

  def perform(array_id)
    temp_array = TempArray.find array_id
    array = temp_array.array
    finish(temp_array) and return if array.count <= 1

    left, right = SplitsArray.split(array)
    temp_left = temp_array.children.create(array: left, sorted: false)
    temp_right = temp_array.children.create(array: right, sorted: false)
    SortWorker.perform_async temp_left.id
    SortWorker.perform_async temp_right.id
  end
end

class MergeWorker
  include Sidekiq::Worker

  def finish(parent)
    if parent.parent
      MergeWorker.perform_async parent.parent.id
    else
      # no parent means we are at the root: persist the final result,
      # using a Redis-backed semaphore so only one worker creates the SortedArray
      s = Redis::Semaphore.new(:creating_sorted_array, connection: "localhost")
      s.lock do
        parent.reload rescue return
        SortedArray.create array: parent.array
        parent.destroy
      end
    end
  end

  def perform(parent_id)
    parent = TempArray.includes(:parent).where(id: parent_id).first
    return if parent.nil?
    finish(parent) and return if parent.sorted?

    # merge only when both halves are sorted; otherwise this run simply gives up
    # (another MergeWorker will be enqueued when the other half finishes)
    if parent.children.present? && parent.children.all?(&:sorted?)
      left = parent.children.first.array
      right = parent.children.last.array
      parent.children.destroy_all
      parent.array = MergesArrays.merge left, right
      parent.sorted = true
      parent.save
      finish parent
    end
  end
end
The complexity of our workers grew considerably, but the process is now fault-tolerant and parallelizable, solving our previous problems and getting us somewhere closer to that graphical representation above. Note that we added the TempArray model to the equation; that is where we save our intermediate states.
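As a rough sketch, TempArray could be something like the following, assuming ActiveRecord with a serialized array column and a self-referential parent/children association (the real model may differ):

  class TempArray < ActiveRecord::Base
    # columns: array (text, serialized), sorted (boolean), parent_id (integer)
    serialize :array, Array

    belongs_to :parent, class_name: "TempArray"
    has_many :children, class_name: "TempArray", foreign_key: :parent_id
  end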
We now have knowledge about synchronization and thread safety spread all over the place, and we pulled in the redis-semaphore gem to ensure it. Each worker is now trying things without any certainty of success; it eventually succeeds, though. We are enqueuing superfluous workers and then giving up on them once we figure out the arrays are "not ready", and it got much more difficult to understand what the code is doing.
The responsibilities of each worker are a bit blurry now because of the nature of the solution. It is definitely not ideal, but sometimes when you need a robust and efficient solution you have to make some sacrifices.
I still think this should be easier. I am sure the code I wrote as an example here could be refactored to look nicer, but this still should not be such a pain to achieve, which is why I am starting my own gem to solve this problem. More on that next week.
It has been pointed out to me by the author of Sidekiq that Sidekiq Pro is capable of creating batch jobs and keeping track of them. That is true. You can keep track of the batches using the pub/sub part of its notifications feature.
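For completeness, here is a rough sketch of what that looks like with Sidekiq Pro's batch API, based on its public documentation (SortCallback and the option names are made up for the example):

  batch = Sidekiq::Batch.new
  batch.description = "merge sort"
  # SortCallback is a hypothetical class responding to on_success(status, options)
  batch.on(:success, SortCallback, 'temp_array_id' => 42)
  batch.jobs do
    SortWorker.perform_async 42
  end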
All the files I used for this are available on this gist here: https://gist.github.com/luan/5610299