Managing multiple related async tasks

I have a large amount of user data that I want to run some expensive analysis on every time the user makes a change. Obviously I don’t want the user to to have to wait for the result of this analysis so I want to run it in it’s own process using Task.async. However, I also want to “debounce” these tasks because once a new update occurs the previous task becomes obsolete so I want to kill it and start a new process. To add further complexity the analysis has several independent parts each of which I would also like to run concurrently, with the parent waiting on all of them to finish. Any suggestions about the best architecture for something like this?

My current idea is to use an Agent to create a registry of user “analyses” that handles an update by killing any existing task for that user (and thus killing its children) and starting a new one and adding it to the registry. When the task, uninterrupted by further updates, is allowed to finish, it will use PubSub to report the results back to the user’s process (a LiveView if that’s important). But I wasn’t sure if Task.Supervisor would be a better fit?

This recent post from dashbit implements something similar, using Registry and DynamicSupervisor to keep track of running processes.

That looks very promising! Thanks for the tip.

1 Like

This should be doable using Registry (don’t write an Agent that duplicates Registry functionality). Also don’t track the analyses separately. Wrap all of the subtasks into a single parent task, then kill that parent task, which should in turn kill the child tasks (assuming you’ve started them linked). I think you got that part.

Your parent task should

  1. check the registry if a running analysis exists, if so, unregister, then kill the running analysis.
  2. always register itself.

There’s a race condition here where two parent tasks could have been launched really close together, and one of them beats the other to re-registration; I recommend check if the registration fails, if so quit out, under the assumption that another system has obtained the registry slot.

Could be like 5-6 lines of elixir if you do it right.

2 Likes

I was just doing some homework on the differences between Agent, GenServer and Registry. It’s been one of sticking points moving to Elixir/OTP. Thanks for the nudge in the right direction.

Would you agree that PubSub is the right way to go about notifying my original process the calculation is complete? My first iteration was using Task.async so that was just a matter of adding a handler, but it seems like introducing a ‘supervisory’ process complicates the messaging a bit.

I think your pubsub idea is the right one. And my feeling is that in prod you should supervise everything, so that would mean using Task.Supervisor.start_link.

One thing is that I think doing two Registry calls is not atomic, so if possible I recommend putting tests around that debounce race condition. This will be a Heisenbug; if you don’t mind putting in the work, the strategy that I personally use is write the test against the code without the race condition mitigation (probably stubbing all of the real work), then tweak some parameters (like concurrency) to make the test fail frequently, say 80% of the time, then wrap the test in a for loop indexing against the title of the test, say 4 times, which brings the fail rate to 99.8%, then write the mitigation and watch the test pass.

Thanks again for your advice. This is all very helpful.

If you are so inclined, would you elaborate on this a bit? From my (assuredly rudimentary) understanding, supervision in OTP/Elixir refers to monitoring processes mainly so they can restarted on failure, no? Why would that be a special concern in production (aside from the usual way we are more concerned with avoiding bugs/inconsistencies in a live environment)? Also, would it change your evaluation to know that no other part of the system “depends” on the result of this analysis, but rather it is more or less a kind of cache used for display purposes but not for any business logic? My concept was definitely that these processes would be fired off without the client caring much what happened with them, and just printing out results whenever/if they are sent back to it.

Negative. Going to steal a page from James Grey and Bruce Tate here: Supervision is about lifecycle management, not just about restarting. Most importantly, when you supervise, your process gets tracked as part of the supervision hierarchy. Even if nothing else “depends on it”, and you get clean process domain coupling using links and monitors, many tools commonly use the supervision tree to monitor “everything” using a sane tree digraph, instead of a graph of links and monitors which could look like a ball of hair. Even if you never use it, the cost of organizing things into a tree is so low, you might as well do it. Just make sure your Tasks are supervised as transient (which I think is the default for Task.Supervisor)

2 Likes

I will definitely keep this in mind going forward. I’ll report back here after I try to build something out. Thanks again!

1 Like

Here’s what I came up with. It all seems to work as expected but there’s a lot I’m unsure about:

I added an analysis registry and supervisor:

      {Registry, keys: :unique, name: MyApp.Analysis.Registry},
      {Task.Supervisor, name: MyApp.Analysis.Supervisor}

I wondered briefly whether to namespace them since it seems fairly easy to use them in different contexts, or at least Task.Supervisor, but it seemed like most examples I could find used namespaces by default.

Next I added a new function to my analysis module to handle the “launch or kill” spec

  def request_analysis(%Sources.Source{id: source_id} = source) do
    case Registry.register(Analysis.Registry, "source-#{source_id}-analyzer", :value) do
      {:ok, _} ->
        analyze(source_id)

      {:error, {:already_registered, pid}} ->
        Process.exit(pid, :kill)
        request_analysis(source)
    end

    :ok
  end

I’m not sure I understand the point of the “value” argument in Registry.register/3 and I’m just using a placeholder there. I even considered just using nil. My naive expectation would be that the “value” of this function would naturally be the pid itself. The Registry docs were a bit confusing because they assumed you wanted to register a new process by naming it instead of using register.

I then updated my LiveView to call this function instead of the analsys:

  defp request_reanalysis(socket) do
    Task.Supervisor.async_nolink(
      Analysis.Supervisor,
      Analysis.Analyzer,
      :request_analysis,
      [socket.assigns.source],
      shutdown: :brutal_kill
    )

    socket
    |> assign(analyzer_data: nil)
  end

The analyze function I left mostly intact, aside from adding the PubSub call. It uses Task.async and Task.await to concurrently build the data and compose it when each sub-part is complete. I suppose now that I have introduced Task.Supervisor I should use that given the advice above? But then I’m not sure what the purpose of Task.await would be. Poking around a bit it just seems like people mostly just always use one or the other. The documentation doesn’t seem to focus on the trade offs involved, of which surely there must be some?

code looks fantastic. Only comments: I would go with Task.Supervisor.start_child instead of async_nolink; async* implies you are waiting to use the function call’s retval, which you are not; this is a “throwaway” situation. Internally in the analysis function I recommend Task.Supervisor.async; you can drop them into MyApp.Analysis.Supervisor (why not?).

as for the value for Registry, I typically put nil in. You’re absolutely right that Registry is confusing because it’s a key-value-value store (and the docs are not 100% clear about that; I had to train a python dev on this and it broke his brain, because he refused to grok the idea of processes); the second value I believe is there to assist in doing process pools, but I haven’t used it myself yet, so I don’t know for sure.

1 Like

I wondered about this! For some reason it didn’t occur to me that the need to do handle the task the in someway was the deciding factor there, probably because I was wrongly focused on the nolink aspect.

Definitely don’t have an answer to this, so I will probably change them, but I’d certainly be interested to hear from someone who thinks they do :slight_smile:

Thanks again for your guidance. I’m marking my code comment as the solution because I think it will be clearest for other reader but obviously couldn’t have done it without your help.

1 Like

yeah if there’s more-or-less correct code it’s better to mark that as a solution for other readers.

Task*.async will send the caller back a message {reference, result} for the await to deal with so if your caller funciton is executed by a genserver, then you could wind up in place where that triggers an unhandled message error or function clause error, with a crash and reboot, so I typically only use the async functions when the await lives in the same block. I’ll even go out of my way and have a “plain task launched from a gen_server” send a callback response to be handled in the normal fashion (handle_call, usually, sometimes handle_info) for the purposes of having more structured, legible code.