Limited number of workers for chunks of work

Hi,

I’m trying to practice and exercise my understanding of elixir and concurrency. It’s a toy project that I would like to do the following. I have a database with X rows, consisting of an id and a counter (10). A worker runs to take a row, reduces the counter by 1 and waits for 1 seconds before doing it again until the counter is 0.

def init(state) do
    schedule()

    {:ok, state}
  end

  def handle_info(:work, state) do
    item_id = state.id
    IO.inspect(state)
    item = Repo.one(from(i in Task, where: i.id == ^item_id))

    cond do
      item.counter > 0 ->
        Repo.transaction(fn ->
          Repo.update!(change(item, counter: item.counter - 1))
        end)

        schedule()
        {:noreply, item_id}

      true ->
        exit(item_id)
        # {:stop, :normal, item_id}
    end
  end

  defp schedule() do
    Process.send_after(self(), :work, 1000)
  end

I’m having a bit of a block on how best to implement a “Monitor” that will launch a number of workers (like say 100 or 500) to do the work. Once a worker is done and there’s work left to do, it should launch a new one, with the next available row id. I’m thinking this isn’t a job for supervisors.
I also didn’t get very far using spawn_monitor, perhaps not using it correctly? Any pointers on how to best approach this part of the problem would be appreciated.

I can start a number of processes, the problem is more the “starting new ones if there’s still work to do once one worker is done” is what’s giving me trouble. Should the workers pass messages back to the monitor, or what’s the correct approach for this?

Have you taken a look at DynamicSupervisor? It sounds like it’d be useful in this case, but it doesn’t take care of coordination. You’d need to pair it with a ‘manager’ of sorts that calls monitor on the child, and checks for the remaining work to launch more children. Workers shouldn’t need to pass messages back, but can return an exit reason which the manager should listen for.

You can use Task.async_stream/{3,5} for that:

1..n
|> Task.async_stream(&do_work/1, max_concurrency: 10)

However if you are working on the DB then fetching the entries one by one is pretty inefficient way to do work, instead it is better to do that in batches, you can use Repo.stream/2 for that.

Also your Repo.transaction/1 in code you have provided is pointless, as Repo.update!/1 is already atomic.

1 Like

fetching the entries one by one is pretty inefficient

Good to know, but for this example it was supposed to simulate a longer running task. I will change to using update though.
async_stream looks good. I’ll take a closer look at this in more detail. Thanks

I did look at DynamicSupervisor but got a little stuck with setting that up correctly. I also thought that this task should be fairly straightforward. But this might be worth investigating anyway to help me get a better understanding. The reason I didn’t go any further is because I didn’t feel Supervisors would be ideal as I don’t care if the worker dies.

Thanks for the replies. I shall do some experimenting with the suggestions.