Elixir Flow works on my local machine, but on AWS Fargate it does not process all the data and logs: [warn] ** Undefined handle_info in "GenStage.Streamer"
I have been trying to use Elixir Flow (a library built on GenStage) to process/stream data from files in an AWS S3 bucket and write it to an AWS RDS database.
I was able to do this successfully on my local machine, but when I deployed my app to AWS ECS/Fargate or EC2, it did not work as expected. Below is my implementation:
    def load_file(file_name) do
      window = Flow.Window.count(100)

      file_name
      |> HTTPStream.get()
      |> HTTPStreamUtil.lines()
      |> Flow.from_enumerable()
      |> Flow.filter(&(String.match?(&1, ~r/^rec/)))
      |> Flow.map(fn line ->
        line
        |> String.replace("\n", "")
        |> String.replace("\"", "")
        |> String.split(";")
        |> transform
      end)
      |> Flow.partition(window: window, key: {:key, "day_type_no"})
      |> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
      |> Flow.on_trigger(fn items ->
        items
        |> add_timestamps
        |> Database.Routes.create_multiple_day_type

        {[items], items}
      end)
      |> Flow.run()
    end
When I run my application on AWS Fargate or EC2, I see the log below, and some or all of the data is not processed.
[warn] ** Undefined handle_info in "GenStage.Streamer"
** Unhandled message: {:tcp, #Port<0.20>, " \"SBST#c29 #01#06#2019#3\"\r\nrec; 20200512; 32510; \"SBST#37 #01#07#2019#4\"\r\nrec; 20200512; 32511; \"SBST#31 #03#00#2019#5\"\r\nrec; 20200512; 32512; \"SBST#298 #01#05#2019#6\"\r\nrec; 20200512; 32513; \"SBST#c40 #02#06#2019#7\"\r\nrec; 20200512; 32514; \"SBST#229 #02#06#2019#1\"\r\nrec; 20200512; 32515; \"SBST#298 #01#00#2019#2\"\r\nrec; 20200512; 32516; \"SBST#c291 #01#00#2019#3\"\r\nrec; 20200512; 32517; \"SBST#38 #03#00#2019#4\"\r\nrec; 20200512; 32518; \"SBST#33 #06#04#2019#5\"\r\nrec; 20200512; 32519; \"SBST#2N #01#05#2019#6\"\r\nrec; 20200512; 32520; \"SBST#401 #02#06#2019#7\"\r\nrec; 20200512; 32521; \"SBST#c23 #01#00#2019#1\"\r\nrec; 20200512; 32523; \"SBST#c291 #01#06#2019#3\"\r\nrec; 20200512; 32524; \"SBST#38 #01#07#2019#4\"\r\nrec; 20200512; 32525; \"SBST#33 #06#07#2019#5\"\r\nrec; 20200512; 32526; \"SBST#31 #01#05#2019#6\"\r\nrec; 20200512; 32527; \"SBST#42 #03#06#2019#7\"\r\nrec; 20200512; 32528; \"SBST#c23 #01#06#2019#1\"\r\nrec; 20200512; 32529; \"SBST#2N #01#65#2019#2\"\r\nrec; 20200512; 32530; \"SBST#292 #01#00#2019#3\"\r\nrec; 20200512; 32531; \"SBST#c39 #01#00#2019#4\"\r\nrec; 20200512; 32532; \"SBST#35 #02#00#2019#5\"\r\nrec; 20200512; 32533; \"SBST#33 #02#05#2019#6\"\r\nrec; 20200512; 32534; \"SBST#45 #01#06#2019#7\"\r\nrec; 20200512; 32535; \"SBST#c29 #01#00#2019#1\"\r\nrec; 20200512; 32536; \"SBST#31 #03#00#2019#2\"\r\nrec; 20200512; 32537; \"SBST#292 #01#06#2019#3\"\r\nrec; 20200512; 32538; \"SBST#c39 #01#07#2019#4\"\r\nrec; 20200512; 32539; \"SBST#37 #03#00#2019#5\"\r\nrec; 20200512; 32540; \"SBST#35 #02#05#2019#6\"\r\nrec; 20200512; 32541; \"SBST#c46 #03#06#2019#7\"\r\nrec; 20200512; 32542; \"SBST#c29 #01#06#2019#1\"\r\nrec; 20200512; 32543; \"SBST#33 #06#25#2019#2\"\r\nrec; 20200512; 32544; \"SBST#c293 #05#00#2019#3\"\r\nrec; 20200512; 32545; \"SBST#4 #02#00#2019#4\"\r\nrec; 20200512; 32546; \"SBST#37 #01#07#2019#5\"\r\nrec; 20200512; 32547; \"SBST#37 #01#05#2019#6\"\r\nrec; 20200512; 
32548; \"SBST#47 #02#06#2019#7\"\r\nrec; 20200512; 32549; \"SBST#c291 #01#00#2019#1\"\r\nrec; 20200512; 32550; \"SBST#33 #06#07#2019#2\"\r\nrec; 20200512; 32551; \"SBST#c293 #01#06#2019#3\"\r\nrec; 20200512; 32552; \"SBST#c40 #02#00#2019#4\"\r\nrec; 20200512; 32553; \"SBST#38 #03#00#2019#5\"\r\nrec; 20200512; 32554; \"SBST#38 #03#05#2019#6\"\r\nrec; 20200512; 32555; \"SBST#4N #01#67#2019#7\"\r\nrec; 20200512; 32556; \"SBST#c291 #01#06#2019#1\"\r\nrec; 20200512; 32557; \"SBST#35 #02#00#2019#2\"\r\nrec; 20200512; 32558; \"SBST#298 #01#00#2019#3\"\r\nrec; 20200512; 32559; \"SBST#c40 #01#07#2019#4\"\r\nrec; 20200512; 32560; \"SBST#38 #01#07#2019#5\"\r\nrec; 20200512; 32561; \"SBST#c39 #01#05#2019#6\"\r\nrec; 20200512; 32562; \"SBST#506 #02#06#2019#7\"\r\nrec; 20200512; 32563; \"SBST#292 #01#00#2019#1\"\r\nrec; 20200512; 32564; \"SBST#37 #03#00#2019#2\"\r\nrec; 202" <> ...}
** Stream started at:
(gen_stage 1.0.0) lib/gen_stage.ex:1609: GenStage.from_enumerable/2
(stdlib 3.13) supervisor.erl:385: :supervisor.do_start_child_i/3
(stdlib 3.13) supervisor.erl:371: :supervisor.do_start_child/2
(stdlib 3.13) supervisor.erl:677: :supervisor.handle_start_child/2
(stdlib 3.13) supervisor.erl:426: :supervisor.handle_call/3
(stdlib 3.13) gen_server.erl:706: :gen_server.try_handle_call/4
(stdlib 3.13) gen_server.erl:735: :gen_server.handle_msg/6
(stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
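The payload in the warning above shows records ending in \r\n, while the Flow.map step in the code only strips \n. Whether HTTPStreamUtil.lines already removes the carriage return is not shown here, so the following is only a sketch, using standard-library calls, of how the cleaning step could be checked in isolation. The module and function names (LineSketch, original/1, trimmed/1) are hypothetical and not part of the original code.

```elixir
defmodule LineSketch do
  # Mirrors the cleaning steps from the Flow.map call (without `transform`).
  def original(line) do
    line
    |> String.replace("\n", "")
    |> String.replace("\"", "")
    |> String.split(";")
  end

  # Assumed variant: strip all trailing whitespace, so "\r\n" is removed too.
  def trimmed(line) do
    line
    |> String.trim_trailing()
    |> String.replace("\"", "")
    |> String.split(";")
  end
end

# Sample record copied from the logged payload.
sample = "rec; 20200512; 32510; \"SBST#c29 #01#06#2019#3\"\r\n"

IO.inspect(LineSketch.original(sample))
# ["rec", " 20200512", " 32510", " SBST#c29 #01#06#2019#3\r"]

IO.inspect(LineSketch.trimmed(sample))
# ["rec", " 20200512", " 32510", " SBST#c29 #01#06#2019#3"]
```

If a trailing "\r" in the last field matters to transform or to the database insert, the trimmed variant may be the safer choice; otherwise the original steps can stay as they are.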
I tried several Flow.Window types, e.g. count, global, etc., and none of them works as expected. I also suspected that my remote server/task did not have enough CPU/memory, so I increased the task's CPU to 2 vCPU. That helped process more data, but still not all of it.
The weird thing is that this code works perfectly on my local machine (probably because my PC is powerful?). I would like to check with you all to see whether anyone has an idea why this might be happening.
Thank you very much.
Answers
I tried another approach that downloads the file from S3 to a mounted Docker volume and then streams the data from that local file, and it worked. The changes are below.
    def load_file(file_name) do
      # download vdv452 file
      file_path = Application.get_env(:ex_aws, :docker_volume) <> (String.split(file_name, "/") |> List.last)

      ExAws.S3.download_file(Application.get_env(:ex_aws, :s3_bucket_name), file_name, file_path, timeout: 3600)
      |> ExAws.request!

      # stream file data
      file_path
      |> File.stream!
      |> Flow.from_enumerable()
      |> Flow.filter(&(String.match?(&1, ~r/^rec/)))
      |> Flow.map(fn line ->
        line
        |> String.replace("\n", "")
        |> String.replace("\"", "")
        |> String.split(";")
        |> transform
      end)
      |> Flow.partition(window: Flow.Window.count(1_000))
      |> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
      |> Flow.on_trigger(fn items ->
        items
        |> add_timestamps
        |> Database.Routes.create_multiple_route_sequence

        {[items], items}
      end)
      |> Flow.run()
    end
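For comparison, the same filter/parse/batch shape can be sketched with only the standard library, which may help when checking whether every record reaches the insert step. This is not the approach used above: it runs sequentially in the calling process and swaps Flow's windowed reduce for Stream.chunk_every/2. The module name BatchSketch and the insert_fun argument are hypothetical stand-ins, and the transform and add_timestamps steps are left out.

```elixir
defmodule BatchSketch do
  # Filters "rec" lines, parses them, and hands them to insert_fun
  # in batches of at most batch_size. Returns :ok.
  def run(lines, batch_size, insert_fun) do
    lines
    |> Stream.filter(&String.starts_with?(&1, "rec"))
    |> Stream.map(&parse/1)
    |> Stream.chunk_every(batch_size)
    |> Enum.each(insert_fun)
  end

  # Same cleaning idea as the Flow.map step, trimming trailing whitespace.
  defp parse(line) do
    line
    |> String.trim_trailing()
    |> String.replace("\"", "")
    |> String.split(";")
  end
end

# Usage with an in-memory list; a File.stream!/1 result could be passed instead.
lines = ["rec;1\n", "tbl;x\n", "rec;2\n", "rec;3\n"]
BatchSketch.run(lines, 2, fn batch -> IO.inspect(batch, label: "batch") end)
```

Counting the rows passed to insert_fun and comparing that with the number of "rec" lines in the file is one way to confirm that nothing was dropped.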