Published on

Elixir Stream으로 대용량 ETL 메모리 폭주 막기

Authors

대용량 ETL을 Elixir로 만들다 보면 초반에는 Enum.map/2Enum.filter/2 만으로도 빠르게 결과를 얻습니다. 문제는 데이터가 커지는 순간입니다. 파일 한 개, 배치 한 번이 수십만 행을 넘어가면 리스트를 한 번에 메모리에 올리는 방식이 순식간에 메모리 폭주로 이어지고, 결국 OOM 킬이나 VM 재시작으로 연결됩니다.

Elixir의 강점은 BEAM 위에서 안정적으로 동시성을 다루는 것뿐 아니라, 필요할 때만 값을 만들어내는 지연 평가(lazy) 를 통해 메모리 사용량을 제어할 수 있다는 점입니다. 이 글에서는 Stream 을 중심으로 대용량 ETL을 설계하는 방법, 그리고 실무에서 자주 터지는 함정(청크 크기, DB insert 방식, backpressure 부재)을 정리합니다.

운영 중 프로세스가 반복 재시작되는 상황이라면, 서비스 레벨에서 원인을 추적하는 관점도 함께 필요합니다. ETL이 systemd로 구동된다면 systemd 서비스 자동 재시작 원인 추적 가이드 도 같이 보면 문제를 더 빨리 좁힐 수 있습니다.

Enum 기반 ETL이 메모리를 터뜨릴까

ETL 파이프라인을 단순화하면 보통 아래 흐름입니다.

  1. Extract: 파일/HTTP/DB에서 원천 데이터를 읽는다
  2. Transform: 정제, 파싱, 매핑, 필터링
  3. Load: DB나 데이터 웨어하우스에 적재

여기서 흔한 실수는 Extract 단계에서 전체를 리스트로 만든 뒤 Transform을 연쇄로 적용하는 것입니다.

# 나쁜 예: 전체를 한 번에 올림
rows =
  File.read!("/data/big.csv")
  |> String.split("\n", trim: true)

rows
|> Enum.map(&parse_row/1)
|> Enum.filter(&valid?/1)
|> Enum.map(&to_record/1)
|> Enum.each(&insert_one/1)

위 코드는 다음 문제가 있습니다.

  • File.read!/1 이 파일 전체를 바이너리로 메모리에 올립니다.
  • String.split/3 는 전체 라인을 리스트로 만듭니다.
  • Enum.map/2 는 매 단계마다 새 리스트를 만들 수 있습니다.

즉 데이터가 커질수록 중간 결과 리스트가 여러 번 복제 되며, 순간 피크 메모리가 폭증합니다.

Stream 으로 바꾸면 무엇이 달라지나

StreamEnum 과 유사한 API를 제공하지만, 핵심 차이는 즉시 실행이 아니라 지연 실행 이라는 점입니다.

  • Stream.map/2 는 “매핑 규칙”을 쌓아두기만 합니다.
  • 실제로 값이 만들어지는 시점은 Enum.to_list/1, Enum.each/2, Stream.run/1 같은 종단(terminal) 연산 을 만났을 때입니다.

즉, 한 줄 읽고 변환하고 적재한 뒤 다음 줄로 넘어가는 구조를 만들 수 있습니다.

파일 ETL: File.stream!/3 로 라인 단위 처리

가장 흔한 대용량 ETL은 파일 기반입니다. CSV를 예로 들면, 최소한 다음 원칙을 지키는 것만으로도 메모리 피크가 크게 줄어듭니다.

  • 파일 전체를 읽지 말고 스트리밍으로 읽기
  • 변환은 Stream 으로 쌓기
  • 적재는 청크 단위로 묶어서 수행
defmodule Etl.CsvImport do
  @chunk_size 2_000

  def run(path) do
    path
    |> File.stream!([], :line)
    |> Stream.drop(1) # 헤더 스킵
    |> Stream.map(&String.trim_trailing(&1, "\n"))
    |> Stream.map(&parse_csv_line/1)
    |> Stream.filter(&valid_row?/1)
    |> Stream.map(&to_record/1)
    |> Stream.chunk_every(@chunk_size)
    |> Stream.each(&insert_batch/1)
    |> Stream.run()
  end

  defp parse_csv_line(line) do
    # 단순 예시: 실제 CSV는 NimbleCSV 같은 라이브러리 권장
    String.split(line, ",")
  end

  defp valid_row?(fields) do
    length(fields) >= 3
  end

  defp to_record([id, name, ts | _]) do
    %{id: id, name: name, inserted_at: ts}
  end

  defp insert_batch(records) do
    # Ecto 사용 시 Repo.insert_all/3 권장
    MyApp.Repo.insert_all("events", records)
  end
