My GenStage producer runs out of work to do, I buffer demand, but how do I know there is new work?

I’m writing a GenStage producer which is designed to scrape a bunch of RSS feed urls and return a stream of all Articles. The set of urls it might scrape is fairly large, so it is given a UrlRepository module that it calls pop_url on to get the next url to parse. The UrlRepository (not coded yet) will presumably give it the least recently parsed url.

My current implementation of handle_demand is that I keep popping URLs and parsing them until I get enough articles to meet demand, and then I return them. If I got more articles than the demand, I pop them in a buffer in the state, and return the demanded amount. That is working very nicely.

However, what if I’ve parsed all the urls recently, and UrlRepository decides it doesn’t want to give me urls again (perhaps there’s a minimum scrape interval of 1 hour or so for each url)? Eventually pop_url returns nil, and handle_demand is left without enough articles to fill demand.

The documentation says that I need to take care of buffering the demand myself. So, I create a demand integer in my state, and if handle_demand has to return fewer articles than were demanded, I add the difference to my demand buffer. Next time handle_demand is called, I can potentially return more and fulfill that extra demand.
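The bookkeeping described above can be sketched as a pure function, independent of GenStage. This is only a minimal sketch, not the poster's actual code: the module name `DemandLedger`, the function `dispatch/3`, and the `{buffer, pending_demand}` state shape are all hypothetical.

```elixir
defmodule DemandLedger do
  # Hypothetical helper, not part of GenStage. State is
  # {buffer, pending_demand}: articles parsed but not yet emitted, and
  # demand received but not yet served.

  # Adds newly parsed articles and newly received demand to the state, then
  # emits as many articles as the total outstanding demand allows.
  def dispatch({buffer, pending_demand}, new_articles, new_demand) do
    available = buffer ++ new_articles
    total_demand = pending_demand + new_demand
    {to_emit, rest} = Enum.split(available, total_demand)
    {to_emit, {rest, total_demand - length(to_emit)}}
  end
end

# Three articles against a demand of five: emit all three, still owe two.
{events, state} = DemandLedger.dispatch({[], 0}, [:a, :b, :c], 5)
# events == [:a, :b, :c], state == {[], 2}
```

In a producer, the first element would go into the `{:noreply, events, state}` return value and the second would become the new state.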

However, of course handle_demand will stop getting called if I stop fulfilling it. It’s up to me to announce when that demand can be fulfilled. I see that in libraries such as Snowy and Twittex, where an external service such as a TCP library finally provides some data, the GenStage producer receives a handle_info call, and because there is a positive number in the demand buffer, it immediately emits some events.

However, because I stopped getting urls, I stopped trying to parse. I need to know when to start again in order to start fulfilling that buffered demand. My pop_url function returned nil, so I stopped, but it will presumably have urls later if I check again.

Ideas I have for solutions.

  1. When my pop_url function returns nil, set a callback timer to check again in 20 minutes. Respond to that with handle_info to get things moving again.
  2. Turn my url provider into a GenStage producer, so I can indicate that I’m demanding urls, and just wait until there are some.

You can always set a “cronjob” for running your functions again. I recommend Quantum.

The thing about Twittex and Snowy is that they expect data from external sources. That’s not your case simply because most of these RSS clients do not have a “subscribe” option that will send you data towards a specific endpoint. So, since you are the one who’s querying, the Cronjob seems a good option for that.

I believe there is some confusion here. If the consumer asks for 100 items, you don’t need to wait for 100 items in order to reply to the consumer. Instead, you should reply to it with whatever you have available. If you do not have enough items, then you should store how much you owe the consumer and send more items as soon as you can.

It is likely that you are doing what you described above, I just wanted to clarify. :slight_smile:

Just to clarify as well: there is no such thing as announcing demand can be fulfilled. Consumers will always ask for more items if they are ready to process more items. Then the producer should send what it can, when it can.

Oops, premature send.

In any case, the fixes you propose at the end seem to be correct. If you cannot fulfill the demand, you should either have the repository tell you when it has more items available OR you should have a timer that forces you to ask the repository again in 1 minute or so. Check Process.send_after/3 for the latter.
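A rough sketch of the timer option, written as a plain GenServer so it runs without the gen_stage dependency. Everything named here is an assumption for illustration: the `RepollSketch` module, the zero-arity `fetch_fun` (standing in for something like pop_url, returning nil when nothing is available), and the state map. In an actual GenStage producer the same handle_info clause would return `{:noreply, events, state}` so the fetched items are emitted rather than stored.

```elixir
defmodule RepollSketch do
  use GenServer

  # Hypothetical stand-in for the producer. `fetch_fun` is an assumed
  # zero-arity function returning an item, or nil when the source has
  # nothing to hand out right now.
  def start_link(fetch_fun, demand, retry_ms) do
    GenServer.start_link(__MODULE__, {fetch_fun, demand, retry_ms})
  end

  # Returns the items fetched so far, oldest first.
  def fetched(pid), do: GenServer.call(pid, :fetched)

  @impl true
  def init({fetch_fun, demand, retry_ms}) do
    send(self(), :poll)
    {:ok, %{fetch: fetch_fun, pending_demand: demand, retry_ms: retry_ms, items: []}}
  end

  @impl true
  def handle_call(:fetched, _from, state) do
    {:reply, Enum.reverse(state.items), state}
  end

  @impl true
  # No outstanding demand: nothing to do, and no timer is scheduled.
  def handle_info(:poll, %{pending_demand: 0} = state), do: {:noreply, state}

  def handle_info(:poll, state) do
    case state.fetch.() do
      nil ->
        # Source is empty: ask again later instead of stopping for good.
        Process.send_after(self(), :poll, state.retry_ms)
        {:noreply, state}

      item ->
        # Got an item: record it, reduce the demand, and poll again at once.
        send(self(), :poll)
        {:noreply, %{state | items: [item | state.items], pending_demand: state.pending_demand - 1}}
    end
  end
end
```

Because a new timer is only scheduled from the `nil` branch, and that branch is only reached from a `:poll` message, at most one retry timer is outstanding at any time in this sketch.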

So I have been struggling with this same issue. My producer pulls events off of one or more Kafka topics and sends them on to a consumer. This works great as long as there is more supply than demand. I’m using the Kafka fetch API to pull events off the queue. If there are no events, or not as many as demanded by the consumer, the processing stops due to unmet demand. I was trying to find a good example of how to start a polling task to periodically consume from the Kafka topic. Not much out there, but with a little help from the Slack channel I came up with the following example. It uses an Erlang queue to mimic the Kafka topic and Process.send_after to periodically pull from the queue.

defmodule QueueTest do
  # use GenStage

  def start(max_demand \\ 10, poll_interval \\ 10000) do
    string = "I am the very model of a modern Major-General, I've information vegetable, animal, and mineral, I know the kings of England, and I quote the fights historical From Marathon to Waterloo, in order categorical"
    {:ok, message_queue} = GenServer.start_link(QueueTest.MessageQueue, string)
    {:ok, producer} = GenStage.start_link(QueueTest.Producer, {message_queue, poll_interval})
    {:ok, consumer} = GenStage.start_link(QueueTest.Consumer, :ok)
    GenStage.sync_subscribe(consumer, [to: producer, max_demand: max_demand])
    message_queue
  end

  def push(message_queue, string), do: GenServer.cast(message_queue, {:push, string})
end

defmodule QueueTest.MessageQueue do
  use GenServer

  def init(string) do
    {:ok, :queue.from_list(String.split(string, " "))}
  end

  def handle_cast({:push, string}, queue) do
    queue = String.split(string) |> Enum.reduce(queue, fn(word, queue) -> :queue.in(word, queue) end)
    {:noreply, queue}
  end

  def handle_call({:fetch, items}, _from, queue) do
    dequeue(queue, items, [])
  end

  defp dequeue(queue, 0, words) do
    {:reply, Enum.reverse(words), queue}
  end

  defp dequeue(queue, demand, words) do
    case :queue.out(queue) do
      {{:value, word}, queue} ->
        dequeue(queue, demand - 1, [word | words])
      {:empty, queue} ->
        {:reply, Enum.reverse(words), queue}
    end
  end
end

defmodule QueueTest.Producer do
  use GenStage

  def init({queue, poll_interval}) do
    {:producer, {queue, 0, poll_interval}}
  end

  def handle_demand(demand, {queue, pending_demand, poll_interval}) do
    do_handle_demand(queue, demand+pending_demand, poll_interval)
  end

  def handle_info(:fetch_more, {queue, pending_demand, poll_interval}) do
    do_handle_demand(queue, pending_demand, poll_interval)
  end

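  # Fetch up to `demand` words, emit whatever came back, and carry the
  # shortfall forward as pending demand. If the queue came up short,
  # schedule a poll so the remaining demand is eventually served.
  defp do_handle_demand(queue, demand, poll_interval) do
    words = GenServer.call(queue, {:fetch, demand})
    fetched = length(words)
    if fetched < demand do
      poll_on_insufficient_supply(poll_interval)
    end
    {:noreply, words, {queue, demand - fetched, poll_interval}}
  end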

  defp poll_on_insufficient_supply(poll_interval) do
    Process.send_after(self(), :fetch_more, poll_interval)
  end
end

defmodule QueueTest.Consumer do
  use GenStage

  def init(:ok) do
    {:consumer, :ok}
  end

  def handle_events(words, _from, state) do
    words |> Enum.join(" ") |> IO.write
    IO.puts ""
    {:noreply, [], state}
  end

  def handle_subscribe(:producer, _opts, from, _state) do
    IO.puts("subscribe from #{inspect from}")
    {:automatic, from}
  end
end

From @josevalim’s response on the demand question

My ideal case would be if my min_demand is 10 and the producer has 7, I should be able to process the 7 and have the consumer(s) request for more after its done.

That’s up to the producer to implement. However, most producers will send whatever they have available, so it should work as you described. And then, just after 10 items are consumed, the consumer will ask for more.

To make it clear: GenStage was designed so the producer can send whatever it has. The producer just needs to implement this logic accordingly (and most do).

By that are you referring to the example here (gen_stage/lib/gen_stage.ex at main · elixir-lang/gen_stage · GitHub)?

My issue is still that once we send fewer events than the consumer requested, the consumer stops sending demand (at least it seems like it). And using this example, does that mean that if I’m buffering pending demand and it takes a while (hours) before more data arrives, I will not process what I already have?

I will post a simple example to illustrate the issue if I still can’t solve it with the linked GenStage example.

You dispatch the events that you have available (and that do not exceed the demand) with the {:noreply, [event], state} tuple from handle_demand. Those events are processed immediately. But you still have to deliver the “pending demand” and as long as that pending demand has not been fulfilled, you cannot expect any further handle_demand calls.

But let’s say there is a message that lets you know that more data for events has become available. You can process that message with handle_cast (for example). handle_cast (and other GenStage callbacks) also lets you return events with {:noreply, [event], state} to fulfill that pending demand.

Now if you happen to process a handle_demand while you still have pending demand, you have to accumulate it, i.e.

new_pending_demand = demand + pending_demand
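
Putting those two points together, here is a small sketch of the callback shapes involved. It is a plain module so it can be run without gen_stage installed; a real producer would add `use GenStage` and an init/1 returning `{:producer, state}`. The module name `PendingDemandSketch`, the integer state, and the `{:new_data, items}` message are assumptions made up for this illustration.

```elixir
defmodule PendingDemandSketch do
  # Mirrors the return shape of GenStage producer callbacks:
  # {:noreply, events, new_state}. State is the pending demand (an integer).

  # Nothing is available when demand arrives: emit no events and add the
  # new demand to whatever is still owed.
  def handle_demand(demand, pending_demand) do
    {:noreply, [], demand + pending_demand}
  end

  # Data arrived later (hypothetical message): emit up to the pending
  # demand and reduce it by the number of events emitted. Surplus items are
  # discarded here to keep the sketch short; a real producer would buffer them.
  def handle_cast({:new_data, items}, pending_demand) do
    {events, _surplus} = Enum.split(items, pending_demand)
    {:noreply, events, pending_demand - length(events)}
  end
end
```

With this shape, two handle_demand calls of 3 and 2 leave a pending demand of 5, and a later cast carrying two items emits both and leaves 3 still owed.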

This is probably the most clarifying answer. The key here is that handle_cast and other GenStage callbacks allow you to fulfill pending demand. That is/was not clear to most people, including me, even though it is in the docs.
