Hello,
The weirdest thing happened with my GenServer. I repeatedly observe what appears to be an interaction between a long-running Ecto.Multi Repo transaction and Process.send_after: the Multi returns the second parameter of the send_after (the message)!
I have a handle_cast that builds a big Multi and fires the transaction on it. It is called manually from a web GUI.
The Multi is long because it involves not only the local DB but also Google API calls (typically 1 s each, and there are about 12 of them) and file writes, so the Repo.transaction typically lasts around 12 seconds.
I also have a 10-second timer (Process.send_after) on the same GenServer; its handler also performs some Google API calls.
Problem: when I trigger the handle_cast from the web GUI, the 10-second timer kicks in before the Repo.transaction finishes (10 is less than 12).
And the result is quite surprising (see below): one of the Multi callbacks returns :refresh, which is the message atom from
Process.send_after(self(), :refresh, @refresh_timer)
instead of its regular coded value.
I have found a workaround: disabling the timer while the long-running handle_cast is processing. But I wonder what is going on in the belly of the code; why and how this happens, I don't know.
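My only theory so far is that something running inside the transaction does a catch-all receive (perhaps deep in a helper or HTTP client) and swallows whatever is sitting in the process mailbox, including the timer message. A minimal standalone sketch of that mechanism (hypothetical code, not my actual app):

```elixir
# Hypothetical sketch, not my actual code: a catch-all receive running
# in the same process as the transaction consumes whatever message is
# in the mailbox -- including a :refresh sent by Process.send_after.
Process.send_after(self(), :refresh, 10)
Process.sleep(50)

leaked =
  receive do
    msg -> msg
  after
    0 -> {:ok, :no_message_waiting}
  end

IO.inspect(leaked)  # prints :refresh
```

If a Multi callback's last expression were such a receive, it would return :refresh instead of {:ok, value}, which matches the error exactly.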
Below is the error, followed by pieces of the actual code.
Thank you for your insights.
Jean-yves
GenServer Cvs.GoogleCache terminating
** (RuntimeError) expected Ecto.Multi callback named `{:write_thumbnail_to_disk, "p5"}` to return either {:ok, value} or {:error, value}, got: :refresh
(ecto 3.10.2) lib/ecto/multi.ex:850: Ecto.Multi.apply_operation/5
(elixir 1.14.5) lib/enum.ex:2468: Enum."-reduce/3-lists^foldl/2-0-"/3
(ecto 3.10.2) lib/ecto/multi.ex:818: anonymous fn/5 in Ecto.Multi.apply_operations/5
(ecto_sql 3.10.1) lib/ecto/adapters/sql.ex:1203: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
(db_connection 2.5.0) lib/db_connection.ex:1630: DBConnection.run_transaction/4
(ecto 3.10.2) lib/ecto/repo/transaction.ex:18: Ecto.Repo.Transaction.transaction/4
(elixir 1.14.5) lib/enum.ex:975: Enum."-each/2-lists^foreach/1-0-"/2
(cvs 0.3.2) lib/cvs/google_cache.ex:104: Cvs.GoogleCache.handle_cast/2
(stdlib 4.3.1.1) gen_server.erl:1123: :gen_server.try_dispatch/4
(stdlib 4.3.1.1) gen_server.erl:1200: :gen_server.handle_msg/6
(stdlib 4.3.1.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Timer-related code:
@impl true
def handle_info(:refresh, %{decks_postgres: decks_postgres} = state) do
# Logger.debug("Refresh event.")
# Refresh remote drive list
decks_gdrive = GoogleSlides.new_conn() |> get_slide_decks()
# Rearm timer
timer_ref = Process.send_after(self(), :refresh, @refresh_timer)
# Compare versions of local cache and Gfiles
if different(decks_gdrive, decks_postgres) do
# If any changes, we tell the world and store changes
# IO.puts("store new remote deck versions")
PubSub.broadcast(Cvs.PubSub, "google_cache", :remote_decks_refreshed)
{:noreply, Map.put(state, :decks_gdrive, decks_gdrive) |> Map.put(:timer_ref, timer_ref)}
else
{:noreply, state}
end
end
Multi-related code in the same GenServer:
def handle_cast(
{:refresh_local_decks, force},
%{decks_gdrive: decks_gdrive, decks_postgres: decks_postgres, timer_ref: timer_ref} =
state
) do
# Due to a weird bug where the timer's :refresh message was being returned by the Multi in GoogleSlides.sync_deck, I temporarily disable the timer before doing anything
# Process.cancel_timer(timer_ref)
Logger.debug("refresh_local_decks")
# TODO : check errors
g_conn = GoogleSlides.new_conn()
# For each deck in Gdrive, we sync it to our system
Enum.each(decks_gdrive, fn x -> GoogleSlides.sync_deck(g_conn, decks_postgres, x, force) end)
# Inform the world that data is now refreshed
PubSub.broadcast(Cvs.PubSub, "google_cache", :local_decks_refreshed)
# Rearm canceled timer
# timer_ref = Process.send_after(self(), :refresh, @refresh_timer)
{:noreply,
Map.put(state, :decks_postgres, GoogleSlides.list_decks()) |> Map.put(:timer_ref, timer_ref)}
end
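As an aside, the commented-out timer workaround has a subtlety: Process.cancel_timer/1 does not remove a :refresh that was already delivered to the mailbox before the cancel, so a flush may be needed. A standalone sketch (hypothetical, outside the GenServer):

```elixir
# Hypothetical standalone sketch of the timer workaround: cancel the
# timer, flush any :refresh that already landed in the mailbox, do the
# long-running work, then re-arm.
ref = Process.send_after(self(), :refresh, 10)
Process.sleep(50)                 # pretend we were busy and the timer fired
Process.cancel_timer(ref)         # returns false: the timer already fired

receive do
  :refresh -> :ok                 # flush the stale message
after
  0 -> :ok
end

# ... long-running Repo.transaction would go here ...
new_ref = Process.send_after(self(), :refresh, 10_000)
```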
Code called from the handle_cast:
def sync_deck(
g_conn,
decks_postgres,
gdrive_file,
force
) do
IO.puts("\nprocessing deck #{gdrive_file.name}")
# IO.inspect(g_deck)
upload_dir = Path.join([:code.priv_dir(:cvs), "static", "gcache"])
# search in local_decks for this specific remote deck
found_deck = Enum.find(decks_postgres, fn local_deck -> local_deck.g_id == gdrive_file.id end)
# case 1 : nothing to do if not forcing and decks is already present and with same version
if not force && found_deck && found_deck.g_version == String.to_integer(gdrive_file.version) do
IO.puts("No sync required because: no force & same versions")
else
# In this case we will need to create or update the cache
IO.puts(
"Need to sync the deck, force = #{force}, local deck = #{inspect(found_deck)} remote version #{gdrive_file.version}"
)
# Retrieve Google remote deck with Google API (slow)
with %{deck: %{slides: g_pages} = g_deck} <- get_slide(g_conn, gdrive_file.id, fields: "*") do
Ecto.Multi.new()
# Delete pages if deck exists
|> multi_maybe_delete_pages(found_deck)
|> multi_upsert_deck(gdrive_file, g_deck)
|> multi_create_pages(g_conn, g_deck, g_pages, upload_dir)
|> IO.inspect()
|> Repo.transaction(timeout: 60_000)
else
res ->
        Logger.error("Could not fetch Google deck #{gdrive_file.id}, message : #{inspect(res)}")
end
end
end
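For reference, the contract the error message enforces: every Ecto.Multi run callback must return {:ok, value} or {:error, value}. A plain-Elixir stand-in for that check (illustrative, not Ecto's actual implementation):

```elixir
# Illustrative stand-in for Ecto.Multi's callback-result check: anything
# other than {:ok, _} or {:error, _} (such as a leaked :refresh) raises,
# which is exactly the RuntimeError shown above.
validate = fn
  {:ok, value} -> {:ok, value}
  {:error, reason} -> {:error, reason}
  other -> raise "expected {:ok, value} or {:error, value}, got: #{inspect(other)}"
end

{:ok, :written} = validate.({:ok, :written})
```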