Elixir Flow는 로컬 시스템에서 작동하지만 데이터를 처리하지 않고 다음을 throw하지 않습니다. [warn] ** AWS Fargate의 "GenStage.Streamer"에서 정의되지 않은 handle_info

Aug 18 2020

Elixir Flow (GenStage에서 구축 된 라이브러리)를 사용하여 AWS S3 버킷 파일의 데이터를 처리 / 스트리밍하고 AWS RDS DB에 쓰려고했습니다.

로컬 머신에서이 작업을 성공적으로 수행 할 수 있었지만 앱을 AWS ECS / Fargate 또는 EC2에 배포했을 때 예상대로 작동하지 않습니다. 아래는 내 구현입니다.

  def load_file(file_name) do
    window = Flow.Window.count(100)
    file_name
    |> HTTPStream.get()
    |> HTTPStreamUtil.lines()
    |> Flow.from_enumerable()
    |> Flow.filter(&(String.match?(&1, ~r/^rec/)))
    |> Flow.map(fn line ->
      line
      |> String.replace("\n", "")
      |> String.replace("\"", "")
      |> String.split(";")
      |> transform
    end)
    |> Flow.partition(window: window, key: {:key, "day_type_no"})
    |> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
    |> Flow.on_trigger(fn items ->
      items
      |> add_timestamps
      |> Database.Routes.create_multiple_day_type
      {[items], items}
    end)
    |> Flow.run()
  end

AWS Fargate 또는 EC2에서 애플리케이션을 실행할 때 아래 로그가 표시되고 데이터의 일부 / 전체가 처리되지 않습니다.

