Published on

Kotlin Flow vs Java Stream - 병렬·취소 버그

Authors

서버 개발에서 데이터 파이프라인을 짤 때 Kotlin Flow와 Java Stream은 자주 비교됩니다. 둘 다 map/filter/flatMap 같은 연산자를 제공하고, 지연 평가처럼 보이는 동작도 있습니다. 하지만 병렬 처리와 취소(중단) 전파 관점에서는 설계 철학이 완전히 다릅니다.

이 글은 다음 질문에 답합니다.

  • Java parallelStream()을 Flow처럼 생각하면 어떤 버그가 생기나
  • Flow의 취소가 어디까지 전파되고, 어디서 끊기나
  • 병렬화가 필요할 때 Flow와 Stream 각각에서 안전한 패턴은 무엇인가

아래 내용은 실서비스에서 흔히 겪는 “가끔만 터지는” 유형의 장애를 기준으로 정리했습니다.

1) 핵심 모델 차이: Pull 기반 vs Suspend 기반

Java Stream은 기본적으로 Pull 기반

Java Stream은 소비자(terminal operation)가 값을 필요로 할 때 upstream을 당겨오는 pull 모델입니다. stream().map(...).filter(...).collect(...)가 호출되면 그때 평가가 진행되며, 취소라고 부를 만한 것은 보통 다음 정도입니다.

  • findFirst()/anyMatch() 같은 단락 평가(short-circuit)
  • limit(n)에 의해 더 이상 필요 없는 요소를 안 가져옴

하지만 이는 “논리적 중단”이지, 실행 중인 작업을 협력적으로 취소하는 모델은 아닙니다.

Kotlin Flow는 Suspend + 구조적 동시성 기반

Flow는 코루틴 위에서 동작하며, 취소는 코루틴의 Job 취소로 전파됩니다. 즉, 아래가 기본 전제입니다.

  • collect가 취소되면 upstream도 취소되어야 한다
  • 취소는 예외(CancellationException)로 전파된다
  • 리소스는 try/finally, use, awaitClose 같은 패턴으로 정리한다

이 차이 때문에, Stream에서 “대충 중단될 것”이라고 기대했던 코드가 Flow에서는 잘 멈추는 반면, Flow에서 “당연히 취소될 것”이라 기대했던 코드가 Stream에서는 계속 돌아 CPU를 태우는 일이 생깁니다.

2) 병렬 처리: parallelStream()은 Flow의 flatMapMerge가 아니다

Stream 병렬화의 함정 1: 공용 ForkJoinPool 오염

parallelStream()은 기본적으로 ForkJoinPool.commonPool()을 사용합니다. 이 풀은 JVM 전체에서 공유되므로, 특정 요청에서 병렬 스트림을 과도하게 쓰면 다른 작업까지 영향을 받습니다.

예: 웹 요청 처리 중 parallelStream()을 호출했는데, 내부에서 블로킹 I/O가 섞여 common pool이 잠기면 전체 응답 지연이 튈 수 있습니다.

List<Result> results = ids.parallelStream()
    .map(id -> blockingCall(id)) // 블로킹 I/O
    .toList();

이 코드는 다음 문제를 유발합니다.

  • common pool 워커가 블로킹으로 점유됨
  • 다른 parallelStream() 작업도 같이 느려짐
  • 심하면 서비스 전체의 tail latency가 튐

Flow에서라면 보통 Dispatchers.IO로 격리하거나, 별도 디스패처를 만들어 풀을 분리합니다.

val dispatcher = Dispatchers.IO.limitedParallelism(32)

val results = ids.asFlow()
  .map { id -> withContext(dispatcher) { blockingCall(id) } }
  .toList()

여기서 중요한 점은 Flow는 “병렬”을 기본값으로 제공하지 않고, 개발자가 어떤 디스패처에서 어떤 수준으로 동시성을 낼지 명시하도록 유도한다는 점입니다.

Stream 병렬화의 함정 2: 취소가 실행 중 작업을 멈추지 않는다

findFirst() 같은 단락 평가를 병렬 스트림에 붙이면 “어차피 첫 번째 찾으면 멈추겠지”라고 생각하기 쉽습니다. 하지만 병렬 실행에서는 이미 여러 태스크가 큐에 들어가 실행 중일 수 있고, 그 태스크는 계속 돌 수 있습니다.

Optional<User> user = users.parallelStream()
  .map(this::expensiveLookup)
  .filter(u -> u.isActive())
  .findFirst();
  • findFirst()가 결과를 얻어도
  • 이미 시작된 expensiveLookup는 계속 실행될 수 있음

