- Published on
Kotlin Flow vs Java Stream - 병렬 처리 함정 7가지
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버/배치 코드에서 “컬렉션을 흘려보내며 처리한다”는 관점으로 보면 Kotlin Flow와 Java Stream은 비슷해 보입니다. 하지만 병렬 처리에 들어가는 순간 둘은 철학과 실행 모델이 갈라집니다.
- Java
Stream은 기본이 동기 pull 기반이며,parallel()은 내부적으로 ForkJoinPool을 활용해 데이터 병렬을 시도합니다. - Kotlin
Flow는 기본이 코루틴 기반 비동기이며, 병렬성은buffer,flatMapMerge,flowOn,channelFlow같은 연산자로 명시적으로 설계해야 합니다.
겉모습만 보고 “Stream의 parallel()처럼 Flow도 알아서 병렬이겠지” 혹은 “Flow처럼 Stream도 비동기겠지”라고 가정하면 장애로 이어집니다. 아래는 실무에서 특히 자주 밟는 병렬 처리 함정 7가지와 안전한 대안입니다.
병렬 처리 이슈는 종종 메모리/스레드 자원 고갈로 이어집니다. OOM이 의심될 때는 리눅스 OOMKilled 원인 추적 - cgroup·dmesg·ulimit도 함께 참고하면 원인 규명에 도움이 됩니다.
1) “병렬”의 의미가 다르다: Stream은 데이터 병렬, Flow는 작업 병렬
Java Stream의 parallel()은 같은 파이프라인을 여러 스레드에서 실행해 원소 단위로 분할 처리하는 쪽에 가깝습니다. 반면 Flow는 기본적으로 순차적이며, 병렬 처리는 연산자 선택으로 “어떤 구간을 어떤 디스패처에서 실행할지”를 설계합니다.
흔한 오해
- Stream:
stream().parallel().map(...)하면 내부적으로 알아서 병렬화 - Flow:
flow.map { ... }도 비동기니까 자연히 병렬일 것
Flow의 map은 기본적으로 이전 원소 처리가 끝나야 다음 원소가 진행됩니다. 병렬을 원하면 flatMapMerge 또는 map + buffer 등으로 동시성을 만들어야 합니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main() = coroutineScope {
(1..5).asFlow()
.map { n ->
delay(100)
n * 2
}
.collect { println(it) } // 기본은 순차
}
병렬(동시) 처리를 원한다면:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main() = coroutineScope {
(1..10).asFlow()
.flatMapMerge(concurrency = 4) { n ->
flow {
delay(100)
emit(n * 2)
}
}
.collect { println(it) }
}
2) Stream parallel()은 “공유 풀”을 쓴다: ForkJoinPool 함정
Java parallelStream()은 기본적으로 ForkJoinPool.commonPool()을 사용합니다. 이 풀은 JVM 전체에서 공유되며, 다른 라이브러리/코드도 같은 풀을 사용하면 서로 간섭합니다.
증상
- 특정 배치가
parallel()을 켠 뒤부터 전체 서비스 레이턴시가 튐 - CPU는 100%가 아닌데 처리량이 떨어짐(블로킹 I/O가 섞였을 때 특히)
안전 패턴
- 블로킹 I/O(HTTP, JDBC)를
parallelStream()에 섞지 말 것 - 정말 필요하면 전용 풀로 격리하고,
ForkJoinPool에 제출해 그 안에서 스트림을 실행
import java.util.concurrent.*;
import java.util.*;
public class ParallelStreamIsolated {
public static void main(String[] args) throws Exception {
ForkJoinPool pool = new ForkJoinPool(8);
try {
int sum = pool.submit(() ->
Arrays.asList(1,2,3,4,5)
.parallelStream()
.mapToInt(i -> i * 2)
.sum()
).get();
System.out.println(sum);
} finally {
pool.shutdown();
}
}
}
3) Flow에서 flowOn을 “병렬 스위치”로 착각하는 함정
flowOn은 “이 지점 상류(upstream) 를 어떤 디스패처에서 실행할지”를 바꾸는 연산자입니다. flowOn(Dispatchers.IO)를 붙인다고 해서 원소들이 동시에 처리되지는 않습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun cpuFlow(): Flow<Int> = flow {
repeat(5) {
emit(it)
}
}.flowOn(Dispatchers.Default) // 상류 실행 컨텍스트 이동
suspend fun main() = coroutineScope {
cpuFlow()
.map { n ->
// 여기는 여전히 순차 map
n * 2
}
.collect { println(it) }
}
병렬이 필요하면
- CPU 작업:
Default에서flatMapMerge로 동시성 부여 - I/O 작업:
IO에서flatMapMerge또는channelFlow로 동시성 제어
4) 블로킹 호출을 섞으면 둘 다 망한다: “병렬”이 아니라 “스레드 고갈”
Stream에서 블로킹 I/O를 병렬로 돌리면 commonPool이 막혀 전체 JVM에 영향을 줍니다. Flow에서도 블로킹 I/O를 Default에서 돌리면 코루틴 스케줄링이 무너지고, IO에서도 무제한 동시성을 주면 결국 스레드/소켓/커넥션 풀이 고갈됩니다.
안전 패턴(Flow)
- 블로킹 I/O는
withContext(Dispatchers.IO)또는 상류에flowOn(Dispatchers.IO) - 동시성은
flatMapMerge(concurrency = N)로 제한
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun blockingCall(id: Int): String {
// 예: JDBC/레거시 SDK 같은 블로킹 호출
Thread.sleep(50)
return "ok-" + id
}
suspend fun main() = coroutineScope {
(1..100).asFlow()
.flatMapMerge(concurrency = 16) { id ->
flow {
val r = withContext(Dispatchers.IO) { blockingCall(id) }
emit(r)
}
}
.collect { /* consume */ }
}
동시성 상한 없이 처리량을 올리려다 메모리/큐가 폭증하는 케이스도 많습니다. 메모리 압박이 의심되면 SDXL+ComfyUI VRAM OOM 해결 - 메모리 최적화처럼 “자원 상한을 먼저 잡는” 접근이 서버에서도 유효합니다.
5) 순서 보장과 병렬의 충돌: forEachOrdered vs flatMapMerge
병렬 처리에서 “입력 순서대로 출력”을 요구하면 비용이 급격히 커집니다.
- Stream:
parallel()에서 순서를 지키려면forEachOrdered등을 쓰는데, 이때 병렬 이점이 줄어듭니다. - Flow:
flatMapMerge는 결과 순서를 보장하지 않습니다. 순서가 필요하면flatMapConcat(순차) 또는 별도 정렬/버퍼링 전략이 필요합니다.
실무 팁
- 순서가 정말 필요한지 먼저 확인(대부분은 필요 없음)
- 필요하다면 “병렬 처리 + 재정렬”은 메모리 버퍼가 필요하므로 상한을 둠
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
data class Indexed<T>(val idx: Int, val value: T)
suspend fun main() = coroutineScope {
(1..20).asFlow()
.mapIndexed { idx, v -> Indexed(idx, v) }
.flatMapMerge(concurrency = 4) { item ->
flow {
delay((1..50).random().toLong())
emit(Indexed(item.idx, item.value * 2))
}
}
// 순서 보장이 필요하면 collect 단계에서 정렬/버퍼링 고려
.toList()
.sortedBy { it.idx }
.forEach { println(it.value) }
}
위 패턴은 간단하지만 toList()가 전체 적재를 유발합니다. 데이터가 크면 “윈도우 단위 재정렬” 등으로 바꿔야 합니다.
6) 예외/취소 전파가 다르다: Flow는 구조적 동시성, Stream은 부분 실패가 숨는다
Flow는 코루틴의 구조적 동시성에 따라 예외가 발생하면 상위 스코프가 취소되고, 진행 중인 작업도 취소되는 방식이 일반적입니다(연산자/스코프에 따라 달라질 수 있음). 반면 Stream의 병렬 처리는 예외가 래핑되어 올라오거나, 일부 작업이 진행된 뒤 모아서 실패하는 등 디버깅이 까다롭습니다.
Flow에서 흔한 함정
flatMapMerge로 동시 처리 중 하나가 실패하면 전체가 취소되어 “나머지 결과가 왜 안 오지?”가 됨
대응:
- 실패를 허용하고 계속 가야 한다면
catch로 원소 단위 복구를 설계 - 또는
supervisorScope/SupervisorJob기반으로 실패 격리
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main() = supervisorScope {
(1..20).asFlow()
.flatMapMerge(concurrency = 4) { n ->
flow {
if (n % 7 == 0) error("boom: " + n)
emit(n)
}.catch { e ->
// 실패를 값으로 치환하거나 로깅 후 무시
emit(-1)
}
}
.collect { println(it) }
}
7) 백프레셔/버퍼링을 무시하면 “병렬 처리”가 “큐 폭증”으로 변한다
Flow는 기본적으로 생산자와 소비자가 같은 코루틴 문맥에서 만나며, 느린 소비자가 있으면 생산도 느려집니다(자연스러운 backpressure). 하지만 buffer()를 무심코 크게 주거나, channelFlow에서 무제한으로 send하면 메모리가 쉽게 불어납니다.
Stream은 backpressure 개념이 약하고, 특히 병렬 처리에서 내부 큐/분할이 늘어나며 GC 압박이 커질 수 있습니다.
안전 패턴(Flow)
buffer(capacity = N)은 근거 있는 값으로만- 처리량이 아니라 “지연/메모리” 목표를 먼저 정하고 튜닝
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun main() = coroutineScope {
(1..1_000).asFlow()
.buffer(capacity = 64) // 무작정 크게 잡지 말 것
.map { it * 2 }
.collect { /* slow consumer */ }
}
Flow vs Stream: 병렬 처리 체크리스트
실무에서 결정을 빠르게 하기 위한 체크리스트입니다.
Java Stream을 선택할 때
- 입력이 메모리에 이미 있는 컬렉션이고
- 작업이 CPU 바운드이며
- 블로킹 I/O가 없고
- 순서/부작용이 잘 통제되는 경우
이때도 parallel()은 “기본값으로 켠다”가 아니라, 측정 후 켜는 옵션에 가깝습니다.
Kotlin Flow를 선택할 때
- 데이터가 시간에 따라 도착하거나(이벤트/메시지)
- 비동기 I/O가 많고
- 취소/타임아웃/재시도 같은 제어가 필요하며
- 동시성을 연산자로 명시적으로 제한하고 싶은 경우
결론: 병렬 처리의 핵심은 “스위치”가 아니라 “설계”
parallel()이나 flowOn() 같은 한 줄로 병렬이 해결될 거라는 기대가 가장 위험합니다. Stream은 공유 풀과 블로킹 혼합이 치명적이고, Flow는 연산자 조합에 따라 순차/동시/취소 전파가 달라져서 “의도한 실행 모델”을 코드로 드러내야 합니다.
정리하면:
- Stream 병렬화는 CPU 바운드에 제한적으로, 가능하면 전용 풀로 격리
- Flow 병렬화는
flatMapMerge(concurrency = N)처럼 동시성 상한을 명시 - 블로킹 I/O는 어느 쪽이든 병렬 처리의 적이며, 디스패처/풀/상한을 먼저 설계
병렬 처리 문제는 종종 “자원 고갈”로 관측됩니다. 특히 컨테이너 환경에서 증상이 애매하게 나타나면 Cloud Run 503·컨테이너 미기동 원인 7가지처럼 런타임/인프라 관점의 점검도 함께 진행하는 것이 안전합니다.