Elixir: consuming jobs from a Delayed Job table (Ruby)

We are migrating a RoR application to Elixir. As a first step, we are converting only the delayed job services to Elixir.
In Ruby, these services were built with the delayed_job library, which stores all jobs in a table and consumes them from there.

From Elixir, I only have to write the job-consuming part. The Ruby app produces jobs and stores them in the delayed job table.

What is the best way to do it?

Here is what I have planned so far:

Use GenStage to stream jobs from the delayed job table using https://hexdocs.pm/ecto/Ecto.Repo.html#c:stream/2 and consume them.
Use a GenServer and a Task.Supervisor to process each job.

Is there any better way to do this?

Can we stream data from the database and feed it into GenStage?

Is Repo.stream good for this job, or is there a better option available?

Can anyone give insight on this? Your help is greatly appreciated.


Repo.stream isn’t what you want: it has to run inside a single transaction. It can be quite tricky to implement a reliable job runner on your own. I’d consider using something like Oban and having Ruby write Oban jobs instead of delayed job jobs, though I’m not entirely sure how feasible that is.


Okay, I will keep Oban in mind as the first choice.

Suppose the Ruby team is not ready to write jobs to the Oban table. What is the second-best option?

With GenStage I can work with producers and consumers.
With a GenServer and a Task.Supervisor I can process the jobs.
What is the best way to fetch data from the delayed job table?

Which approach would you choose for the second option if you were asked to write your own code using GenStage, GenServer, and Task.Supervisor?

I am also asking this for learning purposes. Thanks!

Gotcha.

Using GenStage with some producers and consumers will work well. The main thing to get right is claiming jobs in a safe, atomic way so that no job runs more than once, and recovering from job failures so that they are retried later.
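A minimal sketch of both parts, assuming PostgreSQL 9.5+ (for `SKIP LOCKED`) and the standard `delayed_jobs` columns (`priority`, `attempts`, `run_at`, `locked_at`, `locked_by`, `failed_at`, `last_error`). All constant and method names are hypothetical, and the SQL is modeled on delayed_job's "ready to run" conditions rather than copied from the library. It is written in Ruby to match the rest of the thread; the SQL strings are the important part, and an Elixir producer could run them with `Repo.query` (they use Postgres `$1`-style placeholders).

```ruby
# Assumed: PostgreSQL 9.5+ and the standard delayed_jobs columns.
# $1 = now, $2 = worker name (must be unique per consumer),
# $3 = stale-lock cutoff (now - max_run_time).
CLAIM_SQL = <<~'SQL'
  UPDATE delayed_jobs
     SET locked_at = $1, locked_by = $2
   WHERE id = (
           SELECT id
             FROM delayed_jobs
            WHERE ((run_at <= $1 AND (locked_at IS NULL OR locked_at < $3))
                   OR locked_by = $2)
              AND failed_at IS NULL
            ORDER BY priority ASC, run_at ASC
            LIMIT 1
              FOR UPDATE SKIP LOCKED
         )
  RETURNING *
SQL

# Releases the lock and reschedules a failed job.
# $1 = id, $2 = new attempts count, $3 = next run_at, $4 = error text.
RESCHEDULE_SQL = <<~'SQL'
  UPDATE delayed_jobs
     SET attempts = $2, run_at = $3, last_error = $4,
         locked_at = NULL, locked_by = NULL
   WHERE id = $1
SQL

# Parameters for CLAIM_SQL, in placeholder order (default max_run_time: 4 hours).
def claim_params(worker_name, now: Time.now.utc, max_run_time: 4 * 60 * 60)
  [now, worker_name, now - max_run_time]
end

# Assumed to mirror delayed_job's default retry schedule:
# next run = now + attempts**4 + 5 seconds, where `attempts` already
# counts the failure that just happened.
def next_run_at(now, attempts)
  now + (attempts**4) + 5
end

# Assumed default of 25 attempts before a job is treated as permanently failed.
def give_up?(attempts, max_attempts: 25)
  attempts >= max_attempts
end
```

Because the inner `SELECT ... FOR UPDATE SKIP LOCKED` and the outer `UPDATE` run as one statement, two consumers polling at the same moment claim different rows instead of the same one. A job whose consumer crashes becomes claimable again once its `locked_at` is older than `max_run_time`.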

By the way, one possible solution here: instead of writing something that consumes delayed jobs as a work queue, you could write something that writes an Oban job whenever a delayed job row is added, and then just let Oban run things.


Delayed job rows are added to the delayed job table. Are you saying that, whenever a row is added there, I must find some way to add a row to the Oban job table too, and then process it with Oban?

I’m definitely not saying you must do this. I think a custom solution that uses GenStage to run jobs off of the delayed jobs table is a fine idea.

I’m saying that, if that is hard to do, one option is to instead have your Elixir code insert an Oban job for each delayed job to do the actual work, and let Oban handle running the jobs for you.


I think your best chance is to have one worker process (supervised by your app) that checks for DelayedJob records every 5 seconds or so, transforms them into Oban jobs, and saves them to the DB. From that point on you are fine, because Oban is already very good.
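A minimal sketch of that polling loop, written in Ruby to match the other code in this thread. `DelayedJobBridge`, `fetch_batch`, and `hand_off` are hypothetical names. `hand_off` is assumed to insert the Oban row and delete the delayed_jobs row in one database transaction, so a crash between the two steps can neither lose nor duplicate a job. In Elixir, the same loop would typically be a GenServer that reschedules itself with `Process.send_after/3`.

```ruby
# Hypothetical bridge: polls delayed_jobs on a fixed interval and hands each row
# off to Oban. Run a single instance so two pollers never convert the same row.
class DelayedJobBridge
  # fetch_batch: -> { [row, ...] }  returns delayed_jobs rows not yet handed off
  # hand_off:    ->(row) { ... }    assumed to insert the Oban job and delete the
  #                                 delayed_jobs row in ONE transaction
  # sleeper:     injectable so tests don't actually sleep
  def initialize(fetch_batch:, hand_off:, interval: 5, sleeper: ->(secs) { sleep(secs) })
    @fetch_batch = fetch_batch
    @hand_off = hand_off
    @interval = interval
    @sleeper = sleeper
  end

  # Runs forever by default; pass `iterations:` to stop after N polls.
  def run(iterations: Float::INFINITY)
    done = 0
    while done < iterations
      tick
      @sleeper.call(@interval)
      done += 1
    end
  end

  # One poll: hands off every pending row and returns how many were handled.
  def tick
    rows = @fetch_batch.call
    rows.each { |row| @hand_off.call(row) }
    rows.size
  end
end
```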

Looking through DelayedJob’s table schema, it doesn’t look that hard to do such a transformation. 🙂
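To give an idea of that transformation, here is a sketch that turns one delayed_jobs row (as a Hash keyed by column name) into attributes for an oban_jobs row. The queue-to-worker table and the choice to pass the raw YAML `handler` through in `args` are assumptions: the handler is a serialized Ruby object, so in practice either this mapping or the Elixir worker has to extract plain, JSON-friendly arguments from it.

```ruby
# Hypothetical mapping from delayed_job queue names to Elixir Oban worker modules;
# the nil key is the fallback for unmapped or missing queues.
WORKER_FOR_QUEUE = {
  'mailers' => 'MyApp.Workers.Mailer',
  nil       => 'MyApp.Workers.Default'
}.freeze

# Converts one delayed_jobs row into attributes for an oban_jobs row.
def oban_attrs_from_delayed_job(dj, now: Time.now.utc)
  run_at = dj.fetch('run_at')
  worker = WORKER_FOR_QUEUE.fetch(dj['queue']) { WORKER_FOR_QUEUE.fetch(nil) }
  {
    'worker'       => worker,
    'queue'        => dj['queue'] || 'default',
    # The raw YAML handler is passed through as-is (assumption).
    'args'         => { 'delayed_job_id' => dj.fetch('id'), 'handler' => dj.fetch('handler') },
    'scheduled_at' => run_at,
    # A future run_at waits as 'scheduled'; otherwise the job is ready now.
    'state'        => run_at > now ? 'scheduled' : 'available'
  }
end
```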

It’s a migration, so integration work is necessary; there’s no way around it IMO. If DelayedJob supported posting events to Redis / Kafka / RabbitMQ, it might have been a tiny bit easier, but I doubt it. You just have to put a little elbow grease into it.


If you are using PostgreSQL, you could use its triggers (together with LISTEN/NOTIFY) to have your Elixir application notified when a delayed job record is created, updated, or deleted 🙂
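A sketch of such a trigger, kept as a Ruby constant so it could be run from a Rails migration with `execute`. The function, trigger, and channel names are hypothetical. On the Elixir side, `Postgrex.Notifications` can `LISTEN` on the channel. Notifications are delivered only when the writing transaction commits and are not queued for listeners that are disconnected, so an occasional poll is still a useful safety net.

```ruby
# Hypothetical names: function notify_delayed_jobs_change() and channel
# 'delayed_jobs_changed'. The payload is "<TG_OP>:<id>", e.g. "INSERT:42".
DELAYED_JOBS_NOTIFY_DDL = <<~'SQL'
  CREATE OR REPLACE FUNCTION notify_delayed_jobs_change() RETURNS trigger AS $$
  BEGIN
    IF TG_OP = 'DELETE' THEN
      PERFORM pg_notify('delayed_jobs_changed', TG_OP || ':' || OLD.id);
    ELSE
      PERFORM pg_notify('delayed_jobs_changed', TG_OP || ':' || NEW.id);
    END IF;
    RETURN NULL; -- ignored for AFTER ... FOR EACH ROW triggers
  END;
  $$ LANGUAGE plpgsql;

  CREATE TRIGGER delayed_jobs_notify_change
  AFTER INSERT OR UPDATE OR DELETE ON delayed_jobs
  FOR EACH ROW EXECUTE PROCEDURE notify_delayed_jobs_change();
SQL
```

If only new jobs matter, `AFTER INSERT` alone avoids a notification every time a worker locks or updates a row.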


Inserting Oban jobs from Rails is extremely easy. Here is the entirety of the Rails model we have defined:

```ruby
class ObanJob < ApplicationRecord
  # Oban's `errors` column would clash with ActiveRecord's `errors` method.
  self.ignored_columns = %w[errors]

  # A simple wrapper around `create`
  def self.insert(worker:, args: {}, queue: 'default', scheduled_at: nil)
    create(
      worker: worker,
      queue: queue,
      args: args,
      scheduled_at: scheduled_at || Time.now.utc,
      state: scheduled_at ? 'scheduled' : 'available'
    )
  end

  validates :queue, presence: true
  validates :worker, presence: true
end
```

That’s all it takes for the model. Then you can insert new jobs like this:

```ruby
ObanJob.insert(worker: "MyApp.Worker", args: { id: 1 })
```

Some other options are supported in the helper, but not all of them. It’s easy to switch from named args to a hash if you want to support other things like max_attempts.
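One way that switch might look, as a sketch: a hypothetical `oban_job_attributes` method that only builds the attribute hash, so extra columns can be passed through as keyword options and the result handed to `ObanJob.create!`. The list of extra columns is an assumption; which ones exist depends on your Oban migration version.

```ruby
# Hypothetical whitelist of optional oban_jobs columns (depends on migration version).
EXTRA_OBAN_COLUMNS = %i[max_attempts priority tags meta].freeze

# Hash-based variant of the ObanJob.insert helper: builds attributes only.
def oban_job_attributes(worker:, args: {}, queue: 'default', scheduled_at: nil, **extra)
  unknown = extra.keys - EXTRA_OBAN_COLUMNS
  raise ArgumentError, "unsupported Oban columns: #{unknown.join(', ')}" unless unknown.empty?

  {
    worker: worker,
    queue: queue,
    args: args,
    scheduled_at: scheduled_at || Time.now.utc,
    state: scheduled_at ? 'scheduled' : 'available'
  }.merge(extra)
end
```

Usage from the Rails side would then be something like `ObanJob.create!(oban_job_attributes(worker: "MyApp.Worker", args: { id: 1 }, max_attempts: 5))`.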

We insert jobs freely from the Rails side and it works very well. You don’t get multi or unique support, but it helps bridge the gap.


Thanks @sorentwo.