Process.send_after breaks Repo.transaction

Hello,
The weirdest thing happened with my GenServer. I repeatedly observe what appears to be an interaction between a long-running Ecto.Multi transaction (Repo.transaction) and Process.send_after: a Multi callback returns the second argument of send_after (the message)!

I have a handle_cast that builds a big Multi and runs it in a transaction. It is triggered manually from a web GUI.
The Multi takes a long time because it involves not only the local DB but also Google API calls (typically 1 s each, and there are about 12 of them) and file writes. So the Repo.transaction typically lasts 12 seconds.

I also have a 10-second timer (Process.send_after) on the same GenServer. Its handler also performs some Google API calls.

Problem: when I trigger the handle_cast from the web GUI, the 10-second timer fires before the transaction finishes (10 is less than 12).
The result is quite surprising, see below. One of the Multi callbacks returns :refresh, which is the atom from

Process.send_after(self(), :refresh, @refresh_timer)

instead of the value it is coded to return.

I have found a workaround: disabling the timer while the long-running handle_cast is processing. But I wonder what is going on under the hood.
I don’t know why or how this is happening.
The error is below, followed by the relevant pieces of the actual code.

Thank you for your insights.
Jean-yves

GenServer Cvs.GoogleCache terminating
** (RuntimeError) expected Ecto.Multi callback named `{:write_thumbnail_to_disk, "p5"}` to return either {:ok, value} or {:error, value}, got: :refresh
    (ecto 3.10.2) lib/ecto/multi.ex:850: Ecto.Multi.apply_operation/5
    (elixir 1.14.5) lib/enum.ex:2468: Enum."-reduce/3-lists^foldl/2-0-"/3
    (ecto 3.10.2) lib/ecto/multi.ex:818: anonymous fn/5 in Ecto.Multi.apply_operations/5
    (ecto_sql 3.10.1) lib/ecto/adapters/sql.ex:1203: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
    (db_connection 2.5.0) lib/db_connection.ex:1630: DBConnection.run_transaction/4
    (ecto 3.10.2) lib/ecto/repo/transaction.ex:18: Ecto.Repo.Transaction.transaction/4
    (elixir 1.14.5) lib/enum.ex:975: Enum."-each/2-lists^foreach/1-0-"/2
    (cvs 0.3.2) lib/cvs/google_cache.ex:104: Cvs.GoogleCache.handle_cast/2
    (stdlib 4.3.1.1) gen_server.erl:1123: :gen_server.try_dispatch/4
    (stdlib 4.3.1.1) gen_server.erl:1200: :gen_server.handle_msg/6
    (stdlib 4.3.1.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3

Timer-related code

  @impl true
  def handle_info(:refresh, %{decks_postgres: decks_postgres} = state) do
    # Logger.debug("Refresh event.")
    # Refresh remote drive list
    decks_gdrive = GoogleSlides.new_conn() |> get_slide_decks()

    # Rearm timer
    timer_ref = Process.send_after(self(), :refresh, @refresh_timer)

    # Always remember the new timer reference, even when nothing changed
    state = Map.put(state, :timer_ref, timer_ref)

    # Compare versions of local cache and Gfiles
    if different(decks_gdrive, decks_postgres) do
      # If any changes, we tell the world and store changes
      # IO.puts("store new remote deck versions")
      PubSub.broadcast(Cvs.PubSub, "google_cache", :remote_decks_refreshed)
      {:noreply, Map.put(state, :decks_gdrive, decks_gdrive)}
    else
      {:noreply, state}
    end
  end

Multi-related code in the same GenServer

def handle_cast(
        {:refresh_local_decks, force},
        %{decks_gdrive: decks_gdrive, decks_postgres: decks_postgres, timer_ref: timer_ref} =
          state
      ) do
    # Due to a weird bug where the timer message :refresh was returned by a Multi callback inside GoogleSlides.sync_deck, I temporarily disable the timer before doing anything
    # Process.cancel_timer(timer_ref)

    Logger.debug("refresh_local_decks")
    # TODO : check errors
    g_conn = GoogleSlides.new_conn()

    # For each deck in Gdrive, we sync it to our system
    Enum.each(decks_gdrive, fn x -> GoogleSlides.sync_deck(g_conn, decks_postgres, x, force) end)

    # Inform the world that data is now refreshed
    PubSub.broadcast(Cvs.PubSub, "google_cache", :local_decks_refreshed)

    # Rearm canceled timer
    # timer_ref = Process.send_after(self(), :refresh, @refresh_timer)

    {:noreply,
     Map.put(state, :decks_postgres, GoogleSlides.list_decks()) |> Map.put(:timer_ref, timer_ref)}
  end
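
For reference, the commented-out workaround above could be sketched more defensively as follows. This is only a sketch: the module and function names (TimerPause, cancel_and_flush) are hypothetical and not part of my real code. The idea is that Process.cancel_timer returns false when the timer has already fired, in which case the message may already be sitting in the mailbox and has to be dropped by hand.

```elixir
defmodule TimerPause do
  # Hypothetical helper: cancel a timer and, if it had already fired,
  # remove its message from the mailbox of the calling process.
  def cancel_and_flush(timer_ref, msg) do
    case Process.cancel_timer(timer_ref) do
      false ->
        # Timer already fired (or never existed): drop the message if present.
        receive do
          ^msg -> :flushed
        after
          0 -> :already_gone
        end

      _ms_left ->
        # Timer was still pending and is now cancelled.
        :cancelled
    end
  end
end
```

In the handle_cast this would be called as TimerPause.cancel_and_flush(timer_ref, :refresh) before the sync, with a fresh Process.send_after once the sync is done.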

Code called from the handle_cast

 def sync_deck(
        g_conn,
        decks_postgres,
        gdrive_file,
        force
      ) do
    IO.puts("\nprocessing deck #{gdrive_file.name}")
    #  IO.inspect(g_deck)
    upload_dir = Path.join([:code.priv_dir(:cvs), "static", "gcache"])

    # search in local_decks for this specific remote deck
    found_deck = Enum.find(decks_postgres, fn local_deck -> local_deck.g_id == gdrive_file.id end)

    # case 1 : nothing to do if not forcing and decks is already present and with same version
    if not force && found_deck && found_deck.g_version == String.to_integer(gdrive_file.version) do
      IO.puts("No sync required because: no force & same versions")
    else
      # In this case we will need to create or update the cache
      IO.puts(
        "Need to sync the deck, force = #{force}, local deck = #{inspect(found_deck)} remote version #{gdrive_file.version}"
      )

      # Retrieve Google remote deck with Google API (slow)

      with %{deck: %{slides: g_pages} = g_deck} <- get_slide(g_conn, gdrive_file.id, fields: "*") do
        Ecto.Multi.new()
        # Delete pages if deck exists
        |> multi_maybe_delete_pages(found_deck)
        |> multi_upsert_deck(gdrive_file, g_deck)
        
        |> multi_create_pages(g_conn, g_deck, g_pages, upload_dir)
        |> IO.inspect()
        |> Repo.transaction(timeout: 60_000)
      else
        res ->
          Logger.error("Could not fetch Google deck #{gdrive_file.id}, message: #{inspect(res)}")
      end
    end
  end

It appears that one of your Multi stages performs a receive or something similar. Ecto.Multi’s code doesn’t receive any messages, so the problem is somewhere in your code.
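
To illustrate what probably happens, here is a minimal sketch. The module MailboxDemo and its naive_download function are made up for the example and only stand in for whatever code in your stage calls receive; I have not looked at the actual library. A Multi callback runs in the process that calls Repo.transaction, here your GenServer, so a catch-all receive inside it reads from the GenServer's mailbox and returns whichever message arrives first, including the timer message.

```elixir
defmodule MailboxDemo do
  # Hypothetical stand-in for library code that waits for a reply with a
  # catch-all receive: it returns whatever message arrives first.
  def naive_download do
    receive do
      msg -> msg
    end
  end

  def run do
    # The timer message is delivered to the calling process...
    Process.send_after(self(), :refresh, 10)
    # ...and the reply the code is actually waiting for arrives later.
    Process.send_after(self(), {:ok, "file contents"}, 200)

    # The catch-all receive hands back the timer message instead of the reply.
    naive_download()
  end
end

# prints :refresh
IO.inspect(MailboxDemo.run())
```

Code that only matches the messages it expects (for example by pinning a reference) leaves unrelated messages in the mailbox for handle_info to pick up later.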


Thank you, you found it!
Yes, that stage uses the Download library to retrieve files from a URL, and there is a receive in there.
I had no clue about this kind of interaction. I’ll see how to change the Download code so that it does not interfere with mine.
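
One option I am considering, sketched below under assumptions, is to run the receiving code in its own process so that it gets its own mailbox. IsolatedDemo and naive_download are hypothetical names; naive_download only imitates a library that has replies sent to the calling process and then does a catch-all receive. The sketch assumes the library delivers its messages to whichever process starts the download, which I have not verified for Download.

```elixir
defmodule IsolatedDemo do
  # Hypothetical stand-in for library code with a catch-all receive.
  def naive_download do
    # Imitate an async reply being delivered to the current process.
    Process.send_after(self(), {:ok, "file contents"}, 100)

    receive do
      msg -> msg
    end
  end

  def run do
    # Timer aimed at the caller (the GenServer in my case).
    Process.send_after(self(), :refresh, 10)

    # The task has its own mailbox, so :refresh cannot reach its receive.
    result =
      Task.async(fn -> naive_download() end)
      |> Task.await(5_000)

    # The timer message is still in the caller's mailbox, untouched.
    pending =
      receive do
        :refresh -> :refresh
      after
        1_000 -> :none
      end

    {result, pending}
  end
end

# prints {{:ok, "file contents"}, :refresh}
IO.inspect(IsolatedDemo.run())
```

Task.await only matches the task's own reply and monitor messages, so the :refresh message stays queued until the GenServer gets back to its normal loop and handle_info runs.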


And for whoever runs into the same issue: someone has written a GenServer version of the Download lib because they ran into the same kind of problem.
The code is available here: Genserver implementation · Issue #16 · asiniy/download · GitHub
