- Published on
Kotlin Flow에서 collect이 안 끝나는 6가지 원인
- Authors
- Name
- 스타차일드
- https://x.com/ETFBITX
서버나 안드로이드에서 Flow 를 쓰다 보면, 로그는 찍히는데 함수가 리턴되지 않거나 테스트가 영원히 대기하는 상황을 자주 만납니다. 대부분은 “버그”라기보다 collect 의 계약을 오해한 설계 문제입니다. collect 는 업스트림이 완료되거나, 현재 코루틴이 취소되거나, 예외로 종료될 때만 끝납니다.
이 글에서는 collect 가 안 끝나는 대표 원인 6가지를 재현 코드와 함께 정리하고, 각 케이스별로 “끝내는 방법”을 제시합니다. (원인을 찾는 관점은 Go 채널 데드락·고루틴 누수 5분 진단법 같은 동시성 진단 글과도 매우 유사합니다.)
1) 업스트림이 원래 무한 Flow 인 경우
가장 흔한 케이스입니다. while(true) 로 emit 하거나, ticker 류의 이벤트 스트림, 혹은 callbackFlow 로 “계속 듣는” 이벤트를 만들면 완료가 없는 것이 정상입니다.
재현 코드
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
fun infiniteFlow(): Flow<Int> = flow {
var i = 0
while (true) {
emit(i++)
delay(100)
}
}
suspend fun main() {
infiniteFlow().collect { value ->
println(value)
}
// 여기 도달하지 않음
}
해결 패턴
- “몇 개만 받고 끝내기”가 목적이면
take(n) - “특정 조건까지 받고 끝내기”면
takeWhile또는first/firstOrNull - “타임아웃”이 필요하면
withTimeout또는timeout연산자
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.flow.take
suspend fun main() {
withTimeout(1_000) {
infiniteFlow().take(5).collect { println(it) }
}
println("done")
}
핵심은, 무한 스트림에 대해 collect 가 “언젠가 끝나겠지”를 기대하면 안 된다는 점입니다.
2) SharedFlow/StateFlow 를 “완료되는 Flow”로 착각한 경우
StateFlow 와 SharedFlow 는 대표적인 핫 플로우이며, 일반적으로 완료 개념이 없습니다. 즉, 구독자가 없어도(또는 있어도) 소스는 계속 살아있고, collect 는 취소되지 않는 한 계속 대기합니다.
재현 코드
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
class Vm {
private val _state = MutableStateFlow(0)
val state: StateFlow<Int> = _state
}
suspend fun waitOnce(vm: Vm) {
vm.state.collect { value ->
println("state=$value")
}
// 절대 끝나지 않음
}
해결 패턴
- “한 번만 읽고 끝”이면
first() - “특정 상태가 될 때까지 대기”면
filter+first
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
suspend fun waitUntilReady(vm: Vm) {
vm.state
.filter { it >= 10 }
.first() // 조건 만족 시 종료
}
UI 레이어에서는 collect 가 끝나지 않는 것이 자연스럽지만, use-case나 테스트에서는 first/take 기반으로 종료 조건을 명시하는 편이 안전합니다.
3) channelFlow/callbackFlow 에서 채널을 닫지 않거나 awaitClose 를 빠뜨린 경우
callbackFlow 는 외부 콜백을 Flow 로 감싸는 데 최적이지만, 자원 해제 및 종료 조건을 제대로 처리하지 않으면 collect 가 끝나지 않습니다. 특히 다음 실수가 잦습니다.
- 콜백 등록만 하고 해제하지 않음
awaitClose를 호출하지 않음- 완료 시
close()를 호출하지 않음
잘못된 예시
import kotlinx.coroutines.flow.callbackFlow
fun brokenCallbackFlow() = callbackFlow<Int> {
// 외부 시스템에 리스너 등록했다고 가정
// listener 등록만 하고 awaitClose가 없음
trySend(1)
// 여기서 함수가 끝나도 채널이 닫히지 않으면 수집 측은 계속 대기할 수 있음
}
올바른 패턴
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow
fun properCallbackFlow(api: SomeApi) = callbackFlow<Int> {
val listener = object : SomeListener {
override fun onValue(v: Int) {
trySend(v).isSuccess
}
override fun onCompleted() {
close() // 완료 이벤트가 있다면 명시적으로 닫기
}
override fun onError(t: Throwable) {
close(t)
}
}
api.addListener(listener)
awaitClose {
api.removeListener(listener) // 취소/종료 시 리소스 해제
}
}
awaitClose 는 “이 Flow 가 취소될 때 실행될 정리 로직”이며, 이게 빠지면 리스너가 살아남아 누수와 무한 대기를 만들 수 있습니다.
4) 취소가 전파되지 않는 스코프 설계 (특히 GlobalScope, SupervisorJob 오용)
collect 가 끝나려면 취소 신호가 도달해야 합니다. 그런데 다음 패턴이 들어가면, 상위 코루틴을 취소해도 하위가 계속 살아남아 collect 가 끝나지 않거나, 테스트가 종료되지 않는 문제가 생깁니다.
GlobalScope.launch { flow.collect { ... } }- 상위 스코프와 무관한 별도
CoroutineScope(SupervisorJob() + ...)를 만들어 놓고 수거하지 않음
문제 예시
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
fun startCollecting(flow: kotlinx.coroutines.flow.Flow<Int>) {
GlobalScope.launch {
flow.collect { println(it) }
}
}
이 코드는 호출자가 취소할 방법이 거의 없고, 앱/테스트 수명과 별개로 collector 가 남습니다.
해결 패턴
- 호출자가 스코프를 주입하고 그 스코프에 귀속
- 수명 관리가 필요하면
Job을 리턴해서 취소 가능하게 설계
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.Flow
fun CoroutineScope.startCollecting(flow: Flow<Int>): Job {
return launch {
flow.collect { println(it) }
}
}
// 사용 측
// val job = scope.startCollecting(flow)
// job.cancel()
동시성 문제는 결국 “소유권” 문제입니다. collector 의 소유자가 누구인지(누가 취소하는지)를 API 에서 드러내야 합니다.
5) 백프레셔/버퍼 설정 문제로 업스트림이 영원히 완료되지 않는 경우
collect 자체가 안 끝나는 게 아니라, 업스트림이 완료로 못 가는 경우도 있습니다. 예를 들어 생산자가 send/emit 을 완료해야 finally 블록을 타고 close/완료로 갈 수 있는데, 소비가 느려서 생산자가 영원히 막히면 전체가 끝나지 않습니다.
특히 channelFlow 내부에서 send 를 쓰고 있고, 다운스트림이 느리거나 멈춘 상황에서 버퍼가 0 이면 생산자가 정지합니다.
재현 코드
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.launch
fun backpressureFlow() = channelFlow {
launch {
repeat(10) {
send(it) // 소비가 느리면 여기서 막힘
}
close()
}
}
suspend fun main() {
backpressureFlow().collect { v ->
delay(1_000) // 소비를 극단적으로 느리게
println(v)
}
}
해결 패턴
buffer로 생산자-소비자 완충- “최신만 필요”하면
conflate - “이전 값 버려도 됨”이면
buffer(onBufferOverflow = ...)
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.channels.BufferOverflow
suspend fun main() {
backpressureFlow()
.buffer(capacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)
.collect { v ->
delay(1_000)
println(v)
}
}
무작정 버퍼를 키우면 메모리 사용량이 늘 수 있습니다. 시스템 전체에서 리소스가 고갈되는 문제는 DB 커넥션 풀 고갈과 유사한데, 이런 관점은 Spring Boot 3에서 HikariCP 커넥션 고갈 원인 9가지 같은 글의 사고방식과도 연결됩니다.
6) catch/retry 로 종료를 “무한 연장”하는 경우
에러가 발생하면 원래는 collect 가 예외로 종료되어야 합니다. 그런데 retry 를 무제한으로 걸거나, catch 에서 다시 무한히 방출하는 식으로 작성하면, 결과적으로 collect 가 끝나지 않는 것처럼 보입니다.
문제 예시 1: 무한 retry
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry
val flaky = flow<Int> {
emit(1)
error("boom")
}
suspend fun main() {
flaky
.retry { true } // 영원히 재시도
.collect { println(it) }
}
해결 패턴
- 재시도 횟수/조건 제한
- 지수 백오프 및 상한 설정
- 특정 예외만 재시도
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.retryWhen
suspend fun main() {
flaky
.retryWhen { cause, attempt ->
val shouldRetry = attempt < 3 && cause is RuntimeException
if (shouldRetry) delay(200L * (attempt + 1))
shouldRetry
}
.collect { println(it) }
}
문제 예시 2: catch 에서 완료를 막는 방출
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
suspend fun main() {
flow<Int> {
error("boom")
}.catch {
// 예외를 삼키고 계속 값만 내보내면 다운스트림은 종료를 기대하기 어렵다
emit(-1)
// 여기서 끝나긴 하지만, 더 복잡한 로직에서는 무한 루프가 되기도 함
}.collect { println(it) }
}
catch 는 “복구” 용도이지 “종료 의미를 숨기는 용도”가 아닙니다. 종료 시그널이 필요하면 별도 상태 모델(예: sealed class)로 성공/실패/완료를 표현하는 편이 낫습니다.
빠른 점검 체크리스트
아래 질문에 예/아니오로 답하면 원인 범위를 빠르게 좁힐 수 있습니다.
- 업스트림이 무한 스트림인가 (
while(true), 이벤트 리스너, 핫 플로우) StateFlow/SharedFlow를 완료되는 것으로 기대하고 있나callbackFlow에awaitClose와 리스너 해제가 있는가- collector 가 상위 스코프에 귀속되어 취소가 전파되는가
- 생산자가
send/emit에서 막혀 완료로 못 가는가 (버퍼/콘플레이트 필요) retry가 무한이거나,catch가 종료를 삼키고 있나
결론: collect 가 끝나게 하려면 “완료 또는 취소”를 설계하라
collect 가 안 끝나는 문제는 대부분 다음 둘 중 하나입니다.
- Flow 가 원래 끝나지 않는 모델인데, 호출자가 끝나길 기대했다
- 끝나야 하는 모델인데, 취소/완료 시그널이 전파되지 않도록 구현했다
해결의 핵심은 간단합니다.
- 무한 스트림에는
take/first/타임아웃 등 종료 조건을 명시 - 핫 플로우는 “완료” 대신 구독 수명(스코프)과 취소로 관리
callbackFlow는awaitClose와 정리 로직을 필수로 포함retry는 반드시 상한과 조건을 둔다
이 원칙만 지켜도 collect 로 인한 무한 대기, 테스트 hang, 리소스 누수의 상당 부분을 예방할 수 있습니다.