GenServer fail-safe queue

Hello,
I have a Phoenix application (not important), but I have a couple of GenServers with:

use GenServer, restart: :transient, shutdown: 10_000

and I have questions:

  1. How can I prepare my code for the situation when a GenServer crashes? I don’t want to lose the data currently being processed or the data waiting in the queue.
  2. How can I build a queue with several identical workers (GenServers)? Right now, when one GenServer is working on something, another request (from Phoenix or a “cron”) has to wait. (Any good practices?)
  3. Is it possible to use a 3rd-party queue/message system (Kafka, RabbitMQ, Redis, …) without serializing params into strings?

I tried using an Agent to hold the state currently being processed, but it doesn’t look like a very good idea with more “workers”.

You can use Immortal. Or add a stash server that stores the state in the terminate callback of the dying GenServer.


So do you think it’s a good idea to use something like this:

defmodule Transactions do
  use GenServer, restart: :transient, shutdown: 10_000
  
  alias Transactions.Current

  def start_link(state \\ %{}) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  def init(state) do
    # Replay whatever was in flight when the previous incarnation crashed
    perform_with_backup(&perform_something/1, Current.get_current())
    {:ok, state}
  end

  def handle_call({:something, params}, _from, state) do
    resp = perform_with_backup(&perform_something/1, params)
    {:reply, resp, state}
  end

  # Back up the params before running; clear the backup only on success
  defp perform_with_backup(procedure, params) do
    Current.put_current(params)
    resp = procedure.(params)
    Current.clean_current()
    resp
  end
  defp perform_something(params) do
    # do something
  end
end

and

defmodule Transactions.Current do
  use Agent

  def start_link(_) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  def put_current(%{} = new_state) do
    Agent.update(__MODULE__, fn _state -> new_state end)
  end

  def get_current() do
    Agent.get(__MODULE__, fn state -> state end)
  end

  def clean_current() do
    Agent.update(__MODULE__, fn _state -> %{} end)
  end
end

The terminate callback is called for clean-up. Be cautious, because terminate is not always executed (for example, on a brutal kill, or when the process is not trapping exits).

defmodule Transactions do
  use GenServer, restart: :transient, shutdown: 10_000
  
  alias Transactions.Current

  # Take state from the stash, or start with an empty state
  def start_link(state \\ %{}) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  ...
  def terminate(_reason, state) do
    # Save state to the stash here
    state
  end
  ...
end
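Since the stash itself never appears above, here is a minimal sketch of what it could look like (the module name Transactions.Stash is made up): a plain GenServer supervised separately from the worker, so it survives the worker’s crashes:

```elixir
defmodule Transactions.Stash do
  use GenServer

  def start_link(initial \\ %{}) do
    GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(state), do: {:ok, state}

  # Called by the worker's terminate/2 to park its state
  def save(state), do: GenServer.call(__MODULE__, {:save, state})

  # Called by the worker's init/1 to recover the parked state
  def fetch(), do: GenServer.call(__MODULE__, :fetch)

  def handle_call({:save, state}, _from, _old), do: {:reply, :ok, state}
  def handle_call(:fetch, _from, state), do: {:reply, state, state}
end
```

In the supervision tree, start the stash before the worker (e.g. under a :rest_for_one supervisor) so a worker crash never takes the stash down with it.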

There is a stash example in this presentation, around 19:30…

Don’t Lose Your ets Tables explains the principles that Immortal.ETSTableManager is based on.
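The core principle from that article, sketched very roughly (names are made up): a stable manager process creates the table with itself as heir and gives it away to the worker; when the worker dies, the table transfers back to the manager instead of being destroyed:

```elixir
defmodule TableManager do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  def init(_opts) do
    # The manager is the heir: if the owner dies, the table
    # comes back here instead of being destroyed with it.
    table = :ets.new(:jobs, [:set, :private, {:heir, self(), :worker_died}])
    {:ok, %{table: table}}
  end

  # Hand the table to a (re)started worker.
  def handle_cast({:give_to, worker}, %{table: table} = state) do
    :ets.give_away(table, worker, :your_table)
    {:noreply, state}
  end

  # Sent by the runtime when the current owner exits.
  def handle_info({:"ETS-TRANSFER", table, _old_owner, :worker_died}, state) do
    # Wait for the supervisor to restart the worker, then give it away again.
    {:noreply, %{state | table: table}}
  end
end
```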


Thanks for the answers, I will look into it more deeply. I found these articles and they look very promising. What do you think?


GenServers look very nice to me. They are just an upgrade of GenServers.

GenServers look very nice to me.

Do you mean GenStage? Anyway, so far I have only found GenStage useful when interfacing with the outside world, when I need to apply some back-pressure on the inside. That seems to be exactly what they describe in the first article you linked: they use it so as not to overload their database.

They are just an upgrade of GenServers.

So GenStage is not an upgrade to GenServer; it solves a more specific problem, I think.

Yes, I mean GenStage.

They are just an upgrade of GenServers.

Yes, it solves more specific problems, but technically it’s still a GenServer. From their docs:

GenStage is implemented on top of a GenServer with a few additions.

https://hexdocs.pm/gen_stage/GenStage.html#module-callbacks
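For reference, the demand-driven shape that a GenStage adds on top of a GenServer looks roughly like this (condensed from the hexdocs example; the consumer’s actual processing is left out):

```elixir
defmodule Producer do
  use GenStage

  def init(counter), do: {:producer, counter}

  # Demand-driven: emit only as many events as consumers asked for.
  # This is the back-pressure mentioned above.
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule Consumer do
  use GenStage

  def init(:ok) do
    {:consumer, :ok, subscribe_to: [{Producer, max_demand: 10}]}
  end

  def handle_events(events, _from, state) do
    # process the batch of events here
    {:noreply, [], state}
  end
end
```

The point is that the producer never pushes faster than the consumer pulls, which is what makes it a fit for the database-overload scenario in that article.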

I have a problem with a lot of HTTP requests that create even more events (jobs); some are async and some sync, and of course I can’t lose any of them. Their solution looks like a solution for me as well.

Have you considered using pools of workers with something like poolboy? For saving your state, I think you could trap exits in your GenServer; then the terminate callback will be called even for abnormal exits.
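A sketch of the trap-exits part: one flag in init/1, and terminate/2 then also runs when the supervisor sends its shutdown exit signal (it still won’t run on a brutal :kill, so don’t treat it as guaranteed):

```elixir
def init(state) do
  # Without this, exit signals kill the process immediately
  # and terminate/2 never gets the chance to run.
  Process.flag(:trap_exit, true)
  {:ok, state}
end

def terminate(_reason, state) do
  # Save state to a stash / ETS table / external store here
  state
end
```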

I will try.

I tried this at some point, and it worked, except in the cases where it didn’t. In reality those were my fault, but it would have taken some major refactoring to make it work. In other words, from my personal experience I found that this method actually caused some weird code-coupling issues.

Now I would recommend a separate tracker that requires a completion acknowledgement along with a job timeout. When a job finishes, the worker acknowledges the completion and the job is removed from the tracker; when a job times out, it is purged from the tracker and re-queued. This way you don’t have to track the failure state or the job’s progress through whatever stages you may have.
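A rough sketch of such a tracker (all names are made up, and the re-queue call is left as a placeholder): register a job with a timer, cancel the timer on acknowledgement, and re-queue when the timer fires first:

```elixir
defmodule JobTracker do
  use GenServer

  @timeout :timer.seconds(30)

  def start_link(_), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)

  def track(job_id, payload), do: GenServer.call(__MODULE__, {:track, job_id, payload})
  def ack(job_id), do: GenServer.cast(__MODULE__, {:ack, job_id})

  def init(jobs), do: {:ok, jobs}

  def handle_call({:track, id, payload}, _from, jobs) do
    timer = Process.send_after(self(), {:timeout, id}, @timeout)
    {:reply, :ok, Map.put(jobs, id, {payload, timer})}
  end

  # Worker finished in time: cancel the timer and forget the job.
  def handle_cast({:ack, id}, jobs) do
    case Map.pop(jobs, id) do
      {{_payload, timer}, jobs} ->
        Process.cancel_timer(timer)
        {:noreply, jobs}

      {nil, jobs} ->
        {:noreply, jobs}
    end
  end

  # Timer fired before the ack: purge the job and re-queue it.
  def handle_info({:timeout, id}, jobs) do
    case Map.pop(jobs, id) do
      {{payload, _timer}, jobs} ->
        # requeue(payload) — hand the job back to whatever feeds the workers
        _ = payload
        {:noreply, jobs}

      {nil, jobs} ->
        {:noreply, jobs}
    end
  end
end
```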

If your jobs need to be ordered or aren’t idempotent, then this won’t work, and I wish you the best of luck.
