Thousands of "Elixir.StringIO:init/1" processes

I’m writing a service that processes raw data from a RabbitMQ queue and puts the results in a database. After a few days of running, the RabbitMQ connection starts dropping frames, and right before that happens I get a few “Too many processes” error messages (shown below).

2020-03-02 21:13:52.476: 15:13:52.414 [error] Too many processes
2020-03-02 21:13:52.476: 
2020-03-02 21:13:52.476: 
2020-03-02 21:13:52.476: 15:13:52.414 [error] Task #PID<0.1794.0> started from Rti.Supervisor terminating
2020-03-02 21:13:52.476: ** (SystemLimitError) a system limit has been reached
2020-03-02 21:13:52.476:     :erlang.spawn_link(:proc_lib, :init_p, [#PID<0.1794.0>, [Rti.Supervisor, #PID<0.1657.0>], Task.Supervised, :noreply, [{:rti@JacksVSMIS1, #PID<0.1794.0>, #PID<0.1794.0>}, [#PID<0.1794.0>, #PID<0.1658.0>], {:erlang, :apply, [#Function<3.55826808/0 in ParallelStream.Workers.build!/3>, []]}]])
2020-03-02 21:13:52.476:     (stdlib) proc_lib.erl:102: :proc_lib.spawn_link/3
2020-03-02 21:13:52.476:     (elixir) lib/task/supervised.ex:10: Task.Supervised.start_link/3
2020-03-02 21:13:52.476:     (parallel_stream) lib/parallel_stream/workers.ex:32: anonymous fn/4 in ParallelStream.Workers.build!/3
2020-03-02 21:13:52.476:     (elixir) lib/enum.ex:1340: anonymous fn/3 in Enum.map/2
2020-03-02 21:13:52.476:     (elixir) lib/enum.ex:1953: Enum.map/2
2020-03-02 21:13:52.476:     (parallel_stream) lib/parallel_stream/workers.ex:31: ParallelStream.Workers.build!/3
2020-03-02 21:13:52.476:     (parallel_stream) lib/parallel_stream/producer.ex:23: anonymous fn/3 in ParallelStream.Producer.build!/4
2020-03-02 21:13:52.476: Function: #Function<13.30540616/0 in Rti.ImportRouter.start_link/1>
2020-03-02 21:13:52.476:     Args: []
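For context, “Too many processes” and the SystemLimitError above mean the VM hit its process limit. A minimal sketch for watching how close a node is to that limit (the module name ProcessHeadroom is hypothetical) reads both numbers through :erlang.system_info/1:

```elixir
defmodule ProcessHeadroom do
  # Hypothetical helper: compares the number of live processes with the
  # VM-wide limit whose breach raises SystemLimitError on spawn.
  def report do
    count = :erlang.system_info(:process_count)
    limit = :erlang.system_info(:process_limit)

    # `/` always returns a float, so Float.round/2 is safe here.
    %{count: count, limit: limit, used_pct: Float.round(count * 100 / limit, 2)}
  end
end

IO.inspect(ProcessHeadroom.report())
```

Logging this periodically would show a steady climb if processes are leaking. Raising the limit with the +P emulator flag would only postpone the crash in that case.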

I’m new to using Observer, but when I start it up and run the service, I see thousands of processes named “Elixir.StringIO:init/1” with a current function of “gen_server:loop/7”, and they never get cleaned up.
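The same check can be scripted without Observer. A minimal sketch, assuming the leaked processes are StringIO GenServers as Observer reports (the module name StringIOLeakCheck is hypothetical), counts live processes whose initial call translates to StringIO.init/1 via :proc_lib.translate_initial_call/1:

```elixir
defmodule StringIOLeakCheck do
  # Hypothetical helper: counts live processes whose translated initial call
  # is StringIO.init/1, i.e. the entries Observer shows as
  # "Elixir.StringIO:init/1".
  def count do
    Enum.count(Process.list(), fn pid ->
      :proc_lib.translate_initial_call(pid) == {StringIO, :init, 1}
    end)
  end
end
```

Calling StringIOLeakCheck.count() before and after a batch of messages should show whether the number keeps growing.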

Clearly, I’m doing something wrong, but I’m not sure where to start troubleshooting this.

Can you show us this code?

Sure thing! Here’s a link to the Hex package (https://www.hex.pm/packages/parallel_stream), but the relevant code is below.

defmodule ParallelStream.Producer do
  alias ParallelStream.Defaults
  alias ParallelStream.Workers
  alias ParallelStream.Executor

  def build!(stream, fun, options) do
    build!(stream, fun, Executor, options)
  end

  def build!(stream, fun, executor, options) do
    worker_work_ratio = options |> Keyword.get(:worker_work_ratio, Defaults.worker_work_ratio)
    worker_count = options |> Keyword.get(:num_workers, Defaults.num_workers)
    chunk_size = worker_count * worker_work_ratio

    stream
    |> Stream.chunk(chunk_size, chunk_size, [])
    |> Stream.transform(
      fn ->
        {
          inqueue,
          workers,
          outqueues
        } = worker_count |> Workers.build!(fun, executor)

        { inqueue, workers, outqueues, 0 }
      end,
      fn items, { inqueue, workers, outqueues, index } ->
        mapped = items |> Stream.with_index |> Enum.map(fn { item, i } ->
          outqueue = outqueues |> Enum.at(rem(i, worker_count))
          inqueue |> send({ index + i, item, outqueue })

          { outqueue, index + i }
        end)

        { [mapped], { inqueue, workers, outqueues, index + chunk_size } }
      end,
      fn { inqueue, workers, outqueues, _ } ->
        inqueue |> send(:halt)
        outqueues |> Enum.each(fn outqueue -> outqueue |> send(:halt) end)
        workers |> Enum.each(fn worker -> worker |> send(:halt) end)
      end)
  end
end

Ah, I didn’t realize that was a library. What I had in mind was whatever code of yours uses parallel_stream, maybe this Rti.ImportRouter? Are you explicitly using StringIO anywhere?

The only place where I’m calling StringIO is the sample below. If it doesn’t raise any red flags for you, I’ll pull the Rti.ImportRouter code.

  def convert_to_maps(csv_string) when is_binary(csv_string) do
    {:ok, stream} = StringIO.open(csv_string)

    stream
    |> IO.binstream(:line)
    |> convert_to_maps()
  end

  def convert_to_maps(csv_data) do
    # Transform from CSV data into maps
    csv_data
    |> CSV.decode(headers: true)
    |> Enum.filter(fn {result, _} -> result == :ok end)
    |> Enum.map(fn {_, d} -> d end)
    |> Enum.map(&normalize_booleans/1)
    |> Enum.map(&normalize_oa_dates/1)
    |> Enum.map(&normalize_oa_times/1)
    |> Enum.to_list()
  end

You need to close the opened StringIO:

https://hexdocs.pm/elixir/StringIO.html#close/1

That’s the first rule of resource consumption: if you open it, you need to close it.
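A minimal sketch of that fix, assuming you want the device closed even if decoding raises, wraps the work in try/after. The module name CsvLines is hypothetical, and plain line splitting stands in for CSV.decode so the example has no dependencies:

```elixir
defmodule CsvLines do
  # Hypothetical example: read a string line by line through a StringIO
  # device and always close the device afterwards.
  def read_lines(csv_string) when is_binary(csv_string) do
    {:ok, io} = StringIO.open(csv_string)

    try do
      io
      |> IO.binstream(:line)
      # Enum.map/2 is eager, so every line is read before `after` runs.
      |> Enum.map(&String.trim_trailing(&1, "\n"))
    after
      # Stops the StringIO process; returns {:ok, {remaining_input, output}}.
      StringIO.close(io)
    end
  end
end
```

The stream has to be fully consumed inside the try. In the original convert_to_maps/1 the eager Enum calls already do that, so wrapping the IO.binstream |> convert_to_maps() pipeline the same way should be enough.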

Goodness gracious. What a silly mistake.

Thanks to both of you for your quick help. That was really fast!

A while back I contributed a change to the standard library so that you can pass a function to StringIO.open/3: the function is called with the IO device, and the device is automagically closed when the function returns.

It would look something like:

{:ok, {rest_of_string, result}} = StringIO.open(csv_string, [], fn io_object ->
    IO.write(io_object, "Some string to put out there")
    StringIO.contents(io_object)
end)

StringIO.open/3 returns a tuple of {:ok, <whatever the function returns>} on success.
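Applied to the CSV case, a sketch using StringIO.open/3 might look like the following. The module name CsvLinesAuto is hypothetical, line splitting again stands in for CSV.decode, and it needs an Elixir version that includes StringIO.open/3:

```elixir
defmodule CsvLinesAuto do
  # Hypothetical example: StringIO.open/3 passes the device to the function
  # and closes it when the function returns.
  def read_lines(csv_string) when is_binary(csv_string) do
    {:ok, lines} =
      StringIO.open(csv_string, [], fn io ->
        io
        |> IO.binstream(:line)
        # Consume the stream here, while the device is still open.
        |> Enum.map(&String.trim_trailing(&1, "\n"))
      end)

    lines
  end
end
```

Returning the lazy stream itself from the function would defer the reads until after the device has been closed, so the eager Enum call belongs inside the function.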

Thanks for contributing it. It’s a good fit for what I’m doing.