Data race between form and Oban job. Ash post-create action

Hi, I want to preface my question by saying that I am new to Elixir. I have been playing around with Phoenix and Ash and loving both. I am still making a lot of silly mistakes.

To set the scene a little: I am building a toy application in which I would like to run an Oban job to do some processing once a resource has been created. At a high level it is:

  • Form submitted
  • Resource created
  • On successful creation queue an Oban job to do some post processing.

Form submission handler:

defp save_feed(socket, :new_feed_item, feed_params) do
  %{assigns: %{feed_id: feed_id}} = socket

  p = feed_params |> Map.put("feed_id", feed_id)

  case FeedItemService.create_feed_item(socket.assigns.form, p) do
    {:ok, feed_item} ->
      notify_parent({:saved, feed_item})

      socket =
        socket
        |> put_flash(:info, "Feed item saved!")
        |> push_navigate(to: ~p"/feeds/#{feed_id}")

      {:noreply, socket}

    {:error, form} ->
      socket =
        socket
        |> put_flash(:error, "Could not save feed item data")
        |> assign(:form, form)

      {:noreply, socket}
  end
end

Service:

  def create_feed_item(form, params) do
    case AshPhoenix.Form.submit(form, params: params) do
      {:ok, feed_item} ->
        # Successfully created the new feed_item. We now need to enqueue the job that will start the logic around
        # Processing the url that was provided.
        %{id: id, resource_media_type: resource_media_type, resource_media_url: url} = feed_item

        # This is a simple case now. Though the plan is to add more url types here
        # We will need to process each differently
        case resource_media_type do
          # YouTube URLs are handed off to the YoutubeDownloader worker
          :youtube ->
            IO.puts("Found that we have a youtube url")

            case %{feed_item_id: id} |> YoutubeDownloader.new() |> Oban.insert() do
              {:ok, _oban_res} ->
                IO.puts("Successfully created the oban job")

              {:error, _e} ->
                IO.puts("Failed to create the oban job for youtube encoding")
            end

          _ ->
            IO.puts("Was unable to process the url type: #{resource_media_type}")
        end

        {:ok, feed_item}

      {:error, _e} ->
        {:error, form}
    end
  end

My question is twofold. I am currently getting a race condition in this code: the first thing that the Oban job does is pull the FeedItem from the database using the get_feed_by_id(feed_item_id) code interface function on my domain. If I use iex and run the function manually, the item has definitely been inserted into the database. The Oban job is just not able to see it.

So firstly, any thoughts on what I might be doing wrong here? I am unsure why I am getting a race here?

Secondly, adding a service layer seems like a bit of a hacky way to implement this. It feels like it is taking some of the logic away from the resource. Does Ash provide a nice way to run a post-create type of action, i.e. something to run code after a resource has been added?

Hope all that is clear and thanks in advance for taking the time to read my question :).

Hmm… I highly doubt that there is a race condition there TBH. Do you have any policies on the resource that could be preventing the oban job from reading the record?

You are probably right! Though that's the only thing I can think of that might be causing the Oban job to not be able to read the FeedItem from the DB. For some more context, this is the code for the Oban job:

defmodule Podpick.Workers.YoutubeDownloader do
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(p) do
    IO.puts("This is the oban ARGS")
    IO.inspect(p)

    %{args: %{"feed_item_id" => feed_item_id}} = p

    IO.inspect(feed_item_id)

    case Podpick.Feed.get_feed_by_id(feed_item_id) do
      {:ok, feed_item} ->
        IO.puts("Successfully loaded feed item")

      {:error, error} ->
        IO.puts("Something went wrong when loading the feed_item")
        IO.inspect(error)
    end

    IO.puts("Successfully complete youtube url download")
  end
end

It seems to fail on Podpick.Feed.get_feed_by_id(feed_item_id).

Does adding a service layer between the form submit and the Ash resource seem like the correct approach here? I am not sure if it is the done thing :thinking:

Edit: To answer your question: no, there are no policies on the resource. The resources are very vanilla.

Probably not. I would likely implement this using ash_oban (see the v0.4.1 documentation) and a trigger for downloading videos in an undownloaded state, using a before_transaction, and change run_oban_trigger(:my_trigger) on the create action to immediately enqueue the job on creation.
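
A rough sketch of what that could look like on the resource is below. It assumes the ash, ash_postgres and ash_oban dependencies and an already configured Oban instance. The download_state attribute, the :download action, the table name and the Podpick.Feed.Changes.DownloadVideo module are all hypothetical names made up for illustration, and the exact trigger options depend on the ash_oban version you use (newer releases may also ask for explicit worker and scheduler module names), so treat it as a starting point and check the docs rather than copying it verbatim.

```elixir
# Sketch only. Assumes :ash, :ash_postgres and :ash_oban are installed and Oban
# is configured. Names marked "hypothetical" do not come from the original post.
defmodule Podpick.Feed.FeedItem do
  use Ash.Resource,
    domain: Podpick.Feed,
    data_layer: AshPostgres.DataLayer,
    extensions: [AshOban]

  postgres do
    # hypothetical table and repo names
    table "feed_items"
    repo Podpick.Repo
  end

  attributes do
    uuid_primary_key :id
    attribute :resource_media_url, :string, allow_nil?: false, public?: true

    # hypothetical attribute tracking whether the video has been downloaded
    attribute :download_state, :atom do
      constraints one_of: [:undownloaded, :downloaded]
      default :undownloaded
      allow_nil? false
    end
  end

  oban do
    triggers do
      trigger :download do
        # run the :download action for every record still waiting for a download
        action :download
        where expr(download_state == :undownloaded)
        # the scheduler also sweeps for matching records once a minute
        scheduler_cron "* * * * *"
      end
    end
  end

  actions do
    defaults [:read]

    create :create do
      accept [:resource_media_url]
      # enqueue the trigger's job for the new record right away
      change run_oban_trigger(:download)
    end

    update :download do
      # the hypothetical change below is plain Elixir code, so opt out of atomic updates
      require_atomic? false
      change Podpick.Feed.Changes.DownloadVideo
      change set_attribute(:download_state, :downloaded)
    end
  end
end
```

With something like this in place the scheduler acts as a safety net: if the immediate enqueue is ever missed, the record still matches the where filter and gets picked up on the next sweep.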

How do you know that the Oban job can’t see it?
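
One way to find out is to return the lookup error from perform/1 instead of only printing it. This is a minimal sketch based on the worker shared above, assuming the same Podpick.Feed.get_feed_by_id/1 code interface. When perform/1 returns an error tuple, Oban records the error on the job and retries it, so the actual reason ends up stored with the job rather than scrolling past in the console.

```elixir
# Sketch only: the same worker as above, reworked so that failures are reported to Oban.
defmodule Podpick.Workers.YoutubeDownloader do
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"feed_item_id" => feed_item_id}}) do
    case Podpick.Feed.get_feed_by_id(feed_item_id) do
      {:ok, _feed_item} ->
        # The download logic would go here.
        :ok

      {:error, error} ->
        # Returning an error tuple marks this attempt as failed. Oban stores
        # the error on the job and retries it later.
        {:error, error}
    end
  end
end
```

If a later retry succeeds without any code change, that points towards timing. If every attempt fails with the same error, the cause is more likely in how the record is being read.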

Also, you should check out AshPhoenix and the form helpers it provides. They can clean up your code considerably.

Yeah, I think there are quite a few unconventional things in the code that was shared originally. You typically want to build an AshPhoenix.Form for an action, use it directly in the LiveView, and then implement side effects in the action itself using lifecycle hooks.
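
As a rough illustration of the lifecycle hook approach, the service module could be replaced by a change that enqueues the job in an after_action hook. The module name Podpick.Feed.Changes.EnqueueDownload is hypothetical; the worker and the resource_media_type attribute are taken from the code shared earlier in the thread. This is a sketch under those assumptions, not the one correct way to do it.

```elixir
# Sketch only. The module name is hypothetical; the worker and attribute names
# come from the code posted earlier in the thread.
defmodule Podpick.Feed.Changes.EnqueueDownload do
  use Ash.Resource.Change

  alias Podpick.Workers.YoutubeDownloader

  @impl true
  def change(changeset, _opts, _context) do
    # after_action hooks run once the record has been written by the data layer
    Ash.Changeset.after_action(changeset, fn _changeset, feed_item ->
      case feed_item.resource_media_type do
        :youtube ->
          # If the insert fails, the error tuple is returned from the hook and
          # the create action fails with it.
          with {:ok, _job} <-
                 %{feed_item_id: feed_item.id}
                 |> YoutubeDownloader.new()
                 |> Oban.insert() do
            {:ok, feed_item}
          end

        _other ->
          # Other media types are not processed yet.
          {:ok, feed_item}
      end
    end)
  end
end

# On the resource, inside the create action:
#
#   create :create do
#     change Podpick.Feed.Changes.EnqueueDownload
#   end
```

The LiveView can then call AshPhoenix.Form.submit(socket.assigns.form, params: params) directly. With a transactional data layer such as AshPostgres, and assuming Oban is configured with the same Ecto repo, the job insert takes part in the same transaction as the create, so the job only becomes available once the feed item has been committed.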