I have been trying to use Flow (GenStage) to stream files from AWS S3 and write the records to an AWS RDS database. My implementation:
def load_file(file_name) do
  window = Flow.Window.count(100)

  file_name
  |> HTTPStream.get()
  |> HTTPStreamUtil.lines()
  |> Flow.from_enumerable()
  |> Flow.filter(&String.match?(&1, ~r/^rec/))
  |> Flow.map(fn line ->
    # Strip the newline and all double quotes, then split the record on ";"
    line
    |> String.replace("\n", "")
    |> String.replace("\"", "")
    |> String.split(";")
    |> transform()
  end)
  |> Flow.partition(window: window, key: {:key, "day_type_no"})
  |> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
  |> Flow.on_trigger(fn items ->
    # Insert each window's batch into the database
    items
    |> add_timestamps()
    |> Database.Routes.create_multiple_day_type()

    {[items], items}
  end)
  |> Flow.run()
end
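To check the per-line cleanup in isolation (in iex, without S3, Flow or the database), the steps inside Flow.map/2 can be pulled out into a pure function. This is a minimal sketch: `LineParser` is a hypothetical name, `transform` is left out because it is not shown above, and it deviates from the original in two labeled ways. It uses String.trim_trailing/1 because the logged payload below shows lines ending in `\r\n`, which replacing only `"\n"` would leave a trailing `\r` on, and it trims each field because the fields are separated by `"; "`.

```elixir
defmodule LineParser do
  # Hypothetical helper mirroring the Flow.map/2 step, minus `transform`.
  @doc "Strips line endings and double quotes, then splits the record on semicolons."
  def parse(line) when is_binary(line) do
    line
    # Assumption: remove "\r\n" as well as "\n" (the logged data uses CRLF).
    |> String.trim_trailing()
    # Remove every literal double-quote character.
    |> String.replace("\"", "")
    |> String.split(";")
    # Assumption: fields are separated by "; ", so trim the surrounding spaces.
    |> Enum.map(&String.trim/1)
  end
end

LineParser.parse(~s(rec; 20200512; 32510; "SBST#37 #01#07#2019#4"\r\n))
# => ["rec", "20200512", "32510", "SBST#37 #01#07#2019#4"]
```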
It works fine when I run it locally, but after I deployed my application to AWS (I have tried both EC2 and Fargate), it does not work as expected. In the logs a weird [warn] message appears, and not all of the data is processed.
The [warn] message is:
[warn] ** Undefined handle_info in "GenStage.Streamer"
** Unhandled message: {:tcp, #Port<0.20>, " \"SBST#c29 #01#06#2019#3\"\r\nrec; 20200512; 32510; \"SBST#37 #01#07#2019#4\"\r\nrec; 20200512; 32511; \"SBST#31 #03#00#2019#5\"\r\nrec; 20200512; 32512; \"SBST#298 #01#05#2019#6\"\r\nrec; 20200512; 32513; \"SBST#c40 #02#06#2019#7\"\r\nrec; 20200512; 32514; \"SBST#229 #02#06#2019#1\"\r\nrec; 20200512; 32515; \"SBST#298 #01#00#2019#2\"\r\nrec; 20200512; 32516; \"SBST#c291 #01#00#2019#3\"\r\nrec; 20200512; 32517; \"SBST#38 #03#00#2019#4\"\r\nrec; 20200512; 32518; \"SBST#33 #06#04#2019#5\"\r\nrec; 20200512; 32519; \"SBST#2N #01#05#2019#6\"\r\nrec; 20200512; 32520; \"SBST#401 #02#06#2019#7\"\r\nrec; 20200512; 32521; \"SBST#c23 #01#00#2019#1\"\r\nrec; 20200512; 32523; \"SBST#c291 #01#06#2019#3\"\r\nrec; 20200512; 32524; \"SBST#38 #01#07#2019#4\"\r\nrec; 20200512; 32525; \"SBST#33 #06#07#2019#5\"\r\nrec; 20200512; 32526; \"SBST#31 #01#05#2019#6\"\r\nrec; 20200512; 32527; \"SBST#42 #03#06#2019#7\"\r\nrec; 20200512; 32528; \"SBST#c23 #01#06#2019#1\"\r\nrec; 20200512; 32529; \"SBST#2N #01#65#2019#2\"\r\nrec; 20200512; 32530; \"SBST#292 #01#00#2019#3\"\r\nrec; 20200512; 32531; \"SBST#c39 #01#00#2019#4\"\r\nrec; 20200512; 32532; \"SBST#35 #02#00#2019#5\"\r\nrec; 20200512; 32533; \"SBST#33 #02#05#2019#6\"\r\nrec; 20200512; 32534; \"SBST#45 #01#06#2019#7\"\r\nrec; 20200512; 32535; \"SBST#c29 #01#00#2019#1\"\r\nrec; 20200512; 32536; \"SBST#31 #03#00#2019#2\"\r\nrec; 20200512; 32537; \"SBST#292 #01#06#2019#3\"\r\nrec; 20200512; 32538; \"SBST#c39 #01#07#2019#4\"\r\nrec; 20200512; 32539; \"SBST#37 #03#00#2019#5\"\r\nrec; 20200512; 32540; \"SBST#35 #02#05#2019#6\"\r\nrec; 20200512; 32541; \"SBST#c46 #03#06#2019#7\"\r\nrec; 20200512; 32542; \"SBST#c29 #01#06#2019#1\"\r\nrec; 20200512; 32543; \"SBST#33 #06#25#2019#2\"\r\nrec; 20200512; 32544; \"SBST#c293 #05#00#2019#3\"\r\nrec; 20200512; 32545; \"SBST#4 #02#00#2019#4\"\r\nrec; 20200512; 32546; \"SBST#37 #01#07#2019#5\"\r\nrec; 20200512; 32547; \"SBST#37 #01#05#2019#6\"\r\nrec; 20200512; 32548; \"SBST#47 #02#06#2019#7\"\r\nrec; 20200512; 32549; \"SBST#c291 #01#00#2019#1\"\r\nrec; 20200512; 32550; \"SBST#33 #06#07#2019#2\"\r\nrec; 20200512; 32551; \"SBST#c293 #01#06#2019#3\"\r\nrec; 20200512; 32552; \"SBST#c40 #02#00#2019#4\"\r\nrec; 20200512; 32553; \"SBST#38 #03#00#2019#5\"\r\nrec; 20200512; 32554; \"SBST#38 #03#05#2019#6\"\r\nrec; 20200512; 32555; \"SBST#4N #01#67#2019#7\"\r\nrec; 20200512; 32556; \"SBST#c291 #01#06#2019#1\"\r\nrec; 20200512; 32557; \"SBST#35 #02#00#2019#2\"\r\nrec; 20200512; 32558; \"SBST#298 #01#00#2019#3\"\r\nrec; 20200512; 32559; \"SBST#c40 #01#07#2019#4\"\r\nrec; 20200512; 32560; \"SBST#38 #01#07#2019#5\"\r\nrec; 20200512; 32561; \"SBST#c39 #01#05#2019#6\"\r\nrec; 20200512; 32562; \"SBST#506 #02#06#2019#7\"\r\nrec; 20200512; 32563; \"SBST#292 #01#00#2019#1\"\r\nrec; 20200512; 32564; \"SBST#37 #03#00#2019#2\"\r\nrec; 202" <> …}
** Stream started at:
(gen_stage 1.0.0) lib/gen_stage.ex:1609: GenStage.from_enumerable/2
(stdlib 3.13) supervisor.erl:385: :supervisor.do_start_child_i/3
(stdlib 3.13) supervisor.erl:371: :supervisor.do_start_child/2
(stdlib 3.13) supervisor.erl:677: :supervisor.handle_start_child/2
(stdlib 3.13) supervisor.erl:426: :supervisor.handle_call/3
(stdlib 3.13) gen_server.erl:706: :gen_server.try_handle_call/4
(stdlib 3.13) gen_server.erl:735: :gen_server.handle_msg/6
(stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
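For context on the shape of the unhandled message: `{:tcp, port, data}` is what an Erlang `:gen_tcp` socket in active mode sends to the mailbox of its controlling process, while a passive-mode socket only delivers data when `:gen_tcp.recv/3` is called. The stdlib-only sketch below is not the HTTPStream code (whose internals aren't shown here); it opens a loopback connection and reports how the bytes arrived in each mode. `TcpModeDemo` is a hypothetical name.

```elixir
defmodule TcpModeDemo do
  # Opens a loopback listener, connects with the given `active` setting,
  # sends `payload` from the server side, and returns how the client got it.
  def run(active, payload) do
    {:ok, listen} =
      :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true, ip: {127, 0, 0, 1}])

    {:ok, port} = :inet.port(listen)
    {:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, active: active])
    {:ok, server} = :gen_tcp.accept(listen)
    :ok = :gen_tcp.send(server, payload)

    result =
      if active do
        # Active mode: the bytes arrive as a {:tcp, socket, data} message in
        # the mailbox of the socket's controlling process (here, the caller).
        receive do
          {:tcp, ^client, data} -> {:message, data}
        after
          1_000 -> :timeout
        end
      else
        # Passive mode: nothing lands in the mailbox; data is read on request.
        {:ok, data} = :gen_tcp.recv(client, 0, 1_000)
        {:recv, data}
      end

    Enum.each([client, server, listen], &:gen_tcp.close/1)
    result
  end
end

TcpModeDemo.run(true, "rec; 20200512\r\n")
TcpModeDemo.run(false, "rec; 20200512\r\n")
```

In active mode the owning process has to be sitting in a matching `receive` to consume those messages. A GenStage process (such as the GenStage.Streamer started by GenStage.from_enumerable/2, per the stack trace above) that is between callbacks routes mailbox messages to handle_info/2 instead, which is one plausible way such data could end up logged and skipped; whether that applies here depends on how HTTPStream.get/1 opens its socket.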
Does anyone have an idea what is actually happening?