This is related to Oban, but it is really a database question.
I want to insert a calculated billing record for every user with Oban (on a cron schedule):
def perform(%{args: %{"type" => "insert_billing"}}) do
  User
  |> Repo.all()
  |> Enum.each(fn user ->
    try do
      calculate_billing_for_user(user)
    rescue
      e ->
        Logger.error("Billing failed for user #{user.id}: #{Exception.message(e)}")
    end
  end)
end
This is the first solution that came to mind. We have a few thousand users, and the number is growing.
I have read about recursive jobs in Oban, but I can’t find a way to pass a user_id to the perform function since it is a cron job (correct me if I am wrong).
Do you have any ideas for this kind of job?
You can enqueue batches of jobs from the cron job and let those run in parallel (or one at a time, if you prefer):
def perform(%{args: %{"type" => "insert_billing"}}) do
  fun = fn ->
    User
    |> select([u], u.id)
    |> Repo.stream()
    |> Stream.chunk_every(100)
    |> Enum.each(fn user_ids ->
      %{user_ids: user_ids}
      |> new()
      |> Oban.insert()
    end)
  end

  # Increase the timeout to give the stream time as the table grows.
  Repo.transaction(fun, timeout: 60_000)
end
def perform(%{args: %{"user_ids" => user_ids}}) do
  User
  |> where([u], u.id in ^user_ids)
  |> Repo.all()
  |> Enum.each(fn user ->
    # Safely try to calculate billing
  end)
end
The important bit is using Repo.stream to avoid loading all of the records at once as your database grows.
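For completeness, wiring the batching worker above into Oban's cron plugin looks roughly like this. This is a sketch: the app name, the MyApp.BillingWorker module, the :billing queue, and the monthly schedule are my own placeholders, not details from the thread.

```elixir
# config/config.exs -- hypothetical cron setup for the batching worker.
# :my_app, MyApp.BillingWorker, the :billing queue, and the schedule
# are assumptions; substitute your own names and timing.
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [billing: 10],
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       # Run at midnight on the 1st of each month.
       {"0 0 1 * *", MyApp.BillingWorker, args: %{type: "insert_billing"}}
     ]}
  ]
```

The cron entry only ever enqueues the "insert_billing" job; the chunked jobs it spawns then run on the :billing queue with whatever concurrency you configure.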
This is a really nice and elegant solution.
Quick question: what happens if an error occurs during the perform function?
Does the job retry from the start, or from where it left off?
It will retry from the start. It’s up to you whether you wrap the “calculate billing” part in a try/rescue, or enqueue each job independently. If you have a couple of thousand jobs then it isn’t a big deal to run one at a time. If you have many thousands, that isn’t so appealing.
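To make the "enqueue each job independently" option concrete, here is a minimal sketch, assuming the same worker module and helper functions as the earlier snippets. The per-user args shape (user_id) is my own choice, not from the thread.

```elixir
# Hypothetical sketch: one job per user, so a retry re-runs only
# the failed user's billing instead of a whole 100-user batch.
def perform(%{args: %{"type" => "insert_billing"}}) do
  fun = fn ->
    User
    |> select([u], u.id)
    |> Repo.stream()
    |> Enum.each(fn user_id ->
      %{user_id: user_id}
      |> new()
      |> Oban.insert()
    end)
  end

  # Same timeout caveat as the batched version.
  Repo.transaction(fun, timeout: 60_000)
end

# Each retry then touches exactly one user.
def perform(%{args: %{"user_id" => user_id}}) do
  user = Repo.get!(User, user_id)
  calculate_billing_for_user(user)
end
```

The trade-off is row count in the oban_jobs table: one row per user per billing run, versus one row per 100-user chunk in the batched version.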