Elixir Flow bekerja di mesin lokal, tetapi tidak memproses data dan membuang: [peringatan] ** handle_info tidak ditentukan di “GenStage.Streamer” di AWS Fargate

Aug 18 2020

Saya telah mencoba menggunakan Elixir Flow (Lib dibangun dari GenStage) untuk memproses / mengalirkan data dari file bucket AWS S3, dan menulis ke AWS RDS DB.

Saya berhasil melakukan ini di komputer lokal saya, tetapi ketika saya menerapkan aplikasi saya ke AWS ECS / Fargate atau EC2, aplikasi tidak berfungsi seperti yang diharapkan. Di bawah ini adalah implementasi saya:

  def load_file(file_name) do
    window = Flow.Window.count(100)
    file_name
    |> HTTPStream.get()
    |> HTTPStreamUtil.lines()
    |> Flow.from_enumerable()
    |> Flow.filter(&(String.match?(&1, ~r/^rec/)))
    |> Flow.map(fn line ->
      line
      |> String.replace("\n", "")
      |> String.replace("\"", "")
      |> String.split(";")
      |> transform
    end)
    |> Flow.partition(window: window, key: {:key, "day_type_no"})
    |> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
    |> Flow.on_trigger(fn items ->
      items
      |> add_timestamps
      |> Database.Routes.create_multiple_day_type
      {[items], items}
    end)
    |> Flow.run()
  end

Saat saya menjalankan aplikasi saya di AWS Fargate atau EC2, saya melihat log di bawah ini dan sebagian / semua data tidak diproses.

[warn] ** Undefined handle_info in "GenStage.Streamer"
** Unhandled message: {:tcp, #Port<0.20>, "    \"SBST#c29      #01#06#2019#3\"\r\nrec;    20200512;       32510;              \"SBST#37       #01#07#2019#4\"\r\nrec;    20200512;       32511;              \"SBST#31       #03#00#2019#5\"\r\nrec;    20200512;       32512;              \"SBST#298      #01#05#2019#6\"\r\nrec;    20200512;       32513;              \"SBST#c40      #02#06#2019#7\"\r\nrec;    20200512;       32514;              \"SBST#229      #02#06#2019#1\"\r\nrec;    20200512;       32515;              \"SBST#298      #01#00#2019#2\"\r\nrec;    20200512;       32516;              \"SBST#c291     #01#00#2019#3\"\r\nrec;    20200512;       32517;              \"SBST#38       #03#00#2019#4\"\r\nrec;    20200512;       32518;              \"SBST#33       #06#04#2019#5\"\r\nrec;    20200512;       32519;              \"SBST#2N       #01#05#2019#6\"\r\nrec;    20200512;       32520;              \"SBST#401      #02#06#2019#7\"\r\nrec;    20200512;       32521;              \"SBST#c23      #01#00#2019#1\"\r\nrec;    20200512;       32523;              \"SBST#c291     #01#06#2019#3\"\r\nrec;    20200512;       32524;              \"SBST#38       #01#07#2019#4\"\r\nrec;    20200512;       32525;              \"SBST#33       #06#07#2019#5\"\r\nrec;    20200512;       32526;              \"SBST#31       #01#05#2019#6\"\r\nrec;    20200512;       32527;              \"SBST#42       #03#06#2019#7\"\r\nrec;    20200512;       32528;              \"SBST#c23      #01#06#2019#1\"\r\nrec;    20200512;       32529;              \"SBST#2N       #01#65#2019#2\"\r\nrec;    20200512;       32530;              \"SBST#292      #01#00#2019#3\"\r\nrec;    20200512;       32531;              \"SBST#c39      #01#00#2019#4\"\r\nrec;    20200512;       32532;              \"SBST#35       #02#00#2019#5\"\r\nrec;    20200512;       32533;              \"SBST#33       #02#05#2019#6\"\r\nrec;    20200512;       32534;              \"SBST#45       #01#06#2019#7\"\r\nrec;    20200512;       32535;              \"SBST#c29      #01#00#2019#1\"\r\nrec;    20200512;       32536;              \"SBST#31       #03#00#2019#2\"\r\nrec;    20200512;       32537;              \"SBST#292      #01#06#2019#3\"\r\nrec;    20200512;       32538;              \"SBST#c39      #01#07#2019#4\"\r\nrec;    20200512;       32539;              \"SBST#37       #03#00#2019#5\"\r\nrec;    20200512;       32540;              \"SBST#35       #02#05#2019#6\"\r\nrec;    20200512;       32541;              \"SBST#c46      #03#06#2019#7\"\r\nrec;    20200512;       32542;              \"SBST#c29      #01#06#2019#1\"\r\nrec;    20200512;       32543;              \"SBST#33       #06#25#2019#2\"\r\nrec;    20200512;       32544;              \"SBST#c293     #05#00#2019#3\"\r\nrec;    20200512;       32545;              \"SBST#4        #02#00#2019#4\"\r\nrec;    20200512;       32546;              \"SBST#37       #01#07#2019#5\"\r\nrec;    20200512;       32547;              \"SBST#37       #01#05#2019#6\"\r\nrec;    20200512;       32548;              \"SBST#47       #02#06#2019#7\"\r\nrec;    20200512;       32549;              \"SBST#c291     #01#00#2019#1\"\r\nrec;    20200512;       32550;              \"SBST#33       #06#07#2019#2\"\r\nrec;    20200512;       32551;              \"SBST#c293     #01#06#2019#3\"\r\nrec;    20200512;       32552;              \"SBST#c40      #02#00#2019#4\"\r\nrec;    20200512;       32553;              \"SBST#38       #03#00#2019#5\"\r\nrec;    20200512;       32554;              \"SBST#38       #03#05#2019#6\"\r\nrec;    20200512;       32555;              \"SBST#4N       #01#67#2019#7\"\r\nrec;    20200512;       32556;              \"SBST#c291     #01#06#2019#1\"\r\nrec;    20200512;       32557;              \"SBST#35       #02#00#2019#2\"\r\nrec;    20200512;       32558;              \"SBST#298      #01#00#2019#3\"\r\nrec;    20200512;       32559;              \"SBST#c40      #01#07#2019#4\"\r\nrec;    20200512;       32560;              \"SBST#38       #01#07#2019#5\"\r\nrec;    20200512;       32561;              \"SBST#c39      #01#05#2019#6\"\r\nrec;    20200512;       32562;              \"SBST#506      #02#06#2019#7\"\r\nrec;    20200512;       32563;              \"SBST#292      #01#00#2019#1\"\r\nrec;    20200512;       32564;              \"SBST#37       #03#00#2019#2\"\r\nrec;    202" <> ...}
** Stream started at:
    (gen_stage 1.0.0) lib/gen_stage.ex:1609: GenStage.from_enumerable/2
    (stdlib 3.13) supervisor.erl:385: :supervisor.do_start_child_i/3
    (stdlib 3.13) supervisor.erl:371: :supervisor.do_start_child/2
    (stdlib 3.13) supervisor.erl:677: :supervisor.handle_start_child/2
    (stdlib 3.13) supervisor.erl:426: :supervisor.handle_call/3
    (stdlib 3.13) gen_server.erl:706: :gen_server.try_handle_call/4
    (stdlib 3.13) gen_server.erl:735: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

Saya telah mencoba dengan Flow.Window yang berbeda misalnya Hitung, Global dll, tidak ada yang berfungsi seperti yang diharapkan. Juga saya curiga bisa jadi server / tugas jarak jauh saya tidak mengaktifkan CPU / Memori, saya telah mencoba meningkatkan CPU tugas ke 2vCPU, itu membantu untuk memproses lebih banyak data, tetapi tidak semua data.

Hal berkabel adalah bahwa kode ini bekerja dengan sempurna di mesin lokal saya (karena banyak pc saya kuat ??). Saya ingin menanyakan kepada kalian semua untuk melihat ada yang tahu mengapa ini bisa terjadi?

Terima kasih banyak.

Jawaban

1 druidccsos Aug 19 2020 at 22:18

Saya telah mencoba pendekatan lain yang mengunduh file dari S3 ke volume buruh pelabuhan yang terpasang kemudian mengalirkan data, itu berhasil. Berikut adalah perubahannya.

  def load_file(file_name) do
    # download vdv452 file
    file_path = Application.get_env(:ex_aws, :docker_volume) <> (String.split(file_name, "/") |> List.last)
    ExAws.S3.download_file(Application.get_env(:ex_aws, :s3_bucket_name), file_name, file_path, timeout: 3600)
    |> ExAws.request!

    # stream file data
    file_path
    |> File.stream!
    |> Flow.from_enumerable()
    |> Flow.filter(&(String.match?(&1, ~r/^rec/)))
    |> Flow.map(fn line ->
      line
      |> String.replace("\n", "")
      |> String.replace("\"", "")
      |> String.split(";")
      |> transform
    end)
    |> Flow.partition(window: Flow.Window.count(1_000))
    |> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
    |> Flow.on_trigger(fn items ->
      items
      |> add_timestamps
      |> Database.Routes.create_multiple_route_sequence
      {[items], items}
    end)
    |> Flow.run()
  end