end

이 구조의 장점은 명확합니다.

  • 메모리에 올라오는 데이터는 “현재 라인”과 “현재 청크” 수준으로 제한됩니다.
  • 변환 단계가 늘어나도 중간 리스트가 누적되지 않습니다.

메모리 폭주를 막는 핵심: chunk_every/2 와 적재 전략

ETL에서 메모리를 터뜨리는 또 다른 원인은 “한 건씩 insert” 입니다. 네트워크 왕복이 많아져 느려질 뿐 아니라, 재시도나 장애 처리도 어려워집니다.

Stream.chunk_every/2 로 레코드를 묶고, DB에는 가능한 한 배치 insert 를 사용하세요.

  • Postgres + Ecto: Repo.insert_all/3
  • 외부 API 적재: 벌크 엔드포인트가 있으면 사용
  • S3 업로드: 멀티파트 업로드 고려

청크 크기는 정답이 없지만, 운영에서는 다음 기준으로 잡는 편이 안전합니다.

  • 레코드 1건이 작으면 1000 에서 5000
  • 레코드가 크거나 변환이 무거우면 200 에서 1000
  • DB 락/트랜잭션 부담이 크면 더 작게

Stream 도 만능은 아니다: 종단에서 to_list 하면 다시 터진다

Stream 으로 바꿨는데도 메모리가 터지는 케이스가 있습니다. 거의 항상 종단에서 리스트로 모으는 코드가 남아있기 때문입니다.

# 나쁜 예: 결국 전체를 리스트로 모음
all =
  File.stream!("/data/big.csv")
  |> Stream.map(&parse_csv_line/1)
  |> Enum.to_list()

process(all)

Enum.to_list/1 는 스트림을 전부 펼쳐서 리스트를 만듭니다. 대용량 ETL에서는 가능하면 아래 중 하나로 끝내야 합니다.

  • Stream.each/2 + Stream.run/1
  • Enum.reduce/3 로 누적 상태만 유지

상태가 필요한 변환: Enum.reduce/3 로 누적 최소화

예를 들어 “중복 제거”를 해야 한다고 해서 모든 데이터를 MapSet 에 담으면 결국 메모리가 증가합니다. 대신 다음 중 하나를 선택해야 합니다.

  • 중복 제거를 DB 유니크 인덱스에 위임
  • 키 범위를 나눠서 청크별로 처리
  • Bloom filter 같은 근사 구조(상황에 따라)

그래도 최소한의 상태는 필요할 수 있습니다. 이때는 Enum.reduce/3 로 상태를 작게 유지합니다.

def run_with_stats(path) do
  stream =
    path
    |> File.stream!([], :line)
    |> Stream.drop(1)
    |> Stream.map(&parse_csv_line/1)
    |> Stream.filter(&valid_row?/1)

  {ok, invalid} =
    stream
    |> Enum.reduce({0, 0}, fn row, {ok_cnt, invalid_cnt} ->
      if valid_row?(row), do: {ok_cnt + 1, invalid_cnt}, else: {ok_cnt, invalid_cnt + 1}
    end)

  %{ok: ok, invalid: invalid}
end

여기서 중요한 점은, reduce는 스트림을 한 번만 순회하면서 카운터 같은 작은 상태만 쌓는다는 것입니다.

동시성까지 붙이고 싶을 때: Task.async_stream/3 와 backpressure

변환 로직이 CPU 바운드(예: JSON 파싱, 압축 해제, 암호화)라면 병렬화가 유효합니다. 하지만 무턱대고 동시에 돌리면 작업 결과가 메모리에 쌓이면서 다시 폭주할 수 있습니다.

Elixir에서는 Task.async_stream/3 가 실무에서 가장 안전한 선택지입니다. max_concurrencytimeout 으로 backpressure를 걸 수 있습니다.

