Hello,
I have a Phoenix application (that part is not important), and in it a couple of GenServers declared with:
use GenServer, restart: :transient, shutdown: 10_000
and I have questions:
How can I prepare my code for the situation where a GenServer crashes, so that I don’t lose the data currently being processed or the data waiting in the queue?
How can I build a queue with several identical workers (GenServers)? Right now, while one GenServer is working on something, any other request (from Phoenix or “cron”) has to wait. (Is there a good practice for this?)
Is it possible to use a third-party queue/message broker (Kafka, RabbitMQ, Redis, …) without serializing the params into a string?
I tried using an Agent to hold the state currently being processed, but that doesn’t look like a very good idea with more “workers”.
So do you think it’s a good idea to use something like this:
defmodule Transactions do
  use GenServer, restart: :transient, shutdown: 10_000
  alias Transactions.Current

  def start_link(state \\ %{}) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  def init(state) do
    perform_with_backup(&perform_something/1, Current.get_current())
    {:ok, state}
  end

  def handle_call({:something, params}, _from, state) do
    resp = perform_with_backup(&perform_something/1, params)
    {:reply, resp, state}
  end

  defp perform_with_backup(procedure, params) do
    Current.put_current(params)
    resp = procedure.(params)
    Current.clean_current()
    resp
  end

  defp perform_something(params) do
    # do something
  end
end
and
defmodule Transactions.Current do
  use Agent

  def start_link(_) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  def put_current(%{} = new_state) do
    Agent.update(__MODULE__, fn _state -> new_state end)
  end

  def get_current() do
    Agent.get(__MODULE__, fn state -> state end)
  end

  def clean_current() do
    Agent.update(__MODULE__, fn _state -> %{} end)
  end
end
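The terminate/2 callback is meant for cleanup. Be cautious, though: terminate/2 is not always executed. For example, it is skipped when the process is killed brutally, or when the supervisor shuts it down and the GenServer is not trapping exits.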
defmodule Transactions do
  use GenServer, restart: :transient, shutdown: 10_000
  alias Transactions.Current

  # Take state from stash, or empty state
  def start_link(state \\ %{}) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end
  ...
  def terminate(reason, state) do
    # Save state to stash
  end
  ...
end
Do you mean GenStage? Anyway, so far I have only found GenStage to be useful when interfacing with the outside world, when I need to put some back-pressure on the inside. It seems to be exactly what they describe in the first article that you’ve linked. They use it so as not to overload their database.
They are just an upgrade of GenServers.
So GenStage is not an upgrade to GenServers, it solves a more specific problem, I think.
I have problems with a lot of HTTP requests, which create even more events (jobs). Some are async and some are sync, and of course I cannot lose any of them. Their solution looks like a solution for me too.
Have you considered using a pool of workers with something like poolboy? For saving your state, I think you could trap exits in your GenServer; the terminate callback will then be called even for abnormal exits.
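To make the trap-exits suggestion concrete, here is a minimal sketch. The module names `StashSketch` and `WorkerSketch` and the `{:put, key, value}` message are made up for illustration and are not from the code above; the stash is assumed to be a separate process that outlives the worker.

```elixir
defmodule StashSketch do
  # Hypothetical stash: an Agent that outlives the worker and keeps its last state.
  use Agent

  def start_link(_opts), do: Agent.start_link(fn -> %{} end, name: __MODULE__)
  def put(state), do: Agent.update(__MODULE__, fn _old -> state end)
  def get, do: Agent.get(__MODULE__, & &1)
end

defmodule WorkerSketch do
  use GenServer, restart: :transient, shutdown: 10_000

  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @impl true
  def init(:ok) do
    # Without this flag, an exit signal from the supervisor kills the
    # process directly and terminate/2 is never invoked.
    Process.flag(:trap_exit, true)
    # Resume from whatever the previous incarnation stashed.
    {:ok, StashSketch.get()}
  end

  @impl true
  def handle_call({:put, key, value}, _from, state) do
    {:reply, :ok, Map.put(state, key, value)}
  end

  @impl true
  def terminate(_reason, state) do
    # Still not reached on Process.exit(pid, :kill) or when the
    # shutdown timeout expires, so treat this as best effort.
    StashSketch.put(state)
  end
end
```

In a supervision tree the stash would need to start before the worker. Note that after a crash the saved state may be the very thing that caused the crash, so restoring it blindly can lead to a restart loop.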
I tried this at some point, and it worked except in the cases it didn’t. Which in reality were my fault, but it would have taken some major refactoring to make it work. In other words, from my personal experience I found that this method actually caused some weird code coupling issues.
Now I would recommend a separate tracker that requires a completion acknowledgement along with a job timeout. When a job is finished, the worker acknowledges the completion and the job is removed from the tracker; when a job times out, it is purged from the tracker and re-queued. This way you don’t have to keep track of the failure state or of the job’s progress through whatever job stages you may have.
If your jobs need to be ordered or aren’t idempotent then this won’t work and I wish the best of luck to you.
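A rough sketch of such a tracker, under some assumptions: the module name `JobTrackerSketch`, its `track/3` and `ack/1` functions, and the `requeue_fun` callback are all hypothetical, and the tracker keeps everything in memory, so it does not survive its own crash.

```elixir
defmodule JobTrackerSketch do
  use GenServer

  # requeue_fun is a hypothetical callback that puts a job back on your queue.
  def start_link(requeue_fun) when is_function(requeue_fun, 1) do
    GenServer.start_link(__MODULE__, requeue_fun, name: __MODULE__)
  end

  # Called when a worker picks up a job.
  def track(job_id, job, timeout_ms) do
    GenServer.call(__MODULE__, {:track, job_id, job, timeout_ms})
  end

  # Called by the worker once the job is done.
  def ack(job_id), do: GenServer.call(__MODULE__, {:ack, job_id})

  @impl true
  def init(requeue_fun), do: {:ok, %{requeue: requeue_fun, jobs: %{}}}

  @impl true
  def handle_call({:track, job_id, job, timeout_ms}, _from, state) do
    # The ref ties the timer message to this particular tracking attempt,
    # so a stale timeout cannot purge a later attempt with the same id.
    ref = make_ref()
    timer = Process.send_after(self(), {:timeout, job_id, ref}, timeout_ms)
    {:reply, :ok, %{state | jobs: Map.put(state.jobs, job_id, {job, ref, timer})}}
  end

  def handle_call({:ack, job_id}, _from, state) do
    case Map.pop(state.jobs, job_id) do
      {nil, _jobs} ->
        {:reply, {:error, :unknown_job}, state}

      {{_job, _ref, timer}, jobs} ->
        Process.cancel_timer(timer)
        {:reply, :ok, %{state | jobs: jobs}}
    end
  end

  @impl true
  def handle_info({:timeout, job_id, ref}, state) do
    case state.jobs do
      %{^job_id => {job, ^ref, _timer}} ->
        # No ack arrived in time: purge the job and hand it back to the queue.
        state.requeue.(job)
        {:noreply, %{state | jobs: Map.delete(state.jobs, job_id)}}

      _ ->
        # Already acknowledged, or tracked again under a new ref.
        {:noreply, state}
    end
  end
end
```

A worker would call `JobTrackerSketch.track(id, job, 30_000)` before starting and `JobTrackerSketch.ack(id)` when done. A slow worker that finishes after the timeout means the job runs twice, which is why this only fits idempotent jobs.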