Task.start vs Task.Supervisor

I am writing a channel that receives certain events from mobile applications.

When an event is received, it has to be stored in a data store, and published to a message bus:

def handle_in(message, payload, socket) do
  event = build_event(message, payload, socket)

  store(event)
  publish(event)

  # To be able to test the side-effects.
  {:reply, :ok, socket}
end

Those two operations are independent of each other, so I want to run them in parallel. At the same time, I do not want the WebSocket to crash if any of them crashes (-> nolink). Also, the order in which these events are broadcasted has to be preserved, so handle_in can’t finish before these two operations have finished (-> await).

All in all, my tentative implementation is

def handle_in(message, payload, socket) do
  event = build_event(message, payload, socket)

  [
    task(fn -> store(event) end),
    task(fn -> publish(event) end)
  ] |> Enum.each(&Task.await/1)

  # To be able to test the side-effects.
  {:reply, :ok, socket}
end

def task(fun) do
  Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fun)
end

However, does that really need Task.Supervisor? Would Task.start be enough? Note that on shutdown, when the endpoint supervisor takes down its children, the await call will wait for these two anyway.

Can you justify Task.Supervisor in this use case?

From a design perspective as a “control enthusiast” I’d go even further and outsource the entire store/publish business to a separate process (perhaps in its own supervision tree), so that the handle_in only has to dispatch the event (and event order between processes is guaranteed anyway) and won’t be blocked to deal with the next incoming event.

That’s interesting, could you please elaborate a bit? Would you for example have one supervisor for storage, one for publishing, two supervised processes per topic respectively (whose names are in the socket assigns?), and fire and forget to those ones? How would you organize things so that the test suite is able to wait in order to test the side-effects of one message? What would you do when the WS is closed?

(Not very fluent organizing code like this, any help very much appreciated!)

FYI: This is just what my first “stab at it” would look like - I’m not implying that I’m in possession of some kind of gold standard.

Would you for example have one supervisor for storage, one for publishing, two supervised processes per topic respectively (whose names are in the socket assigns?)

Going by your original post I was simply imagining a GenServer that accepts new events and queues them internally for storing/publishing (the process mailbox may be good enough if you are comfortable with using handle_cast/2 instead of handle_call/3). It would strictly order their processing, not with Task.await/2 (I dislike blocking processes - unless waiting is their raison d’être) but perhaps with Task.start and Process.monitor/2 to sequence the actions.
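A minimal sketch of that idea, under stated assumptions: `EventPipeline`, `dispatch/2`, and `work_fun` are hypothetical names, and `work_fun` stands in for the real store/publish calls. Events are queued in the GenServer state, one unlinked task runs at a time, and the task's :DOWN message triggers the next one.

```elixir
defmodule EventPipeline do
  # Hypothetical module; `work_fun` stands in for store/publish.
  use GenServer

  def start_link(work_fun), do: GenServer.start_link(__MODULE__, work_fun)

  # Fire-and-forget: the caller (e.g. the channel) is never blocked.
  def dispatch(server, event), do: GenServer.cast(server, {:event, event})

  @impl true
  def init(work_fun) do
    {:ok, %{work_fun: work_fun, queue: :queue.new(), current: nil}}
  end

  @impl true
  def handle_cast({:event, event}, state) do
    state = %{state | queue: :queue.in(event, state.queue)}
    {:noreply, maybe_start_next(state)}
  end

  # The monitored task finished (normally or not): move on to the next event.
  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{current: ref} = state) do
    {:noreply, maybe_start_next(%{state | current: nil})}
  end

  def handle_info(_other, state), do: {:noreply, state}

  # Start a task only when none is in flight, so events are processed strictly in order.
  defp maybe_start_next(%{current: nil} = state) do
    case :queue.out(state.queue) do
      {{:value, event}, rest} ->
        work_fun = state.work_fun
        # Task.start/1 does not link, so a crashing task cannot take this server down.
        {:ok, pid} = Task.start(fn -> work_fun.(event) end)
        %{state | queue: rest, current: Process.monitor(pid)}

      {:empty, _queue} ->
        state
    end
  end

  defp maybe_start_next(state), do: state
end

# Usage: the work function just reports back to the caller here.
parent = self()
{:ok, pipeline} = EventPipeline.start_link(fn event -> send(parent, {:done, event}) end)
Enum.each([1, 2, 3], &EventPipeline.dispatch(pipeline, &1))
```

What to do with an event whose task crashed (retry, drop, log) is left open in this sketch; it simply moves on.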

Now I’m not privy to how topics relate to your application’s capabilities - so that might complicate things a bit.

Where that GenServer is supervised - it depends. In the simplest case it wouldn’t need its own supervisor but if you want some additional guarantees with respect to storing and publishing then you may find yourself distributing responsibilities even further pointing towards a separate supervisor or even supervision tree.

My main point was that I see event routing and augmentation as the primary purpose of a channel process rather than doing or waiting on actual work.

Just like in OOP where it is all too easy to just add a few more lines to an existing class, it is just as easy to add too much (unrelated) runtime responsibility to a single process - and personally I treat every process level blocking behaviour as a red flag (though at times it makes perfect sense).

How would you organize things so that the test suite is able to wait in order to test the side-effects of one message?

I suspect our testing philosophies differ.

Ultimately the channel process just wants to get rid of the event - everything else is somebody else’s problem. Integration tests deal with testing side effects.

What would you do when the WS is closed?

I can’t answer that as I don’t know what that means to your application as a whole.

The general rule is: always start processes under a supervisor. It gives you two things:

  • visibility - you can ask a supervisor how many children it has, traverse it in observer, etc.
  • shutdown guarantee - you can tell the supervisor it can only shut down once all of its children have shut down

Currently the shutdown point above is not relevant since you await the tasks immediately after. If the UI should only move forward once both tasks succeed, then awaiting is OK.
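To illustrate the visibility point only, here is a small sketch that assumes an anonymous Task.Supervisor started in a script rather than the application's MyApp.TaskSupervisor. The task blocks on a hypothetical :finish message so that it is still running when the supervisor is queried.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# The task waits for a message so it stays alive while we inspect the supervisor.
task =
  Task.Supervisor.async_nolink(sup, fn ->
    receive do
      :finish -> :done
    end
  end)

# Visibility: the supervisor can list the pids of its running tasks.
children = Task.Supervisor.children(sup)
IO.inspect(children, label: "running tasks")

send(task.pid, :finish)
result = Task.await(task)
IO.inspect(result, label: "task result")
```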

You can also make the workflow fully async if you don’t need immediate confirmation. That is done by matching on the Task.async messages (they are public). Roughly:

def join(...) do
  {:ok, %{tasks: %{}}}
end

def handle_in(..., state) do
  task = Task.async(fn ->
    store(event)
    publish(event)
  end)

  {:noreply, put_in(state.tasks[task.ref], task.pid)}
end

def handle_info({ref, result}, state) do
  {pid, state} = pop_in state.tasks[ref]
  Process.demonitor(ref, [:flush])
  IO.puts "task #{inspect(pid)} finished with #{inspect(result)}"
  {:noreply, state}
end

def handle_info({:DOWN, ref, _, _, reason}, state) do
  {pid, state} = pop_in state.tasks[ref]
  IO.puts "task #{inspect(pid)} died with reason #{inspect(reason)}"
  {:noreply, state}
end

I may be missing something but I am not seeing what the GenServer is giving us besides adding a potential bottleneck? If the concern is code organization, then modules and functions are the correct level to address it, not processes. :slight_smile: The point about not blocking is a good point though which I have incorporated in my reply above.

Thanks @josevalim :).

I totally missed the caller monitors the task too, so the indirection through the supervisor to prevent tasks from crashing the WS is unnecessary, as shown in your example.

I believe we lose the order guarantee, however. Let me explain: Events have a timestamp, and they have to be published to the message bus in order. To have a more concrete scenario in mind, these are (mainly) GPS locations sent by users, and they are broadcasted to the platform via a Kafka partition that depends on the user ID. So, the workflow guarantees order because the phone sends things in order (because of the direction of the arrow of time :grinning:), the WS processes them in order because Phoenix has one single process per user and topic, and Kafka partitions guarantee order too.

If I understand the proposal correctly, the WS is able to handle a new event while the task runs, and so we could have two tasks being executed in parallel. If that is right, then we could have a rare-but-theoretically-possible race condition. Is there a way to tweak that approach to preserve ordering?

That was the idea of the original await call, that handle_in does not process a new event until the previous one has been processed. That guarantees ordering in a simple way at the cost of less concurrency.

If we move this to a separate process to go async, then the mailbox of that process is the one preserving ordering. That introduces an indirection/complication in the implementation that I am not sure is worthwhile for my needs. Also, I need to think about messages piling up in the WS vs in that mailbox in different problematic scenarios. Hmmm…

Code organization wasn’t my primary concern.

Though I have to admit I somehow lost track of this fact:

One channel server process is created per client, per topic.

Thus by keeping topics relatively simple, complexity can be kept in check.

My main concern revolved around the long term maintainability of a single process (in this case channel/topic server) as more types of messages get added over time. Eventually the quantity of these variations and their flows will become difficult to reason about. So I would have the tendency to delegate work out of a process as it deals with more types of messages (and if there isn’t an opportunity to split out a subset of these messages and move them to an entirely separate process) so that its role could focus more and more on just handling messages in a responsive and reliable manner. It comes down to what the role of the topic is.

Also when there already is a requirement to preserve the order of events - I have to wonder whether tasks are the right tool for the job and whether this is the thin edge of the wedge towards a publishing service (which could be anything between a GenServer and an OTP application).

  • You could queue pending events in the socket’s assigns.
  • Wrap the two tasks in a third task which does the actual await.
  • On that third task’s :DOWN message check if there are pending events to start the cycle all over again.

OK! In this approach I have the feeling of kinda emulating a mailbox by hand in the assigns. Also, the mailbox of the channel itself would get genuine messages and task messages intertwined and would need to handle :DOWN and :timeout (I think) by hand.

Oh man, need to develop fluency thinking about this, feel very clumsy!

Hello,

If you need to await your tasks in your handle_in because you want to avoid race conditions, you must use the “async” family of task functions, to be able to call Task.await afterwards. With a “start” function you would have to send yourself a “done” message, which is OK, but it’s simpler to “await”.

If you do not want to crash your socket then you must use the “nolink” family of task functions.

The Task module does not provide an async_nolink function, but Task.Supervisor does, therefore you need to use Task.Supervisor. And it comes with other benefits as José said.

So I’d say your code is fine, but note that Task.Supervisor.async_nolink/3 will still raise an exception if the task fails. So you have to handle that. (edit: actually the exception will be raised by Task.await.)

Oh and another edit: You say that « Those two operations are independent of each other, so I want to run them in parallel », but if at least one of the two operations is very fast (for example publish/1 is just sending a message and you do not need to wait for the response as your event bus handles messages order as you have told us), I would not bother with optimisation and just call the two functions from the WS process. In either case you will have to put a try/rescue block.
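One caveat worth checking when handling the failure: as far as I can tell, a failed Task.await surfaces as an exit in the calling process rather than as a raised exception, so a `rescue` clause alone would not intercept it, while `catch :exit, _` does. A minimal sketch, assuming an anonymous supervisor and a hypothetical `safe_await` helper:

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Hypothetical helper: await a task and turn an exit into a tagged tuple.
safe_await = fn task ->
  try do
    {:ok, Task.await(task, 1_000)}
  catch
    :exit, reason -> {:error, reason}
  end
end

ok_task = Task.Supervisor.async_nolink(sup, fn -> :stored end)
bad_task = Task.Supervisor.async_nolink(sup, fn -> raise "publish failed" end)

# The caller survives the crash of bad_task and gets one result per task.
results = Enum.map([ok_task, bad_task], safe_await)
IO.inspect(results)
```

The crashed task is still logged by the runtime; the point is only that the awaiting process keeps running.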

Your first (async/await) version was fine if that’s all that channel/topic is ever going to do - my concerns were based on the expectation that this process could accumulate more and more responsibilities over time which may not happen. I’d definitely still stick a Task.Supervisor in there and use it.

I would also hope that the actual logic behind the store and publish functions resides in (similarly named) modules separate from the channel/topic itself if only to make it easier to locate that store and publish functionality in the future (think of it as analogous to pushing functionality out of the controller into the context).

That is a consequence of using Task.await/2 to retrieve the result (which you mention).

An alternative would be to use Task.Supervisor.async_stream_nolink/6 - successes are returned as :ok result tuples and crashes as :exit reason tuples. By default the stream runs as many tasks in parallel as there are schedulers online.

Deleted a comment, I saw flaws in the questions. I’ll revise.

Just freehand but you may want to try something like this:

def task_fun(:store, event),
  do: store(event)

def task_fun(:publish, event),
  do: publish(event)

def run_tasks(event) do
  # blocks until list is reified
  # :max_concurrency defaults to System.schedulers_online/0
  MyApp.TaskSupervisor
  |> Task.Supervisor.async_stream_nolink([:store, :publish], __MODULE__, :task_fun, [event], [])
  |> Enum.to_list()
end

def handle_in(message, payload, socket) do
  event = build_event(message, payload, socket)
  # blocks until done
  run_tasks(event)
  {:reply, :ok, socket}
end
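To make the shape of the returned list concrete, here is a standalone sketch. It assumes an anonymous supervisor and uses the function-based variant of async_stream_nolink with stand-in operations (one succeeds, one raises) instead of the real store/publish.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

results =
  sup
  |> Task.Supervisor.async_stream_nolink([:store, :publish], fn
    # Stand-ins for the real operations: one succeeds, one crashes.
    :store -> :stored
    :publish -> raise "bus unavailable"
  end)
  |> Enum.to_list()

# Results come back in input order: an {:ok, value} per success and an
# {:exit, reason} per crash, without the caller itself exiting.
failures = for {:exit, reason} <- results, do: reason
IO.inspect(results)
```

Checking `failures` before replying would let handle_in decide between `{:reply, :ok, socket}` and an error reply.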

And just as reference for the topic as a whole:

It is not recommended to await a long-running task inside an OTP behaviour such as GenServer . Instead, you should match on the message coming from a task inside your GenServer.handle_info/2 callback. For more information on the format of the message, see the documentation for async/1 .

In this case, the WS stores to DynamoDB, and publishes to Kafka. Those are fast operations in normal circumstances.

Could you folks help me understand what could happen in the Task.await/1 call in the original code?

The WS is not linked to the tasks because of the async_nolink call, but it still monitors them (the documentation of async_nolink does not say otherwise). Therefore, it receives exit signals from the task as messages. Channels have a handle_info callback, and it is not clear to me if Task.await/1 is correct in a Phoenix Channel at all because it enters a receive block to consume messages directly from the process mailbox. What do you think?

On the other hand, Task.await/1 emits exit signals from within that receive block, whose origin is the WS process itself. How is the supervisor of the WS going to react to those signals? Is this why it has been said in the thread that Task.await/1 would still need a rescue if I want the WS to not exit at all due to failures in those operations?

Kernel.exit/1

Kernel.raise/1 is an Elixir style exception - exit/1 is the original Erlang style process exception which can be caught within the process but will terminate that process if it is not caught.


Therefore, it receives exit signals from the task as messages.

Signals relate to process links (and convert to :EXIT messages when trapped) - :DOWN messages relate to monitors - two entirely separate mechanisms.
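The difference can be seen in a few lines with plain processes (no Task involved), as a rough illustration:

```elixir
# Monitor: the observer receives a plain :DOWN message and is otherwise unaffected.
{pid, ref} = spawn_monitor(fn -> exit(:boom) end)

down =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
  after
    1_000 -> :timeout
  end

# Link: an exit signal is sent to the linked process. It only shows up as an
# :EXIT message because this process traps exits; otherwise it would die too.
Process.flag(:trap_exit, true)
linked = spawn_link(fn -> exit(:boom) end)

exit_msg =
  receive do
    {:EXIT, ^linked, reason} -> {:exit, reason}
  after
    1_000 -> :timeout
  end

IO.inspect({down, exit_msg})
```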


it is not clear to me if Task.await/1 is correct in a Phoenix Channel at all because it enters a receive block to consume messages directly from the process mailbox. What do you think?

That is the fundamental issue with await - it blocks the process outside of the OTP code. But keep in mind that even calls with GenServer.call/3 to another process are blocking. So there are situations where it can be OK - it’s more of an issue of awareness of blocking behaviour.


On the other hand, Task.await/1 emits exit signals from within that receive block

Your channel/topic process is executing await/2 and therefore exit/1 - so it’s not a signal yet - it can still be stopped with a try..catch.


How is the supervisor of the WS going to react to those signals?

If uncaught the channel/topic process will be terminated. I’m not sure how the supervisor is configured, so I can’t predict its behaviour.


Further update:

So by default the channel/topic process won’t be restarted (from the server end) - “any termination (even abnormal) is considered successful”.

And if I’m reading this right the supervisor is running with the default :max_restarts (3) and :max_seconds (5), i.e. if there are more than 3 restarts within 5 seconds the supervisor will terminate.

So it looks like intensely crashing channel/topic processes will in fact terminate the pool supervisor easily.

So I would go with the current approach, where the channel is blocked. This means that actions on the client won’t receive a reply until the publishing is done - but other than that it is generally fine. I assume the publishing to the message bus is super fast, so that’s how I would roll honestly.

Or I would create a GenServer on every new channel with the purpose of keeping ordering. Every time you have to publish, you just cast the GenServer to do it for you. The GenServer would be started in its own supervisor (a DynamicSupervisor) and it would monitor the channel. Once the channel is DOWN, it exits.
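A rough sketch of that second option, with hypothetical names throughout (`OrderedPublisher`, `publish/2`, `publish_fun`). It assumes the channel would start the server from join/3 with self() as the owner; here the DynamicSupervisor is started inline so the snippet runs as a script.

```elixir
defmodule OrderedPublisher do
  # Hypothetical module; `publish_fun` stands in for the real store/publish calls.
  # restart: :temporary so the supervisor does not revive it after the channel is gone.
  use GenServer, restart: :temporary

  def start_link({owner, publish_fun}) do
    GenServer.start_link(__MODULE__, {owner, publish_fun})
  end

  # Asynchronous: the mailbox of this server preserves the order of events.
  def publish(server, event), do: GenServer.cast(server, {:publish, event})

  @impl true
  def init({owner, publish_fun}) do
    # Monitor the owning process (the channel) so we can stop when it goes down.
    {:ok, %{owner_ref: Process.monitor(owner), publish_fun: publish_fun}}
  end

  @impl true
  def handle_cast({:publish, event}, state) do
    state.publish_fun.(event)
    {:noreply, state}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{owner_ref: ref} = state) do
    {:stop, :normal, state}
  end

  def handle_info(_other, state), do: {:noreply, state}
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# In a channel this call would presumably live in join/3, with self() as the owner:
#   DynamicSupervisor.start_child(sup, {OrderedPublisher, {self(), &MyApp.publish/1}})
# where MyApp.publish/1 is a placeholder for the real function.
```

Because the :DOWN message is queued behind any casts already in the mailbox, events sent before the channel died are still processed before the server stops.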

Agree.

To compare, I started to write an alternative with GenServer as you described. I also had a registry to be able to cast by name during the lifetime of the channel, and the solution based on Task.Supervisor.async_stream_nolink/6 wins in simplicity.

The cost is less throughput, but as a first implementation of this service I prefer to keep it simple first, and introduce the GenServer if usage shows it is justified. On the other hand, the synchronous approach would allow us to send an ACK status back to the client (not a requirement right now, though).

Thanks man! :heart: