Elixir Flow fonctionne sur la machine locale, mais ne traite pas les données et renvoie: [warn] ** handle_info non défini dans «GenStage.Streamer» dans AWS Fargate
J'ai essayé d'utiliser Elixir Flow (Lib construit à partir de GenStage) pour traiter / diffuser des données à partir de fichiers de compartiment AWS S3 et écrire dans AWS RDS DB.
J'ai pu le faire avec succès sur ma machine locale, mais lorsque j'ai déployé mon application sur AWS ECS / Fargate ou EC2, cela ne fonctionne pas comme prévu. Voici ma mise en œuvre:
def load_file(file_name) do
window = Flow.Window.count(100)
file_name
|> HTTPStream.get()
|> HTTPStreamUtil.lines()
|> Flow.from_enumerable()
|> Flow.filter(&(String.match?(&1, ~r/^rec/)))
|> Flow.map(fn line ->
line
|> String.replace("\n", "")
|> String.replace("\"", "")
|> String.split(";")
|> transform
end)
|> Flow.partition(window: window, key: {:key, "day_type_no"})
|> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
|> Flow.on_trigger(fn items ->
items
|> add_timestamps
|> Database.Routes.create_multiple_day_type
{[items], items}
end)
|> Flow.run()
end
Lorsque j'exécute mon application dans AWS Fargate ou EC2, je vois le journal ci-dessous et une partie / toutes les données ne sont pas traitées.
[warn] ** Undefined handle_info in "GenStage.Streamer"
** Unhandled message: {:tcp, #Port<0.20>, " \"SBST#c29 #01#06#2019#3\"\r\nrec; 20200512; 32510; \"SBST#37 #01#07#2019#4\"\r\nrec; 20200512; 32511; \"SBST#31 #03#00#2019#5\"\r\nrec; 20200512; 32512; \"SBST#298 #01#05#2019#6\"\r\nrec; 20200512; 32513; \"SBST#c40 #02#06#2019#7\"\r\nrec; 20200512; 32514; \"SBST#229 #02#06#2019#1\"\r\nrec; 20200512; 32515; \"SBST#298 #01#00#2019#2\"\r\nrec; 20200512; 32516; \"SBST#c291 #01#00#2019#3\"\r\nrec; 20200512; 32517; \"SBST#38 #03#00#2019#4\"\r\nrec; 20200512; 32518; \"SBST#33 #06#04#2019#5\"\r\nrec; 20200512; 32519; \"SBST#2N #01#05#2019#6\"\r\nrec; 20200512; 32520; \"SBST#401 #02#06#2019#7\"\r\nrec; 20200512; 32521; \"SBST#c23 #01#00#2019#1\"\r\nrec; 20200512; 32523; \"SBST#c291 #01#06#2019#3\"\r\nrec; 20200512; 32524; \"SBST#38 #01#07#2019#4\"\r\nrec; 20200512; 32525; \"SBST#33 #06#07#2019#5\"\r\nrec; 20200512; 32526; \"SBST#31 #01#05#2019#6\"\r\nrec; 20200512; 32527; \"SBST#42 #03#06#2019#7\"\r\nrec; 20200512; 32528; \"SBST#c23 #01#06#2019#1\"\r\nrec; 20200512; 32529; \"SBST#2N #01#65#2019#2\"\r\nrec; 20200512; 32530; \"SBST#292 #01#00#2019#3\"\r\nrec; 20200512; 32531; \"SBST#c39 #01#00#2019#4\"\r\nrec; 20200512; 32532; \"SBST#35 #02#00#2019#5\"\r\nrec; 20200512; 32533; \"SBST#33 #02#05#2019#6\"\r\nrec; 20200512; 32534; \"SBST#45 #01#06#2019#7\"\r\nrec; 20200512; 32535; \"SBST#c29 #01#00#2019#1\"\r\nrec; 20200512; 32536; \"SBST#31 #03#00#2019#2\"\r\nrec; 20200512; 32537; \"SBST#292 #01#06#2019#3\"\r\nrec; 20200512; 32538; \"SBST#c39 #01#07#2019#4\"\r\nrec; 20200512; 32539; \"SBST#37 #03#00#2019#5\"\r\nrec; 20200512; 32540; \"SBST#35 #02#05#2019#6\"\r\nrec; 20200512; 32541; \"SBST#c46 #03#06#2019#7\"\r\nrec; 20200512; 32542; \"SBST#c29 #01#06#2019#1\"\r\nrec; 20200512; 32543; \"SBST#33 #06#25#2019#2\"\r\nrec; 20200512; 32544; \"SBST#c293 #05#00#2019#3\"\r\nrec; 20200512; 32545; \"SBST#4 #02#00#2019#4\"\r\nrec; 20200512; 32546; \"SBST#37 #01#07#2019#5\"\r\nrec; 20200512; 32547; \"SBST#37 #01#05#2019#6\"\r\nrec; 20200512; 32548; \"SBST#47 #02#06#2019#7\"\r\nrec; 20200512; 32549; \"SBST#c291 #01#00#2019#1\"\r\nrec; 20200512; 32550; \"SBST#33 #06#07#2019#2\"\r\nrec; 20200512; 32551; \"SBST#c293 #01#06#2019#3\"\r\nrec; 20200512; 32552; \"SBST#c40 #02#00#2019#4\"\r\nrec; 20200512; 32553; \"SBST#38 #03#00#2019#5\"\r\nrec; 20200512; 32554; \"SBST#38 #03#05#2019#6\"\r\nrec; 20200512; 32555; \"SBST#4N #01#67#2019#7\"\r\nrec; 20200512; 32556; \"SBST#c291 #01#06#2019#1\"\r\nrec; 20200512; 32557; \"SBST#35 #02#00#2019#2\"\r\nrec; 20200512; 32558; \"SBST#298 #01#00#2019#3\"\r\nrec; 20200512; 32559; \"SBST#c40 #01#07#2019#4\"\r\nrec; 20200512; 32560; \"SBST#38 #01#07#2019#5\"\r\nrec; 20200512; 32561; \"SBST#c39 #01#05#2019#6\"\r\nrec; 20200512; 32562; \"SBST#506 #02#06#2019#7\"\r\nrec; 20200512; 32563; \"SBST#292 #01#00#2019#1\"\r\nrec; 20200512; 32564; \"SBST#37 #03#00#2019#2\"\r\nrec; 202" <> ...}
** Stream started at:
(gen_stage 1.0.0) lib/gen_stage.ex:1609: GenStage.from_enumerable/2
(stdlib 3.13) supervisor.erl:385: :supervisor.do_start_child_i/3
(stdlib 3.13) supervisor.erl:371: :supervisor.do_start_child/2
(stdlib 3.13) supervisor.erl:677: :supervisor.handle_start_child/2
(stdlib 3.13) supervisor.erl:426: :supervisor.handle_call/3
(stdlib 3.13) gen_server.erl:706: :gen_server.try_handle_call/4
(stdlib 3.13) gen_server.erl:735: :gen_server.handle_msg/6
(stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
J'ai essayé avec différents Flow.Window, par exemple Count, Global, etc., aucun d'eux ne fonctionne comme prévu. De plus, je soupçonne que mon serveur / ma tâche à distance n'a pas activé le processeur / la mémoire, j'ai essayé d'augmenter le processeur de la tâche à 2vCPU, cela a aidé à traiter plus de données, mais pas toutes les données.
Le truc filaire est que ce code fonctionne parfaitement dans ma machine locale (probablement parce que mon pc est puissant ??). Je voudrais vérifier avec vous les gars pour voir que quelqu'un a une idée de pourquoi cela pourrait arriver?
Merci beaucoup.
Réponses
J'ai essayé une autre approche qui télécharge le fichier de S3 vers un volume de docker monté puis diffuse les données, cela a fonctionné. Voici les changements.
def load_file(file_name) do
# download vdv452 file
file_path = Application.get_env(:ex_aws, :docker_volume) <> (String.split(file_name, "/") |> List.last)
ExAws.S3.download_file(Application.get_env(:ex_aws, :s3_bucket_name), file_name, file_path, timeout: 3600)
|> ExAws.request!
# stream file data
file_path
|> File.stream!
|> Flow.from_enumerable()
|> Flow.filter(&(String.match?(&1, ~r/^rec/)))
|> Flow.map(fn line ->
line
|> String.replace("\n", "")
|> String.replace("\"", "")
|> String.split(";")
|> transform
end)
|> Flow.partition(window: Flow.Window.count(1_000))
|> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
|> Flow.on_trigger(fn items ->
items
|> add_timestamps
|> Database.Routes.create_multiple_route_sequence
{[items], items}
end)
|> Flow.run()
end