Horde.DynamicSupervisor bypasses Registry?

Hi. My application uses Horde.DynamicSupervisor to spawn processing tasks for unique objects. The user should be able to abort a task, so I am trying to look up the task in Horde.Registry and then call Horde.DynamicSupervisor.terminate_child with its pid. However, the Registry seems to be unaware of the started task.

Startup

In my application.ex, I have

    children = [
      ...
      {Horde.Registry, [name: DataImportRegistry, keys: :unique, members: :auto]},
      {Horde.DynamicSupervisor, [
        name: DataImportSupervisor,
        strategy: :one_for_one,
        restart: :transient,
        distribution_strategy: Horde.UniformQuorumDistribution,
        max_restarts: 0,
        max_seconds: 1,
        members: :auto # supervisor_members()
      ]}
    ]
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)

Then in the main process:

        {:ok, pid} = Horde.DynamicSupervisor.start_child(
          DataImportSupervisor,
          %{
            id: my_data_object.uuid,
            start: {Task, :start_link, [DataImportWorker, :process_data_import, [my_data_object, current_user.id, client_ip]]}
          })

But when I do Horde.Registry.lookup(DataImportRegistry, my_data_object.uuid) the output is [].

So what am I missing? Why is Horde's DynamicSupervisor not registering the Task in the Registry?

If I do Horde.DynamicSupervisor.which_children(DataImportSupervisor) the return value is [{:undefined, #PID<0.2240.0>, :worker, [Task]}] while the task is running. What does :undefined mean?

Thanks! :smiley:

This doesn't happen automatically - how could it? DataImportSupervisor doesn't take any configuration about what registry to use.

The standard approach is to pass a name: with a via tuple when starting a GenServer (example from the docs), but Task doesn't support that. Your DataImportWorker code will need to call Horde.Registry.register when it starts up.

I haven't tried register from DataImportWorker yet, but I have tried to register() the task's pid from the main process after starting it:

        {:ok, pid} = Horde.DynamicSupervisor.start_child(
          DataImportSupervisor,
          %{
            id: my_data_object.uuid,
            start: {Task, :start_link, [DataImportWorker, :process_data_import, [my_data_object, current_user.id, client_ip]]}
          })
        Horde.Registry.register(DataImportRegistry, my_object.uuid, pid)

But Horde.Registry.lookup() still didn't return anything...
Either it's the wrong register to call, or I didn't start the Registry process in application.ex properly, or there's a race condition where the Task finishes before lookup(). But if I have to register the process manually, how does it get unregistered? I figured there must be some code in DynamicSupervisor that automatically registers and unregisters the PID.

If this only works when via_tuple is overridden, do I need to implement a custom worker module that does use Task? The task ID is dynamic based on my_object.uuid being processed since only one process on the cluster is allowed to handle each instance of the input object. I thought I already provided the task ID:

{:ok, pid} = Horde.DynamicSupervisor.start_child(
          DataImportSupervisor,
          %{
            id: my_data_object.uuid, <<<<<<

but I had doubts about which registry the DynamicSupervisor is going to use since there can be multiple ones.
Is there a way to pass in the registry to DynamicSupervisor so it does process registration on its own?

I found this: How to start processes with dynamic names in Elixir

I didn't know I could pass a via tuple instead of the name atom. If this works it would be great, with the additional benefit of not wasting atom space on new atoms allocated for task names.

I did this:

        {:ok, pid} = Horde.DynamicSupervisor.start_child(
          DataImportSupervisor,
          %{
            id: {:via, Horde.Registry, {DataImportRegistry, my_object.uuid}},
            start: {Task, :start_link, [DataImportWorker, :process_data_import, [my_object]]}
          })
        Horde.Registry.register(DataImportRegistry, my_object.uuid, pid)
        Horde.Registry.lookup(DataImportRegistry, my_object_uuid)

and the result is still []

It doesn't barf on register, but lookup() still doesn't find it... :frowning:

It's a bit confusing, but it's the Task name you need to be setting using the via tuple, not the child spec id. The id is a value used by the supervisor to determine uniqueness, but it doesn't name the process.
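The difference between the child spec id and a process name can be seen in a minimal sketch. It uses the built-in Registry and DynamicSupervisor as stand-ins for their Horde counterparts so that it runs without extra dependencies; the module and process names (IdVsNameDemo, IdVsNameRegistry, IdVsNameSupervisor) and the key "object-1" are hypothetical.

```elixir
defmodule IdVsNameDemo do
  # Starts a child whose spec carries an id, then reports what the
  # supervisor and the registry know about it.
  def demo do
    {:ok, _} = Registry.start_link(keys: :unique, name: IdVsNameRegistry)
    {:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: IdVsNameSupervisor)

    {:ok, pid} =
      DynamicSupervisor.start_child(IdVsNameSupervisor, %{
        # The id is part of the child spec only; it does not name the process.
        id: "object-1",
        start: {Agent, :start_link, [fn -> :state end]}
      })

    %{
      pid: pid,
      # Dynamic supervisors report :undefined as the id of every child.
      children: DynamicSupervisor.which_children(IdVsNameSupervisor),
      # Nothing registered the process, so the registry has no entry.
      lookup: Registry.lookup(IdVsNameRegistry, "object-1")
    }
  end
end
```

IdVsNameDemo.demo() returns a children list whose first element is :undefined and an empty lookup, which matches both symptoms described in the original question.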

Take a look at this example for the built-in Registry: Registry - Elixir v1.12.3

You also do not need to explicitly register if you start the process with a via tuple. You should be able to look it up using the id you provided in the tuple (not the full tuple).

But you also don't need to know about the registry api at all after you name it - you should be able to GenServer.whereis(via tuple). This way the rest of your code doesn't have to know about the registry you're using, that it's distributed, etc. The via tuple becomes the canonical name you pass around representing that process.
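A minimal sketch of naming a process with a via tuple, again using the built-in Registry and DynamicSupervisor rather than Horde so it is self-contained. An Agent is used because, unlike Task, its start_link accepts a name: option. ViaNameDemo, ViaNameRegistry, ViaNameSupervisor and the key "object-1" are hypothetical names.

```elixir
defmodule ViaNameDemo do
  # Hypothetical helper; calling it via_tuple is only a convention.
  def via_tuple(key), do: {:via, Registry, {ViaNameRegistry, key}}

  def demo do
    {:ok, _} = Registry.start_link(keys: :unique, name: ViaNameRegistry)
    {:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: ViaNameSupervisor)

    {:ok, pid} =
      DynamicSupervisor.start_child(ViaNameSupervisor, %{
        id: :ignored,
        # The via tuple goes into the process name, not the child spec id.
        start: {Agent, :start_link, [fn -> :state end, [name: via_tuple("object-1")]]}
      })

    %{
      pid: pid,
      # A registry lookup takes only the key, not the whole via tuple.
      lookup: Registry.lookup(ViaNameRegistry, "object-1"),
      # GenServer.whereis/1 resolves the via tuple without registry-specific code.
      whereis: GenServer.whereis(via_tuple("object-1"))
    }
  end
end
```

No explicit register call is needed here: the process registers itself under the key while it starts, and both lookups resolve to the same pid.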

Sorry for poor formatting, on my phone.

The registry adds a monitor for the process that calls register, so that when that process exits the registry is notified and can un-register the name.
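The cleanup behaviour can be observed with a small sketch. It assumes the built-in Registry as a stand-in for Horde.Registry, and the names AutoUnregisterDemo, AutoUnregisterRegistry and "object-1" are hypothetical. The entry is removed shortly after the registering process exits, so the sketch polls briefly instead of assuming the removal is instantaneous.

```elixir
defmodule AutoUnregisterDemo do
  def demo do
    {:ok, _} = Registry.start_link(keys: :unique, name: AutoUnregisterRegistry)
    parent = self()

    {:ok, task_pid} =
      Task.start(fn ->
        # register/3 is called by the process that wants to own the key.
        {:ok, _} = Registry.register(AutoUnregisterRegistry, "object-1", nil)
        send(parent, :registered)

        receive do
          :finish -> :ok
        end
      end)

    receive do
      :registered -> :ok
    end

    while_running = Registry.lookup(AutoUnregisterRegistry, "object-1")

    # Let the task finish and wait until it is really gone.
    ref = Process.monitor(task_pid)
    send(task_pid, :finish)

    receive do
      {:DOWN, ^ref, :process, ^task_pid, _reason} -> :ok
    end

    {while_running, wait_until_unregistered("object-1")}
  end

  # Polls the registry until the key disappears or the attempts run out.
  defp wait_until_unregistered(key, attempts \\ 50) do
    case Registry.lookup(AutoUnregisterRegistry, key) do
      [] ->
        []

      entries when attempts == 0 ->
        entries

      _entries ->
        Process.sleep(10)
        wait_until_unregistered(key, attempts - 1)
    end
  end
end
```

The first element of the returned tuple holds the entry seen while the task was alive; the second is the lookup result after it exited, with no explicit unregister call anywhere.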

This article walks through a detailed example:

NOTE: the article predates both DynamicSupervisor and Registry, but the underlying techniques are the same.


It's common convention to make the function that builds via tuples be called via_tuple, but there's no requirement.

I'm afraid I'm still confused. I don't see an API to start a Task with a via tuple. There's one in Agent or GenServer, but I could not find one in Task.

Maybe my whole approach is wrong. I don't care about the state. I have an API that needs to kick off asynchronous processing of an object/piece of data that was passed in as a parameter, so basically an asynchronous function call. I need a supervisor to mark the object as "processed successfully" when the function completes successfully, or mark it as "failed" if the function fails or the task dies. I also have another API to look up the active task and kill/abort it. Only one node in the cluster needs to be able to process each object.
In actual fact, the "object" is a DataImportRequest containing a File with a set of records to load into the database or kafka etc.
So I was thinking of a DynamicSupervisor that would start a Task somewhere on the cluster and put it in the Horde Registry, which should take care of having only one task per DataImportRequest on the cluster and of being able to look it up by "name" in order to terminate_child in the DynamicSupervisor. I got stuck on how to use a via tuple and on intercepting the task's untimely death so I can mark the DataImportRequest as "failed". Do I need a GenServer or Agent for that, or is there a way to pass a via tuple for a Task too? And how do I catch a task crash or abort, as opposed to successful completion?
Appreciate your help!

Tasks don't have a built-in API to register with names because Tasks don't usually process unsolicited messages from other processes (unlike GenServer, for instance); you'd need to explicitly call receive inside the Task.

That doesn't stop you from calling Horde.Registry.register/3 in your Task, though:

Task.start(fn ->
  Horde.Registry.register(DataImportRegistry, some_uuid, [])
  ... do stuff ...
end)

Then you can refer to that Task with the tuple {:via, Horde.Registry, {DataImportRegistry, some_uuid}} in APIs like GenServer.whereis.

Thanks, all, I'm making some good progress. I read somewhere that Horde.Registry.register needs to be called from within the Task to work properly, and that appears to be so. When I tried to register the Task in the registry from the main (caller) process, it wasn't getting registered. Also, the 3rd argument, apparently, needs to be an atom.

So here's what works:

  1. Added an atom to the child spec id when starting the Task under Horde's Dynamic Supervisor:

        {:ok, _pid} = Horde.DynamicSupervisor.start_child(
          DataImportSupervisor,
          %{
            id: {:process_data_import, di.id},
            start: {Task, :start_link, [DataImportWorker, :process_data_import, [di, current_user.id, client_ip]]}
          })

  2. Register the task in Horde Registry, with an atom as the 3rd argument:

        def publish_data_import(di = %DataImport{}, user_id, client_ip) do
          Horde.Registry.register(DataImportRegistry, di.id, :publish_data_import)
          ...

  3. Then lookup by id just works and I'm able to implement "abort" functionality like this:

        def abort_data_import(di = %DataImport{}, user_id, client_ip) do
          case Horde.Registry.lookup(DataImportRegistry, di.id) do
            [{pid, _value}] ->
              case Horde.DynamicSupervisor.terminate_child(DataImportSupervisor, pid) do
                :ok ->
                  change_status(di, DataImportStatus.ABORTED)
                  ...

My remaining issues are:

  1. I don't like that task management and business logic are mixed. I think I can refactor that to have an ImportTask module that does register, lookup and terminate_child and delegates to ImportWorker for everything else. That way the business logic is encapsulated in the ImportWorker module, and whether its functions are called directly/synchronously or via an asynchronous Task is not tied to the business logic.
  2. How do I trap the unintended death of the task in the supervisor? I want to catch crashes (e.g. raise'd exceptions or some other runtime errors) so I can mark the DataImport "failed", e.g. change_status(di, DataImportStatus.FAILED). I heard about the trap_exit boolean flag, but I'm not sure how it relates to the supervisor, or whether (and where) there needs to be a callback to handle that. If that's not possible, I suppose I can do something in Java style, like a try/catch block around every major entry point that would call change_status/2 in the catch block...

Thanks again!

The value put in id here is irrelevant, as it's immediately overwritten by Horde.DynamicSupervisor before starting the child:

It used to be used as an identifier in terminate_child, but that was removed back in 2019:

The documentation doesn't mention this (the third argument is typed as term()). What motivated this observation?

trap_exit is a flag set on the current process; you might use it to, for instance, handle errors arising inside of ImportWorker without crashing the Task.
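A minimal sketch of what trapping exits looks like in practice, assuming the work is run in a linked process. TrapExitDemo and run_linked/1 are hypothetical names. With the flag set, an exit of the linked process arrives as an {:EXIT, pid, reason} message instead of taking the caller down with it.

```elixir
defmodule TrapExitDemo do
  # Runs fun in a linked process and turns its exit into a return value.
  def run_linked(fun) do
    # The flag applies to the calling process; remember the old value.
    previous = Process.flag(:trap_exit, true)

    try do
      pid = spawn_link(fun)

      receive do
        {:EXIT, ^pid, :normal} -> :ok
        {:EXIT, ^pid, reason} -> {:error, reason}
      end
    after
      # Restore the caller's original setting.
      Process.flag(:trap_exit, previous)
    end
  end
end
```

For example, TrapExitDemo.run_linked(fn -> raise "boom" end) returns an {:error, reason} tuple carrying the exception and stacktrace, which is the point where a status update such as change_status(di, DataImportStatus.FAILED) could be placed.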

A pretty standard approach would be to have another process that monitors these Tasks and handles the resulting exit messages.
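One way such a monitoring process could look is sketched below as a small GenServer. Everything here is an assumption made for illustration: the ImportWatcher module, its watch/3 function, the {:import_finished, id, outcome} message, and the mapping of exit reasons to :succeeded, :aborted and :failed are hypothetical, and the mapping assumes aborts go through terminate_child, which stops the child with reason :shutdown.

```elixir
defmodule ImportWatcher do
  use GenServer

  # report_to is the pid that should be told how each import ended.
  def start_link(report_to), do: GenServer.start_link(__MODULE__, report_to)

  # Asks the watcher to monitor pid on behalf of import_id.
  def watch(watcher, pid, import_id), do: GenServer.call(watcher, {:watch, pid, import_id})

  @impl true
  def init(report_to), do: {:ok, %{report_to: report_to, refs: %{}}}

  @impl true
  def handle_call({:watch, pid, import_id}, _from, state) do
    ref = Process.monitor(pid)
    {:reply, :ok, put_in(state.refs[ref], import_id)}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    {import_id, refs} = Map.pop(state.refs, ref)
    # A real system would update the import's status here.
    send(state.report_to, {:import_finished, import_id, classify(reason)})
    {:noreply, %{state | refs: refs}}
  end

  defp classify(:normal), do: :succeeded
  defp classify(:shutdown), do: :aborted
  defp classify(_other), do: :failed
end
```

Because monitors are one-way, a crash in a watched Task never propagates to the watcher, and the business logic in the worker stays free of status bookkeeping.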

HOWEVER

You're teetering on the edge of rolling your own job-tracking system here. Consider picking an off-the-shelf alternative like Verk or Oban that's already solved a lot of these problems.

Thanks, all, for your help!