fxn

fxn

Task.start vs TaskSupervisor

I am writing a channel that receives certain events from mobile applications.

When an event is received, it has to be stored in a data store, and published to a message bus:

def handle_in(message, payload, socket) do
  event = build_event(message, payload, socket)

  store(event)
  publish(event)

  # To be able to test the side-effects.
  {:reply, :ok, socket}
end

Those two operations are independent of each other, so I want to run them in parallel. At the same time, I do not want the WebSocket to crash if any of them crashes (-> nolink). Also, the order in which these events are broadcasted has to be preserved, so handle_in can’t finish before these two operations have finished (-> await).

All in all, my tentative implementation is

def handle_in(message, payload, socket) do
  event = build_event(message, payload, socket)

  [
    task(fn -> store(event) end),
    task(fn -> publish(event) end)
  ] |> Enum.each(&Task.await/1)

  # To be able to test the side-effects.
  {:reply, :ok, socket}
end

def task(fun) do
  Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fun)
end

However, does that really need TaskSupervisor? Would Task.start be enough? Note that on shutdown, when the endpoint supervisor takes down its children, the await call will wait for these two anyway.

Can you justify Task.Supervisor in this use case?

Marked As Solved

peerreynders

peerreynders

Just freehand but you may want to try something like this:

def task_fun(:store, event),
  do: store(event)

def task_fun(:publish, event),
  do: publish(event)

def run_tasks(event) do
  # blocks until list is reified
  # :max_concurrency defaults to System.schedulers_online/0
  MyApp.TaskSupervisor
  |> Task.Supervisor.async_stream_nolink([:store, :publish], __MODULE__, :task_fun, [event], [])
  |> Enum.to_list()
end

def handle_in(message, payload, socket) do
  event = build_event(message, payload, socket)
  # blocks until done
  run_tasks(event)
  {:reply, :ok, socket}
end

And just as reference for the topic as a whole:

Also Liked

josevalim

josevalim

Creator of Elixir

The genera rule is: always start processes under a supervisor. It gives two things:

  • visibility - you can query a supervisor how many children it has, traverse it in observer, etc
  • shutdown guarantee - you can tell the supervisor it can only shutdown once all of its children shutdown

Currently the shutdown point above is not relevant since you await for the tasks immediately after. If the UI should only move forward once both tasks succeed, then awaiting is OK.

You can also make the workflow fully async if you don’t need immediate confirmation. That is done by matching on the Task.async messages (they are public). Roughly:

def join(...) do
  {:ok, %{tasks: %{}}}
end

def handle_in(..., state) do
  task = Task.async(fn ->
    store(event)
    publish(event)
  end)

  {:noreply, put_in(state.tasks[task.ref], task.pid)}
end

def handle_info({ref, result}, state) do
  {pid, state} = pop_in state.tasks[ref]
  Process.demonitor(ref, [:flush])
  IO.puts "task #{inspect(pid)} finished with #{inspect(result)}"
  {:noreply, state}
end

def handle_info({:DOWN, ref, _, _, reason}, state) do
  {pid, state} = pop_in state.tasks[ref]
  IO.puts "task #{inspect(pid)} died with reason #{inspect(reason)}"
  {:noreply, state}
end

I may be missing something but I am not seeing what the GenServer is giving us besides adding a potential bottleneck? If the concern is code organization, then modules and functions are the correct level to address it, not processes. :slight_smile: The point about not blocking is a good point though which I have incorporated in my reply above.

11
Post #5
peerreynders

peerreynders

FYI: This is just how my first “stab at it” would look like - I’m not implying that I’m in possession of some kind of gold standard.

Would you for example have one supervisor for storage, one for publishing, two supervised processes per topic respectively (whose names are in the socket assigns?)

Going by your original post I was simply imagining a GenServer capable of accepting new events, queuing them internally (though the process mailbox may be good enough if you are comfortable with using handle_cast/2 instead of handle_call/3) for storing/publishing, strictly ordering their processing, not with Task.await/2 (I dislike blocking processes - unless waiting is their raison d’être) but perhaps with Task.start and Process.monitor/2 for the sequencing of actions.