즉, “논리적 중단”은 되지만 “실행 취소”는 보장되지 않습니다.

Flow에서는 first()가 취소를 발생시키고 upstream에 전파되므로, 협력 취소가 가능한 suspend 함수라면 더 빨리 멈춥니다.

val user = users.asFlow()
  .map { expensiveLookupSuspend(it) }
  .filter { it.isActive }
  .first() // 이후 upstream 취소

단, 여기에도 조건이 있습니다. expensiveLookupSuspend가 실제로 취소에 협력해야 합니다. 아래처럼 블로킹 호출을 그냥 감싸면 취소가 늦거나 무시됩니다.

suspend fun expensiveLookupSuspend(id: String): User =
  blockingCall(id) // suspend 아님, 취소 체크 없음

이 경우는 withContext(Dispatchers.IO)로 격리하고, 가능하면 취소 가능한 클라이언트 API를 쓰는 것이 맞습니다.

3) Flow 취소가 “끊기는” 대표 지점: 콜백/블로킹/브리지

Flow는 취소 전파가 강력하지만, 다음 지점에서 쉽게 끊깁니다.

3-1) flow {} 내부에서 블로킹 작업을 직접 수행

fun flowBug(): Flow<Int> = flow {
  while (true) {
    Thread.sleep(100) // 블로킹
    emit(1)
  }
}

collect를 취소해도 Thread.sleep은 취소를 모릅니다. 결과적으로 취소가 즉시 반영되지 않고 지연되며, 특정 상황에서는 스레드가 계속 점유됩니다.

개선:

fun flowFixed(): Flow<Int> = flow {
  while (currentCoroutineContext().isActive) {
    delay(100) // 취소 협력
    emit(1)
  }
}

3-2) 콜백을 Flow로 바꾸면서 awaitClose를 빼먹음

fun events(): Flow<String> = callbackFlow {
  val listener = Listener { e -> trySend(e) }
  source.add(listener)
  // awaitClose가 없으면 해제 타이밍이 불명확해지고 누수 가능
}

개선:

fun events(): Flow<String> = callbackFlow {
  val listener = Listener { e -> trySend(e).isSuccess }
  source.add(listener)
  awaitClose { source.remove(listener) }
}

이 패턴은 “취소 버그”의 전형입니다. Flow는 취소되었는데 리스너가 남아 이벤트를 계속 밀어 넣고, 버퍼가 쌓이거나 메모리 누수로 이어집니다.

3-3) Flow와 Java Stream을 섞는 브리지에서 취소가 사라짐

실무에서 흔한 형태가 “DB에서 가져온 컬렉션을 Stream으로 병렬 처리하고, 그 결과를 Flow로 흘려보내기”입니다.

fun mixed(ids: List<String>): Flow<Result> = flow {
  val results = ids.parallelStream()
    .map { blockingCall(it) }
    .toList()
  results.forEach { emit(it) }
}

문제:

  • Flow가 취소되어도 parallelStream()은 이미 시작된 작업을 멈추지 않음
  • blockingCall이 오래 걸리면, 취소 후에도 CPU와 스레드를 계속 사용
  • 취소 타이밍에 따라 “요청은 끊겼는데 백그라운드가 계속 도는” 형태가 됨

대안은 둘 중 하나입니다.

  • Stream을 쓰되, 요청 단위로 격리된 Executor를 만들고, Future 취소를 명시적으로 관리
  • Flow로 일원화하고 flatMapMergemap + limitedParallelism로 동시성을 제어

Flow로 일원화 예:

suspend fun fetchAll(ids: List<String>): List<Result> {
  val dispatcher = Dispatchers.IO.limitedParallelism(32)

  return ids.asFlow()
    .map { id ->
      withContext(dispatcher) { blockingCall(id) }
    }
    .toList()
}

여기서 limitedParallelism(32)는 동시성 상한을 강제합니다. 병렬 스트림처럼 “알아서 최적화”에 기대지 않고, 서비스 특성에 맞게 제한하는 방식이 운영에서 더 예측 가능합니다.

4) 병렬·취소 버그를 재현하기 어려운 이유

이 유형의 버그가 까다로운 이유는 다음이 겹치기 때문입니다.

  • 취소는 타이밍 의존적이다
  • 병렬 실행은 스케줄링에 의해 결과가 흔들린다
  • 공용 풀(common pool)이나 공유 디스패처는 시스템 전체 부하에 따라 거동이 바뀐다

