How to have a GenStage producer emit intermediate results?

This may just be my misunderstanding of appropriate design, but is there a way for a producer to yield intermediate results when it has far fewer items than were demanded? If producing the results is expensive, or you want to limit the number of items in flight at any given time, what is the best way to do this?

I saw in the docs that the consumer can set max_demand and min_demand, but is there a way for the producer to specify this? Or for the producer to give back intermediate results while still fulfilling up to the requested demand?

Some example code below.

```elixir
# The original post used `alias Experimental.GenStage`, which only applied to
# early gen_stage releases; current releases expose the module as GenStage.

defmodule GenStageEx do
  # ----------------------
  # Producer -------------
  # ----------------------
  defmodule Producer do
    use GenStage

    def init(_ok) do
      {:producer, []}
    end

    def handle_demand(demand, leftovers) do
      IO.puts("# producer was asked for #{demand}")

      items =
        case leftovers do
          [] -> request_more()
          _ -> leftovers
        end

      # Only the first `demand` items are returned; any demand beyond what
      # `items` holds is not recorded in the state.
      {to_return, next_leftovers} = Enum.split(items, demand)
      {:noreply, to_return, next_leftovers}
    end

    def request_more() do
      # some expensive operation that only returns 10 items at a time
      0..9
    end
  end

  # ----------------------
  # Consumer -------------
  # ----------------------
  defmodule Consumer do
    use GenStage

    def init(_ok) do
      {:consumer, %{}}
    end

    def handle_events(events, _from, state) do
      # Print each event; consumers never emit events, hence the empty list.
      Enum.each(events, &IO.puts/1)

      {:noreply, [], state}
    end
  end

  def run_me do
    {:ok, p} = GenStage.start_link(GenStageEx.Producer, :ok)
    {:ok, c} = GenStage.start_link(GenStageEx.Consumer, :ok)

    GenStage.sync_subscribe(c, to: p)

    # Give the pipeline time to run before returning.
    Process.sleep(2000)
  end
end
```

[Please ignore this answer]
@dangets I could not fully understand your question.

I saw in the docs that the consumer can set max_demand and min_demand, but is there a way for the producer to specify this? Or for the producer to give back intermediate results while still fulfilling up to the requested demand?

From what I understand, there are two questions:

  1. Is it possible for the producer to specify max_demand and min_demand?
  • Yes, we can do that. Each time the consumer requests events, the producer can check whether it has at least min_demand events and only then send events to the consumer. But generally I think min and max demand should be configured by the consumer, since the consumer knows how many events it can handle at a particular moment.
  2. Can the producer give back intermediate results while still fulfilling up to the requested demand? I could not understand the problem in this part.
  • Do you mean that the producer pushes each event to the consumer as soon as it is generated? If so, from what I have learned the answer is no, since in GenStage requests for events always start from the consumer. But I think we can simulate that by having the consumer request events from the producer so often that it gets each event as soon as it is generated. I am actually writing this part in my project, so I might be able to show that code tomorrow.

@blisscs First, thanks for the help.

What I meant for #2 was that the producer internally can only generate a few events per "batch", say 10. If the requested demand was 1000, I'd like to be able to loop in the producer code to provide results in increments of 10 up to the originally requested demand. Of course I could do the looping and build up a large list of results before returning anything, but this may take a while, and that is time the consumer could spend processing the beginning of the list.

I was thinking that if the producer could set the max demand to something closer to what it produces internally, then the consumer could be agnostic to the producer's implementation. Does that make sense?
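The batch-by-batch behaviour described in #2 can be sketched as a small state transition that a producer runs once per internal batch. This is a minimal, GenStage-free sketch under assumptions: `BatchLoop`, `next_batch/2`, the `%{pending: ..., leftovers: ...}` state and the `:next_batch` message are hypothetical names, and `fetch` stands in for the expensive `request_more/0`. Each step emits at most the demand still owed and reports whether another batch is needed; the commented wiring at the end shows one way a producer could drive it from `handle_info/2`, so each batch is dispatched as soon as it exists.

```elixir
defmodule BatchLoop do
  @moduledoc """
  Hypothetical helper for a producer whose source only yields a small
  batch (say 10 items) per expensive call.
  """

  @doc """
  Runs one batch step.

  `state` is `%{pending: demand_still_owed, leftovers: items_not_yet_sent}`;
  `fetch` is a zero-arity function standing in for the expensive call.
  Returns `{events_to_emit_now, new_state, :continue | :idle}`.
  """
  def next_batch(%{pending: 0} = state, _fetch), do: {[], state, :idle}

  def next_batch(%{pending: pending, leftovers: leftovers} = state, fetch) do
    # Only pay for a new fetch when nothing is left over from the last one.
    items = if leftovers == [], do: Enum.to_list(fetch.()), else: leftovers
    {to_emit, rest} = Enum.split(items, pending)
    pending = pending - length(to_emit)

    # Keep looping while demand is still owed, but stop if the source
    # returned nothing, so an exhausted source does not loop forever.
    next = if pending > 0 and to_emit != [], do: :continue, else: :idle
    {to_emit, %{state | pending: pending, leftovers: rest}, next}
  end
end

# Possible wiring inside a GenStage producer (needs the gen_stage package,
# so it is shown as comments only):
#
#   def init(:ok), do: {:producer, %{pending: 0, leftovers: []}}
#
#   def handle_demand(demand, state) do
#     # If a loop is already running this starts a second, interleaved one;
#     # a boolean flag in the state could prevent that.
#     send(self(), :next_batch)
#     {:noreply, [], %{state | pending: state.pending + demand}}
#   end
#
#   def handle_info(:next_batch, state) do
#     {events, state, next} = BatchLoop.next_batch(state, &request_more/0)
#     if next == :continue, do: send(self(), :next_batch)
#     {:noreply, events, state}
#   end
```

Starting from `%{pending: 25, leftovers: []}` with a fetch that returns `0..9`, three steps emit 10, 10 and then 5 items, leaving `[5, 6, 7, 8, 9]` buffered for the next demand. The consumer keeps its own max_demand; the producer simply never builds more than one batch ahead.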

[Please ignore this.]
@dangets I don’t think you need to be concerned about that, since requests for events are initiated by the consumer.

The producer’s duty is just to create events. When events are created, the producer just keeps updating its own state.

When the consumer requests events, there are three possibilities:

  1. There are no events (the producer hasn’t produced anything for the consumer yet).
  2. There are events, and their number is within what the consumer can receive (the consumer consumes all of them).
  3. There are events, but too many for the consumer to receive (the consumer consumes only as many as it can, and the producer updates its state by removing the consumed events).

And then the process loops.

I think my understanding is correct.
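The three cases above can be written as one function. This is a minimal sketch; `DemandCases.serve/2` and the case atoms are hypothetical names, not gen_stage API. Besides the events to emit and the events still buffered, it also returns the demand that could not be met, since in cases 1 and 2 the consumer asked for more than the producer had.

```elixir
defmodule DemandCases do
  @moduledoc """
  Hypothetical illustration of the three cases a producer meets when a
  consumer asks for `demand` events and `buffered` events are available.
  Every clause returns `{case, events_to_emit, still_buffered, unmet_demand}`.
  """

  # 1. Nothing produced yet: emit nothing; the whole demand is unmet.
  def serve([], demand), do: {:no_events, [], [], demand}

  # 2. Everything fits: emit all buffered events; part of the demand may be unmet.
  def serve(buffered, demand) when length(buffered) <= demand do
    {:all_fit, buffered, [], demand - length(buffered)}
  end

  # 3. Too many: emit exactly `demand` events and keep the rest buffered.
  def serve(buffered, demand) do
    {now, later} = Enum.split(buffered, demand)
    {:too_many, now, later, 0}
  end
end
```

For example, `DemandCases.serve([:a, :b, :c], 2)` returns `{:too_many, [:a, :b], [:c], 0}`.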

The problem that I’m seeing is that the producer gets asked for 1000 items, returns the first 10 items, but then its handle_demand doesn’t ever get called again. I’m assuming that this is because the min_demand threshold hasn’t been hit.

From the docs, there is a section on "buffering demand":

In case consumers send demand and the producer is not yet ready to fill in the demand, producers must buffer the demand until data arrives.

So the producer has to keep that unmet demand in its state, because the consumer will not send new demand until its minimum threshold is reached.
Please ignore the part of my earlier answer where I said things go into a loop. Today I relearned the ideas behind gen_stage.

The producer MUST yield intermediate results for higher demands. The idea is that, if you are asked for more items than you have, you just store that pending demand. The next time items are available (because an external source pinged you, because someone sent them to you, because you reached a certain time, etc.), you then emit the new items if pending_demand > 0.

Here is an example of a producer that handles both the case where it has more items to send than were requested and the case where it was asked for more items than it has to send: gen_stage/lib/gen_stage.ex (main branch of elixir-lang/gen_stage on GitHub). It uses a queue for the first case and a pending_demand integer for the second.
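A minimal, GenStage-free sketch of the queue plus pending_demand idea described above. `PendingDemandBuffer`, `add_demand/2` and `add_items/2` are hypothetical names, not part of gen_stage. In a real producer, `handle_demand/2` would call `add_demand/2`, and whichever callback receives new data (for example `handle_info/2`) would call `add_items/2`, returning the emitted events in its `{:noreply, events, state}` tuple. Unmet demand is summed into the integer, and surplus items wait in the queue.

```elixir
defmodule PendingDemandBuffer do
  @moduledoc """
  Hypothetical, GenStage-free sketch of a producer state made of a queue of
  items that have not been sent yet plus an integer of pending (unmet) demand.
  Both public functions return `{events_to_emit_now, new_state}`.
  """

  def new, do: {:queue.new(), 0}

  # New demand from the consumer: add it to whatever is still owed.
  def add_demand(incoming, {queue, pending}) when incoming >= 0 do
    dispatch(queue, pending + incoming, [])
  end

  # New items from the expensive source: queue them, then serve pending demand.
  def add_items(items, {queue, pending}) do
    queue = Enum.reduce(items, queue, &:queue.in/2)
    dispatch(queue, pending, [])
  end

  # Emit while there is both demand and data. Leftover items stay queued and
  # leftover demand stays in the integer, ready for the next call.
  defp dispatch(queue, 0, acc), do: {Enum.reverse(acc), {queue, 0}}

  defp dispatch(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, item}, queue} -> dispatch(queue, demand - 1, [item | acc])
      {:empty, queue} -> {Enum.reverse(acc), {queue, demand}}
    end
  end
end
```

Applied to the original question: asking for 1000 and then receiving a batch of 10 emits those 10 immediately and leaves a pending demand of 990, so each later batch can be emitted as soon as it is produced, without waiting for the consumer to ask again.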


I’m still not sure how to handle cases where the producer has fewer items than are requested. If I understand this correctly, I shouldn’t reply from the producer until I have enough items to satisfy the demand, correct?

I have some time-bound events where I need to expire items every day. Say today there are 21 items to expire and my max_demand is 20: my consumer stops asking for more events, so item 21 is not expired until the following day. Is this how it’s supposed to work, and is there a way to prompt my consumer to ask for more?