[warn] ** Undefined handle_info in "GenStage.Streamer"
** Unhandled message: {:tcp, #Port<0.20>, "    \"SBST#c29      #01#06#2019#3\"\r\nrec;    20200512;       32510;              \"SBST#37       #01#07#2019#4\"\r\nrec;    20200512;       32511;              \"SBST#31       #03#00#2019#5\"\r\nrec;    20200512;       32512;              \"SBST#298      #01#05#2019#6\"\r\nrec;    20200512;       32513;              \"SBST#c40      #02#06#2019#7\"\r\nrec;    20200512;       32514;              \"SBST#229      #02#06#2019#1\"\r\nrec;    20200512;       32515;              \"SBST#298      #01#00#2019#2\"\r\nrec;    20200512;       32516;              \"SBST#c291     #01#00#2019#3\"\r\nrec;    20200512;       32517;              \"SBST#38       #03#00#2019#4\"\r\nrec;    20200512;       32518;              \"SBST#33       #06#04#2019#5\"\r\nrec;    20200512;       32519;              \"SBST#2N       #01#05#2019#6\"\r\nrec;    20200512;       32520;              \"SBST#401      #02#06#2019#7\"\r\nrec;    20200512;       32521;              \"SBST#c23      #01#00#2019#1\"\r\nrec;    20200512;       32523;              \"SBST#c291     #01#06#2019#3\"\r\nrec;    20200512;       32524;              \"SBST#38       #01#07#2019#4\"\r\nrec;    20200512;       32525;              \"SBST#33       #06#07#2019#5\"\r\nrec;    20200512;       32526;              \"SBST#31       #01#05#2019#6\"\r\nrec;    20200512;       32527;              \"SBST#42       #03#06#2019#7\"\r\nrec;    20200512;       32528;              \"SBST#c23      #01#06#2019#1\"\r\nrec;    20200512;       32529;              \"SBST#2N       #01#65#2019#2\"\r\nrec;    20200512;       32530;              \"SBST#292      #01#00#2019#3\"\r\nrec;    20200512;       32531;              \"SBST#c39      #01#00#2019#4\"\r\nrec;    20200512;       32532;              \"SBST#35       #02#00#2019#5\"\r\nrec;    20200512;       32533;              \"SBST#33       #02#05#2019#6\"\r\nrec;    20200512;       32534;              \"SBST#45       #01#06#2019#7\"\r\nrec;    20200512;       32535;              \"SBST#c29      #01#00#2019#1\"\r\nrec;    20200512;       32536;              \"SBST#31       #03#00#2019#2\"\r\nrec;    20200512;       32537;              \"SBST#292      #01#06#2019#3\"\r\nrec;    20200512;       32538;              \"SBST#c39      #01#07#2019#4\"\r\nrec;    20200512;       32539;              \"SBST#37       #03#00#2019#5\"\r\nrec;    20200512;       32540;              \"SBST#35       #02#05#2019#6\"\r\nrec;    20200512;       32541;              \"SBST#c46      #03#06#2019#7\"\r\nrec;    20200512;       32542;              \"SBST#c29      #01#06#2019#1\"\r\nrec;    20200512;       32543;              \"SBST#33       #06#25#2019#2\"\r\nrec;    20200512;       32544;              \"SBST#c293     #05#00#2019#3\"\r\nrec;    20200512;       32545;              \"SBST#4        #02#00#2019#4\"\r\nrec;    20200512;       32546;              \"SBST#37       #01#07#2019#5\"\r\nrec;    20200512;       32547;              \"SBST#37       #01#05#2019#6\"\r\nrec;    20200512;       32548;              \"SBST#47       #02#06#2019#7\"\r\nrec;    20200512;       32549;              \"SBST#c291     #01#00#2019#1\"\r\nrec;    20200512;       32550;              \"SBST#33       #06#07#2019#2\"\r\nrec;    20200512;       32551;              \"SBST#c293     #01#06#2019#3\"\r\nrec;    20200512;       32552;              \"SBST#c40      #02#00#2019#4\"\r\nrec;    20200512;       32553;              \"SBST#38       #03#00#2019#5\"\r\nrec;    20200512;       32554;              \"SBST#38       #03#05#2019#6\"\r\nrec;    20200512;       32555;              \"SBST#4N       #01#67#2019#7\"\r\nrec;    20200512;       32556;              \"SBST#c291     #01#06#2019#1\"\r\nrec;    20200512;       32557;              \"SBST#35       #02#00#2019#2\"\r\nrec;    20200512;       32558;              \"SBST#298      #01#00#2019#3\"\r\nrec;    20200512;       32559;              \"SBST#c40      #01#07#2019#4\"\r\nrec;    20200512;       32560;              \"SBST#38       #01#07#2019#5\"\r\nrec;    20200512;       32561;              \"SBST#c39      #01#05#2019#6\"\r\nrec;    20200512;       32562;              \"SBST#506      #02#06#2019#7\"\r\nrec;    20200512;       32563;              \"SBST#292      #01#00#2019#1\"\r\nrec;    20200512;       32564;              \"SBST#37       #03#00#2019#2\"\r\nrec;    202" <> ...}
** Stream started at:
    (gen_stage 1.0.0) lib/gen_stage.ex:1609: GenStage.from_enumerable/2
    (stdlib 3.13) supervisor.erl:385: :supervisor.do_start_child_i/3
    (stdlib 3.13) supervisor.erl:371: :supervisor.do_start_child/2
    (stdlib 3.13) supervisor.erl:677: :supervisor.handle_start_child/2
    (stdlib 3.13) supervisor.erl:426: :supervisor.handle_call/3
    (stdlib 3.13) gen_server.erl:706: :gen_server.try_handle_call/4
    (stdlib 3.13) gen_server.erl:735: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3

다른 Flow.Window 예를 들어 Count, Global 등으로 시도했지만 예상대로 작동하지 않습니다. 또한 내 원격 서버 / 작업이 CPU / 메모리를 활성화하지 않았을 수 있다고 생각하고 작업 CPU를 2vCPU로 늘리려 고 시도했지만 더 많은 데이터를 처리하는 데 도움이되었지만 모든 데이터는 아닙니다.

유선은이 코드가 내 로컬 컴퓨터에서 완벽하게 작동한다는 것입니다 (내 PC의 cos가 강력할까요 ??). 왜 이런 일이 일어날 수 있는지 아는 사람이 있는지 여러분과 확인하고 싶습니다.

정말 고맙습니다.

답변

1 druidccsos Aug 19 2020 at 22:18

S3에서 탑재 된 도커 볼륨으로 파일을 다운로드 한 다음 데이터를 스트리밍하는 또 다른 접근 방식을 시도했습니다. 다음은 변경 사항입니다.

  def load_file(file_name) do
    # download vdv452 file
    file_path = Application.get_env(:ex_aws, :docker_volume) <> (String.split(file_name, "/") |> List.last)
    ExAws.S3.download_file(Application.get_env(:ex_aws, :s3_bucket_name), file_name, file_path, timeout: 3600)
    |> ExAws.request!

    # stream file data
    file_path
    |> File.stream!
    |> Flow.from_enumerable()
    |> Flow.filter(&(String.match?(&1, ~r/^rec/)))
    |> Flow.map(fn line ->
      line
      |> String.replace("\n", "")
      |> String.replace("\"", "")
      |> String.split(";")
      |> transform
    end)
    |> Flow.partition(window: Flow.Window.count(1_000))
    |> Flow.reduce(fn -> [] end, fn item, batch -> [item | batch] end)
    |> Flow.on_trigger(fn items ->
      items
      |> add_timestamps
      |> Database.Routes.create_multiple_route_sequence
      {[items], items}
    end)
    |> Flow.run()
  end