특히 컨테이너 환경에서는 CPU quota와 스레드 스케줄링이 더 민감합니다. 요청이 끊긴 뒤에도 백그라운드 병렬 작업이 남아 있으면, 순간적으로 CPU를 밀어 올려 다른 요청의 지연을 악화시킬 수 있습니다. 이런 현상은 콜드스타트나 503 같은 증상과 함께 관찰되기도 합니다. 인프라 관점의 튜닝은 별도 글인 GCP Cloud Run 503·콜드스타트 줄이는 튜닝도 같이 보면 원인 분리가 쉬워집니다.

5) 안전한 패턴 체크리스트

Flow에서 병렬화할 때

  • CPU 작업이면 Dispatchers.Default + limitedParallelism(n) 고려
  • I/O 작업이면 Dispatchers.IO + 동시성 상한 설정
  • “진짜 병렬 Flow”가 필요하면 flatMapMerge(concurrency = n) 사용
  • flowOn은 upstream 컨텍스트를 바꾸지만, 블로킹 자체를 해결하지는 않음

예: flatMapMerge로 동시성 제어

fun fetch(ids: List<String>): Flow<Result> {
  val dispatcher = Dispatchers.IO.limitedParallelism(16)

  return ids.asFlow()
    .flatMapMerge(concurrency = 16) { id ->
      flow {
        val r = withContext(dispatcher) { blockingCall(id) }
        emit(r)
      }
    }
}

Stream에서 병렬화할 때

  • parallelStream()을 웹 요청 경로에서 기본값처럼 쓰지 말 것
  • 블로킹 I/O와 섞지 말 것
  • 꼭 필요하면 전용 ForkJoinPool 또는 ExecutorService로 격리
  • 단락 평가가 “실행 취소”를 의미하지 않음을 전제

전용 풀로 격리 예:

ForkJoinPool pool = new ForkJoinPool(16);
try {
  List<Result> results = pool.submit(() ->
    ids.parallelStream().map(this::blockingCall).toList()
  ).get();
} finally {
  pool.shutdown();
}

이 방식도 완전한 취소를 제공하진 않습니다. 하지만 최소한 공용 풀 오염은 막습니다.

6) 디버깅 포인트: “취소가 됐는데 왜 계속 돌지”

Flow 디버깅

  • 취소가 걸리는 지점에 ensureActive() 또는 isActive 체크 추가
  • finally에서 로그를 남겨 실제로 정리되는지 확인
  • 콜백 기반이라면 awaitClose가 실행되는지 확인
fun debugFlow(): Flow<Int> = flow {
  try {
    repeat(10_000) {
      currentCoroutineContext().ensureActive()
      emit(it)
    }
  } finally {
    println("cleanup")
  }
}

Stream 디버깅

  • parallelStream() 사용 여부와 호출 경로를 먼저 찾기
  • common pool 스레드 덤프에서 ForkJoinPool.commonPool-worker 점유 확인
  • 블로킹 호출이 섞였는지 확인

운영에서 이런 문제는 종종 “로그는 비는데 응답이 없다” 또는 “헬스체크는 살아있는데 지연이 튄다” 같은 형태로 보입니다. 쿠버네티스/인그레스 레벨에서 관찰되는 증상과 엮이면 원인 추적이 어려워지므로, 트래픽 경로 점검은 EKS Ingress 502인데 Pod 로그가 비면? ALB/NLB 헬스체크부터 같은 체크리스트와 함께 보는 것도 도움이 됩니다.

7) 결론: “비슷해 보이는 파이프라인”에 속지 말기

  • Java Stream은 컬렉션 처리에 강하고, 병렬화는 쉽지만 공용 풀과 취소 의미를 오해하면 장애로 이어집니다.
  • Kotlin Flow는 취소 전파와 구조적 동시성에 강하지만, 블로킹/콜백 브리지에서 취소가 끊기는 지점을 제대로 처리해야 합니다.

실무 기준으로 정리하면 다음이 안전합니다.

  • 요청 단위 작업에서 “취소가 중요”하면 Flow 중심으로 설계
  • 병렬 스트림은 배치성/오프라인 처리나, 풀 격리가 보장되는 환경에서 제한적으로 사용
  • Flow에서 병렬화는 concurrency와 디스패처를 명시하고 상한을 둔다

이 원칙만 지켜도 “가끔만 터지는 병렬·취소 버그”의 상당수를 예방할 수 있습니다.