dingosky

GenStage docs QueueBroadcaster example code question

The (excellent) GenStage documentation includes a QueueBroadcaster module as an example for “a more robust implementation” with “tighter control over the events and demand by tracking … data locally”. I’ve included the example code in its entirety below for reference. My question concerns the inclusion of from in the queued data.

Following GenServer best practices, QueueBroadcaster provides sync_notify/2 as the public API for placing an event into this GenStage producer. That function simply does a GenStage.call/3, which handle_call/3 handles by enqueuing the tuple {from, event} in the internal queue held in state and calling dispatch_events/3 with the current pending_demand from state. The return value of handle_call/3 is whatever dispatch_events/3 returns. When there is positive demand, dispatch_events/3 dequeues {from, event} tuples from the queue and, for each tuple, calls GenStage.reply(from, :ok) and adds the event to the list delivered in the returned :noreply tuple.

All is well so far, but here’s the question. Why enqueue the value from and make the GenStage.reply(from, :ok) call at all? The reasons this seems superfluous are:

  • Since the public API function sync_notify/2 properly encapsulates the GenStage.call/3 for :notify messaging, any call from sync_notify/2 will have from == self(). So GenStage.reply(from, :ok) will send a message to itself, and since QueueBroadcaster does not provide a handle_info/2 function to process that message, the process effectively sends itself a message that it ignores.

  • Even if there were a handle_info/2 function in QueueBroadcaster, sending a simple :ok without any indication of the event that was handled would be of very limited value.

I would suggest the example code could be simplified by dispensing with tracking from in state (via the queue) and removing the ignored GenStage.reply/2 call altogether. There would be no change in functionality, and those learning GenStage would not need to puzzle over the question: Why is from being kept in state?

Am I missing something?

defmodule QueueBroadcaster do
  use GenStage

  @doc "Starts the broadcaster."
  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @doc "Sends an event and returns only after the event is dispatched."
  def sync_notify(event, timeout \\ 5000) do
    GenStage.call(__MODULE__, {:notify, event}, timeout)
  end

  ## Callbacks

  def init(:ok) do
    {:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_call({:notify, event}, from, {queue, pending_demand}) do
    queue = :queue.in({from, event}, queue)
    dispatch_events(queue, pending_demand, [])
  end

  def handle_demand(incoming_demand, {queue, pending_demand}) do
    dispatch_events(queue, incoming_demand + pending_demand, [])
  end

  defp dispatch_events(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch_events(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, {from, event}}, queue} ->
        GenStage.reply(from, :ok)
        dispatch_events(queue, demand - 1, [event | events])
      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end

dingosky

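Okay, pardon the noise. I see now why GenStage.reply(from, :ok) is necessary. The synchronous GenStage.call/3 needs a corresponding GenStage.reply/2 to complete the call; otherwise the caller times out. The from value identifies the process that called sync_notify/2, not the producer itself, and because handle_call/3 returns a :noreply tuple, that caller stays blocked until the reply is sent.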
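For anyone who wants to see the deferred reply in isolation, here is a minimal sketch that uses a plain GenServer instead of GenStage, so it runs without extra dependencies. The DeferredReply module and its demand/1 function are made up for illustration (they are not from the GenStage docs); demand/1 stands in for a consumer asking for one event. GenServer.reply/2 plays the same role here as GenStage.reply/2 in QueueBroadcaster.

```elixir
defmodule DeferredReply do
  use GenServer

  # Hypothetical stand-in for QueueBroadcaster, built on GenServer only.
  def start_link(), do: GenServer.start_link(__MODULE__, :ok)

  # Blocks the caller until the server replies via GenServer.reply/2.
  def sync_notify(server, event, timeout \\ 5000) do
    GenServer.call(server, {:notify, event}, timeout)
  end

  # Simulates a consumer asking for one event.
  def demand(server), do: GenServer.cast(server, :demand)

  @impl true
  def init(:ok), do: {:ok, {:queue.new(), 0}}

  @impl true
  def handle_call({:notify, event}, from, {queue, pending}) do
    # Returning :noreply leaves the caller waiting; `from` is stored so
    # the reply can be sent later, once there is demand.
    {:noreply, drain(:queue.in({from, event}, queue), pending)}
  end

  @impl true
  def handle_cast(:demand, {queue, pending}) do
    {:noreply, drain(queue, pending + 1)}
  end

  # Answers queued callers while demand remains.
  defp drain(queue, 0), do: {queue, 0}

  defp drain(queue, pending) do
    case :queue.out(queue) do
      {{:value, {from, _event}}, queue} ->
        # This is what unblocks the process sitting in sync_notify/3.
        GenServer.reply(from, :ok)
        drain(queue, pending - 1)

      {:empty, queue} ->
        {queue, pending}
    end
  end
end

{:ok, server} = DeferredReply.start_link()
task = Task.async(fn -> DeferredReply.sync_notify(server, :hello) end)

# No demand yet, so the call has not returned.
nil = Task.yield(task, 100)

# Once demand arrives, the stored `from` is answered and the call completes.
DeferredReply.demand(server)
:ok = Task.await(task)
```

Without the GenServer.reply/2 call, the task would stay blocked until the call timeout expired, which is the timeout I ran into.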