Now I’m not privy to how topics relate to your application’s capabilities - so that might complicate things a bit.

Where that GenServer is supervised - it depends. In the simplest case it wouldn’t need its own supervisor but if you want some additional guarantees with respect to storing and publishing then you may find yourself distributing responsibilities even further pointing towards a separate supervisor or even supervision tree.

My main point was that I see event routing and augmentation as the primary purpose of a channel process rather than doing or waiting on actual work.

Just like in OOP where it is all too easy to just add a few more lines to an existing class, it is just as easy to add too much (unrelated) runtime responsibility to a single process - and personally I treat every process level blocking behaviour as a red flag (though at times it makes perfect sense).

How would you organize things so that the test suite is able to wait in order to test the side-effects of one message?

I suspect our testing philosophies differ.

Ultimately the channel process just wants to get rid of the event - everything else is somebody else’s problem. Integration tests deal with testing side effects.

What would you do when the WS is closed?

I can’t answer that as I don’t know what that means to your application as a whole.

peerreynders

peerreynders

From a design perspective as a “control enthusiast” I’d go even further and outsource the entire store/publish business to a separate process (perhaps in its own supervision tree), so that the handle_in only has to dispatch the event (and event order between processes is guaranteed anyway) and won’t be blocked to deal with the next incoming event.

Where Next?

Popular in Questions Top

vertexbuffer
Hello, can anybody help here..? I have a list of players and I what to delete an element, but every for loop the list is reverting to ori...
New
lessless
I believe there are people here who are dealing with CSV files import on the daily basis, and since Excel is a really popular tool there ...
New
jerry
Good day to you all. I have been struggling to get a query involving like and ilike to work. Can anyone assist me on this, please? pro...
New
LegitStack
I’m trying to make a websocket server in Phoenix or raw Elixir. I heard about gun, I think I could use cowboy, but since I’m not that sma...
New
stefanchrobot
What’s the safe way to decode a JSON string into a struct? I want to avoid calling String.to_atom. Jason.decode can give me a map with st...
New
ycv005
I have followed this StackOverflow post to install the specific version of Erlang. And When I am running mix ecto.setup then getting fol...
New
jay1
Why is it that the mnesia database isn’t the most preferred database for use in Elixir/Phoenix?
New
freewebwithme
Using vs code and installed ElixirLS: support and debugger. And I got an error popped up on start up says Failed to run ‘elixir’ comma...
New
fayddelight
I tried installing elixir 1.11.2 erlang 23.3.4 via asdf in my zsh shell. Enabled the versions locally and globally. When I list them ...
New
chensan
I have a User schema with a :from_id field set to type :string: defmodule TweetBot.Repo.Migrations.CreateUsers do use Ecto.Migration ...
New

Other popular topics Top

lastday4you
I wanted to check elixir version in phoenix because i found that my elixir is 1.5 but when i use Enum.chunk_by it said the function is un...
New
chrismccord
As promised, the first release candidate of Phoenix 1.3.0 is out! This release focuses on code generators with improved project structure...
New
gshaw
What is the idiomatic way of matching for not nil in Elixir? E.g., First way: defp halt_if_not_signed_in(conn, signed_in_account) when...
New
greenz1
I have a phoenix application from which a user can download multiple(5-6) files of size 1MB. I couldn’t find anything related to sending ...
New
shahryarjb
Hello, I have map which I want to convert it to string like this: the map: %{last_name: "tavakkoli", name: "shahryar"} the string I ne...
New
jononomo
I am trying to figure out how Mix knows whether the environment is test, dev, or prod – where is this set? Thanks.
New
baxterw3b
Hi guys, i’m new in the Elixir world, and i have to say, that i love it! i’m having some problem to understand anonymous functions with ...
New
axelson
This post is a wiki (feel free to hit the edit button near the bottom right of this post to add your own changes!) This post collects co...
239 47930 226
New
Qqwy
Update: How to use the Blogs & Podcasts section You can post links to your blog posts or podcasts either in one of the Official Blog...
3271 126479 1222
New
svb
Hi! Currently I want to submit a form by pressing the Enter key. However, since my input field is of type “textarea” this is just adds a...
New

We're in Beta

About us Mission Statement