fxn
Task.start vs Task.Supervisor
I am writing a channel that receives certain events from mobile applications.
When an event is received, it has to be stored in a data store, and published to a message bus:
def handle_in(message, payload, socket) do
  event = build_event(message, payload, socket)
  store(event)
  publish(event)
  # To be able to test the side-effects.
  {:reply, :ok, socket}
end
Those two operations are independent of each other, so I want to run them in parallel. At the same time, I do not want the WebSocket to crash if either of them crashes (-> nolink). Also, the order in which these events are broadcast has to be preserved, so handle_in can’t finish before both operations have finished (-> await).
All in all, my tentative implementation is:
def handle_in(message, payload, socket) do
  event = build_event(message, payload, socket)

  [
    task(fn -> store(event) end),
    task(fn -> publish(event) end)
  ]
  |> Enum.each(&Task.await/1)

  # To be able to test the side-effects.
  {:reply, :ok, socket}
end

def task(fun) do
  Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fun)
end
However, does that really need Task.Supervisor? Would Task.start be enough? Note that on shutdown, when the endpoint supervisor takes down its children, the await call will wait for these two anyway.
Can you justify Task.Supervisor in this use case?
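The `MyApp.TaskSupervisor` name used above only resolves if a supervisor registered under that name has been started somewhere. A minimal sketch, assuming a conventional `MyApp.Application` module (hypothetical; the post does not show it), starts it as a child of the application supervisor. Supervisors terminate their children in reverse start order, so listing the task supervisor before the endpoint should mean the endpoint (and its channels) is stopped first, after which the task supervisor gives its running tasks up to their shutdown timeout to finish.

```elixir
defmodule MyApp.Application do
  # Hypothetical application module; only the Task.Supervisor child comes from the post.
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Started first, so it is stopped last when the application shuts down.
      {Task.Supervisor, name: MyApp.TaskSupervisor}
      # MyAppWeb.Endpoint (not shown here) would be listed after this child.
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```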
Marked As Solved
peerreynders
Just freehand but you may want to try something like this:
def task_fun(:store, event),
  do: store(event)

def task_fun(:publish, event),
  do: publish(event)

def run_tasks(event) do
  # blocks until list is reified
  # :max_concurrency defaults to System.schedulers_online/0
  MyApp.TaskSupervisor
  |> Task.Supervisor.async_stream_nolink([:store, :publish], __MODULE__, :task_fun, [event], [])
  |> Enum.to_list()
end

def handle_in(message, payload, socket) do
  event = build_event(message, payload, socket)
  # blocks until done
  run_tasks(event)
  {:reply, :ok, socket}
end
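`Task.Supervisor.async_stream_nolink/6` emits `{:ok, value}` for each task that returns and `{:exit, reason}` for each task that crashes, in input order while `ordered: true` (the default) is in effect. The sketch below, with placeholder `task_fun/2` bodies standing in for the real `store/1` and `publish/1` and a hypothetical module name, keeps those results instead of discarding them. It also passes `on_timeout: :kill_task`, so a slow task shows up as `{:exit, :timeout}` rather than exiting the calling process, which is what the default `on_timeout: :exit` does.

```elixir
defmodule MyApp.EventTasks do
  @actions [:store, :publish]

  # Placeholder bodies; the real versions would call store/1 and publish/1.
  def task_fun(:store, event), do: {:stored, event}
  def task_fun(:publish, event), do: {:published, event}

  def run_tasks(event) do
    MyApp.TaskSupervisor
    |> Task.Supervisor.async_stream_nolink(@actions, __MODULE__, :task_fun, [event],
      timeout: 5_000,
      # Report timeouts as {:exit, :timeout} instead of exiting the caller.
      on_timeout: :kill_task
    )
    # With the default ordered: true, results line up with @actions.
    |> Enum.zip(@actions)
    |> Enum.map(fn
      {{:ok, result}, _action} -> {:ok, result}
      {{:exit, reason}, action} -> {:error, action, reason}
    end)
  end
end
```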
And just as reference for the topic as a whole:
josevalim
The general rule is: always start processes under a supervisor. It gives you two things:
- visibility - you can ask a supervisor how many children it has, traverse it in Observer, etc.
- shutdown guarantee - you can tell the supervisor it may only shut down once all of its children have shut down
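The visibility point can be checked directly. In this small sketch (the task body is an arbitrary placeholder that waits for a `:stop` message), `Task.Supervisor.children/1` lists the pids of the tasks the supervisor is currently running; a task started with plain `Task.start/1` is not tracked by any supervisor, so there is nothing comparable to ask.

```elixir
# Assumes no other process has registered this name yet.
{:ok, _sup} = Task.Supervisor.start_link(name: MyApp.TaskSupervisor)

# Placeholder task that stays alive until it receives :stop.
{:ok, pid} =
  Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
    receive do
      :stop -> :ok
    end
  end)

# Visibility: ask the supervisor which tasks it is running right now.
running = Task.Supervisor.children(MyApp.TaskSupervisor)
IO.inspect(running, label: "running tasks")

send(pid, :stop)
```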
Currently the shutdown point above is not relevant, since you await the tasks immediately afterwards. If the UI should only move forward once both tasks succeed, then awaiting is OK.
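One caveat with the tentative implementation in the question: `Task.await/2` exits the caller when the awaited task crashes, even if the task was started with `async_nolink`, so the channel would still go down. A sketch that keeps the await-style flow but swaps in `Task.yield_many/2`, which reports a crash as `{:exit, reason}` instead. `MyApp.SideEffects.run_all/3` is a hypothetical helper, and its `funs` list stands in for `store/1` and `publish/1`.

```elixir
defmodule MyApp.SideEffects do
  # Hypothetical helper: each fun in `funs` is a 1-arity side effect such as store/1.
  def run_all(event, funs, timeout \\ 5_000) do
    funs
    |> Enum.map(fn fun ->
      Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> fun.(event) end)
    end)
    # Unlike Task.await/2, yield_many reports a crash as {:exit, reason}
    # instead of exiting the caller. Results keep the order of `funs`.
    |> Task.yield_many(timeout)
    |> Enum.map(fn
      {_task, {:ok, result}} ->
        {:ok, result}

      {_task, {:exit, reason}} ->
        {:error, reason}

      {task, nil} ->
        # Still running after `timeout`: stop it instead of leaving it behind.
        Task.shutdown(task, :brutal_kill)
        {:error, :timeout}
    end)
  end
end
```

In `handle_in/3` this would be called as something like `MyApp.SideEffects.run_all(event, [&store/1, &publish/1])`, and the returned list can be used to decide what to reply.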
You can also make the workflow fully async if you don’t need immediate confirmation. That is done by matching on the messages Task.async sends back to the caller (their format is public). Roughly:
def join(...) do
  {:ok, %{tasks: %{}}}
end

def handle_in(..., state) do
  task =
    Task.async(fn ->
      store(event)
      publish(event)
    end)

  {:noreply, put_in(state.tasks[task.ref], task.pid)}
end

def handle_info({ref, result}, state) do
  {pid, state} = pop_in(state.tasks[ref])
  Process.demonitor(ref, [:flush])
  IO.puts("task #{inspect(pid)} finished with #{inspect(result)}")
  {:noreply, state}
end

def handle_info({:DOWN, ref, _, _, reason}, state) do
  {pid, state} = pop_in(state.tasks[ref])
  IO.puts("task #{inspect(pid)} died with reason #{inspect(reason)}")
  {:noreply, state}
end
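`Task.async/1` links the task to the caller, so a crashing task would normally take the caller down through the link before the `:DOWN` clause runs (unless the caller traps exits). Below is a self-contained sketch of the same message-matching idea that swaps in `Task.Supervisor.async_nolink/2`, so a crash only arrives as a `:DOWN` message. It is written as a plain GenServer so it runs without Phoenix; in a channel the task map would presumably live in `socket.assigns`. `MyApp.EventWorker`, `dispatch/2`, `pending/1` and the `work` function are hypothetical names.

```elixir
defmodule MyApp.EventWorker do
  use GenServer

  # `work` is a 1-arity function standing in for store/publish.
  def start_link(work), do: GenServer.start_link(__MODULE__, work)

  # Fire-and-forget: the caller does not wait for the side effects.
  def dispatch(server, event), do: GenServer.cast(server, {:event, event})

  # Number of tasks still in flight (handy for observing progress in tests).
  def pending(server), do: GenServer.call(server, :pending)

  @impl true
  def init(work), do: {:ok, %{work: work, tasks: %{}}}

  @impl true
  def handle_cast({:event, event}, state) do
    work = state.work
    task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> work.(event) end)
    {:noreply, put_in(state.tasks[task.ref], task.pid)}
  end

  @impl true
  def handle_call(:pending, _from, state), do: {:reply, map_size(state.tasks), state}

  @impl true
  # Success: the task replied with {ref, result}; drop its monitor and bookkeeping.
  def handle_info({ref, result}, state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    {pid, state} = pop_in(state.tasks[ref])
    IO.puts("task #{inspect(pid)} finished with #{inspect(result)}")
    {:noreply, state}
  end

  # Failure: with async_nolink the crash arrives here instead of killing this process.
  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    {pid, state} = pop_in(state.tasks[ref])
    IO.puts("task #{inspect(pid)} died with reason #{inspect(reason)}")
    {:noreply, state}
  end
end
```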
I may be missing something, but I don’t see what the GenServer gives us besides adding a potential bottleneck. If the concern is code organization, then modules and functions are the correct level to address it, not processes.
The point about not blocking is a good one, though, and I have incorporated it into my reply above.
peerreynders
FYI: This is just how my first “stab at it” would look; I’m not implying that I’m in possession of some kind of gold standard.
Would you, for example, have one supervisor for storage and one for publishing, with two supervised processes per topic (whose names are in the socket assigns)?
Going by your original post, I was simply imagining a GenServer that accepts new events, queues them internally for storing/publishing (though the process mailbox may be good enough if you are comfortable using handle_cast/2 instead of handle_call/3) and strictly orders their processing. Not with Task.await/2 (I dislike blocking processes, unless waiting is their raison d’être), but perhaps with Task.start and Process.monitor/2 for the sequencing of actions.
Now, I’m not privy to how topics relate to your application’s capabilities, so that might complicate things a bit.
Where that GenServer is supervised depends. In the simplest case it wouldn’t need its own supervisor, but if you want some additional guarantees with respect to storing and publishing, you may find yourself distributing responsibilities even further, pointing towards a separate supervisor or even a separate supervision tree.
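A rough sketch of the GenServer described above, with hypothetical names (`MyApp.EventSequencer`, `dispatch/2`, and a `work` function standing in for store/publish). Callers cast events and never block; the server queues them with Erlang’s `:queue` and starts the next task only after the previous one’s `:DOWN` message arrives. It uses `Task.Supervisor.start_child/2` plus `Process.monitor/1` as a supervised variant of `Task.start` plus a monitor. If the task has already finished by the time the monitor is set up, a `:DOWN` with reason `:noproc` is still delivered, so the sequencing holds either way.

```elixir
defmodule MyApp.EventSequencer do
  use GenServer

  # `work` is a 1-arity function standing in for the store/publish step.
  def start_link(work), do: GenServer.start_link(__MODULE__, work)

  # Fire-and-forget: the channel hands the event over and moves on.
  def dispatch(server, event), do: GenServer.cast(server, {:event, event})

  @impl true
  def init(work), do: {:ok, %{work: work, queue: :queue.new(), running: nil}}

  @impl true
  def handle_cast({:event, event}, state) do
    state = %{state | queue: :queue.in(event, state.queue)}
    {:noreply, maybe_start_next(state)}
  end

  @impl true
  # The running task finished (normally or not): move on to the next event.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{running: ref} = state) do
    {:noreply, maybe_start_next(%{state | running: nil})}
  end

  # Ignore anything else, e.g. stray messages.
  def handle_info(_msg, state), do: {:noreply, state}

  # Only start a task when none is in flight, so events are processed strictly in order.
  defp maybe_start_next(%{running: nil} = state) do
    case :queue.out(state.queue) do
      {{:value, event}, rest} ->
        work = state.work
        {:ok, pid} = Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> work.(event) end)
        %{state | queue: rest, running: Process.monitor(pid)}

      {:empty, _queue} ->
        state
    end
  end

  defp maybe_start_next(state), do: state
end
```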
My main point was that I see event routing and augmentation as the primary purpose of a channel process rather than doing or waiting on actual work.
Just as in OOP, where it is all too easy to add a few more lines to an existing class, it is easy to add too much (unrelated) runtime responsibility to a single process. Personally, I treat every process-level blocking behaviour as a red flag (though at times it makes perfect sense).
How would you organize things so that the test suite is able to wait in order to test the side-effects of one message?
I suspect our testing philosophies differ.
Ultimately the channel process just wants to get rid of the event; everything else is somebody else’s problem. Integration tests deal with testing side effects.
What would you do when the WS is closed?
I can’t answer that as I don’t know what that means to your application as a whole.
peerreynders
From a design perspective, as a “control enthusiast”, I’d go even further and outsource the entire store/publish business to a separate process (perhaps in its own supervision tree), so that handle_in only has to dispatch the event (message order between any two processes is guaranteed anyway) and isn’t blocked from handling the next incoming event.