Elixir Flow [warn] ** Undefined handle_info in "GenStage.Streamer", all flow data is not processed

I have been trying to use Flow (GenStage) to stream AWS S3 files and write to AWS RDS DB. My implementation:

  def load_file(file_name) do
    window = Flow.Window.count(100)
    file_name
    |> HTTPStream.get()
    |> HTTPStreamUtil.lines()
    |> Flow.from_enumerable()
    |> Flow.filter(&(String.match?(&1, ~r/^rec/)))
    |> Flow.map(fn line ->
      line
      |> String.replace("\n", "")
      |> String.replace("\"", "")
      |> String.split(";")
      |> transform
    end)
    |> Flow.partition(window: window, key: {:key, "day_type_no"})
    |> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
    |> Flow.on_trigger(fn items ->
      items
      |> add_timestamps
      |> Database.Routes.create_multiple_day_type
      {[items], items}
    end)
    |> Flow.run()
  end

It works fine when I run it locally, but after I deployed my application to AWS (I have tried both EC2 and Fargate), it does not work as expected. I checked the logs: a weird [warn] message is shown and not all of the data is processed.
The [warn] message is:
[warn] ** Undefined handle_info in "GenStage.Streamer"
** Unhandled message: {:tcp, #Port<0.20>, " “SBST#c29 #01#06#2019#3”\r\nrec; 20200512; 32510; “SBST#37 #01#07#2019#4”\r\nrec; 20200512; 32511; “SBST#31 #03#00#2019#5”\r\nrec; 20200512; 32512; “SBST#298 #01#05#2019#6”\r\nrec; 20200512; 32513; “SBST#c40 #02#06#2019#7”\r\nrec; 20200512; 32514; “SBST#229 #02#06#2019#1”\r\nrec; 20200512; 32515; “SBST#298 #01#00#2019#2”\r\nrec; 20200512; 32516; “SBST#c291 #01#00#2019#3”\r\nrec; 20200512; 32517; “SBST#38 #03#00#2019#4”\r\nrec; 20200512; 32518; “SBST#33 #06#04#2019#5”\r\nrec; 20200512; 32519; “SBST#2N #01#05#2019#6”\r\nrec; 20200512; 32520; “SBST#401 #02#06#2019#7”\r\nrec; 20200512; 32521; “SBST#c23 #01#00#2019#1”\r\nrec; 20200512; 32523; “SBST#c291 #01#06#2019#3”\r\nrec; 20200512; 32524; “SBST#38 #01#07#2019#4”\r\nrec; 20200512; 32525; “SBST#33 #06#07#2019#5”\r\nrec; 20200512; 32526; “SBST#31 #01#05#2019#6”\r\nrec; 20200512; 32527; “SBST#42 #03#06#2019#7”\r\nrec; 20200512; 32528; “SBST#c23 #01#06#2019#1”\r\nrec; 20200512; 32529; “SBST#2N #01#65#2019#2”\r\nrec; 20200512; 32530; “SBST#292 #01#00#2019#3”\r\nrec; 20200512; 32531; “SBST#c39 #01#00#2019#4”\r\nrec; 20200512; 32532; “SBST#35 #02#00#2019#5”\r\nrec; 20200512; 32533; “SBST#33 #02#05#2019#6”\r\nrec; 20200512; 32534; “SBST#45 #01#06#2019#7”\r\nrec; 20200512; 32535; “SBST#c29 #01#00#2019#1”\r\nrec; 20200512; 32536; “SBST#31 #03#00#2019#2”\r\nrec; 20200512; 32537; “SBST#292 #01#06#2019#3”\r\nrec; 20200512; 32538; “SBST#c39 #01#07#2019#4”\r\nrec; 20200512; 32539; “SBST#37 #03#00#2019#5”\r\nrec; 20200512; 32540; “SBST#35 #02#05#2019#6”\r\nrec; 20200512; 32541; “SBST#c46 #03#06#2019#7”\r\nrec; 20200512; 32542; “SBST#c29 #01#06#2019#1”\r\nrec; 20200512; 32543; “SBST#33 #06#25#2019#2”\r\nrec; 20200512; 32544; “SBST#c293 #05#00#2019#3”\r\nrec; 20200512; 32545; “SBST#4 #02#00#2019#4”\r\nrec; 20200512; 32546; “SBST#37 #01#07#2019#5”\r\nrec; 20200512; 32547; “SBST#37 #01#05#2019#6”\r\nrec; 20200512; 32548; “SBST#47 #02#06#2019#7”\r\nrec; 20200512; 32549; “SBST#c291 
#01#00#2019#1”\r\nrec; 20200512; 32550; “SBST#33 #06#07#2019#2”\r\nrec; 20200512; 32551; “SBST#c293 #01#06#2019#3”\r\nrec; 20200512; 32552; “SBST#c40 #02#00#2019#4”\r\nrec; 20200512; 32553; “SBST#38 #03#00#2019#5”\r\nrec; 20200512; 32554; “SBST#38 #03#05#2019#6”\r\nrec; 20200512; 32555; “SBST#4N #01#67#2019#7”\r\nrec; 20200512; 32556; “SBST#c291 #01#06#2019#1”\r\nrec; 20200512; 32557; “SBST#35 #02#00#2019#2”\r\nrec; 20200512; 32558; “SBST#298 #01#00#2019#3”\r\nrec; 20200512; 32559; “SBST#c40 #01#07#2019#4”\r\nrec; 20200512; 32560; “SBST#38 #01#07#2019#5”\r\nrec; 20200512; 32561; “SBST#c39 #01#05#2019#6”\r\nrec; 20200512; 32562; “SBST#506 #02#06#2019#7”\r\nrec; 20200512; 32563; “SBST#292 #01#00#2019#1”\r\nrec; 20200512; 32564; “SBST#37 #03#00#2019#2”\r\nrec; 202" <> …}
** Stream started at:
(gen_stage 1.0.0) lib/gen_stage.ex:1609: GenStage.from_enumerable/2
(stdlib 3.13) supervisor.erl:385: :supervisor.do_start_child_i/3
(stdlib 3.13) supervisor.erl:371: :supervisor.do_start_child/2
(stdlib 3.13) supervisor.erl:677: :supervisor.handle_start_child/2
(stdlib 3.13) supervisor.erl:426: :supervisor.handle_call/3
(stdlib 3.13) gen_server.erl:706: :gen_server.try_handle_call/4
(stdlib 3.13) gen_server.erl:735: :gen_server.handle_msg/6
(stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

Does anyone have an idea what is actually happening?

As a start, could you please surround your code blocks with triple backticks? (```)

Hello!

+1 to the code formatting

As for the question: instead of streaming the file from S3 straight into Flow, could you please try downloading it to disk (/tmp) first and then processing it with the Flow API? The reason I suggest this is that I am working on a map/reduce POC that processes ~5 TB / 70K files from S3 with the Flow API every 2 hours without a single glitch.
My data flow, however, is

S3 -> Elixir.Flow -> S3

Thank you.

Hi @dimitarvp, I would like to fix the code formatting. May I know how to edit the post? I did not find an edit button. Or do I not have permission to edit? Thanks.

Hi @frumos, good idea. Let me try downloading the file first and then processing it. I will update you with the results. Thanks

Well, if you missed the edit window, make a new comment with the formatted code.

The sample code is:

  def load_file(file_name) do
    file_name
    |> HTTPStream.get()
    |> HTTPStreamUtil.lines()
    |> Flow.from_enumerable()
    |> Flow.filter(&(String.match?(&1, ~r/^rec/)))
    |> Flow.map(fn line ->
      line
      |> String.replace("\n", "")
      |> String.replace("\"", "")
      |> String.split(";")
      |> transform
    end)
    |> Flow.partition(window: Flow.Window.count(1_000))
    |> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
    |> Flow.on_trigger(fn items ->
      items
      |> add_timestamps
      |> Database.Routes.create_multiple_day_type
      {[items], items}
    end)
    |> Flow.run()
  end

The [warn] message:

[warn] ** Undefined handle_info in "GenStage.Streamer"
** Unhandled message: {:tcp, #Port<0.20>, " “SBST#c29 #01#06#2019#3”\r\nrec; 20200512; 32510; “SBST#37 #01#07#2019#4”\r\nrec; 20200512; 32511; “SBST#31 #03#00#2019#5”\r\nrec; 20200512; 32512; “SBST#298 #01#05#2019#6”\r\nrec; 20200512; 32513; “SBST#c40 #02#06#2019#7”\r\nrec; 20200512; 32514; “SBST#229 #02#06#2019#1”\r\nrec; 20200512; 32515; “SBST#298 #01#00#2019#2”\r\nrec; 20200512; 32516; “SBST#c291 #01#00#2019#3”\r\nrec; 20200512; 32517; “SBST#38 #03#00#2019#4”\r\nrec; 20200512; 32518; “SBST#33 #06#04#2019#5”\r\nrec; 20200512; 32519; “SBST#2N #01#05#2019#6”\r\nrec; 20200512; 32520; “SBST#401 #02#06#2019#7”\r\nrec; 20200512; 32521; “SBST#c23 #01#00#2019#1”\r\nrec; 20200512; 32523; “SBST#c291 #01#06#2019#3”\r\nrec; 20200512; 32524; “SBST#38 #01#07#2019#4”\r\nrec; 20200512; 32525; “SBST#33 #06#07#2019#5”\r\nrec; 20200512; 32526; “SBST#31 #01#05#2019#6”\r\nrec; 20200512; 32527; “SBST#42 #03#06#2019#7”\r\nrec; 20200512; 32528; “SBST#c23 #01#06#2019#1”\r\nrec; 20200512; 32529; “SBST#2N #01#65#2019#2”\r\nrec; 20200512; 32530; “SBST#292 #01#00#2019#3”\r\nrec; 20200512; 32531; “SBST#c39 #01#00#2019#4”\r\nrec; 20200512; 32532; “SBST#35 #02#00#2019#5”\r\nrec; 20200512; 32533; “SBST#33 #02#05#2019#6”\r\nrec; 20200512; 32534; “SBST#45 #01#06#2019#7”\r\nrec; 20200512; 32535; “SBST#c29 #01#00#2019#1”\r\nrec; 20200512; 32536; “SBST#31 #03#00#2019#2”\r\nrec; 20200512; 32537; “SBST#292 #01#06#2019#3”\r\nrec; 20200512; 32538; “SBST#c39 #01#07#2019#4”\r\nrec; 20200512; 32539; “SBST#37 #03#00#2019#5”\r\nrec; 20200512; 32540; “SBST#35 #02#05#2019#6”\r\nrec; 20200512; 32541; “SBST#c46 #03#06#2019#7”\r\nrec; 20200512; 32542; “SBST#c29 #01#06#2019#1”\r\nrec; 20200512; 32543; “SBST#33 #06#25#2019#2”\r\nrec; 20200512; 32544; “SBST#c293 #05#00#2019#3”\r\nrec; 20200512; 32545; “SBST#4 #02#00#2019#4”\r\nrec; 20200512; 32546; “SBST#37 #01#07#2019#5”\r\nrec; 20200512; 32547; “SBST#37 #01#05#2019#6”\r\nrec; 20200512; 32548; “SBST#47 #02#06#2019#7”\r\nrec; 20200512; 32549; “SBST#c291 
#01#00#2019#1”\r\nrec; 20200512; 32550; “SBST#33 #06#07#2019#2”\r\nrec; 20200512; 32551; “SBST#c293 #01#06#2019#3”\r\nrec; 20200512; 32552; “SBST#c40 #02#00#2019#4”\r\nrec; 20200512; 32553; “SBST#38 #03#00#2019#5”\r\nrec; 20200512; 32554; “SBST#38 #03#05#2019#6”\r\nrec; 20200512; 32555; “SBST#4N #01#67#2019#7”\r\nrec; 20200512; 32556; “SBST#c291 #01#06#2019#1”\r\nrec; 20200512; 32557; “SBST#35 #02#00#2019#2”\r\nrec; 20200512; 32558; “SBST#298 #01#00#2019#3”\r\nrec; 20200512; 32559; “SBST#c40 #01#07#2019#4”\r\nrec; 20200512; 32560; “SBST#38 #01#07#2019#5”\r\nrec; 20200512; 32561; “SBST#c39 #01#05#2019#6”\r\nrec; 20200512; 32562; “SBST#506 #02#06#2019#7”\r\nrec; 20200512; 32563; “SBST#292 #01#00#2019#1”\r\nrec; 20200512; 32564; “SBST#37 #03#00#2019#2”\r\nrec; 202" <> …}
** Stream started at:
(gen_stage 1.0.0) lib/gen_stage.ex:1609: GenStage.from_enumerable/2
(stdlib 3.13) supervisor.erl:385: :supervisor.do_start_child_i/3
(stdlib 3.13) supervisor.erl:371: :supervisor.do_start_child/2
(stdlib 3.13) supervisor.erl:677: :supervisor.handle_start_child/2
(stdlib 3.13) supervisor.erl:426: :supervisor.handle_call/3
(stdlib 3.13) gen_server.erl:706: :gen_server.try_handle_call/4
(stdlib 3.13) gen_server.erl:735: :gen_server.handle_msg/6
(stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

My guess would be that your HTTPStreamUtil.lines/1 returns the tuple {:tcp, PortRef, "long string"} instead of the expected Enumerable (Stream?), causing Flow.from_enumerable/2 to fail.

Hi @frumos, thanks for the suggestion.
I tried your approach of downloading the file from S3 to a mounted Docker volume and then streaming the data from disk, and it worked. Below are the changes.

  def load_file(file_name) do
    # download vdv452 file
    file_path = Application.get_env(:ex_aws, :docker_volume) <> (String.split(file_name, "/") |> List.last)
    ExAws.S3.download_file(Application.get_env(:ex_aws, :s3_bucket_name), file_name, file_path, timeout: 3600)
    |> ExAws.request!

    # stream file data
    file_path
    |> File.stream!
    |> Flow.from_enumerable()
    |> Flow.filter(&(String.match?(&1, ~r/^rec/)))
    |> Flow.map(fn line ->
      line
      |> String.replace("\n", "")
      |> String.replace("\"", "")
      |> String.split(";")
      |> transform
    end)
    |> Flow.partition(window: Flow.Window.count(1_000))
    |> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
    |> Flow.on_trigger(fn items ->
      items
      |> add_timestamps
      |> Database.Routes.create_multiple_route_sequence
      {[items], items}
    end)
    |> Flow.run()
  end

Thanks @krstfk, but that is very unlikely to be the case, because when I run it locally I do not get the problem.

Still, for some reason GenStage.Streamer chokes on a message that has the shape of the messages produced by a gen_tcp socket opened with the option {:active, true} or {:active, n} where n > 0 (i.e. {:tcp, socket, data}).
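To make that message shape concrete, here is a minimal sketch using only :gen_tcp on the loopback interface. The module name ActiveSocketDemo and the payload are made up for illustration; nothing here is taken from HTTPStream. It only shows that an active-mode socket delivers data as a plain {:tcp, socket, data} message to the mailbox of the process that owns the socket.

```elixir
defmodule ActiveSocketDemo do
  # Hypothetical demo module. Opens a loopback TCP connection whose client
  # side is in active mode and returns the message that lands in the
  # caller's mailbox.
  def run do
    # Passive listening socket on an OS-assigned port.
    {:ok, listen} = :gen_tcp.listen(0, [:binary, active: false, ip: {127, 0, 0, 1}])
    {:ok, port} = :inet.port(listen)

    # The calling process becomes the owner of the active-mode client socket.
    {:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, active: true])
    {:ok, server} = :gen_tcp.accept(listen, 1_000)

    # Data sent by the peer is not read with :gen_tcp.recv/2 on the client;
    # it shows up as a message instead.
    :ok = :gen_tcp.send(server, "rec; 20200512; 32510\r\n")

    message =
      receive do
        {:tcp, ^client, _data} = msg -> msg
      after
        1_000 -> :timeout
      end

    :gen_tcp.close(client)
    :gen_tcp.close(server)
    :gen_tcp.close(listen)
    message
  end
end
```

If the owning process is a GenServer-like process that does not expect such messages, they end up in handle_info, which matches the shape of the warning above.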

I'm unsure how this message reaches the GenStage.Streamer process. Still, I would look at either HTTPStream.get or HTTPStreamUtil.lines as the likely culprit.
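One possible route, offered as a guess since the source of HTTPStream is not shown in this thread: if the stream is lazy (for example built with Stream.resource/3), its start function runs in whichever process enumerates it, not in the process that built it. The sketch below uses a hypothetical LazyOwnerDemo module and a Task standing in for the enumerating process to show that effect with the standard library alone.

```elixir
defmodule LazyOwnerDemo do
  # Hypothetical demo module. The start function records self(), standing in
  # for "open a connection here"; it only runs when the stream is enumerated.
  def stream do
    Stream.resource(
      fn -> self() end,
      fn
        :done -> {:halt, :done}
        owner -> {[owner], :done}
      end,
      fn _acc -> :ok end
    )
  end

  # Builds the stream in the calling process, enumerates it in a Task, and
  # returns {builder_pid, pid_seen_by_start_fun, task_pid}.
  def run do
    builder = self()
    lazy = stream()
    task = Task.async(fn -> Enum.to_list(lazy) end)
    [owner] = Task.await(task)
    {builder, owner, task.pid}
  end
end
```

Under that assumption, a socket opened inside the stream's start function would be owned by the enumerating process, so any active-mode TCP message the HTTP client does not consume itself would stay in that process's mailbox.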
