GenStage - Trouble wiring things up

I’m trying to do what was described here: GenStage - Determining Completion

I have a producer that generates a finite stream of data
I have a consumer that writes this data to a database

This consumer will be a subscriber to a lot of other producers that generate the same type of content, so I don’t want to hard code the subscription in the consumer’s init.

defmodule NHVerify.Application do
  alias NHVerify.Producers.Hawaii
  alias NHVerify.Consumers.LicenseeDBWriter

  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      worker(Hawaii, [], name: :hawaii),
      worker(LicenseeDBWriter, [:hawaii]),
    ]
    opts = [strategy: :one_for_one, name: NHVerify.Supervisor]
    IO.puts inspect "Starting"
    Supervisor.start_link(children, opts)
  end
end


defmodule NHVerify.Producers.Hawaii do
  alias Experimental.GenStage

  def start_link() do
    GenStage.from_enumerable(get_licensees(), consumers: :permanent)
  end

  def get_licensees() do    
    # returns a stream
  end
end

defmodule NHVerify.Consumers.LicenseeDBWriter do
  alias Experimental.GenStage
  use GenStage

  def start_link(producer_name) do
    GenStage.start_link(__MODULE__, producer_name)
  end

  def init(producer_name) do
    {:consumer, :does_not_matter, subscribe_to: [:hawaii] }
  end

  def handle_events(licensees, _from, caller) do
    save_licensees licensees
    {:noreply, [], caller}
  end

  def handle_info({{_pid, _sub}, {:producer, :done}}, _) do
    {:stop, :normal, 0}
  end
end

My mix.exs has:

  def application do
    # elixir 1.4
    [extra_applications: [:logger],
     mod: {NHVerify.Application, []}]
  end

When I run: mix run --no-halt

I get:

15:02:51.081 [info] Application nh_verify exited: NHVerify.Application.start(:normal, []) returned an error: shutdown: failed to start child: NHVerify.Consumers.LicenseeDBWriter
** (EXIT) no process: the process is not alive or there’s no process currently associated with the given name, possibly because its application isn’t started
** (Mix) Could not start application nh_verify: NHVerify.Application.start(:normal, []) returned an error: shutdown: failed to start child: NHVerify.Consumers.LicenseeDBWriter
** (EXIT) no process: the process is not alive or there’s no process currently associated with the given name, possibly because its application isn’t started

If I don’t try and subscribe the LicenseeDBWriter, then things spin up just fine. The problem appears to be with the way I’m trying to reference the producer’s name. I assume I need to create my producer first, let it spin up, and then the consumer. How do I do that?

Note: I realize that, from what I’ve shown, GenStage is overkill. There’s more to it than this, but I’ve stripped everything else away to try and clarify the problem.
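
For anyone hitting the same "no process" exit: it is what a call raises when it targets a name that no live process is registered under. The sketch below reproduces that with a plain GenServer instead of GenStage. NamedStub, NameDemo and :hawaii_stub are hypothetical names used only for illustration, and the sketch assumes the name is registered by passing the :name option through to GenServer.start_link/3.

```elixir
defmodule NamedStub do
  use GenServer

  # opts is forwarded to GenServer.start_link/3, so name: registers the process.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok), do: {:ok, :no_state}

  @impl true
  def handle_call(:ping, _from, state), do: {:reply, :pong, state}
end

defmodule NameDemo do
  # Returns {result_before_registration, result_after_registration}.
  def run(name \\ :hawaii_stub) do
    before_registration =
      try do
        GenServer.call(name, :ping)
      catch
        # Calling a name nobody is registered under exits with a :noproc reason.
        :exit, {:noproc, _} -> :no_process
      end

    {:ok, pid} = NamedStub.start_link(name: name)
    after_registration = GenServer.call(name, :ping)
    GenServer.stop(pid)

    {before_registration, after_registration}
  end
end
```

NameDemo.run() returns {:no_process, :pong}: the first call fails because nothing holds the name yet, and the second succeeds once the process has been started with name: set.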


Ok, not done yet, but my previous suspicions were confirmed. Including here in case someone else has the same question.

Here’s what I have now:

defmodule NHVerify.Application do
  use Application

  def start(_type, _args) do
    {:ok, _pid} = NHVerify.Supervisor.start_link()
  end
end

defmodule NHVerify.Supervisor do
  alias NHVerify.Producers.{Alaska,Arizona,Hawaii,Minnesota,Oregon,Washington,WashingtonDC}
  alias NHVerify.Consumers.LicenseeDBWriter
  use Supervisor

  def start_link() do
    result = {:ok, sup} = Supervisor.start_link(__MODULE__, [])
    start_workers(sup)
    result
  end

  def start_workers(sup) do
    NHVerify.Repo.start_link([])

    [Alaska, Arizona, Hawaii, Minnesota, Oregon, Washington, WashingtonDC]
    |> Enum.each(&start_worker(&1, sup))
  end

  def init(_) do
    supervise [], strategy: :one_for_one
  end

  defp start_worker(worker_mod, sup) do
    {:ok, pid} = Supervisor.start_child(sup, worker(worker_mod, []))
    Supervisor.start_child(sup, worker(LicenseeDBWriter, [pid]))
  end
end

I have some other problems now. (The processes aren’t getting closed out cleanly at the end, but I’ll look into that later)


I dropped the supervisor strategy, in favor of going back to escript, because the supervisor kept trying to restart processes when they completed.

So now:

  1. How do I halt my escript when all of my work is completed? I had to put a Process.sleep(:infinity) at the end of my main/1 function to prevent everything from stopping immediately. (I had to use --no-halt with the previous mix-based setup.) I think (how can I tell?) that my consumers and producers are shut down, but that Process.sleep will keep things running anyway.

  2. I’m using GenStage.from_enumerable/1, which is great, except that it’s sending a :halted message instead of a :done message. I’m not sure that really matters, but it suggests I may be doing something wrong elsewhere.

Code:

defmodule NHVerify do
  alias Experimental.GenStage
  alias NHVerify.Consumers.LicenseeDBWriter
  alias NHVerify.Producers.{Alaska, Arizona, Hawaii, Minnesota, Oregon, Washington, WashingtonDC}

  def main(_args) do
    NHVerify.Repo.start_link([])
    [Alaska, Arizona, Hawaii, Minnesota, Oregon, Washington, WashingtonDC]
    |> Enum.each(&scrape/1)
    Process.sleep(:infinity)
  end

  defp scrape(mod) do
    {:ok, producer_pid} = GenStage.from_enumerable(apply(mod, :get_licensees, []))
    {:ok, consumer_pid} = GenStage.start_link(LicenseeDBWriter, self())
    GenStage.sync_subscribe(consumer_pid, to: producer_pid, min_demand: 0, max_demand: 1)
  end
end

You can set restart: :transient on the supervisor’s children. Then, if a child exits with reason :normal, it won’t be restarted.
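
A minimal sketch of the restart: :transient behavior, using only the standard library. The child here is a plain Task standing in for a stage with a finite amount of work; TransientDemo and :finite_job are hypothetical names, and the map-style child spec assumes a reasonably recent Elixir rather than the Supervisor.Spec helpers used earlier in the thread.

```elixir
defmodule TransientDemo do
  # Starts a supervisor with one transient child, lets the child finish
  # normally, and returns Supervisor.count_children/1 afterwards.
  def run() do
    # The task blocks until told to finish, standing in for finite work.
    job = fn ->
      receive do
        :finish -> :ok
      end
    end

    child = %{
      id: :finite_job,
      start: {Task, :start_link, [job]},
      # :transient means: restart only after an abnormal exit.
      restart: :transient
    }

    {:ok, sup} = Supervisor.start_link([child], strategy: :one_for_one)
    [{:finite_job, pid, :worker, _modules}] = Supervisor.which_children(sup)

    ref = Process.monitor(pid)
    send(pid, :finish)

    receive do
      {:DOWN, ^ref, :process, ^pid, :normal} -> :ok
    end

    # Give the supervisor a moment to handle the exit before inspecting it.
    Process.sleep(50)
    counts = Supervisor.count_children(sup)
    Supervisor.stop(sup)
    counts
  end
end
```

TransientDemo.run().active is 0, because the child exited normally and was left alone. With restart: :permanent the supervisor would start the same child again.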

“I think (how can I tell?) that my consumers and producers are shut down”

When the producer is done, it sends a notification with a :done or :halted value. You shouldn’t worry about which one you get; it is mostly an enumerable detail.
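
A sketch of both ideas, treating :done and :halted alike and telling when the processes have actually shut down. It uses plain GenServer stand-ins rather than real GenStage consumers, and the message shape is copied from the handle_info clause in the consumer posted earlier, so treat it as an assumption about what arrives. CompletionWatcher and WaitForExit are hypothetical names.

```elixir
defmodule CompletionWatcher do
  use GenServer

  # Unlinked start, so the caller only observes the exit through a monitor.
  def start(), do: GenServer.start(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:ok, :no_state}

  # Stop normally on either notification value.
  @impl true
  def handle_info({{_pid, _sub}, {:producer, status}}, state)
      when status in [:done, :halted] do
    {:stop, :normal, state}
  end

  # Ignore anything else instead of crashing on an unmatched message.
  def handle_info(_other, state), do: {:noreply, state}
end

defmodule WaitForExit do
  # Blocks until every pid has exited; returns :ok.
  # Monitoring a pid that is already dead delivers :DOWN immediately,
  # so there is no race with processes that finish early.
  def await_all(pids) do
    pids
    |> Enum.map(&Process.monitor/1)
    |> Enum.each(fn ref ->
      receive do
        {:DOWN, ^ref, :process, _pid, _reason} -> :ok
      end
    end)
  end
end
```

Something along these lines could replace Process.sleep(:infinity) at the end of main: collect the consumer pids, then call WaitForExit.await_all/1 on them.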

I would honestly look into Flow for solving your current problem. It currently ships with GenStage. You can just write the producers using GenStage; the processing is then quite straightforward, and Flow takes care of things such as guaranteeing proper termination.


Thank you Jose. I’ll do that

Ok, here’s what I came up with. It seems to work. ¯_(ツ)_/¯

defmodule NHVerify do
  require Logger

  alias Experimental.Flow
  alias NHVerify.Consumers.LicenseeDBWriter
  alias NHVerify.Producers.{Alaska, Arizona, Hawaii, Minnesota, Oregon, Washington, WashingtonDC}

  def main(_args) do
    NHVerify.Repo.start_link([])
    [Alaska, Arizona, Hawaii, Minnesota, Oregon, Washington, WashingtonDC]
    |> Enum.map(&apply(&1, :get_licensees, []))
    |> Flow.from_enumerables
    |> LicenseeDBWriter.save_licensees
  end
end

defmodule NHVerify.Producers.Hawaii do
  # Logger.info is a macro, so each module that calls it must require Logger.
  require Logger

  def get_licensees() do
    fn acc, fun -> do_get_licensees(acc, fun) end
  end

  def do_get_licensees(acc, fun) do
    Logger.info "Hawaii starting"
    HTTPoison.get!(@csv_url, [], recv_timeout: :infinity).body
    |> String.split("\n")
    |> List.delete_at(-1)
    |> CSV.decode(headers: true)
    |> Stream.map(&get_resource/1)
    |> Enumerable.reduce(acc, fun)
  end
end

I just included 1 scraper, because they all follow the same pattern.

My expectation is that all of the scrapers will run simultaneously (it appears they do), with their output merged and sent to the DBWriter. It took an embarrassingly long time to realize that all I needed to do was change Enum.reduce to Enumerable.reduce.
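
For reference, a stripped-down sketch of the same pattern with no HTTP or CSV involved. A two-argument function is itself an enumerable, and it is called with a tagged accumulator such as {:cont, acc} plus a reducer that returns tagged tuples. Enumerable.reduce/3 speaks that protocol, while Enum.reduce/3 expects a bare accumulator, which is why the swap matters. LazySource and its numbers are made up for illustration.

```elixir
defmodule LazySource do
  # Returns a 2-arity function; nothing is computed until it is enumerated.
  def numbers() do
    fn acc, fun -> do_numbers(acc, fun) end
  end

  defp do_numbers(acc, fun) do
    1..5
    |> Stream.map(&(&1 * 10))
    # Delegate to the protocol so :cont / :halt / :suspend are honored.
    |> Enumerable.reduce(acc, fun)
  end
end

Enum.to_list(LazySource.numbers())
# → [10, 20, 30, 40, 50]

Enum.take(LazySource.numbers(), 2)
# → [10, 20]
```

The Enum.take/2 call shows the early-halt case: the consumer stops after two elements and the source stops producing.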