GenServer/Agent/Concurrency Questions

Hi all,
I’ve been trying to wrap my head around this for a few days but I get more and more confused as I go. So I guess it’s time I ask for some help.

Say you want to implement the following architecture: a process X reads a list of URLs and spawns a process Yn for each one of them (imagine a web crawler, for instance). Each Yn goes about its own thing (they don’t all do the same calculations) and, when it finishes, it has to call X back and tell it that it’s done. Meanwhile X is waiting/listening and crossing each finished URL off its list.

So far I’ve tried implementing X as an Agent (I need to keep state (a stack of urls) on it) and Yn as GenServers. Yet:

  1. if I try to kick off the Yns with a call (synchronous), I cannot kick them off concurrently
  2. if I kick them off with a cast (asynchronous), X won’t wait and it exits while the Yns are still active, thus killing the whole tree (Note: I use escript to create an executable out of this. I do not run it all through iex.).

In general, while the mechanics of processes are quite clear on their own, they become fuzzy (to me) once they get wrapped up inside an Agent or a GenServer. Where is the waiting loop in them?

Thanks!

  • In GenServer the “waiting loop” is the part responsible for calling the various handle_ callbacks. You don’t get to see the loop because it is part of the generic OTP behaviour.
  • In Agent the “waiting loop” waits for messages to arrive. Those messages contain the functions sent to it to update its internal state, the state that is carried along as the argument of the recursive loop call.
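
To make the hidden loop concrete, here is a minimal hand-rolled sketch of the kind of receive loop that GenServer and Agent wrap. The module name LoopSketch and its message shapes are made up for illustration; this is not how OTP implements it internally, only the same idea in miniature.

```elixir
defmodule LoopSketch do
  # Hypothetical module: a hand-written version of the loop that
  # GenServer and Agent keep out of sight.

  def start(initial_state) do
    spawn(fn -> loop(initial_state) end)
  end

  # Synchronous request (like a call): send, then block until the reply arrives.
  def get(pid) do
    ref = make_ref()
    send(pid, {:get, self(), ref})

    receive do
      {^ref, state} -> state
    after
      5_000 -> exit(:timeout)
    end
  end

  # Asynchronous request (like a cast): fire and forget.
  def push(pid, item), do: send(pid, {:push, item})

  # The "waiting loop": block in receive, handle one message,
  # then recurse with the (possibly updated) state.
  defp loop(state) do
    receive do
      {:get, caller, ref} ->
        send(caller, {ref, state})
        loop(state)

      {:push, item} ->
        loop([item | state])

      :stop ->
        :ok
    end
  end
end

pid = LoopSketch.start([])
LoopSketch.push(pid, "http://a.example")
LoopSketch.get(pid)
```

The last call returns ["http://a.example"]: messages from one sender to one receiver are delivered in order, so the push is handled before the get.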

So really you should be focusing on GenServer, as it is the natural progression from the recursive receive loop.

As far as I can tell, Agent really isn’t about the notion of a process; it’s about state management without autonomous access control, modelled on how Clojure’s Agents manage shared state, where the state can only be accessed via special function calls.

GenServer completely controls its internal state and the way it’s accessed through the handle_ callbacks. Agent is completely at the mercy of the functions that are sent to it from other processes.

So GenServer has far greater autonomy over its internal state than an Agent does.
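
A small sketch of that difference, assuming a hypothetical UrlList module (the name and its add/all functions are invented for this example). With the Agent, whatever function a caller sends becomes the new state; with the GenServer, callers can only ask for the operations the server chose to implement.

```elixir
defmodule UrlList do
  # Hypothetical GenServer that owns a list of URLs and decides
  # itself which operations on that list are allowed.
  use GenServer

  def start_link(urls), do: GenServer.start_link(__MODULE__, urls)
  def add(server, url) when is_binary(url), do: GenServer.call(server, {:add, url})
  def all(server), do: GenServer.call(server, :all)

  @impl true
  def init(urls), do: {:ok, urls}

  @impl true
  def handle_call({:add, url}, _from, urls), do: {:reply, :ok, [url | urls]}
  def handle_call(:all, _from, urls), do: {:reply, urls, urls}
end

# Agent: the caller supplies the function that produces the new state.
{:ok, agent} = Agent.start_link(fn -> [] end)
Agent.update(agent, fn urls -> ["http://a.example" | urls] end)
# Nothing in the Agent stops a caller from replacing the state entirely.
Agent.update(agent, fn _urls -> :not_a_list_anymore end)

# GenServer: only the messages handled above can change the state.
{:ok, server} = UrlList.start_link([])
:ok = UrlList.add(server, "http://a.example")
UrlList.all(server)
```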


So far I’ve tried implementing X as an Agent (I need to keep state (a stack of urls) on it) and Yn as GenServers.

I really don’t think Agent is a suitable solution here. Sounds more like X should be a GenServer while Yn could be Tasks (example). Tasks themselves aren’t based on GenServer but are just some (very useful) convenience code around spawn, link, receive.

I think this is a perfect use case for Task.

Since your problem is basically processing a collection of items concurrently, you should look into using Task.async_stream. Read carefully about the max_concurrency option: for data processing it is usually more beneficial to have N concurrent processes, where N is the number of hardware threads in your CPU. However, to achieve maximum throughput, you’d need to test and measure. If most of the time is spent waiting for data to be fetched from a URL, having more processes may speed things up.

So, assuming process X has to spawn all the tasks and wait for them all to finish, it may look as follows:

results = 
  urls
  |> Task.async_stream(MyWorkerModule, :process_url, [])
  |> Enum.to_list()
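
For reference, a sketch of the same pipeline with the options mentioned above spelled out. FakeWorker is a hypothetical stand-in for MyWorkerModule (it makes no HTTP request), and the option values are arbitrary placeholders to tune by measuring.

```elixir
defmodule FakeWorker do
  # Hypothetical stand-in for MyWorkerModule.process_url/1.
  # It only sleeps briefly and returns the URL with its length.
  def process_url(url) do
    Process.sleep(50)
    {url, String.length(url)}
  end
end

urls = ["http://a.example", "http://bb.example", "http://ccc.example"]

results =
  urls
  |> Task.async_stream(FakeWorker, :process_url, [],
    # At most two URLs are processed at the same time.
    max_concurrency: 2,
    # Each task gets five seconds before it is considered timed out.
    timeout: 5_000,
    # A timed-out task is killed and reported as {:exit, :timeout}
    # instead of crashing the caller.
    on_timeout: :kill_task
  )
  |> Enum.to_list()

# Each element is {:ok, value} or {:exit, reason}, in the same order as urls.
IO.inspect(results)
```

Enum.to_list/1 is what makes the calling process wait: the stream is lazy, and nothing runs until it is consumed.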

If, on the other hand, your process X has to be able to handle messages from other processes while all Yn tasks are working, then X should be a gen server with at least the following basic functionality:

defmodule MyTaskManager do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, nil)
  end

  def process_urls(server, urls) do
    GenServer.cast(server, {:process_urls, urls})
  end

  def example_function(server, args) do
    GenServer.call(server, {:do_this_please, args})
  end

  ###

  def init(_) do
    # We'll keep track of all live tasks using the `tasks` map.
    {:ok, %{tasks: %{}}}
  end

  def handle_cast({:process_urls, urls}, state) do
    new_tasks = spawn_tasks(urls)
    {:noreply, %{state | tasks: Map.merge(state.tasks, new_tasks)}}
  end

  def handle_call({:do_this_please, args}, _from, state) do
    # Just a normal handle_call.
    # do something with args...

    {:reply, :ok, state}
  end

  # When a task is spawned via `Task.async` or `Task.Supervisor.async_nolink`, it will eventually return its result as
  # a message in the form `{task_ref, result}` to the calling process if it finishes normally. If it crashes, though,
  # it won't send a result back (but the calling process will still get a :DOWN message, see below).
  def handle_info({task_ref, result}, %{tasks: tasks} = state) when is_reference(task_ref) do
    {url, updated_tasks} = Map.pop(tasks, task_ref)

    # do something with the url and the result returned for it...

    {:noreply, %{state | tasks: updated_tasks}}
  end

  # Tasks are also monitored by the calling process, so for each task process we'll get a :DOWN message when it
  # terminates, regardless of the termination reason.
  def handle_info({:DOWN, task_ref, :process, _task_pid, _exit_reason}, state) do
    updated_tasks = Map.delete(state.tasks, task_ref)
    {:noreply, %{state | tasks: updated_tasks}}
  end

  # Here we create a new task for each URL. In principle, it's possible to limit the number
  # of concurrent tasks just like `Task.async_stream` does, but here I'm simplifying a bit.
  defp spawn_tasks(urls) do
    Map.new(urls, fn url ->
      %Task{ref: task_ref} = Task.async(MyWorkerModule, :process_url, [url])
      {task_ref, url}
    end)
  end
end

I’d also recommend reading up on Task.Supervisor. It’s useful for keeping all task processes rooted under a dedicated supervisor, especially when there’s a possibility that a task may crash. A task spawned using Task.async will take its calling process down if it crashes, whereas when using Task.Supervisor.async_nolink, the calling process will not be linked to the task and will be able to receive the :DOWN message, as I showed in my example above.
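
A minimal sketch of the async_nolink behaviour, outside of a GenServer so it can be tried in isolation. NoLinkSketch and its run/2 helper are hypothetical names; the two message shapes are the same ones matched by the handle_info clauses above. In a real application the Task.Supervisor would normally be started as a child in the supervision tree rather than with a bare start_link.

```elixir
defmodule NoLinkSketch do
  # Hypothetical helper: runs `fun` in a supervised, unlinked task
  # and reports how the task ended.
  def run(supervisor, fun) do
    %Task{ref: ref} = Task.Supervisor.async_nolink(supervisor, fun)

    receive do
      # Normal completion: the result arrives tagged with the task's reference.
      {^ref, result} ->
        # Drop the monitor and flush the :DOWN message that would follow.
        Process.demonitor(ref, [:flush])
        {:ok, result}

      # Crash: no result message, only :DOWN. The caller keeps running.
      {:DOWN, ^ref, :process, _pid, reason} ->
        {:error, reason}
    after
      5_000 -> {:error, :timeout}
    end
  end
end

{:ok, sup} = Task.Supervisor.start_link()

# Returns {:ok, 2}.
NoLinkSketch.run(sup, fn -> 1 + 1 end)

# Returns {:error, :oops}; the task's failure is logged, the caller survives.
NoLinkSketch.run(sup, fn -> exit(:oops) end)
```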

You’re fantastic, guys! Thanks a mil.
So I followed your advice (halfway) and made X into a GenServer. It currently looks like this:

  use GenServer

  # CLIENT API #
  def start_link do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def add(url) do
    GenServer.call(__MODULE__, {:push, url})
  end

  def urls do
    GenServer.call(__MODULE__, :get)
  end

  def delete(url) do
    GenServer.call(__MODULE__, {:delete, url})
  end

  # SERVER #
  def init(stack) do
    {:ok, stack}
  end
  def handle_call({:push, item}, _from, state) do
    {:reply, state, [item | state]}
  end
  def handle_call({:delete, item}, _from, state) do
    {:reply, state, state -- [item]}
  end
  def handle_call(:get, _from, state) do
    {:reply, state, state}
  end

Though I’d like to keep the Yns as GenServers too, because I might opt to keep them around instead of having them perform a one-off task (imagine that each of them is kind of monitoring its assigned URL, say by implementing a hook back to X).

Note: I don’t like selecting a “winner” answer and I won’t do that. Reasons: 1. It reminds me of StackOverflow, where my experience of the community was very bad, and I don’t want to go down that road again (or any road like it). 2. The purpose here is to respectfully discuss and learn (at least for me). Since all the answers help me, it would be unjust to select one. Instead of all this competitive logic I’ll use the heart button, which fits better with my mindset.

I would suggest not keeping the process that performs the work around. If that process is Yn, then let each Yn die once it is finished. Otherwise keep Yn around but have it spawn another process to do the work.
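
One way the second option could look, as a rough sketch: a long-lived Yn that hands each job to a short-lived task and reports back to X when the task finishes. UrlWatcher, its check/1 function, and the {:done, url, result} message are all invented for this example, and the "work" is a placeholder rather than a real fetch.

```elixir
defmodule UrlWatcher do
  # Hypothetical long-lived Yn: it stays alive between jobs and
  # delegates each job to a short-lived task.
  use GenServer

  def start_link(url, owner), do: GenServer.start_link(__MODULE__, {url, owner})
  def check(server), do: GenServer.cast(server, :check)

  @impl true
  def init({url, owner}), do: {:ok, %{url: url, owner: owner, task_ref: nil}}

  @impl true
  def handle_cast(:check, %{task_ref: nil} = state) do
    url = state.url
    # Placeholder work; a real worker would fetch and inspect the URL here.
    task = Task.async(fn -> String.length(url) end)
    {:noreply, %{state | task_ref: task.ref}}
  end

  # A check is already in flight, so this request is ignored.
  def handle_cast(:check, state), do: {:noreply, state}

  @impl true
  # The task finished: forward its result to the owner (process X).
  def handle_info({ref, result}, %{task_ref: ref} = state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    send(state.owner, {:done, state.url, result})
    {:noreply, %{state | task_ref: nil}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end

{:ok, watcher} = UrlWatcher.start_link("http://a.example", self())
UrlWatcher.check(watcher)

receive do
  {:done, url, result} -> IO.inspect({url, result}, label: "finished")
after
  2_000 -> IO.puts("no callback received")
end
```

Because Task.async links the task to the watcher, a crashing task would still take the watcher down; Task.Supervisor.async_nolink would avoid that at the cost of handling the :DOWN message.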

Have a look at poolboy and for educational purposes pooly (though some of the process configuration code could use some updating). But personally I’m more in the short lived processes camp - it’s easier to keep something healthy for a shorter period of time.

and for educational purposes pooly (though some of the process configuration code could use some updating)

I’ve written a blog series rewriting Pooly while updating it with newer language features (e.g. Registry and DynamicSupervisor): http://davidsulc.com/blog/2018/07/09/pooltoy-a-toy-process-pool-manager-in-elixir-1-6/

Wow, thanks!
I think I should go over Hao’s book before diving into that tho. Feels a bit too advanced right now.

So many things - so little time.

Give Chapters 3 and 4 in this free sample a read (if you like it, a good-quality DRM-free epub/pdf is available for purchase from ebooks.com), and then dive into the hexdocs for Supervisor and DynamicSupervisor and whatever else strikes your fancy from the MODULES - Processes & Applications sidebar.