How to efficiently create database records using an Oban job

Although this is related to Oban, it is really a question about the database.

I want to insert a calculated billing record for every user using Oban (cron schedule).

def perform(%{args: %{"type" => "insert_billing"}}) do
  User
  |> Repo.all()
  |> Enum.each(fn user ->
    try do
      calculate_billing_for_user(user)
    rescue
      e ->
        Logger.error("Billing failed for user #{user.id}: #{Exception.message(e)}")
    end
  end)
end

This is the first solution that came to my mind.
We have a few thousand users, and the number is growing.

I have read about recursive jobs in Oban, but I can't find a way to pass a user_id to the perform function, since it is a cron job (correct me if I am wrong).
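For reference, this is roughly how a cron entry looks; the module name, queue, and schedule here are placeholders, and as far as I can tell the cron plugin only ever passes the static args configured up front, so there is nowhere to inject a per-user id:

config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [billing: 10],
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       # Static args only; no way to pass a per-user user_id here.
       {"0 0 1 * *", MyApp.BillingWorker, args: %{type: "insert_billing"}}
     ]}
  ]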

Do you guys have any ideas for this kind of job?

You can enqueue batches of jobs from the cron job and let those run in parallel (or one at a time, if you prefer):

def perform(%{args: %{"type" => "insert_billing"}}) do
  fun = fn ->
    User
    |> select([u], u.id)
    |> Repo.stream()
    |> Stream.chunk_every(100)
    |> Enum.each(fn user_ids ->
      # Enqueue one batch job per chunk of 100 user ids.
      %{user_ids: user_ids}
      |> new()
      |> Oban.insert()
    end)
  end

  # Increase the timeout to give the stream time as the table grows.
  Repo.transaction(fun, timeout: 60_000)
end

def perform(%{args: %{"user_ids" => user_ids}}) do
  User
  |> where([u], u.id in ^user_ids)
  |> Repo.all()
  |> Enum.each(fn user ->
    # Safely try to calculate billing, so one failure doesn't abort the batch.
    try do
      calculate_billing_for_user(user)
    rescue
      e -> Logger.error("Billing failed for user #{user.id}: #{Exception.message(e)}")
    end
  end)
end

The important bit is using Repo.stream to avoid loading all of the records at once as your database grows.
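As a variant, if you'd rather enqueue everything with a single call, the chunks can be collected into a list of changesets and handed to Oban.insert_all in one go; a sketch of that, reusing the same query and timeout:

def perform(%{args: %{"type" => "insert_billing"}}) do
  fun = fn ->
    User
    |> select([u], u.id)
    |> Repo.stream()
    |> Stream.chunk_every(100)
    # Build one job changeset per chunk, then insert them all at once.
    |> Enum.map(fn user_ids -> new(%{user_ids: user_ids}) end)
    |> Oban.insert_all()
  end

  Repo.transaction(fun, timeout: 60_000)
end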


This is a really nice and elegant solution.
Quick question: what happens if an error occurs during the perform function?
Does the job retry from the start, or from where it left off?

It will retry from the start. It's up to you whether you wrap the "calculate billing" part in a try/rescue, or enqueue each job independently. If you have a couple of thousand jobs then it isn't a big deal to run one at a time. If you have many thousands, that isn't so appealing.
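For completeness, a sketch of the "enqueue each job independently" option, so a failure only retries that one user's billing; the per-user args shape and the reuse of calculate_billing_for_user/1 from the original post are assumptions:

def perform(%{args: %{"type" => "insert_billing"}}) do
  fun = fn ->
    User
    |> select([u], u.id)
    |> Repo.stream()
    |> Enum.each(fn user_id ->
      # One job per user, each with its own retry counter.
      %{user_id: user_id}
      |> new()
      |> Oban.insert()
    end)
  end

  Repo.transaction(fun, timeout: 60_000)
end

def perform(%{args: %{"user_id" => user_id}}) do
  user = Repo.get!(User, user_id)
  calculate_billing_for_user(user)
end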