- Published on
Kotlin Flow combine 최신 문법과 역압·취소 버그
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서로 다른 비동기 스트림을 합쳐 UI 상태를 만들 때 combine은 사실상 표준 도구입니다. 하지만 최신 코루틴/Flow 환경에서 combine을 “그냥 합치면 된다” 수준으로 쓰면, 역압(backpressure) 때문에 메모리가 불거나, 취소(cancellation)가 기대대로 전파되지 않아 작업이 계속 살아있는 것처럼 보이는 문제를 겪기 쉽습니다.
이 글은 다음을 목표로 합니다.
- 최신
combine문법(다중 Flow,combineTransform,stateIn/shareIn조합) 정리 - 역압으로 인한 폭주/지연/메모리 증가를 재현하고 원인을 설명
- 취소 버그처럼 보이는 대표 증상을 진단하고, 안전한 패턴으로 고치는 방법 제시
참고: 성능/병렬화에서 “빠를 것 같지만 오히려 느려지는” 역전 현상은 Flow에서도 동일하게 나타납니다. 병렬화가 느려지는 전형적인 원인들은 Java Stream 병렬화가 느린 6가지 원인과 해결도 함께 보면 사고방식에 도움이 됩니다.
combine 최신 문법: 무엇이 달라졌나
1) 다중 Flow 결합: 가독성과 타입 안정성
가장 흔한 패턴은 UI 상태를 만들기 위해 여러 소스를 합치는 것입니다.
data class UiState(
val user: User?,
val items: List<Item>,
val isRefreshing: Boolean,
)
val uiState: StateFlow<UiState> = combine(
userFlow, // Flow<User?>
itemsFlow, // Flow<List<Item>>
refreshingFlow // Flow<Boolean>
) { user, items, refreshing ->
UiState(user, items, refreshing)
}
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = UiState(null, emptyList(), false)
)
핵심은 combine 자체보다 뒤의 stateIn/shareIn 조합입니다. UI에서 collect가 끊겼을 때 upstream을 어떻게 멈출지(SharingStarted)가 역압/취소 문제의 체감에 큰 영향을 줍니다.
2) combineTransform: 합치면서 비동기/다중 emit
combine 블록은 기본적으로 값 하나를 “즉시” 만들어 내보내는 데 적합합니다. 결합 과정에서 조건에 따라 emit을 건너뛰거나, 여러 값을 emit하고 싶다면 combineTransform이 더 안전합니다.
val messages: Flow<String> = combineTransform(aFlow, bFlow) { a, b ->
if (a == null) return@combineTransform
emit("a=${'$'}a")
if (b > 0) emit("b=${'$'}b")
}
transform 류 연산자는 “여러 번 emit 가능”이라는 점 때문에, 역압 제어(버퍼/드롭/샘플링)와 같이 설계해야 합니다.
3) zip과의 차이 재정리
combine: 각 Flow의 “최신값”을 조합. 한쪽이 빠르게 emit하면 조합 이벤트가 많이 발생 가능zip: 양쪽이 한 쌍을 이룰 때만 emit. 느린 쪽에 맞춰짐
역압이 문제라면 “정말 최신값 조합이 필요한가?”부터 점검해야 합니다. 최신값만 필요하면 combine + 샘플링/디바운스가 정답이고, 짝맞춤이 필요하면 zip이 더 단순합니다.
역압(backpressure) 함정: combine이 폭주하는 전형적인 이유
Flow는 기본적으로 “suspending pull” 모델이라 역압에 강하다고 알려져 있지만, combine은 여러 upstream을 동시에 다루며 결합 이벤트가 기하급수적으로 늘어나는 상황을 만들 수 있습니다.
증상 1) 한쪽이 고속 emit, 다른 쪽은 저속: 결합 이벤트 폭증
예를 들어, queryFlow는 사용자가 타이핑할 때마다 고속으로 바뀌고, itemsFlow는 네트워크/DB로 저속 갱신된다고 합시다.
val resultFlow: Flow<List<Item>> = combine(queryFlow, itemsFlow) { q, items ->
items.filter { it.name.contains(q, ignoreCase = true) }
}
문제는 itemsFlow가 한 번 emit한 뒤, queryFlow가 30번 바뀌면 필터링도 30번 수행된다는 점입니다. 필터링이 무겁거나, downstream이 느리면 지연이 쌓입니다.
해결 1) “최신값만 필요”하면 샘플링/디바운스
val resultFlow = combine(
queryFlow
.debounce(200)
.distinctUntilChanged(),
itemsFlow
) { q, items ->
items.filter { it.name.contains(q, ignoreCase = true) }
}
debounce: 타이핑 폭주를 완화distinctUntilChanged: 같은 값 반복 emit 제거
해결 2) 무거운 연산은 mapLatest로 취소 가능하게
combine 내부에서 무거운 계산을 직접 하면 “취소 가능한 단위”가 작아지지 않습니다. 대신 결합 후 mapLatest로 넘기면 새 값이 오면 이전 작업을 취소할 수 있습니다.
val resultFlow = combine(
queryFlow.debounce(200).distinctUntilChanged(),
itemsFlow
) { q, items -> q to items }
.mapLatest { (q, items) ->
// 여기서 무거운 작업(정렬/검색/IO)을 수행하면 이전 작업이 취소됨
items.asSequence()
.filter { it.name.contains(q, ignoreCase = true) }
.toList()
}
증상 2) downstream이 느릴 때 메모리 증가 또는 지연 누적
Flow는 기본적으로 버퍼가 크지 않지만, 다음과 같은 패턴이 끼면 “쌓이는 느낌”이 생깁니다.
combine이후에buffer()를 크게 잡음shareIn/stateIn으로 hot stream化 했는데 구독자가 느리거나, 구독이 끊겼는데 upstream이 계속 돈다고 착각- UI 레이어에서 collector가 메인 스레드에서 오래 걸림
해결 1) 버퍼를 “무조건 늘리기”보다 드롭 전략을 명시
UI는 중간 상태를 모두 볼 필요가 없는 경우가 많습니다. 이때는 conflate() 또는 buffer + onBufferOverflow를 고려합니다.
val uiEvents = combine(aFlow, bFlow) { a, b -> a to b }
.buffer(
capacity = 64,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
또는 최신값만 유지하는 conflate():
val uiEvents = combine(aFlow, bFlow) { a, b -> a to b }
.conflate()
주의: conflate()는 “중간 값 유실”이 정상 동작입니다. 로그/감사/정산처럼 모든 이벤트가 중요한 스트림에는 쓰면 안 됩니다.
해결 2) UI collector가 느리면 flowOn/디스패처를 재점검
combine 자체는 CPU를 많이 쓰지 않더라도, 결합 후 가공이 무거우면 백그라운드로 보내야 합니다.
val uiState = combine(aFlow, bFlow) { a, b -> heavyBuildUiState(a, b) }
.flowOn(Dispatchers.Default)
다만 flowOn은 “위쪽(upstream)”에만 적용됩니다. 어디가 무거운지 기준을 세우고 적용 위치를 조정해야 합니다.
취소(cancellation) 버그처럼 보이는 케이스: 원인과 재현
combine에서 “취소가 안 된다”는 보고는 대개 진짜 라이브러리 버그라기보다 다음 중 하나입니다.
stateIn/shareIn때문에 upstream이 살아있음(정상 동작)callbackFlow/리스너 기반 소스에서awaitClose누락으로 리소스가 해제되지 않음map/combine내부에서 취소 불가능한 블로킹 호출을 수행catch로CancellationException을 삼켜 취소가 무시된 것처럼 보임
케이스 1) stateIn + WhileSubscribed 설정 오해
stateIn은 hot StateFlow를 만들면서 upstream을 공유합니다. SharingStarted.WhileSubscribed(5_000)이면 “구독자가 사라져도 5초는 유지”합니다. 이 5초 동안 upstream이 계속 돌아가니 취소가 안 된 것처럼 보일 수 있습니다.
val hot = coldFlow
.onStart { println("start") }
.onCompletion { println("complete") }
.stateIn(scope, SharingStarted.WhileSubscribed(5_000), initialValue)
해결
- 즉시 멈추길 원하면
WhileSubscribed(0)또는Lazily/Eagerly를 목적에 맞게 선택 - 화면 단위라면
repeatOnLifecycle로 collect 범위를 명확히
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { render(it) }
}
}
케이스 2) catch가 취소를 먹어버림
아래처럼 작성하면 취소가 예외로 들어왔을 때도 잡아버려서, 취소가 전파되지 않습니다.
flow
.catch { e ->
// 나쁜 예: CancellationException까지 처리해버림
emit(fallbackValue)
}
해결: 취소는 다시 던지기
flow
.catch { e ->
if (e is CancellationException) throw e
emit(fallbackValue)
}
이 패턴은 combine과 직접 관련 없어 보이지만, 실제로는 combine로 합친 뒤 에러를 한 곳에서 처리하려다 취소까지 삼키는 경우가 많습니다.
케이스 3) 블로킹 IO/동기 락으로 취소가 지연
combine 블록이나 그 이후 연산에서 다음을 수행하면 취소가 “즉시” 반영되지 않습니다.
Thread.sleep(...)- 블로킹 네트워크 호출
- 긴 synchronized 구간
해결: suspend 친화 API로 바꾸거나 withContext로 격리
val out = combine(aFlow, bFlow) { a, b -> a to b }
.mapLatest { (a, b) ->
withContext(Dispatchers.IO) {
// 블로킹 호출은 IO로 격리하되, 가능하면 suspend API 사용
blockingCall(a, b)
}
}
mapLatest를 쓰면 새 입력이 오면 이전 작업이 취소되지만, 블로킹 호출 자체가 취소 협조를 안 하면 여전히 “취소가 늦는” 느낌이 납니다. 이 경우는 호출 자체를 취소 가능한 라이브러리로 바꾸는 것이 근본 해결입니다.
케이스 4) callbackFlow에서 리스너 해제가 안 됨
combine upstream 중 하나가 callbackFlow라면, 취소 시점에 리스너를 떼지 않으면 계속 이벤트가 들어옵니다.
fun sensorFlow(sensor: Sensor): Flow<Int> = callbackFlow {
val listener = object : SensorListener {
override fun onValue(v: Int) {
trySend(v)
}
}
sensor.addListener(listener)
awaitClose {
// 이게 없으면 취소해도 리스너가 남아 메모리/CPU가 계속 사용됨
sensor.removeListener(listener)
}
}
이 문제는 combine 결과만 보고 있으면 “combine이 취소를 안 함”으로 오해하기 쉽습니다. 실제 원인은 upstream 리소스 누수입니다.
combine에서 “버그”로 오해되는 레이스/정합성 문제
1) 초기값/첫 emit 타이밍
combine은 모든 upstream이 최소 한 번 emit해야 첫 값을 내보냅니다. 그래서 한쪽이 절대 emit하지 않으면 결합이 영원히 안 나옵니다.
StateFlow는 초기값이 있어 즉시 emitSharedFlow는 리플레이가 없으면 과거 값이 없어 처음에 emit이 없을 수 있음
해결: 필요한 경우 onStart { emit(...) } 또는 stateIn으로 초기값 제공
val bReady = bFlow.onStart { emit(defaultB) }
val out = combine(aFlow, bReady) { a, b -> a to b }
2) “최신값”의 의미가 비즈니스 요구와 다를 때
combine은 최신값을 조합하지만, “동일 시점의 스냅샷”을 보장하지 않습니다. 예를 들어 aFlow와 bFlow가 사실상 같은 원천(DB)의 서로 다른 쿼리라면, 갱신 타이밍에 따라 순간적으로 불일치한 조합이 나올 수 있습니다.
해결: upstream을 하나의 트랜잭션/스냅샷으로 만들거나, 단일 소스에서 파생
val snapshotFlow: Flow<Snapshot> = db.snapshotFlow()
val out = snapshotFlow.map { snap ->
UiState(
user = snap.user,
items = snap.items,
isRefreshing = snap.refreshing
)
}
이건 Flow 연산자 문제가 아니라 “데이터 모델링/원천 분리” 문제입니다.
실전 권장 패턴: combine을 안전하게 쓰는 체크리스트
1) 결합 후 무거운 작업은 mapLatest로
- 결합은 가볍게
- 무거운 작업은 취소 가능한 경계로 분리
val out = combine(aFlow, bFlow) { a, b -> a to b }
.mapLatest { (a, b) -> heavy(a, b) }
2) 이벤트 폭주가 예상되면 입력 단계에서 줄이기
debouncesampledistinctUntilChanged
val out = combine(
fastFlow.sample(100),
slowFlow
) { f, s -> f to s }
3) UI는 “모든 중간 상태”가 필요 없을 수 있다
conflate()buffer(..., DROP_OLDEST)
4) 취소가 안 되는 것 같으면 upstream 리소스부터 의심
callbackFlow의awaitClosecatch에서CancellationException재던지기- 블로킹 호출 제거
이런 류의 “겉보기 버그”는 서버/스레드 쪽에서도 흔합니다. 예를 들어 가상 스레드 도입 후 데드락/정체가 생기는 현상처럼, 관찰 지점이 downstream일 뿐 원인은 upstream 설계인 경우가 많습니다. 유사한 진단 관점은 Spring Boot 3 가상스레드 도입 후 Deadlock·TPS 저하 진단도 참고할 만합니다.
디버깅 팁: 어디서 막히는지 빠르게 확인하기
1) onEach/onStart/onCompletion으로 생명주기 로깅
val out = combine(aFlow, bFlow) { a, b -> a to b }
.onStart { println("out start") }
.onEach { println("out emit: ${'$'}it") }
.onCompletion { e -> println("out complete: ${'$'}e") }
2) 코루틴 디버그 옵션
- JVM 옵션
-Dkotlinx.coroutines.debug활성화 - 스레드/코루틴 이름 확인
3) “느린 collector”를 의심할 때
UI 렌더/레이아웃/웹뷰 등 렌더링 비용이 큰 곳에서 collect하면 역압이 더 쉽게 체감됩니다. 특히 Android에서 WebView가 섞이면 프레임 드랍이 역압처럼 보일 수 있어요. 렌더링 병목 관점은 WebView 렌더링 느림 - HW 가속·GPU 오버드로우 튜닝도 함께 보면 원인 분리가 빨라집니다.
마무리
combine은 강력하지만, “최신값 조합”이라는 특성 때문에 이벤트 폭주가 쉽게 발생하고, stateIn/shareIn 같은 공유 연산자와 섞이면 취소가 안 되는 것처럼 보이는 상황이 자주 생깁니다.
정리하면 다음 3가지만 습관화해도 사고가 크게 줄어듭니다.
- 결합은 가볍게, 무거운 작업은
mapLatest로 취소 가능하게 분리 - 빠른 입력은
debounce/sample/distinctUntilChanged로 먼저 다이어트 - 취소 문제는
catch와callbackFlow.awaitClose,SharingStarted설정부터 점검
이 기준으로 코드를 정리하면 “combine 역압·취소 버그”처럼 보이던 문제 대부분이 재현 가능하고, 설명 가능하며, 해결 가능한 상태로 내려옵니다.