I’m writing a service that processes raw data from a RabbitMQ queue and puts the results in a database. After a few days of running, the RabbitMQ connection starts dropping frames, but right before that happens. I noticed that I get a few “Too many processes” error messages (shown below).
2020-03-02 21:13:52.476: 15:13:52.414 [error] Too many processes
2020-03-02 21:13:52.476:
2020-03-02 21:13:52.476:
2020-03-02 21:13:52.476: 15:13:52.414 [error] Task #PID<0.1794.0> started from Rti.Supervisor terminating
2020-03-02 21:13:52.476: ** (SystemLimitError) a system limit has been reached
2020-03-02 21:13:52.476: :erlang.spawn_link(:proc_lib, :init_p, [#PID<0.1794.0>, [Rti.Supervisor, #PID<0.1657.0>], Task.Supervised, :noreply, [{:rti@JacksVSMIS1, #PID<0.1794.0>, #PID<0.1794.0>}, [#PID<0.1794.0>, #PID<0.1658.0>], {:erlang, :apply, [#Function<3.55826808/0 in ParallelStream.Workers.build!/3>, []]}]])
2020-03-02 21:13:52.476: (stdlib) proc_lib.erl:102: :proc_lib.spawn_link/3
2020-03-02 21:13:52.476: (elixir) lib/task/supervised.ex:10: Task.Supervised.start_link/3
2020-03-02 21:13:52.476: (parallel_stream) lib/parallel_stream/workers.ex:32: anonymous fn/4 in ParallelStream.Workers.build!/3
2020-03-02 21:13:52.476: (elixir) lib/enum.ex:1340: anonymous fn/3 in Enum.map/2
2020-03-02 21:13:52.476: (elixir) lib/enum.ex:1953: Enum.map/2
2020-03-02 21:13:52.476: (parallel_stream) lib/parallel_stream/workers.ex:31: ParallelStream.Workers.build!/3
2020-03-02 21:13:52.476: (parallel_stream) lib/parallel_stream/producer.ex:23: anonymous fn/3 in ParallelStream.Producer.build!/4
2020-03-02 21:13:52.476: Function: #Function<13.30540616/0 in Rti.ImportRouter.start_link/1>
2020-03-02 21:13:52.476: Args: []
I’m new to using observer, but when I start it up and run the service, I noticed that I’m getting thousands of processes with the name of “Elixir.StringIO:init/1” and a current function of “gen_server:loop/7” that never get cleaned up.
Clearly, I’m doing something wrong, but I’m not sure where to start troubleshooting this.
Ah, I didn’t realize that was a library. I guess what I had in mind was whatever code you have that is using parallel stream, maybe this Rti.ImportRouter ? Are you explicitly using String.IO anywhere?
A while back I contributed a change to the standard library so that you can pass a function to StringIO.open/3 that will be called with the IO object and automagically close it when the function ends.
It would look something like:
{:ok, {rest_of_string, result}} = StringIO.open(csv_string, [], fn io_object ->
IO.write(io_object, "Some string to put out there")
StringIO.contents(io_object)
end)
StringIO.open will return a tulple of {:ok, <whatever the function returns>} on success