def run_parallel(path) do
  path
  |> File.stream!([], :line)
  |> Stream.drop(1)
  |> Task.async_stream(
    fn line ->
      line
      |> parse_csv_line()
      |> to_record()
    end,
    max_concurrency: System.schedulers_online(),
    ordered: false,
    timeout: 30_000
  )
  |> Stream.map(fn
    {:ok, record} -> record
    {:exit, reason} ->
      # 필요하면 로깅/메트릭
      raise "worker_failed: #{inspect(reason)}"
  end)
  |> Stream.chunk_every(2_000)
  |> Stream.each(&insert_batch/1)
  |> Stream.run()
end

포인트는 다음입니다.

  • ordered: false 로 결과 정렬 비용을 줄입니다.
  • max_concurrency 를 제한해 결과가 과도하게 쌓이지 않게 합니다.
  • timeout 과 실패 처리로 배치 전체가 조용히 멈추는 상황을 막습니다.

DB 적재에서 흔한 함정: 트랜잭션 경계와 잠금

배치 insert를 한다고 해서 무조건 트랜잭션을 크게 잡으면 안 됩니다.

  • 너무 큰 트랜잭션은 WAL 증가, 락 점유 시간 증가
  • 장애 시 롤백 비용 증가

실무적으로는 “청크 하나 = 트랜잭션 하나”가 가장 단순하고 안전합니다.

defp insert_batch(records) do
  MyApp.Repo.transaction(fn ->
    MyApp.Repo.insert_all("events", records)
  end)
end

단, insert_all 자체가 단일 쿼리인 경우 트랜잭션이 과할 수도 있습니다. DB와 테이블 특성에 맞춰 조정하세요.

관측 가능성: 메모리 폭주를 재현하고 잡는 방법

ETL은 “느리다”보다 “갑자기 죽는다”가 더 위험합니다. 다음을 최소로 붙이면 원인 파악이 쉬워집니다.

  • 청크 처리 시간 로그
  • 청크당 레코드 수, 실패 수 메트릭
  • BEAM 메모리 관측: :erlang.memory/0, Process.info(self(), :memory)
defp insert_batch(records) do
  mem_before = :erlang.memory(:total)
  t0 = System.monotonic_time(:millisecond)

  MyApp.Repo.insert_all("events", records)

  t1 = System.monotonic_time(:millisecond)
  mem_after = :erlang.memory(:total)

  Logger.info(
    "insert_batch size=#{length(records)} time_ms=#{t1 - t0} mem_delta=#{mem_after - mem_before}"
  )
end

이런 로그는 장애 시점에 “어느 청크에서 느려졌는지”, “특정 데이터 패턴에서 메모리가 뛰는지”를 바로 보여줍니다.

데이터 처리 파이프라인에서 조인이나 머지 이후 행 수가 폭증하는 문제는 언어를 가리지 않고 자주 발생합니다. 비슷한 유형의 사고 패턴은 Pandas merge 후 NaN 폭증·행수 증가 원인 7가지 도 참고할 만합니다. ETL에서 조인 키 품질이 나쁘면 Elixir에서도 동일하게 “데이터가 기하급수로 늘어나는” 현상을 겪습니다.

체크리스트: Stream 기반 ETL을 운영에 올리기 전

  • Extract
    • File.read! 대신 File.stream!
    • 압축 파일이면 스트리밍 해제 가능 여부 확인
  • Transform
    • Stream.mapStream.filter 로 지연 처리
    • 종단에서 Enum.to_list 금지
  • Load
    • Stream.chunk_every 로 배치 적재
    • 가능하면 insert_all 같은 벌크 API 사용
    • 트랜잭션 경계는 청크 단위로
  • Concurrency
    • Task.async_stream 사용 시 max_concurrency 제한
    • 실패/타임아웃 처리 명확화
  • Observability
    • 청크 단위 처리 시간/메모리 로그
    • 재시작/크래시 시 원인 추적 루틴 마련

마무리

Elixir에서 대용량 ETL의 메모리 폭주는 대부분 “전체를 리스트로 만들고 중간 결과를 누적하는 구조”에서 시작합니다. Stream 으로 파이프라인을 바꾸고, 적재를 청크 단위로 만들며, 동시성에는 backpressure를 걸면 메모리 사용량을 예측 가능한 수준으로 떨어뜨릴 수 있습니다.

마지막으로 기억할 한 줄은 이것입니다. ETL은 빠르게 한 번 도는 코드가 아니라, 매일 반복 실행되어도 안정적으로 버티는 코드여야 합니다. Stream 은 그 안정성을 만드는 가장 단순하고 강력한 도구입니다.