I want to use telemetry from Oban to send notifications to a LiveView (LV) about updates to a job. The notification will contain the job id, and the LV will fetch the updated job from the DB with that id.
I tried to achieve this using `after_process/2` hooks or by attaching to Oban's telemetry events (like `[:oban, :job, :stop]`, `[:oban, :engine, :complete_job, :stop]`, etc.).
The issue with these is that they seem to fire before, or in parallel with, the job being updated in the DB. This creates a race condition in the LV: if I receive the notification and act on it too fast, I fetch the job in the state it had before the update that triggered the notification.
For example, say I get an `after_process(:complete, …)` message (or `[:oban, :job, :stop]`). I broadcast it to the LV, and the LV fetches that job from the DB to get the latest version. I expect a job with state `completed`, but I actually fetch the job with state `executing`.
If I add a 1 second sleep to my `handle_info` in the LV and then fetch the job, I get the correct state.
So, how can I achieve this in Oban, so that when I get a notification that a job was updated I am guaranteed to fetch the latest version of that job?
Two reasons. First, the job included in the event is also outdated: it carries the state from before the change. In the `:complete` example, the job state I receive in the event is actually `:executing`.
The event does have all the information I need to update the job struct myself (I know the new state, and I have its reply/error/etc.), but I would prefer not to do that if possible, as it seems fragile and error-prone.
The second reason is that Phoenix PubSub doesn't guarantee the order of events in a distributed system. To be fair, my system is not distributed right now, but I would rather have a solution that already handles this than build one I would need to revisit later.
That's why the best approach I came up with was to send just the id; the receiver then fetches the job from the DB directly. That way I can guarantee I always have the latest information, regardless of event ordering and other issues.
Well, it would guarantee that if I weren't hitting the issues described in the first post.
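For illustration, the id-only flow on the receiving side might look like this (a sketch; the `{:job_updated, id}` message shape and `Jobs.get_job!/1` are hypothetical names, not from this thread):

```elixir
# Sketch of a LiveView handler, assuming the broadcaster sends only the id.
# `Jobs.get_job!/1` and the message shape are illustrative assumptions.
def handle_info({:job_updated, job_id}, socket) do
  # Always re-fetch by id so we render the latest persisted state,
  # independent of PubSub event ordering.
  job = Jobs.get_job!(job_id)
  {:noreply, assign(socket, :job, job)}
end
```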
The [:oban, :job, :stop] event triggers within the process that executed the job, as does the after_process/3 hook. As noted, at that point the job status hasn’t changed in the database for a couple of reasons:
- acking is async so it can be retried safely out of band, without failing the actual job
- ack operations are batched to minimize DB round-trips
Sending the id over pubsub is wise, even if just to minimize data serialization. There’s also the chance that a job errors once and then retries before the pubsub event is handled.
So, there is no way to know when the job was actually pushed to the DB?
Since you are updating the DB in batches, don't you have the list of job ids at that point? Wouldn't it be possible for Oban to add some telemetry that returns that list with an "updated_in_db" event type or something like that?
That way I could simply listen for that event and publish to the desired PubSub topics when it triggers.
No, there’s no event at the moment that fires specifically after the job was pushed, unless you configure the queue with async: false. Then each job ack is synchronous, and you know that engine events like completed_at have committed.
It would be possible, and is planned for Pro v1.7, but it’s not a trivial addition.
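As a sketch, the synchronous-ack configuration mentioned above might look like this (the app and queue names are illustrative; check the Oban Pro docs for the exact option shape):

```elixir
# config/config.exs — hedged sketch based on the reply above.
config :core, Oban,
  engine: Oban.Pro.Engines.Smart,
  repo: Core.Repo,
  queues: [
    # With async: false each job ack is synchronous, so by the time
    # engine events fire the state change has committed to the database.
    jobs: [limit: 10, async: false]
  ]
```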
For now, my solution is to rely on Postgres LISTEN/NOTIFY to get the info, like this:
```elixir
defmodule Core.Jobs.JobNotifier do
  @moduledoc false
  use GenServer

  alias Core.Jobs

  @channel "oban_jobs_worker_job_update"

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    {:ok, conn} = Postgrex.Notifications.start_link(Core.Repo.config())
    {:ok, _ref} = Postgrex.Notifications.listen(conn, @channel)
    {:ok, %{conn: conn}}
  end

  @impl true
  def handle_info({:notification, _pid, _ref, @channel, payload}, state) do
    %{"id" => id, "state" => job_state, "name" => name, "tenant_id" => tenant_id} =
      Jason.decode!(payload)

    job_state = String.to_existing_atom(job_state)

    # Run the Ash action in a task so a slow handler doesn't block notifications.
    Task.start(fn ->
      args = %{id: id, state: job_state, name: name}

      Jobs.Job
      |> Ash.ActionInput.for_action(:trigger_action, args, tenant: tenant_id)
      |> Ash.run_action!()
    end)

    {:noreply, state}
  end
end
```
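For this to run, the notifier needs to be started under the application supervisor, e.g. (a sketch, assuming a typical `Core.Application`; place it after the Repo so its configuration is available):

```elixir
# lib/core/application.ex — illustrative child list, names assumed.
children = [
  Core.Repo,
  {Oban, Application.fetch_env!(:core, Oban)},
  Core.Jobs.JobNotifier
]
```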
```elixir
defmodule Core.Jobs.JobNotifier.Extension do
  @moduledoc false
  use AshPostgres.CustomExtension, name: :job_notifier_trigger, latest_version: 0

  def install(0) do
    """
    execute \"\"\"
    create or replace function notify_jobs_worker_job_update()
    returns trigger as $$
    begin
      perform pg_notify(
        'oban_jobs_worker_job_update',
        json_build_object(
          'id', new.id,
          'name', new.meta->>'name',
          'state', new.state,
          'tenant_id', new.args->>'tenant_id'
        )::text
      );
      return new;
    end;
    $$ language plpgsql
    \"\"\"

    execute \"\"\"
    create trigger oban_jobs_worker_job_update_trigger
    after update of state on oban_jobs
    for each row
    when (new.worker = 'Core.Jobs.Worker')
    execute function notify_jobs_worker_job_update()
    \"\"\"
    """
  end

  def uninstall(0) do
    """
    execute \"\"\"
    drop trigger if exists oban_jobs_worker_job_update_trigger on oban_jobs
    \"\"\"

    execute \"\"\"
    drop function if exists notify_jobs_worker_job_update()
    \"\"\"
    """
  end
end
```
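The custom extension also has to be registered with the repo so AshPostgres emits the install/uninstall SQL when generating migrations (a sketch, assuming `Core.Repo` uses `AshPostgres.Repo`):

```elixir
defmodule Core.Repo do
  use AshPostgres.Repo, otp_app: :core

  # Listing the extension module here makes the migration generator
  # pick up the trigger/function defined in install/1 above.
  def installed_extensions do
    [Core.Jobs.JobNotifier.Extension]
  end
end
```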