Best Way to Broadcast data to all workers

Hello guys,

I'm starting to learn Elixir with the goal of building a distributed application that runs some validations on a list that is updated every minute.

Here's my idea:

Create a DynamicSupervisor with N workers to isolate the validations (one process per validation of one item of the list).

The list contains about 1,000 items, and each item has 10 properties.

A process needs to validate some of the properties of one item (e.g. if property A of item N is greater than 10, print it).

The problem is that this list is updated every minute, and I don't know the best way to push the updates to every worker. My first idea is to use ETS, but I'm a little concerned about every process doing a select in the database every minute; that doesn't look like a good idea.

The complete flow is (I think it should be):
-> Supervisor gets the update from an external socket application
-> Filter the list according to each process item
-> Send to process only the properties of its item

But it needs to be fast, and scalable :smiley:
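
On the ETS concern, here is a minimal sketch of what a per-worker read could look like, assuming one row per code in a `:set` table. The table name `:items_sketch` and the `props` field are hypothetical and not part of the original post. In a `:set` table, `:ets.lookup/2` fetches a row by key rather than scanning the table, so each worker only touches its own item.

```elixir
# Hypothetical sample data shaped like the items described above.
list = [
  %{code: "A", props: %{a: 12, b: 3}},
  %{code: "B", props: %{a: 7, b: 9}}
]

# An unnamed, public :set table: one row per code, readable from any process.
table = :ets.new(:items_sketch, [:set, :public, read_concurrency: true])

# The single writer replaces the rows when a new list arrives.
true = :ets.insert(table, Enum.map(list, fn item -> {item.code, item} end))

# A worker reads only its own row by key.
worker =
  Task.async(fn ->
    [{_code, item}] = :ets.lookup(table, "A")
    if item.props.a > 10, do: {:flagged, item.code, item.props.a}, else: :ok
  end)

result = Task.await(worker)
IO.inspect(result)
```

Whether this beats pushing messages to the workers depends on the workload; the sketch only shows that the read itself is a key lookup.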

hey!
Normally it is not a good idea to do any work in the supervisor itself.
Instead, a valid approach would be to use GenStage.
With this approach you would have a producer, the process that receives the messages, and many consumers that process them.
In the producer you only need to receive the messages and broadcast them. The filtering can be done almost automatically with the selector option on each consumer's subscription.
Hope it helps!

Hello jkrmrto, thanks for answering so fast!

I've dived into some GenStage tutorials and ended up with something close to what I need, but I still have some issues.

This is my current code

# Producer
defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(list), do: {:producer, list}

  def handle_demand(_, state) do
    {:noreply, state, state}
  end
end
# Consumer
defmodule GenstageExample.Consumer do
  use GenStage

  # Starts a consumer with its code
  def start_link(initial_state) do
    GenStage.start_link(__MODULE__, initial_state)
  end

  def init(state) do
    # Next line should filter messages sent from producer
    {:consumer, state,
     subscribe_to: [
       {GenstageExample.Producer, selector: fn %{key: key} -> state[:code] == key end}
     ]}
  end

  def handle_events(events, _from, state) do
    # Prints every message received
    for event <- events do
      IO.inspect({self(), "Received: " <> event, "Expected: " <> state[:code]})
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

And this is my application.exs

defmodule GenstageExample.Application do
  use Application

  @impl true
  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      {GenstageExample.Producer, ["A", "B", "C"]},
      # {GenstageExample.ProducerConsumer, []},
      %{
        id: 1,
        start: {GenstageExample.Consumer, :start_link, [%{code: "A"}]}
      },
      %{
        id: 2,
        start: {GenstageExample.Consumer, :start_link, [%{code: "B"}]}
      },
      %{
        id: 3,
        start: {GenstageExample.Consumer, :start_link, [%{code: "C"}]}
      }
    ]

    opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

The problem is that the selector is not working: consumer A still receives messages B and C, and the same thing happens with consumers B and C.

I know I can place an if-statement in the Consumer's handle_events function, but I think it would be better to follow your suggestion (filtering with the selector).

Could you please look at my code and help me see where the mistake is?

This is my console output after running “mix run --no-halt”

{#PID<0.198.0>, "Received: A", "Expected: C"}
{#PID<0.197.0>, "Received: A", "Expected: B"}
{#PID<0.196.0>, "Received: A", "Expected: A"}
{#PID<0.198.0>, "Received: B", "Expected: C"}
{#PID<0.197.0>, "Received: B", "Expected: B"}
{#PID<0.196.0>, "Received: B", "Expected: A"}
{#PID<0.198.0>, "Received: C", "Expected: C"}
{#PID<0.197.0>, "Received: C", "Expected: B"}
{#PID<0.196.0>, "Received: C", "Expected: A"}

hey,
You should use a BroadcastDispatcher. So, when initializing the producer: {:producer, list, dispatcher: GenStage.BroadcastDispatcher}

What do you want to broadcast? The fact that the list has been updated (a single message every minute), or each list item (1000 messages / minute)?

Edit:

Also, selector is only supported by BroadcastDispatcher, and your function would not match:

Selector: fn %{key: key} -> state[:code] == key end

Events: ["A", "B", "C"]

%{key: key} can never match "A", "B" or "C"
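
The mismatch can be reproduced in plain Elixir without GenStage. This is only a sketch of the pattern-matching point: it calls the anonymous function directly and does not show how a dispatcher would invoke it. `string_selector` is a hypothetical alternative for events that are bare strings.

```elixir
# The selector from the consumer, with the expected code fixed to "A".
selector = fn %{key: key} -> key == "A" end

# A map with a :key field matches the pattern.
matched = selector.(%{key: "A"})
IO.inspect(matched)

# A bare string matches no clause, so the call raises.
result =
  try do
    selector.("A")
  rescue
    FunctionClauseError -> :no_match
  end

IO.inspect(result)

# For events that are plain strings, a selector would compare them directly.
string_selector = fn event -> event == "A" end
IO.inspect(Enum.filter(["A", "B", "C"], string_selector))
```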

Also, what do you want to do with the validations? If you need to update the list, then you cannot do it in parallel without handling concurrent writes to the list by two validators. If you just want to print the items, then it is fine.

I want to broadcast the list (actually, the “lastInfos” property of its items, where each worker receives only the lastInfos for which state[:code] == event[:code]).

The case is:

Every minute I receive a new list, and each worker will be looking at one item, something like this:

List: [
  %{code: "A", lastInfos: {}},
  %{code: "B", lastInfos: {}},
  %{code: "C", lastInfos: {}}
]

Where each worker will print the property lastInfos of the item being observed.
My objective is that, every time I update the list in Producer, the Consumers need to receive the update.

A Consumer will never change the list; it only needs to receive the most recent data for the item it observes.

I see,

In that case GenStage can do the job with a consumer per code.

But my code is far from good:

  • I did not use any buffering for the demands in the consumer, that should be handled properly
  • Items are not handled in isolation, as you requested, the processes still receive a list of items.

That is why I would not use GenStage if I required process isolation for handling each item. Although, as it is a functional immutable language, I guess you are fine with handling lists of items.

defmodule ListUpdater do
  def start_link(producer \\ nil) do
    spawn_link(fn -> init(producer) end)
  end

  defp init(producer) do
    loop(producer)
  end

  defp loop(producer) do
    IO.puts("sending list")
    send(producer, {:new_list, generate_list()})
    # Send a list every minute, here every second
    Process.sleep(1000)
    loop(producer)
  end

  defp generate_list() do
    Stream.cycle(["A", "B", "C"])
    # The length of the list
    |> Enum.take(1000)
    |> Enum.shuffle()
    |> Enum.with_index()
    |> Enum.map(fn {key, index} -> %{key: key, id: index} end)
  end
end

defmodule GenstageExample.Producer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_arg) do
    {:producer, :some_state, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_info({:new_list, list}, state) do
    {:noreply, list, state}
  end

  def handle_demand(_demand, state) do
    IO.puts("got demand")
    {:noreply, [], state}
  end
end

defmodule GenstageExample.Consumer do
  use GenStage

  # Starts consumer with his code
  def start_link(code) do
    GenStage.start_link(__MODULE__, code)
  end

  def init(code) do
    # Next line should filter messages sent from producer
    state = %{code: code}

    {:consumer, state,
     subscribe_to: [
       {GenstageExample.Producer, selector: fn %{key: key} -> code == key end, max_demand: 1000}
     ]}
  end

  def handle_events(events, _from, state) do
    # Prints every message received
    for event <- events do
      IO.puts(
        "[#{state.code}] #{inspect(self())} Received: #{inspect(event)}, Expected: #{state.code}"
      )
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

GenstageExample.Producer.start_link()
GenstageExample.Consumer.start_link("A")
GenstageExample.Consumer.start_link("B")
GenstageExample.Consumer.start_link("C")
ListUpdater.start_link(GenstageExample.Producer)


Edit: You could also use the PartitionDispatcher, with the same code as above except for those differences:

  # GenstageExample.Producer
  def init(_arg) do
    {:producer, :some_state,
     dispatcher:
       {GenStage.PartitionDispatcher,
        partitions: ["A", "B", "C"], hash: fn event -> {event, event.key} end}}
  end


  # GenstageExample.Consumer
  def init(code) do
    # Next line should filter messages sent from producer
    state = %{code: code}

    {:consumer, state,
     subscribe_to: [
       {GenstageExample.Producer, partition: code, max_demand: 1000}
     ]}
  end

It makes more sense to me, but all the possible codes must be known beforehand. For example, if you write partitions: ["A", "B"], then a list item with "C" will crash the producer. With the broadcast dispatcher it is the opposite: if you declare A, B, C, then a list item with D would be ignored (and maybe stay in the producer's memory forever? I don’t know). So I would rather have the process crash and use partitions than have unhandled events.

If you don’t want to bring in external deps, you can leverage pg, which is a way to register processes under a specific group. A gen_server can partition the list between the processes in a group. You mentioned that your goal was a distributed application, which works well with pg. Group membership is eventually consistent across all nodes.
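
A minimal single-node sketch of the pg idea, assuming OTP 23 or later (where the `:pg` module is available). The group names `{:validators, code}` and the message shapes are hypothetical. Each worker joins the group for its code, and the dispatcher sends an item only to the members of the matching group.

```elixir
# Start the default :pg scope unless it is already running.
case :pg.start_link() do
  {:ok, _pid} -> :ok
  {:error, {:already_started, _pid}} -> :ok
end

parent = self()

# One hypothetical worker per code; each joins the group named after its code.
for code <- ["A", "B"] do
  pid =
    spawn(fn ->
      receive do
        {:update, item} -> send(parent, {:validated, code, length(item.infos)})
      end
    end)

  :ok = :pg.join({:validators, code}, pid)
end

list = [
  %{code: "A", infos: [%{name: "John", age: 18}]},
  %{code: "B", infos: [%{name: "Mary", age: 19}, %{name: "Ann", age: 30}]}
]

# The dispatcher sends each item only to the members of its group.
for item <- list, pid <- :pg.get_members({:validators, item.code}) do
  send(pid, {:update, item})
end

# Collect one reply per worker, giving up after a second.
replies =
  for _ <- 1..2 do
    receive do
      {:validated, code, count} -> {code, count}
    after
      1_000 -> :timeout
    end
  end

IO.inspect(Enum.sort(replies))
```

On a cluster, `:pg.get_members/1` also returns members on other connected nodes, which is where the eventual consistency mentioned above comes in.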

So,

Thanks for the response, I think your code will work.

The scenario is this:

The list of ["A", "B", "C"] will actually be a list of maps where the primary key is the string, something like:

%{
        infos: [
          %{name: "John", age: 18, nickname: "Doe"},
          %{name: "Mary", age: 19, nickname: "Jhoanne"}
        ],
        code: "A"
}

And the list (with codes and infos) is updated every minute, meaning that new codes (not present the first time) could be added.

Just so you know, I'll be getting this list from an external source (through a socket connection).

With that in mind, do you still recommend using a BroadcastDispatcher?
Is there a way to put every process under the supervision of a DynamicSupervisor (to be respawned when it fails, or killed when its code doesn't exist in the newest list)?

Thanks for your time!

Well, in that case, what do you actually want to be sent to an isolated process? %{name: "John", age: 18, nickname: "Doe"} or the whole %{infos: [...], code: "A"}?

Same question for what you want to be killed if something is not known: is it a process that would receive the whole list, or an item?

You have many options here.

First I would use a Registry to register your validators with the key (e.g. "A") they know. So you can send them the whole list if the key is registered, or reject the list if you have no validator for it (or use a default validator, or no validation).
This registered process can be a simple GenServer that would validate each item in a loop. Or, if you really need isolation, it could use Task.async_stream to validate the items.
This registered process could be a GenStage producer like we did above, receiving the whole list and returning it as events with the default dispatcher, and you then add a ConsumerSupervisor that would listen to this producer and would spawn a new process for each item in the list. This is basically how the opq package works.
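
The Registry option can be sketched with the standard library alone. All names here (`Sketch.Validator`, `Sketch.Registry`, the `{:update, item}` message) are hypothetical. Validators register under their code, and the sender looks the code up before forwarding an item, so an item with no validator can be rejected or routed elsewhere.

```elixir
defmodule Sketch.Validator do
  use GenServer

  # Registers the process in the Registry under its code.
  def start_link(code) do
    GenServer.start_link(__MODULE__, code, name: via(code))
  end

  def via(code), do: {:via, Registry, {Sketch.Registry, code}}

  @impl true
  def init(code), do: {:ok, %{code: code, last: nil}}

  @impl true
  def handle_cast({:update, item}, state), do: {:noreply, %{state | last: item}}

  @impl true
  def handle_call(:last, _from, state), do: {:reply, state.last, state}
end

{:ok, _} = Registry.start_link(keys: :unique, name: Sketch.Registry)
{:ok, _} = Sketch.Validator.start_link("A")

list = [
  %{code: "A", infos: [%{name: "John", age: 18}]},
  %{code: "Z", infos: []}
]

# Forward each item to the validator registered for its code, if there is one.
results =
  for item <- list do
    case Registry.lookup(Sketch.Registry, item.code) do
      [{pid, _value}] ->
        GenServer.cast(pid, {:update, item})
        :sent

      [] ->
        :no_validator
    end
  end

IO.inspect(results)
```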

Or, as you receive only one list at a time, you could send the list to a single producer that would transform %{code: "A", items: [%{name: "mary"}, %{name: "john"}]} into events like this: [{"A", %{name: "mary"}}, {"A", %{name: "john"}}], and then have a single ConsumerSupervisor that would spawn a task (isolated process) for each event, and the task would call a dispatch function:

def validate({"A", item}) do
  Validators.A.validate(item)
end
def validate({"B", item}) do
  Validators.B.validate(item)
end
def validate({_unknown, _item}) do
  # do what you want with unknown items
end

Just do not over-engineer your solution, because a full minute is a very long time, even for a million items.
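
To give a feel for how little machinery a pass over the list needs, here is a sketch that runs a check over 1000 generated items with `Task.async_stream/3`. The module `Sketch.Check`, the generated codes and the "older than 18" rule are hypothetical stand-ins for the real validation.

```elixir
defmodule Sketch.Check do
  # Returns {code, nickname} for every entry in infos older than 18.
  def adults(%{code: code, infos: infos}) do
    for %{age: age, nickname: nickname} <- infos, age > 18, do: {code, nickname}
  end
end

# 1000 hypothetical items shaped like the ones in this thread.
list =
  for i <- 1..1000 do
    %{
      code: "code-#{i}",
      infos: [
        %{name: "John", age: 18, nickname: "Doe"},
        %{name: "Mary", age: 19, nickname: "Jhoanne"}
      ]
    }
  end

# Each item is checked in its own short-lived process, a few at a time.
results =
  list
  |> Task.async_stream(&Sketch.Check.adults/1,
    max_concurrency: System.schedulers_online(),
    ordered: false
  )
  |> Enum.flat_map(fn {:ok, hits} -> hits end)

IO.inspect(length(results))
```

For a check this cheap, a plain `Enum.flat_map/2` over the list in a single process would likely do just as well.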

Well, I thought of sending it separately to improve performance…

What I want to send separately is the entire item:

%{
        infos: [
          %{name: "John", age: 18, nickname: "Doe"},
          %{name: "Mary", age: 19, nickname: "Jhoanne"}
        ],
        code: "A"
}

The code is used only as a “primary key” of the consumer, because I need one consumer for each code.

The list will grow with time, and I guess that it can become a problem in the future if I build a Producer that sends the entire list to each consumer.

I don't know if that is clear, but each consumer will do a quick validation on the item assigned to it.

The case where I want to kill a consumer is when the new list (updated every minute) no longer contains the code it is watching.

For example, in the first minute I'll have a list like this:

[
%{
        infos: [
          %{name: "John", age: 18, nickname: "Doe"},
          %{name: "Mary", age: 19, nickname: "Jhoanne"}
        ],
        code: "A"
},
%{
        infos: [
          %{name: "Not John", age: 20, nickname: "Doe"},
          %{name: "Not Mary", age: 21, nickname: "Jhoanne"}
        ],
        code: "B"
},
(798 more items at the start)...
]

And in the next minute I'll get a new list like:

[
%{
        infos: [
          %{name: "John", age: 18, nickname: "Doe"},
          %{name: "Mary", age: 19, nickname: "Jhoanne"}
        ],
        code: "A"
},
%{
        infos: [
          %{name: "Not John New", age: 25, nickname: "Doooe"},
          %{name: "Not Mary New", age: 22, nickname: "Janne"}
        ],
        code: "Not B"
},
(798 more items without B)...
]

In this case, I have to kill the consumer that is watching “B” and start a new consumer that will watch “Not B”.

So the new “Not B” consumer has to stay alive for as long as the item “Not B” is present in subsequent updates of the list.

The producer will be connected to a socket that sends a new version of the list every minute, but this interval may get shorter in the not-so-near future.
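
The start/stop behaviour described above can be sketched with a Registry plus a DynamicSupervisor from the standard library. All module and process names (`Sketch.Watcher`, `Sketch.Reconciler`, `Sketch.Watchers`, `Sketch.WatcherSup`) are hypothetical. On every new list, watchers whose code disappeared are terminated, watchers for new codes are started, and each item is forwarded to its watcher.

```elixir
defmodule Sketch.Watcher do
  use GenServer

  def start_link(code), do: GenServer.start_link(__MODULE__, code, name: via(code))

  def via(code), do: {:via, Registry, {Sketch.Watchers, code}}

  @impl true
  def init(code), do: {:ok, %{code: code, item: nil}}

  @impl true
  def handle_cast({:update, item}, state), do: {:noreply, %{state | item: item}}
end

defmodule Sketch.Reconciler do
  # Stops watchers whose code vanished, starts watchers for new codes,
  # then forwards each item to the watcher registered under its code.
  def apply_list(list) do
    wanted = MapSet.new(list, & &1.code)

    # %{code => pid} for every registered watcher that is still alive.
    running =
      Sketch.Watchers
      |> Registry.select([{{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}])
      |> Enum.filter(fn {_code, pid} -> Process.alive?(pid) end)
      |> Map.new()

    for {code, pid} <- running, not MapSet.member?(wanted, code) do
      DynamicSupervisor.terminate_child(Sketch.WatcherSup, pid)
    end

    for code <- wanted, not Map.has_key?(running, code) do
      {:ok, _pid} = DynamicSupervisor.start_child(Sketch.WatcherSup, {Sketch.Watcher, code})
    end

    Enum.each(list, fn item ->
      GenServer.cast(Sketch.Watcher.via(item.code), {:update, item})
    end)
  end
end

{:ok, _} = Registry.start_link(keys: :unique, name: Sketch.Watchers)
{:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: Sketch.WatcherSup)

# First minute: codes "A" and "B".
Sketch.Reconciler.apply_list([%{code: "A", infos: []}, %{code: "B", infos: []}])
pid_a = GenServer.whereis(Sketch.Watcher.via("A"))

# Next minute: "B" is gone and "Not B" appears.
Sketch.Reconciler.apply_list([%{code: "A", infos: []}, %{code: "Not B", infos: []}])
```

Because the watchers run under the DynamicSupervisor with the default restart setting, a watcher that crashes is restarted, while one stopped through `terminate_child/2` stays down.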

Pretty nice: with a BroadcastDispatcher, the selector of the Consumer is now working (and that's great!)

Now I'm kind of confused about how the producer's handle_demand works.

I need to send the list (or an update of it) to the consumers from time to time (not constantly, and not with Process.sleep()). Is there a way to send it only through a function call?

Thanks for helping!

Well, as soon as a message is published by the producer, the consumer will digest it when it is idle. In case you need to add a delay to this consumption you would have to do it on the producer some way.

Is “Not B” related to “B”, or does it mean anything besides “A” or “B”? Please explain your problem fully; if you add new concepts in each message we will not be able to help you properly :smiley:

Well, I thought of sending it separately to improve performance…

It may work, but spawning a process for each item will definitely not improve performance when all you do is run “a quick validation” and a print on a list of 1000 items.

For now I would stick to the last proposal I made above, spawning a process for each item as required.

First, a producer that receives the whole list (as in the code I posted before). Your socket receives the whole list, and now you have that in memory. With any solution you will have to copy this data to the memory of other processes anyway, so you just send it to the producer and let it be garbage collected on the socket side. If you want to improve on that, you can stream events from the socket to the producer, but that is an optimization for a future version, if it is ever needed.

So, you have this producer that receives the whole list. It uses the default dispatcher (not the broadcast or partition ones).

Then you add a ConsumerSupervisor that will receive the items according to its max_demand, and will spawn a process for each list item. In the handler function, you check the code and call the appropriate validator for this code.

defmodule SocketListUpdater do
  def start_link(producer \\ nil) do
    spawn_link(fn -> init(producer) end)
  end

  defp init(producer) do
    loop(producer)
  end

  defp loop(producer) do
    IO.puts("sending list")
    send(producer, {:new_list, generate_list()})
    # Send a list every minute, here every second
    Process.sleep(1000)
    loop(producer)
  end

  defp generate_list() do
    Stream.cycle(["A", "B", "C"])
    # The length of the list
    |> Enum.take(1000)
    |> Enum.shuffle()
    |> Enum.map(fn code -> %{code: code, infos: generate_infos()} end)
  end

  defp generate_infos do
    [
      %{name: "mary"},
      %{name: "john"},
      %{name: "robert"},
      %{name: "ava"},
      %{name: "simon"},
      %{name: "simon"}
    ]
    |> Enum.take(Enum.random(2..6))
  end
end

defmodule GenstageExample.Producer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_arg) do
    {:producer, :some_state}
  end

  def handle_info({:new_list, list}, state) do
    {:noreply, list, state}
  end

  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end
end

defmodule GenstageExample.Consumer do
  use ConsumerSupervisor

  # Starts the consumer supervisor
  def start_link(arg) do
    ConsumerSupervisor.start_link(__MODULE__, arg)
  end

  def init(_arg) do
    # Note: By default the restart for a child is set to :permanent
    # which is not supported in ConsumerSupervisor. You need to explicitly
    # set the :restart option either to :temporary or :transient.
    children = [
      %{
        id: GenstageExample.Validator,
        start: {GenstageExample.Validator, :start_link, []},
        restart: :transient
      }
    ]

    opts = [strategy: :one_for_one, subscribe_to: [{GenstageExample.Producer, max_demand: 50}]]
    ConsumerSupervisor.init(children, opts)
  end
end

defmodule GenstageExample.Validator do
  def start_link(event) do
    Task.start_link(fn -> validate(event) end)
  end

  # Here you can call any validator function depending on the code. In this
  # example I implemented the validators directly in the function body

  defp validate(%{code: "A", infos: infos}) do
    if length(infos) < 3 do
      IO.puts("item with code A is not valid")
    end
  end

  defp validate(%{code: "B", infos: infos}) do
    if length(infos) == 2 do
      IO.puts("item with code B is not valid")
    end
  end

  defp validate(%{code: code}) do
    IO.puts("received unknown code #{code}")
  end
end

GenstageExample.Producer.start_link()
GenstageExample.Consumer.start_link([])
SocketListUpdater.start_link(GenstageExample.Producer)

So you do not have to start a validator for “B” or “Not B”, you just have a single validate/1 function that will check the code and do the appropriate validation. It is simpler.

Hey lud, thanks for the code, I think this will work with a few adjustments.

The only thing is that I can't write different validate functions, because I don't know which codes will be in the list (and even if I did, I wouldn't write 800 versions).

But with validate, I can get the code and the infos, and do what I need.

The final idea is to do something like

defp validate(%{code: code, infos: infos}) do
  IO.puts("Checking code #{code}...")

  # infos is a list of maps, so check each entry in turn
  for %{age: age, nickname: nickname} <- infos, age > 18 do
    IO.puts("#{nickname} is an adult")
  end
end

That's a silly demonstration, but the goal is to run specific commands according to the properties inside “infos”.

Thanks for your time and effort

Well, then you never actually needed to check the code?